從零開始,徒手?jǐn)]一個簡單的 RPC 框架,輕松搞定!

得知了RPC(遠(yuǎn)程過程調(diào)用)簡單來說就是調(diào)用遠(yuǎn)程的服務(wù)就像調(diào)用本地方法一樣,其中用到的知識有序列化和反序列化、動態(tài)代理、網(wǎng)絡(luò)傳輸、動態(tài)加載、反射這些知識點(diǎn)。發(fā)現(xiàn)這些知識都了解一些。所以就想著試試自己實(shí)現(xiàn)一個簡單的RPC框架,即鞏固了基礎(chǔ)的知識,也能更加深入的了解RPC原理。當(dāng)然一個完整的RPC框架包含了許多的功能,例如服務(wù)的發(fā)現(xiàn)與治理,網(wǎng)關(guān)等等。本篇只是簡單的實(shí)現(xiàn)了一個調(diào)用的過程。
傳參出參分析
一個簡單請求可以抽象為兩步

那么就根據(jù)這兩步進(jìn)行分析,在請求之前我們應(yīng)該發(fā)送給服務(wù)端什么信息?而服務(wù)端處理完以后應(yīng)該返回客戶端什么信息?
1、在請求之前我們應(yīng)該發(fā)送給服務(wù)端什么信息?
由于我們在客戶端調(diào)用的是服務(wù)端提供的接口,所以我們需要將客戶端調(diào)用的信息傳輸過去,那么我們可以將要傳輸?shù)男畔⒎譃閮深?/p>
第一類是服務(wù)端可以根據(jù)這個信息找到相應(yīng)的接口實(shí)現(xiàn)類和方法 第二類是調(diào)用此方法傳輸?shù)膮?shù)信息
那么我們就根據(jù)要傳輸?shù)膬深愋畔⑦M(jìn)行分析,什么信息能夠找到相應(yīng)的實(shí)現(xiàn)類的相應(yīng)的方法?要找到方法必須要先找到類,這里我們可以簡單的用Spring提供的Bean實(shí)例管理ApplicationContext進(jìn)行類的尋找。所以要找到類的實(shí)例只需要知道此類的名字就行,找到了類的實(shí)例,那么如何找到方法呢?在反射中通過反射能夠根據(jù)方法名和參數(shù)類型從而找到這個方法。那么此時第一類的信息我們就明了了,那么就建立相應(yīng)的是實(shí)體類存儲這些信息。
@Data
public?class?Request?implements?Serializable?{
????private?static?final?long?serialVersionUID?=?3933918042687238629L;
????private?String?className;
????private?String?methodName;
????private?Class>?[]?parameTypes;
????private?Object?[]?parameters;
}
2、服務(wù)端處理完以后應(yīng)該返回客戶端什么信息?
上面我們分析了客戶端應(yīng)該傳輸什么信息給服務(wù)端,那么服務(wù)端處理完以后應(yīng)該傳什么樣的返回值呢?這里我們只考慮最簡單的情況,客戶端請求的線程也會一直在等著,不會有異步處理這一說,所以這么分析的話就簡單了,直接將得到的處理結(jié)果返回就行了。
@Data
public?class?Response?implements?Serializable?{
????private?static?final?long?serialVersionUID?=?-2393333111247658778L;
????private?Object?result;
}
由于都涉及到了網(wǎng)絡(luò)傳輸,所以都要實(shí)現(xiàn)序列化的接口
如何獲得傳參信息并執(zhí)行?-客戶端
上面我們分析了客戶端向服務(wù)端發(fā)送的信息都有哪些?那么我們?nèi)绾潍@得這些信息呢?首先我們調(diào)用的是接口,所以我們需要寫自定義注解然后在程序啟動的時候?qū)⑦@些信息加載在Spring容器中。有了這些信息那么我們就需要傳輸了,調(diào)用接口但是實(shí)際上執(zhí)行的確實(shí)網(wǎng)絡(luò)傳輸?shù)倪^程,所以我們需要動態(tài)代理。那么就可以分為以下兩步
初始化信息階段:將key為接口名,value為動態(tài)接口類注冊進(jìn)Spring容器中 執(zhí)行階段:通過動態(tài)代理,實(shí)際執(zhí)行網(wǎng)絡(luò)傳輸
1、初始化信息階段
由于我們使用Spring作為Bean的管理,所以要將接口和對應(yīng)的代理類注冊進(jìn)Spring容器中。而我們?nèi)绾握业轿覀兿胍{(diào)用的接口類呢?我們可以自定義注解進(jìn)行掃描。將想要調(diào)用的接口全部注冊進(jìn)容器中。
創(chuàng)建一個注解類,用于標(biāo)注哪些接口是可以進(jìn)行Rpc的
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public?@interface?RpcClient?{
}
@RpcClient注解的掃描類RpcInitConfig,將其注冊進(jìn)Spring容器中public?class?RpcInitConfig?implements?ImportBeanDefinitionRegistrar{
????@Override
????public?void?registerBeanDefinitions(AnnotationMetadata?importingClassMetadata,?BeanDefinitionRegistry?registry)?{
????????ClassPathScanningCandidateComponentProvider?provider?=?getScanner();
????????//設(shè)置掃描器
????????provider.addIncludeFilter(new?AnnotationTypeFilter(RpcClient.class));
????????//掃描此包下的所有帶有@RpcClient的注解的類
????????Set?beanDefinitionSet?=?provider.findCandidateComponents("com.example.rpcclient.client");
????????for?(BeanDefinition?beanDefinition?:?beanDefinitionSet){
????????????if?(beanDefinition?instanceof?AnnotatedBeanDefinition){
????????????????//獲得注解上的參數(shù)信息
????????????????AnnotatedBeanDefinition?annotatedBeanDefinition?=?(AnnotatedBeanDefinition)?beanDefinition;
????????????????String?beanClassAllName?=?beanDefinition.getBeanClassName();
????????????????Map?paraMap?=?annotatedBeanDefinition.getMetadata()
????????????????????????.getAnnotationAttributes(RpcClient.class.getCanonicalName());
????????????????//將RpcClient的工廠類注冊進(jìn)去
????????????????BeanDefinitionBuilder?builder?=?BeanDefinitionBuilder
????????????????????????.genericBeanDefinition(RpcClinetFactoryBean.class);
????????????????//設(shè)置RpcClinetFactoryBean工廠類中的構(gòu)造函數(shù)的值
????????????????builder.addConstructorArgValue(beanClassAllName);
????????????????builder.getBeanDefinition().setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE);
????????????????//將其注冊進(jìn)容器中
????????????????registry.registerBeanDefinition(
????????????????????????beanClassAllName?,
????????????????????????builder.getBeanDefinition());
????????????}
????????}
????}
????//允許Spring掃描接口上的注解
????protected?ClassPathScanningCandidateComponentProvider?getScanner()?{
????????return?new?ClassPathScanningCandidateComponentProvider(false)?{
????????????@Override
????????????protected?boolean?isCandidateComponent(AnnotatedBeanDefinition?beanDefinition)?{
????????????????return?beanDefinition.getMetadata().isInterface()?&&?beanDefinition.getMetadata().isIndependent();
????????????}
????????};
????}
}
由于上面注冊的是工廠類,所以我們建立一個工廠類RpcClinetFactoryBean繼承Spring中的FactoryBean類,由其統(tǒng)一創(chuàng)建@RpcClient注解的代理類
如果對FactoryBean類不了解的可以參見FactoryBean講解
@Data
public?class?RpcClinetFactoryBean?implements?FactoryBean?{
????@Autowired
????private?RpcDynamicPro?rpcDynamicPro;
????private?Class>?classType;
????public?RpcClinetFactoryBean(Class>?classType)?{
????????this.classType?=?classType;
????}
????@Override
????public?Object?getObject(){
????????ClassLoader?classLoader?=?classType.getClassLoader();
????????Object?object?=?Proxy.newProxyInstance(classLoader,new?Class>[]{classType},rpcDynamicPro);
????????return?object;
????}
????@Override
????public?Class>?getObjectType()?{
????????return?this.classType;
????}
????@Override
????public?boolean?isSingleton()?{
????????return?false;
????}
}
注意此處的
getObjectType方法,在將工廠類注入到容器中的時候,這個方法返回的是什么Class類型那么注冊進(jìn)容器中就是什么Class類型。
然后看一下我們創(chuàng)建的代理類rpcDynamicPro
@Component
@Slf4j
public?class?RpcDynamicPro?implements?InvocationHandler?{
????@Override
????public?Object?invoke(Object?proxy,?Method?method,?Object[]?args)?throws?Throwable?{
???????String?requestJson?=?objectToJson(method,args);
????????Socket?client?=?new?Socket("127.0.0.1",?20006);
????????client.setSoTimeout(10000);
????????//獲取Socket的輸出流,用來發(fā)送數(shù)據(jù)到服務(wù)端
????????PrintStream?out?=?new?PrintStream(client.getOutputStream());
????????//獲取Socket的輸入流,用來接收從服務(wù)端發(fā)送過來的數(shù)據(jù)
????????BufferedReader?buf?=??new?BufferedReader(new?InputStreamReader(client.getInputStream()));
????????//發(fā)送數(shù)據(jù)到服務(wù)端
????????out.println(requestJson);
????????Response?response?=?new?Response();
????????Gson?gson?=new?Gson();
????????try{
????????????//從服務(wù)器端接收數(shù)據(jù)有個時間限制(系統(tǒng)自設(shè),也可以自己設(shè)置),超過了這個時間,便會拋出該異常
????????????String?responsJson?=?buf.readLine();
????????????response?=?gson.fromJson(responsJson,?Response.class);
????????}catch(SocketTimeoutException?e){
????????????log.info("Time?out,?No?response");
????????}
????????if(client?!=?null){
????????????//如果構(gòu)造函數(shù)建立起了連接,則關(guān)閉套接字,如果沒有建立起連接,自然不用關(guān)閉
????????????client.close();?//只關(guān)閉socket,其關(guān)聯(lián)的輸入輸出流也會被關(guān)閉
????????}
????????return?response.getResult();
????}
????public?String?objectToJson(Method?method,Object?[]?args){
????????Request?request?=?new?Request();
????????String?methodName?=?method.getName();
????????Class>[]?parameterTypes?=?method.getParameterTypes();
????????String?className?=?method.getDeclaringClass().getName();
????????request.setMethodName(methodName);
????????request.setParameTypes(parameterTypes);
????????request.setParameters(args);
????????request.setClassName(getClassName(className));
????????GsonBuilder?gsonBuilder?=?new?GsonBuilder();
????????gsonBuilder.registerTypeAdapterFactory(new?ClassTypeAdapterFactory());
????????Gson?gson?=?gsonBuilder.create();
????????return?gson.toJson(request);
????}
????private?String?getClassName(String?beanClassName){
????????String?className?=?beanClassName.substring(beanClassName.lastIndexOf(".")+1);
????????className?=?className.substring(0,1).toLowerCase()?+?className.substring(1);
????????return?className;
????}
}
我們的客戶端已經(jīng)寫完了,傳給服務(wù)端的信息我們也已經(jīng)拼裝完畢了。剩下的工作就簡單了,開始編寫服務(wù)端的代碼。
服務(wù)端處理完以后應(yīng)該返回客戶端什么信息?-服務(wù)端
服務(wù)端的代碼相比較客戶端來說要簡單一些。可以簡單分為下面三步
拿到接口名以后,通過接口名找到實(shí)現(xiàn)類 通過反射進(jìn)行對應(yīng)方法的執(zhí)行 返回執(zhí)行完的信息
那么我們就根據(jù)這三步進(jìn)行編寫代碼
1、拿到接口名以后,通過接口名找到實(shí)現(xiàn)類
如何通過接口名拿到對應(yīng)接口的實(shí)現(xiàn)類呢?這就需要我們在服務(wù)端啟動的時候?qū)⑵鋵?yīng)信息加載進(jìn)去
@Component
@Log4j
public?class?InitRpcConfig?implements?CommandLineRunner?{
????@Autowired
????private?ApplicationContext?applicationContext;
????public?static?Map?rpcServiceMap?=?new?HashMap<>();
????@Override
????public?void?run(String...?args)?throws?Exception?{
????????Map?beansWithAnnotation?=?applicationContext.getBeansWithAnnotation(Service.class);
????????for?(Object?bean:?beansWithAnnotation.values()){
????????????Class>?clazz?=?bean.getClass();
????????????Class>[]?interfaces?=?clazz.getInterfaces();
????????????for?(Class>?inter?:?interfaces){
????????????????rpcServiceMap.put(getClassName(inter.getName()),bean);
????????????????log.info("已經(jīng)加載的服務(wù):"+inter.getName());
????????????}
????????}
????}
????private?String?getClassName(String?beanClassName){
????????String?className?=?beanClassName.substring(beanClassName.lastIndexOf(".")+1);
????????className?=?className.substring(0,1).toLowerCase()?+?className.substring(1);
????????return?className;
????}
}
rpcServiceMap存儲的就是接口名和其對應(yīng)的實(shí)現(xiàn)類的對應(yīng)關(guān)系。2、通過反射進(jìn)行對應(yīng)方法的執(zhí)行
此時拿到了對應(yīng)關(guān)系以后就能根據(jù)客戶端傳過來的信息找到相應(yīng)的實(shí)現(xiàn)類中的方法。然后進(jìn)行執(zhí)行并返回信息就行
public?Response?invokeMethod(Request?request){
????????String?className?=?request.getClassName();
????????String?methodName?=?request.getMethodName();
????????Object[]?parameters?=?request.getParameters();
????????Class>[]?parameTypes?=?request.getParameTypes();
????????Object?o?=?InitRpcConfig.rpcServiceMap.get(className);
????????Response?response?=?new?Response();
????????try?{
????????????Method?method?=?o.getClass().getDeclaredMethod(methodName,?parameTypes);
????????????Object?invokeMethod?=?method.invoke(o,?parameters);
????????????response.setResult(invokeMethod);
????????}?catch?(NoSuchMethodException?e)?{
????????????log.info("沒有找到"+methodName);
????????}?catch?(IllegalAccessException?e)?{
????????????log.info("執(zhí)行錯誤"+parameters);
????????}?catch?(InvocationTargetException?e)?{
????????????log.info("執(zhí)行錯誤"+parameters);
????????}
????????return?response;
????}
現(xiàn)在我們兩個服務(wù)都啟動起來并且在客戶端進(jìn)行調(diào)用就發(fā)現(xiàn)只是調(diào)用接口就能調(diào)用過來了。
總結(jié)
到現(xiàn)在一個簡單的RPC就完成了,但是其中還有很多的功能需要完善,例如一個完整RPC框架肯定還需要服務(wù)注冊與發(fā)現(xiàn),而且雙方通信肯定也不能是直接開啟一個線程一直在等著,肯定需要是異步的等等的各種功能。后面隨著學(xué)習(xí)的深入,這個框架也會慢慢增加一些東西。不僅是對所學(xué)知識的一個應(yīng)用,更是一個總結(jié)。有時候?qū)W一個東西學(xué)起來覺得很簡單,但是真正應(yīng)用的時候就會發(fā)現(xiàn)各種各樣的小問題。比如在寫這個例子的時候碰到一個問題就是@Autowired的時候一直找不到SendMessage的類型,最后才發(fā)現(xiàn)是工廠類RpcClinetFactoryBean中的getObjectType中的返回類型寫錯了,我之前寫的是
????public?Class>?getObjectType()?{
????????return?this.getClass();;
????}
這樣的話注冊進(jìn)容器的就是RpcClinetFactoryBean類型的而不是SendMessage的類型。

