iT邦幫忙

第 12 屆 iThome 鐵人賽

DAY 24
1
Software Development

服務開發雜談系列 第 24

NATS Client使用&支持場景

  • 分享至 

  • xImage
  •  

NAT Client基本使用與支持場景

Publisher-Subscriber

Day22提到的場景
一個訊息能對到多個Topic.
這需要Wildcard Subscriptions的搭配使用.


Reference

先介紹這場景, 因為這場景只有NATS能用, NATS Streaming不能QQ

Subject(以下另稱Topic, 因為我習慣了QQ).

Topic可以用.串多個階層.
像是time.us.east.atlanta, time.*.
每個.表示一個階層, 所以Topic是time.*, 只能批配time.usor time.cn, 不能批配time.us.east.

*表示能批配該層的Any.

如果想批配底下多個階層呢?
>採用這個, 例如Topictime.us.>, 能夠批配time.us.easttime.us.east.atlanta.

*.*.east.>這種Topic混和使用.>, 則是能批配time.us.east.atlanta這類滿足規則的Topic.

Demo:
2個Topic, 一個是testTopic.any, 另一個是testTopic.ithome
2個Publisher, 分別送出資料給這兩個Topic
2個Subscriber, 分別訂閱testTopic.*testTopic.ithome

package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"time"

	nats "github.com/nats-io/nats.go"
)

func main() {
	opts := []nats.Option{nats.Timeout(10 * 60 * time.Second),
		nats.MaxReconnects(50), nats.ReconnectWait(10 * time.Second), nats.ReconnectHandler(func(_ *nats.Conn) {
			log.Println("nats client reconnected")
		})}

	var URL string = "nats://172.16.230.100:4222,nats://172.16.230.101:4222"
	nc, err := nats.Connect(URL, opts...)

	if err != nil {
		log.Println("nats connect :", err)
	}
	defer nc.Close()

	go func() {
		var cnt = 0
		timer := time.NewTimer(1 * time.Second)
		for {
			<-timer.C
			log.Println("send:", fmt.Sprintf("hello_ithome_%d", cnt))
			nc.Publish("testTopic.ithome", []byte(fmt.Sprintf("hello_ithome_%d", cnt)))
			cnt++
			timer.Reset(3 * time.Second)
		}
	}()
	go func() {
		var cnt = 0
		timer := time.NewTimer(3 * time.Second)
		for {
			<-timer.C
			log.Println("send:", fmt.Sprintf("hello_any_%d", cnt))
			nc.Publish("testTopic.any", []byte(fmt.Sprintf("hello_any_%d", cnt)))
			cnt++
			timer.Reset(5 * time.Second)
		}
	}()

	mcbAny := func(msg *nats.Msg) {
		log.Println("Any:", string(msg.Data))

	}
	mcbIthome := func(msg *nats.Msg) {
		log.Println("Ithome:", string(msg.Data))

	}
	var Sub1Cb *nats.Subscription
	var Sub2Cb *nats.Subscription
	go func() {
		Sub1Cb, err = nc.Subscribe("testTopic.*", mcbAny)
		if err != nil {
			log.Println("queue subscribe testTopic.*:", err)
		}
	}()

	go func() {
		Sub2Cb, err = nc.Subscribe("testTopic.ithome", mcbIthome)
		if err != nil {
			log.Println("queue subscribe testTopic.ithome:", err)
		}
	}()

	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)

	<-c
	Sub1Cb.Unsubscribe()
	Sub2Cb.Unsubscribe()
}
/*
2020/09/29 23:39:45 send: hello_ithome_0
2020/09/29 23:39:45 Any: hello_ithome_0
2020/09/29 23:39:45 Ithome: hello_ithome_0
2020/09/29 23:39:47 send: hello_any_0
2020/09/29 23:39:47 Any: hello_any_0
2020/09/29 23:39:48 send: hello_ithome_1
2020/09/29 23:39:48 Ithome: hello_ithome_1
2020/09/29 23:39:48 Any: hello_ithome_1
*/

可以看出當送出Topic是testTopic.ithome時, 2個Subscriber都會收到, 因為*可以批配任意.
可當Topic是testTopic.any時, 就只有訂閱testTopic.*的Subscriber會收到.

NAT Streaming Client基本使用與支持場景

NATS Streaming以下的情境都支持

觀察者模式

Day22提到的觀察者模式, 就該Topic有多個Subscriber.

Reference

package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"time"

	nats "github.com/nats-io/nats.go"
	"github.com/nats-io/stan.go"
)

