<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Go:說(shuō)說(shuō)fanIn和fanOut模式

          共 7931字,需瀏覽 16分鐘

           ·

          2021-09-13 20:16

          文章目錄

          • fanIn

            • 協(xié)程版

            • 遞歸版

            • 反射版

          • fanOut

            • 同步版

            • 協(xié)程異步版

            • 反射版


          今天回顧下常用的兩種channel應(yīng)用模式: fanInfanOut,

          分別對(duì)應(yīng)了,對(duì)一組相同類(lèi)型chan的合并和廣播。

          fanIn

          將全部輸入chan都聚合到一個(gè)out chan中,在全部聚合完成后,關(guān)閉out chan.

          協(xié)程版

          func fanIn(chans ...<-chan interface{}) <-chan interface{} {
           out := make(chan interface{})

           go func() {
            var wg sync.WaitGroup
            wg.Add(len(chans))
            for _, ch := range chans {
             go func(ch <-chan interface{}) {
              for v := range ch {
               out <- v
              }
              wg.Done()
             }(ch)
            }
            // 等待協(xié)程全部結(jié)束
            wg.Wait()
            close(out)
           }()
           return out
          }

          這里用waitGroup是防止關(guān)閉out時(shí)還有寫(xiě)入(out <- v),避免panic

          遞歸版

          2 分遞歸并合并。

          其中合并mergeTwo主要用了nil chan對(duì)讀寫(xiě)均阻塞。

          當(dāng)chan關(guān)閉時(shí),設(shè)置為nil,阻塞讀取。

          func fanInRecur(chans ...<-chan interface{}) <-chan interface{} {
           switch len(chans) {
           case 0:
            c := make(chan interface{})
            close(c)
            // 無(wú)可聚合chan,返回一個(gè)已關(guān)閉chan,可讀不可寫(xiě)
            return c
           case 1:
            return chans[0]
           case 2:
            return mergeTwo(chans[0], chans[1])
           default:
            // 一分為二,遞歸
            m := len(chans) / 2
            return mergeTwo(
             fanInRecur(chans[:m]...),
             fanInRecur(chans[m:]...))
           }
          }

          func mergeTwo(a, b <-chan interface{}) <-chan interface{} {
           c := make(chan interface{})
           go func() {
            defer close(c)
            for a != nil || b != nil { // 只要還有可讀的chan
             select {
             case v, ok := <-a:
              if !ok { // a 已關(guān)閉,設(shè)置為nil
               a = nil
               continue
              }
              c <- v
             case v, ok := <-b:
              if !ok { // b 已關(guān)閉,設(shè)置為nil
               b = nil
               continue
              }
              c <- v
             }
            }
           }()
           return c
          }

          反射版

          利用reflect.SelectCase構(gòu)造批量可Select的發(fā)送chan

          func fanInReflect(chans ...<-chan interface{}) <-chan interface{} {
           out := make(chan interface{})
           go func() {
            defer close(out)
            // 構(gòu)造SelectCase slice
            var cases []reflect.SelectCase
            for _, c := range chans {
             cases = append(cases, reflect.SelectCase{
              Dir:  reflect.SelectRecv,
              Chan: reflect.ValueOf(c),
             })
            }

            // 循環(huán),從cases中選擇一個(gè)可用的
            for len(cases) > 0 {
             i, v, ok := reflect.Select(cases)
             if !ok {
              // 此channel已經(jīng)close, 從切片移除
              cases = append(cases[:i], cases[i+1:]...)
              continue
             }
             out <- v.Interface()
            }
           }()
           return out
          }

          附上壓測(cè)數(shù)據(jù)

          性能對(duì)比

          fanOut

          同步版

          最直觀的方式,直接向每一個(gè)chan都同步發(fā)送一遍 返回前關(guān)閉這組chan, 即不再寫(xiě)入

          func fanOut(ch <-chan interface{}, out []chan interface{}) {
           go func() {
            defer func() { // 退出時(shí)關(guān)閉所有的輸出chan
             for i := range out {
              close(out[i])
             }
            }()

            for v := range ch { // 從輸入chan中讀取數(shù)據(jù)
             v := v
             for i := range out {
              i := i
              out[i] <- v // 放入到輸出chan中,同步方式
             }
            }
           }()
          }

          協(xié)程異步版

          發(fā)送這里用起協(xié)程的方式,實(shí)現(xiàn)異步,發(fā)送操作耗時(shí)情況下無(wú)需阻塞等待

          可是有個(gè)問(wèn)題,不知道你看出來(lái)沒(méi)。

          func fanOut(ch <-chan interface{}, out []chan interface{}) {
           go func() {
            defer func() { // 退出時(shí)關(guān)閉所有的輸出chan
             for i := range out {
              close(out[i])
             }
            }()

            for v := range ch { // 從輸入chan中讀取數(shù)據(jù)
             v := v
             for i := range out {
              i := i
              // 協(xié)程異步
              go func(){}
                out[i] <- v
              }()
             }
            }
           }()
          }

          乍一看好像沒(méi)什么問(wèn)題, 但退出時(shí)關(guān)閉時(shí),很可能發(fā)送的協(xié)程寫(xiě)入還沒(méi)完成,

          畢竟這里out之前寫(xiě)入的要有人讀才能繼續(xù)寫(xiě)。

          這里加waitGroup可以等待全部發(fā)送完畢在關(guān)閉

          func fanOutAsync(ch <-chan interface{}, out []chan interface{}) {
           go func() {
            var wg sync.WaitGroup
            defer func() { // 退出時(shí)關(guān)閉所有的輸出chan
             wg.Wait()
             for i := range out {
              close(out[i])
             }
            }()

            for v := range ch { // 從輸入chan中讀取數(shù)據(jù)
             v := v
             for i := range out {
              i := i
              wg.Add(1)
              go func() { // 異步,避免一個(gè)out阻塞的時(shí)候影響其他out
               out[i] <- v
               wg.Done()
              }()
             }
            }
           }()
          }

          反射版

          構(gòu)造一票chan send case, 遍歷select,發(fā)送完成的將其置為nil阻塞,避免再次發(fā)送

          不得不說(shuō),nil chan出鏡率很高啊

          func fanOutReflect(ch <-chan interface{}, out []chan interface{}) {
           go func() {
            defer func() { // 退出時(shí)關(guān)閉所有的輸出chan
             for i := range out {
              close(out[i])
             }
            }()
            cases := make([]reflect.SelectCase, len(out))
            // 構(gòu)造SelectCase slice
            for i := range cases {
             cases[i].Dir = reflect.SelectSend
            }
            for v := range ch {
             v := v
             // 先完成send case構(gòu)造
             for i := range cases {
              cases[i].Chan = reflect.ValueOf(out[i])
              cases[i].Send = reflect.ValueOf(v)
             }
             // 遍歷select
             for range cases {
              chosen, _, _ := reflect.Select(cases)
              // 已發(fā)送過(guò),用nil阻塞,避免再次發(fā)送
              cases[chosen].Chan = reflect.ValueOf(nil)
             }
            }
           }()
          }

          附上壓測(cè)數(shù)據(jù)

          性能對(duì)比

          具體測(cè)試代碼詳見(jiàn):concurrency[1]

          參考資料

          [1]

          concurrency: https://github.com/NewbMiao/Dig101-Go/tree/master/concurrency/channel/schedule



          推薦閱讀


          福利

          我為大家整理了一份從入門(mén)到進(jìn)階的Go學(xué)習(xí)資料禮包,包含學(xué)習(xí)建議:入門(mén)看什么,進(jìn)階看什么。關(guān)注公眾號(hào) 「polarisxu」,回復(fù) ebook 獲??;還可以回復(fù)「進(jìn)群」,和數(shù)萬(wàn) Gopher 交流學(xué)習(xí)。


          瀏覽 67
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产做受91 一片二片老头 | 无码子一区二区 | 真人一级毛毛片 | 爱搞搞视频网站 | 青娱乐成人网 |