【GoCN酷Go推薦】 errgroup 并發(fā)小工具
使用場景:微服務(wù)中的并發(fā)請求
并發(fā)編程是Golang語言的強(qiáng)大特性之一。在微服務(wù)架構(gòu)中,面對用戶的請求,我們常常需要向下游請求大量的數(shù)據(jù)繼而組裝成所需數(shù)據(jù),不同的數(shù)據(jù)很可能會由不同的服務(wù)提供,這里一一請求顯然是效率十分低效的,所以并發(fā)成為提高響應(yīng)效率的優(yōu)選方法。
errgroup庫
基礎(chǔ)版本安裝
$ go get -u golang.org/x/sync/errgroup
加強(qiáng)版本https://github.com/go-kratos/kratos/tree/v1.0.x/pkg/sync/errgroup
channel版本
res_ch := make(chan interface{},3)
go func() {
r := funA()
res_ch <- r
}()
go func() {
r := funB()
res_ch <- r
}()
go func() {
r := funC()
res_ch <- r
}()
res := make([]interface{},0,3)
for i := 0; i < 3; i++ {
data := <- res_ch
res = append(res,data)
}
此版本運(yùn)用了官方推薦的用于goroutine通信的channel結(jié)構(gòu)。預(yù)計(jì)完整接收goroutine的結(jié)果。
問題1:goroutine數(shù)量控制較為繁瑣
問題2:若goroutine內(nèi)部發(fā)生錯誤,會導(dǎo)致接收程序阻塞,無法正常退出
基本版本errgroup
源碼
//源代碼結(jié)構(gòu)
type Group struct {
cancel func()
wg sync.WaitGroup
errOnce sync.Once
err error
}
func WithContext(ctx context.Context) (*Group, context.Context) {
ctx, cancel := context.WithCancel(ctx)
return &Group{cancel: cancel}, ctx
}
func (g *Group) Wait() error {
g.wg.Wait()
if g.cancel != nil {
g.cancel()
}
return g.err
}
func (g *Group) Go(f func() error) {
g.wg.Add(1)
go func() {
defer g.wg.Done()
if err := f(); err != nil {
g.errOnce.Do(func() {
g.err = err
if g.cancel != nil {
g.cancel()
}
})
}
}()
}
閱讀源碼我們可以得知,Group結(jié)構(gòu)中使用sync.WaitGroup來控制goroutine的并發(fā),成員變量err來記錄運(yùn)行中發(fā)生的錯誤,這里只記錄第一次返回的錯誤值。
使用
group,ctx := errgroup.WithContent(context.Background())
urls :=[]string{
...
}
for _,v := range urls {
group.Go(func()error{
resp,err := http.Get(v)
if err != nil {
resp.Body.Close()
}
...
return err
})
}
if err := g.Wait();err != nil {
fmt.Println(err)
}
一些說明
Wait函數(shù)在所有g(shù)oroutine運(yùn)行結(jié)束才會返回,返回值記錄了第一個發(fā)生的錯誤。 WithContext函數(shù)的第二返回值為ctx,Group會在goroutine發(fā)生錯誤時(shí)調(diào)用與ctx對應(yīng)的cancel函數(shù),所以ctx不適合作為其他調(diào)用的參數(shù)。
加強(qiáng)版本
下面是kratos的errgroup加強(qiáng)版,其針對幾個問題作出的改進(jìn)。
//基礎(chǔ)版本
type Group struct {
cancel func()
wg sync.WaitGroup
errOnce sync.Once
err error
}
//kratos 版本
type Group struct {
err error
wg sync.WaitGroup
errOnce sync.Once
workerOnce sync.Once
ch chan func(ctx context.Context) error
chs []func(ctx context.Context) error
ctx context.Context
cancel func()
}
我們先從結(jié)構(gòu)體定義的角度來看待加強(qiáng)點(diǎn)。
ch、chs、workerOnce用于控制goroutine的并發(fā)數(shù)量,在基礎(chǔ)版的代碼中我們發(fā)現(xiàn)在使用Go(function()error)函數(shù)的調(diào)用過程中是全開放的,即對于同時(shí)進(jìn)行的goroutine數(shù)量并沒有做限制。kratos在基礎(chǔ)版本的基礎(chǔ)上添加了一個chan控制并發(fā)數(shù)量,一個slice來緩存為并發(fā)的函數(shù)指針。 kratos將產(chǎn)生的context對象緩存,并且更改了方法Go的函數(shù)簽名加入了context參數(shù),即func (g *Group) Go(f func(ctx context.Context) error)。在基礎(chǔ)版本中,當(dāng)error發(fā)生的是時(shí)候函數(shù),仍然需要等到所有g(shù)oroutine運(yùn)行結(jié)束才會返回,kratos的Group可以使用成員函數(shù)ctx作為參數(shù),從而控制全部并發(fā)的生命周期。
控制并發(fā)數(shù)量源碼分析
func (g *Group) Go(f func(ctx context.Context) error) {
g.wg.Add(1)
if g.ch != nil {
select {
case g.ch <- f:
default:
g.chs = append(g.chs, f)
}
return
}
go g.do(f)
}
func (g *Group) GOMAXPROCS(n int) {
if n <= 0 {
panic("errgroup: GOMAXPROCS must great than 0")
}
g.workerOnce.Do(func() {
g.ch = make(chan func(context.Context) error, n)
for i := 0; i < n; i++ {
go func() {
for f := range g.ch {
g.do(f)
}
}()
}
})
}
func (g *Group) Wait() error {
if g.ch != nil {
for _, f := range g.chs {
g.ch <- f
}
}
g.wg.Wait()
if g.ch != nil {
close(g.ch) // let all receiver exit
}
if g.cancel != nil {
g.cancel()
}
return g.err
}
從Go函數(shù)中我們看到,當(dāng)g.ch != nil時(shí),f函數(shù)首先嘗試進(jìn)入g.ch中,當(dāng)g.ch滿的時(shí)候存入g.chs中,這就是上面提到的,利用chan控制并發(fā)數(shù)量,利用slice作為函數(shù)指針的緩存。
GOMAXPROCE 函數(shù)初始化g.ch用于開啟并發(fā)數(shù)量控制的開關(guān)。并且啟動n個goroutine來消費(fèi)傳入的函數(shù)。
Wait函數(shù)中會不斷將緩存中的函數(shù)不斷壓入chan中進(jìn)行消費(fèi)。
使用案例
func sleep1s(context.Context) error {
time.Sleep(time.Second)
return nil
}
{
...
g := Group{}
g.GOMAXPROCS(2)//開啟并發(fā)控制
g.Go(sleep1s)
g.Go(sleep1s)
g.Go(sleep1s)
g.Go(sleep1s)
g.Wait()
....
}
總結(jié)
errgroup 在sync.WaitGroup的功能之上添加了錯誤傳遞,以及在發(fā)生不可恢復(fù)的錯誤時(shí)取消整個goroutine集合的功能(返回值cancel)。
kratos的加強(qiáng)版errgroup從統(tǒng)一goroutine控制,defer錯誤捕獲,并發(fā)數(shù)量控制等方面對errgroup進(jìn)行了功能擴(kuò)充,利用匿名函數(shù)的參數(shù)context.Context的參數(shù)傳遞從整體上控制goroutine的生命周期。
參考資料
https://github.com/golang/sync/blob/master/errgroup/errgroup.go
https://github.com/go-kratos/kratos/tree/v1.0.x/pkg/sync/errgrou
更多請查看:https://github.com/golang/sync/blob/master/errgroup/errgroup.go 歡迎加入我們GOLANG中國社區(qū):https://gocn.vip/
《酷Go推薦》招募:
各位Gopher同學(xué),最近我們社區(qū)打算推出一個類似GoCN每日新聞的新欄目《酷Go推薦》,主要是每周推薦一個庫或者好的項(xiàng)目,然后寫一點(diǎn)這個庫使用方法或者優(yōu)點(diǎn)之類的,這樣可以真正的幫助到大家能夠?qū)W習(xí)到新的庫,并且知道怎么用。
大概規(guī)則和每日新聞類似,如果報(bào)名人多的話每個人一個月輪到一次,歡迎大家報(bào)名!(報(bào)名地址:https://wj.qq.com/s2/7734329/3f51)

?? 各位Gopher們,注意啦!
別忘了還有 Gopher China2021 大會
還沒報(bào)名的童鞋們趕快抓住最后的機(jī)會!!!