func main() {
	opts := []nats.Option{nats.Timeout(10 * 60 * time.Second),
		nats.MaxReconnects(50), nats.ReconnectWait(10 * time.Second), nats.ReconnectHandler(func(_ *nats.Conn) {
			log.Println("nats client reconnected")
		})}

	var URL string = "nats://172.16.230.100:4222,nats://172.16.230.101:4222"
	nc, err := nats.Connect(URL, opts...)

	if err != nil {
		log.Println("nats connect :", err)
	}
	defer nc.Close()

	sc, err := stan.Connect("test-cluster", "nathan02", stan.NatsConn(nc),
		stan.SetConnectionLostHandler(func(_ stan.Conn, reason error) {
			log.Printf("Connection lost, reason: %v\n", reason)

		}))
	if err != nil {
		log.Println("Can't connect:", err)
		fmt.Printf("CMake sure a NATS Streaming Server is running at: %s", URL)

	}

	go func() {
		timer := time.NewTimer(1 * time.Second)
		for {

			sc.Publish("new_job_info", []byte(fmt.Sprintf("Software Engineer * %d", 1)))
			<-timer.C
			timer.Reset(1 * time.Second)
		}
	}()

	mcbPeopleA := func(msg *stan.Msg) {
		log.Println("PeopleA:", string(msg.Data))

	}
	mcbPeopleB := func(msg *stan.Msg) {
		log.Println("PeopleB:", string(msg.Data))

	}
	var subPeopleA stan.Subscription
	var subPeopleB stan.Subscription
	go func() {
		subPeopleA, err = sc.QueueSubscribe("new_job_info", "PeopleA", mcbPeopleA)
		if err != nil {
			log.Println("queue subscribe new_job_info:", err)
		}
	}()

	go func() {
		subPeopleB, err = sc.QueueSubscribe("new_job_info", "PeopleB", mcbPeopleB)
		if err != nil {
			log.Println("queue subscribe new_job_info:", err)
		}
	}()

	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)

	<-c
	subPeopleA.Unsubscribe()
	subPeopleB.Unsubscribe()

	subPeopleA.Close()
	subPeopleB.Close()
	sc.Close()
}
/*
2020/09/30 00:12:30 PeopleA: Software Engineer * 1
2020/09/30 00:12:30 PeopleB: Software Engineer * 1
*/

兩個Subscriber都會收到相同關注的Topic發布的訊息.
主要用在廣播場景上.

Queue Group


這場景, 跟上面有個差異, 就是QueueSubscribe第二個參數是queue group name, 上個範例給的group name不同, 所以大家都會收到訊息.
這範例則是每個Subscriber都屬於同一個group name.

package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"time"

	nats "github.com/nats-io/nats.go"
	"github.com/nats-io/stan.go"
	"github.com/nats-io/stan.go/pb"
)

func main() {
	opts := []nats.Option{nats.Timeout(10 * 60 * time.Second),
		nats.MaxReconnects(50), nats.ReconnectWait(10 * time.Second), nats.ReconnectHandler(func(_ *nats.Conn) {
			log.Println("nats client reconnected")
		})}

	var URL string = "nats://172.16.230.100:4222,nats://172.16.230.101:4222"
	nc, err := nats.Connect(URL, opts...)

	if err != nil {
		log.Println("nats connect :", err)
	}
	defer nc.Close()

	sc, err := stan.Connect("test-cluster", "nathan01", stan.NatsConn(nc),
		stan.SetConnectionLostHandler(func(_ stan.Conn, reason error) {
			log.Printf("Connection lost, reason: %v\n", reason)

		}))
	if err != nil {
		log.Println("Can't connect:", err)
		fmt.Printf("CMake sure a NATS Streaming Server is running at: %s", URL)

	}
	startOpt := stan.StartAt(pb.StartPosition_NewOnly)
	subAck := stan.SetManualAckMode()
	ackWait := stan.AckWait(10 * time.Second)

	go func() {
		var cnt = 0
		timer := time.NewTimer(1 * time.Second)
		for {

			sc.Publish("topic", []byte(fmt.Sprintf("hello_%d", cnt)))
			cnt++
			<-timer.C
			timer.Reset(1 * time.Second)
		}
	}()

	mcbSub1 := func(msg *stan.Msg) {
		log.Println("Sub1:", string(msg.Data))
		defer msg.Ack()
	}
	mcbSub2 := func(msg *stan.Msg) {
		log.Println("Sub2:", string(msg.Data))
		defer msg.Ack()
	}
	var sub1 stan.Subscription
	var sub2 stan.Subscription
	go func() {
		sub1, err = sc.QueueSubscribe("topic", "g1", mcbSub1, startOpt, stan.DurableName(""), stan.MaxInflight(1), subAck, ackWait)
		if err != nil {
			log.Println("queue subscribe Topic:", err)
		}
	}()

	go func() {
		sub2, err = sc.QueueSubscribe("topic", "g1", mcbSub2, startOpt, stan.DurableName(""), stan.MaxInflight(1), subAck, ackWait)
		if err != nil {
			log.Println("queue subscribe testTopic:", err)
		}
	}()

	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)

	<-c
	sub1.Unsubscribe()
	sub2.Unsubscribe()
	sub1.Close()
	sub2.ClearMaxPending()
	sc.Close()
}
/*
2020/09/30 00:17:57 Sub2: hello_0
2020/09/30 00:17:58 Sub2: hello_1
2020/09/30 00:17:59 Sub1: hello_2
2020/09/30 00:18:00 Sub2: hello_3
2020/09/30 00:18:01 Sub1: hello_4
2020/09/30 00:18:02 Sub2: hello_5
2020/09/30 00:18:03 Sub1: hello_6
*/

