Sentinel-Go 源碼系列(三)滑動時間窗口算法的工程實現(xiàn)
要說現(xiàn)在工程師最重要的能力,我覺得工程能力要排第一。
就算現(xiàn)在大廠面試經(jīng)常要手撕算法,也是更偏向考查代碼工程實現(xiàn)的能力,之前在群里看到這樣的圖片,就覺得很離譜(大概率是假的~)。

算法與工程實現(xiàn)
在 Sentinel-Go 中,一個很核心的算法是流控(限流)算法。
流控可能每個人都聽過,但真要手寫一個,還是有些困難。為什么流控算法難寫?以我的感覺是算法和工程實現(xiàn)上存在一定差異,雖然算法好理解,但卻沒法照著實現(xiàn)。
舉個例子,令牌桶算法很好理解,只需給定一個桶,以恒定的速率往桶內(nèi)放令牌,滿了則丟棄,執(zhí)行任務(wù)前先去桶里拿令牌,只有拿到令牌才可以執(zhí)行,否則拒絕。
如果實現(xiàn)令牌桶,按道理應(yīng)該用一個單獨線程(或進程)往桶里放令牌,業(yè)務(wù)線程去桶里取,但真要這么實現(xiàn),怎么保證這個單獨線程能穩(wěn)定執(zhí)行,萬一掛了豈不是很危險?
所以工程實現(xiàn)上和算法原本肯定存在一定的差異,這也是為什么需要深入源碼的一個原因。
滑動時間窗口的演進
通常來說,流控的度量是按每秒的請求數(shù),也就是 QPS
QPS:query per second,指每秒查詢數(shù),當然他的意義已經(jīng)泛化了,不再特指查詢,可以泛指所有請求。如果非要區(qū)分,TPS 指每秒事務(wù)數(shù),即寫入數(shù),或 RPS,每秒請求數(shù),本文不分這么細,統(tǒng)一叫QPS。
當然也有按并發(fā)數(shù)來度量,并發(fā)數(shù)的流控就非常簡單
并發(fā)數(shù)流控
并發(fā)是一個瞬時概念,它跟時間沒有關(guān)系。和進程中的線程數(shù)、協(xié)程數(shù)一樣,每次取的時候只能拿到一個瞬間的快照,但可能很快就變化了。
并發(fā)數(shù)怎么定義?可以近似認為進入業(yè)務(wù)代碼開始就算一個并發(fā),執(zhí)行完這個并發(fā)就消失。

這樣說來,實現(xiàn)就非常簡單了,只需要定義一個全局變量,責任鏈開始時對這個變量原子增1,并獲取當前并發(fā)數(shù)的一個快照,判斷并發(fā)數(shù)是否超限,如果超限則直接阻斷,執(zhí)行完了別忘了原子減1即可,由于太過簡單,就不需要放代碼了。
固定時間窗口
參考并發(fā)數(shù)流控,當需要度量 QPS 時,是否也可以利用這樣的思想呢?
由于 QPS 有時間的度量,第一直覺是和并發(fā)數(shù)一樣弄個變量,再起個單獨線程每隔 1s 重置這個變量。
但單獨線程始終不放心,需要稍微改一下。
如果系統(tǒng)有一個起始時間,每次請求時,獲取當前時間,兩者之差,就能算出當前處于哪個時間窗口,這個時間窗口單獨計數(shù)即可。

如果稍微思考下,你會發(fā)現(xiàn)問題不簡單,如下圖,10t 到20t 只有60個請求,20t到30t之間只有80個請求,但有可能16t到26t之間有110個請求,這就很有可能把系統(tǒng)打垮。

滑動時間窗口
為了解決上面的問題,工程師想出了一個好辦法:別固定時間窗口,以當前時間往前推算窗口

但問題又來了,這該怎么實現(xiàn)呢?
滑動時間窗口工程實現(xiàn)
在工程實現(xiàn)上,可以將時間劃分為細小的采樣窗口,緩存一段時間的采樣窗口,這樣每當請求來的時候,只需要往前拿一段時間的采樣窗口,然后求和就能拿到總的請求數(shù)。

