<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          你真的了解 sync.Mutex嗎

          共 6846字,需瀏覽 14分鐘

           ·

          2021-02-20 15:11

          Mutex是一個互斥的排他鎖,零值Mutex為未上鎖狀態(tài),Mutex一旦被使用 禁止被拷貝。使用起來也比較簡單

          package?main

          import?"sync"

          func?main()?{
          ?m?:=?sync.Mutex{}
          ?m.Lock()
          ?defer?m.Unlock()
          ??//?do?something
          }

          Mutex有兩種操作模式:

          • 正常模式(非公平模式)

          阻塞等待的goroutine保存在FIFO的隊列中,喚醒的goroutine不直接擁有鎖,需要與新來的goroutine競爭獲取鎖。因為新來的goroutine很多已經(jīng)占有了CPU,所以喚醒的goroutine在競爭中很容易輸;但如果一個goroutine獲取鎖失敗超過1ms,則會將Mutex切換為饑餓模式。

          • 饑餓模式(公平模式)

          這種模式下,直接將等待隊列隊頭goroutine解鎖goroutine;新來的gorountine也不會嘗試獲得鎖,而是直接插入到等待隊列隊尾。

          mutex mode

          如果一個goroutine獲得了鎖,并且他在等待隊列隊尾 或者 他等待小于1ms,則會將Mutex的模式切換回正常模式。正常模式有更好的性能,新來的goroutine通過幾次競爭可以直接獲取到鎖,盡管當(dāng)前仍有等待的goroutine。而饑餓模式則是對正常模式的補(bǔ)充,防止等待隊列中的goroutine永遠(yuǎn)沒有機(jī)會獲取鎖。

          其數(shù)據(jù)結(jié)構(gòu)為:

          type?Mutex?struct?{
          ?state?int32?//?鎖競爭的狀態(tài)值
          ?sema??uint32?//?信號量
          }

          state代表了當(dāng)前鎖的狀態(tài)、 是否是存在自旋、是否是饑餓模式、阻塞goroutine數(shù)量

          ?mutexLocked?=?1?<iota?//?mutex?is?locked
          ?mutexWoken
          ?mutexStarving
          ?mutexWaiterShift?=?iota

          mutex state

          mutex.state & mutexLocked 加鎖狀態(tài) 1 表示已加鎖 0 表示未加鎖

          mutex.state & mutexWoken ?喚醒狀態(tài) 1 表示已喚醒狀態(tài) 0 表示未喚醒

          mutex.state & mutexStarving ?饑餓狀態(tài) ?1 表示饑餓狀態(tài)?0表示正常狀態(tài)

          mutex.state >> mutexWaiterShift得到當(dāng)前goroutine數(shù)目

          Lock

          上鎖大致分為fast-pathslow-path

          Fast-path

          lock通過調(diào)用atomic.CompareAndSwapInt32來競爭更新m.state,成功則獲得鎖;失敗,則進(jìn)入slow-path

          func?(m?*Mutex)?Lock()?{
          ?//?Fast?path:?grab?unlocked?mutex.
          ?if?atomic.CompareAndSwapInt32(&m.state,?0,?mutexLocked)?{
          ??if?race.Enabled?{
          ???race.Acquire(unsafe.Pointer(m))
          ??}
          ??return
          ?}
          ?//?Slow?path?(outlined?so?that?the?fast?path?can?be?inlined)
          ?m.lockSlow()
          }

          atomic.CompareAndSwapInt32正如簽名一樣,進(jìn)行比較交換操作,這過程是原子的

          //?CompareAndSwapInt32?executes?the?compare-and-swap?operation?for?an?int32?value.
          func?CompareAndSwapInt32(addr?*int32,?old,?new?int32)?(swapped?bool)

          源碼中我們并不能看到該函數(shù)的具體實(shí)現(xiàn),他的實(shí)現(xiàn)跟硬件平臺有關(guān),我們可以查看匯編代碼一窺究竟,go tool compile -S mutex.go也可以對二進(jìn)制文件go tool objdump -s methodname binary

          	0x0036 00054 (loop.go:6)	MOVQ	AX, CX
          0x0039 00057 ($GOROOT/src/sync/mutex.go:74) XORL AX, AX
          0x003b 00059 ($GOROOT/src/sync/mutex.go:74) MOVL $1, DX
          0x0040 00064 ($GOROOT/src/sync/mutex.go:74) LOCK
          0x0041 00065 ($GOROOT/src/sync/mutex.go:74) CMPXCHGL DX, (CX)
          0x0044 00068 ($GOROOT/src/sync/mutex.go:74) SETEQ AL
          0x0047 00071 ($GOROOT/src/sync/mutex.go:74) TESTB AL, AL
          0x0049 00073 ($GOROOT/src/sync/mutex.go:74) JEQ 150
          0x004b 00075 (loop.go:8) MOVL $8, ""..autotmp_6+16(SP)
          0x0053 00083 (loop.go:8) LEAQ sync.(*Mutex).Unlock·f(SB), AX

          重點(diǎn)關(guān)注第5行CMPXCHGL DX, (CX)這個CMPXCHGL是x86和Intel架構(gòu)中的compare and exchange指令,Java的那套AtomicXX底層也是依賴這個指令來保證原子性操作的。

          所以我們看到Mutex是互斥排他鎖且不可重入,當(dāng)我們在一個goroutine獲取同一個鎖會導(dǎo)致死鎖。

          package?main

          import?"sync"

          func?main()?{
          ?m?:=?sync.Mutex{}
          ?m.Lock()
          ??//這里會導(dǎo)致死鎖
          ?m.Lock()
          ?defer?m.Unlock()
          }

          slow-path

          如果goroutinefast-path失敗,則調(diào)用m.lockSlow()進(jìn)入slow-path,函數(shù)內(nèi)部主要是一個for{}死循環(huán),進(jìn)入循環(huán)的goroutine大致分為兩類:

          • 新來的gorountine
          • 被喚醒的goroutine

          Mutex默認(rèn)為正常模式,若新來的goroutine搶占成功,則另一個就需要阻塞等待;阻塞等待一旦超過閾值1ms則會將Mutex切換到饑餓模式,這個模式下新來的goroutine只能阻塞等待在隊列尾部,沒有搶占的資格。當(dāng)然等待阻塞->喚醒->參與搶占鎖,這個過程顯示不是很高效,所以這里有一個自旋的優(yōu)化

          當(dāng)mutex處于正常模式且能夠自旋,會讓當(dāng)前goroutine自旋等待,同時設(shè)置mutex.state的mutexWoken位為1,保證自旋等待的goroutine一定比新來goroutine更有優(yōu)先權(quán)。這樣unlock操作也會優(yōu)先保證自旋等待的goroutine獲取鎖

          golang對自旋做了些限制要求 需要:

          • 多核CPU
          • GOMAXPROCS>1
          • 至少有一個運(yùn)行的P并且local的P隊列為空

          感興趣的可以跟下源碼比較簡單

          func?(m?*Mutex)?lockSlow()?{
          ?var?waitStartTime?int64
          ?starving?:=?false
          ?awoke?:=?false
          ?iter?:=?0
          ?old?:=?m.state
          ?for?{
          ????//饑餓模式下不能自旋,也沒有資格搶占,鎖是手遞手給到等待的goroutine
          ??if?old&(mutexLocked|mutexStarving)?==?mutexLocked?&&?runtime_canSpin(iter)?{//當(dāng)Mutex處于正常模式且能夠自旋
          ??????//設(shè)置mutexWoken為1?告訴unlock操作,存在自旋gorountine?unlock后不需要喚醒其他goroutine
          ???if?!awoke?&&?old&mutexWoken?==?0?&&?old>>mutexWaiterShift?!=?0?&&
          ????atomic.CompareAndSwapInt32(&m.state,?old,?old|mutexWoken)?{
          ????awoke?=?true
          ???}
          ???runtime_doSpin()
          ???iter++
          ???old?=?m.state
          ???continue
          ??}
          ??//??自旋完了?還是沒拿到鎖
          ??new?:=?old
          ????//當(dāng)mutex處于正常模式,將new的mutexLocked設(shè)置為1?即準(zhǔn)備搶占鎖
          ??if?old&mutexStarving?==?0?{
          ???new?|=?mutexLocked
          ??}
          ????//加鎖狀態(tài)或饑餓模式下?新來的goroutine進(jìn)入等待隊列
          ??if?old&(mutexLocked|mutexStarving)?!=?0?{
          ???new?+=?1?<??}

          ????//將Mutex切換為饑餓模式,若未加鎖則不必切換
          ????//Unlock操作希望饑餓模式存在等待者
          ??if?starving?&&?old&mutexLocked?!=?0?{
          ???new?|=?mutexStarving
          ??}
          ??if?awoke?{
          ??????//?當(dāng)前goroutine自旋過?已被被喚醒,則需要將mutexWoken重置
          ???if?new&mutexWoken?==?0?{
          ????throw("sync:?inconsistent?mutex?state")
          ???}
          ???new?&^=?mutexWoken?//重置mutexWoken
          ??}
          ??if?atomic.CompareAndSwapInt32(&m.state,?old,?new)?{
          ??????//?當(dāng)前goroutine獲取鎖前mutex處于未加鎖?正常模式下
          ???if?old&(mutexLocked|mutexStarving)?==?0?{
          ????break?//?使用CAS成功搶占到鎖
          ???}
          ???//?waitStartTime!=0表示當(dāng)前goroutine是等待狀態(tài)喚醒的?
          ??????//?為了與第一次調(diào)用Lock的goroutine劃分不同的優(yōu)先級
          ???queueLifo?:=?waitStartTime?!=?0
          ???if?waitStartTime?==?0?{
          ????????//開始記錄等待時間
          ????waitStartTime?=?runtime_nanotime()
          ???}
          ??????//?將被喚醒但是沒有獲得鎖的goroutine插入到當(dāng)前等待隊列隊首
          ??????//?使用信號量阻塞當(dāng)前goroutine
          ???runtime_SemacquireMutex(&m.sema,?queueLifo,?1)
          ??????//?當(dāng)goroutine等待時間超過starvationThresholdNs,mutex進(jìn)入饑餓模式
          ???starving?=?starving?||?runtime_nanotime()-waitStartTime?>?starvationThresholdNs
          ???old?=?m.state
          ???if?old&mutexStarving?!=?0?{
          ????????//如果當(dāng)前goroutine被喚醒且mutex處于饑餓模式?則將鎖手遞手交給當(dāng)前goroutine
          ????if?old&(mutexLocked|mutexWoken)?!=?0?||?old>>mutexWaiterShift?==?0?{
          ?????throw("sync:?inconsistent?mutex?state")
          ????}
          ????????//等待狀態(tài)的goroutine?-?1
          ????delta?:=?int32(mutexLocked?-?1<????????//如果等待時間小于1ms?或?當(dāng)前goroutine是隊列中最后一個
          ????if?!starving?||?old>>mutexWaiterShift?==?1?{
          ??????//?退出饑餓模式
          ?????delta?-=?mutexStarving
          ????}
          ????atomic.AddInt32(&m.state,?delta)
          ????break
          ???}
          ???awoke?=?true
          ???iter?=?0
          ??}?else?{
          ???old?=?m.state
          ??}
          ?}
          }

          Unlock

          解鎖分兩種情況

          1. 當(dāng)前只有一個goroutine占有鎖 unlock完 直接結(jié)束
          func?(m?*Mutex)?Unlock()?{

          ?//?去除加鎖狀態(tài)
          ?new?:=?atomic.AddInt32(&m.state,?-mutexLocked)
          ?if?new?!=?0?{//存在等待的goroutine
          ??m.unlockSlow(new)
          ?}
          }
          1. unlock完畢mutex.state!=0 則存在以下可能
            • 直接將鎖交給等待隊列的第一個goroutine
            • 當(dāng)前存在等待goroutine 然后喚醒它 但不是第一個goroutine
            • 當(dāng)前存在自旋等待的goroutine 則不喚醒其他等待gorotune
            • 正常模式下
            • 饑餓模式下
          func?(m?*Mutex)?unlockSlow(new?int32)?{
          ??//未加鎖的情況下不能多次調(diào)用unlock
          ?if?(new+mutexLocked)&mutexLocked?==?0?{
          ??throw("sync:?unlock?of?unlocked?mutex")
          ?}
          ?if?new&mutexStarving?==?0?{//正常模式下
          ??old?:=?new
          ??for?{
          ??????//沒有等待的goroutine?或?已經(jīng)存在一個獲得鎖?或被喚醒?或處于饑餓模式下不需要喚醒任何處于等待的goroutine
          ???if?old>>mutexWaiterShift?==?0?||?old&(mutexLocked|mutexWoken|mutexStarving)?!=?0?{
          ????return
          ???}
          ???//?等待狀態(tài)goroutine數(shù)量-1?并設(shè)置喚醒狀態(tài)為1?然后喚醒一個等待goroutine
          ???new?=?(old?-?1<???if?atomic.CompareAndSwapInt32(&m.state,?old,?new)?{
          ????????//喚醒一個阻塞的goroutine?但不是第一個等待者
          ????runtime_Semrelease(&m.sema,?false,?1)
          ????return
          ???}
          ???old?=?m.state
          ??}
          ?}?else?{
          ????//饑餓模式下手遞手將鎖交給隊列第一個等待的goroutine
          ????//即使期間有新來的goroutine到來,只要處于饑餓模式?鎖就不會被新來的goroutine搶占
          ??runtime_Semrelease(&m.sema,?true,?1)
          ?}
          }

          信號量

          上面可以看到Mutexgoroutine的阻塞和喚醒操作是利用semaphore來實(shí)現(xiàn)的,大致的思路是:Go runtime維護(hù)了一個全局的變量semtable,它保持了所有的信號量

          //?Prime?to?not?correlate?with?any?user?patterns.
          const?semTabSize?=?251

          var?semtable?[semTabSize]struct?{
          ?root?semaRoot
          ?pad??[cpu.CacheLinePadSize?-?unsafe.Sizeof(semaRoot{})]byte
          }

          每個信號量都由一個變量地址指定,Mutex的栗子里就是mutex.sema的地址

          type?semaRoot?struct?{
          ?lock??mutex
          ?treap?*sudog?//?root?of?balanced?tree?of?unique?waiters.
          ?nwait?uint32?//?Number?of?waiters.?Read?w/o?the?lock.
          }

          大致畫了下其數(shù)據(jù)結(jié)構(gòu)

          semtable
          1. 當(dāng)goroutine未獲取到鎖,需要阻塞時調(diào)用sync.runtime_SemacquireMutex 進(jìn)入阻塞邏輯
          //go:linkname?sync_runtime_SemacquireMutex?sync.runtime_SemacquireMutex
          func?sync_runtime_SemacquireMutex(addr?*uint32,?lifo?bool,?skipframes?int)?{
          ?semacquire1(addr,?lifo,?semaBlockProfile|semaMutexProfile,?skipframes)
          }

          func?semacquire1(addr?*uint32,?lifo?bool,?profile?semaProfileFlags,?skipframes?int)?{
          ?gp?:=?getg()
          ?if?gp?!=?gp.m.curg?{
          ??throw("semacquire?not?on?the?G?stack")
          ?}

          ?//?低成本case
          ??//?若addr大于1?并通過CAS?-1?成功,則獲取信號量成功?不需要阻塞
          ?if?cansemacquire(addr)?{
          ??return
          ?}

          ?//?復(fù)雜?case:
          ?//?增加等待goroutine數(shù)量
          ?//?再次嘗試cansemacquire?成功則返回
          ?//?失敗則將自己作為一個waiter入隊
          ?//?sleep
          ?//?(waiter?descriptor?is?dequeued?by?signaler)
          ?s?:=?acquireSudog()
          ?root?:=?semroot(addr)
          ?t0?:=?int64(0)
          ?s.releasetime?=?0
          ?s.acquiretime?=?0
          ?s.ticket?=?0
          ?if?profile&semaBlockProfile?!=?0?&&?blockprofilerate?>?0?{
          ??t0?=?cputicks()
          ??s.releasetime?=?-1
          ?}
          ?if?profile&semaMutexProfile?!=?0?&&?mutexprofilerate?>?0?{
          ??if?t0?==?0?{
          ???t0?=?cputicks()
          ??}
          ??s.acquiretime?=?t0
          ?}
          ?for?{
          ??lock(&root.lock)
          ??//?給nwait+1?這樣semrelease中不會進(jìn)低成本路徑了
          ??atomic.Xadd(&root.nwait,?1)
          ??//?檢查?cansemacquire?避免錯過喚醒
          ??if?cansemacquire(addr)?{
          ???atomic.Xadd(&root.nwait,?-1)
          ???unlock(&root.lock)
          ???break
          ??}
          ????//cansemacquire之后的semrelease都可以知道我們正在等待
          ????//上面設(shè)置了nwait,所以會直接進(jìn)入sleep?即goparkunlock
          ??root.queue(addr,?s,?lifo)
          ??goparkunlock(&root.lock,?waitReasonSemacquire,?traceEvGoBlockSync,?4+skipframes)
          ??if?s.ticket?!=?0?||?cansemacquire(addr)?{
          ???break
          ??}
          ?}
          ?if?s.releasetime?>?0?{
          ??blockevent(s.releasetime-t0,?3+skipframes)
          ?}
          ?releaseSudog(s)
          }

          如果addr大于1并通過CAS-1成功則獲取信號量成功,直接返回

          否則通過對信號量地址偏移取模&semtable[(uintptr(unsafe.Pointer(addr))>>3)%semTabSize].root拿到semaRoot(這里個3和251 沒有明白為什么是這兩個數(shù)???),semaRoot包含了一個sudog鏈表和一個nwait整型字段。nwait表示該信號量上阻塞等待的g的數(shù)量,同時為了保證線程安全需要一個互斥量來保護(hù)鏈表。

          這里需要注意的是 此處的runtime.mutex并不是之前所說的sync.Mutex,是內(nèi)部的一個簡單版本

          簡單來說,sync_runtime_Semacquire就是wait知道*s>0 然后原子的遞減它,來完成同步過程中簡單的睡眠原語

          1. 當(dāng)goroutine要釋放鎖 喚醒等待的g時調(diào)用sync.runtime_Semrelease
          //go:linkname?sync_runtime_Semrelease?sync.runtime_Semrelease
          func?sync_runtime_Semrelease(addr?*uint32,?handoff?bool,?skipframes?int)?{
          ?semrelease1(addr,?handoff,?skipframes)
          }

          func?semrelease1(addr?*uint32,?handoff?bool,?skipframes?int)?{
          ?root?:=?semroot(addr)
          ?atomic.Xadd(addr,?1)

          ?//?Easy?case:?no?waiters?
          ?//?這個檢查必須發(fā)生在xadd之后?避免錯過喚醒
          ?//?(see?loop?in?semacquire).
          ?if?atomic.Load(&root.nwait)?==?0?{
          ??return
          ?}

          ?//?Harder?case:?搜索一個waiter?并喚醒它
          ?lock(&root.lock)
          ?if?atomic.Load(&root.nwait)?==?0?{
          ??//?count值已經(jīng)被另一個goroutine消費(fèi)了
          ??//?所以不需要喚醒其他goroutine
          ??unlock(&root.lock)
          ??return
          ?}
          ?s,?t0?:=?root.dequeue(addr)
          ?if?s?!=?nil?{
          ??atomic.Xadd(&root.nwait,?-1)
          ?}
          ?unlock(&root.lock)
          ?if?s?!=?nil?{?//?May?be?slow,?so?unlock?first
          ??acquiretime?:=?s.acquiretime
          ??if?acquiretime?!=?0?{
          ???mutexevent(t0-acquiretime,?3+skipframes)
          ??}
          ??if?s.ticket?!=?0?{
          ???throw("corrupted?semaphore?ticket")
          ??}
          ??if?handoff?&&?cansemacquire(addr)?{
          ???s.ticket?=?1
          ??}
          ??readyWithTime(s,?5+skipframes)
          ?}
          }

          關(guān)于信號量更深層的研究可以看下semaphore in plan9

          總結(jié)

          通過看源碼發(fā)現(xiàn)個有意思的問題:如果goroutine g1加的鎖 可以被另一個goroutine g2解鎖,但是等到g1解鎖的時候就會panic



          推薦閱讀


          福利

          我為大家整理了一份從入門到進(jìn)階的Go學(xué)習(xí)資料禮包,包含學(xué)習(xí)建議:入門看什么,進(jìn)階看什么。關(guān)注公眾號 「polarisxu」,回復(fù)?ebook?獲取;還可以回復(fù)「進(jìn)群」,和數(shù)萬 Gopher 交流學(xué)習(xí)。

          瀏覽 104
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  婷婷五月天丁香激情 | 俺来了俺去了不卡 | 日韩福利网 | 免费黄色视频。 | 日韩免费高清视频 |