分布式接口冪等性、分布式限流(Guava 、nginx和lua限流)
你知道的越多,不知道的就越多,業(yè)余的像一棵小草!
你來,我們一起精進!你不來,我和你的競爭對手一起精進!
編輯:業(yè)余草
blog.csdn.net/qq_34886352
推薦:https://www.xttblog.com/?p=5304
一、接口冪等性
接口冪等性就是用戶對于同一操作發(fā)起的一次請求或者多次請求的結(jié)果是一致的,不會因為多次點擊而產(chǎn)生了副作用。舉個最簡單的例子,那就是支付,用戶購買商品后支付,支付扣款成功,但是返回結(jié)果的時候網(wǎng)絡(luò)異常,此時錢已經(jīng)扣了,用戶再次點擊按鈕,此時會進行第二次扣款,返回結(jié)果成功,用戶查詢余額返發(fā)現(xiàn)多扣錢了,流水記錄也變成了兩條,這就沒有保證接口的冪等性。
冪等性的核心思想:通過唯一的業(yè)務單號保障冪等性,非并發(fā)的情況下,查詢業(yè)務單號有沒有操作過,沒有則執(zhí)行操作,并發(fā)情況下,這個操作過程需要加鎖。
1、Update操作的冪等性
1)根據(jù)唯一業(yè)務號去更新數(shù)據(jù)
通過版本號的方式,來控制update的操作的冪等性,用戶查詢出要修改的數(shù)據(jù),系統(tǒng)將數(shù)據(jù)返回給頁面,將數(shù)據(jù)版本號放入隱藏域,用戶修改數(shù)據(jù),點擊提交,將版本號一同提交給后臺,后臺使用版本號作為更新條件
update?set?version?=?version?+1?,xxx=${xxx}?where?id?=xxx?and?version?=?${version};
2、使用Token機制,保證update、insert操作的冪等性
1)沒有唯一業(yè)務號的update與insert操作
進入到注冊頁時,后臺統(tǒng)一生成Token, 返回前臺隱藏域中,用戶在頁面點擊提交時,將Token一同傳入后臺,使用Token獲取分布式鎖,完成Insert操作,執(zhí)行成功后,不釋放鎖,等待過期自動釋放。
二、分布式限流
1、分布式限流的幾種維度
時間 限流基于某段時間范圍或者某個時間點,也就是我們常說的“時間窗口”,比如對每分鐘、每秒鐘的時間窗口做限定 資源 基于可用資源的限制,比如設(shè)定最大訪問次數(shù),或最高可用連接數(shù)
上面兩個維度結(jié)合起來看,限流就是在某個時間窗口對資源訪問做限制,比如設(shè)定每秒最多100個訪問請求。但在真正的場景里,我們不止設(shè)置一種限流規(guī)則,而是會設(shè)置多個限流規(guī)則共同作用,主要的幾種限流規(guī)則如下:
1)QPS和連接數(shù)控制
針對上圖中的連接數(shù)和QPS(query per second)限流來說,我們可以設(shè)定IP維度的限流,也可以設(shè)置基于單個服務器的限流。在真實環(huán)境中通常會設(shè)置多個維度的限流規(guī)則,比如設(shè)定同一個IP每秒訪問頻率小于10,連接數(shù)小于5,再設(shè)定每臺機器QPS最高1000,連接數(shù)最大保持200。更進一步,我們可以把某個服務器組或整個機房的服務器當做一個整體,設(shè)置更high-level的限流規(guī)則,這些所有限流規(guī)則都會共同作用于流量控制。
2)傳輸速率
對于“傳輸速率”大家都不會陌生,比如資源的下載速度。有的網(wǎng)站在這方面的限流邏輯做的更細致,比如普通注冊用戶下載速度為100k/s,購買會員后是10M/s,這背后就是基于用戶組或者用戶標簽的限流邏輯。
3)黑白名單
黑白名單是各個大型企業(yè)應用里很常見的限流和放行手段,而且黑白名單往往是動態(tài)變化的。舉個例子,如果某個IP在一段時間的訪問次數(shù)過于頻繁,被系統(tǒng)識別為機器人用戶或流量攻擊,那么這個IP就會被加入到黑名單,從而限制其對系統(tǒng)資源的訪問,這就是我們俗稱的“封IP”。
我們平時見到的爬蟲程序,比如說爬知乎上的美女圖片,或者爬券商系統(tǒng)的股票分時信息,這類爬蟲程序都必須實現(xiàn)更換IP的功能,以防被加入黑名單。有時我們還會發(fā)現(xiàn)公司的網(wǎng)絡(luò)無法訪問12306這類大型公共網(wǎng)站,這也是因為某些公司的出網(wǎng)IP是同一個地址,因此在訪問量過高的情況下,這個IP地址就被對方系統(tǒng)識別,進而被添加到了黑名單。使用家庭寬帶的同學們應該知道,大部分網(wǎng)絡(luò)運營商都會將用戶分配到不同出網(wǎng)IP段,或者時不時動態(tài)更換用戶的IP地址。
白名單就更好理解了,相當于御賜金牌在身,可以自由穿梭在各種限流規(guī)則里,暢行無阻。比如某些電商公司會將超大賣家的賬號加入白名單,因為這類賣家往往有自己的一套運維系統(tǒng),需要對接公司的IT系統(tǒng)做大量的商品發(fā)布、補貨等等操作。
4)分布式環(huán)境
所謂的分布式限流,其實道理很簡單,一句話就可以解釋清楚。分布式區(qū)別于單機限流的場景,它把整個分布式環(huán)境中所有服務器當做一個整體來考量。比如說針對IP的限流,我們限制了1個IP每秒最多10個訪問,不管來自這個IP的請求落在了哪臺機器上,只要是訪問了集群中的服務節(jié)點,那么都會受到限流規(guī)則的制約。
從上面的例子不難看出,我們必須將限流信息保存在一個“中心化”的組件上,這樣它就可以獲取到集群中所有機器的訪問狀態(tài),目前有兩個比較主流的限流方案:
網(wǎng)關(guān)層限流
將限流規(guī)則應用在所有流量的入口處
中間件限流
將限流信息存儲在分布式環(huán)境中某個中間件里(比如Redis緩存),每個組件都可以從這里獲取到當前時刻的流量統(tǒng)計,從而決定是拒絕服務還是放行流量
2、限流方案常用算法講解
1)令牌桶算法
Token Bucket令牌桶算法是目前應用最為廣泛的限流算法,顧名思義,它有以下兩個關(guān)鍵角色:
令牌 獲取到令牌的Request才會被處理,其他Requests要么排隊要么被直接丟棄 桶 用來裝令牌的地方,所有Request都從這個桶里面獲取令牌

