iT邦幫忙

第 12 屆 iThome 鐵人賽

DAY 18
1
Software Development

服務開發雜談系列 第 18

etcd Watch監看

第16天有提到etcd Service提供了六類服務, 其中包含了今天的主題Watch.
就來玩一下Watch.

Watch Service

Watch用來監聽一個定義的Key值, 也能監控一組的Key(透過前綴來指定), 任何該監看對象的變化, 都會通知.
也相當於設計模式中的觀察者模式.

適用的情境很多, 像是之後要介紹的服務發現, 配置的異動, 或者某資料狀態的異動監看.
但如果超過上萬過監看, 還是透過一個中央的Proxy來監看,並透過MQ做fanout通知比較實在.
因為etcd Server要逐個做notify也是很吃它自己的資源.

Watch提供了以下接口.

type Watcher interface {
    // 開始監看指定的Key, 以及傳入context, 便於控制goroutine的退出.
	Watch(ctx context.Context, key string, opts ...OpOption) WatchChan

	// RequestProgress requests a progress notify response be sent in all watch channels.
	RequestProgress(ctx context.Context) error

	// 關閉Watch
	Close() error
}
// 透過Watch取得的是個唯讀Channel
type WatchChan <-chan WatchResponse

type WatchResponse struct {
	Header pb.ResponseHeader
    
    // 對應的操作事件集合, 有DELETE/PUT
	Events []*Event

	CompactRevision int64

	Canceled bool

	// Created is used to indicate the creation of the watcher.
	Created bool

	closeErr error

	// cancelReason is a reason of canceling watch
	cancelReason string
}

Demo1:
監聽ithome該KEY, 該KEY有5S的租約.
並設置timer每秒鐘去更改其值.
如果是PUT, 則列印出當前revision的值, 該Key被建立的revision, 以及現在的revision.
如果是DELETE, 則列印現在的revision.

package main

import (
	"context"
	"fmt"
	"log"
	"strconv"
	"time"

	"github.com/coreos/etcd/mvcc/mvccpb"
	"go.etcd.io/etcd/clientv3"
)

var (
	dialTimeout      = 10 * time.Second
	requestTimeout   = 5 * time.Second
	keepAliveTimeout = 5 * time.Second
)

func main() {
	cancelCtx, canFunc := context.WithCancel(context.Background())

	var endpoints = []string{"172.16.238.100:2379", "172.16.238.101:2379", "172.16.238.101:2379"}
	cli, err := clientv3.New(clientv3.Config{
		Context:     cancelCtx,
		Endpoints:   endpoints,
		DialTimeout: dialTimeout,
	})
	if err != nil {
		log.Fatalln(err)
	}
	defer cli.Close()
	// 建立一個Lease service client
	leaseCli := clientv3.NewLease(cli)
	var leaseGrantRes *clientv3.LeaseGrantResponse
	// 申請一個5秒的租約
	if leaseGrantRes, err = leaseCli.Grant(cancelCtx, 5); err != nil {
		fmt.Println(err)
	}

	// 取得租約ID
	leaseID := leaseGrantRes.ID

	kv := clientv3.NewKV(cli)

	// 新增一個KV, 並與租約做關聯, 租約5s後到期
	if _, err = kv.Put(context.TODO(), "ithome", "Day1", clientv3.WithLease(leaseID)); err != nil {
		fmt.Println(err)
		return
	}

	timer := time.NewTimer(1 * time.Second)
	go func(timer *time.Timer) {
		for {
			select {
			case <-timer.C:
				// 每秒改變一下該KV
				value := "Day" + strconv.Itoa(time.Now().Second()%31)
				fmt.Printf("%s\n", value)
				kv.Put(cancelCtx, "ithome", value, clientv3.WithLease(leaseID))
			}
			timer.Reset(1 * time.Second)
		}
	}(timer)

	// 建立一個ithome的Watcher
	watcher := clientv3.NewWatcher(cli)

	// 指定開始監看ithome
	watchRespChan := watcher.Watch(cancelCtx, "ithome")

	// 處理監聽到的KV變化event
	for watchResp := range watchRespChan {
		for _, event := range watchResp.Events {
			switch event.Type {
			case mvccpb.PUT:
				fmt.Println("修改成:", string(event.Kv.Value), "Revision:", event.Kv.CreateRevision, event.Kv.ModRevision)
			case mvccpb.DELETE:
				fmt.Println("刪除了", "Revision:", event.Kv.ModRevision)
				goto END
			}
		}
	}
END:
	canFunc()
	fmt.Println("Service Terminate")

}
/*
Day14
修改成: Day14 Revision: 1954735 1954736
Day15
修改成: Day15 Revision: 1954735 1954737
Day16
修改成: Day16 Revision: 1954735 1954738
Day17
修改成: Day17 Revision: 1954735 1954739
Day18
修改成: Day18 Revision: 1954735 1954740
刪除了 Revision: 1954741
Service Terminate
*/

