昨天簡單介紹過 NSQ架構與組成要件,今天我們來看一下實際使用的方式,與使用時需要注意的地方。範例會延續昨天架在 kubernetes 的 NSQ,請還沒有安裝朋友,自行利用範例設定部署。
如果照著範例啟動的話,我們可以得到 3個使用相同 channel 的 consumer pods,與一個負責推送訊息至 topic 的 producer。注意到這次範例實作 Producer-Consumer Mode,故 3個 consumer 收到的 messages 總數,應等於 channel messages 總數。
官方提供了各種不同語言版本的 client libraries,包含 go, python, javascript, java, php ...,我們因應 golang 主題使用的是 go-nsq。可參考GoDoc內有針對每一個 method 進行更詳細說明。
負責發起訊息傳遞至 nsqd,一般會使用 Sync 方法 Publish()
發送訊息,並等待 nsqd 回傳確認接收後完成發送動作。
func main() {
config := nsq.NewConfig()
p, err := nsq.NewProducer("nsqd:4150", config)
if err != nil {
log.Fatalln("create nsq producer fail")
return
}
for {
time.Sleep(time.Second)
now := time.Now().String()
// Synchronously publish a single message to the specified topic.
// Messages can also be sent asynchronously and/or in batches.
if err := p.Publish("MY_TOPIC", []byte(now)); err != nil {
log.Println("nsq publish fail")
}
}
}
也可以使用 PublishAsync()
方法,不等待 nsqd 確認的方式發送。值得注意的是,不論是Publish()
or PublishAsync()
底層都是由 sendCommandAsync()
處理,也就是說 sync 或 async 僅是 client 端的實作方法,並非由 nsqd 實現。
func (w *Producer) sendCommand(cmd *Command) error {
doneChan := make(chan *ProducerTransaction)
err := w.sendCommandAsync(cmd, doneChan, nil)
if err != nil {
close(doneChan)
return err
}
t := <-doneChan
return t.Error
}
持續針對特定 topics 接收訊息,並且可以藉由 channel 的使用方式,實作出 Producer-Consumer Mode or Broadcast Mode。
Producer-Consumer Mode
type myMessageHandler struct{}
func main() {
config := nsq.NewConfig()
c, err := nsq.NewConsumer("MY_TOPIC", "ch1", config)
if err != nil {
log.Fatalln("create nsq consumer fail", err.Error())
return
}
// Set the Handler for messages received by this Consumer. Can be called multiple times.
// See also AddConcurrentHandlers.
c.AddHandler(&myMessageHandler{})
err = c.ConnectToNSQLookupd("nsqlookupd:4160")
if err != nil {
log.Fatal(err.Error())
return
}
defer c.Stop()
select {}
}
注意我們在 consumer 連線時,建議對 nsqlookupd 進行連線而非 nsqd,讓 nsqlookupd 協助你導向 topic 所屬 nsqd。目的在於防止 SPOFs(單點脆弱),由於 topic 上的 messages 其實是儲存於 nsqd 實體內,故此我們可以建立多個 nsqd 實體,讓多個 topic 分布於不同 nsqd 上,最後藉由 nsqlookupd 統合決定 consumer 所監聽的 topic,應該導向哪一個 nsqd 實體。
Implement Handler
另一個在使用 client consumer 端需注意的部分是,針對監聽到的訊息處理,需實作一個 Handler
滿足 Handler interface
,如此才能滿足 func (r *Consumer) AddHandler(handler Handler)
的註冊方法。
type myMessageHandler struct{}
// HandleMessage implements the Handler interface.
func (h *myMessageHandler) HandleMessage(m *nsq.Message) error {
if len(m.Body) == 0 {
// Returning nil will automatically send a FIN command to NSQ to mark the message as processed.
// In this case, a message with an empty body is simply ignored/discarded.
return nil
}
// do whatever actual message processing is desired
err := processMessage(m.Body)
// Returning a non-nil error will automatically send a REQ command to NSQ to re-queue the message.
return err
}
func processMessage(msg []byte) error {
log.Println(string(msg))
return nil
}
Handler 回傳的 error
若不等於 nil
,將自動觸發 NSQ 的 re-queue 機制,訊息將被 nsqd 重發而非消除,這點是我們在設計時需注意的,邏輯上當前狀態能否直接進行 re-try,不行的話在 handler 還是應該回傳 nil
,並另行處理。
RDY
是 NSQ 設計出來,決定 consumer 能夠消化多少流量的一種機制。
在 client consumer 取得連線後,會先得到一個 RDY 0
的狀態,代表目前 client 端無法接收任何一則訊息,待 client 準備完成後傳遞 RDY 2
給 nsqd,表示目前 client 可以接收兩筆訊息了,一但 nsqd 收到 RDY > 0
將會無條件的將續息送向 client consumer。最後 client 依據處理的狀況,回傳處理完成FIN
或重新佇列 REQ
訊號給 nsqd,結束一次傳遞請求。
在 client side 若有能力進行併發處理時,可以使用AddConcurrentHandlers()
配合設定max-in-flight
來決定併發數量,透過調整併發數量可以更獲得更佳的使用體驗,畢竟 NSQ 提供的處理速度是 µs/op 等級,我們可以善用其強大的流通量,提供我們的服務加良好的使用體驗。