第16天有提到etcd Service提供了六類服務, 其中包含了今天的主題Watch.
就來玩一下Watch.
Watch用來監聽一個定義的Key值, 也能監控一組的Key(透過前綴來指定), 任何該監看對象的變化, 都會通知.
也相當於設計模式中的觀察者模式.
適用的情境很多, 像是之後要介紹的服務發現, 配置的異動, 或者某資料狀態的異動監看.
但如果超過上萬過監看, 還是透過一個中央的Proxy來監看,並透過MQ做fanout通知比較實在.
因為etcd Server要逐個做notify也是很吃它自己的資源.
Watch提供了以下接口.
type Watcher interface {
// 開始監看指定的Key, 以及傳入context, 便於控制goroutine的退出.
Watch(ctx context.Context, key string, opts ...OpOption) WatchChan
// RequestProgress requests a progress notify response be sent in all watch channels.
RequestProgress(ctx context.Context) error
// 關閉Watch
Close() error
}
// 透過Watch取得的是個唯讀Channel
type WatchChan <-chan WatchResponse
type WatchResponse struct {
Header pb.ResponseHeader
// 對應的操作事件集合, 有DELETE/PUT
Events []*Event
CompactRevision int64
Canceled bool
// Created is used to indicate the creation of the watcher.
Created bool
closeErr error
// cancelReason is a reason of canceling watch
cancelReason string
}
Demo1:
監聽ithome
該KEY, 該KEY有5S的租約.
並設置timer每秒鐘去更改其值.
如果是PUT
, 則列印出當前revision的值, 該Key被建立的revision, 以及現在的revision.
如果是DELETE
, 則列印現在的revision.
package main
import (
"context"
"fmt"
"log"
"strconv"
"time"
"github.com/coreos/etcd/mvcc/mvccpb"
"go.etcd.io/etcd/clientv3"
)
var (
dialTimeout = 10 * time.Second
requestTimeout = 5 * time.Second
keepAliveTimeout = 5 * time.Second
)
func main() {
cancelCtx, canFunc := context.WithCancel(context.Background())
var endpoints = []string{"172.16.238.100:2379", "172.16.238.101:2379", "172.16.238.101:2379"}
cli, err := clientv3.New(clientv3.Config{
Context: cancelCtx,
Endpoints: endpoints,
DialTimeout: dialTimeout,
})
if err != nil {
log.Fatalln(err)
}
defer cli.Close()
// 建立一個Lease service client
leaseCli := clientv3.NewLease(cli)
var leaseGrantRes *clientv3.LeaseGrantResponse
// 申請一個5秒的租約
if leaseGrantRes, err = leaseCli.Grant(cancelCtx, 5); err != nil {
fmt.Println(err)
}
// 取得租約ID
leaseID := leaseGrantRes.ID
kv := clientv3.NewKV(cli)
// 新增一個KV, 並與租約做關聯, 租約5s後到期
if _, err = kv.Put(context.TODO(), "ithome", "Day1", clientv3.WithLease(leaseID)); err != nil {
fmt.Println(err)
return
}
timer := time.NewTimer(1 * time.Second)
go func(timer *time.Timer) {
for {
select {
case <-timer.C:
// 每秒改變一下該KV
value := "Day" + strconv.Itoa(time.Now().Second()%31)
fmt.Printf("%s\n", value)
kv.Put(cancelCtx, "ithome", value, clientv3.WithLease(leaseID))
}
timer.Reset(1 * time.Second)
}
}(timer)
// 建立一個ithome的Watcher
watcher := clientv3.NewWatcher(cli)
// 指定開始監看ithome
watchRespChan := watcher.Watch(cancelCtx, "ithome")
// 處理監聽到的KV變化event
for watchResp := range watchRespChan {
for _, event := range watchResp.Events {
switch event.Type {
case mvccpb.PUT:
fmt.Println("修改成:", string(event.Kv.Value), "Revision:", event.Kv.CreateRevision, event.Kv.ModRevision)
case mvccpb.DELETE:
fmt.Println("刪除了", "Revision:", event.Kv.ModRevision)
goto END
}
}
}
END:
canFunc()
fmt.Println("Service Terminate")
}
/*
Day14
修改成: Day14 Revision: 1954735 1954736
Day15
修改成: Day15 Revision: 1954735 1954737
Day16
修改成: Day16 Revision: 1954735 1954738
Day17
修改成: Day17 Revision: 1954735 1954739
Day18
修改成: Day18 Revision: 1954735 1954740
刪除了 Revision: 1954741
Service Terminate
*/
Demo2 :
透過WithPrefix
指定滿足prefix前綴的key做監看
package main
import (
"context"
"fmt"
"log"
"strconv"
"time"
"github.com/coreos/etcd/mvcc/mvccpb"
"go.etcd.io/etcd/clientv3"
)
var (
dialTimeout = 10 * time.Second
requestTimeout = 5 * time.Second
keepAliveTimeout = 5 * time.Second
)
func main() {
cancelCtx, canFunc := context.WithCancel(context.Background())
var endpoints = []string{"172.16.238.100:2379", "172.16.238.101:2379", "172.16.238.101:2379"}
cli, err := clientv3.New(clientv3.Config{
Context: cancelCtx,
Endpoints: endpoints,
DialTimeout: dialTimeout,
})
if err != nil {
log.Fatalln(err)
}
defer cli.Close()
// 建立一個Lease service client
leaseCli := clientv3.NewLease(cli)
var leaseGrantRes *clientv3.LeaseGrantResponse
// 申請一個5秒的租約
if leaseGrantRes, err = leaseCli.Grant(cancelCtx, 5); err != nil {
fmt.Println(err)
}
// 取得租約ID
leaseID := leaseGrantRes.ID
kv := clientv3.NewKV(cli)
timer := time.NewTimer(1 * time.Second)
go func(timer *time.Timer) {
for {
select {
case <-timer.C:
// 每秒改變一下該KV
key := "ithome"
value := "Day" + strconv.Itoa(time.Now().Second()%31)
fmt.Printf("%s\n", value)
// 這裡Key改成ithomeDay**
kv.Put(cancelCtx, key+value, value, clientv3.WithLease(leaseID))
}
timer.Reset(1 * time.Second)
}
}(timer)
// 建立一個ithome的Watcher
watcher := clientv3.NewWatcher(cli)
// 指定開始監看ithome
watchRespChan := watcher.Watch(cancelCtx, "ithome", clientv3.WithPrefix())
// 處理監聽到的KV變化event
for watchResp := range watchRespChan {
for _, event := range watchResp.Events {
switch event.Type {
case mvccpb.PUT:
fmt.Println("修改成:", string(event.Kv.Key), string(event.Kv.Value), "Revision:", event.Kv.CreateRevision, event.Kv.ModRevision)
case mvccpb.DELETE:
fmt.Println("刪除了", "Revision:", event.Kv.ModRevision)
goto END
}
}
}
END:
canFunc()
fmt.Println("Service Terminate")
}
/*
Day24
修改成: ithomeDay24 Day24 Revision: 1954825 1954825
Day25
修改成: ithomeDay25 Day25 Revision: 1954826 1954826
Day26
修改成: ithomeDay26 Day26 Revision: 1954827 1954827
Day27
修改成: ithomeDay27 Day27 Revision: 1954828 1954828
Day28
修改成: ithomeDay28 Day28 Revision: 1954829 1954829
刪除了 Revision: 1954830
Service Terminate
*/
Demo 3:
透過WithRange
監聽一個範圍的Key,該範圍是["KeyStart", "KeyEnd").
舉例["Key1", "Key3") ; 就是包含Key1 & Key2, 但是不包含Key3之後的.
這裡Key有ithomeDay0
-ithomeDay3
;
但我們只監聽[ithomeDay0
, ithomeDay3
); 就是ithomeDay0-ithomeDay02
package main
import (
"context"
"fmt"
"log"
"strconv"
"time"
"github.com/coreos/etcd/mvcc/mvccpb"
"go.etcd.io/etcd/clientv3"
)
var (
dialTimeout = 10 * time.Second
requestTimeout = 5 * time.Second
keepAliveTimeout = 5 * time.Second
)
func main() {
cancelCtx, canFunc := context.WithCancel(context.Background())
var endpoints = []string{"172.16.238.100:2379", "172.16.238.101:2379", "172.16.238.101:2379"}
cli, err := clientv3.New(clientv3.Config{
Context: cancelCtx,
Endpoints: endpoints,
DialTimeout: dialTimeout,
})
if err != nil {
log.Fatalln(err)
}
defer cli.Close()
// 建立一個Lease service client
leaseCli := clientv3.NewLease(cli)
var leaseGrantRes *clientv3.LeaseGrantResponse
// 申請一個5秒的租約
if leaseGrantRes, err = leaseCli.Grant(cancelCtx, 5); err != nil {
fmt.Println(err)
}
// 取得租約ID
leaseID := leaseGrantRes.ID
kv := clientv3.NewKV(cli)
timer := time.NewTimer(1 * time.Second)
go func(timer *time.Timer) {
for {
select {
case <-timer.C:
// 每秒改變一下該KV
key := "ithome"
// 這裡改成對3取餘數
value := "Day" + strconv.Itoa(time.Now().Second()%4)
fmt.Printf("%s\n", value)
// 這裡Key改成ithomeDay**
kv.Put(cancelCtx, key+value, value, clientv3.WithLease(leaseID))
}
timer.Reset(1 * time.Second)
}
}(timer)
// 建立一個ithome的Watcher
watcher := clientv3.NewWatcher(cli)
// 指定開始監看[ithomeDay0, ithomeDay3)
watchRespChan := watcher.Watch(cancelCtx, "ithomeDay0", clientv3.WithRange("ithomeDay3"))
// 處理監聽到的KV變化event
for watchResp := range watchRespChan {
for _, event := range watchResp.Events {
switch event.Type {
case mvccpb.PUT:
fmt.Println("修改成:", string(event.Kv.Key), string(event.Kv.Value), "Revision:", event.Kv.CreateRevision, event.Kv.ModRevision)
case mvccpb.DELETE:
fmt.Println("刪除了", "Revision:", event.Kv.ModRevision)
goto END
}
}
}
END:
canFunc()
fmt.Println("Service Terminate")
}
/*
Day3
Day0
修改成: ithomeDay0 Day0 Revision: 1954860 1954860
Day1
修改成: ithomeDay1 Day1 Revision: 1954861 1954861
Day2
修改成: ithomeDay2 Day2 Revision: 1954862 1954862
Day3
刪除了 Revision: 1954864
Service Terminate
*/
這裡能參考更多OpOption