手摸手Go 簡單聊聊sync.RWMutex
那一天我二十一歲,在我一生的黃金時代,我有好多奢侈。我想愛,想吃,還想在一瞬間變成天上半明半暗的云,后來我才知道,生活就是個緩慢受錘的過程,人一天天老下去,奢望也一天天消逝,最后變得像挨了錘的牛一樣。可是我過二十一歲生日時沒有預見到這一點。我覺得自己會永遠生猛下去,什么也錘不了我。---王小波

各位早上好~今天來聊聊Go提供的讀寫互斥鎖sync.RWMutex,它可以加任意數(shù)量的讀鎖或者一個寫鎖。讀寫鎖占用規(guī)則:
- 讀鎖占用的情況下,會組織寫鎖的獲取,但是不會阻止其他
goroutine獲取讀鎖 - 寫鎖占用的情況下,則不允許任何(讀鎖/寫鎖)請求,將整個鎖獨占
其零值表示未上鎖狀態(tài)。
基本使用
使用sync.RWMutex可以很容易實現(xiàn)一個協(xié)程安全的字典結(jié)構(gòu)。
package?main
import?(
?"fmt"
?"math/rand"
?"sync"
?"time"
)
func?init()?{
?rand.Seed(time.Now().Unix())
}
type?Key?interface{}
type?Value?interface{}
type?Dictionary?struct?{
?m????sync.RWMutex
?data?map[Key]Value
}
func?(d?*Dictionary)?Add(k?Key,?v?Value)?{
?d.m.Lock()
?defer?d.m.Unlock()
?if?d.data?==?nil?{
??d.data?=?make(map[Key]Value)
?}
?d.data[k]?=?v
}
func?(d?*Dictionary)?Get(k?Key)?Value?{
?d.m.RLock()
?defer?d.m.RUnlock()
?return?d.data[k]
}
func?main()?{
?d?:=?&Dictionary{}
?wg?:=?sync.WaitGroup{}
?for?i?:=?0;?i?<?10;?i++?{
??wg.Add(2)
??weight?:=?rand.Intn(100)
??go?func(w?int)?{
???time.Sleep(time.Duration(rand.Intn(1000))?*?time.Millisecond)
???d.Add("leo",?fmt.Sprintf("leo超帥的?+%d",?w))
???wg.Done()
??}(weight)
??go?func()?{
???time.Sleep(time.Duration(rand.Intn(1000))?*?time.Millisecond)
???fmt.Println(d.Get("leo"))
???wg.Done()
??}()
?}
?wg.Wait()
}
sync.RWMutex源碼分析
數(shù)據(jù)結(jié)構(gòu)
type?RWMutex?struct?{
?w???????????Mutex??//?寫操作需要先嘗試持有
?writerSem???uint32?//?等待讀操作完成的寫等待的信號量
?readerSem???uint32?//?等待寫操作完成的讀等待的信號量
?readerCount?int32??//?阻塞的讀操作數(shù)量
?readerWait??int32??//?寫操作?來之前?讀操作數(shù)量
}
//?最大讀操作數(shù)量
const?rwmutexMaxReaders?=?1?<<?30
上面幾個屬性,第一次看到readerWait有點兒懵 這個跟readerCount有啥關(guān)系呢?上圖吧 其實也不復雜?具體可以配合下面代碼分析一起可能會更好理解。

