10個類手寫實現(xiàn) RPC 通信框架原理
鏈接:autumn200.com/2020/06/21/write-rpc/
??? ?
? ?正文? ?
什么是rpc
RPC:remote procedure call Protocol 遠(yuǎn)程過程調(diào)用 調(diào)用遠(yuǎn)程服務(wù),就像調(diào)用本地的服務(wù)一樣,不用關(guān)心調(diào)用細(xì)節(jié),就像調(diào)用本機的服務(wù)一樣的
RPC原理
實現(xiàn)RPC通信的程序包括5個部分:rpc-client、客戶端proxy、socket、服務(wù)端proxy、rpc-server

request
客戶端:當(dāng)rpc-client發(fā)起遠(yuǎn)程調(diào)用時,實際上是通過客戶端代理 將要調(diào)用的接口、方法、參數(shù)、參數(shù)類型進(jìn)行序列化,然后通過socket實時將封裝調(diào)用參數(shù)的實例發(fā)送到服務(wù)端。 服務(wù)端:socket接收到客戶端傳來的信息進(jìn)行反序列化,然后通過這些信息委派到具體的實現(xiàn)對象
response
服務(wù)端:目標(biāo)方法執(zhí)行完成后,將執(zhí)行結(jié)果返回給socket 客戶端:socket接收到結(jié)果后,返回給rpc-client,調(diào)用結(jié)束
應(yīng)用到的技術(shù)
java spring 序列化 socket 反射 動態(tài)代理
項目GitHub地址
https://github.com/autumnqfeng/write_rpc
服務(wù)端項目
項目結(jié)構(gòu)
rpc-server項目包含2個子項目:order-api、order-provider
order-api中存放請求接口與RpcRequest(類名、方法名、參數(shù)的實體類)
order-provider為請求接口實現(xiàn)、socket、proxy相關(guān)類

order-api

order-provider

