<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          源碼剖析sync.Cond(條件變量的實現(xiàn)機制)

          共 8734字,需瀏覽 18分鐘

           ·

          2021-06-13 00:40

          前言

          這是我并發(fā)編程系列的第三篇文章,這一篇我們一起來看看sync.Cond的使用與實現(xiàn)。之前寫過java的朋友對等待/通知(wait/notify)機制一定很熟悉,可以利用等待/通知機制實現(xiàn)阻塞或者喚醒,在Go語言使用Cond也可以達到同樣的效果,接下來我們一起來看看它的使用與實現(xiàn)。

          sync.Cond的基本使用

          Go標準庫提供了Cond原語,為等待/通知場景下的并發(fā)問題提供支持。Cond他可以讓一組的Goroutine都在滿足特定條件(這個等待條件有很多,可以是某個時間點或者某個變量或一組變量達到了某個閾值,還可以是某個對象的狀態(tài)滿足了特定的條件)時被喚醒,Cond是和某個條件相關(guān),這個條件需要一組goroutine協(xié)作共同完成,在條件還沒有滿足的時候,所有等待這個條件的goroutine都會被阻塞住,只有這一組goroutine通過協(xié)作達到了這個條件,等待的goroutine才可以繼續(xù)進行下去。

          先看這樣一個例子:

          var (
           done = false
           topic = "Golang夢工廠"
          )

          func main() {
           cond := sync.NewCond(&sync.Mutex{})
           go Consumer(topic,cond)
           go Consumer(topic,cond)
           go Consumer(topic,cond)
           Push(topic,cond)
           time.Sleep(5 * time.Second)

          }

          func Consumer(topic string,cond *sync.Cond)  {
           cond.L.Lock()
           for !done{
            cond.Wait()
           }
           fmt.Println("topic is ",topic," starts Consumer")
           cond.L.Unlock()
          }

          func Push(topic string,cond *sync.Cond)  {
           fmt.Println(topic,"starts Push")
           cond.L.Lock()
           done = true
           cond.L.Unlock()
           fmt.Println("topic is ",topic," wakes all")
           cond.Broadcast()
          }
          // 運行結(jié)果
          Golang夢工廠 starts Push
          topic is  Golang夢工廠  wakes all
          topic is  Golang夢工廠  starts Consumer
          topic is  Golang夢工廠  starts Consumer
          topic is  Golang夢工廠  starts Consumer

          上述代碼我們運行了4Goroutine,其中三個Goroutine分別做了相同的事情,通過調(diào)用cond.Wait()等特定條件的滿足,1個Goroutine會調(diào)用cond.Broadcast喚醒所用陷入等待的Goroutine。畫個圖看一下更清晰:

          我們看上面這一段代碼,Cond使用起來并不簡單,使用不當就出現(xiàn)不可避免的問題,所以,有的開發(fā)者會認為,Cond是唯一難以掌握的Go并發(fā)原語。為了讓大家能更好的理解Cond,接下來我們一起看看Cond的實現(xiàn)原理。

          Cond實現(xiàn)原理

          Cond的實現(xiàn)還是比較簡單的,代碼量比較少,復(fù)雜的邏輯已經(jīng)被Locker或者runtime的等待隊列實現(xiàn)了,所以我們來看這些源代碼也會輕松一些。首先我們來看一下它的結(jié)構(gòu)體:

          type Cond struct {
           noCopy noCopy

           // L is held while observing or changing the condition
           L Locker

           notify  notifyList
           checker copyChecker
          }

          主要有4個字段:

          • nocopy :之前在講waitGroup時介紹過,保證結(jié)構(gòu)體不會在編譯器期間拷貝,原因就不在這里說了,想了解的看這篇文章源碼剖析sync.WaitGroup(文末思考題你能解釋一下嗎?)
          • checker:用于禁止運行期間發(fā)生拷貝,雙重檢查(Double check)
          • L:可以傳入一個讀寫鎖或互斥鎖,當修改條件或者調(diào)用wait方法時需要加鎖
          • notify:通知鏈表,調(diào)用wait()方法的Goroutine會放到這個鏈表中,喚醒從這里取。我們可以看一下notifyList的結(jié)構(gòu):
          type notifyList struct {
           wait   uint32
           notify uint32
           lock   uintptr // key field of the mutex
           head   unsafe.Pointer
           tail   unsafe.Pointer
          }

          我們簡單分析一下notifyList的各個字段:

          • wait:下一個等待喚醒Goroutine的索引,他是在鎖外自動遞增的.
          • notify:下一個要通知的Goroutine的索引,他可以在鎖外讀取,但是只能在鎖持有的情況下寫入.
          • head:指向鏈表的頭部
          • tail:指向鏈表的尾部

          基本結(jié)構(gòu)我們都知道了,下面我就來看一看Cond提供的三種方法是如何實現(xiàn)的~。

          wait

          我們先來看一下wait方法源碼部分:

          func (c *Cond) Wait() {
           c.checker.check()
           t := runtime_notifyListAdd(&c.notify)
           c.L.Unlock()
           runtime_notifyListWait(&c.notify, t)
           c.L.Lock()
          }

          代碼量不多,執(zhí)行步驟如下:

          • 執(zhí)行運行期間拷貝檢查,如果發(fā)生了拷貝,則直接panic程序
          • 調(diào)用runtime_notifyListAdd將等待計數(shù)器加一并解鎖;
          • 調(diào)用runtime_notifyListWait等待其他 Goroutine 的喚醒并加鎖

          runtime_notifyListAdd的實現(xiàn):

          // See runtime/sema.go for documentation.
          func notifyListAdd(l *notifyList) uint32 {
           // This may be called concurrently, for example, when called from
           // sync.Cond.Wait while holding a RWMutex in read mode.
           return atomic.Xadd(&l.wait, 1) - 1
          }

          代碼實現(xiàn)比較簡單,原子操作將等待計數(shù)器加1,因為wait代表的是下一個等待喚醒Goroutine的索引,所以需要減1操作。

          runtime_notifyListWait的實現(xiàn):

          // See runtime/sema.go for documentation.
          func notifyListWait(l *notifyList, t uint32) {
           lockWithRank(&l.lock, lockRankNotifyList)

           // Return right away if this ticket has already been notified.
           if less(t, l.notify) {
            unlock(&l.lock)
            return
           }

           // Enqueue itself.
           s := acquireSudog()
           s.g = getg()
           s.ticket = t
           s.releasetime = 0
           t0 := int64(0)
           if blockprofilerate > 0 {
            t0 = cputicks()
            s.releasetime = -1
           }
           if l.tail == nil {
            l.head = s
           } else {
            l.tail.next = s
           }
           l.tail = s
           goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
           if t0 != 0 {
            blockevent(s.releasetime-t0, 2)
           }
           releaseSudog(s)
          }

          這里主要執(zhí)行步驟如下:

          • 檢查當前waitnotify索引位置是否匹配,如果已經(jīng)被通知了,便立即返回.
          • 獲取當前Goroutine,并將當前Goroutine追加到鏈表末端.
          • 調(diào)用goparkunlock方法讓當前Goroutine進入等待狀態(tài),也就是進入睡眠,等待喚醒
          • 被喚醒后,調(diào)用releaseSudog釋放當前等待列表中的Goroutine

          看完源碼我們來總結(jié)一下注意事項:

          wait方法會把調(diào)用者放入Cond的等待隊列中并阻塞,直到被喚醒,調(diào)用wait方法必須要持有c.L鎖。

          signalBroadcast

          signalBroadcast都會喚醒等待隊列,不過signal是喚醒鏈表最前面的Goroutine,Boradcast會喚醒隊列中全部的Goroutine。下面我們分別來看一下signalbroadcast的源碼:

          • signal
          func (c *Cond) Signal() {
           c.checker.check()
           runtime_notifyListNotifyOne(&c.notify)
          }
          func notifyListNotifyOne(l *notifyList) {
           if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
            return
           }
           lockWithRank(&l.lock, lockRankNotifyList)
           t := l.notify
           if t == atomic.Load(&l.wait) {
            unlock(&l.lock)
            return
           }

           atomic.Store(&l.notify, t+1)

           for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
            if s.ticket == t {
             n := s.next
             if p != nil {
              p.next = n
             } else {
              l.head = n
             }
             if n == nil {
              l.tail = p
             }
             unlock(&l.lock)
             s.next = nil
             readyWithTime(s, 4)
             return
            }
           }
           unlock(&l.lock)
          }

          上面我們看wait源代碼時,每次都會調(diào)用都會原子遞增wait,那么這個wait就代表當前最大的wait值,對應(yīng)喚醒的時候,也就會對應(yīng)一個notify屬性,我們在notifyList鏈表中逐個檢查,找到ticket對應(yīng)相等的notify屬性。這里大家肯定會有疑惑,我們?yōu)楹尾恢苯尤℃湵眍^部喚醒呢?

          notifyList并不是一直有序的,wait方法中調(diào)用runtime_notifyListAddruntime_notifyListWait完全是兩個獨立的行為,中間還有釋放鎖的行為,而當多個 goroutine 同時進行時,中間會產(chǎn)生進行并發(fā)操作,這樣就會出現(xiàn)亂序,所以采用這種操作即使在 notifyList 亂序的情況下,也能取到最先Waitgoroutine

          • broadcast
          func (c *Cond) Broadcast() {
           c.checker.check()
           runtime_notifyListNotifyAll(&c.notify)
          }
          func notifyListNotifyAll(l *notifyList) {
           if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
            return
           }

           lockWithRank(&l.lock, lockRankNotifyList)
           s := l.head
           l.head = nil
           l.tail = nil

           atomic.Store(&l.notify, atomic.Load(&l.wait))
           unlock(&l.lock)
           for s != nil {
            next := s.next
            s.next = nil
            readyWithTime(s, 4)
            s = next
           }
          }

          全部喚醒實現(xiàn)要簡單一些,主要是通過調(diào)用readyWithTime方法喚醒鏈表中的goroutine,喚醒順序也是按照加入隊列的先后順序,先加入的會先被喚醒,而后加入的可能 Goroutine 需要等待調(diào)度器的調(diào)度。

          最后我們總結(jié)一下使用這兩個方法要注意的問題:

          • Signal:允許調(diào)用者喚醒一個等待此CondGoroutine,如果此時沒有等待的 goroutine,顯然無需通知waiter;如果Cond 等待隊列中有一個或者多個等待的 goroutine,則需要從等待隊列中移除第一個 goroutine 并把它喚醒。調(diào)用 Signal方法時,不強求你一定要持有 c.L 的鎖。
          • broadcast:允許調(diào)用者喚醒所有等待此 Condgoroutine。如果此時沒有等待的goroutine,顯然無需通知 waiter;如果 Cond 等待隊列中有一個或者多個等待的 goroutine,則清空所有等待的 goroutine,并全部喚醒,不強求你一定要持有 c.L 的鎖。

          注意事項

          • 調(diào)用wait方法的時候一定要加鎖,否則會導(dǎo)致程序發(fā)生panic.
          • wait調(diào)用時需要檢查等待條件是否滿足,也就說goroutine被喚醒了不等于等待條件被滿足,等待者被喚醒,只是得到了一次檢查的機會而已,推薦寫法如下:
          //    c.L.Lock()
          //    for !condition() {
          //        c.Wait()
          //    }
          //    ... make use of condition ...
          //    c.L.Unlock()
          • SignalBoardcast 兩個喚醒操作不需要加鎖

          總結(jié)

          其實Cond在實際項目中被使用的機會比較少,Go特有的channel就可以代替它,暫時只在Kubernetes項目中看到了應(yīng)用,使用場景是每次往隊列中成功增加了元素后就需要調(diào)用 Broadcast 通知所有的等待者,使用Cond就很合適,相比channel減少了代碼復(fù)雜性。


          推薦閱讀


          福利

          我為大家整理了一份從入門到進階的Go學(xué)習(xí)資料禮包,包含學(xué)習(xí)建議:入門看什么,進階看什么。關(guān)注公眾號 「polarisxu」,回復(fù) ebook 獲??;還可以回復(fù)「進群」,和數(shù)萬 Gopher 交流學(xué)習(xí)。

          瀏覽 33
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  亚洲精品成人无码熟妇在线 | 免费的av网站 | 性免费视频 | 人人草视频在线播放 | 成人视频偷拍 |