Go:說(shuō)說(shuō)fanIn和fanOut模式
文章目錄
fanIn
協(xié)程版
遞歸版
反射版
fanOut
同步版
協(xié)程異步版
反射版
今天回顧下常用的兩種channel應(yīng)用模式: fanIn和fanOut,
分別對(duì)應(yīng)了,對(duì)一組相同類(lèi)型chan的合并和廣播。
fanIn
將全部輸入chan都聚合到一個(gè)out chan中,在全部聚合完成后,關(guān)閉out chan.
協(xié)程版
func fanIn(chans ...<-chan interface{}) <-chan interface{} {
out := make(chan interface{})
go func() {
var wg sync.WaitGroup
wg.Add(len(chans))
for _, ch := range chans {
go func(ch <-chan interface{}) {
for v := range ch {
out <- v
}
wg.Done()
}(ch)
}
// 等待協(xié)程全部結(jié)束
wg.Wait()
close(out)
}()
return out
}
這里用waitGroup是防止關(guān)閉out時(shí)還有寫(xiě)入(out <- v),避免panic
遞歸版
2 分遞歸并合并。
其中合并mergeTwo主要用了nil chan對(duì)讀寫(xiě)均阻塞。
當(dāng)chan關(guān)閉時(shí),設(shè)置為nil,阻塞讀取。
func fanInRecur(chans ...<-chan interface{}) <-chan interface{} {
switch len(chans) {
case 0:
c := make(chan interface{})
close(c)
// 無(wú)可聚合chan,返回一個(gè)已關(guān)閉chan,可讀不可寫(xiě)
return c
case 1:
return chans[0]
case 2:
return mergeTwo(chans[0], chans[1])
default:
// 一分為二,遞歸
m := len(chans) / 2
return mergeTwo(
fanInRecur(chans[:m]...),
fanInRecur(chans[m:]...))
}
}
func mergeTwo(a, b <-chan interface{}) <-chan interface{} {
c := make(chan interface{})
go func() {
defer close(c)
for a != nil || b != nil { // 只要還有可讀的chan
select {
case v, ok := <-a:
if !ok { // a 已關(guān)閉,設(shè)置為nil
a = nil
continue
}
c <- v
case v, ok := <-b:
if !ok { // b 已關(guān)閉,設(shè)置為nil
b = nil
continue
}
c <- v
}
}
}()
return c
}
反射版
利用reflect.SelectCase構(gòu)造批量可Select的發(fā)送chan
func fanInReflect(chans ...<-chan interface{}) <-chan interface{} {
out := make(chan interface{})
go func() {
defer close(out)
// 構(gòu)造SelectCase slice
var cases []reflect.SelectCase
for _, c := range chans {
cases = append(cases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(c),
})
}
// 循環(huán),從cases中選擇一個(gè)可用的
for len(cases) > 0 {
i, v, ok := reflect.Select(cases)
if !ok {
// 此channel已經(jīng)close, 從切片移除
cases = append(cases[:i], cases[i+1:]...)
continue
}
out <- v.Interface()
}
}()
return out
}
附上壓測(cè)數(shù)據(jù)

fanOut
同步版
最直觀的方式,直接向每一個(gè)chan都同步發(fā)送一遍
返回前關(guān)閉這組chan, 即不再寫(xiě)入
func fanOut(ch <-chan interface{}, out []chan interface{}) {
go func() {
defer func() { // 退出時(shí)關(guān)閉所有的輸出chan
for i := range out {
close(out[i])
}
}()
for v := range ch { // 從輸入chan中讀取數(shù)據(jù)
v := v
for i := range out {
i := i
out[i] <- v // 放入到輸出chan中,同步方式
}
}
}()
}
協(xié)程異步版
發(fā)送這里用起協(xié)程的方式,實(shí)現(xiàn)異步,發(fā)送操作耗時(shí)情況下無(wú)需阻塞等待
可是有個(gè)問(wèn)題,不知道你看出來(lái)沒(méi)。
func fanOut(ch <-chan interface{}, out []chan interface{}) {
go func() {
defer func() { // 退出時(shí)關(guān)閉所有的輸出chan
for i := range out {
close(out[i])
}
}()
for v := range ch { // 從輸入chan中讀取數(shù)據(jù)
v := v
for i := range out {
i := i
// 協(xié)程異步
go func(){}
out[i] <- v
}()
}
}
}()
}
乍一看好像沒(méi)什么問(wèn)題, 但退出時(shí)關(guān)閉時(shí),很可能發(fā)送的協(xié)程寫(xiě)入還沒(méi)完成,
畢竟這里out之前寫(xiě)入的要有人讀才能繼續(xù)寫(xiě)。
這里加waitGroup可以等待全部發(fā)送完畢在關(guān)閉
func fanOutAsync(ch <-chan interface{}, out []chan interface{}) {
go func() {
var wg sync.WaitGroup
defer func() { // 退出時(shí)關(guān)閉所有的輸出chan
wg.Wait()
for i := range out {
close(out[i])
}
}()
for v := range ch { // 從輸入chan中讀取數(shù)據(jù)
v := v
for i := range out {
i := i
wg.Add(1)
go func() { // 異步,避免一個(gè)out阻塞的時(shí)候影響其他out
out[i] <- v
wg.Done()
}()
}
}
}()
}
反射版
構(gòu)造一票chan send case, 遍歷select,發(fā)送完成的將其置為nil阻塞,避免再次發(fā)送
不得不說(shuō),nil chan出鏡率很高啊
func fanOutReflect(ch <-chan interface{}, out []chan interface{}) {
go func() {
defer func() { // 退出時(shí)關(guān)閉所有的輸出chan
for i := range out {
close(out[i])
}
}()
cases := make([]reflect.SelectCase, len(out))
// 構(gòu)造SelectCase slice
for i := range cases {
cases[i].Dir = reflect.SelectSend
}
for v := range ch {
v := v
// 先完成send case構(gòu)造
for i := range cases {
cases[i].Chan = reflect.ValueOf(out[i])
cases[i].Send = reflect.ValueOf(v)
}
// 遍歷select
for range cases {
chosen, _, _ := reflect.Select(cases)
// 已發(fā)送過(guò),用nil阻塞,避免再次發(fā)送
cases[chosen].Chan = reflect.ValueOf(nil)
}
}
}()
}
附上壓測(cè)數(shù)據(jù)

具體測(cè)試代碼詳見(jiàn):concurrency[1]
參考資料
concurrency: https://github.com/NewbMiao/Dig101-Go/tree/master/concurrency/channel/schedule
推薦閱讀
