iT邦幫忙

2023 iThome 鐵人賽

DAY 29
0
自我挑戰組

Concurrency in go 讀書心得系列 第 29

29.Healing unhealthy goroutines

  • 分享至 

  • xImage
  •  

在一些長時間運行的系統中,如守護進程(Daemon process),經常有一些常駐的子程序,名為goroutines。這些goroutines有時會因外部資源而被暫停,待著等待恢復。為了確保這些goroutines運行順暢,我們可以利用所謂的"心跳"機制來監測它們的活躍狀態。當檢測到某個goroutine出現問題時,我們有管理工具可以重新啟動它,確保系統的穩定運行。

注:守護進程(Daemon process)是在Unix和類Unix系統(例如Linux)中運行的一種後臺程序,它獨立於控制終端運行且周期性執行某種任務或等待處理某些事件。這些程序常常用於執行服務性質的工作,如日誌管理、網絡服務、排程任務等。

type startGoroutineFn func(
		done <-chan interface{},
		pulseInterval time.Duration,
	) (heartbeat <-chan interface{}) // <1> 定義一個可以監控和重新啟動的goroutine的函式簽名。我們看到熟悉的done通道,以及熟悉的心跳模式寫法。

newSteward := func(
		timeout time.Duration,
		startGoroutine startGoroutineFn,
	) startGoroutineFn { // <2> 在這裡我們設定了超時時間,並使用函式startGoroutine來啟動它正在監控的goroutine。有趣的是,監控器本身返回一個startGoroutineFn,表示監控器自身也是可監控的。
		return func(
			done <-chan interface{},
			pulseInterval time.Duration,
		) <-chan interface{} {
			heartbeat := make(chan interface{})
			go func() {
				defer close(heartbeat)

				var wardDone chan interface{}
				var wardHeartbeat <-chan interface{}
				startWard := func() { // <3> 在這裡我們定義一個閉包,它以同樣的方式來啟動我們正在監視的goroutine。
					wardDone = make(chan interface{})                             // <4> 這是我們創建一個新通道,我們會將其傳遞給監控通道,以響應發出的停止信號。
					wardHeartbeat = startGoroutine(or(wardDone, done), timeout/2) // <5> 在這裡,我們開啟對目標goroutine的監控。如果監控器停止工作,或者監控器想要停止被監控區域,我們希望監控者也停止,因此我們將兩個done通道都包含在邏輯中。我們傳入的心跳間隔是超時時間的一半,但正如我們在“心跳”中討論的那樣,這可以調整。
				}
				startWard()
				pulse := time.Tick(pulseInterval)

			monitorLoop:
				for {
					timeoutSignal := time.After(timeout)

					for { // <6> 這是我們的內部循環,它確保監控者可以發出自己的心跳。
						select {
						case <-pulse:
							select {
							case heartbeat <- struct{}{}:
							default:
							}
						case <-wardHeartbeat: // <7> 在這裡我們如果接收到監控者的心跳,就會知道它還處於正常工作狀態,程序會繼續監測循環。
						case <-timeoutSignal: // <8> 這裡如果我們發現監控者超時,我們要求監控者停下來,並開始一個新的goroutine。然後開始新的監測。
							log.Println("steward: ward unhealthy; restarting")
							close(wardDone)
							startWard()
							continue monitorLoop
						case <-done:
							return
						}
					}
				}
			}()

			return heartbeat
		}
	}


接下來讓我們試試看如果監控一個行為異常的goroutine,會發生什麽?完整程式碼如下:

package main

import (
	"log"
	"os"
	"time"
)

