<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          手摸手Go 簡單聊聊sync.RWMutex

          共 5972字,需瀏覽 12分鐘

           ·

          2021-03-09 19:59

          那一天我二十一歲,在我一生的黃金時代,我有好多奢侈。我想愛,想吃,還想在一瞬間變成天上半明半暗的云,后來我才知道,生活就是個緩慢受錘的過程,人一天天老下去,奢望也一天天消逝,最后變得像挨了錘的牛一樣。可是我過二十一歲生日時沒有預見到這一點。我覺得自己會永遠生猛下去,什么也錘不了我。---王小波

          2ab9c57869c21812d62e31bd9b983957.webp

          各位早上好~今天來聊聊Go提供的讀寫互斥鎖sync.RWMutex,它可以加任意數(shù)量的讀鎖或者一個寫鎖。讀寫鎖占用規(guī)則:

          • 讀鎖占用的情況下,會組織寫鎖的獲取,但是不會阻止其他goroutine獲取讀鎖
          • 寫鎖占用的情況下,則不允許任何(讀鎖/寫鎖)請求,將整個鎖獨占

          其零值表示未上鎖狀態(tài)。


          基本使用

          使用sync.RWMutex可以很容易實現(xiàn)一個協(xié)程安全的字典結(jié)構(gòu)。

          package?main

          import?(
          ?"fmt"
          ?"math/rand"
          ?"sync"
          ?"time"
          )

          func?init()?{
          ?rand.Seed(time.Now().Unix())
          }

          type?Key?interface{}
          type?Value?interface{}
          type?Dictionary?struct?{
          ?m????sync.RWMutex
          ?data?map[Key]Value
          }

          func?(d?*Dictionary)?Add(k?Key,?v?Value)?{
          ?d.m.Lock()
          ?defer?d.m.Unlock()
          ?if?d.data?==?nil?{
          ??d.data?=?make(map[Key]Value)
          ?}
          ?d.data[k]?=?v
          }

          func?(d?*Dictionary)?Get(k?Key)?Value?{
          ?d.m.RLock()
          ?defer?d.m.RUnlock()
          ?return?d.data[k]
          }
          func?main()?{
          ?d?:=?&Dictionary{}
          ?wg?:=?sync.WaitGroup{}

          ?for?i?:=?0;?i?<?10;?i++?{
          ??wg.Add(2)
          ??weight?:=?rand.Intn(100)
          ??go?func(w?int)?{
          ???time.Sleep(time.Duration(rand.Intn(1000))?*?time.Millisecond)
          ???d.Add("leo",?fmt.Sprintf("leo超帥的?+%d",?w))
          ???wg.Done()
          ??}(weight)
          ??go?func()?{
          ???time.Sleep(time.Duration(rand.Intn(1000))?*?time.Millisecond)
          ???fmt.Println(d.Get("leo"))
          ???wg.Done()
          ??}()
          ?}
          ?wg.Wait()
          }

          sync.RWMutex源碼分析

          數(shù)據(jù)結(jié)構(gòu)

          type?RWMutex?struct?{
          ?w???????????Mutex??//?寫操作需要先嘗試持有
          ?writerSem???uint32?//?等待讀操作完成的寫等待的信號量
          ?readerSem???uint32?//?等待寫操作完成的讀等待的信號量
          ?readerCount?int32??//?阻塞的讀操作數(shù)量
          ?readerWait??int32??//?寫操作?來之前?讀操作數(shù)量
          }
          //?最大讀操作數(shù)量
          const?rwmutexMaxReaders?=?1?<<?30

          上面幾個屬性,第一次看到readerWait有點兒懵 這個跟readerCount有啥關(guān)系呢?上圖吧 其實也不復雜?具體可以配合下面代碼分析一起可能會更好理解。

          8a4e629a793fd4cbe685abc156158691.webp

          rwmutex attribute

          假設(shè)一個場景,不同操作時不同屬性的值變化如下表:

          操作writerSemreaderSemreaderCountreaderWaitrw.w
          4次Rlock()且均未釋放未阻塞寫操作未阻塞讀操作400
          假設(shè)執(zhí)行一次Unlock()未阻塞寫操作未阻塞讀操作4-1=300
          嘗試執(zhí)行Lock()阻塞1個寫操作未阻塞讀操作3-(1<<30)30
          Lock()等待readerWait個讀操作執(zhí)行完畢




          執(zhí)行2次RUnlock()同時執(zhí)行2次Rlock()阻塞1個寫操作阻塞2個讀操作3-(1<<30)-2+23-20
          第一次Lock()未獲得鎖 再次執(zhí)行Lock() 將被阻塞在rw.w上阻塞1個寫操作阻塞2個讀操作3-(1<<30)-2+23-21
          第4次RUnlock執(zhí)行完畢時 會喚醒阻塞的第一個Lock未阻塞寫阻塞2個讀操作3-(1<<30)-2+2-1+(1<<30)=201

          為什么他們的值會是這樣?我們接著看源碼,然后回過頭再對照表格 自然就明了了。

          操作方法

          RLock

          用于讀操作搶占鎖,它不應該被遞歸調(diào)用。

          func?(rw?*RWMutex)?RLock()?{
          ?if?atomic.AddInt32(&rw.readerCount,?1)?<?0?{
          ??//?A?writer?is?pending,?wait?for?it.
          ??runtime_SemacquireMutex(&rw.readerSem,?false,?0)
          ?}
          }

          執(zhí)行RLock,若rw.readerCount加1小于0則說明存在寫操作持有鎖,則將當前的讀操作阻塞到rw.readerSem上。

          RUnlock

          RUnlock一次只能解除一個Rlock操作,并不會影響其他的讀操作。如果沒有執(zhí)行RLock,執(zhí)行RUnlock會panic throw("sync: RUnlock of unlocked RWMutex")

          func?(rw?*RWMutex)?RUnlock()?{
          ?if?r?:=?atomic.AddInt32(&rw.readerCount,?-1);?r?<?0?{
          ??//?Outlined?slow-path?to?allow?the?fast-path?to?be?inlined
          ??rw.rUnlockSlow(r)
          ?}
          }

          RUnlock首先判斷r=rw.readerCount-1

          • r>=0 表示釋放讀鎖成功

          • r<0表示存在寫操作持有鎖,進入slow-path

          func?(rw?*RWMutex)?rUnlockSlow(r?int32)?{
          ??//不存在RLock操作?不能執(zhí)行RUnlock
          ?if?r+1?==?0?||?r+1?==?-rwmutexMaxReaders?{
          ??race.Enable()
          ??throw("sync:?RUnlock?of?unlocked?RWMutex")
          ?}
          ?//?A?writer?is?pending.
          ?if?atomic.AddInt32(&rw.readerWait,?-1)?==?0?{//讀操作執(zhí)行完畢?喚醒等待的寫操作
          ??//最后一個讀操作解鎖?喚醒寫操作
          ??runtime_Semrelease(&rw.writerSem,?false,?1)
          ?}
          }

          因為初始狀態(tài)下sync.RWMutex是未上鎖狀態(tài),rw.readerCount初始為0,或者在無讀操作加鎖的情況下,寫操作加鎖rw.readerCount會被置為const rwmutexMaxReaders = 1 << 30,因此

          r+1 == 0 || r+1 == -rwmutexMaxReaders表明當前無讀操作持有鎖,而直接執(zhí)行RUnlock會panic。

          嘗試進行rw.readerWait-1操作,然后判斷若rw.readerWait==0則表明寫操作搶占鎖之前的讀操作都已經(jīng)處理完畢,此時可以喚醒被阻塞在rw.writerSem上的寫操作了。

          Lock

          針對寫操作時嘗試獲取鎖,如果當前鎖被讀操作或?qū)懖僮鞒钟校瑒t阻塞等待直到鎖可用。

          func?(rw?*RWMutex)?Lock()?{
          ?//?首先解決于其他寫操作的競爭問題
          ?rw.w.Lock()
          ?//?告訴讀操作,這里存在阻塞的寫操作
          ?r?:=?atomic.AddInt32(&rw.readerCount,?-rwmutexMaxReaders)?+?rwmutexMaxReaders
          ?//?如果仍存存在讀操作持有鎖?則阻塞等待
          ?if?r?!=?0?&&?atomic.AddInt32(&rw.readerWait,?r)?!=?0?{
          ??runtime_SemacquireMutex(&rw.writerSem,?false,?0)
          ?}
          }

          大致步驟:

          1. Lock首先調(diào)用了rw.w.Lock()來解決多個寫操作并發(fā)請求的競爭問題:如果存在多個寫操作,只有一個寫操作會獲取到rw.w鎖接著嘗試剩余的操作,其他的寫操作會被阻塞在rw.w上。
          2. 調(diào)用atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders),這里結(jié)合RLock()中的atomic.AddInt32(&rw.readerCount, 1) < 0則將讀操作阻塞在rw.readerSem上,以此來讓讀操作感知到當前是否存在阻塞的寫操作。

          Unlock

          sync.Mutex一樣,一個鎖定的sync.RWMutex跟特定的goroutine沒有任何關(guān)聯(lián)。一個goroutine可能RLockLock)一個sync.RWMutex,然后另一個goroutine可以RUnlockUnlock)掉這個鎖狀態(tài)。

          //?這里也是不允許針對一個未執(zhí)行Lock的rw執(zhí)行Unlock操作的
          func?(rw?*RWMutex)?Unlock()?{
          ?//?通知讀操作,這里沒有激活的寫操作了
          ?r?:=?atomic.AddInt32(&rw.readerCount,?rwmutexMaxReaders)
          ?if?r?>=?rwmutexMaxReaders?{
          ??race.Enable()
          ??throw("sync:?Unlock?of?unlocked?RWMutex")
          ?}
          ?//?Unblock?blocked?readers,?if?any.
          ?for?i?:=?0;?i?<?int(r);?i++?{
          ??runtime_Semrelease(&rw.readerSem,?false,?0)
          ?}
          ?//?Allow?other?writers?to?proceed.
          ?rw.w.Unlock()
          }

          基本邏輯:

          1. 通過atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)恢復rw.readerCount,即通知讀操作這里沒有激活的寫操作,意味著這個時候讀操作可以有機會競爭鎖了,即使仍存在阻塞在rw.w上的寫操作,這里應該是防止讀操作會因為寫操作過多被餓死。
          2. 判斷是否是沒Lock的情況下執(zhí)行了Unlock
          3. 依次喚醒阻塞在rw.readerSem上的讀操作
          4. rw.w.Unlock意味著阻塞在rw.w上的其他寫操作可以接著搶占鎖了。

          關(guān)于遞歸讀鎖定問題

          細心的童鞋可能會發(fā)現(xiàn)sync.RWMutex是禁止遞歸讀鎖定的,官方是這么說的

          If a goroutine holds a RWMutex for reading and another goroutine might call Lock, no goroutine should expect to be able to acquire a read lock until the initial read lock is released. In particular, this prohibits recursive read locking. This is to ensure that the lock eventually becomes available; a blocked Lock call excludes new readers from acquiring the lock.

          大概意思是說,如果我們持有一個sync.RWMutex的讀鎖時,可能會有另一個寫操作嘗試獲取鎖,因為前面的讀鎖未釋放則這個寫操作只能阻塞等待。不幸的是,這個讀操作干完活并不釋放讀鎖,而是繼續(xù)遞歸調(diào)用讀操作獲取鎖,但是這個時候獲取讀鎖的時候發(fā)現(xiàn)前面有阻塞的寫鎖請求,則這個讀操作請求只能阻塞等待前面的寫操作完事兒。最早的讀操作又等待當前的讀操作完事兒去釋放鎖,完美的一個貪吃蛇構(gòu)成的一個死鎖的場景就出現(xiàn)啦。

          795fe31b617a220dead659a9f7eff572.webp

          recursive rlock

          舉個栗子吧:

          我們在斐波那契數(shù)列遞歸函數(shù)里,遞歸獲取讀鎖,然后中途我們來個寫鎖請求,看看啥結(jié)果

          package?main

          import?(
          ?"sync"
          ?"time"
          )

          var?m?sync.RWMutex

          func?fibonacci(num?int)?int?{
          ?if?num?<?2?{
          ??return?1
          ?}
          ?m.RLock()
          ?defer?m.RUnlock()
          ?time.Sleep(time.Millisecond?*?100)
          ?return?fibonacci(num-1)?+?fibonacci(num-2)
          }
          func?main()?{
          ?done?:=?make(chan?int)
          ?go?func()?{
          ??m.Lock()
          ??time.Sleep(time.Millisecond?*?200)
          ??m.Unlock()
          ??done?<-?1
          ?}()
          ?fibonacci(5)
          ?<-done
          }

          輸出結(jié)果是那么熟悉的味道。

          fatal?error:?all?goroutines?are?asleep?-?deadlock!

          總結(jié)

          sync.RWMutex提供了比sync.Mutex更加細粒度的鎖控制,將讀鎖和寫鎖做了分離,本來邏輯會比較復雜,但是它是基于sync.Mutex所以整體邏輯就變得比較簡單,可能readerCountreaderWait咋一看有點兒懵,不過仔細看看不難,通過readerCount的值來達到讀寫鎖通信的目的 設(shè)計還是很巧妙的,受益匪淺。




          如果閱讀過程中發(fā)現(xiàn)本文存疑或錯誤的地方,可以關(guān)注公眾號留言。如果覺得還可以 幫忙點個在看??






          瀏覽 126
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  日韩一区二区在线视频 | 久久夜色精品国产亚洲AV | 狠狠躁夜夜躁人人爽天天高潮 | 天天爽,夜爽。 | 久久对白 |