精講高并發(fā)核心編程,限流原理與實(shí)戰(zhàn),限流策略原理與參考實(shí)現(xiàn)
限流原理與實(shí)戰(zhàn)
在通信領(lǐng)域中,限流技術(shù)(Time Limiting)被用來控制網(wǎng)絡(luò)接口收發(fā)通信數(shù)據(jù)的速率,實(shí)現(xiàn)通信時(shí)的優(yōu)化性能、較少延遲和提高帶寬等。
互聯(lián)網(wǎng)領(lǐng)域中借鑒了通信領(lǐng)域的限流概念,用來控制在高并發(fā)、大流量的場景中對服務(wù)接口請求的速率,比如雙十一秒殺、搶購、搶票、搶單等場景。
舉一個(gè)具體的例子,假設(shè)某個(gè)接口能夠扛住的QPS為10 000,這時(shí)有20 000個(gè)請求進(jìn)來,經(jīng)過限流模塊,會先放10 000個(gè)請求,其余的請求會阻塞一段時(shí)間。不簡單粗暴地返回404,讓客戶端重試,同時(shí)又能起到流量削峰的作用。
每個(gè)API接口都是有訪問上限的,當(dāng)訪問頻率或者并發(fā)量超過其承受范圍時(shí),就必須通過限流來保證接口的可用性或者降級可用性,給接口安裝上保險(xiǎn)絲,以防止非預(yù)期的請求對系統(tǒng)壓力過大而引起系統(tǒng)癱瘓。
接口限流的算法主要有3種,分別是計(jì)數(shù)器限流、漏桶算法和令牌桶算法。接下來為大家一一介紹。
限流策略原理與參考實(shí)現(xiàn)
在高并發(fā)訪問的情況下,通常會通過限流的手段來控制流量問題,以保證服務(wù)器處于正常壓力下。
首先給大家介紹3種常見的限流策略。
?3種限流策略:計(jì)數(shù)器、漏桶和令牌桶
限流的手段通常有計(jì)數(shù)器、漏桶和令牌桶。注意限流和限速(所有請求都會處理)的差別,視業(yè)務(wù)場景而定。
(1)計(jì)數(shù)器:在一段時(shí)間間隔內(nèi)(時(shí)間窗),處理請求的最大數(shù)量固定,超過部分不做處理。
(2)漏桶:漏桶大小固定,處理速度固定,但請求進(jìn)入的速度不固定(在突發(fā)情況請求過多時(shí),會丟棄過多的請求)。
(3)令牌桶:令牌桶的大小固定,令牌的產(chǎn)生速度固定,但是小耗令牌(請求)的速度不固定(可以應(yīng)對某些時(shí)間請求過多的情況)。每個(gè)請求都會從令牌桶中取出令牌,如果沒有令牌,就丟棄這次請求。
?計(jì)數(shù)器限流原理和Java參考實(shí)現(xiàn)
計(jì)數(shù)器限流的原理非常簡單:在一個(gè)時(shí)間窗口(間隔)內(nèi),所處理的請求的最大數(shù)量是有限制的,對超過限制的部分請求不做處理。
下面的代碼是計(jì)數(shù)器限流算法的一個(gè)簡單的演示實(shí)現(xiàn)和測試用例。
package com.crazymaker.springcloud.ratelimit;
...
//計(jì)速器,限速
@Slf4j
public class CounterLimiter
{
//起始時(shí)間
private static long startTime = System.currentTimeMillis();
//時(shí)間區(qū)間的時(shí)間間隔毫秒
private static long interval = 1000;
//每秒限制數(shù)量
private static long maxCount = 2;
//累加器
private static AtomicLong accumulator = new AtomicLong();
//計(jì)數(shù)判斷,是否超出限制
private static long tryAcquire(long taskId, int turn)
{
long nowTime = System.currentTimeMillis();
//在時(shí)間區(qū)間之內(nèi)
if (nowTime < startTime + interval)
{
long count = accumulator.incrementAndGet();
if (count <= maxCount)
{
return count;
} else
{
return -count;
}
} else
{
//在時(shí)間區(qū)間之外
synchronized (CounterLimiter.class)
{
log.info("新時(shí)間區(qū)到了,taskId{}, turn {}..", taskId, turn);
//再一次判斷,防止重復(fù)初始化
if (nowTime > startTime + interval)
{
accumulator.set(0);
startTime = nowTime;
}
}
return 0;
}
}
//線程池,用于多線程模擬測試
private ExecutorService pool = Executors.newFixedThreadPool(10);
@Test
public void testLimit()
{ //被限制的次數(shù)
AtomicInteger limited = new AtomicInteger(0);
//線程數(shù)
final int threads = 2;
//每條線程的執(zhí)行輪數(shù)
final int turns = 20;
//同步器
CountDownLatch countDownLatch = new CountDownLatch(threads);
long start = System.currentTimeMillis();
for (int i = 0; i < threads; i++)
{
pool.submit(() ->
{
try
{
for (int j = 0; j < turns; j++)
{
long taskId = Thread.currentThread().getId();
long index = tryAcquire(taskId, j);
if (index <= 0)
{
//被限制的次數(shù)累積
limited.getAndIncrement();
}
Thread.sleep(200);
}
} catch (Exception e)
{
e.printStackTrace();
}
//等待所有線程結(jié)束
countDownLatch.countDown();
});
}
try
{
countDownLatch.await();
} catch (InterruptedException e)
{
e.printStackTrace();
}
float time = (System.currentTimeMillis() - start) / 1000F;
//輸出統(tǒng)計(jì)結(jié)果
log.info("限制的次數(shù)為:" + limited.get() +
",通過的次數(shù)為:" + (threads *turns - limited.get()));
log.info("限制的比例為:" +
(float) limited.get() / (float) (threads *turns));
log.info("運(yùn)行的時(shí)長為:" + time);
}
}以上代碼使用兩條線程,每條線程各運(yùn)行20次,每一次運(yùn)行休眠200毫秒,總計(jì)耗時(shí)4秒,運(yùn)行40次,限流的輸出結(jié)果具體如下:
[pool-2-thread-2] INFO c.c.s.ratelimit.CounterLimiter - 新時(shí)間區(qū)到了,taskId16, turn 5..
[pool-2-thread-1] INFO c.c.s.ratelimit.CounterLimiter - 新時(shí)間區(qū)到了,taskId15, turn 5..
[pool-2-thread-2] INFO c.c.s.ratelimit.CounterLimiter - 新時(shí)間區(qū)到了,taskId16, turn 10..
[pool-2-thread-2] INFO c.c.s.ratelimit.CounterLimiter - 新時(shí)間區(qū)到了,taskId16, turn 15..
[main] INFO c.c.s.ratelimit.CounterLimiter - 限制的次數(shù)為:32,通過的次數(shù)為:8
[main] INFO c.c.s.ratelimit.CounterLimiter - 限制的比例為:0.8
[main] INFO c.c.s.ratelimit.CounterLimiter - 運(yùn)行的時(shí)長為:4.104大家可以自行調(diào)整參數(shù),運(yùn)行以上自驗(yàn)證程序并觀察實(shí)驗(yàn)結(jié)果,體驗(yàn)一下計(jì)數(shù)器限流的效果。
漏桶限流原理和Java參考實(shí)現(xiàn)
漏桶限流的基本原理:水(對應(yīng)請求)從進(jìn)水口進(jìn)入漏桶里,漏桶以一定的速度出水(請求放行),當(dāng)水流入的速度過大時(shí),桶內(nèi)的總水量大于桶容量會直接溢出,請求被拒絕,如圖9-1所示。
大致的漏桶限流規(guī)則如下:
(1)水通過進(jìn)水口(對應(yīng)客戶端請求)以任意速率流入漏桶。
(2)漏桶的容量是固定的,出水(放行)速率也是固定的。
(3)漏桶容量是不變的,如果處理速度太慢,桶內(nèi)水量會超出桶的容量,后面流入的水就會溢出,表示請求拒絕。

