<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          errgroup:并發(fā)任務(wù) goroutine 的傳播控制

          共 11168字,需瀏覽 23分鐘

           ·

          2021-08-29 07:27

          Go 語言為我們提供了豐富的并發(fā)原語,且大多數(shù)都位于 sync 包下。但其實(shí) Go 官方在實(shí)驗(yàn)庫 golang.org/x/sync 下還有其他的原語補(bǔ)充,今天我們來探討一下該庫下的原語之一:errgroup。在學(xué)習(xí) errgroup 之前,我們先回顧一下 WaitGroup 存在什么不足。

          初識 errgroup

          sync.WaitGroup  源碼解析一文中,我們知道 WaitGroup 主要用于控制任務(wù)組下的并發(fā)子任務(wù)。它的具體做法就是,子任務(wù) goroutine 執(zhí)行前通過 Add 方法添加任務(wù)數(shù)目,子任務(wù) goroutine 結(jié)束時(shí)調(diào)用 Done 標(biāo)記已完成任務(wù)數(shù),主任務(wù) goroutine 通過 Wait 方法等待所有的任務(wù)完成后才能執(zhí)行后續(xù)邏輯。

           1package main
          2
          3import (
          4    "net/http"
          5    "sync"
          6)
          7
          8func main() {
          9    var wg sync.WaitGroup
          10    var urls = []string{
          11        "http://www.golang.org/",
          12        "http://www.baidu.com/",
          13        "http://www.noexist11111111.com/",
          14    }
          15    for _, url := range urls {
          16        wg.Add(1)
          17        go func(url string) {
          18            defer wg.Done()
          19            resp, err := http.Get(url)
          20            if err != nil {
          21                return
          22            }
          23            resp.Body.Close()
          24        }(url)
          25    }
          26    wg.Wait()
          27}

          在以上示例代碼中,我們通過三個(gè) goroutine 去并發(fā)的請求 url,直到所有的子任務(wù) goroutine 均完成訪問,主任務(wù) goroutine 下的 wg.Wait 才會(huì)停止阻塞。

          但在實(shí)際的項(xiàng)目代碼中,子任務(wù) goroutine 的執(zhí)行并不總是順風(fēng)順?biāo)鼈円苍S會(huì)產(chǎn)生 error。而 WaitGroup 并沒有告訴我們在子 goroutine 發(fā)生錯(cuò)誤時(shí),如何將其拋給主任務(wù) groutine。

          這個(gè)時(shí)候可以考慮使用 errgroup

           1package main
          2
          3import (
          4    "fmt"
          5    "net/http"
          6
          7    "golang.org/x/sync/errgroup"
          8)
          9
          10func main() {
          11    var urls = []string{
          12        "http://www.golang.org/",
          13        "http://www.baidu.com/",
          14        "http://www.noexist11111111.com/",
          15    }
          16    g := new(errgroup.Group)
          17    for _, url := range urls {
          18        url := url
          19        g.Go(func() error {
          20            resp, err := http.Get(url)
          21            if err != nil {
          22                fmt.Println(err)
          23                return err
          24            }
          25            fmt.Printf("get [%s] success: [%d] \n", url, resp.StatusCode)
          26            return resp.Body.Close()
          27        })
          28    }
          29    if err := g.Wait(); err != nil {
          30        fmt.Println(err)
          31    } else {
          32        fmt.Println("All success!")
          33    }
          34}

          結(jié)果如下

          1get [http://www.baidu.com/] success: [200] 
          2Get "http://www.noexist11111111.com/": dial tcp 120.240.95.35:80: i/o timeout
          3Get "http://www.golang.org/": dial tcp 172.217.160.81:80: i/o timeout
          4Get "http://www.noexist11111111.com/": dial tcp 120.240.95.35:80: i/o timeout

          可以看到,執(zhí)行獲取 www.golang.org 和 www.noexist11111111.com 兩個(gè) url 的子 groutine 均發(fā)生了錯(cuò)誤,在主任務(wù) goroutine 中成功捕獲到了第一個(gè)錯(cuò)誤信息。

          除了 擁有 WaitGroup 的控制能力錯(cuò)誤傳播 的功能之外,errgroup 還有最重要的 context 反向傳播機(jī)制,我們來看一下它的設(shè)計(jì)。

          errgroup 源碼解析

          errgroup 的設(shè)計(jì)非常精練,全部代碼如下

           1type Group struct {
          2    cancel func()
          3
          4    wg sync.WaitGroup
          5
          6    errOnce sync.Once
          7    err     error
          8}
          9
          10func WithContext(ctx context.Context) (*Group, context.Context)
           {
          11    ctx, cancel := context.WithCancel(ctx)
          12    return &Group{cancel: cancel}, ctx
          13}
          14
          15func (g *Group) Wait() error {
          16    g.wg.Wait()
          17    if g.cancel != nil {
          18        g.cancel()
          19    }
          20    return g.err
          21}
          22
          23func (g *Group) Go(f func() error) {
          24    g.wg.Add(1)
          25
          26    go func() {
          27        defer g.wg.Done()
          28
          29        if err := f(); err != nil {
          30            g.errOnce.Do(func() {
          31                g.err = err
          32                if g.cancel != nil {
          33                    g.cancel()
          34                }
          35            })
          36        }
          37    }()
          38}

          可以看到,errgroup 的實(shí)現(xiàn)依靠于結(jié)構(gòu)體 Group,它通過封裝 sync.WaitGroup,繼承了 WaitGroup 的特性,在 Go() 方法中新起一個(gè)子任務(wù) goroutine,并在 Wait() 方法中通過 sync.WaitGroup 的 Wait 進(jìn)行阻塞等待。

          同時(shí) Group 利用 sync.Once 保證了它有且僅會(huì)保留第一個(gè)子 goroutine 錯(cuò)誤

          最后,Group 通過嵌入 context.WithCancel 方法產(chǎn)生的 cancel 函數(shù)(對于 Context 不熟悉的讀者,推薦閱讀 理解Context機(jī)制 一文),能夠在子 goroutine 發(fā)生錯(cuò)誤時(shí),及時(shí)通過調(diào)用 cancle 函數(shù),將 Context 的取消信號及時(shí)傳播出去。當(dāng)然,這一特性需要用戶代碼的配合。

          errgroup 上下文取消

          在 errgroup 的文檔(https://pkg.go.dev/golang.org/x/[email protected]/errgroup#example-Group-Pipeline)中,它基于 Go 官方文檔的 pipeline( https://blog.golang.org/pipelines) ,實(shí)現(xiàn)了一個(gè)任務(wù)組 goroutine 中上下文取消(Context cancelation)演示的示例。但該 Demo 的前提知識略多,本文這里基于其思想,提供一個(gè)易于理解的使用示例。

           1package main
          2
          3import (
          4    "context"
          5    "fmt"
          6
          7    "golang.org/x/sync/errgroup"
          8)
          9
          10func main() {
          11
          12    g, ctx := errgroup.WithContext(context.Background())
          13    dataChan := make(chan int20)
          14
          15    // 數(shù)據(jù)生產(chǎn)端任務(wù)子 goroutine
          16    g.Go(func() error {
          17        defer close(dataChan)
          18        for i := 1; ; i++ {
          19            if i == 10 {
          20                return fmt.Errorf("data 10 is wrong")
          21            }
          22            dataChan <- i
          23            fmt.Println(fmt.Sprintf("sending %d", i))
          24        }
          25    })
          26
          27    // 數(shù)據(jù)消費(fèi)端任務(wù)子 goroutine
          28    for i := 0; i < 3; i++ {
          29        g.Go(func() error {
          30            for j := 1; ; j++ {
          31                select {
          32                case <-ctx.Done():
          33                    return ctx.Err()
          34                case number := <-dataChan:
          35                    fmt.Println(fmt.Sprintf("receiving %d", number))
          36                }
          37            }
          38        })
          39    }
          40
          41    // 主任務(wù) goroutine 等待 pipeline 結(jié)束數(shù)據(jù)流
          42    err := g.Wait()
          43    if err != nil {
          44        fmt.Println(err)
          45    }
          46    fmt.Println("main goroutine done!")
          47}

          在以上示例中,我們模擬了一個(gè)數(shù)據(jù)傳送管道。在數(shù)據(jù)的生產(chǎn)與消費(fèi)任務(wù)集中,有四個(gè)子任務(wù) goroutine:一個(gè)生產(chǎn)數(shù)據(jù)的 goroutine,三個(gè)消費(fèi)數(shù)據(jù)的 goroutine。當(dāng)數(shù)據(jù)生產(chǎn)方存在錯(cuò)誤數(shù)據(jù)時(shí)(數(shù)據(jù)等于 10 ),我們停止數(shù)據(jù)的生產(chǎn)與消費(fèi),并將錯(cuò)誤拋出,回到 main goroutine 的執(zhí)行邏輯中。

          可以看到,因?yàn)?errgroup 中的 Context cancle 函數(shù)的嵌入,我們在子任務(wù) goroutine 中也能反向控制任務(wù)上下文。

          程序的某一次運(yùn)行,輸出結(jié)果如下

           1receiving 1
          2sending 1
          3sending 2
          4sending 3
          5sending 4
          6sending 5
          7sending 6
          8sending 7
          9sending 8
          10sending 9
          11receiving 4
          12receiving 5
          13receiving 6
          14receiving 2
          15receiving 7
          16receiving 3
          17receiving 8
          18receiving 9
          19data 10 is wrong
          20main goroutine done!

          總結(jié)

          errgroup 是 Go 官方的并發(fā)原語補(bǔ)充庫,相對于標(biāo)準(zhǔn)庫中提供的原語而言,顯得沒那么核心。這里總結(jié)一下 errgroup 的特性。

          • 繼承了 WaitGroup 的功能

          • 錯(cuò)誤傳播:能夠返回任務(wù)組中發(fā)生的第一個(gè)錯(cuò)誤,但有且僅能返回該錯(cuò)誤

          • context 信號傳播:如果子任務(wù) goroutine 中有循環(huán)邏輯,則可以添加 ctx.Done 邏輯,此時(shí)通過 context 的取消信號,提前結(jié)束子任務(wù)執(zhí)行。



          推薦閱讀


          福利

          我為大家整理了一份從入門到進(jìn)階的Go學(xué)習(xí)資料禮包,包含學(xué)習(xí)建議:入門看什么,進(jìn)階看什么。關(guān)注公眾號 「polarisxu」,回復(fù) ebook 獲取;還可以回復(fù)「進(jìn)群」,和數(shù)萬 Gopher 交流學(xué)習(xí)。

          瀏覽 61
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国际亚洲中文字幕最新网址 | 五月天AV电影在线 | 人人操人人骑 | 精品无码产一区二区 | 免费v在线观看 |