「令牌生成」
這個流程涉及到令牌生成器和令牌桶,前面我們提到過令牌桶是一個裝令牌的地方,既然是個桶那么必然有一個容量,也就是說令牌桶所能容納的令牌數(shù)量是一個固定的數(shù)值。
對于令牌生成器來說,它會根據(jù)一個預定的速率向桶中添加令牌,比如我們可以配置讓它以每秒100個請求的速率發(fā)放令牌,或者每分鐘50個。注意這里的發(fā)放速度是勻速,也就是說這50個令牌并非是在每個時間窗口剛開始的時候一次性發(fā)放,而是會在這個時間窗口內(nèi)勻速發(fā)放。
在令牌發(fā)放器就是一個水龍頭,假如在下面接水的桶子滿了,那么自然這個水(令牌)就流到了外面。在令牌發(fā)放過程中也一樣,令牌桶的容量是有限的,如果當前已經(jīng)放滿了額定容量的令牌,那么新來的令牌就會被丟棄掉。
「令牌獲取」
每個訪問請求到來后,必須獲取到一個令牌才能執(zhí)行后面的邏輯。假如令牌的數(shù)量少,而訪問請求較多的情況下,一部分請求自然無法獲取到令牌,那么這個時候我們可以設(shè)置一個“緩沖隊列”來暫存這些多余的令牌。
緩沖隊列其實是一個可選的選項,并不是所有應用了令牌桶算法的程序都會實現(xiàn)隊列。當有緩存隊列存在的情況下,那些暫時沒有獲取到令牌的請求將被放到這個隊列中排隊,直到新的令牌產(chǎn)生后,再從隊列頭部拿出一個請求來匹配令牌。
當隊列已滿的情況下,這部分訪問請求將被丟棄。在實際應用中我們還可以給這個隊列加一系列的特效,比如設(shè)置隊列中請求的存活時間,或者將隊列改造為PriorityQueue,根據(jù)某種優(yōu)先級排序,而不是先進先出。算法是死的,人是活的,先進的生產(chǎn)力來自于不斷的創(chuàng)造,在技術(shù)領(lǐng)域尤其如此。
2)漏桶算法
Leaky Bucket