圖9-1 漏桶原理示意圖
漏桶的Java參考實(shí)現(xiàn)代碼如下:
package com.crazymaker.springcloud.ratelimit;
//省略import
//漏桶限流
@Slf4j
public class LeakBucketLimiter
{
//計(jì)算的起始時(shí)間
private static long lastOutTime = System.currentTimeMillis();
//流出速率每秒2次
private static long rate = 2; //剩余的水量
private static long water = 0;
//返回值說明
//false:沒有被限制到
//true:被限流
public static synchronized boolean tryAcquire(long taskId, int turn)
{
long nowTime = System.currentTimeMillis();
//過去的時(shí)間
long pastTime = nowTime - lastOutTime;
//漏出水量,按照恒定的速度不斷流出
//漏出的水 = 過去的時(shí)間 *預(yù)設(shè)速率
long outWater = pastTime *rate / 1000;
//剩余的水量 = 上次遺留的水量 - 漏出去的水
water = water - outWater;
log.info("water {} pastTime {} outWater {} ",
water, pastTime, outWater);
//糾正剩余的水量
if (water < 0)
{
water = 0;
}
//若剩余的水量小于等于1,則放行
if (water <= 1)
{
//更新起始時(shí)間,為了下次使用
lastOutTime = nowTime;
//增加遺留的水量
water ++;
return false;
} else
{
//剩余的水量太大,被限流
return true;
}
}
//線程池,用于多線程模擬測試
private ExecutorService pool = Executors.newFixedThreadPool(10);
//測試用例
@Test
public void testLimit()
{
//測試用例太長,這里省略
//90%的測試用例代碼與前面的計(jì)算器限流測試代碼相同
//具體的源碼可參見瘋狂創(chuàng)客圈社群的開源庫
}
}以下是使用兩條線程,每條線程各運(yùn)行20次,每一次運(yùn)行休眠200毫秒,總計(jì)耗時(shí)4秒,運(yùn)行40次,部分輸出結(jié)果如下:
[pool-2-thread-1] INFO c.c.s.r.LeakBucketLimiter - water 0 pastTime 75 outWater 0
...
[pool-2-thread-1] INFO c.c.s.r.LeakBucketLimiter - water 1 pastTime 601 outWater 1
...
[pool-2-thread-1] INFO c.c.s.r.LeakBucketLimiter - water 2 pastTime 416 outWater 0
[pool-2-thread-2] INFO c.c.s.r.LeakBucketLimiter - water 1 pastTime 601 outWater 1
[pool-2-thread-1] INFO c.c.s.r.LeakBucketLimiter - water 2 pastTime 15 outWater 0
[pool-2-thread-2] INFO c.c.s.r.LeakBucketLimiter - water 2 pastTime 201 outWater 0 [pool-2-thread-1] INFO c.c.s.r.LeakBucketLimiter - water 2 pastTime 216 outWater 0
[main] INFO c.c.s.r.LeakBucketLimiter - 限制的次數(shù)為:32,通過的次數(shù)為:8
[main] INFO c.c.s.r.LeakBucketLimiter - 限制的比例為:0.8
[main] INFO c.c.s.r.LeakBucketLimiter - 運(yùn)行的時(shí)長為:4.107漏桶的出水速度固定,也就是請求放行速度是固定的。故漏桶不能有效應(yīng)對突發(fā)流量,但是能起到平滑突發(fā)流量(整流)的作用。
令牌桶限流原理和Java參考實(shí)現(xiàn)
令牌桶算法以一個(gè)設(shè)定的速率產(chǎn)生令牌并放入令牌桶,每次用戶請求都得申請令牌,如果令牌不足,就會拒絕請求。
在令牌桶算法中,新請求到來時(shí)會從桶里拿走一個(gè)令牌,如果桶內(nèi)沒有令牌可拿,就拒絕服務(wù)。當(dāng)然,令牌的數(shù)量是有上限的。令牌的數(shù)量與時(shí)間和發(fā)放速率強(qiáng)相關(guān),流逝的時(shí)間越長,會不斷往桶里加入越多令牌,如果令牌發(fā)放的速度比申請速度快,令牌桶就會放滿令牌,直到令牌占滿整個(gè)令牌桶,如圖9-2所示。
另外,令牌的發(fā)送速率可以設(shè)置,從而可以對突發(fā)流量進(jìn)行有效的應(yīng)對。
令牌桶限流大致的規(guī)則如下:
(1)進(jìn)水口按照某個(gè)速度向桶中放入令牌。
(2)令牌的容量是固定的,但是放行的速度是不固定的,只要桶中還有剩余令牌,一旦請求過來就能申請成功,然后放行。
(3)如果令牌的發(fā)放速度慢于請求到來的速度,桶內(nèi)就無令牌可領(lǐng),請求就會被拒絕。

