2023/04/05 更新: 為了避免本文章散落在不同網站,之後統一由部落格更新,再麻煩從部落格查看~
將 input 由一個 producer 分發多個 goroutine 運行,再將多個 task goroutine 運行的結果由一個 consumer 收集資料合併為 output
如果程式的有著複雜的計算或者多個 IO 運行,可以將這些運行分發給 task goroutine 執行,使 task 執行更快速,在統一收集繼續下個流程。其中分發與收集的行為又被稱為 Fan Out、Fan In:
設計一個新聞資訊網頁系統,需要從 A、B、C server 拿取資料,這些資料都沒有順序性,純粹是要都顯示在網頁上而已,所以如果 A 資料拿完再拿 B,這樣就太浪費時間了。可以同時拿取 A、B、C server 的資料加快取資料的速度。
實作有問題的系統如下, A、B、C server 透過GetServerData()
拿取資料,再透過ShowNews
顯示新聞資料:
package main
import (
"fmt"
"math/rand"
"time"
)
func GetServerData(serverName string) string {
time.Sleep(time.Duration(rand.Intn(3)) * time.Second) //模擬取得server data消耗的時間
return fmt.Sprintf("%s server data", serverName)
}
func ShowNews(news ...interface{}) {
fmt.Println(news...)
}
func main() {
start := time.Now()
responseByServerA := GetServerData("A")
responseByServerB := GetServerData("B")
responseByServerC := GetServerData("C")
ShowNews(responseByServerA, responseByServerB, responseByServerC)
fmt.Printf("cost %s", time.Since(start))
}
會發現因為拿取資料無法並行,所以耗時較久
實作Producer()
、Task()
、Consumer()
來分別分發任務、執行任務、收集資料,
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func Producer(serverNames ...string) <-chan string {
producerCh := make(chan string, len(serverNames))
go func() {
defer close(producerCh)
for _, serverName := range serverNames {
producerCh <- serverName
}
}()
return producerCh
}
func Task(producerCh <-chan string) <-chan string {
taskCh := make(chan string)
go func() {
defer close(taskCh)
for serverName := range producerCh {
taskCh <- GetServerData(serverName)
}
}()
return taskCh
}
func Consumer(taskChs ...<-chan string) <-chan string {
consumerCh := make(chan string)
var wg sync.WaitGroup
wg.Add(len(taskChs))
go func() {
wg.Wait()
close(consumerCh)
}()
for _, task := range taskChs {
go func(task <-chan string) {
defer wg.Done()
for new := range task {
consumerCh <- new
}
}(task)
}
return consumerCh
}
func GetServerData(serverName string) string {
time.Sleep(time.Duration(rand.Intn(3)) * time.Second) //模擬取得server data消耗的時間
return fmt.Sprintf("%s server data", serverName)
}
func ShowNews(news ...interface{}) {
fmt.Println(news...)
}
func main() {
start := time.Now()
producerCh := Producer("A", "B", "C")
task1 := Task(producerCh)
task2 := Task(producerCh)
task3 := Task(producerCh)
consumerCh := Consumer(task1, task2, task3)
for new := range consumerCh {
ShowNews(new)
}
fmt.Printf("cost %s", time.Since(start))
}
程式碼較長,重點如下:
Producer()
後,Producer()
會創建一個 Channel 來分發任務,所以需再將此 Channel 傳給Task()
使其 goroutine 獲得任務Task()
獲得任務開始執行後,也會產生各自的 Channel 用來傳遞 server 的資料,所以需再將此 Channel 送至Consumer()
Consumer()
獲得所有Task()
的 Channel 後,會在啟動相對數量的 gorotine 合併資料至consumerCh{}
,為了要確保資料取得完畢後關閉consumerCh{}
,需透過sync.WaitGroup{}
來取得close(consumerCh)
的時機,時機的邏輯如下:
wg.Add()
會加入需等待的數目,這邊輸入 goroutine 的數量wg.Done()
會減去需等待的數目wg.Wait()
會使程式等待,等待至sync.WaitGroup{}
等待數目被減去至 0 時才會繼續執行wg.Done()
都安排在讀取完Task()
Channel 後,就可以確保讀完資料再close(consumerCh)
for new := range consumerCh
會讀取 Channel 資料,直到close(consumerCh)
後跳脫 for 迴圈執行後由於取的資料可以同時執行因此加快了執行速度: