20個(gè)使用 Java CompletableFuture的示例,不服不行
來源:colobu.com
在Java中異步編程,不一定非要使用rxJava, Java本身的庫中的CompletableFuture可以很好的應(yīng)對大部分的場景。
CompletableFuture類實(shí)現(xiàn)了CompletionStage接口,首先我們需要理解這個(gè)接口的契約。它代表了一個(gè)特定的計(jì)算的階段,可以同步或者異步的被完成。你可以把它看成一個(gè)計(jì)算流水線上的一個(gè)單元,最終會產(chǎn)生一個(gè)最終結(jié)果,這意味著幾個(gè)CompletionStage可以串聯(lián)起來,一個(gè)完成的階段可以觸發(fā)下一階段的執(zhí)行,接著觸發(fā)下一次,接著……CompletionStage接口,CompletableFuture也實(shí)現(xiàn)了future接口, 代表一個(gè)未完成的異步事件。CompletableFuture提供了方法,能夠顯式地完成這個(gè)future,所以它叫CompletableFuture。1、 創(chuàng)建一個(gè)完成的CompletableFuture
static?void?completedFutureExample()?{
????CompletableFuture?cf?=?CompletableFuture.completedFuture("message");
????assertTrue(cf.isDone());
????assertEquals("message",?cf.getNow(null));
}
getNow(null)方法在future完成的情況下會返回結(jié)果,就比如上面這個(gè)例子,否則返回null (傳入的參數(shù))。2、運(yùn)行一個(gè)簡單的異步階段
static?void?runAsyncExample()?{
????CompletableFuture?cf?=?CompletableFuture.runAsync(()?->?{
????????assertTrue(Thread.currentThread().isDaemon());
????????randomSleep();
????});
????assertFalse(cf.isDone());
????sleepEnough();
????assertTrue(cf.isDone());
}
Async結(jié)尾,它會異步的執(zhí)行(沒有指定executor的情況下), 異步執(zhí)行通過ForkJoinPool實(shí)現(xiàn), 它使用守護(hù)線程去執(zhí)行任務(wù)。注意這是CompletableFuture的特性, 其它CompletionStage可以override這個(gè)默認(rèn)的行為。3、在前一個(gè)階段上應(yīng)用函數(shù)
message,然后應(yīng)用一個(gè)函數(shù)把它變成大寫字母。static?void?thenApplyExample()?{
????CompletableFuture?cf?=?CompletableFuture.completedFuture("message").thenApply(s?->?{
????????assertFalse(Thread.currentThread().isDaemon());
????????return?s.toUpperCase();
????});
????assertEquals("MESSAGE",?cf.getNow(null));
}
thenApply方法名稱代表的行為。then意味著這個(gè)階段的動作發(fā)生當(dāng)前的階段正常完成之后。本例中,當(dāng)前節(jié)點(diǎn)完成,返回字符串message。Apply意味著返回的階段將會對結(jié)果前一階段的結(jié)果應(yīng)用一個(gè)函數(shù)。getNow()只有打斜操作被完成后才返回。4、在前一個(gè)階段上異步應(yīng)用函數(shù)
static?void?thenApplyAsyncExample()?{
????CompletableFuture?cf?=?CompletableFuture.completedFuture("message").thenApplyAsync(s?->?{
????????assertTrue(Thread.currentThread().isDaemon());
????????randomSleep();
????????return?s.toUpperCase();
????});
????assertNull(cf.getNow(null));
????assertEquals("MESSAGE",?cf.join());
}
5、使用定制的Executor在前一個(gè)階段上異步應(yīng)用函數(shù)
static?ExecutorService?executor?=?Executors.newFixedThreadPool(3,?new?ThreadFactory()?{
????int?count?=?1;
?
????@Override
????public?Thread?newThread(Runnable?runnable)?{
????????return?new?Thread(runnable,?"custom-executor-"?+?count++);
????}
});
?
static?void?thenApplyAsyncWithExecutorExample()?{
????CompletableFuture?cf?=?CompletableFuture.completedFuture("message").thenApplyAsync(s?->?{
????????assertTrue(Thread.currentThread().getName().startsWith("custom-executor-"));
????????assertFalse(Thread.currentThread().isDaemon());
????????randomSleep();
????????return?s.toUpperCase();
????},?executor);
?
????assertNull(cf.getNow(null));
????assertEquals("MESSAGE",?cf.join());
}
6、消費(fèi)前一階段的結(jié)果
thenAccept:static?void?thenAcceptExample()?{
????StringBuilder?result?=?new?StringBuilder();
????CompletableFuture.completedFuture("thenAccept?message")
????????????.thenAccept(s?->?result.append(s));
????assertTrue("Result?was?empty",?result.length()?>?0);
}
join方法。7、異步地消費(fèi)遷移階段的結(jié)果
thenAcceptAsync方法, 串聯(lián)的CompletableFuture可以異步地執(zhí)行。static?void?thenAcceptAsyncExample()?{
????StringBuilder?result?=?new?StringBuilder();
????CompletableFuture?cf?=?CompletableFuture.completedFuture("thenAcceptAsync?message")
????????????.thenAcceptAsync(s?->?result.append(s));
????cf.join();
????assertTrue("Result?was?empty",?result.length()?>?0);
}
8、完成計(jì)算異常
thenApplyAsync(Function, Executor)方法,第一個(gè)參數(shù)傳入大寫函數(shù), executor是一個(gè)delayed executor,在執(zhí)行前會延遲一秒。static?void?completeExceptionallyExample()?{
????CompletableFuture?cf?=?CompletableFuture.completedFuture("message").thenApplyAsync(String::toUpperCase,
????????????CompletableFuture.delayedExecutor(1,?TimeUnit.SECONDS));
????CompletableFuture?exceptionHandler?=?cf.handle((s,?th)?->?{?return?(th?!=?null)???"message?upon?cancel"?:?"";?});
????cf.completeExceptionally(new?RuntimeException("completed?exceptionally"));
assertTrue("Was?not?completed?exceptionally",?cf.isCompletedExceptionally());
????try?{
????????cf.join();
????????fail("Should?have?thrown?an?exception");
????}?catch(CompletionException?ex)?{?//?just?for?testing
????????assertEquals("completed?exceptionally",?ex.getCause().getMessage());
????}
?
????assertEquals("message?upon?cancel",?exceptionHandler.join());
}
message,接著我們調(diào)用thenApplyAsync方法,它返回一個(gè)CompletableFuture。這個(gè)方法在第一個(gè)函數(shù)完成后,異步地應(yīng)用轉(zhuǎn)大寫字母函數(shù)。delayedExecutor(timeout, timeUnit)延遲執(zhí)行一個(gè)異步任務(wù)。handler階段:exceptionHandler, 它處理異常異常,在異常情況下返回message upon cancel。join方法,它會執(zhí)行大寫轉(zhuǎn)換,然后拋出CompletionException(正常的join會等待1秒,然后得到大寫的字符串。不過我們的例子還沒等它執(zhí)行就完成了異常), 然后它觸發(fā)了handler階段。9、取消計(jì)算
cancel(boolean mayInterruptIfRunning)取消計(jì)算。對于CompletableFuture類,布爾參數(shù)并沒有被使用,這是因?yàn)樗]有使用中斷去取消操作,相反,cancel等價(jià)于completeExceptionally(new CancellationException())。static?void?cancelExample()?{
????CompletableFuture?cf?=?CompletableFuture.completedFuture("message").thenApplyAsync(String::toUpperCase,
????????????CompletableFuture.delayedExecutor(1,?TimeUnit.SECONDS));
????CompletableFuture?cf2?=?cf.exceptionally(throwable?->?"canceled?message");
????assertTrue("Was?not?canceled",?cf.cancel(true));
????assertTrue("Was?not?completed?exceptionally",?cf.isCompletedExceptionally());
????assertEquals("canceled?message",?cf2.join());
}
10、在兩個(gè)完成的階段其中之一上應(yīng)用函數(shù)
CompletableFuture,?applyToEither處理兩個(gè)階段, 在其中之一上應(yīng)用函數(shù)(包保證哪一個(gè)被執(zhí)行)。本例中的兩個(gè)階段一個(gè)是應(yīng)用大寫轉(zhuǎn)換在原始的字符串上, 另一個(gè)階段是應(yīng)用小些轉(zhuǎn)換。static?void?applyToEitherExample()?{
????String?original?=?"Message";
????CompletableFuture?cf1?=?CompletableFuture.completedFuture(original)
????????????.thenApplyAsync(s?->?delayedUpperCase(s));
????CompletableFuture?cf2?=?cf1.applyToEither(
????????????CompletableFuture.completedFuture(original).thenApplyAsync(s?->?delayedLowerCase(s)),
????????????s?->?s?+?"?from?applyToEither");
????assertTrue(cf2.join().endsWith("?from?applyToEither"));
}
11、在兩個(gè)完成的階段其中之一上調(diào)用消費(fèi)函數(shù)
static?void?acceptEitherExample()?{
????String?original?=?"Message";
????StringBuilder?result?=?new?StringBuilder();
????CompletableFuture?cf?=?CompletableFuture.completedFuture(original)
????????????.thenApplyAsync(s?->?delayedUpperCase(s))
????????????.acceptEither(CompletableFuture.completedFuture(original).thenApplyAsync(s?->?delayedLowerCase(s)),
????????????????????s?->?result.append(s).append("acceptEither"));
????cf.join();
????assertTrue("Result?was?empty",?result.toString().endsWith("acceptEither"));
}
12、在兩個(gè)階段都執(zhí)行完后運(yùn)行一個(gè)Runnable
static?void?runAfterBothExample()?{
????String?original?=?"Message";
????StringBuilder?result?=?new?StringBuilder();
????CompletableFuture.completedFuture(original).thenApply(String::toUpperCase).runAfterBoth(
????????????CompletableFuture.completedFuture(original).thenApply(String::toLowerCase),
????????????()?->?result.append("done"));
????assertTrue("Result?was?empty",?result.length()?>?0);
}
13、 使用BiConsumer處理兩個(gè)階段的結(jié)果
static?void?thenAcceptBothExample()?{
????String?original?=?"Message";
????StringBuilder?result?=?new?StringBuilder();
????CompletableFuture.completedFuture(original).thenApply(String::toUpperCase).thenAcceptBoth(
????????????CompletableFuture.completedFuture(original).thenApply(String::toLowerCase),
????????????(s1,?s2)?->?result.append(s1?+?s2));
????assertEquals("MESSAGEmessage",?result.toString());
}
14、使用BiFunction處理兩個(gè)階段的結(jié)果
thenCombine()函數(shù)。整個(gè)流水線是同步的,所以getNow()會得到最終的結(jié)果,它把大寫和小寫字符串連接起來。static?void?thenCombineExample()?{
????String?original?=?"Message";
????CompletableFuture?cf?=?CompletableFuture.completedFuture(original).thenApply(s?->?delayedUpperCase(s))
????????????.thenCombine(CompletableFuture.completedFuture(original).thenApply(s?->?delayedLowerCase(s)),
????????????????????(s1,?s2)?->?s1?+?s2);
????assertEquals("MESSAGEmessage",?cf.getNow(null));
}
15、異步使用BiFunction處理兩個(gè)階段的結(jié)果
thenCombine()也異步地執(zhí)行,即時(shí)它沒有Async后綴。Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method
join方法等待結(jié)果的完成。static?void?thenCombineAsyncExample()?{
????String?original?=?"Message";
????CompletableFuture?cf?=?CompletableFuture.completedFuture(original)
????????????.thenApplyAsync(s?->?delayedUpperCase(s))
????????????.thenCombine(CompletableFuture.completedFuture(original).thenApplyAsync(s?->?delayedLowerCase(s)),
????????????????????(s1,?s2)?->?s1?+?s2);
????assertEquals("MESSAGEmessage",?cf.join());
}
16、組合 CompletableFuture
thenCompose()完成上面兩個(gè)例子。這個(gè)方法等待第一個(gè)階段的完成(大寫轉(zhuǎn)換), 它的結(jié)果傳給一個(gè)指定的返回CompletableFuture函數(shù),它的結(jié)果就是返回的CompletableFuture的結(jié)果。static?void?thenComposeExample()?{
????String?original?=?"Message";
????CompletableFuture?cf?=?CompletableFuture.completedFuture(original).thenApply(s?->?delayedUpperCase(s))
????????????.thenCompose(upper?->?CompletableFuture.completedFuture(original).thenApply(s?->?delayedLowerCase(s))
????????????????????.thenApply(s?->?upper?+?s));
????assertEquals("MESSAGEmessage",?cf.join());
}
17、當(dāng)幾個(gè)階段中的一個(gè)完成,創(chuàng)建一個(gè)完成的階段
anyOf中創(chuàng)建的CompletableFuture會立即完成,這樣所有的階段都已完成,我們使用whenComplete(BiConsumer super Object, ? super Throwable> action)處理完成的結(jié)果。static?void?anyOfExample()?{
????StringBuilder?result?=?new?StringBuilder();
????List?messages?=?Arrays.asList("a",?"b",?"c");
????List?futures?=?messages.stream()
????????????.map(msg?->?CompletableFuture.completedFuture(msg).thenApply(s?->?delayedUpperCase(s)))
????????????.collect(Collectors.toList());
????CompletableFuture.anyOf(futures.toArray(new?CompletableFuture[futures.size()])).whenComplete((res,?th)?->?{
????????if(th?==?null)?{
????????????assertTrue(isUpperCase((String)?res));
????????????result.append(res);
????????}
????});
????assertTrue("Result?was?empty",?result.length()?>?0);
}
18、當(dāng)所有的階段都完成后創(chuàng)建一個(gè)階段
static?void?allOfExample()?{
????StringBuilder?result?=?new?StringBuilder();
????List?messages?=?Arrays.asList("a",?"b",?"c");
????List?futures?=?messages.stream()
????????????.map(msg?->?CompletableFuture.completedFuture(msg).thenApply(s?->?delayedUpperCase(s)))
????????????.collect(Collectors.toList());
????CompletableFuture.allOf(futures.toArray(new?CompletableFuture[futures.size()])).whenComplete((v,?th)?->?{
????????futures.forEach(cf?->?assertTrue(isUpperCase(cf.getNow(null))));
????????result.append("done");
????});
????assertTrue("Result?was?empty",?result.length()?>?0);
}
19、當(dāng)所有的階段都完成后異步地創(chuàng)建一個(gè)階段
thenApplyAsync()替換那些單個(gè)的CompletableFutures的方法,allOf()會在通用池中的線程中異步地執(zhí)行。所以我們需要調(diào)用join方法等待它完成。static?void?allOfAsyncExample()?{
????StringBuilder?result?=?new?StringBuilder();
????List?messages?=?Arrays.asList("a",?"b",?"c");
????List?futures?=?messages.stream()
????????????.map(msg?->?CompletableFuture.completedFuture(msg).thenApplyAsync(s?->?delayedUpperCase(s)))
????????????.collect(Collectors.toList());
????CompletableFuture?allOf?=?CompletableFuture.allOf(futures.toArray(new?CompletableFuture[futures.size()]))
????????????.whenComplete((v,?th)?->?{
????????????????futures.forEach(cf?->?assertTrue(isUpperCase(cf.getNow(null))));
????????????????result.append("done");
????????????});
????allOf.join();
????assertTrue("Result?was?empty",?result.length()?>?0);
}
20、真實(shí)的例子
首先異步調(diào)用 cars方法獲得Car的列表,它返回CompletionStage場景。cars消費(fèi)一個(gè)遠(yuǎn)程的REST API。然后我們復(fù)合一個(gè)CompletionStage填寫每個(gè)汽車的評分,通過 rating(manufacturerId)返回一個(gè)CompletionStage, 它會異步地獲取汽車的評分(可能又是一個(gè)REST API調(diào)用)當(dāng)所有的汽車填好評分后,我們結(jié)束這個(gè)列表,所以我們調(diào)用 allOf得到最終的階段, 它在前面階段所有階段完成后才完成。在最終的階段調(diào)用 whenComplete(),我們打印出每個(gè)汽車和它的評分。
cars().thenCompose(cars?->?{
????List?updatedCars?=?cars.stream()
????????????.map(car?->?rating(car.manufacturerId).thenApply(r?->?{
????????????????car.setRating(r);
????????????????return?car;
????????????})).collect(Collectors.toList());
?
????CompletableFuture?done?=?CompletableFuture
????????????.allOf(updatedCars.toArray(new?CompletableFuture[updatedCars.size()]));
????return?done.thenApply(v?->?updatedCars.stream().map(CompletionStage::toCompletableFuture)
????????????.map(CompletableFuture::join).collect(Collectors.toList()));
}).whenComplete((cars,?th)?->?{
????if?(th?==?null)?{
????????cars.forEach(System.out::println);
????}?else?{
????????throw?new?RuntimeException(th);
????}
}).toCompletableFuture().join();
allOf方法,而不是手工的線程等待(Thread#join() 或 a CountDownLatch)。后臺回復(fù)?學(xué)習(xí)資料?領(lǐng)取學(xué)習(xí)視頻
如有收獲,點(diǎn)個(gè)在看,誠摯感謝
