iT邦幫忙

2025 iThome 鐵人賽

DAY 18
0
Cloud Native

Go 語言搶票煉金術:解鎖千萬級併發下的原子交易奇蹟系列 第 18

Go 語言搶票煉金術 Day 18 - 生產者:將「成功訂單」送入消息佇列

  • 分享至 

  • xImage
  •  

https://ithelp.ithome.com.tw/upload/images/20250915/20124462YARMcXUyfa.png

Day 18 - 生產者:將「成功訂單」送入消息佇列

昨天我們討論了為什麼需要消息佇列——為了讓 API 變得俐落,並且將系統的寫入壓力轉移到後端。

今天,我們來實作生產者 (Producer)。

重新定義原則:生產者的唯一職責

生產者的設計原則只有一個:快速地把訊息丟給佇列

  1. 快速:API 的生命週期內不應該有任何延遲。

    • 生產者不能有複雜的同步邏輯、不能有重試、不能等待任何外部確認。
    • 它的工作就是「發射後不管」(Fire and Forget)。
  2. 單純:它不需要知道這個訊息是什麼,也不需要知道如果發送失敗了該怎麼辦。

    • 它不是資料庫,也不是交易協調器。
    • 它只是一個郵差,郵筒滿了(佇列掛了),它的工作就失敗了。
  3. 誠實:如果發送失敗,它必須立刻、誠實地向上層(API 處理函數)報告失敗。

    • 它不能假裝「我正在努力」,也不能把訊息藏在某個本地檔案裡假裝成功。
    • 失敗就是失敗。

訂單訊息結構

先設計好資料結構。

package model

import "time"

// OrderMessage 是我們要發送到佇列的唯一資料結構。
// 它必須包含完成訂單所需的所有資訊。
type OrderMessage struct {
    OrderID     string    `json:"order_id"`
    UserID      string    `json:"user_id"`
    TicketID    string    `json:"ticket_id"`
    Quantity    int       `json:"quantity"`
    TotalAmount float64   `json:"total_amount"`
    CreatedAt   time.Time `json:"created_at"`
    RequestID   string    `json:"request_id"` // 這是最重要的欄位,沒有它日誌就是無效的了。
}

RequestID 是系統的黃金線索。確保它從 Gin 的 middleware 開始,一路被傳遞到這裡。

Redis Streams 生產者實現:保持簡單

我們只需要一個能和 Redis 溝通的 struct。

package queue

import (
    "context"
    "fmt"
    "github.com/redis/go-redis/v9"
)

// RedisProducer 負責將消息發送到 Redis Streams。
// 注意:它很笨,沒有重試邏輯。這是故意的。
type RedisProducer struct {
    client *redis.Client
}

// NewRedisProducer 建立一個新的生產者。
func NewRedisProducer(client *redis.Client) (*RedisProducer, error) {
    // 假設 client 已經被外部正確初始化和管理。
    // 我們不應該在一個小小的生產者裡還要去管連線池、密碼這些破事。
    if client == nil {
        return nil, fmt.Errorf("redis client cannot be nil")
    }
    return &RedisProducer{client: client}, nil
}

// Send 將序列化後的訊息發送到指定的 stream (topic)。
// 它的邏輯很簡單:執行 XADD 命令,然後返回 Redis 給的 messageID 或一個錯誤。
func (p *RedisProducer) Send(ctx context.Context, topic string, message []byte) (string, error) {
    // Redis Streams 的 Values 是一個 map。我們用一個固定的 key "data" 來儲存我們的 JSON 訊息。
    // 消費者端也必須知道這個 key。
    values := map[string]interface{}{
        "data": message,
    }

    // "*" 讓 Redis 自動生成一個基於時間的、唯一的 ID。
    args := &redis.XAddArgs{
        Stream: topic,
        ID:     "*",
        Values: values,
    }

    // 這就是生產者的全部工作。
    // 如果 Redis 慢或者掛了,這個呼叫會返回錯誤,然後就沒它的事了。
    messageID, err := p.client.XAdd(ctx, args).Result()
    if err != nil {
        return "", fmt.Errorf("failed to send message to redis stream %s: %w", topic, err)
    }

    return messageID, nil
}

// Health 檢查與 Redis 的連線狀態。
// PING 命令就夠了。簡單、有效。
func (p *RedisProducer) Health(ctx context.Context) error {
    return p.client.Ping(ctx).Err()
}

這個生產者非常單純 (stupidly simple),這正是我們想要的。
它沒有內部狀態、沒有背景 goroutine,只是一個 Redis 命令的乾淨封裝。

在 API 服務中正確地使用生產者

現在,我們來優化主要邏輯中的 purchaseTicket 函式。

package main

// ... (省略 main, init, and middleware 的部分,假設它們都已正確設定) ...