Demo2 :
透過WithPrefix指定滿足prefix前綴的key做監看

package main

import (
	"context"
	"fmt"
	"log"
	"strconv"
	"time"

	"github.com/coreos/etcd/mvcc/mvccpb"
	"go.etcd.io/etcd/clientv3"
)

var (
	dialTimeout      = 10 * time.Second
	requestTimeout   = 5 * time.Second
	keepAliveTimeout = 5 * time.Second
)

func main() {
	cancelCtx, canFunc := context.WithCancel(context.Background())

	var endpoints = []string{"172.16.238.100:2379", "172.16.238.101:2379", "172.16.238.101:2379"}
	cli, err := clientv3.New(clientv3.Config{
		Context:     cancelCtx,
		Endpoints:   endpoints,
		DialTimeout: dialTimeout,
	})
	if err != nil {
		log.Fatalln(err)
	}
	defer cli.Close()
	// 建立一個Lease service client
	leaseCli := clientv3.NewLease(cli)
	var leaseGrantRes *clientv3.LeaseGrantResponse
	// 申請一個5秒的租約
	if leaseGrantRes, err = leaseCli.Grant(cancelCtx, 5); err != nil {
		fmt.Println(err)
	}

	// 取得租約ID
	leaseID := leaseGrantRes.ID

	kv := clientv3.NewKV(cli)

	timer := time.NewTimer(1 * time.Second)
	go func(timer *time.Timer) {
		for {
			select {
			case <-timer.C:
				// 每秒改變一下該KV
				key := "ithome"
				value := "Day" + strconv.Itoa(time.Now().Second()%31)
				fmt.Printf("%s\n", value)
				// 這裡Key改成ithomeDay**
				kv.Put(cancelCtx, key+value, value, clientv3.WithLease(leaseID))

			}
			timer.Reset(1 * time.Second)
		}
	}(timer)

	// 建立一個ithome的Watcher
	watcher := clientv3.NewWatcher(cli)

	// 指定開始監看ithome
	watchRespChan := watcher.Watch(cancelCtx, "ithome", clientv3.WithPrefix())

	// 處理監聽到的KV變化event
	for watchResp := range watchRespChan {
		for _, event := range watchResp.Events {
			switch event.Type {
			case mvccpb.PUT:
				fmt.Println("修改成:", string(event.Kv.Key), string(event.Kv.Value), "Revision:", event.Kv.CreateRevision, event.Kv.ModRevision)
			case mvccpb.DELETE:
				fmt.Println("刪除了", "Revision:", event.Kv.ModRevision)
				goto END
			}
		}
	}
END:
	canFunc()
	fmt.Println("Service Terminate")

}

