Go:最簡(jiǎn)單的服務(wù)響應(yīng)時(shí)長(zhǎng)優(yōu)化方法,沒(méi)有之一
序言 - From 萬(wàn)俊峰Kevin
我們能把服務(wù)做到平均延遲基本在30ms左右,其中非常大的一個(gè)前提是我們大量使用了 MapReduce 技術(shù),讓我們的服務(wù)即使調(diào)用很多個(gè)服務(wù),很多時(shí)候也只取決于最慢的那一個(gè)請(qǐng)求的時(shí)長(zhǎng)。

對(duì)你現(xiàn)有的服務(wù),不需要優(yōu)化 DB 操作,不需要優(yōu)化緩存,不需要重寫(xiě)業(yè)務(wù)邏輯,只需要通過(guò) MapReduce 把正交(不相關(guān))的請(qǐng)求并行化,你就可以大幅降低服務(wù)響應(yīng)時(shí)長(zhǎng)。
本文歐陽(yáng)安就給大家仔細(xì)分析一下 MapReduce 的實(shí)現(xiàn)細(xì)節(jié)。
為什么需要 MapReduce
在實(shí)際的業(yè)務(wù)場(chǎng)景中我們常常需要從不同的 rpc 服務(wù)中獲取相應(yīng)屬性來(lái)組裝成復(fù)雜對(duì)象。
比如要查詢(xún)商品詳情:
商品服務(wù)-查詢(xún)商品屬性 庫(kù)存服務(wù)-查詢(xún)庫(kù)存屬性 價(jià)格服務(wù)-查詢(xún)價(jià)格屬性 營(yíng)銷(xiāo)服務(wù)-查詢(xún)營(yíng)銷(xiāo)屬性
如果是串行調(diào)用的話(huà)響應(yīng)時(shí)間會(huì)隨著 rpc 調(diào)用次數(shù)呈線(xiàn)性增長(zhǎng),所以我們要優(yōu)化性能一般會(huì)將串行改并行。
簡(jiǎn)單的場(chǎng)景下使用 waitGroup 也能夠滿(mǎn)足需求,但是如果我們需要對(duì) rpc 調(diào)用返回的數(shù)據(jù)進(jìn)行校驗(yàn)、數(shù)據(jù)加工轉(zhuǎn)換、數(shù)據(jù)匯總呢?繼續(xù)使用 waitGroup 就有點(diǎn)力不從心了,go 的官方庫(kù)中并沒(méi)有這種工具(java 中提供了 CompleteFuture),go-zero 作者依據(jù) mapReduce 架構(gòu)思想實(shí)現(xiàn)了進(jìn)程內(nèi)的數(shù)據(jù)批處理 mapReduce 并發(fā)工具類(lèi)。
設(shè)計(jì)思路
我們嘗試把自己代入到作者的角色梳理一下并發(fā)工具可能的業(yè)務(wù)場(chǎng)景:
查詢(xún)商品詳情:支持并發(fā)調(diào)用多個(gè)服務(wù)來(lái)組合產(chǎn)品屬性,支持調(diào)用錯(cuò)誤可以立即結(jié)束。 商品詳情頁(yè)自動(dòng)推薦用戶(hù)卡券:支持并發(fā)校驗(yàn)卡券,校驗(yàn)失敗自動(dòng)剔除,返回全部卡券。
以上實(shí)際都是在進(jìn)行對(duì)輸入數(shù)據(jù)進(jìn)行處理最后輸出清洗后的數(shù)據(jù),針對(duì)數(shù)據(jù)處理有個(gè)非常經(jīng)典的異步模式:生產(chǎn)者消費(fèi)者模式。于是我們可以抽象一下數(shù)據(jù)批處理的生命周期,大致可以分為三個(gè)階段:

