在開發高併發應用程式(如股票分析機器人)時,我們常面臨 「驚群效應」(Thundering Herd):當快取失效或系統剛啟動時,大量請求同時湧入,導致昂貴的 API(如 Gemini)成本爆炸或資料庫崩潰。
singleflight 是 Go 官方擴充套件(golang.org/x/sync/singleflight)中的神兵利器,確保當多個請求同時要求同一個結果時,實際的運算只會執行一次。
想像 1,000 個用戶同時查詢「台積電 (2330)」的分析報告:
理解 singleflight 的底層源碼結構,能幫助我們更精準地掌握並發控制。
Group: 代表一個命名空間,管理所有正在進行中的任務。m map[string]*call: 記錄目前有哪些 Key 正在「飛行中」(In-flight)。mu sync.Mutex: 保護 Map,防止高併發下的 Race Condition。call: 代表一個正在執行的具體任務。wg sync.WaitGroup: 最關鍵的同步元件。用來讓「後到」的請求等待「先到」的請求完成。val / err: 存放執行後的結果與錯誤。sequenceDiagram
autonumber
participant G as Group (櫃台)
participant C as Call (任務)
participant FN as 執行函數 (fn)
Note over G: 1. 鎖定 Map 並檢查 Key
alt 情境 A:已經有人在做了 (Duplicate Call)
G->>C: 發現 Key 存在
G->>C: c.dups++ (記錄重複)
G->>G: Unlock Map
G->>C: c.wg.Wait() (原地卡住等待)
C-->>G: 喚醒並回傳結果
else 情境 B:我是第一個做的 (Original Call)
G->>C: 建立 new(call)
G->>C: c.wg.Add(1) (任務開始標記)
G->>G: 將 call 掛入 Map
G->>G: Unlock Map
G->>FN: g.doCall(c, key, fn)
FN-->>C: 填入結果與錯誤
Note over G: 2. 清理與廣播
G->>G: 鎖定 Map 並 delete(key)
G->>C: c.wg.Done() (大喊:好囉!)
end
我們在 AnalysisService 中構建兩層防護:記憶體快取 (L1) 與 Singleflight。
package analyzer
import (
"context"
"sync"
"golang.org/x/sync/singleflight"
"github.com/nathan/stock_bot/internal/storage"
)
type AnalysisService struct {
genai *GenAIClient
d1Client *storage.D1Client
stockCache map[string]*StockAnalysisResult
mu sync.RWMutex
sf singleflight.Group
}
func (s *AnalysisService) analyzeStock(ctx context.Context, code, name string) (*StockAnalysisResult, error) {
// 1. 第一層防護:檢查記憶體快取 (L1 Cache)
s.mu.RLock()
if result, ok := s.stockCache[code]; ok {
s.mu.RUnlock()
return result, nil
}
s.mu.RUnlock()
// 2. 第二層防護:Singleflight (請求合併)
key := "stock:" + code
v, err, _ := s.sf.Do(key, func() (interface{}, error) {
// 3. 執行昂貴的邏輯 (DB + Gemini API)
result, err := s.doAnalyzeStock(ctx, code, name)
if err != nil {
return nil, err
}
// 4. 寫入快取 (務必在 singleflight 內部完成,防止下一波瞬間擊穿)
s.mu.Lock()
s.stockCache[code] = result
s.mu.Unlock()
return result, nil
})
if err != nil {
return nil, err
}
return v.(*StockAnalysisResult), nil
}
在 doAnalyzeStock 內部,如果我們有其他的非同步操作(例如同時查多個 API),我們必須正確傳遞並檢查 ctx.Err()。這樣做是為了確保:如果外部請求(領頭者)超時了,後續的耗時運算能立即停止,釋放資源。
func (s *AnalysisService) doAnalyzeStock(ctx context.Context, code, name string) (*StockAnalysisResult, error) {
// 建立一個子 Context 用於內部的多個非同步任務
g, ctx := errgroup.WithContext(ctx)
var dbData string
var aiResult string
// 任務 1:查資料庫
g.Go(func() error {
// 隨時檢查 Context 是否已取消
select {
case <-ctx.Done():
return ctx.Err()
default:
// 模擬資料庫查詢
dbData = "Historical Data"
return nil
}
})
// 任務 2:呼叫 Gemini API
g.Go(func() error {
// 將 ctx 傳入 API 客戶端,讓它能跟隨整體的超時控制
res, err := s.genai.Generate(ctx, "Analyze this: "+code)
if err != nil {
return err
}
aiResult = res
return nil
})
// 等待所有任務完成或其中一個出錯
if err := g.Wait(); err != nil {
return nil, err
}
return &StockAnalysisResult{Data: dbData, Analysis: aiResult}, nil
}
| 指標 | 無 Singleflight | 有 Singleflight | 優化率 |
|---|---|---|---|
| Gemini API 呼叫次數 | 100 次 | 1 次 | 99% |
| 資料庫讀取壓力 | 高 (100 併發) | 極低 (1 併發) | 99% |
| 平均回應時間 | ~2,500ms | ~2,100ms | ~16% |
為了知道 Singleflight 幫我們省了多少錢,我們可以追蹤 shared 這個回傳值。shared 為 true 表示該請求是「搭便車」成功的。
func (s *AnalysisService) analyzeStockWithMetrics(ctx context.Context, code string) (*StockAnalysisResult, error) {
key := "stock:" + code
v, err, shared := s.sf.Do(key, func() (interface{}, error) {
return s.doAnalyzeStock(ctx, code, "Name")
})
// 紀錄監控指標:分辨是「原始呼叫」還是「共享結果」
status := "original"
if shared {
status = "shared"
}
s.sfCounter.Add(ctx, 1, metric.WithAttributes(
attribute.String("stock_code", code),
attribute.String("type", status),
))
if err != nil {
return nil, err
}
return v.(*StockAnalysisResult), nil
}
儀表板觀測指標 (Prometheus / Grafana)
透過這組指標,你可以在 Grafana 畫出以下圖表:
type="original": 實際產生的 API 呼叫次數(你的成本預算)。
type="shared": 被合併的請求次數(你省下的錢)。
singleflight 的機制就像是:
開發者筆記:
sf.Do 的 func 內部寫入快取。ctx.Err() 是確保系統在高壓下不會資源洩漏的關鍵。「多人請求,一人做事,結果共享」 —— 這不僅是效能優化,更是對昂貴資源與系統穩定性的極致追求。