沒想到,為了一個限流我寫了1萬字!
限流作為現(xiàn)在微服務中常見的穩(wěn)定性措施,在面試中肯定也是經常會被問到的,我在面試的時候也經常喜歡問一下你對限流算法知道哪一些?有看過源碼嗎?實現(xiàn)原理是什么?
第一部分先講講限流算法,最后再講講源碼的實現(xiàn)原理。
限流算法
關于限流的算法大體上可以分為四類:固定窗口計數(shù)器、滑動窗口計數(shù)器、漏桶(也有稱漏斗,英文Leaky bucket)、令牌桶(英文Token bucket)。
固定窗口
固定窗口,相比其他的限流算法,這應該是最簡單的一種。
它簡單地對一個固定的時間窗口內的請求數(shù)量進行計數(shù),如果超過請求數(shù)量的閾值,將被直接丟棄。
這個簡單的限流算法優(yōu)缺點都很明顯。優(yōu)點的話就是簡單,缺點舉個例子來說。
比如我們下圖中的黃色區(qū)域就是固定時間窗口,默認時間范圍是60s,限流數(shù)量是100。
如圖中括號內所示,前面一段時間都沒有流量,剛好后面30秒內來了100個請求,此時因為沒有超過限流閾值,所以請求全部通過,然后下一個窗口的20秒內同樣通過了100個請求。
所以變相的相當于在這個括號的40秒的時間內就通過了200個請求,超過了我們限流的閾值。

滑動窗口
為了優(yōu)化這個問題,于是有了滑動窗口算法,顧名思義,滑動窗口就是時間窗口在隨著時間推移不停地移動。
滑動窗口把一個固定時間窗口再繼續(xù)拆分成N個小窗口,然后對每個小窗口分別進行計數(shù),所有小窗口請求之和不能超過我們設定的限流閾值。
以下圖舉例子來說,假設我們的窗口拆分成了3個小窗口,小窗口都是20s,同樣基于上面的例子,當在第三個20s的時候來了100個請求,可以通過。
然后時間窗口滑動,下一個20s請求又來了100個請求,此時我們滑動窗口的60s范圍內請求數(shù)量肯定就超過100了啊,所以請求被拒絕。

漏桶Leaky bucket
漏桶算法,人如其名,他就是一個漏的桶,不管請求的數(shù)量有多少,最終都會以固定的出口流量大小勻速流出,如果請求的流量超過漏桶大小,那么超出的流量將會被丟棄。
也就是說流量流入的速度是不定的,但是流出的速度是恒定的。
這個和MQ削峰填谷的思想比較類似,在面對突然激增的流量的時候,通過漏桶算法可以做到勻速排隊,固定速度限流。
漏桶算法的優(yōu)勢是勻速,勻速是優(yōu)點也是缺點,很多人說漏桶不能處理突增流量,這個說法并不準確。
漏桶本來就應該是為了處理間歇性的突增流量,流量一下起來了,然后系統(tǒng)處理不過來,可以在空閑的時候去處理,防止了突增流量導致系統(tǒng)崩潰,保護了系統(tǒng)的穩(wěn)定性。
但是,換一個思路來想,其實這些突增的流量對于系統(tǒng)來說完全沒有壓力,你還在慢慢地勻速排隊,其實是對系統(tǒng)性能的浪費。
所以,對于這種有場景來說,令牌桶算法比漏桶就更有優(yōu)勢。
令牌桶token bucket
令牌桶算法是指系統(tǒng)以一定地速度往令牌桶里丟令牌,當一個請求過來的時候,會去令牌桶里申請一個令牌,如果能夠獲取到令牌,那么請求就可以正常進行,反之被丟棄。
現(xiàn)在的令牌桶算法,像Guava和Sentinel的實現(xiàn)都有冷啟動/預熱的方式,為了避免在流量激增的同時把系統(tǒng)打掛,令牌桶算法會在最開始一段時間內冷啟動,隨著流量的增加,系統(tǒng)會根據(jù)流量大小動態(tài)地調整生成令牌的速度,最終直到請求達到系統(tǒng)的閾值。
源碼舉例
我們以sentinel舉例,sentinel中統(tǒng)計用到了滑動窗口算法,然后也有用到漏桶、令牌桶算法。
滑動窗口
sentinel中就使用到了滑動窗口算法來進行統(tǒng)計,不過他的實現(xiàn)和我上面畫的圖有點不一樣,實際上sentinel中的滑動窗口用一個圓形來描述更合理一點。
前期就是創(chuàng)建節(jié)點,然后slot串起來就是一個責任鏈模式,StatisticSlot通過滑動窗口來統(tǒng)計數(shù)據(jù),F(xiàn)lowSlot是真正限流的邏輯,還有一些降級、系統(tǒng)保護的措施,最終形成了整個sentinel的限流方式。

