Day22提到的場景
一個訊息能對到多個Topic.
這需要Wildcard Subscriptions
的搭配使用.
先介紹這場景, 因為這場景只有NATS能用, NATS Streaming不能QQ
Subject(以下另稱Topic, 因為我習慣了QQ).
Topic可以用.
串多個階層.
像是time.us.east.atlanta
, time.*
.
每個.
表示一個階層, 所以Topic是time.*
, 只能批配time.us
or time.cn
, 不能批配time.us.east
.
*
表示能批配該層的Any.
如果想批配底下多個階層呢?>
採用這個, 例如Topictime.us.>
, 能夠批配time.us.east
和 time.us.east.atlanta
.
*.*.east.>
這種Topic混和使用.
和>
, 則是能批配time.us.east.atlanta
這類滿足規則的Topic.
Demo:
2個Topic, 一個是testTopic.any
, 另一個是testTopic.ithome
2個Publisher, 分別送出資料給這兩個Topic
2個Subscriber, 分別訂閱testTopic.*
和testTopic.ithome
package main
import (
"fmt"
"log"
"os"
"os/signal"
"time"
nats "github.com/nats-io/nats.go"
)
func main() {
opts := []nats.Option{nats.Timeout(10 * 60 * time.Second),
nats.MaxReconnects(50), nats.ReconnectWait(10 * time.Second), nats.ReconnectHandler(func(_ *nats.Conn) {
log.Println("nats client reconnected")
})}
var URL string = "nats://172.16.230.100:4222,nats://172.16.230.101:4222"
nc, err := nats.Connect(URL, opts...)
if err != nil {
log.Println("nats connect :", err)
}
defer nc.Close()
go func() {
var cnt = 0
timer := time.NewTimer(1 * time.Second)
for {
<-timer.C
log.Println("send:", fmt.Sprintf("hello_ithome_%d", cnt))
nc.Publish("testTopic.ithome", []byte(fmt.Sprintf("hello_ithome_%d", cnt)))
cnt++
timer.Reset(3 * time.Second)
}
}()
go func() {
var cnt = 0
timer := time.NewTimer(3 * time.Second)
for {
<-timer.C
log.Println("send:", fmt.Sprintf("hello_any_%d", cnt))
nc.Publish("testTopic.any", []byte(fmt.Sprintf("hello_any_%d", cnt)))
cnt++
timer.Reset(5 * time.Second)
}
}()
mcbAny := func(msg *nats.Msg) {
log.Println("Any:", string(msg.Data))
}
mcbIthome := func(msg *nats.Msg) {
log.Println("Ithome:", string(msg.Data))
}
var Sub1Cb *nats.Subscription
var Sub2Cb *nats.Subscription
go func() {
Sub1Cb, err = nc.Subscribe("testTopic.*", mcbAny)
if err != nil {
log.Println("queue subscribe testTopic.*:", err)
}
}()
go func() {
Sub2Cb, err = nc.Subscribe("testTopic.ithome", mcbIthome)
if err != nil {
log.Println("queue subscribe testTopic.ithome:", err)
}
}()
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
<-c
Sub1Cb.Unsubscribe()
Sub2Cb.Unsubscribe()
}
/*
2020/09/29 23:39:45 send: hello_ithome_0
2020/09/29 23:39:45 Any: hello_ithome_0
2020/09/29 23:39:45 Ithome: hello_ithome_0
2020/09/29 23:39:47 send: hello_any_0
2020/09/29 23:39:47 Any: hello_any_0
2020/09/29 23:39:48 send: hello_ithome_1
2020/09/29 23:39:48 Ithome: hello_ithome_1
2020/09/29 23:39:48 Any: hello_ithome_1
*/
可以看出當送出Topic是testTopic.ithome時, 2個Subscriber都會收到, 因為*
可以批配任意.
可當Topic是testTopic.any時, 就只有訂閱testTopic.*
的Subscriber會收到.
NATS Streaming以下的情境都支持
Day22提到的觀察者模式, 就該Topic有多個Subscriber.
Reference
package main
import (
"fmt"
"log"
"os"
"os/signal"
"time"
nats "github.com/nats-io/nats.go"
"github.com/nats-io/stan.go"
)
func main() {
opts := []nats.Option{nats.Timeout(10 * 60 * time.Second),
nats.MaxReconnects(50), nats.ReconnectWait(10 * time.Second), nats.ReconnectHandler(func(_ *nats.Conn) {
log.Println("nats client reconnected")
})}
var URL string = "nats://172.16.230.100:4222,nats://172.16.230.101:4222"
nc, err := nats.Connect(URL, opts...)
if err != nil {
log.Println("nats connect :", err)
}
defer nc.Close()
sc, err := stan.Connect("test-cluster", "nathan02", stan.NatsConn(nc),
stan.SetConnectionLostHandler(func(_ stan.Conn, reason error) {
log.Printf("Connection lost, reason: %v\n", reason)
}))
if err != nil {
log.Println("Can't connect:", err)
fmt.Printf("CMake sure a NATS Streaming Server is running at: %s", URL)
}
go func() {
timer := time.NewTimer(1 * time.Second)
for {
sc.Publish("new_job_info", []byte(fmt.Sprintf("Software Engineer * %d", 1)))
<-timer.C
timer.Reset(1 * time.Second)
}
}()
mcbPeopleA := func(msg *stan.Msg) {
log.Println("PeopleA:", string(msg.Data))
}
mcbPeopleB := func(msg *stan.Msg) {
log.Println("PeopleB:", string(msg.Data))
}
var subPeopleA stan.Subscription
var subPeopleB stan.Subscription
go func() {
subPeopleA, err = sc.QueueSubscribe("new_job_info", "PeopleA", mcbPeopleA)
if err != nil {
log.Println("queue subscribe new_job_info:", err)
}
}()
go func() {
subPeopleB, err = sc.QueueSubscribe("new_job_info", "PeopleB", mcbPeopleB)
if err != nil {
log.Println("queue subscribe new_job_info:", err)
}
}()
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
<-c
subPeopleA.Unsubscribe()
subPeopleB.Unsubscribe()
subPeopleA.Close()
subPeopleB.Close()
sc.Close()
}
/*
2020/09/30 00:12:30 PeopleA: Software Engineer * 1
2020/09/30 00:12:30 PeopleB: Software Engineer * 1
*/
兩個Subscriber都會收到相同關注的Topic發布的訊息.
主要用在廣播
場景上.
這場景, 跟上面有個差異, 就是QueueSubscribe
第二個參數是queue group name, 上個範例給的group name不同, 所以大家都會收到訊息.
這範例則是每個Subscriber都屬於同一個group name.
package main
import (
"fmt"
"log"
"os"
"os/signal"
"time"
nats "github.com/nats-io/nats.go"
"github.com/nats-io/stan.go"
"github.com/nats-io/stan.go/pb"
)
func main() {
opts := []nats.Option{nats.Timeout(10 * 60 * time.Second),
nats.MaxReconnects(50), nats.ReconnectWait(10 * time.Second), nats.ReconnectHandler(func(_ *nats.Conn) {
log.Println("nats client reconnected")
})}
var URL string = "nats://172.16.230.100:4222,nats://172.16.230.101:4222"
nc, err := nats.Connect(URL, opts...)
if err != nil {
log.Println("nats connect :", err)
}
defer nc.Close()
sc, err := stan.Connect("test-cluster", "nathan01", stan.NatsConn(nc),
stan.SetConnectionLostHandler(func(_ stan.Conn, reason error) {
log.Printf("Connection lost, reason: %v\n", reason)
}))
if err != nil {
log.Println("Can't connect:", err)
fmt.Printf("CMake sure a NATS Streaming Server is running at: %s", URL)
}
startOpt := stan.StartAt(pb.StartPosition_NewOnly)
subAck := stan.SetManualAckMode()
ackWait := stan.AckWait(10 * time.Second)
go func() {
var cnt = 0
timer := time.NewTimer(1 * time.Second)
for {
sc.Publish("topic", []byte(fmt.Sprintf("hello_%d", cnt)))
cnt++
<-timer.C
timer.Reset(1 * time.Second)
}
}()
mcbSub1 := func(msg *stan.Msg) {
log.Println("Sub1:", string(msg.Data))
defer msg.Ack()
}
mcbSub2 := func(msg *stan.Msg) {
log.Println("Sub2:", string(msg.Data))
defer msg.Ack()
}
var sub1 stan.Subscription
var sub2 stan.Subscription
go func() {
sub1, err = sc.QueueSubscribe("topic", "g1", mcbSub1, startOpt, stan.DurableName(""), stan.MaxInflight(1), subAck, ackWait)
if err != nil {
log.Println("queue subscribe Topic:", err)
}
}()
go func() {
sub2, err = sc.QueueSubscribe("topic", "g1", mcbSub2, startOpt, stan.DurableName(""), stan.MaxInflight(1), subAck, ackWait)
if err != nil {
log.Println("queue subscribe testTopic:", err)
}
}()
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
<-c
sub1.Unsubscribe()
sub2.Unsubscribe()
sub1.Close()
sub2.ClearMaxPending()
sc.Close()
}
/*
2020/09/30 00:17:57 Sub2: hello_0
2020/09/30 00:17:58 Sub2: hello_1
2020/09/30 00:17:59 Sub1: hello_2
2020/09/30 00:18:00 Sub2: hello_3
2020/09/30 00:18:01 Sub1: hello_4
2020/09/30 00:18:02 Sub2: hello_5
2020/09/30 00:18:03 Sub1: hello_6
*/
這Queue Group模式下, NATS會進行負載均衡, 隨機發送給同一組的任意Subscriber.
這種的好處可以搭配監控數據, 對Subscriber做自動伸縮. 提高系統的可用性.
Reference
Request-Reply 發送應答, 可以支持1-1或者1-Many, 可以指定要幾個訂閱者收到.
但通常是收到第一個Reply後, 其他就丟棄了, 減少等待時間.
Publisher Request:
package main
import (
// "encoding/json"
"fmt"
"log"
"os"
"os/signal"
"time"
nats "github.com/nats-io/nats.go"
"github.com/nats-io/stan.go"
)
func main() {
opts := []nats.Option{nats.Timeout(10 * 60 * time.Second),
nats.MaxReconnects(50), nats.ReconnectWait(10 * time.Second), nats.ReconnectHandler(func(_ *nats.Conn) {
log.Println("nats client reconnected")
})}
var URL string = "nats://172.16.230.100:4222,nats://172.16.230.101:4222"
nc, err := nats.Connect(URL, opts...)
if err != nil {
log.Println("nats connect :", err)
}
defer nc.Close()
sc, err := stan.Connect("test-cluster", "nathan01", stan.NatsConn(nc),
stan.SetConnectionLostHandler(func(_ stan.Conn, reason error) {
log.Printf("Connection lost, reason: %v\n", reason)
}))
if err != nil {
log.Println("Can't connect:", err)
fmt.Printf("CMake sure a NATS Streaming Server is running at: %s", URL)
}
var replyMsg *nats.Msg
log.Println("Start send request:", time.Now())
replyMsg, err = sc.NatsConn().Request("ithome", []byte("ithome_request"), 4*time.Second)
if err != nil {
log.Println("nats connect :", err)
}
log.Println("get reply:", string(replyMsg.Data))
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
<-c
// sub.Unsubscribe()
sc.Close()
}
Subscriber Reply:
package main
import (
"encoding/json"
"fmt"
"log"
"os"
"os/signal"
"time"
nats "github.com/nats-io/nats.go"
"github.com/nats-io/stan.go"
)
func main() {
opts := []nats.Option{nats.Timeout(10 * 60 * time.Second),
nats.MaxReconnects(50), nats.ReconnectWait(10 * time.Second), nats.ReconnectHandler(func(_ *nats.Conn) {
log.Println("nats client reconnected")
})}
var URL string = "nats://172.16.230.100:4222,nats://172.16.230.101:4222"
nc, err := nats.Connect(URL, opts...)
if err != nil {
log.Println("nats connect :", err)
}
defer nc.Close()
sc, err := stan.Connect("test-cluster", "nathan02", stan.NatsConn(nc),
stan.SetConnectionLostHandler(func(_ stan.Conn, reason error) {
log.Printf("Connection lost, reason: %v\n", reason)
}))
if err != nil {
log.Println("Can't connect:", err)
fmt.Printf("CMake sure a NATS Streaming Server is running at: %s", URL)
}
var sub *nats.Subscription
sub, _ = sc.NatsConn().Subscribe("ithome", func(msg *nats.Msg) {
log.Println(string(msg.Data), "from nats")
result, _ := json.Marshal(msg)
log.Println("the reply info is ", string(result))
sc.NatsConn().Publish(msg.Reply, []byte("ithome can help you"))
})
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
<-c
sub.Unsubscribe()
sc.Close()
}
場景大概支持的就這幾個.
一些代碼中的東西,跟一些監控的明天再來分享.