CompletableFuture:讓你的代碼免受阻塞之苦
你知道的越多,不知道的就越多,業(yè)余的像一棵小草!
你來(lái),我們一起精進(jìn)!你不來(lái),我和你的競(jìng)爭(zhēng)對(duì)手一起精進(jìn)!
編輯:業(yè)余草
juejin.cn/post/6844904024332828685
推薦:https://www.xttblog.com/?p=5300
?提高應(yīng)用性能的時(shí)候很容易就會(huì)想起異步,異步去處理一些任務(wù)這樣主線程可以盡快響應(yīng)。
?
寫(xiě)在前面
通過(guò)閱讀本篇文章你將了解到:
CompletableFuture的使用 CompletableFure異步和同步的性能測(cè)試 已經(jīng)有了Future為什么仍需要在JDK1.8中引入CompletableFuture CompletableFuture的應(yīng)用場(chǎng)景 對(duì)CompletableFuture的使用優(yōu)化
場(chǎng)景說(shuō)明
查詢所有商店某個(gè)商品的價(jià)格并返回,并且查詢商店某個(gè)商品的價(jià)格的API為同步 一個(gè)Shop類,提供一個(gè)名為getPrice的同步方法
店鋪類:Shop.java
public?class?Shop?{
????private?Random?random?=?new?Random();
????/**
?????*?根據(jù)產(chǎn)品名查找價(jià)格
?????*?*/
????public?double?getPrice(String?product)?{
????????return?calculatePrice(product);
????}
????/**
?????*?計(jì)算價(jià)格
?????*
?????*?@param?product
?????*?@return
?????*?*/
????private?double?calculatePrice(String?product)?{
????????delay();
????????//random.nextDouble()隨機(jī)返回折扣
????????return?random.nextDouble()?*?product.charAt(0)?+?product.charAt(1);
????}
????/**
?????*?通過(guò)睡眠模擬其他耗時(shí)操作
?????*?*/
????private?void?delay()?{
????????try?{
????????????Thread.sleep(1000);
????????}?catch?(InterruptedException?e)?{
????????????e.printStackTrace();
????????}
????}
}
查詢商品的價(jià)格為同步方法,并通過(guò)sleep方法模擬其他操作。這個(gè)場(chǎng)景模擬了當(dāng)需要調(diào)用第三方API,但第三方提供的是同步API,在無(wú)法修改第三方API時(shí)如何設(shè)計(jì)代碼調(diào)用提高應(yīng)用的性能和吞吐量,這時(shí)候可以使用CompletableFuture類
CompletableFuture使用
Completable是Future接口的實(shí)現(xiàn)類,在JDK1.8中引入
「CompletableFuture的創(chuàng)建:」
使用new方法
CompletableFuture?futurePrice?=?new?CompletableFuture<>();
使用CompletableFuture#completedFuture靜態(tài)方法創(chuàng)建
public?static??CompletableFuture?completedFuture(U?value)?{
????return?new?CompletableFuture((value?==?null)???NIL?:?value);
}
參數(shù)的值為任務(wù)執(zhí)行完的結(jié)果,一般該方法在實(shí)際應(yīng)用中較少應(yīng)用
使用 CompletableFuture#supplyAsync靜態(tài)方法創(chuàng)建 supplyAsync有兩個(gè)重載方法:
//方法一
public?static??CompletableFuture?supplyAsync(Supplier?supplier)?{
????return?asyncSupplyStage(asyncPool,?supplier);
}
//方法二
public?static??CompletableFuture?supplyAsync(Supplier?supplier,
???????????????????????????????????????????????????Executor?executor)?{
????return?asyncSupplyStage(screenExecutor(executor),?supplier);
}
使用CompletableFuture#runAsync靜態(tài)方法創(chuàng)建 runAsync有兩個(gè)重載方法
//方法一
public?static?CompletableFuture?runAsync(Runnable?runnable)? {
????return?asyncRunStage(asyncPool,?runnable);
}
//方法二
public?static?CompletableFuture?runAsync(Runnable?runnable,?Executor?executor)? {
????return?asyncRunStage(screenExecutor(executor),?runnable);
}
說(shuō)明:
兩個(gè)重載方法之間的區(qū)別 => 后者可以傳入自定義Executor,前者是默認(rèn)的,使用的ForkJoinPool
supplyAsync和runAsync方法之間的區(qū)別 => 前者有返回值,后者無(wú)返回值
Supplier是函數(shù)式接口,因此該方法需要傳入該接口的實(shí)現(xiàn)類,追蹤源碼會(huì)發(fā)現(xiàn)在run方法中會(huì)調(diào)用該接口的方法。因此使用該方法創(chuàng)建CompletableFuture對(duì)象只需重寫(xiě)Supplier中的get方法,在get方法中定義任務(wù)即可。又因?yàn)楹瘮?shù)式接口可以使用Lambda表達(dá)式,和new創(chuàng)建CompletableFuture對(duì)象相比代碼會(huì)「簡(jiǎn)潔」不少
「結(jié)果的獲?。骸?/strong> 對(duì)于結(jié)果的獲取CompltableFuture類提供了四種方式
//方式一
public?T?get()
//方式二
public?T?get(long?timeout,?TimeUnit?unit)
//方式三
public?T?getNow(T?valueIfAbsent)
//方式四
public?T?join()
說(shuō)明:
get()和get(long timeout, TimeUnit unit) => 在Future中就已經(jīng)提供了,后者提供超時(shí)處理,如果在指定時(shí)間內(nèi)未獲取結(jié)果將拋出超時(shí)異常 getNow => 立即獲取結(jié)果不阻塞,結(jié)果計(jì)算已完成將返回結(jié)果或計(jì)算過(guò)程中的異常,如果未計(jì)算完成將返回設(shè)定的valueIfAbsent值 join => 方法里不會(huì)拋出異常
示例:
public?class?AcquireResultTest?{
????public?static?void?main(String[]?args)?throws?ExecutionException,?InterruptedException?{
????????//getNow方法測(cè)試
????????CompletableFuture?cp1?=?CompletableFuture.supplyAsync(()?->?{
????????????try?{
????????????????Thread.sleep(60?*?1000?*?60?);
????????????}?catch?(InterruptedException?e)?{
????????????????e.printStackTrace();
????????????}
????????????return?"hello?world";
????????});
????????System.out.println(cp1.getNow("hello?h2t"));
????????//join方法測(cè)試
????????CompletableFuture?cp2?=?CompletableFuture.supplyAsync((()->?1?/?0));
????????System.out.println(cp2.join());
????????//get方法測(cè)試
????????CompletableFuture?cp3?=?CompletableFuture.supplyAsync((()->?1?/?0));
????????System.out.println(cp3.get());
????}
}
說(shuō)明:
第一個(gè)執(zhí)行結(jié)果為hello h2t,因?yàn)橐人?分鐘結(jié)果不能立即獲取 join方法獲取結(jié)果方法里不會(huì)拋異常,但是執(zhí)行結(jié)果會(huì)拋異常,拋出的異常為CompletionException get方法獲取結(jié)果方法里將拋出異常,執(zhí)行結(jié)果拋出的異常為ExecutionException 「異常處理:」 使用靜態(tài)方法創(chuàng)建的CompletableFuture對(duì)象無(wú)需顯示處理異常,使用new創(chuàng)建的對(duì)象需要調(diào)用completeExceptionally方法設(shè)置捕獲到的異常,舉例說(shuō)明:
CompletableFuture?completableFuture?=?new?CompletableFuture();
new?Thread(()?->?{
?????try?{
?????????//doSomething,調(diào)用complete方法將其他方法的執(zhí)行結(jié)果記錄在completableFuture對(duì)象中
?????????completableFuture.complete(null);
?????}?catch?(Exception?e)?{
?????????//異常處理
?????????completableFuture.completeExceptionally(e);
??????}
?}).start();
同步方法Pick異步方法查詢所有店鋪某個(gè)商品價(jià)格
店鋪為一個(gè)列表:
private?static?List?shopList?=?Arrays.asList(
????????new?Shop("BestPrice"),
????????new?Shop("LetsSaveBig"),
????????new?Shop("MyFavoriteShop"),
????????new?Shop("BuyItAll")
);
「同步方法:」
private?static?List?findPriceSync(String?product)? {
????return?shopList.stream()
????????????.map(shop?->?String.format("%s?price?is?%.2f",
????????????????????shop.getName(),?shop.getPrice(product)))??//格式轉(zhuǎn)換
????????????.collect(Collectors.toList());
}
「異步方法:」
private?static?List?findPriceAsync(String?product)? {
????List>?completableFutureList?=?shopList.stream()
????????????//轉(zhuǎn)異步執(zhí)行
????????????.map(shop?->?CompletableFuture.supplyAsync(
????????????????????()?->?String.format("%s?price?is?%.2f",
????????????????????????????shop.getName(),?shop.getPrice(product))))??//格式轉(zhuǎn)換
????????????.collect(Collectors.toList());
????return?completableFutureList.stream()
????????????.map(CompletableFuture::join)??//獲取結(jié)果不會(huì)拋出異常
????????????.collect(Collectors.toList());
}
「性能測(cè)試結(jié)果:」
Find?Price?Sync?Done?in?4141
Find?Price?Async?Done?in?1033
「異步」執(zhí)行效率「提高四倍」
為什么仍需要CompletableFuture
在JDK1.8以前,通過(guò)調(diào)用線程池的submit方法可以讓任務(wù)以異步的方式運(yùn)行,該方法會(huì)返回一個(gè)Future對(duì)象,通過(guò)調(diào)用get方法獲取異步執(zhí)行的結(jié)果:
private?static?List?findPriceFutureAsync(String?product)? {
????ExecutorService?es?=?Executors.newCachedThreadPool();
????List>?futureList?=?shopList.stream().map(shop?->?es.submit(()?->?String.format("%s?price?is?%.2f",
????????????shop.getName(),?shop.getPrice(product)))).collect(Collectors.toList());
????return?futureList.stream()
????????????.map(f?->?{
????????????????String?result?=?null;
????????????????try?{
????????????????????result?=?f.get();
????????????????}?catch?(InterruptedException?e)?{
????????????????????e.printStackTrace();
????????????????}?catch?(ExecutionException?e)?{
????????????????????e.printStackTrace();
????????????????}
????????????????return?result;
????????????}).collect(Collectors.toList());
}
既生瑜何生亮,為什么仍需要引入CompletableFuture?
對(duì)于簡(jiǎn)單的業(yè)務(wù)場(chǎng)景使用Future完全沒(méi)有,但是想將多個(gè)異步任務(wù)的計(jì)算結(jié)果組合起來(lái),后一個(gè)異步任務(wù)的計(jì)算結(jié)果需要前一個(gè)異步任務(wù)的值等等,使用Future提供的那點(diǎn)API就囊中羞澀,處理起來(lái)不夠優(yōu)雅,這時(shí)候還是讓CompletableFuture以「聲明式」的方式優(yōu)雅的處理這些需求。而且在Future編程中想要拿到Future的值然后拿這個(gè)值去做后續(xù)的計(jì)算任務(wù),只能通過(guò)輪詢的方式去判斷任務(wù)是否完成這樣非常占CPU并且代碼也不優(yōu)雅,用偽代碼表示如下:
while(future.isDone())?{
????result?=?future.get();
????doSomrthingWithResult(result);
}?
但CompletableFuture提供了API幫助我們實(shí)現(xiàn)這樣的需求
其他API介紹
whenComplete計(jì)算結(jié)果的處理:
對(duì)前面計(jì)算結(jié)果進(jìn)行處理,無(wú)法返回新值
提供了三個(gè)方法:
//方法一
public?CompletableFuture?whenComplete(BiConsumer?super?T,??super?Throwable>?action)
//方法二
public?CompletableFuture?whenCompleteAsync(BiConsumer?super?T,??super?Throwable>?action)
//方法三
public?CompletableFuture?whenCompleteAsync(BiConsumer?super?T,??super?Throwable>?action,?Executor?executor)
說(shuō)明:
BiFunction super T,? super U,? extends V>fn參數(shù) => 定義對(duì)結(jié)果的處理Executor executor參數(shù) => 自定義線程池 以async結(jié)尾的方法將會(huì)在一個(gè)新的線程中執(zhí)行組合操作
示例:
public?class?WhenCompleteTest?{
????public?static?void?main(String[]?args)?{
????????CompletableFuture?cf1?=?CompletableFuture.supplyAsync(()?->?"hello");
????????CompletableFuture?cf2?=?cf1.whenComplete((v,?e)?->
????????????????System.out.println(String.format("value:%s,?exception:%s",?v,?e)));
????????System.out.println(cf2.join());
????}
}
thenApply轉(zhuǎn)換:
將前面計(jì)算結(jié)果的的CompletableFuture傳遞給thenApply,返回thenApply處理后的結(jié)果??梢哉J(rèn)為通過(guò)thenApply方法實(shí)現(xiàn)CompletableFuture至CompletableFuture的轉(zhuǎn)換。白話一點(diǎn)就是將CompletableFuture的計(jì)算結(jié)果作為thenApply方法的參數(shù),返回thenApply方法處理后的結(jié)果
提供了三個(gè)方法:
//方法一
public??CompletableFuture?thenApply(
????Function?super?T,??extends?U>?fn)?{
????return?uniApplyStage(null,?fn);
}
//方法二
public??CompletableFuture?thenApplyAsync(
????Function?super?T,??extends?U>?fn)?{
????return?uniApplyStage(asyncPool,?fn);
}
//方法三
public??CompletableFuture?thenApplyAsync(
????Function?super?T,??extends?U>?fn,?Executor?executor)?{
????return?uniApplyStage(screenExecutor(executor),?fn);
}
說(shuō)明:
Function super T,? extends U>fn參數(shù) => 對(duì)前一個(gè)CompletableFuture 計(jì)算結(jié)果的轉(zhuǎn)化操作Executor executor參數(shù) => 自定義線程池 以async結(jié)尾的方法將會(huì)在一個(gè)新的線程中執(zhí)行組合操作 示例:
public?class?ThenApplyTest?{
????public?static?void?main(String[]?args)?throws?ExecutionException,?InterruptedException?{
????????CompletableFuture?result?=?CompletableFuture.supplyAsync(ThenApplyTest::randomInteger).thenApply((i)?->?i?*?8);
????????System.out.println(result.get());
????}
????public?static?Integer?randomInteger()?{
????????return?10;
????}
}
這里將前一個(gè)CompletableFuture計(jì)算出來(lái)的結(jié)果擴(kuò)大八倍
thenAccept結(jié)果處理:
thenApply也可以歸類為對(duì)結(jié)果的處理,thenAccept和thenApply的區(qū)別就是沒(méi)有返回值
提供了三個(gè)方法:
//方法一
public?CompletableFuture?thenAccept(Consumer?super?T>?action)? {
????return?uniAcceptStage(null,?action);
}
//方法二
public?CompletableFuture?thenAcceptAsync(Consumer?super?T>?action)? {
????return?uniAcceptStage(asyncPool,?action);
}
//方法三
public?CompletableFuture?thenAcceptAsync(Consumer?super?T>?action,
???????????????????????????????????????????????Executor?executor)? {
????return?uniAcceptStage(screenExecutor(executor),?action);
}
說(shuō)明:
Consumer super T>action參數(shù) => 對(duì)前一個(gè)CompletableFuture計(jì)算結(jié)果的操作Executor executor參數(shù) => 自定義線程池 同理以async結(jié)尾的方法將會(huì)在一個(gè)新的線程中執(zhí)行組合操作 示例:
public?class?ThenAcceptTest?{
????public?static?void?main(String[]?args)?{
????????CompletableFuture.supplyAsync(ThenAcceptTest::getList).thenAccept(strList?->?strList.stream()
????????????????.forEach(m?->?System.out.println(m)));
????}
????public?static?List?getList()? {
????????return?Arrays.asList("a",?"b",?"c");
????}
}
將前一個(gè)CompletableFuture計(jì)算出來(lái)的結(jié)果打印出來(lái)
thenCompose異步結(jié)果流水化:
thenCompose方法可以將兩個(gè)異步操作進(jìn)行流水操作
提供了三個(gè)方法:
//方法一
public??CompletableFuture?thenCompose(
????Function?super?T,???extends?CompletionStage>?fn)?{
????return?uniComposeStage(null,?fn);
}
//方法二
public??CompletableFuture?thenComposeAsync(
????Function?super?T,???extends?CompletionStage>?fn)?{
????return?uniComposeStage(asyncPool,?fn);
}
//方法三
public??CompletableFuture?thenComposeAsync(
????Function?super?T,???extends?CompletionStage>?fn,
????Executor?executor)?{
????return?uniComposeStage(screenExecutor(executor),?fn);
}
說(shuō)明:
Function super T, ? extends CompletionStage> fn參數(shù) => 當(dāng)前CompletableFuture計(jì)算結(jié)果的執(zhí)行Executor executor參數(shù) => 自定義線程池 同理以async結(jié)尾的方法將會(huì)在一個(gè)新的線程中執(zhí)行組合操作
示例:
public?class?ThenComposeTest?{
????public?static?void?main(String[]?args)?throws?ExecutionException,?InterruptedException?{
????????CompletableFuture?result?=?CompletableFuture.supplyAsync(ThenComposeTest::getInteger)
????????????????.thenCompose(i?->?CompletableFuture.supplyAsync(()?->?i?*?10));
????????System.out.println(result.get());
????}
????private?static?int?getInteger()?{
????????return?666;
????}
????private?static?int?expandValue(int?num)?{
????????return?num?*?10;
????}
}
執(zhí)行流程圖:

thenCombine組合結(jié)果:
thenCombine方法將兩個(gè)無(wú)關(guān)的CompletableFuture組合起來(lái),第二個(gè)Completable并不依賴第一個(gè)Completable的結(jié)果
提供了三個(gè)方法:
//方法一
public??CompletableFuture?thenCombine(?
????CompletionStage?extends?U>?other,
????BiFunction?super?T,??super?U,??extends?V>?fn)? {
????return?biApplyStage(null,?other,?fn);
}
??//方法二
??public??CompletableFuture?thenCombineAsync(
??????CompletionStage?extends?U>?other,
??????BiFunction?super?T,??super?U,??extends?V>?fn)? {
??????return?biApplyStage(asyncPool,?other,?fn);
??}
??//方法三
??public??CompletableFuture?thenCombineAsync(
??????CompletionStage?extends?U>?other,
??????BiFunction?super?T,??super?U,??extends?V>?fn,?Executor?executor)? {
??????return?biApplyStage(screenExecutor(executor),?other,?fn);
??}
說(shuō)明:
CompletionStage extends U>other參數(shù) => 新的CompletableFuture的計(jì)算結(jié)果BiFunction super T,? super U,? extends V>fn參數(shù) => 定義了兩個(gè)CompletableFuture對(duì)象「完成計(jì)算后」如何合并結(jié)果,該參數(shù)是一個(gè)函數(shù)式接口,因此可以使用Lambda表達(dá)式Executor executor參數(shù) => 自定義線程池 同理以async結(jié)尾的方法將會(huì)在一個(gè)新的線程中執(zhí)行組合操作
示例:
public?class?ThenCombineTest?{
????private?static?Random?random?=?new?Random();
????public?static?void?main(String[]?args)?throws?ExecutionException,?InterruptedException?{
????????CompletableFuture?result?=?CompletableFuture.supplyAsync(ThenCombineTest::randomInteger).thenCombine(
????????????????CompletableFuture.supplyAsync(ThenCombineTest::randomInteger),?(i,?j)?->?i?*?j
????????);
????????System.out.println(result.get());
????}
????public?static?Integer?randomInteger()?{
????????return?random.nextInt(100);
????}
}
將兩個(gè)線程計(jì)算出來(lái)的值做一個(gè)乘法在返回,執(zhí)行流程圖:

allOf&anyOf組合多個(gè)CompletableFuture:
方法介紹:
//allOf
public?static?CompletableFuture?allOf(CompletableFuture>...?cfs)? {
????return?andTree(cfs,?0,?cfs.length?-?1);
}
//anyOf
public?static?CompletableFuture{
????return?orTree(cfs,?0,?cfs.length?-?1);
}
說(shuō)明:
allOf => 所有的CompletableFuture都執(zhí)行完后執(zhí)行計(jì)算。 anyOf => 任意一個(gè)CompletableFuture執(zhí)行完后就會(huì)執(zhí)行計(jì)算
示例:
allOf方法測(cè)試
public?class?AllOfTest?{
????public?static?void?main(String[]?args)?throws?ExecutionException,?InterruptedException?{
????????CompletableFuture?future1?=?CompletableFuture.supplyAsync(()?->?{
????????????System.out.println("hello");
????????????return?null;
????????});
????????CompletableFuture?future2?=?CompletableFuture.supplyAsync(()?->?{
????????????System.out.println("world");?return?null;
????????});
????????CompletableFuture?result?=?CompletableFuture.allOf(future1,?future2);
????????System.out.println(result.get());
????}
}
allOf方法沒(méi)有返回值,適合沒(méi)有返回值并且需要前面所有任務(wù)執(zhí)行完畢才能執(zhí)行后續(xù)任務(wù)的應(yīng)用場(chǎng)景
anyOf方法測(cè)試
public?class?AnyOfTest?{
????private?static?Random?random?=?new?Random();
????public?static?void?main(String[]?args)?throws?ExecutionException,?InterruptedException?{
????????CompletableFuture?future1?=?CompletableFuture.supplyAsync(()?->?{
????????????randomSleep();
????????????System.out.println("hello");
????????????return?"hello";});
????????CompletableFuture?future2?=?CompletableFuture.supplyAsync(()?->?{
????????????randomSleep();
????????????System.out.println("world");
????????????return?"world";
????????});
????????CompletableFuture 兩個(gè)線程都會(huì)將結(jié)果打印出來(lái),但是get方法只會(huì)返回最先完成任務(wù)的結(jié)果。該方法比較適合只要有一個(gè)返回值就可以繼續(xù)執(zhí)行其他任務(wù)的應(yīng)用場(chǎng)景
注意點(diǎn)
很多方法都提供了異步實(shí)現(xiàn)【帶async后綴】,但是需小心謹(jǐn)慎使用這些異步方法,因?yàn)楫惒揭馕吨嬖谏舷挛那袚Q,可能性能不一定比同步好。如果需要使用異步的方法,「先做測(cè)試」,用測(cè)試數(shù)據(jù)說(shuō)話!?。?/p>
CompletableFuture的應(yīng)用場(chǎng)景
存在IO密集型的任務(wù)可以選擇CompletableFuture,IO部分交由另外一個(gè)線程去執(zhí)行。Logback、Log4j2異步日志記錄的實(shí)現(xiàn)原理就是新起了一個(gè)線程去執(zhí)行IO操作,這部分可以以CompletableFuture.runAsync(()->{ioOperation();})的方式去調(diào)用,有關(guān)Logback異步日志記錄的原理我們后續(xù)發(fā)文來(lái)闡述。如果是CPU密集型就不推薦使用了推薦使用并行流。
優(yōu)化空間
supplyAsync執(zhí)行任務(wù)底層實(shí)現(xiàn):
public?static??CompletableFuture?supplyAsync(Supplier?supplier)?{
????return?asyncSupplyStage(asyncPool,?supplier);
}
static??CompletableFuture?asyncSupplyStage(Executor?e,?Supplier?f)?{
????if?(f?==?null)?throw?new?NullPointerException();
????CompletableFuture?d?=?new?CompletableFuture();
????e.execute(new?AsyncSupply(d,?f));
????return?d;
}
底層調(diào)用的是線程池去執(zhí)行任務(wù),而CompletableFuture中默認(rèn)線程池為ForkJoinPool
private?static?final?Executor?asyncPool?=?useCommonPool??
????????ForkJoinPool.commonPool()?:?new?ThreadPerTaskExecutor();
ForkJoinPool線程池的大小取決于CPU的核數(shù)。CPU密集型任務(wù)線程池大小配置為CPU核心數(shù)就可以了,但是IO密集型,線程池的大小由CPU數(shù)量 * CPU利用率 * (1 + 線程等待時(shí)間/線程CPU時(shí)間)確定。而CompletableFuture的應(yīng)用場(chǎng)景就是IO密集型任務(wù),因此默認(rèn)的ForkJoinPool一般無(wú)法達(dá)到最佳性能,我們需自己根據(jù)業(yè)務(wù)創(chuàng)建線程池
最后需要本文案例完整源碼的可以加我微信:xttblog2,分享給你!
