<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          【GoCN酷Go推薦】 errgroup 并發(fā)小工具

          共 9214字,需瀏覽 19分鐘

           ·

          2021-05-05 10:03

          使用場景:微服務(wù)中的并發(fā)請求

          并發(fā)編程是Golang語言的強(qiáng)大特性之一。在微服務(wù)架構(gòu)中,面對用戶的請求,我們常常需要向下游請求大量的數(shù)據(jù)繼而組裝成所需數(shù)據(jù),不同的數(shù)據(jù)很可能會由不同的服務(wù)提供,這里一一請求顯然是效率十分低效的,所以并發(fā)成為提高響應(yīng)效率的優(yōu)選方法。

          errgroup庫

          基礎(chǔ)版本安裝

          go get -u golang.org/x/sync/errgroup

          加強(qiáng)版本https://github.com/go-kratos/kratos/tree/v1.0.x/pkg/sync/errgroup

          演變歷程

          channel版本

              res_ch := make(chan interface{},3)
              go func() {
                  r := funA()
                  res_ch <- r
              }()
              go func() {
                  r := funB()
                  res_ch <- r
              }()
              go func() {
                  r := funC()
                  res_ch <- r
              }()
              res := make([]interface{},0,3)
              for i := 0; i < 3; i++ {
                  data := <- res_ch
                  res = append(res,data)
              }

          此版本運(yùn)用了官方推薦的用于goroutine通信的channel結(jié)構(gòu)。預(yù)計(jì)完整接收goroutine的結(jié)果。

          問題1:goroutine數(shù)量控制較為繁瑣

          問題2:若goroutine內(nèi)部發(fā)生錯誤,會導(dǎo)致接收程序阻塞,無法正常退出

          基本版本errgroup

          源碼

              //源代碼結(jié)構(gòu)
              type Group struct {
               cancel func()

               wg sync.WaitGroup

               errOnce sync.Once
               err     error
              }

              func WithContext(ctx context.Context) (*Group, context.Context) {
                  ctx, cancel := context.WithCancel(ctx)
                  return &Group{cancel: cancel}, ctx
              }

              func (g *Group) Wait() error {
                  g.wg.Wait()
                  if g.cancel != nil {
                      g.cancel()
                  }
                  return g.err
              }

              func (g *Group) Go(f func() error) {
                  g.wg.Add(1)

                  go func() {
                      defer g.wg.Done()

                      if err := f(); err != nil {
                          g.errOnce.Do(func() {
                              g.err = err
                              if g.cancel != nil {
                                  g.cancel()
                              }
                          })
                      }
                  }()
              }

          閱讀源碼我們可以得知,Group結(jié)構(gòu)中使用sync.WaitGroup來控制goroutine的并發(fā),成員變量err來記錄運(yùn)行中發(fā)生的錯誤,這里只記錄第一次返回的錯誤值。

          使用

          group,ctx := errgroup.WithContent(context.Background())
          urls :=[]string{
              ...
          }
          for _,v := range urls {
              group.Go(func()error{
                  resp,err := http.Get(v)
                  if err != nil {
                      resp.Body.Close()
                  }
                  ...
                  return err
              })
          }
          if err := g.Wait();err != nil {
              fmt.Println(err)
          }

          一些說明

          • Wait函數(shù)在所有g(shù)oroutine運(yùn)行結(jié)束才會返回,返回值記錄了第一個發(fā)生的錯誤。
          • WithContext函數(shù)的第二返回值為ctx,Group會在goroutine發(fā)生錯誤時(shí)調(diào)用與ctx對應(yīng)的cancel函數(shù),所以ctx不適合作為其他調(diào)用的參數(shù)。

          加強(qiáng)版本

          下面是kratos的errgroup加強(qiáng)版,其針對幾個問題作出的改進(jìn)。

          //基礎(chǔ)版本
          type Group struct {
           cancel func()

              wg sync.WaitGroup

              errOnce sync.Once
              err     error
          }    

          //kratos 版本
          type Group struct {
              err     error
              wg      sync.WaitGroup
              errOnce sync.Once

              workerOnce sync.Once
              ch         chan func(ctx context.Context) error
              chs        []func(ctx context.Context) error

              ctx    context.Context
              cancel func()
          }

          我們先從結(jié)構(gòu)體定義的角度來看待加強(qiáng)點(diǎn)。

          • ch、chs、workerOnce用于控制goroutine的并發(fā)數(shù)量,在基礎(chǔ)版的代碼中我們發(fā)現(xiàn)在使用Go(function()error)函數(shù)的調(diào)用過程中是全開放的,即對于同時(shí)進(jìn)行的goroutine數(shù)量并沒有做限制。kratos在基礎(chǔ)版本的基礎(chǔ)上添加了一個chan控制并發(fā)數(shù)量,一個slice來緩存為并發(fā)的函數(shù)指針。
          • kratos將產(chǎn)生的context對象緩存,并且更改了方法Go的函數(shù)簽名加入了context參數(shù),即func (g *Group) Go(f func(ctx context.Context) error)。在基礎(chǔ)版本中,當(dāng)error發(fā)生的是時(shí)候函數(shù),仍然需要等到所有g(shù)oroutine運(yùn)行結(jié)束才會返回,kratos的Group可以使用成員函數(shù)ctx作為參數(shù),從而控制全部并發(fā)的生命周期

          控制并發(fā)數(shù)量源碼分析

          func (g *Group) Go(f func(ctx context.Context) error) {
           g.wg.Add(1)
           if g.ch != nil {
            select {
            case g.ch <- f:
            default:
             g.chs = append(g.chs, f)
            }
            return
           }
           go g.do(f)
          }

          func (g *Group) GOMAXPROCS(n int) {
           if n <= 0 {
            panic("errgroup: GOMAXPROCS must great than 0")
           }
           g.workerOnce.Do(func() {
            g.ch = make(chan func(context.Context) error, n)
            for i := 0; i < n; i++ {
             go func() {
              for f := range g.ch {
               g.do(f)
              }
             }()
            }
           })
          }

          func (g *Group) Wait() error {
           if g.ch != nil {
            for _, f := range g.chs {
             g.ch <- f
            }
           }
           g.wg.Wait()
           if g.ch != nil {
            close(g.ch) // let all receiver exit
           }
           if g.cancel != nil {
            g.cancel()
           }
           return g.err
          }

          從Go函數(shù)中我們看到,當(dāng)g.ch != nil時(shí),f函數(shù)首先嘗試進(jìn)入g.ch中,當(dāng)g.ch滿的時(shí)候存入g.chs中,這就是上面提到的,利用chan控制并發(fā)數(shù)量,利用slice作為函數(shù)指針的緩存。

          GOMAXPROCE 函數(shù)初始化g.ch用于開啟并發(fā)數(shù)量控制的開關(guān)。并且啟動n個goroutine來消費(fèi)傳入的函數(shù)。

          Wait函數(shù)中會不斷將緩存中的函數(shù)不斷壓入chan中進(jìn)行消費(fèi)。

          使用案例

          func sleep1s(context.Context) error {
           time.Sleep(time.Second)
           return nil
          }   

          {
              ...
              g := Group{}
              g.GOMAXPROCS(2)//開啟并發(fā)控制
              g.Go(sleep1s)
              g.Go(sleep1s)
              g.Go(sleep1s)
              g.Go(sleep1s)
              g.Wait()
              ....
          }


          總結(jié)


          errgroup 在sync.WaitGroup的功能之上添加了錯誤傳遞,以及在發(fā)生不可恢復(fù)的錯誤時(shí)取消整個goroutine集合的功能(返回值cancel)。

          kratos的加強(qiáng)版errgroup從統(tǒng)一goroutine控制,defer錯誤捕獲,并發(fā)數(shù)量控制等方面對errgroup進(jìn)行了功能擴(kuò)充,利用匿名函數(shù)的參數(shù)context.Context的參數(shù)傳遞從整體上控制goroutine的生命周期。

          參考資料

          https://github.com/golang/sync/blob/master/errgroup/errgroup.go

          https://github.com/go-kratos/kratos/tree/v1.0.x/pkg/sync/errgrou


          還想了解更多嗎?

              更多請查看:https://github.com/golang/sync/blob/master/errgroup/errgroup.go    歡迎加入我們GOLANG中國社區(qū):https://gocn.vip/


          《酷Go推薦》招募:


          各位Gopher同學(xué),最近我們社區(qū)打算推出一個類似GoCN每日新聞的新欄目《酷Go推薦》,主要是每周推薦一個庫或者好的項(xiàng)目,然后寫一點(diǎn)這個庫使用方法或者優(yōu)點(diǎn)之類的,這樣可以真正的幫助到大家能夠?qū)W習(xí)到新的庫,并且知道怎么用。


          大概規(guī)則和每日新聞類似,如果報(bào)名人多的話每個人一個月輪到一次,歡迎大家報(bào)名!(報(bào)名地址:https://wj.qq.com/s2/7734329/3f51)





          ??  各位Gopher們,注意啦!

          別忘了還有 Gopher China2021 大會

          還沒報(bào)名的童鞋們趕快抓住最后的機(jī)會!!!


          點(diǎn)擊這里閱讀原文,即刻報(bào)名~
          瀏覽 33
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  淫色成人网站 | 深夜精品福利 | 大鸡巴操小逼视频 | 亚洲夜福利 | 国产精品视频400部 |