<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          sourcegraph 出品的并發(fā)庫(kù) conc 詳解

          共 6879字,需瀏覽 14分鐘

           ·

          2023-02-04 16:48

          上個(gè)月 sourcegraph 放出了 conc[1] 并發(fā)庫(kù),目標(biāo)是 better structured concurrency for go, 簡(jiǎn)單的評(píng)價(jià)一下

          每個(gè)公司都有類(lèi)似的輪子,與以往的庫(kù)比起來(lái),多了泛型,代碼寫(xiě)起來(lái)更優(yōu)雅,不需要 interface, 不需要運(yùn)行時(shí) assert, 性能肯定更好

          我們?cè)趯?xiě)通用庫(kù)和框架的時(shí)候,都有一個(gè)原則,并發(fā)控制與業(yè)務(wù)邏輯分離,背離這個(gè)原則肯定做不出通用庫(kù)

          整體介紹

          1. WaitGroup 與 Panic

          標(biāo)準(zhǔn)庫(kù)自帶 sync.WaitGroup 用于等待 goroutine 運(yùn)行結(jié)束,缺點(diǎn)是我們要處理控制部分

          代碼里大量的 wg.Addwg.Done 函數(shù),所以一般封裝成右側(cè)的庫(kù)

          type WaitGroup struct {
           wg sync.WaitGroup
           pc panics.Catcher
          }

          // Go spawns a new goroutine in the WaitGroup.
          func (h *WaitGroup) Go(f func()) {
           h.wg.Add(1)
           go func() {
            defer h.wg.Done()
            h.pc.Try(f)
           }()
          }

          但是如何處理 panic 呢?簡(jiǎn)單的可以在閉包 doSomething 運(yùn)行時(shí)增加一個(gè) safeGo 函數(shù),用于捕捉 recover

          原生 Go 要生成大量無(wú)用代碼,我司 repo 運(yùn)動(dòng)式的清理過(guò)一波,也遇到過(guò) goroutine 忘寫(xiě) recover 導(dǎo)致的事故。conc 同時(shí)提供 catcher 封裝 recover 邏輯,conc.WaitGroup 可以選擇 Wait 重新拋出 panic, 也可以 WaitAndRecover 返回捕獲到的 panic 堆棧信息

          func (h *WaitGroup) Wait() {
           h.wg.Wait()

           // Propagate a panic if we caught one from a child goroutine.
           h.pc.Repanic()
          }

          func (h *WaitGroup) WaitAndRecover() *panics.RecoveredPanic {
           h.wg.Wait()

           // Return a recovered panic if we caught one from a child goroutine.
           return h.pc.Recovered()
          }

          2. ForEach 與 Map

          高級(jí)語(yǔ)言很多的基操,在 go 里面很奢侈,只能寫(xiě)很多繁瑣代碼。conc封裝了泛型版本的 iterator 和 mapper

          func process(values []int) {
              iter.ForEach(values, handle)
          }

          func concMap(input []int, f func(int) int) []int {
              return iter.Map(input, f)
          }

          上面是使用例子,用戶(hù)只需要寫(xiě)業(yè)務(wù)函數(shù) handle. 相比 go1.19 前的版本,泛型的引入,使得基礎(chǔ)庫(kù)的編寫(xiě)更游刃有余

          // Iterator is also safe for reuse and concurrent use.
          type Iterator[T any] struct {
           // MaxGoroutines controls the maximum number of goroutines
           // to use on this Iterator's methods.
           //
           // If unset, MaxGoroutines defaults to runtime.GOMAXPROCS(0).
           MaxGoroutines int
          }

          MaxGoroutines 默認(rèn) GOMAXPROCS 并發(fā)處理傳參 slice, 也可以自定義,個(gè)人認(rèn)為不合理,默認(rèn)為 1 最妥

          // ForEachIdx is the same as ForEach except it also provides the
          // index of the element to the callback.
          func ForEachIdx[T any](input []T, f func(int, *T)) { Iterator[T]{}.ForEachIdx(input, f) }

          ForEachIdx 在創(chuàng)建 Iterator[T]{} 可以自定義并發(fā)度,最終調(diào)用 iter.ForEachIdx

          // ForEachIdx is the same as ForEach except it also provides the
          // index of the element to the callback.
          func (iter Iterator[T]) ForEachIdx(input []T, f func(int, *T)) {
            ......
           var idx atomic.Int64
           // Create the task outside the loop to avoid extra closure allocations.
           task := func() {
            i := int(idx.Add(1) - 1)
            for ; i < numInput; i = int(idx.Add(1) - 1) {
             f(i, &input[i])
            }
           }

           var wg conc.WaitGroup
           for i := 0; i < iter.MaxGoroutines; i++ {
            wg.Go(task)
           }
           wg.Wait()
          }

          ForEachIdx 泛型函數(shù)寫(xiě)得非常好,略去部分代碼。樸素的實(shí)現(xiàn)在 for 循環(huán)里創(chuàng)建閉包,傳入 idx 參數(shù),然后 wg.Go 去運(yùn)行。但是這樣會(huì)產(chǎn)生大量閉包,我司遇到過(guò)大量閉包,造成 heap 內(nèi)存增長(zhǎng)很快頻繁觸發(fā) GC 的性能問(wèn)題,所以在外層只創(chuàng)建一個(gè)閉包,通過(guò) atomic 控制 idx

          func Map[TR any](input []T, f func(*T) R) []R {
           return Mapper[T, R]{}.Map(input, f)
          }

          func MapErr[TR any](input []T, f func(*T) (R, error)([]R, error) {
           return Mapper[T, R]{}.MapErr(input, f)
          }

          MapMapErr 也只是對(duì) ForEachIdx 的封裝,區(qū)別是處理 error

          3. 各種 Pool 與 Stream

          Pool 用于并發(fā)處理,同時(shí) Wait 等待任務(wù)結(jié)束。相比我司現(xiàn)有 concurrency 庫(kù)

          • 增加了泛型實(shí)現(xiàn)
          • 增加了對(duì) goroutine 的復(fù)用
          • 增加并發(fā)度設(shè)置(我司有,但 conc 實(shí)現(xiàn)方式更巧秒)
          • 支持的函數(shù)簽名更多

          先看一下支持的接口

          Go(f func())
          Go(f func() error) 
          Go(f func(ctx context.Context) error)
          Go(f func(context.Context) (T, error))
          Go(f func() (T, error)) 
          Go(f func() T)
          Go(f func(context.Context) (T, error))

          理論上這一個(gè)足夠用了,傳參 Context, 返回泛型類(lèi)型與錯(cuò)誤。

          Wait() ([]T, error) 

          這是對(duì)應(yīng)的 Wait 回收函數(shù),返回泛型結(jié)果 []T 與錯(cuò)誤。具體 Pool 實(shí)現(xiàn)由多種組合而來(lái):Pool, ErrorPool, ContextPool, ResultContextPool, ResultPool

          func (p *Pool) Go(f func()) {
           p.init()

           if p.limiter == nil {
            // No limit on the number of goroutines.
            select {
            case p.tasks <- f:
             // A goroutine was available to handle the task.
            default:
             // No goroutine was available to handle the task.
             // Spawn a new one and send it the task.
             p.handle.Go(p.worker)
             p.tasks <- f
            }
           }
            ......
          }

          func (p *Pool) worker() {
           // The only time this matters is if the task panics.
           // This makes it possible to spin up new workers in that case.
           defer p.limiter.release()

           for f := range p.tasks {
            f()
           }
          }

          復(fù)用方式很巧妙,如果處理速度足夠快,沒(méi)必要過(guò)多創(chuàng)建 goroutine

          Stream 用于并發(fā)處理 goroutine, 但是返回結(jié)果保持順序

          type Stream struct {
           pool             pool.Pool
           callbackerHandle conc.WaitGroup
           queue            chan callbackCh

           initOnce sync.Once
          }

          實(shí)現(xiàn)很簡(jiǎn)單,queue 是一個(gè) channel, 類(lèi)型 callbackCh 同樣也是 channel, 在真正派生 goroutine 前按序順生成 callbackCh 傳遞結(jié)果

          Stream 命名很差,容易讓人混淆,感覺(jué)叫 OrderedResultsPool 更理想,整體非常雞肋

          超時(shí)

          超時(shí)永遠(yuǎn)是最難處理的問(wèn)題,目前 conc 庫(kù) Wait 函數(shù)并沒(méi)有提供 timeout 傳參,這就要求閉包內(nèi)部必須考濾超時(shí),如果添加 timeout 傳參,又涉及 conc 內(nèi)部庫(kù)并發(fā)問(wèn)題題

          Wait() ([]T, error)

          比如這個(gè)返回值,內(nèi)部 append 到 slice 時(shí)是有鎖的,如果 Wait 提前結(jié)束了會(huì)發(fā)生什么?

          []T 拿到的部分結(jié)果只能丟棄,返回給上層 timeout error

          Context 框架傳遞參數(shù)

          通用庫(kù)很容易做的臃腫,我司并發(fā)庫(kù)會(huì)給閉包產(chǎn)生新的 context, 并繼承所需框架層的 metadata, 兩種實(shí)現(xiàn)無(wú)可厚非,這些細(xì)節(jié)總得要處理

          小結(jié)

          代碼量不大,感興趣的可以看看。沒(méi)有造輪子的必要,夠用就行,這種庫(kù)寫(xiě)了也沒(méi)價(jià)值

          參考資料

          [1]

          conc: https://github.com/sourcegraph/conc,



          推薦閱讀


          福利

          我為大家整理了一份從入門(mén)到進(jìn)階的Go學(xué)習(xí)資料禮包,包含學(xué)習(xí)建議:入門(mén)看什么,進(jìn)階看什么。關(guān)注公眾號(hào) 「polarisxu」,回復(fù) ebook 獲??;還可以回復(fù)「進(jìn)群」,和數(shù)萬(wàn) Gopher 交流學(xué)習(xí)。

          瀏覽 15
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  粉嫩小泬BBBBBB免费 | 尻屄视频免费 | 狠狠干五月天 | 亚洲国产精品二二三三区 | 欧美亚洲国产a |