func main() {
	var or func(channels ...<-chan interface{}) <-chan interface{}
	or = func(channels ...<-chan interface{}) <-chan interface{} { // <1> 此函式返回一個通道,此通道會在提供的任意一個通道中接收到訊息時被關閉。
		switch len(channels) {
		case 0: // <2> 沒有通道被提供。
			return nil
		case 1: // <3> 只提供了一個通道。
			return channels[0]
		}

		orDone := make(chan interface{})
		go func() { // <4> 啟動一個新的goroutine來等待多個通道。
			defer close(orDone)

			switch len(channels) {
			case 2: // <5> 當有兩個通道時的選擇。
				select {
				case <-channels[0]:
				case <-channels[1]:
				}
			default: // <6> 當有超過兩個通道時的選擇。
				select {
				case <-channels[0]:
				case <-channels[1]:
				case <-channels[2]:
				case <-or(append(channels[3:], orDone)...): // 這裡會遞迴地呼叫or函式,並將orDone加入到通道列表中,確保任何子集的完成都將導致主集完成。
				}
			}
		}()
		return orDone
	}
	type startGoroutineFn func(
		done <-chan interface{},
		pulseInterval time.Duration,
	) (heartbeat <-chan interface{}) // <1> 定義一個可以監控和重新啟動的goroutine的函式簽名。我們看到熟悉的done通道,以及熟悉的心跳模式寫法。

	newSteward := func(
		timeout time.Duration,
		startGoroutine startGoroutineFn,
	) startGoroutineFn { // <2> 在這裡我們設定了超時時間,並使用函式startGoroutine來啟動它正在監控的goroutine。有趣的是,監控器本身返回一個startGoroutineFn,表示監控器自身也是可監控的。
		return func(
			done <-chan interface{},
			pulseInterval time.Duration,
		) <-chan interface{} {
			heartbeat := make(chan interface{})
			go func() {
				defer close(heartbeat)

				var wardDone chan interface{}
				var wardHeartbeat <-chan interface{}
				startWard := func() { // <3> 在這裡我們定義一個閉包,它以同樣的方式來啟動我們正在監視的goroutine。
					wardDone = make(chan interface{})                             // <4> 這是我們創建一個新通道,我們會將其傳遞給監控通道,以響應發出的停止信號。
					wardHeartbeat = startGoroutine(or(wardDone, done), timeout/2) // <5> 在這裡,我們開啟對目標goroutine的監控。如果監控器停止工作,或者監控器想要停止被監控區域,我們希望監控者也停止,因此我們將兩個done通道都包含在邏輯中。我們傳入的心跳間隔是超時時間的一半,但正如我們在“心跳”中討論的那樣,這可以調整。
				}
				startWard()
				pulse := time.Tick(pulseInterval)

			monitorLoop:
				for {
					timeoutSignal := time.After(timeout)

					for { // <6> 這是我們的內部循環,它確保監控者可以發出自己的心跳。
						select {
						case <-pulse:
							select {
							case heartbeat <- struct{}{}:
							default:
							}
						case <-wardHeartbeat: // <7> 在這裡我們如果接收到監控者的心跳,就會知道它還處於正常工作狀態,程序會繼續監測循環。
						case <-timeoutSignal: // <8> 這裡如果我們發現監控者超時,我們要求監控者停下來,並開始一個新的goroutine。然後開始新的監測。
							log.Println("steward: ward unhealthy; restarting")
							close(wardDone)
							startWard()
							continue monitorLoop
						case <-done:
							return
						}
					}
				}
			}()

			return heartbeat
		}
	}

	log.SetOutput(os.Stdout)
	log.SetFlags(log.Ltime | log.LUTC)

	doWork := func(done <-chan interface{}, _ time.Duration) <-chan interface{} {
		log.Println("ward: Hello, I'm irresponsible!")
		go func() {
			<-done // <1> 我們可以看到這個goroutine什麼都沒幹,持續阻塞等待被取消,它同樣不會發出任何表明自己正常信號。
			log.Println("ward: I am halting.")
		}()
		return nil
	}
	doWorkWithSteward := newSteward(4*time.Second, doWork) // <2> 這裡開始建立被監控的例程,其4秒後會超時。

	done := make(chan interface{})
	time.AfterFunc(9*time.Second, func() { // <3> 這裡我們9秒後向done通道發出信號停止整個程序。
		log.Println("main: halting steward and ward.")
		close(done)
	})

	for range doWorkWithSteward(done, 4*time.Second) {
	} // <4> 最後,我們啟動監控器並在其心跳範圍內防止示例停止。
	log.Println("Done")

}
輸出:
13:58:22 ward: Hello, I'm irresponsible!
13:58:26 steward: ward unhealthy; restarting
13:58:26 ward: Hello, I'm irresponsible!
13:58:26 ward: I am halting.
13:58:30 steward: ward unhealthy; restarting
13:58:30 ward: Hello, I'm irresponsible!
13:58:30 ward: I am halting.
13:58:31 main: halting steward and ward.
13:58:31 ward: I am halting.
13:58:31 Done

這個程序展示了如何使用心跳模式監控 goroutine 的健康狀態,並在檢測到任何問題時自動重新啟動該 goroutine。這是在分佈式系統或並行計算中保持系統健康的一種常見策略,尤其是當某些任務可能會因各種原因而無法回應時。


上一篇
28.Rate limiting-2
下一篇
30.完賽心得
系列文
Concurrency in go 讀書心得30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言