10 個(gè)類手寫實(shí)現(xiàn) RPC 通信框架
鏈接:autumn200.com

什么是rpc
RPC:remote procedure call Protocol 遠(yuǎn)程過程調(diào)用 調(diào)用遠(yuǎn)程服務(wù),就像調(diào)用本地的服務(wù)一樣,不用關(guān)心調(diào)用細(xì)節(jié),就像調(diào)用本機(jī)的服務(wù)一樣的
RPC原理
實(shí)現(xiàn)RPC通信的程序包括5個(gè)部分:rpc-client、客戶端proxy、socket、服務(wù)端proxy、rpc-server

request
客戶端:當(dāng)rpc-client發(fā)起遠(yuǎn)程調(diào)用時(shí),實(shí)際上是通過客戶端代理 將要調(diào)用的接口、方法、參數(shù)、參數(shù)類型進(jìn)行序列化,然后通過socket實(shí)時(shí)將封裝調(diào)用參數(shù)的實(shí)例發(fā)送到服務(wù)端。 服務(wù)端:socket接收到客戶端傳來的信息進(jìn)行反序列化,然后通過這些信息委派到具體的實(shí)現(xiàn)對(duì)象
response
服務(wù)端:目標(biāo)方法執(zhí)行完成后,將執(zhí)行結(jié)果返回給socket 客戶端:socket接收到結(jié)果后,返回給rpc-client,調(diào)用結(jié)束
應(yīng)用到的技術(shù)
java spring 序列化 socket 反射 動(dòng)態(tài)代理
項(xiàng)目GitHub地址
掘金地址
服務(wù)端項(xiàng)目
項(xiàng)目結(jié)構(gòu)
RpcRequest(類名、方法名、參數(shù)的實(shí)體類)
order-api

order-provider

