iT邦幫忙

0

Go Singleflight 實作全攻略:優化 API 消耗、並發控制與監控實務

go
  • 分享至 

  • xImage
  •  

完整內容請至幹話王 Blog

Go Singleflight 實作全攻略:優化 API 消耗、並發控制與監控實務

在開發高併發應用程式(如股票分析機器人)時,我們常面臨 「驚群效應」(Thundering Herd):當快取失效或系統剛啟動時,大量請求同時湧入,導致昂貴的 API(如 Gemini)成本爆炸或資料庫崩潰。

singleflight 是 Go 官方擴充套件(golang.org/x/sync/singleflight)中的神兵利器,確保當多個請求同時要求同一個結果時,實際的運算只會執行一次


1. 為什麼需要 Singleflight?

想像 1,000 個用戶同時查詢「台積電 (2330)」的分析報告:

  • 無控制:1,000 次 Gemini API 呼叫(帳單飆升)、1,000 次資料庫連線(系統卡死)。
  • 有 Singleflight1 次 API 呼叫,其餘 999 人在記憶體中等待結果,隨後共享同一份數據。

2. 深入核心:Singleflight 原始碼拆解

理解 singleflight 的底層源碼結構,能幫助我們更精準地掌握並發控制。

核心結構定義

  1. Group: 代表一個命名空間,管理所有正在進行中的任務。
  • m map[string]*call: 記錄目前有哪些 Key 正在「飛行中」(In-flight)。
  • mu sync.Mutex: 保護 Map,防止高併發下的 Race Condition。
  1. call: 代表一個正在執行的具體任務。
  • wg sync.WaitGroup: 最關鍵的同步元件。用來讓「後到」的請求等待「先到」的請求完成。
  • val / err: 存放執行後的結果與錯誤。

運作流程圖解:Do(key, fn)

sequenceDiagram
    autonumber
    participant G as Group (櫃台)
    participant C as Call (任務)
    participant FN as 執行函數 (fn)

    Note over G: 1. 鎖定 Map 並檢查 Key
    alt 情境 A:已經有人在做了 (Duplicate Call)
        G->>C: 發現 Key 存在
        G->>C: c.dups++ (記錄重複)
        G->>G: Unlock Map
        G->>C: c.wg.Wait() (原地卡住等待)
        C-->>G: 喚醒並回傳結果
    else 情境 B:我是第一個做的 (Original Call)
        G->>C: 建立 new(call)
        G->>C: c.wg.Add(1) (任務開始標記)
        G->>G: 將 call 掛入 Map
        G->>G: Unlock Map
        G->>FN: g.doCall(c, key, fn)
        FN-->>C: 填入結果與錯誤
        Note over G: 2. 清理與廣播
        G->>G: 鎖定 Map 並 delete(key)
        G->>C: c.wg.Done() (大喊:好囉!)
    end


3. 核心實作範例:L1 快取 + Singleflight

我們在 AnalysisService 中構建兩層防護:記憶體快取 (L1)Singleflight

package analyzer

import (
	"context"
	"sync"

	"golang.org/x/sync/singleflight" 
	"github.com/nathan/stock_bot/internal/storage"
)

type AnalysisService struct {
	genai      *GenAIClient
	d1Client   *storage.D1Client
	stockCache map[string]*StockAnalysisResult
	mu         sync.RWMutex
	sf         singleflight.Group
}

func (s *AnalysisService) analyzeStock(ctx context.Context, code, name string) (*StockAnalysisResult, error) {
	// 1. 第一層防護:檢查記憶體快取 (L1 Cache)
	s.mu.RLock()
	if result, ok := s.stockCache[code]; ok {
		s.mu.RUnlock()
		return result, nil
	}
	s.mu.RUnlock()

	// 2. 第二層防護:Singleflight (請求合併)
	key := "stock:" + code
	v, err, _ := s.sf.Do(key, func() (interface{}, error) {
		// 3. 執行昂貴的邏輯 (DB + Gemini API)
		result, err := s.doAnalyzeStock(ctx, code, name) 
		if err != nil {
			return nil, err
		}

		// 4. 寫入快取 (務必在 singleflight 內部完成,防止下一波瞬間擊穿)
		s.mu.Lock()
		s.stockCache[code] = result
		s.mu.Unlock()

		return result, nil
	})

	if err != nil {
		return nil, err
	}
	return v.(*StockAnalysisResult), nil
}


