常用的限流框架都在這里了!
點(diǎn)擊上方 藍(lán)字 關(guān)注我們!

每天 11 點(diǎn)更新文章,餓了點(diǎn)外賣,點(diǎn)擊 ??《無門檻外賣優(yōu)惠券,每天免費(fèi)領(lǐng)!》

來源:fredal.xin/netflix-concuurency-limits
自適應(yīng)限流 TCP Vegas netflix-concuurency-limits alpha , beta & threshold 變量queueSize 動(dòng)態(tài)調(diào)整函數(shù) 平滑遞減 smoothingDecrease
自適應(yīng)限流
一般的限流常常需要指定一個(gè)固定值(qps)作為限流開關(guān)的閾值,這個(gè)值一是靠經(jīng)驗(yàn)判斷,二是靠通過大量的測(cè)試數(shù)據(jù)得出。但這個(gè)閾值,在流量激增、系統(tǒng)自動(dòng)伸縮或者某某commit了一段有毒代碼后就有可能變得不那么合適了。并且一般業(yè)務(wù)方也不太能夠正確評(píng)估自己的容量,去設(shè)置一個(gè)合適的限流閾值。
而此時(shí)自適應(yīng)限流就是解決這樣的問題的,限流閾值不需要手動(dòng)指定,也不需要去預(yù)估系統(tǒng)的容量,并且閾值能夠隨著系統(tǒng)相關(guān)指標(biāo)變化而變化。
自適應(yīng)限流算法借鑒了TCP擁塞算法,根據(jù)各種指標(biāo)預(yù)估限流的閾值,并且不斷調(diào)整。大致獲得的效果如下:

從圖上可以看到,首先以一個(gè)降低的初始并發(fā)值發(fā)送請(qǐng)求,同時(shí)通過增大限流窗口來探測(cè)系統(tǒng)更高的并發(fā)性。而一旦延遲增加到一定程度了,又會(huì)退回到較小的限流窗口。循環(huán)往復(fù)持續(xù)探測(cè)并發(fā)極限,從而產(chǎn)生類似鋸齒狀的時(shí)間關(guān)系函數(shù)。
TCP Vegas
vegas是一種主動(dòng)調(diào)整cwnd的擁塞控制算法,主要是設(shè)置兩個(gè)閾值alpha 和 beta,然后通過計(jì)算目標(biāo)速率和實(shí)際速率的差diff,再比較差diff與alpha和beta的關(guān)系,對(duì)cwnd進(jìn)行調(diào)節(jié)。偽代碼如下:
diff = cwnd*(1-baseRTT/RTT)
if (diff < alpha)
set: cwnd = cwnd + 1
else if (diff >= beta)
set: cwnd = cwnd - 1
else
set: cwnd = cwnd
其中baseRTT指的是測(cè)量的最小往返時(shí)間,RTT指的是當(dāng)前測(cè)量的往返時(shí)間,cwnd指的是當(dāng)前的TCP窗口大小。通常在tcp中alpha會(huì)被設(shè)置成2-3,beta會(huì)被設(shè)置成4-6。這樣子,cwnd就保持在了一個(gè)平衡的狀態(tài)。
netflix-concuurency-limits
concuurency-limits是netflix推出的自適應(yīng)限流組件,借鑒了TCP相關(guān)擁塞控制算法,主要是根據(jù)請(qǐng)求延時(shí),及其直接影響到的排隊(duì)長(zhǎng)度來進(jìn)行限流窗口的動(dòng)態(tài)調(diào)整。
alpha , beta & threshold
vegas算法實(shí)現(xiàn)在了VegasLimit類中。先看一下初始化相關(guān)代碼:
private int initialLimit = 20;
private int maxConcurrency = 1000;
private MetricRegistry registry = EmptyMetricRegistry.INSTANCE;
private double smoothing = 1.0;
private Function<Integer, Integer> alphaFunc = (limit) -> 3 * LOG10.apply(limit.intValue());
private Function<Integer, Integer> betaFunc = (limit) -> 6 * LOG10.apply(limit.intValue());
private Function<Integer, Integer> thresholdFunc = (limit) -> LOG10.apply(limit.intValue());
private Function<Double, Double> increaseFunc = (limit) -> limit + LOG10.apply(limit.intValue());
private Function<Double, Double> decreaseFunc = (limit) -> limit - LOG10.apply(limit.intValue());
這里首先定義了一個(gè)初始化值initialLimit為20,以及極大值maxConcurrency1000。其次是三個(gè) 閾值函數(shù)alphaFunc,betaFunc以及thresholdFunc。最后是兩個(gè)增減函數(shù)increaseFunc和decreaseFunc。函數(shù)都是基于當(dāng)前的并發(fā)值limit做運(yùn)算的。
alphaFunc可類比vegas算法中的alpha,此處的實(shí)現(xiàn)是3*log limit。limit值從初始20增加到極大1000時(shí)候,相應(yīng)的alpha從3.9增加到了9。 betaFunc則可類比為vegas算法中的beta,此處的實(shí)現(xiàn)是6*log limit。limit值從初始20增加到極大1000時(shí)候,相應(yīng)的alpha從7.8增加到了18。 thresholdFunc算是新增的一個(gè)函數(shù),表示一個(gè)較為初始的閾值,小于這個(gè)值的時(shí)候limit會(huì)采取激進(jìn)一些的增量算法。這里的實(shí)現(xiàn)是1倍的log limit。mit值從初始20增加到極大1000時(shí)候,相應(yīng)的alpha從1.3增加到了3。
這三個(gè)函數(shù)值可以認(rèn)為確定了動(dòng)態(tài)調(diào)整函數(shù)的四個(gè)區(qū)間范圍。當(dāng)變量queueSize = limit × (1 ? RTTnoLoad/RTTactual)落到這四個(gè)區(qū)間的時(shí)候應(yīng)用不同的調(diào)整函數(shù)。
變量queueSize
其中變量為queueSize,計(jì)算方法即為limit × (1 ? RTTnoLoad/RTTactual),為什么這么計(jì)算其實(shí)稍加領(lǐng)悟一下即可。

