<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          徒手?jǐn)]了一個RPC框架,理解更透徹了,代碼已上傳github,自取~

          共 37623字,需瀏覽 76分鐘

           ·

          2021-03-05 12:05

          由于公眾號文章推送規(guī)則改變,所以為了大家能夠準(zhǔn)時收到我們的文章推送,請記得將公眾號: JAVA 設(shè)為星標(biāo)~這樣就不會錯過每一篇精彩的推送啦~

           來源 |https://www.cnblogs.com/2YSP/p/13545217.html


          一、前言

          前段時間看到一篇不錯的文章《看了這篇你就會手寫RPC框架了》,于是便來了興趣對著實現(xiàn)了一遍,后面覺得還有很多優(yōu)化的地方便對其進行了改進。

          主要改動點如下:

          1. 除了Java序列化協(xié)議,增加了protobuf和kryo序列化協(xié)議,配置即用。
          2. 增加多種負(fù)載均衡算法(隨機、輪詢、加權(quán)輪詢、平滑加權(quán)輪詢),配置即用。
          3. 客戶端增加本地服務(wù)列表緩存,提高性能。
          4. 修復(fù)高并發(fā)情況下,netty導(dǎo)致的內(nèi)存泄漏問題
          5. 由原來的每個請求建立一次連接,改為建立TCP長連接,并多次復(fù)用。
          6. 服務(wù)端增加線程池提高消息處理能力

          二、介紹

          RPC,即 Remote Procedure Call(遠程過程調(diào)用),調(diào)用遠程計算機上的服務(wù),就像調(diào)用本地服務(wù)一樣。RPC可以很好的解耦系統(tǒng),如WebService就是一種基于Http協(xié)議的RPC。

          調(diào)用示意圖

          調(diào)用示意圖

          總的來說,就如下幾個步驟:

          1. 客戶端(ServerA)執(zhí)行遠程方法時就調(diào)用client stub傳遞類名、方法名和參數(shù)等信息。
          2. client stub會將參數(shù)等信息序列化為二進制流的形式,然后通過Sockect發(fā)送給服務(wù)端(ServerB)
          3. 服務(wù)端收到數(shù)據(jù)包后,server stub 需要進行解析反序列化為類名、方法名和參數(shù)等信息。
          4. server stub調(diào)用對應(yīng)的本地方法,并把執(zhí)行結(jié)果返回給客戶端

          所以一個RPC框架有如下角色:

          服務(wù)消費者

          遠程方法的調(diào)用方,即客戶端。一個服務(wù)既可以是消費者也可以是提供者。

          服務(wù)提供者

          遠程服務(wù)的提供方,即服務(wù)端。一個服務(wù)既可以是消費者也可以是提供者。

          注冊中心

          保存服務(wù)提供者的服務(wù)地址等信息,一般由zookeeper、redis等實現(xiàn)。

          監(jiān)控運維(可選)

          監(jiān)控接口的響應(yīng)時間、統(tǒng)計請求數(shù)量等,及時發(fā)現(xiàn)系統(tǒng)問題并發(fā)出告警通知。

          三、實現(xiàn)

          本RPC框架rpc-spring-boot-starter涉及技術(shù)棧如下:

          • 使用zookeeper作為注冊中心
          • 使用netty作為通信框架
          • 消息編解碼:protostuff、kryo、java
          • spring
          • 使用SPI來根據(jù)配置動態(tài)選擇負(fù)載均衡算法等

          由于代碼過多,這里只講幾處改動點。

          3.1動態(tài)負(fù)載均衡算法

          1.編寫LoadBalance的實現(xiàn)類

          負(fù)載均衡算法實現(xiàn)類

          2.自定義注解 @LoadBalanceAno

          /**
           * 負(fù)載均衡注解
           */

          @Target(ElementType.TYPE)
          @Retention(RetentionPolicy.RUNTIME)
          @Documented
          public @interface LoadBalanceAno {

              String value() default "";
          }

          /**
           * 輪詢算法
           */

          @LoadBalanceAno(RpcConstant.BALANCE_ROUND)
          public class FullRoundBalance implements LoadBalance {

              private static Logger logger = LoggerFactory.getLogger(FullRoundBalance.class);

              private volatile int index;

              @Override
              public synchronized Service chooseOne(List<Service> services) {
                  // 加鎖防止多線程情況下,index超出services.size()
                  if (index == services.size()) {
                      index = 0;
                  }
                  return services.get(index++);
              }
          }

          3.新建在resource目錄下META-INF/servers文件夾并創(chuàng)建文件

          enter description here

          4.RpcConfig增加配置項loadBalance

          /**
           * @author 2YSP
           * @date 2020/7/26 15:13
           */

          @ConfigurationProperties(prefix = "sp.rpc")
          public class RpcConfig {

              /**
               * 服務(wù)注冊中心地址
               */

              private String registerAddress = "127.0.0.1:2181";

              /**
               * 服務(wù)暴露端口
               */

              private Integer serverPort = 9999;
              /**
               * 服務(wù)協(xié)議
               */

              private String protocol = "java";
              /**
               * 負(fù)載均衡算法
               */

              private String loadBalance = "random";
              /**
               * 權(quán)重,默認(rèn)為1
               */

              private Integer weight = 1;

             // 省略getter setter
          }

          5.在自動配置類RpcAutoConfiguration根據(jù)配置選擇對應(yīng)的算法實現(xiàn)類

          /**
               * 使用spi匹配符合配置的負(fù)載均衡算法
               *
               * @param name
               * @return
               */

              private LoadBalance getLoadBalance(String name) {
                  ServiceLoader<LoadBalance> loader = ServiceLoader.load(LoadBalance.class);
                  Iterator<LoadBalance> iterator = loader.iterator();
                  while (iterator.hasNext()) {
                      LoadBalance loadBalance = iterator.next();
                      LoadBalanceAno ano = loadBalance.getClass().getAnnotation(LoadBalanceAno.class);
                      Assert.notNull(ano, "load balance name can not be empty!");
                      if (name.equals(ano.value())) {
                          return loadBalance;
                      }
                  }
                  throw new RpcException("invalid load balance config");
              }

           @Bean
              public ClientProxyFactory proxyFactory(@Autowired RpcConfig rpcConfig) {
                  ClientProxyFactory clientProxyFactory = new ClientProxyFactory();
                  // 設(shè)置服務(wù)發(fā)現(xiàn)著
                  clientProxyFactory.setServerDiscovery(new           ZookeeperServerDiscovery(rpcConfig.getRegisterAddress()));

                  // 設(shè)置支持的協(xié)議
                  Map<String, MessageProtocol> supportMessageProtocols = buildSupportMessageProtocols();
                  clientProxyFactory.setSupportMessageProtocols(supportMessageProtocols);
                  // 設(shè)置負(fù)載均衡算法
                  LoadBalance loadBalance = getLoadBalance(rpcConfig.getLoadBalance());
                  clientProxyFactory.setLoadBalance(loadBalance);
                  // 設(shè)置網(wǎng)絡(luò)層實現(xiàn)
                  clientProxyFactory.setNetClient(new NettyNetClient());

                  return clientProxyFactory;
              }

          3.2本地服務(wù)列表緩存

          使用Map來緩存數(shù)據(jù)

          /**
           * 服務(wù)發(fā)現(xiàn)本地緩存
           */

          public class ServerDiscoveryCache {
              /**
               * key: serviceName
               */

              private static final Map<String, List<Service>> SERVER_MAP = new ConcurrentHashMap<>();
              /**
               * 客戶端注入的遠程服務(wù)service class
               */

              public static final List<String> SERVICE_CLASS_NAMES = new ArrayList<>();

              public static void put(String serviceName, List<Service> serviceList) {
                  SERVER_MAP.put(serviceName, serviceList);
              }

              /**
               * 去除指定的值
               * @param serviceName
               * @param service
               */

              public static void remove(String serviceName, Service service) {
                  SERVER_MAP.computeIfPresent(serviceName, (key, value) ->
                          value.stream().filter(o -> !o.toString().equals(service.toString())).collect(Collectors.toList())
                  );
              }

              public static void removeAll(String serviceName) {
                  SERVER_MAP.remove(serviceName);
              }


              public static boolean isEmpty(String serviceName) {
                  return SERVER_MAP.get(serviceName) == null || SERVER_MAP.get(serviceName).size() == 0;
              }

              public static List<Service> get(String serviceName) {
                  return SERVER_MAP.get(serviceName);
              }
          }

          ClientProxyFactory,先查本地緩存,緩存沒有再查詢zookeeper。

          /**
               * 根據(jù)服務(wù)名獲取可用的服務(wù)地址列表
               * @param serviceName
               * @return
               */

              private List<Service> getServiceList(String serviceName) {
                  List<Service> services;
                  synchronized (serviceName){
                      if (ServerDiscoveryCache.isEmpty(serviceName)) {
                          services = serverDiscovery.findServiceList(serviceName);
                          if (services == null || services.size() == 0) {
                              throw new RpcException("No provider available!");
                          }
                          ServerDiscoveryCache.put(serviceName, services);
                      } else {
                          services = ServerDiscoveryCache.get(serviceName);
                      }
                  }
                  return services;
              }

          問題: 如果服務(wù)端因為宕機或網(wǎng)絡(luò)問題下線了,緩存卻還在就會導(dǎo)致客戶端請求已經(jīng)不可用的服務(wù)端,增加請求失敗率。**解決方案:**由于服務(wù)端注冊的是臨時節(jié)點,所以如果服務(wù)端下線節(jié)點會被移除。只要監(jiān)聽zookeeper的子節(jié)點,如果新增或刪除子節(jié)點就直接清空本地緩存即可。

          DefaultRpcProcessor

          /**
           * Rpc處理者,支持服務(wù)啟動暴露,自動注入Service
           * @author 2YSP
           * @date 2020/7/26 14:46
           */

          public class DefaultRpcProcessor implements ApplicationListener<ContextRefreshedEvent{

             

              @Override
              public void onApplicationEvent(ContextRefreshedEvent event) {
                  // Spring啟動完畢過后會收到一個事件通知
                  if (Objects.isNull(event.getApplicationContext().getParent())){
                      ApplicationContext context = event.getApplicationContext();
                      // 開啟服務(wù)
                      startServer(context);
                      // 注入Service
                      injectService(context);
                  }
              }

              private void injectService(ApplicationContext context) {
                  String[] names = context.getBeanDefinitionNames();
                  for(String name : names){
                      Class<?> clazz = context.getType(name);
                      if (Objects.isNull(clazz)){
                          continue;
                      }

                      Field[] declaredFields = clazz.getDeclaredFields();
                      for(Field field : declaredFields){
                          // 找出標(biāo)記了InjectService注解的屬性
                          InjectService injectService = field.getAnnotation(InjectService.class);
                          if (injectService == null){
                              continue;
                          }

                          Class<?> fieldClass = field.getType();
                          Object object = context.getBean(name);
                          field.setAccessible(true);
                          try {
                              field.set(object,clientProxyFactory.getProxy(fieldClass));
                          } catch (IllegalAccessException e) {
                              e.printStackTrace();
                          }
              // 添加本地服務(wù)緩存
                          ServerDiscoveryCache.SERVICE_CLASS_NAMES.add(fieldClass.getName());
                      }
                  }
                  // 注冊子節(jié)點監(jiān)聽
                  if (clientProxyFactory.getServerDiscovery() instanceof ZookeeperServerDiscovery){
                      ZookeeperServerDiscovery serverDiscovery = (ZookeeperServerDiscovery) clientProxyFactory.getServerDiscovery();
                      ZkClient zkClient = serverDiscovery.getZkClient();
                      ServerDiscoveryCache.SERVICE_CLASS_NAMES.forEach(name ->{
                          String servicePath = RpcConstant.ZK_SERVICE_PATH + RpcConstant.PATH_DELIMITER + name + "/service";
                          zkClient.subscribeChildChanges(servicePath, new ZkChildListenerImpl());
                      });
                      logger.info("subscribe service zk node successfully");
                  }

              }

              private void startServer(ApplicationContext context) {
                  ...

              }
          }

          ZkChildListenerImpl

          /**
           * 子節(jié)點事件監(jiān)聽處理類
           */

          public class ZkChildListenerImpl implements IZkChildListener {

              private static Logger logger = LoggerFactory.getLogger(ZkChildListenerImpl.class);

              /**
               * 監(jiān)聽子節(jié)點的刪除和新增事件
               * @param parentPath /rpc/serviceName/service
               * @param childList
               * @throws Exception
               */

              @Override
              public void handleChildChange(String parentPath, List<String> childList) throws Exception {
                  logger.debug("Child change parentPath:[{}] -- childList:[{}]", parentPath, childList);
                  // 只要子節(jié)點有改動就清空緩存
                  String[] arr = parentPath.split("/");
                  ServerDiscoveryCache.removeAll(arr[2]);
              }
          }

          3.3nettyClient支持TCP長連接

          這部分的改動最多,先增加新的sendRequest接口。

          添加接口

          實現(xiàn)類NettyNetClient

          /**
           * @author 2YSP
           * @date 2020/7/25 20:12
           */

          public class NettyNetClient implements NetClient {

              private static Logger logger = LoggerFactory.getLogger(NettyNetClient.class);

              private static ExecutorService threadPool = new ThreadPoolExecutor(410200,
                      TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000), new ThreadFactoryBuilder()
                      .setNameFormat("rpcClient-%d")
                      .build());

              private EventLoopGroup loopGroup = new NioEventLoopGroup(4);

              /**
               * 已連接的服務(wù)緩存
               * key: 服務(wù)地址,格式:ip:port
               */

              public static Map<String, SendHandlerV2> connectedServerNodes = new ConcurrentHashMap<>();

              @Override
              public byte[] sendRequest(byte[] data, Service service) throws InterruptedException {
            ....
                  return respData;
              }

              @Override
              public RpcResponse sendRequest(RpcRequest rpcRequest, Service service, MessageProtocol messageProtocol) {

                  String address = service.getAddress();
                  synchronized (address) {
                      if (connectedServerNodes.containsKey(address)) {
                          SendHandlerV2 handler = connectedServerNodes.get(address);
                          logger.info("使用現(xiàn)有的連接");
                          return handler.sendRequest(rpcRequest);
                      }

                      String[] addrInfo = address.split(":");
                      final String serverAddress = addrInfo[0];
                      final String serverPort = addrInfo[1];
                      final SendHandlerV2 handler = new SendHandlerV2(messageProtocol, address);
                      threadPool.submit(() -> {
                                  // 配置客戶端
                                  Bootstrap b = new Bootstrap();
                                  b.group(loopGroup).channel(NioSocketChannel.class)
                                          .option(ChannelOption.TCP_NODELAYtrue)
                                          .handler(new ChannelInitializer<SocketChannel>() 
          {
                                              @Override
                                              protected void initChannel(SocketChannel socketChannel) throws Exception {
                                                  ChannelPipeline pipeline = socketChannel.pipeline();
                                                  pipeline
                                                          .addLast(handler);
                                              }
                                          });
                                  // 啟用客戶端連接
                                  ChannelFuture channelFuture = b.connect(serverAddress, Integer.parseInt(serverPort));
                                  channelFuture.addListener(new ChannelFutureListener() {
                                      @Override
                                      public void operationComplete(ChannelFuture channelFuture) throws Exception {
                                          connectedServerNodes.put(address, handler);
                                      }
                                  });
                              }
                      );
                      logger.info("使用新的連接。。。");
                      return handler.sendRequest(rpcRequest);
                  }
              }
          }

          每次請求都會調(diào)用sendRequest()方法,用線程池異步和服務(wù)端創(chuàng)建TCP長連接,連接成功后將SendHandlerV2緩存到ConcurrentHashMap中方便復(fù)用,后續(xù)請求的請求地址(ip+port)如果在connectedServerNodes中存在則使用connectedServerNodes中的handler處理不再重新建立連接。

          SendHandlerV2

          /**
           * @author 2YSP
           * @date 2020/8/19 20:06
           */

          public class SendHandlerV2 extends ChannelInboundHandlerAdapter {

              private static Logger logger = LoggerFactory.getLogger(SendHandlerV2.class);

              /**
               * 等待通道建立最大時間
               */

              static final int CHANNEL_WAIT_TIME = 4;
              /**
               * 等待響應(yīng)最大時間
               */

              static final int RESPONSE_WAIT_TIME = 8;

              private volatile Channel channel;

              private String remoteAddress;

              private static Map<String, RpcFuture<RpcResponse>> requestMap = new ConcurrentHashMap<>();

              private MessageProtocol messageProtocol;

              private CountDownLatch latch = new CountDownLatch(1);

              public SendHandlerV2(MessageProtocol messageProtocol,String remoteAddress) {
                  this.messageProtocol = messageProtocol;
                  this.remoteAddress = remoteAddress;
              }

              @Override
              public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
                  this.channel = ctx.channel();
                  latch.countDown();
              }

              @Override
              public void channelActive(ChannelHandlerContext ctx) throws Exception {
                  logger.debug("Connect to server successfully:{}", ctx);
              }

              @Override
              public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                  logger.debug("Client reads message:{}", msg);
                  ByteBuf byteBuf = (ByteBuf) msg;
                  byte[] resp = new byte[byteBuf.readableBytes()];
                  byteBuf.readBytes(resp);
                  // 手動回收
                  ReferenceCountUtil.release(byteBuf);
                  RpcResponse response = messageProtocol.unmarshallingResponse(resp);
                  RpcFuture<RpcResponse> future = requestMap.get(response.getRequestId());
                  future.setResponse(response);
              }

              @Override
              public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                  cause.printStackTrace();
                  logger.error("Exception occurred:{}", cause.getMessage());
                  ctx.close();
              }

              @Override
              public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
                  ctx.flush();
              }

              @Override
              public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                  super.channelInactive(ctx);
                  logger.error("channel inactive with remoteAddress:[{}]",remoteAddress);
                  NettyNetClient.connectedServerNodes.remove(remoteAddress);

              }

              @Override
              public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                  super.userEventTriggered(ctx, evt);
              }

              public RpcResponse sendRequest(RpcRequest request) {
                  RpcResponse response;
                  RpcFuture<RpcResponse> future = new RpcFuture<>();
                  requestMap.put(request.getRequestId(), future);
                  try {
                      byte[] data = messageProtocol.marshallingRequest(request);
                      ByteBuf reqBuf = Unpooled.buffer(data.length);
                      reqBuf.writeBytes(data);
                      if (latch.await(CHANNEL_WAIT_TIME,TimeUnit.SECONDS)){
                          channel.writeAndFlush(reqBuf);
                          // 等待響應(yīng)
                          response = future.get(RESPONSE_WAIT_TIME, TimeUnit.SECONDS);
                      }else {
                          throw new RpcException("establish channel time out");
                      }
                  } catch (Exception e) {
                      throw new RpcException(e.getMessage());
                  } finally {
                      requestMap.remove(request.getRequestId());
                  }
                  return response;
              }
          }

          RpcFuture

          package cn.sp.rpc.client.net;

          import java.util.concurrent.*;

          /**
           * @author 2YSP
           * @date 2020/8/19 22:31
           */

          public class RpcFuture<Timplements Future<T{

              private T response;
              /**
               * 因為請求和響應(yīng)是一一對應(yīng)的,所以這里是1
               */

              private CountDownLatch countDownLatch = new CountDownLatch(1);
              /**
               * Future的請求時間,用于計算Future是否超時
               */

              private long beginTime = System.currentTimeMillis();

              @Override
              public boolean cancel(boolean mayInterruptIfRunning) {
                  return false;
              }

              @Override
              public boolean isCancelled() {
                  return false;
              }

              @Override
              public boolean isDone() {
                  if (response != null) {
                      return true;
                  }
                  return false;
              }

              /**
               * 獲取響應(yīng),直到有結(jié)果才返回
               * @return
               * @throws InterruptedException
               * @throws ExecutionException
               */

              @Override
              public T get() throws InterruptedException, ExecutionException {
                  countDownLatch.await();
                  return response;
              }

              @Override
              public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
                  if (countDownLatch.await(timeout,unit)){
                      return response;
                  }
                  return null;
              }

              public void setResponse(T response) {
                  this.response = response;
                  countDownLatch.countDown();
              }

              public long getBeginTime() {
                  return beginTime;
              }
          }

          此處邏輯,第一次執(zhí)行 SendHandlerV2#sendRequest() 時channel需要等待通道建立好之后才能發(fā)送請求,所以用CountDownLatch來控制,等待通道建立。自定義Future+requestMap緩存來實現(xiàn)netty的請求和阻塞等待響應(yīng),RpcRequest對象在創(chuàng)建時會生成一個請求的唯一標(biāo)識requestId,發(fā)送請求前先將RpcFuture緩存到requestMap中,key為requestId,讀取到服務(wù)端的響應(yīng)信息后(channelRead方法),將響應(yīng)結(jié)果放入對應(yīng)的RpcFuture中。SendHandlerV2#channelInactive() 方法中,如果連接的服務(wù)端異常斷開連接了,則及時清理緩存中對應(yīng)的serverNode。

          四、壓力測試

          測試環(huán)境:

          • (英特爾)Intel(R) Core(TM) i5-6300HQ CPU @ 2.30GHz 4核
          • windows10家庭版(64位)
          • 16G內(nèi)存

          1.本地啟動zookeeper 2.本地啟動一個消費者,兩個服務(wù)端,輪詢算法 3.使用ab進行壓力測試,4個線程發(fā)送10000個請求

          ab -c 4 -n 10000 http://localhost:8080/test/user?id=1

          測試結(jié)果

          測試結(jié)果

          從圖片可以看出,10000個請求只用了11s,比之前的130+秒耗時減少了10倍以上。

          代碼地址:
          https://github.com/2YSP/rpc-spring-boot-starter

          https://github.com/2YSP/rpc-example

          【END】
          讀 
          1. 帶工作流的springboot后臺管理項目,一個企業(yè)級快速開發(fā)解決方案
          2. 面試官問:為什么SpringBoot的 jar 可以直接運行?
          3. 推薦一套超高顏值的 Spring Boot 快速開發(fā)框架【文末送書】
          4. 防止刪庫跑路?市值縮水近 24 億元!就靠堡壘機?這貨這么吊?

          5.  2020年度開發(fā)者工具Top 100名單!


          瀏覽 57
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  日韩一级片在线看 | 亚洲黄色电影网址 | 伊人日逼 | 久久久精品国产 | 亚洲天堂视频网 |