一個服務發出訊息之後, 可以由多個服務分別註冊多個channel來監聽, 同一個TOPIC底下的每個channel都會拿到一樣的訊息。
當後端的溝通都是透過NSQ訊號的時候, 流程上每個環節的每個動作都可以拆分出來, 每個Consumer都可以接收到第一手的資料, 不需要再經過其他服務的傳遞。 這樣可以減少資料傳遞所消耗的時間、避免資料內容缺失、需要調整資料內容時也不用再大家都改, 只要源頭修改好了, 大家都會拿到一樣的資訊。
但Consumer收到訊息之後可能會做很多複雜的處理, 所以Consumer也需要graceful shutdown。
以下程式大概會是這樣:
建立Consumer
// 建立空白設定檔。
ConsumerConfig := nsq.NewConfig()
// 設置重連時間
ConsumerConfig.LookupdPollInterval = time.Second * 2
consumer, _ := nsq.NewConsumer("COCONUT_UPDATE_POINT", "coconut", ConsumerConfig)
consumer.AddConcurrentHandlers(TestNSQConsumer(), config.NsqConsumerWorkers)
err = consumer.ConnectToNSQLookupd(config.NsqLookupdAddr)
if err != nil {
return
}
AddConcurrentHandlers
是設定要開幾個worker處理, 如果會同時收大批訊息就要測試worker設定的合理值才能在時間內收下所有資料。
訊息進來後由TestNSQConsumer()
負責收下來處理, message.DisableAutoResponse()
& defer message.Finish()
是不要再讓訊息Requeue回去, 這樣可能會發生處理到一半的狀態, 這邊要看使用的情境, 如果是需要訊息重發的可以使用Requeue
, 可以參考官方的test case。
func TestNSQConsumer() nsq.Handler {
return nsq.HandlerFunc(func(message *nsq.Message) (err error) {
message.DisableAutoResponse()
defer message.Finish()
log.Printf("========================== 收到的訊息是:%v", string(message.Body))
time.Sleep(time.Duration(20) * time.Second)
log.Printf("========= 故意睡 20s 測試是否有等consumer處理完才結束")
return nil
})
}
NewConsumer完成之後, 宣告stopOnSignalExitNCL
把Consumer收集起來, 透過SnowFlake演算法給他一個不重複的ID
var (
wg sync.WaitGroup
stopOnSignalExitNCL = NsqConsumerList{} // 收集停止訊號時停止nsq consumer 的列表
)
stopOnSignalExitNCL.Set(snowflakeNode.Generate().Int64(), consumer)
// 停止部分Nsq Consumer避免有訊息進來
Logger.Debugf("停止部分Nsq Consumer避免有訊息進來...")
err = stopOnSignalExitNCL.Each(func(c *nsq.Consumer) error {
wg.Add(1)
go func(c *nsq.Consumer, wg *sync.WaitGroup) {
// 停止訊號會等待正在處理的訊息做完才結束
c.Stop()
<-c.StopChan
wg.Done()
}(c, &wg)
return nil
})
if err != nil {
Logger.Errorf("stopOnSignalExitNCL.Each err: %s", err.Error())
}
wg.Wait()
Logger.Debugf("停止 NSQ Consumer完成...")
NsqConsumerList 結構, 丟一個callback到consumer, 等到所有的worker都wg.Done()
才能結束
type NsqConsumerList struct {
sync.Map
}
// Set 存Consumer
func (ncl *NsqConsumerList) Set(key int64, c *nsq.Consumer) *NsqConsumerList {
ncl.Store(key, c)
return ncl
}
// Each callback每個Consumer
func (ncl *NsqConsumerList) Each(callback func(c *nsq.Consumer) error) error {
ncl.Range(func(k, v interface{}) bool {
c, ok := v.(*nsq.Consumer)
if !ok {
return false
}
err := callback(c)
if err != nil {
return false
}
return true
})
return nil
}
NSQ本身對於收到的訊息並沒有備份的機制, 如果需要額外紀錄下NSQ訊號的歷程資訊, 可以開一個channel在TOPIC底下監聽, 再看要儲存在哪個資料庫。 資料收集之後,假設有一天發生不可預期的嚴重資料遺失問題,至少還有備份可以回放時間歷程。
備份很重要,重要的資料一定要備份。