<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          項目自從用了接口請求合并,效率直接加倍!

          共 31755字,需瀏覽 64分鐘

           ·

          2023-10-23 08:48

          胖虎和朋友原創(chuàng)的視頻教程有興趣的可以看看


          (文末附課程大綱)


          ??2024 最新,Java成神之路,架構視頻(點擊查看)


          ??超全技術棧的Java入門+進階+實戰(zhàn)!(點擊查看)


          請求合并到底有什么意義呢?我們來看下圖。

          圖片

          假設我們3個用戶(用戶id分別是1、2、3),現(xiàn)在他們都要查詢自己的基本信息,請求到服務器,服務器端請求數(shù)據(jù)庫,發(fā)出3次請求。我們都知道數(shù)據(jù)庫連接資源是相當寶貴的,那么我們怎么盡可能節(jié)省連接資源呢?

          這里把數(shù)據(jù)庫換成被調(diào)用的遠程服務,也是同樣的道理。

          我們改變下思路,如下圖所示。

          圖片

          我們在服務器端把請求合并,只發(fā)出一條SQL查詢數(shù)據(jù)庫,數(shù)據(jù)庫返回后,服務器端處理返回數(shù)據(jù),根據(jù)一個唯一請求ID,把數(shù)據(jù)分組,返回給對應用戶。

          技術手段

          • LinkedBlockQueue 阻塞隊列
          • ScheduledThreadPoolExecutor 定時任務線程池
          • CompleteableFuture future 阻塞機制(Java 8 的 CompletableFuture 并沒有 timeout 機制,后面優(yōu)化,使用了隊列替代)

          2024最新架構課程,對標培訓機構

          ??點擊查看:Java成神之路-進階架構視頻!

          代碼實現(xiàn)

          查詢用戶的代碼

          public interface UserService {

              Map<String, Users> queryUserByIdBatch(List<UserWrapBatchService.Request> userReqs);
          }
          @Service
          public class UserServiceImpl implements UserService {

              @Resource
              private UsersMapper usersMapper;

              @Override
              public Map<String, Users> queryUserByIdBatch(List<UserWrapBatchService.Request> userReqs) {
                  // 全部參數(shù)
                  List<Long> userIds = userReqs.stream().map(UserWrapBatchService.Request::getUserId).collect(Collectors.toList());
                  QueryWrapper<Users> queryWrapper = new QueryWrapper<>();
                  // 用in語句合并成一條SQL,避免多次請求數(shù)據(jù)庫的IO
                  queryWrapper.in("id", userIds);
                  List<Users> users = usersMapper.selectList(queryWrapper);
                  Map<Long, List<Users>> userGroup = users.stream().collect(Collectors.groupingBy(Users::getId));
                  HashMap<String, Users> result = new HashMap<>();
                  userReqs.forEach(val -> {
                      List<Users> usersList = userGroup.get(val.getUserId());
                      if (!CollectionUtils.isEmpty(usersList)) {
                          result.put(val.getRequestId(), usersList.get(0));
                      } else {
                          // 表示沒數(shù)據(jù)
                          result.put(val.getRequestId(), null);
                      }
                  });
                  return result;
              }
          }

          合并請求的實現(xiàn)

          package com.springboot.sample.service.impl;

          import com.springboot.sample.bean.Users;
          import com.springboot.sample.service.UserService;
          import org.springframework.stereotype.Service;

          import javax.annotation.PostConstruct;
          import javax.annotation.Resource;
          import java.util.*;
          import java.util.concurrent.*;

          /***
           * zzq
           * 包裝成批量執(zhí)行的地方
           * */
          @Service
          public class UserWrapBatchService {
              @Resource
              private UserService userService;

              /**
               * 最大任務數(shù)
               **/
              public static int MAX_TASK_NUM = 100;


              /**
               * 請求類,code為查詢的共同特征,例如查詢商品,通過不同id的來區(qū)分
               * CompletableFuture將處理結果返回
               */
              public class Request {
                  // 請求id 唯一
                  String requestId;
                  // 參數(shù)
                  Long userId;
                  //TODO Java 8 的 CompletableFuture 并沒有 timeout 機制
                  CompletableFuture<Users> completableFuture;

                  public String getRequestId() {
                      return requestId;
                  }

                  public void setRequestId(String requestId) {
                      this.requestId = requestId;
                  }

                  public Long getUserId() {
                      return userId;
                  }

                  public void setUserId(Long userId) {
                      this.userId = userId;
                  }

                  public CompletableFuture getCompletableFuture() {
                      return completableFuture;
                  }

                  public void setCompletableFuture(CompletableFuture completableFuture) {
                      this.completableFuture = completableFuture;
                  }
              }

              /*
              LinkedBlockingQueue是一個阻塞的隊列,內(nèi)部采用鏈表的結果,通過兩個ReenTrantLock來保證線程安全
              LinkedBlockingQueue與ArrayBlockingQueue的區(qū)別
              ArrayBlockingQueue默認指定了長度,而LinkedBlockingQueue的默認長度是Integer.MAX_VALUE,也就是無界隊列,在移除的速度小于添加的速度時,容易造成OOM。
              ArrayBlockingQueue的存儲容器是數(shù)組,而LinkedBlockingQueue是存儲容器是鏈表
              兩者的實現(xiàn)隊列添加或移除的鎖不一樣,ArrayBlockingQueue實現(xiàn)的隊列中的鎖是沒有分離的,即添加操作和移除操作采用的同一個ReenterLock鎖,
              而LinkedBlockingQueue實現(xiàn)的隊列中的鎖是分離的,其添加采用的是putLock,移除采用的則是takeLock,這樣能大大提高隊列的吞吐量,
              也意味著在高并發(fā)的情況下生產(chǎn)者和消費者可以并行地操作隊列中的數(shù)據(jù),以此來提高整個隊列的并發(fā)性能。
               */
              private final Queue<Request> queue = new LinkedBlockingQueue();

              @PostConstruct
              public void init() {
                  //定時任務線程池,創(chuàng)建一個支持定時、周期性或延時任務的限定線程數(shù)目(這里傳入的是1)的線程池
                  ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);

                  scheduledExecutorService.scheduleAtFixedRate(() -> {
                      int size = queue.size();
                      //如果隊列沒數(shù)據(jù),表示這段時間沒有請求,直接返回
                      if (size == 0) {
                          return;
                      }
                      List<Request> list = new ArrayList<>();
                      System.out.println("合并了 [" + size + "] 個請求");
                      //將隊列的請求消費到一個集合保存
                      for (int i = 0; i < size; i++) {
                          // 后面的SQL語句是有長度限制的,所以還要做限制每次批量的數(shù)量,超過最大任務數(shù),等下次執(zhí)行
                          if (i < MAX_TASK_NUM) {
                              list.add(queue.poll());
                          }
                      }
                      //拿到我們需要去數(shù)據(jù)庫查詢的特征,保存為集合
                      List<Request> userReqs = new ArrayList<>();
                      for (Request request : list) {
                          userReqs.add(request);
                      }
                      //將參數(shù)傳入service處理, 這里是本地服務,也可以把userService 看成RPC之類的遠程調(diào)用
                      Map<String, Users> response = userService.queryUserByIdBatch(userReqs);
                      //將處理結果返回各自的請求
                      for (Request request : list) {
                          Users result = response.get(request.requestId);
                          request.completableFuture.complete(result);    //completableFuture.complete方法完成賦值,這一步執(zhí)行完畢,下面future.get()阻塞的請求可以繼續(xù)執(zhí)行了
                      }
                  }, 100, 10, TimeUnit.MILLISECONDS);
                  //scheduleAtFixedRate是周期性執(zhí)行 schedule是延遲執(zhí)行 initialDelay是初始延遲 period是周期間隔 后面是單位
                  //這里我寫的是 初始化后100毫秒后執(zhí)行,周期性執(zhí)行10毫秒執(zhí)行一次
              }

              public Users queryUser(Long userId) {
                  Request request = new Request();
                  // 這里用UUID做請求id
                  request.requestId = UUID.randomUUID().toString().replace("-""");
                  request.userId = userId;
                  CompletableFuture<Users> future = new CompletableFuture<>();
                  request.completableFuture = future;
                  //將對象傳入隊列
                  queue.offer(request);
                  //如果這時候沒完成賦值,那么就會阻塞,直到能夠拿到值
                  try {
                      return future.get();
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  } catch (ExecutionException e) {
                      e.printStackTrace();
                  }
                  return null;
              }
          }

          控制層調(diào)用

          /***
           * 請求合并
           * */
          @RequestMapping("/merge")
          public Callable<Users> merge(Long userId) {
              return new Callable<Users>() {
                  @Override
                  public Users call() throws Exception {
                      return userBatchService.queryUser(userId);
                  }
              };
          }

          Callable是什么可以參考:

          https://blog.csdn.net/baidu_19473529/article/details/123596792

          模擬高并發(fā)查詢的代碼

          package com.springboot.sample;

          import org.springframework.web.client.RestTemplate;

          import java.util.Random;
          import java.util.concurrent.CountDownLatch;

          public class TestBatch {
              private static int threadCount = 30;

              private final static CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(threadCount); //為保證30個線程同時并發(fā)運行

              private static final RestTemplate restTemplate = new RestTemplate();

              public static void main(String[] args) {


                  for (int i = 0; i < threadCount; i++) {//循環(huán)開30個線程
                      new Thread(new Runnable() {
                          public void run() {
                              COUNT_DOWN_LATCH.countDown();//每次減一
                              try {
                                  COUNT_DOWN_LATCH.await(); //此處等待狀態(tài),為了讓30個線程同時進行
                              } catch (InterruptedException e) {
                                  e.printStackTrace();
                              }

                              for (int j = 1; j <= 3; j++) {
                                  int param = new Random().nextInt(4);
                                  if (param <=0){
                                      param++;
                                  }
                                  String responseBody = restTemplate.getForObject("http://localhost:8080/asyncAndMerge/merge?userId=" + param, String.class);
                                  System.out.println(Thread.currentThread().getName() + "參數(shù) " + param + " 返回值 " + responseBody);
                              }
                          }
                      }).start();

                  }
              }
          }

          測試效果

          圖片
          圖片

          要注意的問題

          • Java 8 的 CompletableFuture 并沒有 timeout 機制
          • 后面的SQL語句是有長度限制的,所以還要做限制每次批量的數(shù)量,超過最大任務數(shù),等下次執(zhí)行(本例中加了MAX_TASK_NUM判斷)

          使用隊列的超時解決Java 8 的 CompletableFuture 并沒有 timeout 機制

          核心代碼

          package com.springboot.sample.service.impl;

          import com.springboot.sample.bean.Users;
          import com.springboot.sample.service.UserService;
          import org.springframework.stereotype.Service;

          import javax.annotation.PostConstruct;
          import javax.annotation.Resource;
          import java.util.*;
          import java.util.concurrent.*;

          /***
           * zzq
           * 包裝成批量執(zhí)行的地方,使用queue解決超時問題
           * */
          @Service
          public class UserWrapBatchQueueService {
              @Resource
              private UserService userService;

              /**
               * 最大任務數(shù)
               **/
              public static int MAX_TASK_NUM = 100;


              /**
               * 請求類,code為查詢的共同特征,例如查詢商品,通過不同id的來區(qū)分
               * CompletableFuture將處理結果返回
               */
              public class Request {
                  // 請求id
                  String requestId;

                  // 參數(shù)
                  Long userId;
                  // 隊列,這個有超時機制
                  LinkedBlockingQueue<Users> usersQueue;


                  public String getRequestId() {
                      return requestId;
                  }

                  public void setRequestId(String requestId) {
                      this.requestId = requestId;
                  }

                  public Long getUserId() {
                      return userId;
                  }

                  public void setUserId(Long userId) {
                      this.userId = userId;
                  }

                  public LinkedBlockingQueue<Users> getUsersQueue() {
                      return usersQueue;
                  }

                  public void setUsersQueue(LinkedBlockingQueue<Users> usersQueue) {
                      this.usersQueue = usersQueue;
                  }
              }

              /*
              LinkedBlockingQueue是一個阻塞的隊列,內(nèi)部采用鏈表的結果,通過兩個ReenTrantLock來保證線程安全
              LinkedBlockingQueue與ArrayBlockingQueue的區(qū)別
              ArrayBlockingQueue默認指定了長度,而LinkedBlockingQueue的默認長度是Integer.MAX_VALUE,也就是無界隊列,在移除的速度小于添加的速度時,容易造成OOM。
              ArrayBlockingQueue的存儲容器是數(shù)組,而LinkedBlockingQueue是存儲容器是鏈表
              兩者的實現(xiàn)隊列添加或移除的鎖不一樣,ArrayBlockingQueue實現(xiàn)的隊列中的鎖是沒有分離的,即添加操作和移除操作采用的同一個ReenterLock鎖,
              而LinkedBlockingQueue實現(xiàn)的隊列中的鎖是分離的,其添加采用的是putLock,移除采用的則是takeLock,這樣能大大提高隊列的吞吐量,
              也意味著在高并發(fā)的情況下生產(chǎn)者和消費者可以并行地操作隊列中的數(shù)據(jù),以此來提高整個隊列的并發(fā)性能。
               */
              private final Queue<Request> queue = new LinkedBlockingQueue();

              @PostConstruct
              public void init() {
                  //定時任務線程池,創(chuàng)建一個支持定時、周期性或延時任務的限定線程數(shù)目(這里傳入的是1)的線程池
                  ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);

                  scheduledExecutorService.scheduleAtFixedRate(() -> {
                      int size = queue.size();
                      //如果隊列沒數(shù)據(jù),表示這段時間沒有請求,直接返回
                      if (size == 0) {
                          return;
                      }
                      List<Request> list = new ArrayList<>();
                      System.out.println("合并了 [" + size + "] 個請求");
                      //將隊列的請求消費到一個集合保存
                      for (int i = 0; i < size; i++) {
                          // 后面的SQL語句是有長度限制的,所以還要做限制每次批量的數(shù)量,超過最大任務數(shù),等下次執(zhí)行
                          if (i < MAX_TASK_NUM) {
                              list.add(queue.poll());
                          }
                      }
                      //拿到我們需要去數(shù)據(jù)庫查詢的特征,保存為集合
                      List<Request> userReqs = new ArrayList<>();
                      for (Request request : list) {
                          userReqs.add(request);
                      }
                      //將參數(shù)傳入service處理, 這里是本地服務,也可以把userService 看成RPC之類的遠程調(diào)用
                      Map<String, Users> response = userService.queryUserByIdBatchQueue(userReqs);
                      for (Request userReq : userReqs) {
                          // 這里再把結果放到隊列里
                          Users users = response.get(userReq.getRequestId());
                          userReq.usersQueue.offer(users);
                      }

                  }, 100, 10, TimeUnit.MILLISECONDS);
                  //scheduleAtFixedRate是周期性執(zhí)行 schedule是延遲執(zhí)行 initialDelay是初始延遲 period是周期間隔 后面是單位
                  //這里我寫的是 初始化后100毫秒后執(zhí)行,周期性執(zhí)行10毫秒執(zhí)行一次
              }

              public Users queryUser(Long userId) {
                  Request request = new Request();
                  // 這里用UUID做請求id
                  request.requestId = UUID.randomUUID().toString().replace("-""");
                  request.userId = userId;
                  LinkedBlockingQueue<Users> usersQueue = new LinkedBlockingQueue<>();
                  request.usersQueue = usersQueue;
                  //將對象傳入隊列
                  queue.offer(request);
                  //取出元素時,如果隊列為空,給定阻塞多少毫秒再隊列取值,這里是3秒
                  try {
                      return usersQueue.poll(3000,TimeUnit.MILLISECONDS);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
                  return null;
              }
          }
          ...省略..

              @Override
              public Map<String, Users> queryUserByIdBatchQueue(List<UserWrapBatchQueueService.Request> userReqs) {
                  // 全部參數(shù)
                  List<Long> userIds = userReqs.stream().map(UserWrapBatchQueueService.Request::getUserId).collect(Collectors.toList());
                  QueryWrapper<Users> queryWrapper = new QueryWrapper<>();
                  // 用in語句合并成一條SQL,避免多次請求數(shù)據(jù)庫的IO
                  queryWrapper.in("id", userIds);
                  List<Users> users = usersMapper.selectList(queryWrapper);
                  Map<Long, List<Users>> userGroup = users.stream().collect(Collectors.groupingBy(Users::getId));
                  HashMap<String, Users> result = new HashMap<>();
                  // 數(shù)據(jù)分組
                  userReqs.forEach(val -> {
                      List<Users> usersList = userGroup.get(val.getUserId());
                      if (!CollectionUtils.isEmpty(usersList)) {
                          result.put(val.getRequestId(), usersList.get(0));
                      } else {
                          // 表示沒數(shù)據(jù) , 這里要new,不然加入隊列會空指針
                          result.put(val.getRequestId(), new Users());
                      }
                  });
                  return result;
              }

          ...省略...

          小結

          請求合并,批量的辦法能大幅節(jié)省被調(diào)用系統(tǒng)的連接資源,本例是以數(shù)據(jù)庫為例,其他RPC調(diào)用也是類似的道理。缺點就是請求的時間在執(zhí)行實際的邏輯之前增加了等待時間,不適合低并發(fā)的場景。

          源碼:https://gitee.com/apple_1030907690/spring-boot-kubernetes/tree/v1.0.5


                      
                      

          胖虎聯(lián)合兩位大佬朋友,一位是知名培訓機構講師和科大訊飛架構,聯(lián)合打造了《Java架構師成長之路》的視頻教程。完全對標外面2萬左右的培訓課程。

          除了基本的視頻教程之外,還提供了超詳細的課堂筆記,以及源碼等資料包..


          課程階段:

          1. Java核心 提升閱讀源碼的內(nèi)功心法
          2. 深入講解企業(yè)開發(fā)必備技術棧,夯實基礎,為跳槽加薪增加籌碼
          3. 分布式架構設計方法論。為學習分布式微服務做鋪墊
          4. 學習NetFilx公司產(chǎn)品,如Eureka、Hystrix、Zuul、Feign、Ribbon等,以及學習Spring Cloud Alibabba體系
          5. 微服務架構下的性能優(yōu)化
          6. 中間件源碼剖析
          7. 元原生以及虛擬化技術
          8. 從0開始,項目實戰(zhàn) SpringCloud Alibaba電商項目

          點擊下方超鏈接查看詳情(或者點擊文末閱讀原文):

          (點擊查看)  2024年,最新Java架構師成長之路 視頻教程!

          以下是課程大綱,大家可以雙擊打開原圖查看

          瀏覽 1203
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  最新中文字幕在线观看 | 六月婷婷视频 | 亚洲色图欧美在线 | 东京热一区二区 | 亚洲欧美非洲黄色毛片 |