可能要用心學(xué)高并發(fā)核心編程,限流原理與實戰(zhàn),分布式令牌桶限流
實戰(zhàn):分布式令牌桶限流
本節(jié)介紹的分布式令牌桶限流通過Lua+Java結(jié)合完成,首先在Lua腳本中完成限流的計算,然后在Java代碼中進(jìn)行組織和調(diào)用。

分布式令牌桶限流Lua腳本
分布式令牌桶限流Lua腳本的核心邏輯和Java令牌桶的執(zhí)行邏輯類似,只是限流計算相關(guān)的統(tǒng)計和時間數(shù)據(jù)存放于Redis中。
這里將限流的腳本命名為rate_limiter.lua,該腳本既使用Redis存儲令牌桶信息,自身又執(zhí)行于Redis中,所以筆者將該腳本放置于base-redis基礎(chǔ)模塊中,它的代碼如下:
---此腳本的環(huán)境:redis內(nèi)部,不是運行在Nginx內(nèi)部
---方法:申請令牌
----1:failed
---1:success
---@param key:key限流關(guān)鍵字
---@param apply:申請的令牌數(shù)量
local function acquire(key, apply)
local times = redis.call('TIME');
--times[1] 秒數(shù) --times[2] 微秒數(shù)
local curr_mill_second = times[1] *1000000 + times[2];
curr_mill_second = curr_mill_second / 1000;
local cacheInfo = redis.pcall("HMGET", key, "last_mill_second", "curr_permits", "max_permits", "rate")
---局部變量:上次申請的時間
local last_mill_second = cacheInfo[1];
---局部變量:之前的令牌數(shù)
local curr_permits = tonumber(cacheInfo[2]);
---局部變量:桶的容量
local max_permits = tonumber(cacheInfo[3]);
---局部變量:令牌的發(fā)放速率
local rate = cacheInfo[4];
---局部變量:本次的令牌數(shù)
local local_curr_permits = max_permits;
if (type(last_mill_second) ~= 'boolean' and last_mill_second ~= nil) then
--計算時間段內(nèi)的令牌數(shù)
local reverse_permits = math.floor(((curr_mill_second - last_mill_second) / 1000) *rate);
--令牌總數(shù)
local expect_curr_permits = reverse_permits + curr_permits;
--可以申請的令牌總數(shù)
local_curr_permits = math.min(expect_curr_permits, max_permits);
else
--第一次獲取令牌
redis.pcall("HSET", key, "last_mill_second", curr_mill_second)
end
local result = -1;
--有足夠的令牌可以申請
if (local_curr_permits - apply >= 0) then
--保存剩余的令牌
redis.pcall("HSET", key, "curr_permits", local_curr_permits - apply);
--保存時間,下次令牌獲取時使用
redis.pcall("HSET", key, "last_mill_second", curr_mill_second)
--返回令牌獲取成功
result = 1;
else
--保存令牌總數(shù)
redis.pcall("HSET", key, "curr_permits", local_curr_permits);
--返回令牌獲取失敗
result = -1;
end
return result
end
---方法:初始化限流器
---1 success
---@param key key
---@param max_permits 桶的容量
---@param rate 令牌的發(fā)放速率
local function init(key, max_permits, rate)
local rate_limit_info = redis.pcall("HMGET", key, "last_mill_second", "curr_permits", "max_permits", "rate")
local org_max_permits = tonumber(rate_limit_info[3])
local org_rate = rate_limit_info[4]
if (org_max_permits == nil) or (rate ~= org_rate or max_permits ~= org_max_permits) then
redis.pcall("HMSET", key, "max_permits", max_permits, "rate", rate, "curr_permits", max_permits)
end
return 1;
end
---方法:刪除限流Key
local function delete(key)
redis.pcall("DEL", key) return 1;
end
local key = KEYS[1]
local method = ARGV[1]
if method == 'acquire' then
return acquire(key, ARGV[2], ARGV[3])
elseif method == 'init' then
return init(key, ARGV[2], ARGV[3])
elseif method == 'delete' then
return delete(key)
else
--ignore
end該腳本有3個方法,其中兩個方法比較重要,分別說明如下:
(1)限流器初始化方法init(key,max_permits,rate),此方法在限流開始時被調(diào)用。
(2)限流檢測的方法acquire(key,apply),此方法在請求到來時被調(diào)用。
Java分布式令牌桶限流
rate_limiter.lua腳本既可以在Java中調(diào)用,又可以在Nginx中調(diào)用。本小節(jié)先介紹其在Java中的使用,第10章再介紹其在Nginx中的使用。
Java分布式令牌桶限流器的實現(xiàn)就是通過Java代碼向Redis加載rate_limiter.lua腳本,然后封裝其令牌桶初始化方法init(...)和限流監(jiān)測方法acquire(...),以供外部調(diào)用。它的代碼如下:
package com.crazymaker.springcloud.standard.ratelimit;
...
/**
*實現(xiàn):令牌桶限流服務(wù)
*create by尼恩 @ 瘋狂創(chuàng)客圈
**/
@Slf4j
public class RedisRateLimitImpl implements RateLimitService, InitializingBean
{
/**
*限流器的redis key前綴
*/
private static final String RATE_LIMITER_KEY_PREFIX = "rate_limiter:";
//private ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
private RedisRateLimitProperties redisRateLimitProperties;
private RedisTemplate redisTemplate;
//lua腳本的實例
private static RedisScript rateLimiterScript = null;
//lua腳本的類路徑
private static String rateLimitLua = "script/rate_limiter.lua";
static
{
//從類路徑文件中加載令牌桶l(fā)ua腳本
String script = IOUtil.loadJarFile(RedisRateLimitImpl.class.getClassLoader(), rateLimitLua);
if (StringUtils.isEmpty(script))
{
log.error("lua script load failed:" + rateLimitLua);
} else
{
//創(chuàng)建Lua腳本實例
rateLimiterScript = new DefaultRedisScript<>(script, Long.class);
}
}
public RedisRateLimitImpl(
RedisRateLimitProperties redisRateLimitProperties,
RedisTemplate redisTemplate)
{
this.redisRateLimitProperties = redisRateLimitProperties;
this.redisTemplate = redisTemplate;
}
private Map<String, LimiterInfo> limiterInfoMap = new HashMap<>();
/**
*限流器的信息
*/
@Builder
@Data
public static class LimiterInfo
{
/**
*限流器的key,如秒殺的id
*/ private String key;
/**
*限流器的類型,如seckill
*/
private String type = "default";
/**
*限流器的最大桶容量
*/
private Integer maxPermits;
/**
*限流器的速率
*/
private Integer rate;
/**
*限流器的redis key
*/
public String fullKey()
{
return RATE_LIMITER_KEY_PREFIX + type + ":" + key;
}
/**
*限流器在map中的緩存key
*/
public String cashKey()
{
return type + ":" + key;
}
}
/**
*限流檢測:是否超過redis令牌桶限速器的限制
*
*@param cacheKey計數(shù)器的key
*@return true or false
*/
@Override
public Boolean tryAcquire(String cacheKey)
{
if (cacheKey == null)
{
return true;
}
if (cacheKey.indexOf(":") <= 0)
{
cacheKey = "default:" + cacheKey;
}
LimiterInfo limiterInfo = limiterInfoMap.get(cacheKey);
if (limiterInfo == null)
{
return true;
}
Long acquire = (Long) redisTemplate.execute(rateLimiterScript,
ImmutableList.of(limiterInfo.fullKey()),
"acquire",
"1");
if (acquire == 1)
{
return false;
}
return true;
}
/**
*重載方法:限流器初始化
*
*@param limiterInfo限流的類型
*/
public void initLimitKey(LimiterInfo limiterInfo)
{
if (null == rateLimiterScript)
{
return;
}
String maxPermits = limiterInfo.getMaxPermits().toString();
String rate = limiterInfo.getRate().toString();
//執(zhí)行redis腳本
Long result = (Long) redisTemplate.execute(rateLimiterScript,
ImmutableList.of(limiterInfo.fullKey()),
"init",
maxPermits,
rate); limiterInfoMap.put(limiterInfo.cashKey(), limiterInfo);
}
/**
*限流器初始化
*
*@param type類型
*@param key id
*@param maxPermits上限
*@param rate 速度
*/
public void initLimitKey(String type, String key,
Integer maxPermits, Integer rate)
{
LimiterInfo limiterInfo = LimiterInfo.builder()
.type(type)
.key(key)
.maxPermits(maxPermits)
.rate(rate)
.build();
initLimitKey(limiterInfo);
}
/**
*獲取redis lua腳本的sha1編碼,并緩存到redis
*/
public String cacheSha1()
{
String sha1 = rateLimiterScript.getSha1();
redisTemplate.opsForValue().set("lua:sha1:rate_limiter", sha1);
return sha1;
}
} Java分布式令牌桶限流的自驗證
自驗證的工作:首先初始化分布式令牌桶限流器,然后使用兩條
線程不斷進(jìn)行限流的檢測。自驗證的代碼如下:
package com.crazymaker.springcloud.ratelimit;
...
@Slf4j
@RunWith(SpringRunner.class)
//指定啟動類
@SpringBootTest(classes = {DemoCloudApplication.class})
/**
*redis分布式令牌桶測試類
*/
public class RedisRateLimitTest
{
@Resource(name = "redisRateLimitImpl")
RedisRateLimitImpl limitService;
//線程池,用于多線程模擬測試
private ExecutorService pool = Executors.newFixedThreadPool(10);
@Test
public void testRedisRateLimit()
{
//初始化分布式令牌桶限流器
limitService.initLimitKey(
"seckill", //redis key中的類型
"10000", //redis key中的業(yè)務(wù)key,比如商品id
2, //桶容量
2); //每秒令牌數(shù)
AtomicInteger count = new AtomicInteger();
long start = System.currentTimeMillis();
//線程數(shù)
final int threads = 2;
//每條線程的執(zhí)行輪數(shù)
final int turns = 20;
//同步器
CountDownLatch countDownLatch = new CountDownLatch(threads);
for (int i = 0; i < threads; i++)
{
pool.submit(() ->
{
try
{
//每個用戶訪問turns次
for (int j = 0; j < turns; j++)
{
boolean limited = limitService.tryAcquire
("seckill:10000");
if (limited)
{
count.getAndIncrement();
}
Thread.sleep(200);
}
} catch (Exception e)
{ e.printStackTrace();
}
countDownLatch.countDown();
});
}
try
{
countDownLatch.await();
} catch (InterruptedException e)
{
e.printStackTrace();
}
float time = (System.currentTimeMillis() - start) / 1000F;
//輸出統(tǒng)計結(jié)果
log.info("限制的次數(shù)為:" + count.get() + " 時長為:" + time);
log.info("限制的次數(shù)為:" + count.get() +
",通過的次數(shù)為:" + (threads *turns - count.get()));
log.info("限制的比例為:" +
(float) count.get() / (float) (threads *turns));
log.info("運行的時長為:" + time);
try
{
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e)
{
e.printStackTrace();
}
}
}兩條線程各運行20次,每一次運行休眠200毫秒,總計耗時4秒,
運行40次,部分輸出結(jié)果如下:
[main] INFO c.c.s.r.RedisRateLimitTest - 限制的次數(shù)為:32 時長為:4.015
[main] INFO c.c.s.r.RedisRateLimitTest - 限制的次數(shù)為:32,通過的次數(shù)為:8
[main] INFO c.c.s.r.RedisRateLimitTest - 限制的比例為:0.8
[main] INFO c.c.s.r.RedisRateLimitTest - 運行的時長為:4.015大家可以自行調(diào)整參數(shù),運行以上自驗證程序并觀察實驗結(jié)果,體驗一下分布式令牌桶限流的效果。
本文給大家講解的內(nèi)容是高并發(fā)核心編程,限流原理與實戰(zhàn),實戰(zhàn):分布式令牌桶限流
下篇文章給大家講解的是高并發(fā)核心編程,Spring Cloud+Nginx秒殺實戰(zhàn);
覺得文章不錯的朋友可以轉(zhuǎn)發(fā)此文關(guān)注小編;
感謝大家的支持!
本文就是愿天堂沒有BUG給大家分享的內(nèi)容,大家有收獲的話可以分享下,想學(xué)習(xí)更多的話可以到微信公眾號里找我,我等你哦。
