iT邦幫忙

2025 iThome 鐵人賽

DAY 26
0
Software Development

Go Clean Architecture API 開發全攻略系列 第 26

[Day 26] 快取服務(二):最佳實踐與注意事項

  • 分享至 

  • xImage
  •  

我們這裡會以銀行的分行資料為例,來說明快取服務的最佳實踐與注意事項。

第三步:用「裝飾者模式」整合快取邏輯

在上一篇中,我們已經完成了快取服務的基本架構,接下來我們會進一步優化這個快取服務。
我們會使用「裝飾者模式」(Decorator Pattern) 來整合快取邏輯,讓程式碼更具可讀性與可維護性。

我們不希望在 Repository 的 GORM 實作中,混入 Redis 的快取邏輯。這會違反「單一職責原則」。一個絕佳的解決方案是使用裝飾者模式(Decorator Pattern)

我們將建立一個新的 CachedRepository,它像一個「包裝盒」,將我們原有的 Repository 實作包裝起來。它實作了與 Repository 完全相同的介面,但內部增加了快取邏輯。

type CachedRepository struct {
  repo  *mysql.Database
  cache *cache.Service
}

func (c *CachedRepository) GetBranchInfos(ctx context.Context) (*[]domain.BranchInfo, error) {
  // 先從快取取得
  models, err := c.cache.GetBranchInfos(ctx)
  if err == nil {
    return models, nil
  }

  // 如果快取沒有命中,則從資料庫取得
  models, err = c.repo.GetBranchInfos(ctx)
  if err != nil {
    return nil, err
  }

  // 將資料寫入快取
  err = c.cache.SetBranchInfos(ctx, models)
  if err != nil {
    log.Printf("failed to set cache: %v", err)
  }

  // 回傳資料
  return models, nil
}

這時候,我們就可以在傳入 usecaseRepository 實例時,選擇是否要使用快取功能。

快取的經典問題

引入快取也會帶來新的複雜性,你需要了解幾個經典問題:

  • 快取穿透(cache penetration):查詢一個絕對不存在的資料,導致每次請求都穿過快取,直接打到資料庫。
    原因:例如用戶查詢不存在的用戶 ID,快取中沒有,資料庫也沒有,每次都要查詢資料庫。
    解決方案:可以為「空結果」也進行短時間的快取(如快取 null 或特殊標記),避免重複查詢資料庫。

  • 快取擊穿(cache breakdown):一個熱點 Key 在過期的瞬間,大量併發請求同時打到資料庫。
    原因:熱門資料剛好過期,快取失效,所有請求同時查詢資料庫,造成瞬間壓力。
    解決方案:使用分散式鎖(如 Redis 的 setnx),確保只有一個請求去資料庫回填快取,其他請求等待快取更新。

  • 快取雪崩(cache avalanche):大量 Key 在同一時間集體失效,導致資料庫壓力瞬間劇增。
    原因:例如系統重啟或快取設置了相同的過期時間,導致大量快取同時失效。
    解決方案:為每個 Key 的過期時間增加一個隨機的「抖動(Jitter)」,讓快取失效時間分散,減少同時失效的風險。

Cache breakdown 的解決方案

在這裡,我們會針對 Cache breakdown 的問題,提供多種解決方案。

先說說 單一台主機時 與 多台主機時 的解決方案。

單一台主機

這邊我們會使用 singleflight 這個套件來解決 cache breakdown 的問題。

singleflight

它是以同一個 key 來做請求的合併,確保同一時間只有一個請求會去執行。

主要是提供了以下兩個 function:

Do(key string, fn func() (any, error)) (v any, err error, shared bool)
// 這個方法跟上方 Do 的差別只有回應的方式,一個是直接回傳,一個是回傳 channel,本文就不多做介紹
DoChan(key string, fn func() (any, error)) <-chan singleflight.Result

根據我們前面實作 CachedRepository 的範例,我們可以看出在 Cache-Aside Pattern 中,當快取沒有命中時,會去資料庫查詢資料並回填快取。

就會是主要三個 function:

  1. Get from cache
  2. Get from DB
  3. Set to cache

所以到了使用 singleflight 的時候,就會多一個 Identifier,用來標識這個請求。

// 定義一個 interface,包含 Do 和 DoChan 方法(幫 singleflight 做 interface)
type group interface {
  Do(key string, fn func() (any, error)) (v any, err error, shared bool)
}

// 定義一個 job 結構體,包含工作標識、快取讀取函數、快取寫入函數、一次性讀取函數和預設值
type job[T any] struct {
  // 用來標識這個請求
	WorkIdentify string
  // 從快取讀取資料的函數
	CacheGetter  func(ctx context.Context) (*T, error)
  // 將資料寫入快取的函數
	CacheSetter  func(ctx context.Context, value *T) error
  // 從資料庫或其他來源一次性讀取資料的函數
	OnceGetter   func(ctx context.Context) (*T, error)
}