我們把系統(tǒng)處理請(qǐng)求的過程想象為一個(gè)水管,到來的請(qǐng)求是往這個(gè)水管灌水,當(dāng)系統(tǒng)處理順暢的時(shí)候,請(qǐng)求不需要排隊(duì),直接從水管中穿過,這個(gè)請(qǐng)求的RT是最短的,即RTTnoLoad;
反之,當(dāng)請(qǐng)求堆積的時(shí)候,那么處理請(qǐng)求的時(shí)間則會(huì)變?yōu)椋号抨?duì)時(shí)間+最短處理時(shí)間,即RTTactual = inQueueTime + RTTnoLoad。而顯然排隊(duì)的隊(duì)列長(zhǎng)度為 總排隊(duì)時(shí)間/每個(gè)請(qǐng)求的處理時(shí)間及queueSize = (limit * inQueueTime) / (inQueueTime + RTTnoLoad) = limit × (1 ? RTTnoLoad/RTTactual)。
再舉個(gè)栗子,因?yàn)榧僭O(shè)當(dāng)前延時(shí)即為最佳延時(shí),那么自然是不用排隊(duì)的,即queueSize=0。而假設(shè)當(dāng)前延時(shí)為最佳延時(shí)的一倍的時(shí)候,可以認(rèn)為處理能力折半,100個(gè)流量進(jìn)來會(huì)有一半即50個(gè)請(qǐng)求在排隊(duì),及queueSize= 100 * (1 ? 1/2)=50。
動(dòng)態(tài)調(diào)整函數(shù)
調(diào)整函數(shù)中最重要的即增函數(shù)與減函數(shù)。從初始化的代碼中得知,增函數(shù)increaseFunc實(shí)現(xiàn)為limit+log limit,減函數(shù)decreaseFunc實(shí)現(xiàn)為limit-log limit,相對(duì)來說增減都是比較保守的。
看一下應(yīng)用動(dòng)態(tài)調(diào)整函數(shù)的相關(guān)代碼:
private int updateEstimatedLimit(long rtt, int inflight, boolean didDrop) {
final int queueSize = (int) Math.ceil(estimatedLimit * (1 - (double)rtt_noload / rtt));
double newLimit;
// Treat any drop (i.e timeout) as needing to reduce the limit
// 發(fā)現(xiàn)錯(cuò)誤直接應(yīng)用減函數(shù)decreaseFunc
if (didDrop) {
newLimit = decreaseFunc.apply(estimatedLimit);
// Prevent upward drift if not close to the limit
} else if (inflight * 2 < estimatedLimit) {
return (int)estimatedLimit;
} else {
int alpha = alphaFunc.apply((int)estimatedLimit);
int beta = betaFunc.apply((int)estimatedLimit);
int threshold = this.thresholdFunc.apply((int)estimatedLimit);
// Aggressive increase when no queuing
if (queueSize <= threshold) {
newLimit = estimatedLimit + beta;
// Increase the limit if queue is still manageable
} else if (queueSize < alpha) {
newLimit = increaseFunc.apply(estimatedLimit);
// Detecting latency so decrease
} else if (queueSize > beta) {
newLimit = decreaseFunc.apply(estimatedLimit);
// We're within he sweet spot so nothing to do
} else {
return (int)estimatedLimit;
}
}
newLimit = Math.max(1, Math.min(maxLimit, newLimit));
newLimit = (1 - smoothing) * estimatedLimit + smoothing * newLimit;
if ((int)newLimit != (int)estimatedLimit && LOG.isDebugEnabled()) {
LOG.debug("New limit={} minRtt={} ms winRtt={} ms queueSize={}",
(int)newLimit,
TimeUnit.NANOSECONDS.toMicros(rtt_noload) / 1000.0,
TimeUnit.NANOSECONDS.toMicros(rtt) / 1000.0,
queueSize);
}
estimatedLimit = newLimit;
return (int)estimatedLimit;
}
動(dòng)態(tài)調(diào)整函數(shù)規(guī)則如下:
當(dāng)變量queueSize < threshold時(shí),選取較激進(jìn)的增量函數(shù),newLimit = limit+beta 當(dāng)變量queueSize < alpha時(shí),需要增大限流窗口,選擇增函數(shù)increaseFunc,即newLimit = limit + log limit 當(dāng)變量queueSize處于alpha,beta之間時(shí)候,limit不變 當(dāng)變量queueSize大于beta時(shí)候,需要收攏限流窗口,選擇減函數(shù)decreaseFunc,即newLimit = limit - log limit
平滑遞減 smoothingDecrease
注意到可以設(shè)置變量smoothing,這里初始值為1,表示平滑遞減不起作用。
如果有需要的話可以按需設(shè)置,比如設(shè)置smoothing為0.5時(shí)候,那么效果就是采用減函數(shù)decreaseFunc時(shí)候效果減半,實(shí)現(xiàn)方式為newLimitAfterSmoothing = 0.5 newLimit + 0.5 limit。
往期推薦

看完文章,餓了點(diǎn)外賣,點(diǎn)擊 ??《無門檻外賣優(yōu)惠券,每天免費(fèi)領(lǐng)!》
END
若覺得文章對(duì)你有幫助,隨手轉(zhuǎn)發(fā)分享,也是我們繼續(xù)更新的動(dòng)力。
長(zhǎng)按二維碼,掃掃關(guān)注哦
?「C語言中文網(wǎng)」官方公眾號(hào),關(guān)注手機(jī)閱讀教程 ?
目前收集的資料包括: Java,Python,C/C++,Linux,PHP,go,C#,QT,git/svn,人工智能,大數(shù)據(jù),單片機(jī),算法,小程序,易語言,安卓,ios,PPT,軟件教程,前端,軟件測(cè)試,簡(jiǎn)歷,畢業(yè)設(shè)計(jì),公開課 等分類,資源在不斷更新中...

