Java8已經(jīng)發(fā)布7年了,不會還有人沒用過CompletableFuture吧
日常開發(fā)中,我們都會用到線程池,一般會用execute()和submit()方法提交任務(wù)。但是當(dāng)你用過CompletableFuture之后,就會發(fā)現(xiàn)以前的線程池處理任務(wù)有多難用,功能有多簡陋,CompletableFuture又是多么簡潔優(yōu)雅。
要知道CompletableFuture已經(jīng)隨著Java8發(fā)布7年了,還沒有過它就有點(diǎn)說不過去了。
今天5分鐘帶你深入淺出CompletableFuture實(shí)用教程。
1. 使用線程池處理任務(wù)
/**
* @author yideng
* @apiNote 線程池使用示例
*/
public class ThreadDemo {
public static void main(String[] args) {
// 1. 創(chuàng)建線程池
ExecutorService executorService = Executors.newFixedThreadPool(3);
List<Integer> list = Arrays.asList(1, 2, 3);
List<Future<String>> futures = new ArrayList<>();
for (Integer key : list) {
// 2. 提交任務(wù)
Future<String> future = executorService.submit(() -> {
// 睡眠一秒,模仿處理過程
Thread.sleep(1000L);
return "結(jié)果" + key;
});
futures.add(future);
}
// 3. 獲取結(jié)果
for (Future<String> future : futures) {
try {
String result = future.get();
System.out.println(result);
} catch (Exception e) {
e.printStackTrace();
}
}
executorService.shutdown();
}
}輸出結(jié)果:
結(jié)果1
結(jié)果2
結(jié)果3一般大家都會這樣使用線程池,但是有沒有思考過這樣使用有沒有什么問題?
反正我發(fā)現(xiàn)兩個比較嚴(yán)重的問題:
獲取結(jié)果時,調(diào)用的future.get()方法,會阻塞當(dāng)前線程,直到返回結(jié)果,大大降低性能 有一半的代碼在寫怎么使用線程,其實(shí)我們不應(yīng)該關(guān)心怎么使用線程,更應(yīng)該關(guān)注任務(wù)的處理
有沒有具體的優(yōu)化方案呢?當(dāng)然有了,請出來我們今天的主角CompletableFuture
2. 使用CompletableFuture重構(gòu)任務(wù)處理
看一下使用CompletableFuture改造后代碼:
/**
* @author yideng
* @apiNote CompletableFuture使用示例
*/
public class ThreadDemo {
public static void main(String[] args) {
// 1. 創(chuàng)建線程池
ExecutorService executorService = Executors.newFixedThreadPool(3);
List<Integer> list = Arrays.asList(1, 2, 3);
for (Integer key : list) {
// 2. 提交任務(wù)
CompletableFuture.supplyAsync(() -> {
// 睡眠一秒,模仿處理過程
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
}
return "結(jié)果" + key;
}, executorService).whenCompleteAsync((result, exception) -> {
// 3. 獲取結(jié)果
System.out.println(result);
});;
}
executorService.shutdown();
// 由于whenCompleteAsync獲取結(jié)果的方法是異步的,所以要阻塞當(dāng)前線程才能輸出結(jié)果
try {
Thread.sleep(2000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}輸出結(jié)果:
結(jié)果1
結(jié)果2
結(jié)果3代碼中使用了CompletableFuture的兩個方法,
supplyAsync()方法作用是提交異步任務(wù),有兩個傳參,任務(wù)和自定義線程池。
whenCompleteAsync()方法作用是異步獲取結(jié)果,也有兩個傳參,結(jié)果和異常信息。
代碼經(jīng)過CompletableFuture改造后,是多么的簡潔優(yōu)雅。
提交任務(wù)也不用再關(guān)心線程池是怎么使用了,獲取結(jié)果也不用再阻塞當(dāng)前線程了。
如果你比較倔強(qiáng),還想同步獲取結(jié)果,可以使用whenComplete()方法,或者單獨(dú)調(diào)用join()方法。
join()方法配合Stream流是這樣用的:
/**
* @author yideng
* @apiNote CompletableFuture使用示例
*/
public class ThreadDemo {
public static void main(String[] args) {
// 1. 創(chuàng)建線程池
ExecutorService executorService = Executors.newFixedThreadPool(3);
List<Integer> list = Arrays.asList(1, 2, 3);
// 2. 提交任務(wù)
List<String> results = list.stream().map(key ->
CompletableFuture.supplyAsync(() -> {
// 睡眠一秒,模仿處理過程
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
}
return "結(jié)果" + key;
}, executorService))
.map(CompletableFuture::join).collect(Collectors.toList());
executorService.shutdown();
// 3. 獲取結(jié)果
System.out.println(results);
}
}輸出結(jié)果:
[結(jié)果1,結(jié)果2,結(jié)果3]多么的簡潔優(yōu)雅??!原來executorService.submit()這種使用線程池的方式,可以徹底丟掉了。
3. CompletableFuture更多妙用
3.1 等待所有任務(wù)執(zhí)行完成
如果讓你實(shí)現(xiàn)等待所有任務(wù)線程執(zhí)行完成,再進(jìn)行下一步操作,你會怎么做?
我猜你一定會使用 線程池+CountDownLatch,像下面這樣:
/**
* @author yideng
* @apiNote 線程池和CountDownLatch使用示例
*/
public class ThreadDemo {
public static void main(String[] args) {
// 1. 創(chuàng)建線程池
ExecutorService executorService = Executors.newFixedThreadPool(3);
List<Integer> list = Arrays.asList(1, 2, 3);
CountDownLatch countDownLatch = new CountDownLatch(list.size());
for (Integer key : list) {
// 2. 提交任務(wù)
executorService.execute(() -> {
// 睡眠一秒,模仿處理過程
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
}
System.out.println("結(jié)果" + key);
countDownLatch.countDown();
});
}
executorService.shutdown();
// 3. 阻塞等待所有任務(wù)執(zhí)行完成
try {
countDownLatch.await();
} catch (InterruptedException e) {
}
}
}輸出結(jié)果:
結(jié)果2
結(jié)果3
結(jié)果1Low不Low?十年前可以這樣寫,Java8都已經(jīng)發(fā)布7年了,你還不會用Java8的寫法?看一下使用CompletableFuture是怎么重構(gòu)的:
/**
* @author yideng
* @apiNote CompletableFuture.allOf()方法使用示例
*/
public class ThreadDemo {
public static void main(String[] args) {
// 1. 創(chuàng)建線程池
ExecutorService executorService = Executors.newFixedThreadPool(3);
List<Integer> list = Arrays.asList(1, 2, 3);
// 2. 提交任務(wù),并調(diào)用join()阻塞等待所有任務(wù)執(zhí)行完成
CompletableFuture
.allOf(
list.stream().map(key ->
CompletableFuture.runAsync(() -> {
// 睡眠一秒,模仿處理過程
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
}
System.out.println("結(jié)果" + key);
}, executorService))
.toArray(CompletableFuture[]::new))
.join();
executorService.shutdown();
}
}輸出結(jié)果:
結(jié)果3
結(jié)果1
結(jié)果2代碼看著有點(diǎn)亂,其實(shí)邏輯很清晰。
遍歷list集合,提交CompletableFuture任務(wù),把結(jié)果轉(zhuǎn)換成數(shù)組 再把數(shù)組放到CompletableFuture的allOf()方法里面 最后調(diào)用join()方法阻塞等待所有任務(wù)執(zhí)行完成
CompletableFuture的allOf()方法的作用就是,等待所有任務(wù)處理完成。
這樣寫是不是簡潔優(yōu)雅了許多?
3.2 任何一個任務(wù)處理完成就返回
如果要實(shí)現(xiàn)這樣一個需求,往線程池提交一批任務(wù),只要有其中一個任務(wù)處理完成就返回。
該怎么做?如果你手動實(shí)現(xiàn)這個邏輯的話,代碼肯定復(fù)雜且低效,有了CompletableFuture就非常簡單了,只需調(diào)用anyOf()方法就行了。
/**
* @author yideng
* @apiNote CompletableFuture.anyOf()方法使用示例
*/
public class ThreadDemo {
public static void main(String[] args) {
// 1. 創(chuàng)建線程池
ExecutorService executorService = Executors.newFixedThreadPool(3);
List<Integer> list = Arrays.asList(1, 2, 3);
long start = System.currentTimeMillis();
// 2. 提交任務(wù)
CompletableFuture<Object> completableFuture = CompletableFuture
.anyOf(
list.stream().map(key ->
CompletableFuture.supplyAsync(() -> {
// 睡眠一秒,模仿處理過程
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
}
return "結(jié)果" + key;
}, executorService))
.toArray(CompletableFuture[]::new));
executorService.shutdown();
// 3. 獲取結(jié)果
System.out.println(completableFuture.join());
}
}輸出結(jié)果:
結(jié)果3一切都是那么簡單優(yōu)雅。
3.3 一個線程執(zhí)行完成,交給另一個線程接著執(zhí)行
有這么一個需求:
一個線程處理完成,把處理的結(jié)果交給另一個線程繼續(xù)處理,怎么實(shí)現(xiàn)?
你是不是想到了一堆工具,線程池、CountDownLatch、Semaphore、ReentrantLock、Synchronized,該怎么進(jìn)行組合使用呢?AB組合還是BC組合?
別瞎想了,你寫的肯定沒有CompletableFuture好用,看一下CompletableFuture是怎么用的:
/**
* @author yideng
* @apiNote CompletableFuture線程接力處理示例
*/
public class ThreadDemo {
public static void main(String[] args) {
// 1. 創(chuàng)建線程池
ExecutorService executorService = Executors.newFixedThreadPool(2);
// 2. 提交任務(wù),并調(diào)用join()阻塞等待任務(wù)執(zhí)行完成
String result2 = CompletableFuture.supplyAsync(() -> {
// 睡眠一秒,模仿處理過程
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
}
return "結(jié)果1";
}, executorService).thenApplyAsync(result1 -> {
// 睡眠一秒,模仿處理過程
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
}
return result1 + "結(jié)果2";
}, executorService).join();
executorService.shutdown();
// 3. 獲取結(jié)果
System.out.println(result2);
}
}輸出結(jié)果:
結(jié)果1結(jié)果2代碼主要用到了CompletableFuture的thenApplyAsync()方法,作用就是異步處理上一個線程的結(jié)果。
是不是太方便了?
這么好用的CompletableFuture還有沒有其他功能?當(dāng)然有。
4. CompletableFuture常用API
4.1 CompletableFuture常用API說明
提交任務(wù)
supplyAsync
runAsync接力處理
thenRun thenRunAsync
thenAccept thenAcceptAsync
thenApply thenApplyAsync
handle handleAsync
applyToEither applyToEitherAsync
acceptEither acceptEitherAsync
runAfterEither runAfterEitherAsync
thenCombine thenCombineAsync
thenAcceptBoth thenAcceptBothAsync
API太多,有點(diǎn)眼花繚亂,很容易分類。
帶run的方法,無入?yún)?,無返回值。
帶accept的方法,有入?yún)?,無返回值。
帶supply的方法,無入?yún)?,有返回值?br>帶apply的方法,有入?yún)?,有返回值?br>帶handle的方法,有入?yún)?,有返回值,并且?guī)М惓L幚怼?br>以Async結(jié)尾的方法,都是異步的,否則是同步的。
以Either結(jié)尾的方法,只需完成任意一個。
以Both/Combine結(jié)尾的方法,必須所有都完成。
獲取結(jié)果
join 阻塞等待,不會拋異常
get 阻塞等待,會拋異常
complete(T value) 不阻塞,如果任務(wù)已完成,返回處理結(jié)果。如果沒完成,則返回傳參value。
completeExceptionally(Throwable ex) 不阻塞,如果任務(wù)已完成,返回處理結(jié)果。如果沒完成,拋異常。
4. CompletableFuture常用API使用示例
用最常見的煮飯來舉例:
4.1 then、handle方法使用示例
/**
* @author yideng
* @apiNote then、handle方法使用示例
*/
public class ThreadDemo {
public static void main(String[] args) {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("1. 開始淘米");
return "2. 淘米完成";
}).thenApplyAsync(result -> {
System.out.println(result);
System.out.println("3. 開始煮飯");
// 生成一個1~10的隨機(jī)數(shù)
if (RandomUtils.nextInt(1, 10) > 5) {
throw new RuntimeException("4. 電飯煲壞了,煮不了");
}
return "4. 煮飯完成";
}).handleAsync((result, exception) -> {
if (exception != null) {
System.out.println(exception.getMessage());
return "5. 今天沒飯吃";
} else {
System.out.println(result);
return "5. 開始吃飯";
}
});
try {
String result = completableFuture.get();
System.out.println(result);
} catch (Exception e) {
e.printStackTrace();
}
}
}輸出結(jié)果可能是:
1. 開始淘米
2. 淘米完成
3. 開始煮飯
4. 煮飯完成
5. 開始吃飯也可能是:
1. 開始淘米
2. 淘米完成
3. 開始煮飯
java.lang.RuntimeException: 4. 電飯煲壞了,煮不了
5. 今天沒飯吃4.2 complete方法使用示例
/**
* @author yideng
* @apiNote complete使用示例
*/
public class ThreadDemo {
public static void main(String[] args) {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
return "飯做好了";
});
//try {
// Thread.sleep(1L);
//} catch (InterruptedException e) {
//}
completableFuture.complete("飯還沒做好,我點(diǎn)外賣了");
System.out.println(completableFuture.join());
}
}輸出結(jié)果:
飯還沒做好,我點(diǎn)外賣了如果把注釋的sleep()方法放開,輸出結(jié)果就是:
飯做好了4.3 either方法使用示例
/**
* @author yideng
* @apiNote either方法使用示例
*/
public class ThreadDemo {
public static void main(String[] args) {
CompletableFuture<String> meal = CompletableFuture.supplyAsync(() -> {
return "飯做好了";
});
CompletableFuture<String> outMeal = CompletableFuture.supplyAsync(() -> {
return "外賣到了";
});
// 飯先做好,就吃飯。外賣先到,就吃外賣。就是這么任性。
CompletableFuture<String> completableFuture = meal.applyToEither(outMeal, myMeal -> {
return myMeal;
});
System.out.println(completableFuture.join());
}
}輸出結(jié)果可能是:
飯做好了也可能是:
外賣到了學(xué)會了嗎?開發(fā)中趕快用起來吧!
