iT邦幫忙

第 12 屆 iThome 鐵人賽

DAY 16
0
Software Development

服務開發雜談系列 第 16

etcd 連線基本設定與KV CRUD

  • 分享至 

  • xImage
  •  

etcd Client V3基本服務

ectd在V3之後, 都透過rRPC+Protobuf來跟etcd Server做溝通調用.
所以在etcd/etcdserver/etcdserverpb/rpc.proto底下定義了6類gRPC service

  1. KV : 負責KV操作, 增刪K-V, 修改Value, 觸發compact
  2. Watch : 負責對指定的Key增加Watch(觀察者模式)
  3. Lease : 負責租約的操作, 新建和回收租約, 續約.
  4. Cluster : 集群節點的管理操作, 增刪集群的節點, 或是查詢節點的狀態
  5. Maintenance : 提供了Alarm功能, 獲取節點的狀態, 進行碎片整理, 取得快照, 轉移Leader
  6. Auth : 負責權限相關的操作, 添加用戶or角色,為用戶分配角色, 啟/停用身份驗證.

etcd clientV3則是對這些這些服務又定義了Interface並且實做了具體類別.

kv.go

type KV interface {
	Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)
	Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)

	Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error)
	Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error)
	Do(ctx context.Context, op Op) (OpResponse, error)
	Txn(ctx context.Context) Txn
}
// Client provides and manages an etcd v3 client session.
type Client struct {
	Cluster
	KV
	Lease
	Watcher
	Auth
	Maintenance

	conn *grpc.ClientConn

	cfg           Config
	creds         grpccredentials.TransportCredentials
	resolverGroup *endpoint.ResolverGroup
	mu            *sync.RWMutex

	ctx    context.Context
	cancel context.CancelFunc

	// Username is a user name for authentication.
	Username string
	// Password is a password for authentication.
	Password        string
	authTokenBundle credentials.Bundle

	callOpts []grpc.CallOption

	lg *zap.Logger
}

可以看到Client結構裡面包含了gRPC的conn, 該conn還實現了gRPC RoundRobin的負載均衡策略.

初始化etcd Server Client

利用第12天的Docker來連線
先建立Config物件

	var endpoints = []string{"172.16.238.100:2379", "172.16.238.101:2379", "172.16.238.101:2379"}
	cli, err := clientv3.New(clientv3.Config{
		Context:     cancelCtx,
		Endpoints:   endpoints,
		DialTimeout: dialTimeout,
	})

這裡只有簡單的把etcd節點(多個節點用,併成字串陣列)跟建立grpc連線的timeout,如果超時都沒連線成功就返回error.
還有context,這能用來取消grpc的dial-out連線.
也能設定Username,Password,如果etcd的Auth有配置帳密的話.
或者透過TLS給憑證.

透過clientv3.New取得Client實例.
它會呼叫一個私有方法newClient()

// New creates a new etcdv3 client from a given configuration.
func New(cfg Config) (*Client, error) {
	if len(cfg.Endpoints) == 0 {
		return nil, ErrNoAvailableEndpoints
	}

	return newClient(&cfg)
}
func newClient(cfg *Config) (*Client, error) {
    // 略
	conn, err := client.dialWithBalancer(dialEndpoint, grpc.WithBalancerName(roundRobinBalancerName))
	if err != nil {
		client.cancel()
		client.resolverGroup.Close()
		return nil, err
	}
	// TODO: With the old grpc balancer interface, we waited until the dial timeout
	// for the balancer to be ready. Is there an equivalent wait we should do with the new grpc balancer interface?
	client.conn = conn

	client.Cluster = NewCluster(client)
	client.KV = NewKV(client)
	client.Lease = NewLease(client)
	client.Watcher = NewWatcher(client)
	client.Auth = NewAuth(client)
	client.Maintenance = NewMaintenance(client)
    // 略
	return client, nil
}

可以看到就是這段實做了負載均衡.
然後NewKV就是建立了KV gRPC Client的實例.
其他gRPC Client也是以一樣的方式建立, 並組合進去client物件後返回.
只要初始化連線上了, 之後就不需要關心重連問題, client內部會自動重連.

建立KV對象

對etcdClient物件, 呼叫NewKV, 就會得到一個KV service的實現;
這裡是獲得一個會處理錯誤retry的KV對象.

func NewKV(c *Client) KV {
	api := &kv{remote: RetryKVClient(c)}
	if c != nil {
		api.callOpts = c.callOpts
	}
	return api
}

error retry主要的代碼在此
概念就是對grpc的Unary或Stream配置Interceptor攔截器.

etcd Client KV基本操作-Get、Put、Delete

來玩看看CRUD, 順便看看第11天介紹的Revision

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"time"

	"go.etcd.io/etcd/clientv3"
)

var (
	dialTimeout    = 10 * time.Second
	requestTimeout = 3 * time.Second
)

func main() {
	cancelCtx, canFunc := context.WithCancel(context.Background())
	timeOutCtx, _ := context.WithTimeout(context.Background(), requestTimeout)
	var endpoints = []string{"172.16.238.100:2379", "172.16.238.101:2379", "172.16.238.101:2379"}
	cli, err := clientv3.New(clientv3.Config{
		Context:     cancelCtx,
		Endpoints:   endpoints,
		DialTimeout: dialTimeout,
	})
	if err != nil {
		log.Fatalln(err)
	}
	defer cli.Close()

	kv := clientv3.NewKV(cli)
	singleKeyWithRevision(timeOutCtx, kv)

	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)

	<-c
	canFunc()
	fmt.Println("Service Terminate")
}

func singleKeyWithRevision(ctx context.Context, kv clientv3.KV) {
	fmt.Println("SingleKeyWithRevision()")
	key := "demo"
	// 刪除之前遺留的Key
	kv.Delete(ctx, key, clientv3.WithPrefix())

	// 新增一組KV {"Demo": "444"}
	pr, _ := kv.Put(ctx, key, time.Now().UTC().String())
	rev := pr.Header.Revision
	fmt.Println("Revision:", rev)

	gr, _ := kv.Get(ctx, key)
	fmt.Println("Value: ", string(gr.Kvs[0].Value), "Revision: ", gr.Header.Revision)
    
    // 透過Put修改現有的Key, 會建立新的revision
	kv.Put(ctx, key, time.Now().UTC().String())

	gr, _ = kv.Get(ctx, key)
	fmt.Println("Value: ", string(gr.Kvs[0].Value), "Revision: ", gr.Header.Revision)

	// 取得上一版的revision
	gr, _ = kv.Get(ctx, key, clientv3.WithRev(rev))
	fmt.Println("Value: ", string(gr.Kvs[0].Value), "Revision: ", gr.Header.Revision)
}

/*
SingleKeyWithRevision()
Revision: 1954477
Value:  2020-09-06 15:47:16.398487325 +0000 UTC Revision:  1954477
Value:  2020-09-06 15:47:16.400697144 +0000 UTC Revision:  1954478
Value:  2020-09-06 15:47:16.398487325 +0000 UTC Revision:  1954478
*/

從上面結果能看出, 每次修改都會保存之前的revision與資料.

參考:
etcd/clientv3 Go Doc


上一篇
etcd Raft淺談(下) 日誌複製
下一篇
etcd Lease租約
系列文
服務開發雜談33
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言