圖9-2 令牌桶
令牌桶的Java參考實(shí)現(xiàn)代碼如下:
package com.crazymaker.springcloud.ratelimit;
...
//令牌桶,限速
@Slf4j
public class TokenBucketLimiter
{
//上一次令牌發(fā)放的時(shí)間
public long lastTime = System.currentTimeMillis();
//桶的容量
public int capacity = 2;
//令牌生成速度個(gè)/秒
public int rate = 2;
//當(dāng)前令牌的數(shù)量
public int tokens;
//返回值說明
//false:沒有被限制到
//true:被限流
public synchronized boolean tryAcquire(long taskId, int applyCount)
{
long now = System.currentTimeMillis();
時(shí)間間隔
單位為毫秒 //時(shí)間間隔,單位為毫秒
long gap = now - lastTime;
//當(dāng)前令牌數(shù)
tokens = Math.min(capacity, (int) (tokens + gap *rate/ 1000));
log.info("tokens {} capacity {} gap {} ", tokens, capacity, gap);
if (tokens < applyCount)
{
//若拿不到令牌,則拒絕
//log.info("被限流了.." + taskId + ", applyCount: " + applyCount);
return true;
} else
{
//還有令牌,領(lǐng)取令牌
tokens -= applyCount;
lastTime = now;
//log.info("剩余令牌.." + tokens);
return false;
}
}
//線程池,用于多線程模擬測試
private ExecutorService pool = Executors.newFixedThreadPool(10);
@Test
public void testLimit()
{
//被限制的次數(shù)
AtomicInteger limited = new AtomicInteger(0);
//線程數(shù)
final int threads = 2;
//每條線程的執(zhí)行輪數(shù)
final int turns = 20;
//同步器
CountDownLatch countDownLatch = new CountDownLatch(threads);
long start = System.currentTimeMillis();
for (int i = 0; i < threads; i++)
{
pool.submit(() ->
{
try
{
for (int j = 0; j < turns; j++)
{
long taskId = Thread.currentThread().getId();
boolean intercepted = tryAcquire(taskId, 1);
if (intercepted)
{
//被限制的次數(shù)累積
limited.getAndIncrement();
}
Thread.sleep(200);
}
} catch (Exception e)
{
e.printStackTrace();
}
//等待所有線程結(jié)束
countDownLatch.countDown();
});
}
try
{
countDownLatch.await();
} catch (InterruptedException e)
{
e.printStackTrace();
}
float time = (System.currentTimeMillis() - start) / 1000F;
//輸出統(tǒng)計(jì)結(jié)果
log.info("限制的次數(shù)為:" + limited.get() +
",通過的次數(shù)為:" + (threads *turns - limited.get()));
限制的比例為 log.info("限制的比例為:" +
(float) limited.get() / (float) (threads *turns));
log.info("運(yùn)行的時(shí)長為:" + time);
}
}運(yùn)行這個(gè)示例程序,部分結(jié)果如下:
[pool-2-thread-2] INFO c.c.s.r.TokenBucketLimiter - tokens 0 capacity 2 gap 104
[pool-2-thread-1] INFO c.c.s.r.TokenBucketLimiter - tokens 0 capacity 2 gap 114
[pool-2-thread-2] INFO c.c.s.r.TokenBucketLimiter - tokens 0 capacity 2 gap 314
[pool-2-thread-1] INFO c.c.s.r.TokenBucketLimiter - tokens 0 capacity 2 gap 314
[pool-2-thread-2] INFO c.c.s.r.TokenBucketLimiter - tokens 1 capacity 2 gap 515
[pool-2-thread-1] INFO c.c.s.r.TokenBucketLimiter - tokens 0 capacity 2 gap 0
...
[pool-2-thread-2] INFO c.c.s.r.TokenBucketLimiter - tokens 0 capacity 2 gap 401
[pool-2-thread-1] INFO c.c.s.r.TokenBucketLimiter - tokens 0 capacity 2 gap 402
[main] INFO c.c.s.r.TokenBucketLimiter - 限制的次數(shù)為:34,通過的次數(shù)為:6
[main] INFO c.c.s.r.TokenBucketLimiter - 限制的比例為:0.85
[main] INFO c.c.s.r.TokenBucketLimiter - 運(yùn)行的時(shí)長為:4.119令牌桶的好處之一就是可以方便地應(yīng)對突發(fā)流量。比如,可以改變令牌的發(fā)放速度,算法能按照新的發(fā)送速率調(diào)大令牌的發(fā)放數(shù)量,使得突發(fā)流量能被處理。
本文給大家講解的內(nèi)容是高并發(fā)核心編程,限流原理與實(shí)戰(zhàn),限流策略原理與參考實(shí)現(xiàn)
下篇文章給大家講解的是高并發(fā)核心編程,限流原理與實(shí)戰(zhàn),分布式計(jì)數(shù)器限流;
覺得文章不錯的朋友可以轉(zhuǎn)發(fā)此文關(guān)注小編;
感謝大家的支持!
本文就是愿天堂沒有BUG給大家分享的內(nèi)容,大家有收獲的話可以分享下,想學(xué)習(xí)更多的話可以到微信公眾號里找我,我等你哦。