滑動窗口的實現(xiàn)主要可以看LeapArray的代碼,默認的話定義了時間窗口的相關參數(shù)。
對于sentinel來說其實窗口分為秒和分鐘兩個級別,秒的話窗口數(shù)量是2,分鐘則是60個窗口,每個窗口的時間長度是1s,總的時間周期就是60s,分成60個窗口,這里我們就以分鐘級別的統(tǒng)計來說。
public abstract class LeapArray<T> {
//窗口時間長度,毫秒數(shù),默認1000ms
protected int windowLengthInMs;
//窗口數(shù)量,默認60
protected int sampleCount;
//毫秒時間周期,默認60*1000
protected int intervalInMs;
//秒級時間周期,默認60
private double intervalInSecond;
//時間窗口數(shù)組
protected final AtomicReferenceArray<WindowWrap<T>> array;
然后我們要看的就是它是怎么計算出當前窗口的,其實源碼里寫的聽清楚的,但是如果你按照之前想象把他當做一條直線延伸去想的話估計不太好理解。
首先計算數(shù)組索引下標和時間窗口時間這個都比較簡單,難點應該大部分在于第三點窗口大于old這個是什么鬼,詳細說下這幾種情況。
數(shù)組中的時間窗口是是空的,這個說明時間走到了我們初始化的時間之后了,此時new一個新的窗口通過CAS的方式去更新,然后返回這個新的窗口就好了。 第二種情況是剛好時間窗口的時間相等,那么直接返回,沒啥好說的 第三種情況就是比較難以理解的,可以參看兩條時間線的圖,就比較好理解了,第一次時間窗口走完了達到1200,然后圓形時間窗口開始循環(huán),新的時間起始位置還是1200,然后時間窗口的時間來到1676,B2的位置如果還是老的窗口那么就是600,所以我們要重置之前的時間窗口的時間為當前的時間。 最后一種一般情況不太可能發(fā)生,除非時鐘回撥這樣子
從這個我們可以發(fā)現(xiàn)就是針對每個WindowWrap時間窗口都進行了統(tǒng)計,最后實際上在后面的幾個地方都會用到時間窗口統(tǒng)計的QPS結果,這里就不再贅述了,知道即可。
private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
long timeId = timeMillis / windowLengthInMs;
// Calculate current index so we can map the timestamp to the leap array.
return (int) (timeId % array.length());
}
protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
return timeMillis - timeMillis % windowLengthInMs;
}
public WindowWrap<T> currentWindow(long timeMillis) {
//當前時間如果小于0,返回空
if (timeMillis < 0) {
return null;
}
//計算時間窗口的索引
int idx = calculateTimeIdx(timeMillis);
// 計算當前時間窗口的開始時間
long windowStart = calculateWindowStart(timeMillis);
while (true) {
//在窗口數(shù)組中獲得窗口
WindowWrap<T> old = array.get(idx);
if (old == null) {
/*
* B0 B1 B2 NULL B4
* ||_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* ^
* time=888
* 比如當前時間是888,根據(jù)計算得到的數(shù)組窗口位置是個空,所以直接創(chuàng)建一個新窗口就好了
*/
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
if (array.compareAndSet(idx, null, window)) {
// Successfully updated, return the created bucket.
return window;
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
} else if (windowStart == old.windowStart()) {
/*
* B0 B1 B2 B3 B4
* ||_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* ^
* time=888
* 這個更好了,剛好等于,直接返回就行
*/
return old;
} else if (windowStart > old.windowStart()) {
/*
* B0 B1 B2 B3 B4
* |_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* B0 B1 B2 NULL B4
* |_______||_______|_______|_______|_______|_______||___
* ... 1200 1400 1600 1800 2000 2200 timestamp
* ^
* time=1676
* 這個要當成圓形理解就好了,之前如果是1200一個完整的圓形,然后繼續(xù)從1200開始,如果現(xiàn)在時間是1676,落在在B2的位置,
* 窗口開始時間是1600,獲取到的old時間其實會是600,所以肯定是過期了,直接重置窗口就可以了
*/
if (updateLock.tryLock()) {
try {
// Successfully get the update lock, now we reset the bucket.
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
// 這個不太可能出現(xiàn),嗯。。時鐘回撥
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
漏桶
sentinel主要根據(jù)FlowSlot中的流控進行流量控制,其中RateLimiterController就是漏桶算法的實現(xiàn),這個實現(xiàn)相比其他幾個還是簡單多了,稍微看一下應該就明白了。
首先計算出當前請求平攤到1s內的時間花費,然后去計算這一次請求預計時間 如果小于當前時間的話,那么以當前時間為主,返回即可 反之如果超過當前時間的話,這時候就要進行排隊等待了,等待的時候要判斷是否超過當前最大的等待時間,超過就直接丟棄 沒有超過就更新上一次的通過時間,然后再比較一次是否超時,還超時就重置時間,反之在等待時間范圍之內的話就等待,如果都不是那就可以通過了
public class RateLimiterController implements TrafficShapingController {
//最大等待超時時間,默認500ms
private final int maxQueueingTimeMs;
//限流數(shù)量
private final double count;
//上一次的通過時間
private final AtomicLong latestPassedTime = new AtomicLong(-1);
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// Pass when acquire count is less or equal than 0.
if (acquireCount <= 0) {
return true;
}
// Reject when count is less or equal than 0.
// Otherwise,the costTime will be max of long and waitTime will overflow in some cases.
if (count <= 0) {
return false;
}
long currentTime = TimeUtil.currentTimeMillis();
//時間平攤到1s內的花費
long costTime = Math.round(1.0 * (acquireCount) / count * 1000); // 1 / 100 * 1000 = 10ms
//計算這一次請求預計的時間
long expectedTime = costTime + latestPassedTime.get();
//花費時間小于當前時間,pass,最后通過時間 = 當前時間
if (expectedTime <= currentTime) {
latestPassedTime.set(currentTime);
return true;
} else {
//預計通過的時間超過當前時間,要進行排隊等待,重新獲取一下,避免出現(xiàn)問題,差額就是需要等待的時間
long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
//等待時間超過最大等待時間,丟棄
if (waitTime > maxQueueingTimeMs) {
return false;
} else {
//反之,可以更新最后一次通過時間了
long oldTime = latestPassedTime.addAndGet(costTime);
try {
waitTime = oldTime - TimeUtil.currentTimeMillis();
//更新后再判斷,還是超過最大超時時間,那么就丟棄,時間重置
if (waitTime > maxQueueingTimeMs) {
latestPassedTime.addAndGet(-costTime);
return false;
}
//在時間范圍之內的話,就等待
if (waitTime > 0) {
Thread.sleep(waitTime);
}
return true;
} catch (InterruptedException e) {
}
}
}
return false;
}
}
令牌桶
最后是令牌桶,這個不在于實現(xiàn)的復制,而是你看源碼會發(fā)現(xiàn)都算的些啥玩意兒。。。sentinel的令牌桶實現(xiàn)基于Guava,代碼在WarmUpController中。
這個算法那些各種計算邏輯其實我們可以不管(因為我也沒看懂。。),但是流程上我們是清晰的就可以了。
幾個核心的參數(shù)看注釋,構造方法里那些計算邏輯暫時不管他是怎么算的(我也沒整明白,但是不影響我們理解),關鍵看canPass是怎么做的。
拿到當前窗口和上一個窗口的QPS 填充令牌,也就是往桶里丟令牌,然后我們先看填充令牌的邏輯
public class WarmUpController implements TrafficShapingController {
//限流QPS
protected double count;
//冷啟動系數(shù),默認=3
private int coldFactor;
//警戒的令牌數(shù)
protected int warningToken = 0;
//最大令牌數(shù)
private int maxToken;
//斜率,產生令牌的速度
protected double slope;
//存儲的令牌數(shù)量
protected AtomicLong storedTokens = new AtomicLong(0);
//最后一次填充令牌時間
protected AtomicLong lastFilledTime = new AtomicLong(0);
public WarmUpController(double count, int warmUpPeriodInSec, int coldFactor) {
construct(count, warmUpPeriodInSec, coldFactor);
}
public WarmUpController(double count, int warmUpPeriodInSec) {
construct(count, warmUpPeriodInSec, 3);
}
private void construct(double count, int warmUpPeriodInSec, int coldFactor) {
if (coldFactor <= 1) {
throw new IllegalArgumentException("Cold factor should be larger than 1");
}
this.count = count;
this.coldFactor = coldFactor;
//stableInterval 穩(wěn)定產生令牌的時間周期,1/QPS
//warmUpPeriodInSec 預熱/冷啟動時間 ,默認 10s
warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor - 1);
maxToken = warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor));
//斜率的計算參考Guava,當做一個固定改的公式
slope = (coldFactor - 1.0) / count / (maxToken - warningToken);
}
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
//當前時間窗口通過的QPS
long passQps = (long) node.passQps();
//上一個時間窗口QPS
long previousQps = (long) node.previousPassQps();
//填充令牌
syncToken(previousQps);
// 開始計算它的斜率
// 如果進入了警戒線,開始調整他的qps
long restToken = storedTokens.get();
if (restToken >= warningToken) {
//當前的令牌超過警戒線,獲得超過警戒線的令牌數(shù)
long aboveToken = restToken - warningToken;
// 消耗的速度要比warning快,但是要比慢
// current interval = restToken*slope+1/count
double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
if (passQps + acquireCount <= warningQps) {
return true;
}
} else {
if (passQps + acquireCount <= count) {
return true;
}
}
return false;
}
}
填充令牌的邏輯如下:
拿到當前的時間,然后去掉毫秒數(shù),得到的就是秒級時間 判斷時間小于這里就是為了控制每秒丟一次令牌 然后就是 coolDownTokens去計算我們的冷啟動/預熱是怎么計算填充令牌的后面計算當前剩下的令牌數(shù)這個就不說了,減去上一次消耗的就是桶里剩下的令牌
protected void syncToken(long passQps) {
long currentTime = TimeUtil.currentTimeMillis();
//去掉當前時間的毫秒
currentTime = currentTime - currentTime % 1000;
long oldLastFillTime = lastFilledTime.get();
//控制每秒填充一次令牌
if (currentTime <= oldLastFillTime) {
return;
}
//當前的令牌數(shù)量
long oldValue = storedTokens.get();
//獲取新的令牌數(shù)量,包含添加令牌的邏輯,這就是預熱的邏輯
long newValue = coolDownTokens(currentTime, passQps);
if (storedTokens.compareAndSet(oldValue, newValue)) {
//存儲的令牌數(shù)量當然要減去上一次消耗的令牌
long currentValue = storedTokens.addAndGet(0 - passQps);
if (currentValue < 0) {
storedTokens.set(0L);
}
lastFilledTime.set(currentTime);
}
}
最開始的事實因為 lastFilledTime和oldValue都是0,所以根據(jù)當前時間戳會得到一個非常大的數(shù)字,最后和maxToken取小的話就得到了最大的令牌數(shù),所以第一次初始化的時候就會生成maxToken的令牌之后我們假設系統(tǒng)的QPS一開始很低,然后突然飆高。所以開始的時候回一直走到高于警戒線的邏輯里去,然后 passQps又很低,所以會一直處于把令牌桶填滿的狀態(tài)(currentTime - lastFilledTime.get()會一直都是1000,也就是1秒),所以每次都會填充最大QPScount數(shù)量的令牌然后突增流量來了,QPS瞬間很高,慢慢地令牌數(shù)量就會消耗到警戒線之下,走到我們 if的邏輯里去,然后去按照count數(shù)量增加令牌
private long coolDownTokens(long currentTime, long passQps) {
long oldValue = storedTokens.get();
long newValue = oldValue;
//水位低于警戒線,就生成令牌
if (oldValue < warningToken) {
//如果桶中令牌低于警戒線,根據(jù)上一次的時間差,得到新的令牌數(shù),因為去掉了毫秒,1秒生成的令牌就是閾值count
//第一次都是0的話,會生成count數(shù)量的令牌
newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
} else if (oldValue > warningToken) {
//反之,如果是高于警戒線,要判斷QPS。因為QPS越高,生成令牌就要越慢,QPS低的話生成令牌要越快
if (passQps < (int)count / coldFactor) {
newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
}
}
//不要超過最大令牌數(shù)
return Math.min(newValue, maxToken);
}
上面的邏輯理順之后,我們就可以繼續(xù)看限流的部分邏輯:
令牌計算的邏輯完成,然后判斷是不是超過警戒線,按照上面的說法,低QPS的狀態(tài)肯定是一直超過的,所以會根據(jù)斜率來計算出一個 warningQps,因為我們處于冷啟動的狀態(tài),所以這個階段就是要根據(jù)斜率來計算出一個QPS數(shù)量,讓流量慢慢地達到系統(tǒng)能承受的峰值。舉個例子,如果count是100,那么在QPS很低的情況下,令牌桶一直處于滿狀態(tài),但是系統(tǒng)會控制QPS,實際通過的QPS就是warningQps,根據(jù)算法可能只有10或者20(怎么算的不影響理解)。QPS主鍵提高的時候,aboveToken再逐漸變小,整個warningQps就在逐漸變大,直到走到警戒線之下,到了else邏輯里。流量突增的情況,就是 else邏輯里低于警戒線的情況,我們令牌桶在不停地根據(jù)count去增加令牌,這時候消耗令牌的速度超過我們生成令牌的速度,可能就會導致一直處于警戒線之下,這時候判斷當然就需要根據(jù)最高QPS去判斷限流了。
long restToken = storedTokens.get();
if (restToken >= warningToken) {
//當前的令牌超過警戒線,獲得超過警戒線的令牌數(shù)
long aboveToken = restToken - warningToken;
// 消耗的速度要比warning快,但是要比慢
// current interval = restToken*slope+1/count
double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
if (passQps + acquireCount <= warningQps) {
return true;
}
} else {
if (passQps + acquireCount <= count) {
return true;
}
}
所以,按照低QPS到突增高QPS的流程,來想象一下這個過程:
剛開始,系統(tǒng)的QPS非常低,初始化我們就直接把令牌桶塞滿了 然后這個低QPS的狀態(tài)持續(xù)了一段時間,因為我們一直會填充最大QPS數(shù)量的令牌(因為取最小值,所以其實桶里令牌基本不會有變化),所以令牌桶一直處于滿的狀態(tài),整個系統(tǒng)的限流也處于一個比較低的水平
這以上的部分一直處于警戒線之上,實際上就是叫做冷啟動/預熱的過程。
接著系統(tǒng)的QPS突然激增,令牌消耗速度太快,就算我們每次增加最大QPS數(shù)量的令牌任然無法維持消耗,所以桶里的令牌在不斷低減少,這個時候,冷啟動階段的限制QPS也在不斷地提高,最后直到桶里的令牌低于警戒線 低于警戒線之后,系統(tǒng)就會按照最高QPS去限流,這個過程就是系統(tǒng)在逐漸達到最高限流的過程
那這樣一來,實際就達到了我們處理突增流量的目的,整個系統(tǒng)在漫漫地適應突然飆高的QPS,然后最終達到系統(tǒng)的QPS閾值。
最后,如果QPS回復正常,那么又會逐漸回到警戒線之上,就回到了最開始的過程。

總結
因為算法如果單獨說的話都比較簡單,一說大家都可以聽明白,不需要幾個字就能說明白,所以還是得弄點源碼看看別人是怎么玩的,所以盡管我很討厭放源碼,但是還是不得不干。
光靠別人說一點其實有點看不明白,按照順序讀一遍的話心里就有數(shù)了。
那源碼的話最難以理解的就是令牌桶的實現(xiàn)了,說實話那幾個計算的邏輯我看了好幾遍不知道他算的什么鬼,但是思想我們理解就行了,其他的邏輯相對來說就比較容易理解。