漏桶算法的前半段和令牌桶類似,但是操作的對象不同,令牌桶是將令牌放入桶里,而漏桶是將訪問請求的數(shù)據(jù)包放到桶里。同樣的是,如果桶滿了,那么后面新來的數(shù)據(jù)包將被丟棄。
漏桶算法的后半程是有鮮明特色的,它永遠只會以一個恒定的速率將數(shù)據(jù)包從桶內(nèi)流出。打個比方,如果我設(shè)置了漏桶可以存放100個數(shù)據(jù)包,然后流出速度是1s一個,那么不管數(shù)據(jù)包以什么速率流入桶里,也不管桶里有多少數(shù)據(jù)包,漏桶能保證這些數(shù)據(jù)包永遠以1s一個的恒定速度被處理。
「漏桶 vs 令牌桶的區(qū)別」
根據(jù)它們各自的特點不難看出來,這兩種算法都有一個“恒定”的速率和“不定”的速率。令牌桶是以恒定速率創(chuàng)建令牌,但是訪問請求獲取令牌的速率“不定”,反正有多少令牌發(fā)多少,令牌沒了就干等。而漏桶是以“恒定”的速率處理請求,但是這些請求流入桶的速率是“不定”的。
從這兩個特點來說,漏桶的天然特性決定了它不會發(fā)生突發(fā)流量,就算每秒1000個請求到來,那么它對后臺服務輸出的訪問速率永遠恒定。而令牌桶則不同,其特性可以“預存”一定量的令牌,因此在應對突發(fā)流量的時候可以在短時間消耗所有令牌,其突發(fā)流量處理效率會比漏桶高,但是導向后臺系統(tǒng)的壓力也會相應增多。
3、分布式限流的主流方案
這里主要講nginx和lua的限流,gateway和hystrix放在后面springcloud中講
1)Guava RateLimiter客戶端限流
引入maven
<dependency>
????<groupId>com.google.guavagroupId>
????<artifactId>guavaartifactId>
????<version>18.0version>
dependency>
2.編寫Controller
@RestController
@Slf4j
public?class?Controller{
????//每秒鐘可以創(chuàng)建兩個令牌
????RateLimiter?limiter?=?RateLimiter.create(2.0);
????
????//非阻塞限流
????@GetMapping("/tryAcquire")
????public?String?tryAcquire(Integer?count){
????????//count?每次消耗的令牌
????????if(limiter.tryAcquire(count)){
????????????log.info("成功,允許通過,速率為{}",limiter.getRate());
????????????return?"success";
????????}else{
????????????log.info("錯誤,不允許通過,速率為{}",limiter.getRate());
????????????return?"fail";
????????}
????}
????
????//限定時間的非阻塞限流
????@GetMapping("/tryAcquireWithTimeout")
????public?String?tryAcquireWithTimeout(Integer?count,?Integer?timeout){
????????//count?每次消耗的令牌??timeout?超時等待的時間
????????if(limiter.tryAcquire(count,timeout,TimeUnit.SECONDS)){
????????????log.info("成功,允許通過,速率為{}",limiter.getRate());
????????????return?"success";
????????}else{
????????????log.info("錯誤,不允許通過,速率為{}",limiter.getRate());
????????????return?"fail";
????????}
????}
????
????//同步阻塞限流
????@GetMapping("/acquire")
????public?String?acquire(Integer?count){
????????limiter.acquire(count);
????????log.info("成功,允許通過,速率為{}",limiter.getRate());
????????return?"success";
????}
}
2)基于Nginx的限流
1.iP限流
編寫Controller
@RestController
@Slf4j
public?class?Controller{
????//nginx測試使用
????@GetMapping("/nginx")
????public?String?nginx(){
????????log.info("Nginx?success");
????}
}
修改host文件,添加一個網(wǎng)址域名
127.0.0.1 www.test.com
3.修改nginx,將步驟2中的域名,添加到路由規(guī)則當中
打開nginx的配置文件
vim /usr/local/nginx/conf/nginx.conf
添加一個服務
#根據(jù)IP地址限制速度
#1)$binary_remote_addr binary_目的是縮寫內(nèi)存占用,remote_addr表示通過IP地址來限流
#2)zone=iplimit:20m iplimit是一塊內(nèi)存區(qū)域(記錄訪問頻率信息),20m是指這塊內(nèi)存區(qū)域的大小
#3)rate=1r/s 每秒放行1個請求
limit_req_zone $binary_remote_addr zone=iplimit:20m rate=1r/s;
server{
server_name www.test.com;
location /access-limit/ {
proxy_pass http://127.0.0.1:8080/;
#基于ip地址的限制
#1)zone=iplimit 引用limit_rep_zone中的zone變量
#2)burst=2 設(shè)置一個大小為2的緩沖區(qū)域,當大量請求到來,請求數(shù)量超過限流頻率時,將其放入緩沖區(qū)域
#3)nodelay 緩沖區(qū)滿了以后,直接返回503異常
limit_req zone=iplimit burst=2 nodelay;
}
}
4.訪問地址,測試是否限流
www.test.com/access-limit/nginx
2.多維度限流
1.修改nginx配置
#根據(jù)IP地址限制速度
limit_req_zone $binary_remote_addr zone=iplimit:20m rate=10r/s;
#根據(jù)服務器級別做限流
limit_req_zone $server_name zone=serverlimit:10m rate=1r/s;
#根據(jù)ip地址的鏈接數(shù)量做限流
limit_conn_zone $binary_remote_addr zone=perip:20m;
#根據(jù)服務器的連接數(shù)做限流
limit_conn_zone $server_name zone=perserver:20m;
server{
server_name www.test.com;
location /access-limit/ {
proxy_pass http://127.0.0.1:8080/;
#基于ip地址的限制
limit_req zone=iplimit burst=2 nodelay;
#基于服務器級別做限流
limit_req zone=serverlimit burst=2 nodelay;
#基于ip地址的鏈接數(shù)量做限流 最多保持100個鏈接
limit_conn zone=perip 100;
#基于服務器的連接數(shù)做限流 最多保持100個鏈接
limit_conn zone=perserver 1;
#配置request的異常返回504(默認為503)
limit_req_status 504;
limit_conn_status 504;
}
location /download/ {
#前100m不限制速度
limit_rate_affer 100m;
#限制速度為256k
limit_rate 256k;
}
}
3)基于Redis+Lua的分布式限流
1.Lua腳本
Lua是一個很小巧精致的語言,它的誕生(1993年)甚至比JDK 1.0還要早。Lua是由標準的C語言編寫的,它的源碼部分不過2萬多行C代碼,甚至一個完整的Lua解釋器也就200k的大小。
Lua往大了說是一個新的編程語言,往小了說就是一個腳本語言。對于有編程經(jīng)驗的同學,拿到一個Lua腳本大體上就能把業(yè)務邏輯猜的八九不離十了。
Redis內(nèi)置了Lua解釋器,執(zhí)行過程保證原子性。
2.Lua安裝
安裝Lua:
參考
http://www.lua.org教程,下載5.3.5_1版本,本地安裝如果你使用的是Mac,那建議用brew工具直接執(zhí)行brew install lua就可以順利安裝,
有關(guān)brew工具的安裝可以參考https://brew.sh/網(wǎng)站。
使用brew安裝后的目錄在/usr/local/Cellar/lua/5.3.5_1安裝IDEA插件,在IDEA->Preferences面板,Plugins,
里面Browse repositories,在里面搜索lua,然后就選擇同名插件lua。安裝好后重啟IDEA
配置Lua SDK的位置: IDEA->File->Project Structure,
選擇添加Lua,路徑指向Lua SDK的bin文件夾
4.都配置好之后,在項目中右鍵創(chuàng)建Module,左側(cè)欄選擇lua,點下一步,選擇lua的sdk,下一步,輸入lua項目名,完成
3.編寫hello lua
print 'Hello Lua'
4.編寫模擬限流
-- 模擬限流
-- 用作限流的key
local key = 'my key'
-- 限流的最大閾值
local limit = 2
-- 當前限流大小
local currentLimit = 2
-- 是否超過限流標準
if currentLimit + 1 > limit then
print 'reject'
return false
else
print 'accept'
return true
end
5.限流組件封裝
1.添加maven
<dependency>
????<groupId>org.springframework.bootgroupId>
????<artifactId>spring-boot-starter-data-redisartifactId>
dependency>
<dependency>
????<groupId>org.springframework.bootgroupId>
????<artifactId>spring-boot-starter-aopartifactId>
dependency>
<dependency>
????<groupId>com.google.guavagroupId>
????<artifactId>guavaartifactId>
????<version>18.0version>
dependency>
2.添加Spring配置
不是重要內(nèi)容就隨便寫點,主要就是把reids配置一下
server.port=8080
spring.redis.database=0
spring.redis.host=localhost
spring.redis.port=6376
3.編寫限流腳本
lua腳本放在resource目錄下就可以了
--?獲取方法簽名特征
local?methodKey?=?KEYS[1]
redis.log(redis.LOG_DEBUG,'key?is',methodKey)
--?調(diào)用腳本傳入的限流大小
local?limit?=?tonumber(ARGV[1])
--?獲取當前流量大小
local?count?=?tonumber(redis.call('get',methodKey)?or?"0")
--是否超出限流值
if?count?+?1?>limit?then
????--?拒絕訪問
????return?false
else
????--?沒有超過閾值
????--?設(shè)置當前訪問數(shù)量+1
????redis.call('INCRBY',methodKey,1)
????--?設(shè)置過期時間
????redis.call('EXPIRE',methodKey,1)
????--?放行
????return?true
end
4.使用spring-data-redis組件集成Lua和Redis
創(chuàng)建限流類
@Service
@Slf4j
public?class?AccessLimiter{
????@Autowired
????private?StringRedisTemplate?stringRedisTemplate;
????@Autowired
????private?RedisScript?rateLimitLua;
????public?void?limitAccess(String?key,Integer?limit){
????????boolean?acquired?=?stringRedisTemplate.execute(
????????????rateLimitLua,//lua腳本的真身
????????????Lists.newArrayList(key),//lua腳本中的key列表
????????????limit.toString()//lua腳本的value列表
????????);
????????if(!acquired){
????????????log.error("Your?access?is?blocked,key={}",key);
????????????throw?new?RuntimeException("Your?access?is?blocked");
????????}
????}
}
創(chuàng)建配置類
@Configuration
public?class?RedisConfiguration{
????public?RedisTemplate?redisTemplate(RedisConnectionFactory?factory) {
????????return?new?StringRedisTemplate(factory);
????}
????
????public?DefaultRedisScript?loadRedisScript(){
????????DefaultRedisScript?redisScript?=?new?DefaultRedisScript();
????????redisScript.setLocation(new?ClassPathResource("rateLimiter.lua"));
????????redisScript.setResultType(java.lang.Boolean.class);
????????return?redisScript;
????}
}
5.在Controller中添加測試方法驗證限流效果
@RestController
@Slf4j
public?class?Controller{
????@Autowired
????private?AccessLimiter?accessLimiter;
????
????@GetMapping("test")
????public?String?test(){
????????accessLimiter.limitAccess("ratelimiter-test",1);
????????return?"success";
????}
}?
6.編寫限流注解
1.新增注解
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public?@interface?AccessLimiterAop{
????int?limit();
????
????String?methodKey()?default?"";
}
2.新增切面
@Slf4j
@Aspect
@Component
public?class?AccessLimiterAspect{
????@Autowired
????private?AccessLimiter??accessLimiter;
????//根據(jù)注解的位置,自己修改
????@Pointcut("@annotation(com.gyx.demo.annotation.AccessLimiter)")
????public?void?cut(){
????????log.info("cut");
????}
????
????@Before("cut()")
????public?void?before(JoinPoint?joinPoint){
????????//獲取方法簽名,作為methodkey
????????MethodSignature?signature?=(MethodSignature)?joinPoint.getSignature();
????????Method?method?=?signature.getMethod();
????????AccessLimiterAop?annotation?=?method.getAnnotation(AccessLimiterAop.class);
????????
????????if(annotation?==?null){
????????????return;
????????}
????????String?key?=?annotation.methodKey();
????????Integer?limit?=?annotation.limit();
????????//如果沒有設(shè)置methodKey,就自動添加一個
????????if(StringUtils.isEmpty(key)){
????????????Class[]?type?=?method.getParameterType();
????????????key?=?method.getName();
????????????if?(type?!=?null){
????????????????String?paramTypes=Arrays.stream(type)
????????????????????.map(Class::getName)
????????????????????.collect(Collectors.joining(","));
????????????????????key?+=?"#"+paramTypes;
????????????}
????????}
????????
????????//調(diào)用redis
????????return?accessLimiter.limitAccess(key,limit);
????}
}
3.在Controller中添加測試方法驗證限流效果
@RestController
@Slf4j
public?class?Controller{
????@Autowired
????private?AccessLimiter?accessLimiter;
????
????@GetMapping("test")
????@AccessLImiterAop(limit?=1)
????public?String?test(){
????????return?"success";
????}
}?
