iT邦幫忙

2021 iThome 鐵人賽

DAY 21
0

Producer是訊息發送方, 他會對nsqd發送訊息, nsqd支援TCP(port:4150) & HTTP(port:4151),
本機啟動nsq環境時要記得把相關address指向本機

  • nsqd
    ./nsqd --broadcast-address=127.0.0.1 --lookupd-tcp-address=127.0.0.1:4160
    
  • nsqlookupd
    ./nsqlookupd
    
  • nsqadmin
    ./nsqadmin --lookupd-http-address=127.0.0.1:4161
    

環境跑起來之後可以透過curl測試創建topic&channel, 詳細支援內容可以參考官網

  • 使用NSQ時, nsq Config 參數設定可參考配置參數, 這邊示範調整max_in_flight 加大允許處理中的消息數

    func newNSQProducer(addr string) (r *nsq.Producer, err error) {
        NSQconfig := nsq.NewConfig()
        err = NSQconfig.Set("max_in_flight", config.NsqdMaxInFlight)
        if err != nil {
            return
        }
    
        r, err = nsq.NewProducer(addr, NSQconfig)
        if err != nil {
            return
        }
    
        err = r.Ping()
        if err != nil {
            return
        }
    
        return r, err
    }
    
  • 接著在 cmd.go 加入 nsqProducer, 並放在server結構裡面, 方便調用

        // nsq producer
        nsqProducer, err := newNSQProducer(config.NsqdAddr)
        if err != nil {
            return
        }
    
        defer func() {
            nsqProducer.Stop()
        }()
    
        sv := &server{
            ScyllaSession: session,
            RedisClient:   redisClient,
            NsqProducer:   nsqProducer,
        }
    
  • NSQ發送訊息只需要producer.Publish到指定的TOPIC(COCONUT_UPDATE_POINT)就完成了。
    TOPIC由負責發送的服務定義好之後就很少會去修改, 所以後來我們就不設定為參數了;
    發送訊息的內容會採用JSON格式, 方便接收到資料的人轉出結構,
    如果是要判斷接收訊息先後順序的資料就需要加上發送時間。

        err = s.NsqProducer.Publish("COCONUT_UPDATE_POINT", []byte(str))
        if err != nil {
            return nil, coconutError.ParseError(coconutError.ErrServer, err)
        }
    
  • 起一個最精簡的consumer就可以測試收到的訊息效果

    COCONUT_UPDATE_POINT 收到的訊息是:[{"name":"A","points":100},{"name":"B","points":100},{"name":"C","points":100}]
    

Producer發送訊息之後, 如果沒有Consumer接走, 就會堆在queue裡面
此時可以透過nsqadmin查看nsq訊息狀況

  • 查看TOPIC列表
    https://i.imgur.com/FzR7H4Z.png
  • 查看每個TOPIC底下的監聽channel & 訊息狀況
    https://i.imgur.com/RQqSaap.png
  • 訊息如果一直堆著沒有被接走, 長期下來會造成記憶體或硬碟空間不足, 可以在channel後面加上#ephemeral, 讓channel斷線之後就自動消失, 這樣可以避免訊息累積。
    https://i.imgur.com/4HhZV36.png

參考資料


上一篇
day 20 - 新增需求:隨時通知目前統計狀況 nsq / websocket 介紹
下一篇
day 22 - NSQ Consumer & graceful shutdown
系列文
Let's Go! 解剖Go server開發到部署的過程30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言