iT邦幫忙

第 11 屆 iT 邦幫忙鐵人賽

DAY 23
1
Software Development

Golang入門到進階實戰系列 第 23

Day 23 協程同步的三個方法 - WaitGroup

協程同步的三種方法

Mutex

互斥鎖,可以創建為其他結構體的字段;零值為解鎖 狀態,Mutex類型的鎖和線程無關,可以由不同的線程加鎖和解鎖。

Channel

使用Go語言的channel

WaitGroup

它能夠阻塞主線程的執行,直到所有的goroutine執行完畢。要注意goroutine的執行結果是亂序的,調度器無法保証goroutine執行順序,且進程結束時不會等待goroutine退出。

WaitGroup使用詳解

WaitGroup總共有三個方法:

  • Add(delta int) : 計數器增加delta
  • Done() : 計數器-1,相當於Add(-1)
  • Wait() : 阻塞直到所有的WaitGroup數量變為零,即計數器變為0

sync.WaitGroup實現了一個類似Que的資料結構,我們可以不斷地向Que添加並發任務,每添加一個任務,就將計數器的值增加1,若我們啟動了 N 個並發任務時時,就需要把計數器增加 N 。每個任務完成時通過呼叫 Done()方法將計數器減1,並且從Que中刪除。如果隊例中的任務尚未執行完畢,我們通過調用 Wait() 來發出阻塞, 直到計數器歸零時,表示所有並發協程已經完成。

var wg sync.WaitGroup //宣告全域的WaitGroup
var count int32

func AddOne() { //定義函數,每次調用時count加1
	defer wg.Done()
	count++
}

func main()  {
	wg.Add(3) //往WaitGroup裡添加3個goroutine
	go AddOne()
	go AddOne()
	go AddOne()
	wg.Wait()
	fmt.Printf("Count: %d", count ) //執行結束,輸出Count: 3
}

WaitGroup的特點是可以調用Wait()來阻塞隊列,直到隊列中的並發任務執行完畢才解除阻塞,不用sleep固定時間來等待。缺點是無法指定goroutine的並發協程數目。

WaitGroup源碼閱讀

信號量

信號量是Unix系統提供的一種共享資源的保護機制,用於防止多個線程同時訪問某個資源。

當信號量>0時,表示資源可用。
當信號量==0時,表示資源暫不可用。

線程獲取資源時,系統將信號量減1。當信號量為0時,當前線程會進入睡眠,直到信號量為正時線程會被喚醒。

資料結構

源碼包src/sync/waitgroup.go:WaitGroup的結構體定義如下:

type WaitGroup struct {
	state1 [3]uint32
}

state1 是一個長度為3的array,包含了兩組計數器和一個信號量。

  • counter : 當前還未執行結束的goroutine計數器
  • waiter count : 等待goroutine-group結束的goroutine數量,即等候者的數量
  • semaphore : 信號量
    https://ithelp.ithome.com.tw/upload/images/20191008/20120698CUTVP4tIWL.png
    ##接口
    WaitGroup對外提供三個接口,Add(delta int),Wait()和Done(),下面介紹這三個函數的實現細節。

Add(delta int)

Add()的功能有兩個,第一個是將delta值加到counter裡頭,因為delta可以為負值,所以counter有可能變成0或負值。Add()的第二個功能就是判斷counter的值,當其為0時,根據 waiter 數值釋放等量的信號量,把等待的goroutine全部喚醒,如果counter變為負值,則panic。

func (wg *WaitGroup) Add(delta int) {
    statep, semap := wg.state() //獲取state和semphore的指針
    
    state := atomic.AddUint64(statep, uint64(delta)<<32) //把delta值加到counter
    v := int32(state >> 32) //獲取counter值
    w := uint32(state)      //獲取waiter值
    
    if v < 0 {              //如果counter值為負數,則panic
        panic("sync: negative WaitGroup counter")
    }

    //如果counter大於零,或是waiter為零(沒有等待者),則直接退出
    if v > 0 || w == 0 {
        return
    }

    //當counter等於0時,waiter一定大於零(內部維護waiter數目,不會出現小於等於零的情況)
    //先把counter歸零,再釋放waiter個數的信號量
    *statep = 0
    for ; w != 0; w-- {
        runtime_Semrelease(semap, false)
    }
}

Wait()

Wait()的功能為累加waiter以及阻塞等待信號量

func (wg *WaitGroup) Wait() {
    statep, semap := wg.state() //獲取state和semaphore的指針
    for {
        state := atomic.LoadUint64(statep) //獲取state值
        v := int32(state >> 32)            //獲取counter值
        w := uint32(state)                 //獲取waiter值
        if v == 0 {                        //當counter為0,代表所有的goroutine都結束了,直接退出
            return
        }
        
        // 使用CAS函數累加waiter,保証有多個goroutine同時執行Wait()時也能正確累加waiter
        if atomic.CompareAndSwapUint64(statep, state, state+1) {
            runtime_Semacquire(semap) 
            return
        }
    }
}

Done()

Done()等同於Add(-1),也就是把counter減1。

func (wg *WaitGroup) Done() {
	wg.Add(-1)
}

WaitGroup的坑

  • Add()操作必須早於Wait(),否則會panic
  • Add()設置的值必須與實際等待的goroutine數量一致,否則會panic

上一篇
Day22 競爭條件Race Condition
下一篇
Day24 並發同步機制(2) - Channel
系列文
Golang入門到進階實戰30

尚未有邦友留言

立即登入留言