NSQ是一個基於Go語言的分佈式實時消息平台,它基於MIT開源協議發布,由bitly公司開源出來的一款簡單易用的消息中間件。
NSQ可用於大規模系統中的實時消息服務,並且每天能夠處理數億級別的消息,其設計目標是為在分佈式環境下運行的去中心化服務提供一個強大的基礎架構。
NSQ具有分佈式、去中心化的拓撲結構,該結構具有無單點故障、故障容錯、高可用性以及能夠保證消息的可靠傳遞的特徵。NSQ非常容易配置和部署,且具有最大的靈活性,支持眾多消息協議。
使用NSQ的公司與服務也不少,基本上大部份人的使用情境應該都是負荷的了
以前開發的系統架構都是一條龍式的,像門診系統,只要跟門診相對的功能都直接實作在門診系統內,
最大的優點就是資料正確性最高,因為只要透過transaction,就算是寫入N個系統的TABLE,只要包在transaction內,發生錯誤也是rollback掉,不會有對一半的狀態
缺點就很明顯
1.效能差:因為需要寫入十多個table,所以就需要不同系統的邏輯判斷,例如有掛下次回診,就要寫入預約掛號的table,中間會走一輪預約掛號相關邏輯,如果有排程檢查,還要進行科室排程...等等,基本上一次存檔要做的事情太多,需要把其他系統的邏輯引用進來或是直接HARD CODE進來,因為最後都是寫入TABLE。
2.系統耦合度超高:最核心系統會耦合超多其他系統進來,最後很難把系統獨立運作
如果要改成微服務+消息傳遞的架構來看
門診跟掛號系統為獨立系統,門診存檔時去呼叫掛號系統的預約掛號功能,二者行為為平行線,互不干擾,所以門診系統呼叫完就繼續做它自己的後續動作,掛號系統收到門診來的預約掛號訊息,就進行預約掛號,完成後再發個系統給門診系統通知已預約掛號完成,並把預掛資料傳回去。
以上行為就變成二個系統的個自TABLE的transaction,所以已經進入平行作業,效能上就沒有誰等誰的問題,只是
因為個自系統的transaction,發生ROLLBACK時,要怎麼進行異常恢復會是最大的課題~~
回到NSQ上面,NSQ分成3個服務組合而己
nsqlookupd是負責管理拓撲信息並提供最終一致性的發現服務的守護進程(daemon),它使用tcp(4160 port)管理nsqd服務,使用http(4161 port)管理nsqadmin服務。同時為客戶端提供查詢功能
nsqlookupd有下面特點
nsqd是一個負責接收、排隊、投遞消息給客戶端的守護進程。客戶端通過查詢nsqlookupd 來發現topic的nsqd生產者,nsqd節點會廣播topic和channel信息。
單個nsqd可以有多個topic,每個topic可以有多個channel。channel接收這個topic所有消息的副本,從而實現多播分發,而channel上的每個消息被分發給它的訂閱者,從而實現負載均衡。
nsqd會預設監聽一個tcp埠(4150)和一個http埠(4151)以及一個可選的https埠
nsqadmin 是一套WEB管理UI(長的像bootstrap XD),用來匯集集群的實時統計,並執行不同的管理任務,主要功用還是看多少MESSAGE卡著沒發送出去~
照字面上來說明就是,Producer是產生訊息的人,Consumer是接受訊息的人
以下為詳細說明
這邊使用docker compose來安裝
version: '3'
services:
nsqlookupd:
image: nsqio/nsq
command: /nsqlookupd
ports:
- "4160"
- "4161"
nsqd:
image: nsqio/nsq
command: /nsqd --lookupd-tcp-address=nsqlookupd:4160
depends_on:
- nsqlookupd
ports:
- "4150"
- "4151"
nsqadmin:
image: nsqio/nsq
command: /nsqadmin --lookupd-http-address=nsqlookupd:4161
depends_on:
- nsqlookupd
ports:
- "4171"
NSQ官方套件 go-nsq
$go get -u -v github.com/nsqio/go-nsq
分別實作Producer與Consumer
package main
import (
"log"
"github.com/nsqio/go-nsq"
)
func main() {
//建立一個Producer
config := nsq.NewConfig()
producer, err := nsq.NewProducer("127.0.0.1:4150", config)
if err != nil {
log.Fatal(err)
}
messageBody := []byte("hello")
topicName := "GONSQ_TOPIC"
//發佈到定義好的topic
err = producer.Publish(topicName, messageBody)
if err != nil {
log.Fatal(err)
}
producer.Stop()
}
package main
import (
"log"
"os"
"os/signal"
"syscall"
"github.com/nsqio/go-nsq"
)
type myMessageHandler struct{}
func (h *myMessageHandler) HandleMessage(m *nsq.Message) error {
log.Printf("接收到了一個訊息:%v", m)
log.Printf("這個訊息轉換成字串就是:%v", string(m.Body))
return nil
}
func main() {
//建立Consumer
config := nsq.NewConfig()
consumer, err := nsq.NewConsumer("GONSQ_TOPIC", "channel", config)
if err != nil {
log.Fatal(err)
}
//新增一個handler來處理收到訊息時動作
consumer.AddHandler(&myMessageHandler{})
//連線到NSQD
err = consumer.ConnectToNSQD("localhost:4150")
if err != nil {
log.Fatal(err)
}
//卡住,不要讓main.go執行完就結束
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
<-sigChan
consumer.Stop()
}
producer的情形,連到nsqd並推送一下叫hello的內容出去
2020/10/03 13:30:30 INF 1 (127.0.0.1:4150) connecting to nsqd
2020/10/03 13:30:30 INF 1 stopping
2020/10/03 13:30:30 INF 1 exiting router
2020/10/03 13:30:30 INF 1 (127.0.0.1:4150) beginning close
consumer的情形,收到producer送出來的訊息
2020/10/03 13:30:27 INF 1 [GONSQ_TOPIC/channel] (localhost:4150) connecting to nsqd
2020/10/03 13:30:30 接收到了一個訊息:&{[48 100 101 52 54 101 54 100 101 56 56 102 101 48 48 48] [104 101 108 108 111] 1601703030877634330 1 localhost:4150 0xc00012a050 0 0}
2020/10/03 13:30:30 這個訊息轉換成字串就是:hello
服務可以是producer的角色也可以是consumer的角色,視服務的功能來實行producer/consumer