4. 進階實戰:處理「巢狀 Context」與非同步操作

doAnalyzeStock 內部,如果我們有其他的非同步操作(例如同時查多個 API),我們必須正確傳遞並檢查 ctx.Err()。這樣做是為了確保:如果外部請求(領頭者)超時了,後續的耗時運算能立即停止,釋放資源。

func (s *AnalysisService) doAnalyzeStock(ctx context.Context, code, name string) (*StockAnalysisResult, error) {
    // 建立一個子 Context 用於內部的多個非同步任務
    g, ctx := errgroup.WithContext(ctx)
    
    var dbData string
    var aiResult string

    // 任務 1:查資料庫
    g.Go(func() error {
        // 隨時檢查 Context 是否已取消
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
            // 模擬資料庫查詢
            dbData = "Historical Data"
            return nil
        }
    })

    // 任務 2:呼叫 Gemini API
    g.Go(func() error {
        // 將 ctx 傳入 API 客戶端,讓它能跟隨整體的超時控制
        res, err := s.genai.Generate(ctx, "Analyze this: "+code)
        if err != nil {
			return err
		}
        aiResult = res
        return nil
    })

    // 等待所有任務完成或其中一個出錯
    if err := g.Wait(); err != nil {
        return nil, err
    }

    return &StockAnalysisResult{Data: dbData, Analysis: aiResult}, nil
}


5. 性能數據對比 (100 個併發請求)

指標 無 Singleflight 有 Singleflight 優化率
Gemini API 呼叫次數 100 次 1 次 99%
資料庫讀取壓力 高 (100 併發) 極低 (1 併發) 99%
平均回應時間 ~2,500ms ~2,100ms ~16%

6. 監控與量化:加入 OpenTelemetry 追蹤

為了知道 Singleflight 幫我們省了多少錢,我們可以追蹤 shared 這個回傳值。sharedtrue 表示該請求是「搭便車」成功的。

func (s *AnalysisService) analyzeStockWithMetrics(ctx context.Context, code string) (*StockAnalysisResult, error) {
	key := "stock:" + code
	v, err, shared := s.sf.Do(key, func() (interface{}, error) {
		return s.doAnalyzeStock(ctx, code, "Name")
	})

	// 紀錄監控指標:分辨是「原始呼叫」還是「共享結果」
	status := "original"
	if shared {
		status = "shared"
	}
	
	s.sfCounter.Add(ctx, 1, metric.WithAttributes(
		attribute.String("stock_code", code),
		attribute.String("type", status),
	))

	if err != nil {
		return nil, err
	}
	return v.(*StockAnalysisResult), nil
}

儀表板觀測指標 (Prometheus / Grafana)
透過這組指標,你可以在 Grafana 畫出以下圖表:

  1. Request Stacked Area Chart:
  • type="original": 實際產生的 API 呼叫次數(你的成本預算)。

  • type="shared": 被合併的請求次數(你省下的錢)。

  1. Saving Ratio (節省率):
  • 公式:sum(rate(shared)) / sum(rate(total))。

7. 總結

singleflight 的機制就像是:

  1. 進門先看櫃台 (Map):有沒有掛牌子 (Key)?
  2. 有牌子:代表有人處理了,我就站在旁邊等 (Wait),等他處理好直接拿結果。
  3. 沒牌子:我掛上牌子 (Add Map),開始處理 (Do Fn)。

開發者筆記:

  • 快取寫入:務必在 sf.Do 的 func 內部寫入快取。
  • Context 意識:傳遞並檢查 ctx.Err() 是確保系統在高壓下不會資源洩漏的關鍵。
  • 可觀測性:沒有監控,優化就只是傳說。加入 OTel 讓數據說話。

「多人請求,一人做事,結果共享」 —— 這不僅是效能優化,更是對昂貴資源與系統穩定性的極致追求。


圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言