<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          項(xiàng)目自從用了接口請(qǐng)求合并,效率直接加倍!

          共 29156字,需瀏覽 59分鐘

           ·

          2023-01-12 15:50

          點(diǎn)擊關(guān)注公眾號(hào):互聯(lián)網(wǎng)架構(gòu)師,后臺(tái)回復(fù) 2T獲取2TB學(xué)習(xí)資源!

          上一篇:Alibaba開源內(nèi)網(wǎng)高并發(fā)編程手冊(cè).pdf

          請(qǐng)求合并到底有什么意義呢?我們來看下圖。

          假設(shè)我們3個(gè)用戶(用戶id分別是1、2、3),現(xiàn)在他們都要查詢自己的基本信息,請(qǐng)求到服務(wù)器,服務(wù)器端請(qǐng)求數(shù)據(jù)庫,發(fā)出3次請(qǐng)求。我們都知道數(shù)據(jù)庫連接資源是相當(dāng)寶貴的,那么我們?cè)趺幢M可能節(jié)省連接資源呢?

          這里把數(shù)據(jù)庫換成被調(diào)用的遠(yuǎn)程服務(wù),也是同樣的道理。

          我們改變下思路,如下圖所示。

          我們?cè)诜?wù)器端把請(qǐng)求合并,只發(fā)出一條SQL查詢數(shù)據(jù)庫,數(shù)據(jù)庫返回后,服務(wù)器端處理返回?cái)?shù)據(jù),根據(jù)一個(gè)唯一請(qǐng)求ID,把數(shù)據(jù)分組,返回給對(duì)應(yīng)用戶。

          技術(shù)手段

          • LinkedBlockQueue 阻塞隊(duì)列
          • ScheduledThreadPoolExecutor 定時(shí)任務(wù)線程池
          • CompleteableFuture future 阻塞機(jī)制(Java 8 的 CompletableFuture 并沒有 timeout 機(jī)制,后面優(yōu)化,使用了隊(duì)列替代)

          代碼實(shí)現(xiàn)

          查詢用戶的代碼

          public interface UserService {

              Map<String, Users> queryUserByIdBatch(List<UserWrapBatchService.Request> userReqs);
          }
          @Service
          public class UserServiceImpl implements UserService {

              @Resource
              private UsersMapper usersMapper;

              @Override
              public Map<String, Users> queryUserByIdBatch(List<UserWrapBatchService.Request> userReqs) {
                  // 全部參數(shù)
                  List<Long> userIds = userReqs.stream().map(UserWrapBatchService.Request::getUserId).collect(Collectors.toList());
                  QueryWrapper<Users> queryWrapper = new QueryWrapper<>();
                  // 用in語句合并成一條SQL,避免多次請(qǐng)求數(shù)據(jù)庫的IO
                  queryWrapper.in("id", userIds);
                  List<Users> users = usersMapper.selectList(queryWrapper);
                  Map<Long, List<Users>> userGroup = users.stream().collect(Collectors.groupingBy(Users::getId));
                  HashMap<String, Users> result = new HashMap<>();
                  userReqs.forEach(val -> {
                      List<Users> usersList = userGroup.get(val.getUserId());
                      if (!CollectionUtils.isEmpty(usersList)) {
                          result.put(val.getRequestId(), usersList.get(0));
                      } else {
                          // 表示沒數(shù)據(jù)
                          result.put(val.getRequestId(), null);
                      }
                  });
                  return result;
              }
          }

          合并請(qǐng)求的實(shí)現(xiàn)

          package com.springboot.sample.service.impl;

          import com.springboot.sample.bean.Users;
          import com.springboot.sample.service.UserService;
          import org.springframework.stereotype.Service;

          import javax.annotation.PostConstruct;
          import javax.annotation.Resource;
          import java.util.*;
          import java.util.concurrent.*;

          /***
           * zzq
           * 包裝成批量執(zhí)行的地方
           * */

          @Service
          public class UserWrapBatchService {
              @Resource
              private UserService userService;

              /**
               * 最大任務(wù)數(shù)
               **/

              public static int MAX_TASK_NUM = 100;


              /**
               * 請(qǐng)求類,code為查詢的共同特征,例如查詢商品,通過不同id的來區(qū)分
               * CompletableFuture將處理結(jié)果返回
               */

              public class Request {
                  // 請(qǐng)求id 唯一
                  String requestId;
                  // 參數(shù)
                  Long userId;
                  //TODO Java 8 的 CompletableFuture 并沒有 timeout 機(jī)制
                  CompletableFuture<Users> completableFuture;

                  public String getRequestId() {
                      return requestId;
                  }

                  public void setRequestId(String requestId) {
                      this.requestId = requestId;
                  }

                  public Long getUserId() {
                      return userId;
                  }

                  public void setUserId(Long userId) {
                      this.userId = userId;
                  }

                  public CompletableFuture getCompletableFuture() {
                      return completableFuture;
                  }

                  public void setCompletableFuture(CompletableFuture completableFuture) {
                      this.completableFuture = completableFuture;
                  }
              }

              /*
              LinkedBlockingQueue是一個(gè)阻塞的隊(duì)列,內(nèi)部采用鏈表的結(jié)果,通過兩個(gè)ReenTrantLock來保證線程安全
              LinkedBlockingQueue與ArrayBlockingQueue的區(qū)別
              ArrayBlockingQueue默認(rèn)指定了長度,而LinkedBlockingQueue的默認(rèn)長度是Integer.MAX_VALUE,也就是無界隊(duì)列,在移除的速度小于添加的速度時(shí),容易造成OOM。
              ArrayBlockingQueue的存儲(chǔ)容器是數(shù)組,而LinkedBlockingQueue是存儲(chǔ)容器是鏈表
              兩者的實(shí)現(xiàn)隊(duì)列添加或移除的鎖不一樣,ArrayBlockingQueue實(shí)現(xiàn)的隊(duì)列中的鎖是沒有分離的,即添加操作和移除操作采用的同一個(gè)ReenterLock鎖,
              而LinkedBlockingQueue實(shí)現(xiàn)的隊(duì)列中的鎖是分離的,其添加采用的是putLock,移除采用的則是takeLock,這樣能大大提高隊(duì)列的吞吐量,
              也意味著在高并發(fā)的情況下生產(chǎn)者和消費(fèi)者可以并行地操作隊(duì)列中的數(shù)據(jù),以此來提高整個(gè)隊(duì)列的并發(fā)性能。
               */

              private final Queue<Request> queue = new LinkedBlockingQueue();

              @PostConstruct
              public void init() {
                  //定時(shí)任務(wù)線程池,創(chuàng)建一個(gè)支持定時(shí)、周期性或延時(shí)任務(wù)的限定線程數(shù)目(這里傳入的是1)的線程池
                  ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);

                  scheduledExecutorService.scheduleAtFixedRate(() -> {
                      int size = queue.size();
                      //如果隊(duì)列沒數(shù)據(jù),表示這段時(shí)間沒有請(qǐng)求,直接返回
                      if (size == 0) {
                          return;
                      }
                      List<Request> list = new ArrayList<>();
                      System.out.println("合并了 [" + size + "] 個(gè)請(qǐng)求");
                      //將隊(duì)列的請(qǐng)求消費(fèi)到一個(gè)集合保存
                      for (int i = 0; i < size; i++) {
                          // 后面的SQL語句是有長度限制的,所以還要做限制每次批量的數(shù)量,超過最大任務(wù)數(shù),等下次執(zhí)行
                          if (i < MAX_TASK_NUM) {
                              list.add(queue.poll());
                          }
                      }
                      //拿到我們需要去數(shù)據(jù)庫查詢的特征,保存為集合
                      List<Request> userReqs = new ArrayList<>();
                      for (Request request : list) {
                          userReqs.add(request);
                      }
                      //將參數(shù)傳入service處理, 這里是本地服務(wù),也可以把userService 看成RPC之類的遠(yuǎn)程調(diào)用
                      Map<String, Users> response = userService.queryUserByIdBatch(userReqs);
                      //將處理結(jié)果返回各自的請(qǐng)求
                      for (Request request : list) {
                          Users result = response.get(request.requestId);
                          request.completableFuture.complete(result);    //completableFuture.complete方法完成賦值,這一步執(zhí)行完畢,下面future.get()阻塞的請(qǐng)求可以繼續(xù)執(zhí)行了
                      }
                  }, 10010, TimeUnit.MILLISECONDS);
                  //scheduleAtFixedRate是周期性執(zhí)行 schedule是延遲執(zhí)行 initialDelay是初始延遲 period是周期間隔 后面是單位
                  //這里我寫的是 初始化后100毫秒后執(zhí)行,周期性執(zhí)行10毫秒執(zhí)行一次
              }

              public Users queryUser(Long userId) {
                  Request request = new Request();
                  // 這里用UUID做請(qǐng)求id
                  request.requestId = UUID.randomUUID().toString().replace("-""");
                  request.userId = userId;
                  CompletableFuture<Users> future = new CompletableFuture<>();
                  request.completableFuture = future;
                  //將對(duì)象傳入隊(duì)列
                  queue.offer(request);
                  //如果這時(shí)候沒完成賦值,那么就會(huì)阻塞,直到能夠拿到值
                  try {
                      return future.get();
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  } catch (ExecutionException e) {
                      e.printStackTrace();
                  }
                  return null;
              }
          }

          控制層調(diào)用

          /***
           * 請(qǐng)求合并
           * */

          @RequestMapping("/merge")
          public Callable<Users> merge(Long userId) {
              return new Callable<Users>() {
                  @Override
                  public Users call() throws Exception {
                      return userBatchService.queryUser(userId);
                  }
              };
          }

          Callable是什么可以參考:

          https://blog.csdn.net/baidu_19473529/article/details/123596792

          模擬高并發(fā)查詢的代碼

          package com.springboot.sample;

          import org.springframework.web.client.RestTemplate;

          import java.util.Random;
          import java.util.concurrent.CountDownLatch;

          public class TestBatch {
              private static int threadCount = 30;

              private final static CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(threadCount); //為保證30個(gè)線程同時(shí)并發(fā)運(yùn)行

              private static final RestTemplate restTemplate = new RestTemplate();

              public static void main(String[] args) {


                  for (int i = 0; i < threadCount; i++) {//循環(huán)開30個(gè)線程
                      new Thread(new Runnable() {
                          public void run() {
                              COUNT_DOWN_LATCH.countDown();//每次減一
                              try {
                                  COUNT_DOWN_LATCH.await(); //此處等待狀態(tài),為了讓30個(gè)線程同時(shí)進(jìn)行
                              } catch (InterruptedException e) {
                                  e.printStackTrace();
                              }

                              for (int j = 1; j <= 3; j++) {
                                  int param = new Random().nextInt(4);
                                  if (param <=0){
                                      param++;
                                  }
                                  String responseBody = restTemplate.getForObject("http://localhost:8080/asyncAndMerge/merge?userId=" + param, String.class);
                                  System.out.println(Thread.currentThread().getName() + "參數(shù) " + param + " 返回值 " + responseBody);
                              }
                          }
                      }).start();

                  }
              }
          }

          測(cè)試效果

          要注意的問題

          • Java 8 的 CompletableFuture 并沒有 timeout 機(jī)制
          • 后面的SQL語句是有長度限制的,所以還要做限制每次批量的數(shù)量,超過最大任務(wù)數(shù),等下次執(zhí)行(本例中加了MAX_TASK_NUM判斷)

          使用隊(duì)列的超時(shí)解決Java 8 的 CompletableFuture 并沒有

          timeout 機(jī)制

          核心代碼

          package com.springboot.sample.service.impl;

          import com.springboot.sample.bean.Users;
          import com.springboot.sample.service.UserService;
          import org.springframework.stereotype.Service;

          import javax.annotation.PostConstruct;
          import javax.annotation.Resource;
          import java.util.*;
          import java.util.concurrent.*;

          /***
           * zzq
           * 包裝成批量執(zhí)行的地方,使用queue解決超時(shí)問題
           * */

          @Service
          public class UserWrapBatchQueueService {
              @Resource
              private UserService userService;

              /**
               * 最大任務(wù)數(shù)
               **/

              public static int MAX_TASK_NUM = 100;


              /**
               * 請(qǐng)求類,code為查詢的共同特征,例如查詢商品,通過不同id的來區(qū)分
               * CompletableFuture將處理結(jié)果返回
               */

              public class Request {
                  // 請(qǐng)求id
                  String requestId;

                  // 參數(shù)
                  Long userId;
                  // 隊(duì)列,這個(gè)有超時(shí)機(jī)制
                  LinkedBlockingQueue<Users> usersQueue;


                  public String getRequestId() {
                      return requestId;
                  }

                  public void setRequestId(String requestId) {
                      this.requestId = requestId;
                  }

                  public Long getUserId() {
                      return userId;
                  }

                  public void setUserId(Long userId) {
                      this.userId = userId;
                  }

                  public LinkedBlockingQueue<Users> getUsersQueue() {
                      return usersQueue;
                  }

                  public void setUsersQueue(LinkedBlockingQueue<Users> usersQueue) {
                      this.usersQueue = usersQueue;
                  }
              }

              /*
              LinkedBlockingQueue是一個(gè)阻塞的隊(duì)列,內(nèi)部采用鏈表的結(jié)果,通過兩個(gè)ReenTrantLock來保證線程安全
              LinkedBlockingQueue與ArrayBlockingQueue的區(qū)別
              ArrayBlockingQueue默認(rèn)指定了長度,而LinkedBlockingQueue的默認(rèn)長度是Integer.MAX_VALUE,也就是無界隊(duì)列,在移除的速度小于添加的速度時(shí),容易造成OOM。
              ArrayBlockingQueue的存儲(chǔ)容器是數(shù)組,而LinkedBlockingQueue是存儲(chǔ)容器是鏈表
              兩者的實(shí)現(xiàn)隊(duì)列添加或移除的鎖不一樣,ArrayBlockingQueue實(shí)現(xiàn)的隊(duì)列中的鎖是沒有分離的,即添加操作和移除操作采用的同一個(gè)ReenterLock鎖,
              而LinkedBlockingQueue實(shí)現(xiàn)的隊(duì)列中的鎖是分離的,其添加采用的是putLock,移除采用的則是takeLock,這樣能大大提高隊(duì)列的吞吐量,
              也意味著在高并發(fā)的情況下生產(chǎn)者和消費(fèi)者可以并行地操作隊(duì)列中的數(shù)據(jù),以此來提高整個(gè)隊(duì)列的并發(fā)性能。
               */

              private final Queue<Request> queue = new LinkedBlockingQueue();

              @PostConstruct
              public void init() {
                  //定時(shí)任務(wù)線程池,創(chuàng)建一個(gè)支持定時(shí)、周期性或延時(shí)任務(wù)的限定線程數(shù)目(這里傳入的是1)的線程池
                  ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);

                  scheduledExecutorService.scheduleAtFixedRate(() -> {
                      int size = queue.size();
                      //如果隊(duì)列沒數(shù)據(jù),表示這段時(shí)間沒有請(qǐng)求,直接返回
                      if (size == 0) {
                          return;
                      }
                      List<Request> list = new ArrayList<>();
                      System.out.println("合并了 [" + size + "] 個(gè)請(qǐng)求");
                      //將隊(duì)列的請(qǐng)求消費(fèi)到一個(gè)集合保存
                      for (int i = 0; i < size; i++) {
                          // 后面的SQL語句是有長度限制的,所以還要做限制每次批量的數(shù)量,超過最大任務(wù)數(shù),等下次執(zhí)行
                          if (i < MAX_TASK_NUM) {
                              list.add(queue.poll());
                          }
                      }
                      //拿到我們需要去數(shù)據(jù)庫查詢的特征,保存為集合
                      List<Request> userReqs = new ArrayList<>();
                      for (Request request : list) {
                          userReqs.add(request);
                      }
                      //將參數(shù)傳入service處理, 這里是本地服務(wù),也可以把userService 看成RPC之類的遠(yuǎn)程調(diào)用
                      Map<String, Users> response = userService.queryUserByIdBatchQueue(userReqs);
                      for (Request userReq : userReqs) {
                          // 這里再把結(jié)果放到隊(duì)列里
                          Users users = response.get(userReq.getRequestId());
                          userReq.usersQueue.offer(users);
                      }

                  }, 10010, TimeUnit.MILLISECONDS);
                  //scheduleAtFixedRate是周期性執(zhí)行 schedule是延遲執(zhí)行 initialDelay是初始延遲 period是周期間隔 后面是單位
                  //這里我寫的是 初始化后100毫秒后執(zhí)行,周期性執(zhí)行10毫秒執(zhí)行一次
              }

              public Users queryUser(Long userId) {
                  Request request = new Request();
                  // 這里用UUID做請(qǐng)求id
                  request.requestId = UUID.randomUUID().toString().replace("-""");
                  request.userId = userId;
                  LinkedBlockingQueue<Users> usersQueue = new LinkedBlockingQueue<>();
                  request.usersQueue = usersQueue;
                  //將對(duì)象傳入隊(duì)列
                  queue.offer(request);
                  //取出元素時(shí),如果隊(duì)列為空,給定阻塞多少毫秒再隊(duì)列取值,這里是3秒
                  try {
                      return usersQueue.poll(3000,TimeUnit.MILLISECONDS);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
                  return null;
              }
          }
          ...省略..

              @Override
              public Map<String, Users> queryUserByIdBatchQueue(List<UserWrapBatchQueueService.Request> userReqs) {
                  // 全部參數(shù)
                  List<Long> userIds = userReqs.stream().map(UserWrapBatchQueueService.Request::getUserId).collect(Collectors.toList());
                  QueryWrapper<Users> queryWrapper = new QueryWrapper<>();
                  // 用in語句合并成一條SQL,避免多次請(qǐng)求數(shù)據(jù)庫的IO
                  queryWrapper.in("id", userIds);
                  List<Users> users = usersMapper.selectList(queryWrapper);
                  Map<Long, List<Users>> userGroup = users.stream().collect(Collectors.groupingBy(Users::getId));
                  HashMap<String, Users> result = new HashMap<>();
                  // 數(shù)據(jù)分組
                  userReqs.forEach(val -> {
                      List<Users> usersList = userGroup.get(val.getUserId());
                      if (!CollectionUtils.isEmpty(usersList)) {
                          result.put(val.getRequestId(), usersList.get(0));
                      } else {
                          // 表示沒數(shù)據(jù) , 這里要new,不然加入隊(duì)列會(huì)空指針
                          result.put(val.getRequestId(), new Users());
                      }
                  });
                  return result;
              }

          ...省略...

          小結(jié)

          請(qǐng)求合并,批量的辦法能大幅節(jié)省被調(diào)用系統(tǒng)的連接資源,本例是以數(shù)據(jù)庫為例,其他RPC調(diào)用也是類似的道理。缺點(diǎn)就是請(qǐng)求的時(shí)間在執(zhí)行實(shí)際的邏輯之前增加了等待時(shí)間,不適合低并發(fā)的場(chǎng)景。

          源碼:https://gitee.com/apple_1030907690/spring-boot-kubernetes/tree/v1.0.5

          最后,關(guān)注公眾號(hào)互聯(lián)網(wǎng)架構(gòu)師,在后臺(tái)回復(fù):2T,可以獲取我整理的 Java 系列面試題和答案,非常齊全


          正文結(jié)束


          推薦閱讀 ↓↓↓

          1.再見了 ,Shiro!

          2.從零開始搭建創(chuàng)業(yè)公司后臺(tái)技術(shù)棧

          3.程序員一般可以從什么平臺(tái)接私活?

          4.流程引擎的架構(gòu)設(shè)計(jì)

          5.為什么國內(nèi) 996 干不過國外的 955呢?

          6.中國的鐵路訂票系統(tǒng)在世界上屬于什么水平?                        

          7.15張圖看懂瞎忙和高效的區(qū)別!


          瀏覽 19
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产传媒-第1页-MM606-萌萌视频 | 黄色亚洲无码在线观看 | 大鸡巴久久久久久久久久久 | 不卡高清无码在线 | 亚洲激情内射 |