<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          面試官:jdk8的異步CompletableFuture實現(xiàn)原理和使用場景,具體講下

          共 559字,需瀏覽 2分鐘

           ·

          2022-02-11 10:34


          來源:blog.csdn.net/weixin_39332800/article/

          details/108185931



          面試中關(guān)于JDK中異步的API能說清楚,談薪資時也比較有底氣,現(xiàn)在并發(fā)多線程是面試官最愛問的題型

          1.概述

          CompletableFuture是jdk1.8引入的實現(xiàn)類。擴展了Future和CompletionStage,是一個可以在任務(wù)完成階段觸發(fā)一些操作Future。簡單的來講就是可以實現(xiàn)異步回調(diào)。

          2.為什么引入CompletableFuture

          對于jdk1.5的Future,雖然提供了異步處理任務(wù)的能力,但是獲取結(jié)果的方式很不優(yōu)雅,還是需要通過阻塞(或者輪訓)的方式。如何避免阻塞呢?其實就是注冊回調(diào)。

          業(yè)界結(jié)合觀察者模式實現(xiàn)異步回調(diào)。也就是當任務(wù)執(zhí)行完成后去通知觀察者。比如Netty的ChannelFuture,可以通過注冊監(jiān)聽實現(xiàn)異步結(jié)果的處理。

          Netty的ChannelFuture
          public?Promise?addListener(GenericFutureListenersuper?V>>?listener)?{??
          ????checkNotNull(listener,?"listener");??
          ????synchronized?(this)?{??
          ????????addListener0(listener);??
          ????}??
          ????if?(isDone())?{??
          ????????notifyListeners();??
          ????}??
          ????return?this;??
          }??
          private?boolean?setValue0(Object?objResult)?{??
          ????if?(RESULT_UPDATER.compareAndSet(this,?null,?objResult)?||??
          ????????RESULT_UPDATER.compareAndSet(this,?UNCANCELLABLE,?objResult))?{??
          ????????if?(checkNotifyWaiters())?{??
          ????????????notifyListeners();??
          ????????}??
          ????????return?true;??
          ????}??
          ????return?false;??
          }??

          通過addListener方法注冊監(jiān)聽。如果任務(wù)完成,會調(diào)用notifyListeners通知。

          CompletableFuture通過擴展Future,引入函數(shù)式編程,通過回調(diào)的方式去處理結(jié)果。

          3.功能

          CompletableFuture的功能主要體現(xiàn)在他的CompletionStage。

          可以實現(xiàn)如下等功能

          • 轉(zhuǎn)換(thenCompose)

          • 組合(thenCombine)

          • 消費(thenAccept)

          • 運行(thenRun)。

          • 帶返回的消費(thenApply)

          消費和運行的區(qū)別:

          消費使用執(zhí)行結(jié)果。運行則只是運行特定任務(wù)。具體其他功能大家可以根據(jù)需求自行查看。

          CompletableFuture借助CompletionStage的方法可以實現(xiàn)鏈式調(diào)用。并且可以選擇同步或者異步兩種方式。

          這里舉個簡單的例子來體驗一下他的功能。

          public?static?void?thenApply()?{??
          ????ExecutorService?executorService?=?Executors.newFixedThreadPool(2);??
          ????CompletableFuture?cf?=?CompletableFuture.supplyAsync(()?->?{??
          ????????try?{??
          ????????????//??Thread.sleep(2000);??
          ????????}?catch?(Exception?e)?{??
          ????????????e.printStackTrace();??
          ????????}??
          ????????System.out.println("supplyAsync?"?+?Thread.currentThread().getName());??
          ????????return?"hello";??
          ????},?executorService).thenApplyAsync(s?->?{??
          ????????System.out.println(s?+?"world");??
          ????????return?"hhh";??
          ????},?executorService);??
          ????cf.thenRunAsync(()?->?{??
          ????????System.out.println("ddddd");??
          ????});??
          ????cf.thenRun(()?->?{??
          ????????System.out.println("ddddsd");??
          ????});??
          ????cf.thenRun(()?->?{??
          ????????System.out.println(Thread.currentThread());??
          ????????System.out.println("dddaewdd");??
          ????});??
          }??

          執(zhí)行結(jié)果

          supplyAsync?pool-1-thread-1??
          helloworld??
          ddddd??
          ddddsd??
          Thread[main,5,main]??
          dddaewdd??

          根據(jù)結(jié)果我們可以看到會有序執(zhí)行對應任務(wù)。

          注意:

          如果是同步執(zhí)行cf.thenRun。他的執(zhí)行線程可能main線程,也可能是執(zhí)行源任務(wù)的線程。如果執(zhí)行源任務(wù)的線程在main調(diào)用之前執(zhí)行完了任務(wù)。那么cf.thenRun方法會由main線程調(diào)用。

          這里說明一下,如果是同一任務(wù)的依賴任務(wù)有多個:

          • 如果這些依賴任務(wù)都是同步執(zhí)行。那么假如這些任務(wù)被當前調(diào)用線程(main)執(zhí)行,則是有序執(zhí)行,假如被執(zhí)行源任務(wù)的線程執(zhí)行,那么會是倒序執(zhí)行。因為內(nèi)部任務(wù)數(shù)據(jù)結(jié)構(gòu)為LIFO。

          • 如果這些依賴任務(wù)都是異步執(zhí)行,那么他會通過異步線程池去執(zhí)行任務(wù)。不能保證任務(wù)的執(zhí)行順序。

          上面的結(jié)論是通過閱讀源代碼得到的。下面我們深入源代碼。

          4.源碼追蹤

          創(chuàng)建CompletableFuture

          創(chuàng)建的方法有很多,甚至可以直接new一個。我們來看一下supplyAsync異步創(chuàng)建的方法。

          public?static??CompletableFuture?supplyAsync(Supplier?supplier,??
          ???????????????????????????????????????????????????Executor?executor)
          ?
          {??
          ????return?asyncSupplyStage(screenExecutor(executor),?supplier);??
          }??
          static?Executor?screenExecutor(Executor?e)?{??
          ????if?(!useCommonPool?&&?e?==?ForkJoinPool.commonPool())??
          ????????return?asyncPool;??
          ????if?(e?==?null)?throw?new?NullPointerException();??
          ????return?e;??
          }??

          入?yún)upplier,帶返回值的函數(shù)。如果是異步方法,并且傳遞了執(zhí)行器,那么會使用傳入的執(zhí)行器去執(zhí)行任務(wù)。否則采用公共的ForkJoin并行線程池,如果不支持并行,新建一個線程去執(zhí)行。關(guān)注Java項目分享

          這里我們需要注意ForkJoin是通過守護線程去執(zhí)行任務(wù)的。所以必須有非守護線程的存在才行。

          asyncSupplyStage方法
          static??CompletableFuture?asyncSupplyStage(Executor?e,??
          ?????????????????????????????????????????????????Supplier?f)
          ?
          {??
          ????if?(f?==?null)?throw?new?NullPointerException();??
          ????CompletableFuture?d?=?new?CompletableFuture();??
          ????e.execute(new?AsyncSupply(d,?f));??
          ????return?d;??
          }??

          這里會創(chuàng)建一個用于返回的CompletableFuture。

          然后構(gòu)造一個AsyncSupply,并將創(chuàng)建的CompletableFuture作為構(gòu)造參數(shù)傳入。

          那么,任務(wù)的執(zhí)行完全依賴AsyncSupply。

          AsyncSupply#run
          public?void?run()?{??
          ????CompletableFuture?d;?Supplier?f;??
          ????if?((d?=?dep)?!=?null?&&?(f?=?fn)?!=?null)?{??
          ????????dep?=?null;?fn?=?null;??
          ????????if?(d.result?==?null)?{??
          ????????????try?{??
          ????????????????d.completeValue(f.get());??
          ????????????}?catch?(Throwable?ex)?{??
          ????????????????d.completeThrowable(ex);??
          ????????????}??
          ????????}??
          ????????d.postComplete();??
          ????}??
          }??
          1. 該方法會調(diào)用Supplier的get方法。并將結(jié)果設(shè)置到CompletableFuture中。我們應該清楚這些操作都是在異步線程中調(diào)用的。

          2. d.postComplete方法就是通知任務(wù)執(zhí)行完成。觸發(fā)后續(xù)依賴任務(wù)的執(zhí)行,也就是實現(xiàn)CompletionStage的關(guān)鍵點。關(guān)注Java項目分享

          在看postComplete方法之前我們先來看一下創(chuàng)建依賴任務(wù)的邏輯。

          thenAcceptAsync方法
          public?CompletableFuture?thenAcceptAsync(Consumersuper?T>?action)?{??
          ????return?uniAcceptStage(asyncPool,?action);??
          }??
          private?CompletableFuture?uniAcceptStage(Executor?e,??
          ???????????????????????????????????????????????Consumersuper
          ?T>?f)
          ?{??
          ????if?(f?==?null)?throw?new?NullPointerException();??
          ????CompletableFuture?d?=?new?CompletableFuture();??
          ????if?(e?!=?null?||?!d.uniAccept(this,?f,?null))?{??
          ????????#?1??
          ????????UniAccept?c?=?new?UniAccept(e,?d,?this,?f);??
          ????????push(c);??
          ????????c.tryFire(SYNC);??
          ????}??
          ????return?d;??
          }??

          上面提到過。thenAcceptAsync是用來消費CompletableFuture的。該方法調(diào)用uniAcceptStage。

          uniAcceptStage邏輯:

          1. 構(gòu)造一個CompletableFuture,主要是為了鏈式調(diào)用。

          2. 如果為異步任務(wù),直接返回。因為源任務(wù)結(jié)束后會觸發(fā)異步線程執(zhí)行對應邏輯。

          3. 如果為同步任務(wù)(e==null),會調(diào)用d.uniAccept方法。這個方法在這里邏輯:如果源任務(wù)完成,調(diào)用f,返回true。否則進入if代碼塊(Mark 1)。

          4. 如果是異步任務(wù)直接進入if(Mark 1)。

          Mark1邏輯:

          1. 構(gòu)造一個UniAccept,將其push入棧。這里通過CAS實現(xiàn)樂觀鎖實現(xiàn)。

          2. 調(diào)用c.tryFire方法。

          final?CompletableFuture?tryFire(int?mode)?{??
          ????CompletableFuture?d;?CompletableFuture?a;??
          ????if?((d?=?dep)?==?null?||??
          ????????!d.uniAccept(a?=?src,?fn,?mode?>?0???null?:?this))??
          ????????return?null;??
          ????dep?=?null;?src?=?null;?fn?=?null;??
          ????return?d.postFire(a,?mode);??
          }??
          1. 會調(diào)用d.uniAccept方法。其實該方法判斷源任務(wù)是否完成,如果完成則執(zhí)行依賴任務(wù),否則返回false。

          2. 如果依賴任務(wù)已經(jīng)執(zhí)行,調(diào)用d.postFire,主要就是Fire的后續(xù)處理。根據(jù)不同模式邏輯不同。

          這里簡單說一下,其實mode有同步異步,和迭代。迭代為了避免無限遞歸。

          這里強調(diào)一下d.uniAccept方法的第三個參數(shù)。

          如果是異步調(diào)用(mode>0),傳入null。否則傳入this。

          區(qū)別看下面代碼。c不為null會調(diào)用c.claim方法。關(guān)注Java項目分享

          try?{??
          ????if?(c?!=?null?&&?!c.claim())??
          ????????return?false;??
          ????@SuppressWarnings("unchecked")?S?s?=?(S)?r;??
          ????f.accept(s);??
          ????completeNull();??
          }?catch?(Throwable?ex)?{??
          ????completeThrowable(ex);??
          }??
          ??
          final?boolean?claim()?{??
          ????Executor?e?=?executor;??
          ????if?(compareAndSetForkJoinTaskTag((short)0,?(short)1))?{??
          ????????if?(e?==?null)??
          ????????????return?true;??
          ????????executor?=?null;?//?disable??
          ????????e.execute(this);??
          ????}??
          ????return?false;??
          }??

          claim方法是邏輯:

          • 如果異步線程為null。說明同步,那么直接返回true。最后上層函數(shù)會調(diào)用f.accept(s)同步執(zhí)行任務(wù)。

          • 如果異步線程不為null,那么使用異步線程去執(zhí)行this。

          this的run任務(wù)如下。也就是在異步線程同步調(diào)用tryFire方法。達到其被異步線程執(zhí)行的目的。

          public?final?void?run(){???
          ???tryFire(ASYNC);???
          }??

          看完上面的邏輯,我們基本理解依賴任務(wù)的邏輯。

          其實就是先判斷源任務(wù)是否完成,如果完成,直接在對應線程執(zhí)行以來任務(wù)(如果是同步,則在當前線程處理,否則在異步線程處理)

          如果任務(wù)沒有完成,直接返回,因為等任務(wù)完成之后會通過postComplete去觸發(fā)調(diào)用依賴任務(wù)。

          postComplete方法
          final?void?postComplete()?{??
          ????/*??
          ?????*?On?each?step,?variable?f?holds?current?dependents?to?pop??
          ?????*?and?run.??It?is?extended?along?only?one?path?at?a?time,??
          ?????*?pushing?others?to?avoid?unbounded?recursion.??
          ?????*/
          ??
          ????CompletableFuture?f?=?this;?Completion?h;??
          ????while?((h?=?f.stack)?!=?null?||??
          ???????????(f?!=?this?&&?(h?=?(f?=?this).stack)?!=?null))?{??
          ????????CompletableFuture?d;?Completion?t;??
          ????????if?(f.casStack(h,?t?=?h.next))?{??
          ????????????if?(t?!=?null)?{??
          ????????????????if?(f?!=?this)?{??
          ????????????????????pushStack(h);??
          ????????????????????continue;??
          ????????????????}??
          ????????????????h.next?=?null;????//?detach??
          ????????????}??
          ????????????f?=?(d?=?h.tryFire(NESTED))?==?null???this?:?d;??
          ????????}??
          ????}??
          }??

          在源任務(wù)完成之后會調(diào)用。

          其實邏輯很簡單,就是迭代堆棧的依賴任務(wù)。調(diào)用h.tryFire方法。NESTED就是為了避免遞歸死循環(huán)。因為FirePost會調(diào)用postComplete。如果是NESTED,則不調(diào)用。

          堆棧的內(nèi)容其實就是在依賴任務(wù)創(chuàng)建的時候加入進去的。上面我們已經(jīng)提到過。關(guān)注Java項目分享

          4.總結(jié)

          基本上述源碼已經(jīng)分析了邏輯。

          因為涉及異步等操作,我們需要理一下(這里針對全異步任務(wù)):

          1. 創(chuàng)建CompletableFuture成功之后會通過異步線程去執(zhí)行對應任務(wù)。

          2. 如果CompletableFuture還有依賴任務(wù)(異步),會將任務(wù)加入到CompletableFuture的堆棧保存起來。以供后續(xù)完成后執(zhí)行依賴任務(wù)。

          當然,創(chuàng)建依賴任務(wù)并不只是將其加入堆棧。如果源任務(wù)在創(chuàng)建依賴任務(wù)的時候已經(jīng)執(zhí)行完成,那么當前線程會觸發(fā)依賴任務(wù)的異步線程直接處理依賴任務(wù)。并且會告訴堆棧其他的依賴任務(wù)源任務(wù)已經(jīng)完成。

          主要是考慮代碼的復用。所以邏輯相對難理解。

          postComplete方法會被源任務(wù)線程執(zhí)行完源任務(wù)后調(diào)用。同樣也可能被依賴任務(wù)線程后調(diào)用。

          執(zhí)行依賴任務(wù)的方法主要就是靠tryFire方法。因為這個方法可能會被多種不同類型線程觸發(fā),所以邏輯也繞一點。(其他依賴任務(wù)線程、源任務(wù)線程、當前依賴任務(wù)線程)

          • 如果是當前依賴任務(wù)線程,那么會執(zhí)行依賴任務(wù),并且會通知其他依賴任務(wù)。

          • 如果是源任務(wù)線程,和其他依賴任務(wù)線程,則將任務(wù)轉(zhuǎn)換給依賴線程去執(zhí)行。不需要通知其他依賴任務(wù),避免死遞歸。

          不得不說Doug Lea的編碼,真的是藝術(shù)。代碼的復用性全體現(xiàn)在邏輯上了。

          程序汪資料鏈接

          程序汪接的7個私活都在這里,經(jīng)驗整理

          Java項目分享 最新整理全集,找項目不累啦 06版

          堪稱神級的Spring Boot手冊,從基礎(chǔ)入門到實戰(zhàn)進階

          臥槽!字節(jié)跳動《算法中文手冊》火了,完整版 PDF 開放下載!

          臥槽!阿里大佬總結(jié)的《圖解Java》火了,完整版PDF開放下載!

          字節(jié)跳動總結(jié)的設(shè)計模式 PDF 火了,完整版開放下載!

          歡迎添加程序汪個人微信 itwang008? 進粉絲群或圍觀朋友圈

          瀏覽 41
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                    <th id="afajh"><progress id="afajh"></progress></th>
                    婷婷五月综合久久中文字幕 | 伊人妻| 久久精品黄色 | 国产棈品久久久久久久久久九秃 | A片网站在线观看 |