iT邦幫忙

2021 iThome 鐵人賽

DAY 22
1

一個服務發出訊息之後, 可以由多個服務分別註冊多個channel來監聽, 同一個TOPIC底下的每個channel都會拿到一樣的訊息。

當後端的溝通都是透過NSQ訊號的時候, 流程上每個環節的每個動作都可以拆分出來, 每個Consumer都可以接收到第一手的資料, 不需要再經過其他服務的傳遞。 這樣可以減少資料傳遞所消耗的時間、避免資料內容缺失、需要調整資料內容時也不用再大家都改, 只要源頭修改好了, 大家都會拿到一樣的資訊。

但Consumer收到訊息之後可能會做很多複雜的處理, 所以Consumer也需要graceful shutdown。
以下程式大概會是這樣:

  • 建立Consumer

        // 建立空白設定檔。
        ConsumerConfig := nsq.NewConfig()
        // 設置重連時間
        ConsumerConfig.LookupdPollInterval = time.Second * 2
        consumer, _ := nsq.NewConsumer("COCONUT_UPDATE_POINT", "coconut", ConsumerConfig)
        consumer.AddConcurrentHandlers(TestNSQConsumer(), config.NsqConsumerWorkers)
        err = consumer.ConnectToNSQLookupd(config.NsqLookupdAddr)
        if err != nil {
            return
        }
    
    

    AddConcurrentHandlers 是設定要開幾個worker處理, 如果會同時收大批訊息就要測試worker設定的合理值才能在時間內收下所有資料。
    訊息進來後由TestNSQConsumer()負責收下來處理, message.DisableAutoResponse() & defer message.Finish()是不要再讓訊息Requeue回去, 這樣可能會發生處理到一半的狀態, 這邊要看使用的情境, 如果是需要訊息重發的可以使用Requeue, 可以參考官方的test case

    func TestNSQConsumer() nsq.Handler {
       return nsq.HandlerFunc(func(message *nsq.Message) (err error) {
           message.DisableAutoResponse()
           defer message.Finish()
           log.Printf("========================== 收到的訊息是:%v", string(message.Body))
           time.Sleep(time.Duration(20) * time.Second)
           log.Printf("========= 故意睡 20s 測試是否有等consumer處理完才結束")
           return nil
       })
    }
    
  • NewConsumer完成之後, 宣告stopOnSignalExitNCL 把Consumer收集起來, 透過SnowFlake演算法給他一個不重複的ID

        var (
            wg                  sync.WaitGroup
            stopOnSignalExitNCL = NsqConsumerList{} // 收集停止訊號時停止nsq consumer 的列表
        )
    
        stopOnSignalExitNCL.Set(snowflakeNode.Generate().Int64(), consumer)
        // 停止部分Nsq Consumer避免有訊息進來
        Logger.Debugf("停止部分Nsq Consumer避免有訊息進來...")
    
        err = stopOnSignalExitNCL.Each(func(c *nsq.Consumer) error {
            wg.Add(1)
            go func(c *nsq.Consumer, wg *sync.WaitGroup) {
                // 停止訊號會等待正在處理的訊息做完才結束
                c.Stop()
                <-c.StopChan
                wg.Done()
            }(c, &wg)
    
            return nil
        })
    
        if err != nil {
            Logger.Errorf("stopOnSignalExitNCL.Each err: %s", err.Error())
        }
    
        wg.Wait()
        Logger.Debugf("停止 NSQ Consumer完成...")
    
    
  • NsqConsumerList 結構, 丟一個callback到consumer, 等到所有的worker都wg.Done()才能結束

        type NsqConsumerList struct {
            sync.Map
        }
    
        // Set 存Consumer
        func (ncl *NsqConsumerList) Set(key int64, c *nsq.Consumer) *NsqConsumerList {
            ncl.Store(key, c)
            return ncl
        }
    
        // Each callback每個Consumer
        func (ncl *NsqConsumerList) Each(callback func(c *nsq.Consumer) error) error {
            ncl.Range(func(k, v interface{}) bool {
                c, ok := v.(*nsq.Consumer)
    
                if !ok {
                    return false
                }
    
                err := callback(c)
    
                if err != nil {
                    return false
                }
    
                return true
            })
    
            return nil
        }
    
    

NSQ本身對於收到的訊息並沒有備份的機制, 如果需要額外紀錄下NSQ訊號的歷程資訊, 可以開一個channel在TOPIC底下監聽, 再看要儲存在哪個資料庫。 資料收集之後,假設有一天發生不可預期的嚴重資料遺失問題,至少還有備份可以回放時間歷程。

備份很重要,重要的資料一定要備份。


上一篇
day 21 - NSQ Producer
下一篇
day 23 - 取號機 AUTO_INCREMENT(MYSQL) > INCR(Redis) > snowflake演算法
系列文
Let's Go! 解剖Go server開發到部署的過程30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言