強大的CompletableFuture
異步計算
異步調(diào)用其實就是實現(xiàn)一個可無需等待被調(diào)用函數(shù)的返回值而讓操作繼續(xù)運行的方法。打個比方,你拿了一袋子衣 服到你中意的干洗店去洗。干洗店的員工會給你張發(fā)票,告訴你什么時候你的衣服會洗好。同時,你可以去做其他的事情。洗衣服對于你就是一個異步的過程。
在開發(fā)中如何使用異步呢?
Future 接口
Future是JDK5新增的接口,用于描述一個異步計算的結(jié)果。它提供了檢查計算是否完成的方法,以等待計算的完成,并獲取計算的結(jié)果。計算完成后只能使用 get 方法來獲取結(jié)果。使用Future以異步的方式執(zhí)行一個耗時的操作,完成上面洗衣服的例子。
public class FutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//1. 創(chuàng)建ExecutorService,通過它你可以向線程池提交任務(wù)
ExecutorService executor = Executors.newCachedThreadPool();
//2.向executorService提交一個callable對象
Future<String> future = executor.submit(()->{
try {
System.out.println("把衣服交給干洗店");
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "洗完了";
});
//3.可以做其他事情了
//goShopping();
//4.獲取異步執(zhí)行的結(jié)果
System.out.println(future.get());
System.out.println("finish!");
}
}
CompletableFuture
為什么有了Future, JDK8之后又提供了CompletableFuture,因為Future有其局限性; 比如以下情況
將兩個異步計算合并為一個; 等待多個Future的所有任務(wù)都完成 僅等待Future集合中最快結(jié)束的任務(wù)完成,并返回它的結(jié)果 Future完成后,需要做一些事情。
CompletableFuture即實現(xiàn)了Future接口,又實現(xiàn)了CompletionStage接口, CompletionStage也是JDK8新增的接口,他給CompletableFuture提供了函數(shù)式編程的能力,通過他,我們可以在一個CompletableFuture結(jié)果上多次流式調(diào)用,得到最后的結(jié)果。
創(chuàng)建異步對象
CompletableFuture提供了runAsync, supplyAsync靜態(tài)方法創(chuàng)建一個異步操作。runAsync沒有返回值的場景,如執(zhí)行一個簡單的異步操作。supplyAsync可以獲得返回結(jié)果的場景,如計算某些數(shù)據(jù)返回等。同時,這兩種方法都可以通過傳入executor設(shè)置使用的線程池,如果不傳,則使用默認的ForkJoinPool.common線程池。
//無返回值 runAsync()
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
//有返回值supplyAsync()
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//異步調(diào)用,無返回值的情況
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(()->{
System.out.println(Thread.currentThread().getName());
});
completableFuture.get();
//異步調(diào)用,有返回值的情況
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(()->{
System.out.println("構(gòu)建任務(wù)");
return calc(15, 10);
});
System.out.println(future.get());
}
public static Integer calc(Integer a, Integer b) {
try {
//模擬長時間的任務(wù)
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return a / b;
}
}
ForkJoinPool.commonPool-worker-1
構(gòu)建任務(wù)
1
完成時回調(diào)
whenComplete
CompletableFuture提供了一種異步編排的功能,使用whenComplete ,當(dāng)Future任務(wù)完成后,調(diào)用一個回調(diào)。
//whenComplete 不帶Async,表示同步,與上一個Future用同一個線程執(zhí)行
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
//whenCompleteAsync,異步,表示任務(wù)執(zhí)行完后另起線程異步執(zhí)行
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
//whenCompleteAsync,異步,表示任務(wù)執(zhí)行完后交給指定線程池異步執(zhí)行
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable>action, Executor executor)
whenComplete可以處理正常和異常的計算結(jié)果,exceptionally處理異常情況。whenComplete是執(zhí)行當(dāng)前任務(wù)的線程繼續(xù)執(zhí)行whenComplete的任務(wù)。whenCompleteAsync是提交給其他線程池來執(zhí)行。
whenComplete的參數(shù)是一個BiConsumer, BiConsumer接收兩個參數(shù),一個是上次任務(wù)的結(jié)果,一個是上次任務(wù)的異常。所以可以獲取到任務(wù)的執(zhí)行結(jié)果和異常,同時,CompletableFuture可以對異常進行處理,當(dāng)發(fā)生異常時,給定一個默認返回。
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(()->{
return 10/0;
}).whenComplete((res, exception)->{
System.out.println(res);
System.out.println(exception);
}).exceptionally(throwable -> 10);
System.out.println(future.get());
null
java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
10
handle
handle方法和whenComplete大致相同。
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)
不同在于whenComplete傳入的是BiConsumer, 而handle方法傳入的是BiFunction。懂了!就是handle方法有返回值,可以對CompletableFuture任務(wù)的執(zhí)行結(jié)果做處理,得到新結(jié)果返回,而whenComplete不能。
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(()->{
return 10/2;
}).handle((res, exception)->{
if (res!=null) {
//handle方法可以修改結(jié)果返回
return res * res;
}
return 0;
});
System.out.println(future2.get()); //get()返回結(jié)果25
任務(wù)串行
串行即一個任務(wù)完成后繼續(xù)執(zhí)行下一個任務(wù)。CompletableFuture提供了下面幾種方法。
thenRun/thenRunAsync
一個任務(wù)執(zhí)行完后開始執(zhí)行后面的任務(wù),我們可以看到傳入的參數(shù)是個Runnbale, 多以后面的任務(wù)不依賴前面的任務(wù)執(zhí)行結(jié)果。
// 與前任務(wù)是要同一線程
public CompletableFuture<Void> thenRun(Runnable action)
// 另起一線程執(zhí)行
public CompletableFuture<Void> thenRunAsync(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)
例如
CompletableFuture.runAsync(()->{
System.out.println("task 1 finish!");
}).thenRun(()->{
System.out.println("task 2 finish!");
}).get();
thenAccept/thenAcceptAsync
thenAccept與runAsync的區(qū)別在于它的參數(shù)是一個Consumer,可以回去上一個任務(wù)的返回結(jié)果。
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor)
CompletableFuture.supplyAsync(()->{
System.out.println("task 1 finish!");
return "Hello";
}).thenAccept((res)->{
System.out.println("this is task 2, task 1 return :"+res);
}).get();
----------Console print---------------
task 1 finish!
this is task 2, task 1 return :Hello
thenApply/thenApplyAsync
似曾相識。thenApply與thenAccept的區(qū)別,就是有返回值。
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
String s = CompletableFuture.supplyAsync(() -> {
System.out.println("task 1 finish!");
return "Hello";
}).thenApply((res) -> {
System.out.println("this is task 2, task 1 return :" + res);
return res + " world";
}).get();
System.out.println(s);
----------Console print---------------
task 1 finish!
this is task 2, task 1 return :Hello
Hello world
兩任務(wù)組合
runAfterBoth/runAfterBothAsync
組合兩個CompletableFuture,執(zhí)行不需要依賴他們的結(jié)果,當(dāng)連個CompletableFuture都執(zhí)行完后,執(zhí)行action。
//套路都一樣,這里就只粘上自定義線程池的runAfterBothAsy了。
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor)
例如
CompletableFuture.runAsync(()->{
System.out.println("CompletableFuture 2.");
}).runAfterBoth(CompletableFuture.runAsync(()->{
System.out.println("CompletableFuture 1.");
}), ()->{
System.out.println("all finish!");
}).join();
thenAcceptBoth/thenAcceptBothAsync
組合兩個CompletableFuture, 獲取他們的返回結(jié)果,然后執(zhí)行action,無返回值。
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor)
CompletableFuture.supplyAsync(()->{
System.out.println("CompletableFuture 1.");
return 1;
}).thenAcceptBoth(CompletableFuture.supplyAsync(()->{
System.out.println("CompletableFuture 2.");
return 2;
}), (f1, f2)->{
System.out.println("Result of CompletableFuture 1 is "+f1);
System.out.println("Result of CompletableFuture 2 is "+f2);
System.out.println("all finish!");
}).join();
----------------Console Print-----------------
CompletableFuture 1.
CompletableFuture 2.
Result of CompletableFuture 1 is 1
Result of CompletableFuture 2 is 2
all finish!
thenCombine/thenCombineAsync
組合兩個CompletableFuture, 獲取他們的返回結(jié)果,然后執(zhí)行action,有返回值。
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn, Executor executor)
看到這,我相信也不用在寫demo了...
runAfterEither/runAfterEitherAsync
組合兩個CompletableFuture,當(dāng)其中一個CompletableFuture都執(zhí)行完后,再執(zhí)行action。
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,Runnable action)
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action)
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor)
acceptEither/acceptEitherAsync
組合兩個CompletableFuture,當(dāng)其中一個CompletableFuture都執(zhí)行完后,可以接受他們的返回結(jié)果,再執(zhí)行action,無返回值。
public CompletableFuture<Void> acceptEither(
CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void> acceptEitherAsync(
CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void> acceptEitherAsync(
CompletionStage<? extends T> other, Consumer<? super T> action,
Executor executor)
applyToEither/applyToEitherAsync
組合兩個CompletableFuture,當(dāng)其中一個CompletableFuture都執(zhí)行完后,可以接受他們的返回結(jié)果,再執(zhí)行action,無返回值。
public <U> CompletableFuture<U> applyToEither(
CompletionStage<? extends T> other, Function<? super T, U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(
CompletionStage<? extends T> other, Function<? super T, U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(
CompletionStage<? extends T> other, Function<? super T, U> fn,
Executor executor)
多任務(wù)
上面都是兩個任務(wù),如果需要多個怎么辦?
allOf是多個任務(wù)都完成后再執(zhí)行后面的操作;anyOf是任意一個任務(wù)完成,就可以執(zhí)行后面的操作。
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
例如
CompletableFuture exportSheet1 = CompletableFuture.runAsync(()->{
System.out.println("export sheet1");
});
CompletableFuture exportSheet2 = CompletableFuture.runAsync(()->{
System.out.println("export sheet2");
});
CompletableFuture exportSheet3 = CompletableFuture.runAsync(()->{
System.out.println("export sheet3");
});
CompletableFuture.allOf(exportSheet1, exportSheet2, exportSheet3).join();
總結(jié)
CompletableFuture功能還是挺強大的,提供的方法眾多,我們這樣看起來,無外乎就是異步任務(wù)有無返回結(jié)果,任務(wù)需不需要拿到上個任務(wù)的返回結(jié)果,對需不需要返回結(jié)果再處理都返回。總結(jié)到這里,希望對大家有所幫助。