// 定義一個方法,使用 singleflight 來執行工作
func (job job[T]) doWith(ctx context.Context, engine group) (*T, error) {
  // 使用 singleflight 的 Do 方法來執行工作
  // 這裏就保證同一時間只有一個請求會去執行
	m, err, _ := engine.Do(job.WorkIdentify, func() (any, error) {
    // 1. 先從快取取得資料
		v, e := job.CacheGetter(ctx)
		if e == nil {
			return v, nil
		}

    // 2. 如果快取沒有命中,則從資料庫或其他來源取得資料
		v, e = job.OnceGetter(ctx)
		if e != nil {
			return nil, e
		}

    // 3. 將資料寫入快取
		e = job.CacheSetter(ctx, v)
		if e != nil {
			return nil, e
		}

    // 4. 回傳資料
		return v, nil
	})

	if err != nil {
		return nil, err
	}

	value, ok := m.(*T)
	if !ok {
		return nil, errors.New("type assertion failed")
	}

	return value, nil
}

這時候,我們就可以幫 CachedRepository 再升級,讓 singleflight 來解決 cache breakdown 的問題。

type repo interface {
  GetBranchInfos(ctx context.Context) (*[]domain.BranchInfo, error)
}

type cache interface {
  GetBranchInfos(ctx context.Context) (*[]domain.BranchInfo, error)
  SetBranchInfos(ctx context.Context, value *[]domain.BranchInfo) error
}

type Service struct {
  repo  repo
  cache cache
  group group
}

func NewService(repo repo, cache cache) *Service {
  return &Service{
    repo:  repo,
    cache: cache,
    group: &singleflight.Group{},
  }
}

func (s *Service) GetBranchInfos(ctx context.Context) (*[]domain.BranchInfo, error) {
	job := job[[]domain.BranchInfo]{
		WorkIdentify: "GetBranchInfos",
		CacheGetter:  s.cache.GetBranchInfos,
		CacheSetter:  s.cache.SetBranchInfos,
		OnceGetter:   s.repo.GetBranchInfos,
	}

  return job.doWith(ctx, s.group)
}

同樣因為是使用裝飾者模式,所以我們可以直接在 usecase 傳入 Service

多台主機

多台主機的話,因為 singleflight 是在單一台主機上做請求的合併,所以我們需要使用分散式鎖來解決這個問題。

這邊我們會使用 Redis 的 setnx 來實作一個簡單的分散式鎖。

type RedisLock struct {
  client *redis.Client
  key    string
  value  string
  ttl    time.Duration
}

func NewRedisLock(client *redis.Client, key string, ttl time.Duration) *RedisLock {
  return &RedisLock{
    client: client,
    key:    key,
    value:  uuid.New().String(), // 使用 UUID 作為鎖的值,確保唯一性
    ttl:    ttl,
  }
}

func (l *RedisLock) TryLock(ctx context.Context) (bool, error) {
  // 使用 SET 命令設置鎖,NX 表示只有在鍵不存在時才設置,PX 表示設置鍵的過期時間
  result, err := l.client.SetNX(ctx, l.key, l.value, l.ttl).Result()
  if err != nil {
    return false, err
  }
  return result, nil // 返回是否成功獲取鎖
}

func (l *RedisLock) Unlock(ctx context.Context) error {
  // 使用 Lua 腳本來確保只有持有鎖的客戶端才能釋放鎖
  script := `
    if redis.call("get", KEYS[1]) == ARGV[1] then
      return redis.call("del", KEYS[1])
    else
      return 0
    end
  `
  result, err := l.client.Eval(ctx, script, []string{l.key}, l.value).Result()
  if err != nil {
    return err
  }
  if result.(int64) == 0 {
    return errors.New("unlock failed: not the lock owner")
  }
  return nil
}

singleflight 一樣,我們也可以將這整個流程做一個 Generic function。

// internal/domain/type.go
type LockManager interface {
	NewLock(key string, ttl time.Duration) Lock
}

type Lock interface {
	TryLock(ctx context.Context) (bool, error)
	Unlock(ctx context.Context) error
}
// internal/service/nx_cache/job.go
type job[T any] struct {
	// 用來標識這個請求
	LockIdentify string
	// 從快取讀取資料的函數
	CacheGetter func(ctx context.Context) (*T, error)
	// 將資料寫入快取的函數
	CacheSetter func(ctx context.Context, value *T) error
	// 從資料庫或其他來源一次性讀取資料的函數
	OnceGetter func(ctx context.Context) (*T, error)
	// lock 鎖住的最大時間,超過這個時間會自動釋放鎖
	LockTTL time.Duration
	// lock 鎖住時的等待重取時間
	RetryWaitTime time.Duration
}

