源碼剖析sync.Cond(條件變量的實現(xiàn)機制)
前言
這是我并發(fā)編程系列的第三篇文章,這一篇我們一起來看看sync.Cond的使用與實現(xiàn)。之前寫過java的朋友對等待/通知(wait/notify)機制一定很熟悉,可以利用等待/通知機制實現(xiàn)阻塞或者喚醒,在Go語言使用Cond也可以達到同樣的效果,接下來我們一起來看看它的使用與實現(xiàn)。
sync.Cond的基本使用
Go標準庫提供了Cond原語,為等待/通知場景下的并發(fā)問題提供支持。Cond他可以讓一組的Goroutine都在滿足特定條件(這個等待條件有很多,可以是某個時間點或者某個變量或一組變量達到了某個閾值,還可以是某個對象的狀態(tài)滿足了特定的條件)時被喚醒,Cond是和某個條件相關(guān),這個條件需要一組goroutine協(xié)作共同完成,在條件還沒有滿足的時候,所有等待這個條件的goroutine都會被阻塞住,只有這一組goroutine通過協(xié)作達到了這個條件,等待的goroutine才可以繼續(xù)進行下去。
先看這樣一個例子:
var (
done = false
topic = "Golang夢工廠"
)
func main() {
cond := sync.NewCond(&sync.Mutex{})
go Consumer(topic,cond)
go Consumer(topic,cond)
go Consumer(topic,cond)
Push(topic,cond)
time.Sleep(5 * time.Second)
}
func Consumer(topic string,cond *sync.Cond) {
cond.L.Lock()
for !done{
cond.Wait()
}
fmt.Println("topic is ",topic," starts Consumer")
cond.L.Unlock()
}
func Push(topic string,cond *sync.Cond) {
fmt.Println(topic,"starts Push")
cond.L.Lock()
done = true
cond.L.Unlock()
fmt.Println("topic is ",topic," wakes all")
cond.Broadcast()
}
// 運行結(jié)果
Golang夢工廠 starts Push
topic is Golang夢工廠 wakes all
topic is Golang夢工廠 starts Consumer
topic is Golang夢工廠 starts Consumer
topic is Golang夢工廠 starts Consumer
上述代碼我們運行了4個Goroutine,其中三個Goroutine分別做了相同的事情,通過調(diào)用cond.Wait()等特定條件的滿足,1個Goroutine會調(diào)用cond.Broadcast喚醒所用陷入等待的Goroutine。畫個圖看一下更清晰:
我們看上面這一段代碼,Cond使用起來并不簡單,使用不當就出現(xiàn)不可避免的問題,所以,有的開發(fā)者會認為,Cond是唯一難以掌握的Go并發(fā)原語。為了讓大家能更好的理解Cond,接下來我們一起看看Cond的實現(xiàn)原理。
Cond實現(xiàn)原理
Cond的實現(xiàn)還是比較簡單的,代碼量比較少,復(fù)雜的邏輯已經(jīng)被Locker或者runtime的等待隊列實現(xiàn)了,所以我們來看這些源代碼也會輕松一些。首先我們來看一下它的結(jié)構(gòu)體:
type Cond struct {
noCopy noCopy
// L is held while observing or changing the condition
L Locker
notify notifyList
checker copyChecker
}
主要有4個字段:
nocopy:之前在講waitGroup時介紹過,保證結(jié)構(gòu)體不會在編譯器期間拷貝,原因就不在這里說了,想了解的看這篇文章源碼剖析sync.WaitGroup(文末思考題你能解釋一下嗎?)checker:用于禁止運行期間發(fā)生拷貝,雙重檢查(Double check)L:可以傳入一個讀寫鎖或互斥鎖,當修改條件或者調(diào)用wait方法時需要加鎖notify:通知鏈表,調(diào)用wait()方法的Goroutine會放到這個鏈表中,喚醒從這里取。我們可以看一下notifyList的結(jié)構(gòu):
type notifyList struct {
wait uint32
notify uint32
lock uintptr // key field of the mutex
head unsafe.Pointer
tail unsafe.Pointer
}
我們簡單分析一下notifyList的各個字段:
wait:下一個等待喚醒Goroutine的索引,他是在鎖外自動遞增的.notify:下一個要通知的Goroutine的索引,他可以在鎖外讀取,但是只能在鎖持有的情況下寫入.head:指向鏈表的頭部tail:指向鏈表的尾部
基本結(jié)構(gòu)我們都知道了,下面我就來看一看Cond提供的三種方法是如何實現(xiàn)的~。
wait
我們先來看一下wait方法源碼部分:
func (c *Cond) Wait() {
c.checker.check()
t := runtime_notifyListAdd(&c.notify)
c.L.Unlock()
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}
代碼量不多,執(zhí)行步驟如下:
執(zhí)行運行期間拷貝檢查,如果發(fā)生了拷貝,則直接 panic程序調(diào)用 runtime_notifyListAdd將等待計數(shù)器加一并解鎖;調(diào)用 runtime_notifyListWait等待其他Goroutine的喚醒并加鎖
runtime_notifyListAdd的實現(xiàn):
// See runtime/sema.go for documentation.
func notifyListAdd(l *notifyList) uint32 {
// This may be called concurrently, for example, when called from
// sync.Cond.Wait while holding a RWMutex in read mode.
return atomic.Xadd(&l.wait, 1) - 1
}
代碼實現(xiàn)比較簡單,原子操作將等待計數(shù)器加1,因為wait代表的是下一個等待喚醒Goroutine的索引,所以需要減1操作。
runtime_notifyListWait的實現(xiàn):
// See runtime/sema.go for documentation.
func notifyListWait(l *notifyList, t uint32) {
lockWithRank(&l.lock, lockRankNotifyList)
// Return right away if this ticket has already been notified.
if less(t, l.notify) {
unlock(&l.lock)
return
}
// Enqueue itself.
s := acquireSudog()
s.g = getg()
s.ticket = t
s.releasetime = 0
t0 := int64(0)
if blockprofilerate > 0 {
t0 = cputicks()
s.releasetime = -1
}
if l.tail == nil {
l.head = s
} else {
l.tail.next = s
}
l.tail = s
goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
if t0 != 0 {
blockevent(s.releasetime-t0, 2)
}
releaseSudog(s)
}
這里主要執(zhí)行步驟如下:
檢查當前 wait與notify索引位置是否匹配,如果已經(jīng)被通知了,便立即返回.獲取當前 Goroutine,并將當前Goroutine追加到鏈表末端.調(diào)用 goparkunlock方法讓當前Goroutine進入等待狀態(tài),也就是進入睡眠,等待喚醒被喚醒后,調(diào)用 releaseSudog釋放當前等待列表中的Goroutine
看完源碼我們來總結(jié)一下注意事項:
wait方法會把調(diào)用者放入Cond的等待隊列中并阻塞,直到被喚醒,調(diào)用wait方法必須要持有c.L鎖。
signal和Broadcast
signal和Broadcast都會喚醒等待隊列,不過signal是喚醒鏈表最前面的Goroutine,Boradcast會喚醒隊列中全部的Goroutine。下面我們分別來看一下signal和broadcast的源碼:
signal
func (c *Cond) Signal() {
c.checker.check()
runtime_notifyListNotifyOne(&c.notify)
}
func notifyListNotifyOne(l *notifyList) {
if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
return
}
lockWithRank(&l.lock, lockRankNotifyList)
t := l.notify
if t == atomic.Load(&l.wait) {
unlock(&l.lock)
return
}
atomic.Store(&l.notify, t+1)
for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
if s.ticket == t {
n := s.next
if p != nil {
p.next = n
} else {
l.head = n
}
if n == nil {
l.tail = p
}
unlock(&l.lock)
s.next = nil
readyWithTime(s, 4)
return
}
}
unlock(&l.lock)
}
上面我們看wait源代碼時,每次都會調(diào)用都會原子遞增wait,那么這個wait就代表當前最大的wait值,對應(yīng)喚醒的時候,也就會對應(yīng)一個notify屬性,我們在notifyList鏈表中逐個檢查,找到ticket對應(yīng)相等的notify屬性。這里大家肯定會有疑惑,我們?yōu)楹尾恢苯尤℃湵眍^部喚醒呢?
notifyList并不是一直有序的,wait方法中調(diào)用runtime_notifyListAdd和runtime_notifyListWait完全是兩個獨立的行為,中間還有釋放鎖的行為,而當多個 goroutine 同時進行時,中間會產(chǎn)生進行并發(fā)操作,這樣就會出現(xiàn)亂序,所以采用這種操作即使在 notifyList 亂序的情況下,也能取到最先Wait 的 goroutine。
broadcast
func (c *Cond) Broadcast() {
c.checker.check()
runtime_notifyListNotifyAll(&c.notify)
}
func notifyListNotifyAll(l *notifyList) {
if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
return
}
lockWithRank(&l.lock, lockRankNotifyList)
s := l.head
l.head = nil
l.tail = nil
atomic.Store(&l.notify, atomic.Load(&l.wait))
unlock(&l.lock)
for s != nil {
next := s.next
s.next = nil
readyWithTime(s, 4)
s = next
}
}
全部喚醒實現(xiàn)要簡單一些,主要是通過調(diào)用readyWithTime方法喚醒鏈表中的goroutine,喚醒順序也是按照加入隊列的先后順序,先加入的會先被喚醒,而后加入的可能 Goroutine 需要等待調(diào)度器的調(diào)度。
最后我們總結(jié)一下使用這兩個方法要注意的問題:
Signal:允許調(diào)用者喚醒一個等待此Cond的Goroutine,如果此時沒有等待的goroutine,顯然無需通知waiter;如果Cond等待隊列中有一個或者多個等待的goroutine,則需要從等待隊列中移除第一個goroutine并把它喚醒。調(diào)用Signal方法時,不強求你一定要持有c.L的鎖。broadcast:允許調(diào)用者喚醒所有等待此Cond的goroutine。如果此時沒有等待的goroutine,顯然無需通知 waiter;如果Cond等待隊列中有一個或者多個等待的goroutine,則清空所有等待的goroutine,并全部喚醒,不強求你一定要持有c.L的鎖。
注意事項
調(diào)用 wait方法的時候一定要加鎖,否則會導(dǎo)致程序發(fā)生panic.wait調(diào)用時需要檢查等待條件是否滿足,也就說goroutine被喚醒了不等于等待條件被滿足,等待者被喚醒,只是得到了一次檢查的機會而已,推薦寫法如下:
// c.L.Lock()
// for !condition() {
// c.Wait()
// }
// ... make use of condition ...
// c.L.Unlock()
Signal和Boardcast兩個喚醒操作不需要加鎖
總結(jié)
其實Cond在實際項目中被使用的機會比較少,Go特有的channel就可以代替它,暫時只在Kubernetes項目中看到了應(yīng)用,使用場景是每次往隊列中成功增加了元素后就需要調(diào)用 Broadcast 通知所有的等待者,使用Cond就很合適,相比channel減少了代碼復(fù)雜性。
推薦閱讀
