iT邦幫忙

第 11 屆 iT 邦幫忙鐵人賽

DAY 22
0
Software Development

分散式系統 - 在分散的世界中保持一致系列 第 22

Day 22 - Zookeeper - Leader Election 與 Reverse Proxy 實作,使用Golang

前言

今天我們繼續看幾個跟Zookeeper有關的實作,主要是因為這是那時的作業,使用Golang,我覺得蠻好玩的。

Leader Election

許多分散式系統都是採Master-Slave的需求
一種方法是像Kubernetes那樣部署時就指名誰是Master誰是Node,
這種的缺點是要做到HA,必須一開始就多部署幾個Master,這樣當Master失效時,可以由其他的Master替補。

另一種是每當有Master失效時,都重新從Slave中選出一個作為Master,
好處就是一開始部署時就沒有特別區分Master-Slave,Master失效時可以依賴Leader Election來替補Master。

已經見過的例子如: Raft、Multi-Paxos、Chubby、Zookeeper等都是利用共識演算法附帶的Leader Election來完成。

而Zookeeper/Chubby的目的就是希望開發者不需要將共識演算法嵌入系統裡,而獨立提供的Lock Service。

因此Zookeeper/Chubby也可已讓使用者用簡單的API來完成Leader Election。

步驟

如同昨天的Lock Service實作,這邊的做法也是大同小異

  1. 所有的Client嘗試Create "/Election/Leader" ephermeral node
  2. 成功創建的那個Client成為Leader,將自己的訊息寫入該znode
  3. 其餘的Client為Slave,從Leader node讀取Master位置資訊,並Watch該Leader Node

  1. 當Leader失效時,因為該znode為ephermeral node,Client連線失效也會被Zookeeper刪除
  2. 其餘的Slave會收到來自Zookeeper的通知,因此知道Leader失效
  3. 重複上一輪的步驟,嘗試Create "/Election/Leader"選新Leader

討論:

  • 因為要通知所有的Client,效能差且Scalibility也不好
  • 選舉是隨機的,每一次看哪一個Client訊息先送達

改進

如同昨天的Lock Service,可以改為以下的方式

  1. 改為 Create "/Election/Leader-" ephermeral sequential node
  2. 由數字最小的成為Leader
  3. 其餘Slave Watch前一個znode

當Leader失效時,下一個Slave就會收到通知,直接成為Leader即可。

優點就是Zookeeper不需要通知所有的Client,
也不需要重新經過大家搶著創建znode的過程,
系統得以增進Scalibility。

Reverse Proxy

另一個是可以應用在Reverse Proxy裡面去記錄目前有哪些Servers可以提供服務,
將Client的請求根據自訂義規則(隨機、RoundRobin...等)
轉發到Server身上。

因為Server可能是動態增加移除的(雲端裡面常見的Autoscaling)
因此Proxy要能夠感知Server上線且記錄他的ip位址。

前幾個例子都是用文字與圖片來說明,
我們這個用程式碼吧~

使用
"github.com/samuel/go-zookeeper/zk" 為Zookeeper的Library

首先是Server,是一個簡單的Golang HTTP Server。

server.go

func main() {
	var zkHosts = []string{"zookeeper:2181"}
	
	// gserveName 為將來作為自己znode的名字
	// gserverZkPath 為整個znode的path,放在/grproxy下
	gserveName = os.Getenv("gserve_name")
	gserverZkPath = "/grproxy/" + gserveName

	// 連接Zookeeper
	conn, _, err := zk.Connect(zkHosts, time.Second*5)
	if err != nil {
		fmt.Printf("Couldn't connect to zookeeper: %v", err)
		return
	}
	defer conn.Close()

	for conn.State() != zk.StateHasSession {
		fmt.Printf("gserve is connecting to the Zookeeper \n...")
		time.Sleep(5)
	}
	fmt.Println("Connected with Zookeeper")
	
	// 設定創建的znode為 "讀寫修改ACL" 都允許的權限
	acl := zk.WorldACL(zk.PermAll)
	// 設定創建的znode為Ephemeral node
	flag := int32(zk.FlagEphemeral)
	// 確定proxy server存在且在zk裡面已經Create /grproxy znode
	exists, _, _ := conn.Exists("/grproxy")

	if !exists {
		for {
			time.Sleep(5)
			exists, _, _ := conn.Exists("/grproxy")
			if exists {
				break
			}
		}
	}
	// 創建自己的znode
	gserverZkPathNode, err := conn.Create(gserverZkPath, []byte(gserveName+":9091"), flag, acl)
	if err != nil {
		fmt.Printf("Couldn't create the znode for root path %v\n", err)
		return
	}
	fmt.Printf("create ephemeral node: %v\n", gserverZkPathNode)
	// 啟動 HTTP server
	fmt.Println("Run server ...")
	http.HandleFunc("/library", apiHandler)
	log.Fatal(http.ListenAndServe(":9091", nil))
}