func (job job[T]) doWithLock(ctx context.Context, lockManager domain.LockManager) (*T, error) {
	// 1. 先從快取取得資料
	models, err := job.CacheGetter(ctx)
	if err == nil {
		return models, nil
	}

	// 2. 嘗試獲取分散式鎖
	lock := lockManager.NewLock(job.LockIdentify, job.LockTTL)
	locked, err := lock.TryLock(ctx)
	if err != nil {
		return nil, err
	}
	if locked {
		defer func() {
			if err := lock.Unlock(ctx); err != nil {
				log.Printf("failed to unlock: %v", err)
			}
		}() // 確保在函數結束時釋放鎖
		// 3. 獲取鎖成功,從資料庫取得資料
		models, err = job.OnceGetter(ctx)
		if err != nil {
			return nil, err
		}

		// 4. 將資料寫入快取
		err = job.CacheSetter(ctx, models)
		if err != nil {
			return nil, err
		}

		// 5. 回傳資料
		return models, nil
	}

	// 6. 獲取鎖失敗,表示有其他請求正在更新快取
	// 可以選擇等待一段時間後重試,或者直接回傳錯誤
	// 這裏我們實作等待一段時間後重試,等待的時間可以根據實際情況調整
	time.Sleep(job.RetryWaitTime)
	return job.CacheGetter(ctx)
}

接下來,我們可以將這個分散式鎖建立一個 Service,來解決多台主機的 cache breakdown 問題。

// internal/service/nx_cache/nx_cache.go
type repo interface {
	GetBranchInfos(ctx context.Context) (*[]domain.BranchInfo, error)
}

type cache interface {
	GetBranchInfos(ctx context.Context) (*[]domain.BranchInfo, error)
	SetBranchInfos(ctx context.Context, value *[]domain.BranchInfo) error
}

type Service struct {
	repo        repo
	cache       cache
	lockManager domain.LockManager
}

func NewService(repo repo, cache cache, lockManager domain.LockManager) *Service {
	return &Service{
		repo:        repo,
		cache:       cache,
		lockManager: lockManager,
	}
}

func (s *Service) GetBranchInfos(ctx context.Context) (*[]domain.BranchInfo, error) {
	job := job[[]domain.BranchInfo]{
		LockIdentify:  "branch_infos_lock",
		CacheGetter:   s.cache.GetBranchInfos,
		CacheSetter:   s.cache.SetBranchInfos,
		OnceGetter:    s.repo.GetBranchInfos,
		LockTTL:       5 * time.Second,
		RetryWaitTime: 100 * time.Millisecond,
	}
	return job.doWithLock(ctx, s.lockManager)
}

這樣就完成了多台主機的 cache breakdown 解決方案。

當多台主機同時請求資料時,只有一台主機會成功獲取鎖並從資料庫取得資料,其他主機會因為無法獲取鎖,而嘗試等待一定時間後重試。

這樣的設計雖然能有效防止資料庫過載,但也可能會導致部分請求失敗,這在某些應用場景下可能不可接受。

單台與多台主機解決方案的結合(進階)

實務上,我們可以將 singleflight(單台主機合併請求)與 Redis 分散式鎖(多台主機協調)結合,
這樣可以保證單一台主機內的請求合併,同時也能在多台主機間協調,確保只有一台主機會去資料庫查詢並回填快取。

這個疊加就不複雜了,就是建立一個 function 使用 singleflightDo 方法來包裝 nx_cacheGetBranchInfos 方法。

// internal/service/sfnx/sfnx.go
type group interface {
	Do(key string, fn func() (any, error)) (v any, err error, shared bool)
}

type nxCache interface {
	GetBranchInfos(ctx context.Context) (*[]domain.BranchInfo, error)
}

type Service struct {
	nxCache nxCache
	group   group
}

func NewService(nxCache nxCache) *Service {
	return &Service{
		nxCache: nxCache,
		group:   &singleflight.Group{},
	}
}

func (s *Service) GetBranchInfos(ctx context.Context) (*[]domain.BranchInfo, error) {
	v, err, _ := s.group.Do("GetBranchInfos:SFNX", func() (any, error) {
		return s.nxCache.GetBranchInfos(ctx)
	})
	if err != nil {
		return nil, err
	}

	value, ok := v.(*[]domain.BranchInfo)
	if !ok {
		return nil, errors.New("type assertion to *[]domain.BranchInfo failed")
	}

	return value, nil
}

總結

在本篇文章中,我們探討了快取服務的最佳實踐與注意事項,並介紹了如何使用裝飾者模式來整合快取邏輯。
我們也討論了快取穿透、快取擊穿和快取雪崩等經典問題,並提供了相應的解決方案。

此外,我們還介紹了快取更新策略的兩種主要方法:主動更新和被動更新,並分析了它們的優缺點。
最後,我們詳細說明了如何使用 singleflight 和 Redis 分散式鎖來解決 cache breakdown 的問題,並展示了如何將這兩種方法結合起來,以實現更高效的快取服務。

詳細的程式碼,請參考 Github 這一個 commit。


上一篇
[Day 25] 快取服務(一):使用 Redis
下一篇
[Day 27] 重複的程式碼,是一種原罪
系列文
Go Clean Architecture API 開發全攻略27
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言