<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          京東一面:說(shuō)說(shuō) CompletableFuture 的實(shí)現(xiàn)原理和使用場(chǎng)景?懵逼了。。

          共 360字,需瀏覽 1分鐘

           ·

          2022-04-16 14:21


          1.概述

          CompletableFuture是jdk1.8引入的實(shí)現(xiàn)類。擴(kuò)展了Future和CompletionStage,是一個(gè)可以在任務(wù)完成階段觸發(fā)一些操作Future。簡(jiǎn)單的來(lái)講就是可以實(shí)現(xiàn)異步回調(diào)。

          2.為什么引入CompletableFuture

          對(duì)于jdk1.5的Future,雖然提供了異步處理任務(wù)的能力,但是獲取結(jié)果的方式很不優(yōu)雅,還是需要通過(guò)阻塞(或者輪訓(xùn))的方式。如何避免阻塞呢?其實(shí)就是注冊(cè)回調(diào)。

          業(yè)界結(jié)合觀察者模式實(shí)現(xiàn)異步回調(diào)。也就是當(dāng)任務(wù)執(zhí)行完成后去通知觀察者。比如Netty的ChannelFuture,可以通過(guò)注冊(cè)監(jiān)聽(tīng)實(shí)現(xiàn)異步結(jié)果的處理。

          Netty的ChannelFuture

          public?Promise?addListener(GenericFutureListenersuper?V>>?listener)?{
          ????checkNotNull(listener,?"listener");
          ????synchronized?(this)?{
          ????????addListener0(listener);
          ????}
          ????if?(isDone())?{
          ????????notifyListeners();
          ????}
          ????return?this;
          }
          private?boolean?setValue0(Object?objResult)?{
          ????if?(RESULT_UPDATER.compareAndSet(this,?null,?objResult)?||
          ????????RESULT_UPDATER.compareAndSet(this,?UNCANCELLABLE,?objResult))?{
          ????????if?(checkNotifyWaiters())?{
          ????????????notifyListeners();
          ????????}
          ????????return?true;
          ????}
          ????return?false;
          }

          通過(guò)addListener方法注冊(cè)監(jiān)聽(tīng)。如果任務(wù)完成,會(huì)調(diào)用notifyListeners通知。

          CompletableFuture通過(guò)擴(kuò)展Future,引入函數(shù)式編程,通過(guò)回調(diào)的方式去處理結(jié)果。

          3.功能

          CompletableFuture的功能主要體現(xiàn)在他的CompletionStage。

          可以實(shí)現(xiàn)如下等功能

          • 轉(zhuǎn)換(thenCompose)
          • 組合(thenCombine)
          • 消費(fèi)(thenAccept)
          • 運(yùn)行(thenRun)。
          • 帶返回的消費(fèi)(thenApply)

          消費(fèi)和運(yùn)行的區(qū)別:

          消費(fèi)使用執(zhí)行結(jié)果。運(yùn)行則只是運(yùn)行特定任務(wù)。具體其他功能大家可以根據(jù)需求自行查看。

          CompletableFuture借助CompletionStage的方法可以實(shí)現(xiàn)鏈?zhǔn)秸{(diào)用。并且可以選擇同步或者異步兩種方式。

          這里舉個(gè)簡(jiǎn)單的例子來(lái)體驗(yàn)一下他的功能。

          public?static?void?thenApply()?{
          ????ExecutorService?executorService?=?Executors.newFixedThreadPool(2);
          ????CompletableFuture?cf?=?CompletableFuture.supplyAsync(()?->?{
          ????????try?{
          ????????????//??Thread.sleep(2000);
          ????????}?catch?(Exception?e)?{
          ????????????e.printStackTrace();
          ????????}
          ????????System.out.println("supplyAsync?"?+?Thread.currentThread().getName());
          ????????return?"hello";
          ????},?executorService).thenApplyAsync(s?->?{
          ????????System.out.println(s?+?"world");
          ????????return?"hhh";
          ????},?executorService);
          ????cf.thenRunAsync(()?->?{
          ????????System.out.println("ddddd");
          ????});
          ????cf.thenRun(()?->?{
          ????????System.out.println("ddddsd");
          ????});
          ????cf.thenRun(()?->?{
          ????????System.out.println(Thread.currentThread());
          ????????System.out.println("dddaewdd");
          ????});
          }

          執(zhí)行結(jié)果

          supplyAsync?pool-1-thread-1
          helloworld
          ddddd
          ddddsd
          Thread[main,5,main]
          dddaewdd

          根據(jù)結(jié)果我們可以看到會(huì)有序執(zhí)行對(duì)應(yīng)任務(wù)。

          注意:

          如果是同步執(zhí)行cf.thenRun。他的執(zhí)行線程可能main線程,也可能是執(zhí)行源任務(wù)的線程。如果執(zhí)行源任務(wù)的線程在main調(diào)用之前執(zhí)行完了任務(wù)。那么cf.thenRun方法會(huì)由main線程調(diào)用。

          這里說(shuō)明一下,如果是同一任務(wù)的依賴任務(wù)有多個(gè):

          • 如果這些依賴任務(wù)都是同步執(zhí)行。那么假如這些任務(wù)被當(dāng)前調(diào)用線程(main)執(zhí)行,則是有序執(zhí)行,假如被執(zhí)行源任務(wù)的線程執(zhí)行,那么會(huì)是倒序執(zhí)行。因?yàn)閮?nèi)部任務(wù)數(shù)據(jù)結(jié)構(gòu)為L(zhǎng)IFO。
          • 如果這些依賴任務(wù)都是異步執(zhí)行,那么他會(huì)通過(guò)異步線程池去執(zhí)行任務(wù)。不能保證任務(wù)的執(zhí)行順序。

          上面的結(jié)論是通過(guò)閱讀源代碼得到的。下面我們深入源代碼。

          4.源碼追蹤

          創(chuàng)建CompletableFuture

          創(chuàng)建的方法有很多,甚至可以直接new一個(gè)。我們來(lái)看一下supplyAsync異步創(chuàng)建的方法。

          public?static??CompletableFuture?supplyAsync(Supplier?supplier,
          ???????????????????????????????????????????????????Executor?executor)
          ?
          {
          ????return?asyncSupplyStage(screenExecutor(executor),?supplier);
          }
          static?Executor?screenExecutor(Executor?e)?{
          ????if?(!useCommonPool?&&?e?==?ForkJoinPool.commonPool())
          ????????return?asyncPool;
          ????if?(e?==?null)?throw?new?NullPointerException();
          ????return?e;
          }

          入?yún)upplier,帶返回值的函數(shù)。如果是異步方法,并且傳遞了執(zhí)行器,那么會(huì)使用傳入的執(zhí)行器去執(zhí)行任務(wù)。否則采用公共的ForkJoin并行線程池,如果不支持并行,新建一個(gè)線程去執(zhí)行。

          這里我們需要注意ForkJoin是通過(guò)守護(hù)線程去執(zhí)行任務(wù)的。所以必須有非守護(hù)線程的存在才行。

          asyncSupplyStage方法

          static??CompletableFuture?asyncSupplyStage(Executor?e,
          ?????????????????????????????????????????????????Supplier?f)
          ?
          {
          ????if?(f?==?null)?throw?new?NullPointerException();
          ????CompletableFuture?d?=?new?CompletableFuture();
          ????e.execute(new?AsyncSupply(d,?f));
          ????return?d;
          }

          這里會(huì)創(chuàng)建一個(gè)用于返回的CompletableFuture。

          然后構(gòu)造一個(gè)AsyncSupply,并將創(chuàng)建的CompletableFuture作為構(gòu)造參數(shù)傳入。

          那么,任務(wù)的執(zhí)行完全依賴AsyncSupply。

          AsyncSupply#run

          public?void?run()?{
          ????CompletableFuture?d;?Supplier?f;
          ????if?((d?=?dep)?!=?null?&&?(f?=?fn)?!=?null)?{
          ????????dep?=?null;?fn?=?null;
          ????????if?(d.result?==?null)?{
          ????????????try?{
          ????????????????d.completeValue(f.get());
          ????????????}?catch?(Throwable?ex)?{
          ????????????????d.completeThrowable(ex);
          ????????????}
          ????????}
          ????????d.postComplete();
          ????}
          }
          1. 該方法會(huì)調(diào)用Supplier的get方法。并將結(jié)果設(shè)置到CompletableFuture中。我們應(yīng)該清楚這些操作都是在異步線程中調(diào)用的。
          2. d.postComplete方法就是通知任務(wù)執(zhí)行完成。觸發(fā)后續(xù)依賴任務(wù)的執(zhí)行,也就是實(shí)現(xiàn)CompletionStage的關(guān)鍵點(diǎn)。

          在看postComplete方法之前我們先來(lái)看一下創(chuàng)建依賴任務(wù)的邏輯。

          thenAcceptAsync方法

          public?CompletableFuture?thenAcceptAsync(Consumersuper?T>?action)?{
          ????return?uniAcceptStage(asyncPool,?action);
          }
          private?CompletableFuture?uniAcceptStage(Executor?e,
          ???????????????????????????????????????????????Consumersuper
          ?T>?f)
          ?{
          ????if?(f?==?null)?throw?new?NullPointerException();
          ????CompletableFuture?d?=?new?CompletableFuture();
          ????if?(e?!=?null?||?!d.uniAccept(this,?f,?null))?{
          ????????#?1
          ????????UniAccept?c?=?new?UniAccept(e,?d,?this,?f);
          ????????push(c);
          ????????c.tryFire(SYNC);
          ????}
          ????return?d;
          }

          上面提到過(guò)。thenAcceptAsync是用來(lái)消費(fèi)CompletableFuture的。該方法調(diào)用uniAcceptStage。

          uniAcceptStage邏輯:

          1. 構(gòu)造一個(gè)CompletableFuture,主要是為了鏈?zhǔn)秸{(diào)用。
          2. 如果為異步任務(wù),直接返回。因?yàn)樵慈蝿?wù)結(jié)束后會(huì)觸發(fā)異步線程執(zhí)行對(duì)應(yīng)邏輯。
          3. 如果為同步任務(wù)(e==null),會(huì)調(diào)用d.uniAccept方法。這個(gè)方法在這里邏輯:如果源任務(wù)完成,調(diào)用f,返回true。否則進(jìn)入if代碼塊(Mark 1)。
          4. 如果是異步任務(wù)直接進(jìn)入if(Mark 1)。

          Mark1邏輯:

          1. 構(gòu)造一個(gè)UniAccept,將其push入棧。這里通過(guò)CAS實(shí)現(xiàn)樂(lè)觀鎖實(shí)現(xiàn)。
          2. 調(diào)用c.tryFire方法。
          final?CompletableFuture?tryFire(int?mode)?{
          ????CompletableFuture?d;?CompletableFuture?a;
          ????if?((d?=?dep)?==?null?||
          ????????!d.uniAccept(a?=?src,?fn,?mode?>?0???null?:?this))
          ????????return?null;
          ????dep?=?null;?src?=?null;?fn?=?null;
          ????return?d.postFire(a,?mode);
          }
          1. 會(huì)調(diào)用d.uniAccept方法。其實(shí)該方法判斷源任務(wù)是否完成,如果完成則執(zhí)行依賴任務(wù),否則返回false。
          2. 如果依賴任務(wù)已經(jīng)執(zhí)行,調(diào)用d.postFire,主要就是Fire的后續(xù)處理。根據(jù)不同模式邏輯不同。

          這里簡(jiǎn)單說(shuō)一下,其實(shí)mode有同步異步,和迭代。迭代為了避免無(wú)限遞歸。

          這里強(qiáng)調(diào)一下d.uniAccept方法的第三個(gè)參數(shù)。

          如果是異步調(diào)用(mode>0),傳入null。否則傳入this。

          區(qū)別看下面代碼。c不為null會(huì)調(diào)用c.claim方法。

          try?{
          ????if?(c?!=?null?&&?!c.claim())
          ????????return?false;
          ????@SuppressWarnings("unchecked")?S?s?=?(S)?r;
          ????f.accept(s);
          ????completeNull();
          }?catch?(Throwable?ex)?{
          ????completeThrowable(ex);
          }

          final?boolean?claim()?{
          ????Executor?e?=?executor;
          ????if?(compareAndSetForkJoinTaskTag((short)0,?(short)1))?{
          ????????if?(e?==?null)
          ????????????return?true;
          ????????executor?=?null;?//?disable
          ????????e.execute(this);
          ????}
          ????return?false;
          }

          claim方法是邏輯:

          • 如果異步線程為null。說(shuō)明同步,那么直接返回true。最后上層函數(shù)會(huì)調(diào)用f.accept(s)同步執(zhí)行任務(wù)。
          • 如果異步線程不為null,那么使用異步線程去執(zhí)行this。

          this的run任務(wù)如下。也就是在異步線程同步調(diào)用tryFire方法。達(dá)到其被異步線程執(zhí)行的目的。

          public?final?void?run(){?
          ???tryFire(ASYNC);?
          }

          看完上面的邏輯,我們基本理解依賴任務(wù)的邏輯。

          其實(shí)就是先判斷源任務(wù)是否完成,如果完成,直接在對(duì)應(yīng)線程執(zhí)行以來(lái)任務(wù)(如果是同步,則在當(dāng)前線程處理,否則在異步線程處理)

          如果任務(wù)沒(méi)有完成,直接返回,因?yàn)榈热蝿?wù)完成之后會(huì)通過(guò)postComplete去觸發(fā)調(diào)用依賴任務(wù)。

          postComplete方法

          final?void?postComplete()?{
          ????/*
          ?????*?On?each?step,?variable?f?holds?current?dependents?to?pop
          ?????*?and?run.??It?is?extended?along?only?one?path?at?a?time,
          ?????*?pushing?others?to?avoid?unbounded?recursion.
          ?????*/

          ????CompletableFuture?f?=?this;?Completion?h;
          ????while?((h?=?f.stack)?!=?null?||
          ???????????(f?!=?this?&&?(h?=?(f?=?this).stack)?!=?null))?{
          ????????CompletableFuture?d;?Completion?t;
          ????????if?(f.casStack(h,?t?=?h.next))?{
          ????????????if?(t?!=?null)?{
          ????????????????if?(f?!=?this)?{
          ????????????????????pushStack(h);
          ????????????????????continue;
          ????????????????}
          ????????????????h.next?=?null;????//?detach
          ????????????}
          ????????????f?=?(d?=?h.tryFire(NESTED))?==?null???this?:?d;
          ????????}
          ????}
          }

          在源任務(wù)完成之后會(huì)調(diào)用。

          其實(shí)邏輯很簡(jiǎn)單,就是迭代堆棧的依賴任務(wù)。調(diào)用h.tryFire方法。NESTED就是為了避免遞歸死循環(huán)。因?yàn)镕irePost會(huì)調(diào)用postComplete。如果是NESTED,則不調(diào)用。

          堆棧的內(nèi)容其實(shí)就是在依賴任務(wù)創(chuàng)建的時(shí)候加入進(jìn)去的。上面我們已經(jīng)提到過(guò)。

          4.總結(jié)

          基本上述源碼已經(jīng)分析了邏輯。

          因?yàn)樯婕爱惒降炔僮鳎覀冃枰硪幌拢ㄟ@里針對(duì)全異步任務(wù)):

          1. 創(chuàng)建CompletableFuture成功之后會(huì)通過(guò)異步線程去執(zhí)行對(duì)應(yīng)任務(wù)。
          2. 如果CompletableFuture還有依賴任務(wù)(異步),會(huì)將任務(wù)加入到CompletableFuture的堆棧保存起來(lái)。以供后續(xù)完成后執(zhí)行依賴任務(wù)。

          當(dāng)然,創(chuàng)建依賴任務(wù)并不只是將其加入堆棧。如果源任務(wù)在創(chuàng)建依賴任務(wù)的時(shí)候已經(jīng)執(zhí)行完成,那么當(dāng)前線程會(huì)觸發(fā)依賴任務(wù)的異步線程直接處理依賴任務(wù)。并且會(huì)告訴堆棧其他的依賴任務(wù)源任務(wù)已經(jīng)完成。

          主要是考慮代碼的復(fù)用。所以邏輯相對(duì)難理解。

          postComplete方法會(huì)被源任務(wù)線程執(zhí)行完源任務(wù)后調(diào)用。同樣也可能被依賴任務(wù)線程后調(diào)用。

          執(zhí)行依賴任務(wù)的方法主要就是靠tryFire方法。因?yàn)檫@個(gè)方法可能會(huì)被多種不同類型線程觸發(fā),所以邏輯也繞一點(diǎn)。(其他依賴任務(wù)線程、源任務(wù)線程、當(dāng)前依賴任務(wù)線程)

          • 如果是當(dāng)前依賴任務(wù)線程,那么會(huì)執(zhí)行依賴任務(wù),并且會(huì)通知其他依賴任務(wù)。
          • 如果是源任務(wù)線程,和其他依賴任務(wù)線程,則將任務(wù)轉(zhuǎn)換給依賴線程去執(zhí)行。不需要通知其他依賴任務(wù),避免死遞歸。

          不得不說(shuō)Doug Lea的編碼,真的是藝術(shù)。代碼的復(fù)用性全體現(xiàn)在邏輯上了。

          推薦閱讀:

          世界的真實(shí)格局分析,地球人類社會(huì)底層運(yùn)行原理

          不是你需要中臺(tái),而是一名合格的架構(gòu)師(附各大廠中臺(tái)建設(shè)PPT)

          億級(jí)(無(wú)限級(jí))并發(fā),沒(méi)那么難

          論數(shù)字化轉(zhuǎn)型——轉(zhuǎn)什么,如何轉(zhuǎn)?

          華為干部與人才發(fā)展手冊(cè)(附PPT)

          企業(yè)10大管理流程圖,數(shù)字化轉(zhuǎn)型從業(yè)者必備!

          【中臺(tái)實(shí)踐】華為大數(shù)據(jù)中臺(tái)架構(gòu)分享.pdf

          華為的數(shù)字化轉(zhuǎn)型方法論

          華為如何實(shí)施數(shù)字化轉(zhuǎn)型(附PPT)

          超詳細(xì)280頁(yè)Docker實(shí)戰(zhàn)文檔!開(kāi)放下載

          華為大數(shù)據(jù)解決方案(PPT)

          瀏覽 14
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                    <th id="afajh"><progress id="afajh"></progress></th>
                    偷偷操av | 亚洲性色Aⅴ | 大香煮伊在一区二区2022 | 骚逼影院 | 黄色一级片欧美 |