iT邦幫忙

第 12 屆 iT 邦幫忙鐵人賽

DAY 17
1

昨天有提到etcd Service提供了六類服務, 其中包含了今天的主題Lease
所以就來玩一etcd裡面的Lease

Lease

Lease租約, 主要用來實做Key的定時刪除, 跟Redis的Expire類似.
Lease提供了以下接口.

type Lease interface {
    // 獲取新租約
	Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error)

    // 撤銷一個租約
	Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error)

    // 根據租約ID取得租約訊息
	TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error)

    // 列出所有租約
	Leases(ctx context.Context) (*LeaseLeasesResponse, error)

    // 自動續約, 續約訊息會放到返回的參數, 透過[唯讀channel](https://ithelp.ithome.com.tw/articles/10218923)內取得.
	KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)

    // 續約一次
	KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error)
    
    // 關閉該client建立的所有租約
	Close() error
}

LeaseGrantResponse結構, 當獲取租約成功時, 會返回一個租約ID還有TTL, 租成功當下就開始算囉.

// LeaseGrantResponse wraps the protobuf message LeaseGrantResponse.
type LeaseGrantResponse struct {
	*pb.ResponseHeader
	ID    LeaseID
	TTL   int64
	Error string
}

Demo :

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"time"

	"go.etcd.io/etcd/clientv3"
)

var (
	dialTimeout      = 10 * time.Second
	requestTimeout   = 5 * time.Second
	keepAliveTimeout = 5 * time.Second
)

func main() {
	cancelCtx, canFunc := context.WithCancel(context.Background())

	var endpoints = []string{"172.16.238.100:2379", "172.16.238.101:2379", "172.16.238.101:2379"}
	cli, err := clientv3.New(clientv3.Config{
		Context:     cancelCtx,
		Endpoints:   endpoints,
		DialTimeout: dialTimeout,
	})
	if err != nil {
		log.Fatalln(err)
	}
	defer cli.Close()
	// 建立一個Lease service client
	leaseCli := clientv3.NewLease(cli)

	var leaseGrantRes *clientv3.LeaseGrantResponse
	// 申請一個10秒的租約, Grant的第二個參數是second為單位
	if leaseGrantRes, err = leaseCli.Grant(cancelCtx, 10); err != nil {
		fmt.Println(err)
	}

	// 取得租約ID
	leaseID := leaseGrantRes.ID

	kv := clientv3.NewKV(cli)

	var putResp *clientv3.PutResponse
	// 新增一個KV, 並與租約做關聯, 租約10S後到期
	if putResp, err = kv.Put(context.TODO(), "ithome", "", clientv3.WithLease(leaseID)); err != nil {
		fmt.Println(err)
		return
	}

	startTime := time.Now()
	fmt.Printf("KV寫入成功:%d, time:%v\n", putResp.Header.Revision, startTime)

	var keepResp *clientv3.LeaseKeepAliveResponse
	timeOutCtx, _ := context.WithTimeout(context.Background(), keepAliveTimeout)
	var keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse
	if keepAliveChan, err = leaseCli.KeepAlive(timeOutCtx, leaseID); err != nil {
		fmt.Println(err)
	}
	// 處理續約
	go func() {
		for {
			select {
			case keepResp = <-keepAliveChan:
				if keepAliveChan == nil {
					fmt.Println("租約到期失效了")
					return
				} else {
					// 每秒會自動續租5次
					if keepResp != nil {
						fmt.Printf("收到自動續約回應: Id:%d, TTL:%d, time:%v\n", keepResp.ID, keepResp.TTL, time.Now())
					}

				}
			}
		}
	}()

	var getResp *clientv3.GetResponse
	//  定期檢查KV的狀況
	for {
		if getResp, err = kv.Get(cancelCtx, "ithome"); err != nil {
			fmt.Println(err)
			return
		}
		if getResp.Count == 0 {
			endTime := time.Now()
			fmt.Printf("KV過期了, time:%v\n", endTime.Sub(startTime).Seconds())
			break
		}
		fmt.Printf("KV還沒到期:%v, time:%v\n", getResp.Kvs, time.Now())
		time.Sleep(1 * time.Second)
	}

	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)

	<-c
	canFunc()
	fmt.Println("Service Terminate")
}

