面試官:jdk8的異步CompletableFuture實現(xiàn)原理和使用場景,具體講下

來源:blog.csdn.net/weixin_39332800/article/
details/108185931
面試中關(guān)于JDK中異步的API能說清楚,談薪資時也比較有底氣,現(xiàn)在并發(fā)多線程是面試官最愛問的題型
1.概述
CompletableFuture是jdk1.8引入的實現(xiàn)類。擴展了Future和CompletionStage,是一個可以在任務(wù)完成階段觸發(fā)一些操作Future。簡單的來講就是可以實現(xiàn)異步回調(diào)。
2.為什么引入CompletableFuture
對于jdk1.5的Future,雖然提供了異步處理任務(wù)的能力,但是獲取結(jié)果的方式很不優(yōu)雅,還是需要通過阻塞(或者輪訓)的方式。如何避免阻塞呢?其實就是注冊回調(diào)。
業(yè)界結(jié)合觀察者模式實現(xiàn)異步回調(diào)。也就是當任務(wù)執(zhí)行完成后去通知觀察者。比如Netty的ChannelFuture,可以通過注冊監(jiān)聽實現(xiàn)異步結(jié)果的處理。
Netty的ChannelFuture
public?Promise?addListener(GenericFutureListener?extends?Future?super?V>>?listener)? {??
????checkNotNull(listener,?"listener");??
????synchronized?(this)?{??
????????addListener0(listener);??
????}??
????if?(isDone())?{??
????????notifyListeners();??
????}??
????return?this;??
}??
private?boolean?setValue0(Object?objResult)?{??
????if?(RESULT_UPDATER.compareAndSet(this,?null,?objResult)?||??
????????RESULT_UPDATER.compareAndSet(this,?UNCANCELLABLE,?objResult))?{??
????????if?(checkNotifyWaiters())?{??
????????????notifyListeners();??
????????}??
????????return?true;??
????}??
????return?false;??
}??
通過addListener方法注冊監(jiān)聽。如果任務(wù)完成,會調(diào)用notifyListeners通知。
CompletableFuture通過擴展Future,引入函數(shù)式編程,通過回調(diào)的方式去處理結(jié)果。
3.功能
CompletableFuture的功能主要體現(xiàn)在他的CompletionStage。
可以實現(xiàn)如下等功能
轉(zhuǎn)換(thenCompose)
組合(thenCombine)
消費(thenAccept)
運行(thenRun)。
帶返回的消費(thenApply)
消費和運行的區(qū)別:
消費使用執(zhí)行結(jié)果。運行則只是運行特定任務(wù)。具體其他功能大家可以根據(jù)需求自行查看。
CompletableFuture借助CompletionStage的方法可以實現(xiàn)鏈式調(diào)用。并且可以選擇同步或者異步兩種方式。
這里舉個簡單的例子來體驗一下他的功能。
public?static?void?thenApply()?{??
????ExecutorService?executorService?=?Executors.newFixedThreadPool(2);??
????CompletableFuture?cf?=?CompletableFuture.supplyAsync(()?->?{??
????????try?{??
????????????//??Thread.sleep(2000);??
????????}?catch?(Exception?e)?{??
????????????e.printStackTrace();??
????????}??
????????System.out.println("supplyAsync?"?+?Thread.currentThread().getName());??
????????return?"hello";??
????},?executorService).thenApplyAsync(s?->?{??
????????System.out.println(s?+?"world");??
????????return?"hhh";??
????},?executorService);??
????cf.thenRunAsync(()?->?{??
????????System.out.println("ddddd");??
????});??
????cf.thenRun(()?->?{??
????????System.out.println("ddddsd");??
????});??
????cf.thenRun(()?->?{??
????????System.out.println(Thread.currentThread());??
????????System.out.println("dddaewdd");??
????});??
}??
執(zhí)行結(jié)果
supplyAsync?pool-1-thread-1??
helloworld??
ddddd??
ddddsd??
Thread[main,5,main]??
dddaewdd??
根據(jù)結(jié)果我們可以看到會有序執(zhí)行對應任務(wù)。
注意:
如果是同步執(zhí)行cf.thenRun。他的執(zhí)行線程可能main線程,也可能是執(zhí)行源任務(wù)的線程。如果執(zhí)行源任務(wù)的線程在main調(diào)用之前執(zhí)行完了任務(wù)。那么cf.thenRun方法會由main線程調(diào)用。
這里說明一下,如果是同一任務(wù)的依賴任務(wù)有多個:
如果這些依賴任務(wù)都是同步執(zhí)行。那么假如這些任務(wù)被當前調(diào)用線程(main)執(zhí)行,則是有序執(zhí)行,假如被執(zhí)行源任務(wù)的線程執(zhí)行,那么會是倒序執(zhí)行。因為內(nèi)部任務(wù)數(shù)據(jù)結(jié)構(gòu)為LIFO。
如果這些依賴任務(wù)都是異步執(zhí)行,那么他會通過異步線程池去執(zhí)行任務(wù)。不能保證任務(wù)的執(zhí)行順序。
上面的結(jié)論是通過閱讀源代碼得到的。下面我們深入源代碼。
4.源碼追蹤
創(chuàng)建CompletableFuture
創(chuàng)建的方法有很多,甚至可以直接new一個。我們來看一下supplyAsync異步創(chuàng)建的方法。
public?static??CompletableFuture?supplyAsync(Supplier?supplier,??
???????????????????????????????????????????????????Executor?executor)?{??
????return?asyncSupplyStage(screenExecutor(executor),?supplier);??
}??
static?Executor?screenExecutor(Executor?e)?{??
????if?(!useCommonPool?&&?e?==?ForkJoinPool.commonPool())??
????????return?asyncPool;??
????if?(e?==?null)?throw?new?NullPointerException();??
????return?e;??
}??
入?yún)upplier,帶返回值的函數(shù)。如果是異步方法,并且傳遞了執(zhí)行器,那么會使用傳入的執(zhí)行器去執(zhí)行任務(wù)。否則采用公共的ForkJoin并行線程池,如果不支持并行,新建一個線程去執(zhí)行。關(guān)注Java項目分享
這里我們需要注意ForkJoin是通過守護線程去執(zhí)行任務(wù)的。所以必須有非守護線程的存在才行。
asyncSupplyStage方法
static??CompletableFuture?asyncSupplyStage(Executor?e,??
?????????????????????????????????????????????????Supplier?f)?{??
????if?(f?==?null)?throw?new?NullPointerException();??
????CompletableFuture?d?=?new?CompletableFuture();??
????e.execute(new?AsyncSupply(d,?f));??
????return?d;??
}??
這里會創(chuàng)建一個用于返回的CompletableFuture。
然后構(gòu)造一個AsyncSupply,并將創(chuàng)建的CompletableFuture作為構(gòu)造參數(shù)傳入。
那么,任務(wù)的執(zhí)行完全依賴AsyncSupply。
AsyncSupply#run
public?void?run()?{??
????CompletableFuture?d;?Supplier?f;??
????if?((d?=?dep)?!=?null?&&?(f?=?fn)?!=?null)?{??
????????dep?=?null;?fn?=?null;??
????????if?(d.result?==?null)?{??
????????????try?{??
????????????????d.completeValue(f.get());??
????????????}?catch?(Throwable?ex)?{??
????????????????d.completeThrowable(ex);??
????????????}??
????????}??
????????d.postComplete();??
????}??
}??
該方法會調(diào)用Supplier的get方法。并將結(jié)果設(shè)置到CompletableFuture中。我們應該清楚這些操作都是在異步線程中調(diào)用的。
d.postComplete方法就是通知任務(wù)執(zhí)行完成。觸發(fā)后續(xù)依賴任務(wù)的執(zhí)行,也就是實現(xiàn)CompletionStage的關(guān)鍵點。關(guān)注Java項目分享
在看postComplete方法之前我們先來看一下創(chuàng)建依賴任務(wù)的邏輯。
thenAcceptAsync方法
public?CompletableFuture?thenAcceptAsync(Consumer?super?T>?action)? {??
????return?uniAcceptStage(asyncPool,?action);??
}??
private?CompletableFuture?uniAcceptStage(Executor?e,??
???????????????????????????????????????????????Consumer?super?T>?f)? {??
????if?(f?==?null)?throw?new?NullPointerException();??
????CompletableFuture?d?=?new?CompletableFuture();??
????if?(e?!=?null?||?!d.uniAccept(this,?f,?null))?{??
????????#?1??
????????UniAccept?c?=?new?UniAccept(e,?d,?this,?f);??
????????push(c);??
????????c.tryFire(SYNC);??
????}??
????return?d;??
}??
上面提到過。thenAcceptAsync是用來消費CompletableFuture的。該方法調(diào)用uniAcceptStage。
uniAcceptStage邏輯:
構(gòu)造一個CompletableFuture,主要是為了鏈式調(diào)用。
如果為異步任務(wù),直接返回。因為源任務(wù)結(jié)束后會觸發(fā)異步線程執(zhí)行對應邏輯。
如果為同步任務(wù)(e==null),會調(diào)用d.uniAccept方法。這個方法在這里邏輯:如果源任務(wù)完成,調(diào)用f,返回true。否則進入if代碼塊(Mark 1)。
如果是異步任務(wù)直接進入if(Mark 1)。
Mark1邏輯:
構(gòu)造一個UniAccept,將其push入棧。這里通過CAS實現(xiàn)樂觀鎖實現(xiàn)。
調(diào)用c.tryFire方法。
final?CompletableFuture?tryFire(int?mode)? {??
????CompletableFuture?d;?CompletableFuture?a;??
????if?((d?=?dep)?==?null?||??
????????!d.uniAccept(a?=?src,?fn,?mode?>?0???null?:?this))??
????????return?null;??
????dep?=?null;?src?=?null;?fn?=?null;??
????return?d.postFire(a,?mode);??
}??
會調(diào)用d.uniAccept方法。其實該方法判斷源任務(wù)是否完成,如果完成則執(zhí)行依賴任務(wù),否則返回false。
如果依賴任務(wù)已經(jīng)執(zhí)行,調(diào)用d.postFire,主要就是Fire的后續(xù)處理。根據(jù)不同模式邏輯不同。
這里簡單說一下,其實mode有同步異步,和迭代。迭代為了避免無限遞歸。
這里強調(diào)一下d.uniAccept方法的第三個參數(shù)。
如果是異步調(diào)用(mode>0),傳入null。否則傳入this。
區(qū)別看下面代碼。c不為null會調(diào)用c.claim方法。關(guān)注Java項目分享
try?{??
????if?(c?!=?null?&&?!c.claim())??
????????return?false;??
????@SuppressWarnings("unchecked")?S?s?=?(S)?r;??
????f.accept(s);??
????completeNull();??
}?catch?(Throwable?ex)?{??
????completeThrowable(ex);??
}??
??
final?boolean?claim()?{??
????Executor?e?=?executor;??
????if?(compareAndSetForkJoinTaskTag((short)0,?(short)1))?{??
????????if?(e?==?null)??
????????????return?true;??
????????executor?=?null;?//?disable??
????????e.execute(this);??
????}??
????return?false;??
}??
claim方法是邏輯:
如果異步線程為null。說明同步,那么直接返回true。最后上層函數(shù)會調(diào)用f.accept(s)同步執(zhí)行任務(wù)。
如果異步線程不為null,那么使用異步線程去執(zhí)行this。
this的run任務(wù)如下。也就是在異步線程同步調(diào)用tryFire方法。達到其被異步線程執(zhí)行的目的。
public?final?void?run(){???
???tryFire(ASYNC);???
}??
看完上面的邏輯,我們基本理解依賴任務(wù)的邏輯。
其實就是先判斷源任務(wù)是否完成,如果完成,直接在對應線程執(zhí)行以來任務(wù)(如果是同步,則在當前線程處理,否則在異步線程處理)
如果任務(wù)沒有完成,直接返回,因為等任務(wù)完成之后會通過postComplete去觸發(fā)調(diào)用依賴任務(wù)。
postComplete方法
final?void?postComplete()?{??
????/*??
?????*?On?each?step,?variable?f?holds?current?dependents?to?pop??
?????*?and?run.??It?is?extended?along?only?one?path?at?a?time,??
?????*?pushing?others?to?avoid?unbounded?recursion.??
?????*/??
????CompletableFuture>?f?=?this;?Completion?h;??
????while?((h?=?f.stack)?!=?null?||??
???????????(f?!=?this?&&?(h?=?(f?=?this).stack)?!=?null))?{??
????????CompletableFuture>?d;?Completion?t;??
????????if?(f.casStack(h,?t?=?h.next))?{??
????????????if?(t?!=?null)?{??
????????????????if?(f?!=?this)?{??
????????????????????pushStack(h);??
????????????????????continue;??
????????????????}??
????????????????h.next?=?null;????//?detach??
????????????}??
????????????f?=?(d?=?h.tryFire(NESTED))?==?null???this?:?d;??
????????}??
????}??
}??
在源任務(wù)完成之后會調(diào)用。
其實邏輯很簡單,就是迭代堆棧的依賴任務(wù)。調(diào)用h.tryFire方法。NESTED就是為了避免遞歸死循環(huán)。因為FirePost會調(diào)用postComplete。如果是NESTED,則不調(diào)用。
堆棧的內(nèi)容其實就是在依賴任務(wù)創(chuàng)建的時候加入進去的。上面我們已經(jīng)提到過。關(guān)注Java項目分享
4.總結(jié)
基本上述源碼已經(jīng)分析了邏輯。
因為涉及異步等操作,我們需要理一下(這里針對全異步任務(wù)):
創(chuàng)建CompletableFuture成功之后會通過異步線程去執(zhí)行對應任務(wù)。
如果CompletableFuture還有依賴任務(wù)(異步),會將任務(wù)加入到CompletableFuture的堆棧保存起來。以供后續(xù)完成后執(zhí)行依賴任務(wù)。
當然,創(chuàng)建依賴任務(wù)并不只是將其加入堆棧。如果源任務(wù)在創(chuàng)建依賴任務(wù)的時候已經(jīng)執(zhí)行完成,那么當前線程會觸發(fā)依賴任務(wù)的異步線程直接處理依賴任務(wù)。并且會告訴堆棧其他的依賴任務(wù)源任務(wù)已經(jīng)完成。
主要是考慮代碼的復用。所以邏輯相對難理解。
postComplete方法會被源任務(wù)線程執(zhí)行完源任務(wù)后調(diào)用。同樣也可能被依賴任務(wù)線程后調(diào)用。
執(zhí)行依賴任務(wù)的方法主要就是靠tryFire方法。因為這個方法可能會被多種不同類型線程觸發(fā),所以邏輯也繞一點。(其他依賴任務(wù)線程、源任務(wù)線程、當前依賴任務(wù)線程)
如果是當前依賴任務(wù)線程,那么會執(zhí)行依賴任務(wù),并且會通知其他依賴任務(wù)。
如果是源任務(wù)線程,和其他依賴任務(wù)線程,則將任務(wù)轉(zhuǎn)換給依賴線程去執(zhí)行。不需要通知其他依賴任務(wù),避免死遞歸。
不得不說Doug Lea的編碼,真的是藝術(shù)。代碼的復用性全體現(xiàn)在邏輯上了。
程序汪資料鏈接
堪稱神級的Spring Boot手冊,從基礎(chǔ)入門到實戰(zhàn)進階
臥槽!字節(jié)跳動《算法中文手冊》火了,完整版 PDF 開放下載!
臥槽!阿里大佬總結(jié)的《圖解Java》火了,完整版PDF開放下載!
字節(jié)跳動總結(jié)的設(shè)計模式 PDF 火了,完整版開放下載!
歡迎添加程序汪個人微信 itwang008? 進粉絲群或圍觀朋友圈
