<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Go:如何優(yōu)雅地實現(xiàn)并發(fā)編排任務

          共 17690字,需瀏覽 36分鐘

           ·

          2021-05-15 13:32

          業(yè)務場景


          在做任務開發(fā)的時候,你們一定會碰到以下場景:


          場景1:調用第三方接口的時候, 一個需求你需要調用不同的接口,做數(shù)據(jù)組裝。
          場景2:一個應用首頁可能依托于很多服務。那就涉及到在加載頁面時需要同時請求多個服務的接口。這一步往往是由后端統(tǒng)一調用組裝數(shù)據(jù)再返回給前端,也就是所謂的 BFF(Backend For Frontend) 層。

          針對以上兩種場景,假設在沒有強依賴關系下,選擇串行調用,那么總耗時即:


          time=s1+s2+....sn


          按照當代秒入百萬的有為青年,這么長時間早就把你祖宗十八代問候了一遍。

          為了偉大的KPI,我們往往會選擇并發(fā)地調用這些依賴接口。那么總耗時就是:


          time=max(s1,s2,s3.....,sn)


          當然開始堆業(yè)務的時候可以先串行化,等到上面的人著急的時候,亮出絕招。

          這樣,年底 PPT 就可以加上濃重的一筆流水賬:為業(yè)務某個接口提高百分之XXX性能,間接產(chǎn)生XXX價值

          當然這一切的前提是,做老板不懂技術,做技術”懂”你。


          言歸正傳,如果修改成并發(fā)調用,你可能會這么寫

          package main

          import (
              "fmt"
              "sync"
              "time"
          )

          func main() {
              var wg sync.WaitGroup
              wg.Add(2)

              var userInfo *User
              var productList []Product

              go func() {
                  defer wg.Done()
                  userInfo, _ = getUser()
              }()

              go func() {
                  defer wg.Done()
                  productList, _ = getProductList()
              }()
              wg.Wait()
              fmt.Printf("用戶信息:%+v\n", userInfo)
              fmt.Printf("商品信息:%+v\n", productList)
          }


          /********用戶服務**********/

          type User struct {
              Name string
              Age uint8
          }

          func getUser() (*User, error) {
              time.Sleep(500 * time.Millisecond)
              var u User
              u.Name = "wuqinqiang"
              u.Age = 18
              return &u, nil
          }

          /********商品服務**********/

          type Product struct {
              Title string
              Price uint32
          }

          func getProductList() ([]Product, error) {
              time.Sleep(400 * time.Millisecond)
              var list []Product
              list = append(list, Product{
                  Title: "SHib",
                  Price: 10,
              })
              return list, nil
          }


          從實現(xiàn)上來說,需要多少服務,會開多少個 G利用 sync.WaitGroup 的特性

          實現(xiàn)并發(fā)編排任務的效果。


          好像,問題不大。


          但是隨著代號 996 業(yè)務場景的增加,你會發(fā)現(xiàn),好多模塊都有相似的功能,只是對應的業(yè)務場景不同而已。


          那么我們能不能抽像出一套針對此業(yè)務場景的工具,而把具體業(yè)務實現(xiàn)交給業(yè)務方。


          使用


          本著不重復造輪子的原則,去搜了下開源項目,最終看上了 go-zero 里面的一個工具 mapreduce

          可以自行 Google 這個名詞

          使用很簡單。我們通過它改造一下上面的代碼:

          package main

          import (
              "fmt"
              "github.com/tal-tech/go-zero/core/mr"
              "time"
          )

          func main() {
              var userInfo *User
              var productList []Product
              _ = mr.Finish(func() (err error) {
                  userInfo, err = getUser()
                  return err
              }, func() (err error) {
                  productList, err = getProductList()
                  return err
              })
              fmt.Printf("用戶信息:%+v\n", userInfo)
              fmt.Printf("商品信息:%+v\n", productList)
          }
          //打印
          用戶信息:&{Name:wuqinqiang Age:18}
          商品信息:[{Title:SHib Price:10}]


          是不是舒服多了。

          但是這里還需要注意一點,假設你調用的其中一個服務錯誤,并且你 return err 對應的錯誤,那么其他調用的服務會被取消

          比如我們修改 getProductList 直接響應錯誤

          func getProductList() ([]Product, error) {
              return nil, errors.New("test error")
          }
          //打印
          // 用戶信息:<nil>
          // 商品信息:[]


          那么最終打印的時候連用戶信息都會為空,因為出現(xiàn)一個服務錯誤,用戶服務請求被取消了。

          一般情況下,在請求服務錯誤的時候我們會有保底操作,一個服務錯誤不能影響其他請求的結果。
          所以在使用的時候具體處理取決于業(yè)務場景


          源碼


          既然用了,那么就追下源碼吧

          func Finish(fns ...func() error) error {
              if len(fns) == 0 {
                  return nil
              }

              return MapReduceVoid(func(source chan<- interface{}) {
                  for _, fn := range fns {
                      source <- fn
                  }
              }, func(item interface{}, writer Writer, cancel func(error)) {
                  fn := item.(func() error)
                  if err := fn(); err != nil
           {
                      cancel(err)
                  }
              }, func(pipe <-chan interface{}, cancel func(error)) {
                  drain(pipe)
              }, WithWorkers(len(fns)))
          }


          func MapReduceVoid(generator GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error {
              _, err := MapReduce(generator, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) {
                  reducer(input, cancel)
                  drain(input)
                  // We need to write a placeholder to let MapReduce to continue on reducer done,
                  // otherwise, all goroutines are waiting. The placeholder will be discarded by MapReduce.
                  writer.Write(lang.Placeholder)
              }, opts...)
              return err
          }


          對于 MapReduceVoid函數(shù),主要查看三個閉包參數(shù)。

          • 第一個 GenerateFunc 用于生產(chǎn)數(shù)據(jù)

          • MapperFunc 讀取生產(chǎn)出的數(shù)據(jù),進行處理

          • VoidReducerFunc 這里表示不對 mapper 后的數(shù)據(jù)做聚合返回。所以這個閉包在此操作幾乎0作用。

          func MapReduce(generate GenerateFunc, mapper MapperFunc, reducer ReducerFunc, opts ...Option) (interface{}, error) {
              source := buildSource(generate)
              return MapReduceWithSource(source, mapper, reducer, opts...)
          }

          func buildSource(generate GenerateFunc) chan interface{} {
              source := make(chan interface{})// 創(chuàng)建無緩沖通道
              threading.GoSafe(func() {
                  defer close(source)
                  generate(source) //開始生產(chǎn)數(shù)據(jù)
              })

              return source //返回無緩沖通道
          }


          buildSource函數(shù)中,返回一個無緩沖的通道。并開啟一個 G 運行 generate(source)往無緩沖通道塞數(shù)據(jù)。這個generate(source) 不就是一開始 Finish 傳遞的第一個閉包參數(shù)。

          return MapReduceVoid(func(source chan<- interface{}) {
              // 就這個
                  for _, fn := range fns {
                      source <- fn
                  }
              })


          然后查看 MapReduceWithSource 函數(shù),

          func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc,
              opts ...Option)
           (interface{}, error)
           {
              options := buildOptions(opts...)
              //任務執(zhí)行結束通知信號
              output := make(chan interface{})
              //將mapper處理完的數(shù)據(jù)寫入collector
              collector := make(chan interface{}, options.workers)
              // 取消操作信號
              done := syncx.NewDoneChan()
              writer := newGuardedWriter(output, done.Done())
              var closeOnce sync.Once
              var retErr errorx.AtomicError
              finish := func() {
                  closeOnce.Do(func() {
                      done.Close()
                      close(output)
                  })
              }
              cancel := once(func(err error) {
                  if err != nil {
                      retErr.Set(err)
                  } else {
                      retErr.Set(ErrCancelWithNil)
                  }

                  drain(source)
                  finish()
              })

              go func() {
                  defer func() {
                      if r := recover(); r != nil {
                          cancel(fmt.Errorf("%v", r))
                      } else {
                          finish()
                      }
                  }()
                  reducer(collector, writer, cancel)
                  drain(collector)
              }()
              // 真正從生成器通道取數(shù)據(jù)執(zhí)行Mapper
              go executeMappers(func(item interface{}, w Writer) {
                  mapper(item, w, cancel)
              }, source, collector, done.Done(), options.workers)

              value, ok := <-output
              if err := retErr.Load(); err != nil {
                  return nil, err
              } else if ok {
                  return value, nil
              } else {
                  return nil, ErrReduceNoOutput
              }
          }


          這段代碼挺長的,我們說下核心的點。這里使用一個G 調用 executeMappers 方法

          go executeMappers(func(item interface{}, w Writer) {
                  mapper(item, w, cancel)
              }, source, collector, done.Done(), options.workers)


          func executeMappers(mapper MapFunc, input <-chan interface{}, collector chan<- interface{},
              done <-chan lang.PlaceholderType, workers int)
           {
              var wg sync.WaitGroup
              defer func() {
                  // 等待所有任務全部執(zhí)行完畢
                  wg.Wait()
                  // 關閉通道
                  close(collector)
              }()
             //根據(jù)指定數(shù)量創(chuàng)建 worker池
              pool := make(chan lang.PlaceholderType, workers)
              writer := newGuardedWriter(collector, done)
              for {
                  select {
                  case <-done:
                      return
                  case pool <- lang.Placeholder:
                      // 從buildSource() 返回的無緩沖通道取數(shù)據(jù)
                      item, ok := <-input
                      // 當通道關閉,結束
                      if !ok {
                          <-pool
                          return
                      }

                      wg.Add(1)
                      // better to safely run caller defined method
                      threading.GoSafe(func() {
                          defer func() {
                              wg.Done()
                              <-pool
                          }()
                          //真正運行閉包函數(shù)的地方
                         // func(item interface{}, w Writer) {
                         // mapper(item, w, cancel)
                         // }
                          mapper(item, writer)
                      })
                  }
              }
          }


          具體的邏輯已備注,代碼很容易懂。

          一旦 executeMappers 函數(shù)返回,關閉 collector 通道,那么執(zhí)行 reducer 不再阻塞

          go func() {
                  defer func() {
                      if r := recover(); r != nil {
                          cancel(fmt.Errorf("%v", r))
                      } else {
                          finish()
                      }
                  }()
                  reducer(collector, writer, cancel)
                  //這里
                  drain(collector)
              }()


          這里的 reducer(collector, writer, cancel) 其實就是從 MapReduceVoid 傳遞的第三個閉包函數(shù)

          func MapReduceVoid(generator GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error {
              _, err := MapReduce(generator, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) {
                  reducer(input, cancel)
                  //這里
                  drain(input)
                  // We need to write a placeholder to let MapReduce to continue on reducer done,
                  // otherwise, all goroutines are waiting. The placeholder will be discarded by MapReduce.
                  writer.Write(lang.Placeholder)
              }, opts...)
              return err
          }


          然后這個閉包函數(shù)又執(zhí)行了 reducer(input, cancel)這里的 reducer 就是我們一開始解釋過的 VoidReducerFunc Finish() 而來

          等等,看到上面三個地方的 drain(input)了嗎?

          // drain drains the channel.
          func drain(channel <-chan interface{}) {
              // drain the channel
              for range channel {
              }
          }


          其實就是一個排空 channel 的操作,但是三個地方都對同一個 channel做同樣的操作,也是讓我費解

          還有更重要的一點。

          go func() {
                  defer func() {
                      if r := recover(); r != nil {
                          cancel(fmt.Errorf("%v", r))
                      } else {
                          finish()
                      }
                  }()
                  reducer(collector, writer, cancel)
                  drain(collector)
              }()


          上面的代碼,假如執(zhí)行 reducerwriter 寫入引發(fā) panic,那么drain(collector) 將沒有機會執(zhí)行

          不過作者已經(jīng)修復了這個問題,直接把 drain(collector) 放入到 defer

          具體 issues[1]

          到這里,關于 Finish 的源碼也就結束了。感興趣的可以看看其他源碼

          很喜歡 go-zero 里的一些工具,但是工具往往并不獨立,依賴于其他文件包,導致明明只想使用其中一個工具卻需要安裝整個包
          所以最終的結果就是扒源碼,創(chuàng)建無依賴庫工具集,遵循 
          MIT 即可


          附錄

          [1]https://github.com/tal-tech/go-zero/issues/676



          推薦閱讀


          福利

          我為大家整理了一份從入門到進階的Go學習資料禮包,包含學習建議:入門看什么,進階看什么。關注公眾號 「polarisxu」,回復 ebook 獲取;還可以回復「進群」,和數(shù)萬 Gopher 交流學習。

          瀏覽 54
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产精品久久久久久久久久中字幕 | 久久精品无码播放 | 亚洲黄色在线观看 | 色情综合网 | 久草青娱乐 |