建立了一個10秒的租約, 套用在ithome這Key上.
每次會自動續租5秒, 但續租的線程5秒後就timeout不執行了.
所以這Key最多就是存在著續約的5秒+本身的生命週期10秒約莫15秒的時間.
如果KeepAlive帶入的是不會被取消的context, 那就會一直續租.

看看KeepAlive的實現

func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) {
    // 建立channel 來接收keep alive response
	ch := make(chan *LeaseKeepAliveResponse, LeaseResponseChSize)

	l.mu.Lock()
	// ensure that recvKeepAliveLoop is still running
	select {
	case <-l.donec:
        // 當channel被關閉, 接收keep alive response的ch也會被跟著關閉
		err := l.loopErr
		l.mu.Unlock()
		close(ch)
		return ch, ErrKeepAliveHalted{Reason: err}
	default:
	}
    // 建立LeaseID對應的keepAlive structure, 並且把context跟要返回的channel帶入建構式
	ka, ok := l.keepAlives[id]
	if !ok {
		// create fresh keep alive
		ka = &keepAlive{
			chs:           []chan<- *LeaseKeepAliveResponse{ch},
			ctxs:          []context.Context{ctx},
			deadline:      time.Now().Add(l.firstKeepAliveTimeout),
			nextKeepAlive: time.Now(),
			donec:         make(chan struct{}),
		}
		l.keepAlives[id] = ka
	} else {
		// add channel and context to existing keep alive
		ka.ctxs = append(ka.ctxs, ctx)
		ka.chs = append(ka.chs, ch)
	}
	l.mu.Unlock()

// 建立goroutine, 會持續執行recvKeepAliveLoop(), 來跟etcd server續租該租約
	go l.keepAliveCtxCloser(ctx, id, ka.donec)
	l.firstKeepAliveOnce.Do(func() {
		go l.recvKeepAliveLoop()
		go l.deadlineLoop()
	})

	return ch, nil
}

Demo 2 :
租約Grant成功租到,但沒在TTL內綁定在Key上?

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"time"

	"go.etcd.io/etcd/clientv3"
)

var (
	dialTimeout      = 10 * time.Second
	requestTimeout   = 5 * time.Second
	keepAliveTimeout = 5 * time.Second
)

func main() {
	cancelCtx, canFunc := context.WithCancel(context.Background())

	var endpoints = []string{"172.16.238.100:2379", "172.16.238.101:2379", "172.16.238.101:2379"}
	cli, err := clientv3.New(clientv3.Config{
		Context:     cancelCtx,
		Endpoints:   endpoints,
		DialTimeout: dialTimeout,
	})
	if err != nil {
		log.Fatalln(err)
	}
	defer cli.Close()
	// 建立一個Lease service client
	leaseCli := clientv3.NewLease(cli)

	var leaseGrantRes *clientv3.LeaseGrantResponse
	// 申請一個2秒的租約
	if leaseGrantRes, err = leaseCli.Grant(cancelCtx, 2); err != nil {
		fmt.Println(err)
	}
    // 睡個4秒先
	time.Sleep(4 * time.Second)

	// 取得租約ID
	leaseID := leaseGrantRes.ID

	kv := clientv3.NewKV(cli)

	var putResp *clientv3.PutResponse
	// 新增一個KV, 並與租約做關聯, 租約10S後到期
	if putResp, err = kv.Put(context.TODO(), "ithome", "", clientv3.WithLease(leaseID)); err != nil {
		fmt.Println(err)
		return
	}

	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)

	<-c
	canFunc()
	fmt.Println("Service Terminate")
}
/*
{"level":"warn","ts":"2020-09-23T01:02:23.164+0800","caller":"clientv3/retry_interceptor.go:62","msg":"retrying of unary invoker failed","target":"endpoint://client-58a4a79a-0aa8-4cb3-9187-00806ce6473e/172.16.238.100:2379","attempt":0,"error":"rpc error: code = NotFound desc = etcdserver: requested lease not found"}
etcdserver: requested lease not found
*/

當Lease綁定給KV, 並且KV把請求發送給etcd server時, 會報錯誤, 因為租約已經過期不在了.

當有了租約這概念, 再來來介紹Watch監看跟Txn事務.
這些技能組合在一起就能實做, 服務發現跟分散式鎖的實現了.


上一篇
etcd 連線基本設定與KV CRUD
下一篇
etcd Watch監看
系列文
服務開發雜談33

尚未有邦友留言

立即登入留言