Go 的一大賣點就是 goroutine,開發者可以開成千上萬個 goroutine 來並行處理。但如果不設限,所有請求會同時打向下游,就可能出現下游服務過載、連線數耗盡、延遲飆升等問題。
所以今天要做的:有限併發的 worker pool。
讓我們能夠「同時跑多個工作,但總數不超過 N」,進而控制壓力。
新增 worker.go,專案結構如下:
.
├── go.mod
├── main.go
├── ...
└── worker.go # 新增
// worker.go
package main
import (
"context"
"sync"
)
// WorkItem is a single unit of work handed to the worker pool.
type WorkItem struct {
	// ID is used by RunWorkerPool as the index of this item's slot in the
	// returned error slice.
	ID int
	// Task is the function to execute; it receives a context.Context and
	// reports failure through its error result.
	Task func(context.Context) error
}
// RunWorkerPool runs items on a bounded pool of worker goroutines and returns
// one error slot per item.
//
//   - ctx: cancels the whole pool; it is also passed to every Task.
//   - maxWorkers: upper bound on concurrently running tasks. Values below 1
//     are treated as 1, and no more workers than items are started.
//   - items: the tasks to run. Each item's ID selects its slot in the result,
//     so IDs must be unique and within [0, len(items)).
//
// The returned slice has len(items) entries: errs[item.ID] holds whatever
// item.Task returned, or ctx.Err() if the pool was cancelled before that task
// was started.
func RunWorkerPool(ctx context.Context, maxWorkers int, items []WorkItem) []error {
	errs := make([]error, len(items))
	if len(items) == 0 {
		return errs
	}

	// With zero workers nothing would ever receive from workCh: no task would
	// run, yet every slot would stay nil and look like a success.
	if maxWorkers < 1 {
		maxWorkers = 1
	}
	// Extra workers beyond the number of items would only sit idle.
	if maxWorkers > len(items) {
		maxWorkers = len(items)
	}

	// Unbuffered on purpose: an item is handed over only when a worker is free.
	workCh := make(chan WorkItem)

	var wg sync.WaitGroup
	for w := 0; w < maxWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for item := range workCh {
				// The pool may have been cancelled between the hand-off and
				// now; record that instead of leaving a misleading nil.
				if err := ctx.Err(); err != nil {
					errs[item.ID] = err
					continue
				}
				errs[item.ID] = item.Task(ctx)
			}
		}()
	}

	// Feed the workers from the calling goroutine, so there is no separate
	// feeder goroutine whose lifetime has to be tracked.
