iT邦幫忙

2024 iThome 鐵人賽

DAY 15
0
Modern Web

Go 快 Go 高效: 從基礎語法到現代Web應用開發系列 第 15

【Day15】多線程/平行化處理 II | WaitGroup 與 Caching 應用

  • 分享至 

  • xImage
  •  

在並發程式設計中,如何有效地管理多線程並確保資料一致性是一個重要的課題。上一篇文章中,我們介紹了 -GoroutinesChannels 的基本概念。本篇將深入探討如何使用 WaitGroup 進行同步控制,並結合高效能的快取機制來優化多線程應用。此外,我們還將介紹如何使用 singleflight 模式來避免重複的資料請求,以及 Fan-InFan-Out 的併發模式。

WaitGroup 簡介

WaitGroup 是 Go 語言 sync 套件提供的一種同步工具,用於等待一組 Goroutines 完成。它比使用 Mutex 更簡單,特別適合於等待多個並發操作的完成。

  • 基本用法
package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    wg.Add(2) // 設置等待的 Goroutines 數量

    go func() {
        defer wg.Done() // 完成後調用 Done
        fmt.Println("Goroutine 1 完成")
    }()

    go func() {
        defer wg.Done()
        fmt.Println("Goroutine 2 完成")
    }()

    wg.Wait() // 等待所有 Goroutines 完成
    fmt.Println("所有 Goroutines 已完成")
}
</* Output: */>(預期輸出)
Goroutine 1 完成
Goroutine 2 完成
所有 Goroutines 已完成
  • wg.Add(n):設置等待的 Goroutines 數量。
  • wg.Done():每個 Goroutine 完成後調用一次。
  • wg.Wait():阻塞主線程,直到所有的 Done 被調用後才會繼續執行下面內容。

Caching 介紹

在多線程應用中,頻繁地從資料來源(如資料庫或 API)獲取資料可能會導致性能瓶頸。快取(Caching)是一種常見的優化手段,可以減少重複的資料請求,提升應用的響應速度和整體性能。然而,在高並發環境下,實現一個高效且安全的快取機制並不簡單,這需要處理好資料的一致性和併發訪問的問題。

使用 BigCache 實現高效能快取

BigCache 是一個高效能的 Go 語言快取庫,專為高並發環境設計。它內建處理了併發問題,無需手動使用 sync.Mutex 來保護資料,極大地簡化了快取的實現。

  • 記得先在terminal中輸入:
go get -u github.com/allegro/bigcache/v3
  • 檢查go.mod中有我們安裝好的依賴項目
module demo

go 1.23.0

require github.com/allegro/bigcache/v3 v3.1.0
  • 基本用法
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/allegro/bigcache/v3"
)

func main() {
	// 創建一個背景
	ctx := context.Background()

	// 設定 BigCache 的配置
	config := bigcache.DefaultConfig(10 * time.Minute)
	config.Shards = 1024             // 分片數量,默認為 256
	config.MaxEntriesInWindow = 1000 // 最大條目數量
	config.HardMaxCacheSize = 8192   // 最大快取大小(MB)

	// 初始化 BigCache
	cache, err := bigcache.New(ctx, config)
	if err != nil {
		panic(err)
	}
	defer cache.Close() // 確保在應用結束時關閉快取

	// 設置快取值
	err = cache.Set("myKey", []byte("cached data"))
	if err != nil {
		fmt.Println("設置快取值時發生錯誤:", err)
	}

	// 獲取快取值
	value, err := cache.Get("myKey")
	if err != nil {
		fmt.Println("未找到快取值")
	} else {
		fmt.Println("快取值:", string(value))
	}
}
</ Output: />
快取值: cached data
  • Context
    • context.Context 用於控制快取的生命周期。可以傳入 context.Background() 或者根據需要傳入可取消的上下文(如帶有超時或截止時間的上下文)。
  • Config 設定
    • bigcache.DefaultConfig 提供了預設配置,參數為快取條目的過期時間(此處設為 10 分鐘)。
    • Shards:分片數量,默認為 256。增加分片數量可以提升在高併發環境下的性能。
    • MaxEntriesInWindow:在一個清理窗口內最多允許的條目數量。
    • HardMaxCacheSize:快取的最大大小,以 MB 為單位。
  • 快取操作
    • cache.Set(key, value):設置快取值。
    • cache.Get(key):獲取快取值。如果鍵不存在,會返回錯誤。
  • 關閉快取
    • 使用 defer cache.Close() 確保在應用結束時正確關閉快取,釋放資源。