數(shù)據(jù)生產(chǎn) generate 數(shù)據(jù)加工 mapper 數(shù)據(jù)聚合 reducer
其中數(shù)據(jù)生產(chǎn)是不可或缺的階段,數(shù)據(jù)加工、數(shù)據(jù)聚合是可選階段,數(shù)據(jù)生產(chǎn)與加工支持并發(fā)調(diào)用,數(shù)據(jù)聚合基本屬于純內(nèi)存操作單協(xié)程即可。
再來(lái)思考一下不同階段之間數(shù)據(jù)應(yīng)該如何流轉(zhuǎn),既然不同階段的數(shù)據(jù)處理都是由不同 goroutine 執(zhí)行的,那么很自然的可以考慮采用 channel 來(lái)實(shí)現(xiàn) goroutine 之間的通信。

如何實(shí)現(xiàn)隨時(shí)終止流程呢?
很簡(jiǎn)單,goroutine 中監(jiān)聽(tīng)一個(gè)全局的結(jié)束 channel 就行。
go-zero 代碼實(shí)現(xiàn)
core/mr/mapreduce.go
詳細(xì)源碼可查看 https://github.com/Ouyangan/go-zero-annotation/blob/24a5753f19a6a18fc05615cb019ad809aab54232/core/mr/mapreduce.go
前置知識(shí) - channel ?基本用法
因?yàn)?MapReduce 源碼中大量使用 channel 進(jìn)行通信,大概提一下 channel 基本用法:
channel 寫(xiě)結(jié)束后記得關(guān)閉
ch?:=?make(chan?interface{})
//?寫(xiě)入完畢需要主動(dòng)關(guān)閉channel
defer?func()?{
????close(ch)
}()
go?func()?{
????//?v,ok模式?讀取channel
????for?{
????????v,?ok?:=?<-ch
????????if?!ok?{
????????????return
????????}
????????t.Log(v)
????}
????//?for?range模式讀取channel,channel關(guān)閉循環(huán)自動(dòng)退出
????for?i?:=?range?ch?{
????????t.Log(i)
????}
????//?清空channel,channel關(guān)閉循環(huán)自動(dòng)退出
????for?range?ch?{
????}
}()
for?i?:=?0;?i?10;?i++?{
????ch?<-?i
????time.Sleep(time.Second)
}
已關(guān)閉的 channel 依然支持讀取 限定 channel 讀寫(xiě)權(quán)限
//?只讀channel
func?readChan(rch?<-chan?interface{})?{
????for?i?:=?range?rch?{
????????log.Println(i)
????}
}
//?只寫(xiě)channel
func?writeChan(wch?chan<-?interface{})?{
????wch?<-?1
}
接口定義
先來(lái)看最核心的三個(gè)函數(shù)定義:
數(shù)據(jù)生產(chǎn) 數(shù)據(jù)加工 數(shù)據(jù)聚合
//?數(shù)據(jù)生產(chǎn)func
//?source?-?數(shù)據(jù)被生產(chǎn)后寫(xiě)入source
GenerateFunc?func(source?chan<-?interface{})
//?數(shù)據(jù)加工func
//?item?-?生產(chǎn)出來(lái)的數(shù)據(jù)
//?writer?-?調(diào)用writer.Write()可以將加工后的向后傳遞至reducer
//?cancel?-?終止流程func
MapperFunc?func(item?interface{},?writer?Writer,?cancel?func(error))
//?數(shù)據(jù)聚合func
//?pipe?-?加工出來(lái)的數(shù)據(jù)
//?writer?-?調(diào)用writer.Write()可以將聚合后的數(shù)據(jù)返回給用戶(hù)
//?cancel?-?終止流程func
ReducerFunc?func(pipe?<-chan?interface{},?writer?Writer,?cancel?func(error))
面向用戶(hù)的方法定義
使用方法可以查看官方文檔,這里不做贅述
面向用戶(hù)的方法比較多,方法主要分為兩大類(lèi):
無(wú)返回 執(zhí)行過(guò)程發(fā)生錯(cuò)誤立即終止 執(zhí)行過(guò)程不關(guān)注錯(cuò)誤 有返回值 手動(dòng)寫(xiě)入 source,手動(dòng)讀取聚合數(shù)據(jù) channel 手動(dòng)寫(xiě)入 source,自動(dòng)讀取聚合數(shù)據(jù) channel 外部傳入 source,自動(dòng)讀取聚合數(shù)據(jù) channel
//?并發(fā)執(zhí)行func,發(fā)生任何錯(cuò)誤將會(huì)立即終止流程
func?Finish(fns?...func()?error)?error
//?并發(fā)執(zhí)行func,即使發(fā)生錯(cuò)誤也不會(huì)終止流程
func?FinishVoid(fns?...func())
//?需要用戶(hù)手動(dòng)將生產(chǎn)數(shù)據(jù)寫(xiě)入?source,加工數(shù)據(jù)后返回一個(gè)channel供讀取
// opts -?可選參數(shù),目前包含:數(shù)據(jù)加工階段協(xié)程數(shù)量
func?Map(generate?GenerateFunc,?mapper?MapFunc,?opts?...Option)
//?無(wú)返回值,不關(guān)注錯(cuò)誤
func?MapVoid(generate?GenerateFunc,?mapper?VoidMapFunc,?opts?...Option)
//?無(wú)返回值,關(guān)注錯(cuò)誤
func?MapReduceVoid(generate?GenerateFunc,?mapper?MapperFunc,?reducer?VoidReducerFunc,?opts?...Option)
//?需要用戶(hù)手動(dòng)將生產(chǎn)數(shù)據(jù)寫(xiě)入?source?,并返回聚合后的數(shù)據(jù)
//?generate?生產(chǎn)
//?mapper?加工
//?reducer?聚合
// opts -?可選參數(shù),目前包含:數(shù)據(jù)加工階段協(xié)程數(shù)量
func?MapReduce(generate?GenerateFunc,?mapper?MapperFunc,?reducer?ReducerFunc,?opts?...Option)?(interface{},?error)
//?支持傳入數(shù)據(jù)源channel,并返回聚合后的數(shù)據(jù)
//?source?-?數(shù)據(jù)源channel
//?mapper?-?讀取source內(nèi)容并處理
//?reducer?-?數(shù)據(jù)處理完畢發(fā)送至reducer聚合
func?MapReduceWithSource(source?<-chan?interface{},?mapper?MapperFunc,?reducer?ReducerFunc,
????opts?...Option)?(interface{},?error)
核心方法是 MapReduceWithSource 和 Map,其他方法都在內(nèi)部調(diào)用她倆。弄清楚了 MapReduceWithSource ?方法 Map 也不在話(huà)下。
MapReduceWithSource 源碼實(shí)現(xiàn)
一切都在這張圖里面了

