<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          異步神器:CompletableFuture實(shí)現(xiàn)原理和使用場景

          共 16256字,需瀏覽 33分鐘

           ·

          2022-03-04 13:48


          源 /         文/ 



          先送大家一份福利:

          《美團(tuán)技術(shù)年貨.pdf》(2019-2021)

          在2022年春節(jié)到來之際,美團(tuán)技術(shù)團(tuán)隊精選過去3年公眾號50多篇技術(shù)文章以及 20多篇國際頂會論文,整理制作成一本厚達(dá)1200多頁的電子書,作為新年禮物贈送給大家。 

          這本電子書內(nèi)容覆蓋算法、前端、后端、數(shù)據(jù)、安全、測試等多個領(lǐng)域。

          希望能對同學(xué)們的工作和學(xué)習(xí)有所幫助。

          Code A Better Life


          需要該P(yáng)DF文檔,可直接長按掃碼添加好友,回復(fù) 「PDF」 獲取:



          來源:blog.csdn.net/weixin_39332800/article/

          details/108185931



          1.概述

          CompletableFuture是jdk1.8引入的實(shí)現(xiàn)類。擴(kuò)展了Future和CompletionStage,是一個可以在任務(wù)完成階段觸發(fā)一些操作Future。簡單的來講就是可以實(shí)現(xiàn)異步回調(diào)。

          2.為什么引入CompletableFuture

          對于jdk1.5的Future,雖然提供了異步處理任務(wù)的能力,但是獲取結(jié)果的方式很不優(yōu)雅,還是需要通過阻塞(或者輪訓(xùn))的方式。如何避免阻塞呢?其實(shí)就是注冊回調(diào)。

          業(yè)界結(jié)合觀察者模式實(shí)現(xiàn)異步回調(diào)。也就是當(dāng)任務(wù)執(zhí)行完成后去通知觀察者。比如Netty的ChannelFuture,可以通過注冊監(jiān)聽實(shí)現(xiàn)異步結(jié)果的處理。

          Netty的ChannelFuture
          public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {  
              checkNotNull(listener, "listener");  
              synchronized (this) {  
                  addListener0(listener);  
              }  
              if (isDone()) {  
                  notifyListeners();  
              }  
              return this;  
          }  
          private boolean setValue0(Object objResult) {  
              if (RESULT_UPDATER.compareAndSet(thisnull, objResult) ||  
                  RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {  
                  if (checkNotifyWaiters()) {  
                      notifyListeners();  
                  }  
                  return true;  
              }  
              return false;  
          }  

          通過addListener方法注冊監(jiān)聽。如果任務(wù)完成,會調(diào)用notifyListeners通知。

          CompletableFuture通過擴(kuò)展Future,引入函數(shù)式編程,通過回調(diào)的方式去處理結(jié)果。

          3.功能

          CompletableFuture的功能主要體現(xiàn)在他的CompletionStage。

          可以實(shí)現(xiàn)如下等功能

          • 轉(zhuǎn)換(thenCompose)

          • 組合(thenCombine)

          • 消費(fèi)(thenAccept)

          • 運(yùn)行(thenRun)。

          • 帶返回的消費(fèi)(thenApply)

          消費(fèi)和運(yùn)行的區(qū)別:

          消費(fèi)使用執(zhí)行結(jié)果。運(yùn)行則只是運(yùn)行特定任務(wù)。具體其他功能大家可以根據(jù)需求自行查看。

          CompletableFuture借助CompletionStage的方法可以實(shí)現(xiàn)鏈?zhǔn)秸{(diào)用。并且可以選擇同步或者異步兩種方式。

          這里舉個簡單的例子來體驗(yàn)一下他的功能。

          public static void thenApply() {  
              ExecutorService executorService = Executors.newFixedThreadPool(2);  
              CompletableFuture cf = CompletableFuture.supplyAsync(() -> {  
                  try {  
                      //  Thread.sleep(2000);  
                  } catch (Exception e) {  
                      e.printStackTrace();  
                  }  
                  System.out.println("supplyAsync " + Thread.currentThread().getName());  
                  return "hello";  
              }, executorService).thenApplyAsync(s -> {  
                  System.out.println(s + "world");  
                  return "hhh";  
              }, executorService);  
              cf.thenRunAsync(() -> {  
                  System.out.println("ddddd");  
              });  
              cf.thenRun(() -> {  
                  System.out.println("ddddsd");  
              });  
              cf.thenRun(() -> {  
                  System.out.println(Thread.currentThread());  
                  System.out.println("dddaewdd");  
              });  
          }  

          執(zhí)行結(jié)果

          supplyAsync pool-1-thread-1  
          helloworld  
          ddddd  
          ddddsd  
          Thread[main,5,main]  
          dddaewdd  

          根據(jù)結(jié)果我們可以看到會有序執(zhí)行對應(yīng)任務(wù)。

          注意:

          如果是同步執(zhí)行cf.thenRun。他的執(zhí)行線程可能main線程,也可能是執(zhí)行源任務(wù)的線程。如果執(zhí)行源任務(wù)的線程在main調(diào)用之前執(zhí)行完了任務(wù)。那么cf.thenRun方法會由main線程調(diào)用。

          這里說明一下,如果是同一任務(wù)的依賴任務(wù)有多個:

          • 如果這些依賴任務(wù)都是同步執(zhí)行。那么假如這些任務(wù)被當(dāng)前調(diào)用線程(main)執(zhí)行,則是有序執(zhí)行,假如被執(zhí)行源任務(wù)的線程執(zhí)行,那么會是倒序執(zhí)行。因?yàn)閮?nèi)部任務(wù)數(shù)據(jù)結(jié)構(gòu)為LIFO。

          • 如果這些依賴任務(wù)都是異步執(zhí)行,那么他會通過異步線程池去執(zhí)行任務(wù)。不能保證任務(wù)的執(zhí)行順序。

          上面的結(jié)論是通過閱讀源代碼得到的。下面我們深入源代碼。

          4.源碼追蹤

          創(chuàng)建CompletableFuture

          創(chuàng)建的方法有很多,甚至可以直接new一個。我們來看一下supplyAsync異步創(chuàng)建的方法。

          public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,  
                                                             Executor executor)
           
          {  
              return asyncSupplyStage(screenExecutor(executor), supplier);  
          }  
          static Executor screenExecutor(Executor e) {  
              if (!useCommonPool && e == ForkJoinPool.commonPool())  
                  return asyncPool;  
              if (e == nullthrow new NullPointerException();  
              return e;  
          }  

          入?yún)upplier,帶返回值的函數(shù)。如果是異步方法,并且傳遞了執(zhí)行器,那么會使用傳入的執(zhí)行器去執(zhí)行任務(wù)。否則采用公共的ForkJoin并行線程池,如果不支持并行,新建一個線程去執(zhí)行。

          這里我們需要注意ForkJoin是通過守護(hù)線程去執(zhí)行任務(wù)的。所以必須有非守護(hù)線程的存在才行。

          asyncSupplyStage方法
          static <U> CompletableFuture<U> asyncSupplyStage(Executor e,  
                                                           Supplier<U> f)
           
          {  
              if (f == nullthrow new NullPointerException();  
              CompletableFuture<U> d = new CompletableFuture<U>();  
              e.execute(new AsyncSupply<U>(d, f));  
              return d;  
          }  

          這里會創(chuàng)建一個用于返回的CompletableFuture。

          然后構(gòu)造一個AsyncSupply,并將創(chuàng)建的CompletableFuture作為構(gòu)造參數(shù)傳入。

          那么,任務(wù)的執(zhí)行完全依賴AsyncSupply。

          AsyncSupply#run
          public void run() {  
              CompletableFuture<T> d; Supplier<T> f;  
              if ((d = dep) != null && (f = fn) != null) {  
                  dep = null; fn = null;  
                  if (d.result == null) {  
                      try {  
                          d.completeValue(f.get());  
                      } catch (Throwable ex) {  
                          d.completeThrowable(ex);  
                      }  
                  }  
                  d.postComplete();  
              }  
          }  
          1. 該方法會調(diào)用Supplier的get方法。并將結(jié)果設(shè)置到CompletableFuture中。我們應(yīng)該清楚這些操作都是在異步線程中調(diào)用的。

          2. d.postComplete方法就是通知任務(wù)執(zhí)行完成。觸發(fā)后續(xù)依賴任務(wù)的執(zhí)行,也就是實(shí)現(xiàn)CompletionStage的關(guān)鍵點(diǎn)。

          在看postComplete方法之前我們先來看一下創(chuàng)建依賴任務(wù)的邏輯。

          thenAcceptAsync方法
          public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {  
              return uniAcceptStage(asyncPool, action);  
          }  
          private CompletableFuture<Void> uniAcceptStage(Executor e,  
                                                         Consumer<? super T> f)
           
          {  
              if (f == nullthrow new NullPointerException();  
              CompletableFuture<Void> d = new CompletableFuture<Void>();  
              if (e != null || !d.uniAccept(this, f, null)) {  
                  # 1  
                  UniAccept<T> c = new UniAccept<T>(e, d, this, f);  
                  push(c);  
                  c.tryFire(SYNC);  
              }  
              return d;  
          }  

          上面提到過。thenAcceptAsync是用來消費(fèi)CompletableFuture的。該方法調(diào)用uniAcceptStage。

          uniAcceptStage邏輯:

          1. 構(gòu)造一個CompletableFuture,主要是為了鏈?zhǔn)秸{(diào)用。

          2. 如果為異步任務(wù),直接返回。因?yàn)樵慈蝿?wù)結(jié)束后會觸發(fā)異步線程執(zhí)行對應(yīng)邏輯。

          3. 如果為同步任務(wù)(e==null),會調(diào)用d.uniAccept方法。這個方法在這里邏輯:如果源任務(wù)完成,調(diào)用f,返回true。否則進(jìn)入if代碼塊(Mark 1)。

          4. 如果是異步任務(wù)直接進(jìn)入if(Mark 1)。

          Mark1邏輯:

          1. 構(gòu)造一個UniAccept,將其push入棧。這里通過CAS實(shí)現(xiàn)樂觀鎖實(shí)現(xiàn)。

          2. 調(diào)用c.tryFire方法。

          final CompletableFuture<Void> tryFire(int mode) {  
              CompletableFuture<Void> d; CompletableFuture<T> a;  
              if ((d = dep) == null ||  
                  !d.uniAccept(a = src, fn, mode > 0 ? null : this))  
                  return null;  
              dep = null; src = null; fn = null;  
              return d.postFire(a, mode);  
          }  
          1. 會調(diào)用d.uniAccept方法。其實(shí)該方法判斷源任務(wù)是否完成,如果完成則執(zhí)行依賴任務(wù),否則返回false。

          2. 如果依賴任務(wù)已經(jīng)執(zhí)行,調(diào)用d.postFire,主要就是Fire的后續(xù)處理。根據(jù)不同模式邏輯不同。

          這里簡單說一下,其實(shí)mode有同步異步,和迭代。迭代為了避免無限遞歸。

          這里強(qiáng)調(diào)一下d.uniAccept方法的第三個參數(shù)。

          如果是異步調(diào)用(mode>0),傳入null。否則傳入this。

          區(qū)別看下面代碼。c不為null會調(diào)用c.claim方法。

          try {  
              if (c != null && !c.claim())  
                  return false;  
              @SuppressWarnings("unchecked") S s = (S) r;  
              f.accept(s);  
              completeNull();  
          catch (Throwable ex) {  
              completeThrowable(ex);  
          }  
            
          final boolean claim() {  
              Executor e = executor;  
              if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {  
                  if (e == null)  
                      return true;  
                  executor = null// disable  
                  e.execute(this);  
              }  
              return false;  
          }  

          claim方法是邏輯:

          • 如果異步線程為null。說明同步,那么直接返回true。最后上層函數(shù)會調(diào)用f.accept(s)同步執(zhí)行任務(wù)。

          • 如果異步線程不為null,那么使用異步線程去執(zhí)行this。

          this的run任務(wù)如下。也就是在異步線程同步調(diào)用tryFire方法。達(dá)到其被異步線程執(zhí)行的目的。

          public final void run(){   
             tryFire(ASYNC);   
          }  

          看完上面的邏輯,我們基本理解依賴任務(wù)的邏輯。

          其實(shí)就是先判斷源任務(wù)是否完成,如果完成,直接在對應(yīng)線程執(zhí)行以來任務(wù)(如果是同步,則在當(dāng)前線程處理,否則在異步線程處理)

          如果任務(wù)沒有完成,直接返回,因?yàn)榈热蝿?wù)完成之后會通過postComplete去觸發(fā)調(diào)用依賴任務(wù)。

          postComplete方法
          final void postComplete() {  
              /*  
               * On each step, variable f holds current dependents to pop  
               * and run.  It is extended along only one path at a time,  
               * pushing others to avoid unbounded recursion.  
               */
            
              CompletableFuture<?> f = this; Completion h;  
              while ((h = f.stack) != null ||  
                     (f != this && (h = (f = this).stack) != null)) {  
                  CompletableFuture<?> d; Completion t;  
                  if (f.casStack(h, t = h.next)) {  
                      if (t != null) {  
                          if (f != this) {  
                              pushStack(h);  
                              continue;  
                          }  
                          h.next = null;    // detach  
                      }  
                      f = (d = h.tryFire(NESTED)) == null ? this : d;  
                  }  
              }  
          }  

          在源任務(wù)完成之后會調(diào)用。

          其實(shí)邏輯很簡單,就是迭代堆棧的依賴任務(wù)。調(diào)用h.tryFire方法。NESTED就是為了避免遞歸死循環(huán)。因?yàn)镕irePost會調(diào)用postComplete。如果是NESTED,則不調(diào)用。

          堆棧的內(nèi)容其實(shí)就是在依賴任務(wù)創(chuàng)建的時候加入進(jìn)去的。上面我們已經(jīng)提到過。

          4.總結(jié)

          基本上述源碼已經(jīng)分析了邏輯。

          因?yàn)樯婕爱惒降炔僮鳎覀冃枰硪幌拢ㄟ@里針對全異步任務(wù)):

          1. 創(chuàng)建CompletableFuture成功之后會通過異步線程去執(zhí)行對應(yīng)任務(wù)。

          2. 如果CompletableFuture還有依賴任務(wù)(異步),會將任務(wù)加入到CompletableFuture的堆棧保存起來。以供后續(xù)完成后執(zhí)行依賴任務(wù)。

          當(dāng)然,創(chuàng)建依賴任務(wù)并不只是將其加入堆棧。如果源任務(wù)在創(chuàng)建依賴任務(wù)的時候已經(jīng)執(zhí)行完成,那么當(dāng)前線程會觸發(fā)依賴任務(wù)的異步線程直接處理依賴任務(wù)。并且會告訴堆棧其他的依賴任務(wù)源任務(wù)已經(jīng)完成。

          主要是考慮代碼的復(fù)用。所以邏輯相對難理解。

          postComplete方法會被源任務(wù)線程執(zhí)行完源任務(wù)后調(diào)用。同樣也可能被依賴任務(wù)線程后調(diào)用。

          執(zhí)行依賴任務(wù)的方法主要就是靠tryFire方法。因?yàn)檫@個方法可能會被多種不同類型線程觸發(fā),所以邏輯也繞一點(diǎn)。(其他依賴任務(wù)線程、源任務(wù)線程、當(dāng)前依賴任務(wù)線程)

          • 如果是當(dāng)前依賴任務(wù)線程,那么會執(zhí)行依賴任務(wù),并且會通知其他依賴任務(wù)。

          • 如果是源任務(wù)線程,和其他依賴任務(wù)線程,則將任務(wù)轉(zhuǎn)換給依賴線程去執(zhí)行。不需要通知其他依賴任務(wù),避免死遞歸。

          不得不說Doug Lea的編碼,真的是藝術(shù)。代碼的復(fù)用性全體現(xiàn)在邏輯上了。




          end





          頂級程序員:topcoding

          做最好的程序員社區(qū):Java后端開發(fā)、Python、大數(shù)據(jù)、AI


          一鍵三連「分享」、「點(diǎn)贊」和「在看」



          瀏覽 65
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  99操逼| 男女拍拍视频网站 | 囯产精品久久久久久久久久辛辛 | 日本精品视频网站 | 色色色色色综合 |