<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Go 限流器系列(3)--自適應限流

          共 3492字,需瀏覽 7分鐘

           ·

          2020-08-26 13:10

          漏斗桶/令牌桶確實能夠保護系統(tǒng)不被拖垮, 但不管漏斗桶還是令牌桶, 其防護思路都是設(shè)定一個指標, 當超過該指標后就阻止或減少流量的繼續(xù)進入,當系統(tǒng)負載降低到某一水平后則恢復流量的進入。但其通常都是被動的,其實際效果取決于限流閾值設(shè)置是否合理,但往往設(shè)置合理不是一件容易的事情.

          項目日常維護中, 經(jīng)常能夠看到某某同學在群里說:xx系統(tǒng)429了, 然后經(jīng)過一番查找后發(fā)現(xiàn)是一波突然的活動流量, 只能申請再新增幾臺機器. 過了幾天 OP 發(fā)現(xiàn)該集群的流量達不到預期又下掉了幾臺機器, 然后又開始一輪新的循環(huán).

          這里先不討論集群自動伸縮的問題. 這里提出一些問題

          1. 集群增加機器或者減少機器限流閾值是否要重新設(shè)置?
          2. 設(shè)置限流閾值的依據(jù)是什么?
          3. 人力運維成本是否過高?
          4. 當調(diào)用方反饋429時, 這個時候重新設(shè)置限流, 其實流量高峰已經(jīng)過了重新評估限流是否有意義?

          這些其實都是采用漏斗桶/令牌桶的缺點, 總體來說就是太被動, 不能快速適應流量變化

          自適應限流

          對于自適應限流來說, 一般都是結(jié)合系統(tǒng)的 Load、CPU 使用率以及應用的入口 QPS、平均響應時間和并發(fā)量等幾個維度的監(jiān)控指標,通過自適應的流控策略, 讓系統(tǒng)的入口流量和系統(tǒng)的負載達到一個平衡,讓系統(tǒng)盡可能跑在最大吞吐量的同時保證系統(tǒng)整體的穩(wěn)定性。

          比較出名的自適應限流的實現(xiàn)是 Alibaba Sentinel. 不過由于提前沒有發(fā)現(xiàn) Sentinel 有個 golang 版本的實現(xiàn), 本篇文章就以 Kratos 的 BBR 實現(xiàn)探討自適應限流的原理.

          Kratos 自適應限流

          借鑒了 Sentinel 項目的自適應限流系統(tǒng), 通過綜合分析服務(wù)的 cpu 使用率、請求成功的 qps 和請求成功的 rt 來做自適應限流保護。

          • cpu: 最近 1s 的 CPU 使用率均值,使用滑動平均計算,采樣周期是 250ms
          • inflight: 當前處理中正在處理的請求數(shù)量
          • pass: 請求處理成功的量
          • rt: 請求成功的響應耗時

          限流公式

          cpu > 800 AND (Now - PrevDrop) < 1s AND (MaxPass * MinRt * windows / 1000) < InFlight

          • MaxPass 表示最近 5s 內(nèi),單個采樣窗口中最大的請求數(shù)
          • MinRt 表示最近 5s 內(nèi),單個采樣窗口中最小的響應時間
          • windows 表示一秒內(nèi)采樣窗口的數(shù)量,默認配置中是 5s 50 個采樣,那么 windows 的值為 10

          kratos 中間件實現(xiàn)

          func?(b?*RateLimiter)?Limit()?HandlerFunc?{
          ?return?func(c?*Context)?{
          ??uri?:=?fmt.Sprintf("%s://%s%s",?c.Request.URL.Scheme,?c.Request.Host,?c.Request.URL.Path)
          ??limiter?:=?b.group.Get(uri)
          ??done,?err?:=?limiter.Allow(c)
          ??if?err?!=?nil?{
          ???_metricServerBBR.Inc(uri,?c.Request.Method)
          ???c.JSON(nil,?err)
          ???c.Abort()
          ???return
          ??}
          ??defer?func()?{
          ???done(limit.DoneInfo{Op:?limit.Success})
          ???b.printStats(uri,?limiter)
          ??}()
          ??c.Next()
          ?}
          }

          使用方式

          e?:=?bm.DefaultServer(nil)
          limiter?:=?bm.NewRateLimiter(nil)
          e.Use(limiter.Limit())
          e.GET("/api",?myHandler)

          源碼實現(xiàn)

          Allow

          func?(l?*BBR)?Allow(ctx?context.Context,?opts?...limit.AllowOption)?(func(info?limit.DoneInfo),?error)?{
          ?allowOpts?:=?limit.DefaultAllowOpts()
          ?for?_,?opt?:=?range?opts?{
          ??opt.Apply(&allowOpts)
          ?}
          ?if?l.shouldDrop()?{?//?判斷是否觸發(fā)限流
          ??return?nil,?ecode.LimitExceed
          ?}
          ?atomic.AddInt64(&l.inFlight,?1)?//?增加正在處理請求數(shù)
          ?stime?:=?time.Since(initTime)?//?記錄請求到來的時間
          ?return?func(do?limit.DoneInfo)?{
          ??rt?:=?int64((time.Since(initTime)?-?stime)?/?time.Millisecond)?//?請求處理成功的響應時長
          ??l.rtStat.Add(rt)?//?增加rtStat響應耗時的統(tǒng)計
          ??atomic.AddInt64(&l.inFlight,?-1)?//?請求處理成功后,?減少正在處理的請求數(shù)
          ??switch?do.Op?{
          ??case?limit.Success:
          ???l.passStat.Add(1)?//?處理成功后增加成功處理請求數(shù)的統(tǒng)計
          ???return
          ??default:
          ???return
          ??}
          ?},?nil
          }

          shouldDrop

          func?(l?*BBR)?shouldDrop()?bool?{
          ?//?判斷目前cpu的使用率是否達到設(shè)置的CPU的限制,?默認值800
          ?if?l.cpu()???//?如果上一次舍棄請求的時間是0,?那么說明沒有限流的需求,?直接返回
          ??prevDrop,?_?:=?l.prevDrop.Load().(time.Duration)
          ??if?prevDrop?==?0?{
          ???return?false
          ??}
          ??//?如果上一次請求的時間與當前的請求時間小于1s,?那么說明有限流的需求
          ??if?time.Since(initTime)-prevDrop?<=?time.Second?{
          ???if?atomic.LoadInt32(&l.prevDropHit)?==?0?{
          ????atomic.StoreInt32(&l.prevDropHit,?1)
          ???}
          ???//?增加正在處理的請求的數(shù)量
          ???inFlight?:=?atomic.LoadInt64(&l.inFlight)
          ???//?判斷正在處理的請求數(shù)是否達到系統(tǒng)的最大的請求數(shù)量
          ???return?inFlight?>?1?&&?inFlight?>?l.maxFlight()
          ??}
          ??//?清空當前的prevDrop
          ??l.prevDrop.Store(time.Duration(0))
          ??return?false
          ?}
          ?//?增加正在處理的請求的數(shù)量
          ?inFlight?:=?atomic.LoadInt64(&l.inFlight)
          ?//?判斷正在處理的請求數(shù)是否達到系統(tǒng)的最大的請求數(shù)量
          ?drop?:=?inFlight?>?1?&&?inFlight?>?l.maxFlight()
          ?if?drop?{
          ??prevDrop,?_?:=?l.prevDrop.Load().(time.Duration)
          ??//?如果判斷達到了最大請求數(shù)量,?并且當前有限流需求
          ??if?prevDrop?!=?0?{
          ???return?drop
          ??}
          ??l.prevDrop.Store(time.Since(initTime))
          ?}
          ?return?drop
          }

          maxFlight

          該函數(shù)是核心函數(shù). 其計算公式: MaxPass * MinRt * windows / 1000. maxPASS/minRT都是基于metric.RollingCounter來實現(xiàn)的, 限于篇幅原因這里就不再具體看其實現(xiàn)(想看的可以去看rolling_counter_test.go還是蠻容易理解的)

          func?(l?*BBR)?maxFlight()?int64?{
          ?return?int64(math.Floor(float64(l.maxPASS()*l.minRT()*l.winBucketPerSec)/1000.0?+?0.5))
          }
          • winBucketPerSec: 每秒內(nèi)的采樣數(shù)量,其計算方式:int64(time.Second)/(int64(conf.Window)/int64(conf.WinBucket)), conf.Window默認值10s, conf.WinBucket默認值100. 簡化下公式: 1/(10/100) = 10, 所以每秒內(nèi)的采樣數(shù)就是10
          //?單個采樣窗口在一個采樣周期中的最大的請求數(shù),?默認的采樣窗口是10s,?采樣bucket數(shù)量100
          func?(l?*BBR)?maxPASS()?int64?{
          ?rawMaxPass?:=?atomic.LoadInt64(&l.rawMaxPASS)
          ?if?rawMaxPass?>?0?&&?l.passStat.Timespan()?1?{
          ??return?rawMaxPass
          ?}
          ?//?遍歷100個采樣bucket,?找到采樣bucket中最大的請求數(shù)
          ?rawMaxPass?=?int64(l.passStat.Reduce(func(iterator?metric.Iterator)?float64?{
          ??var?result?=?1.0
          ??for?i?:=?1;?iterator.Next()?&&?i????bucket?:=?iterator.Bucket()
          ???count?:=?0.0
          ???for?_,?p?:=?range?bucket.Points?{
          ????count?+=?p
          ???}
          ???result?=?math.Max(result,?count)
          ??}
          ??return?result
          ?}))
          ?if?rawMaxPass?==?0?{
          ??rawMaxPass?=?1
          ?}
          ?atomic.StoreInt64(&l.rawMaxPASS,?rawMaxPass)
          ?return?rawMaxPass
          }

          //?單個采樣窗口中最小的響應時間
          func?(l?*BBR)?minRT()?int64?{
          ?rawMinRT?:=?atomic.LoadInt64(&l.rawMinRt)
          ?if?rawMinRT?>?0?&&?l.rtStat.Timespan()?1?{
          ??return?rawMinRT
          ?}
          ?//?遍歷100個采樣bucket,?找到采樣bucket中最小的響應時間
          ?rawMinRT?=?int64(math.Ceil(l.rtStat.Reduce(func(iterator?metric.Iterator)?float64?{
          ??var?result?=?math.MaxFloat64
          ??for?i?:=?1;?iterator.Next()?&&?i????bucket?:=?iterator.Bucket()
          ???if?len(bucket.Points)?==?0?{
          ????continue
          ???}
          ???total?:=?0.0
          ???for?_,?p?:=?range?bucket.Points?{
          ????total?+=?p
          ???}
          ???avg?:=?total?/?float64(bucket.Count)
          ???result?=?math.Min(result,?avg)
          ??}
          ??return?result
          ?})))
          ?if?rawMinRT?<=?0?{
          ??rawMinRT?=?1
          ?}
          ?atomic.StoreInt64(&l.rawMinRt,?rawMinRT)
          ?return?rawMinRT
          }

          參考

          [1]

          alibaba/Sentinel-系統(tǒng)自適應限流: https://github.com/alibaba/Sentinel/wiki/%E7%B3%BB%E7%BB%9F%E8%87%AA%E9%80%82%E5%BA%94%E9%99%90%E6%B5%81

          [2]

          go-kratos/kratos-自適應限流保護: https://github.com/go-kratos/kratos/blob/master/doc/wiki-cn/ratelimit.md

          [3]

          alibaba/sentinel-golang-系統(tǒng)自適應流控: https://github.com/alibaba/sentinel-golang/wiki/%E7%B3%BB%E7%BB%9F%E8%87%AA%E9%80%82%E5%BA%94%E6%B5%81%E6%8E%A7





          推薦閱讀



          學習交流 Go 語言,掃碼回復「進群」即可


          站長 polarisxu

          自己的原創(chuàng)文章

          不限于 Go 技術(shù)

          職場和創(chuàng)業(yè)經(jīng)驗


          Go語言中文網(wǎng)

          每天為你

          分享 Go 知識

          Go愛好者值得關(guān)注



          瀏覽 80
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产传媒无码 | 草大B老骚B | 日韩黄页| 日本乱伦一区 | 日韩成人电影在线观看 |