服務(wù)注冊
要想實現(xiàn)動態(tài)調(diào)用ServiceImpl,關(guān)鍵就需要將service類管理起來,那問題來了,我們?nèi)绾喂芾磉@些服務(wù)類呢?
我們可以參照spring中的@Service注解,通過自定義注解的方式來做到服務(wù)注冊,我們定義一個注解@RpcRemoteService作用在ServiceImpl類上,將標(biāo)記此注解的類名、方法名保存到Map中,以此來定位到具體實現(xiàn)類。
@RpcRemoteService注解
/**
?*?服務(wù)端服務(wù)發(fā)現(xiàn)注解
?*
?*?@author:?***
?*?@date:?2020/6/21?16:21
?*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Component
public?@interface?RpcRemoteService?{
}
服務(wù)注冊類InitialMerdiator
在spring容器初始化完成之后,掃描到@RpcRemoteService標(biāo)記的類,并保存到Mediator.ROUTING中。
/**
?*?初始化中間代理層對象
?*
?*?@author:?***
?*?@date:?2020/6/21?16:33
?*/
@Component
public?class?InitialMerdiator?implements?BeanPostProcessor?{
????@Override
????public?Object?postProcessAfterInitialization(Object?bean,?String?beanName)?throws?BeansException?{
????????//加了服務(wù)發(fā)布標(biāo)記的bean進(jìn)行遠(yuǎn)程發(fā)布
????????if?(bean.getClass().isAnnotationPresent(RpcRemoteService.class))?{
????????????Method[]?methods?=?bean.getClass().getDeclaredMethods();
????????????for?(Method?method?:?methods)?{
????????????????String?routingKey?=?bean.getClass().getInterfaces()[0].getName()?+?"."?+?method.getName();
????????????????BeanMethod?beanMethod?=?new?BeanMethod();
????????????????beanMethod.setBean(bean);
????????????????beanMethod.setMethod(method);
????????????????Mediator.ROUTING.put(routingKey,?beanMethod);
????????????}
????????}
????????return?bean;
????}
}
socket監(jiān)聽
socket監(jiān)聽客戶端請求
socket啟動類SocketServer
spring容器加載完成之后,啟動socket
/**
?*?spring?容器啟動完成之后,會發(fā)布一個ContextRefreshedEven
?*?容器啟動后啟動socket監(jiān)聽
?*
?*?@author:?***
?*?@date:?2020/6/21?16:51
?*/
@Component
public?class?SocketServer?implements?ApplicationListener<ContextRefreshedEvent>?{
????private?final?ExecutorService?executorService=?Executors.newCachedThreadPool();
????@Override
????public?void?onApplicationEvent(ContextRefreshedEvent?contextRefreshedEvent)?{
????????ServerSocket?serverSocket=null;
????????try?{
????????????serverSocket?=?new?ServerSocket(8888);
????????????while?(true)?{
????????????????Socket?accept?=?serverSocket.accept();
????????????????//?線程池處理socket
????executorService.execute(new?ProcessorHandler(accept));
????????????}
????????}?catch?(IOException?e)?{
????????????e.printStackTrace();
????????}?finally?{
????????????if?(serverSocket?!=?null)?{
????????????????try?{
????????????????????serverSocket.close();
????????????????}?catch?(IOException?e)?{
????????????????????e.printStackTrace();
????????????????}
????????????}
????????}
????}
socket處理類ProcessorHandler
處理監(jiān)聽到的每個socket
public?class?ProcessorHandler?implements?Runnable?{
????private?Socket?socket;
????public?ProcessorHandler(Socket?socket)?{
????????this.socket?=?socket;
????}
????@Override
????public?void?run()?{
????????ObjectInputStream?inputStream?=?null;
????????ObjectOutputStream?outputStream?=?null;
????????try?{
????????????inputStream?=?new?ObjectInputStream(socket.getInputStream());
????????????//?反序列化
????????????RpcRequest?rpcRequest?=?(RpcRequest)?inputStream.readObject();
????????????//?中間代理執(zhí)行目標(biāo)方法
????????????Mediator?mediator?=?Mediator.getInstance();
????????????Object?response?=?mediator.processor(rpcRequest);
????????????System.out.println("服務(wù)端的執(zhí)行結(jié)果:"+response);
????????????outputStream?=?new?ObjectOutputStream(socket.getOutputStream());
????????????outputStream.writeObject(response);
????????????outputStream.flush();
????????}?catch?(Exception?e)?{
????????????e.printStackTrace();
????????}?finally?{
????????????closeStream(inputStream,?outputStream);
????????}
????}
????private?void?closeStream(ObjectInputStream?inputStream,?ObjectOutputStream?outputStream)?{
????????//?關(guān)閉流
????????if(inputStream!=null){
????????????try?{
????????????????inputStream.close();
????????????}?catch?(IOException?e)?{
????????????????e.printStackTrace();
????????????}
????????}
????????if?(outputStream!=null){
????????????try?{
????????????????outputStream.close();
????????????}?catch?(IOException?e)?{
????????????????e.printStackTrace();
????????????}
????????}
????}
服務(wù)端代理
Mediator
/**
?*?服務(wù)端socket與目標(biāo)方法的中間代理層
?*
?*?@author:?***
?*?@date:?2020/6/21?16:25
?*/
public?class?Mediator?{
????/**?用來存儲發(fā)布的服務(wù)的實例(服務(wù)調(diào)用的路由)?*/
????public?static?Map?ROUTING?=?new?ConcurrentHashMap<>();
????/**?單例模式創(chuàng)建該代理層實例?*/
????private?volatile?static?Mediator?instance;
????private?Mediator()?{
????}
????public?static?Mediator?getInstance()?{
????????if?(instance?==?null)?{
????????????synchronized?(Mediator.class)?{
????????????????if?(instance?==?null)?{
????????????????????instance?=?new?Mediator();
????????????????}
????????????}
????????}
????????return?instance;
????}
????public?Object?processor(RpcRequest?rpcRequest)?{
????????//?路由key
????????String?routingKey?=?rpcRequest.getClassName()?+?"."?+?rpcRequest.getMethodName();
????????BeanMethod?beanMethod?=?ROUTING.get(routingKey);
????????if?(beanMethod?==?null)?{
????????????return?null;
????????}
????????//?執(zhí)行目標(biāo)方法
????????Object?bean?=?beanMethod.getBean();
????????Method?method?=?beanMethod.getMethod();
????????try?{
????????????return?method.invoke(bean,?rpcRequest.getArgs());
????????}?catch?(Exception?e)?{
????????????e.printStackTrace();
????????}
????????return?null;
????}
}
BeanMethod
/**
?*?中間層反射調(diào)用時,存儲目標(biāo)方法、目標(biāo)類的實體
?*
?*?@author:?***
?*?@date:?2020/6/21?16:43
?*/
public?class?BeanMethod?{
????private?Object?bean;
????private?Method?method;
????//?setter、getter略
}
客戶端項目
項目結(jié)構(gòu)

服務(wù)發(fā)現(xiàn)
服務(wù)發(fā)現(xiàn)我們同樣使用注解來做,這就需要參照Spring中@Autowired的原理實現(xiàn),我們自定義@RpcReference注解,定義在字段上,將接口實現(xiàn)的代理類注入到該字段上。
@RpcReference注解
/**
?*?服務(wù)注入注解
?*
?*?@author:?***
?*?@date:?2020/6/20?22:41
?*/
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@Component
public?@interface?RpcReference?{
}
服務(wù)發(fā)現(xiàn)類ReferenceInvokeProxy
在spring容器初始化之前,掃描bean中所有@RpcReference注解標(biāo)記的字段。
/**
?*?遠(yuǎn)程動態(tài)調(diào)用service代理
?*
?*?@author:?***
?*?@date:?2020/6/20?22:44
?*/
@Component
public?class?ReferenceInvokeProxy?implements?BeanPostProcessor?{
????@Autowired
????private?RemoteInvocationHandler?invocationHandler;
????@Override
????public?Object?postProcessBeforeInitialization(Object?bean,?String?beanName)?throws?BeansException?{
????????Field[]?fields?=?bean.getClass().getDeclaredFields();
????????for?(Field?field?:?fields)?{
????????????if?(field.isAnnotationPresent(RpcReference.class))?{
????????????????field.setAccessible(true);
????????????????//?針對這個加了RpcReference注解的字段,設(shè)置為一個代理的值
????????????????Object?proxy?=?Proxy.newProxyInstance(field.getType().getClassLoader(),?new?Class[]{field.getType()},?invocationHandler);
????????????????try?{
????????????????????//?相當(dāng)于針對加了RpcReference的注解,設(shè)置了一個代理,這個代理的實現(xiàn)是invocationHandler
????????????????????field.set(bean,?proxy);
????????????????}?catch?(IllegalAccessException?e)?{
????????????????????e.printStackTrace();
????????????????}
????????????}
????????}
????????return?bean;
????}
}
客戶端代理
客戶端動態(tài)代理InvocationHandler實現(xiàn)類RemoteInvocationHandler
將目標(biāo)方法名、目標(biāo)類名、參數(shù)信息封裝到RpcRequest,然后交給socket發(fā)送到服務(wù)端。
/**
?*?@author:?***
?*?@date:?2020/6/20?22:51
?*/
@Component
public?class?RemoteInvocationHandler?implements?InvocationHandler?{
????@Autowired
????private?RpcNetTransport?rpcNetTransport;
????@Override
????public?Object?invoke(Object?proxy,?Method?method,?Object[]?args)?throws?Throwable?{
????????RpcRequest?rpcRequest?=?new?RpcRequest();
????????rpcRequest.setClassName(method.getDeclaringClass().getName());
????????rpcRequest.setMethodName(method.getName());
????????rpcRequest.setTypes(method.getParameterTypes());
????????rpcRequest.setArgs(args);
????????return?rpcNetTransport.send(rpcRequest);
????}
}
客戶端socket
網(wǎng)絡(luò)傳輸RpcNetTransport
/**
?*?rpc?socket?網(wǎng)絡(luò)傳輸
?*
?*?@author:?***
?*?@date:?2020/6/20?22:59
?*/
@Component
public?class?RpcNetTransport?{
????@Value("${rpc.host}")
????private?String?host;
????@Value("${rpc.port}")
????private?int?port;
????public?Object?send(RpcRequest?rpcRequest)?{
????????ObjectOutputStream?outputStream?=?null;
????????ObjectInputStream?inputStream?=?null;
????????try?{
????????????Socket?socket?=?new?Socket(host,?port);
????????????//?發(fā)送目標(biāo)方法信息
????????????outputStream?=?new?ObjectOutputStream(socket.getOutputStream());
????????????outputStream.writeObject(rpcRequest);
????????????outputStream.flush();
???//?接收返回值
????????????inputStream?=?new?ObjectInputStream(socket.getInputStream());
????????????return?inputStream.readObject();
????????}?catch?(IOException?|?ClassNotFoundException?e)?{
????????????e.printStackTrace();
????????}?finally?{
????????????closeStream(inputStream,?outputStream);
????????}
????????return?null;
????}
????private?void?closeStream(ObjectInputStream?inputStream,?ObjectOutputStream?outputStream)?{
????????//?關(guān)閉流
????????if(inputStream!=null){
????????????try?{
????????????????inputStream.close();
????????????}?catch?(IOException?e)?{
????????????????e.printStackTrace();
????????????}
????????}
????????if?(outputStream!=null){
????????????try?{
????????????????outputStream.close();
????????????}?catch?(IOException?e)?{
????????????????e.printStackTrace();
????????????}
????????}
????}
}