互斥鎖,可以創建為其他結構體的字段;零值為解鎖 狀態,Mutex類型的鎖和線程無關,可以由不同的線程加鎖和解鎖。
使用Go語言的channel
它能夠阻塞主線程的執行,直到所有的goroutine執行完畢。要注意goroutine的執行結果是亂序的,調度器無法保証goroutine執行順序,且進程結束時不會等待goroutine退出。
WaitGroup總共有三個方法:
sync.WaitGroup實現了一個類似Que的資料結構,我們可以不斷地向Que添加並發任務,每添加一個任務,就將計數器的值增加1,若我們啟動了 N 個並發任務時時,就需要把計數器增加 N 。每個任務完成時通過呼叫 Done()方法將計數器減1,並且從Que中刪除。如果隊例中的任務尚未執行完畢,我們通過調用 Wait() 來發出阻塞, 直到計數器歸零時,表示所有並發協程已經完成。
var wg sync.WaitGroup //宣告全域的WaitGroup
var count int32
func AddOne() { //定義函數,每次調用時count加1
defer wg.Done()
count++
}
func main() {
wg.Add(3) //往WaitGroup裡添加3個goroutine
go AddOne()
go AddOne()
go AddOne()
wg.Wait()
fmt.Printf("Count: %d", count ) //執行結束,輸出Count: 3
}
WaitGroup的特點是可以調用Wait()來阻塞隊列,直到隊列中的並發任務執行完畢才解除阻塞,不用sleep固定時間來等待。缺點是無法指定goroutine的並發協程數目。
信號量是Unix系統提供的一種共享資源的保護機制,用於防止多個線程同時訪問某個資源。
當信號量>0時,表示資源可用。
當信號量==0時,表示資源暫不可用。
線程獲取資源時,系統將信號量減1。當信號量為0時,當前線程會進入睡眠,直到信號量為正時線程會被喚醒。
源碼包src/sync/waitgroup.go:WaitGroup的結構體定義如下:
type WaitGroup struct {
state1 [3]uint32
}
state1 是一個長度為3的array,包含了兩組計數器和一個信號量。
Add()的功能有兩個,第一個是將delta值加到counter裡頭,因為delta可以為負值,所以counter有可能變成0或負值。Add()的第二個功能就是判斷counter的值,當其為0時,根據 waiter 數值釋放等量的信號量,把等待的goroutine全部喚醒,如果counter變為負值,則panic。
func (wg *WaitGroup) Add(delta int) {
statep, semap := wg.state() //獲取state和semphore的指針
state := atomic.AddUint64(statep, uint64(delta)<<32) //把delta值加到counter
v := int32(state >> 32) //獲取counter值
w := uint32(state) //獲取waiter值
if v < 0 { //如果counter值為負數,則panic
panic("sync: negative WaitGroup counter")
}
//如果counter大於零,或是waiter為零(沒有等待者),則直接退出
if v > 0 || w == 0 {
return
}
//當counter等於0時,waiter一定大於零(內部維護waiter數目,不會出現小於等於零的情況)
//先把counter歸零,再釋放waiter個數的信號量
*statep = 0
for ; w != 0; w-- {
runtime_Semrelease(semap, false)
}
}
Wait()的功能為累加waiter以及阻塞等待信號量
func (wg *WaitGroup) Wait() {
statep, semap := wg.state() //獲取state和semaphore的指針
for {
state := atomic.LoadUint64(statep) //獲取state值
v := int32(state >> 32) //獲取counter值
w := uint32(state) //獲取waiter值
if v == 0 { //當counter為0,代表所有的goroutine都結束了,直接退出
return
}
// 使用CAS函數累加waiter,保証有多個goroutine同時執行Wait()時也能正確累加waiter
if atomic.CompareAndSwapUint64(statep, state, state+1) {
runtime_Semacquire(semap)
return
}
}
}
Done()等同於Add(-1),也就是把counter減1。
func (wg *WaitGroup) Done() {
wg.Add(-1)
}