建立管道後數據在你的系統中歡暢地流動,並在連接在一起的各個階段發生變化。
有時候,管道中的各個階段可能在計算上特別耗費資源。當發生這種情況時,管道中的上遊階段可能會在 等待完成時被阻塞。不僅如此,管道本身可能需要很長時間才能整體執行。
我們如何解決這個問題?
管道的一個有趣屬性是它的各個階段相互獨立,方便組合。你可以多次重覆使用管道的各個階段。因此, 在多個goroutine上重用管道的單個階段實現並行化,將有助於提高管道的性能。
事實上,這種模式被稱為扇入扇出。
扇出(Fan-out)是一個術語,用於描述啟動多個goroutines以處理來自管道的輸入的過程,並且扇入 (fan-in)是描述將多個結果組合到一個通道中的過程的術語。
那麽在什麽情況下適用於這種模式呢?如果出現以下兩種情況,你就可以考慮這麽幹了:
1.計算不依賴於前一個階段的計算結果。
2.運行需要很長時間。
運行的獨立性是非常重要的,因為你無法保證各階段的併發程序以何種順序運行,也無法保證其返回的順
序。
我們來看一個例子。在下面的例子中,構建了一個尋找質數的方法。我們將使用在“管道”中的經驗,創
建各個階段,並將它們拼接在一起:
func main() {
repeatFn := func(
done <-chan interface{},
fn func() interface{},
) <-chan interface{} {
valueStream := make(chan interface{})
go func() {
defer close(valueStream)
for {
select {
case <-done:
return
case valueStream <- fn():
}
}
}()
return valueStream
}
take := func(
done <-chan interface{},
valueStream <-chan interface{},
num int,
) <-chan interface{} {
takeStream := make(chan interface{})
go func() {
defer close(takeStream)
for i := 0; i < num; i++ {
select {
case <-done:
return
case takeStream <- <-valueStream:
}
}
}()
return takeStream
}
toInt := func(done <-chan interface{}, valueStream <-chan interface{}) <-chan int {
intStream := make(chan int)
go func() {
defer close(intStream)
for v := range valueStream {
select {
case <-done:
return
case intStream <- v.(int):
}
}
}()
return intStream
}
primeFinder := func(done <-chan interface{}, intStream <-chan int) <-chan interface{} {
primeStream := make(chan interface{})
go func() {
defer close(primeStream)
for integer := range intStream {
integer -= 1
prime := true
for divisor := integer - 1; divisor > 1; divisor-- {
if integer%divisor == 0 {
prime = false
break
}
}
if prime {
select {
case <-done:
return
case primeStream <- integer:
}
}
}
}()
return primeStream
}
rand := func() interface{} { return rand.Intn(50000000) }
done := make(chan interface{})
defer close(done)
start := time.Now()
randIntStream := toInt(done, repeatFn(done, rand))
fmt.Println("Primes:")
for prime := range take(done, primeFinder(done, randIntStream), 10) {
fmt.Printf("\t%d\n", prime)
}
fmt.Printf("Search took: %v", time.Since(start))
}
會輸出
Primes:
24941317
36122539
6410693
10128161
25511527
2107939
14004383
7190363
45931967
2393161
Search took: 23.437511647s
我們生成一串隨機數,最大值為50000000,將數據流轉換為整數流,然後將其傳入primeFinder。primeFinder會嘗試將輸入流提供的數字除以比它小的每個數字。如果不成功,會將該值傳遞到下一個階段。
當然,這個方法很低效,但它符合我們程序運行時間較長的要求。
在我們的for循環中,搜索找到的質數,在進入時將它們打印出來,並且take在找到10個質數後關閉管道。然後,我們打印出搜索需要多長時間,完成的通道被延遲聲明關閉,管道停止。
通常遇到這種情況,我們首先看一下演算法本身,也許是拿一本算法書籍,然後看看我們是否能在哪個階段改進。但是,由於目的是通過扇出(fan out)來解決該問題,所以我們暫時先不去管演算法。
我們的程序現在有兩個階段:生成隨機數和篩選質數。在更大的程序中,你的管道可能由更多的階段組成,那我們該對什麽樣的階段使用扇出模式進行改進?
請記住我們之前提出的標準:執行順序的獨立性和執行時間。
我們的隨機數生成器肯定是與順序無關的,但運行起來並不需要很長的時間。
PrimeFinder也是順序無關的,因為我們用的演算法效率非常低下,它需要很長時間才能運行完成。因此,我們可以 把關注點放在PrimeFinder身上。
為此,我們可以將其操作拆散,就像這樣:
numFinders := runtime.NumCPU()
finders := make([]<-chan int, numFinders)
for i := 0; i < numFinders; i++ {
finders[i] = primeFinder(done, randIntStream)
}
在本書的範例上,runtime.NumCPU()返回8,在生產中,我們可能會做一些經驗性的測試來確定CPU的最佳數量,但在這里我們將保持簡單,並且假設只有一個findPrimes階段的CPU會被占用。
這就好像一個班級的作業,原本由1位老師批改,現在變成了8位老師同時批改。
接下來我們遇到的問題是,如何將結果匯總到一起。為此,我們開始考慮使用扇入(fan-in)。 正如我們前面所提到的,扇入意味著將多個數據流覆用或合並成一個流。
fanIn := func(
done <-chan interface{},
channels ...<-chan interface{},
) <-chan interface{} { // <1> 一如既往,我們使用done通道來關閉衍生的goroutine,並接收接口類型的通道切片來彙總數據。
var wg sync.WaitGroup // <2> 這裡我們使用sync.WaitGroup以等待全部通道讀取完成。
multiplex := func(c <-chan interface{}) { // <3> 我們在這裡建立函數multiplex,它會讀取傳入的通道,並把該通道的值放入multiplexedStream。
defer wg.Done()
for i := range c {
select {
case <-done:
return
case multiplexedStream <- i:
}
}
}
// Select from all the channels
wg.Add(len(channels)) // <4> 這裡增加等待計數。
for _, c := range channels {
go multiplex(c)
}
// Wait for all the reads to complete
go func() { // <5> 這裡我們建立一個goroutine等待彙總完畢。這樣函數塊可以快速return,不必等待wg.Wait()。這種用法不多見,但在這裡很符合場景需求。
wg.Wait()
close(multiplexedStream)
}()
return multiplexedStream
}
簡而言之,扇入涉及讀取多路覆用通道,然後為每個傳入通道啟動一個goroutine,以及在傳入通道全部關閉時關閉覆用通道。由於我們要創建一個等待N個其他goroutine完成的goroutine,因此創建sync.WaitGroup來協調處理是有意義的。
multiplex還通知WaitGroup它已執行完成。
額外提醒,在對返回結果的順序有要求的情況下扇入扇出可能工作的不是很好。我們沒有做任何事情來保 證從randIntStream中讀取數據的順序。
func main() {
repeatFn := func(
done <-chan interface{},
fn func() interface{},
) <-chan interface{} {
valueStream := make(chan interface{})
go func() {
defer close(valueStream)
for {
select {
case <-done:
return
case valueStream <- fn():
}
}
}()
return valueStream
}
take := func(
done <-chan interface{},
valueStream <-chan interface{},
num int,
) <-chan interface{} {
takeStream := make(chan interface{})
go func() {
defer close(takeStream)
for i := 0; i < num; i++ {
select {
case <-done:
return
case takeStream <- <-valueStream:
}
}
}()
return takeStream
}
toInt := func(done <-chan interface{}, valueStream <-chan interface{}) <-chan int {
intStream := make(chan int)
go func() {
defer close(intStream)
for v := range valueStream {
select {
case <-done:
return
case intStream <- v.(int):
}
}
}()
return intStream
}
primeFinder := func(done <-chan interface{}, intStream <-chan int) <-chan interface{} {
primeStream := make(chan interface{})
go func() {
defer close(primeStream)
for integer := range intStream {
integer -= 1
prime := true
for divisor := integer - 1; divisor > 1; divisor-- {
if integer%divisor == 0 {
prime = false
break
}
}
if prime {
select {
case <-done:
return
case primeStream <- integer:
}
}
}
}()
return primeStream
}
fanIn := func(
done <-chan interface{},
channels ...<-chan interface{},
) <-chan interface{} { // <1>
var wg sync.WaitGroup // <2>
multiplexedStream := make(chan interface{})
multiplex := func(c <-chan interface{}) { // <3>
defer wg.Done()
for i := range c {
select {
case <-done:
return
case multiplexedStream <- i:
}
}
}
// Select from all the channels
wg.Add(len(channels)) // <4>
for _, c := range channels {
go multiplex(c)
}
// Wait for all the reads to complete
go func() { // <5>
wg.Wait()
close(multiplexedStream)
}()
return multiplexedStream
}
done := make(chan interface{})
defer close(done)
start := time.Now()
rand := func() interface{} { return rand.Intn(50000000) }
randIntStream := toInt(done, repeatFn(done, rand))
numFinders := runtime.NumCPU()
fmt.Printf("Spinning up %d prime finders.\n", numFinders)
finders := make([]<-chan interface{}, numFinders)
fmt.Println("Primes:")
for i := 0; i < numFinders; i++ {
finders[i] = primeFinder(done, randIntStream)
}
for prime := range take(done, fanIn(done, finders...), 10) {
fmt.Printf("\t%d\n", prime)
}
fmt.Printf("Search took: %v", time.Since(start))
}
輸出:
Spinning up 8 prime finders. Primes:
6410693
24941317
10128161
36122539
25511527
2107939
14004383
7190363
2393161
45931967
Search took: 5.438491216s
這是完整的程式碼,在本書的範例上最大降幅23秒,運用扇入扇出可以在不大幅改變程序結構的前提下將運行時間縮短了大約78%。