// purchaseTicket 處理購票請求。
// 這是系統中最重要的 hot path,它必須快。
func purchaseTicket(c *gin.Context) {
    ctx := c.Request.Context()
    requestID, _ := c.Get("RequestID").(string) // 我們信任 middleware

    var req struct {
        UserID   string `json:"user_id"`
        TicketID string `json:"ticket_id"`
        Quantity int    `json:"quantity"`
    }

    if err := c.ShouldBindJSON(&req); err != nil {
        c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request payload"})
        return
    }

    // 1. 原子地扣減庫存
    // 這一步是唯一需要鎖定資源的操作,我們用 Lua 腳本保證其原子性。
    stockKey := "ticket:" + req.TicketID + ":stock"
    decrScript := `
        local stock = tonumber(redis.call("GET", KEYS[1]))
        local requested = tonumber(ARGV[1])
        if not stock or stock < requested then
            return -1 -- 庫存不足
        end
        return redis.call("DECRBY", KEYS[1], requested)
    `
    result, err := redisClient.Eval(ctx, decrScript, []string{stockKey}, req.Quantity).Int64()
    if err != nil {
        log.Printf("Redis Eval error [req_id: %s]: %v", requestID, err)
        c.JSON(http.StatusInternalServerError, gin.H{"error": "internal system error"})
        return
    }

    if result < 0 {
        c.JSON(http.StatusConflict, gin.H{"error": "ticket sold out or not enough stock"})
        return
    }

    // 2. 建立訂單訊息
    // 庫存已成功扣減。從現在開始,我們必須成功地將訂單訊息發送出去。
    orderID := uuid.New().String()
    orderMsg := model.OrderMessage{
        OrderID:     orderID,
        UserID:      req.UserID,
        TicketID:    req.TicketID,
        Quantity:    req.Quantity,
        TotalAmount: 100.0 * float64(req.Quantity), // 假設票價固定
        CreatedAt:   time.Now(),
        RequestID:   requestID,
    }

    orderJSON, err := json.Marshal(orderMsg)
    if err != nil {
        // 這是一個嚴重的內部錯誤,代表我們的 model struct 有問題。
        // 它幾乎不應該發生。
        log.Printf("Failed to marshal order message [req_id: %s]: %v", requestID, err)
        // 注意:我們不會在這裡嘗試恢復庫存!那只會讓事情更糟。
        c.JSON(http.StatusInternalServerError, gin.H{"error": "internal system error"})
        return
    }

    // 3. 發送訂單到佇列
    _, err = producer.Send(ctx, "orders", orderJSON)
    if err != nil {
        // 這是最關鍵的失敗場景!
        log.Printf("FATAL: Failed to send order to message queue [req_id: %s]: %v. STOCK IS NOW INCONSISTENT.", requestID, err)
        
        // 我們絕不能嘗試恢復庫存。你不能保證恢復操作本身會成功。
        // 我們能做的唯一正確的事,就是誠實地告訴客戶端我們出錯了,
        // 同時發出最高級別的告警,讓工程師立刻介入處理這個「不一致」的庫存。
        c.JSON(http.StatusServiceUnavailable, gin.H{
            "error": "The service is temporarily unavailable. Please try again later.",
        })
        return
    }

    // 4. 立即返回成功
    // 訊息已進入佇列。API 的任務完成了。
    log.Printf("Order enqueued successfully [req_id: %s, order_id: %s]", requestID, orderID)
    c.JSON(http.StatusAccepted, gin.H{
        "message":   "Your order is being processed.",
        "order_id":  orderID,
    })
}

https://ithelp.ithome.com.tw/upload/images/20251002/20124462hHifBOYSZo.png

如何處理真正的「生產者失敗」?

當你看到上面那段 FATAL 等級的日誌時,代表系統進入了異常狀態,這時該怎麼辦?

  1. 告警 (Alerting):當你看到那條日誌時,你的監控系統(Prometheus, Datadog)應該立刻觸發 P1 等級的告警,通知值班的工程師 。

  2. 監控 (Monitoring):建立一個儀表板,專門追蹤「佇列發送失敗訂單數」。在正常情況下,這個數字必須是 0。

  3. 手動恢復 (Manual Intervention):收到告警後,工程師需要根據日誌中的 RequestID 與訂單內容,檢查 Redis 中的實際庫存。接著,判斷應該手動將訂單訊息重新送入佇列,或是將庫存加回去。在成熟的系統中,這通常會被做成一個內部維運工具。

我們不該期望程式碼能神奇地解決所有基礎設施等級的故障。
當資料庫或消息佇列服務中斷時,這就是一個系統性故障,需要來介入。

總結

今天,我們寫了一個簡單的生產者。

  • 它只做一件事:XADD
  • 它專注於發送,而不處理它能力範圍外的錯誤(不處理 Message Queue 服務中斷)。
  • API 的處理流程變得非常清晰:扣庫存 -> 發消息 -> 返回。任何一步失敗,都立即記錄日誌、觸發告警,並回報錯誤。

這樣我們就有了一個行為可預測的元件了。


上一篇
Go 語言搶票煉金術 Day 17 - 選擇你的佇列:Redis Streams vs RabbitMQ Kafka
系列文
Go 語言搶票煉金術:解鎖千萬級併發下的原子交易奇蹟18
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言