<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Go:最簡(jiǎn)單的服務(wù)響應(yīng)時(shí)長(zhǎng)優(yōu)化方法,沒(méi)有之一

          共 1503字,需瀏覽 4分鐘

           ·

          2022-03-01 15:28

          序言 - From 萬(wàn)俊峰Kevin

          我們能把服務(wù)做到平均延遲基本在30ms左右,其中非常大的一個(gè)前提是我們大量使用了 MapReduce 技術(shù),讓我們的服務(wù)即使調(diào)用很多個(gè)服務(wù),很多時(shí)候也只取決于最慢的那一個(gè)請(qǐng)求的時(shí)長(zhǎng)。

          對(duì)你現(xiàn)有的服務(wù),不需要優(yōu)化 DB 操作,不需要優(yōu)化緩存,不需要重寫(xiě)業(yè)務(wù)邏輯,只需要通過(guò) MapReduce 把正交(不相關(guān))的請(qǐng)求并行化,你就可以大幅降低服務(wù)響應(yīng)時(shí)長(zhǎng)。

          本文歐陽(yáng)安就給大家仔細(xì)分析一下 MapReduce 的實(shí)現(xiàn)細(xì)節(jié)。

          為什么需要 MapReduce

          在實(shí)際的業(yè)務(wù)場(chǎng)景中我們常常需要從不同的 rpc 服務(wù)中獲取相應(yīng)屬性來(lái)組裝成復(fù)雜對(duì)象。

          比如要查詢(xún)商品詳情:

          1. 商品服務(wù)-查詢(xún)商品屬性
          2. 庫(kù)存服務(wù)-查詢(xún)庫(kù)存屬性
          3. 價(jià)格服務(wù)-查詢(xún)價(jià)格屬性
          4. 營(yíng)銷(xiāo)服務(wù)-查詢(xún)營(yíng)銷(xiāo)屬性

          如果是串行調(diào)用的話(huà)響應(yīng)時(shí)間會(huì)隨著 rpc 調(diào)用次數(shù)呈線(xiàn)性增長(zhǎng),所以我們要優(yōu)化性能一般會(huì)將串行改并行。

          簡(jiǎn)單的場(chǎng)景下使用 waitGroup 也能夠滿(mǎn)足需求,但是如果我們需要對(duì) rpc 調(diào)用返回的數(shù)據(jù)進(jìn)行校驗(yàn)、數(shù)據(jù)加工轉(zhuǎn)換、數(shù)據(jù)匯總呢?繼續(xù)使用 waitGroup 就有點(diǎn)力不從心了,go 的官方庫(kù)中并沒(méi)有這種工具(java 中提供了 CompleteFuture),go-zero 作者依據(jù) mapReduce 架構(gòu)思想實(shí)現(xiàn)了進(jìn)程內(nèi)的數(shù)據(jù)批處理 mapReduce 并發(fā)工具類(lèi)。

          設(shè)計(jì)思路

          我們嘗試把自己代入到作者的角色梳理一下并發(fā)工具可能的業(yè)務(wù)場(chǎng)景:

          1. 查詢(xún)商品詳情:支持并發(fā)調(diào)用多個(gè)服務(wù)來(lái)組合產(chǎn)品屬性,支持調(diào)用錯(cuò)誤可以立即結(jié)束。
          2. 商品詳情頁(yè)自動(dòng)推薦用戶(hù)卡券:支持并發(fā)校驗(yàn)卡券,校驗(yàn)失敗自動(dòng)剔除,返回全部卡券。

          以上實(shí)際都是在進(jìn)行對(duì)輸入數(shù)據(jù)進(jìn)行處理最后輸出清洗后的數(shù)據(jù),針對(duì)數(shù)據(jù)處理有個(gè)非常經(jīng)典的異步模式:生產(chǎn)者消費(fèi)者模式。于是我們可以抽象一下數(shù)據(jù)批處理的生命周期,大致可以分為三個(gè)階段:

          1. 數(shù)據(jù)生產(chǎn) generate
          2. 數(shù)據(jù)加工 mapper
          3. 數(shù)據(jù)聚合 reducer

          其中數(shù)據(jù)生產(chǎn)是不可或缺的階段,數(shù)據(jù)加工、數(shù)據(jù)聚合是可選階段,數(shù)據(jù)生產(chǎn)與加工支持并發(fā)調(diào)用,數(shù)據(jù)聚合基本屬于純內(nèi)存操作單協(xié)程即可。

          再來(lái)思考一下不同階段之間數(shù)據(jù)應(yīng)該如何流轉(zhuǎn),既然不同階段的數(shù)據(jù)處理都是由不同 goroutine 執(zhí)行的,那么很自然的可以考慮采用 channel 來(lái)實(shí)現(xiàn) goroutine 之間的通信。

          如何實(shí)現(xiàn)隨時(shí)終止流程呢?

          很簡(jiǎn)單,goroutine 中監(jiān)聽(tīng)一個(gè)全局的結(jié)束 channel 就行。

          go-zero 代碼實(shí)現(xiàn)

          core/mr/mapreduce.go

          詳細(xì)源碼可查看 https://github.com/Ouyangan/go-zero-annotation/blob/24a5753f19a6a18fc05615cb019ad809aab54232/core/mr/mapreduce.go

          前置知識(shí) - channel ?基本用法

          因?yàn)?MapReduce 源碼中大量使用 channel 進(jìn)行通信,大概提一下 channel 基本用法:

          1. channel 寫(xiě)結(jié)束后記得關(guān)閉
          ch?:=?make(chan?interface{})
          //?寫(xiě)入完畢需要主動(dòng)關(guān)閉channel
          defer?func()?{
          ????close(ch)
          }()
          go?func()?{
          ????//?v,ok模式?讀取channel
          ????for?{
          ????????v,?ok?:=?<-ch
          ????????if?!ok?{
          ????????????return
          ????????}
          ????????t.Log(v)
          ????}

          ????//?for?range模式讀取channel,channel關(guān)閉循環(huán)自動(dòng)退出
          ????for?i?:=?range?ch?{
          ????????t.Log(i)
          ????}

          ????//?清空channel,channel關(guān)閉循環(huán)自動(dòng)退出
          ????for?range?ch?{
          ????}
          }()
          for?i?:=?0;?i?10;?i++?{
          ????ch?<-?i
          ????time.Sleep(time.Second)
          }
          1. 已關(guān)閉的 channel 依然支持讀取
          2. 限定 channel 讀寫(xiě)權(quán)限
          //?只讀channel
          func?readChan(rch?<-chan?interface{})?{
          ????for?i?:=?range?rch?{
          ????????log.Println(i)
          ????}
          }

          //?只寫(xiě)channel
          func?writeChan(wch?chan<-?interface{})?{
          ????wch?<-?1
          }

          接口定義

          先來(lái)看最核心的三個(gè)函數(shù)定義:

          1. 數(shù)據(jù)生產(chǎn)
          2. 數(shù)據(jù)加工
          3. 數(shù)據(jù)聚合
          //?數(shù)據(jù)生產(chǎn)func
          //?source?-?數(shù)據(jù)被生產(chǎn)后寫(xiě)入source
          GenerateFunc?func(source?chan<-?interface{})

          //?數(shù)據(jù)加工func
          //?item?-?生產(chǎn)出來(lái)的數(shù)據(jù)
          //?writer?-?調(diào)用writer.Write()可以將加工后的向后傳遞至reducer
          //?cancel?-?終止流程func
          MapperFunc?func(item?interface{},?writer?Writer,?cancel?func(error))

          //?數(shù)據(jù)聚合func
          //?pipe?-?加工出來(lái)的數(shù)據(jù)
          //?writer?-?調(diào)用writer.Write()可以將聚合后的數(shù)據(jù)返回給用戶(hù)
          //?cancel?-?終止流程func
          ReducerFunc?func(pipe?<-chan?interface{},?writer?Writer,?cancel?func(error))

          面向用戶(hù)的方法定義

          使用方法可以查看官方文檔,這里不做贅述

          面向用戶(hù)的方法比較多,方法主要分為兩大類(lèi):

          1. 無(wú)返回
            1. 執(zhí)行過(guò)程發(fā)生錯(cuò)誤立即終止
            2. 執(zhí)行過(guò)程不關(guān)注錯(cuò)誤
          2. 有返回值
            1. 手動(dòng)寫(xiě)入 source,手動(dòng)讀取聚合數(shù)據(jù) channel
            2. 手動(dòng)寫(xiě)入 source,自動(dòng)讀取聚合數(shù)據(jù) channel
            3. 外部傳入 source,自動(dòng)讀取聚合數(shù)據(jù) channel
          //?并發(fā)執(zhí)行func,發(fā)生任何錯(cuò)誤將會(huì)立即終止流程
          func?Finish(fns?...func()?error)?error

          //?并發(fā)執(zhí)行func,即使發(fā)生錯(cuò)誤也不會(huì)終止流程
          func?FinishVoid(fns?...func())

          //?需要用戶(hù)手動(dòng)將生產(chǎn)數(shù)據(jù)寫(xiě)入?source,加工數(shù)據(jù)后返回一個(gè)channel供讀取
          // opts -?可選參數(shù),目前包含:數(shù)據(jù)加工階段協(xié)程數(shù)量
          func?Map(generate?GenerateFunc,?mapper?MapFunc,?opts?...Option)

          //?無(wú)返回值,不關(guān)注錯(cuò)誤
          func?MapVoid(generate?GenerateFunc,?mapper?VoidMapFunc,?opts?...Option)

          //?無(wú)返回值,關(guān)注錯(cuò)誤
          func?MapReduceVoid(generate?GenerateFunc,?mapper?MapperFunc,?reducer?VoidReducerFunc,?opts?...Option)

          //?需要用戶(hù)手動(dòng)將生產(chǎn)數(shù)據(jù)寫(xiě)入?source?,并返回聚合后的數(shù)據(jù)
          //?generate?生產(chǎn)
          //?mapper?加工
          //?reducer?聚合
          // opts -?可選參數(shù),目前包含:數(shù)據(jù)加工階段協(xié)程數(shù)量
          func?MapReduce(generate?GenerateFunc,?mapper?MapperFunc,?reducer?ReducerFunc,?opts?...Option)?(interface{},?error)

          //?支持傳入數(shù)據(jù)源channel,并返回聚合后的數(shù)據(jù)
          //?source?-?數(shù)據(jù)源channel
          //?mapper?-?讀取source內(nèi)容并處理
          //?reducer?-?數(shù)據(jù)處理完畢發(fā)送至reducer聚合
          func?MapReduceWithSource(source?<-chan?interface{},?mapper?MapperFunc,?reducer?ReducerFunc,
          ????opts?...Option)
          ?(interface{},?error)

          核心方法是 MapReduceWithSource 和 Map,其他方法都在內(nèi)部調(diào)用她倆。弄清楚了 MapReduceWithSource ?方法 Map 也不在話(huà)下。

          MapReduceWithSource 源碼實(shí)現(xiàn)

          一切都在這張圖里面了

          //?支持傳入數(shù)據(jù)源channel,并返回聚合后的數(shù)據(jù)
          //?source?-?數(shù)據(jù)源channel
          //?mapper?-?讀取source內(nèi)容并處理
          //?reducer?-?數(shù)據(jù)處理完畢發(fā)送至reducer聚合
          func?MapReduceWithSource(source?<-chan?interface{},?mapper?MapperFunc,?reducer?ReducerFunc,
          ????opts?...Option)
          ?(interface{},?error)
          ?{
          ????//?可選參數(shù)設(shè)置
          ????options?:=?buildOptions(opts...)
          ????//?聚合數(shù)據(jù)channel,需要手動(dòng)調(diào)用write方法寫(xiě)入到output中
          ????output?:=?make(chan?interface{})
          ????//?output最后只會(huì)被讀取一次
          ????defer?func()?{
          ????????//?如果有多次寫(xiě)入的話(huà)則會(huì)造成阻塞從而導(dǎo)致協(xié)程泄漏
          ????????//?這里用?for?range檢測(cè)是否可以讀出數(shù)據(jù),讀出數(shù)據(jù)說(shuō)明多次寫(xiě)入了
          ????????//?為什么這里使用panic呢?顯示的提醒用戶(hù)用法錯(cuò)了會(huì)比自動(dòng)修復(fù)掉好一些
          ????????for?range?output?{
          ????????????panic("more?than?one?element?written?in?reducer")
          ????????}
          ????}()
          ????//?創(chuàng)建有緩沖的chan,容量為workers
          ????//?意味著最多允許?workers?個(gè)協(xié)程同時(shí)處理數(shù)據(jù)
          ????collector?:=?make(chan?interface{},?options.workers)
          ????//?數(shù)據(jù)聚合任務(wù)完成標(biāo)志
          ????done?:=?syncx.NewDoneChan()
          ????//?支持阻塞寫(xiě)入chan的writer
          ????writer?:=?newGuardedWriter(output,?done.Done())
          ????//?單例關(guān)閉
          ????var?closeOnce?sync.Once
          ????var?retErr?errorx.AtomicError
          ????//?數(shù)據(jù)聚合任務(wù)已結(jié)束,發(fā)送完成標(biāo)志
          ????finish?:=?func()?{
          ????????//?只能關(guān)閉一次
          ????????closeOnce.Do(func()?{
          ????????????//?發(fā)送聚合任務(wù)完成信號(hào),close函數(shù)將會(huì)向chan寫(xiě)入一個(gè)零值
          ????????????done.Close()
          ????????????//?關(guān)閉數(shù)據(jù)聚合chan
          ????????????close(output)
          ????????})
          ????}
          ????//?取消操作
          ????cancel?:=?once(func(err?error)?{
          ????????//?設(shè)置error
          ????????if?err?!=?nil?{
          ????????????retErr.Set(err)
          ????????}?else?{
          ????????????retErr.Set(ErrCancelWithNil)
          ????????}
          ????????//?清空source?channel
          ????????drain(source)
          ????????//?調(diào)用完成方法
          ????????finish()
          ????})

          ????go?func()?{
          ????????defer?func()?{
          ????????????//?清空聚合任務(wù)channel
          ????????????drain(collector)
          ????????????//?捕獲panic
          ????????????if?r?:=?recover();?r?!=?nil?{
          ????????????????//?調(diào)用cancel方法,立即結(jié)束
          ????????????????cancel(fmt.Errorf("%v",?r))
          ????????????}?else?{
          ????????????????//?正常結(jié)束
          ????????????????finish()
          ????????????}
          ????????}()
          ????????//?執(zhí)行數(shù)據(jù)加工
          ????????//?注意writer.write將加工后數(shù)據(jù)寫(xiě)入了output
          ????????reducer(collector,?writer,?cancel)
          ????}()
          ????//?異步執(zhí)行數(shù)據(jù)加工
          ????//?source?-?數(shù)據(jù)生產(chǎn)
          ????//?collector?-?數(shù)據(jù)收集
          ????//?done?-?結(jié)束標(biāo)志
          ????//?workers?-?并發(fā)數(shù)
          ????go?executeMappers(func(item?interface{},?w?Writer)?{
          ????????mapper(item,?w,?cancel)
          ????},?source,?collector,?done.Done(),?options.workers)
          ????//?reducer將加工后的數(shù)據(jù)寫(xiě)入了output,
          ????//?需要數(shù)據(jù)返回時(shí)讀取output即可
          ????//?假如output被寫(xiě)入了超過(guò)兩次
          ????//?則開(kāi)始的defer?func那里將還可以讀到數(shù)據(jù)
          ????//?由此可以檢測(cè)到用戶(hù)調(diào)用了多次write方法
          ????value,?ok?:=?<-output
          ????if?err?:=?retErr.Load();?err?!=?nil?{
          ????????return?nil,?err
          ????}?else?if?ok?{
          ????????return?value,?nil
          ????}?else?{
          ????????return?nil,?ErrReduceNoOutput
          ????}
          }
          //?數(shù)據(jù)加工
          func?executeMappers(mapper?MapFunc,?input?<-chan?interface{},?collector?chan<-?interface{},
          ????done?<-chan?lang.PlaceholderType,?workers?int)
          ?{
          ????//?goroutine協(xié)調(diào)同步信號(hào)量
          ????var?wg?sync.WaitGroup
          ????defer?func()?{
          ????????//?等待數(shù)據(jù)加工任務(wù)完成
          ????????//?防止數(shù)據(jù)加工的協(xié)程還未處理完數(shù)據(jù)就直接退出了
          ????????wg.Wait()
          ????????//?關(guān)閉數(shù)據(jù)加工channel
          ????????close(collector)
          ????}()
          ????//?帶緩沖區(qū)的channel,緩沖區(qū)大小為workers
          ????//?控制數(shù)據(jù)加工的協(xié)程數(shù)量
          ????pool?:=?make(chan?lang.PlaceholderType,?workers)
          ????//?數(shù)據(jù)加工writer
          ????writer?:=?newGuardedWriter(collector,?done)
          ????for?{
          ????????select?{
          ????????//?監(jiān)聽(tīng)到外部結(jié)束信號(hào),直接結(jié)束
          ????????case?<-done:
          ????????????return
          ????????//?控制數(shù)據(jù)加工協(xié)程數(shù)量
          ????????//?緩沖區(qū)容量-1
          ????????//?無(wú)容量時(shí)將會(huì)被阻塞,等待釋放容量
          ????????case?pool?<-?lang.Placeholder:
          ????????????//?阻塞等待生產(chǎn)數(shù)據(jù)channel
          ????????????item,?ok?:=?<-input
          ????????????//?如果ok為false則說(shuō)明input已被關(guān)閉或者清空
          ????????????//?數(shù)據(jù)加工完成,執(zhí)行退出
          ????????????if?!ok?{
          ????????????????//?緩沖區(qū)容量+1
          ????????????????<-pool
          ????????????????//?結(jié)束本次循環(huán)
          ????????????????return
          ????????????}
          ????????????//?wg同步信號(hào)量+1
          ????????????wg.Add(1)
          ????????????//?better?to?safely?run?caller?defined?method
          ????????????//?異步執(zhí)行數(shù)據(jù)加工,防止panic錯(cuò)誤
          ????????????threading.GoSafe(func()?{
          ????????????????defer?func()?{
          ????????????????????//?wg同步信號(hào)量-1
          ????????????????????wg.Done()
          ????????????????????//?緩沖區(qū)容量+1
          ????????????????????<-pool
          ????????????????}()

          ????????????????mapper(item,?writer)
          ????????????})
          ????????}
          ????}
          }

          總結(jié)

          mapReduce 的源碼我大概看了兩個(gè)晚上,整體看下來(lái)比較累。一方面是我自身 go 語(yǔ)言并不是很熟練尤其是 channel 的用法,導(dǎo)致我需要頻繁停下來(lái)查詢(xún)相關(guān)文檔理解作者的寫(xiě)法,另一方面是多個(gè) goroutine 之間通過(guò) channel 進(jìn)行通信實(shí)現(xiàn)協(xié)作真的蠻燒腦(佩服作者的思維能力)。

          其次看源碼時(shí)第一遍看起來(lái)肯定會(huì)比較懵的,其實(shí)沒(méi)關(guān)系找到程序的入口(公共基礎(chǔ)組件一般是面向的方法)先沿著主線(xiàn)一路看下去把每一句代碼都看懂加上注釋?zhuān)倏粗Ь€(xiàn)代碼。

          如果有實(shí)在看不懂的地方就查查這段代碼的提交記錄非常有可能是解決某個(gè)bug改動(dòng)的,比如下面這段代碼我死活看了好多遍都不理解。

          //?聚合數(shù)據(jù)channel,需要手動(dòng)調(diào)用write方法寫(xiě)入到output中
          output?:=?make(chan?interface{})
          //?output最后只會(huì)被讀取一次
          defer?func()?{
          ????//?如果有多次寫(xiě)入的話(huà)則會(huì)造成阻塞從而導(dǎo)致協(xié)程泄漏
          ????//?這里用?for?range檢測(cè)是否可以讀出數(shù)據(jù),讀出數(shù)據(jù)說(shuō)明多次寫(xiě)入了
          ????//?為什么這里使用panic呢?顯示的提醒用戶(hù)用法錯(cuò)了會(huì)比自動(dòng)修復(fù)掉好一些
          ????for?range?output?{
          ????????panic("more?than?one?element?written?in?reducer")
          ????}
          }()

          最后畫(huà)出流程圖基本就能把源碼看懂了,對(duì)于我而言這方法比較笨但有效。

          資料

          Go Channel 詳解: https://colobu.com/2016/04/14/Golang-Channels/

          go-zero MapReduce文檔: https://go-zero.dev/cn/mapreduce.html

          項(xiàng)目地址

          https://github.com/zeromicro/go-zero

          歡迎使用 go-zerostar 支持我們!




          往期推薦


          我是 polarisxu,北大碩士畢業(yè),曾在 360 等知名互聯(lián)網(wǎng)公司工作,10多年技術(shù)研發(fā)與架構(gòu)經(jīng)驗(yàn)!2012 年接觸 Go 語(yǔ)言并創(chuàng)建了 Go 語(yǔ)言中文網(wǎng)!著有《Go語(yǔ)言編程之旅》、開(kāi)源圖書(shū)《Go語(yǔ)言標(biāo)準(zhǔn)庫(kù)》等。


          堅(jiān)持輸出技術(shù)(包括 Go、Rust 等技術(shù))、職場(chǎng)心得和創(chuàng)業(yè)感悟!歡迎關(guān)注「polarisxu」一起成長(zhǎng)!也歡迎加我微信好友交流:gopherstudio

          瀏覽 139
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  欧美激情四射 | 大鸡已久久久 | 骚逼女 | 日韩一级电影院 | 91免费成人电影 |