<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          使用增強版 singleflight 合并事件推送,效果炸裂!

          共 3709字,需瀏覽 8分鐘

           ·

          2023-05-18 05:34

          hello,大家好啊,我是小樓。

          最近在工作中對 Go 的 singleflight 包做了下增強,解決了一個性能問題,這里記錄下,希望對你也有所幫助。

          singleflight 是什么

          singleflight 直接翻譯為"單(次)飛(行)",它是對同一種請求的抑制,保證同一時刻相同的請求只有一個在執(zhí)行,且在它執(zhí)行期間的相同請求都會 Hold 直到執(zhí)行完成,這些 Hold 的請求也使用這次執(zhí)行的結(jié)果。

          舉個例子,當程序中有讀(如 Redis、MySQL、Http、RPC等)請求,且并發(fā)非常高的情況,使用 singleflight 能得到比較好的效果,它限制了同一時刻只有一個請求在執(zhí)行,也就是并發(fā)永遠為1。

          8527b1a67674c23b05b5d20c9cedd808.webp singleflight 的原理

          最初 singleflight 出現(xiàn)在 groupcache 項目中,這個項目也是 Go 團隊所寫,后來該包被移到 Go 源碼中,在 Go 源碼中的版本經(jīng)過幾輪迭代,稍微有點復雜,我們以最原始的源碼來講解原理,更方便地看清本質(zhì)。

          https://github.com/golang/groupcache/blob/master/singleflight/singleflight.go

          singleflight 把每次請求定義為 call,每個 call 對象包含了一個 waitGroup,一個 val,即請求的返回值,一個 err,即請求返回的錯誤。

                
                type?call?struct?{
          ?wg??sync.WaitGroup
          ?val?interface{}
          ?err?error
          }

          再定義全局的 Group,包含一個互斥鎖 Mutex,一個 key 為 string,value 為 call 的 map。

                
                type?Group?struct?{
          ?mu?sync.Mutex???????
          ?m??map[string]*call
          }

          Group 對象有一個 Do 方法,其第一個參數(shù)是 string 類型的 key,這個 key 也就是上面說的 map 的 key,相同的 key 標志著他們是相同的請求,只有相同的請求會被抑制;第二個參數(shù)是一個函數(shù) fn,這個函數(shù)是真正要執(zhí)行的函數(shù),例如調(diào)用 MySQL;返回值比較好理解,即最終調(diào)用的返回值和錯誤信息。

                
                func?(g?*Group)?Do(key?string,?fn?func()?(interface{},?error))?(interface{},?error)?{
          ?//?①
          ??g.mu.Lock()
          ?if?g.m?==?nil?{
          ??g.m?=?make(map[string]*call)
          ?}
          ??//?②
          ?if?c,?ok?:=?g.m[key];?ok?{
          ??g.mu.Unlock()
          ??c.wg.Wait()
          ??return?c.val,?c.err
          ?}
          ??//?③
          ?c?:=?new(call)
          ?c.wg.Add(1)
          ?g.m[key]?=?c
          ?g.mu.Unlock()

          ?c.val,?c.err?=?fn()
          ?c.wg.Done()

          ?g.mu.Lock()
          ?delete(g.m,?key)
          ?g.mu.Unlock()

          ?return?c.val,?c.err
          }

          將整個代碼分成三塊:

          • ① 懶加載方式初始化 map;
          • ② 如果當前 key 存在,即相同請求正在調(diào)用中,就等它完成,完成后直接使用它的 value 和 error;
          • ③ 如果當前 key 不存在,即沒有相同請求正在調(diào)用中,就創(chuàng)建一個 call 對象,并把它放進 map,接著執(zhí)行 fn 函數(shù),當函數(shù)執(zhí)行完喚醒 waitGroup,并刪除 map 相應(yīng)的 key,返回 value 和 error。
          讀可以抑制,寫呢?

          我們通過上面的介紹能了解,singleflight 能解決并發(fā)讀的問題,但我又遇到一個并發(fā)寫的問題。為了能讓大家快速進入狀態(tài),先花一點篇幅描述一下遇到的實際問題:

          微服務(wù)中的注冊中心想必大家都有所了解,如果不了解,可以去查查相關(guān)概念,或者翻看我以前的文章,老讀者應(yīng)該能發(fā)現(xiàn)我寫了很多相關(guān)的文章。

          服務(wù)提供方在注冊之后,會將變更事件推送到消費方,推送事件的處理流程是:接收到事件,查詢組裝出最新的數(shù)據(jù),然后推送給訂閱者。存在兩種情況可能會導致短時間內(nèi)注冊請求非常多,推送事件多會影響整個注冊中心的性能:

          • 接口級注冊(類似 Dubbo),每臺機器會注冊N多次
          • 服務(wù)并發(fā)發(fā)布,例如每次發(fā)布重啟100臺機器,那么注冊的并發(fā)就可能是100

          拿到這種問題,第一想到的解法是:合并推送。但,怎么合并呢?

          是不是每次推送的時候等一等,等事件都來了再一把推過去就可以了?但等多久呢?什么時候該等呢?粗暴點,每秒鐘推送一次,這樣就能將一秒內(nèi)的時間都聚合,但這會影響推送的時效性,顯然不符合我們精益求精的要求。

          直接使用 singleflight,能行嗎?

          套用上面 singleflight ,在第一個事件推送過程中,其他相同的事件被 Hold 住,等第一個事件推送完成后,這些 Hold 的事件不再執(zhí)行推送直接返回。

          稍微想一下就知道這樣是有問題的,假設(shè)有三個事件 A、B、C,分別對應(yīng)到三個版本的數(shù)據(jù)A1、B1、C1,A 最先到達,在 A 開始推送后但沒完成時 B、C 事件到達,A 事件觸發(fā)推送了 A1 版本的數(shù)據(jù),B、C 事件在 A 事件推送完成后,直接丟棄,最終推送到消費者上的數(shù)據(jù)版本為 A1,但我們肯定期望推送的數(shù)據(jù)版本為 C1,畫個圖線感受下:

          225c704df21ba1211d27a68b80e8d71f.webp

          增強一點點 ????

          假設(shè)有事件 A、B、C、D 先后到達,A 事件仍然先正常執(zhí)行推送,在 A 事件推送的時候,B、C、D 事件 Hold 住,當 A 事件推送完成后,B 事件開始推送,B 事件將把 A 事件推送時期積攢的事件都一起推送掉,即 B、C、D 一次性推送完成。

          84db39f3b8c3205f536e28aa2f51472b.webp

          增強代碼參考

          增強的定義為 WriteGroup,借用 singleflight 原先的實現(xiàn),具體代碼就不必解讀了,對照上面的例子應(yīng)該很好理解。

                
                package?singleflight

          import?(
          ?"sync"
          )

          type?WriteGroup?struct?{
          ?mu????sync.Mutex
          ?wgs???map[string]*sync.WaitGroup
          ?group?Group
          }

          func?(g?*WriteGroup)?Do(key?string,?fn?func()?error)?error?{
          ?g.mu.Lock()
          ?if?g.wgs?==?nil?{
          ??g.wgs?=?make(map[string]*sync.WaitGroup)
          ?}
          ?wg,?ok?:=?g.wgs[key]
          ?if?!ok?{
          ??wg?=?&sync.WaitGroup{}
          ??wg.Add(1)
          ??g.wgs[key]?=?wg
          ?}
          ?g.mu.Unlock()

          ?if?!ok?{
          ??err?:=?fn()

          ??g.mu.Lock()
          ??wg.Done()
          ??delete(g.wgs,?key)
          ??g.mu.Unlock()
          ??return?err
          ?}

          ?wg.Wait()
          ?_,?err?:=?g.group.Do(key,?func()?(interface{},?error)?{
          ??return?nil,?fn()
          ?})
          ?return?err
          }

          效果如何?

          理論上,如果沒有并發(fā),事件和以前一樣推送,沒有合并,當然這也沒毛病。當并發(fā)大于 2 時,開始發(fā)揮威力。在實際的壓測上,注冊并發(fā) 1500 時,合并的事件達到 99.9%,效果相當炸裂!

          最后感謝能抽空看到這里,如果你能點贊在看、分享,我會更加感激不盡~


          • 搜索關(guān)注微信公眾號"捉蟲大師",后端技術(shù)分享,架構(gòu)設(shè)計、性能優(yōu)化、源碼閱讀、問題排查、踩坑實踐
          • 進技術(shù)交流群加微信 MrRoshi
          瀏覽 63
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  熟女网址 | 97超碰精品 | 亚洲国产日韩欧美在线a乱码日本中文字幕 欧美三级韩国三级日本三斤在线观看en | 欧美淫秽视频在线 | 精品无人国产偷自产在线 |