第11天提到服務註冊與發現的一些好處.
第17天講了租約, 可以做Key的定時刪除.
第18天講了怎透過etcd, 對key來實做觀察者模式跟通知.
來透過etcd介紹的東西來實做服務註冊
與服務發現
簡單實現.
服務實例自動把自己的訊息註冊到服務註冊跟發現中心,
執行過程中, 定期透過heartbeat或keepalive方式向服務註冊與發現中心匯報自身服務狀態.
服務退出時, 也可以主動向服務註冊與發現中心, 取消自己在註冊表中的資訊.
服務發現是指服務實例向服務註冊與發現中心獲得其它服務實例的訊息.
便於後面的遠程服務調用.
Server實例, 會進行服務註冊
package main
import (
"context"
"log"
"time"
"math/rand"
"go.etcd.io/etcd/clientv3"
)
type ServiceRegister struct {
cli *clientv3.Client
leaseID clientv3.LeaseID
//續約Chan Keepalieve
keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse
key string // 服務名稱
val string // 服務本身資訊
ctx context.Context
}
func NewServiceRegister(ctx context.Context, endpoints []string, key, val string, lease int64) (*ServiceRegister, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
})
if err != nil {
log.Fatal(err)
}
ser := &ServiceRegister{
cli: cli,
key: key,
val: val,
ctx: ctx,
}
// 設定Key+租約
if err := ser.putKeyWithLease(lease); err != nil {
return nil, err
}
return ser, nil
}
func (s *ServiceRegister) putKeyWithLease(lease int64) error {
//取得1個秒數是lease值的租約
resp, err := s.cli.Grant(s.ctx, lease)
if err != nil {
return err
}
// 服務註冊, key是自身服務名稱, value是服務訊息
_, err = s.cli.Put(s.ctx, s.key, s.val, clientv3.WithLease(resp.ID))
if err != nil {
return err
}
// 設定自動續約(heartbeat)
leaseRespChan, err := s.cli.KeepAlive(s.ctx, resp.ID)
if err != nil {
return err
}
s.leaseID = resp.ID
s.keepAliveChan = leaseRespChan
log.Printf("Register service success, key:%s val:%s lease:%d", s.key, s.val, s.leaseID)
return nil
}
// 處理續租
func (s *ServiceRegister) ListenLeaseRespChan() {
for {
select {
case leaseKeepResp := <-s.keepAliveChan:
log.Println("續租成功", leaseKeepResp)
case <-s.ctx.Done():
log.Println("停止續租")
return
}
}
}
// 透過Revoke會取消該client連線所開啟的所有租約
func (s *ServiceRegister) Close() error {
if _, err := s.cli.Revoke(s.ctx, s.leaseID); err != nil {
return err
}
log.Println("取消租約")
return s.cli.Close()
}
func main() {
cancelCtx, canFunc := context.WithCancel(context.Background())
var leaseTTL int64 = 5
var endpoints = []string{"172.16.238.100:2379", "172.16.238.101:2379", "172.16.238.102:2379"}
ser, err := NewServiceRegister(cancelCtx, endpoints, "/web/node2", "localhost:8000", leaseTTL)
if err != nil {
log.Fatalln(err)
}
//啟動一個gourtine處理續租
go ser.ListenLeaseRespChan()
// 亂數時間觸發服務取消註冊或是服務中斷
rand.Seed(time.Now().UnixNano())
select {
case <-time.After(time.Duration(rand.Intn(20-5+1)+5) * time.Second):
log.Println("呼叫Close()")
ser.Close()
case <-time.After(time.Duration(rand.Intn(20-5+1)+5) * time.Second):
log.Println("cancel context被觸發")
canFunc()
}
log.Println("Service Terminate")
}
client實例, 會進行服務發現, 並定期列印出有發現的遠端server清單.
package main
import (
"context"
"log"
"os"
"os/signal"
"sync"
"time"
"github.com/coreos/etcd/mvcc/mvccpb"
"go.etcd.io/etcd/clientv3"
)
var (
dialTimeout = 5 * time.Second
)
type ServiceDiscovery struct {
cli *clientv3.Client
serverList map[string]string // 服務列表
lock sync.Mutex
ctx context.Context
}
func NewServiceDiscovery(ctx context.Context, endpoints []string) *ServiceDiscovery {
cli, err := clientv3.New(clientv3.Config{
Context: ctx,
Endpoints: endpoints,
DialTimeout: dialTimeout,
})
if err != nil {
log.Fatal(err)
}
return &ServiceDiscovery{
cli: cli,
serverList: make(map[string]string),
ctx: ctx,
}
}
// 初始化遠端調用服務列表, 以及監聽它們的變化
func (s *ServiceDiscovery) WatchService(prefix string) error {
// 根據prefix來取得目前所有的key
resp, err := s.cli.Get(s.ctx, prefix, clientv3.WithPrefix())
if err != nil {
return err
}
for _, ev := range resp.Kvs {
s.SetServiceList(string(ev.Key), string(ev.Value))
}
// 監視滿足該prfix的所有Key
go s.watcher(prefix)
return nil
}
//watcher 监听前缀
func (s *ServiceDiscovery) watcher(prefix string) {
rch := s.cli.Watch(s.ctx, prefix, clientv3.WithPrefix())
log.Printf("watching prefix:%s now...", prefix)
for wresp := range rch {
for _, ev := range wresp.Events {
switch ev.Type {
case mvccpb.PUT: //新增或者該服務節點的端口或IP更動, 將之新增進遠端service list
s.SetServiceList(string(ev.Kv.Key), string(ev.Kv.Value))
case mvccpb.DELETE: // 刪除節點, 並更新遠端service list
s.DelServiceList(string(ev.Kv.Key))
}
}
}
}
func (s *ServiceDiscovery) SetServiceList(key, val string) {
// 這裡加上mutex, 是因為map只要多個線程同時操作, 會發生panic, 也可以用sync.Map
s.lock.Lock()
defer s.lock.Unlock()
s.serverList[key] = string(val)
log.Println("put key :", key, "val:", val)
}
func (s *ServiceDiscovery) DelServiceList(key string) {
// 這裡加上mutex, 是因為map只要多個線程同時操作, 會發生panic, 也可以用sync.Map
s.lock.Lock()
defer s.lock.Unlock()
delete(s.serverList, key)
log.Println("del key:", key)
}
// 獲取目前的遠端service list
func (s *ServiceDiscovery) GetServices() []string {
s.lock.Lock()
defer s.lock.Unlock()
addrs := make([]string, 0)
for _, v := range s.serverList {
addrs = append(addrs, v)
}
return addrs
}
// 關閉etcd client連線
func (s *ServiceDiscovery) Close() error {
return s.cli.Close()
}
func main() {
cancelCtx, canFunc := context.WithCancel(context.Background())
var endpoints = []string{"172.16.238.100:2379", "172.16.238.101:2379", "172.16.238.101:2379"}
ser := NewServiceDiscovery(cancelCtx, endpoints)
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
defer ser.Close()
//ser.WatchService("/APIinventory/")
// 監聽所有的gRPCaccount服務
ser.WatchService("/gRPCaccount/")
for {
select {
case <-time.Tick(5 * time.Second):
log.Println(ser.GetServices())
case <-c:
canFunc()
log.Println("Service Terminate")
return
}
}
}
服務實例, 啟動2次, 每次都會進行服務註冊跟續約. 來看看效果.
當服務註冊成功後, client會立刻被通知有監看對象被新增了.
當服務撤銷註冊後, client會立刻被通知有監看對象下線了.
服務註冊與發現中心, 很方便我們在微服務架構下, 來進行有效的服務管理.
服務註冊有助於對微服務架構中的服務實例做管理跟通知.
服務發現使得微服務在動態部屬的情況中, 能夠發起有效的服務調用.
這種發現
機制也能實做在監看某個設定檔內容.
監看通知不難, 難的是程式怎熱抽換一些實例.