在一些長時間運行的系統中,如守護進程(Daemon process),經常有一些常駐的子程序,名為goroutines。這些goroutines有時會因外部資源而被暫停,待著等待恢復。為了確保這些goroutines運行順暢,我們可以利用所謂的"心跳"機制來監測它們的活躍狀態。當檢測到某個goroutine出現問題時,我們有管理工具可以重新啟動它,確保系統的穩定運行。
注:守護進程(Daemon process)是在Unix和類Unix系統(例如Linux)中運行的一種後臺程序,它獨立於控制終端運行且周期性執行某種任務或等待處理某些事件。這些程序常常用於執行服務性質的工作,如日誌管理、網絡服務、排程任務等。
type startGoroutineFn func(
done <-chan interface{},
pulseInterval time.Duration,
) (heartbeat <-chan interface{}) // <1> 定義一個可以監控和重新啟動的goroutine的函式簽名。我們看到熟悉的done通道,以及熟悉的心跳模式寫法。
newSteward := func(
timeout time.Duration,
startGoroutine startGoroutineFn,
) startGoroutineFn { // <2> 在這裡我們設定了超時時間,並使用函式startGoroutine來啟動它正在監控的goroutine。有趣的是,監控器本身返回一個startGoroutineFn,表示監控器自身也是可監控的。
return func(
done <-chan interface{},
pulseInterval time.Duration,
) <-chan interface{} {
heartbeat := make(chan interface{})
go func() {
defer close(heartbeat)
var wardDone chan interface{}
var wardHeartbeat <-chan interface{}
startWard := func() { // <3> 在這裡我們定義一個閉包,它以同樣的方式來啟動我們正在監視的goroutine。
wardDone = make(chan interface{}) // <4> 這是我們創建一個新通道,我們會將其傳遞給監控通道,以響應發出的停止信號。
wardHeartbeat = startGoroutine(or(wardDone, done), timeout/2) // <5> 在這裡,我們開啟對目標goroutine的監控。如果監控器停止工作,或者監控器想要停止被監控區域,我們希望監控者也停止,因此我們將兩個done通道都包含在邏輯中。我們傳入的心跳間隔是超時時間的一半,但正如我們在“心跳”中討論的那樣,這可以調整。
}
startWard()
pulse := time.Tick(pulseInterval)
monitorLoop:
for {
timeoutSignal := time.After(timeout)
for { // <6> 這是我們的內部循環,它確保監控者可以發出自己的心跳。
select {
case <-pulse:
select {
case heartbeat <- struct{}{}:
default:
}
case <-wardHeartbeat: // <7> 在這裡我們如果接收到監控者的心跳,就會知道它還處於正常工作狀態,程序會繼續監測循環。
case <-timeoutSignal: // <8> 這裡如果我們發現監控者超時,我們要求監控者停下來,並開始一個新的goroutine。然後開始新的監測。
log.Println("steward: ward unhealthy; restarting")
close(wardDone)
startWard()
continue monitorLoop
case <-done:
return
}
}
}
}()
return heartbeat
}
}
接下來讓我們試試看如果監控一個行為異常的goroutine,會發生什麽?完整程式碼如下:
package main
import (
"log"
"os"
"time"
)
func main() {
var or func(channels ...<-chan interface{}) <-chan interface{}
or = func(channels ...<-chan interface{}) <-chan interface{} { // <1> 此函式返回一個通道,此通道會在提供的任意一個通道中接收到訊息時被關閉。
switch len(channels) {
case 0: // <2> 沒有通道被提供。
return nil
case 1: // <3> 只提供了一個通道。
return channels[0]
}
orDone := make(chan interface{})
go func() { // <4> 啟動一個新的goroutine來等待多個通道。
defer close(orDone)
switch len(channels) {
case 2: // <5> 當有兩個通道時的選擇。
select {
case <-channels[0]:
case <-channels[1]:
}
default: // <6> 當有超過兩個通道時的選擇。
select {
case <-channels[0]:
case <-channels[1]:
case <-channels[2]:
case <-or(append(channels[3:], orDone)...): // 這裡會遞迴地呼叫or函式,並將orDone加入到通道列表中,確保任何子集的完成都將導致主集完成。
}
}
}()
return orDone
}
type startGoroutineFn func(
done <-chan interface{},
pulseInterval time.Duration,
) (heartbeat <-chan interface{}) // <1> 定義一個可以監控和重新啟動的goroutine的函式簽名。我們看到熟悉的done通道,以及熟悉的心跳模式寫法。
newSteward := func(
timeout time.Duration,
startGoroutine startGoroutineFn,
) startGoroutineFn { // <2> 在這裡我們設定了超時時間,並使用函式startGoroutine來啟動它正在監控的goroutine。有趣的是,監控器本身返回一個startGoroutineFn,表示監控器自身也是可監控的。
return func(
done <-chan interface{},
pulseInterval time.Duration,
) <-chan interface{} {
heartbeat := make(chan interface{})
go func() {
defer close(heartbeat)
var wardDone chan interface{}
var wardHeartbeat <-chan interface{}
startWard := func() { // <3> 在這裡我們定義一個閉包,它以同樣的方式來啟動我們正在監視的goroutine。
wardDone = make(chan interface{}) // <4> 這是我們創建一個新通道,我們會將其傳遞給監控通道,以響應發出的停止信號。
wardHeartbeat = startGoroutine(or(wardDone, done), timeout/2) // <5> 在這裡,我們開啟對目標goroutine的監控。如果監控器停止工作,或者監控器想要停止被監控區域,我們希望監控者也停止,因此我們將兩個done通道都包含在邏輯中。我們傳入的心跳間隔是超時時間的一半,但正如我們在“心跳”中討論的那樣,這可以調整。
}
startWard()
pulse := time.Tick(pulseInterval)
monitorLoop:
for {
timeoutSignal := time.After(timeout)
for { // <6> 這是我們的內部循環,它確保監控者可以發出自己的心跳。
select {
case <-pulse:
select {
case heartbeat <- struct{}{}:
default:
}
case <-wardHeartbeat: // <7> 在這裡我們如果接收到監控者的心跳,就會知道它還處於正常工作狀態,程序會繼續監測循環。
case <-timeoutSignal: // <8> 這裡如果我們發現監控者超時,我們要求監控者停下來,並開始一個新的goroutine。然後開始新的監測。
log.Println("steward: ward unhealthy; restarting")
close(wardDone)
startWard()
continue monitorLoop
case <-done:
return
}
}
}
}()
return heartbeat
}
}
log.SetOutput(os.Stdout)
log.SetFlags(log.Ltime | log.LUTC)
doWork := func(done <-chan interface{}, _ time.Duration) <-chan interface{} {
log.Println("ward: Hello, I'm irresponsible!")
go func() {
<-done // <1> 我們可以看到這個goroutine什麼都沒幹,持續阻塞等待被取消,它同樣不會發出任何表明自己正常信號。
log.Println("ward: I am halting.")
}()
return nil
}
doWorkWithSteward := newSteward(4*time.Second, doWork) // <2> 這裡開始建立被監控的例程,其4秒後會超時。
done := make(chan interface{})
time.AfterFunc(9*time.Second, func() { // <3> 這裡我們9秒後向done通道發出信號停止整個程序。
log.Println("main: halting steward and ward.")
close(done)
})
for range doWorkWithSteward(done, 4*time.Second) {
} // <4> 最後,我們啟動監控器並在其心跳範圍內防止示例停止。
log.Println("Done")
}
輸出:
13:58:22 ward: Hello, I'm irresponsible!
13:58:26 steward: ward unhealthy; restarting
13:58:26 ward: Hello, I'm irresponsible!
13:58:26 ward: I am halting.
13:58:30 steward: ward unhealthy; restarting
13:58:30 ward: Hello, I'm irresponsible!
13:58:30 ward: I am halting.
13:58:31 main: halting steward and ward.
13:58:31 ward: I am halting.
13:58:31 Done
這個程序展示了如何使用心跳模式監控 goroutine 的健康狀態,並在檢測到任何問題時自動重新啟動該 goroutine。這是在分佈式系統或並行計算中保持系統健康的一種常見策略,尤其是當某些任務可能會因各種原因而無法回應時。