//?支持傳入數(shù)據(jù)源channel,并返回聚合后的數(shù)據(jù)
//?source?-?數(shù)據(jù)源channel
//?mapper?-?讀取source內(nèi)容并處理
//?reducer?-?數(shù)據(jù)處理完畢發(fā)送至reducer聚合
func?MapReduceWithSource(source?<-chan?interface{},?mapper?MapperFunc,?reducer?ReducerFunc,
????opts?...Option)?(interface{},?error)?{
????//?可選參數(shù)設(shè)置
????options?:=?buildOptions(opts...)
????//?聚合數(shù)據(jù)channel,需要手動(dòng)調(diào)用write方法寫(xiě)入到output中
????output?:=?make(chan?interface{})
????//?output最后只會(huì)被讀取一次
????defer?func()?{
????????//?如果有多次寫(xiě)入的話(huà)則會(huì)造成阻塞從而導(dǎo)致協(xié)程泄漏
????????//?這里用?for?range檢測(cè)是否可以讀出數(shù)據(jù),讀出數(shù)據(jù)說(shuō)明多次寫(xiě)入了
????????//?為什么這里使用panic呢?顯示的提醒用戶(hù)用法錯(cuò)了會(huì)比自動(dòng)修復(fù)掉好一些
????????for?range?output?{
????????????panic("more?than?one?element?written?in?reducer")
????????}
????}()
????//?創(chuàng)建有緩沖的chan,容量為workers
????//?意味著最多允許?workers?個(gè)協(xié)程同時(shí)處理數(shù)據(jù)
????collector?:=?make(chan?interface{},?options.workers)
????//?數(shù)據(jù)聚合任務(wù)完成標(biāo)志
????done?:=?syncx.NewDoneChan()
????//?支持阻塞寫(xiě)入chan的writer
????writer?:=?newGuardedWriter(output,?done.Done())
????//?單例關(guān)閉
????var?closeOnce?sync.Once
????var?retErr?errorx.AtomicError
????//?數(shù)據(jù)聚合任務(wù)已結(jié)束,發(fā)送完成標(biāo)志
????finish?:=?func()?{
????????//?只能關(guān)閉一次
????????closeOnce.Do(func()?{
????????????//?發(fā)送聚合任務(wù)完成信號(hào),close函數(shù)將會(huì)向chan寫(xiě)入一個(gè)零值
????????????done.Close()
????????????//?關(guān)閉數(shù)據(jù)聚合chan
????????????close(output)
????????})
????}
????//?取消操作
????cancel?:=?once(func(err?error)?{
????????//?設(shè)置error
????????if?err?!=?nil?{
????????????retErr.Set(err)
????????}?else?{
????????????retErr.Set(ErrCancelWithNil)
????????}
????????//?清空source?channel
????????drain(source)
????????//?調(diào)用完成方法
????????finish()
????})
????go?func()?{
????????defer?func()?{
????????????//?清空聚合任務(wù)channel
????????????drain(collector)
????????????//?捕獲panic
????????????if?r?:=?recover();?r?!=?nil?{
????????????????//?調(diào)用cancel方法,立即結(jié)束
????????????????cancel(fmt.Errorf("%v",?r))
????????????}?else?{
????????????????//?正常結(jié)束
????????????????finish()
????????????}
????????}()
????????//?執(zhí)行數(shù)據(jù)加工
????????//?注意writer.write將加工后數(shù)據(jù)寫(xiě)入了output
????????reducer(collector,?writer,?cancel)
????}()
????//?異步執(zhí)行數(shù)據(jù)加工
????//?source?-?數(shù)據(jù)生產(chǎn)
????//?collector?-?數(shù)據(jù)收集
????//?done?-?結(jié)束標(biāo)志
????//?workers?-?并發(fā)數(shù)
????go?executeMappers(func(item?interface{},?w?Writer)?{
????????mapper(item,?w,?cancel)
????},?source,?collector,?done.Done(),?options.workers)
????//?reducer將加工后的數(shù)據(jù)寫(xiě)入了output,
????//?需要數(shù)據(jù)返回時(shí)讀取output即可
????//?假如output被寫(xiě)入了超過(guò)兩次
????//?則開(kāi)始的defer?func那里將還可以讀到數(shù)據(jù)
????//?由此可以檢測(cè)到用戶(hù)調(diào)用了多次write方法
????value,?ok?:=?<-output
????if?err?:=?retErr.Load();?err?!=?nil?{
????????return?nil,?err
????}?else?if?ok?{
????????return?value,?nil
????}?else?{
????????return?nil,?ErrReduceNoOutput
????}
}
//?數(shù)據(jù)加工
func?executeMappers(mapper?MapFunc,?input?<-chan?interface{},?collector?chan<-?interface{},
????done?<-chan?lang.PlaceholderType,?workers?int)?{
????//?goroutine協(xié)調(diào)同步信號(hào)量
????var?wg?sync.WaitGroup
????defer?func()?{
????????//?等待數(shù)據(jù)加工任務(wù)完成
????????//?防止數(shù)據(jù)加工的協(xié)程還未處理完數(shù)據(jù)就直接退出了
????????wg.Wait()
????????//?關(guān)閉數(shù)據(jù)加工channel
????????close(collector)
????}()
????//?帶緩沖區(qū)的channel,緩沖區(qū)大小為workers
????//?控制數(shù)據(jù)加工的協(xié)程數(shù)量
????pool?:=?make(chan?lang.PlaceholderType,?workers)
????//?數(shù)據(jù)加工writer
????writer?:=?newGuardedWriter(collector,?done)
????for?{
????????select?{
????????//?監(jiān)聽(tīng)到外部結(jié)束信號(hào),直接結(jié)束
????????case?<-done:
????????????return
????????//?控制數(shù)據(jù)加工協(xié)程數(shù)量
????????//?緩沖區(qū)容量-1
????????//?無(wú)容量時(shí)將會(huì)被阻塞,等待釋放容量
????????case?pool?<-?lang.Placeholder:
????????????//?阻塞等待生產(chǎn)數(shù)據(jù)channel
????????????item,?ok?:=?<-input
????????????//?如果ok為false則說(shuō)明input已被關(guān)閉或者清空
????????????//?數(shù)據(jù)加工完成,執(zhí)行退出
????????????if?!ok?{
????????????????//?緩沖區(qū)容量+1
????????????????<-pool
????????????????//?結(jié)束本次循環(huán)
????????????????return
????????????}
????????????//?wg同步信號(hào)量+1
????????????wg.Add(1)
????????????//?better?to?safely?run?caller?defined?method
????????????//?異步執(zhí)行數(shù)據(jù)加工,防止panic錯(cuò)誤
????????????threading.GoSafe(func()?{
????????????????defer?func()?{
????????????????????//?wg同步信號(hào)量-1
????????????????????wg.Done()
????????????????????//?緩沖區(qū)容量+1
????????????????????<-pool
????????????????}()
????????????????mapper(item,?writer)
????????????})
????????}
????}
}
總結(jié)
mapReduce 的源碼我大概看了兩個(gè)晚上,整體看下來(lái)比較累。一方面是我自身 go 語(yǔ)言并不是很熟練尤其是 channel 的用法,導(dǎo)致我需要頻繁停下來(lái)查詢(xún)相關(guān)文檔理解作者的寫(xiě)法,另一方面是多個(gè) goroutine 之間通過(guò) channel 進(jìn)行通信實(shí)現(xiàn)協(xié)作真的蠻燒腦(佩服作者的思維能力)。
其次看源碼時(shí)第一遍看起來(lái)肯定會(huì)比較懵的,其實(shí)沒(méi)關(guān)系找到程序的入口(公共基礎(chǔ)組件一般是面向的方法)先沿著主線(xiàn)一路看下去把每一句代碼都看懂加上注釋?zhuān)倏粗Ь€(xiàn)代碼。
如果有實(shí)在看不懂的地方就查查這段代碼的提交記錄非常有可能是解決某個(gè)bug改動(dòng)的,比如下面這段代碼我死活看了好多遍都不理解。
//?聚合數(shù)據(jù)channel,需要手動(dòng)調(diào)用write方法寫(xiě)入到output中
output?:=?make(chan?interface{})
//?output最后只會(huì)被讀取一次
defer?func()?{
????//?如果有多次寫(xiě)入的話(huà)則會(huì)造成阻塞從而導(dǎo)致協(xié)程泄漏
????//?這里用?for?range檢測(cè)是否可以讀出數(shù)據(jù),讀出數(shù)據(jù)說(shuō)明多次寫(xiě)入了
????//?為什么這里使用panic呢?顯示的提醒用戶(hù)用法錯(cuò)了會(huì)比自動(dòng)修復(fù)掉好一些
????for?range?output?{
????????panic("more?than?one?element?written?in?reducer")
????}
}()
最后畫(huà)出流程圖基本就能把源碼看懂了,對(duì)于我而言這方法比較笨但有效。
資料
Go Channel 詳解: https://colobu.com/2016/04/14/Golang-Channels/
go-zero MapReduce文檔: https://go-zero.dev/cn/mapreduce.html
項(xiàng)目地址
https://github.com/zeromicro/go-zero
歡迎使用 go-zero 并 star 支持我們!
我是 polarisxu,北大碩士畢業(yè),曾在 360 等知名互聯(lián)網(wǎng)公司工作,10多年技術(shù)研發(fā)與架構(gòu)經(jīng)驗(yàn)!2012 年接觸 Go 語(yǔ)言并創(chuàng)建了 Go 語(yǔ)言中文網(wǎng)!著有《Go語(yǔ)言編程之旅》、開(kāi)源圖書(shū)《Go語(yǔ)言標(biāo)準(zhǔn)庫(kù)》等。
堅(jiān)持輸出技術(shù)(包括 Go、Rust 等技術(shù))、職場(chǎng)心得和創(chuàng)業(yè)感悟!歡迎關(guān)注「polarisxu」一起成長(zhǎng)!也歡迎加我微信好友交流:gopherstudio
