在某些情況下,你可能會發現自己想要使用一系列通道的值:
<-chan <-chan interface{}
這與將某個通道的數據切片合併到一個通道中稍有不同,這種調用方式意味著一系列通道有序的寫入操作。這與管道的單個“階段”類似,其生命周期是間歇性的。按“Confinement”章節所提到的,通道由寫入它們的goroutine所擁有,每當在新的goroutine中啟動一個管道的“階段”時,就會創建一個新的通 道——這意味著我們會得到一個通道隊列。我們會在第五章“Healing Unhealthy Goroutine”中詳細討論。
作為消費者,代碼可能不關心其值來自於一系列通道的事實。在這種情況下,處理一系列通道中的單個通道可能很麻煩。如果我們定義一個函數,可以將一系列通道拆解為一個簡單的通道——我們成為通道橋接 (bridge-channle),這使得消費者更容易關注手頭的問題:
package main
import "fmt"
func main() {
// orDone 是一個輔助函數,確保當 done 頻道被關閉時我們會退出
orDone := func(done, c <-chan interface{}) <-chan interface{} {
valStream := make(chan interface{})
go func() {
defer close(valStream)
for {
select {
case <-done:
return
case v, ok := <-c:
if ok == false {
return
}
select {
case valStream <- v:
case <-done:
}
}
}
}()
return valStream
}
// bridge 會接受一個頻道的頻道,並確保值被轉發
bridge := func(
done <-chan interface{},
chanStream <-chan <-chan interface{},
) <-chan interface{} {
valStream := make(chan interface{}) // <1> 這個通道會返回所有传入 bridge 的通道。
go func() {
defer close(valStream)
for { // <2> 這個迴圈負責從 chanStream 中提取通道,並提供給嵌套迴圈以供使用。
var stream <-chan interface{}
select {
case maybeStream, ok := <-chanStream:
if ok == false {
return
}
stream = maybeStream
case <-done:
return
}
for val := range orDone(done, stream) { // <3> 這個迴圈負責讀取給定的通道的值,並將這些值重複到 valStream 中。當正在循環的流被關閉時,我們跳出此迴圈,並繼續選擇要讀取的通道的下一次迴圈。
select {
case valStream <- val:
case <-done:
}
}
}
}()
return valStream
}
// genVals 生成一系列每個只有單一值的通道
genVals := func() <-chan <-chan interface{} {
chanStream := make(chan (<-chan interface{}))
go func() {
defer close(chanStream)
for i := 0; i < 10; i++ {
stream := make(chan interface{}, 1)
stream <- i
close(stream)
chanStream <- stream
}
}()
return chanStream
}
// 從 bridge 函數讀取值並打印它們
for v := range bridge(nil, genVals()) {
fmt.Printf("%v ", v)
}
}
通過使用bridge,我們可以專注於解構之外的邏輯,而無需去關心大量的通道處理問題。