feed:
	for i, item := range items {
		select {
		case <-ctx.Done():
			// Everything not yet handed to a worker is reported as cancelled.
			for _, skipped := range items[i:] {
				errs[skipped.ID] = ctx.Err()
			}
			break feed
		case workCh <- item:
		}
	}
	close(workCh)

	wg.Wait()
	return errs
}
重點:同時最多只會有 maxWorkers 個 goroutine 在處理任務;每個任務的執行結果會寫入 errs[item.ID]。
接著新增 worker_test.go:
// worker_test.go
package main
import (
"context"
"errors"
"sync/atomic"
"testing"
"time"
)
// TestRunWorkerPool_AllSuccess runs five always-succeeding tasks with a worker
// limit of 2 and expects every returned error to be nil.
func TestRunWorkerPool_AllSuccess(t *testing.T) {
	const taskCount = 5
	succeed := func(context.Context) error { return nil }

	items := make([]WorkItem, 0, taskCount)
	for id := 0; id < taskCount; id++ {
		items = append(items, WorkItem{ID: id, Task: succeed})
	}

	for _, got := range RunWorkerPool(context.Background(), 2, items) {
		if got != nil {
			t.Fatalf("expected nil error, got %v", got)
		}
	}
}
// TestRunWorkerPool_WithError runs one failing and one succeeding task with a
// single worker and checks that each result lands in the slot named by its ID.
func TestRunWorkerPool_WithError(t *testing.T) {
	failing := WorkItem{ID: 0, Task: func(context.Context) error { return errors.New("boom") }}
	passing := WorkItem{ID: 1, Task: func(context.Context) error { return nil }}

	errs := RunWorkerPool(context.Background(), 1, []WorkItem{failing, passing})

	failureReported := errs[0] != nil
	successReported := errs[1] == nil
	if !(failureReported && successReported) {
		t.Fatalf("unexpected errs: %#v", errs)
	}
}
// TestRunWorkerPool_CancelEarly cancels the context before the pool starts and
// fails if more than one of the two tasks was executed.
func TestRunWorkerPool_CancelEarly(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// executed counts how many task bodies actually ran.
	var executed int32
	slow := func(context.Context) error {
		atomic.AddInt32(&executed, 1)
		time.Sleep(50 * time.Millisecond)
		return nil
	}
	fast := func(context.Context) error {
		atomic.AddInt32(&executed, 1)
		return nil
	}
	items := []WorkItem{
		{ID: 0, Task: slow},
		{ID: 1, Task: fast},
	}

	// Cancel right away, before the pool is started.
	cancel()
	errs := RunWorkerPool(ctx, 2, items)

	if ran := atomic.LoadInt32(&executed); ran > 1 {
		t.Fatalf("expected at most 1 task executed, got %d", ran)
	}
	// Informational only: log the results when the first slot holds an error
	// other than context.Canceled; this never fails the test.
	if first := errs[0]; first != nil && !errors.Is(first, context.Canceled) {
		t.Logf("errs: %#v", errs)
	}
}
注意:這裡的 worker pool 主要是「知識點」;真實的 /search API 一次只處理一個請求,沒必要啟動 pool。
真正的應用場景是:在同一個 API 裡要對多個下游發出並行請求。
範例:假設 /search 要同時去「ES + Redis + 本地 Cache」查,並把結果合併:
// pseudo-code: wrap three downstream lookups as WorkItems and run them through
// the pool. callES, callRedis, callCache, q and ctx are not defined in this
// snippet.
items := []WorkItem{
{ID: 0, Task: func(ctx context.Context) error { return callES(ctx, q) }},
{ID: 1, Task: func(ctx context.Context) error { return callRedis(ctx, q) }},
{ID: 2, Task: func(ctx context.Context) error { return callCache(ctx, q) }},
}
errs := RunWorkerPool(ctx, 2, items) // at most 2 downstream calls in flight at once
可以用 benchmark 大概感受一下:在 bench_search_test.go 最下面加入:
// BenchmarkRunWorkerPool measures one RunWorkerPool call over 100 tasks that
// each sleep for 1ms, with at most 10 tasks running concurrently.
func BenchmarkRunWorkerPool(b *testing.B) {
	const (
		taskCount   = 100
		concurrency = 10
	)

	// Every item runs the same short sleep.
	sleepy := func(context.Context) error {
		time.Sleep(1 * time.Millisecond)
		return nil
	}

	items := make([]WorkItem, taskCount)
	for id := range items {
		items[id] = WorkItem{ID: id, Task: sleepy}
	}

	ctx := context.Background()
	for iter := 0; iter < b.N; iter++ {
		RunWorkerPool(ctx, concurrency, items)
	}
}
執行:
go test -bench=BenchmarkRunWorkerPool -benchmem ./...
# 加入 -run=^$ 避免跑一般測試
go test -run=^$ -bench=BenchmarkRunWorkerPool -benchmem ./...
接下來文章會在另一台電腦實作,CPU 換成 Apple M1,重跑所有 benchmark,作為跨機器的效能參考:
go test -run=^$ -bench=. -benchmem -count=3 ./...
goos: darwin
goarch: arm64
pkg: github.com/arealclimber/cloud-native-search
cpu: Apple M1
BenchmarkBuildHitsAppend-8 40355931 39.20 ns/op 80 B/op 1 allocs/op
BenchmarkBuildHitsAppend-8 36648588 39.58 ns/op 80 B/op 1 allocs/op
BenchmarkBuildHitsAppend-8 28023060 43.21 ns/op 80 B/op 1 allocs/op
BenchmarkBuildHitsPrealloc-8 45081920 47.35 ns/op 80 B/op 1 allocs/op
BenchmarkBuildHitsPrealloc-8 47359362 26.83 ns/op 80 B/op 1 allocs/op
BenchmarkBuildHitsPrealloc-8 49861308 25.18 ns/op 80 B/op 1 allocs/op
BenchmarkJSONMarshal-8 2702668 417.1 ns/op 192 B/op 2 allocs/op
BenchmarkJSONMarshal-8 3682399 477.4 ns/op 192 B/op 2 allocs/op
BenchmarkJSONMarshal-8 3664228 434.4 ns/op 192 B/op 2 allocs/op
BenchmarkJSONEncoder_ReusedBuffer-8 4111888 292.2 ns/op 48 B/op 1 allocs/op
BenchmarkJSONEncoder_ReusedBuffer-8 4102227 294.3 ns/op 48 B/op 1 allocs/op
BenchmarkJSONEncoder_ReusedBuffer-8 4024669 915.4 ns/op 48 B/op 1 allocs/op
BenchmarkHandlerPipeline-8 323113 31023 ns/op 1409 B/op 18 allocs/op
BenchmarkHandlerPipeline-8 591518 3010 ns/op 1409 B/op 18 allocs/op
BenchmarkHandlerPipeline-8 173742 6451 ns/op 1409 B/op 18 allocs/op
BenchmarkRunWorkerPool-8 100 12540262 ns/op 4160 B/op 25 allocs/op
BenchmarkRunWorkerPool-8 99 22706217 ns/op 3856 B/op 25 allocs/op
BenchmarkRunWorkerPool-8 90 18277304 ns/op 3798 B/op 24 allocs/op
PASS
ok github.com/arealclimber/cloud-native-search 44.211s
今天完成:實作有限併發的 worker pool(RunWorkerPool)、補上對應的單元測試,並在 Apple M1 上重跑所有 benchmark 作為跨機器的效能參考。
👉 明天會進入 pprof,從真實的效能剖析工具找出「哪裡慢、哪裡值得優化」,而不只是靠直覺。