為了帶你搞懂RPC,我們手寫了一個(gè)RPC框架
如今,分布式系統(tǒng)大行其道,RPC 有著舉足輕重的地位。Dubbo、Thrift、gRpc 等框架各領(lǐng)風(fēng)騷,學(xué)習(xí)RPC是新手也是老鳥的必修課。本文帶你手?jǐn)]一個(gè)rpc-spring-starter,深入學(xué)習(xí)和理解rpc相關(guān)技術(shù),包括但不限于 RPC 原理、動態(tài)代理、Javassist 字節(jié)碼增強(qiáng)、服務(wù)注冊與發(fā)現(xiàn)、Netty 網(wǎng)絡(luò)通訊、傳輸協(xié)議、序列化、包壓縮、TCP 粘包、拆包、長連接復(fù)用、心跳檢測、SpringBoot 自動裝載、服務(wù)分組、接口版本、客戶端連接池、負(fù)載均衡、異步調(diào)用等知識,值得收藏。
RPC定義
遠(yuǎn)程服務(wù)調(diào)用(Remote procedure call)的概念歷史已久,1981年就已經(jīng)被提出,最初的目的就是為了調(diào)用遠(yuǎn)程方法像調(diào)用本地方法一樣簡單,經(jīng)歷了四十多年的更新與迭代,RPC 的大體思路已經(jīng)趨于穩(wěn)定,如今百家爭鳴的 RPC 協(xié)議和框架,諸如 Dubbo (阿里)、Thrift(FaceBook)、gRpc(Google)、brpc (百度)等都在不同側(cè)重點(diǎn)去解決最初的目的,有的想極致完美,有的追求極致性能,有的偏向極致簡單。
RPC基本原理
讓我們回到 RPC 最初的目的,要想實(shí)現(xiàn)調(diào)用遠(yuǎn)程方法想調(diào)用本地方法一樣簡單,至少要解決如下問題:
如何獲取可用的遠(yuǎn)程服務(wù)器 如何表示數(shù)據(jù) 如何傳遞數(shù)據(jù) 服務(wù)端如何確定并調(diào)用目標(biāo)方法
上述四點(diǎn)問題,都能與現(xiàn)在分布式系統(tǒng)火熱的術(shù)語一一對應(yīng),如何獲取可用的遠(yuǎn)程服務(wù)器(服務(wù)注冊與發(fā)現(xiàn))、如何表示數(shù)據(jù)(序列化與反序列化)、如何傳遞數(shù)據(jù)(網(wǎng)絡(luò)通訊)、服務(wù)端如何確定并調(diào)用目標(biāo)方法(調(diào)用方法映射)。筆者將通過一個(gè)簡單 RPC 項(xiàng)目來解決這些問題。
首先來看 RPC 的整體系統(tǒng)架構(gòu)圖:

圖中服務(wù)端啟動時(shí)將自己的服務(wù)節(jié)點(diǎn)信息注冊到注冊中心,客戶端調(diào)用遠(yuǎn)程方法時(shí)會訂閱注冊中心中的可用服務(wù)節(jié)點(diǎn)信息,拿到可用服務(wù)節(jié)點(diǎn)之后遠(yuǎn)程調(diào)用方法,當(dāng)注冊中心中的可用服務(wù)節(jié)點(diǎn)發(fā)生變化時(shí)會通知客戶端,避免客戶端繼續(xù)調(diào)用已經(jīng)失效的節(jié)點(diǎn)。那客戶端是如何調(diào)用遠(yuǎn)程方法的呢,來看一下遠(yuǎn)程調(diào)用示意圖:

客戶端模塊代理所有遠(yuǎn)程方法的調(diào)用 將目標(biāo)服務(wù)、目標(biāo)方法、調(diào)用目標(biāo)方法的參數(shù)等必要信息序列化 序列化之后的數(shù)據(jù)包進(jìn)一步壓縮,壓縮后的數(shù)據(jù)包通過網(wǎng)絡(luò)通信傳輸?shù)侥繕?biāo)服務(wù)節(jié)點(diǎn) 服務(wù)節(jié)點(diǎn)將接受到的數(shù)據(jù)包進(jìn)行解壓 解壓后的數(shù)據(jù)包反序列化成目標(biāo)服務(wù)、目標(biāo)方法、目標(biāo)方法的調(diào)用參數(shù) 通過服務(wù)端代理調(diào)用目標(biāo)方法獲取結(jié)果,結(jié)果同樣需要序列化、壓縮然后回傳給客戶端
通過以上描述,相信讀者應(yīng)該大體上了解了 RPC 是如何工作的,接下來看如何使用代碼具體實(shí)現(xiàn)上述的流程。鑒于篇幅筆者會選擇重要或者網(wǎng)絡(luò)上介紹相對較少的模塊來講述。
RPC實(shí)現(xiàn)細(xì)節(jié)
1. 服務(wù)注冊與發(fā)現(xiàn)
作為一個(gè)入門項(xiàng)目,我們的系統(tǒng)選用 Zookeeper 作為注冊中心, ZooKeeper 將數(shù)據(jù)保存在內(nèi)存中,性能很高。在讀多寫少的場景中尤其適用,因?yàn)?code style="font-size: 14px;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;background-color: rgba(27, 31, 35, 0.05);font-family: "Operator Mono", Consolas, Monaco, Menlo, monospace;word-break: break-all;color: rgb(255, 100, 65);">寫操作會導(dǎo)致所有的服務(wù)器間同步狀態(tài)。服務(wù)注冊與發(fā)現(xiàn)是典型的讀多寫少的協(xié)調(diào)服務(wù)場景。Zookeeper 是一個(gè)典型的CP系統(tǒng),在服務(wù)選舉或者集群半數(shù)機(jī)器宕機(jī)時(shí)是不可用狀態(tài),相對于服務(wù)發(fā)現(xiàn)中主流的AP系統(tǒng)來說,可用性稍低,但是用于理解RPC的實(shí)現(xiàn),也是綽綽有余。
ZooKeeper節(jié)點(diǎn)介紹
持久節(jié)點(diǎn)( PERSISENT ):一旦創(chuàng)建,除非主動調(diào)用刪除操作,否則一直持久化存儲。 臨時(shí)節(jié)點(diǎn)( EPHEMERAL ):與客戶端會話綁定,客戶端會話失效,這個(gè)客戶端所創(chuàng)建的所有臨時(shí)節(jié)點(diǎn)都會被刪除除。 節(jié)點(diǎn)順序( SEQUENTIAL ):創(chuàng)建子節(jié)點(diǎn)時(shí),如果設(shè)置SEQUENTIAL屬性,則會自動在節(jié)點(diǎn)名后追加一個(gè)整形數(shù)字,上限是整形的最大值;同一目錄下共享順序,例如(/a0000000001,/b0000000002,/c0000000003,/test0000000004)。
ZooKeeper服務(wù)注冊
在 ZooKeeper 根節(jié)點(diǎn)下根據(jù)服務(wù)名創(chuàng)建持久節(jié)點(diǎn) /rpc/{serviceName}/service ,將該服務(wù)的所有服務(wù)節(jié)點(diǎn)使用臨時(shí)節(jié)點(diǎn)創(chuàng)建在 /rpc/{serviceName}/service 目錄下,代碼如下(為方便展示,后續(xù)展示代碼都做了刪減):
public?void?exportService(Service?serviceResource)?{
??String?name?=?serviceResource.getName();
??String?uri?=?GSON.toJson(serviceResource);
??String?servicePath?=?"rpc/"?+?name?+?"/service";
??zkClient.createPersistent(servicePath,?true);
??String?uriPath?=?servicePath?+?"/"?+?uri;
??//創(chuàng)建一個(gè)新的臨時(shí)節(jié)點(diǎn),當(dāng)該節(jié)點(diǎn)宕機(jī)會話失效時(shí),該臨時(shí)節(jié)點(diǎn)會被清理
??zkClient.createEphemeral(uriPath);
}
注冊效果如圖,本地啟動兩個(gè)服務(wù)則 service 下有兩個(gè)服務(wù)節(jié)點(diǎn)信息:


存儲的節(jié)點(diǎn)信息包括服務(wù)名,服務(wù) IP:PORT ,序列化協(xié)議,壓縮協(xié)議等。
ZooKeeper服務(wù)發(fā)現(xiàn)
客戶端啟動后,不會立即從注冊中心獲取可用服務(wù)節(jié)點(diǎn),而是在調(diào)用遠(yuǎn)程方法時(shí)獲取節(jié)點(diǎn)信息(懶加載),并放入本地緩存 MAP 中,供后續(xù)調(diào)用,當(dāng)注冊中心通知目錄變化時(shí)清空服務(wù)所有節(jié)點(diǎn)緩存,代碼如下:
public?List?getServices(String?name)? {
??Map>?SERVER_MAP?=?new?ConcurrentHashMap<>();
??String?servicePath?=?"rpc/"?+?name?+?"/service";
??List?children?=?zkClient.getChildren(servicePath);
??List?serviceList?=?Optional.ofNullable(children).orElse(new?ArrayList<>()).stream().map(str?->?{
????String?deCh?=?URLDecoder.decode(str,?StandardCharsets.UTF_8.toString());
????return?gson.fromJson(deCh,?Service.class);
??}).collect(Collectors.toList());
??SERVER_MAP.put(name,?serviceList);
??return?serviceList;
}
public?class?ZkChildListenerImpl?implements?IZkChildListener?{
????//監(jiān)聽子節(jié)點(diǎn)的刪除和新增事件
????@Override
????public?void?handleChildChange(String?parentPath,?List?childList) ?throws?Exception?{
????????//有變動就清空服務(wù)所有節(jié)點(diǎn)緩存
????????String[]?arr?=?parentPath.split("/");
????????SERVER_MAP.remove(arr[2]);
????}
}
PS:美團(tuán)分布式 ID 生成系統(tǒng)Leaf就使用 Zookeeper 的順序節(jié)點(diǎn)來注冊 WorkerID ,臨時(shí)節(jié)點(diǎn)保存節(jié)點(diǎn) IP:PORT 信息。
2. 客戶端實(shí)現(xiàn)
客戶端調(diào)用本地方法一樣調(diào)用遠(yuǎn)程方法的完美體驗(yàn)與 Java 動態(tài)代理的強(qiáng)大密不可分。
DefaultRpcBaseProcessor 抽象類實(shí)現(xiàn)了 ApplicationListener , onApplicationEvent 方法在 Spring 項(xiàng)目啟動完畢會收到時(shí)間通知,獲取 ApplicationContext 上下文之后開始注入服務(wù) injectService (依賴其他服務(wù))或者啟動服務(wù) startServer (自身服務(wù)實(shí)現(xiàn))。
injectService 方法會遍歷 ApplicationContext 上下文中的所有 Bean , Bean 中是否有屬性使用了 InjectService 注解。有的話生成代理類,注入到 Bean 的屬性中。代碼如下:
public?abstract?class?DefaultRpcBaseProcessor?implements?ApplicationListener<ContextRefreshedEvent>?{
??@Override
??public?void?onApplicationEvent(ContextRefreshedEvent?contextRefreshedEvent)?{
????//Spring啟動完畢會收到Event
????if?(Objects.isNull(contextRefreshedEvent.getApplicationContext().getParent()))?{
??????ApplicationContext?applicationContext?=?contextRefreshedEvent.getApplicationContext();
??????//保存spring上下文?后續(xù)使用
??????Container.setSpringContext(applicationContext);
??????startServer(applicationContext);
??????injectService(applicationContext);
????}
??}
??private?void?injectService(ApplicationContext?context)?{
????String[]?names?=?context.getBeanDefinitionNames();
????for?(String?name?:?names)?{
??????Object?bean?=?context.getBean(name);
??????Class>?clazz?=?bean.getClass();
??????//clazz?=?clazz.getSuperclass();?aop增強(qiáng)的類生成cglib類,需要Superclass才能獲取定義的字段
??????Field[]?declaredFields?=?clazz.getDeclaredFields();
??????//設(shè)置InjectService的代理類
??????for?(Field?field?:?declaredFields)?{
????????InjectService?injectService?=?field.getAnnotation(InjectService.class);
????????if?(injectService?==?null)?{continue;
????????Class>?fieldClass?=?field.getType();
????????Object?object?=?context.getBean(name);
????????field.set(object,?clientProxyFactory.getProxy(fieldClass,?injectService.group(),?injectService.version()));
????????ServerDiscoveryCache.SERVER_CLASS_NAMES.add(fieldClass.getName());
??????}
????}
??}
??protected?abstract?void?startServer(ApplicationContext?context);
}
調(diào)用 ClientProxyFactory 類的 getProxy ,根據(jù)服務(wù)接口、服務(wù)分組、服務(wù)版本、是否異步調(diào)用來創(chuàng)建該接口的代理類,對該接口的所有方法都會使用創(chuàng)建的代理類來調(diào)用。方法調(diào)用的實(shí)現(xiàn)細(xì)節(jié)都在 ClientInvocationHandler 中的 invoke 方法,主要內(nèi)容是,獲取服務(wù)節(jié)點(diǎn)信息,選擇調(diào)用節(jié)點(diǎn),構(gòu)建 request 對象,最后調(diào)用網(wǎng)絡(luò)模塊發(fā)送請求。
public?class?ClientProxyFactory?{
????public??T?getProxy(Class?clazz,?String?group,?String?version,?boolean?async) ?{
????????return?(T)?objectCache.computeIfAbsent(clazz.getName()?+?group?+?version,?clz?->?Proxy.newProxyInstance(clazz.getClassLoader(),?new?Class[]{clazz},?new?ClientInvocationHandler(clazz,?group,?version,?async)));
????}
????private?class?ClientInvocationHandler?implements?InvocationHandler?{
????????public?ClientInvocationHandler(Class>?clazz,?String?group,?String?version,?boolean?async)?{
????????}
??????
????????@Override
????????public?Object?invoke(Object?proxy,?Method?method,?Object[]?args)?throws?Throwable?{
????????????//1.?獲得服務(wù)信息
????????????String?serviceName?=?clazz.getName();
????????????List?serviceList?=?getServiceList(serviceName);
????????????Service?service?=?loadBalance.selectOne(serviceList);
????????????//2.?構(gòu)建request對象
????????????RpcRequest?rpcRequest?=?new?RpcRequest();
????????????rpcRequest.setServiceName(service.getName());
????????????rpcRequest.setMethod(method.getName());
????????????rpcRequest.setGroup(group);
????????????rpcRequest.setVersion(version);
????????????rpcRequest.setParameters(args);
????????????rpcRequest.setParametersTypes(method.getParameterTypes());
????????????//3.?協(xié)議編組
????????????RpcProtocolEnum?messageProtocol?=?RpcProtocolEnum.getProtocol(service.getProtocol());
????????????RpcCompressEnum?compresser?=?RpcCompressEnum.getCompress(service.getCompress());
????????????RpcResponse?response?=?netClient.sendRequest(rpcRequest,?service,?messageProtocol,?compresser);
????????????return?response.getReturnValue();
????????}
????}
}
3. 網(wǎng)絡(luò)傳輸
客戶端封裝調(diào)用請求對象之后需要通過網(wǎng)絡(luò)將調(diào)用信息發(fā)送到服務(wù)端,在發(fā)送請求對象之前還需要經(jīng)歷序列化、壓縮兩個(gè)階段。
序列化與反序列化
序列化與反序列化的核心作用就是對象的保存與重建,方便客戶端與服務(wù)端通過字節(jié)流傳遞對象,快速對接交互。
序列化就是指把 Java 對象轉(zhuǎn)換為字節(jié)序列的過程。 反序列化就是指把字節(jié)序列恢復(fù)為 Java 對象的過程。
Java序列化的方式有很多,諸如 JDK 自帶的 Serializable 、 Protobuf 、 kryo 等,上述三種筆者自測性能最高的是 Kryo 、其次是 Protobuf 。Json 也不失為一種簡單且高效的序列化方法,有很多大道至簡的框架采用。序列化接口比較簡單,讀者可以自行查看實(shí)現(xiàn)代碼。
public?interface?MessageProtocol?{
????byte[]?marshallingRequest(RpcRequest?request)?throws?Exception;
????RpcRequest?unmarshallingRequest(byte[]?data)?throws?Exception;
????byte[]?marshallingResponse(RpcResponse?response)?throws?Exception;
????RpcResponse?unmarshallingResponse(byte[]?data)?throws?Exception;
}
壓縮與解壓
網(wǎng)絡(luò)通信的成本很高,為了減小網(wǎng)絡(luò)傳輸數(shù)據(jù)包的體積,將序列化之后的字節(jié)碼壓縮不失為一種很好的選擇。Gzip 壓縮算法比率在3到10倍左右,可以大大節(jié)省服務(wù)器的網(wǎng)絡(luò)帶寬,各種流行的 web 服務(wù)器也都支持 Gzip 壓縮算法。Java 接入也比較容易,接入代碼可以查看下方接口的實(shí)現(xiàn)。
public?interface?Compresser?{
????byte[]?compress(byte[]?bytes);
????byte[]?decompress(byte[]?bytes);
}
網(wǎng)絡(luò)通信
萬事俱備只欠東風(fēng)。將請求對象序列化成字節(jié)碼,并且壓縮體積之后,需要使用網(wǎng)絡(luò)將字節(jié)碼傳輸?shù)椒?wù)器。常用網(wǎng)絡(luò)傳輸協(xié)議有 HTTP 、 TCP 、 WebSocke t等。HTTP、WebSocket 是應(yīng)用層協(xié)議,TCP 是傳輸層協(xié)議。有些追求簡潔、易用的 RPC 框架也有選擇 HTTP 協(xié)議的。TCP傳輸?shù)母呖煽啃院蜆O致性能是主流RPC框架選擇的最主要原因。談到 Java 生態(tài)的通信領(lǐng)域,Netty 的領(lǐng)銜地位短時(shí)間內(nèi)無人能及。選用 Netty 作為網(wǎng)絡(luò)通信模塊, TCP 數(shù)據(jù)流的粘包、拆包不可避免。
粘包、拆包問題
TCP 傳輸協(xié)議是一種面向連接的、可靠的、基于字節(jié)流的傳輸層通信協(xié)議。為了最大化傳輸效率。發(fā)送方可能將單個(gè)較小數(shù)據(jù)包合并發(fā)送,這種情況就需要接收方來拆包處理數(shù)據(jù)了。
Netty 提供了3種類型的解碼器來處理 TCP 粘包/拆包問題:
定長消息解碼器: FixedLengthFrameDecoder。發(fā)送方和接收方規(guī)定一個(gè)固定的消息長度,不夠用空格等字符補(bǔ)全,這樣接收方每次從接受到的字節(jié)流中讀取固定長度的字節(jié)即可,長度不夠就保留本次接受的數(shù)據(jù),再在下一個(gè)字節(jié)流中獲取剩下數(shù)量的字節(jié)數(shù)據(jù)。分隔符解碼器: LineBasedFrameDecoder或DelimiterBasedFrameDecoder。LineBasedFrameDecoder是行分隔符解碼器,分隔符為\n或\r\n;DelimiterBasedFrameDecoder是自定義分隔符解碼器,可以定義一個(gè)或多個(gè)分隔符。接收端在收到的字節(jié)流中查找分隔符,然后返回分隔符之前的數(shù)據(jù),沒找到就繼續(xù)從下一個(gè)字節(jié)流中查找。數(shù)據(jù)長度解碼器: LengthFieldBasedFrameDecoder。將發(fā)送的消息分為 header 和 body,header 存儲消息的長度(字節(jié)數(shù)),body 是發(fā)送的消息的內(nèi)容。同時(shí)發(fā)送方和接收方要協(xié)商好這個(gè) header 的字節(jié)數(shù),因?yàn)?int 能表示長度,long 也能表示長度。接收方首先從字節(jié)流中讀取前n(header的字節(jié)數(shù))個(gè)字節(jié)(header),然后根據(jù)長度讀取等量的字節(jié),不夠就從下一個(gè)數(shù)據(jù)流中查找。
不想使用內(nèi)置的解碼器也可自定義解碼器,自定傳輸協(xié)議。
網(wǎng)絡(luò)通信這部分內(nèi)容比較復(fù)雜,說來話長,代碼易讀,讀者可先自行閱讀代碼。后續(xù)有機(jī)會細(xì)說此節(jié)內(nèi)容。
5. 服務(wù)端實(shí)現(xiàn)
客戶端通過網(wǎng)絡(luò)傳輸將請求對象序列化、壓縮之后的字節(jié)碼傳輸?shù)椒?wù)端之后,同樣先通過解壓、反序列化將字節(jié)碼重建為請求對象。有了請求對象之后,就可以進(jìn)行關(guān)鍵的方法調(diào)用環(huán)節(jié)了。
public?abstract?class?RequestBaseHandler?{
????public?RpcResponse?handleRequest(RpcRequest?request)?throws?Exception?{
????????//1.?查找目標(biāo)服務(wù)代理對象
????????ServiceObject?serviceObject?=?serverRegister.getServiceObject(request.getServiceName()?+?request.getGroup()?+?request.getVersion());
????????RpcResponse?response?=?null;
????????//2.?調(diào)用對應(yīng)的方法
????????response?=?invoke(serviceObject,?request);
????????//響應(yīng)客戶端
????????return?response;
????}
????//具體代理調(diào)用
????public?abstract?RpcResponse?invoke(ServiceObject?serviceObject,?RpcRequest?request)?throws?Exception;
}
上述抽象類 RequestBaseHandler 是調(diào)用服務(wù)方法的抽象實(shí)現(xiàn) handleRequest 通過請求對象的服務(wù)名、服務(wù)分組、服務(wù)版本在 serverRegister.getServiceObject 獲取代理對象。然后調(diào)用 invoke 抽象方法來真正通過代理對象調(diào)用方法獲得結(jié)果。
服務(wù)的代理對象怎么產(chǎn)生的? 如何通過代理對象調(diào)用方法?
生成服務(wù)代理對象
帶著上述問題來看 DefaultRpcBaseProcessor 抽象類:
public?abstract?class?DefaultRpcBaseProcessor?implements?ApplicationListener<ContextRefreshedEvent>?{
????@Override
????public?void?onApplicationEvent(ContextRefreshedEvent?contextRefreshedEvent)?{
????????//Spring啟動完畢會收到Event
????????if?(Objects.isNull(contextRefreshedEvent.getApplicationContext().getParent()))?{
????????????ApplicationContext?applicationContext?=?contextRefreshedEvent.getApplicationContext();
????????????Container.setSpringContext(applicationContext);
????????????startServer(applicationContext);
????????????injectService(applicationContext);
????????}
????}
????private?void?injectService(ApplicationContext?context)?{}
????protected?abstract?void?startServer(ApplicationContext?context);
}
DefaultRpcBaseProcessor 抽象類也有兩個(gè)實(shí)現(xiàn)類 DefaultRpcReflectProcessor 和 DefaultRpcJavassistProcessor,來實(shí)現(xiàn)關(guān)鍵的生成代理對象的 startServer 方法。
服務(wù)接口實(shí)現(xiàn)類的 Bean 作為代理對象
public?class?DefaultRpcReflectProcessor?extends?DefaultRpcBaseProcessor?{
????@Override
????protected?void?startServer(ApplicationContext?context)?{
????????Map?beans?=?context.getBeansWithAnnotation(RpcService.class);
????????if?(beans.size()?>?0)?{
????????????boolean?startServerFlag?=?true;
????????????for?(Object?obj?:?beans.values())?{
??????????????Class>?clazz?=?obj.getClass();
??????????????Class>[]?interfaces?=?clazz.getInterfaces();
??????????????/*?如果只實(shí)現(xiàn)了一個(gè)接口就用接口的className作為服務(wù)名
???????????????*?如果該類實(shí)現(xiàn)了多個(gè)接口,則使用注解里的value作為服務(wù)名
???????????????*/
??????????????RpcService?service?=?clazz.getAnnotation(RpcService.class);
??????????????if?(interfaces.length?!=?1)?{
????????????????String?value?=?service.value();
????????????????ServiceObject?so?=?new?ServiceObject(value,?Class.forName(value),?obj,?service.group(),?service.version());
??????????????}?else?{
????????????????Class>?supperClass?=?interfaces[0];
????????????????ServiceObject?so?=?new?ServiceObject(supperClass.getName(),?supperClass,?obj,?service.group(),?service.version());
??????????????}
??????????????serverRegister.register(so);
????????????}
????????}
????}
}
DefaultRpcReflectProcessor 中獲取到所有有 RpcService 注解的服務(wù)接口實(shí)現(xiàn)類 Bean,然后將該 Bean 作為服務(wù)代理對象注冊到 serverRegister 中供上述的反射調(diào)用中使用。
使用 Javassist 生成新的代理對象
public?class?DefaultRpcJavassistProcessor?extends?DefaultRpcBaseProcessor?{
????@Override
????protected?void?startServer(ApplicationContext?context)?{
????????Map?beans?=?context.getBeansWithAnnotation(RpcService.class);
????????if?(beans.size()?>?0)?{
????????????boolean?startServerFlag?=?true;
????????????for?(Map.Entry?entry?:?beans.entrySet())?{
??????????????String?beanName?=?entry.getKey();
??????????????Object?obj??=?entry.getValue();
??????????????Class>?clazz?=?obj.getClass();
??????????????Class>[]?interfaces?=?clazz.getInterfaces();
??????????????Method[]?declaredMethods?=?clazz.getDeclaredMethods();
??????????????/*
???????????????*?如果只實(shí)現(xiàn)了一個(gè)接口就用接口的className作為服務(wù)名
???????????????*?如果該類實(shí)現(xiàn)了多個(gè)接口,則使用注解里的value作為服務(wù)名
???????????????*/
??????????????RpcService?service?=?clazz.getAnnotation(RpcService.class);
??????????????if?(interfaces.length?!=?1)?{
????????????????String?value?=?service.value();
????????????????//bean實(shí)現(xiàn)多個(gè)接口時(shí),javassist代理類中生成的方法只按照注解指定的服務(wù)類來生成
????????????????declaredMethods?=?Class.forName(value).getDeclaredMethods();
????????????????Object?proxy?=?ProxyFactory.makeProxy(value,?beanName,?declaredMethods);
????????????????ServiceObject?so?=?new?ServiceObject(value,?Class.forName(value),?proxy,?service.group(),?service.version());
??????????????}?else?{
????????????????Class>?supperClass?=?interfaces[0];
????????????????Object?proxy?=?ProxyFactory.makeProxy(supperClass.getName(),?beanName,?declaredMethods);
????????????????ServiceObject?so?=?new?ServiceObject(supperClass.getName(),?supperClass,?proxy,?service.group(),?service.version());
??????????????}
??????????????serverRegister.register(so);
????????????}
????????}
????}
}
DefaultRpcJavassistProcessor 與 DefaultRpcReflectProcessor 的差異在于后者直接將服務(wù)實(shí)現(xiàn)類對象 Bean 作為服務(wù)代理對象,而前者通過 ProxyFactory.makeProxy(value, beanName, declaredMethods) 創(chuàng)建了新的代理對象,將新的代理對象注冊到 serverRegister 中供后續(xù)調(diào)用調(diào)用中使用。該方法通過 Javassist 來生成代理類,代碼冗長,建議閱讀源碼。我來通過下面的代碼演示實(shí)現(xiàn)的代理類。
首先我們的服務(wù)接口是:
public?interface?HelloService?{
????String?hello(String?name);
}
服務(wù)的實(shí)現(xiàn)類是:
@RpcService
public?class?HelloServiceImpl?implements?HelloService?{
????@Override
????public?String?hello(String?name)?{
????????return?"a1";
????}
}
那最終新生成的代理類是這樣的:
public?class?HelloService$proxy1649315143476?{
????private?static?cn.ppphuang.rpcspringstarter.service.HelloService?serviceProxy?=?
?((org.springframework.context.ApplicationContext)cn.ppphuang.rpcspringstarter.server.Container.getSpringContext()).getBean("helloServiceImpl");
??
????public?cn.ppphuang.rpcspringstarter.common.model.RpcResponse?hello(cn.ppphuang.rpcspringstarter.common.model.RpcRequest?request)?throws?java.lang.Exception?{
????????java.lang.Object[]?params?=?request.getParameters();
????????if(params.length?==?1
???????????&&?(params[0]?==?null||params[0].getClass().getSimpleName().equalsIgnoreCase("String"))){
????????????java.lang.String?arg0?=?null;
????????????arg0?=?cn.ppphuang.rpcspringstarter.util.ConvertUtil.convertToString(params[0]);
????????????java.lang.String?returnValue?=?serviceProxy.hello(arg0);
????????????return?new?cn.ppphuang.rpcspringstarter.common.model.RpcResponse(returnValue);
????????}
????}
??
????public?cn.ppphuang.rpcspringstarter.common.model.RpcResponse?invoke(cn.ppphuang.rpcspringstarter.common.model.RpcRequest?request)?throws?java.lang.Exception?{
????????String?methodName?=?request.getMethod();
????????if(methodName.equalsIgnoreCase("hello")){
????????????java.lang.Object?returnValue?=?hello(request);
????????????return?returnValue;
????????}
????}
}
清理全限定類名后,代碼如下:
public?class?HelloService$proxy1649315143476?{
????private?static?HelloService?serviceProxy?=?((ApplicationContext)Container.getSpringContext()).getBean("helloServiceImpl");
??
????public?RpcResponse?hello(RpcRequest?request)?throws?Exception?{
????????Object[]?params?=?request.getParameters();
????????if(params.length?==?1
???????????&&?(params[0]?==?null||?params[0].getClass().getSimpleName().equalsIgnoreCase("String"))){
????????????String?arg0?=?ConvertUtil.convertToString(params[0]);
????????????String?returnValue?=?serviceProxy.hello(arg0);
????????????return?new?RpcResponse(returnValue);
????????}
????}
????public?RpcResponse?invoke(RpcRequest?request)?throws?Exception?{
????????String?methodName?=?request.getMethod();
????????if(methodName.equalsIgnoreCase("hello")){
????????????Object?returnValue?=?hello(request);
????????????return?returnValue;
????????}
????}
}
代理類 HelloService$proxy1649315143476中有一個(gè)服務(wù)接口類型HelloService的靜態(tài)屬性serviceProxy,值就是通過ApplicationContext上下文獲取到的服務(wù)接口實(shí)現(xiàn)類HelloServiceImpl這個(gè)Bean(SpringContext已經(jīng)被提前緩存到Container類中,讀者可以自行查找代碼了解)。public RpcResponse invoke(RpcRequest request) throws Exception該方法判斷調(diào)用的方法名是hello來調(diào)用代理類中的hello方法。public RpcResponse hello(RpcRequest request) throws Exception該方法通過調(diào)用serviceProxy.hello()的方法獲取結(jié)果。
public?interface?InvokeProxy?{
????/**
?????*?invoke調(diào)用服務(wù)接口
?????*/
????RpcResponse?invoke(RpcRequest?rpcRequest)?throws?Exception;
}
HelloService$proxy1649315143476 類實(shí)現(xiàn) InvokeProxy 接口(ProxyFactory.makeProxy 代碼中有體現(xiàn))。InvokeProxy 接口只有一個(gè) invoke 方法。到這里就能理解通過調(diào)用代理對象的 invoke 方法就能間接調(diào)用到服務(wù)接口實(shí)現(xiàn)類 HelloServiceImpl 的對應(yīng)方法了。
調(diào)用代理對象方法
理清代理對象的生成之后,開始調(diào)用代理對象的方法。
上文中寫到的抽象類 RequestBaseHandler 有兩個(gè)實(shí)現(xiàn)類 RequestJavassistHandler 和 RequestReflectHandler。
Java 反射調(diào)用
先看 RequestReflectHandler:
public?class?RequestReflectHandler?extends?RequestBaseHandler?{
????@Override
????public?RpcResponse?invoke(ServiceObject?serviceObject,?RpcRequest?request)?throws?Exception?{
????????Method?method?=?serviceObject.getClazz().getMethod(request.getMethod(),?request.getParametersTypes());
????????Object?value?=?method.invoke(serviceObject.getObj(),?request.getParameters());
????????RpcResponse?response?=?new?RpcResponse(RpcStatusEnum.SUCCESS);
????????response.setReturnValue(value);
????????return?response;
????}
}
Object value = method.invoke(serviceObject.getObj(), request.getParameters());
這行代碼都很熟悉,用 Java 框架中最常見的反射來調(diào)用代理類中的方法,大部分 RPC 框架也都是這么來實(shí)現(xiàn)的。
通過 Javassists 生成的代理對象 invoke 方法調(diào)用
接著看 RequestJavassistHandler:
public?class?RequestJavassistHandler?extends?RequestBaseHandler?{
????@Override
????public?RpcResponse?invoke(ServiceObject?serviceObject,?RpcRequest?request)?throws?Exception?{
????????InvokeProxy?invokeProxy?=?(InvokeProxy)?serviceObject.getObj();
????????return?invokeProxy.invoke(request);
????}
}
直接將代理對象轉(zhuǎn)為 InvokeProxy,調(diào)用 InvokeProxy.invoke() 方法獲得返回值,如果這里不能理解,回頭再看一下使用 Javassist 生成新的代理對象這個(gè)小節(jié)吧。
調(diào)用代理對象的方法獲取到結(jié)果,仍要通過序列化、壓縮后,將字節(jié)流數(shù)據(jù)包通過網(wǎng)絡(luò)傳輸?shù)娇蛻舳?,客戶端拿到響?yīng)的結(jié)果再解壓,反序列化得到結(jié)果對象。
Javassist介紹
Javassist 是一個(gè)開源的分析、編輯和創(chuàng)建Java字節(jié)碼的類庫。是由東京工業(yè)大學(xué)的數(shù)學(xué)和計(jì)算機(jī)科學(xué)系的 ?Shigeru Chiba(千葉滋)所創(chuàng)建的。簡單來說就是用源碼級別的 api 去修改字節(jié)碼。Duboo、MyBatis 也都使用了 Javassist。Duboo 作者也選擇Javassist作為 Duboo 的代理工具,可以點(diǎn)擊這里查看 Duboo 作者也選擇 Javassist 的原因。
Javassist 還能和諧(pojie)Java 編寫的商業(yè)軟件,例如抓包工具 Charles。代碼在這里,供交流學(xué)習(xí)。
在使用 Javassist 有踩到如下坑,供大家參考:
Javassist是運(yùn)行時(shí),沒有JDK靜態(tài)編譯過程,JDK的很多語法糖都是在靜態(tài)編譯過程中處理的,所以需要自行編碼處理,例如自動拆裝箱。int?i?=?1;
Integer?ii?=?i;???????????????//javassist?錯(cuò)誤?JDK會自動裝箱,javassist需要自行編碼處理?
int?i?=?1;
Integer?ii?=?new?Integer(i);??//javassist?正確自定義的類需要使用類的完全限定名,這也是為什么生成的代理類中類都是完全限定名。
選擇哪種代理方式
可以通過配置文件 application.properties 修改 hp.rpc.server-proxy-type 的值來選擇代理模式。
性能測試,機(jī)器 Macbook Pro M1 8C 16G, 代碼如下:
@Autowired
ClientProxyFactory?clientProxyFactory;
@Test
void?contextLoads()?{
??long?l1?=?System.currentTimeMillis();
??HelloService?proxy?=?clientProxyFactory.getProxy(HelloService.class,"group3","version3");
??for?(int?i?=?0;?i?1000000;?i++)?{
????String?ppphuang?=?proxy.hello("ppphuang");
??}
??long?l2?=?System.currentTimeMillis();
??long?l3?=?l2?-?l1;
??System.out.println(l3);
}
測試結(jié)果(ms):
| 請求次數(shù) | 反射調(diào)用1 | 反射調(diào)用2 | 反射調(diào)用3 | Javassist1 | Javassist2 | Javassist3 |
|---|---|---|---|---|---|---|
| 10000 | 1303 | 1159 | 1164 | 1126 | 1235 | 1094 |
| 100000 | 6110 | 6103 | 6065 | 6259 | 5854 | 6178 |
| 1000000 | 54475 | 51890 | 52329 | 52560 | 52099 | 52794 |
測試結(jié)果差異并不大,Javassist 模式下只是稍快了一點(diǎn)點(diǎn),幾乎可以忽略不記。與Duboo作者博客6樓評論的測試結(jié)果一致。所以想簡單通用性強(qiáng)用反射模式,也可以通過使用 Javassist 模式來學(xué)習(xí)更多知識,因?yàn)?Javassist 需要自己兼容很多特殊的狀況,反射調(diào)用 JDK 已經(jīng)幫你兼容完了。
總結(jié)
寫到這里我們了解了 RPC 的基本原理、服務(wù)注冊與發(fā)現(xiàn)、客戶端代理、網(wǎng)絡(luò)傳輸、重點(diǎn)介紹了服務(wù)端的兩種代理模式,學(xué)習(xí) Javassist 如何實(shí)現(xiàn)代理。
還有很多東西沒有重點(diǎn)講述甚至沒有提及,例如粘、拆包的處理、自定義數(shù)據(jù)包協(xié)議、Javassist 模式下如何實(shí)現(xiàn)方法重載、如何解決一個(gè)服務(wù)接口類有多個(gè)實(shí)現(xiàn)、如何解決一個(gè)實(shí)現(xiàn)類實(shí)現(xiàn)了多個(gè)服務(wù)接口、在 SpringBoot 中如何自動裝載、如何開箱即用、怎么實(shí)現(xiàn)異步調(diào)用、怎么擴(kuò)展序列化、壓縮算法等等...有興趣的讀者可以在源碼中尋找答案,或者尋找優(yōu)化項(xiàng),當(dāng)然也可以尋找 bug 。如果讀者能理解整個(gè)項(xiàng)目的實(shí)現(xiàn),相信你一定會有所收獲。后續(xù)有機(jī)會也會再寫文章與大家交流學(xué)習(xí)。因筆者水平有限,不完善的地方請大家斧正。感謝各位的閱讀,謝謝。
附錄
項(xiàng)目地址:https://github.com/ppphuang/rpc-spring-starter
測試DEMO:https://github.com/ppphuang/rpc-spring-starter-demo
- END -往期精彩文章:
如何保證同事的代碼不會腐爛?一文帶你了解 Alibaba COLA 架構(gòu)
誰會拒絕一臺Win11和MacOS無縫切換的MacBook呢?Parallels17極速體驗(yàn)
