iT邦幫忙

2023 iThome 鐵人賽

DAY 25
0

心跳(heartbeat)是併發進程向外界發出信號的一種方式。命名者從人體解剖學中受到啟發,使用心跳一詞表示被觀察 者的生命體征。心跳在Go語言出現前就已被廣泛使用。
在併發中使用心跳是有原因的。心跳能夠讓我們更加深入的了解系統,並且在系統存在不確定性的時候對
其測試。
我們將在本節中討論兩種不同類型的心跳:
** 1.以固定時間間隔產生的心跳。
2.在工作單元開始時產生的心跳。**
固定時間間隔產生的心跳對於併發來說很有用,它可能在等待處理某個工作單元執行某個任務時發生。由 於你不知道這項工作什麽時候會進行,所以你的goroutine可能會持續等待。心跳是一種向監聽者發出信號的方式,即一切都很好,當前靜默是正常的。
以下代碼演示了會產生心跳的goroutine:

package main

import (
	"fmt"
	"time"
)

func main() {
	doWork := func(
		done <-chan interface{},
		pulseInterval time.Duration,
	) (<-chan interface{}, <-chan time.Time) {
		heartbeat := make(chan interface{}) // <1> 在這裡,我們設置了一個發送心跳信號的通道。doWork會返回該通道。
		results := make(chan time.Time)
		go func() {
			defer close(heartbeat)
			defer close(results)

			pulse := time.Tick(pulseInterval)       // <2> 我們按傳入的pulseInterval值定時發送心跳,每次心跳都意味著可以從該通道上讀取到內容。
			workGen := time.Tick(2 * pulseInterval) // <3> 這只是用來模擬進入的工作的另一處代碼。我們選擇一個比pulseInterval更長的持續時間,以便我們可以看到來自goroutine的心跳。

			sendPulse := func() {
				select {
				case heartbeat <- struct{}{}:
				default: // <4> 請注意,我們包含一個default子句。我們必須考慮如果沒有人接受到心跳的情況。從goroutine發出的結果是至關重要的,但心跳不是。
				}
			}
			sendResult := func(r time.Time) {
				for {
					select {
					case <-done:
						return
					case <-pulse: // <5> 就像done通道,無論何時執行發送或接收,你都需要考慮心跳發送的情況。
						sendPulse()
					case results <- r:
						return
					}
				}
			}

			for {
				select {
				case <-done:
					return
				case <-pulse: // <5> 就像done通道,無論何時執行發送或接收,你都需要考慮心跳發送的情況。
					sendPulse()
				case r := <-workGen:
					sendResult(r)
				}
			}
		}()
		return heartbeat, results
	}
	done := make(chan interface{})
	time.AfterFunc(10*time.Second, func() { close(done) }) // 在此處,我們設置了一個延時函數,10秒後關閉done通道,結束goroutine的工作。

	const timeout = 2 * time.Second               // 設置超時時間為2秒。
	heartbeat, results := doWork(done, timeout/2) // 調用doWork函數並設置心跳間隔為超時的一半。
	for {
		select {
		case _, ok := <-heartbeat: // 讀取心跳通道。
			if ok == false {
				return
			}
			fmt.Println("pulse")
		case r, ok := <-results: // 讀取結果通道。
			if ok == false {
				return
			}
			fmt.Printf("results %v\n", r.Second())
		case <-time.After(timeout): // 如果在超時時間內沒有收到心跳或結果,則退出。
			return
		}
	}
}
輸出:
pulse
  pulse
  results 52
  pulse
  pulse
  results 54
  pulse
  pulse
  results 56
  pulse
  pulse
  results 58
  pulse

和預期的一樣,每次從result中接收到信息,都會收到兩次心跳。
我們可能會使用這樣的功能來收集系統的統計參數,當你的goroutine沒有像預期那樣運行,那麽基於固定時間的心跳信號的作用會非常明顯。


接下來讓我們看看另一個場景:在工作單元開始時產生的心跳。這對測試非常有用。下面是個例子:

package main

import (
	"fmt"
	"math/rand"
)

func main() {
	doWork := func(done <-chan interface{}) (<-chan interface{}, <-chan int) {
		heartbeatStream := make(chan interface{}, 1) // <1> 在這裡我們用一個緩衝區創建心跳通道。這確保即使沒有人及時監聽發送,也總會發送至少一個pulse。
		workStream := make(chan int)
		go func() {
			defer close(heartbeatStream)
			defer close(workStream)

			for i := 0; i < 10; i++ {
				select { // <2> 在這裡,我們為心跳設置了一個單獨的select塊。我們不希望將它與發送結果一起包含在同一個select塊中,因為如果接收器未準備好,它們將接收到一個pulse,而result的當前值將會丟失。我們也沒有為done通道提供case語句,因為我們有一個default可以處理這種情況。
				case heartbeatStream <- struct{}{}:
				default: // <3> 我們再次處理如果沒有人監聽到心跳。因為我們的心跳通道是用緩衝區創建的,如果有人在監聽,但沒有及時處理第一個心跳,仍會被通知。
				}

				select {
				case <-done:
					return
				case workStream <- rand.Intn(10):
				}
			}
		}()

		return heartbeatStream, workStream
	}

	done := make(chan interface{})
	defer close(done)

	heartbeat, results := doWork(done)
	for {
		select {
		case _, ok := <-heartbeat:
			if ok {
				fmt.Println("pulse")
			} else {
				return
			}
		case r, ok := <-results:
			if ok {
				fmt.Printf("results %v\n", r)
			} else {
				return
			}
		}
	}
}
輸出:
 pulse
  results 1
  pulse
  results 7
  pulse
  results 7
  pulse
  results 9
  pulse
  results 1
  pulse
  results 8
  pulse
  results 5
  pulse
  results 0
  pulse
  results 6
  pulse
  results 0

如預期一致,每個結果都會有一個心跳。


心跳在併發編程和系統監控中扮演了一個至關重要的角色。以下是心跳重要性的總結:

  1. 生命體征監測:心跳是一種表示系統或進程仍在正常運行的信號。如果心跳突然停止,則可能表示系統或進程已經崩潰或遭遇某些問題。

  2. 提供可視化的健康檢查:在大型分佈式系統中,心跳機制可用於確保各個服務和組件仍然活躍。這有助於及時發現問題並採取應對措施。

  3. 降低不確定性:在併發環境中,系統的行為可能充滿不確定性。心跳提供了一種方式,讓開發人員和系統管理員知道系統的當前狀態,即使在高度併發的情況下也是如此。

  4. 協助故障排除:當系統出現問題時,固定時間間隔的心跳可以提供有關問題發生時間的信息。這有助於定位和解決問題。

  5. 增強用戶信心:對於需要長時間運行的操作,心跳可以向用戶提供反饋,讓他們知道操作仍在進行中,而不是卡住或失敗。

  6. 避免無謂的重啟或介入:有時,一個沒有及時響應的系統可能會被誤認為是失敗的,導致不必要的重啟或其他介入措施。心跳可以避免這種情況,因為它定期向外界發送活躍的信號。

總之,心跳不僅是監控系統健康的一種手段,也是維護系統可靠性和用戶信任的重要工具。


上一篇
24.Timeout and Cancellation
下一篇
26.Replicated request
系列文
Concurrency in go 讀書心得30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言