請求合并的三種方式,大大提高接口性能!
閱讀本文大概需要 8 分鐘。
前言
Hystrix Collapser
hystrix
"hystrixAspect"?class="com.netflix.hystrix.contrib.javanica.aop.aspectj.HystrixCommandAspect"?/>
collapser
我們在需要合并的方法上添加 @HystrixCollapser 注解,在定義好的合并方法上添加 @HystrixCommand 注解; single 方法只能傳入一個參數(shù),多參數(shù)情況下需要自己包裝一個參數(shù)類,而 batch 方法需要? java.util.List;single 方法返回? java.util.concurrent.Future, batch 方法返回?java.util.List,且要保證返回的結(jié)果數(shù)量和傳入的參數(shù)數(shù)量一致。
public?class?HystrixCollapserSample?{
????@HystrixCollapser(batchMethod?=?"batch")
????public?Future?single(String?input)? {
????????return?null;?//?single方法不會被執(zhí)行到
????}
????public?List?batch(List {?inputs) ?
????????return?inputs.stream().map(it?->?Boolean.TRUE).collect(Collectors.toList());
????}
}
源碼實現(xiàn)
在 spring-boot 內(nèi)注冊切面類的 bean,里面包含 @HystrixCollapser 注解切面; 在方法執(zhí)行時檢測到方法被 HystrixCollapser 注解后,spring 調(diào)用? methodsAnnotatedWithHystrixCommand方法來執(zhí)行 hystrix 代理;hystrix 獲取一個 collapser 實例(在當(dāng)前 scope 內(nèi)檢測不到即創(chuàng)建); hystrix 將當(dāng)前請求的參數(shù)提交給 collapser, 由 collapser 存儲在一個? concurrentHashMap (RequestArgumentType -> CollapsedRequest)內(nèi),此方法會創(chuàng)建一個 Observable 對象,并返回一個 觀察此對象的 Future 給業(yè)務(wù)線程;collpser 在創(chuàng)建時會創(chuàng)建一個 timer 線程,定時消費存儲的請求,timer 會將多個請求構(gòu)造成一個合并后的請求,調(diào)用 batch 執(zhí)行后將結(jié)果順序映射到輸出參數(shù),并通知 Future 任務(wù)已完成。
需要注意,由于需要等待 timer 執(zhí)行真正的請求操作,collapser 會導(dǎo)致所有的請求的 cost 都會增加約 timerInterval/2 ms;
配置
collapserKey,這個可以不用配置,hystrix 會默認使用當(dāng)前方法名; batchMethod,配置 batch 方法名,我們一般會將 single 方法和 batch 方法定義在同一個類內(nèi),直接填方法名即可; scope,最坑的配置項,也是逼我讀源碼的元兇, com.netflix.hystrix.HystrixCollapser.Scope?枚舉類,有 REQUEST, GLOBAL 兩種選項,在 scope 為 REQUEST 時,hystrix 會為每個請求都創(chuàng)建一個 collapser, 此時你會發(fā)現(xiàn) batch 方法執(zhí)行時,傳入的請求數(shù)總為1。而且 REQUEST 項還是默認項,不明白這樣請求合并還有什么意義;collapserProperties, 在此選項內(nèi)我們可以配置 hystrixCommand 的通用配置;
maxRequestsInBatch, 構(gòu)造批量請求時,使用的單個請求的最大數(shù)量; timerDelayInMilliseconds, 此選項配置 collapser 的 timer 線程多久會合并一次請求; requestCache.enabled, 配置提交請求時是否緩存;
@HystrixCollapser(
????????????batchMethod?=?"batch",
????????????collapserKey?=?"single",
????????????scope?=?com.netflix.hystrix.HystrixCollapser.Scope.GLOBAL,
????????????collapserProperties?=?{
????????????????????@HystrixProperty(name?=?"maxRequestsInBatch",?value?=?"100"),
????????????????????@HystrixProperty(name?=?"timerDelayInMilliseconds",?value?=?"1000"),
????????????????????@HystrixProperty(name?=?"requestCache.enabled",?value?=?"true")
????????????})
BatchCollapser
設(shè)計
是一種 Collection,類似于 ArrayList 或 Queue,可以存重復(fù)元素且有順序; 在多線程環(huán)境中能安全地將里面的數(shù)據(jù)全取出來進行消費,而不用自己實現(xiàn)鎖。
java.util.concurrent?包內(nèi)的 LinkedBlockingDeque 剛好符合要求,首先它實現(xiàn)了 BlockingDeque 接口,多線程環(huán)境下的存取操作是安全的;此外,它還提供?drainTo(Collection super E> c, int maxElements)方法,可以將容器內(nèi) maxElements 個元素安全地取出來,放到 Collection c 中。實現(xiàn)
public?class?BatchCollapser<E>?implements?InitializingBean?{
?????private?static?final?Logger?logger?=?LoggerFactory.getLogger(BatchCollapser.class);
?????private?static?volatile?Map?instance?=?Maps.newConcurrentMap();
?????private?static?final?ScheduledExecutorService?SCHEDULE_EXECUTOR?=?Executors.newScheduledThreadPool(1);
?????private?volatile?LinkedBlockingDeque?batchContainer?=?new?LinkedBlockingDeque<>();
?????private?Handler,?Boolean>?cleaner;
?????private?long?interval;
?????private?int?threshHold;
?????private?BatchCollapser(Handler,?Boolean>?cleaner,?int?threshHold,?long?interval)
?{
?????????this.cleaner?=?cleaner;
?????????this.threshHold?=?threshHold;
?????????this.interval?=?interval;
?????}
?????@Override
?????public?void?afterPropertiesSet()?throws?Exception?{
?????????SCHEDULE_EXECUTOR.scheduleAtFixedRate(()?->?{
?????????????try?{
?????????????????this.clean();
?????????????}?catch?(Exception?e)?{
?????????????????logger.error("clean?container?exception",?e);
?????????????}
?????????},?0,?interval,?TimeUnit.MILLISECONDS);
?????}
?????public?void?submit(E?event)?{
?????????batchContainer.add(event);
?????????if?(batchContainer.size()?>=?threshHold)?{
?????????????clean();
?????????}
?????}
?????private?void?clean()?{
?????????List?transferList?=?Lists.newArrayListWithExpectedSize(threshHold);
?????????batchContainer.drainTo(transferList,?100);
?????????if?(CollectionUtils.isEmpty(transferList))?{
?????????????return;
?????????}
?????????try?{
?????????????cleaner.handle(transferList);
?????????}?catch?(Exception?e)?{
?????????????logger.error("batch?execute?error,?transferList:{}",?transferList,?e);
?????????}
?????}
?????public?static??BatchCollapser?getInstance(Handler ,?Boolean>?cleaner,?int?threshHold,?long?interval)
?{
?????????Class?jobClass?=?cleaner.getClass();
?????????if?(instance.get(jobClass)?==?null)?{
?????????????synchronized?(BatchCollapser.class)?{
?????????????????if?(instance.get(jobClass)?==?null)?{
?????????????????????instance.put(jobClass,?new?BatchCollapser<>(cleaner,?threshHold,?interval));
?????????????????}
?????????????}
?????????}
?????????return?instance.get(jobClass);
?????}
?}
由于合并器的全局性需求,需要將合并器實現(xiàn)為一個單例,另外為了提升它的通用性,內(nèi)部使用使用 concurrentHashMap 和 double check 實現(xiàn)了一個簡單的單例工廠。 為了區(qū)分不同用途的合并器,工廠需要傳入一個實現(xiàn)了 Handler 的實例,通過實例的 class 來對請求進行分組存儲。 由于? java.util.Timer?的阻塞特性,一個 Timer 線程在阻塞時不會啟動另一個同樣的 Timer 線程,所以使用?ScheduledExecutorService?定時啟動 Timer 線程。
ConcurrentHashMultiset
設(shè)計
ConcurrentHashMultiset,它不同于普通的 set 結(jié)構(gòu)存儲相同元素時直接覆蓋原有元素,而是給每個元素保持一個計數(shù) count, 插入重復(fù)時元素的 count 值加1。而且它在添加和刪除時并不加鎖也能保證線程安全,具體實現(xiàn)是通過一個?while(true)?循環(huán)嘗試操作,直到操作夠所需要的數(shù)量。ConcurrentHashMultiset?這種排重計數(shù)的特性,非常適合數(shù)據(jù)統(tǒng)計這種元素在短時間內(nèi)重復(fù)率很高的場景,經(jīng)過排重后的數(shù)量計算,可以大大降低下游服務(wù)器的壓力,即使重復(fù)率不高,能用少量的內(nèi)存空間換取系統(tǒng)可用性的提高,也是很劃算的。實現(xiàn)
ConcurrentHashMultiset?進行請求合并與使用普通容器在整體結(jié)構(gòu)上并無太大差異,具體類似于:if?(ConcurrentHashMultiset.isEmpty())?{
????return;
}
List?transferList?=?Lists.newArrayList();
ConcurrentHashMultiset.elementSet().forEach(request?->?{
????int?count?=?ConcurrentHashMultiset.count(request);
????if?(count?<=?0)?{
????????return;
????}
????transferList.add(count?==?1???request?:?new?Request(request.getIncrement()?*?count));
????ConcurrentHashMultiset.remove(request,?count);
});
小結(jié)
hystrix collapser: 需要每個請求的結(jié)果,并且不在意每個請求的 cost 會增加;BatchCollapser: 不在意請求的結(jié)果,需要請求合并能在時間和數(shù)量兩個維度上觸發(fā);ConcurrentHashMultiset:請求重復(fù)率很高的統(tǒng)計類場景;
BatchCollapser?和?ConcurrentHashMultiset?結(jié)合一下,在BatchCollapser 里使用?ConcurrentHashMultiset?作為容器,這樣就可以結(jié)合兩者的優(yōu)勢了。推薦閱讀:
內(nèi)容包含Java基礎(chǔ)、JavaWeb、MySQL性能優(yōu)化、JVM、鎖、百萬并發(fā)、消息隊列、高性能緩存、反射、Spring全家桶原理、微服務(wù)、Zookeeper、數(shù)據(jù)結(jié)構(gòu)、限流熔斷降級......等技術(shù)棧!
?戳閱讀原文領(lǐng)?。?/span>? ? ? ? ? ? ? ??? ??? ? ? ? ? ? ? ? ? ?朕已閱?
評論
圖片
表情

