心跳(heartbeat)是併發進程向外界發出信號的一種方式。命名者從人體解剖學中受到啟發,使用心跳一詞表示被觀察 者的生命體征。心跳在Go語言出現前就已被廣泛使用。
在併發中使用心跳是有原因的。心跳能夠讓我們更加深入的了解系統,並且在系統存在不確定性的時候對
其測試。
我們將在本節中討論兩種不同類型的心跳:
** 1.以固定時間間隔產生的心跳。
2.在工作單元開始時產生的心跳。**
固定時間間隔產生的心跳對於併發來說很有用,它可能在等待處理某個工作單元執行某個任務時發生。由 於你不知道這項工作什麽時候會進行,所以你的goroutine可能會持續等待。心跳是一種向監聽者發出信號的方式,即一切都很好,當前靜默是正常的。
以下代碼演示了會產生心跳的goroutine:
package main
import (
"fmt"
"time"
)
func main() {
doWork := func(
done <-chan interface{},
pulseInterval time.Duration,
) (<-chan interface{}, <-chan time.Time) {
heartbeat := make(chan interface{}) // <1> 在這裡,我們設置了一個發送心跳信號的通道。doWork會返回該通道。
results := make(chan time.Time)
go func() {
defer close(heartbeat)
defer close(results)
pulse := time.Tick(pulseInterval) // <2> 我們按傳入的pulseInterval值定時發送心跳,每次心跳都意味著可以從該通道上讀取到內容。
workGen := time.Tick(2 * pulseInterval) // <3> 這只是用來模擬進入的工作的另一處代碼。我們選擇一個比pulseInterval更長的持續時間,以便我們可以看到來自goroutine的心跳。
sendPulse := func() {
select {
case heartbeat <- struct{}{}:
default: // <4> 請注意,我們包含一個default子句。我們必須考慮如果沒有人接受到心跳的情況。從goroutine發出的結果是至關重要的,但心跳不是。
}
}
sendResult := func(r time.Time) {
for {
select {
case <-done:
return
case <-pulse: // <5> 就像done通道,無論何時執行發送或接收,你都需要考慮心跳發送的情況。
sendPulse()
case results <- r:
return
}
}
}
for {
select {
case <-done:
return
case <-pulse: // <5> 就像done通道,無論何時執行發送或接收,你都需要考慮心跳發送的情況。
sendPulse()
case r := <-workGen:
sendResult(r)
}
}
}()
return heartbeat, results
}
done := make(chan interface{})
time.AfterFunc(10*time.Second, func() { close(done) }) // 在此處,我們設置了一個延時函數,10秒後關閉done通道,結束goroutine的工作。
const timeout = 2 * time.Second // 設置超時時間為2秒。
heartbeat, results := doWork(done, timeout/2) // 調用doWork函數並設置心跳間隔為超時的一半。
for {
select {
case _, ok := <-heartbeat: // 讀取心跳通道。
if ok == false {
return
}
fmt.Println("pulse")
case r, ok := <-results: // 讀取結果通道。
if ok == false {
return
}
fmt.Printf("results %v\n", r.Second())
case <-time.After(timeout): // 如果在超時時間內沒有收到心跳或結果,則退出。
return
}
}
}
輸出:
pulse
pulse
results 52
pulse
pulse
results 54
pulse
pulse
results 56
pulse
pulse
results 58
pulse
和預期的一樣,每次從result中接收到信息,都會收到兩次心跳。
我們可能會使用這樣的功能來收集系統的統計參數,當你的goroutine沒有像預期那樣運行,那麽基於固定時間的心跳信號的作用會非常明顯。
接下來讓我們看看另一個場景:在工作單元開始時產生的心跳。這對測試非常有用。下面是個例子:
package main
import (
"fmt"
"math/rand"
)
func main() {
doWork := func(done <-chan interface{}) (<-chan interface{}, <-chan int) {
heartbeatStream := make(chan interface{}, 1) // <1> 在這裡我們用一個緩衝區創建心跳通道。這確保即使沒有人及時監聽發送,也總會發送至少一個pulse。
workStream := make(chan int)
go func() {
defer close(heartbeatStream)
defer close(workStream)
for i := 0; i < 10; i++ {
select { // <2> 在這裡,我們為心跳設置了一個單獨的select塊。我們不希望將它與發送結果一起包含在同一個select塊中,因為如果接收器未準備好,它們將接收到一個pulse,而result的當前值將會丟失。我們也沒有為done通道提供case語句,因為我們有一個default可以處理這種情況。
case heartbeatStream <- struct{}{}:
default: // <3> 我們再次處理如果沒有人監聽到心跳。因為我們的心跳通道是用緩衝區創建的,如果有人在監聽,但沒有及時處理第一個心跳,仍會被通知。
}
select {
case <-done:
return
case workStream <- rand.Intn(10):
}
}
}()
return heartbeatStream, workStream
}
done := make(chan interface{})
defer close(done)
heartbeat, results := doWork(done)
for {
select {
case _, ok := <-heartbeat:
if ok {
fmt.Println("pulse")
} else {
return
}
case r, ok := <-results:
if ok {
fmt.Printf("results %v\n", r)
} else {
return
}
}
}
}
輸出:
pulse
results 1
pulse
results 7
pulse
results 7
pulse
results 9
pulse
results 1
pulse
results 8
pulse
results 5
pulse
results 0
pulse
results 6
pulse
results 0
如預期一致,每個結果都會有一個心跳。
心跳在併發編程和系統監控中扮演了一個至關重要的角色。以下是心跳重要性的總結:
生命體征監測:心跳是一種表示系統或進程仍在正常運行的信號。如果心跳突然停止,則可能表示系統或進程已經崩潰或遭遇某些問題。
提供可視化的健康檢查:在大型分佈式系統中,心跳機制可用於確保各個服務和組件仍然活躍。這有助於及時發現問題並採取應對措施。
降低不確定性:在併發環境中,系統的行為可能充滿不確定性。心跳提供了一種方式,讓開發人員和系統管理員知道系統的當前狀態,即使在高度併發的情況下也是如此。
協助故障排除:當系統出現問題時,固定時間間隔的心跳可以提供有關問題發生時間的信息。這有助於定位和解決問題。
增強用戶信心:對於需要長時間運行的操作,心跳可以向用戶提供反饋,讓他們知道操作仍在進行中,而不是卡住或失敗。
避免無謂的重啟或介入:有時,一個沒有及時響應的系統可能會被誤認為是失敗的,導致不必要的重啟或其他介入措施。心跳可以避免這種情況,因為它定期向外界發送活躍的信號。
總之,心跳不僅是監控系統健康的一種手段,也是維護系統可靠性和用戶信任的重要工具。