Producer是訊息發送方, 他會對nsqd發送訊息, nsqd支援TCP(port:4150) & HTTP(port:4151),
本機啟動nsq環境時要記得把相關address指向本機
./nsqd --broadcast-address=127.0.0.1 --lookupd-tcp-address=127.0.0.1:4160
./nsqlookupd
./nsqadmin --lookupd-http-address=127.0.0.1:4161
環境跑起來之後可以透過curl測試創建topic&channel, 詳細支援內容可以參考官網
使用NSQ時, nsq Config 參數設定可參考配置參數, 這邊示範調整max_in_flight
加大允許處理中的消息數
func newNSQProducer(addr string) (r *nsq.Producer, err error) {
NSQconfig := nsq.NewConfig()
err = NSQconfig.Set("max_in_flight", config.NsqdMaxInFlight)
if err != nil {
return
}
r, err = nsq.NewProducer(addr, NSQconfig)
if err != nil {
return
}
err = r.Ping()
if err != nil {
return
}
return r, err
}
接著在 cmd.go 加入 nsqProducer, 並放在server結構裡面, 方便調用
// nsq producer
nsqProducer, err := newNSQProducer(config.NsqdAddr)
if err != nil {
return
}
defer func() {
nsqProducer.Stop()
}()
sv := &server{
ScyllaSession: session,
RedisClient: redisClient,
NsqProducer: nsqProducer,
}
NSQ發送訊息只需要producer.Publish
到指定的TOPIC(COCONUT_UPDATE_POINT)就完成了。
TOPIC由負責發送的服務定義好之後就很少會去修改, 所以後來我們就不設定為參數了;
發送訊息的內容會採用JSON格式, 方便接收到資料的人轉出結構,
如果是要判斷接收訊息先後順序的資料就需要加上發送時間。
err = s.NsqProducer.Publish("COCONUT_UPDATE_POINT", []byte(str))
if err != nil {
return nil, coconutError.ParseError(coconutError.ErrServer, err)
}
起一個最精簡的consumer就可以測試收到的訊息效果
COCONUT_UPDATE_POINT 收到的訊息是:[{"name":"A","points":100},{"name":"B","points":100},{"name":"C","points":100}]
Producer發送訊息之後, 如果沒有Consumer接走, 就會堆在queue裡面
此時可以透過nsqadmin查看nsq訊息狀況
#ephemeral
, 讓channel斷線之後就自動消失, 這樣可以避免訊息累積。參考資料