接口響應(yīng)慢?那是你沒用 CompletableFuture 來優(yōu)化!
共 59845字,需瀏覽 120分鐘
·
2024-04-26 12:16
來源:blog.csdn.net/qq_43372633/article/details/130814200
?? 歡迎加入小哈的星球 ,你將獲得: 專屬的項(xiàng)目實(shí)戰(zhàn) / Java 學(xué)習(xí)路線 / 一對一提問 / 學(xué)習(xí)打卡 / 贈(zèng)書福利
全棧前后端分離博客項(xiàng)目 2.0 版本完結(jié)啦, 演示鏈接:http://116.62.199.48/ ,新項(xiàng)目正在醞釀中。全程手摸手,后端 + 前端全棧開發(fā),從 0 到 1 講解每個(gè)功能點(diǎn)開發(fā)步驟,1v1 答疑,直到項(xiàng)目上線。目前已更新了239小節(jié),累計(jì)38w+字,講解圖:1645張,還在持續(xù)爆肝中.. 后續(xù)還會(huì)上新更多項(xiàng)目,目標(biāo)是將Java領(lǐng)域典型的項(xiàng)目都整一波,如秒殺系統(tǒng), 在線商城, IM即時(shí)通訊,Spring Cloud Alibaba 等等,戳我加入學(xué)習(xí),已有1300+小伙伴加入(早鳥價(jià)超低)
-
前言 -
為什么要用異步編程 -
回顧Future -
CompletableFuture使用場景 -
CompletableFuture注意點(diǎn)
前言
大多數(shù)程序員在平時(shí)工作中,都是增刪改查。這里我跟大家講解如何利用CompletableFuture優(yōu)化項(xiàng)目代碼,使項(xiàng)目性能更佳!
為什么要用異步編程
舉個(gè)例子:用戶登錄成功,需要返回前端用戶角色,菜單權(quán)限,個(gè)人信息,用戶余額,積分情況等。正常邏輯是依次查詢不同表,得到對應(yīng)的數(shù)據(jù)封裝返回給前端,代碼如下:
@Test
public void login(Long userId){
log.info("開始查詢用戶全部信息---串行!");
// 查詢用戶角色信息
getUserRole(userId);
// 查詢用戶菜單信息
getUserMenu(userId);
// 查詢用戶余額信息
getUserAmount(userId);
// 查詢用戶積分信息
getUserIntegral(userId);
log.info("封裝用戶信息返回給前端!");
}
假如查詢用戶角色,用戶菜單,用戶余額,用戶積分分別耗時(shí)500,200,200,100毫秒,則登錄接口耗時(shí)為1秒。如果采用異步(多線程并行)形式,則登錄接口耗時(shí)以單個(gè)查詢最慢的任務(wù)為主,為查詢用戶角色信息500毫秒。相當(dāng)于登錄接口性能提升一倍!查詢?nèi)蝿?wù)越多,則其性能提升越大!
代碼演示(串行):
@Test
public void login() throws InterruptedException {
long startTime = System.currentTimeMillis();
log.info("開始查詢用戶全部信息!");
log.info("開始查詢用戶角色信息!");
Thread.sleep(500);
String role = "管理員";
log.info("開始查詢用戶菜單信息!");
Thread.sleep(200);
String menu = "首頁,賬戶管理,積分管理";
log.info("開始查詢查詢用戶余額信息!");
Thread.sleep(200);
Integer amount = 1999;
log.info("開始查詢查詢查詢用戶積分信息!");
Thread.sleep(100);
Integer integral = 1015;
log.info("封裝用戶信息返回給前端!");
log.info("查詢用戶全部信息總耗時(shí):" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
}
結(jié)果:
代碼演示(異步):
@Test
public void asyncLogin() {
long startTime = System.currentTimeMillis();
log.info("開始查詢用戶角色信息!");
CompletableFuture<Map<String, Object>> roleFuture = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
Map<String, Object> roleMap = new HashMap<String, Object>();
roleMap.put("role", "管理員");
long endTime = System.currentTimeMillis();
log.info("查詢用戶角色信息耗時(shí):" + (endTime - startTime) + "毫秒");
return roleMap;
});
log.info("開始查詢用戶菜單信息!");
CompletableFuture<Map<String, Object>> menuFuture = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
Map<String, Object> menuMap = new HashMap<String, Object>();
menuMap.put("menu", "首頁,賬戶管理,積分管理");
long endTime = System.currentTimeMillis();
log.info("查詢用戶菜單信息耗時(shí):" + (endTime - startTime) + "毫秒");
return menuMap;
});
log.info("開始查詢用戶余額信息!");
CompletableFuture<Map<String, Object>> amountFuture = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
Map<String, Object> amountMap = new HashMap<String, Object>();
amountMap.put("amount", 1999);
long endTime = System.currentTimeMillis();
log.info("查詢用戶余額信息耗時(shí):" + (endTime - startTime) + "毫秒");
return amountMap;
});
log.info("開始查詢用戶積分信息!");
CompletableFuture<Map<String, Object>> integralFuture = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
Map<String, Object> integralMap = new HashMap<String, Object>();
integralMap.put("integral", 1015);
long endTime = System.currentTimeMillis();
log.info("查詢用戶積分信息耗時(shí):" + (endTime - startTime) + "毫秒");
return integralMap;
});
roleFuture.join();
menuFuture.join();
amountFuture.join();
integralFuture.join();
log.info("查詢用戶全部信息總耗時(shí):" + (System.currentTimeMillis() - startTime) + "毫秒");
}
結(jié)果:
直觀的可以看出,異步執(zhí)行的優(yōu)勢!
回顧Future
Future是什么?
-
Java 1.5中引入Callable解決多線程執(zhí)行無返回值的問題。 -
Future是為了配合Callable/Runnable而產(chǎn)生的。簡單來講,我們可以通過future來對任務(wù)查詢、取消、執(zhí)行結(jié)果的獲取,是調(diào)用方與異步執(zhí)行方之間溝通的橋梁。 -
FutureTask實(shí)現(xiàn)了RunnableFuture接口,同時(shí)具有Runnable、Future的能力,即既可以作為Future得到Callable的返回值,又可以作為一個(gè)Runnable。 -
CompletableFuture實(shí)現(xiàn)了Futrue接口。 -
Future是Java5新加的一個(gè)接口,它提供了一種異步并行計(jì)算的功能。如果主線程需要執(zhí)行一個(gè)很耗時(shí)的計(jì)算任務(wù),我們可以將這個(gè)任務(wù)通過Future放到異步線程中去執(zhí)行。主線程繼續(xù)處理其他任務(wù),處理完成后,再通過Future獲取計(jì)算結(jié)果。 -
Future可以在連續(xù)流程中滿足數(shù)據(jù)驅(qū)動(dòng)的并發(fā)需求,既獲得了并發(fā)執(zhí)行的性能提升,又不失連續(xù)流程的簡潔優(yōu)雅。
代碼演示(不使用自定義線程池):
@Test
public void callable() throws ExecutionException, InterruptedException {
long startTime = System.currentTimeMillis();
Callable amountCall = new Callable() {
@Override
public Object call() throws Exception {
long startTime = System.currentTimeMillis();
Thread.sleep(6000);
Map<String, Object> amountMap = new HashMap<String, Object>();
amountMap.put("amount", 99);
long endTime = System.currentTimeMillis();
log.info("查詢金額信息耗時(shí):" + (endTime - startTime) / 1000 + "秒");
return amountMap;
}
};
FutureTask<Map> amountFuture = new FutureTask<>(amountCall);
new Thread(amountFuture).start();
Callable roleCall = new Callable() {
@Override
public Object call() throws Exception {
long startTime = System.currentTimeMillis();
Thread.sleep(5000);
Map<String, String> roleMap = new HashMap<String, String>();
roleMap.put("name", "管理員");
long endTime = System.currentTimeMillis();
log.info("查詢角色信息耗時(shí):" + (endTime - startTime) / 1000 + "秒");
return roleMap;
}
};
FutureTask<Map> roleFuture = new FutureTask<>(roleCall);
new Thread(roleFuture).start();
log.info("金額查詢結(jié)果為:" + amountFuture.get());
log.info("角色查詢結(jié)果為:" + roleFuture.get());
long endTime = System.currentTimeMillis();
log.info("總耗時(shí):" + (endTime - startTime) / 1000 + "秒");
}
“
這里要注意:Future對于結(jié)果的獲取,不是很友好,只能通過阻塞或者輪詢的方式得到任務(wù)的結(jié)果。
Future.get()就是阻塞調(diào)用,在線程獲取結(jié)果之前get方法會(huì)一直阻塞;Future提供了一個(gè)isDone方法,可以在程序中輪詢這個(gè)方法查詢執(zhí)行結(jié)果。
這里的 amountFuture.get()如果放到如下圖所示的位置,則amountFuture下面的線程將等amountFuture.get()完成后才能執(zhí)行,沒有執(zhí)行完,則一直阻塞。
結(jié)果:
代碼演示(使用自定義線程池):
@Test
public void executor() throws ExecutionException, InterruptedException {
long startTime = System.currentTimeMillis();
ExecutorService executor = Executors.newFixedThreadPool(2);
Callable amountCall = new Callable() {
@Override
public Object call() throws Exception {
long startTime = System.currentTimeMillis();
Thread.sleep(6000);
Map<String, Object> amountMap = new HashMap<String, Object>();
amountMap.put("amount", 99);
long endTime = System.currentTimeMillis();
log.info("查詢金額信息耗時(shí):" + (endTime - startTime) / 1000 + "秒");
return amountMap;
}
};
Callable roleCall = new Callable() {
@Override
public Object call() throws Exception {
long startTime = System.currentTimeMillis();
Thread.sleep(5000);
Map<String, String> roleMap = new HashMap<String, String>();
roleMap.put("name", "管理員");
long endTime = System.currentTimeMillis();
log.info("查詢用戶角色信息耗時(shí):" + (endTime - startTime) / 1000 + "秒");
return roleMap;
}
};
Future amountFuture = executor.submit(amountCall);
Future roleFuture = executor.submit(roleCall);
log.info("金額查詢結(jié)果為:" + amountFuture.get());
log.info("角色查詢結(jié)果為:" + roleFuture.get());
long endTime = System.currentTimeMillis();
log.info("總耗時(shí):" + (endTime - startTime) / 1000 + "秒");
}
結(jié)果:
CompletableFuture使用場景
創(chuàng)建異步任務(wù)
CompletableFuture創(chuàng)建異步任務(wù),一般有supplyAsync和runAsync兩個(gè)方法:
-
supplyAsync執(zhí)行CompletableFuture任務(wù),支持返回值。 -
runAsync執(zhí)行CompletableFuture任務(wù),沒有返回值。
supplyAsync方法
//使用默認(rèn)內(nèi)置線程池ForkJoinPool.commonPool(),根據(jù)supplier構(gòu)建執(zhí)行任務(wù)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
//自定義線程,根據(jù)supplier構(gòu)建執(zhí)行任務(wù)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
runAsync方法
//使用默認(rèn)內(nèi)置線程池ForkJoinPool.commonPool(),根據(jù)runnable構(gòu)建執(zhí)行任務(wù)
public static CompletableFuture<Void> runAsync(Runnable runnable)
//自定義線程,根據(jù)runnable構(gòu)建執(zhí)行任務(wù)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
代碼演示:
@Test
// supplyAsync執(zhí)行CompletableFuture任務(wù),支持返回值
public void defaultSupplyAsync() throws ExecutionException, InterruptedException {
long startTime = System.currentTimeMillis();
// 構(gòu)建執(zhí)行任務(wù)
CompletableFuture<Map<String, Object>> amountCompletableFuture = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(6000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Map<String, Object> amountMap = new HashMap<String, Object>();
amountMap.put("amount", 99);
long endTime = System.currentTimeMillis();
log.info("查詢金額信息耗時(shí):" + (endTime - startTime) / 1000 + "秒");
return amountMap;
});
// 這行代碼在這里 則會(huì)進(jìn)行6秒的阻塞 下面代碼其他線程無法創(chuàng)建
// 只能等這個(gè)線程6秒過后結(jié)束才能創(chuàng)建其他線程
// Map<String, Object> userMap = userCompletableFuture.get();
CompletableFuture<Map<String, Object>> roleCompletableFuture = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Map<String, Object> roleMap = new HashMap<String, Object>();
roleMap.put("name", "管理員");
return roleMap;
});
log.info("金額查詢結(jié)果為:" + amountCompletableFuture.join());
log.info("角色查詢結(jié)果為:" + roleCompletableFuture.join());
log.info("總耗時(shí):" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
}
@Test
// supplyAsync執(zhí)行CompletableFuture任務(wù),支持返回值
public void customSupplyAsync() throws ExecutionException, InterruptedException {
// 自定義線程池
ExecutorService executorService = Executors.newCachedThreadPool();
long startTime = System.currentTimeMillis();
CompletableFuture<Map<String, Object>> amountCompletableFuture = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(6000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Map<String, Object> amountMap = new HashMap<String, Object>();
amountMap.put("amount", 99);
long endTime = System.currentTimeMillis();
log.info("查詢金額信息耗時(shí):" + (endTime - startTime) / 1000 + "秒");
return amountMap;
}, executorService);
// 這行代碼在這里 則會(huì)進(jìn)行6秒的阻塞 下面代碼其他線程無法創(chuàng)建
// 只能等這個(gè)線程6秒過后結(jié)束才能創(chuàng)建其他線程
// Map<String, Object> userMap = userCompletableFuture.get();
CompletableFuture<Map<String, Object>> roleCompletableFuture = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Map<String, Object> roleMap = new HashMap<String, Object>();
roleMap.put("name", "管理員");
return roleMap;
}, executorService);
log.info("金額查詢結(jié)果為:" + amountCompletableFuture.join());
log.info("角色查詢結(jié)果為:" + roleCompletableFuture.join());
// 線程池需要關(guān)閉
executorService.shutdown();
log.info("總耗時(shí):" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
}
@Test
// runAsync執(zhí)行CompletableFuture任務(wù),沒有返回值
public void defaultRunAsync() {
long lordStartTime = System.currentTimeMillis();
CompletableFuture<Void> amountCompletableFuture = CompletableFuture.runAsync(() -> {
long startTime = System.currentTimeMillis();
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("執(zhí)行金額增刪改操作用時(shí):" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
});
CompletableFuture<Void> roleCompletableFuture = CompletableFuture.runAsync(() -> {
long startTime = System.currentTimeMillis();
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("執(zhí)行角色增刪改操作用時(shí):" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
});
log.info("金額查詢結(jié)果為:" + amountCompletableFuture.join());
log.info("角色查詢結(jié)果為:" + roleCompletableFuture.join());
log.info("總耗時(shí):" + (System.currentTimeMillis() - lordStartTime) / 1000 + "秒");
}
@Test
// runAsync執(zhí)行CompletableFuture任務(wù),沒有返回值
public void customRunAsync() {
long lordStartTime = System.currentTimeMillis();
ExecutorService executor = Executors.newCachedThreadPool();
CompletableFuture<Void> amountCompletableFuture = CompletableFuture.runAsync(() -> {
long startTime = System.currentTimeMillis();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("執(zhí)行金額增刪改操作用時(shí):" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
}, executor);
CompletableFuture<Void> roleCompletableFuture = CompletableFuture.runAsync(() -> {
long startTime = System.currentTimeMillis();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("執(zhí)行角色增刪改操作用時(shí):" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
}, executor);
log.info("金額查詢結(jié)果為:" + amountCompletableFuture.join());
log.info("角色查詢結(jié)果為:" + roleCompletableFuture.join());
// 關(guān)閉線程池
executor.shutdown();
log.info("總耗時(shí):" + (System.currentTimeMillis() - lordStartTime) / 1000 + "秒");
}
注意:這里的get()與join()都是獲取任務(wù)線程的返回值。join()方法拋出的是uncheck異常(即RuntimeException),不會(huì)強(qiáng)制開發(fā)者拋出, 會(huì)將異常包裝成CompletionException異常 /CancellationException異常,但是本質(zhì)原因還是代碼內(nèi)存在的真正的異常;
get()方法拋出的是經(jīng)過檢查的異常,ExecutionException, InterruptedException 需要用戶手動(dòng)處理(拋出或者 try catch)。
異步任務(wù)回調(diào)
thenRun / thenRunAsync
CompletableFuture的thenRun方法,通俗點(diǎn)講就是,做完第一個(gè)任務(wù)后,再做第二個(gè)任務(wù)。某個(gè)任務(wù)執(zhí)行完成后,執(zhí)行回調(diào)方法;但是前后兩個(gè)任務(wù)沒有參數(shù)傳遞,第二個(gè)任務(wù)也沒有返回值。
public CompletableFuture<Void> thenRun(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action);
thenRun / thenRunAsync的區(qū)別? 源碼解釋:
private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
public CompletableFuture<Void> thenRun(Runnable action) {
return uniRunStage(null, action);
}
public CompletableFuture<Void> thenRunAsync(Runnable action) {
return uniRunStage(asyncPool, action);
}
如果你執(zhí)行第一個(gè)任務(wù)的時(shí)候,傳入了一個(gè)自定義線程池:
-
調(diào)用thenRun方法執(zhí)行第二個(gè)任務(wù)時(shí),則第二個(gè)任務(wù)和第一個(gè)任務(wù)是共用同一個(gè)線程池。 -
調(diào)用thenRunAsync執(zhí)行第二個(gè)任務(wù)時(shí),則第一個(gè)任務(wù)使用的是你自己傳入的線程池,第二個(gè)任務(wù)使用的是ForkJoin線程池。
后面介紹的thenAccept和thenAcceptAsync,thenApply和thenApplyAsync等,它們之間的區(qū)別也是這個(gè)哈!
代碼演示:
@Test
// 執(zhí)行第一個(gè)任務(wù)后 可以繼續(xù)執(zhí)行第二個(gè)任務(wù) 兩個(gè)任務(wù)之間無傳參 無返回值
public void defaultThenRun() throws ExecutionException, InterruptedException {
long lordStartTime = System.currentTimeMillis();
CompletableFuture<Void> amountCompletableFuture = CompletableFuture.runAsync(() -> {
long startTime = System.currentTimeMillis();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("執(zhí)行金額增刪改操作用時(shí):" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
});
CompletableFuture<Void> thenCompletableFuture = amountCompletableFuture.thenRun(() -> {
long startTime = System.currentTimeMillis();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("執(zhí)行角色增刪改操作用時(shí):" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
});
thenCompletableFuture.get();
log.info("總耗時(shí):" + (System.currentTimeMillis() - lordStartTime) / 1000 + "秒");
}
結(jié)果:
thenAccept / thenAcceptAsync
CompletableFuture的thenAccept方法表示,第一個(gè)任務(wù)執(zhí)行完成后,執(zhí)行第二個(gè)回調(diào)方法任務(wù),會(huì)將該任務(wù)的執(zhí)行結(jié)果,作為入?yún)ⅲ瑐鬟f到回調(diào)方法中,但是回調(diào)方法是沒有返回值的。
public CompletableFuture<Void> thenAccept(Consumer<? super T> action);
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action);
代碼演示:
@Test
// 執(zhí)行第一個(gè)任務(wù)后 可以繼續(xù)執(zhí)行第二個(gè)任務(wù) 并攜帶第一個(gè)任務(wù)的返回值 第二個(gè)任務(wù)執(zhí)行完沒有返回值
public void defaultThenAccept() throws ExecutionException, InterruptedException {
long lordStartTime = System.currentTimeMillis();
CompletableFuture<Map> amountCompletableFuture = CompletableFuture.supplyAsync(() -> {
long startTime = System.currentTimeMillis();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Map<String, Object> amountMap = new HashMap<String, Object>();
amountMap.put("amount", 90);
log.info("執(zhí)行金額查詢操作用時(shí):" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
return amountMap;
});
CompletableFuture<Void> thenCompletableFuture = amountCompletableFuture.thenAccept((map) -> {
long startTime = System.currentTimeMillis();
if (Integer.parseInt(map.get("amount").toString()) > 90) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("金額充足,可以購買!:" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
} else {
log.info("金額不足,無法購買!:" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
}
});
thenCompletableFuture.get();
log.info("總耗時(shí):" + (System.currentTimeMillis() - lordStartTime) / 1000 + "秒");
}
結(jié)果:
thenApply / thenApplyAsync
CompletableFuture的thenApply方法表示,第一個(gè)任務(wù)執(zhí)行完成后,執(zhí)行第二個(gè)回調(diào)方法任務(wù),會(huì)將該任務(wù)的執(zhí)行結(jié)果,作為入?yún)ⅲ瑐鬟f到回調(diào)方法中,并且回調(diào)方法是有返回值的。
public <U> CompletableFuture<U> thenApplyAsync();
public CompletableFuture<Void> thenAccept(Consumer<? super T> action);
代碼演示:
@Test
// 執(zhí)行第一個(gè)任務(wù)后 可以繼續(xù)執(zhí)行第二個(gè)任務(wù) 并攜帶第一個(gè)任務(wù)的返回值 第二個(gè)任務(wù)執(zhí)行完有返回值
public void defaultThenApply() throws ExecutionException, InterruptedException {
long lordStartTime = System.currentTimeMillis();
CompletableFuture<Map> amountCompletableFuture = CompletableFuture.supplyAsync(() -> {
long startTime = System.currentTimeMillis();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Map<String, Object> amountMap = new HashMap<String, Object>();
amountMap.put("amount", 90);
log.info("執(zhí)行金額查詢操作用時(shí):" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
return amountMap;
});
CompletableFuture<Integer> thenCompletableFuture = amountCompletableFuture.thenApply((map) -> {
int number = 0;
if (Integer.parseInt(map.get("amount").toString()) > 3) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 可口可樂3元一瓶 看金額一共能購買多少瓶
number = Integer.parseInt(map.get("amount").toString()) / 3;
}
return number;
});
log.info("當(dāng)前金額一共可以買" + thenCompletableFuture.get() + "瓶可口可樂!");
Integer integer = thenCompletableFuture.get();
log.info("總耗時(shí):" + (System.currentTimeMillis() - lordStartTime) / 1000 + "秒");
}
結(jié)果:
exceptionally
CompletableFuture的exceptionally方法表示,某個(gè)任務(wù)執(zhí)行異常時(shí),執(zhí)行的回調(diào)方法;并且有拋出異常作為參數(shù),傳遞到回調(diào)方法。
public CompletableFuture<T> exceptionally(
Function<Throwable, ? extends T> fn) {
return uniExceptionallyStage(fn);
}
代碼演示:
@Test
// 某個(gè)任務(wù)執(zhí)行異常時(shí),執(zhí)行的回調(diào)方法;并且有拋出異常作為參數(shù),傳遞到回調(diào)方法。
public void exceptionally() throws ExecutionException, InterruptedException {
long lordStartTime = System.currentTimeMillis();
CompletableFuture<Map> amountCompletableFuture = CompletableFuture.supplyAsync(() -> {
long startTime = System.currentTimeMillis();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Map<String, Object> amountMap = new HashMap<String, Object>();
amountMap.put("amount", 90);
log.info("執(zhí)行金額查詢操作用時(shí):" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
return amountMap;
});
CompletableFuture<Integer> thenCompletableFuture = amountCompletableFuture.thenApply((map) -> {
int number = 0;
if (Integer.parseInt(map.get("amount").toString()) > 3) {
try {
Thread.sleep(1000);
// 可口可樂3元一瓶 看金額一共能購買多少瓶
number = Integer.parseInt(map.get("amount").toString()) / 0;
} catch (ArithmeticException | InterruptedException e) {
e.printStackTrace();
throw new ArithmeticException(); // 這里一定要將異常拋除了,不然exceptionally無效
}
}
return number;
});
CompletableFuture<Integer> exceptionFuture = thenCompletableFuture.exceptionally((e) -> {
log.error("除數(shù)為0,則默認(rèn)商為0!");
return 0;
});
log.info("當(dāng)前金額一共可以買" + thenCompletableFuture.get() + "瓶可口可樂!");
exceptionFuture.get();
log.info("總耗時(shí):" + (System.currentTimeMillis() - lordStartTime) / 1000 + "秒");
}
“
注意:這里的異常一定要拋出來,不然exceptionally無效!
whenComplete
CompletableFuture的whenComplete方法表示,某個(gè)任務(wù)執(zhí)行完成后,執(zhí)行的回調(diào)方法,無返回值;并且whenComplete方法返回的CompletableFuture的result是上個(gè)任務(wù)的結(jié)果。
public CompletableFuture<T> whenComplete(
BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(null, action);
}
代碼演示:
@Test
// 某個(gè)任務(wù)執(zhí)行完成后,執(zhí)行的回調(diào)方法,無返回值;并且whenComplete方法返回的CompletableFuture的result是上個(gè)任務(wù)的結(jié)果。
public void whenComplete() {
CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
return "周杰倫";
});
CompletableFuture<String> stringCompletableFuture1 = stringCompletableFuture.whenComplete((a, throwable) -> {
log.info("周杰倫喜歡唱");
});
log.info("輸出結(jié)果為第一個(gè)任務(wù):" + stringCompletableFuture1.join());
}
結(jié)果:
handle
CompletableFuture的handle方法表示,某個(gè)QQ賬號買號平臺地圖任務(wù)執(zhí)行完成后,執(zhí)行回調(diào)方法,并且是有返回值的;并且handle方法返回的CompletableFuture的result是回調(diào)方法執(zhí)行的結(jié)果。
public <U> CompletableFuture<U> handle(
BiFunction<? super T, Throwable, ? extends U> fn) {
return uniHandleStage(null, fn);
}
代碼演示:
@Test
// 某個(gè)任務(wù)執(zhí)行完成后,執(zhí)行的回調(diào)方法,有返回值;并且handle方法返回的CompletableFuture的result是第二個(gè)任務(wù)的結(jié)果。
public void handle() {
CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
return "周杰倫";
});
CompletableFuture<String> stringCompletableFuture1 = stringCompletableFuture.handle((a, throwable) -> {
return "周杰倫喜歡唱歌!";
});
log.info("輸出結(jié)果為第二個(gè)任務(wù):" + stringCompletableFuture1.join());
}
結(jié)果:
多個(gè)任務(wù)組合處理
AND組合關(guān)系
thenCombine / thenAcceptBoth / runAfterBoth都表示:將兩個(gè)CompletableFuture組合起來,只有這兩個(gè)都正常執(zhí)行完了,才會(huì)執(zhí)行某個(gè)任務(wù)。
-
thenCombine:會(huì)將兩個(gè)任務(wù)的執(zhí)行結(jié)果作為方法入?yún)ⅲ瑐鬟f到指定方法中,且有返回值。 -
thenAcceptBoth: 會(huì)將兩個(gè)任務(wù)的執(zhí)行結(jié)果作為方法入?yún)ⅲ瑐鬟f到指定方法中,且無返回值。 -
runAfterBoth 不會(huì)把執(zhí)行結(jié)果當(dāng)做方法入?yún)ⅲ覜]有返回值。
代碼演示:
@Test
public void thenCombine() {
CompletableFuture<Integer> first = CompletableFuture.supplyAsync(() -> {
return 7;
});
CompletableFuture<Integer> second = CompletableFuture.supplyAsync(() -> 2).thenCombine(first, Integer::sum);
log.info("結(jié)果為:" + second.join());
}
結(jié)果為:
OR組合關(guān)系
applyToEither / acceptEither / runAfterEither 都表示:將兩個(gè)CompletableFuture組合起來,只要其中一個(gè)執(zhí)行完了,就會(huì)執(zhí)行某個(gè)任務(wù)。
-
applyToEither:會(huì)將已經(jīng)執(zhí)行完成的任務(wù),作為方法入?yún)ⅲ瑐鬟f到指定方法中,且有返回值。 -
acceptEither: 會(huì)將已經(jīng)執(zhí)行完成的任務(wù),作為方法入?yún)ⅲ瑐鬟f到指定方法中,且無返回值。 -
runAfterEither:不會(huì)把執(zhí)行結(jié)果當(dāng)做方法入?yún)ⅲ覜]有返回值。
代碼演示:
@Test
public void applyToEither1() {
log.info("魏凱下班準(zhǔn)備回家。。。");
log.info("魏凱等待2號,4號地鐵。。。");
CompletableFuture<String> busCF = CompletableFuture.supplyAsync(() -> {
log.info("2號在路上。。。");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "2";
}).applyToEither(CompletableFuture.supplyAsync(() -> {
log.info("4號地鐵在路上。。。");
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "4";
}), first -> first + "號");
log.info("魏凱坐上" + busCF.join() + "地鐵");
}
@Test
// OR
public void applyToEither() {
CompletableFuture<Integer> first = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 7;
});
CompletableFuture<Integer> second = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 7;
}).applyToEither(first, num -> num);
log.info("最后結(jié)果為:" + second.join());
}
結(jié)果演示:
AllOf
所有任務(wù)都執(zhí)行完成后,才執(zhí)行 allOf返回的CompletableFuture。如果任意一個(gè)任務(wù)異常,allOf的CompletableFuture,執(zhí)行g(shù)et方法,會(huì)拋出異常。
代碼演示:
@Test
// 所有任務(wù)都執(zhí)行完成后,才執(zhí)行 allOf返回的CompletableFuture。如果任意一個(gè)任務(wù)異常,allOf的CompletableFuture,執(zhí)行g(shù)et方法,會(huì)拋出異常。
// 這里第一次執(zhí)行沒有睡眠的話,是可以直接執(zhí)行第三個(gè)任務(wù)的。如果有睡眠,則需要手動(dòng)join啟動(dòng)。
public void allOf() {
CompletableFuture<Void> first = CompletableFuture.runAsync(() -> {
// try {
// Thread.sleep(2000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
log.info("第一個(gè)任務(wù)執(zhí)行完成!");
});
CompletableFuture<Void> second = CompletableFuture.runAsync(() -> {
// try {
// Thread.sleep(500);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
log.info("第二個(gè)任務(wù)執(zhí)行完成!");
});
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(first, second).whenComplete((m, n) -> {
log.info("第三個(gè)任務(wù)完成!");
});
// voidCompletableFuture.join();
}
結(jié)果:
“
注意:這里第一次啟動(dòng)執(zhí)行沒有睡眠的話,是可以直接執(zhí)行第三個(gè)任務(wù)的,因?yàn)檫@兩個(gè)任務(wù)都執(zhí)行完成,啟動(dòng)的瞬間第三個(gè)也同時(shí)執(zhí)行完。如果有睡眠,則需要手動(dòng)join啟動(dòng),等待最長睡眠任務(wù)時(shí)間過后,第三個(gè)任務(wù)完成!
AnyOf
任意一個(gè)任務(wù)執(zhí)行完,就執(zhí)行anyOf返回的CompletableFuture。如果執(zhí)行的任務(wù)異常,anyOf的CompletableFuture,執(zhí)行g(shù)et方法,會(huì)拋出異常。
代碼演示:
@Test
// 前提任務(wù)任意執(zhí)行完一個(gè),則目標(biāo)任務(wù)執(zhí)行。其他前提任務(wù)則不在執(zhí)行。
// 任意一個(gè)任務(wù)執(zhí)行完,就執(zhí)行anyOf返回的CompletableFuture。如果執(zhí)行的任務(wù)異常,anyOf的CompletableFuture,執(zhí)行g(shù)et方法,會(huì)拋出異常。
public void anyOf() {
CompletableFuture<Void> first = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("第一個(gè)任務(wù)執(zhí)行完成!");
});
CompletableFuture<Void> second = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("第二個(gè)任務(wù)執(zhí)行完成!");
});
CompletableFuture<Object> voidCompletableFuture = CompletableFuture.anyOf(first, second).whenComplete((m, n) -> {
log.info("第三個(gè)任務(wù)完成!");
});
voidCompletableFuture.join();
}
結(jié)果:
thenCompose
thenCompose方法會(huì)在某個(gè)任務(wù)執(zhí)行完成后,將該任務(wù)的執(zhí)行結(jié)果,作為方法入?yún)?去執(zhí)行指定的方法。該方法會(huì)返回一個(gè)新的CompletableFuture實(shí)例。
-
如果該CompletableFuture實(shí)例的result不為null,則返回一個(gè)基于該result新的CompletableFuture實(shí)例。 -
如果該CompletableFuture實(shí)例為null,然后就執(zhí)行這個(gè)新任務(wù)。
代碼演示:
@Test
public void thenCompose1() {
CompletableFuture<Integer> stringCompletableFuture = CompletableFuture.supplyAsync(() -> 4)
.thenCompose(value -> CompletableFuture.supplyAsync(() -> {
// thenCompose方法返回一個(gè)新的CompletableFuture
if (Integer.valueOf(4).equals(value)) {
return 66;
} else {
return 99;
}
}));
log.info("結(jié)果:" + stringCompletableFuture.join());
}
@Test
public void thenCompose() {
CompletableFuture<String> first = CompletableFuture.completedFuture("第一個(gè)任務(wù)");
CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> 4)
.thenCompose((data) -> {
log.info("data為:" + data);
return first;
});
log.info("結(jié)果:" + stringCompletableFuture.join());
}
結(jié)果:
CompletableFuture注意點(diǎn)
Future需要獲取返回值,才能獲取異常信息
@Test
public void futureTest(){
ExecutorService executor = Executors.newFixedThreadPool(2);
CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> {
int m = 9;
int n = 0;
return m / n;
},executor);
// integerCompletableFuture.join(); // 這行代碼不加,則不會(huì)拋出異常
}
Future需要獲取返回值,才能獲取到異常信息。如果不加 get()/join()方法,看不到異常信息。小伙伴們使用的時(shí)候,注意一下哈,考慮是否加try…catch…或者使用exceptionally方法。
CompletableFuture的get()方法是阻塞的
//反例
CompletableFuture.get();
//正例
CompletableFuture.get(9, TimeUnit.SECONDS);
CompletableFuture的get()方法是阻塞的,如果使用它來獲取異步調(diào)用的返回值,需要添加超時(shí)時(shí)間
默認(rèn)線程池的注意點(diǎn)
CompletableFuture代碼中又使用了默認(rèn)的線程池,處理的線程個(gè)數(shù)是電腦CPU核數(shù)-1。在大量請求過來的時(shí)候,處理邏輯復(fù)雜的話,響應(yīng)會(huì)很慢。一般建議使用自定義線程池,優(yōu)化線程池配置參數(shù)。
自定義線程池時(shí),注意飽和策略
CompletableFuture的get()方法是阻塞的,我們一般建議使用future.get(3, TimeUnit.SECONDS)。并且一般建議使用自定義線程池。但是如果線程池拒絕策略是DiscardPolicy或者DiscardOldestPolicy,當(dāng)線程池飽和時(shí),會(huì)直接丟棄任務(wù),不會(huì)拋棄異常。
因此建議,CompletableFuture線程池策略最好使用AbortPolicy,然后耗時(shí)的異步線程,做好線程池隔離!
?? 歡迎加入小哈的星球 ,你將獲得: 專屬的項(xiàng)目實(shí)戰(zhàn) / Java 學(xué)習(xí)路線 / 一對一提問 / 學(xué)習(xí)打卡 / 贈(zèng)書福利
全棧前后端分離博客項(xiàng)目 2.0 版本完結(jié)啦, 演示鏈接:http://116.62.199.48/ ,新項(xiàng)目正在醞釀中。全程手摸手,后端 + 前端全棧開發(fā),從 0 到 1 講解每個(gè)功能點(diǎn)開發(fā)步驟,1v1 答疑,直到項(xiàng)目上線。目前已更新了239小節(jié),累計(jì)38w+字,講解圖:1645張,還在持續(xù)爆肝中.. 后續(xù)還會(huì)上新更多項(xiàng)目,目標(biāo)是將Java領(lǐng)域典型的項(xiàng)目都整一波,如秒殺系統(tǒng), 在線商城, IM即時(shí)通訊,Spring Cloud Alibaba 等等,戳我加入學(xué)習(xí),已有1300+小伙伴加入(早鳥價(jià)超低)
最近面試BAT,整理一份面試資料《Java面試BATJ通關(guān)手冊》,覆蓋了Java核心技術(shù)、JVM、Java并發(fā)、SSM、微服務(wù)、數(shù)據(jù)庫、數(shù)據(jù)結(jié)構(gòu)等等。
獲取方式:點(diǎn)“在看”,關(guān)注公眾號并回復(fù) Java 領(lǐng)取,更多內(nèi)容陸續(xù)奉上。
PS:因公眾號平臺更改了推送規(guī)則,如果不想錯(cuò)過內(nèi)容,記得讀完點(diǎn)一下“在看”,加個(gè)“星標(biāo)”,這樣每次新文章推送才會(huì)第一時(shí)間出現(xiàn)在你的訂閱列表里。
點(diǎn)“在看”支持小哈呀,謝謝啦