假設(shè)一個場景,不同操作時不同屬性的值變化如下表:
| 操作 | writerSem | readerSem | readerCount | readerWait | rw.w |
|---|---|---|---|---|---|
| 4次Rlock()且均未釋放 | 未阻塞寫操作 | 未阻塞讀操作 | 4 | 0 | 0 |
| 假設(shè)執(zhí)行一次Unlock() | 未阻塞寫操作 | 未阻塞讀操作 | 4-1=3 | 0 | 0 |
| 嘗試執(zhí)行Lock() | 阻塞1個寫操作 | 未阻塞讀操作 | 3-(1<<30) | 3 | 0 |
| Lock()等待readerWait個讀操作執(zhí)行完畢 | |||||
| 執(zhí)行2次RUnlock()同時執(zhí)行2次Rlock() | 阻塞1個寫操作 | 阻塞2個讀操作 | 3-(1<<30)-2+2 | 3-2 | 0 |
| 第一次Lock()未獲得鎖 再次執(zhí)行Lock() 將被阻塞在rw.w上 | 阻塞1個寫操作 | 阻塞2個讀操作 | 3-(1<<30)-2+2 | 3-2 | 1 |
| 第4次RUnlock執(zhí)行完畢時 會喚醒阻塞的第一個Lock | 未阻塞寫 | 阻塞2個讀操作 | 3-(1<<30)-2+2-1+(1<<30)=2 | 0 | 1 |
為什么他們的值會是這樣?我們接著看源碼,然后回過頭再對照表格 自然就明了了。
操作方法
RLock
用于讀操作搶占鎖,它不應該被遞歸調(diào)用。
func?(rw?*RWMutex)?RLock()?{
?if?atomic.AddInt32(&rw.readerCount,?1)?<?0?{
??//?A?writer?is?pending,?wait?for?it.
??runtime_SemacquireMutex(&rw.readerSem,?false,?0)
?}
}
執(zhí)行RLock,若rw.readerCount加1小于0則說明存在寫操作持有鎖,則將當前的讀操作阻塞到rw.readerSem上。
RUnlock
RUnlock一次只能解除一個Rlock操作,并不會影響其他的讀操作。如果沒有執(zhí)行RLock,執(zhí)行RUnlock會panic throw("sync: RUnlock of unlocked RWMutex")
func?(rw?*RWMutex)?RUnlock()?{
?if?r?:=?atomic.AddInt32(&rw.readerCount,?-1);?r?<?0?{
??//?Outlined?slow-path?to?allow?the?fast-path?to?be?inlined
??rw.rUnlockSlow(r)
?}
}
RUnlock首先判斷r=rw.readerCount-1
若
r>=0表示釋放讀鎖成功若
r<0表示存在寫操作持有鎖,進入slow-path
func?(rw?*RWMutex)?rUnlockSlow(r?int32)?{
??//不存在RLock操作?不能執(zhí)行RUnlock
?if?r+1?==?0?||?r+1?==?-rwmutexMaxReaders?{
??race.Enable()
??throw("sync:?RUnlock?of?unlocked?RWMutex")
?}
?//?A?writer?is?pending.
?if?atomic.AddInt32(&rw.readerWait,?-1)?==?0?{//讀操作執(zhí)行完畢?喚醒等待的寫操作
??//最后一個讀操作解鎖?喚醒寫操作
??runtime_Semrelease(&rw.writerSem,?false,?1)
?}
}
因為初始狀態(tài)下sync.RWMutex是未上鎖狀態(tài),rw.readerCount初始為0,或者在無讀操作加鎖的情況下,寫操作加鎖rw.readerCount會被置為const rwmutexMaxReaders = 1 << 30,因此
r+1 == 0 || r+1 == -rwmutexMaxReaders表明當前無讀操作持有鎖,而直接執(zhí)行RUnlock會panic。
嘗試進行rw.readerWait-1操作,然后判斷若rw.readerWait==0則表明寫操作搶占鎖之前的讀操作都已經(jīng)處理完畢,此時可以喚醒被阻塞在rw.writerSem上的寫操作了。
Lock
針對寫操作時嘗試獲取鎖,如果當前鎖被讀操作或?qū)懖僮鞒钟校瑒t阻塞等待直到鎖可用。
func?(rw?*RWMutex)?Lock()?{
?//?首先解決于其他寫操作的競爭問題
?rw.w.Lock()
?//?告訴讀操作,這里存在阻塞的寫操作
?r?:=?atomic.AddInt32(&rw.readerCount,?-rwmutexMaxReaders)?+?rwmutexMaxReaders
?//?如果仍存存在讀操作持有鎖?則阻塞等待
?if?r?!=?0?&&?atomic.AddInt32(&rw.readerWait,?r)?!=?0?{
??runtime_SemacquireMutex(&rw.writerSem,?false,?0)
?}
}
大致步驟:
Lock首先調(diào)用了rw.w.Lock()來解決多個寫操作并發(fā)請求的競爭問題:如果存在多個寫操作,只有一個寫操作會獲取到rw.w鎖接著嘗試剩余的操作,其他的寫操作會被阻塞在rw.w上。- 調(diào)用
atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders),這里結(jié)合RLock()中的atomic.AddInt32(&rw.readerCount, 1) < 0則將讀操作阻塞在rw.readerSem上,以此來讓讀操作感知到當前是否存在阻塞的寫操作。
Unlock
跟sync.Mutex一樣,一個鎖定的sync.RWMutex跟特定的goroutine沒有任何關(guān)聯(lián)。一個goroutine可能RLock(Lock)一個sync.RWMutex,然后另一個goroutine可以RUnlock(Unlock)掉這個鎖狀態(tài)。
//?這里也是不允許針對一個未執(zhí)行Lock的rw執(zhí)行Unlock操作的
func?(rw?*RWMutex)?Unlock()?{
?//?通知讀操作,這里沒有激活的寫操作了
?r?:=?atomic.AddInt32(&rw.readerCount,?rwmutexMaxReaders)
?if?r?>=?rwmutexMaxReaders?{
??race.Enable()
??throw("sync:?Unlock?of?unlocked?RWMutex")
?}
?//?Unblock?blocked?readers,?if?any.
?for?i?:=?0;?i?<?int(r);?i++?{
??runtime_Semrelease(&rw.readerSem,?false,?0)
?}
?//?Allow?other?writers?to?proceed.
?rw.w.Unlock()
}
基本邏輯:
- 通過
atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)恢復rw.readerCount,即通知讀操作這里沒有激活的寫操作,意味著這個時候讀操作可以有機會競爭鎖了,即使仍存在阻塞在rw.w上的寫操作,這里應該是防止讀操作會因為寫操作過多被餓死。 - 判斷是否是沒
Lock的情況下執(zhí)行了Unlock - 依次喚醒阻塞在
rw.readerSem上的讀操作 rw.w.Unlock意味著阻塞在rw.w上的其他寫操作可以接著搶占鎖了。
關(guān)于遞歸讀鎖定問題
細心的童鞋可能會發(fā)現(xiàn)sync.RWMutex是禁止遞歸讀鎖定的,官方是這么說的
If a goroutine holds a RWMutex for reading and another goroutine might call Lock, no goroutine should expect to be able to acquire a read lock until the initial read lock is released. In particular, this prohibits recursive read locking. This is to ensure that the lock eventually becomes available; a blocked Lock call excludes new readers from acquiring the lock.
大概意思是說,如果我們持有一個sync.RWMutex的讀鎖時,可能會有另一個寫操作嘗試獲取鎖,因為前面的讀鎖未釋放則這個寫操作只能阻塞等待。不幸的是,這個讀操作干完活并不釋放讀鎖,而是繼續(xù)遞歸調(diào)用讀操作獲取鎖,但是這個時候獲取讀鎖的時候發(fā)現(xiàn)前面有阻塞的寫鎖請求,則這個讀操作請求只能阻塞等待前面的寫操作完事兒。最早的讀操作又等待當前的讀操作完事兒去釋放鎖,完美的一個貪吃蛇構(gòu)成的一個死鎖的場景就出現(xiàn)啦。

