<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          請求合并的 3 種方式,大大提高接口性能!

          共 4086字,需瀏覽 9分鐘

           ·

          2022-04-09 21:04


          來源:zhenbianshu.github.io/

          • 前言
          • Hystrix Collapser
          • BatchCollapser
          • ConcurrentHashMultiset
          • 小結(jié)

          將相似或重復(fù)請求在上游系統(tǒng)中合并后發(fā)往下游系統(tǒng),可以大大降低下游系統(tǒng)的負(fù)載,提升系統(tǒng)整體吞吐率。文章介紹了 hystrix collapser、ConcurrentHashMultiset、自實現(xiàn)BatchCollapser 三種請求合并技術(shù),并通過其具體實現(xiàn)對比各自適用的場景。

          前言

          工作中,我們常見的請求模型都是”請求-應(yīng)答”式,即一次請求中,服務(wù)給請求分配一個獨立的線程,一塊獨立的內(nèi)存空間,所有的操作都是獨立的,包括資源和系統(tǒng)運算。我們也知道,在請求中處理一次系統(tǒng) I/O 的消耗是非常大的,如果有非常多的請求都進行同一類 I/O 操作,那么是否可以將這些 I/O 操作都合并到一起,進行一次 I/O 操作,是否可以大大降低下游資源服務(wù)器的負(fù)擔(dān)呢?

          最近我工作之余的大部分時間都花在這個問題的探究上了,對比了幾個現(xiàn)有類庫,為了解決一個小問題把 hystrix javanica 的代碼翻了一遍,也根據(jù)自己工作中遇到的業(yè)務(wù)需求實現(xiàn)了一個簡單的合并類,收獲還是挺大的。可能這個需求有點”偏門”,在網(wǎng)上搜索結(jié)果并不多,也沒有綜合一點的資料,索性自己總結(jié)分享一下,希望能幫到后來遇到這種問題的小伙伴。

          Hystrix Collapser

          hystrix

          開源的請求合并類庫(知名的)好像也只有 Netflix 公司開源的 Hystrix 了, hystrix 專注于保持 WEB 服務(wù)器在高并發(fā)環(huán)境下的系統(tǒng)穩(wěn)定,我們常用它的熔斷器(Circuit Breaker) 來實現(xiàn)服務(wù)的服務(wù)隔離和災(zāi)時降級,有了它,可以使整個系統(tǒng)不至于被某一個接口的高并發(fā)洪流沖塌,即使接口掛了也可以將服務(wù)降級,返回一個人性化的響應(yīng)。請求合并作為一個保障下游服務(wù)穩(wěn)定的利器,在 hystrix 內(nèi)實現(xiàn)也并不意外。

          我們在使用 hystrix 時,常用它的 javanica 模塊,以注解的方式編寫 hystrix 代碼,使代碼更簡潔而且對業(yè)務(wù)代碼侵入更低。所以在項目中我們一般至少需要引用 hystrix-core 和 hystrix-javanica 兩個包。

          另外,hystrix 的實現(xiàn)都是通過 AOP,我們要還要在項目 xml 里顯式配置 HystrixAspect 的 bean 來啟用它。


          "hystrixAspect"?class="com.netflix.hystrix.contrib.javanica.aop.aspectj.HystrixCommandAspect"?/>

          collapser

          hystrix collapser 是 hystrix 內(nèi)的請求合并器,它有自定義 BatchMethod 和 注解兩種實現(xiàn)方式,自定義 BatchMethod 網(wǎng)上有各種教程,實現(xiàn)起來很復(fù)雜,需要手寫大量代碼,而注解方式只需要添加兩行注解即可,但配置方式我在官方文檔上也沒找見,中文方面本文應(yīng)該是獨一份兒了。

          其實現(xiàn)需要注意的是:

          • 我們在需要合并的方法上添加 @HystrixCollapser 注解,在定義好的合并方法上添加 @HystrixCommand 注解;
          • single 方法只能傳入一個參數(shù),多參數(shù)情況下需要自己包裝一個參數(shù)類,而 batch 方法需要 java.util.List
          • single 方法返回 java.util.concurrent.Future, batch 方法返回 java.util.List,且要保證返回的結(jié)果數(shù)量和傳入的參數(shù)數(shù)量一致。

          下面是一個簡單的示例:

          public?class?HystrixCollapserSample?{

          ????@HystrixCollapser(batchMethod?=?"batch")
          ????public?Future?single(String?input)?{
          ????????return?null;?//?single方法不會被執(zhí)行到
          ????}

          ????public?List?batch(List?inputs)?{
          ????????return?inputs.stream().map(it?->?Boolean.TRUE).collect(Collectors.toList());
          ????}
          }

          源碼實現(xiàn)

          為了解決 hystrix collapser 的配置問題看了下 hystrix javanica 的源碼,這里簡單總結(jié)一下 hystrix 請求合并器的具體實現(xiàn),源碼的詳細(xì)解析在我的筆記:Hystrix collasper 源碼解析。

          • 在 spring-boot 內(nèi)注冊切面類的 bean,里面包含 @HystrixCollapser 注解切面;
          • 在方法執(zhí)行時檢測到方法被 HystrixCollapser 注解后,spring 調(diào)用 methodsAnnotatedWithHystrixCommand方法來執(zhí)行 hystrix 代理;
          • hystrix 獲取一個 collapser 實例(在當(dāng)前 scope 內(nèi)檢測不到即創(chuàng)建);
          • hystrix 將當(dāng)前請求的參數(shù)提交給 collapser, 由 collapser 存儲在一個 concurrentHashMap (RequestArgumentType -> CollapsedRequest)內(nèi),此方法會創(chuàng)建一個 Observable 對象,并返回一個 觀察此對象的 Future 給業(yè)務(wù)線程;
          • collpser 在創(chuàng)建時會創(chuàng)建一個 timer 線程,定時消費存儲的請求,timer 會將多個請求構(gòu)造成一個合并后的請求,調(diào)用 batch 執(zhí)行后將結(jié)果順序映射到輸出參數(shù),并通知 Future 任務(wù)已完成。

          需要注意,由于需要等待 timer 執(zhí)行真正的請求操作,collapser 會導(dǎo)致所有的請求的 cost 都會增加約 timerInterval/2 ms;

          配置

          hystrix collapser 的配置需要在 @HystrixCollapser 注解上使用,主要包括兩個部分,專有配置和 hystrixCommand 通用配置;

          專有配置包括:

          • collapserKey,這個可以不用配置,hystrix 會默認(rèn)使用當(dāng)前方法名;
          • batchMethod,配置 batch 方法名,我們一般會將 single 方法和 batch 方法定義在同一個類內(nèi),直接填方法名即可;
          • scope,最坑的配置項,也是逼我讀源碼的元兇,com.netflix.hystrix.HystrixCollapser.Scope 枚舉類,有 REQUEST, GLOBAL 兩種選項,在 scope 為 REQUEST 時,hystrix 會為每個請求都創(chuàng)建一個 collapser, 此時你會發(fā)現(xiàn) batch 方法執(zhí)行時,傳入的請求數(shù)總為1。而且 REQUEST 項還是默認(rèn)項,不明白這樣請求合并還有什么意義;
          • collapserProperties, 在此選項內(nèi)我們可以配置 hystrixCommand 的通用配置;

          通用配置包括:

          • maxRequestsInBatch, 構(gòu)造批量請求時,使用的單個請求的最大數(shù)量;
          • timerDelayInMilliseconds, 此選項配置 collapser 的 timer 線程多久會合并一次請求;
          • requestCache.enabled, 配置提交請求時是否緩存;

          一個完整的配置如下:

          @HystrixCollapser(
          ????????????batchMethod?=?"batch",
          ????????????collapserKey?=?"single",
          ????????????scope?=?com.netflix.hystrix.HystrixCollapser.Scope.GLOBAL,
          ????????????collapserProperties?=?{
          ????????????????????@HystrixProperty(name?=?"maxRequestsInBatch",?value?=?"100"),
          ????????????????????@HystrixProperty(name?=?"timerDelayInMilliseconds",?value?=?"1000"),
          ????????????????????@HystrixProperty(name?=?"requestCache.enabled",?value?=?"true")
          ????????????})

          BatchCollapser

          設(shè)計

          由于業(yè)務(wù)需求,我們并不太關(guān)心被合并請求的返回值,而且覺得 hystrix 保持那么多的 Future 并沒有必要,于是自己實現(xiàn)了一個簡單的請求合并器,業(yè)務(wù)線程簡單地將請求放到一個容器里,請求數(shù)累積到一定量或延遲了一定的時間,就取出容器內(nèi)的數(shù)據(jù)統(tǒng)一發(fā)送給下游系統(tǒng)。

          設(shè)計思想跟 hystrix 類似,合并器有一個字段作為存儲請求的容器,且設(shè)置一個 timer 線程定時消費容器內(nèi)的請求,業(yè)務(wù)線程將請求參數(shù)提交到合并 器的容器內(nèi)。不同之處在于,業(yè)務(wù)線程將請求提交給容器后立即同步返回成功,不必管請求的消費結(jié)果,這樣便實現(xiàn)了時間維度上的合并觸發(fā)。

          另外,我還添加了另外一個維度的觸發(fā)條件,每次將請求參數(shù)添加到容器后都會檢驗一下容器內(nèi)請求的數(shù)量,如果數(shù)量達(dá)到一定的閾值,將在業(yè)務(wù)線程內(nèi)合并執(zhí)行一次。

          由于有兩個維度會觸發(fā)合并,就不可避免會遇到線程安全問題。為了保證容器內(nèi)的請求不會被多個線程重復(fù)消費或都漏掉,我需要一個容器能滿足以下條件:

          • 是一種 Collection,類似于 ArrayList 或 Queue,可以存重復(fù)元素且有順序;
          • 在多線程環(huán)境中能安全地將里面的數(shù)據(jù)全取出來進行消費,而不用自己實現(xiàn)鎖。

          java.util.concurrent 包內(nèi)的 LinkedBlockingDeque 剛好符合要求,首先它實現(xiàn)了 BlockingDeque 接口,多線程環(huán)境下的存取操作是安全的;此外,它還提供 drainTo(Collection c, int maxElements)方法,可以將容器內(nèi) maxElements 個元素安全地取出來,放到 Collection c 中。

          實現(xiàn)

          以下是具體的代碼實現(xiàn):

          public?class?BatchCollapser<E>?implements?InitializingBean?{
          ?????private?static?final?Logger?logger?=?LoggerFactory.getLogger(BatchCollapser.class);
          ?????private?static?volatile?Map?instance?=?Maps.newConcurrentMap();
          ?????private?static?final?ScheduledExecutorService?SCHEDULE_EXECUTOR?=?Executors.newScheduledThreadPool(1);

          ?????private?volatile?LinkedBlockingDeque?batchContainer?=?new?LinkedBlockingDeque<>();
          ?????private?Handler,?Boolean>?cleaner;
          ?????private?long?interval;
          ?????private?int?threshHold;

          ?????private?BatchCollapser(Handler,?Boolean>?cleaner,?int?threshHold,?long?interval)?{
          ?????????this.cleaner?=?cleaner;
          ?????????this.threshHold?=?threshHold;
          ?????????this.interval?=?interval;
          ?????}

          ?????@Override
          ?????public?void?afterPropertiesSet()?throws?Exception?{
          ?????????SCHEDULE_EXECUTOR.scheduleAtFixedRate(()?->?{
          ?????????????try?{
          ?????????????????this.clean();
          ?????????????}?catch?(Exception?e)?{
          ?????????????????logger.error("clean?container?exception",?e);
          ?????????????}
          ?????????},?0,?interval,?TimeUnit.MILLISECONDS);
          ?????}

          ?????public?void?submit(E?event)?{
          ?????????batchContainer.add(event);
          ?????????if?(batchContainer.size()?>=?threshHold)?{
          ?????????????clean();
          ?????????}
          ?????}

          ?????private?void?clean()?{
          ?????????List?transferList?=?Lists.newArrayListWithExpectedSize(threshHold);
          ?????????batchContainer.drainTo(transferList,?100);
          ?????????if?(CollectionUtils.isEmpty(transferList))?{
          ?????????????return;
          ?????????}

          ?????????try?{
          ?????????????cleaner.handle(transferList);
          ?????????}?catch?(Exception?e)?{
          ?????????????logger.error("batch?execute?error,?transferList:{}",?transferList,?e);
          ?????????}
          ?????}

          ?????public?static??BatchCollapser?getInstance(Handler,?Boolean>?cleaner,?int?threshHold,?long?interval)?{
          ?????????Class?jobClass?=?cleaner.getClass();
          ?????????if?(instance.get(jobClass)?==?null)?{
          ?????????????synchronized?(BatchCollapser.class)?{
          ?????????????????if?(instance.get(jobClass)?==?null)?{
          ?????????????????????instance.put(jobClass,?new?BatchCollapser<>(cleaner,?threshHold,?interval));
          ?????????????????}
          ?????????????}
          ?????????}

          ?????????return?instance.get(jobClass);
          ?????}
          ?}

          以下代碼內(nèi)需要注意的點:

          • 由于合并器的全局性需求,需要將合并器實現(xiàn)為一個單例,另外為了提升它的通用性,內(nèi)部使用使用 concurrentHashMap 和 double check 實現(xiàn)了一個簡單的單例工廠。
          • 為了區(qū)分不同用途的合并器,工廠需要傳入一個實現(xiàn)了 Handler 的實例,通過實例的 class 來對請求進行分組存儲。
          • 由于 java.util.Timer 的阻塞特性,一個 Timer 線程在阻塞時不會啟動另一個同樣的 Timer 線程,所以使用 ScheduledExecutorService 定時啟動 Timer 線程。

          ConcurrentHashMultiset

          設(shè)計

          上面介紹的請求合并都是將多個請求一次發(fā)送,下游服務(wù)器處理時本質(zhì)上還是多個請求,最好的請求合并是在內(nèi)存中進行,將請求結(jié)果簡單合并成一個發(fā)送給下游服務(wù)器。如我們經(jīng)常會遇到的需求:元素分值累加或數(shù)據(jù)統(tǒng)計,就可以先在內(nèi)存中將某一項的分值或數(shù)據(jù)累加起來,定時請求數(shù)據(jù)庫保存。

          Guava 內(nèi)就提供了這么一種數(shù)據(jù)結(jié)構(gòu):ConcurrentHashMultiset,它不同于普通的 set 結(jié)構(gòu)存儲相同元素時直接覆蓋原有元素,而是給每個元素保持一個計數(shù) count, 插入重復(fù)時元素的 count 值加1。而且它在添加和刪除時并不加鎖也能保證線程安全,具體實現(xiàn)是通過一個 while(true) 循環(huán)嘗試操作,直到操作夠所需要的數(shù)量。

          ConcurrentHashMultiset 這種排重計數(shù)的特性,非常適合數(shù)據(jù)統(tǒng)計這種元素在短時間內(nèi)重復(fù)率很高的場景,經(jīng)過排重后的數(shù)量計算,可以大大降低下游服務(wù)器的壓力,即使重復(fù)率不高,能用少量的內(nèi)存空間換取系統(tǒng)可用性的提高,也是很劃算的。

          實現(xiàn)

          使用 ConcurrentHashMultiset 進行請求合并與使用普通容器在整體結(jié)構(gòu)上并無太大差異,具體類似于:

          if?(ConcurrentHashMultiset.isEmpty())?{
          ????return;
          }

          List?transferList?=?Lists.newArrayList();
          ConcurrentHashMultiset.elementSet().forEach(request?->?{
          ????int?count?=?ConcurrentHashMultiset.count(request);
          ????if?(count?<=?0)?{
          ????????return;
          ????}

          ????transferList.add(count?==?1???request?:?new?Request(request.getIncrement()?*?count));
          ????ConcurrentHashMultiset.remove(request,?count);
          });

          小結(jié)

          最后總結(jié)一下各個技術(shù)適用的場景:

          • hystrix collapser: 需要每個請求的結(jié)果,并且不在意每個請求的 cost 會增加;
          • BatchCollapser: 不在意請求的結(jié)果,需要請求合并能在時間和數(shù)量兩個維度上觸發(fā);
          • ConcurrentHashMultiset:請求重復(fù)率很高的統(tǒng)計類場景;

          另外,如果選擇自己來實現(xiàn)的話,完全可以將 BatchCollapserConcurrentHashMultiset 結(jié)合一下,在BatchCollapser 里使用 ConcurrentHashMultiset 作為容器,這樣就可以結(jié)合兩者的優(yōu)勢了。




          推薦閱讀:

          世界的真實格局分析,地球人類社會底層運行原理

          不是你需要中臺,而是一名合格的架構(gòu)師(附各大廠中臺建設(shè)PPT)

          億級(無限級)并發(fā),沒那么難

          論數(shù)字化轉(zhuǎn)型——轉(zhuǎn)什么,如何轉(zhuǎn)?

          華為干部與人才發(fā)展手冊(附PPT)

          企業(yè)10大管理流程圖,數(shù)字化轉(zhuǎn)型從業(yè)者必備!

          【中臺實踐】華為大數(shù)據(jù)中臺架構(gòu)分享.pdf

          華為的數(shù)字化轉(zhuǎn)型方法論

          華為如何實施數(shù)字化轉(zhuǎn)型(附PPT)

          超詳細(xì)280頁Docker實戰(zhàn)文檔!開放下載

          華為大數(shù)據(jù)解決方案(PPT)

          瀏覽 59
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  人妻互换一二三区免费 | 中文字幕无码乱伦 | 无码 高潮 在线白丝护士 | 在线亚洲中文在线观看 | 爱插综合网|