你真的了解 sync.Mutex嗎
Mutex是一個互斥的排他鎖,零值Mutex為未上鎖狀態(tài),Mutex一旦被使用 禁止被拷貝。使用起來也比較簡單
package?main
import?"sync"
func?main()?{
?m?:=?sync.Mutex{}
?m.Lock()
?defer?m.Unlock()
??//?do?something
}
Mutex有兩種操作模式:
正常模式(非公平模式)
阻塞等待的goroutine保存在FIFO的隊列中,喚醒的goroutine不直接擁有鎖,需要與新來的goroutine競爭獲取鎖。因為新來的goroutine很多已經(jīng)占有了CPU,所以喚醒的goroutine在競爭中很容易輸;但如果一個goroutine獲取鎖失敗超過1ms,則會將Mutex切換為饑餓模式。
饑餓模式(公平模式)
這種模式下,直接將等待隊列隊頭goroutine解鎖goroutine;新來的gorountine也不會嘗試獲得鎖,而是直接插入到等待隊列隊尾。

如果一個goroutine獲得了鎖,并且他在等待隊列隊尾 或者 他等待小于1ms,則會將Mutex的模式切換回正常模式。正常模式有更好的性能,新來的goroutine通過幾次競爭可以直接獲取到鎖,盡管當(dāng)前仍有等待的goroutine。而饑餓模式則是對正常模式的補(bǔ)充,防止等待隊列中的goroutine永遠(yuǎn)沒有機(jī)會獲取鎖。
其數(shù)據(jù)結(jié)構(gòu)為:
type?Mutex?struct?{
?state?int32?//?鎖競爭的狀態(tài)值
?sema??uint32?//?信號量
}
state代表了當(dāng)前鎖的狀態(tài)、 是否是存在自旋、是否是饑餓模式、阻塞goroutine數(shù)量
?mutexLocked?=?1?<iota?//?mutex?is?locked
?mutexWoken
?mutexStarving
?mutexWaiterShift?=?iota

