iT邦幫忙

第 12 屆 iThome 鐵人賽

DAY 25
0
DevOps

欸你這週GO了嘛系列 第 25

[DAY25]Golang的實時分佈式消息傳遞平台-NSQ

  • 分享至 

  • xImage
  •  

什麼是NSQ

NSQ是一個基於Go語言的分佈式實時消息平台,它基於MIT開源協議發布,由bitly公司開源出來的一款簡單易用的消息中間件。
NSQ可用於大規模系統中的實時消息服務,並且每天能夠處理數億級別的消息,其設計目標是為在分佈式環境下運行的去中心化服務提供一個強大的基礎架構。
NSQ具有分佈式、去中心化的拓撲結構,該結構具有無單點故障、故障容錯、高可用性以及能夠保證消息的可靠傳遞的特徵。NSQ非常容易配置和部署,且具有最大的靈活性,支持眾多消息協議。

使用NSQ的公司與服務也不少,基本上大部份人的使用情境應該都是負荷的了
https://ithelp.ithome.com.tw/upload/images/20201002/20129515X57Imn9ZVX.png

以前開發的系統架構都是一條龍式的,像門診系統,只要跟門診相對的功能都直接實作在門診系統內,
最大的優點就是資料正確性最高,因為只要透過transaction,就算是寫入N個系統的TABLE,只要包在transaction內,發生錯誤也是rollback掉,不會有對一半的狀態
缺點就很明顯
1.效能差:因為需要寫入十多個table,所以就需要不同系統的邏輯判斷,例如有掛下次回診,就要寫入預約掛號的table,中間會走一輪預約掛號相關邏輯,如果有排程檢查,還要進行科室排程...等等,基本上一次存檔要做的事情太多,需要把其他系統的邏輯引用進來或是直接HARD CODE進來,因為最後都是寫入TABLE。
2.系統耦合度超高:最核心系統會耦合超多其他系統進來,最後很難把系統獨立運作

如果要改成微服務+消息傳遞的架構來看
門診跟掛號系統為獨立系統,門診存檔時去呼叫掛號系統的預約掛號功能,二者行為為平行線,互不干擾,所以門診系統呼叫完就繼續做它自己的後續動作,掛號系統收到門診來的預約掛號訊息,就進行預約掛號,完成後再發個系統給門診系統通知已預約掛號完成,並把預掛資料傳回去。
以上行為就變成二個系統的個自TABLE的transaction,所以已經進入平行作業,效能上就沒有誰等誰的問題,只是
因為個自系統的transaction,發生ROLLBACK時,要怎麼進行異常恢復會是最大的課題~~

以上為以前的工作經驗跟微服務+消息傳遞架構的發想,只是目前還沒有看到HIS系統走在微服務+消息傳遞上面就是了。

回到NSQ上面,NSQ分成3個服務組合而己

nsqlookupd

nsqlookupd是負責管理拓撲信息並提供最終一致性的發現服務的守護進程(daemon),它使用tcp(4160 port)管理nsqd服務,使用http(4161 port)管理nsqadmin服務。同時為客戶端提供查詢功能

nsqlookupd有下面特點

  • 唯一性,在一個Nsq服務中只有一個nsqlookupd服務。當然也可以在叢集中部署多個nsqlookupd,但它們之間是沒有關聯的
  • 去中心化,即使nsqlookupd崩潰,也會不影響正在執行的nsqd服務
  • 充當nsqd和naqadmin資訊互動的中介軟體
  • 提供一個http查詢服務,給客戶端定時更新nsqd的地址目錄

nsqd

nsqd是一個負責接收、排隊、投遞消息給客戶端的守護進程。客戶端通過查詢nsqlookupd 來發現topic的nsqd生產者,nsqd節點會廣播topic和channel信息。
單個nsqd可以有多個topic,每個topic可以有多個channel。channel接收這個topic所有消息的副本,從而實現多播分發,而channel上的每個消息被分發給它的訂閱者,從而實現負載均衡。
nsqd會預設監聽一個tcp埠(4150)和一個http埠(4151)以及一個可選的https埠

  • 對訂閱了同一個topic,同一個channel的消費者使用負載均衡策略(不是輪詢)
  • 只要channel存在,即使沒有該channel的消費者,也會將生產者的message快取到佇列中(注意訊息的過期處理)
  • 保證佇列中的message至少會被消費一次,即使nsqd退出,也會將佇列中的訊息暫存磁碟上(結束程序等意外情況除外)
  • 限定記憶體佔用,能夠配置nsqd中每個channel佇列在記憶體中快取的message數量,一旦超出,message將被快取到磁碟中
  • topic,channel一旦建立,將會一直存在,要及時在管理臺或者用程式碼清除無效的topic和channel,避免資源的浪費

nsqadmin

nsqadmin 是一套WEB管理UI(長的像bootstrap XD),用來匯集集群的實時統計,並執行不同的管理任務,主要功用還是看多少MESSAGE卡著沒發送出去~

