<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          請求合并的三種方式,大大提高接口性能!

          共 4184字,需瀏覽 9分鐘

           ·

          2022-02-17 05:32

          程序員的成長之路
          互聯(lián)網(wǎng)/程序員/技術(shù)/資料共享?
          關(guān)注


          閱讀本文大概需要 8 分鐘。

          作者:zhenbianshu
          來源:https://zhenbianshu.github.io/
          將相似或重復(fù)請求在上游系統(tǒng)中合并后發(fā)往下游系統(tǒng),可以大大降低下游系統(tǒng)的負載,提升系統(tǒng)整體吞吐率。
          文章介紹了 hystrix collapser、ConcurrentHashMultiset、自實現(xiàn)BatchCollapser 三種請求合并技術(shù),并通過其具體實現(xiàn)對比各自適用的場景。

          前言

          工作中,我們常見的請求模型都是”請求-應(yīng)答”式,即一次請求中,服務(wù)給請求分配一個獨立的線程,一塊獨立的內(nèi)存空間,所有的操作都是獨立的,包括資源和系統(tǒng)運算。我們也知道,在請求中處理一次系統(tǒng) I/O 的消耗是非常大的,如果有非常多的請求都進行同一類 I/O 操作,那么是否可以將這些 I/O 操作都合并到一起,進行一次 I/O 操作,是否可以大大降低下游資源服務(wù)器的負擔(dān)呢?
          最近我工作之余的大部分時間都花在這個問題的探究上了,對比了幾個現(xiàn)有類庫,為了解決一個小問題把 hystrix javanica 的代碼翻了一遍,也根據(jù)自己工作中遇到的業(yè)務(wù)需求實現(xiàn)了一個簡單的合并類,收獲還是挺大的??赡苓@個需求有點”偏門”,在網(wǎng)上搜索結(jié)果并不多,也沒有綜合一點的資料,索性自己總結(jié)分享一下,希望能幫到后來遇到這種問題的小伙伴。

          Hystrix Collapser

          hystrix
          開源的請求合并類庫(知名的)好像也只有 Netflix 公司開源的 Hystrix 了, hystrix 專注于保持 WEB 服務(wù)器在高并發(fā)環(huán)境下的系統(tǒng)穩(wěn)定,我們常用它的熔斷器(Circuit Breaker) 來實現(xiàn)服務(wù)的服務(wù)隔離和災(zāi)時降級,有了它,可以使整個系統(tǒng)不至于被某一個接口的高并發(fā)洪流沖塌,即使接口掛了也可以將服務(wù)降級,返回一個人性化的響應(yīng)。請求合并作為一個保障下游服務(wù)穩(wěn)定的利器,在 hystrix 內(nèi)實現(xiàn)也并不意外。
          我們在使用 hystrix 時,常用它的 javanica 模塊,以注解的方式編寫 hystrix 代碼,使代碼更簡潔而且對業(yè)務(wù)代碼侵入更低。所以在項目中我們一般至少需要引用 hystrix-core 和 hystrix-javanica 兩個包。
          另外,hystrix 的實現(xiàn)都是通過 AOP,我們要還要在項目 xml 里顯式配置 HystrixAspect 的 bean 來啟用它。

          "hystrixAspect"?class="com.netflix.hystrix.contrib.javanica.aop.aspectj.HystrixCommandAspect"?/>
          collapser
          hystrix collapser 是 hystrix 內(nèi)的請求合并器,它有自定義 BatchMethod 和 注解兩種實現(xiàn)方式,自定義 BatchMethod 網(wǎng)上有各種教程,實現(xiàn)起來很復(fù)雜,需要手寫大量代碼,而注解方式只需要添加兩行注解即可,但配置方式我在官方文檔上也沒找見,中文方面本文應(yīng)該是獨一份兒了。
          其實現(xiàn)需要注意的是:
          • 我們在需要合并的方法上添加 @HystrixCollapser 注解,在定義好的合并方法上添加 @HystrixCommand 注解;
          • single 方法只能傳入一個參數(shù),多參數(shù)情況下需要自己包裝一個參數(shù)類,而 batch 方法需要?java.util.List;
          • single 方法返回?java.util.concurrent.Future, batch 方法返回?java.util.List,且要保證返回的結(jié)果數(shù)量和傳入的參數(shù)數(shù)量一致。
          下面是一個簡單的示例:
          public?class?HystrixCollapserSample?{
          ????@HystrixCollapser(batchMethod?=?"batch")
          ????public?Future?single(String?input)?{
          ????????return?null;?//?single方法不會被執(zhí)行到
          ????}
          ????public?List?batch(List?inputs)?{
          ????????return?inputs.stream().map(it?->?Boolean.TRUE).collect(Collectors.toList());
          ????}
          }
          源碼實現(xiàn)
          為了解決 hystrix collapser 的配置問題看了下 hystrix javanica 的源碼,這里簡單總結(jié)一下 hystrix 請求合并器的具體實現(xiàn),源碼的詳細解析在我的筆記:Hystrix collasper 源碼解析。
          • 在 spring-boot 內(nèi)注冊切面類的 bean,里面包含 @HystrixCollapser 注解切面;
          • 在方法執(zhí)行時檢測到方法被 HystrixCollapser 注解后,spring 調(diào)用?methodsAnnotatedWithHystrixCommand方法來執(zhí)行 hystrix 代理;
          • hystrix 獲取一個 collapser 實例(在當(dāng)前 scope 內(nèi)檢測不到即創(chuàng)建);
          • hystrix 將當(dāng)前請求的參數(shù)提交給 collapser, 由 collapser 存儲在一個?concurrentHashMap (RequestArgumentType -> CollapsedRequest)內(nèi),此方法會創(chuàng)建一個 Observable 對象,并返回一個 觀察此對象的 Future 給業(yè)務(wù)線程;
          • collpser 在創(chuàng)建時會創(chuàng)建一個 timer 線程,定時消費存儲的請求,timer 會將多個請求構(gòu)造成一個合并后的請求,調(diào)用 batch 執(zhí)行后將結(jié)果順序映射到輸出參數(shù),并通知 Future 任務(wù)已完成。
          需要注意,由于需要等待 timer 執(zhí)行真正的請求操作,collapser 會導(dǎo)致所有的請求的 cost 都會增加約 timerInterval/2 ms;
          配置
          hystrix collapser 的配置需要在 @HystrixCollapser 注解上使用,主要包括兩個部分,專有配置和 hystrixCommand 通用配置;
          專有配置包括:
          • collapserKey,這個可以不用配置,hystrix 會默認使用當(dāng)前方法名;
          • batchMethod,配置 batch 方法名,我們一般會將 single 方法和 batch 方法定義在同一個類內(nèi),直接填方法名即可;
          • scope,最坑的配置項,也是逼我讀源碼的元兇,com.netflix.hystrix.HystrixCollapser.Scope?枚舉類,有 REQUEST, GLOBAL 兩種選項,在 scope 為 REQUEST 時,hystrix 會為每個請求都創(chuàng)建一個 collapser, 此時你會發(fā)現(xiàn) batch 方法執(zhí)行時,傳入的請求數(shù)總為1。而且 REQUEST 項還是默認項,不明白這樣請求合并還有什么意義;
          • collapserProperties, 在此選項內(nèi)我們可以配置 hystrixCommand 的通用配置;
          通用配置包括:
          • maxRequestsInBatch, 構(gòu)造批量請求時,使用的單個請求的最大數(shù)量;
          • timerDelayInMilliseconds, 此選項配置 collapser 的 timer 線程多久會合并一次請求;
          • requestCache.enabled, 配置提交請求時是否緩存;
          一個完整的配置如下:
          @HystrixCollapser(
          ????????????batchMethod?=?"batch",
          ????????????collapserKey?=?"single",
          ????????????scope?=?com.netflix.hystrix.HystrixCollapser.Scope.GLOBAL,
          ????????????collapserProperties?=?{
          ????????????????????@HystrixProperty(name?=?"maxRequestsInBatch",?value?=?"100"),
          ????????????????????@HystrixProperty(name?=?"timerDelayInMilliseconds",?value?=?"1000"),
          ????????????????????@HystrixProperty(name?=?"requestCache.enabled",?value?=?"true")
          ????????????})

          BatchCollapser

          設(shè)計
          由于業(yè)務(wù)需求,我們并不太關(guān)心被合并請求的返回值,而且覺得 hystrix 保持那么多的 Future 并沒有必要,于是自己實現(xiàn)了一個簡單的請求合并器,業(yè)務(wù)線程簡單地將請求放到一個容器里,請求數(shù)累積到一定量或延遲了一定的時間,就取出容器內(nèi)的數(shù)據(jù)統(tǒng)一發(fā)送給下游系統(tǒng)。
          設(shè)計思想跟 hystrix 類似,合并器有一個字段作為存儲請求的容器,且設(shè)置一個 timer 線程定時消費容器內(nèi)的請求,業(yè)務(wù)線程將請求參數(shù)提交到合并 器的容器內(nèi)。不同之處在于,業(yè)務(wù)線程將請求提交給容器后立即同步返回成功,不必管請求的消費結(jié)果,這樣便實現(xiàn)了時間維度上的合并觸發(fā)。
          另外,我還添加了另外一個維度的觸發(fā)條件,每次將請求參數(shù)添加到容器后都會檢驗一下容器內(nèi)請求的數(shù)量,如果數(shù)量達到一定的閾值,將在業(yè)務(wù)線程內(nèi)合并執(zhí)行一次。
          由于有兩個維度會觸發(fā)合并,就不可避免會遇到線程安全問題。為了保證容器內(nèi)的請求不會被多個線程重復(fù)消費或都漏掉,我需要一個容器能滿足以下條件:
          • 是一種 Collection,類似于 ArrayList 或 Queue,可以存重復(fù)元素且有順序;
          • 在多線程環(huán)境中能安全地將里面的數(shù)據(jù)全取出來進行消費,而不用自己實現(xiàn)鎖。
          java.util.concurrent?包內(nèi)的 LinkedBlockingDeque 剛好符合要求,首先它實現(xiàn)了 BlockingDeque 接口,多線程環(huán)境下的存取操作是安全的;此外,它還提供?drainTo(Collection c, int maxElements)方法,可以將容器內(nèi) maxElements 個元素安全地取出來,放到 Collection c 中。
          實現(xiàn)
          以下是具體的代碼實現(xiàn):
          public?class?BatchCollapser<E>?implements?InitializingBean?{
          ?????private?static?final?Logger?logger?=?LoggerFactory.getLogger(BatchCollapser.class);
          ?????private?static?volatile?Map?instance?=?Maps.newConcurrentMap();
          ?????private?static?final?ScheduledExecutorService?SCHEDULE_EXECUTOR?=?Executors.newScheduledThreadPool(1);
          ?????private?volatile?LinkedBlockingDeque?batchContainer?=?new?LinkedBlockingDeque<>();
          ?????private?Handler,?Boolean>?cleaner;
          ?????private?long?interval;
          ?????private?int?threshHold;
          ?????private?BatchCollapser(Handler,?Boolean>?cleaner,?int?threshHold,?long?interval)?{
          ?????????this.cleaner?=?cleaner;
          ?????????this.threshHold?=?threshHold;
          ?????????this.interval?=?interval;
          ?????}

          ?????@Override
          ?????public?void?afterPropertiesSet()?throws?Exception?{
          ?????????SCHEDULE_EXECUTOR.scheduleAtFixedRate(()?->?{
          ?????????????try?{
          ?????????????????this.clean();
          ?????????????}?catch?(Exception?e)?{
          ?????????????????logger.error("clean?container?exception",?e);
          ?????????????}
          ?????????},?0,?interval,?TimeUnit.MILLISECONDS);
          ?????}

          ?????public?void?submit(E?event)?{
          ?????????batchContainer.add(event);
          ?????????if?(batchContainer.size()?>=?threshHold)?{
          ?????????????clean();
          ?????????}
          ?????}

          ?????private?void?clean()?{
          ?????????List?transferList?=?Lists.newArrayListWithExpectedSize(threshHold);
          ?????????batchContainer.drainTo(transferList,?100);
          ?????????if?(CollectionUtils.isEmpty(transferList))?{
          ?????????????return;
          ?????????}

          ?????????try?{
          ?????????????cleaner.handle(transferList);
          ?????????}?catch?(Exception?e)?{
          ?????????????logger.error("batch?execute?error,?transferList:{}",?transferList,?e);
          ?????????}
          ?????}

          ?????public?static??BatchCollapser?getInstance(Handler,?Boolean>?cleaner,?int?threshHold,?long?interval)?{
          ?????????Class?jobClass?=?cleaner.getClass();
          ?????????if?(instance.get(jobClass)?==?null)?{
          ?????????????synchronized?(BatchCollapser.class)?{
          ?????????????????if?(instance.get(jobClass)?==?null)?{
          ?????????????????????instance.put(jobClass,?new?BatchCollapser<>(cleaner,?threshHold,?interval));
          ?????????????????}
          ?????????????}
          ?????????}

          ?????????return?instance.get(jobClass);
          ?????}
          ?}
          以下代碼內(nèi)需要注意的點:
          • 由于合并器的全局性需求,需要將合并器實現(xiàn)為一個單例,另外為了提升它的通用性,內(nèi)部使用使用 concurrentHashMap 和 double check 實現(xiàn)了一個簡單的單例工廠。
          • 為了區(qū)分不同用途的合并器,工廠需要傳入一個實現(xiàn)了 Handler 的實例,通過實例的 class 來對請求進行分組存儲。
          • 由于?java.util.Timer?的阻塞特性,一個 Timer 線程在阻塞時不會啟動另一個同樣的 Timer 線程,所以使用?ScheduledExecutorService?定時啟動 Timer 線程。

          ConcurrentHashMultiset

          設(shè)計
          上面介紹的請求合并都是將多個請求一次發(fā)送,下游服務(wù)器處理時本質(zhì)上還是多個請求,最好的請求合并是在內(nèi)存中進行,將請求結(jié)果簡單合并成一個發(fā)送給下游服務(wù)器。如我們經(jīng)常會遇到的需求:元素分值累加或數(shù)據(jù)統(tǒng)計,就可以先在內(nèi)存中將某一項的分值或數(shù)據(jù)累加起來,定時請求數(shù)據(jù)庫保存。
          Guava 內(nèi)就提供了這么一種數(shù)據(jù)結(jié)構(gòu):ConcurrentHashMultiset,它不同于普通的 set 結(jié)構(gòu)存儲相同元素時直接覆蓋原有元素,而是給每個元素保持一個計數(shù) count, 插入重復(fù)時元素的 count 值加1。而且它在添加和刪除時并不加鎖也能保證線程安全,具體實現(xiàn)是通過一個?while(true)?循環(huán)嘗試操作,直到操作夠所需要的數(shù)量。
          ConcurrentHashMultiset?這種排重計數(shù)的特性,非常適合數(shù)據(jù)統(tǒng)計這種元素在短時間內(nèi)重復(fù)率很高的場景,經(jīng)過排重后的數(shù)量計算,可以大大降低下游服務(wù)器的壓力,即使重復(fù)率不高,能用少量的內(nèi)存空間換取系統(tǒng)可用性的提高,也是很劃算的。
          實現(xiàn)
          使用?ConcurrentHashMultiset?進行請求合并與使用普通容器在整體結(jié)構(gòu)上并無太大差異,具體類似于:
          if?(ConcurrentHashMultiset.isEmpty())?{
          ????return;
          }

          List?transferList?=?Lists.newArrayList();
          ConcurrentHashMultiset.elementSet().forEach(request?->?{
          ????int?count?=?ConcurrentHashMultiset.count(request);
          ????if?(count?<=?0)?{
          ????????return;
          ????}

          ????transferList.add(count?==?1???request?:?new?Request(request.getIncrement()?*?count));
          ????ConcurrentHashMultiset.remove(request,?count);
          });

          小結(jié)

          最后總結(jié)一下各個技術(shù)適用的場景:
          • hystrix collapser: 需要每個請求的結(jié)果,并且不在意每個請求的 cost 會增加;
          • BatchCollapser: 不在意請求的結(jié)果,需要請求合并能在時間和數(shù)量兩個維度上觸發(fā);
          • ConcurrentHashMultiset:請求重復(fù)率很高的統(tǒng)計類場景;
          另外,如果選擇自己來實現(xiàn)的話,完全可以將?BatchCollapser?和?ConcurrentHashMultiset?結(jié)合一下,在BatchCollapser 里使用?ConcurrentHashMultiset?作為容器,這樣就可以結(jié)合兩者的優(yōu)勢了。

          推薦閱讀:

          吳恩達,確診新冠陽性!

          MySQL的varchar水真的太深了,你真的會用嗎?

          互聯(lián)網(wǎng)初中高級大廠面試題(9個G)

          內(nèi)容包含Java基礎(chǔ)、JavaWeb、MySQL性能優(yōu)化、JVM、鎖、百萬并發(fā)、消息隊列、高性能緩存、反射、Spring全家桶原理、微服務(wù)、Zookeeper、數(shù)據(jù)結(jié)構(gòu)、限流熔斷降級......等技術(shù)棧!

          ?戳閱讀原文領(lǐng)?。?/span>? ? ? ? ? ? ? ??? ??? ? ? ? ? ? ? ? ? ?朕已閱?

          瀏覽 54
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  中曰韩欧美一级 | 91在线导航 | 亚洲三级片无码高清 | 人妻日韩精品中文字幕 | 91午夜影院 |