<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          10 個(gè)類手寫實(shí)現(xiàn) RPC 通信框架

          共 9493字,需瀏覽 19分鐘

           ·

          2020-08-07 01:25

          鏈接:autumn200.com



          么是rpc

          RPC:remote procedure call Protocol 遠(yuǎn)程過程調(diào)用 調(diào)用遠(yuǎn)程服務(wù),就像調(diào)用本地的服務(wù)一樣,不用關(guān)心調(diào)用細(xì)節(jié),就像調(diào)用本機(jī)的服務(wù)一樣的

          RPC原理

          實(shí)現(xiàn)RPC通信的程序包括5個(gè)部分:rpc-client、客戶端proxy、socket、服務(wù)端proxy、rpc-server

          request


          • 客戶端:當(dāng)rpc-client發(fā)起遠(yuǎn)程調(diào)用時(shí),實(shí)際上是通過客戶端代理 將要調(diào)用的接口、方法、參數(shù)、參數(shù)類型進(jìn)行序列化,然后通過socket實(shí)時(shí)將封裝調(diào)用參數(shù)的實(shí)例發(fā)送到服務(wù)端。
          • 服務(wù)端:socket接收到客戶端傳來的信息進(jìn)行反序列化,然后通過這些信息委派到具體的實(shí)現(xiàn)對(duì)象

          response

          • 服務(wù)端:目標(biāo)方法執(zhí)行完成后,將執(zhí)行結(jié)果返回給socket
          • 客戶端:socket接收到結(jié)果后,返回給rpc-client,調(diào)用結(jié)束

          應(yīng)用到的技術(shù)

          • java
          • spring
          • 序列化
          • socket
          • 反射
          • 動(dòng)態(tài)代理


          項(xiàng)目GitHub地址

          https://github.com/autumnqfeng/write_rpc

          掘金地址

          https://juejin.im/post/5eef0fe551882565a459e8b6

          服務(wù)端項(xiàng)目


          項(xiàng)目結(jié)構(gòu)

          rpc-server項(xiàng)目包含2個(gè)子項(xiàng)目:order-api、order-provider
          order-api中存放請(qǐng)求接口與RpcRequest(類名、方法名、參數(shù)的實(shí)體類)
          order-provider為請(qǐng)求接口實(shí)現(xiàn)、socket、proxy相關(guān)類

          order-api

          order-provider

          服務(wù)注冊(cè)

          要想實(shí)現(xiàn)動(dòng)態(tài)調(diào)用ServiceImpl,關(guān)鍵就需要將service類管理起來,那問題來了,我們?nèi)绾喂芾磉@些服務(wù)類呢?
          我們可以參照spring中的@Service注解,通過自定義注解的方式來做到服務(wù)注冊(cè),我們定義一個(gè)注解@RpcRemoteService作用在ServiceImpl類上,將標(biāo)記此注解的類名、方法名保存到Map中,以此來定位到具體實(shí)現(xiàn)類。
          @RpcRemoteService注解
          /**
          ?* 服務(wù)端服務(wù)發(fā)現(xiàn)注解
          ?*
          ?* @author: ***
          ?* @date: 2020/6/21 16:21
          ?*/

          @Target(ElementType.TYPE)
          @Retention(RetentionPolicy.RUNTIME)
          @Component
          public @interface?RpcRemoteService {
          }


          服務(wù)注冊(cè)類InitialMerdiator
          在spring容器初始化完成之后,掃描到@RpcRemoteService標(biāo)記的類,并保存到Mediator.ROUTING中。
          /**
          ?* 初始化中間代理層對(duì)象
          ?*
          ?* @author: ***
          ?* @date: 2020/6/21 16:33
          ?*/

          @Component
          public?class?InitialMerdiator?implements?BeanPostProcessor?{

          ????@Override
          ????public?Object postProcessAfterInitialization(Object bean, String beanName)?throws?BeansException {
          ????????//加了服務(wù)發(fā)布標(biāo)記的bean進(jìn)行遠(yuǎn)程發(fā)布
          ????????if?(bean.getClass().isAnnotationPresent(RpcRemoteService.class)) {
          ????????????Method[] methods = bean.getClass().getDeclaredMethods();
          ????????????for?(Method method : methods) {
          ????????????????String routingKey = bean.getClass().getInterfaces()[0].getName() + "."?+ method.getName();
          ????????????????BeanMethod beanMethod = new?BeanMethod();
          ????????????????beanMethod.setBean(bean);
          ????????????????beanMethod.setMethod(method);
          ????????????????Mediator.ROUTING.put(routingKey, beanMethod);
          ????????????}
          ????????}
          ????????return?bean;
          ????}
          }
          socket監(jiān)聽
          socket監(jiān)聽客戶端請(qǐng)求
          socket啟動(dòng)類SocketServer
          spring容器加載完成之后,啟動(dòng)socket
          /**
          ?* spring 容器啟動(dòng)完成之后,會(huì)發(fā)布一個(gè)ContextRefreshedEven
          ?* 容器啟動(dòng)后啟動(dòng)socket監(jiān)聽
          ?*
          ?* @author: ***
          ?* @date: 2020/6/21 16:51
          ?*/

          @Component
          public?class?SocketServer?implements?ApplicationListener<ContextRefreshedEvent> {
          ????private?final?ExecutorService executorService= Executors.newCachedThreadPool();

          ????@Override
          ????public?void?onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent)?{
          ????????ServerSocket serverSocket=null;
          ????????try?{
          ????????????serverSocket = new?ServerSocket(8888);
          ????????????while?(true) {
          ????????????????Socket accept = serverSocket.accept();
          ????????????????// 線程池處理socket
          ????????executorService.execute(new?ProcessorHandler(accept));
          ????????????}
          ????????} catch?(IOException e) {
          ????????????e.printStackTrace();
          ????????} finally?{
          ????????????if?(serverSocket != null) {
          ????????????????try?{
          ????????????????????serverSocket.close();
          ????????????????} catch?(IOException e) {
          ????????????????????e.printStackTrace();
          ????????????????}
          ????????????}
          ????????}
          ????}
          socket處理類ProcessorHandler
          處理監(jiān)聽到的每個(gè)socket
          public?class?ProcessorHandler?implements?Runnable?{

          ????private?Socket socket;

          ????public?ProcessorHandler(Socket socket)?{
          ????????this.socket = socket;
          ????}

          ????@Override
          ????public?void?run()?{
          ????????ObjectInputStream inputStream = null;
          ????????ObjectOutputStream outputStream = null;
          ????????try?{
          ????????????inputStream = new?ObjectInputStream(socket.getInputStream());
          ????????????// 反序列化
          ????????????RpcRequest rpcRequest = (RpcRequest) inputStream.readObject();

          ????????????// 中間代理執(zhí)行目標(biāo)方法
          ????????????Mediator mediator = Mediator.getInstance();
          ????????????Object response = mediator.processor(rpcRequest);
          ????????????System.out.println("服務(wù)端的執(zhí)行結(jié)果:"+response);

          ????????????outputStream = new?ObjectOutputStream(socket.getOutputStream());
          ????????????outputStream.writeObject(response);
          ????????????outputStream.flush();

          ????????} catch?(Exception e) {
          ????????????e.printStackTrace();
          ????????} finally?{
          ????????????closeStream(inputStream, outputStream);
          ????????}
          ????}

          ????private?void?closeStream(ObjectInputStream inputStream, ObjectOutputStream outputStream)?{
          ????????// 關(guān)閉流
          ????????if(inputStream!=null){
          ????????????try?{
          ????????????????inputStream.close();
          ????????????} catch?(IOException e) {
          ????????????????e.printStackTrace();
          ????????????}
          ????????}
          ????????if?(outputStream!=null){
          ????????????try?{
          ????????????????outputStream.close();
          ????????????} catch?(IOException e) {
          ????????????????e.printStackTrace();
          ????????????}
          ????????}
          ????}
          服務(wù)端代理
          Mediator
          /**
          ?* 服務(wù)端socket與目標(biāo)方法的中間代理層
          ?*
          ?* @author: ***
          ?* @date: 2020/6/21 16:25
          ?*/

          public?class?Mediator?{

          ????/** 用來存儲(chǔ)發(fā)布的服務(wù)的實(shí)例(服務(wù)調(diào)用的路由) */
          ????public?static?Map ROUTING = new?ConcurrentHashMap<>();

          ????/** 單例模式創(chuàng)建該代理層實(shí)例 */
          ????private?volatile?static?Mediator instance;

          ????private?Mediator() {
          ????}

          ????public?static?Mediator getInstance() {
          ????????if?(instance == null) {
          ????????????synchronized (Mediator.class) {
          ????????????????if?(instance == null) {
          ????????????????????instance = new?Mediator();
          ????????????????}
          ????????????}
          ????????}
          ????????return?instance;
          ????}

          ????public?Object processor(RpcRequest rpcRequest) {
          ????????// 路由key
          ????????String routingKey = rpcRequest.getClassName() + "."?+ rpcRequest.getMethodName();
          ????????BeanMethod beanMethod = ROUTING.get(routingKey);
          ????????if?(beanMethod == null) {
          ????????????return?null;
          ????????}
          ????????// 執(zhí)行目標(biāo)方法
          ????????Object bean = beanMethod.getBean();
          ????????Method method = beanMethod.getMethod();
          ????????try?{
          ????????????return?method.invoke(bean, rpcRequest.getArgs());
          ????????} catch?(Exception e) {
          ????????????e.printStackTrace();
          ????????}
          ????????return?null;
          ????}
          }


          BeanMethod
          /**
          ?* 中間層反射調(diào)用時(shí),存儲(chǔ)目標(biāo)方法、目標(biāo)類的實(shí)體
          ?*
          ?* @author: ***
          ?* @date: 2020/6/21 16:43
          ?*/

          public?class?BeanMethod?{

          ????private?Object bean;

          ????private?Method method;

          ????// setter、getter略
          }


          客戶端項(xiàng)目


          項(xiàng)目結(jié)構(gòu)

          服務(wù)發(fā)現(xiàn)

          服務(wù)發(fā)現(xiàn)我們同樣使用注解來做,這就需要參照Spring中@Autowired的原理實(shí)現(xiàn),我們自定義@RpcReference注解,定義在字段上,將接口實(shí)現(xiàn)的代理類注入到該字段上。
          @RpcReference注解
          /**
          ?* 服務(wù)注入注解
          ?*
          ?* @author: ***
          ?* @date: 2020/6/20 22:41
          ?*/

          @Target(ElementType.FIELD)
          @Retention(RetentionPolicy.RUNTIME)
          @Component
          public @interface?RpcReference {


          服務(wù)發(fā)現(xiàn)類ReferenceInvokeProxy
          在spring容器初始化之前,掃描bean中所有@RpcReference注解標(biāo)記的字段。
          /**
          ?* 遠(yuǎn)程動(dòng)態(tài)調(diào)用service代理
          ?*
          ?* @author: ***
          ?* @date: 2020/6/20 22:44
          ?*/

          @Component
          public?class?ReferenceInvokeProxy?implements?BeanPostProcessor?{

          ????@Autowired
          ????private?RemoteInvocationHandler invocationHandler;

          ????@Override
          ????public?Object postProcessBeforeInitialization(Object bean, String beanName)?throws?BeansException {
          ????????Field[] fields = bean.getClass().getDeclaredFields();
          ????????for?(Field field : fields) {
          ????????????if?(field.isAnnotationPresent(RpcReference.class)) {
          ????????????????field.setAccessible(true);
          ????????????????// 針對(duì)這個(gè)加了RpcReference注解的字段,設(shè)置為一個(gè)代理的值
          ????????????????Object proxy = Proxy.newProxyInstance(field.getType().getClassLoader(), new?Class[]{field.getType()}, invocationHandler);
          ????????????????try?{
          ????????????????????// 相當(dāng)于針對(duì)加了RpcReference的注解,設(shè)置了一個(gè)代理,這個(gè)代理的實(shí)現(xiàn)是invocationHandler
          ????????????????????field.set(bean, proxy);
          ????????????????} catch?(IllegalAccessException e) {
          ????????????????????e.printStackTrace();
          ????????????????}
          ????????????}
          ????????}
          ????????return?bean;
          ????}
          }
          客戶端代理
          客戶端動(dòng)態(tài)代理InvocationHandler實(shí)現(xiàn)類RemoteInvocationHandler
          將目標(biāo)方法名、目標(biāo)類名、參數(shù)信息封裝到RpcRequest,然后交給socket發(fā)送到服務(wù)端。
          /**
          ?* @author: ***
          ?* @date: 2020/6/20 22:51
          ?*/

          @Component
          public?class?RemoteInvocationHandler?implements?InvocationHandler?{

          ????@Autowired
          ????private?RpcNetTransport rpcNetTransport;

          ????@Override
          ????public?Object invoke(Object proxy, Method method, Object[] args)?throws?Throwable {
          ????????RpcRequest rpcRequest = new?RpcRequest();
          ????????rpcRequest.setClassName(method.getDeclaringClass().getName());
          ????????rpcRequest.setMethodName(method.getName());
          ????????rpcRequest.setTypes(method.getParameterTypes());
          ????????rpcRequest.setArgs(args);
          ????????return?rpcNetTransport.send(rpcRequest);
          ????}
          }
          客戶端socket
          網(wǎng)絡(luò)傳輸RpcNetTransport


          /**
          * rpc socket 網(wǎng)絡(luò)傳輸
          ?*
          ?* @author: ***
          ?* @date: 2020/6/20 22:59
          ?*/

          @Component
          public?class?RpcNetTransport?{
          ????@Value("${rpc.host}")
          ????private?String host;
          ????@Value("${rpc.port}")
          ????private?int?port;


          ????public?Object send(RpcRequest rpcRequest)?{
          ????????ObjectOutputStream outputStream = null;
          ????????ObjectInputStream inputStream = null;
          ????????try?{
          ????????????Socket socket = new?Socket(host, port);
          ????????????// 發(fā)送目標(biāo)方法信息
          ????????????outputStream = new?ObjectOutputStream(socket.getOutputStream());
          ????????????outputStream.writeObject(rpcRequest);
          ????????????outputStream.flush();
          ??????// 接收返回值
          ????????????inputStream = new?ObjectInputStream(socket.getInputStream());
          ????????????return?inputStream.readObject();
          ????????} catch?(IOException | ClassNotFoundException e) {
          ????????????e.printStackTrace();
          ????????} finally?{
          ????????????closeStream(inputStream, outputStream);
          ????????}
          ????????return?null;
          ????}

          ????private?void?closeStream(ObjectInputStream inputStream, ObjectOutputStream outputStream)?{
          ????????// 關(guān)閉流
          ????????if(inputStream!=null){
          ????????????try?{
          ????????????????inputStream.close();
          ????????????} catch?(IOException e) {
          ????????????????e.printStackTrace();
          ????????????}
          ????????}
          ????????if?(outputStream!=null){
          ????????????try?{
          ????????????????outputStream.close();
          ????????????} catch?(IOException e) {
          ????????????????e.printStackTrace();
          ????????????}
          ????????}
          ????}
          }




          -END-

          如果看到這里,說明你喜歡這篇文章,請(qǐng)?轉(zhuǎn)發(fā)、點(diǎn)贊。同時(shí)?標(biāo)星(置頂)本公眾號(hào)可以第一時(shí)間接受到博文推送。

          博主微信:baiseyumaoxx,之前博主分享了很多資源,有的已經(jīng)刪除了(你懂得),如果有的你當(dāng)時(shí)沒有領(lǐng)到還想領(lǐng)得就可以加我微信,我在發(fā)給你,你需要得資源也可以給我說,我盡力給你找~


          明天見(??ω??)??


          喜歡文章,點(diǎn)個(gè)在看?

          瀏覽 66
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  蜜桃成人无码AV在线观看一电影 | 影音先锋亚洲无码在线观看 | 色国产avav | 国产偷抇久久精品A片69探花 | 蜜桃AV久久经品人人搡 |