<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          為了帶你搞懂RPC,我們手寫了一個(gè)RPC框架

          共 18688字,需瀏覽 38分鐘

           ·

          2022-04-15 20:46

          如今,分布式系統(tǒng)大行其道,RPC 有著舉足輕重的地位。Dubbo、Thrift、gRpc 等框架各領(lǐng)風(fēng)騷,學(xué)習(xí)RPC是新手也是老鳥的必修課。本文帶你手?jǐn)]一個(gè)rpc-spring-starter,深入學(xué)習(xí)和理解rpc相關(guān)技術(shù),包括但不限于 RPC 原理、動態(tài)代理、Javassist 字節(jié)碼增強(qiáng)、服務(wù)注冊與發(fā)現(xiàn)、Netty 網(wǎng)絡(luò)通訊、傳輸協(xié)議、序列化、包壓縮、TCP 粘包、拆包、長連接復(fù)用、心跳檢測、SpringBoot 自動裝載、服務(wù)分組、接口版本、客戶端連接池、負(fù)載均衡、異步調(diào)用等知識,值得收藏。

          RPC定義

          遠(yuǎn)程服務(wù)調(diào)用(Remote procedure call)的概念歷史已久,1981年就已經(jīng)被提出,最初的目的就是為了調(diào)用遠(yuǎn)程方法像調(diào)用本地方法一樣簡單,經(jīng)歷了四十多年的更新與迭代,RPC 的大體思路已經(jīng)趨于穩(wěn)定,如今百家爭鳴的 RPC 協(xié)議和框架,諸如 Dubbo (阿里)、Thrift(FaceBook)、gRpc(Google)、brpc (百度)等都在不同側(cè)重點(diǎn)去解決最初的目的,有的想極致完美,有的追求極致性能,有的偏向極致簡單。

          RPC基本原理

          讓我們回到 RPC 最初的目的,要想實(shí)現(xiàn)調(diào)用遠(yuǎn)程方法想調(diào)用本地方法一樣簡單,至少要解決如下問題:

          1. 如何獲取可用的遠(yuǎn)程服務(wù)器
          2. 如何表示數(shù)據(jù)
          3. 如何傳遞數(shù)據(jù)
          4. 服務(wù)端如何確定并調(diào)用目標(biāo)方法

          上述四點(diǎn)問題,都能與現(xiàn)在分布式系統(tǒng)火熱的術(shù)語一一對應(yīng),如何獲取可用的遠(yuǎn)程服務(wù)器(服務(wù)注冊與發(fā)現(xiàn))、如何表示數(shù)據(jù)(序列化與反序列化)、如何傳遞數(shù)據(jù)(網(wǎng)絡(luò)通訊)、服務(wù)端如何確定并調(diào)用目標(biāo)方法(調(diào)用方法映射)。筆者將通過一個(gè)簡單 RPC 項(xiàng)目來解決這些問題。

          首先來看 RPC 的整體系統(tǒng)架構(gòu)圖:

          圖中服務(wù)端啟動時(shí)將自己的服務(wù)節(jié)點(diǎn)信息注冊到注冊中心,客戶端調(diào)用遠(yuǎn)程方法時(shí)會訂閱注冊中心中的可用服務(wù)節(jié)點(diǎn)信息,拿到可用服務(wù)節(jié)點(diǎn)之后遠(yuǎn)程調(diào)用方法,當(dāng)注冊中心中的可用服務(wù)節(jié)點(diǎn)發(fā)生變化時(shí)會通知客戶端,避免客戶端繼續(xù)調(diào)用已經(jīng)失效的節(jié)點(diǎn)。那客戶端是如何調(diào)用遠(yuǎn)程方法的呢,來看一下遠(yuǎn)程調(diào)用示意圖:

          1. 客戶端模塊代理所有遠(yuǎn)程方法的調(diào)用
          2. 將目標(biāo)服務(wù)、目標(biāo)方法、調(diào)用目標(biāo)方法的參數(shù)等必要信息序列化
          3. 序列化之后的數(shù)據(jù)包進(jìn)一步壓縮,壓縮后的數(shù)據(jù)包通過網(wǎng)絡(luò)通信傳輸?shù)侥繕?biāo)服務(wù)節(jié)點(diǎn)
          4. 服務(wù)節(jié)點(diǎn)將接受到的數(shù)據(jù)包進(jìn)行解壓
          5. 解壓后的數(shù)據(jù)包反序列化成目標(biāo)服務(wù)、目標(biāo)方法、目標(biāo)方法的調(diào)用參數(shù)
          6. 通過服務(wù)端代理調(diào)用目標(biāo)方法獲取結(jié)果,結(jié)果同樣需要序列化、壓縮然后回傳給客戶端

          通過以上描述,相信讀者應(yīng)該大體上了解了 RPC 是如何工作的,接下來看如何使用代碼具體實(shí)現(xiàn)上述的流程。鑒于篇幅筆者會選擇重要或者網(wǎng)絡(luò)上介紹相對較少的模塊來講述。

          RPC實(shí)現(xiàn)細(xì)節(jié)

          1. 服務(wù)注冊與發(fā)現(xiàn)

          作為一個(gè)入門項(xiàng)目,我們的系統(tǒng)選用 Zookeeper 作為注冊中心, ZooKeeper 將數(shù)據(jù)保存在內(nèi)存中,性能很高。在少的場景中尤其適用,因?yàn)?code style="font-size: 14px;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;background-color: rgba(27, 31, 35, 0.05);font-family: "Operator Mono", Consolas, Monaco, Menlo, monospace;word-break: break-all;color: rgb(255, 100, 65);">寫操作會導(dǎo)致所有的服務(wù)器間同步狀態(tài)。服務(wù)注冊與發(fā)現(xiàn)是典型的少的協(xié)調(diào)服務(wù)場景。Zookeeper 是一個(gè)典型的CP系統(tǒng),在服務(wù)選舉或者集群半數(shù)機(jī)器宕機(jī)時(shí)是不可用狀態(tài),相對于服務(wù)發(fā)現(xiàn)中主流的AP系統(tǒng)來說,可用性稍低,但是用于理解RPC的實(shí)現(xiàn),也是綽綽有余。

          ZooKeeper節(jié)點(diǎn)介紹

          • 持久節(jié)點(diǎn)( PERSISENT ):一旦創(chuàng)建,除非主動調(diào)用刪除操作,否則一直持久化存儲。
          • 臨時(shí)節(jié)點(diǎn)( EPHEMERAL ):與客戶端會話綁定,客戶端會話失效,這個(gè)客戶端所創(chuàng)建的所有臨時(shí)節(jié)點(diǎn)都會被刪除除。
          • 節(jié)點(diǎn)順序( SEQUENTIAL ):創(chuàng)建子節(jié)點(diǎn)時(shí),如果設(shè)置SEQUENTIAL屬性,則會自動在節(jié)點(diǎn)名后追加一個(gè)整形數(shù)字,上限是整形的最大值;同一目錄下共享順序,例如(/a0000000001,/b0000000002,/c0000000003,/test0000000004)。

          ZooKeeper服務(wù)注冊

          在 ZooKeeper 根節(jié)點(diǎn)下根據(jù)服務(wù)名創(chuàng)建持久節(jié)點(diǎn) /rpc/{serviceName}/service ,將該服務(wù)的所有服務(wù)節(jié)點(diǎn)使用臨時(shí)節(jié)點(diǎn)創(chuàng)建在 /rpc/{serviceName}/service 目錄下,代碼如下(為方便展示,后續(xù)展示代碼都做了刪減):

          public?void?exportService(Service?serviceResource)?{
          ??String?name?=?serviceResource.getName();
          ??String?uri?=?GSON.toJson(serviceResource);
          ??String?servicePath?=?"rpc/"?+?name?+?"/service";
          ??zkClient.createPersistent(servicePath,?true);
          ??String?uriPath?=?servicePath?+?"/"?+?uri;
          ??//創(chuàng)建一個(gè)新的臨時(shí)節(jié)點(diǎn),當(dāng)該節(jié)點(diǎn)宕機(jī)會話失效時(shí),該臨時(shí)節(jié)點(diǎn)會被清理
          ??zkClient.createEphemeral(uriPath);
          }

          注冊效果如圖,本地啟動兩個(gè)服務(wù)則 service 下有兩個(gè)服務(wù)節(jié)點(diǎn)信息:

          存儲的節(jié)點(diǎn)信息包括服務(wù)名,服務(wù) IP:PORT ,序列化協(xié)議,壓縮協(xié)議等。

          ZooKeeper服務(wù)發(fā)現(xiàn)

          客戶端啟動后,不會立即從注冊中心獲取可用服務(wù)節(jié)點(diǎn),而是在調(diào)用遠(yuǎn)程方法時(shí)獲取節(jié)點(diǎn)信息(懶加載),并放入本地緩存 MAP 中,供后續(xù)調(diào)用,當(dāng)注冊中心通知目錄變化時(shí)清空服務(wù)所有節(jié)點(diǎn)緩存,代碼如下:

          public?List?getServices(String?name)?{
          ??Map>?SERVER_MAP?=?new?ConcurrentHashMap<>();
          ??String?servicePath?=?"rpc/"?+?name?+?"/service";
          ??List?children?=?zkClient.getChildren(servicePath);
          ??List?serviceList?=?Optional.ofNullable(children).orElse(new?ArrayList<>()).stream().map(str?->?{
          ????String?deCh?=?URLDecoder.decode(str,?StandardCharsets.UTF_8.toString());
          ????return?gson.fromJson(deCh,?Service.class);
          ??}).collect(Collectors.toList());
          ??SERVER_MAP.put(name,?serviceList);
          ??return?serviceList;
          }
          public?class?ZkChildListenerImpl?implements?IZkChildListener?{
          ????//監(jiān)聽子節(jié)點(diǎn)的刪除和新增事件
          ????@Override
          ????public?void?handleChildChange(String?parentPath,?List?childList)?throws?Exception?{
          ????????//有變動就清空服務(wù)所有節(jié)點(diǎn)緩存
          ????????String[]?arr?=?parentPath.split("/");
          ????????SERVER_MAP.remove(arr[2]);
          ????}
          }

          PS:美團(tuán)分布式 ID 生成系統(tǒng)Leaf就使用 Zookeeper 的順序節(jié)點(diǎn)來注冊 WorkerID ,臨時(shí)節(jié)點(diǎn)保存節(jié)點(diǎn) IP:PORT 信息。

          2. 客戶端實(shí)現(xiàn)

          客戶端調(diào)用本地方法一樣調(diào)用遠(yuǎn)程方法的完美體驗(yàn)與 Java 動態(tài)代理的強(qiáng)大密不可分。

          DefaultRpcBaseProcessor 抽象類實(shí)現(xiàn)了 ApplicationListener , onApplicationEvent 方法在 Spring 項(xiàng)目啟動完畢會收到時(shí)間通知,獲取 ApplicationContext 上下文之后開始注入服務(wù) injectService (依賴其他服務(wù))或者啟動服務(wù) startServer (自身服務(wù)實(shí)現(xiàn))。

          injectService 方法會遍歷 ApplicationContext 上下文中的所有 Bean , Bean 中是否有屬性使用了 InjectService 注解。有的話生成代理類,注入到 Bean 的屬性中。代碼如下:

          public?abstract?class?DefaultRpcBaseProcessor?implements?ApplicationListener<ContextRefreshedEvent>?{
          ??@Override
          ??public?void?onApplicationEvent(ContextRefreshedEvent?contextRefreshedEvent)?{
          ????//Spring啟動完畢會收到Event
          ????if?(Objects.isNull(contextRefreshedEvent.getApplicationContext().getParent()))?{
          ??????ApplicationContext?applicationContext?=?contextRefreshedEvent.getApplicationContext();
          ??????//保存spring上下文?后續(xù)使用
          ??????Container.setSpringContext(applicationContext);
          ??????startServer(applicationContext);
          ??????injectService(applicationContext);
          ????}
          ??}
          ??private?void?injectService(ApplicationContext?context)?{
          ????String[]?names?=?context.getBeanDefinitionNames();
          ????for?(String?name?:?names)?{
          ??????Object?bean?=?context.getBean(name);
          ??????Class?clazz?=?bean.getClass();
          ??????//clazz?=?clazz.getSuperclass();?aop增強(qiáng)的類生成cglib類,需要Superclass才能獲取定義的字段
          ??????Field[]?declaredFields?=?clazz.getDeclaredFields();
          ??????//設(shè)置InjectService的代理類
          ??????for?(Field?field?:?declaredFields)?{
          ????????InjectService?injectService?=?field.getAnnotation(InjectService.class);
          ????????if?(injectService?==?null)?{continue;
          ????????Class?fieldClass?=?field.getType();
          ????????Object?object?=?context.getBean(name);
          ????????field.set(object,?clientProxyFactory.getProxy(fieldClass,?injectService.group(),?injectService.version()));
          ????????ServerDiscoveryCache.SERVER_CLASS_NAMES.add(fieldClass.getName());
          ??????}
          ????}
          ??}
          ??protected?abstract?void?startServer(ApplicationContext?context);
          }

          調(diào)用 ClientProxyFactory 類的 getProxy ,根據(jù)服務(wù)接口、服務(wù)分組、服務(wù)版本、是否異步調(diào)用來創(chuàng)建該接口的代理類,對該接口的所有方法都會使用創(chuàng)建的代理類來調(diào)用。方法調(diào)用的實(shí)現(xiàn)細(xì)節(jié)都在 ClientInvocationHandler 中的 invoke 方法,主要內(nèi)容是,獲取服務(wù)節(jié)點(diǎn)信息,選擇調(diào)用節(jié)點(diǎn),構(gòu)建 request 對象,最后調(diào)用網(wǎng)絡(luò)模塊發(fā)送請求。

          public?class?ClientProxyFactory?{
          ????public??T?getProxy(Class?clazz,?String?group,?String?version,?boolean?async)?{
          ????????return?(T)?objectCache.computeIfAbsent(clazz.getName()?+?group?+?version,?clz?->?Proxy.newProxyInstance(clazz.getClassLoader(),?new?Class[]{clazz},?new?ClientInvocationHandler(clazz,?group,?version,?async)));
          ????}
          ????private?class?ClientInvocationHandler?implements?InvocationHandler?{
          ????????public?ClientInvocationHandler(Class?clazz,?String?group,?String?version,?boolean?async)?{
          ????????}
          ??????
          ????????@Override
          ????????public?Object?invoke(Object?proxy,?Method?method,?Object[]?args)?throws?Throwable?{
          ????????????//1.?獲得服務(wù)信息
          ????????????String?serviceName?=?clazz.getName();
          ????????????List?serviceList?=?getServiceList(serviceName);
          ????????????Service?service?=?loadBalance.selectOne(serviceList);
          ????????????//2.?構(gòu)建request對象
          ????????????RpcRequest?rpcRequest?=?new?RpcRequest();
          ????????????rpcRequest.setServiceName(service.getName());
          ????????????rpcRequest.setMethod(method.getName());
          ????????????rpcRequest.setGroup(group);
          ????????????rpcRequest.setVersion(version);
          ????????????rpcRequest.setParameters(args);
          ????????????rpcRequest.setParametersTypes(method.getParameterTypes());
          ????????????//3.?協(xié)議編組
          ????????????RpcProtocolEnum?messageProtocol?=?RpcProtocolEnum.getProtocol(service.getProtocol());
          ????????????RpcCompressEnum?compresser?=?RpcCompressEnum.getCompress(service.getCompress());
          ????????????RpcResponse?response?=?netClient.sendRequest(rpcRequest,?service,?messageProtocol,?compresser);
          ????????????return?response.getReturnValue();
          ????????}
          ????}
          }

          3. 網(wǎng)絡(luò)傳輸

          客戶端封裝調(diào)用請求對象之后需要通過網(wǎng)絡(luò)將調(diào)用信息發(fā)送到服務(wù)端,在發(fā)送請求對象之前還需要經(jīng)歷序列化、壓縮兩個(gè)階段。

          序列化與反序列化

          序列化與反序列化的核心作用就是對象的保存與重建,方便客戶端與服務(wù)端通過字節(jié)流傳遞對象,快速對接交互。

          • 序列化就是指把 Java 對象轉(zhuǎn)換為字節(jié)序列的過程。
          • 反序列化就是指把字節(jié)序列恢復(fù)為 Java 對象的過程。

          Java序列化的方式有很多,諸如 JDK 自帶的 Serializable 、 Protobufkryo 等,上述三種筆者自測性能最高的是 Kryo 、其次是 Protobuf 。Json 也不失為一種簡單且高效的序列化方法,有很多大道至簡的框架采用。序列化接口比較簡單,讀者可以自行查看實(shí)現(xiàn)代碼。

          public?interface?MessageProtocol?{
          ????byte[]?marshallingRequest(RpcRequest?request)?throws?Exception;

          ????RpcRequest?unmarshallingRequest(byte[]?data)?throws?Exception;

          ????byte[]?marshallingResponse(RpcResponse?response)?throws?Exception;

          ????RpcResponse?unmarshallingResponse(byte[]?data)?throws?Exception;
          }

          壓縮與解壓

          網(wǎng)絡(luò)通信的成本很高,為了減小網(wǎng)絡(luò)傳輸數(shù)據(jù)包的體積,將序列化之后的字節(jié)碼壓縮不失為一種很好的選擇。Gzip 壓縮算法比率在3到10倍左右,可以大大節(jié)省服務(wù)器的網(wǎng)絡(luò)帶寬,各種流行的 web 服務(wù)器也都支持 Gzip 壓縮算法。Java 接入也比較容易,接入代碼可以查看下方接口的實(shí)現(xiàn)。

          public?interface?Compresser?{
          ????byte[]?compress(byte[]?bytes);

          ????byte[]?decompress(byte[]?bytes);
          }

          網(wǎng)絡(luò)通信

          萬事俱備只欠東風(fēng)。將請求對象序列化成字節(jié)碼,并且壓縮體積之后,需要使用網(wǎng)絡(luò)將字節(jié)碼傳輸?shù)椒?wù)器。常用網(wǎng)絡(luò)傳輸協(xié)議有 HTTP 、 TCP 、 WebSocke t等。HTTP、WebSocket 是應(yīng)用層協(xié)議,TCP 是傳輸層協(xié)議。有些追求簡潔、易用的 RPC 框架也有選擇 HTTP 協(xié)議的。TCP傳輸?shù)母呖煽啃院蜆O致性能是主流RPC框架選擇的最主要原因。談到 Java 生態(tài)的通信領(lǐng)域,Netty 的領(lǐng)銜地位短時(shí)間內(nèi)無人能及。選用 Netty 作為網(wǎng)絡(luò)通信模塊, TCP 數(shù)據(jù)流的粘包、拆包不可避免。

          粘包、拆包問題

          TCP 傳輸協(xié)議是一種面向連接的、可靠的、基于字節(jié)流的傳輸層通信協(xié)議。為了最大化傳輸效率。發(fā)送方可能將單個(gè)較小數(shù)據(jù)包合并發(fā)送,這種情況就需要接收方來拆包處理數(shù)據(jù)了。

          Netty 提供了3種類型的解碼器來處理 TCP 粘包/拆包問題:

          • 定長消息解碼器:FixedLengthFrameDecoder 。發(fā)送方和接收方規(guī)定一個(gè)固定的消息長度,不夠用空格等字符補(bǔ)全,這樣接收方每次從接受到的字節(jié)流中讀取固定長度的字節(jié)即可,長度不夠就保留本次接受的數(shù)據(jù),再在下一個(gè)字節(jié)流中獲取剩下數(shù)量的字節(jié)數(shù)據(jù)。
          • 分隔符解碼器:LineBasedFrameDecoderDelimiterBasedFrameDecoder。LineBasedFrameDecoder 是行分隔符解碼器,分隔符為 \n\r\nDelimiterBasedFrameDecoder 是自定義分隔符解碼器,可以定義一個(gè)或多個(gè)分隔符。接收端在收到的字節(jié)流中查找分隔符,然后返回分隔符之前的數(shù)據(jù),沒找到就繼續(xù)從下一個(gè)字節(jié)流中查找。
          • 數(shù)據(jù)長度解碼器:LengthFieldBasedFrameDecoder。將發(fā)送的消息分為 header 和 body,header 存儲消息的長度(字節(jié)數(shù)),body 是發(fā)送的消息的內(nèi)容。同時(shí)發(fā)送方和接收方要協(xié)商好這個(gè) header 的字節(jié)數(shù),因?yàn)?int 能表示長度,long 也能表示長度。接收方首先從字節(jié)流中讀取前n(header的字節(jié)數(shù))個(gè)字節(jié)(header),然后根據(jù)長度讀取等量的字節(jié),不夠就從下一個(gè)數(shù)據(jù)流中查找。

          不想使用內(nèi)置的解碼器也可自定義解碼器,自定傳輸協(xié)議。

          網(wǎng)絡(luò)通信這部分內(nèi)容比較復(fù)雜,說來話長,代碼易讀,讀者可先自行閱讀代碼。后續(xù)有機(jī)會細(xì)說此節(jié)內(nèi)容。

          5. 服務(wù)端實(shí)現(xiàn)

          客戶端通過網(wǎng)絡(luò)傳輸將請求對象序列化、壓縮之后的字節(jié)碼傳輸?shù)椒?wù)端之后,同樣先通過解壓、反序列化將字節(jié)碼重建為請求對象。有了請求對象之后,就可以進(jìn)行關(guān)鍵的方法調(diào)用環(huán)節(jié)了。

          public?abstract?class?RequestBaseHandler?{
          ????public?RpcResponse?handleRequest(RpcRequest?request)?throws?Exception?{
          ????????//1.?查找目標(biāo)服務(wù)代理對象
          ????????ServiceObject?serviceObject?=?serverRegister.getServiceObject(request.getServiceName()?+?request.getGroup()?+?request.getVersion());
          ????????RpcResponse?response?=?null;
          ????????//2.?調(diào)用對應(yīng)的方法
          ????????response?=?invoke(serviceObject,?request);
          ????????//響應(yīng)客戶端
          ????????return?response;
          ????}
          ????//具體代理調(diào)用
          ????public?abstract?RpcResponse?invoke(ServiceObject?serviceObject,?RpcRequest?request)?throws?Exception;
          }

          上述抽象類 RequestBaseHandler 是調(diào)用服務(wù)方法的抽象實(shí)現(xiàn) handleRequest 通過請求對象的服務(wù)名、服務(wù)分組、服務(wù)版本在 serverRegister.getServiceObject 獲取代理對象。然后調(diào)用 invoke 抽象方法來真正通過代理對象調(diào)用方法獲得結(jié)果。

          1. 服務(wù)的代理對象怎么產(chǎn)生的?
          2. 如何通過代理對象調(diào)用方法?

          生成服務(wù)代理對象

          帶著上述問題來看 DefaultRpcBaseProcessor 抽象類:

          public?abstract?class?DefaultRpcBaseProcessor?implements?ApplicationListener<ContextRefreshedEvent>?{
          ????@Override
          ????public?void?onApplicationEvent(ContextRefreshedEvent?contextRefreshedEvent)?{
          ????????//Spring啟動完畢會收到Event
          ????????if?(Objects.isNull(contextRefreshedEvent.getApplicationContext().getParent()))?{
          ????????????ApplicationContext?applicationContext?=?contextRefreshedEvent.getApplicationContext();
          ????????????Container.setSpringContext(applicationContext);
          ????????????startServer(applicationContext);
          ????????????injectService(applicationContext);
          ????????}
          ????}
          ????private?void?injectService(ApplicationContext?context)?{}

          ????protected?abstract?void?startServer(ApplicationContext?context);
          }

          DefaultRpcBaseProcessor 抽象類也有兩個(gè)實(shí)現(xiàn)類 DefaultRpcReflectProcessorDefaultRpcJavassistProcessor,來實(shí)現(xiàn)關(guān)鍵的生成代理對象的 startServer 方法。

          服務(wù)接口實(shí)現(xiàn)類的 Bean 作為代理對象
          public?class?DefaultRpcReflectProcessor?extends?DefaultRpcBaseProcessor?{
          ????@Override
          ????protected?void?startServer(ApplicationContext?context)?{
          ????????Map?beans?=?context.getBeansWithAnnotation(RpcService.class);
          ????????if?(beans.size()?>?0)?{
          ????????????boolean?startServerFlag?=?true;
          ????????????for?(Object?obj?:?beans.values())?{
          ??????????????Class?clazz?=?obj.getClass();
          ??????????????Class[]?interfaces?=?clazz.getInterfaces();
          ??????????????/*?如果只實(shí)現(xiàn)了一個(gè)接口就用接口的className作為服務(wù)名
          ???????????????*?如果該類實(shí)現(xiàn)了多個(gè)接口,則使用注解里的value作為服務(wù)名
          ???????????????*/

          ??????????????RpcService?service?=?clazz.getAnnotation(RpcService.class);
          ??????????????if?(interfaces.length?!=?1)?{
          ????????????????String?value?=?service.value();
          ????????????????ServiceObject?so?=?new?ServiceObject(value,?Class.forName(value),?obj,?service.group(),?service.version());
          ??????????????}?else?{
          ????????????????Class?supperClass?=?interfaces[0];
          ????????????????ServiceObject?so?=?new?ServiceObject(supperClass.getName(),?supperClass,?obj,?service.group(),?service.version());
          ??????????????}
          ??????????????serverRegister.register(so);
          ????????????}
          ????????}
          ????}
          }

          DefaultRpcReflectProcessor 中獲取到所有有 RpcService 注解的服務(wù)接口實(shí)現(xiàn)類 Bean,然后將該 Bean 作為服務(wù)代理對象注冊到 serverRegister 中供上述的反射調(diào)用中使用。

          使用 Javassist 生成新的代理對象
          public?class?DefaultRpcJavassistProcessor?extends?DefaultRpcBaseProcessor?{
          ????@Override
          ????protected?void?startServer(ApplicationContext?context)?{
          ????????Map?beans?=?context.getBeansWithAnnotation(RpcService.class);
          ????????if?(beans.size()?>?0)?{
          ????????????boolean?startServerFlag?=?true;
          ????????????for?(Map.Entry?entry?:?beans.entrySet())?{
          ??????????????String?beanName?=?entry.getKey();
          ??????????????Object?obj??=?entry.getValue();
          ??????????????Class?clazz?=?obj.getClass();
          ??????????????Class[]?interfaces?=?clazz.getInterfaces();
          ??????????????Method[]?declaredMethods?=?clazz.getDeclaredMethods();
          ??????????????/*
          ???????????????*?如果只實(shí)現(xiàn)了一個(gè)接口就用接口的className作為服務(wù)名
          ???????????????*?如果該類實(shí)現(xiàn)了多個(gè)接口,則使用注解里的value作為服務(wù)名
          ???????????????*/

          ??????????????RpcService?service?=?clazz.getAnnotation(RpcService.class);
          ??????????????if?(interfaces.length?!=?1)?{
          ????????????????String?value?=?service.value();
          ????????????????//bean實(shí)現(xiàn)多個(gè)接口時(shí),javassist代理類中生成的方法只按照注解指定的服務(wù)類來生成
          ????????????????declaredMethods?=?Class.forName(value).getDeclaredMethods();
          ????????????????Object?proxy?=?ProxyFactory.makeProxy(value,?beanName,?declaredMethods);
          ????????????????ServiceObject?so?=?new?ServiceObject(value,?Class.forName(value),?proxy,?service.group(),?service.version());
          ??????????????}?else?{
          ????????????????Class?supperClass?=?interfaces[0];
          ????????????????Object?proxy?=?ProxyFactory.makeProxy(supperClass.getName(),?beanName,?declaredMethods);
          ????????????????ServiceObject?so?=?new?ServiceObject(supperClass.getName(),?supperClass,?proxy,?service.group(),?service.version());
          ??????????????}
          ??????????????serverRegister.register(so);
          ????????????}
          ????????}
          ????}
          }

          DefaultRpcJavassistProcessor DefaultRpcReflectProcessor 的差異在于后者直接將服務(wù)實(shí)現(xiàn)類對象 Bean 作為服務(wù)代理對象,而前者通過 ProxyFactory.makeProxy(value, beanName, declaredMethods) 創(chuàng)建了新的代理對象,將新的代理對象注冊到 serverRegister 中供后續(xù)調(diào)用調(diào)用中使用。該方法通過 Javassist 來生成代理類,代碼冗長,建議閱讀源碼。我來通過下面的代碼演示實(shí)現(xiàn)的代理類。

          首先我們的服務(wù)接口是:

          public?interface?HelloService?{
          ????String?hello(String?name);
          }

          服務(wù)的實(shí)現(xiàn)類是:

          @RpcService
          public?class?HelloServiceImpl?implements?HelloService?{
          ????@Override
          ????public?String?hello(String?name)?{
          ????????return?"a1";
          ????}
          }

          那最終新生成的代理類是這樣的:

          public?class?HelloService$proxy1649315143476?{
          ????private?static?cn.ppphuang.rpcspringstarter.service.HelloService?serviceProxy?=?
          ?((org.springframework.context.ApplicationContext)cn.ppphuang.rpcspringstarter.server.Container.getSpringContext()).getBean("helloServiceImpl");
          ??
          ????public?cn.ppphuang.rpcspringstarter.common.model.RpcResponse?hello(cn.ppphuang.rpcspringstarter.common.model.RpcRequest?request)?throws?java.lang.Exception?{
          ????????java.lang.Object[]?params?=?request.getParameters();
          ????????if(params.length?==?1
          ???????????&&?(params[0]?==?null||params[0].getClass().getSimpleName().equalsIgnoreCase("String"))){
          ????????????java.lang.String?arg0?=?null;
          ????????????arg0?=?cn.ppphuang.rpcspringstarter.util.ConvertUtil.convertToString(params[0]);
          ????????????java.lang.String?returnValue?=?serviceProxy.hello(arg0);
          ????????????return?new?cn.ppphuang.rpcspringstarter.common.model.RpcResponse(returnValue);
          ????????}
          ????}
          ??
          ????public?cn.ppphuang.rpcspringstarter.common.model.RpcResponse?invoke(cn.ppphuang.rpcspringstarter.common.model.RpcRequest?request)?throws?java.lang.Exception?{
          ????????String?methodName?=?request.getMethod();
          ????????if(methodName.equalsIgnoreCase("hello")){
          ????????????java.lang.Object?returnValue?=?hello(request);
          ????????????return?returnValue;
          ????????}
          ????}
          }

          清理全限定類名后,代碼如下:

          public?class?HelloService$proxy1649315143476?{
          ????private?static?HelloService?serviceProxy?=?((ApplicationContext)Container.getSpringContext()).getBean("helloServiceImpl");
          ??
          ????public?RpcResponse?hello(RpcRequest?request)?throws?Exception?{
          ????????Object[]?params?=?request.getParameters();
          ????????if(params.length?==?1
          ???????????&&?(params[0]?==?null||?params[0].getClass().getSimpleName().equalsIgnoreCase("String"))){
          ????????????String?arg0?=?ConvertUtil.convertToString(params[0]);
          ????????????String?returnValue?=?serviceProxy.hello(arg0);
          ????????????return?new?RpcResponse(returnValue);
          ????????}
          ????}

          ????public?RpcResponse?invoke(RpcRequest?request)?throws?Exception?{
          ????????String?methodName?=?request.getMethod();
          ????????if(methodName.equalsIgnoreCase("hello")){
          ????????????Object?returnValue?=?hello(request);
          ????????????return?returnValue;
          ????????}
          ????}
          }
          • 代理類 HelloService$proxy1649315143476 中有一個(gè)服務(wù)接口類型 HelloService 的靜態(tài)屬性 serviceProxy,值就是通過 ApplicationContext 上下文獲取到的服務(wù)接口實(shí)現(xiàn)類 HelloServiceImpl 這個(gè) BeanSpringContext 已經(jīng)被提前緩存到 Container 類中,讀者可以自行查找代碼了解)。
          • public RpcResponse invoke(RpcRequest request) throws Exception 該方法判斷調(diào)用的方法名是 hello 來調(diào)用代理類中的hello方法。
          • public RpcResponse hello(RpcRequest request) throws Exception 該方法通過調(diào)用 serviceProxy.hello() 的方法獲取結(jié)果。
          public?interface?InvokeProxy?{
          ????/**
          ?????*?invoke調(diào)用服務(wù)接口
          ?????*/

          ????RpcResponse?invoke(RpcRequest?rpcRequest)?throws?Exception;
          }

          HelloService$proxy1649315143476 類實(shí)現(xiàn) InvokeProxy 接口(ProxyFactory.makeProxy 代碼中有體現(xiàn))。InvokeProxy 接口只有一個(gè) invoke 方法。到這里就能理解通過調(diào)用代理對象的 invoke 方法就能間接調(diào)用到服務(wù)接口實(shí)現(xiàn)類 HelloServiceImpl 的對應(yīng)方法了。

          調(diào)用代理對象方法

          理清代理對象的生成之后,開始調(diào)用代理對象的方法。

          上文中寫到的抽象類 RequestBaseHandler 有兩個(gè)實(shí)現(xiàn)類 RequestJavassistHandlerRequestReflectHandler。

          Java 反射調(diào)用

          先看 RequestReflectHandler

          public?class?RequestReflectHandler?extends?RequestBaseHandler?{
          ????@Override
          ????public?RpcResponse?invoke(ServiceObject?serviceObject,?RpcRequest?request)?throws?Exception?{
          ????????Method?method?=?serviceObject.getClazz().getMethod(request.getMethod(),?request.getParametersTypes());
          ????????Object?value?=?method.invoke(serviceObject.getObj(),?request.getParameters());
          ????????RpcResponse?response?=?new?RpcResponse(RpcStatusEnum.SUCCESS);
          ????????response.setReturnValue(value);
          ????????return?response;
          ????}
          }

          Object value = method.invoke(serviceObject.getObj(), request.getParameters());

          這行代碼都很熟悉,用 Java 框架中最常見的反射來調(diào)用代理類中的方法,大部分 RPC 框架也都是這么來實(shí)現(xiàn)的。

          通過 Javassists 生成的代理對象 invoke 方法調(diào)用

          接著看 RequestJavassistHandler:

          public?class?RequestJavassistHandler?extends?RequestBaseHandler?{
          ????@Override
          ????public?RpcResponse?invoke(ServiceObject?serviceObject,?RpcRequest?request)?throws?Exception?{
          ????????InvokeProxy?invokeProxy?=?(InvokeProxy)?serviceObject.getObj();
          ????????return?invokeProxy.invoke(request);
          ????}
          }

          直接將代理對象轉(zhuǎn)為 InvokeProxy,調(diào)用 InvokeProxy.invoke() 方法獲得返回值,如果這里不能理解,回頭再看一下使用 Javassist 生成新的代理對象這個(gè)小節(jié)吧。

          調(diào)用代理對象的方法獲取到結(jié)果,仍要通過序列化、壓縮后,將字節(jié)流數(shù)據(jù)包通過網(wǎng)絡(luò)傳輸?shù)娇蛻舳?,客戶端拿到響?yīng)的結(jié)果再解壓,反序列化得到結(jié)果對象。

          Javassist介紹

          Javassist 是一個(gè)開源的分析、編輯和創(chuàng)建Java字節(jié)碼的類庫。是由東京工業(yè)大學(xué)的數(shù)學(xué)和計(jì)算機(jī)科學(xué)系的 ?Shigeru Chiba(千葉滋)所創(chuàng)建的。簡單來說就是用源碼級別的 api 去修改字節(jié)碼。Duboo、MyBatis 也都使用了 Javassist。Duboo 作者也選擇Javassist作為 Duboo 的代理工具,可以點(diǎn)擊這里查看 Duboo 作者也選擇 Javassist 的原因。

          Javassist 還能和諧(pojie)Java 編寫的商業(yè)軟件,例如抓包工具 Charles。代碼在這里,供交流學(xué)習(xí)。

          在使用 Javassist 有踩到如下坑,供大家參考:

          1. Javassist 是運(yùn)行時(shí),沒有 JDK 靜態(tài)編譯過程,JDK 的很多語法糖都是在靜態(tài)編譯過程中處理的,所以需要自行編碼處理,例如自動拆裝箱。

            int?i?=?1;
            Integer?ii?=?i;???????????????//javassist?錯(cuò)誤?JDK會自動裝箱,javassist需要自行編碼處理?

            int?i?=?1;
            Integer?ii?=?new?Integer(i);??//javassist?正確
          2. 自定義的類需要使用類的完全限定名,這也是為什么生成的代理類中類都是完全限定名。

          選擇哪種代理方式

          可以通過配置文件 application.properties 修改 hp.rpc.server-proxy-type 的值來選擇代理模式。

          性能測試,機(jī)器 Macbook Pro M1 8C 16G, 代碼如下:

          @Autowired
          ClientProxyFactory?clientProxyFactory;
          @Test
          void?contextLoads()?{
          ??long?l1?=?System.currentTimeMillis();
          ??HelloService?proxy?=?clientProxyFactory.getProxy(HelloService.class,"group3","version3");
          ??for?(int?i?=?0;?i?1000000;?i++)?{
          ????String?ppphuang?=?proxy.hello("ppphuang");
          ??}
          ??long?l2?=?System.currentTimeMillis();
          ??long?l3?=?l2?-?l1;
          ??System.out.println(l3);
          }

          測試結(jié)果(ms):

          請求次數(shù)反射調(diào)用1反射調(diào)用2反射調(diào)用3Javassist1Javassist2Javassist3
          10000130311591164112612351094
          100000611061036065625958546178
          1000000544755189052329525605209952794

          測試結(jié)果差異并不大,Javassist 模式下只是稍快了一點(diǎn)點(diǎn),幾乎可以忽略不記。與Duboo作者博客6樓評論的測試結(jié)果一致。所以想簡單通用性強(qiáng)用反射模式,也可以通過使用 Javassist 模式來學(xué)習(xí)更多知識,因?yàn)?Javassist 需要自己兼容很多特殊的狀況,反射調(diào)用 JDK 已經(jīng)幫你兼容完了。

          總結(jié)

          寫到這里我們了解了 RPC 的基本原理、服務(wù)注冊與發(fā)現(xiàn)、客戶端代理、網(wǎng)絡(luò)傳輸、重點(diǎn)介紹了服務(wù)端的兩種代理模式,學(xué)習(xí) Javassist 如何實(shí)現(xiàn)代理。

          還有很多東西沒有重點(diǎn)講述甚至沒有提及,例如粘、拆包的處理、自定義數(shù)據(jù)包協(xié)議、Javassist 模式下如何實(shí)現(xiàn)方法重載、如何解決一個(gè)服務(wù)接口類有多個(gè)實(shí)現(xiàn)、如何解決一個(gè)實(shí)現(xiàn)類實(shí)現(xiàn)了多個(gè)服務(wù)接口、在 SpringBoot 中如何自動裝載、如何開箱即用、怎么實(shí)現(xiàn)異步調(diào)用、怎么擴(kuò)展序列化、壓縮算法等等...有興趣的讀者可以在源碼中尋找答案,或者尋找優(yōu)化項(xiàng),當(dāng)然也可以尋找 bug 。如果讀者能理解整個(gè)項(xiàng)目的實(shí)現(xiàn),相信你一定會有所收獲。后續(xù)有機(jī)會也會再寫文章與大家交流學(xué)習(xí)。因筆者水平有限,不完善的地方請大家斧正。感謝各位的閱讀,謝謝。

          附錄

          項(xiàng)目地址:https://github.com/ppphuang/rpc-spring-starter

          測試DEMO:https://github.com/ppphuang/rpc-spring-starter-demo

          - END -



          往期精彩文章:


          如何使用注解優(yōu)雅的記錄操作日志


          你買的云服務(wù)器,可能正泡在水里。


          模仿UP主,做一個(gè)彈幕控制的直播間!


          如何保證同事的代碼不會腐爛?一文帶你了解 Alibaba COLA 架構(gòu)


          誰會拒絕一臺Win11和MacOS無縫切換的MacBook呢?Parallels17極速體驗(yàn)


          瀏覽 61
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  h文在线观看 | 中国美女操逼网站 | av短片在线播放婷婷 | 伦中文亚洲| 激情视频综合网 |