<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Sentinel-Go 源碼系列(三)滑動時間窗口算法的工程實現(xiàn)

          共 2678字,需瀏覽 6分鐘

           ·

          2021-12-23 15:52

          要說現(xiàn)在工程師最重要的能力,我覺得工程能力要排第一。

          就算現(xiàn)在大廠面試經(jīng)常要手撕算法,也是更偏向考查代碼工程實現(xiàn)的能力,之前在群里看到這樣的圖片,就覺得很離譜(大概率是假的~)。

          算法與工程實現(xiàn)

          在 Sentinel-Go 中,一個很核心的算法是流控(限流)算法。

          流控可能每個人都聽過,但真要手寫一個,還是有些困難。為什么流控算法難寫?以我的感覺是算法和工程實現(xiàn)上存在一定差異,雖然算法好理解,但卻沒法照著實現(xiàn)。

          舉個例子,令牌桶算法很好理解,只需給定一個桶,以恒定的速率往桶內(nèi)放令牌,滿了則丟棄,執(zhí)行任務(wù)前先去桶里拿令牌,只有拿到令牌才可以執(zhí)行,否則拒絕。

          如果實現(xiàn)令牌桶,按道理應(yīng)該用一個單獨線程(或進程)往桶里放令牌,業(yè)務(wù)線程去桶里取,但真要這么實現(xiàn),怎么保證這個單獨線程能穩(wěn)定執(zhí)行,萬一掛了豈不是很危險?

          所以工程實現(xiàn)上和算法原本肯定存在一定的差異,這也是為什么需要深入源碼的一個原因。

          滑動時間窗口的演進

          通常來說,流控的度量是按每秒的請求數(shù),也就是 QPS

          QPS:query per second,指每秒查詢數(shù),當然他的意義已經(jīng)泛化了,不再特指查詢,可以泛指所有請求。如果非要區(qū)分,TPS 指每秒事務(wù)數(shù),即寫入數(shù),或 RPS,每秒請求數(shù),本文不分這么細,統(tǒng)一叫QPS。

          當然也有按并發(fā)數(shù)來度量,并發(fā)數(shù)的流控就非常簡單

          并發(fā)數(shù)流控

          并發(fā)是一個瞬時概念,它跟時間沒有關(guān)系。和進程中的線程數(shù)、協(xié)程數(shù)一樣,每次取的時候只能拿到一個瞬間的快照,但可能很快就變化了。

          并發(fā)數(shù)怎么定義?可以近似認為進入業(yè)務(wù)代碼開始就算一個并發(fā),執(zhí)行完這個并發(fā)就消失。

          這樣說來,實現(xiàn)就非常簡單了,只需要定義一個全局變量,責任鏈開始時對這個變量原子增1,并獲取當前并發(fā)數(shù)的一個快照,判斷并發(fā)數(shù)是否超限,如果超限則直接阻斷,執(zhí)行完了別忘了原子減1即可,由于太過簡單,就不需要放代碼了。

          固定時間窗口

          參考并發(fā)數(shù)流控,當需要度量 QPS 時,是否也可以利用這樣的思想呢?

          由于 QPS 有時間的度量,第一直覺是和并發(fā)數(shù)一樣弄個變量,再起個單獨線程每隔 1s 重置這個變量。

          但單獨線程始終不放心,需要稍微改一下。

          如果系統(tǒng)有一個起始時間,每次請求時,獲取當前時間,兩者之差,就能算出當前處于哪個時間窗口,這個時間窗口單獨計數(shù)即可。

          如果稍微思考下,你會發(fā)現(xiàn)問題不簡單,如下圖,10t 到20t 只有60個請求,20t到30t之間只有80個請求,但有可能16t到26t之間有110個請求,這就很有可能把系統(tǒng)打垮。

          滑動時間窗口

          為了解決上面的問題,工程師想出了一個好辦法:別固定時間窗口,以當前時間往前推算窗口

          但問題又來了,這該怎么實現(xiàn)呢?

          滑動時間窗口工程實現(xiàn)

          在工程實現(xiàn)上,可以將時間劃分為細小的采樣窗口,緩存一段時間的采樣窗口,這樣每當請求來的時候,只需要往前拿一段時間的采樣窗口,然后求和就能拿到總的請求數(shù)。

          Sentinel-Go 滑動時間窗口的實現(xiàn)

          前方代碼高能預(yù)警~

          Sentinel-Go 是基于 LeapArray 實現(xiàn)的滑動窗口,其數(shù)據(jù)結(jié)構(gòu)如下

          type?LeapArray?struct?{
          ?bucketLengthInMs?uint32?//?bucket大小
          ?sampleCount??????uint32?//?bucket數(shù)量
          ?intervalInMs?????uint32?//?窗口總大小
          ?array????????????*AtomicBucketWrapArray?//?bucket數(shù)組
          ?updateLock?mutex?//?更新鎖
          }

          type?AtomicBucketWrapArray?struct?{
          ?base?unsafe.Pointer?//?數(shù)組的起始地址
          ?length?int?//?長度,不能改變
          ?data???[]*BucketWrap?//?真正bucket的數(shù)據(jù)
          }

          type?BucketWrap?struct?{
          ?BucketStart?uint64?//?bucket起始時間
          ?Value?atomic.Value?//?bucket數(shù)據(jù)結(jié)構(gòu),例如?MetricBucket
          }

          type?MetricBucket?struct?{
          ?counter????????[base.MetricEventTotal]int64?//?計數(shù)數(shù)組,可放不同類型
          ?minRt??????????int64?//?最小RT
          ?maxConcurrency?int32?//?最大并發(fā)數(shù)
          }

          再看下是如何寫入指標的,例如當流程正常通過時

          //?①
          sn.AddCount(base.MetricEventPass,?int64(count))

          //?②
          func?(bla?*BucketLeapArray)?AddCount(event?base.MetricEvent,?count?int64)?{
          ?bla.addCountWithTime(util.CurrentTimeMillis(),?event,?count)
          }

          //?③
          func?(bla?*BucketLeapArray)?addCountWithTime(now?uint64,?event?base.MetricEvent,?count?int64)?{
          ?b?:=?bla.currentBucketWithTime(now)
          ?if?b?==?nil?{
          ??return
          ?}
          ?b.Add(event,?count)
          }

          //?④
          func?(mb?*MetricBucket)?Add(event?base.MetricEvent,?count?int64)?{
          ?if?event?>=?base.MetricEventTotal?||?event?0?{
          ??logging.Error(errors.Errorf("Unknown?metric?event:?%v",?event),?"")
          ??return
          ?}
          ?if?event?==?base.MetricEventRt?{
          ??mb.AddRt(count)
          ??return
          ?}
          ?mb.addCount(event,?count)
          }

          //?⑤
          func?(mb?*MetricBucket)?addCount(event?base.MetricEvent,?count?int64)?{
          ?atomic.AddInt64(&mb.counter[event],?count)
          }

          取到相應(yīng)的 bucket,然后寫入相應(yīng) event 的 count,對 RT 會特殊處理,因為有一個最小 RT 需要處理。

          重點看是如何取到相應(yīng)的 bucket 的:

          func?(bla?*BucketLeapArray)?currentBucketWithTime(now?uint64)?*MetricBucket?{
          ?//?①根據(jù)當前時間取bucket
          ?curBucket,?err?:=?bla.data.currentBucketOfTime(now,?bla)
          ?...
          ?b,?ok?:=?mb.(*MetricBucket)
          ?if?!ok?{
          ??...
          ??return?nil
          ?}
          ?return?b
          }

          func?(la?*LeapArray)?currentBucketOfTime(now?uint64,?bg?BucketGenerator)?(*BucketWrap,?error)?{
          ?...
          ?//?②計算index?=?(now?/?bucketLengthInMs)?%?LeapArray.array.length
          ?idx?:=?la.calculateTimeIdx(now)
          ?//?③計算bucket開始時間?=?now?-?(now?%?bucketLengthInMs)
          ?bucketStart?:=?calculateStartTime(now,?la.bucketLengthInMs)

          ?for?{?
          ??old?:=?la.array.get(idx)
          ??if?old?==?nil?{?//?④未使用,直接返回
          ???newWrap?:=?&BucketWrap{
          ????BucketStart:?bucketStart,
          ????Value:???????atomic.Value{},
          ???}
          ???newWrap.Value.Store(bg.NewEmptyBucket())
          ???if?la.array.compareAndSet(idx,?nil,?newWrap)?{
          ????return?newWrap,?nil
          ???}?else?{
          ????runtime.Gosched()
          ???}
          ??}?else?if?bucketStart?==?atomic.LoadUint64(&old.BucketStart)?{?//?⑤剛好取到是當前bucket,返回
          ???return?old,?nil
          ??}?else?if?bucketStart?>?atomic.LoadUint64(&old.BucketStart)?{?//?⑥取到了舊的bucket,重置使用
          ???if?la.updateLock.TryLock()?{
          ????old?=?bg.ResetBucketTo(old,?bucketStart)
          ????la.updateLock.Unlock()
          ????return?old,?nil
          ???}?else?{
          ????runtime.Gosched()
          ???}
          ??}?else?if?bucketStart?//?⑦取到了比當前還新的bucket,總共只有一個bucket時,并發(fā)情況可能會出現(xiàn)這種情況,其他情況不可能,直接報錯
          ???if?la.sampleCount?==?1?{
          ????return?old,?nil
          ???}
          ???
          ???return?nil,?errors.New(fmt.Sprintf("Provided?time?timeMillis=%d?is?already?behind?old.BucketStart=%d.",?bucketStart,?old.BucketStart))
          ??}
          ?}
          }

          舉個直觀的例子,看如何拿到 bucket:

          • 假設(shè) B2 取出來是 nil,則 new 一個 bucket 通過 compareAndSet 寫入,保證線程安全,如果別別的線程先寫入,這里會執(zhí)行失敗,調(diào)用 runtime.Gosched(),讓出時間片,進入下一次循環(huán)
          • 假設(shè)取出 B2 的開始時間是3400,與計算的相同,則直接使用
          • 假設(shè)取出的 B2 的開始時間小于 3400,說明這個 bucket 太舊了,需要覆蓋,使用更新鎖來更新,保證線程安全,如果拿不到鎖,也讓出時間片,進入下一次循環(huán)
          • 假設(shè)取出 B2 的開始時間大于3400,說明已經(jīng)有其他線程更新了,而 bucketLengthInMs 通常遠遠大于鎖的獲取時間,所以這里只考慮只有一個 bucket 的情況直接返回,其他情況報錯

          回到 QPS 計算:

          qps?:=?stat.InboundNode().GetQPS(base.MetricEventPass)

          該方法會先計算一個起始時間范圍

          func?(m?*SlidingWindowMetric)?getBucketStartRange(timeMs?uint64)?(start,?end?uint64)?{
          ?curBucketStartTime?:=?calculateStartTime(timeMs,?m.real.BucketLengthInMs())
          ?end?=?curBucketStartTime
          ?start?=?end?-?uint64(m.intervalInMs)?+?uint64(m.real.BucketLengthInMs())
          ?return
          }

          例如當前時間為3500,則計算出

          • end = 3400
          • start = 3400 - 1200 ?+ 200 = 2400

          然后遍歷所有 bucket,把在這個范圍內(nèi)的 bucket 都拿出來,計算 QPS,只需要相加即可。

          最后

          本節(jié)從滑動窗口流控算法的工程實現(xiàn)演進到 Sentinel-Go 里滑動窗口的實現(xiàn),從 Sentinel-Go 的實現(xiàn)上看到,還得考慮內(nèi)存的使用,并發(fā)控制等等,如果完全寫出來,還是非常不容易的。

          《Sentinel-Go源碼系列》已經(jīng)寫了三篇,只介紹了兩個知識點:責任鏈模式、滑動窗口限流,后續(xù)還有對象池等,但這其實和 Sentinel-Go 關(guān)系不是很大,到時候單獨成文,就不放在本系列里了。

          本文算是一個結(jié)束,與其說是結(jié)束,不如說是一個開始。


          • 搜索關(guān)注微信公眾號"捉蟲大師",后端技術(shù)分享,架構(gòu)設(shè)計、性能優(yōu)化、源碼閱讀、問題排查、踩坑實踐。
          • 另外我也準備組建一個技術(shù)交流群,但現(xiàn)在也不知道會有多少人,所以大家先加我微信 MrRoshi,備注加群,一起交流技術(shù),超過一定人數(shù)我就拉一個~
          瀏覽 61
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  亚洲激情视频 | 欧美大香蕉伊人 | 香蕉人妻AV久久久久天天 | 丁香六月欧美 | 亚欧成人精品无码视频在线观看 |