Sentinel-Go 滑動時間窗口的實現(xiàn)
前方代碼高能預(yù)警~
Sentinel-Go 是基于 LeapArray 實現(xiàn)的滑動窗口,其數(shù)據(jù)結(jié)構(gòu)如下
type?LeapArray?struct?{
?bucketLengthInMs?uint32?//?bucket大小
?sampleCount??????uint32?//?bucket數(shù)量
?intervalInMs?????uint32?//?窗口總大小
?array????????????*AtomicBucketWrapArray?//?bucket數(shù)組
?updateLock?mutex?//?更新鎖
}
type?AtomicBucketWrapArray?struct?{
?base?unsafe.Pointer?//?數(shù)組的起始地址
?length?int?//?長度,不能改變
?data???[]*BucketWrap?//?真正bucket的數(shù)據(jù)
}
type?BucketWrap?struct?{
?BucketStart?uint64?//?bucket起始時間
?Value?atomic.Value?//?bucket數(shù)據(jù)結(jié)構(gòu),例如?MetricBucket
}
type?MetricBucket?struct?{
?counter????????[base.MetricEventTotal]int64?//?計數(shù)數(shù)組,可放不同類型
?minRt??????????int64?//?最小RT
?maxConcurrency?int32?//?最大并發(fā)數(shù)
}
再看下是如何寫入指標的,例如當流程正常通過時
//?①
sn.AddCount(base.MetricEventPass,?int64(count))
//?②
func?(bla?*BucketLeapArray)?AddCount(event?base.MetricEvent,?count?int64)?{
?bla.addCountWithTime(util.CurrentTimeMillis(),?event,?count)
}
//?③
func?(bla?*BucketLeapArray)?addCountWithTime(now?uint64,?event?base.MetricEvent,?count?int64)?{
?b?:=?bla.currentBucketWithTime(now)
?if?b?==?nil?{
??return
?}
?b.Add(event,?count)
}
//?④
func?(mb?*MetricBucket)?Add(event?base.MetricEvent,?count?int64)?{
?if?event?>=?base.MetricEventTotal?||?event?0?{
??logging.Error(errors.Errorf("Unknown?metric?event:?%v",?event),?"")
??return
?}
?if?event?==?base.MetricEventRt?{
??mb.AddRt(count)
??return
?}
?mb.addCount(event,?count)
}
//?⑤
func?(mb?*MetricBucket)?addCount(event?base.MetricEvent,?count?int64)?{
?atomic.AddInt64(&mb.counter[event],?count)
}
取到相應(yīng)的 bucket,然后寫入相應(yīng) event 的 count,對 RT 會特殊處理,因為有一個最小 RT 需要處理。
重點看是如何取到相應(yīng)的 bucket 的:
func?(bla?*BucketLeapArray)?currentBucketWithTime(now?uint64)?*MetricBucket?{
?//?①根據(jù)當前時間取bucket
?curBucket,?err?:=?bla.data.currentBucketOfTime(now,?bla)
?...
?b,?ok?:=?mb.(*MetricBucket)
?if?!ok?{
??...
??return?nil
?}
?return?b
}
func?(la?*LeapArray)?currentBucketOfTime(now?uint64,?bg?BucketGenerator)?(*BucketWrap,?error)?{
?...
?//?②計算index?=?(now?/?bucketLengthInMs)?%?LeapArray.array.length
?idx?:=?la.calculateTimeIdx(now)
?//?③計算bucket開始時間?=?now?-?(now?%?bucketLengthInMs)
?bucketStart?:=?calculateStartTime(now,?la.bucketLengthInMs)
?for?{?
??old?:=?la.array.get(idx)
??if?old?==?nil?{?//?④未使用,直接返回
???newWrap?:=?&BucketWrap{
????BucketStart:?bucketStart,
????Value:???????atomic.Value{},
???}
???newWrap.Value.Store(bg.NewEmptyBucket())
???if?la.array.compareAndSet(idx,?nil,?newWrap)?{
????return?newWrap,?nil
???}?else?{
????runtime.Gosched()
???}
??}?else?if?bucketStart?==?atomic.LoadUint64(&old.BucketStart)?{?//?⑤剛好取到是當前bucket,返回
???return?old,?nil
??}?else?if?bucketStart?>?atomic.LoadUint64(&old.BucketStart)?{?//?⑥取到了舊的bucket,重置使用
???if?la.updateLock.TryLock()?{
????old?=?bg.ResetBucketTo(old,?bucketStart)
????la.updateLock.Unlock()
????return?old,?nil
???}?else?{
????runtime.Gosched()
???}
??}?else?if?bucketStart?//?⑦取到了比當前還新的bucket,總共只有一個bucket時,并發(fā)情況可能會出現(xiàn)這種情況,其他情況不可能,直接報錯
???if?la.sampleCount?==?1?{
????return?old,?nil
???}
???
???return?nil,?errors.New(fmt.Sprintf("Provided?time?timeMillis=%d?is?already?behind?old.BucketStart=%d.",?bucketStart,?old.BucketStart))
??}
?}
}
舉個直觀的例子,看如何拿到 bucket:

假設(shè) B2 取出來是 nil,則 new 一個 bucket 通過 compareAndSet 寫入,保證線程安全,如果別別的線程先寫入,這里會執(zhí)行失敗,調(diào)用 runtime.Gosched(),讓出時間片,進入下一次循環(huán) 假設(shè)取出 B2 的開始時間是3400,與計算的相同,則直接使用 假設(shè)取出的 B2 的開始時間小于 3400,說明這個 bucket 太舊了,需要覆蓋,使用更新鎖來更新,保證線程安全,如果拿不到鎖,也讓出時間片,進入下一次循環(huán) 假設(shè)取出 B2 的開始時間大于3400,說明已經(jīng)有其他線程更新了,而 bucketLengthInMs 通常遠遠大于鎖的獲取時間,所以這里只考慮只有一個 bucket 的情況直接返回,其他情況報錯
回到 QPS 計算:
qps?:=?stat.InboundNode().GetQPS(base.MetricEventPass)
該方法會先計算一個起始時間范圍
func?(m?*SlidingWindowMetric)?getBucketStartRange(timeMs?uint64)?(start,?end?uint64)?{
?curBucketStartTime?:=?calculateStartTime(timeMs,?m.real.BucketLengthInMs())
?end?=?curBucketStartTime
?start?=?end?-?uint64(m.intervalInMs)?+?uint64(m.real.BucketLengthInMs())
?return
}
例如當前時間為3500,則計算出
end = 3400 start = 3400 - 1200 ?+ 200 = 2400

然后遍歷所有 bucket,把在這個范圍內(nèi)的 bucket 都拿出來,計算 QPS,只需要相加即可。
最后
本節(jié)從滑動窗口流控算法的工程實現(xiàn)演進到 Sentinel-Go 里滑動窗口的實現(xiàn),從 Sentinel-Go 的實現(xiàn)上看到,還得考慮內(nèi)存的使用,并發(fā)控制等等,如果完全寫出來,還是非常不容易的。
《Sentinel-Go源碼系列》已經(jīng)寫了三篇,只介紹了兩個知識點:責任鏈模式、滑動窗口限流,后續(xù)還有對象池等,但這其實和 Sentinel-Go 關(guān)系不是很大,到時候單獨成文,就不放在本系列里了。
本文算是一個結(jié)束,與其說是結(jié)束,不如說是一個開始。
搜索關(guān)注微信公眾號"捉蟲大師",后端技術(shù)分享,架構(gòu)設(shè)計、性能優(yōu)化、源碼閱讀、問題排查、踩坑實踐。 另外我也準備組建一個技術(shù)交流群,但現(xiàn)在也不知道會有多少人,所以大家先加我微信 MrRoshi,備注加群,一起交流技術(shù),超過一定人數(shù)我就拉一個~
