在前面有介紹過了 golang 的重點之一 goroutine ,golang 的設計讓 concurrency 非常容易,但是大家有沒有想過,goroutine 也不是什麼黑科技,最終電腦還是有所謂的 CPU & MEM 上限,所以也就意味著 goroutine 不能無窮無境的開,那今天就來討論 Concurrency Pattern,到底該如何設計。
接下來,我們假設一個情境,我們需要在某個 http handler ,加入統計 request 相關的方法,但是又不希望它影響這個 api response 的速度,所以我們一定是要用 goroutine 把統計的相關 func ,丟到背景平行處理。就像下面一樣
//模擬需要統計 reqesut 相關數據的
func Statistic(r *http.Request) {
}
func main() {
http.HandleFunc("/api/query", func(w http.ResponseWriter, r *http.Request) {
//假設需要統計 request 相關數據,所以丟背景
go Statistic(r)
u := &UserInfo{
Name: "syhlion",
Age: 18,
}
b, err := json.Marshal(u)
if err != nil {
log.Println(err)
return
}
w.Header().Set("Content-Type", "application/json;charset=UTF-8")
w.WriteHeader(http.StatusOK)
w.Write(b)
})
log.Fatal(http.ListenAndServe(":8080", nil))
}
但是如果照上面所寫的,如果 http request 一多,是不是 goroutine 就無窮無境的開下去了....,那這樣就在比到底是先碰上 port 不夠的問題,還是 CPU & MEM 先被耗盡。最終就是被 OS 層強制關閉 application。
在這方面上的設計,我個人是有一個原則,『當存取的人數多時,服務可以慢,但不能死』,所以當我們存取人數多時,寧可少收幾個客戶,也不能硬要多收幾個客戶,讓整個系統崩潰。
接下來我們來構思一下改良策略,首先我們需要一個 worker 派發中心。這時侯就能善用 golang 的特性 channel,運用 channel 當作 quene ,在當配 gorotuine,就能做出一個派工的模型。
依據上圖,我們的實作方式如下
type StatisticWorker struct {
quene chan *http.Request
num int
}
//統一由這邊做 requset 加入的動作
func (s *StatisticWorker) AddRequest(r *http.Request) (err error) {
select {
case s.quene <- r:
//當quene已滿時,則把 err 丟出,
default:
err = errors.New("buffer full")
}
return
}
//實際worker本體 運作邏輯的地方
func (s *StatisticWorker) Statistic() {
for r := range s.quene {
//處理 request 相關統計
}
}
//初始化到底要有幾個worker
func (s *StatisticWorker) Run() {
for i := 0; i < s.num; i++ {
go Statistic()
}
}
這樣子我們就能看到一個完整 worker,可以依你需求去控制最大收 request 的統計 worker。
完整的使用範例如下
type StatisticWorker struct {
quene chan *http.Request
num int
}
//統一由這邊做 requset 加入的動作
func (s *StatisticWorker) AddRequest(r *http.Request) (err error) {
select {
case s.quene <- r:
//當quene已滿時,則把 err 丟出,
default:
err = errors.New("buffer full")
}
return
}
//實際worker本體 運作邏輯的地方
func (s *StatisticWorker) Statistic() {
for r := range s.quene {
//處理 request 相關統計
}
}
//初始化到底要有幾個worker
func (s *StatisticWorker) Run() {
for i := 0; i < s.num; i++ {
go Statistic()
}
}
func main() {
sw := &StatisticWorker{
quene: make(chan *http.Request, 1000),
num: 100,
}
go sw.Run()
http.HandleFunc("/api/query", func(w http.ResponseWriter, r *http.Request) {
//假設需要統計 request 相關數據,所以丟背景
//這邊使用這個非阻塞式的worker
err := sw.Add(r)
if err != nil {
log.Println(err)
return
}
u := &UserInfo{
Name: "syhlion",
Age: 18,
}
b, err := json.Marshal(u)
if err != nil {
log.Println(err)
return
}
w.Header().Set("Content-Type", "application/json;charset=UTF-8")
w.WriteHeader(http.StatusOK)
w.Write(b)
})
log.Fatal(http.ListenAndServe(":8080", nil))
}