今天我們繼續看幾個跟Zookeeper有關的實作,主要是因為這是那時的作業,使用Golang,我覺得蠻好玩的。
許多分散式系統都是採Master-Slave的需求
一種方法是像Kubernetes那樣部署時就指名誰是Master誰是Node,
這種的缺點是要做到HA,必須一開始就多部署幾個Master,這樣當Master失效時,可以由其他的Master替補。
另一種是每當有Master失效時,都重新從Slave中選出一個作為Master,
好處就是一開始部署時就沒有特別區分Master-Slave,Master失效時可以依賴Leader Election來替補Master。
已經見過的例子如: Raft、Multi-Paxos、Chubby、Zookeeper等都是利用共識演算法附帶的Leader Election來完成。
而Zookeeper/Chubby的目的就是希望開發者不需要將共識演算法嵌入系統裡,而獨立提供的Lock Service。
因此Zookeeper/Chubby也可已讓使用者用簡單的API來完成Leader Election。
如同昨天的Lock Service實作,這邊的做法也是大同小異
如同昨天的Lock Service,可以改為以下的方式
當Leader失效時,下一個Slave就會收到通知,直接成為Leader即可。
優點就是Zookeeper不需要通知所有的Client,
也不需要重新經過大家搶著創建znode的過程,
系統得以增進Scalibility。
另一個是可以應用在Reverse Proxy裡面去記錄目前有哪些Servers可以提供服務,
將Client的請求根據自訂義規則(隨機、RoundRobin...等)
轉發到Server身上。
因為Server可能是動態增加移除的(雲端裡面常見的Autoscaling)
因此Proxy要能夠感知Server上線且記錄他的ip位址。
前幾個例子都是用文字與圖片來說明,
我們這個用程式碼吧~
使用"github.com/samuel/go-zookeeper/zk"
為Zookeeper的Library
首先是Server,是一個簡單的Golang HTTP Server。
server.go
func main() {
var zkHosts = []string{"zookeeper:2181"}
// gserveName 為將來作為自己znode的名字
// gserverZkPath 為整個znode的path,放在/grproxy下
gserveName = os.Getenv("gserve_name")
gserverZkPath = "/grproxy/" + gserveName
// 連接Zookeeper
conn, _, err := zk.Connect(zkHosts, time.Second*5)
if err != nil {
fmt.Printf("Couldn't connect to zookeeper: %v", err)
return
}
defer conn.Close()
for conn.State() != zk.StateHasSession {
fmt.Printf("gserve is connecting to the Zookeeper \n...")
time.Sleep(5)
}
fmt.Println("Connected with Zookeeper")
// 設定創建的znode為 "讀寫修改ACL" 都允許的權限
acl := zk.WorldACL(zk.PermAll)
// 設定創建的znode為Ephemeral node
flag := int32(zk.FlagEphemeral)
// 確定proxy server存在且在zk裡面已經Create /grproxy znode
exists, _, _ := conn.Exists("/grproxy")
if !exists {
for {
time.Sleep(5)
exists, _, _ := conn.Exists("/grproxy")
if exists {
break
}
}
}
// 創建自己的znode
gserverZkPathNode, err := conn.Create(gserverZkPath, []byte(gserveName+":9091"), flag, acl)
if err != nil {
fmt.Printf("Couldn't create the znode for root path %v\n", err)
return
}
fmt.Printf("create ephemeral node: %v\n", gserverZkPathNode)
// 啟動 HTTP server
fmt.Println("Run server ...")
http.HandleFunc("/library", apiHandler)
log.Fatal(http.ListenAndServe(":9091", nil))
}
接下來proxy.go這邊要能夠
grproxy.go
type RoundRobinList struct {
urls []string
index int
}
var targets RoundRobinList
func main() {
// 連線Zookeeper
var zkHosts = []string{"zookeeper:2181"}
conn, _, err := zk.Connect(zkHosts, time.Second*5)
if err != nil {
fmt.Printf("Couldn't connect to zookeeper: %v \n", err)
return
}
defer conn.Close()
for conn.State() != zk.StateHasSession {
fmt.Printf("grproxy is connecting to the Zookeeper \n...")
time.Sleep(5)
}
fmt.Println("Connected with Zookeeper \n")
// 檢查/grproxy是否已經存在,創建/grproxy並把自己的位置 "grproxy:80" 寫入 znode 裡面
acl := zk.WorldACL(zk.PermAll)
flag := int32(0)
exists, _, err := conn.Exists("/grproxy")
if err != nil {
fmt.Printf("Couldn't check the existence of root path : %v \n", err)
return
}
if !exists {
rootPathNode, err := conn.Create("/grproxy", []byte("grproxy:80"), flag, acl)
if err != nil {
fmt.Printf("Couldn't create the znode for root path %v \n", err)
return
}
fmt.Printf("create: %v \n", rootPathNode)
}
targets.index = 0
// 取得/grproxy底下的所有目前運作的Server,使用watchZkChild API
childrenChan, errorsChan := watchZkChild(conn, "/grproxy")
go getAllGserves(conn, childrenChan, errorsChan)
// 啟動proxy server,本身也是一個HTTP server
// 收到HTTP Request判斷header後,轉發給Server來處理
// redirect the request to different gserve
proxy := newMultiHostReverseProxy()
log.Fatal(http.ListenAndServe(":8000", proxy))
}
// 如果有新的Server加入,事件訊息會被放入childrenChan,在這邊處理把他加入我們可以轉發的target list中
func getAllGserves(conn *zk.Conn, childrenChan chan []string, errorsChan chan error) {
for {
select {
case children := <-childrenChan:
fmt.Printf("gserve %v \n", children)
var urlTemp []string
for _, child := range children {
gserveUrl, _, err := conn.Get(rootPath + "/" + child)
if err != nil {
fmt.Printf("Couldn't get data from node %v , error %v \n", rootPath+"/"+child, err)
}
fmt.Printf("gserveUrl: %v", gserveUrl)
urlTemp = append(urlTemp, string(gserveUrl))
}
targets.urls = urlTemp
fmt.Printf("current gserves %+v \n", targets.urls)
case err := <-errorsChan:
fmt.Printf("%+v go routine for watching Zk children error \n", err)
}
}
}
// 實作轉發的邏輯
func newMultiHostReverseProxy() *httputil.ReverseProxy {
director := func(req *http.Request) {
if req.URL.Path == "/library" {
fmt.Println("This is for gserver")
// round robin
lens := len(targets.urls)
if targets.index >= lens {
targets.index = 0
}
gserveTarget := targets.urls[targets.index]
targets.index = (targets.index + 1) % lens
req.URL.Scheme = "http"
req.URL.Host = gserveTarget
} else {
fmt.Println("This is for nginx")
req.URL.Scheme = "http"
req.URL.Host = "nginx"
}
}
return &httputil.ReverseProxy{Director: director}
}
以上就是簡單的Reverse Proxy實作
如果大家有興趣玩幾個利用Zookeeper實作的應用,
可以參考官網的例子
https://zookeeper.apache.org/doc/current/recipes.html
這幾個算是踏入分散式系統常見的案例,
拿來寫寫當side project不錯。