<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          無語!你竟然連CompletableFuture都不知道,還天天說在jdk8原地踏步~

          共 9644字,需瀏覽 20分鐘

           ·

          2021-11-08 14:27

          上一篇:在 Spring Boot 中,如何干掉 if else

          來源:https://urlify.cn/ayaMBb


          這篇文章介紹 Java 8 的 CompletionStage API和它的標準庫的實現(xiàn) CompletableFuture。API通過例子的方式演示了它的行為,每個例子演示一到兩個行為。

          既然CompletableFuture類實現(xiàn)了CompletionStage接口,首先我們需要理解這個接口的契約。它代表了一個特定的計算的階段,可以同步或者異步的被完成。你可以把它看成一個計算流水線上的一個單元,最終會產(chǎn)生一個最終結(jié)果,這意味著幾個CompletionStage可以串聯(lián)起來,一個完成的階段可以觸發(fā)下一階段的執(zhí)行,接著觸發(fā)下一次,接著……

          除了實現(xiàn)CompletionStage接口, CompletableFuture也實現(xiàn)了future接口, 代表一個未完成的異步事件。CompletableFuture提供了方法,能夠顯式地完成這個future,所以它叫CompletableFuture。


          創(chuàng)建一個完成的CompletableFuture


          最簡單的例子就是使用一個預(yù)定義的結(jié)果創(chuàng)建一個完成的CompletableFuture,通常我們會在計算的開始階段使用它。
          static void completedFutureExample() {    CompletableFuture cf = CompletableFuture.completedFuture("message");    assertTrue(cf.isDone());    assertEquals("message", cf.getNow(null));}

          getNow(null)方法在future完成的情況下會返回結(jié)果,就比如上面這個例子,否則返回null (傳入的參數(shù))。


          運行一個簡單的異步階段


          這個例子創(chuàng)建一個一個異步執(zhí)行的階段:

          static void runAsyncExample() {    CompletableFuture cf = CompletableFuture.runAsync(() -> {        assertTrue(Thread.currentThread().isDaemon());        randomSleep();    });    assertFalse(cf.isDone());    sleepEnough();    assertTrue(cf.isDone());}

          通過這個例子可以學(xué)到兩件事情:


          CompletableFuture的方法如果以Async結(jié)尾,它會異步的執(zhí)行(沒有指定executor的情況下), 異步執(zhí)行通過ForkJoinPool實現(xiàn), 它使用守護線程去執(zhí)行任務(wù)。注意這是CompletableFuture的特性, 其它CompletionStage可以override這個默認的行為。


          在前一個階段上應(yīng)用函數(shù)


          下面這個例子使用前面 #1 的完成的CompletableFuture, #1返回結(jié)果為字符串message,然后應(yīng)用一個函數(shù)把它變成大寫字母。

          static void thenApplyExample() {    CompletableFuture cf = CompletableFuture.completedFuture("message").thenApply(s -> {        assertFalse(Thread.currentThread().isDaemon());        return s.toUpperCase();    });    assertEquals("MESSAGE", cf.getNow(null));}

          注意thenApply方法名稱代表的行為。


          then意味著這個階段的動作發(fā)生當(dāng)前的階段正常完成之后。本例中,當(dāng)前節(jié)點完成,返回字符串message。


          Apply意味著返回的階段將會對結(jié)果前一階段的結(jié)果應(yīng)用一個函數(shù)。


          函數(shù)的執(zhí)行會被阻塞,這意味著getNow()只有打斜操作被完成后才返回。


          在前一個階段上異步應(yīng)用函數(shù)


          通過調(diào)用異步方法(方法后邊加Async后綴),串聯(lián)起來的CompletableFuture可以異步地執(zhí)行(使用ForkJoinPool.commonPool())。
          static void thenApplyAsyncExample() {    CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> {        assertTrue(Thread.currentThread().isDaemon());        randomSleep();        return s.toUpperCase();    });    assertNull(cf.getNow(null));    assertEquals("MESSAGE", cf.join());}

          使用定制的Executor在前一個階段上異步應(yīng)用函數(shù)


          異步方法一個非常有用的特性就是能夠提供一個Executor來異步地執(zhí)行CompletableFuture。這個例子演示了如何使用一個固定大小的線程池來應(yīng)用大寫函數(shù)。

          static ExecutorService executor = Executors.newFixedThreadPool(3, new ThreadFactory() {    int count = 1;
          @Override public Thread newThread(Runnable runnable) { return new Thread(runnable, "custom-executor-" + count++); }});
          static void thenApplyAsyncWithExecutorExample() { CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> { assertTrue(Thread.currentThread().getName().startsWith("custom-executor-")); assertFalse(Thread.currentThread().isDaemon()); randomSleep(); return s.toUpperCase(); }, executor);
          assertNull(cf.getNow(null)); assertEquals("MESSAGE", cf.join());}

          消費前一階段的結(jié)果


          如果下一階段接收了當(dāng)前階段的結(jié)果,但是在計算的時候不需要返回值(它的返回類型是void), 那么它可以不應(yīng)用一個函數(shù),而是一個消費者, 調(diào)用方法也變成了thenAccept:
          static void thenAcceptExample() {    StringBuilder result = new StringBuilder();    CompletableFuture.completedFuture("thenAccept message")            .thenAccept(s -> result.append(s));    assertTrue("Result was empty", result.length() > 0);}

          本例中消費者同步地執(zhí)行,所以我們不需要在CompletableFuture調(diào)用join方法。


          異步地消費遷移階段的結(jié)果


          同樣,可以使用thenAcceptAsync方法, 串聯(lián)的CompletableFuture可以異步地執(zhí)行。
          static void thenAcceptAsyncExample() {    StringBuilder result = new StringBuilder();    CompletableFuture cf = CompletableFuture.completedFuture("thenAcceptAsync message")            .thenAcceptAsync(s -> result.append(s));    cf.join();    assertTrue("Result was empty", result.length() > 0);}


          完成計算異常


          現(xiàn)在我們來看一下異步操作如何顯式地返回異常,用來指示計算失敗。我們簡化這個例子,操作處理一個字符串,把它轉(zhuǎn)換成答謝,我們模擬延遲一秒。


          我們使用thenApplyAsync(Function, Executor)方法,第一個參數(shù)傳入大寫函數(shù), executor是一個delayed executor,在執(zhí)行前會延遲一秒。

          static void completeExceptionallyExample() {    CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(String::toUpperCase,            CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS));    CompletableFuture exceptionHandler = cf.handle((s, th) -> { return (th != null) ? "message upon cancel" : ""; });    cf.completeExceptionally(new RuntimeException("completed exceptionally"));assertTrue("Was not completed exceptionally", cf.isCompletedExceptionally());    try {        cf.join();        fail("Should have thrown an exception");    } catch(CompletionException ex) { // just for testing        assertEquals("completed exceptionally", ex.getCause().getMessage());    }
          assertEquals("message upon cancel", exceptionHandler.join());}

          讓我們看一下細節(jié)。


          首先我們創(chuàng)建了一個CompletableFuture, 完成后返回一個字符串message,接著我們調(diào)用thenApplyAsync方法,它返回一個CompletableFuture。這個方法在第一個函數(shù)完成后,異步地應(yīng)用轉(zhuǎn)大寫字母函數(shù)。


          這個例子還演示了如何通過delayedExecutor(timeout, timeUnit)延遲執(zhí)行一個異步任務(wù)。


          我們創(chuàng)建了一個分離的handler階段:exceptionHandler, 它處理異常異常,在異常情況下返回message upon cancel。


          下一步我們顯式地用異常完成第二個階段。在階段上調(diào)用join方法,它會執(zhí)行大寫轉(zhuǎn)換,然后拋出CompletionException(正常的join會等待1秒,然后得到大寫的字符串。不過我們的例子還沒等它執(zhí)行就完成了異常), 然后它觸發(fā)了handler階段。


          取消計算


          和完成異常類似,我們可以調(diào)用cancel(boolean mayInterruptIfRunning)取消計算。對于CompletableFuture類,布爾參數(shù)并沒有被使用,這是因為它并沒有使用中斷去取消操作,相反,cancel等價于completeExceptionally(new CancellationException())。
          static void cancelExample() {    CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(String::toUpperCase,            CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS));    CompletableFuture cf2 = cf.exceptionally(throwable -> "canceled message");    assertTrue("Was not canceled", cf.cancel(true));    assertTrue("Was not completed exceptionally", cf.isCompletedExceptionally());    assertEquals("canceled message", cf2.join());}

          在兩個完成的階段其中之一上應(yīng)用函數(shù)


          下面的例子創(chuàng)建了CompletableFuture, applyToEither處理兩個階段, 在其中之一上應(yīng)用函數(shù)(包保證哪一個被執(zhí)行)。本例中的兩個階段一個是應(yīng)用大寫轉(zhuǎn)換在原始的字符串上, 另一個階段是應(yīng)用小些轉(zhuǎn)換。
          static void applyToEitherExample() {    String original = "Message";    CompletableFuture cf1 = CompletableFuture.completedFuture(original)            .thenApplyAsync(s -> delayedUpperCase(s));    CompletableFuture cf2 = cf1.applyToEither(            CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)),            s -> s + " from applyToEither");    assertTrue(cf2.join().endsWith(" from applyToEither"));}

          在兩個完成的階段其中之一上調(diào)用消費函數(shù)


          和前一個例子很類似了,只不過我們調(diào)用的是消費者函數(shù) (Function變成Consumer):
          static void acceptEitherExample() {    String original = "Message";    StringBuilder result = new StringBuilder();    CompletableFuture cf = CompletableFuture.completedFuture(original)            .thenApplyAsync(s -> delayedUpperCase(s))            .acceptEither(CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)),                    s -> result.append(s).append("acceptEither"));    cf.join();    assertTrue("Result was empty", result.toString().endsWith("acceptEither"));}

          在兩個階段都執(zhí)行完后運行一個 Runnable


          這個例子演示了依賴的CompletableFuture如果等待兩個階段完成后執(zhí)行了一個Runnable。注意下面所有的階段都是同步執(zhí)行的,第一個階段執(zhí)行大寫轉(zhuǎn)換,第二個階段執(zhí)行小寫轉(zhuǎn)換。

          static void runAfterBothExample() {    String original = "Message";    StringBuilder result = new StringBuilder();    CompletableFuture.completedFuture(original).thenApply(String::toUpperCase).runAfterBoth(            CompletableFuture.completedFuture(original).thenApply(String::toLowerCase),            () -> result.append("done"));    assertTrue("Result was empty", result.length() > 0);}

          使用BiConsumer處理兩個階段的結(jié)果

          上面的例子還可以通過BiConsumer來實現(xiàn):

          static void thenAcceptBothExample() {    String original = "Message";    StringBuilder result = new StringBuilder();    CompletableFuture.completedFuture(original).thenApply(String::toUpperCase).thenAcceptBoth(            CompletableFuture.completedFuture(original).thenApply(String::toLowerCase),            (s1, s2) -> result.append(s1 + s2));    assertEquals("MESSAGEmessage", result.toString());}

          使用BiFunction處理兩個階段的結(jié)果


          如果CompletableFuture依賴兩個前面階段的結(jié)果, 它復(fù)合兩個階段的結(jié)果再返回一個結(jié)果,我們就可以使用thenCombine()函數(shù)。整個流水線是同步的,所以getNow()會得到最終的結(jié)果,它把大寫和小寫字符串連接起來。
          static void thenCombineExample() {    String original = "Message";    CompletableFuture cf = CompletableFuture.completedFuture(original).thenApply(s -> delayedUpperCase(s))            .thenCombine(CompletableFuture.completedFuture(original).thenApply(s -> delayedLowerCase(s)),                    (s1, s2) -> s1 + s2);    assertEquals("MESSAGEmessage", cf.getNow(null));}

          異步使用BiFunction處理兩個階段的結(jié)果


          類似上面的例子,但是有一點不同:依賴的前兩個階段異步地執(zhí)行,所以thenCombine()也異步地執(zhí)行,即時它沒有Async后綴。


          Javadoc中有注釋:

          Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method


          所以我們需要join方法等待結(jié)果的完成。

          static void thenCombineAsyncExample() {    String original = "Message";    CompletableFuture cf = CompletableFuture.completedFuture(original)            .thenApplyAsync(s -> delayedUpperCase(s))            .thenCombine(CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)),                    (s1, s2) -> s1 + s2);    assertEquals("MESSAGEmessage", cf.join());}

          組合 CompletableFuture


          我們可以使用thenCompose()完成上面兩個例子。這個方法等待第一個階段的完成(大寫轉(zhuǎn)換), 它的結(jié)果傳給一個指定的返回CompletableFuture函數(shù),它的結(jié)果就是返回的CompletableFuture的結(jié)果。


          有點拗口,但是我們看例子來理解。函數(shù)需要一個大寫字符串做參數(shù),然后返回一個CompletableFuture, 這個CompletableFuture會轉(zhuǎn)換字符串變成小寫然后連接在大寫字符串的后面。
          static void thenComposeExample() {    String original = "Message";    CompletableFuture cf = CompletableFuture.completedFuture(original).thenApply(s -> delayedUpperCase(s))            .thenCompose(upper -> CompletableFuture.completedFuture(original).thenApply(s -> delayedLowerCase(s))                    .thenApply(s -> upper + s));    assertEquals("MESSAGEmessage", cf.join());}

          當(dāng)幾個階段中的一個完成,創(chuàng)建一個完成的階段


          下面的例子演示了當(dāng)任意一個CompletableFuture完成后, 創(chuàng)建一個完成的CompletableFuture.


          待處理的階段首先創(chuàng)建, 每個階段都是轉(zhuǎn)換一個字符串為大寫。因為本例中這些階段都是同步地執(zhí)行(thenApply), 從anyOf中創(chuàng)建的CompletableFuture會立即完成,這樣所有的階段都已完成,我們使用whenComplete(BiConsumer action)處理完成的結(jié)果。
          static void anyOfExample() {    StringBuilder result = new StringBuilder();    List messages = Arrays.asList("a", "b", "c");    List futures = messages.stream()            .map(msg -> CompletableFuture.completedFuture(msg).thenApply(s -> delayedUpperCase(s)))            .collect(Collectors.toList());    CompletableFuture.anyOf(futures.toArray(new CompletableFuture[futures.size()])).whenComplete((res, th) -> {        if(th == null) {            assertTrue(isUpperCase((String) res));            result.append(res);        }    });    assertTrue("Result was empty", result.length() > 0);}

          當(dāng)所有的階段都完成后創(chuàng)建一個階段


          上一個例子是當(dāng)任意一個階段完成后接著處理,接下來的兩個例子演示當(dāng)所有的階段完成后才繼續(xù)處理, 同步地方式和異步地方式兩種。
          static void allOfExample() {    StringBuilder result = new StringBuilder();    List messages = Arrays.asList("a", "b", "c");    List futures = messages.stream()            .map(msg -> CompletableFuture.completedFuture(msg).thenApply(s -> delayedUpperCase(s)))            .collect(Collectors.toList());    CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).whenComplete((v, th) -> {        futures.forEach(cf -> assertTrue(isUpperCase(cf.getNow(null))));        result.append("done");    });    assertTrue("Result was empty", result.length() > 0);}

          當(dāng)所有的階段都完成后異步地創(chuàng)建一個階段


          使用thenApplyAsync()替換那些單個的CompletableFutures的方法,allOf()會在通用池中的線程中異步地執(zhí)行。所以我們需要調(diào)用join方法等待它完成。
          static void allOfAsyncExample() {    StringBuilder result = new StringBuilder();    List messages = Arrays.asList("a", "b", "c");    List futures = messages.stream()            .map(msg -> CompletableFuture.completedFuture(msg).thenApplyAsync(s -> delayedUpperCase(s)))            .collect(Collectors.toList());    CompletableFuture allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))            .whenComplete((v, th) -> {                futures.forEach(cf -> assertTrue(isUpperCase(cf.getNow(null))));                result.append("done");            });    allOf.join();    assertTrue("Result was empty", result.length() > 0);}

          真實的例子


          Now that the functionality of CompletionStage and specifically CompletableFuture is explored, the below example applies them in a practical scenario:


          現(xiàn)在你已經(jīng)了解了CompletionStage 和 CompletableFuture 的一些函數(shù)的功能,下面的例子是一個實踐場景:


          1. 首先異步調(diào)用cars方法獲得Car的列表,它返回CompletionStage場景。cars消費一個遠程的REST API。

          2. 然后我們復(fù)合一個CompletionStage填寫每個汽車的評分,通過rating(manufacturerId)返回一個CompletionStage, 它會異步地獲取汽車的評分(可能又是一個REST API調(diào)用)

          3. 當(dāng)所有的汽車填好評分后,我們結(jié)束這個列表,所以我們調(diào)用allOf得到最終的階段, 它在前面階段所有階段完成后才完成。

          4. 在最終的階段調(diào)用whenComplete(),我們打印出每個汽車和它的評分。

          cars().thenCompose(cars -> {    List updatedCars = cars.stream()            .map(car -> rating(car.manufacturerId).thenApply(r -> {                car.setRating(r);                return car;            })).collect(Collectors.toList());
          CompletableFuture done = CompletableFuture .allOf(updatedCars.toArray(new CompletableFuture[updatedCars.size()])); return done.thenApply(v -> updatedCars.stream().map(CompletionStage::toCompletableFuture) .map(CompletableFuture::join).collect(Collectors.toList()));}).whenComplete((cars, th) -> { if (th == null) { cars.forEach(System.out::println); } else { throw new RuntimeException(th); }}).toCompletableFuture().join();

          因為每個汽車的實例都是獨立的,得到每個汽車的評分都可以異步地執(zhí)行,這會提高系統(tǒng)的性能(延遲),而且,等待所有的汽車評分被處理使用的是allOf方法,而不是手工的線程等待(Thread#join() 或 a CountDownLatch)。


          這些例子可以幫助你更好的理解相關(guān)的API,你可以在github上得到所有的例子的代碼。


          其它參考文檔


          1. Reactive programming with Java 8 and simple-react : The Tutorial
          2. CompletableFuture Overview
          3. CompletableFuture vs Future: going async with Java 8 new features
          4. spotify/completable-futures
          感謝您的閱讀,也歡迎您發(fā)表關(guān)于這篇文章的任何建議,關(guān)注我,技術(shù)不迷茫!小編到你上高速。


          ??? · END ·
          最后,關(guān)注公眾號互聯(lián)網(wǎng)架構(gòu)師,在后臺回復(fù):2T,可以獲取我整理的 Java 系列面試題和答案,非常齊全


          正文結(jié)束


          推薦閱讀 ↓↓↓

          1.不認命,從10年流水線工人,到谷歌上班的程序媛,一位湖南妹子的勵志故事

          2.如何才能成為優(yōu)秀的架構(gòu)師?

          3.從零開始搭建創(chuàng)業(yè)公司后臺技術(shù)棧

          4.程序員一般可以從什么平臺接私活?

          5.37歲程序員被裁,120天沒找到工作,無奈去小公司,結(jié)果懵了...

          6.IntelliJ IDEA 2019.3 首個最新訪問版本發(fā)布,新特性搶先看

          7.這封“領(lǐng)導(dǎo)痛批95后下屬”的郵件,句句扎心!

          8.15張圖看懂瞎忙和高效的區(qū)別!


          瀏覽 37
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  在线黄片观看 | 亚洲在线成人视频 | 99免费视屏 | 久草新视频91 | 天天干,天天射免费视频 |