Go 限流器系列(3)--自適應限流
漏斗桶/令牌桶確實能夠保護系統(tǒng)不被拖垮, 但不管漏斗桶還是令牌桶, 其防護思路都是設(shè)定一個指標, 當超過該指標后就阻止或減少流量的繼續(xù)進入,當系統(tǒng)負載降低到某一水平后則恢復流量的進入。但其通常都是被動的,其實際效果取決于限流閾值設(shè)置是否合理,但往往設(shè)置合理不是一件容易的事情.
項目日常維護中, 經(jīng)常能夠看到某某同學在群里說:xx系統(tǒng)429了, 然后經(jīng)過一番查找后發(fā)現(xiàn)是一波突然的活動流量, 只能申請再新增幾臺機器. 過了幾天 OP 發(fā)現(xiàn)該集群的流量達不到預期又下掉了幾臺機器, 然后又開始一輪新的循環(huán).
這里先不討論集群自動伸縮的問題. 這里提出一些問題
集群增加機器或者減少機器限流閾值是否要重新設(shè)置? 設(shè)置限流閾值的依據(jù)是什么? 人力運維成本是否過高? 當調(diào)用方反饋429時, 這個時候重新設(shè)置限流, 其實流量高峰已經(jīng)過了重新評估限流是否有意義?
這些其實都是采用漏斗桶/令牌桶的缺點, 總體來說就是太被動, 不能快速適應流量變化
自適應限流
對于自適應限流來說, 一般都是結(jié)合系統(tǒng)的 Load、CPU 使用率以及應用的入口 QPS、平均響應時間和并發(fā)量等幾個維度的監(jiān)控指標,通過自適應的流控策略, 讓系統(tǒng)的入口流量和系統(tǒng)的負載達到一個平衡,讓系統(tǒng)盡可能跑在最大吞吐量的同時保證系統(tǒng)整體的穩(wěn)定性。
比較出名的自適應限流的實現(xiàn)是 Alibaba Sentinel. 不過由于提前沒有發(fā)現(xiàn) Sentinel 有個 golang 版本的實現(xiàn), 本篇文章就以 Kratos 的 BBR 實現(xiàn)探討自適應限流的原理.
Kratos 自適應限流
借鑒了 Sentinel 項目的自適應限流系統(tǒng), 通過綜合分析服務(wù)的 cpu 使用率、請求成功的 qps 和請求成功的 rt 來做自適應限流保護。
cpu: 最近 1s 的 CPU 使用率均值,使用滑動平均計算,采樣周期是 250ms inflight: 當前處理中正在處理的請求數(shù)量 pass: 請求處理成功的量 rt: 請求成功的響應耗時
限流公式
cpu > 800 AND (Now - PrevDrop) < 1s AND (MaxPass * MinRt * windows / 1000) < InFlight
MaxPass 表示最近 5s 內(nèi),單個采樣窗口中最大的請求數(shù) MinRt 表示最近 5s 內(nèi),單個采樣窗口中最小的響應時間 windows 表示一秒內(nèi)采樣窗口的數(shù)量,默認配置中是 5s 50 個采樣,那么 windows 的值為 10
kratos 中間件實現(xiàn)
func?(b?*RateLimiter)?Limit()?HandlerFunc?{
?return?func(c?*Context)?{
??uri?:=?fmt.Sprintf("%s://%s%s",?c.Request.URL.Scheme,?c.Request.Host,?c.Request.URL.Path)
??limiter?:=?b.group.Get(uri)
??done,?err?:=?limiter.Allow(c)
??if?err?!=?nil?{
???_metricServerBBR.Inc(uri,?c.Request.Method)
???c.JSON(nil,?err)
???c.Abort()
???return
??}
??defer?func()?{
???done(limit.DoneInfo{Op:?limit.Success})
???b.printStats(uri,?limiter)
??}()
??c.Next()
?}
}
使用方式
e?:=?bm.DefaultServer(nil)
limiter?:=?bm.NewRateLimiter(nil)
e.Use(limiter.Limit())
e.GET("/api",?myHandler)
源碼實現(xiàn)
Allow
func?(l?*BBR)?Allow(ctx?context.Context,?opts?...limit.AllowOption)?(func(info?limit.DoneInfo),?error)?{
?allowOpts?:=?limit.DefaultAllowOpts()
?for?_,?opt?:=?range?opts?{
??opt.Apply(&allowOpts)
?}
?if?l.shouldDrop()?{?//?判斷是否觸發(fā)限流
??return?nil,?ecode.LimitExceed
?}
?atomic.AddInt64(&l.inFlight,?1)?//?增加正在處理請求數(shù)
?stime?:=?time.Since(initTime)?//?記錄請求到來的時間
?return?func(do?limit.DoneInfo)?{
??rt?:=?int64((time.Since(initTime)?-?stime)?/?time.Millisecond)?//?請求處理成功的響應時長
??l.rtStat.Add(rt)?//?增加rtStat響應耗時的統(tǒng)計
??atomic.AddInt64(&l.inFlight,?-1)?//?請求處理成功后,?減少正在處理的請求數(shù)
??switch?do.Op?{
??case?limit.Success:
???l.passStat.Add(1)?//?處理成功后增加成功處理請求數(shù)的統(tǒng)計
???return
??default:
???return
??}
?},?nil
}
shouldDrop
func?(l?*BBR)?shouldDrop()?bool?{
?//?判斷目前cpu的使用率是否達到設(shè)置的CPU的限制,?默認值800
?if?l.cpu()???//?如果上一次舍棄請求的時間是0,?那么說明沒有限流的需求,?直接返回
??prevDrop,?_?:=?l.prevDrop.Load().(time.Duration)
??if?prevDrop?==?0?{
???return?false
??}
??//?如果上一次請求的時間與當前的請求時間小于1s,?那么說明有限流的需求
??if?time.Since(initTime)-prevDrop?<=?time.Second?{
???if?atomic.LoadInt32(&l.prevDropHit)?==?0?{
????atomic.StoreInt32(&l.prevDropHit,?1)
???}
???//?增加正在處理的請求的數(shù)量
???inFlight?:=?atomic.LoadInt64(&l.inFlight)
???//?判斷正在處理的請求數(shù)是否達到系統(tǒng)的最大的請求數(shù)量
???return?inFlight?>?1?&&?inFlight?>?l.maxFlight()
??}
??//?清空當前的prevDrop
??l.prevDrop.Store(time.Duration(0))
??return?false
?}
?//?增加正在處理的請求的數(shù)量
?inFlight?:=?atomic.LoadInt64(&l.inFlight)
?//?判斷正在處理的請求數(shù)是否達到系統(tǒng)的最大的請求數(shù)量
?drop?:=?inFlight?>?1?&&?inFlight?>?l.maxFlight()
?if?drop?{
??prevDrop,?_?:=?l.prevDrop.Load().(time.Duration)
??//?如果判斷達到了最大請求數(shù)量,?并且當前有限流需求
??if?prevDrop?!=?0?{
???return?drop
??}
??l.prevDrop.Store(time.Since(initTime))
?}
?return?drop
}
maxFlight
該函數(shù)是核心函數(shù). 其計算公式: MaxPass * MinRt * windows / 1000. maxPASS/minRT都是基于metric.RollingCounter來實現(xiàn)的, 限于篇幅原因這里就不再具體看其實現(xiàn)(想看的可以去看rolling_counter_test.go還是蠻容易理解的)
func?(l?*BBR)?maxFlight()?int64?{
?return?int64(math.Floor(float64(l.maxPASS()*l.minRT()*l.winBucketPerSec)/1000.0?+?0.5))
}
winBucketPerSec: 每秒內(nèi)的采樣數(shù)量,其計算方式:int64(time.Second)/(int64(conf.Window)/int64(conf.WinBucket)), conf.Window默認值10s, conf.WinBucket默認值100. 簡化下公式: 1/(10/100) = 10, 所以每秒內(nèi)的采樣數(shù)就是10
//?單個采樣窗口在一個采樣周期中的最大的請求數(shù),?默認的采樣窗口是10s,?采樣bucket數(shù)量100
func?(l?*BBR)?maxPASS()?int64?{
?rawMaxPass?:=?atomic.LoadInt64(&l.rawMaxPASS)
?if?rawMaxPass?>?0?&&?l.passStat.Timespan()?1?{
??return?rawMaxPass
?}
?//?遍歷100個采樣bucket,?找到采樣bucket中最大的請求數(shù)
?rawMaxPass?=?int64(l.passStat.Reduce(func(iterator?metric.Iterator)?float64?{
??var?result?=?1.0
??for?i?:=?1;?iterator.Next()?&&?i????bucket?:=?iterator.Bucket()
???count?:=?0.0
???for?_,?p?:=?range?bucket.Points?{
????count?+=?p
???}
???result?=?math.Max(result,?count)
??}
??return?result
?}))
?if?rawMaxPass?==?0?{
??rawMaxPass?=?1
?}
?atomic.StoreInt64(&l.rawMaxPASS,?rawMaxPass)
?return?rawMaxPass
}
//?單個采樣窗口中最小的響應時間
func?(l?*BBR)?minRT()?int64?{
?rawMinRT?:=?atomic.LoadInt64(&l.rawMinRt)
?if?rawMinRT?>?0?&&?l.rtStat.Timespan()?1?{
??return?rawMinRT
?}
?//?遍歷100個采樣bucket,?找到采樣bucket中最小的響應時間
?rawMinRT?=?int64(math.Ceil(l.rtStat.Reduce(func(iterator?metric.Iterator)?float64?{
??var?result?=?math.MaxFloat64
??for?i?:=?1;?iterator.Next()?&&?i????bucket?:=?iterator.Bucket()
???if?len(bucket.Points)?==?0?{
????continue
???}
???total?:=?0.0
???for?_,?p?:=?range?bucket.Points?{
????total?+=?p
???}
???avg?:=?total?/?float64(bucket.Count)
???result?=?math.Min(result,?avg)
??}
??return?result
?})))
?if?rawMinRT?<=?0?{
??rawMinRT?=?1
?}
?atomic.StoreInt64(&l.rawMinRt,?rawMinRT)
?return?rawMinRT
}
參考
alibaba/Sentinel-系統(tǒng)自適應限流: https://github.com/alibaba/Sentinel/wiki/%E7%B3%BB%E7%BB%9F%E8%87%AA%E9%80%82%E5%BA%94%E9%99%90%E6%B5%81
[2]go-kratos/kratos-自適應限流保護: https://github.com/go-kratos/kratos/blob/master/doc/wiki-cn/ratelimit.md
[3]alibaba/sentinel-golang-系統(tǒng)自適應流控: https://github.com/alibaba/sentinel-golang/wiki/%E7%B3%BB%E7%BB%9F%E8%87%AA%E9%80%82%E5%BA%94%E6%B5%81%E6%8E%A7
推薦閱讀
站長 polarisxu
自己的原創(chuàng)文章
不限于 Go 技術(shù)
職場和創(chuàng)業(yè)經(jīng)驗
Go語言中文網(wǎng)
每天為你
分享 Go 知識
Go愛好者值得關(guān)注
