spring-boot漏桶限流實現(xiàn)實踐

前言
今天最開始是打算通過線程池來實現(xiàn)漏桶限流算法的,但是實際分析之后發(fā)現(xiàn)似乎不具備可行性,難點有兩個,一個是資源問題,如果每個接口方法都創(chuàng)建一個線程池的話,那是不敢想象的;另一個問題,如果全局采用一個線程池,那就無法實現(xiàn)精細化的接口限流,似乎也不夠靈活,所以就放棄了,下面是我最初的思路:
定義一個線程池,漏桶通過線程池工作隊列實現(xiàn),漏桶出口速率通過線程的休眠來控制,丟棄超出容量的請求通過線程池的拒絕策略來實現(xiàn)。
最后,我直接找了一種網(wǎng)絡上能夠搜到的實現(xiàn)算法來完成今天實例demo,下面讓我們直接開始吧。
漏桶算法實現(xiàn)
首先我們先回顧下漏桶限流算法,它的具體原理是這樣的:我們需要定義一個容量固定的漏桶,因為外部請求數(shù)量是不確定的,所以我們要通過漏桶的容量來控制請求數(shù)量。同時要確定漏桶釋放請求的速率(出口),我們通過出口的速率,控制接口服務被調(diào)用的頻速。當漏桶中的請求數(shù)達到上限時,所有申請加入漏桶的請求都會被丟棄掉。
詳細研究漏桶算法,你會發(fā)現(xiàn),關(guān)于請求丟棄的處理有兩種方式,一種是直接丟棄多出來的請求,返回錯誤信息,另一種就是讓當前請求進出阻塞狀態(tài),等到漏桶中釋放出資源之后,再將請求放進漏桶中。今天我們先來看第一種,至于第二種,待我研究清楚了再說。
創(chuàng)建項目
和昨天一樣,我們先創(chuàng)建一個spring boot的web項目,但是今天的項目就比較簡單了,不需要引入任何外部包,只是為了方便測試,我引入了fastJson的依賴:
<dependency>
????<groupId>com.alibabagroupId>
????<artifactId>fastjsonartifactId>
????<version>1.2.72version>
dependency>
核心業(yè)務實現(xiàn)
我們先看下漏桶限流算法實現(xiàn):
public?final?class?LeakyBucket?{
????//?桶的容量
????private?int?capacity?=?10;
????//?木桶剩余的水滴的量(初始化的時候的空的桶)
????private?AtomicInteger?water?=?new?AtomicInteger(0);
????//?水滴的流出的速率?每1000毫秒流出1滴
????private?int?leakRate;
????//?第一次請求之后,木桶在這個時間點開始漏水
????private?long?leakTimeStamp;
????public?LeakyBucket(int?capacity,?int?leakRate)?{
????????this.capacity?=?capacity;
????????this.leakRate?=?leakRate;
????}
????public?LeakyBucket(int?leakRate)?{
????????this.leakRate?=?leakRate;
????}
????public?boolean?acquire()?{
????????//?如果是空桶,就當前時間作為桶開是漏出的時間
????????if?(water.get()?==?0)?{
????????????leakTimeStamp?=?System.currentTimeMillis();
????????????water.addAndGet(1);
????????????return?capacity?!=?0;
????????}
????????//?先執(zhí)行漏水,計算剩余水量
????????int?waterLeft?=?water.get()?-?((int)?((System.currentTimeMillis()?-?leakTimeStamp)?/?1000))?*?leakRate;
????????water.set(Math.max(0,?waterLeft));
????????//?重新更新leakTimeStamp
????????leakTimeStamp?=?System.currentTimeMillis();
????????//?嘗試加水,并且水還未滿
????????if?((water.get())?????????????water.addAndGet(1);
????????????return?true;
????????}?else?{
????????????//?水滿,拒絕加水
????????????return?false;
????????}
????}
}
目前,網(wǎng)絡上檢索到的也基本上都是這種實現(xiàn)(也不知道誰抄的誰,我是不是也沒臉說話,畢竟我也是代碼搬運工)。
關(guān)于漏桶算法的實現(xiàn),核心點是acquire()方法,這個方法會判斷漏桶是否已經(jīng)滿了,滿了會直接返回false,首次調(diào)用這個方法會返回true,從第二次開始,會計算漏桶中的剩余水量,同時會更新水量,如果水量未達到水量上限,水量會+1并返回true。
但是這個算法的實現(xiàn)問題也很明顯:leakRate(出口速率)處理用于計算剩余水位外,并沒有參與其他運算,這也就導致了漏桶的出口并不均勻。更合理的做法是,通過速率計算休眠時間,然后通過休眠時間控制速率的均勻性,今天由于時間的關(guān)系,我就先繼續(xù)往下了,后面有時間了,優(yōu)化完再來分享。
攔截器實現(xiàn)
今天的限速依然是通過攔截器來實現(xiàn),實現(xiàn)過程也比較簡單:
@Component
public?class?LeakyBucketLimiterInterceptor?implements?HandlerInterceptor?{
????@Override
????public?boolean?preHandle(HttpServletRequest?request,?HttpServletResponse?response,?Object?handler)?throws?Exception?{
????????if?(handler?instanceof?HandlerMethod)?{
????????????HandlerMethod?handlerMethod?=?(HandlerMethod)?handler;
????????????//?判斷方法是否包含CounterLimit,有這個注解就需要進行限速操作
????????????if?(handlerMethod.hasMethodAnnotation(LeakyBucketLimit.class))?{
????????????????LeakyBucketLimit?annotation?=?handlerMethod.getMethod().getAnnotation(LeakyBucketLimit.class);
????????????????LeakyBucket?leakyBucket?=?(LeakyBucket)BeanTool.getBean(annotation.limitClass());
????????????????boolean?acquire?=?leakyBucket.acquire();
????????????????response.setContentType("text/json;charset=utf-8");
????????????????JSONObject?result?=?new?JSONObject();
????????????????if?(acquire)?{
????????????????????result.put("result",?"請求成功");
????????????????}?else?{
????????????????????result.put("result",?"達到訪問次數(shù)限制,禁止訪問");
????????????????????response.getWriter().print(JSON.toJSONString(result));
????????????????}
????????????????System.out.println(result);
????????????????return?acquire;
????????????}
????????}
????????return?Boolean.TRUE;
????}
}
首先我在配置類中構(gòu)建漏桶算法的bean,然后在攔截器中獲取漏桶算法的實例,執(zhí)行其acquire()進行攔截操作,如果加入漏桶成功,則訪問相關(guān)接口,否則直接返回錯誤信息。下面是漏桶算法的配置:
@Configuration
public?class?LeakyBucketConfig?{
????@Bean("leakyBucket")
????public?LeakyBucket?leakyBucket()?{
????????return?new?LeakyBucket(10,?5);
????}
}
然后再是攔截器注解:
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public?@interface?LeakyBucketLimit?{
????/**
?????*?限流器名稱
?????*?@return
?????*/
????String?limitBeanName();
????/**
?????*?攔截器class
?????*?
?????*?@return
?????*/
????Class>?limitClass()?default?LeakyBucket.class;
}
將該注解加到我們的目標接口上即可實現(xiàn)限流操作:
@LeakyBucketLimit(limitBeanName?=?"leakyBucket")
@GetMapping("/bucket")
public?Object?bucketTest()?{
????JSONObject?result?=?new?JSONObject();
????result.put("result",?"請求成功");
????logger.info("timestamp:?{},?result:?{}",?System.currentTimeMillis(),?result);
????return?result;
}
測試
這里測試直接通過postman批量調(diào)用即可(具體可以自行百度):

這里我創(chuàng)建了20個線程,然后直接調(diào)用接口:

從調(diào)用結(jié)果可以看出來,我們同時發(fā)起了20個請求,但是系統(tǒng)只接受了10個請求(也就是漏桶的上限),其余的請求直接被拋棄掉,說明限流效果已經(jīng)達到,但是從系統(tǒng)運行的時間戳來看,這種限流算法的實現(xiàn)出口并不均勻,效果上甚至和我們昨天分享的計數(shù)器限流差不多,當然這也是我想吐槽的,所以說各位小伙伴在抄網(wǎng)上代碼的時候,一定要親自實踐下,不能盲目抄作業(yè)。
結(jié)語
總結(jié)的話我在前面已經(jīng)說了:我對這個算法并不滿意。因為它的出口速率并不均勻,還需要進一步優(yōu)化,因此今天的demo示例只能算成功了一半——漏桶算法的web實現(xiàn)思路分享完了,主要是業(yè)務層和限流解耦的思路,但是關(guān)于漏桶算法的核心實現(xiàn)并沒解決,后面我打算參考guava的RateLimiter的休眠操作,優(yōu)化上面的算法,所以今天就先到這里吧,各位小伙伴,晚安喲!
