昨天有提到etcd Service提供了六類服務, 其中包含了今天的主題Lease
所以就來玩一etcd裡面的Lease
Lease租約, 主要用來實做Key的定時刪除
, 跟Redis的Expire類似.
Lease提供了以下接口.
type Lease interface {
// 獲取新租約
Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error)
// 撤銷一個租約
Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error)
// 根據租約ID取得租約訊息
TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error)
// 列出所有租約
Leases(ctx context.Context) (*LeaseLeasesResponse, error)
// 自動續約, 續約訊息會放到返回的參數, 透過[唯讀channel](https://ithelp.ithome.com.tw/articles/10218923)內取得.
KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)
// 續約一次
KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error)
// 關閉該client建立的所有租約
Close() error
}
LeaseGrantResponse結構, 當獲取租約成功時, 會返回一個租約ID還有TTL, 租成功當下就開始算囉.
// LeaseGrantResponse wraps the protobuf message LeaseGrantResponse.
type LeaseGrantResponse struct {
*pb.ResponseHeader
ID LeaseID
TTL int64
Error string
}
Demo :
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"time"
"go.etcd.io/etcd/clientv3"
)
var (
dialTimeout = 10 * time.Second
requestTimeout = 5 * time.Second
keepAliveTimeout = 5 * time.Second
)
func main() {
cancelCtx, canFunc := context.WithCancel(context.Background())
var endpoints = []string{"172.16.238.100:2379", "172.16.238.101:2379", "172.16.238.101:2379"}
cli, err := clientv3.New(clientv3.Config{
Context: cancelCtx,
Endpoints: endpoints,
DialTimeout: dialTimeout,
})
if err != nil {
log.Fatalln(err)
}
defer cli.Close()
// 建立一個Lease service client
leaseCli := clientv3.NewLease(cli)
var leaseGrantRes *clientv3.LeaseGrantResponse
// 申請一個10秒的租約, Grant的第二個參數是second為單位
if leaseGrantRes, err = leaseCli.Grant(cancelCtx, 10); err != nil {
fmt.Println(err)
}
// 取得租約ID
leaseID := leaseGrantRes.ID
kv := clientv3.NewKV(cli)
var putResp *clientv3.PutResponse
// 新增一個KV, 並與租約做關聯, 租約10S後到期
if putResp, err = kv.Put(context.TODO(), "ithome", "", clientv3.WithLease(leaseID)); err != nil {
fmt.Println(err)
return
}
startTime := time.Now()
fmt.Printf("KV寫入成功:%d, time:%v\n", putResp.Header.Revision, startTime)
var keepResp *clientv3.LeaseKeepAliveResponse
timeOutCtx, _ := context.WithTimeout(context.Background(), keepAliveTimeout)
var keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse
if keepAliveChan, err = leaseCli.KeepAlive(timeOutCtx, leaseID); err != nil {
fmt.Println(err)
}
// 處理續約
go func() {
for {
select {
case keepResp = <-keepAliveChan:
if keepAliveChan == nil {
fmt.Println("租約到期失效了")
return
} else {
// 每秒會自動續租5次
if keepResp != nil {
fmt.Printf("收到自動續約回應: Id:%d, TTL:%d, time:%v\n", keepResp.ID, keepResp.TTL, time.Now())
}
}
}
}
}()
var getResp *clientv3.GetResponse
// 定期檢查KV的狀況
for {
if getResp, err = kv.Get(cancelCtx, "ithome"); err != nil {
fmt.Println(err)
return
}
if getResp.Count == 0 {
endTime := time.Now()
fmt.Printf("KV過期了, time:%v\n", endTime.Sub(startTime).Seconds())
break
}
fmt.Printf("KV還沒到期:%v, time:%v\n", getResp.Kvs, time.Now())
time.Sleep(1 * time.Second)
}
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
<-c
canFunc()
fmt.Println("Service Terminate")
}
建立了一個10秒的租約, 套用在ithome
這Key上.
每次會自動續租5秒, 但續租的線程5秒後就timeout不執行了.
所以這Key最多就是存在著續約的5秒+本身的生命週期10秒約莫15秒的時間.
如果KeepAlive帶入的是不會被取消的context, 那就會一直續租.
看看KeepAlive的實現
func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) {
// 建立channel 來接收keep alive response
ch := make(chan *LeaseKeepAliveResponse, LeaseResponseChSize)
l.mu.Lock()
// ensure that recvKeepAliveLoop is still running
select {
case <-l.donec:
// 當channel被關閉, 接收keep alive response的ch也會被跟著關閉
err := l.loopErr
l.mu.Unlock()
close(ch)
return ch, ErrKeepAliveHalted{Reason: err}
default:
}
// 建立LeaseID對應的keepAlive structure, 並且把context跟要返回的channel帶入建構式
ka, ok := l.keepAlives[id]
if !ok {
// create fresh keep alive
ka = &keepAlive{
chs: []chan<- *LeaseKeepAliveResponse{ch},
ctxs: []context.Context{ctx},
deadline: time.Now().Add(l.firstKeepAliveTimeout),
nextKeepAlive: time.Now(),
donec: make(chan struct{}),
}
l.keepAlives[id] = ka
} else {
// add channel and context to existing keep alive
ka.ctxs = append(ka.ctxs, ctx)
ka.chs = append(ka.chs, ch)
}
l.mu.Unlock()
// 建立goroutine, 會持續執行recvKeepAliveLoop(), 來跟etcd server續租該租約
go l.keepAliveCtxCloser(ctx, id, ka.donec)
l.firstKeepAliveOnce.Do(func() {
go l.recvKeepAliveLoop()
go l.deadlineLoop()
})
return ch, nil
}
Demo 2 :
租約Grant成功租到,但沒在TTL內綁定在Key上?
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"time"
"go.etcd.io/etcd/clientv3"
)
var (
dialTimeout = 10 * time.Second
requestTimeout = 5 * time.Second
keepAliveTimeout = 5 * time.Second
)
func main() {
cancelCtx, canFunc := context.WithCancel(context.Background())
var endpoints = []string{"172.16.238.100:2379", "172.16.238.101:2379", "172.16.238.101:2379"}
cli, err := clientv3.New(clientv3.Config{
Context: cancelCtx,
Endpoints: endpoints,
DialTimeout: dialTimeout,
})
if err != nil {
log.Fatalln(err)
}
defer cli.Close()
// 建立一個Lease service client
leaseCli := clientv3.NewLease(cli)
var leaseGrantRes *clientv3.LeaseGrantResponse
// 申請一個2秒的租約
if leaseGrantRes, err = leaseCli.Grant(cancelCtx, 2); err != nil {
fmt.Println(err)
}
// 睡個4秒先
time.Sleep(4 * time.Second)
// 取得租約ID
leaseID := leaseGrantRes.ID
kv := clientv3.NewKV(cli)
var putResp *clientv3.PutResponse
// 新增一個KV, 並與租約做關聯, 租約10S後到期
if putResp, err = kv.Put(context.TODO(), "ithome", "", clientv3.WithLease(leaseID)); err != nil {
fmt.Println(err)
return
}
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
<-c
canFunc()
fmt.Println("Service Terminate")
}
/*
{"level":"warn","ts":"2020-09-23T01:02:23.164+0800","caller":"clientv3/retry_interceptor.go:62","msg":"retrying of unary invoker failed","target":"endpoint://client-58a4a79a-0aa8-4cb3-9187-00806ce6473e/172.16.238.100:2379","attempt":0,"error":"rpc error: code = NotFound desc = etcdserver: requested lease not found"}
etcdserver: requested lease not found
*/
當Lease綁定給KV, 並且KV把請求發送給etcd server時, 會報錯誤, 因為租約已經過期不在了.
當有了租約這概念, 再來來介紹Watch監看跟Txn事務.
這些技能組合在一起就能實做, 服務發現跟分散式鎖的實現了.