iT邦幫忙

2025 iThome 鐵人賽

DAY 9
0

Go 的一大賣點就是 goroutine,開發者可以開成千上萬個 goroutine 來並行處理。但如果不設限,所有請求同時打爆下游,就會出現:

  • 下游服務(例如 Elasticsearch)瞬間被淹沒。
  • 本地記憶體/CPU 被 goroutine 爆量佔滿。

所以今天要做的:有限併發的 worker pool。

讓我們能夠「同時跑多個工作,但總數不超過 N」,進而控制壓力。


Step 1:專案結構新增 worker.go

.
├── go.mod
├── main.go
├── ...
└── worker.go   # 新增


Step 2:Worker Pool 實作

// worker.go
package main

import (
	"context"
	"sync"
)

// WorkItem 定義一個要處理的任務
type WorkItem struct {
	ID   int
	Task func(context.Context) error
}

// RunWorkerPool 執行一個有限併發的 worker pool
//   - ctx:用於取消整個 pool
//   - maxWorkers:同時最多跑多少 goroutine
//   - items:要處理的任務 slice
func RunWorkerPool(ctx context.Context, maxWorkers int, items []WorkItem) []error {
	var wg sync.WaitGroup
	errs := make([]error, len(items))

	// channel 傳輸任務
	workCh := make(chan WorkItem)

	// 啟動 worker
	for w := 0; w < maxWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for item := range workCh {
				select {
				case <-ctx.Done():
					// 整個 pool 被取消
					return
				default:
					errs[item.ID] = item.Task(ctx)
				}
			}
		}()
	}

	// 丟任務
	go func() {
		defer close(workCh)
		for _, item := range items {
			select {
			case <-ctx.Done():
				return
			case workCh <- item:
			}
		}
	}()

	wg.Wait()
	return errs
}

設計要點

  • 控制上限:只開 maxWorkers 個 goroutine。
  • ctx 控制:整個 pool 可隨時取消。
  • 錯誤收集:每個工作執行結果存進 errs[item.ID]

Step 3:單元測試

新增 worker_test.go

// worker_test.go
package main

import (
	"context"
	"errors"
	"sync/atomic"
	"testing"
	"time"
)

func TestRunWorkerPool_AllSuccess(t *testing.T) {
	ctx := context.Background()
	n := 5
	items := make([]WorkItem, n)
	for i := 0; i < n; i++ {
		id := i
		items[i] = WorkItem{
			ID: id,
			Task: func(ctx context.Context) error {
				return nil
			},
		}
	}
	errs := RunWorkerPool(ctx, 2, items)
	for _, err := range errs {
		if err != nil {
			t.Fatalf("expected nil error, got %v", err)
		}
	}
}

func TestRunWorkerPool_WithError(t *testing.T) {
	ctx := context.Background()
	items := []WorkItem{
		{ID: 0, Task: func(ctx context.Context) error { return errors.New("boom") }},
		{ID: 1, Task: func(ctx context.Context) error { return nil }},
	}
	errs := RunWorkerPool(ctx, 1, items)
	if errs[0] == nil || errs[1] != nil {
		t.Fatalf("unexpected errs: %#v", errs)
	}
}

func TestRunWorkerPool_CancelEarly(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var counter int32
	items := []WorkItem{
		{ID: 0, Task: func(ctx context.Context) error {
			atomic.AddInt32(&counter, 1)
			time.Sleep(50 * time.Millisecond)
			return nil
		}},
		{ID: 1, Task: func(ctx context.Context) error {
			atomic.AddInt32(&counter, 1)
			return nil
		}},
	}

	// 立刻取消
	cancel()

	errs := RunWorkerPool(ctx, 2, items)
	if counter > 1 {
		t.Fatalf("expected at most 1 task executed, got %d", counter)
	}
	if errs[0] != nil && !errors.Is(errs[0], context.Canceled) {
		// 部分可能是 context.Canceled
		t.Logf("errs: %#v", errs)
	}
}


Step 4:如何串接到我們的搜尋服務?

注意:這裡的 worker pool 主要是「知識點」;真實的 /search API 一次只處理一個請求,沒必要啟動 pool。

真正的應用場景是 在一個 API 裡要對多個下游發並行請求

範例:假設 /search 要同時去「ES + Redis + 本地 Cache」查,並把結果合併:

// pseudo-code
items := []WorkItem{
    {ID: 0, Task: func(ctx context.Context) error { return callES(ctx, q) }},
    {ID: 1, Task: func(ctx context.Context) error { return callRedis(ctx, q) }},
    {ID: 2, Task: func(ctx context.Context) error { return callCache(ctx, q) }},
}
errs := RunWorkerPool(ctx, 2, items) // 最多同時 2 個下游


Step 5:效能觀察

可以用 bench 大概感受一下,在 bench_search_test.go 最下面加入:

func BenchmarkRunWorkerPool(b *testing.B) {
	ctx := context.Background()
	items := make([]WorkItem, 100)
	for i := range items {
		items[i] = WorkItem{
			ID: i,
			Task: func(ctx context.Context) error {
				time.Sleep(1 * time.Millisecond)
				return nil
			},
		}
	}
	for i := 0; i < b.N; i++ {
		RunWorkerPool(ctx, 10, items) // 最多 10 個並行
	}
}

執行:

go test -bench=BenchmarkRunWorkerPool -benchmem ./...

# 加入 -run=^$ 避免跑一般測試 
go test -run=^$ -bench=BenchmarkRunWorkerPool -benchmem ./...

https://ithelp.ithome.com.tw/upload/images/20250923/20138331vYz3RxmdL7.png

接下來文章會在另一台電腦實作,CPU 換成 Apple M1,重跑所有 benchmark,作為跨機器的效能參考:

go test -run=^$ -bench=. -benchmem -count=3 ./...

goos: darwin
goarch: arm64
pkg: github.com/arealclimber/cloud-native-search
cpu: Apple M1
BenchmarkBuildHitsAppend-8            	40355931	        39.20 ns/op	      80 B/op	       1 allocs/op
BenchmarkBuildHitsAppend-8            	36648588	        39.58 ns/op	      80 B/op	       1 allocs/op
BenchmarkBuildHitsAppend-8            	28023060	        43.21 ns/op	      80 B/op	       1 allocs/op
BenchmarkBuildHitsPrealloc-8          	45081920	        47.35 ns/op	      80 B/op	       1 allocs/op
BenchmarkBuildHitsPrealloc-8          	47359362	        26.83 ns/op	      80 B/op	       1 allocs/op
BenchmarkBuildHitsPrealloc-8          	49861308	        25.18 ns/op	      80 B/op	       1 allocs/op
BenchmarkJSONMarshal-8                	 2702668	       417.1 ns/op	     192 B/op	       2 allocs/op
BenchmarkJSONMarshal-8                	 3682399	       477.4 ns/op	     192 B/op	       2 allocs/op
BenchmarkJSONMarshal-8                	 3664228	       434.4 ns/op	     192 B/op	       2 allocs/op
BenchmarkJSONEncoder_ReusedBuffer-8   	 4111888	       292.2 ns/op	      48 B/op	       1 allocs/op
BenchmarkJSONEncoder_ReusedBuffer-8   	 4102227	       294.3 ns/op	      48 B/op	       1 allocs/op
BenchmarkJSONEncoder_ReusedBuffer-8   	 4024669	       915.4 ns/op	      48 B/op	       1 allocs/op
BenchmarkHandlerPipeline-8            	  323113	     31023 ns/op	    1409 B/op	      18 allocs/op
BenchmarkHandlerPipeline-8            	  591518	      3010 ns/op	    1409 B/op	      18 allocs/op
BenchmarkHandlerPipeline-8            	  173742	      6451 ns/op	    1409 B/op	      18 allocs/op
BenchmarkRunWorkerPool-8              	     100	  12540262 ns/op	    4160 B/op	      25 allocs/op
BenchmarkRunWorkerPool-8              	      99	  22706217 ns/op	    3856 B/op	      25 allocs/op
BenchmarkRunWorkerPool-8              	      90	  18277304 ns/op	    3798 B/op	      24 allocs/op
PASS
ok  	github.com/arealclimber/cloud-native-search	44.211s

小結

今天完成:

  • 實作一個「有限併發的 worker pool」,確保 goroutine 數量可控。
  • 單元測試涵蓋成功、錯誤、被取消三種情境。
  • 認識到 worker pool 真正的價值:當一個 API 要並行呼叫多個下游服務時,能充分利用 CPU 並避免壓爆下游

👉 明天會進入 pprof,從真實的效能剖析工具找出「哪裡慢、哪裡值得優化」,而不只是靠直覺。


上一篇
Day 8 - 微基準:用 go test -bench 建立效能基線
下一篇
Day 10 - pprof:找到 1 個熱點並改善
系列文
用 Golang + Elasticsearch + Kubernetes 打造雲原生搜尋服務10
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言