舉個栗子吧:
我們在斐波那契數(shù)列遞歸函數(shù)里,遞歸獲取讀鎖,然后中途我們來個寫鎖請求,看看啥結(jié)果
package?main
import?(
?"sync"
?"time"
)
var?m?sync.RWMutex
func?fibonacci(num?int)?int?{
?if?num?<?2?{
??return?1
?}
?m.RLock()
?defer?m.RUnlock()
?time.Sleep(time.Millisecond?*?100)
?return?fibonacci(num-1)?+?fibonacci(num-2)
}
func?main()?{
?done?:=?make(chan?int)
?go?func()?{
??m.Lock()
??time.Sleep(time.Millisecond?*?200)
??m.Unlock()
??done?<-?1
?}()
?fibonacci(5)
?<-done
}
輸出結(jié)果是那么熟悉的味道。
fatal?error:?all?goroutines?are?asleep?-?deadlock!
總結(jié)
sync.RWMutex提供了比sync.Mutex更加細粒度的鎖控制,將讀鎖和寫鎖做了分離,本來邏輯會比較復雜,但是它是基于sync.Mutex所以整體邏輯就變得比較簡單,可能readerCount和readerWait咋一看有點兒懵,不過仔細看看不難,通過readerCount的值來達到讀寫鎖通信的目的 設(shè)計還是很巧妙的,受益匪淺。
如果閱讀過程中發(fā)現(xiàn)本文存疑或錯誤的地方,可以關(guān)注公眾號留言。如果覺得還可以 幫忙點個在看??
