<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          手摸手Go 深入剖析sync.Pool

          共 4344字,需瀏覽 9分鐘

           ·

          2021-02-23 11:17

          如果能夠?qū)⑺袃?nèi)存都分配到棧上無疑性能是最佳的,但不幸的是我們不可避免需要使用堆上分配的內(nèi)存。我們可以優(yōu)化使用堆內(nèi)存時的性能損耗嗎?答案是肯定的。Go同步包中,sync.Pool提供了保存和訪問一組臨時對象并復(fù)用它們的能力。

          對于一些創(chuàng)建成本昂貴、頻繁使用的臨時對象,使用sync.Pool可以減少內(nèi)存分配,降低GC壓力。因?yàn)?code style="font-size: 14px;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;color: rgb(30, 107, 184);background-color: rgba(27, 31, 35, 0.05);font-family: "Operator Mono", Consolas, Monaco, Menlo, monospace;word-break: break-all;">Go的gc算法是根據(jù)標(biāo)記清除改進(jìn)的三色標(biāo)記法,如果頻繁創(chuàng)建大量臨時對象,勢必給GC標(biāo)記帶來負(fù)擔(dān),CPU也很容易出現(xiàn)毛刺現(xiàn)象。當(dāng)然需要注意的是:存儲在Pool中的對象隨時都可能在不被通知的情況下被移除。所以并不是所有頻繁使用、創(chuàng)建昂貴的對象都適用,比如DB連接、線程池

          Talk is cheap,Show me your code

          因?yàn)?code style="font-size: 14px;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;color: rgb(30, 107, 184);background-color: rgba(27, 31, 35, 0.05);font-family: "Operator Mono", Consolas, Monaco, Menlo, monospace;word-break: break-all;">Go1.13版本后對sync.Pool做了優(yōu)化,放棄了利用sync.Mutex加鎖的方式該用CAS加帶環(huán)形數(shù)組的雙向鏈表的方式來實(shí)現(xiàn),本文基于Go1.15.8最新穩(wěn)定版本分析。

          基本使用

          package?main

          import?"sync"

          type?Person?struct?{
          ?Age?int
          }

          //?初始化pool
          var?personPool?=?sync.Pool{
          ?New:?func()?interface{}?{
          ??return?new(Person)
          ?},
          }

          func?main()?{
          ?//?獲取一個實(shí)例
          ?newPerson?:=?personPool.Get().(*Person)
          ?//?回收對象?以備其他協(xié)程使用
          ?defer?personPool.Put(newPerson)

          ?newPerson.Age?=?25
          }

          使用起來比較簡單大概分三步:

          1. 初始化Pool

          提供一個New函數(shù),當(dāng)Pool中未緩存該對象時調(diào)用

          1. 使用Get從緩存池中獲取對象,接著進(jìn)行業(yè)務(wù)邏輯處理即可
          2. 使用完畢 利用Put將對象交還給緩存池

          需要注意的是:跟sync.Mutex一樣sync.Pool第一次使用之后是不允許被拷貝的。

          sync.Pool對性能優(yōu)化真的有這么大魔力嗎?Benchmark之

          import?(
          ?"testing"
          )

          func?BenchmarkWithoutPool(b?*testing.B)?{
          ?var?p?*Person
          ?b.ReportAllocs()
          ?b.ResetTimer()
          ?for?i?:=?0;?i???for?j?:=?0;?j?10000;?j++?{
          ???p?=?new(Person)
          ???p.Age?=?30
          ??}
          ?}
          }

          func?BenchmarkWithPool(b?*testing.B)?{
          ?var?p?*Person
          ?b.ReportAllocs()
          ?b.ResetTimer()
          ?for?i?:=?0;?i???for?j?:=?0;?j?10000;?j++?{
          ???p?=?personPool.Get().(*Person)
          ???p.Age?=?30
          ???personPool.Put(p)
          ??}
          ?}
          }

          基準(zhǔn)測試結(jié)果:

          BenchmarkWithoutPool
          BenchmarkWithoutPool-8????????7630?????135523?ns/op????80000?B/op????10000?allocs/op
          BenchmarkWithPool
          BenchmarkWithPool-8????????9865?????126072?ns/op????????0?B/op????????0?allocs/op

          工作原理

          沒有啥一張圖搞不定的

          allPools

          如果不行 那就再來一張

          pool architecture

          sync.Pool 數(shù)據(jù)結(jié)構(gòu)

          type?Pool?struct?{
          ?noCopy?noCopy
          ?//?實(shí)際指向[]poolLocal?每個P對應(yīng)一個poolLocal?數(shù)組大小取決于P的數(shù)量?runtime.GOMAXPROCS(0)
          ?local?????unsafe.Pointer?
          ?localSize?uintptr????????//?[]poolLocal的大小

          ?victim?????unsafe.Pointer?//?local?from?previous?cycle
          ?victimSize?uintptr????????//?size?of?victims?array
          ??
          ??//當(dāng)緩存池?zé)o對應(yīng)對象時調(diào)用
          ?New?func()?interface{}
          }

          相較于Go1.13之前版本,sync.Pool的結(jié)構(gòu)體中新增了victimvictimSize字段

          sync.Pool主要維護(hù)了一個sync.poolLocal的數(shù)組,數(shù)組大小由runtime.GOMAXPROCS(0)決定。

          type?poolLocal?struct?{
          ?poolLocalInternal
          ?//?Prevents?false?sharing?on?widespread?platforms?with
          ?//?128?mod?(cache?line?size)?=?0?.
          ?pad?[128?-?unsafe.Sizeof(poolLocalInternal{})%128]byte
          }

          //?Local?per-P?Pool?appendix.
          type?poolLocalInternal?struct?{
          ?private?interface{}?//?只能被對應(yīng)的P使用
          ?shared??poolChain???//?本地的P可以從Head?進(jìn)行pushHead/popHead?其他的P可以popTail.
          }


          poolLocal內(nèi)部又由P私有空間private和共享空間shared。共享空間是一個雙端隊列,雙端隊列每個節(jié)點(diǎn)又對應(yīng)著一個環(huán)形數(shù)組,聽著貌似有點(diǎn)兒繞,老規(guī)矩上圖:

          pool chain

          poolDequeue算是個邏輯上的環(huán)形數(shù)組,字段vals存儲著實(shí)際的值,出于操作原子性的考慮,headTail字段將首尾索引融合在一起,高32位為head的索引下標(biāo),低32位為tail的索引下標(biāo),head和tail指向同一位置則表示環(huán)形數(shù)組為空。

          headTail

          代碼佐證:

          func?(d?*poolDequeue)?unpack(ptrs?uint64)?(head,?tail?uint32)?{
          ?const?mask?=?1<1
          ?head?=?uint32((ptrs?>>?dequeueBits)?&?mask)
          ?tail?=?uint32(ptrs?&?mask)
          ?return
          }
          func?(d?*poolDequeue)?pack(head,?tail?uint32)?uint64?{
          ?const?mask?=?1<1
          ?return?(uint64(head)?<??uint64(tail&mask)
          }

          sync.Pool實(shí)際使用過程中又將poolDequeue進(jìn)行了包裝,因?yàn)閿?shù)組大小是固定,所以為了讓他大小可變,將其包裝成了poolChainElt雙向鏈表。

          操作方法

          接下來我們來剖析一下sync.Pool幾個核心流程

          獲取對象 p.Get

          獲取對象,大體流程:

          1. 將當(dāng)前goroutine與P綁定并防止被搶占 具體是調(diào)用了runtime_procPin,返回poolLocal和P的id
          2. 優(yōu)先從私有空間獲取對象
          3. 若私有空間沒有,則嘗試從共享區(qū)域獲取
          4. 若共享區(qū)域也沒拿到,則嘗試從別人那邊“偷”來一個
          5. 若偷都偷不到,那么自己手動New一個
          func?(p?*Pool)?Get()?interface{}?{
          ??//?將當(dāng)前goroutine與P進(jìn)行綁定?runtime_procPin禁用搶占
          ??//?返回poolLocal與P的id
          ?l,?pid?:=?p.pin()
          ?x?:=?l.private?//嘗試直接從私有空間拿
          ?l.private?=?nil
          ?if?x?==?nil?{
          ????//從共享區(qū)域頭部拿
          ??x,?_?=?l.shared.popHead()
          ??if?x?==?nil?{
          ??????//直接實(shí)在沒有?嘗試去別人那邊看看能不能偷個
          ???x?=?p.getSlow(pid)
          ??}
          ?}
          ??//?解除搶占禁用
          ?runtime_procUnpin()
          ??//?都沒有?那只好自己New一個
          ?if?x?==?nil?&&?p.New?!=?nil?{
          ??x?=?p.New()
          ?}
          ?return?x
          }

          那么我們來看看goroutine 是怎么跟P綁定的

          func?(p?*Pool)?pin()?(*poolLocal,?int)?{
          ?pid?:=?runtime_procPin()
          ??//?pinSlow中我們先存儲local再存儲localSize,這里我們以相反順序加載
          ??//?因?yàn)槲覀円呀?jīng)禁用了搶占?GC這期間不會發(fā)生?因此我們需要觀察local的大小至少跟localSize一樣
          ?s?:=?atomic.LoadUintptr(&p.localSize)?//?load-acquire
          ?l?:=?p.local??????????????????????????//?load-consume
          ?if?uintptr(pid)???return?indexLocal(l,?pid),?pid
          ?}
          ??//?運(yùn)行過程中可能會存在調(diào)整P的情況?或者GC了
          ?return?p.pinSlow()
          }

          這里我們先調(diào)用runtime_procPin(),為啥它這么牛逼,不僅讓P不會被搶占,還讓GC為之折腰?

          番外:禁止搶占

          func?runtime_procPin()?int
          //go:linkname?sync_runtime_procPin?sync.runtime_procPin
          //go:nosplit
          func?sync_runtime_procPin()?int?{
          ?return?procPin()
          }
          //go:nosplit
          func?procPin()?int?{
          ?_g_?:=?getg()
          ?mp?:=?_g_.m

          ?mp.locks++
          ?return?int(mp.p.ptr().id)
          }

          正如所見,兜兜轉(zhuǎn)轉(zhuǎn)實(shí)際綁定goroutine和P、禁用搶占交給了procPinprocPin首先從TLS或?qū)S眉拇嫫髂玫疆?dāng)前的goroutine,然后獲取當(dāng)前gorountine綁定的物理線程,并對物理線程的locks屬性自增操作。這意味什么呢?

          這里可能涉及到一些goroutine調(diào)度的內(nèi)容,Go runtime調(diào)度是一個GPM模型。G為調(diào)度的基本單元,P可以理解為運(yùn)行G的邏輯CPU M為系統(tǒng)線程。何為搶占?

          即,將m綁定的P給占用,因?yàn)?code style="font-size: 14px;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;color: rgb(30, 107, 184);background-color: rgba(27, 31, 35, 0.05);font-family: "Operator Mono", Consolas, Monaco, Menlo, monospace;word-break: break-all;">Go runtime中99.9%的任務(wù)都需要P才能執(zhí)行任務(wù)。Go運(yùn)行時調(diào)度主要存在兩種搶占的情況:

          • 第一種情況,進(jìn)行系統(tǒng)調(diào)用的G,因?yàn)榇嬖谧枞瞪档仍谀抢飼容^浪費(fèi)計算資源,為了讓其他goroutine不被餓死
          • 第二種情況,如果一個G運(yùn)行時間太長,P中其他G得不到執(zhí)行也會餓死
          搶占實(shí)現(xiàn)

          Go中的搶占是sysmon實(shí)現(xiàn)的。對 沒錯就是runtime.main里的那個sysmon也是唯一一個脫離GPM模型只需GM即可運(yùn)行的特例。sysmon中包含了netpoolretakeforcegcscavengeheap,這里搶占我們需要關(guān)注下retake

          //go:nowritebarrierrec
          func?sysmon()?{
          ??...
          ?//?retake?P's?blocked?in?syscalls
          ??//?and?preempt?long?running?G's
          ??if?retake(now)?!=?0?{
          ???idle?=?0
          ??}?else?{
          ???idle++
          ??}
          ??...
          }
          func?retake(now?int64)?uint32?{
          ?...?
          if?s?==?_Prunning?||?s?==?_Psyscall?{
          ???//?Preempt?G?if?it's?running?for?too?long.
          ???t?:=?int64(_p_.schedtick)
          ???if?int64(pd.schedtick)?!=?t?{
          ????pd.schedtick?=?uint32(t)
          ????pd.schedwhen?=?now
          ???}?else?if?pd.schedwhen+forcePreemptNS?<=?now?{//G運(yùn)行時間超過forcePreemptNS
          ????preemptone(_p_)
          ????//?In?case?of?syscall,?preemptone()?doesn't
          ????//?work,?because?there?is?no?M?wired?to?P.
          ????sysretake?=?true
          ???}
          ??...
          }

          P處于運(yùn)行中或系統(tǒng)調(diào)用,檢查G運(yùn)行時間是否超過forcePreemptNS(10ms),超過則調(diào)用preemptone(_p_)搶占這個P

          func?preemptone(_p_?*p)?bool?{
          ?mp?:=?_p_.m.ptr()
          ?if?mp?==?nil?||?mp?==?getg().m?{
          ??return?false
          ?}
          ?gp?:=?mp.curg
          ?if?gp?==?nil?||?gp?==?mp.g0?{
          ??return?false
          ?}

          ?gp.preempt?=?true

          ?//?Every?call?in?a?go?routine?checks?for?stack?overflow?by
          ?//?comparing?the?current?stack?pointer?to?gp->stackguard0.
          ?//?Setting?gp->stackguard0?to?StackPreempt?folds
          ?//?preemption?into?the?normal?stack?overflow?check.
          ?gp.stackguard0?=?stackPreempt

          ?//?Request?an?async?preemption?of?this?P.
          ?if?preemptMSupported?&&?debug.asyncpreemptoff?==?0?{
          ??_p_.preempt?=?true
          ??preemptM(mp)
          ?}

          ?return?true
          }

          主要是設(shè)置兩個標(biāo)志位gp.preemptgp.stackguard0 主要起作用的是后者。通過將goroutinestackguard0設(shè)置為(1<<(8*sys.PtrSize) - 1)& -1314,導(dǎo)致P在執(zhí)行G下一次的函數(shù)調(diào)用時,棧空間檢查失敗(stackguard0與SP寄存器比較),進(jìn)而觸發(fā)編譯器安插的指令morestack

          //以asm_amd64.s為例
          TEXT runtime·morestack(SB),NOSPLIT,$0-0
          ... ...
          // Call newstack on m->g0's stack.
          MOVQ m_g0(BX), BX
          MOVQ BX, g(CX)
          MOVQ (g_sched+gobuf_sp)(BX), SP
          CALL runtime·newstack(SB)
          CALL runtime·abort(SB) // crash if newstack returns
          RET

          morestack會調(diào)用newstack嘗試棧擴(kuò)容

          //go:nowritebarrierrec
          func?newstack()?{
          ??...?...
          ?if?preempt?{
          ??if?!canPreemptM(thisg.m)?{
          ???//?Let?the?goroutine?keep?running?for?now.
          ???//?gp->preempt?is?set,?so?it?will?be?preempted?next?time.
          ???gp.stackguard0?=?gp.stack.lo?+?_StackGuard
          ???gogo(&gp.sched)?//?never?return
          ??}
          ?}
          ?...?...?
          }
          //go:nosplit
          func?canPreemptM(mp?*m)?bool?{
          ?return?mp.locks?==?0?&&?mp.mallocing?==?0?&&?mp.preemptoff?==?""?&&?mp.p.ptr().status?==?_Prunning
          }

          newstack在棧擴(kuò)容前會檢查搶占標(biāo)志位mp.locks!=0則不搶占。

          如果搶占成功,則會繼續(xù)調(diào)用gopreempt_m(gp)進(jìn)而調(diào)用goschedImpl(gp)P與當(dāng)前m接觸關(guān)聯(lián),設(shè)置goroutine狀態(tài)casgstatus(gp, _Grunning, _Grunnable),然后將goroutine插入Global runnable queue 等待下次調(diào)度。

          至此,應(yīng)該能徹底明白為啥runtime_procPin能夠通過修改goroutine綁定的mlocks屬性就能禁用搶占了。

          但是還有個問題,為啥GC也拿它沒辦法?

          關(guān)于GoGC,大致有三種觸發(fā)方式:

          • gcTriggerCycle 后臺定時檢查觸發(fā),如 runtime.sysmon
          • gcTriggerTimer 自上個GC周期超過forcegcperiod納秒則觸發(fā) 如runtime.forcegchelper
          • g cTriggerHeap 申請的堆內(nèi)存大小達(dá)到觸發(fā)閾值 如 runtime.mallocgc

          最終都會調(diào)用gcStart(trigger gcTrigger),進(jìn)而我們在GC的STW階段執(zhí)行中可以看到

          func?stopTheWorldWithSema()?{
          ?_g_?:=?getg()

          ?//?If?we?hold?a?lock,?then?we?won't?be?able?to?stop?another?M
          ?//?that?is?blocked?trying?to?acquire?the?lock.
          ?if?_g_.m.locks?>?0?{
          ??throw("stopTheWorld:?holding?locks")
          ?}
          ?lock(&sched.lock)
          ?sched.stopwait?=?gomaxprocs
          ?atomic.Store(&sched.gcwaiting,?1)
          ?preemptall()
          ?//?stop?current?P
          ?_g_.m.p.ptr().status?=?_Pgcstop?//?Pgcstop?is?only?diagnostic.
          ?sched.stopwait--
          ?//?try?to?retake?all?P's?in?Psyscall?status
          ?for?_,?p?:=?range?allp?{
          ??s?:=?p.status
          ??if?s?==?_Psyscall?&&?atomic.Cas(&p.status,?s,?_Pgcstop)?{
          ???if?trace.enabled?{
          ????traceGoSysBlock(p)
          ????traceProcStop(p)
          ???}
          ???p.syscalltick++
          ???sched.stopwait--
          ??}
          ?}
          ?//?stop?idle?P's
          ?for?{
          ??p?:=?pidleget()
          ??if?p?==?nil?{
          ???break
          ??}
          ??p.status?=?_Pgcstop
          ??sched.stopwait--
          ?}
          ?wait?:=?sched.stopwait?>?0
          ?unlock(&sched.lock)

          ?//?wait?for?remaining?P's?to?stop?voluntarily
          ?if?wait?{
          ??for?{
          ???//?wait?for?100us,?then?try?to?re-preempt?in?case?of?any?races
          ???if?notetsleep(&sched.stopnote,?100*1000)?{
          ????noteclear(&sched.stopnote)
          ????break
          ???}
          ???preemptall()
          ??}
          ?}

          ?//?sanity?checks
          ?bad?:=?""
          ?if?sched.stopwait?!=?0?{
          ??bad?=?"stopTheWorld:?not?stopped?(stopwait?!=?0)"
          ?}?else?{
          ??for?_,?p?:=?range?allp?{
          ???if?p.status?!=?_Pgcstop?{
          ????bad?=?"stopTheWorld:?not?stopped?(status?!=?_Pgcstop)"
          ???}
          ??}
          ?}
          ?if?atomic.Load(&freezing)?!=?0?{
          ??//?Some?other?thread?is?panicking.?This?can?cause?the
          ??//?sanity?checks?above?to?fail?if?the?panic?happens?in
          ??//?the?signal?handler?on?a?stopped?thread.?Either?way,
          ??//?we?should?halt?this?thread.
          ??lock(&deadlock)
          ??lock(&deadlock)
          ?}
          ?if?bad?!=?""?{
          ??throw(bad)
          ?}
          }

          大致邏輯先調(diào)用preemptall()嘗試搶占所有的P,然后停掉當(dāng)前P,遍歷所有的P,如果P處于系統(tǒng)調(diào)用則直接stop掉;然后處理空閑的P;最后檢查是否存在需要等待處理的P,如果有則循環(huán)等待,并嘗試調(diào)用preemptall()

          func?preemptall()?bool?{
          ?res?:=?false
          ?for?_,?_p_?:=?range?allp?{
          ??if?_p_.status?!=?_Prunning?{
          ???continue
          ??}
          ??if?preemptone(_p_)?{
          ???res?=?true
          ??}
          ?}
          ?return?res
          }

          到這里就很清晰了,我們又看到老朋友preemptone(_p_),顯然GC會在STW階段等下去,GC自然也無法執(zhí)行下去。

          好了 剛剛兩個問題我們已經(jīng)搞清楚了。書歸正傳 runtime_procPin能禁用P被搶占,那么runtime_procUnpin自然能解除禁用。完成goroutineP的綁定,返回了當(dāng)前P的id,如果pid則說明當(dāng)前poolLocal已經(jīng)存在 直接利用地址偏移拿到poolLocal

          func?indexLocal(l?unsafe.Pointer,?i?int)?*poolLocal?{
          ?lp?:=?unsafe.Pointer(uintptr(l)?+?uintptr(i)*unsafe.Sizeof(poolLocal{}))
          ?return?(*poolLocal)(lp)
          }

          如果運(yùn)行時P被調(diào)整了呢?那么嘗試下p.pinSlow(),正如其名這個過程會有點(diǎn)兒慢

          func?(p?*Pool)?pinSlow()?(*poolLocal,?int)?{
          ?//?Retry?under?the?mutex.
          ?//?Can?not?lock?the?mutex?while?pinned.
          ?runtime_procUnpin()
          ?allPoolsMu.Lock()
          ?defer?allPoolsMu.Unlock()
          ?pid?:=?runtime_procPin()
          ?//?poolCleanup?won't?be?called?while?we?are?pinned.
          ?s?:=?p.localSize
          ?l?:=?p.local
          ?if?uintptr(pid)???return?indexLocal(l,?pid),?pid
          ?}
          ?if?p.local?==?nil?{
          ??allPools?=?append(allPools,?p)
          ?}
          ?//?If?GOMAXPROCS?changes?between?GCs,?we?re-allocate?the?array?and?lose?the?old?one.
          ?size?:=?runtime.GOMAXPROCS(0)
          ?local?:=?make([]poolLocal,?size)
          ?atomic.StorePointer(&p.local,?unsafe.Pointer(&local[0]))?//?store-release
          ?atomic.StoreUintptr(&p.localSize,?uintptr(size))?????????//?store-release
          ?return?&local[pid],?pid
          }

          pinSlow()上來第一件事兒 將我們之前設(shè)置的P禁用搶占給釋放了。然后嘗試獲取全局排他鎖allPoolsMu Mutex。這也能解釋它為啥上來就釋放掉之前的禁止占用,因?yàn)楂@取當(dāng)前全局排他鎖不一定能立馬拿到啊。拿到鎖之后又開啟了禁止搶占P,接著又判斷了下uintptr(pid) < s因?yàn)槟玫芥i之前P可能已經(jīng)變化了。如果當(dāng)前p.local=nil則將p放到全局的池子allPools []*Pool里,也是為啥剛才需要等待全局排他鎖的原因。因?yàn)?code style="font-size: 14px;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;color: rgb(30, 107, 184);background-color: rgba(27, 31, 35, 0.05);font-family: "Operator Mono", Consolas, Monaco, Menlo, monospace;word-break: break-all;">GC時會將原有的pool清理掉所以這里進(jìn)行重建,原有pool真的沒了嗎?這個就跟之前提到的victim有點(diǎn)兒關(guān)系了 等會兒一起看。

          至此,我們拿到了poolLocal,接著獲取對象的順序?yàn)?/p>

          1. 首先嘗試從本地的private中獲取
          2. 如果本地沒拿到,則x, _ = l.shared.popHead()嘗試從共享空間拿
          func?(c?*poolChain)?popHead()?(interface{},?bool)?{
          ?d?:=?c.head
          ?for?d?!=?nil?{
          ??if?val,?ok?:=?d.popHead();?ok?{
          ???return?val,?ok
          ??}
          ??//?There?may?still?be?unconsumed?elements?in?the
          ??//?previous?dequeue,?so?try?backing?up.
          ??d?=?loadPoolChainElt(&d.prev)
          ?}
          ?return?nil,?false
          }

          共享空間是以PoolChainElt為節(jié)點(diǎn)的雙向鏈表,首先我們嘗試沿著雙向鏈表prev的方向依次調(diào)用d.popHead()嘗試從頭部拿數(shù)據(jù)

          func?(d?*poolDequeue)?popHead()?(interface{},?bool)?{
          ?var?slot?*eface
          ?for?{
          ??ptrs?:=?atomic.LoadUint64(&d.headTail)
          ??head,?tail?:=?d.unpack(ptrs)
          ??if?tail?==?head?{
          ???//?Queue?is?empty.
          ???return?nil,?false
          ??}

          ??//?Confirm?tail?and?decrement?head.?We?do?this?before
          ??//?reading?the?value?to?take?back?ownership?of?this
          ??//?slot.
          ??head--
          ??ptrs2?:=?d.pack(head,?tail)
          ??if?atomic.CompareAndSwapUint64(&d.headTail,?ptrs,?ptrs2)?{
          ???//?We?successfully?took?back?slot.
          ???slot?=?&d.vals[head&uint32(len(d.vals)-1)]
          ???break
          ??}
          ?}

          ?val?:=?*(*interface{})(unsafe.Pointer(slot))
          ?if?val?==?dequeueNil(nil)?{
          ??val?=?nil
          ?}
          ?//?Zero?the?slot.?Unlike?popTail,?this?isn't?racing?with
          ?//?pushHead,?so?we?don't?need?to?be?careful?here.
          ?*slot?=?eface{}
          ?return?val,?true
          }

          邏輯也比較簡單

          2.1 將headTail拆封 如果head==tail表明當(dāng)前環(huán)形數(shù)組為空,直接返回

          2.2 接著將head索引減1,然后將head、tail再打包回去,通過CAS判斷當(dāng)前沒有并發(fā)修改就拿到數(shù)據(jù) 跳出循環(huán) 否則循環(huán)等待

          2.3 將slot轉(zhuǎn)為interface{}類型

          2.4 將slot賦值為eface{}

          1. 如果共享空間依然沒拿到,那么想辦法從其他P那偷個吧p.getSlow(pid)
          func?(p?*Pool)?getSlow(pid?int)?interface{}?{
          ?//?See?the?comment?in?pin?regarding?ordering?of?the?loads.
          ?size?:=?atomic.LoadUintptr(&p.localSize)?//?load-acquire
          ?locals?:=?p.local????????????????????????//?load-consume
          ?//?Try?to?steal?one?element?from?other?procs.
          ?for?i?:=?0;?i?int(size);?i++?{
          ??l?:=?indexLocal(locals,?(pid+i+1)%int(size))
          ??if?x,?_?:=?l.shared.popTail();?x?!=?nil?{
          ???return?x
          ??}
          ?}

          ?//?Try?the?victim?cache.?We?do?this?after?attempting?to?steal
          ?//?from?all?primary?caches?because?we?want?objects?in?the
          ?//?victim?cache?to?age?out?if?at?all?possible.
          ?size?=?atomic.LoadUintptr(&p.victimSize)
          ?if?uintptr(pid)?>=?size?{
          ??return?nil
          ?}
          ?locals?=?p.victim
          ?l?:=?indexLocal(locals,?pid)
          ?if?x?:=?l.private;?x?!=?nil?{
          ??l.private?=?nil
          ??return?x
          ?}
          ?for?i?:=?0;?i?int(size);?i++?{
          ??l?:=?indexLocal(locals,?(pid+i)%int(size))
          ??if?x,?_?:=?l.shared.popTail();?x?!=?nil?{
          ???return?x
          ??}
          ?}

          ?//?Mark?the?victim?cache?as?empty?for?future?gets?don't?bother
          ?//?with?it.
          ?atomic.StoreUintptr(&p.victimSize,?0)

          ?return?nil
          }

          3.1 拿到[]poolLocal數(shù)組,遍歷每個poolLocal,并調(diào)用l.shared.popTail() 從其共享空間的尾部拿數(shù)據(jù)

          func?(c?*poolChain)?popTail()?(interface{},?bool)?{
          ?d?:=?loadPoolChainElt(&c.tail)
          ?if?d?==?nil?{
          ??return?nil,?false
          ?}

          ?for?{
          ??//?It's?important?that?we?load?the?next?pointer
          ??//?*before*?popping?the?tail.?In?general,?d?may?be
          ??//?transiently?empty,?but?if?next?is?non-nil?before
          ??//?the?pop?and?the?pop?fails,?then?d?is?permanently
          ??//?empty,?which?is?the?only?condition?under?which?it's
          ??//?safe?to?drop?d?from?the?chain.
          ??d2?:=?loadPoolChainElt(&d.next)

          ??if?val,?ok?:=?d.popTail();?ok?{
          ???return?val,?ok
          ??}

          ??if?d2?==?nil?{
          ???//?This?is?the?only?dequeue.?It's?empty?right
          ???//?now,?but?could?be?pushed?to?in?the?future.
          ???return?nil,?false
          ??}

          ??//?The?tail?of?the?chain?has?been?drained,?so?move?on
          ??//?to?the?next?dequeue.?Try?to?drop?it?from?the?chain
          ??//?so?the?next?pop?doesn't?have?to?look?at?the?empty
          ??//?dequeue?again.
          ??if?atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)),?unsafe.Pointer(d),?unsafe.Pointer(d2))?{
          ???//?We?won?the?race.?Clear?the?prev?pointer?so
          ???//?the?garbage?collector?can?collect?the?empty
          ???//?dequeue?and?so?popHead?doesn't?back?up
          ???//?further?than?necessary.
          ???storePoolChainElt(&d2.prev,?nil)
          ??}
          ??d?=?d2
          ?}
          }

          首先拿到尾節(jié)點(diǎn),然后在死循環(huán)中沿著雙向鏈表next的方向不斷獲取PoolChainElt節(jié)點(diǎn),嘗試調(diào)用d.popTail()獲取數(shù)據(jù)

          func?(d?*poolDequeue)?popTail()?(interface{},?bool)?{
          ?var?slot?*eface
          ?for?{
          ??ptrs?:=?atomic.LoadUint64(&d.headTail)
          ??head,?tail?:=?d.unpack(ptrs)
          ??if?tail?==?head?{
          ???//?Queue?is?empty.
          ???return?nil,?false
          ??}
          ??ptrs2?:=?d.pack(head,?tail+1)
          ??if?atomic.CompareAndSwapUint64(&d.headTail,?ptrs,?ptrs2)?{
          ???slot?=?&d.vals[tail&uint32(len(d.vals)-1)]
          ???break
          ??}
          ?}
          ?val?:=?*(*interface{})(unsafe.Pointer(slot))
          ?if?val?==?dequeueNil(nil)?{
          ??val?=?nil
          ?}
          ?slot.val?=?nil
          ?atomic.StorePointer(&slot.typ,?nil)
          ?return?val,?true
          }

          popHead比較像,不同在于一個從頭部拿數(shù)據(jù)一個從尾部拿。首先依然是在死循環(huán)中先將headTail拆封,如果tai l==head表示環(huán)形數(shù)組為空,直接返回。否則將tail+1再封裝好,同CAS規(guī)避并發(fā)問題 拿到數(shù)據(jù)則跳出循環(huán),否則循環(huán)等待。

          這里有一個跟popHead不同的是 先將value置為nil然后利用CAS來將typ置空操作atomic.StorePointer(&slot.typ, nil),原因很簡單,pushHeadpopTail一個從頭放一個從尾拿數(shù)據(jù),一旦碰頭就會出現(xiàn)競爭。

          3.2 那如果偷都偷不到,會進(jìn)行以下操作

          size?=?atomic.LoadUintptr(&p.victimSize)
          ?if?uintptr(pid)?>=?size?{
          ??return?nil
          ?}
          ?locals?=?p.victim
          ?l?:=?indexLocal(locals,?pid)
          ?if?x?:=?l.private;?x?!=?nil?{
          ??l.private?=?nil
          ??return?x
          ?}
          ?for?i?:=?0;?i?int(size);?i++?{
          ??l?:=?indexLocal(locals,?(pid+i)%int(size))
          ??if?x,?_?:=?l.shared.popTail();?x?!=?nil?{
          ???return?x
          ??}
          ?}

          ?//?Mark?the?victim?cache?as?empty?for?future?gets?don't?bother
          ?//?with?it.
          ?atomic.StoreUintptr(&p.victimSize,?0)

          victim cache翻譯過來叫“受害者緩存”

          受害者緩存是由Norman Jouppi提出的一種提高緩存性能的硬件技術(shù)。如他的論文所述

          Miss caching places a fully-associative cache between cache and its re-fill path. Misses in the cache that hit in the miss cache have a one cycle penalty, as opposed to a many cycle miss penalty without the miss cache. Victim Caching is an improvement to miss caching that loads the small fully-associative cache with victim of a miss and not the requested cache line.

          大概意思就是在舊緩存和緩解重建的過程中,添加一個全關(guān)聯(lián)的緩存(保存舊緩存數(shù)據(jù))。也就是說當(dāng)一級緩存踢出的數(shù)據(jù),放到受害者緩存中。當(dāng)我們在一級緩存未命中,則可以繼續(xù)嘗試從受害者緩存中查詢。

          如代碼:

          size?=?atomic.LoadUintptr(&p.victimSize)
          ?if?uintptr(pid)?>=?size?{
          ??return?nil
          ?}
          ?locals?=?p.victim
          ?l?:=?indexLocal(locals,?pid)
          ?if?x?:=?l.private;?x?!=?nil?{
          ??l.private?=?nil
          ??return?x
          ?}
          ?for?i?:=?0;?i?int(size);?i++?{
          ??l?:=?indexLocal(locals,?(pid+i)%int(size))
          ??if?x,?_?:=?l.shared.popTail();?x?!=?nil?{
          ???return?x
          ??}
          ?}

          ?//?Mark?the?victim?cache?as?empty?for?future?gets?don't?bother
          ?//?with?it.
          ?atomic.StoreUintptr(&p.victimSize,?0)

          如果能理解,其實(shí)還是挺簡單的,也就是

          local1 ->GC ->local2 ? ? victim->local1

          Local2 ->GC ->local3 ? ?victim->local2

          1. 很遺憾getSlow也沒拿到 那只好自己手動new一個了
          if?x?==?nil?&&?p.New?!=?nil?{
          ??x?=?p.New()
          ?}

          用完返回Pool p.Put

          看完 Get,接著看下Put

          func?(p?*Pool)?Put(x?interface{})?{
          ?if?x?==?nil?{
          ??return
          ?}
          ??//?將goroutine與P綁定?runtime_procPin禁用搶占?返回poolLocal
          ?l,?_?:=?p.pin()
          ?if?l.private?==?nil?{//優(yōu)先放到私有空間
          ??l.private?=?x
          ??x?=?nil
          ?}
          ?if?x?!=?nil?{?//放回共享空間
          ??l.shared.pushHead(x)
          ?}
          ??//?解除搶占禁用
          ?runtime_procUnpin()
          }

          基本邏輯:

          1. 如果放入對象為空 直接返回
          2. 調(diào)用p.pin獲取poolLocal之前分析過大體類似
          3. 優(yōu)先放入私有空間
          4. 若私有空間已滿 則嘗試放入共享空間
          5. 釋放P禁止占用
          func?(c?*poolChain)?pushHead(val?interface{})?{
          ?d?:=?c.head
          ?if?d?==?nil?{
          ??//?Initialize?the?chain.
          ??const?initSize?=?8?//?Must?be?a?power?of?2
          ??d?=?new(poolChainElt)
          ??d.vals?=?make([]eface,?initSize)
          ??c.head?=?d
          ??storePoolChainElt(&c.tail,?d)
          ?}
          ?if?d.pushHead(val)?{
          ??return
          ?}

          ?newSize?:=?len(d.vals)?*?2
          ?if?newSize?>=?dequeueLimit?{
          ??//?Can't?make?it?any?bigger.
          ??newSize?=?dequeueLimit
          ?}

          ?d2?:=?&poolChainElt{prev:?d}
          ?d2.vals?=?make([]eface,?newSize)
          ?c.head?=?d2
          ?storePoolChainElt(&d.next,?d2)
          ?d2.pushHead(val)
          }

          putHead邏輯主要是將對象放到雙向鏈表的對應(yīng)節(jié)點(diǎn)的環(huán)形數(shù)組中。

          1. 先獲取雙向鏈表的head節(jié)點(diǎn)
          2. 若head節(jié)點(diǎn)為空 則初始化head節(jié)點(diǎn) 節(jié)點(diǎn)對應(yīng)環(huán)形數(shù)組初始大小為8
          3. 將對象放到環(huán)形數(shù)組中
          func?(d?*poolDequeue)?pushHead(val?interface{})?bool?{
          ?ptrs?:=?atomic.LoadUint64(&d.headTail)
          ?head,?tail?:=?d.unpack(ptrs)
          ?if?(tail+uint32(len(d.vals)))&(1<-1)?==?head?{
          ??//?Queue?is?full.
          ??return?false
          ?}
          ?slot?:=?&d.vals[head&uint32(len(d.vals)-1)]
          ?typ?:=?atomic.LoadPointer(&slot.typ)
          ?if?typ?!=?nil?{//?popTail可能還沒處理完
          ??return?false
          ?}

          ?//?The?head?slot?is?free,?so?we?own?it.
          ?if?val?==?nil?{
          ??val?=?dequeueNil(nil)
          ?}
          ?*(*interface{})(unsafe.Pointer(slot))?=?val
          ?atomic.AddUint64(&d.headTail,?1<?return?true
          }

          popHead是相反的操作,大體也比較簡單。先判斷環(huán)形數(shù)組是否滿了,滿了則直接返回。因?yàn)?code style="font-size: 14px;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;color: rgb(30, 107, 184);background-color: rgba(27, 31, 35, 0.05);font-family: "Operator Mono", Consolas, Monaco, Menlo, monospace;word-break: break-all;">pushHead跟popTail存在競爭關(guān)系,slot.typ不為空可能是popTail還沒處理完。

          關(guān)于GC清除數(shù)據(jù)問題

          pool.go中的init函數(shù)組冊了GC發(fā)生時如何清理Pool的函數(shù),調(diào)用鏈如下

          gcTrigger->gcStart()->clearpools()->poolCleanup()

          func?init()?{
          ?runtime_registerPoolCleanup(poolCleanup)
          }
          //go:linkname?sync_runtime_registerPoolCleanup?sync.runtime_registerPoolCleanup
          func?sync_runtime_registerPoolCleanup(f?func())?{
          ?poolcleanup?=?f
          }
          func?poolCleanup()?{
          ?for?_,?p?:=?range?oldPools?{
          ??p.victim?=?nil
          ??p.victimSize?=?0
          ?}

          ?for?_,?p?:=?range?allPools?{
          ??p.victim?=?p.local
          ??p.victimSize?=?p.localSize
          ??p.local?=?nil
          ??p.localSize?=?0
          ?}

          ?oldPools,?allPools?=?allPools,?nil
          }

          邏輯很簡單 正如上面講victim說的那樣。

          最后的最后,細(xì)心的你可能發(fā)現(xiàn) 還遺漏了兩個細(xì)節(jié)

          noCopy

          sync.Pool結(jié)構(gòu)體中noCopy其實(shí)是為了防止sync.Pool使用過程中被拷貝。至于原因應(yīng)該不用多說,因?yàn)?code style="font-size: 14px;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;color: rgb(30, 107, 184);background-color: rgba(27, 31, 35, 0.05);font-family: "Operator Mono", Consolas, Monaco, Menlo, monospace;word-break: break-all;">Go并沒有提供原生的強(qiáng)制不能拷貝的方法。所以采用這種方式,讓go vet檢測報錯來實(shí)現(xiàn)。

          舉個例子

          type?noCopy?struct{}

          //?Lock?is?a?no-op?used?by?-copylocks?checker?from?`go?vet`.
          func?(*noCopy)?Lock()???{}
          func?(*noCopy)?Unlock()?{}
          type?People?struct?{
          ?noCopy?noCopy
          }

          func?say(p?People)?{

          }

          func?main()?{
          ?var?p?People
          ?say(p)
          }
          go?vet?demo.go

          輸出:

          #?command-line-arguments
          ./demo.go:12:12:?say?passes?lock?by?value:?command-line-arguments.People?contains?command-line-arguments.noCopy
          ./demo.go:18:6:?call?of?say?copies?lock?value:?command-line-arguments.People?contains?command-line-arguments.noCopy

          當(dāng)然直接執(zhí)行不會報任何錯

          pad

          type?poolLocal?struct?{
          ?poolLocalInternal

          ?//?Prevents?false?sharing?on?widespread?platforms?with
          ?//?128?mod?(cache?line?size)?=?0?.
          ?pad?[128?-?unsafe.Sizeof(poolLocalInternal{})%128]byte
          }

          pad字段在這里沒有啥業(yè)務(wù)意思,目的就是為了避免偽共享問題。因?yàn)槲覀優(yōu)榱司徑庥嬎銠C(jī)CPU計算速度和內(nèi)存的讀取速度不匹配的矛盾,在他們之間增加了L1 L2 L3 高速緩存,他們比內(nèi)存小很多但是速度卻是內(nèi)存無法比擬的。

          圖片來自網(wǎng)絡(luò)

          緩存系統(tǒng)中我們是以緩存行(cache line)為單位,通常大小為64字節(jié)。上面這張圖,我們可以看到L1、L2、L3三級緩存他們和內(nèi)存的讀取速度當(dāng)然取決于他們與CPU緊密程度。L1>L2>L3>內(nèi)存

          但是!我們現(xiàn)在使用的都是多核CPU的計算機(jī),如何保證多核看到的數(shù)據(jù)的一致性呢?這里我們需要談到一個協(xié)議-MESI協(xié)議,M、E、S、I分別表示緩存行的4個狀態(tài)

          M(修改,Modified):本地處理器已經(jīng)修改緩存行,即是臟行,它的內(nèi)容與內(nèi)存中的內(nèi)容不一樣,并且此 cache 只有本地一個拷貝(專有);

          E(專有,Exclusive):緩存行內(nèi)容和內(nèi)存中的一樣,而且其它處理器都沒有這行數(shù)據(jù);

          S(共享,Shared):緩存行內(nèi)容和內(nèi)存中的一樣, 有可能其它處理器也存在此緩存行的拷貝;

          I(無效,Invalid):緩存行失效, 不能使用。

          他們轉(zhuǎn)換關(guān)系如下:

          圖片來自網(wǎng)絡(luò)

          現(xiàn)在假設(shè)我們有以下場景

          圖片來自網(wǎng)絡(luò)

          有兩個變量X、Y共享在了一個cache line中。如果core1想要更新X,core2想要更新Y,更新完他們的緩存行都變成了I狀態(tài),即L1 L2上的緩存均不可用,這時如果其他線程再要訪問X Y就只能從L3甚至從內(nèi)存拿數(shù)據(jù),其性能可想而知。

          怎么解決呢?

          解決偽共享的問題 業(yè)界大多采用pad填充的方式來解決,讓數(shù)據(jù)獨(dú)占一個cacheline 降低數(shù)據(jù)關(guān)聯(lián)共享的影響。比如Java8還提供了語法糖,通過添加注解@Contended自動進(jìn)行緩存行填充。

          總結(jié)

          sync.Pool實(shí)現(xiàn)總體比較小巧,具體思想其實(shí)其他語言也都有影子,比如Java中的ForkJoinPool。但是往往簡單設(shè)計的細(xì)節(jié)往往很值得我們?nèi)タ季繉W(xué)習(xí)一下的。總結(jié)下知識點(diǎn)還真不少:

          • work stealing算法
          • CAS如何做到lock-free
          • 設(shè)置搶占標(biāo)志 禁止P被占用 并制止GC
          • Victim cache 受害者緩存是怎么回事兒
          • noCopy是干啥的 怎么實(shí)現(xiàn)禁止拷貝
          • 偽共享(false share)
          • Pool GC的機(jī)制

          不過這也符合Go“少即是多”的設(shè)計理念。



          推薦閱讀


          福利

          我為大家整理了一份從入門到進(jìn)階的Go學(xué)習(xí)資料禮包,包含學(xué)習(xí)建議:入門看什么,進(jìn)階看什么。關(guān)注公眾號 「polarisxu」,回復(fù)?ebook?獲取;還可以回復(fù)「進(jìn)群」,和數(shù)萬 Gopher 交流學(xué)習(xí)。

          瀏覽 30
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  在线观看黄色视频网站 | 17c.白丝喷水 自慰 | 一级爱爱| 色婷婷激情五月天 | 日韩爱爱爱 |