服務(wù)注冊(cè)
ServiceImpl,關(guān)鍵就需要將service類管理起來,那問題來了,我們?nèi)绾喂芾磉@些服務(wù)類呢?@Service注解,通過自定義注解的方式來做到服務(wù)注冊(cè),我們定義一個(gè)注解@RpcRemoteService作用在ServiceImpl類上,將標(biāo)記此注解的類名、方法名保存到Map中,以此來定位到具體實(shí)現(xiàn)類。@RpcRemoteService注解
/**
?* 服務(wù)端服務(wù)發(fā)現(xiàn)注解
?*
?* @author: ***
?* @date: 2020/6/21 16:21
?*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface?RpcRemoteService {
}服務(wù)注冊(cè)類InitialMerdiator
@RpcRemoteService標(biāo)記的類,并保存到Mediator.ROUTING中。/**
?* 初始化中間代理層對(duì)象
?*
?* @author: ***
?* @date: 2020/6/21 16:33
?*/
@Component
public?class?InitialMerdiator?implements?BeanPostProcessor?{
????@Override
????public?Object postProcessAfterInitialization(Object bean, String beanName)?throws?BeansException {
????????//加了服務(wù)發(fā)布標(biāo)記的bean進(jìn)行遠(yuǎn)程發(fā)布
????????if?(bean.getClass().isAnnotationPresent(RpcRemoteService.class)) {
????????????Method[] methods = bean.getClass().getDeclaredMethods();
????????????for?(Method method : methods) {
????????????????String routingKey = bean.getClass().getInterfaces()[0].getName() + "."?+ method.getName();
????????????????BeanMethod beanMethod = new?BeanMethod();
????????????????beanMethod.setBean(bean);
????????????????beanMethod.setMethod(method);
????????????????Mediator.ROUTING.put(routingKey, beanMethod);
????????????}
????????}
????????return?bean;
????}
}socket啟動(dòng)類SocketServer
/**
?* spring 容器啟動(dòng)完成之后,會(huì)發(fā)布一個(gè)ContextRefreshedEven
?* 容器啟動(dòng)后啟動(dòng)socket監(jiān)聽
?*
?* @author: ***
?* @date: 2020/6/21 16:51
?*/
@Component
public?class?SocketServer?implements?ApplicationListener<ContextRefreshedEvent> {
????private?final?ExecutorService executorService= Executors.newCachedThreadPool();
????@Override
????public?void?onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent)?{
????????ServerSocket serverSocket=null;
????????try?{
????????????serverSocket = new?ServerSocket(8888);
????????????while?(true) {
????????????????Socket accept = serverSocket.accept();
????????????????// 線程池處理socket
????????executorService.execute(new?ProcessorHandler(accept));
????????????}
????????} catch?(IOException e) {
????????????e.printStackTrace();
????????} finally?{
????????????if?(serverSocket != null) {
????????????????try?{
????????????????????serverSocket.close();
????????????????} catch?(IOException e) {
????????????????????e.printStackTrace();
????????????????}
????????????}
????????}
????}ProcessorHandlerpublic?class?ProcessorHandler?implements?Runnable?{
????private?Socket socket;
????public?ProcessorHandler(Socket socket)?{
????????this.socket = socket;
????}
????@Override
????public?void?run()?{
????????ObjectInputStream inputStream = null;
????????ObjectOutputStream outputStream = null;
????????try?{
????????????inputStream = new?ObjectInputStream(socket.getInputStream());
????????????// 反序列化
????????????RpcRequest rpcRequest = (RpcRequest) inputStream.readObject();
????????????// 中間代理執(zhí)行目標(biāo)方法
????????????Mediator mediator = Mediator.getInstance();
????????????Object response = mediator.processor(rpcRequest);
????????????System.out.println("服務(wù)端的執(zhí)行結(jié)果:"+response);
????????????outputStream = new?ObjectOutputStream(socket.getOutputStream());
????????????outputStream.writeObject(response);
????????????outputStream.flush();
????????} catch?(Exception e) {
????????????e.printStackTrace();
????????} finally?{
????????????closeStream(inputStream, outputStream);
????????}
????}
????private?void?closeStream(ObjectInputStream inputStream, ObjectOutputStream outputStream)?{
????????// 關(guān)閉流
????????if(inputStream!=null){
????????????try?{
????????????????inputStream.close();
????????????} catch?(IOException e) {
????????????????e.printStackTrace();
????????????}
????????}
????????if?(outputStream!=null){
????????????try?{
????????????????outputStream.close();
????????????} catch?(IOException e) {
????????????????e.printStackTrace();
????????????}
????????}
????}Mediator
/**
?* 服務(wù)端socket與目標(biāo)方法的中間代理層
?*
?* @author: ***
?* @date: 2020/6/21 16:25
?*/
public?class?Mediator?{
????/** 用來存儲(chǔ)發(fā)布的服務(wù)的實(shí)例(服務(wù)調(diào)用的路由) */
????public?static?Map ROUTING = new?ConcurrentHashMap<>();
????/** 單例模式創(chuàng)建該代理層實(shí)例 */
????private?volatile?static?Mediator instance;
????private?Mediator() {
????}
????public?static?Mediator getInstance() {
????????if?(instance == null) {
????????????synchronized (Mediator.class) {
????????????????if?(instance == null) {
????????????????????instance = new?Mediator();
????????????????}
????????????}
????????}
????????return?instance;
????}
????public?Object processor(RpcRequest rpcRequest) {
????????// 路由key
????????String routingKey = rpcRequest.getClassName() + "."?+ rpcRequest.getMethodName();
????????BeanMethod beanMethod = ROUTING.get(routingKey);
????????if?(beanMethod == null) {
????????????return?null;
????????}
????????// 執(zhí)行目標(biāo)方法
????????Object bean = beanMethod.getBean();
????????Method method = beanMethod.getMethod();
????????try?{
????????????return?method.invoke(bean, rpcRequest.getArgs());
????????} catch?(Exception e) {
????????????e.printStackTrace();
????????}
????????return?null;
????}
} BeanMethod
/**
?* 中間層反射調(diào)用時(shí),存儲(chǔ)目標(biāo)方法、目標(biāo)類的實(shí)體
?*
?* @author: ***
?* @date: 2020/6/21 16:43
?*/
public?class?BeanMethod?{
????private?Object bean;
????private?Method method;
????// setter、getter略
}客戶端項(xiàng)目
項(xiàng)目結(jié)構(gòu)

服務(wù)發(fā)現(xiàn)
@Autowired的原理實(shí)現(xiàn),我們自定義@RpcReference注解,定義在字段上,將接口實(shí)現(xiàn)的代理類注入到該字段上。@RpcReference注解
/**
?* 服務(wù)注入注解
?*
?* @author: ***
?* @date: 2020/6/20 22:41
?*/
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface?RpcReference {服務(wù)發(fā)現(xiàn)類ReferenceInvokeProxy
@RpcReference注解標(biāo)記的字段。/**
?* 遠(yuǎn)程動(dòng)態(tài)調(diào)用service代理
?*
?* @author: ***
?* @date: 2020/6/20 22:44
?*/
@Component
public?class?ReferenceInvokeProxy?implements?BeanPostProcessor?{
????@Autowired
????private?RemoteInvocationHandler invocationHandler;
????@Override
????public?Object postProcessBeforeInitialization(Object bean, String beanName)?throws?BeansException {
????????Field[] fields = bean.getClass().getDeclaredFields();
????????for?(Field field : fields) {
????????????if?(field.isAnnotationPresent(RpcReference.class)) {
????????????????field.setAccessible(true);
????????????????// 針對(duì)這個(gè)加了RpcReference注解的字段,設(shè)置為一個(gè)代理的值
????????????????Object proxy = Proxy.newProxyInstance(field.getType().getClassLoader(), new?Class>[]{field.getType()}, invocationHandler);
????????????????try?{
????????????????????// 相當(dāng)于針對(duì)加了RpcReference的注解,設(shè)置了一個(gè)代理,這個(gè)代理的實(shí)現(xiàn)是invocationHandler
????????????????????field.set(bean, proxy);
????????????????} catch?(IllegalAccessException e) {
????????????????????e.printStackTrace();
????????????????}
????????????}
????????}
????????return?bean;
????}
}客戶端動(dòng)態(tài)代理InvocationHandler實(shí)現(xiàn)類RemoteInvocationHandler
RpcRequest,然后交給socket發(fā)送到服務(wù)端。/**
?* @author: ***
?* @date: 2020/6/20 22:51
?*/
@Component
public?class?RemoteInvocationHandler?implements?InvocationHandler?{
????@Autowired
????private?RpcNetTransport rpcNetTransport;
????@Override
????public?Object invoke(Object proxy, Method method, Object[] args)?throws?Throwable {
????????RpcRequest rpcRequest = new?RpcRequest();
????????rpcRequest.setClassName(method.getDeclaringClass().getName());
????????rpcRequest.setMethodName(method.getName());
????????rpcRequest.setTypes(method.getParameterTypes());
????????rpcRequest.setArgs(args);
????????return?rpcNetTransport.send(rpcRequest);
????}
}網(wǎng)絡(luò)傳輸RpcNetTransport
/**
* rpc socket 網(wǎng)絡(luò)傳輸
?*
?* @author: ***
?* @date: 2020/6/20 22:59
?*/
@Component
public?class?RpcNetTransport?{
????@Value("${rpc.host}")
????private?String host;
????@Value("${rpc.port}")
????private?int?port;
????public?Object send(RpcRequest rpcRequest)?{
????????ObjectOutputStream outputStream = null;
????????ObjectInputStream inputStream = null;
????????try?{
????????????Socket socket = new?Socket(host, port);
????????????// 發(fā)送目標(biāo)方法信息
????????????outputStream = new?ObjectOutputStream(socket.getOutputStream());
????????????outputStream.writeObject(rpcRequest);
????????????outputStream.flush();
??????// 接收返回值
????????????inputStream = new?ObjectInputStream(socket.getInputStream());
????????????return?inputStream.readObject();
????????} catch?(IOException | ClassNotFoundException e) {
????????????e.printStackTrace();
????????} finally?{
????????????closeStream(inputStream, outputStream);
????????}
????????return?null;
????}
????private?void?closeStream(ObjectInputStream inputStream, ObjectOutputStream outputStream)?{
????????// 關(guān)閉流
????????if(inputStream!=null){
????????????try?{
????????????????inputStream.close();
????????????} catch?(IOException e) {
????????????????e.printStackTrace();
????????????}
????????}
????????if?(outputStream!=null){
????????????try?{
????????????????outputStream.close();
????????????} catch?(IOException e) {
????????????????e.printStackTrace();
????????????}
????????}
????}
}如果看到這里,說明你喜歡這篇文章,請(qǐng)?轉(zhuǎn)發(fā)、點(diǎn)贊。同時(shí)?標(biāo)星(置頂)本公眾號(hào)可以第一時(shí)間接受到博文推送。
博主微信:baiseyumaoxx,之前博主分享了很多資源,有的已經(jīng)刪除了(你懂得),如果有的你當(dāng)時(shí)沒有領(lǐng)到還想領(lǐng)得就可以加我微信,我在發(fā)給你,你需要得資源也可以給我說,我盡力給你找~
明天見(??ω??)??
喜歡文章,點(diǎn)個(gè)在看?
評(píng)論
圖片
表情