使用 Single Flight 模式來避免重複來源請求

在多協程環境下,當多個請求同時需要相同的資料且該資料尚未存在於快取中時,可能會導致多次來源請求,造成資源浪費。singleflight模式可以確保相同的請求只會觸發一次資料來源請求,避免重複操作。

  • 同樣要在terminal中輸入:
go get -u golang.org/x/sync/singleflight
package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/allegro/bigcache/v3"
	"golang.org/x/sync/singleflight"
)

var (
	cache   *bigcache.BigCache
	sfGroup singleflight.Group
)

// 初始化 BigCache
func initCache() {
	ctx := context.Background()

	// 設定 BigCache 的配置
	config := bigcache.DefaultConfig(10 * time.Minute)
	config.Shards = 1024             // 分片數量,默認為 256
	config.MaxEntriesInWindow = 1000 // 最大條目數量
	config.HardMaxCacheSize = 8192   // 最大快取大小(MB)

	var err error
	cache, err = bigcache.New(ctx, config)
	if err != nil {
		panic(fmt.Errorf("初始化 BigCache 失敗: %v", err))
	}
}

// 模擬從來源取得資料的過程
func fetchFromSource(key string) []byte {
	time.Sleep(2 * time.Second) // 模擬延遲
	return []byte("fetched data for " + key)
}

// 獲取值的函數,結合 BigCache 和 singleflight
func getValue(key string) ([]byte, error) {
	// 嘗試從快取中獲取值
	value, err := cache.Get(key)
	if err == nil {
		return value, nil
	}

	// 使用 singleflight 確保相同 key 的來源請求只發送一次
	result, err, _ := sfGroup.Do(key, func() (interface{}, error) {
		// 再次檢查快取,避免在等待期間其他 Goroutine 已經設置了值
		value, err := cache.Get(key)
		if err == nil {
			return value, nil
		}

		// 從來源獲取資料
		value = fetchFromSource(key)

		// 設置快取值
		if err := cache.Set(key, value); err != nil {
			return nil, fmt.Errorf("設置快取值失敗: %v", err)
		}
		return value, nil
	})
	if err != nil {
		return nil, err
	}
	return result.([]byte), nil
}

func main() {
	// 初始化快取
	initCache()
	defer cache.Close() // 確保在應用結束時關閉快取

	key := "myKey"

	var wg sync.WaitGroup
	numGoroutines := 5
	wg.Add(numGoroutines)

	// 模擬多個並發請求
	for i := 0; i < numGoroutines; i++ {
		go func(id int) {
			defer wg.Done()
			value, err := getValue(key)
			if err != nil {
				fmt.Printf("Goroutine %d: 錯誤: %v\n", id, err)
				return
			}
			fmt.Printf("Goroutine %d: 獲取值: %s\n", id, string(value))
		}(i)
	}

	// 等待所有 Goroutines 完成
	wg.Wait()
}
</* Output: */>預期輸出(所有 Goroutines 獲取到的值相同,且來源請求只執行一次)
Goroutine 0: 獲取值: fetched data for myKey
Goroutine 1: 獲取值: fetched data for myKey
Goroutine 2: 獲取值: fetched data for myKey
Goroutine 3: 獲取值: fetched data for myKey
Goroutine 4: 獲取值: fetched data for myKey
  • 當多個 Goroutines 同時請求相同的 key 且該 key 尚未存在於快取中時,singleflight.Group 確保只會有一個請求實際執行 fetchFromSource,其他 Goroutines 會等待該請求完成並共享結果,避免重複的來源請求。
  • singleflight.Group.Do 的回調函數中,先再次檢查快取,確保在等待期間其他 Goroutine 已經設置了值,從而避免不必要的來源請求。

Fan-In 與 Fan-Out 模式介紹

Fan-InFan-Out 是兩種常見的併發模式,用於管理多個 Goroutines 之間的工作分配和結果匯總。