NSQ中最重點的二個角色Producer與Consumer

照字面上來說明就是,Producer是產生訊息的人,Consumer是接受訊息的人
以下為詳細說明

Producer

  • producer通過HTTP API將消息發佈到nsqd的指定topic,一般有pub/mpub兩種方式,pub發布一個消息,mpub一個往返發佈多個消息。
  • producer也可以通過nsqd客戶端的TCP接口將消息發佈給nsqd的指定topic。
  • 當生產者producer初次發佈帶topic的消息給nsqd時,如果topic不存在,則會在nsqd中創建topic。

Consumer

  • consumer通過TCP subscribe自己需要的 channel
  • topic和channel都沒有預先配置。 topic由第一次發布消息到命名 topic的producer創建或第一次通過 subscribe訂閱一個命名 topic的 consumer來創建。 channel被 consumer第一次 subscribe訂閱到指定的 channel創建。
  • 多個 consumer subscribe一個 channel,假設所有已連接的客戶端處於準備接收消息的狀態,每個消息將被傳遞到一個隨機的 consumer。
  • NSQ支持延時消息, consumer在配置的延時時間後才能接受相關消息。
  • Channel在 consumer退出後並不會刪除,這點需要特別注意。

安裝nsq

這邊使用docker compose來安裝

version: '3'
services:
  nsqlookupd:
    image: nsqio/nsq
    command: /nsqlookupd
    ports:
      - "4160"
      - "4161"
  nsqd:
    image: nsqio/nsq
    command: /nsqd --lookupd-tcp-address=nsqlookupd:4160
    depends_on:
      - nsqlookupd
    ports:
      - "4150"
      - "4151"
  nsqadmin:
    image: nsqio/nsq
    command: /nsqadmin --lookupd-http-address=nsqlookupd:4161
    depends_on:
      - nsqlookupd  
    ports:
      - "4171"

安裝Golang NSQ套件

NSQ官方套件 go-nsq

$go get -u -v github.com/nsqio/go-nsq

分別實作Producer與Consumer

Producer

package main

import (
	"log"
	"github.com/nsqio/go-nsq"
)

func main() {
    //建立一個Producer
	config := nsq.NewConfig()
	producer, err := nsq.NewProducer("127.0.0.1:4150", config)
	if err != nil {
		log.Fatal(err)
	}
    
	messageBody := []byte("hello")
	topicName := "GONSQ_TOPIC"
    //發佈到定義好的topic
	err = producer.Publish(topicName, messageBody)
	if err != nil {
		log.Fatal(err)
	}

	producer.Stop()
}

Consumer

package main

import (
	"log"
	"os"
	"os/signal"
	"syscall"

	"github.com/nsqio/go-nsq"
)

type myMessageHandler struct{}

func (h *myMessageHandler) HandleMessage(m *nsq.Message) error {
	log.Printf("接收到了一個訊息:%v", m)
	log.Printf("這個訊息轉換成字串就是:%v", string(m.Body))
	return nil
}

func main() {
    //建立Consumer
	config := nsq.NewConfig()
	consumer, err := nsq.NewConsumer("GONSQ_TOPIC", "channel", config)
	if err != nil {
		log.Fatal(err)
	}
    //新增一個handler來處理收到訊息時動作
	consumer.AddHandler(&myMessageHandler{})

    //連線到NSQD
	err = consumer.ConnectToNSQD("localhost:4150")
	if err != nil {
		log.Fatal(err)
	}
    
    //卡住,不要讓main.go執行完就結束
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	<-sigChan

	consumer.Stop()
}

執行結果

producer的情形,連到nsqd並推送一下叫hello的內容出去

2020/10/03 13:30:30 INF    1 (127.0.0.1:4150) connecting to nsqd
2020/10/03 13:30:30 INF    1 stopping
2020/10/03 13:30:30 INF    1 exiting router
2020/10/03 13:30:30 INF    1 (127.0.0.1:4150) beginning close

consumer的情形,收到producer送出來的訊息

2020/10/03 13:30:27 INF    1 [GONSQ_TOPIC/channel] (localhost:4150) connecting to nsqd
2020/10/03 13:30:30 接收到了一個訊息:&{[48 100 101 52 54 101 54 100 101 56 56 102 101 48 48 48] [104 101 108 108 111] 1601703030877634330 1 localhost:4150 0xc00012a050 0 0}
2020/10/03 13:30:30 這個訊息轉換成字串就是:hello

服務可以是producer的角色也可以是consumer的角色,視服務的功能來實行producer/consumer

文章引用
分佈式實時消息平台NSQ
為什麼要使用Nsq


上一篇
[DAY24]Golang也來玩NoSQL-Redis
下一篇
[DAY26]Golang最強大的特性:Goroutine -1
系列文
欸你這週GO了嘛30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言