這Queue Group模式下, NATS會進行負載均衡, 隨機發送給同一組的任意Subscriber.
這種的好處可以搭配監控數據, 對Subscriber做自動伸縮. 提高系統的可用性.

Request-Reply


Reference
Request-Reply 發送應答, 可以支持1-1或者1-Many, 可以指定要幾個訂閱者收到.
但通常是收到第一個Reply後, 其他就丟棄了, 減少等待時間.

Publisher Request:

package main

import (
	//	"encoding/json"
	"fmt"
	"log"
	"os"
	"os/signal"
	"time"

	nats "github.com/nats-io/nats.go"
	"github.com/nats-io/stan.go"
)

func main() {
	opts := []nats.Option{nats.Timeout(10 * 60 * time.Second),
		nats.MaxReconnects(50), nats.ReconnectWait(10 * time.Second), nats.ReconnectHandler(func(_ *nats.Conn) {
			log.Println("nats client reconnected")
		})}

	var URL string = "nats://172.16.230.100:4222,nats://172.16.230.101:4222"
	nc, err := nats.Connect(URL, opts...)

	if err != nil {
		log.Println("nats connect :", err)
	}
	defer nc.Close()
	sc, err := stan.Connect("test-cluster", "nathan01", stan.NatsConn(nc),
		stan.SetConnectionLostHandler(func(_ stan.Conn, reason error) {
			log.Printf("Connection lost, reason: %v\n", reason)

		}))
	if err != nil {
		log.Println("Can't connect:", err)
		fmt.Printf("CMake sure a NATS Streaming Server is running at: %s", URL)

	}

	var replyMsg *nats.Msg
	log.Println("Start send request:", time.Now())
	replyMsg, err = sc.NatsConn().Request("ithome", []byte("ithome_request"), 4*time.Second)
	if err != nil {
		log.Println("nats connect :", err)
	}
	log.Println("get reply:", string(replyMsg.Data))

	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)

	<-c
	// sub.Unsubscribe()
	sc.Close()
}

Subscriber Reply:

package main

import (
	"encoding/json"
	"fmt"
	"log"
	"os"
	"os/signal"
	"time"

	nats "github.com/nats-io/nats.go"
	"github.com/nats-io/stan.go"
)

func main() {
	opts := []nats.Option{nats.Timeout(10 * 60 * time.Second),
		nats.MaxReconnects(50), nats.ReconnectWait(10 * time.Second), nats.ReconnectHandler(func(_ *nats.Conn) {
			log.Println("nats client reconnected")
		})}

	var URL string = "nats://172.16.230.100:4222,nats://172.16.230.101:4222"
	nc, err := nats.Connect(URL, opts...)

	if err != nil {
		log.Println("nats connect :", err)
	}
	defer nc.Close()

	sc, err := stan.Connect("test-cluster", "nathan02", stan.NatsConn(nc),
		stan.SetConnectionLostHandler(func(_ stan.Conn, reason error) {
			log.Printf("Connection lost, reason: %v\n", reason)

		}))
	if err != nil {
		log.Println("Can't connect:", err)
		fmt.Printf("CMake sure a NATS Streaming Server is running at: %s", URL)

	}
	var sub *nats.Subscription
	sub, _ = sc.NatsConn().Subscribe("ithome", func(msg *nats.Msg) {
		log.Println(string(msg.Data), "from nats")
		result, _ := json.Marshal(msg)
		log.Println("the reply info is ", string(result))
		sc.NatsConn().Publish(msg.Reply, []byte("ithome can help you"))
	})

	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)

	<-c
	sub.Unsubscribe()
	sc.Close()
}

場景大概支持的就這幾個.
一些代碼中的東西,跟一些監控的明天再來分享.


上一篇
NATS & NATS Streaming 叢集安裝 By Docker Compose
下一篇
NATS Monitor介紹
系列文
服務開發雜談33
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言