接下來proxy.go這邊要能夠

  1. 動態感知server的加入
  2. Round Robin轉發請求。

grproxy.go


type RoundRobinList struct {
	urls  []string
	index int
}

var targets RoundRobinList

func main() {
	// 連線Zookeeper
	var zkHosts = []string{"zookeeper:2181"}
	conn, _, err := zk.Connect(zkHosts, time.Second*5)
	if err != nil {
		fmt.Printf("Couldn't connect to zookeeper: %v \n", err)
		return
	}
	defer conn.Close()

	for conn.State() != zk.StateHasSession {
		fmt.Printf("grproxy is connecting to the Zookeeper \n...")
		time.Sleep(5)
	}
	fmt.Println("Connected with Zookeeper \n")

	// 檢查/grproxy是否已經存在,創建/grproxy並把自己的位置 "grproxy:80" 寫入 znode 裡面
	acl := zk.WorldACL(zk.PermAll)

	flag := int32(0)

	exists, _, err := conn.Exists("/grproxy")
	if err != nil {
		fmt.Printf("Couldn't check the existence of root path : %v \n", err)
		return
	}

	if !exists {
		rootPathNode, err := conn.Create("/grproxy", []byte("grproxy:80"), flag, acl)
		if err != nil {
			fmt.Printf("Couldn't create the znode for root path %v \n", err)
			return
		}
		fmt.Printf("create: %v \n", rootPathNode)
	}

	targets.index = 0

	// 取得/grproxy底下的所有目前運作的Server,使用watchZkChild API
	childrenChan, errorsChan := watchZkChild(conn, "/grproxy")

	go getAllGserves(conn, childrenChan, errorsChan)

	// 啟動proxy server,本身也是一個HTTP server
	// 收到HTTP Request判斷header後,轉發給Server來處理
	// redirect the request to different gserve
	proxy := newMultiHostReverseProxy()
	log.Fatal(http.ListenAndServe(":8000", proxy))
}


// 如果有新的Server加入,事件訊息會被放入childrenChan,在這邊處理把他加入我們可以轉發的target list中
func getAllGserves(conn *zk.Conn, childrenChan chan []string, errorsChan chan error) {
	for {
		select {
		case children := <-childrenChan:
			fmt.Printf("gserve %v \n", children)
			var urlTemp []string
			for _, child := range children {
				gserveUrl, _, err := conn.Get(rootPath + "/" + child)
				if err != nil {
					fmt.Printf("Couldn't get data from node %v , error %v \n", rootPath+"/"+child, err)
				}
				fmt.Printf("gserveUrl: %v", gserveUrl)
				urlTemp = append(urlTemp, string(gserveUrl))
			}
			targets.urls = urlTemp
			fmt.Printf("current gserves %+v \n", targets.urls)

		case err := <-errorsChan:
			fmt.Printf("%+v go routine for watching Zk children error \n", err)
		}
	}
}


// 實作轉發的邏輯
func newMultiHostReverseProxy() *httputil.ReverseProxy {
	director := func(req *http.Request) {

		if req.URL.Path == "/library" {
			fmt.Println("This is for gserver")

			// round robin
			lens := len(targets.urls)
			if targets.index >= lens {
				targets.index = 0
			}
			gserveTarget := targets.urls[targets.index]
			targets.index = (targets.index + 1) % lens

			req.URL.Scheme = "http"
			req.URL.Host = gserveTarget

		} else {
			fmt.Println("This is for nginx")
			req.URL.Scheme = "http"
			req.URL.Host = "nginx"
		}

	}
	return &httputil.ReverseProxy{Director: director}
}

以上就是簡單的Reverse Proxy實作

總結

如果大家有興趣玩幾個利用Zookeeper實作的應用,
可以參考官網的例子
https://zookeeper.apache.org/doc/current/recipes.html

這幾個算是踏入分散式系統常見的案例,
拿來寫寫當side project不錯。


上一篇
Day 21 - Zookeeper - 介紹與實作Lock與2PC
下一篇
Day 23 - Data Partitioning - Distributed Hash Table and Consistent Hashing
系列文
分散式系統 - 在分散的世界中保持一致30

尚未有邦友留言

立即登入留言