<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          10個類手寫實現(xiàn) RPC 通信框架原理

          共 9299字,需瀏覽 19分鐘

           ·

          2020-08-27 03:48

          鏈接:autumn200.com/2020/06/21/write-rpc/

          ??? ?

          ? ?正文? ?

          什么是rpc

          RPC:remote procedure call Protocol 遠(yuǎn)程過程調(diào)用 調(diào)用遠(yuǎn)程服務(wù),就像調(diào)用本地的服務(wù)一樣,不用關(guān)心調(diào)用細(xì)節(jié),就像調(diào)用本機的服務(wù)一樣的

          RPC原理

          實現(xiàn)RPC通信的程序包括5個部分:rpc-client、客戶端proxy、socket、服務(wù)端proxy、rpc-server

          request

          • 客戶端:當(dāng)rpc-client發(fā)起遠(yuǎn)程調(diào)用時,實際上是通過客戶端代理 將要調(diào)用的接口、方法、參數(shù)、參數(shù)類型進(jìn)行序列化,然后通過socket實時將封裝調(diào)用參數(shù)的實例發(fā)送到服務(wù)端。
          • 服務(wù)端:socket接收到客戶端傳來的信息進(jìn)行反序列化,然后通過這些信息委派到具體的實現(xiàn)對象

          response

          • 服務(wù)端:目標(biāo)方法執(zhí)行完成后,將執(zhí)行結(jié)果返回給socket
          • 客戶端:socket接收到結(jié)果后,返回給rpc-client,調(diào)用結(jié)束

          應(yīng)用到的技術(shù)

          • java
          • spring
          • 序列化
          • socket
          • 反射
          • 動態(tài)代理

          項目GitHub地址

          https://github.com/autumnqfeng/write_rpc

          服務(wù)端項目

          項目結(jié)構(gòu)

          rpc-server項目包含2個子項目:order-api、order-provider

          • order-api中存放請求接口與RpcRequest(類名、方法名、參數(shù)的實體類)

          • order-provider為請求接口實現(xiàn)、socket、proxy相關(guān)類

          order-api

          order-provider

          服務(wù)注冊

          要想實現(xiàn)動態(tài)調(diào)用ServiceImpl,關(guān)鍵就需要將service類管理起來,那問題來了,我們?nèi)绾喂芾磉@些服務(wù)類呢?

          我們可以參照spring中的@Service注解,通過自定義注解的方式來做到服務(wù)注冊,我們定義一個注解@RpcRemoteService作用在ServiceImpl類上,將標(biāo)記此注解的類名、方法名保存到Map中,以此來定位到具體實現(xiàn)類。

          @RpcRemoteService注解

          /**
          ?*?服務(wù)端服務(wù)發(fā)現(xiàn)注解
          ?*
          ?*?@author:?***
          ?*?@date:?2020/6/21?16:21
          ?*/

          @Target(ElementType.TYPE)
          @Retention(RetentionPolicy.RUNTIME)
          @Component
          public?@interface?RpcRemoteService?{
          }

          服務(wù)注冊類InitialMerdiator

          在spring容器初始化完成之后,掃描到@RpcRemoteService標(biāo)記的類,并保存到Mediator.ROUTING中。

          /**
          ?*?初始化中間代理層對象
          ?*
          ?*?@author:?***
          ?*?@date:?2020/6/21?16:33
          ?*/

          @Component
          public?class?InitialMerdiator?implements?BeanPostProcessor?{

          ????@Override
          ????public?Object?postProcessAfterInitialization(Object?bean,?String?beanName)?throws?BeansException?{
          ????????//加了服務(wù)發(fā)布標(biāo)記的bean進(jìn)行遠(yuǎn)程發(fā)布
          ????????if?(bean.getClass().isAnnotationPresent(RpcRemoteService.class))?{
          ????????????Method[]?methods?=?bean.getClass().getDeclaredMethods();
          ????????????for?(Method?method?:?methods)?{
          ????????????????String?routingKey?=?bean.getClass().getInterfaces()[0].getName()?+?"."?+?method.getName();
          ????????????????BeanMethod?beanMethod?=?new?BeanMethod();
          ????????????????beanMethod.setBean(bean);
          ????????????????beanMethod.setMethod(method);
          ????????????????Mediator.ROUTING.put(routingKey,?beanMethod);
          ????????????}
          ????????}
          ????????return?bean;
          ????}
          }

          socket監(jiān)聽

          socket監(jiān)聽客戶端請求

          socket啟動類SocketServer

          spring容器加載完成之后,啟動socket

          /**
          ?*?spring?容器啟動完成之后,會發(fā)布一個ContextRefreshedEven
          ?*?容器啟動后啟動socket監(jiān)聽
          ?*
          ?*?@author:?***
          ?*?@date:?2020/6/21?16:51
          ?*/

          @Component
          public?class?SocketServer?implements?ApplicationListener<ContextRefreshedEvent>?{
          ????private?final?ExecutorService?executorService=?Executors.newCachedThreadPool();

          ????@Override
          ????public?void?onApplicationEvent(ContextRefreshedEvent?contextRefreshedEvent)?{
          ????????ServerSocket?serverSocket=null;
          ????????try?{
          ????????????serverSocket?=?new?ServerSocket(8888);
          ????????????while?(true)?{
          ????????????????Socket?accept?=?serverSocket.accept();
          ????????????????//?線程池處理socket
          ????executorService.execute(new?ProcessorHandler(accept));
          ????????????}
          ????????}?catch?(IOException?e)?{
          ????????????e.printStackTrace();
          ????????}?finally?{
          ????????????if?(serverSocket?!=?null)?{
          ????????????????try?{
          ????????????????????serverSocket.close();
          ????????????????}?catch?(IOException?e)?{
          ????????????????????e.printStackTrace();
          ????????????????}
          ????????????}
          ????????}
          ????}

          socket處理類ProcessorHandler

          處理監(jiān)聽到的每個socket

          public?class?ProcessorHandler?implements?Runnable?{

          ????private?Socket?socket;

          ????public?ProcessorHandler(Socket?socket)?{
          ????????this.socket?=?socket;
          ????}

          ????@Override
          ????public?void?run()?{
          ????????ObjectInputStream?inputStream?=?null;
          ????????ObjectOutputStream?outputStream?=?null;
          ????????try?{
          ????????????inputStream?=?new?ObjectInputStream(socket.getInputStream());
          ????????????//?反序列化
          ????????????RpcRequest?rpcRequest?=?(RpcRequest)?inputStream.readObject();

          ????????????//?中間代理執(zhí)行目標(biāo)方法
          ????????????Mediator?mediator?=?Mediator.getInstance();
          ????????????Object?response?=?mediator.processor(rpcRequest);
          ????????????System.out.println("服務(wù)端的執(zhí)行結(jié)果:"+response);

          ????????????outputStream?=?new?ObjectOutputStream(socket.getOutputStream());
          ????????????outputStream.writeObject(response);
          ????????????outputStream.flush();

          ????????}?catch?(Exception?e)?{
          ????????????e.printStackTrace();
          ????????}?finally?{
          ????????????closeStream(inputStream,?outputStream);
          ????????}
          ????}

          ????private?void?closeStream(ObjectInputStream?inputStream,?ObjectOutputStream?outputStream)?{
          ????????//?關(guān)閉流
          ????????if(inputStream!=null){
          ????????????try?{
          ????????????????inputStream.close();
          ????????????}?catch?(IOException?e)?{
          ????????????????e.printStackTrace();
          ????????????}
          ????????}
          ????????if?(outputStream!=null){
          ????????????try?{
          ????????????????outputStream.close();
          ????????????}?catch?(IOException?e)?{
          ????????????????e.printStackTrace();
          ????????????}
          ????????}
          ????}

          服務(wù)端代理

          Mediator

          /**
          ?*?服務(wù)端socket與目標(biāo)方法的中間代理層
          ?*
          ?*?@author:?***
          ?*?@date:?2020/6/21?16:25
          ?*/

          public?class?Mediator?{

          ????/**?用來存儲發(fā)布的服務(wù)的實例(服務(wù)調(diào)用的路由)?*/
          ????public?static?Map?ROUTING?=?new?ConcurrentHashMap<>();

          ????/**?單例模式創(chuàng)建該代理層實例?*/
          ????private?volatile?static?Mediator?instance;

          ????private?Mediator()?{
          ????}

          ????public?static?Mediator?getInstance()?{
          ????????if?(instance?==?null)?{
          ????????????synchronized?(Mediator.class)?{
          ????????????????if?(instance?==?null)?{
          ????????????????????instance?=?new?Mediator();
          ????????????????}
          ????????????}
          ????????}
          ????????return?instance;
          ????}

          ????public?Object?processor(RpcRequest?rpcRequest)?{
          ????????//?路由key
          ????????String?routingKey?=?rpcRequest.getClassName()?+?"."?+?rpcRequest.getMethodName();
          ????????BeanMethod?beanMethod?=?ROUTING.get(routingKey);
          ????????if?(beanMethod?==?null)?{
          ????????????return?null;
          ????????}
          ????????//?執(zhí)行目標(biāo)方法
          ????????Object?bean?=?beanMethod.getBean();
          ????????Method?method?=?beanMethod.getMethod();
          ????????try?{
          ????????????return?method.invoke(bean,?rpcRequest.getArgs());
          ????????}?catch?(Exception?e)?{
          ????????????e.printStackTrace();
          ????????}
          ????????return?null;
          ????}
          }

          BeanMethod

          /**
          ?*?中間層反射調(diào)用時,存儲目標(biāo)方法、目標(biāo)類的實體
          ?*
          ?*?@author:?***
          ?*?@date:?2020/6/21?16:43
          ?*/

          public?class?BeanMethod?{

          ????private?Object?bean;

          ????private?Method?method;

          ????//?setter、getter略
          }

          客戶端項目

          項目結(jié)構(gòu)

          服務(wù)發(fā)現(xiàn)

          服務(wù)發(fā)現(xiàn)我們同樣使用注解來做,這就需要參照Spring中@Autowired的原理實現(xiàn),我們自定義@RpcReference注解,定義在字段上,將接口實現(xiàn)的代理類注入到該字段上。

          @RpcReference注解

          /**
          ?*?服務(wù)注入注解
          ?*
          ?*?@author:?***
          ?*?@date:?2020/6/20?22:41
          ?*/

          @Target(ElementType.FIELD)
          @Retention(RetentionPolicy.RUNTIME)
          @Component
          public?@interface?RpcReference?{
          }

          服務(wù)發(fā)現(xiàn)類ReferenceInvokeProxy

          在spring容器初始化之前,掃描bean中所有@RpcReference注解標(biāo)記的字段。

          /**
          ?*?遠(yuǎn)程動態(tài)調(diào)用service代理
          ?*
          ?*?@author:?***
          ?*?@date:?2020/6/20?22:44
          ?*/

          @Component
          public?class?ReferenceInvokeProxy?implements?BeanPostProcessor?{

          ????@Autowired
          ????private?RemoteInvocationHandler?invocationHandler;

          ????@Override
          ????public?Object?postProcessBeforeInitialization(Object?bean,?String?beanName)?throws?BeansException?{
          ????????Field[]?fields?=?bean.getClass().getDeclaredFields();
          ????????for?(Field?field?:?fields)?{
          ????????????if?(field.isAnnotationPresent(RpcReference.class))?{
          ????????????????field.setAccessible(true);
          ????????????????//?針對這個加了RpcReference注解的字段,設(shè)置為一個代理的值
          ????????????????Object?proxy?=?Proxy.newProxyInstance(field.getType().getClassLoader(),?new?Class[]{field.getType()},?invocationHandler);
          ????????????????try?{
          ????????????????????//?相當(dāng)于針對加了RpcReference的注解,設(shè)置了一個代理,這個代理的實現(xiàn)是invocationHandler
          ????????????????????field.set(bean,?proxy);
          ????????????????}?catch?(IllegalAccessException?e)?{
          ????????????????????e.printStackTrace();
          ????????????????}
          ????????????}
          ????????}
          ????????return?bean;
          ????}
          }

          客戶端代理

          客戶端動態(tài)代理InvocationHandler實現(xiàn)類RemoteInvocationHandler

          將目標(biāo)方法名、目標(biāo)類名、參數(shù)信息封裝到RpcRequest,然后交給socket發(fā)送到服務(wù)端。

          /**
          ?*?@author:?***
          ?*?@date:?2020/6/20?22:51
          ?*/

          @Component
          public?class?RemoteInvocationHandler?implements?InvocationHandler?{

          ????@Autowired
          ????private?RpcNetTransport?rpcNetTransport;

          ????@Override
          ????public?Object?invoke(Object?proxy,?Method?method,?Object[]?args)?throws?Throwable?{
          ????????RpcRequest?rpcRequest?=?new?RpcRequest();
          ????????rpcRequest.setClassName(method.getDeclaringClass().getName());
          ????????rpcRequest.setMethodName(method.getName());
          ????????rpcRequest.setTypes(method.getParameterTypes());
          ????????rpcRequest.setArgs(args);
          ????????return?rpcNetTransport.send(rpcRequest);
          ????}
          }

          客戶端socket

          網(wǎng)絡(luò)傳輸RpcNetTransport

          /**
          ?*?rpc?socket?網(wǎng)絡(luò)傳輸
          ?*
          ?*?@author:?***
          ?*?@date:?2020/6/20?22:59
          ?*/

          @Component
          public?class?RpcNetTransport?{
          ????@Value("${rpc.host}")
          ????private?String?host;
          ????@Value("${rpc.port}")
          ????private?int?port;


          ????public?Object?send(RpcRequest?rpcRequest)?{
          ????????ObjectOutputStream?outputStream?=?null;
          ????????ObjectInputStream?inputStream?=?null;
          ????????try?{
          ????????????Socket?socket?=?new?Socket(host,?port);
          ????????????//?發(fā)送目標(biāo)方法信息
          ????????????outputStream?=?new?ObjectOutputStream(socket.getOutputStream());
          ????????????outputStream.writeObject(rpcRequest);
          ????????????outputStream.flush();
          ???//?接收返回值
          ????????????inputStream?=?new?ObjectInputStream(socket.getInputStream());
          ????????????return?inputStream.readObject();
          ????????}?catch?(IOException?|?ClassNotFoundException?e)?{
          ????????????e.printStackTrace();
          ????????}?finally?{
          ????????????closeStream(inputStream,?outputStream);
          ????????}
          ????????return?null;
          ????}

          ????private?void?closeStream(ObjectInputStream?inputStream,?ObjectOutputStream?outputStream)?{
          ????????//?關(guān)閉流
          ????????if(inputStream!=null){
          ????????????try?{
          ????????????????inputStream.close();
          ????????????}?catch?(IOException?e)?{
          ????????????????e.printStackTrace();
          ????????????}
          ????????}
          ????????if?(outputStream!=null){
          ????????????try?{
          ????????????????outputStream.close();
          ????????????}?catch?(IOException?e)?{
          ????????????????e.printStackTrace();
          ????????????}
          ????????}
          ????}
          }
          瀏覽 37
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产成人精品视频夜夜嗨 | 日韩足交| 操逼视频在线免费看 | 国产avav | 男男无码一区二区三区 |