20個使用 Java CompletableFuture的例子
閱讀本文大概需要 2.8 分鐘。
在Java中異步編程,不一定非要使用rxJava, Java本身的庫中的CompletableFuture可以很好的應(yīng)對大部分的場景。
CompletableFuture類實現(xiàn)了CompletionStage接口,首先我們需要理解這個接口的契約。它代表了一個特定的計算的階段,可以同步或者異步的被完成。你可以把它看成一個計算流水線上的一個單元,最終會產(chǎn)生一個最終結(jié)果,這意味著幾個CompletionStage可以串聯(lián)起來,一個完成的階段可以觸發(fā)下一階段的執(zhí)行,接著觸發(fā)下一次,接著……CompletionStage接口,CompletableFuture也實現(xiàn)了future接口, 代表一個未完成的異步事件。CompletableFuture提供了方法,能夠顯式地完成這個future,所以它叫CompletableFuture。1、 創(chuàng)建一個完成的CompletableFuture
static?void?completedFutureExample()?{
????CompletableFuture?cf?=?CompletableFuture.completedFuture("message");
????assertTrue(cf.isDone());
????assertEquals("message",?cf.getNow(null));
}
getNow(null)方法在future完成的情況下會返回結(jié)果,就比如上面這個例子,否則返回null (傳入的參數(shù))。2、運行一個簡單的異步階段
static?void?runAsyncExample()?{
????CompletableFuture?cf?=?CompletableFuture.runAsync(()?->?{
????????assertTrue(Thread.currentThread().isDaemon());
????????randomSleep();
????});
????assertFalse(cf.isDone());
????sleepEnough();
????assertTrue(cf.isDone());
}
Async結(jié)尾,它會異步的執(zhí)行(沒有指定executor的情況下), 異步執(zhí)行通過ForkJoinPool實現(xiàn), 它使用守護線程去執(zhí)行任務(wù)。注意這是CompletableFuture的特性, 其它CompletionStage可以override這個默認的行為。3、在前一個階段上應(yīng)用函數(shù)
message,然后應(yīng)用一個函數(shù)把它變成大寫字母。static?void?thenApplyExample()?{
????CompletableFuture?cf?=?CompletableFuture.completedFuture("message").thenApply(s?->?{
????????assertFalse(Thread.currentThread().isDaemon());
????????return?s.toUpperCase();
????});
????assertEquals("MESSAGE",?cf.getNow(null));
}
thenApply方法名稱代表的行為。then意味著這個階段的動作發(fā)生當前的階段正常完成之后。本例中,當前節(jié)點完成,返回字符串message。Apply意味著返回的階段將會對結(jié)果前一階段的結(jié)果應(yīng)用一個函數(shù)。getNow()只有打斜操作被完成后才返回。4、在前一個階段上異步應(yīng)用函數(shù)
static?void?thenApplyAsyncExample()?{
????CompletableFuture?cf?=?CompletableFuture.completedFuture("message").thenApplyAsync(s?->?{
????????assertTrue(Thread.currentThread().isDaemon());
????????randomSleep();
????????return?s.toUpperCase();
????});
????assertNull(cf.getNow(null));
????assertEquals("MESSAGE",?cf.join());
}
5、使用定制的Executor在前一個階段上異步應(yīng)用函數(shù)
static?ExecutorService?executor?=?Executors.newFixedThreadPool(3,?new?ThreadFactory()?{
????int?count?=?1;
?
????@Override
????public?Thread?newThread(Runnable?runnable)?{
????????return?new?Thread(runnable,?"custom-executor-"?+?count++);
????}
});
?
static?void?thenApplyAsyncWithExecutorExample()?{
????CompletableFuture?cf?=?CompletableFuture.completedFuture("message").thenApplyAsync(s?->?{
????????assertTrue(Thread.currentThread().getName().startsWith("custom-executor-"));
????????assertFalse(Thread.currentThread().isDaemon());
????????randomSleep();
????????return?s.toUpperCase();
????},?executor);
?
????assertNull(cf.getNow(null));
????assertEquals("MESSAGE",?cf.join());
}
6、消費前一階段的結(jié)果
thenAccept:static?void?thenAcceptExample()?{
????StringBuilder?result?=?new?StringBuilder();
????CompletableFuture.completedFuture("thenAccept?message")
????????????.thenAccept(s?->?result.append(s));
????assertTrue("Result?was?empty",?result.length()?>?0);
}
join方法。7、異步地消費遷移階段的結(jié)果
thenAcceptAsync方法, 串聯(lián)的CompletableFuture可以異步地執(zhí)行。static?void?thenAcceptAsyncExample()?{
????StringBuilder?result?=?new?StringBuilder();
????CompletableFuture?cf?=?CompletableFuture.completedFuture("thenAcceptAsync?message")
????????????.thenAcceptAsync(s?->?result.append(s));
????cf.join();
????assertTrue("Result?was?empty",?result.length()?>?0);
}
8、完成計算異常
thenApplyAsync(Function, Executor)方法,第一個參數(shù)傳入大寫函數(shù), executor是一個delayed executor,在執(zhí)行前會延遲一秒。static?void?completeExceptionallyExample()?{
????CompletableFuture?cf?=?CompletableFuture.completedFuture("message").thenApplyAsync(String::toUpperCase,
????????????CompletableFuture.delayedExecutor(1,?TimeUnit.SECONDS));
????CompletableFuture?exceptionHandler?=?cf.handle((s,?th)?->?{?return?(th?!=?null)???"message?upon?cancel"?:?"";?});
????cf.completeExceptionally(new?RuntimeException("completed?exceptionally"));
assertTrue("Was?not?completed?exceptionally",?cf.isCompletedExceptionally());
????try?{
????????cf.join();
????????fail("Should?have?thrown?an?exception");
????}?catch(CompletionException?ex)?{?//?just?for?testing
????????assertEquals("completed?exceptionally",?ex.getCause().getMessage());
????}
?
????assertEquals("message?upon?cancel",?exceptionHandler.join());
}
message,接著我們調(diào)用thenApplyAsync方法,它返回一個CompletableFuture。這個方法在第一個函數(shù)完成后,異步地應(yīng)用轉(zhuǎn)大寫字母函數(shù)。delayedExecutor(timeout, timeUnit)延遲執(zhí)行一個異步任務(wù)。handler階段:exceptionHandler, 它處理異常異常,在異常情況下返回message upon cancel。join方法,它會執(zhí)行大寫轉(zhuǎn)換,然后拋出CompletionException(正常的join會等待1秒,然后得到大寫的字符串。不過我們的例子還沒等它執(zhí)行就完成了異常), 然后它觸發(fā)了handler階段。9、取消計算
cancel(boolean mayInterruptIfRunning)取消計算。對于CompletableFuture類,布爾參數(shù)并沒有被使用,這是因為它并沒有使用中斷去取消操作,相反,cancel等價于completeExceptionally(new CancellationException())。static?void?cancelExample()?{
????CompletableFuture?cf?=?CompletableFuture.completedFuture("message").thenApplyAsync(String::toUpperCase,
????????????CompletableFuture.delayedExecutor(1,?TimeUnit.SECONDS));
????CompletableFuture?cf2?=?cf.exceptionally(throwable?->?"canceled?message");
????assertTrue("Was?not?canceled",?cf.cancel(true));
????assertTrue("Was?not?completed?exceptionally",?cf.isCompletedExceptionally());
????assertEquals("canceled?message",?cf2.join());
}
10、在兩個完成的階段其中之一上應(yīng)用函數(shù)
CompletableFuture,?applyToEither處理兩個階段, 在其中之一上應(yīng)用函數(shù)(包保證哪一個被執(zhí)行)。本例中的兩個階段一個是應(yīng)用大寫轉(zhuǎn)換在原始的字符串上, 另一個階段是應(yīng)用小些轉(zhuǎn)換。static?void?applyToEitherExample()?{
????String?original?=?"Message";
????CompletableFuture?cf1?=?CompletableFuture.completedFuture(original)
????????????.thenApplyAsync(s?->?delayedUpperCase(s));
????CompletableFuture?cf2?=?cf1.applyToEither(
????????????CompletableFuture.completedFuture(original).thenApplyAsync(s?->?delayedLowerCase(s)),
????????????s?->?s?+?"?from?applyToEither");
????assertTrue(cf2.join().endsWith("?from?applyToEither"));
}
11、在兩個完成的階段其中之一上調(diào)用消費函數(shù)
static?void?acceptEitherExample()?{
????String?original?=?"Message";
????StringBuilder?result?=?new?StringBuilder();
????CompletableFuture?cf?=?CompletableFuture.completedFuture(original)
????????????.thenApplyAsync(s?->?delayedUpperCase(s))
????????????.acceptEither(CompletableFuture.completedFuture(original).thenApplyAsync(s?->?delayedLowerCase(s)),
????????????????????s?->?result.append(s).append("acceptEither"));
????cf.join();
????assertTrue("Result?was?empty",?result.toString().endsWith("acceptEither"));
}
12、在兩個階段都執(zhí)行完后運行一個Runnable
static?void?runAfterBothExample()?{
????String?original?=?"Message";
????StringBuilder?result?=?new?StringBuilder();
????CompletableFuture.completedFuture(original).thenApply(String::toUpperCase).runAfterBoth(
????????????CompletableFuture.completedFuture(original).thenApply(String::toLowerCase),
????????????()?->?result.append("done"));
????assertTrue("Result?was?empty",?result.length()?>?0);
}
13、 使用BiConsumer處理兩個階段的結(jié)果
static?void?thenAcceptBothExample()?{
????String?original?=?"Message";
????StringBuilder?result?=?new?StringBuilder();
????CompletableFuture.completedFuture(original).thenApply(String::toUpperCase).thenAcceptBoth(
????????????CompletableFuture.completedFuture(original).thenApply(String::toLowerCase),
????????????(s1,?s2)?->?result.append(s1?+?s2));
????assertEquals("MESSAGEmessage",?result.toString());
}
14、使用BiFunction處理兩個階段的結(jié)果
thenCombine()函數(shù)。整個流水線是同步的,所以getNow()會得到最終的結(jié)果,它把大寫和小寫字符串連接起來。static?void?thenCombineExample()?{
????String?original?=?"Message";
????CompletableFuture?cf?=?CompletableFuture.completedFuture(original).thenApply(s?->?delayedUpperCase(s))
????????????.thenCombine(CompletableFuture.completedFuture(original).thenApply(s?->?delayedLowerCase(s)),
????????????????????(s1,?s2)?->?s1?+?s2);
????assertEquals("MESSAGEmessage",?cf.getNow(null));
}
15、異步使用BiFunction處理兩個階段的結(jié)果
thenCombine()也異步地執(zhí)行,即時它沒有Async后綴。Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method
join方法等待結(jié)果的完成。static?void?thenCombineAsyncExample()?{
????String?original?=?"Message";
????CompletableFuture?cf?=?CompletableFuture.completedFuture(original)
????????????.thenApplyAsync(s?->?delayedUpperCase(s))
????????????.thenCombine(CompletableFuture.completedFuture(original).thenApplyAsync(s?->?delayedLowerCase(s)),
????????????????????(s1,?s2)?->?s1?+?s2);
????assertEquals("MESSAGEmessage",?cf.join());
}
16、組合 CompletableFuture
thenCompose()完成上面兩個例子。這個方法等待第一個階段的完成(大寫轉(zhuǎn)換), 它的結(jié)果傳給一個指定的返回CompletableFuture函數(shù),它的結(jié)果就是返回的CompletableFuture的結(jié)果。static?void?thenComposeExample()?{
????String?original?=?"Message";
????CompletableFuture?cf?=?CompletableFuture.completedFuture(original).thenApply(s?->?delayedUpperCase(s))
????????????.thenCompose(upper?->?CompletableFuture.completedFuture(original).thenApply(s?->?delayedLowerCase(s))
????????????????????.thenApply(s?->?upper?+?s));
????assertEquals("MESSAGEmessage",?cf.join());
}
17、當幾個階段中的一個完成,創(chuàng)建一個完成的階段
anyOf中創(chuàng)建的CompletableFuture會立即完成,這樣所有的階段都已完成,我們使用whenComplete(BiConsumer super Object, ? super Throwable> action)處理完成的結(jié)果。static?void?anyOfExample()?{
????StringBuilder?result?=?new?StringBuilder();
????List?messages?=?Arrays.asList("a",?"b",?"c");
????List?futures?=?messages.stream()
????????????.map(msg?->?CompletableFuture.completedFuture(msg).thenApply(s?->?delayedUpperCase(s)))
????????????.collect(Collectors.toList());
????CompletableFuture.anyOf(futures.toArray(new?CompletableFuture[futures.size()])).whenComplete((res,?th)?->?{
????????if(th?==?null)?{
????????????assertTrue(isUpperCase((String)?res));
????????????result.append(res);
????????}
????});
????assertTrue("Result?was?empty",?result.length()?>?0);
}
18、當所有的階段都完成后創(chuàng)建一個階段
static?void?allOfExample()?{
????StringBuilder?result?=?new?StringBuilder();
????List?messages?=?Arrays.asList("a",?"b",?"c");
????List?futures?=?messages.stream()
????????????.map(msg?->?CompletableFuture.completedFuture(msg).thenApply(s?->?delayedUpperCase(s)))
????????????.collect(Collectors.toList());
????CompletableFuture.allOf(futures.toArray(new?CompletableFuture[futures.size()])).whenComplete((v,?th)?->?{
????????futures.forEach(cf?->?assertTrue(isUpperCase(cf.getNow(null))));
????????result.append("done");
????});
????assertTrue("Result?was?empty",?result.length()?>?0);
}
19、當所有的階段都完成后異步地創(chuàng)建一個階段
thenApplyAsync()替換那些單個的CompletableFutures的方法,allOf()會在通用池中的線程中異步地執(zhí)行。所以我們需要調(diào)用join方法等待它完成。static?void?allOfAsyncExample()?{
????StringBuilder?result?=?new?StringBuilder();
????List?messages?=?Arrays.asList("a",?"b",?"c");
????List?futures?=?messages.stream()
????????????.map(msg?->?CompletableFuture.completedFuture(msg).thenApplyAsync(s?->?delayedUpperCase(s)))
????????????.collect(Collectors.toList());
????CompletableFuture?allOf?=?CompletableFuture.allOf(futures.toArray(new?CompletableFuture[futures.size()]))
????????????.whenComplete((v,?th)?->?{
????????????????futures.forEach(cf?->?assertTrue(isUpperCase(cf.getNow(null))));
????????????????result.append("done");
????????????});
????allOf.join();
????assertTrue("Result?was?empty",?result.length()?>?0);
}
20、真實的例子
首先異步調(diào)用 cars方法獲得Car的列表,它返回CompletionStage場景。cars消費一個遠程的REST API。然后我們復(fù)合一個CompletionStage填寫每個汽車的評分,通過 rating(manufacturerId)返回一個CompletionStage, 它會異步地獲取汽車的評分(可能又是一個REST API調(diào)用)當所有的汽車填好評分后,我們結(jié)束這個列表,所以我們調(diào)用 allOf得到最終的階段, 它在前面階段所有階段完成后才完成。在最終的階段調(diào)用 whenComplete(),我們打印出每個汽車和它的評分。
cars().thenCompose(cars?->?{
????List?updatedCars?=?cars.stream()
????????????.map(car?->?rating(car.manufacturerId).thenApply(r?->?{
????????????????car.setRating(r);
????????????????return?car;
????????????})).collect(Collectors.toList());
?
????CompletableFuture?done?=?CompletableFuture
????????????.allOf(updatedCars.toArray(new?CompletableFuture[updatedCars.size()]));
????return?done.thenApply(v?->?updatedCars.stream().map(CompletionStage::toCompletableFuture)
????????????.map(CompletableFuture::join).collect(Collectors.toList()));
}).whenComplete((cars,?th)?->?{
????if?(th?==?null)?{
????????cars.forEach(System.out::println);
????}?else?{
????????throw?new?RuntimeException(th);
????}
}).toCompletableFuture().join();
allOf方法,而不是手工的線程等待(Thread#join() 或 a CountDownLatch)。推薦閱讀:
牛逼!一款基于SpringBoot的微信點餐系統(tǒng)
微信掃描二維碼,關(guān)注我的公眾號
朕已閱?