/*
Day24
修改成: ithomeDay24 Day24 Revision: 1954825 1954825
Day25
修改成: ithomeDay25 Day25 Revision: 1954826 1954826
Day26
修改成: ithomeDay26 Day26 Revision: 1954827 1954827
Day27
修改成: ithomeDay27 Day27 Revision: 1954828 1954828
Day28
修改成: ithomeDay28 Day28 Revision: 1954829 1954829
刪除了 Revision: 1954830
Service Terminate
*/

Demo 3:
透過WithRange監聽一個範圍的Key,該範圍是["KeyStart", "KeyEnd").
舉例["Key1", "Key3") ; 就是包含Key1 & Key2, 但是不包含Key3之後的.
這裡Key有ithomeDay0-ithomeDay3;
但我們只監聽[ithomeDay0, ithomeDay3); 就是ithomeDay0-ithomeDay02

package main

import (
	"context"
	"fmt"
	"log"
	"strconv"
	"time"

	"github.com/coreos/etcd/mvcc/mvccpb"
	"go.etcd.io/etcd/clientv3"
)

var (
	dialTimeout      = 10 * time.Second
	requestTimeout   = 5 * time.Second
	keepAliveTimeout = 5 * time.Second
)

func main() {
	cancelCtx, canFunc := context.WithCancel(context.Background())

	var endpoints = []string{"172.16.238.100:2379", "172.16.238.101:2379", "172.16.238.101:2379"}
	cli, err := clientv3.New(clientv3.Config{
		Context:     cancelCtx,
		Endpoints:   endpoints,
		DialTimeout: dialTimeout,
	})
	if err != nil {
		log.Fatalln(err)
	}
	defer cli.Close()
	// 建立一個Lease service client
	leaseCli := clientv3.NewLease(cli)
	var leaseGrantRes *clientv3.LeaseGrantResponse
	// 申請一個5秒的租約
	if leaseGrantRes, err = leaseCli.Grant(cancelCtx, 5); err != nil {
		fmt.Println(err)
	}

	// 取得租約ID
	leaseID := leaseGrantRes.ID

	kv := clientv3.NewKV(cli)

	timer := time.NewTimer(1 * time.Second)
	go func(timer *time.Timer) {
		for {
			select {
			case <-timer.C:
				// 每秒改變一下該KV
				key := "ithome"
                // 這裡改成對3取餘數
				value := "Day" + strconv.Itoa(time.Now().Second()%4)
				fmt.Printf("%s\n", value)
				// 這裡Key改成ithomeDay**
				kv.Put(cancelCtx, key+value, value, clientv3.WithLease(leaseID))

			}
			timer.Reset(1 * time.Second)
		}
	}(timer)

	// 建立一個ithome的Watcher
	watcher := clientv3.NewWatcher(cli)

// 指定開始監看[ithomeDay0, ithomeDay3)
	watchRespChan := watcher.Watch(cancelCtx, "ithomeDay0", clientv3.WithRange("ithomeDay3"))

	// 處理監聽到的KV變化event
	for watchResp := range watchRespChan {
		for _, event := range watchResp.Events {
			switch event.Type {
			case mvccpb.PUT:
				fmt.Println("修改成:", string(event.Kv.Key), string(event.Kv.Value), "Revision:", event.Kv.CreateRevision, event.Kv.ModRevision)
			case mvccpb.DELETE:
				fmt.Println("刪除了", "Revision:", event.Kv.ModRevision)
				goto END
			}
		}
	}
END:
	canFunc()
	fmt.Println("Service Terminate")
}
/*
Day3
Day0
修改成: ithomeDay0 Day0 Revision: 1954860 1954860
Day1
修改成: ithomeDay1 Day1 Revision: 1954861 1954861
Day2
修改成: ithomeDay2 Day2 Revision: 1954862 1954862
Day3
刪除了 Revision: 1954864
Service Terminate
*/

這裡能參考更多OpOption


上一篇
etcd Lease租約
下一篇
etcd Txn事務
系列文
服務開發雜談33
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言