模式 說明
Fan-Out Fan-Out 模式指的是將多個任務分發給多個 Goroutines 處理。這種模式可以有效地利用多核 CPU 提升處理效率。
Fan-In Fan-In 模式指的是將多個 Goroutines 的結果匯集到一個 Channel 中,便於後續的統一處理。
package main

import (
    "fmt"
    "sync"
)

// Worker 函數,負責處理從 jobs 通道接收到的工作,並將結果發送到 results 通道
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    for job := range jobs {
        fmt.Printf("Worker %d 處理工作 %d\n", id, job)
        results <- job * 2 // 模擬處理結果
    }
}

func main() {
    const numWorkers = 3
    jobs := make(chan int, 5)    // 用於發送工作的通道
    results := make(chan int, 5) // 用於接收結果的通道
    var wg sync.WaitGroup

    // 啟動工作池(Fan-Out)
    for w := 1; w <= numWorkers; w++ {
        wg.Add(1)
        go worker(w, jobs, results, &wg)
    }

    // 分派工作(Fan-Out 的另一部分)
    for j := 1; j <= 5; j++ {
        jobs <- j
    }
    close(jobs) // 關閉工作通道,告訴工作者沒有更多工作

    // 等待所有工作完成
    wg.Wait()
    close(results) // 關閉結果通道,告訴主線程沒有更多結果

    // 獲取結果(Fan-In)
    for res := range results {
        fmt.Println("結果:", res)
    }

    fmt.Println("所有工作已完成")
}
</* Output: */>預期輸出
Worker 1 處理工作 1
Worker 2 處理工作 2
Worker 3 處理工作 3
Worker 1 處理工作 4
Worker 2 處理工作 5
結果: 2
結果: 4
結果: 6
結果: 8
結果: 10
所有工作已完成

https://ithelp.ithome.com.tw/upload/images/20240922/201618502pSDdyIvgf.png

  • 工作池的啟動(Fan-Out)
    • 主線程啟動了 numWorkers 個 Goroutines,每個 Goroutine 都會從 jobs 通道接收工作並處理,然後將結果發送到 results 通道。
    • 這樣做的好處是可以同時處理多個工作,提高處理效率。
  • 工作分派(Fan-Out)
    • 主線程將多個工作發送到 jobs 通道。由於有多個工作者 Goroutines 同時監聽這個通道,工作會被分散到不同的工作者進行處理。
  • 結果匯集(Fan-In)
    • 所有工作者將結果發送到同一個 results 通道,主線程從這個通道中接收結果。
    • 這樣做的好處是可以統一管理所有工作者的結果,方便後續處理。

總結

在本篇文章中,我們深入探討了 Go 語言中的多線程處理技術,重點介紹了以下幾個重要概念和工具:

  1. WaitGroup:
  • 使用 WaitGroup 來同步多個 Goroutines,確保所有並發操作完成後再繼續執行後續邏輯。這比使用傳統的 Mutex 更加簡便,特別適合等待一組 Goroutines 的完成。
  1. BigCache:
  • 引入了 BigCache 作為高效能的快取解決方案,專為高並發環境設計。BigCache 內建處理併發問題,無需手動管理鎖,顯著簡化了快取的實現,同時提升了應用的性能和響應速度。
  1. Singleflight 模式:
  • 使用 singleflight 模式避免在多個 Goroutines 同時請求相同資料時發生重複來源請求。這不僅節省了資源,還提升了系統的穩定性和效率。
  1. Fan-In 與 Fan-Out 模式:
  • Fan-Out:將工作任務分發給多個 Goroutines 並行處理,有效利用多核 CPU 提升處理效率。
  • Fan-In:將多個 Goroutines 的結果匯集到一個通道中,便於統一管理和後續處理。

希望這篇文章能夠幫助你更好地理解和應用 Go 語言中的並發處理技術,進而開發出高效能的應用程式!


延伸閱讀

如果你想學習更多平行化管理的技巧,歡迎查看下面內容。


上一篇
【Day14】多線程/平行化處理 I | Goroutines 和 Channels
下一篇
【Day16】連接資料庫 I | GORM & PostgresSQL 基本認識
系列文
Go 快 Go 高效: 從基礎語法到現代Web應用開發30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言