<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          接口響應(yīng)慢?那是你沒用 CompletableFuture 來優(yōu)化!

          共 59845字,需瀏覽 120分鐘

           ·

          2024-04-26 12:16

          來源:blog.csdn.net/qq_43372633/article/details/130814200

          ?? 歡迎加入小哈的星球 ,你將獲得: 專屬的項(xiàng)目實(shí)戰(zhàn) / Java 學(xué)習(xí)路線 / 一對一提問 / 學(xué)習(xí)打卡 /  贈(zèng)書福利


          全棧前后端分離博客項(xiàng)目 2.0 版本完結(jié)啦, 演示鏈接http://116.62.199.48/ ,新項(xiàng)目正在醞釀中。全程手摸手,后端 + 前端全棧開發(fā),從 0 到 1 講解每個(gè)功能點(diǎn)開發(fā)步驟,1v1 答疑,直到項(xiàng)目上線。目前已更新了239小節(jié),累計(jì)38w+字,講解圖:1645張,還在持續(xù)爆肝中.. 后續(xù)還會(huì)上新更多項(xiàng)目,目標(biāo)是將Java領(lǐng)域典型的項(xiàng)目都整一波,如秒殺系統(tǒng), 在線商城, IM即時(shí)通訊,Spring Cloud Alibaba 等等,戳我加入學(xué)習(xí),已有1300+小伙伴加入(早鳥價(jià)超低)


          • 前言
          • 為什么要用異步編程
          • 回顧Future
          • CompletableFuture使用場景
          • CompletableFuture注意點(diǎn)

          前言

          大多數(shù)程序員在平時(shí)工作中,都是增刪改查。這里我跟大家講解如何利用CompletableFuture優(yōu)化項(xiàng)目代碼,使項(xiàng)目性能更佳!

          為什么要用異步編程

          舉個(gè)例子:用戶登錄成功,需要返回前端用戶角色,菜單權(quán)限,個(gè)人信息,用戶余額,積分情況等。正常邏輯是依次查詢不同表,得到對應(yīng)的數(shù)據(jù)封裝返回給前端,代碼如下:

          @Test
          public void login(Long userId){
              log.info("開始查詢用戶全部信息---串行!");
              // 查詢用戶角色信息
              getUserRole(userId);
              // 查詢用戶菜單信息
              getUserMenu(userId);
              // 查詢用戶余額信息
              getUserAmount(userId);
              // 查詢用戶積分信息
              getUserIntegral(userId);
              log.info("封裝用戶信息返回給前端!");
          }

          假如查詢用戶角色,用戶菜單,用戶余額,用戶積分分別耗時(shí)500,200,200,100毫秒,則登錄接口耗時(shí)為1秒。如果采用異步(多線程并行)形式,則登錄接口耗時(shí)以單個(gè)查詢最慢的任務(wù)為主,為查詢用戶角色信息500毫秒。相當(dāng)于登錄接口性能提升一倍!查詢?nèi)蝿?wù)越多,則其性能提升越大!

          代碼演示(串行):

          @Test
          public void login() throws InterruptedException {

              long startTime = System.currentTimeMillis();
              log.info("開始查詢用戶全部信息!");

              log.info("開始查詢用戶角色信息!");
              Thread.sleep(500);
              String role = "管理員";
              log.info("開始查詢用戶菜單信息!");
              Thread.sleep(200);
              String menu = "首頁,賬戶管理,積分管理";
              log.info("開始查詢查詢用戶余額信息!");
              Thread.sleep(200);
              Integer amount = 1999;
              log.info("開始查詢查詢查詢用戶積分信息!");
              Thread.sleep(100);
              Integer integral = 1015;

              log.info("封裝用戶信息返回給前端!");
              log.info("查詢用戶全部信息總耗時(shí):" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
          }

          結(jié)果:

          圖片

          代碼演示(異步):

          @Test
          public void asyncLogin() {
              long startTime = System.currentTimeMillis();
              log.info("開始查詢用戶角色信息!");
              CompletableFuture<Map<String, Object>> roleFuture = CompletableFuture.supplyAsync(() -> {
                  try {
                      Thread.sleep(500);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
                  Map<String, Object> roleMap = new HashMap<String, Object>();
                  roleMap.put("role""管理員");
                  long endTime = System.currentTimeMillis();
                  log.info("查詢用戶角色信息耗時(shí):" + (endTime - startTime)  + "毫秒");
                  return roleMap;
              });

              log.info("開始查詢用戶菜單信息!");
              CompletableFuture<Map<String, Object>> menuFuture = CompletableFuture.supplyAsync(() -> {
                  try {
                      Thread.sleep(200);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
                  Map<String, Object> menuMap = new HashMap<String, Object>();
                  menuMap.put("menu""首頁,賬戶管理,積分管理");
                  long endTime = System.currentTimeMillis();
                  log.info("查詢用戶菜單信息耗時(shí):" + (endTime - startTime)  + "毫秒");
                  return menuMap;
              });

              log.info("開始查詢用戶余額信息!");
              CompletableFuture<Map<String, Object>> amountFuture = CompletableFuture.supplyAsync(() -> {
                  try {
                      Thread.sleep(200);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
                  Map<String, Object> amountMap = new HashMap<String, Object>();
                  amountMap.put("amount", 1999);
                  long endTime = System.currentTimeMillis();
                  log.info("查詢用戶余額信息耗時(shí):" + (endTime - startTime)  + "毫秒");
                  return amountMap;
              });

              log.info("開始查詢用戶積分信息!");
              CompletableFuture<Map<String, Object>> integralFuture = CompletableFuture.supplyAsync(() -> {
                  try {
                      Thread.sleep(200);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
                  Map<String, Object> integralMap = new HashMap<String, Object>();
                  integralMap.put("integral", 1015);
                  long endTime = System.currentTimeMillis();
                  log.info("查詢用戶積分信息耗時(shí):" + (endTime - startTime)  + "毫秒");
                  return integralMap;
              });

              roleFuture.join();
              menuFuture.join();
              amountFuture.join();
              integralFuture.join();
              log.info("查詢用戶全部信息總耗時(shí):" + (System.currentTimeMillis() - startTime)  + "毫秒");

          }

          結(jié)果:

          圖片

          直觀的可以看出,異步執(zhí)行的優(yōu)勢!

          回顧Future

          Future是什么?

          • Java 1.5中引入Callable解決多線程執(zhí)行無返回值的問題。
          • Future是為了配合Callable/Runnable而產(chǎn)生的。簡單來講,我們可以通過future來對任務(wù)查詢、取消、執(zhí)行結(jié)果的獲取,是調(diào)用方與異步執(zhí)行方之間溝通的橋梁。
          • FutureTask實(shí)現(xiàn)了RunnableFuture接口,同時(shí)具有Runnable、Future的能力,即既可以作為Future得到Callable的返回值,又可以作為一個(gè)Runnable。
          • CompletableFuture實(shí)現(xiàn)了Futrue接口。
          • Future是Java5新加的一個(gè)接口,它提供了一種異步并行計(jì)算的功能。如果主線程需要執(zhí)行一個(gè)很耗時(shí)的計(jì)算任務(wù),我們可以將這個(gè)任務(wù)通過Future放到異步線程中去執(zhí)行。主線程繼續(xù)處理其他任務(wù),處理完成后,再通過Future獲取計(jì)算結(jié)果。
          • Future可以在連續(xù)流程中滿足數(shù)據(jù)驅(qū)動(dòng)的并發(fā)需求,既獲得了并發(fā)執(zhí)行的性能提升,又不失連續(xù)流程的簡潔優(yōu)雅。

          代碼演示(不使用自定義線程池):

          @Test
          public void callable() throws ExecutionException, InterruptedException {
              long startTime = System.currentTimeMillis();

              Callable amountCall = new Callable() {
                  @Override
                  public Object call() throws Exception {
                      long startTime = System.currentTimeMillis();
                      Thread.sleep(6000);
                      Map<String, Object> amountMap = new HashMap<String, Object>();
                      amountMap.put("amount", 99);
                      long endTime = System.currentTimeMillis();
                      log.info("查詢金額信息耗時(shí):" + (endTime - startTime) / 1000 + "秒");
                      return amountMap;
                  }
              };

              FutureTask<Map> amountFuture = new FutureTask<>(amountCall);
              new Thread(amountFuture).start();

              Callable roleCall = new Callable() {
                  @Override
                  public Object call() throws Exception {
                      long startTime = System.currentTimeMillis();
                      Thread.sleep(5000);
                      Map<String, String> roleMap = new HashMap<String, String>();
                      roleMap.put("name""管理員");
                      long endTime = System.currentTimeMillis();
                      log.info("查詢角色信息耗時(shí):" + (endTime - startTime) / 1000 + "秒");
                      return roleMap;
                  }
              };

              FutureTask<Map> roleFuture = new FutureTask<>(roleCall);
              new Thread(roleFuture).start();

              log.info("金額查詢結(jié)果為:" + amountFuture.get());
              log.info("角色查詢結(jié)果為:" + roleFuture.get());

              long endTime = System.currentTimeMillis();
              log.info("總耗時(shí):" + (endTime - startTime) / 1000 + "秒");

          }

          圖片

          這里要注意:Future對于結(jié)果的獲取,不是很友好,只能通過阻塞或者輪詢的方式得到任務(wù)的結(jié)果。Future.get() 就是阻塞調(diào)用,在線程獲取結(jié)果之前get方法會(huì)一直阻塞;Future提供了一個(gè)isDone方法,可以在程序中輪詢這個(gè)方法查詢執(zhí)行結(jié)果。

          這里的 amountFuture.get()如果放到如下圖所示的位置,則amountFuture下面的線程將等amountFuture.get()完成后才能執(zhí)行,沒有執(zhí)行完,則一直阻塞。

          圖片

          結(jié)果:

          圖片

          代碼演示(使用自定義線程池):

          @Test
          public void executor() throws ExecutionException, InterruptedException {
              long startTime = System.currentTimeMillis();

              ExecutorService executor = Executors.newFixedThreadPool(2);

              Callable amountCall = new Callable() {
                  @Override
                  public Object call() throws Exception {
                      long startTime = System.currentTimeMillis();
                      Thread.sleep(6000);
                      Map<String, Object> amountMap = new HashMap<String, Object>();
                      amountMap.put("amount", 99);
                      long endTime = System.currentTimeMillis();
                      log.info("查詢金額信息耗時(shí):" + (endTime - startTime) / 1000 + "秒");
                      return amountMap;
                  }
              };

              Callable roleCall = new Callable() {
                  @Override
                  public Object call() throws Exception {
                      long startTime = System.currentTimeMillis();
                      Thread.sleep(5000);
                      Map<String, String> roleMap = new HashMap<String, String>();
                      roleMap.put("name""管理員");
                      long endTime = System.currentTimeMillis();
                      log.info("查詢用戶角色信息耗時(shí):" + (endTime - startTime) / 1000 + "秒");
                      return roleMap;
                  }
              };

              Future amountFuture = executor.submit(amountCall);
              Future roleFuture = executor.submit(roleCall);

              log.info("金額查詢結(jié)果為:" + amountFuture.get());
              log.info("角色查詢結(jié)果為:" + roleFuture.get());

              long endTime = System.currentTimeMillis();
              log.info("總耗時(shí):" + (endTime - startTime) / 1000 + "秒");

          }

          結(jié)果:

          圖片

          CompletableFuture使用場景

          圖片

          創(chuàng)建異步任務(wù)

          圖片

          CompletableFuture創(chuàng)建異步任務(wù),一般有supplyAsync和runAsync兩個(gè)方法:

          • supplyAsync執(zhí)行CompletableFuture任務(wù),支持返回值。
          • runAsync執(zhí)行CompletableFuture任務(wù),沒有返回值。

          supplyAsync方法

          //使用默認(rèn)內(nèi)置線程池ForkJoinPool.commonPool(),根據(jù)supplier構(gòu)建執(zhí)行任務(wù)
          public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
          //自定義線程,根據(jù)supplier構(gòu)建執(zhí)行任務(wù)
          public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

          runAsync方法

          //使用默認(rèn)內(nèi)置線程池ForkJoinPool.commonPool(),根據(jù)runnable構(gòu)建執(zhí)行任務(wù)
          public static CompletableFuture<Void> runAsync(Runnable runnable) 
          //自定義線程,根據(jù)runnable構(gòu)建執(zhí)行任務(wù)
          public static CompletableFuture<Void> runAsync(Runnable runnable,  Executor executor)

          代碼演示:

           @Test
              // supplyAsync執(zhí)行CompletableFuture任務(wù),支持返回值
              public void defaultSupplyAsync() throws ExecutionException, InterruptedException {
                  long startTime = System.currentTimeMillis();
                  // 構(gòu)建執(zhí)行任務(wù)
                  CompletableFuture<Map<String, Object>> amountCompletableFuture = CompletableFuture.supplyAsync(() -> {
                      try {
                          Thread.sleep(6000);
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                      Map<String, Object> amountMap = new HashMap<String, Object>();
                      amountMap.put("amount", 99);
                      long endTime = System.currentTimeMillis();
                      log.info("查詢金額信息耗時(shí):" + (endTime - startTime) / 1000 + "秒");
                      return amountMap;
                  });
          //        這行代碼在這里 則會(huì)進(jìn)行6秒的阻塞 下面代碼其他線程無法創(chuàng)建
          //        只能等這個(gè)線程6秒過后結(jié)束才能創(chuàng)建其他線程
          //        Map<String, Object> userMap = userCompletableFuture.get();
                  CompletableFuture<Map<String, Object>> roleCompletableFuture = CompletableFuture.supplyAsync(() -> {
                      try {
                          Thread.sleep(5000);
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                      Map<String, Object> roleMap = new HashMap<String, Object>();
                      roleMap.put("name""管理員");
                      return roleMap;
                  });

                  log.info("金額查詢結(jié)果為:" + amountCompletableFuture.join());
                  log.info("角色查詢結(jié)果為:" + roleCompletableFuture.join());

                  log.info("總耗時(shí):" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
              }

              @Test
              // supplyAsync執(zhí)行CompletableFuture任務(wù),支持返回值
              public void customSupplyAsync() throws ExecutionException, InterruptedException {
                  // 自定義線程池
                  ExecutorService executorService = Executors.newCachedThreadPool();
                  long startTime = System.currentTimeMillis();
                  CompletableFuture<Map<String, Object>> amountCompletableFuture = CompletableFuture.supplyAsync(() -> {
                      try {
                          Thread.sleep(6000);
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                      Map<String, Object> amountMap = new HashMap<String, Object>();
                      amountMap.put("amount", 99);
                      long endTime = System.currentTimeMillis();
                      log.info("查詢金額信息耗時(shí):" + (endTime - startTime) / 1000 + "秒");
                      return amountMap;
                  }, executorService);
          //        這行代碼在這里 則會(huì)進(jìn)行6秒的阻塞 下面代碼其他線程無法創(chuàng)建
          //        只能等這個(gè)線程6秒過后結(jié)束才能創(chuàng)建其他線程
          //        Map<String, Object> userMap = userCompletableFuture.get();
                  CompletableFuture<Map<String, Object>> roleCompletableFuture = CompletableFuture.supplyAsync(() -> {
                      try {
                          Thread.sleep(5000);
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                      Map<String, Object> roleMap = new HashMap<String, Object>();
                      roleMap.put("name""管理員");
                      return roleMap;
                  }, executorService);
                  log.info("金額查詢結(jié)果為:" + amountCompletableFuture.join());
                  log.info("角色查詢結(jié)果為:" + roleCompletableFuture.join());
                  // 線程池需要關(guān)閉
                  executorService.shutdown();
                  log.info("總耗時(shí):" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
              }

           @Test
              // runAsync執(zhí)行CompletableFuture任務(wù),沒有返回值
              public void defaultRunAsync() {

                  long lordStartTime = System.currentTimeMillis();

                  CompletableFuture<Void> amountCompletableFuture = CompletableFuture.runAsync(() -> {
                      long startTime = System.currentTimeMillis();
                      try {
                          Thread.sleep(3000);
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                      log.info("執(zhí)行金額增刪改操作用時(shí):" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
                  });

                  CompletableFuture<Void> roleCompletableFuture = CompletableFuture.runAsync(() -> {
                      long startTime = System.currentTimeMillis();
                      try {
                          Thread.sleep(4000);
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                      log.info("執(zhí)行角色增刪改操作用時(shí):" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
                  });
                  log.info("金額查詢結(jié)果為:" + amountCompletableFuture.join());
                  log.info("角色查詢結(jié)果為:" + roleCompletableFuture.join());

                  log.info("總耗時(shí):" + (System.currentTimeMillis() - lordStartTime) / 1000 + "秒");
              }

              @Test
              // runAsync執(zhí)行CompletableFuture任務(wù),沒有返回值
              public void customRunAsync() {

                  long lordStartTime = System.currentTimeMillis();
                  ExecutorService executor = Executors.newCachedThreadPool();

                  CompletableFuture<Void> amountCompletableFuture = CompletableFuture.runAsync(() -> {
                      long startTime = System.currentTimeMillis();
                      try {
                          Thread.sleep(2000);
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                      log.info("執(zhí)行金額增刪改操作用時(shí):" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
                  }, executor);

                  CompletableFuture<Void> roleCompletableFuture = CompletableFuture.runAsync(() -> {
                      long startTime = System.currentTimeMillis();
                      try {
                          Thread.sleep(1000);
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                      log.info("執(zhí)行角色增刪改操作用時(shí):" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
                  }, executor);
                  log.info("金額查詢結(jié)果為:" + amountCompletableFuture.join());
                  log.info("角色查詢結(jié)果為:" + roleCompletableFuture.join());

                  // 關(guān)閉線程池
                  executor.shutdown();
                  log.info("總耗時(shí):" + (System.currentTimeMillis() - lordStartTime) / 1000 + "秒");
              }

          注意:這里的get()與join()都是獲取任務(wù)線程的返回值。join()方法拋出的是uncheck異常(即RuntimeException),不會(huì)強(qiáng)制開發(fā)者拋出, 會(huì)將異常包裝成CompletionException異常 /CancellationException異常,但是本質(zhì)原因還是代碼內(nèi)存在的真正的異常;

          get()方法拋出的是經(jīng)過檢查的異常,ExecutionException, InterruptedException 需要用戶手動(dòng)處理(拋出或者 try catch)。

          異步任務(wù)回調(diào)

          圖片

          thenRun / thenRunAsync

          CompletableFuture的thenRun方法,通俗點(diǎn)講就是,做完第一個(gè)任務(wù)后,再做第二個(gè)任務(wù)。某個(gè)任務(wù)執(zhí)行完成后,執(zhí)行回調(diào)方法;但是前后兩個(gè)任務(wù)沒有參數(shù)傳遞,第二個(gè)任務(wù)也沒有返回值。

          public CompletableFuture<Void> thenRun(Runnable action);
          public CompletableFuture<Void> thenRunAsync(Runnable action);

          thenRun / thenRunAsync的區(qū)別? 源碼解釋:

          private static final Executor asyncPool = useCommonPool ?
              ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
              
          public CompletableFuture<Void> thenRun(Runnable action) {
              return uniRunStage(null, action);
          }

          public CompletableFuture<Void> thenRunAsync(Runnable action) {
              return uniRunStage(asyncPool, action);
          }

          如果你執(zhí)行第一個(gè)任務(wù)的時(shí)候,傳入了一個(gè)自定義線程池:

          • 調(diào)用thenRun方法執(zhí)行第二個(gè)任務(wù)時(shí),則第二個(gè)任務(wù)和第一個(gè)任務(wù)是共用同一個(gè)線程池。
          • 調(diào)用thenRunAsync執(zhí)行第二個(gè)任務(wù)時(shí),則第一個(gè)任務(wù)使用的是你自己傳入的線程池,第二個(gè)任務(wù)使用的是ForkJoin線程池。

          后面介紹的thenAccept和thenAcceptAsync,thenApply和thenApplyAsync等,它們之間的區(qū)別也是這個(gè)哈!

          代碼演示:

          @Test
          // 執(zhí)行第一個(gè)任務(wù)后 可以繼續(xù)執(zhí)行第二個(gè)任務(wù) 兩個(gè)任務(wù)之間無傳參 無返回值
          public void defaultThenRun() throws ExecutionException, InterruptedException {
              long lordStartTime = System.currentTimeMillis();
              CompletableFuture<Void> amountCompletableFuture = CompletableFuture.runAsync(() -> {
                  long startTime = System.currentTimeMillis();
                  try {
                      Thread.sleep(2000);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
                  log.info("執(zhí)行金額增刪改操作用時(shí):" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
              });
              CompletableFuture<Void> thenCompletableFuture = amountCompletableFuture.thenRun(() -> {
                  long startTime = System.currentTimeMillis();
                  try {
                      Thread.sleep(1000);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
                  log.info("執(zhí)行角色增刪改操作用時(shí):" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
              });
              thenCompletableFuture.get();
              log.info("總耗時(shí):" + (System.currentTimeMillis() - lordStartTime) / 1000 + "秒");

          }

          結(jié)果:

          圖片

          thenAccept / thenAcceptAsync

          CompletableFuture的thenAccept方法表示,第一個(gè)任務(wù)執(zhí)行完成后,執(zhí)行第二個(gè)回調(diào)方法任務(wù),會(huì)將該任務(wù)的執(zhí)行結(jié)果,作為入?yún)ⅲ瑐鬟f到回調(diào)方法中,但是回調(diào)方法是沒有返回值的。

          public CompletableFuture<Void> thenAccept(Consumer<? super T> action);
          public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action);

          代碼演示:

          @Test
          // 執(zhí)行第一個(gè)任務(wù)后 可以繼續(xù)執(zhí)行第二個(gè)任務(wù) 并攜帶第一個(gè)任務(wù)的返回值 第二個(gè)任務(wù)執(zhí)行完沒有返回值
          public void defaultThenAccept() throws ExecutionException, InterruptedException {
              long lordStartTime = System.currentTimeMillis();
              CompletableFuture<Map> amountCompletableFuture = CompletableFuture.supplyAsync(() -> {
                  long startTime = System.currentTimeMillis();
                  try {
                      Thread.sleep(2000);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
                  Map<String, Object> amountMap = new HashMap<String, Object>();
                  amountMap.put("amount", 90);
                  log.info("執(zhí)行金額查詢操作用時(shí):" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
                  return amountMap;
              });
              CompletableFuture<Void> thenCompletableFuture = amountCompletableFuture.thenAccept((map) -> {
                  long startTime = System.currentTimeMillis();
                  if (Integer.parseInt(map.get("amount").toString()) > 90) {
                      try {
                          Thread.sleep(1000);
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                      log.info("金額充足,可以購買!:" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
                  } else {
                      log.info("金額不足,無法購買!:" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
                  }
              });
              thenCompletableFuture.get();
              log.info("總耗時(shí):" + (System.currentTimeMillis() - lordStartTime) / 1000 + "秒");

          }

          結(jié)果:

          圖片

          thenApply / thenApplyAsync

          CompletableFuture的thenApply方法表示,第一個(gè)任務(wù)執(zhí)行完成后,執(zhí)行第二個(gè)回調(diào)方法任務(wù),會(huì)將該任務(wù)的執(zhí)行結(jié)果,作為入?yún)ⅲ瑐鬟f到回調(diào)方法中,并且回調(diào)方法是有返回值的。

          public <U> CompletableFuture<U> thenApplyAsync();
          public CompletableFuture<Void> thenAccept(Consumer<? super T> action);

          代碼演示:

          @Test
          // 執(zhí)行第一個(gè)任務(wù)后 可以繼續(xù)執(zhí)行第二個(gè)任務(wù) 并攜帶第一個(gè)任務(wù)的返回值 第二個(gè)任務(wù)執(zhí)行完有返回值
          public void defaultThenApply() throws ExecutionException, InterruptedException {
              long lordStartTime = System.currentTimeMillis();
              CompletableFuture<Map> amountCompletableFuture = CompletableFuture.supplyAsync(() -> {
                  long startTime = System.currentTimeMillis();
                  try {
                      Thread.sleep(2000);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
                  Map<String, Object> amountMap = new HashMap<String, Object>();
                  amountMap.put("amount", 90);
                  log.info("執(zhí)行金額查詢操作用時(shí):" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
                  return amountMap;
              });
              CompletableFuture<Integer> thenCompletableFuture = amountCompletableFuture.thenApply((map) -> {
                  int number = 0;
                  if (Integer.parseInt(map.get("amount").toString()) > 3) {
                      try {
                          Thread.sleep(1000);
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                      // 可口可樂3元一瓶 看金額一共能購買多少瓶
                      number = Integer.parseInt(map.get("amount").toString()) / 3;
                  }
                  return number;
              });

              log.info("當(dāng)前金額一共可以買" + thenCompletableFuture.get() + "瓶可口可樂!");
              Integer integer = thenCompletableFuture.get();
              log.info("總耗時(shí):" + (System.currentTimeMillis() - lordStartTime) / 1000 + "秒");
          }

          結(jié)果:

          圖片

          exceptionally

          CompletableFuture的exceptionally方法表示,某個(gè)任務(wù)執(zhí)行異常時(shí),執(zhí)行的回調(diào)方法;并且有拋出異常作為參數(shù),傳遞到回調(diào)方法。

          public CompletableFuture<T> exceptionally(
              Function<Throwable, ? extends T> fn) {
              return uniExceptionallyStage(fn);
          }

          代碼演示:

           @Test
          // 某個(gè)任務(wù)執(zhí)行異常時(shí),執(zhí)行的回調(diào)方法;并且有拋出異常作為參數(shù),傳遞到回調(diào)方法。
          public void exceptionally() throws ExecutionException, InterruptedException {
              long lordStartTime = System.currentTimeMillis();
              CompletableFuture<Map> amountCompletableFuture = CompletableFuture.supplyAsync(() -> {
                  long startTime = System.currentTimeMillis();
                  try {
                      Thread.sleep(2000);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
                  Map<String, Object> amountMap = new HashMap<String, Object>();
                  amountMap.put("amount", 90);
                  log.info("執(zhí)行金額查詢操作用時(shí):" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
                  return amountMap;
              });
              CompletableFuture<Integer> thenCompletableFuture = amountCompletableFuture.thenApply((map) -> {
                  int number = 0;
                  if (Integer.parseInt(map.get("amount").toString()) > 3) {
                      try {
                          Thread.sleep(1000);
                          // 可口可樂3元一瓶 看金額一共能購買多少瓶
                          number = Integer.parseInt(map.get("amount").toString()) / 0;
                      } catch (ArithmeticException | InterruptedException e) {
                          e.printStackTrace();
                          throw new ArithmeticException(); // 這里一定要將異常拋除了,不然exceptionally無效
                      }
                  }
                  return number;
              });

              CompletableFuture<Integer> exceptionFuture = thenCompletableFuture.exceptionally((e) -> {
                  log.error("除數(shù)為0,則默認(rèn)商為0!");
                  return 0;
              });
              log.info("當(dāng)前金額一共可以買" + thenCompletableFuture.get() + "瓶可口可樂!");
              exceptionFuture.get();
              log.info("總耗時(shí):" + (System.currentTimeMillis() - lordStartTime) / 1000 + "秒");
          }

          圖片

          注意:這里的異常一定要拋出來,不然exceptionally無效!

          whenComplete

          CompletableFuture的whenComplete方法表示,某個(gè)任務(wù)執(zhí)行完成后,執(zhí)行的回調(diào)方法,無返回值;并且whenComplete方法返回的CompletableFuture的result是上個(gè)任務(wù)的結(jié)果。

          public CompletableFuture<T> whenComplete(
              BiConsumer<? super T, ? super Throwable> action) {
              return uniWhenCompleteStage(null, action);
          }

          代碼演示:

          @Test
          // 某個(gè)任務(wù)執(zhí)行完成后,執(zhí)行的回調(diào)方法,無返回值;并且whenComplete方法返回的CompletableFuture的result是上個(gè)任務(wù)的結(jié)果。
          public void whenComplete() {
              CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
                  return "周杰倫";
              });
              CompletableFuture<String> stringCompletableFuture1 = stringCompletableFuture.whenComplete((a, throwable) -> {
                  log.info("周杰倫喜歡唱");
              });
              log.info("輸出結(jié)果為第一個(gè)任務(wù):" + stringCompletableFuture1.join());
          }

          結(jié)果:

          圖片

          handle

          CompletableFuture的handle方法表示,某個(gè)QQ賬號買號平臺地圖任務(wù)執(zhí)行完成后,執(zhí)行回調(diào)方法,并且是有返回值的;并且handle方法返回的CompletableFuture的result是回調(diào)方法執(zhí)行的結(jié)果。

          public <U> CompletableFuture<U> handle(
              BiFunction<? super T, Throwable, ? extends U> fn) {
              return uniHandleStage(null, fn);
          }

          代碼演示:

          @Test
          //    某個(gè)任務(wù)執(zhí)行完成后,執(zhí)行的回調(diào)方法,有返回值;并且handle方法返回的CompletableFuture的result是第二個(gè)任務(wù)的結(jié)果。
          public void handle() {
              CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
                  return "周杰倫";
              });
              CompletableFuture<String> stringCompletableFuture1 = stringCompletableFuture.handle((a, throwable) -> {
                  return "周杰倫喜歡唱歌!";
              });
              log.info("輸出結(jié)果為第二個(gè)任務(wù):" + stringCompletableFuture1.join());
          }

          結(jié)果:

          圖片

          多個(gè)任務(wù)組合處理

          圖片

          AND組合關(guān)系

          thenCombine / thenAcceptBoth / runAfterBoth都表示:將兩個(gè)CompletableFuture組合起來,只有這兩個(gè)都正常執(zhí)行完了,才會(huì)執(zhí)行某個(gè)任務(wù)。

          • thenCombine:會(huì)將兩個(gè)任務(wù)的執(zhí)行結(jié)果作為方法入?yún)ⅲ瑐鬟f到指定方法中,且有返回值。
          • thenAcceptBoth: 會(huì)將兩個(gè)任務(wù)的執(zhí)行結(jié)果作為方法入?yún)ⅲ瑐鬟f到指定方法中,且無返回值。
          • runAfterBoth 不會(huì)把執(zhí)行結(jié)果當(dāng)做方法入?yún)ⅲ覜]有返回值。

          代碼演示:

          @Test
          public void thenCombine() {
              CompletableFuture<Integer> first = CompletableFuture.supplyAsync(() -> {
                  return 7;
              });
              CompletableFuture<Integer> second = CompletableFuture.supplyAsync(() -> 2).thenCombine(first, Integer::sum);
              log.info("結(jié)果為:" + second.join());
          }

          結(jié)果為:

          圖片

          OR組合關(guān)系

          applyToEither / acceptEither / runAfterEither 都表示:將兩個(gè)CompletableFuture組合起來,只要其中一個(gè)執(zhí)行完了,就會(huì)執(zhí)行某個(gè)任務(wù)。

          • applyToEither:會(huì)將已經(jīng)執(zhí)行完成的任務(wù),作為方法入?yún)ⅲ瑐鬟f到指定方法中,且有返回值。
          • acceptEither: 會(huì)將已經(jīng)執(zhí)行完成的任務(wù),作為方法入?yún)ⅲ瑐鬟f到指定方法中,且無返回值。
          • runAfterEither:不會(huì)把執(zhí)行結(jié)果當(dāng)做方法入?yún)ⅲ覜]有返回值。

          代碼演示:

          @Test
          public void applyToEither1() {
              log.info("魏凱下班準(zhǔn)備回家。。。");
              log.info("魏凱等待2號,4號地鐵。。。");
              CompletableFuture<String> busCF = CompletableFuture.supplyAsync(() -> {
                  log.info("2號在路上。。。");
                  try {
                      Thread.sleep(3000);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
                  return "2";
              }).applyToEither(CompletableFuture.supplyAsync(() -> {
                  log.info("4號地鐵在路上。。。");
                  try {
                      Thread.sleep(4000);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
                  return "4";
              }), first -> first + "號");
              log.info("魏凱坐上" + busCF.join() + "地鐵");
          }

          @Test
          // OR
          public void applyToEither() {
              CompletableFuture<Integer> first = CompletableFuture.supplyAsync(() -> {
                  try {
                      Thread.sleep(2000L);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
                  return 7;
              });
              CompletableFuture<Integer> second = CompletableFuture.supplyAsync(() -> {
                  try {
                      Thread.sleep(3000L);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
                  return 7;
              }).applyToEither(first, num -> num);
              log.info("最后結(jié)果為:" + second.join());
          }

          結(jié)果演示:

          圖片

          AllOf

          所有任務(wù)都執(zhí)行完成后,才執(zhí)行 allOf返回的CompletableFuture。如果任意一個(gè)任務(wù)異常,allOf的CompletableFuture,執(zhí)行g(shù)et方法,會(huì)拋出異常。

          代碼演示:

          @Test
          // 所有任務(wù)都執(zhí)行完成后,才執(zhí)行 allOf返回的CompletableFuture。如果任意一個(gè)任務(wù)異常,allOf的CompletableFuture,執(zhí)行g(shù)et方法,會(huì)拋出異常。
          // 這里第一次執(zhí)行沒有睡眠的話,是可以直接執(zhí)行第三個(gè)任務(wù)的。如果有睡眠,則需要手動(dòng)join啟動(dòng)。
          public void allOf() {
              CompletableFuture<Void> first = CompletableFuture.runAsync(() -> {
          //            try {
          //                Thread.sleep(2000);
          //            } catch (InterruptedException e) {
          //                e.printStackTrace();
          //            }
                  log.info("第一個(gè)任務(wù)執(zhí)行完成!");
              });
              CompletableFuture<Void> second = CompletableFuture.runAsync(() -> {
          //            try {
          //                Thread.sleep(500);
          //            } catch (InterruptedException e) {
          //                e.printStackTrace();
          //            }
                  log.info("第二個(gè)任務(wù)執(zhí)行完成!");
              });
              CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(first, second).whenComplete((m, n) -> {
                  log.info("第三個(gè)任務(wù)完成!");
              });
          //        voidCompletableFuture.join();
          }

          結(jié)果:

          圖片

          注意:這里第一次啟動(dòng)執(zhí)行沒有睡眠的話,是可以直接執(zhí)行第三個(gè)任務(wù)的,因?yàn)檫@兩個(gè)任務(wù)都執(zhí)行完成,啟動(dòng)的瞬間第三個(gè)也同時(shí)執(zhí)行完。如果有睡眠,則需要手動(dòng)join啟動(dòng),等待最長睡眠任務(wù)時(shí)間過后,第三個(gè)任務(wù)完成!

          AnyOf

          任意一個(gè)任務(wù)執(zhí)行完,就執(zhí)行anyOf返回的CompletableFuture。如果執(zhí)行的任務(wù)異常,anyOf的CompletableFuture,執(zhí)行g(shù)et方法,會(huì)拋出異常。

          代碼演示:

          @Test
          // 前提任務(wù)任意執(zhí)行完一個(gè),則目標(biāo)任務(wù)執(zhí)行。其他前提任務(wù)則不在執(zhí)行。
          // 任意一個(gè)任務(wù)執(zhí)行完,就執(zhí)行anyOf返回的CompletableFuture。如果執(zhí)行的任務(wù)異常,anyOf的CompletableFuture,執(zhí)行g(shù)et方法,會(huì)拋出異常。
          public void anyOf() {
              CompletableFuture<Void> first = CompletableFuture.runAsync(() -> {
                  try {
                      Thread.sleep(2000);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
                  log.info("第一個(gè)任務(wù)執(zhí)行完成!");
              });
              CompletableFuture<Void> second = CompletableFuture.runAsync(() -> {
                  try {
                      Thread.sleep(500);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
                  log.info("第二個(gè)任務(wù)執(zhí)行完成!");
              });
              CompletableFuture<Object> voidCompletableFuture = CompletableFuture.anyOf(first, second).whenComplete((m, n) -> {
                  log.info("第三個(gè)任務(wù)完成!");
              });
              voidCompletableFuture.join();
          }

          結(jié)果:

          圖片

          thenCompose

          thenCompose方法會(huì)在某個(gè)任務(wù)執(zhí)行完成后,將該任務(wù)的執(zhí)行結(jié)果,作為方法入?yún)?去執(zhí)行指定的方法。該方法會(huì)返回一個(gè)新的CompletableFuture實(shí)例。

          • 如果該CompletableFuture實(shí)例的result不為null,則返回一個(gè)基于該result新的CompletableFuture實(shí)例。
          • 如果該CompletableFuture實(shí)例為null,然后就執(zhí)行這個(gè)新任務(wù)。

          代碼演示:

          @Test
          public void thenCompose1() {
              CompletableFuture<Integer> stringCompletableFuture = CompletableFuture.supplyAsync(() -> 4)
                      .thenCompose(value -> CompletableFuture.supplyAsync(() -> {
                          // thenCompose方法返回一個(gè)新的CompletableFuture
                          if (Integer.valueOf(4).equals(value)) {
                              return 66;
                          } else {
                              return 99;
                          }
                      }));
              log.info("結(jié)果:" + stringCompletableFuture.join());

          }

          @Test
          public void thenCompose() {
              CompletableFuture<String> first = CompletableFuture.completedFuture("第一個(gè)任務(wù)");
              CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> 4)
                      .thenCompose((data) -> {
                          log.info("data為:" + data);
                          return first;
                      });
              log.info("結(jié)果:" + stringCompletableFuture.join());

          }

          結(jié)果:

          圖片

          CompletableFuture注意點(diǎn)

          圖片

          Future需要獲取返回值,才能獲取異常信息

            @Test
              public void futureTest(){
                  ExecutorService executor = Executors.newFixedThreadPool(2);

                  CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> {
                      int m = 9;
                      int n = 0;
                      return m / n;
                  },executor);
          //        integerCompletableFuture.join(); // 這行代碼不加,則不會(huì)拋出異常
              }

          Future需要獲取返回值,才能獲取到異常信息。如果不加 get()/join()方法,看不到異常信息。小伙伴們使用的時(shí)候,注意一下哈,考慮是否加try…catch…或者使用exceptionally方法。

          CompletableFuture的get()方法是阻塞的

          //反例
           CompletableFuture.get();
          //正例
          CompletableFuture.get(9, TimeUnit.SECONDS);

          CompletableFuture的get()方法是阻塞的,如果使用它來獲取異步調(diào)用的返回值,需要添加超時(shí)時(shí)間

          默認(rèn)線程池的注意點(diǎn)

          CompletableFuture代碼中又使用了默認(rèn)的線程池,處理的線程個(gè)數(shù)是電腦CPU核數(shù)-1。在大量請求過來的時(shí)候,處理邏輯復(fù)雜的話,響應(yīng)會(huì)很慢。一般建議使用自定義線程池,優(yōu)化線程池配置參數(shù)。

          自定義線程池時(shí),注意飽和策略

          CompletableFuture的get()方法是阻塞的,我們一般建議使用future.get(3, TimeUnit.SECONDS)。并且一般建議使用自定義線程池。但是如果線程池拒絕策略是DiscardPolicy或者DiscardOldestPolicy,當(dāng)線程池飽和時(shí),會(huì)直接丟棄任務(wù),不會(huì)拋棄異常。

          因此建議,CompletableFuture線程池策略最好使用AbortPolicy,然后耗時(shí)的異步線程,做好線程池隔離!

          ?? 歡迎加入小哈的星球 ,你將獲得: 專屬的項(xiàng)目實(shí)戰(zhàn) / Java 學(xué)習(xí)路線 / 一對一提問 / 學(xué)習(xí)打卡 /  贈(zèng)書福利


          全棧前后端分離博客項(xiàng)目 2.0 版本完結(jié)啦, 演示鏈接http://116.62.199.48/ ,新項(xiàng)目正在醞釀中。全程手摸手,后端 + 前端全棧開發(fā),從 0 到 1 講解每個(gè)功能點(diǎn)開發(fā)步驟,1v1 答疑,直到項(xiàng)目上線。目前已更新了239小節(jié),累計(jì)38w+字,講解圖:1645張,還在持續(xù)爆肝中.. 后續(xù)還會(huì)上新更多項(xiàng)目,目標(biāo)是將Java領(lǐng)域典型的項(xiàng)目都整一波,如秒殺系統(tǒng), 在線商城, IM即時(shí)通訊,Spring Cloud Alibaba 等等,戳我加入學(xué)習(xí),已有1300+小伙伴加入(早鳥價(jià)超低)



              
                 

          1. 我的私密學(xué)習(xí)小圈子~

          2. 分庫分表,可能真的要退出歷史舞臺了!

          3. Java與lua互相調(diào)用簡單教程

          4. 面試官:電商庫存扣減如何設(shè)計(jì)?如何防止超賣?

          最近面試BAT,整理一份面試資料Java面試BATJ通關(guān)手冊,覆蓋了Java核心技術(shù)、JVM、Java并發(fā)、SSM、微服務(wù)、數(shù)據(jù)庫、數(shù)據(jù)結(jié)構(gòu)等等。

          獲取方式:點(diǎn)“在看”,關(guān)注公眾號并回復(fù) Java 領(lǐng)取,更多內(nèi)容陸續(xù)奉上。

          PS:因公眾號平臺更改了推送規(guī)則,如果不想錯(cuò)過內(nèi)容,記得讀完點(diǎn)一下在看,加個(gè)星標(biāo),這樣每次新文章推送才會(huì)第一時(shí)間出現(xiàn)在你的訂閱列表里。

          點(diǎn)“在看”支持小哈呀,謝謝啦

          瀏覽 754
          3點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          3點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  成人精品视频在线 | 欧美A片视频 | 美国一级A片在线 | 色综合久久88色综合天天看泰 | 成人电影91 |