mutex.state & mutexLocked 加鎖狀態(tài) 1 表示已加鎖 0 表示未加鎖
mutex.state & mutexWoken ?喚醒狀態(tài) 1 表示已喚醒狀態(tài) 0 表示未喚醒
mutex.state & mutexStarving ?饑餓狀態(tài) ?1 表示饑餓狀態(tài)?0表示正常狀態(tài)
mutex.state >> mutexWaiterShift得到當(dāng)前goroutine數(shù)目
Lock
上鎖大致分為fast-path和slow-path
Fast-path
lock通過調(diào)用atomic.CompareAndSwapInt32來競爭更新m.state,成功則獲得鎖;失敗,則進(jìn)入slow-path
func?(m?*Mutex)?Lock()?{
?//?Fast?path:?grab?unlocked?mutex.
?if?atomic.CompareAndSwapInt32(&m.state,?0,?mutexLocked)?{
??if?race.Enabled?{
???race.Acquire(unsafe.Pointer(m))
??}
??return
?}
?//?Slow?path?(outlined?so?that?the?fast?path?can?be?inlined)
?m.lockSlow()
}
atomic.CompareAndSwapInt32正如簽名一樣,進(jìn)行比較和交換操作,這過程是原子的
//?CompareAndSwapInt32?executes?the?compare-and-swap?operation?for?an?int32?value.
func?CompareAndSwapInt32(addr?*int32,?old,?new?int32)?(swapped?bool)
源碼中我們并不能看到該函數(shù)的具體實(shí)現(xiàn),他的實(shí)現(xiàn)跟硬件平臺有關(guān),我們可以查看匯編代碼一窺究竟,go tool compile -S mutex.go也可以對二進(jìn)制文件go tool objdump -s methodname binary
0x0036 00054 (loop.go:6) MOVQ AX, CX
0x0039 00057 ($GOROOT/src/sync/mutex.go:74) XORL AX, AX
0x003b 00059 ($GOROOT/src/sync/mutex.go:74) MOVL $1, DX
0x0040 00064 ($GOROOT/src/sync/mutex.go:74) LOCK
0x0041 00065 ($GOROOT/src/sync/mutex.go:74) CMPXCHGL DX, (CX)
0x0044 00068 ($GOROOT/src/sync/mutex.go:74) SETEQ AL
0x0047 00071 ($GOROOT/src/sync/mutex.go:74) TESTB AL, AL
0x0049 00073 ($GOROOT/src/sync/mutex.go:74) JEQ 150
0x004b 00075 (loop.go:8) MOVL $8, ""..autotmp_6+16(SP)
0x0053 00083 (loop.go:8) LEAQ sync.(*Mutex).Unlock·f(SB), AX
重點(diǎn)關(guān)注第5行CMPXCHGL DX, (CX)這個CMPXCHGL是x86和Intel架構(gòu)中的compare and exchange指令,Java的那套AtomicXX底層也是依賴這個指令來保證原子性操作的。
所以我們看到Mutex是互斥排他鎖且不可重入,當(dāng)我們在一個goroutine獲取同一個鎖會導(dǎo)致死鎖。
package?main
import?"sync"
func?main()?{
?m?:=?sync.Mutex{}
?m.Lock()
??//這里會導(dǎo)致死鎖
?m.Lock()
?defer?m.Unlock()
}
slow-path
如果goroutinefast-path失敗,則調(diào)用m.lockSlow()進(jìn)入slow-path,函數(shù)內(nèi)部主要是一個for{}死循環(huán),進(jìn)入循環(huán)的goroutine大致分為兩類:
新來的 gorountine被喚醒的 goroutine
Mutex默認(rèn)為正常模式,若新來的goroutine搶占成功,則另一個就需要阻塞等待;阻塞等待一旦超過閾值1ms則會將Mutex切換到饑餓模式,這個模式下新來的goroutine只能阻塞等待在隊列尾部,沒有搶占的資格。當(dāng)然等待阻塞->喚醒->參與搶占鎖,這個過程顯示不是很高效,所以這里有一個自旋的優(yōu)化
當(dāng)mutex處于正常模式且能夠自旋,會讓當(dāng)前goroutine自旋等待,同時設(shè)置mutex.state的mutexWoken位為1,保證自旋等待的goroutine一定比新來goroutine更有優(yōu)先權(quán)。這樣unlock操作也會優(yōu)先保證自旋等待的goroutine獲取鎖
golang對自旋做了些限制要求 需要:
多核CPU GOMAXPROCS>1 至少有一個運(yùn)行的P并且local的P隊列為空 感興趣的可以跟下源碼比較簡單
func?(m?*Mutex)?lockSlow()?{
?var?waitStartTime?int64
?starving?:=?false
?awoke?:=?false
?iter?:=?0
?old?:=?m.state
?for?{
????//饑餓模式下不能自旋,也沒有資格搶占,鎖是手遞手給到等待的goroutine
??if?old&(mutexLocked|mutexStarving)?==?mutexLocked?&&?runtime_canSpin(iter)?{//當(dāng)Mutex處于正常模式且能夠自旋
??????//設(shè)置mutexWoken為1?告訴unlock操作,存在自旋gorountine?unlock后不需要喚醒其他goroutine
???if?!awoke?&&?old&mutexWoken?==?0?&&?old>>mutexWaiterShift?!=?0?&&
????atomic.CompareAndSwapInt32(&m.state,?old,?old|mutexWoken)?{
????awoke?=?true
???}
???runtime_doSpin()
???iter++
???old?=?m.state
???continue
??}
??//??自旋完了?還是沒拿到鎖
??new?:=?old
????//當(dāng)mutex處于正常模式,將new的mutexLocked設(shè)置為1?即準(zhǔn)備搶占鎖
??if?old&mutexStarving?==?0?{
???new?|=?mutexLocked
??}
????//加鎖狀態(tài)或饑餓模式下?新來的goroutine進(jìn)入等待隊列
??if?old&(mutexLocked|mutexStarving)?!=?0?{
???new?+=?1?<??}
????//將Mutex切換為饑餓模式,若未加鎖則不必切換
????//Unlock操作希望饑餓模式存在等待者
??if?starving?&&?old&mutexLocked?!=?0?{
???new?|=?mutexStarving
??}
??if?awoke?{
??????//?當(dāng)前goroutine自旋過?已被被喚醒,則需要將mutexWoken重置
???if?new&mutexWoken?==?0?{
????throw("sync:?inconsistent?mutex?state")
???}
???new?&^=?mutexWoken?//重置mutexWoken
??}
??if?atomic.CompareAndSwapInt32(&m.state,?old,?new)?{
??????//?當(dāng)前goroutine獲取鎖前mutex處于未加鎖?正常模式下
???if?old&(mutexLocked|mutexStarving)?==?0?{
????break?//?使用CAS成功搶占到鎖
???}
???//?waitStartTime!=0表示當(dāng)前goroutine是等待狀態(tài)喚醒的?
??????//?為了與第一次調(diào)用Lock的goroutine劃分不同的優(yōu)先級
???queueLifo?:=?waitStartTime?!=?0
???if?waitStartTime?==?0?{
????????//開始記錄等待時間
????waitStartTime?=?runtime_nanotime()
???}
??????//?將被喚醒但是沒有獲得鎖的goroutine插入到當(dāng)前等待隊列隊首
??????//?使用信號量阻塞當(dāng)前goroutine
???runtime_SemacquireMutex(&m.sema,?queueLifo,?1)
??????//?當(dāng)goroutine等待時間超過starvationThresholdNs,mutex進(jìn)入饑餓模式
???starving?=?starving?||?runtime_nanotime()-waitStartTime?>?starvationThresholdNs
???old?=?m.state
???if?old&mutexStarving?!=?0?{
????????//如果當(dāng)前goroutine被喚醒且mutex處于饑餓模式?則將鎖手遞手交給當(dāng)前goroutine
????if?old&(mutexLocked|mutexWoken)?!=?0?||?old>>mutexWaiterShift?==?0?{
?????throw("sync:?inconsistent?mutex?state")
????}
????????//等待狀態(tài)的goroutine?-?1
????delta?:=?int32(mutexLocked?-?1<????????//如果等待時間小于1ms?或?當(dāng)前goroutine是隊列中最后一個
????if?!starving?||?old>>mutexWaiterShift?==?1?{
??????//?退出饑餓模式
?????delta?-=?mutexStarving
????}
????atomic.AddInt32(&m.state,?delta)
????break
???}
???awoke?=?true
???iter?=?0
??}?else?{
???old?=?m.state
??}
?}
}
Unlock
解鎖分兩種情況
當(dāng)前只有一個goroutine占有鎖 unlock完 直接結(jié)束
func?(m?*Mutex)?Unlock()?{
?//?去除加鎖狀態(tài)
?new?:=?atomic.AddInt32(&m.state,?-mutexLocked)
?if?new?!=?0?{//存在等待的goroutine
??m.unlockSlow(new)
?}
}
unlock完畢mutex.state!=0 則存在以下可能 直接將鎖交給等待隊列的第一個goroutine 當(dāng)前存在等待goroutine 然后喚醒它 但不是第一個goroutine 當(dāng)前存在自旋等待的goroutine 則不喚醒其他等待gorotune 正常模式下 饑餓模式下
func?(m?*Mutex)?unlockSlow(new?int32)?{
??//未加鎖的情況下不能多次調(diào)用unlock
?if?(new+mutexLocked)&mutexLocked?==?0?{
??throw("sync:?unlock?of?unlocked?mutex")
?}
?if?new&mutexStarving?==?0?{//正常模式下
??old?:=?new
??for?{
??????//沒有等待的goroutine?或?已經(jīng)存在一個獲得鎖?或被喚醒?或處于饑餓模式下不需要喚醒任何處于等待的goroutine
???if?old>>mutexWaiterShift?==?0?||?old&(mutexLocked|mutexWoken|mutexStarving)?!=?0?{
????return
???}
???//?等待狀態(tài)goroutine數(shù)量-1?并設(shè)置喚醒狀態(tài)為1?然后喚醒一個等待goroutine
???new?=?(old?-?1<???if?atomic.CompareAndSwapInt32(&m.state,?old,?new)?{
????????//喚醒一個阻塞的goroutine?但不是第一個等待者
????runtime_Semrelease(&m.sema,?false,?1)
????return
???}
???old?=?m.state
??}
?}?else?{
????//饑餓模式下手遞手將鎖交給隊列第一個等待的goroutine
????//即使期間有新來的goroutine到來,只要處于饑餓模式?鎖就不會被新來的goroutine搶占
??runtime_Semrelease(&m.sema,?true,?1)
?}
}
信號量
上面可以看到Mutex對goroutine的阻塞和喚醒操作是利用semaphore來實(shí)現(xiàn)的,大致的思路是:Go runtime維護(hù)了一個全局的變量semtable,它保持了所有的信號量
//?Prime?to?not?correlate?with?any?user?patterns.
const?semTabSize?=?251
var?semtable?[semTabSize]struct?{
?root?semaRoot
?pad??[cpu.CacheLinePadSize?-?unsafe.Sizeof(semaRoot{})]byte
}
每個信號量都由一個變量地址指定,Mutex的栗子里就是mutex.sema的地址
type?semaRoot?struct?{
?lock??mutex
?treap?*sudog?//?root?of?balanced?tree?of?unique?waiters.
?nwait?uint32?//?Number?of?waiters.?Read?w/o?the?lock.
}
大致畫了下其數(shù)據(jù)結(jié)構(gòu)

當(dāng) goroutine未獲取到鎖,需要阻塞時調(diào)用sync.runtime_SemacquireMutex進(jìn)入阻塞邏輯
//go:linkname?sync_runtime_SemacquireMutex?sync.runtime_SemacquireMutex
func?sync_runtime_SemacquireMutex(addr?*uint32,?lifo?bool,?skipframes?int)?{
?semacquire1(addr,?lifo,?semaBlockProfile|semaMutexProfile,?skipframes)
}
func?semacquire1(addr?*uint32,?lifo?bool,?profile?semaProfileFlags,?skipframes?int)?{
?gp?:=?getg()
?if?gp?!=?gp.m.curg?{
??throw("semacquire?not?on?the?G?stack")
?}
?//?低成本case
??//?若addr大于1?并通過CAS?-1?成功,則獲取信號量成功?不需要阻塞
?if?cansemacquire(addr)?{
??return
?}
?//?復(fù)雜?case:
?//?增加等待goroutine數(shù)量
?//?再次嘗試cansemacquire?成功則返回
?//?失敗則將自己作為一個waiter入隊
?//?sleep
?//?(waiter?descriptor?is?dequeued?by?signaler)
?s?:=?acquireSudog()
?root?:=?semroot(addr)
?t0?:=?int64(0)
?s.releasetime?=?0
?s.acquiretime?=?0
?s.ticket?=?0
?if?profile&semaBlockProfile?!=?0?&&?blockprofilerate?>?0?{
??t0?=?cputicks()
??s.releasetime?=?-1
?}
?if?profile&semaMutexProfile?!=?0?&&?mutexprofilerate?>?0?{
??if?t0?==?0?{
???t0?=?cputicks()
??}
??s.acquiretime?=?t0
?}
?for?{
??lock(&root.lock)
??//?給nwait+1?這樣semrelease中不會進(jìn)低成本路徑了
??atomic.Xadd(&root.nwait,?1)
??//?檢查?cansemacquire?避免錯過喚醒
??if?cansemacquire(addr)?{
???atomic.Xadd(&root.nwait,?-1)
???unlock(&root.lock)
???break
??}
????//cansemacquire之后的semrelease都可以知道我們正在等待
????//上面設(shè)置了nwait,所以會直接進(jìn)入sleep?即goparkunlock
??root.queue(addr,?s,?lifo)
??goparkunlock(&root.lock,?waitReasonSemacquire,?traceEvGoBlockSync,?4+skipframes)
??if?s.ticket?!=?0?||?cansemacquire(addr)?{
???break
??}
?}
?if?s.releasetime?>?0?{
??blockevent(s.releasetime-t0,?3+skipframes)
?}
?releaseSudog(s)
}
如果addr大于1并通過CAS-1成功則獲取信號量成功,直接返回
否則通過對信號量地址偏移取模&semtable[(uintptr(unsafe.Pointer(addr))>>3)%semTabSize].root拿到semaRoot(這里個3和251 沒有明白為什么是這兩個數(shù)???),semaRoot包含了一個sudog鏈表和一個nwait整型字段。nwait表示該信號量上阻塞等待的g的數(shù)量,同時為了保證線程安全需要一個互斥量來保護(hù)鏈表。
這里需要注意的是 此處的runtime.mutex并不是之前所說的sync.Mutex,是內(nèi)部的一個簡單版本
簡單來說,sync_runtime_Semacquire就是wait知道*s>0 然后原子的遞減它,來完成同步過程中簡單的睡眠原語
當(dāng) goroutine要釋放鎖 喚醒等待的g時調(diào)用sync.runtime_Semrelease
//go:linkname?sync_runtime_Semrelease?sync.runtime_Semrelease
func?sync_runtime_Semrelease(addr?*uint32,?handoff?bool,?skipframes?int)?{
?semrelease1(addr,?handoff,?skipframes)
}
func?semrelease1(addr?*uint32,?handoff?bool,?skipframes?int)?{
?root?:=?semroot(addr)
?atomic.Xadd(addr,?1)
?//?Easy?case:?no?waiters?
?//?這個檢查必須發(fā)生在xadd之后?避免錯過喚醒
?//?(see?loop?in?semacquire).
?if?atomic.Load(&root.nwait)?==?0?{
??return
?}
?//?Harder?case:?搜索一個waiter?并喚醒它
?lock(&root.lock)
?if?atomic.Load(&root.nwait)?==?0?{
??//?count值已經(jīng)被另一個goroutine消費(fèi)了
??//?所以不需要喚醒其他goroutine
??unlock(&root.lock)
??return
?}
?s,?t0?:=?root.dequeue(addr)
?if?s?!=?nil?{
??atomic.Xadd(&root.nwait,?-1)
?}
?unlock(&root.lock)
?if?s?!=?nil?{?//?May?be?slow,?so?unlock?first
??acquiretime?:=?s.acquiretime
??if?acquiretime?!=?0?{
???mutexevent(t0-acquiretime,?3+skipframes)
??}
??if?s.ticket?!=?0?{
???throw("corrupted?semaphore?ticket")
??}
??if?handoff?&&?cansemacquire(addr)?{
???s.ticket?=?1
??}
??readyWithTime(s,?5+skipframes)
?}
}
關(guān)于信號量更深層的研究可以看下semaphore in plan9
總結(jié)
通過看源碼發(fā)現(xiàn)個有意思的問題:如果goroutine g1加的鎖 可以被另一個goroutine g2解鎖,但是等到g1解鎖的時候就會panic
推薦閱讀
