<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          100 行代碼搞定了 RPC 原理,面試官隨便問!

          共 40639字,需瀏覽 82分鐘

           ·

          2022-06-20 02:05

          關(guān)注公眾號(hào),學(xué)習(xí)更多 Java 干貨!

          作者:孫浩
          來源:https://xiaomi-info.github.io/2020/03/02/rpc-achieve/

          引言

          本文主要論述的是“RPC 實(shí)現(xiàn)原理”,那么首先明確一個(gè)問題什么是 RPC 呢?RPC 是 Remote Procedure Call 的縮寫,即,遠(yuǎn)程過程調(diào)用。

          RPC 是一個(gè)計(jì)算機(jī)通信協(xié)議。該協(xié)議允許運(yùn)行于一臺(tái)計(jì)算機(jī)的程序調(diào)用另一臺(tái)計(jì)算機(jī)的子程序,而開發(fā)人員無需額外地為這個(gè)交互編程。

          值得注意是,兩個(gè)或多個(gè)應(yīng)用程序都分布在不同的服務(wù)器上,它們之間的調(diào)用都像是本地方法調(diào)用一樣。接下來我們便來分析一下一次 RPC 調(diào)用發(fā)生了些什么?

          一次基本的 RPC 調(diào)用會(huì)涉及到什么?

          現(xiàn)在業(yè)界內(nèi)比較流行的一些 RPC 框架,例如 Dubbo 提供的是基于接口的遠(yuǎn)程方法調(diào)用,即客戶端只需要知道接口的定義即可調(diào)用遠(yuǎn)程服務(wù)。在 Java 中接口并不能直接調(diào)用實(shí)例方法,必須通過其實(shí)現(xiàn)類對(duì)象來完成此操作,這意味著客戶端必須為這些接口生成代理對(duì)象,對(duì)此 Java 提供了 Proxy、InvocationHandler 生成動(dòng)態(tài)代理的支持;生成了代理對(duì)象,那么每個(gè)具體的發(fā)方法是怎么調(diào)用的呢?

          jdk 動(dòng)態(tài)代理生成的代理對(duì)象調(diào)用指定方法時(shí)實(shí)際會(huì)執(zhí)行 InvocationHandler 中定義的 #invoke 方法,在該方法中完成遠(yuǎn)程方法調(diào)用并獲取結(jié)果。

          拋開客戶端,回過頭來看 RPC 是兩臺(tái)計(jì)算機(jī)間的調(diào)用,實(shí)質(zhì)上是兩臺(tái)主機(jī)間的網(wǎng)絡(luò)通信,涉及到網(wǎng)絡(luò)通信又必然會(huì)有序列化、反序列化編解碼等一些必須要考慮的問題;同時(shí)實(shí)際上現(xiàn)在大多系統(tǒng)都是集群部署的,多臺(tái)主機(jī)/容器對(duì)外提供相同的服務(wù),如果集群的節(jié)點(diǎn)數(shù)量很大的話,那么管理服務(wù)地址也將是一件十分繁瑣的事情。

          常見的做法是各個(gè)服務(wù)節(jié)點(diǎn)將自己的地址和提供的服務(wù)列表注冊(cè)到一個(gè) 注冊(cè)中心,由 注冊(cè)中心 來統(tǒng)一管理服務(wù)列表;這樣的做法解決了一些問題同時(shí)為客戶端增加了一項(xiàng)新的工作——那就是服務(wù)發(fā)現(xiàn),通俗來說就是從注冊(cè)中心中找到遠(yuǎn)程方法對(duì)應(yīng)的服務(wù)列表并通過某種策略從中選取一個(gè)服務(wù)地址來完成網(wǎng)絡(luò)通信。

          聊了客戶端和 注冊(cè)中心,另外一個(gè)重要的角色自然是服務(wù)端,服務(wù)端最重要的任務(wù)便是提供服務(wù)接口的真正實(shí)現(xiàn)并在某個(gè)端口上監(jiān)聽網(wǎng)絡(luò)請(qǐng)求,監(jiān)聽到請(qǐng)求后從網(wǎng)絡(luò)請(qǐng)求中獲取到對(duì)應(yīng)的參數(shù)(比如服務(wù)接口、方法、請(qǐng)求參數(shù)等),再根據(jù)這些參數(shù)通過反射的方式調(diào)用接口的真正實(shí)現(xiàn)獲取結(jié)果并將其寫入對(duì)應(yīng)的響應(yīng)流中。

          綜上所述,一次基本的 RPC 調(diào)用流程大致如下:

          基本實(shí)現(xiàn)

          服務(wù)端(生產(chǎn)者)

          • 服務(wù)接口

          在 RPC 中,生產(chǎn)者和消費(fèi)者有一個(gè)共同的服務(wù)接口 API。如下,定義一個(gè) HelloService 接口。

          /**
           * @author 孫浩
           * @Descrption  服務(wù)接口
           ***/
          public interface HelloService {
              String sayHello(String somebody);
          }
          • 服務(wù)實(shí)現(xiàn)

          生產(chǎn)者要提供服務(wù)接口的實(shí)現(xiàn),創(chuàng)建 HelloServiceImpl 實(shí)現(xiàn)類。

          /**
           * @author 孫浩
           * @Descrption 服務(wù)實(shí)現(xiàn)
           ***/
          public class HelloServiceImpl implements HelloService {
              @Override
              public String sayHello(String somebody) {
                  return "hello " + somebody + "!";
              }
          }
          • 服務(wù)注冊(cè)

          本例使用 Spring 來管理 bean,采用自定義 xml 和解析器的方式來將服務(wù)實(shí)現(xiàn)類載入容器(當(dāng)然也可以采用自定義注解的方式,此處不過多論述)并將服務(wù)接口信息注冊(cè)到注冊(cè)中心。 首先自定義xsd,

          <xsd:element name="service">
              <xsd:complexType>
                  <xsd:complexContent>
                      <xsd:extension base="beans:identifiedType">
                          <xsd:attribute name="interface" type="xsd:string" use="required"/>
                          <xsd:attribute name="timeout" type="xsd:int" use="required"/>
                          <xsd:attribute name="serverPort" type="xsd:int" use="required"/>
                          <xsd:attribute name="ref" type="xsd:string" use="required"/>
                          <xsd:attribute name="weight" type="xsd:int" use="optional"/>
                          <xsd:attribute name="workerThreads" type="xsd:int" use="optional"/>
                          <xsd:attribute name="appKey" type="xsd:string" use="required"/>
                          <xsd:attribute name="groupName" type="xsd:string" use="optional"/>
                      </xsd:extension>
                  </xsd:complexContent>
              </xsd:complexType>
          </xsd:element>

          分別指定 schema 和 xmd,schema 和對(duì)應(yīng) handler 的映射:

          schema
          http\://www.storm.com/schema/storm-service.xsd=META-INF/storm-service.xsd
          http\://www.storm.com/schema/storm-reference.xsd=META-INF/storm-reference.xsd
          handler
          http\://www.storm.com/schema/storm-service=com.hsunfkqm.storm.framework.spring.StormServiceNamespaceHandler
          http\://www.storm.com/schema/storm-reference=com.hsunfkqm.storm.framework.spring.StormRemoteReferenceNamespaceHandler

          將編寫好的文件放 classpath 下的 META-INF 目錄下:

          在 Spring 配置文件中配置服務(wù)類,這里推薦 Spring Boot,基礎(chǔ)就不介紹了,推薦下這個(gè)實(shí)戰(zhàn)教程:https://github.com/javastacks/spring-boot-best-practice

          <!-- 發(fā)布遠(yuǎn)程服務(wù) -->
           <bean id="helloService" class="com.hsunfkqm.storm.framework.test.HelloServiceImpl"/>
           <storm:service id="helloServiceRegister"
                               interface="com.hsunfkqm.storm.framework.test.HelloService"
                               ref="helloService"
                               groupName="default"
                               weight="2"
                               appKey="ares"
                               workerThreads="100"
                               serverPort="8081"
                               timeout="600"/>

          編寫對(duì)應(yīng)的 Handler 和 Parser:StormServiceNamespaceHandler

          import org.springframework.beans.factory.xml.NamespaceHandlerSupport;

          /**
           * @author 孫浩
           * @Descrption 服務(wù)發(fā)布自定義標(biāo)簽
           ***/
          public class StormServiceNamespaceHandler extends NamespaceHandlerSupport {
              @Override
              public void init() {
                  registerBeanDefinitionParser("service", new ProviderFactoryBeanDefinitionParser());
              }
          }

          ProviderFactoryBeanDefinitionParser

          protected Class getBeanClass(Element element) {
                  return ProviderFactoryBean.class;
              }

              protected void doParse(Element element, BeanDefinitionBuilder bean) {

                  try {
                      String serviceItf = element.getAttribute("interface");
                      String serverPort = element.getAttribute("serverPort");
                      String ref = element.getAttribute("ref");
                      // ....
                      bean.addPropertyValue("serverPort", Integer.parseInt(serverPort));
                      bean.addPropertyValue("serviceItf", Class.forName(serviceItf));
                      bean.addPropertyReference("serviceObject", ref);
                      //...
                      if (NumberUtils.isNumber(weight)) {
                          bean.addPropertyValue("weight", Integer.parseInt(weight));
                      }
                      //...
                 } catch (Exception e) {
                      // ...
                }
              }

          ProviderFactoryBean

          /**
           * @author 孫浩
           * @Descrption 服務(wù)發(fā)布
           ***/
          public class ProviderFactoryBean implements FactoryBean, InitializingBean {

              //服務(wù)接口
              private Class<?> serviceItf;
              //服務(wù)實(shí)現(xiàn)
              private Object serviceObject;
              //服務(wù)端口
              private String serverPort;
              //服務(wù)超時(shí)時(shí)間
              private long timeout;
              //服務(wù)代理對(duì)象,暫時(shí)沒有用到
              private Object serviceProxyObject;
              //服務(wù)提供者唯一標(biāo)識(shí)
              private String appKey;
              //服務(wù)分組組名
              private String groupName = "default";
              //服務(wù)提供者權(quán)重,默認(rèn)為 1 , 范圍為 [1-100]
              private int weight = 1;
              //服務(wù)端線程數(shù),默認(rèn) 10 個(gè)線程
              private int workerThreads = 10;

              @Override
              public Object getObject() throws Exception {
                  return serviceProxyObject;
              }

              @Override
              public Class<?> getObjectType() {
                  return serviceItf;
              }

              @Override
              public void afterPropertiesSet() throws Exception {
                  //啟動(dòng) Netty 服務(wù)端
                  NettyServer.singleton().start(Integer.parseInt(serverPort));
                  //注冊(cè)到 zk, 元數(shù)據(jù)注冊(cè)中心
                  List<ProviderService> providerServiceList = buildProviderServiceInfos();
                  IRegisterCenter4Provider registerCenter4Provider = RegisterCenter.singleton();
                  registerCenter4Provider.registerProvider(providerServiceList);
              }
          }

          //================RegisterCenter#registerProvider======================
          @Override
          public void registerProvider(final List<ProviderService> serviceMetaData) {
              if (CollectionUtils.isEmpty(serviceMetaData)) {
                  return;
              }

              //連接 zk, 注冊(cè)服務(wù)
              synchronized (RegisterCenter.class) {
                  for (ProviderService provider : serviceMetaData) {
                      String serviceItfKey = provider.getServiceItf().getName();

                      List<ProviderService> providers = providerServiceMap.get(serviceItfKey);
                      if (providers == null) {
                          providers = Lists.newArrayList();
                      }
                      providers.add(provider);
                      providerServiceMap.put(serviceItfKey, providers);
                  }

                  if (zkClient == null) {
                      zkClient = new ZkClient(ZK_SERVICE, ZK_SESSION_TIME_OUT, ZK_CONNECTION_TIME_OUT, new SerializableSerializer());
                  }

                  //創(chuàng)建 ZK 命名空間/當(dāng)前部署應(yīng)用 APP 命名空間/
                  String APP_KEY = serviceMetaData.get(0).getAppKey();
                  String ZK_PATH = ROOT_PATH + "/" + APP_KEY;
                  boolean exist = zkClient.exists(ZK_PATH);
                  if (!exist) {
                      zkClient.createPersistent(ZK_PATH, true);
                  }

                  for (Map.Entry<String, List<ProviderService>> entry : providerServiceMap.entrySet()) {
                      //服務(wù)分組
                      String groupName = entry.getValue().get(0).getGroupName();
                      //創(chuàng)建服務(wù)提供者
                      String serviceNode = entry.getKey();
                      String servicePath = ZK_PATH + "/" + groupName + "/" + serviceNode + "/" + PROVIDER_TYPE;
                      exist = zkClient.exists(servicePath);
                      if (!exist) {
                          zkClient.createPersistent(servicePath, true);
                      }

                      //創(chuàng)建當(dāng)前服務(wù)器節(jié)點(diǎn)
                      int serverPort = entry.getValue().get(0).getServerPort();//服務(wù)端口
                      int weight = entry.getValue().get(0).getWeight();//服務(wù)權(quán)重
                      int workerThreads = entry.getValue().get(0).getWorkerThreads();//服務(wù)工作線程
                      String localIp = IPHelper.localIp();
                      String currentServiceIpNode = servicePath + "/" + localIp + "|" + serverPort + "|" + weight + "|" + workerThreads + "|" + groupName;
                      exist = zkClient.exists(currentServiceIpNode);
                      if (!exist) {
                          //注意,這里創(chuàng)建的是臨時(shí)節(jié)點(diǎn)
                          zkClient.createEphemeral(currentServiceIpNode);
                      }
                      //監(jiān)聽注冊(cè)服務(wù)的變化,同時(shí)更新數(shù)據(jù)到本地緩存
                      zkClient.subscribeChildChanges(servicePath, new IZkChildListener() {
                          @Override
                          public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                              if (currentChilds == null) {
                                  currentChilds = Lists.newArrayList();
                              }
                              //存活的服務(wù) IP 列表
                              List<String> activityServiceIpList = Lists.newArrayList(Lists.transform(currentChilds, new Function<String, String>() {
                                  @Override
                                  public String apply(String input) {
                                      return StringUtils.split(input, "|")[0];
                                  }
                              }));
                              refreshActivityService(activityServiceIpList);
                          }
                      });

                  }
              }
          }

          至此服務(wù)實(shí)現(xiàn)類已被載入 Spring 容器中,且服務(wù)接口信息也注冊(cè)到了注冊(cè)中心。

          Java項(xiàng)目分享 最新整理全集,找項(xiàng)目不累啦 07版

          • 網(wǎng)絡(luò)通信

          作為生產(chǎn)者對(duì)外提供 RPC 服務(wù),必須有一個(gè)網(wǎng)絡(luò)程序來來監(jiān)聽請(qǐng)求和做出響應(yīng)。在 Java 領(lǐng)域 Netty 是一款高性能的 NIO 通信框架,很多的框架的通信都是采用 Netty 來實(shí)現(xiàn)的,本例中也采用它當(dāng)做通信服務(wù)器。

          構(gòu)建并啟動(dòng) Netty 服務(wù)監(jiān)聽指定端口:

          public void start(final int port) {
              synchronized (NettyServer.class) {
                  if (bossGroup != null || workerGroup != null) {
                      return;
                  }

                  bossGroup = new NioEventLoopGroup();
                  workerGroup = new NioEventLoopGroup();
                  ServerBootstrap serverBootstrap = new ServerBootstrap();
                  serverBootstrap
                          .group(bossGroup, workerGroup)
                          .channel(NioServerSocketChannel.class)
                          .option(ChannelOption.SO_BACKLOG, 1024)
                          .childOption(ChannelOption.SO_KEEPALIVE, true)
                          .childOption(ChannelOption.TCP_NODELAY, true)
                          .handler(new LoggingHandler(LogLevel.INFO))
                          .childHandler(new ChannelInitializer<SocketChannel>() {
                              @Override
                              protected void initChannel(SocketChannel ch) throws Exception {
                                  //注冊(cè)解碼器 NettyDecoderHandler
                                  ch.pipeline().addLast(new NettyDecoderHandler(StormRequest.class, serializeType));
                                  //注冊(cè)編碼器 NettyEncoderHandler
                                  ch.pipeline().addLast(new NettyEncoderHandler(serializeType));
                                  //注冊(cè)服務(wù)端業(yè)務(wù)邏輯處理器 NettyServerInvokeHandler
                                  ch.pipeline().addLast(new NettyServerInvokeHandler());
                              }
                          });
                  try {
                      channel = serverBootstrap.bind(port).sync().channel();
                  } catch (InterruptedException e) {
                      throw new RuntimeException(e);
                  }
              }
          }

          上面的代碼中向 Netty 服務(wù)的 pipeline 中添加了編解碼和業(yè)務(wù)處理器,當(dāng)接收到請(qǐng)求時(shí),經(jīng)過編解碼后,真正處理業(yè)務(wù)的是業(yè)務(wù)處理器,即NettyServerInvokeHandler, 該處理器繼承自SimpleChannelInboundHandler, 當(dāng)數(shù)據(jù)讀取完成將觸發(fā)一個(gè)事件,并調(diào)用NettyServerInvokeHandler#channelRead0方法來處理請(qǐng)求。


          @Override
          protected void channelRead0(ChannelHandlerContext ctx, StormRequest request) throws Exception {
              if (ctx.channel().isWritable()) {
                  //從服務(wù)調(diào)用對(duì)象里獲取服務(wù)提供者信息
                  ProviderService metaDataModel = request.getProviderService();
                  long consumeTimeOut = request.getInvokeTimeout();
                  final String methodName = request.getInvokedMethodName();

                  //根據(jù)方法名稱定位到具體某一個(gè)服務(wù)提供者
                  String serviceKey = metaDataModel.getServiceItf().getName();
                  //獲取限流工具類
                  int workerThread = metaDataModel.getWorkerThreads();
                  Semaphore semaphore = serviceKeySemaphoreMap.get(serviceKey);
                  if (semaphore == null) {
                      synchronized (serviceKeySemaphoreMap) {
                          semaphore = serviceKeySemaphoreMap.get(serviceKey);
                          if (semaphore == null) {
                              semaphore = new Semaphore(workerThread);
                              serviceKeySemaphoreMap.put(serviceKey, semaphore);
                          }
                      }
                  }

                  //獲取注冊(cè)中心服務(wù)
                  IRegisterCenter4Provider registerCenter4Provider = RegisterCenter.singleton();
                  List<ProviderService> localProviderCaches = registerCenter4Provider.getProviderServiceMap().get(serviceKey);

                  Object result = null;
                  boolean acquire = false;

                  try {
                      ProviderService localProviderCache = Collections2.filter(localProviderCaches, new Predicate<ProviderService>() {
                          @Override
                          public boolean apply(ProviderService input) {
                              return StringUtils.equals(input.getServiceMethod().getName(), methodName);
                          }
                      }).iterator().next();
                      Object serviceObject = localProviderCache.getServiceObject();

                      //利用反射發(fā)起服務(wù)調(diào)用
                      Method method = localProviderCache.getServiceMethod();
                      //利用 semaphore 實(shí)現(xiàn)限流
                      acquire = semaphore.tryAcquire(consumeTimeOut, TimeUnit.MILLISECONDS);
                      if (acquire) {
                          result = method.invoke(serviceObject, request.getArgs());
                          //System.out.println("---------------"+result);
                      }
                  } catch (Exception e) {
                      System.out.println(JSON.toJSONString(localProviderCaches) + "  " + methodName+" "+e.getMessage());
                      result = e;
                  } finally {
                      if (acquire) {
                          semaphore.release();
                      }
                  }
                  //根據(jù)服務(wù)調(diào)用結(jié)果組裝調(diào)用返回對(duì)象
                  StormResponse response = new StormResponse();
                  response.setInvokeTimeout(consumeTimeOut);
                  response.setUniqueKey(request.getUniqueKey());
                  response.setResult(result);
                  //將服務(wù)調(diào)用返回對(duì)象回寫到消費(fèi)端
                  ctx.writeAndFlush(response);
              } else {
                  logger.error("------------channel closed!---------------");
              }
          }

          此處還有部分細(xì)節(jié)如自定義的編解碼器等,篇幅所限不在此詳述,繼承 MessageToByteEncoderByteToMessageDecoder 覆寫對(duì)應(yīng)的 encodedecode 方法即可自定義編解碼器,使用到的序列化工具如 Hessian/Proto 等可參考對(duì)應(yīng)的官方文檔。

          Java項(xiàng)目分享 最新整理全集,找項(xiàng)目不累啦 

          • 請(qǐng)求和響應(yīng)包裝 為便于封裝請(qǐng)求和響應(yīng),定義兩個(gè) bean 來表示請(qǐng)求和響應(yīng)。

          請(qǐng)求:

          /**
           * @author 孫浩
           * @Descrption
           ***/
          public class StormRequest implements Serializable {

              private static final long serialVersionUID = -5196465012408804755L;
              //UUID, 唯一標(biāo)識(shí)一次返回值
              private String uniqueKey;
              //服務(wù)提供者信息
              private ProviderService providerService;
              //調(diào)用的方法名稱
              private String invokedMethodName;
              //傳遞參數(shù)
              private Object[] args;
              //消費(fèi)端應(yīng)用名
              private String appName;
              //消費(fèi)請(qǐng)求超時(shí)時(shí)長(zhǎng)
              private long invokeTimeout;
              // getter/setter
          }

          響應(yīng):

          /**
           * @author 孫浩
           * @Descrption
           ***/
          public class StormResponse implements Serializable {
              private static final long serialVersionUID = 5785265307118147202L;
              //UUID, 唯一標(biāo)識(shí)一次返回值
              private String uniqueKey;
              //客戶端指定的服務(wù)超時(shí)時(shí)間
              private long invokeTimeout;
              //接口調(diào)用返回的結(jié)果對(duì)象
              private Object result;
              //getter/setter
          }

          客戶端(消費(fèi)者)

          客戶端(消費(fèi)者)在 RPC 調(diào)用中主要是生成服務(wù)接口的代理對(duì)象,并從注冊(cè)中心獲取對(duì)應(yīng)的服務(wù)列表發(fā)起網(wǎng)絡(luò)請(qǐng)求。 客戶端和服務(wù)端一樣采用 Spring 來管理 bean 解析 xml 配置等不再贅述,重點(diǎn)看下以下幾點(diǎn):

          • 通過 jdk 動(dòng)態(tài)代理來生成引入服務(wù)接口的代理對(duì)象
          public Object getProxy() {
              return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[]{targetInterface}, this);
          }
          • 從注冊(cè)中心獲取服務(wù)列表并依據(jù)某種策略選取其中一個(gè)服務(wù)節(jié)點(diǎn)
          //服務(wù)接口名稱
          String serviceKey = targetInterface.getName();
          //獲取某個(gè)接口的服務(wù)提供者列表
          IRegisterCenter4Invoker registerCenter4Consumer = RegisterCenter.singleton();
          List<ProviderService> providerServices = registerCenter4Consumer.getServiceMetaDataMap4Consume().get(serviceKey);
          //根據(jù)軟負(fù)載策略,從服務(wù)提供者列表選取本次調(diào)用的服務(wù)提供者
          ClusterStrategy clusterStrategyService = ClusterEngine.queryClusterStrategy(clusterStrategy);
          ProviderService providerService = clusterStrategyService.select(providerServices);
          • 通過 Netty 建立連接,發(fā)起網(wǎng)絡(luò)請(qǐng)求
          /**
           * @author 孫浩
           * @Descrption Netty 消費(fèi)端 bean 代理工廠
           ***/
          public class RevokerProxyBeanFactory implements InvocationHandler {
              private ExecutorService fixedThreadPool = null;
              //服務(wù)接口
              private Class<?> targetInterface;
              //超時(shí)時(shí)間
              private int consumeTimeout;
              //調(diào)用者線程數(shù)
              private static int threadWorkerNumber = 10;
              //負(fù)載均衡策略
              private String clusterStrategy;

              @Override
              public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

                  ...

                  //復(fù)制一份服務(wù)提供者信息
                  ProviderService newProvider = providerService.copy();
                  //設(shè)置本次調(diào)用服務(wù)的方法以及接口
                  newProvider.setServiceMethod(method);
                  newProvider.setServiceItf(targetInterface);

                  //聲明調(diào)用 AresRequest 對(duì)象,AresRequest 表示發(fā)起一次調(diào)用所包含的信息
                  final StormRequest request = new StormRequest();
                  //設(shè)置本次調(diào)用的唯一標(biāo)識(shí)
                  request.setUniqueKey(UUID.randomUUID().toString() + "-" + Thread.currentThread().getId());
                  //設(shè)置本次調(diào)用的服務(wù)提供者信息
                  request.setProviderService(newProvider);
                  //設(shè)置本次調(diào)用的方法名稱
                  request.setInvokedMethodName(method.getName());
                  //設(shè)置本次調(diào)用的方法參數(shù)信息
                  request.setArgs(args);

                  try {
                      //構(gòu)建用來發(fā)起調(diào)用的線程池
                      if (fixedThreadPool == null) {
                          synchronized (RevokerProxyBeanFactory.class) {
                              if (null == fixedThreadPool) {
                                  fixedThreadPool = Executors.newFixedThreadPool(threadWorkerNumber);
                              }
                          }
                      }
                      //根據(jù)服務(wù)提供者的 ip,port, 構(gòu)建 InetSocketAddress 對(duì)象,標(biāo)識(shí)服務(wù)提供者地址
                      String serverIp = request.getProviderService().getServerIp();
                      int serverPort = request.getProviderService().getServerPort();
                      InetSocketAddress inetSocketAddress = new InetSocketAddress(serverIp, serverPort);
                      //提交本次調(diào)用信息到線程池 fixedThreadPool, 發(fā)起調(diào)用
                      Future<StormResponse> responseFuture = fixedThreadPool.submit(RevokerServiceCallable.of(inetSocketAddress, request));
                      //獲取調(diào)用的返回結(jié)果
                      StormResponse response = responseFuture.get(request.getInvokeTimeout(), TimeUnit.MILLISECONDS);
                      if (response != null) {
                          return response.getResult();
                      }
                  } catch (Exception e) {
                      throw new RuntimeException(e);
                  }
                  return null;
              }
              //  ...
          }

          Netty 的響應(yīng)是異步的,為了在方法調(diào)用返回前獲取到響應(yīng)結(jié)果,需要將異步的結(jié)果同步化。

          • Netty 異步返回的結(jié)果存入阻塞隊(duì)列
          @Override
          protected void channelRead0(ChannelHandlerContext channelHandlerContext, StormResponse response) throws Exception {
              //將 Netty 異步返回的結(jié)果存入阻塞隊(duì)列,以便調(diào)用端同步獲取
              RevokerResponseHolder.putResultValue(response);
          }
          • 請(qǐng)求發(fā)出后同步獲取結(jié)果
          //提交本次調(diào)用信息到線程池 fixedThreadPool, 發(fā)起調(diào)用
          Future<StormResponse> responseFuture = fixedThreadPool.submit(RevokerServiceCallable.of(inetSocketAddress, request));
          //獲取調(diào)用的返回結(jié)果
          StormResponse response = responseFuture.get(request.getInvokeTimeout(), TimeUnit.MILLISECONDS);
          if (response != null) {
              return response.getResult();
          }

          //===================================================
          //從返回結(jié)果容器中獲取返回結(jié)果,同時(shí)設(shè)置等待超時(shí)時(shí)間為 invokeTimeout
          long invokeTimeout = request.getInvokeTimeout();
          StormResponse response = RevokerResponseHolder.getValue(request.getUniqueKey(), invokeTimeout);

          測(cè)試

          Server

          /**
           * @author 孫浩
           * @Descrption
           ***/
          public class MainServer {
              public static void main(String[] args) throws Exception {
                  //發(fā)布服務(wù)
                  final ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("storm-server.xml");
                  System.out.println(" 服務(wù)發(fā)布完成");
              }
          }

          Client

          public class Client {

              private static final Logger logger = LoggerFactory.getLogger(Client.class);

              public static void main(String[] args) throws Exception {

                  final ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("storm-client.xml");
                  final HelloService helloService = (HelloService) context.getBean("helloService");
                  String result = helloService.sayHello("World");
                  System.out.println(result);
                  for (;;) {

                  }
              }
          }

          結(jié)果

          生產(chǎn)者:

          消費(fèi)者:

          注冊(cè)中心


          總結(jié)

          本文簡(jiǎn)單介紹了 RPC 的整個(gè)流程,并實(shí)現(xiàn)了一個(gè)簡(jiǎn)單的 RPC 調(diào)用。希望閱讀完本文之后,能加深你對(duì) RPC 的一些認(rèn)識(shí)。

          • 生產(chǎn)者端流程:
            • 加載服務(wù)接口,并緩存
            • 服務(wù)注冊(cè),將服務(wù)接口以及服務(wù)主機(jī)信息寫入注冊(cè)中心(本例使用的是 zookeeper)
            • 啟動(dòng)網(wǎng)絡(luò)服務(wù)器并監(jiān)聽
            • 反射,本地調(diào)用
          • 消費(fèi)者端流程:
            • 代理服務(wù)接口生成代理對(duì)象
            • 服務(wù)發(fā)現(xiàn)(連接 zookeeper,拿到服務(wù)地址列表,通過客戶端負(fù)載策略獲取合適的服務(wù)地址)
            • 遠(yuǎn)程方法調(diào)用(本例通過 Netty,發(fā)送消息,并獲取響應(yīng)結(jié)果)


          — End —


          程序汪資料鏈接

          程序汪接的7個(gè)私活都在這里,經(jīng)驗(yàn)整理

          Java項(xiàng)目分享  最新整理全集,找項(xiàng)目不累啦 07版

          堪稱神級(jí)的Spring Boot手冊(cè),從基礎(chǔ)入門到實(shí)戰(zhàn)進(jìn)階

          臥槽!字節(jié)跳動(dòng)《算法中文手冊(cè)》火了,完整版 PDF 開放下載!

          臥槽!阿里大佬總結(jié)的《圖解Java》火了,完整版PDF開放下載!

          字節(jié)跳動(dòng)總結(jié)的設(shè)計(jì)模式 PDF 火了,完整版開放下載!


          歡迎添加程序汪個(gè)人微信 itwang009  進(jìn)粉絲群或圍觀朋友圈

          瀏覽 49
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  久草手机视频在线 | jiujiujiu999 | 亚洲午夜精品久久久久久APP | 91成人高清| 午夜无码操逼逼 |