etcd提供了事務實現.
可以實現多個Key的原子(Atomic)操作.
MySQL主要是透過lock或者是MVCC+Next-Key Lock這些機制來實現.
但etcd不同, 主要是基於CAS(Compare and Swap)方式(也是樂觀鎖的一種實現方式)來實現.
因為有ModRevision可以比較來做實現.
Txn提供了以下的接口
type Txn interface {
If(cs ...Cmp) Txn
Then(ops ...Op) Txn
Else(ops ...Op) Txn
Commit() (*TxnResponse, error)
}
對就只有這樣If-Then-Else, 最後就commit.
txn().If(cond1, cond2, ...).Then(op1, op2,...).Else(op1', op2',...).Commit)()
如果If的條件全為真, 則會執行Then中的操作, 且整個事務返回true;
反之執行Else內的操作.
Demo1:
演示一下用戶相互轉帳, 做事務交易的場景.
當userA轉100$給userB, 只要餘額不夠就取消交易.
餘額足夠就執行事務.
一開始userFrom有200$, userTo有10$, 一次轉帳100$過去.
依序轉帳3次
package main
import (
"context"
"encoding/binary"
"errors"
"fmt"
"log"
"strconv"
"time"
"go.etcd.io/etcd/clientv3"
)
var (
dialTimeout = 10 * time.Second
requestTimeout = 5 * time.Second
keepAliveTimeout = 5 * time.Second
)
func main() {
cancelCtx, canFunc := context.WithCancel(context.Background())
var endpoints = []string{"172.16.238.100:2379", "172.16.238.101:2379", "172.16.238.101:2379"}
cli, err := clientv3.New(clientv3.Config{
Context: cancelCtx,
Endpoints: endpoints,
DialTimeout: dialTimeout,
})
if err != nil {
log.Fatalln(err)
}
defer cli.Close()
kv := clientv3.NewKV(cli)
getUsersBalance(cancelCtx, kv, "user1", "user2")
updateUserBalance(cancelCtx, kv, "user1", "user2", 100)
getUsersBalance(cancelCtx, kv, "user1", "user2")
canFunc()
fmt.Println("Service Terminate")
}
func getUsersBalance(ctx context.Context, cli clientv3.KV, userFrom, userTo string) {
getResp, err := cli.Txn(ctx).Then(clientv3.OpGet(userFrom), clientv3.OpGet(userTo)).Commit()
if err != nil {
return
}
fromUserKv := getResp.Responses[0].GetResponseRange().Kvs[0]
toUserKv := getResp.Responses[1].GetResponseRange().Kvs[0]
userFromBalance, _ := toUint64(fromUserKv.Value)
userToBalance, _ := toUint64(toUserKv.Value)
fmt.Printf("userFrom:%d, userTo:%d\n", userFromBalance, userToBalance)
}
func updateUserBalance(ctx context.Context, cli clientv3.KV, userFrom, userTo string, amount uint64) (bool, error) {
getResp, err := cli.Txn(ctx).Then(clientv3.OpGet(userFrom), clientv3.OpGet(userTo)).Commit()
if err != nil {
return false, err
}
fromUserKv := getResp.Responses[0].GetResponseRange().Kvs[0]
toUserKv := getResp.Responses[1].GetResponseRange().Kvs[0]
userFromBalance, _ := toUint64(fromUserKv.Value)
userToBalance, _ := toUint64(toUserKv.Value)
if userFromBalance < uint64(amount) {
fmt.Println("insufficient balance")
return false, errors.New("insufficient balance")
}
fmt.Println("start transaction")
txn := cli.Txn(ctx).
If(clientv3.Compare(clientv3.ModRevision(userFrom), "=", fromUserKv.ModRevision),
clientv3.Compare(clientv3.ModRevision(userTo), "=", toUserKv.ModRevision)).
Then(clientv3.OpPut(userFrom, strconv.FormatInt(int64(userFromBalance-amount), 10)),
clientv3.OpPut(userTo, strconv.FormatInt(int64(userToBalance+amount), 10)))
updateResp, err := txn.Commit()
if err != nil {
return false, err
}
fmt.Println("transaction success")
return updateResp.Succeeded, nil
}
func toUint64(v []byte) (uint64, error) {
str := string(v[:])
return strconv.ParseUint(str, 10, 64)
}
func fromUInt64(v uint64) []byte {
b := make([]byte, binary.MaxVarintLen64)
return b[:binary.PutUvarint(b, v)]
}
// 執行3次的結果
/*
nathan@pc:~/go/src/ITHOME/demo/day19$ go run main.go
userFrom:200, userTo:10
start transaction
transaction success
userFrom:100, userTo:110
Service Terminate
nathan@pc:~/go/src/ITHOME/demo/day19$ go run main.go
userFrom:100, userTo:110
start transaction
transaction success
userFrom:0, userTo:210
Service Terminate
nathan@pc:~/go/src/ITHOME/demo/day19$ go run main.go
userFrom:0, userTo:210
insufficient balance
userFrom:0, userTo:210
Service Terminate
*/
來講解一下.
etcd為了處理來自不同客戶端的併發請求, 還有達成事務隔離, 避免Race condition.
允許client在一次修改中批量執行多組操作命令, 就表示一組操作被綁成一個原子操作, 並且共享同一個revision.
MySQL為了實現Serializable(串行化執行可能會有衝突的事務, 就讓衝突的事務,依照順序進行阻塞等待), 往往使用上了悲觀鎖
, 這導致效能非常低.
etcd這裡用的則是CAS, 沒有實際的Lock.
這裡先用一個事務, 原子操作同時讀取兩個KV
getResp, err := cli.Txn(ctx).Then(clientv3.OpGet(userFrom), clientv3.OpGet(userTo)).Commit()
fromUserKv := getResp.Responses[0].GetResponseRange().Kvs[0]
toUserKv := getResp.Responses[1].GetResponseRange().Kvs[0]
在準備第二個事務, 同時檢查兩個KV, 現在的ModRevision有沒有等於我們當時讀取到的ModRevision.
條件全成真, 則進行Then裡面的加減錢的操作.
這裡都是同一個原子操作的
txn := cli.Txn(ctx).
If(clientv3.Compare(clientv3.ModRevision(userFrom), "=", fromUserKv.ModRevision),
clientv3.Compare(clientv3.ModRevision(userTo), "=", toUserKv.ModRevision)).
Then(clientv3.OpPut(userFrom, strconv.FormatInt(int64(userFromBalance-amount), 10)),
clientv3.OpPut(userTo, strconv.FormatInt(int64(userToBalance+amount), 10)))
updateResp, err := txn.Commit()
Demo2:
演示一下用戶相互轉帳,卻有兩個線程併發操作同一筆資料.
一開始userFrom有100$, userTo有10$, 一次轉帳100$過去.
如果這事務隔離不能防Non-Repeatable Read的話, 會變成userFrom 0$, 但userTo會有210$. (真好送錢了)
package main
import (
"context"
"encoding/binary"
"errors"
"fmt"
"log"
"strconv"
"sync"
"time"
"go.etcd.io/etcd/clientv3"
)
var (
dialTimeout = 10 * time.Second
requestTimeout = 5 * time.Second
keepAliveTimeout = 5 * time.Second
)
func main() {
cancelCtx, canFunc := context.WithCancel(context.Background())
var endpoints = []string{"172.16.238.100:2379", "172.16.238.101:2379", "172.16.238.101:2379"}
cli, err := clientv3.New(clientv3.Config{
Context: cancelCtx,
Endpoints: endpoints,
DialTimeout: dialTimeout,
})
if err != nil {
log.Fatalln(err)
}
defer cli.Close()
var wg sync.WaitGroup
wg.Add(2)
kv := clientv3.NewKV(cli)
getUsersBalance(cancelCtx, kv, "user1", "user2")
go updateUserBalance(cancelCtx, kv, "user1", "user2", 100, &wg, true)
time.Sleep(10 * time.Millisecond)
go updateUserBalance(cancelCtx, kv, "user1", "user2", 100, &wg, false)
wg.Wait()
getUsersBalance(cancelCtx, kv, "user1", "user2")
canFunc()
fmt.Println("Service Terminate")
}
func getUsersBalance(ctx context.Context, cli clientv3.KV, userFrom, userTo string) {
getResp, err := cli.Txn(ctx).Then(clientv3.OpGet(userFrom), clientv3.OpGet(userTo)).Commit()
if err != nil {
return
}
fromUserKv := getResp.Responses[0].GetResponseRange().Kvs[0]
toUserKv := getResp.Responses[1].GetResponseRange().Kvs[0]
userFromBalance, _ := toUint64(fromUserKv.Value)
userToBalance, _ := toUint64(toUserKv.Value)
fmt.Printf("userFrom:%d, userTo:%d\n", userFromBalance, userToBalance)
}
func updateUserBalance(ctx context.Context, cli clientv3.KV, userFrom, userTo string, amount uint64, wg *sync.WaitGroup, isFirst bool) (bool, error) {
defer wg.Done()
getResp, err := cli.Txn(ctx).Then(clientv3.OpGet(userFrom), clientv3.OpGet(userTo)).Commit()
if err != nil {
return false, err
}
fromUserKv := getResp.Responses[0].GetResponseRange().Kvs[0]
toUserKv := getResp.Responses[1].GetResponseRange().Kvs[0]
userFromBalance, _ := toUint64(fromUserKv.Value)
userToBalance, _ := toUint64(toUserKv.Value)
if userFromBalance < uint64(amount) {
fmt.Println("insufficient balance")
return false, errors.New("insufficient balance")
}
fmt.Println("start transaction")
if isFirst {
time.Sleep(2 * time.Second)
}
txn := cli.Txn(ctx).
If(clientv3.Compare(clientv3.ModRevision(userFrom), "=", fromUserKv.ModRevision),
clientv3.Compare(clientv3.ModRevision(userTo), "=", toUserKv.ModRevision)).
Then(clientv3.OpPut(userFrom, strconv.FormatInt(int64(userFromBalance-amount), 10)),
clientv3.OpPut(userTo, strconv.FormatInt(int64(userToBalance+amount), 10)))
updateResp, err := txn.Commit()
if err != nil {
return false, err
}
if updateResp.Succeeded == false {
fmt.Println("transaction fail")
return false, errors.New("transaction fail")
}
fmt.Println("transaction success")
return updateResp.Succeeded, nil
}
func toUint64(v []byte) (uint64, error) {
str := string(v[:])
return strconv.ParseUint(str, 10, 64)
}
func fromUInt64(v uint64) []byte {
b := make([]byte, binary.MaxVarintLen64)
return b[:binary.PutUvarint(b, v)]
}
/*
userFrom:200, userTo:1010
start transaction
start transaction
transaction success
transaction fail
userFrom:100, userTo:1110
Service Terminate
*/
從結果來看, 可以發現第一個提出事務交易的線程最後是提交失敗的.
這裡刻意用兩個線程, 線程1先執行了updateUserBalance, 線程2後面才執行.
go updateUserBalance(cancelCtx, kv, "user1", "user2", 100, &wg, true)
time.Sleep(10 * time.Millisecond)
go updateUserBalance(cancelCtx, kv, "user1", "user2", 100, &wg, false)
第一個線程取得兩個KV當下的revision,就睡個2秒去了XD
然後第二個線程取得兩個KV當下的revision, 並且事務檢查成功, 提交了, 扣了錢.
此時這兩個kv的ModRevision就被異動了.
第一個線程醒來後, 開始txn的檢查, 在提交時, 就會回傳提交失敗了.
這樣不會白白送錢給人了, 但當然client side自己要handle這類的錯誤就是.
if isFirst {
time.Sleep(2 * time.Second)
}
txn := cli.Txn(ctx).
If(clientv3.Compare(clientv3.ModRevision(userFrom), "=", fromUserKv.ModRevision),
clientv3.Compare(clientv3.ModRevision(userTo), "=", toUserKv.ModRevision)).
Then(clientv3.OpPut(userFrom, strconv.FormatInt(int64(userFromBalance-amount), 10)),
clientv3.OpPut(userTo, strconv.FormatInt(int64(userToBalance+amount), 10)))
updateResp, err := txn.Commit()
if err != nil {
return false, err
}
if updateResp.Succeeded == false {
fmt.Println("transaction fail")
return false, errors.New("transaction fail")
}
fmt.Println("transaction success")