<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          手寫RPC框架,理解更透徹,代碼已上傳Github!

          共 17148字,需瀏覽 35分鐘

           ·

          2021-02-19 00:26

          程序員的成長之路
          互聯(lián)網(wǎng)/程序員/技術(shù)/資料共享?
          關(guān)注


          閱讀本文大概需要 9?分鐘。

          來自:https://www.cnblogs.com/2YSP/p/13545217.html

          一、前言

          前段時(shí)間看到一篇不錯(cuò)的文章《看了這篇你就會(huì)手寫RPC框架了》,于是便來了興趣對著實(shí)現(xiàn)了一遍,后面覺得還有很多優(yōu)化的地方便對其進(jìn)行了改進(jìn)。
          主要改動(dòng)點(diǎn)如下:
          1. 除了Java序列化協(xié)議,增加了protobuf和kryo序列化協(xié)議,配置即用。
          2. 增加多種負(fù)載均衡算法(隨機(jī)、輪詢、加權(quán)輪詢、平滑加權(quán)輪詢),配置即用。
          3. 客戶端增加本地服務(wù)列表緩存,提高性能。
          4. 修復(fù)高并發(fā)情況下,netty導(dǎo)致的內(nèi)存泄漏問題
          5. 由原來的每個(gè)請求建立一次連接,改為建立TCP長連接,并多次復(fù)用。
          6. 服務(wù)端增加線程池提高消息處理能力

          二、介紹

          RPC,即 Remote Procedure Call(遠(yuǎn)程過程調(diào)用),調(diào)用遠(yuǎn)程計(jì)算機(jī)上的服務(wù),就像調(diào)用本地服務(wù)一樣。RPC可以很好的解耦系統(tǒng),如WebService就是一種基于Http協(xié)議的RPC。
          調(diào)用示意圖
          調(diào)用示意圖
          總的來說,就如下幾個(gè)步驟:
          1. 客戶端(ServerA)執(zhí)行遠(yuǎn)程方法時(shí)就調(diào)用client stub傳遞類名、方法名和參數(shù)等信息。
          2. client stub會(huì)將參數(shù)等信息序列化為二進(jìn)制流的形式,然后通過Sockect發(fā)送給服務(wù)端(ServerB)
          3. 服務(wù)端收到數(shù)據(jù)包后,server stub 需要進(jìn)行解析反序列化為類名、方法名和參數(shù)等信息。
          4. server stub調(diào)用對應(yīng)的本地方法,并把執(zhí)行結(jié)果返回給客戶端
          所以一個(gè)RPC框架有如下角色:

          服務(wù)消費(fèi)者

          遠(yuǎn)程方法的調(diào)用方,即客戶端。一個(gè)服務(wù)既可以是消費(fèi)者也可以是提供者。

          服務(wù)提供者

          遠(yuǎn)程服務(wù)的提供方,即服務(wù)端。一個(gè)服務(wù)既可以是消費(fèi)者也可以是提供者。

          注冊中心

          保存服務(wù)提供者的服務(wù)地址等信息,一般由zookeeper、redis等實(shí)現(xiàn)。

          監(jiān)控運(yùn)維(可選)

          監(jiān)控接口的響應(yīng)時(shí)間、統(tǒng)計(jì)請求數(shù)量等,及時(shí)發(fā)現(xiàn)系統(tǒng)問題并發(fā)出告警通知。

          三、實(shí)現(xiàn)

          本RPC框架rpc-spring-boot-starter涉及技術(shù)棧如下:
          • 使用zookeeper作為注冊中心
          • 使用netty作為通信框架
          • 消息編解碼:protostuff、kryo、java
          • spring
          • 使用SPI來根據(jù)配置動(dòng)態(tài)選擇負(fù)載均衡算法等
          由于代碼過多,這里只講幾處改動(dòng)點(diǎn)。

          3.1動(dòng)態(tài)負(fù)載均衡算法

          1.編寫LoadBalance的實(shí)現(xiàn)類
          負(fù)載均衡算法實(shí)現(xiàn)類
          2.自定義注解?@LoadBalanceAno

          /**
          ?*?負(fù)載均衡注解
          ?*/

          @Target(ElementType.TYPE)
          @Retention(RetentionPolicy.RUNTIME)
          @Documented
          public?@interface?LoadBalanceAno?{

          ????String?value()?default?"";
          }

          /**
          ?*?輪詢算法
          ?*/

          @LoadBalanceAno(RpcConstant.BALANCE_ROUND)
          public?class?FullRoundBalance?implements?LoadBalance?{

          ????private?static?Logger?logger?=?LoggerFactory.getLogger(FullRoundBalance.class);

          ????private?volatile?int?index;

          ????@Override
          ????public?synchronized?Service?chooseOne(List?services)?{
          ????????//?加鎖防止多線程情況下,index超出services.size()
          ????????if?(index?==?services.size())?{
          ????????????index?=?0;
          ????????}
          ????????return?services.get(index++);
          ????}
          }

          3.新建在resource目錄下META-INF/servers文件夾并創(chuàng)建文件
          enter description here
          4.RpcConfig增加配置項(xiàng)loadBalance

          /**
          ?*?@author?2YSP
          ?*?@date?2020/7/26?15:13
          ?*/

          @ConfigurationProperties(prefix?=?"sp.rpc")
          public?class?RpcConfig?{

          ????/**
          ?????*?服務(wù)注冊中心地址
          ?????*/

          ????private?String?registerAddress?=?"127.0.0.1:2181";

          ????/**
          ?????*?服務(wù)暴露端口
          ?????*/

          ????private?Integer?serverPort?=?9999;
          ????/**
          ?????*?服務(wù)協(xié)議
          ?????*/

          ????private?String?protocol?=?"java";
          ????/**
          ?????*?負(fù)載均衡算法
          ?????*/

          ????private?String?loadBalance?=?"random";
          ????/**
          ?????*?權(quán)重,默認(rèn)為1
          ?????*/

          ????private?Integer?weight?=?1;

          ???//?省略getter?setter
          }

          5.在自動(dòng)配置類RpcAutoConfiguration根據(jù)配置選擇對應(yīng)的算法實(shí)現(xiàn)類

          /**
          ?????*?使用spi匹配符合配置的負(fù)載均衡算法
          ?????*
          ?????*?@param?name
          ?????*?@return
          ?????*/

          ????private?LoadBalance?getLoadBalance(String?name)?{
          ????????ServiceLoader?loader?=?ServiceLoader.load(LoadBalance.class);
          ????????Iterator?iterator?=?loader.iterator();
          ????????while?(iterator.hasNext())?{
          ????????????LoadBalance?loadBalance?=?iterator.next();
          ????????????LoadBalanceAno?ano?=?loadBalance.getClass().getAnnotation(LoadBalanceAno.class);
          ????????????Assert.notNull(ano,?"load?balance?name?can?not?be?empty!");
          ????????????if?(name.equals(ano.value()))?{
          ????????????????return?loadBalance;
          ????????????}
          ????????}
          ????????throw?new?RpcException("invalid?load?balance?config");
          ????}

          ?@Bean
          ????public?ClientProxyFactory?proxyFactory(@Autowired?RpcConfig?rpcConfig)?{
          ????????ClientProxyFactory?clientProxyFactory?=?new?ClientProxyFactory();
          ????????//?設(shè)置服務(wù)發(fā)現(xiàn)著
          ????????clientProxyFactory.setServerDiscovery(new???????????ZookeeperServerDiscovery(rpcConfig.getRegisterAddress()));

          ????????//?設(shè)置支持的協(xié)議
          ????????Map?supportMessageProtocols?=?buildSupportMessageProtocols();
          ????????clientProxyFactory.setSupportMessageProtocols(supportMessageProtocols);
          ????????//?設(shè)置負(fù)載均衡算法
          ????????LoadBalance?loadBalance?=?getLoadBalance(rpcConfig.getLoadBalance());
          ????????clientProxyFactory.setLoadBalance(loadBalance);
          ????????//?設(shè)置網(wǎng)絡(luò)層實(shí)現(xiàn)
          ????????clientProxyFactory.setNetClient(new?NettyNetClient());

          ????????return?clientProxyFactory;
          ????}

          3.2本地服務(wù)列表緩存

          使用Map來緩存數(shù)據(jù)

          /**
          ?*?服務(wù)發(fā)現(xiàn)本地緩存
          ?*/

          public?class?ServerDiscoveryCache?{
          ????/**
          ?????*?key:?serviceName
          ?????*/

          ????private?static?final?Map>?SERVER_MAP?=?new?ConcurrentHashMap<>();
          ????/**
          ?????*?客戶端注入的遠(yuǎn)程服務(wù)service?class
          ?????*/

          ????public?static?final?List?SERVICE_CLASS_NAMES?=?new?ArrayList<>();

          ????public?static?void?put(String?serviceName,?List?serviceList)?{
          ????????SERVER_MAP.put(serviceName,?serviceList);
          ????}

          ????/**
          ?????*?去除指定的值
          ?????*?@param?serviceName
          ?????*?@param?service
          ?????*/

          ????public?static?void?remove(String?serviceName,?Service?service)?{
          ????????SERVER_MAP.computeIfPresent(serviceName,?(key,?value)?->
          ????????????????value.stream().filter(o?->?!o.toString().equals(service.toString())).collect(Collectors.toList())
          ????????);
          ????}

          ????public?static?void?removeAll(String?serviceName)?{
          ????????SERVER_MAP.remove(serviceName);
          ????}


          ????public?static?boolean?isEmpty(String?serviceName)?{
          ????????return?SERVER_MAP.get(serviceName)?==?null?||?SERVER_MAP.get(serviceName).size()?==?0;
          ????}

          ????public?static?List?get(String?serviceName)?{
          ????????return?SERVER_MAP.get(serviceName);
          ????}
          }

          ClientProxyFactory,先查本地緩存,緩存沒有再查詢zookeeper。

          /**
          ?????*?根據(jù)服務(wù)名獲取可用的服務(wù)地址列表
          ?????*?@param?serviceName
          ?????*?@return
          ?????*/

          ????private?List?getServiceList(String?serviceName)?{
          ????????List?services;
          ????????synchronized?(serviceName){
          ????????????if?(ServerDiscoveryCache.isEmpty(serviceName))?{
          ????????????????services?=?serverDiscovery.findServiceList(serviceName);
          ????????????????if?(services?==?null?||?services.size()?==?0)?{
          ????????????????????throw?new?RpcException("No?provider?available!");
          ????????????????}
          ????????????????ServerDiscoveryCache.put(serviceName,?services);
          ????????????}?else?{
          ????????????????services?=?ServerDiscoveryCache.get(serviceName);
          ????????????}
          ????????}
          ????????return?services;
          ????}

          問題:?如果服務(wù)端因?yàn)殄礄C(jī)或網(wǎng)絡(luò)問題下線了,緩存卻還在就會(huì)導(dǎo)致客戶端請求已經(jīng)不可用的服務(wù)端,增加請求失敗率。**解決方案:**由于服務(wù)端注冊的是臨時(shí)節(jié)點(diǎn),所以如果服務(wù)端下線節(jié)點(diǎn)會(huì)被移除。只要監(jiān)聽zookeeper的子節(jié)點(diǎn),如果新增或刪除子節(jié)點(diǎn)就直接清空本地緩存即可。
          DefaultRpcProcessor

          /**
          ?*?Rpc處理者,支持服務(wù)啟動(dòng)暴露,自動(dòng)注入Service
          ?*?@author?2YSP
          ?*?@date?2020/7/26?14:46
          ?*/

          public?class?DefaultRpcProcessor?implements?ApplicationListener<ContextRefreshedEvent>?{

          ???

          ????@Override
          ????public?void?onApplicationEvent(ContextRefreshedEvent?event)?{
          ????????//?Spring啟動(dòng)完畢過后會(huì)收到一個(gè)事件通知
          ????????if?(Objects.isNull(event.getApplicationContext().getParent())){
          ????????????ApplicationContext?context?=?event.getApplicationContext();
          ????????????//?開啟服務(wù)
          ????????????startServer(context);
          ????????????//?注入Service
          ????????????injectService(context);
          ????????}
          ????}

          ????private?void?injectService(ApplicationContext?context)?{
          ????????String[]?names?=?context.getBeanDefinitionNames();
          ????????for(String?name?:?names){
          ????????????Class?clazz?=?context.getType(name);
          ????????????if?(Objects.isNull(clazz)){
          ????????????????continue;
          ????????????}

          ????????????Field[]?declaredFields?=?clazz.getDeclaredFields();
          ????????????for(Field?field?:?declaredFields){
          ????????????????//?找出標(biāo)記了InjectService注解的屬性
          ????????????????InjectService?injectService?=?field.getAnnotation(InjectService.class);
          ????????????????if?(injectService?==?null){
          ????????????????????continue;
          ????????????????}

          ????????????????Class?fieldClass?=?field.getType();
          ????????????????Object?object?=?context.getBean(name);
          ????????????????field.setAccessible(true);
          ????????????????try?{
          ????????????????????field.set(object,clientProxyFactory.getProxy(fieldClass));
          ????????????????}?catch?(IllegalAccessException?e)?{
          ????????????????????e.printStackTrace();
          ????????????????}
          ????//?添加本地服務(wù)緩存
          ????????????????ServerDiscoveryCache.SERVICE_CLASS_NAMES.add(fieldClass.getName());
          ????????????}
          ????????}
          ????????//?注冊子節(jié)點(diǎn)監(jiān)聽
          ????????if?(clientProxyFactory.getServerDiscovery()?instanceof?ZookeeperServerDiscovery){
          ????????????ZookeeperServerDiscovery?serverDiscovery?=?(ZookeeperServerDiscovery)?clientProxyFactory.getServerDiscovery();
          ????????????ZkClient?zkClient?=?serverDiscovery.getZkClient();
          ????????????ServerDiscoveryCache.SERVICE_CLASS_NAMES.forEach(name?->{
          ????????????????String?servicePath?=?RpcConstant.ZK_SERVICE_PATH?+?RpcConstant.PATH_DELIMITER?+?name?+?"/service";
          ????????????????zkClient.subscribeChildChanges(servicePath,?new?ZkChildListenerImpl());
          ????????????});
          ????????????logger.info("subscribe?service?zk?node?successfully");
          ????????}

          ????}

          ????private?void?startServer(ApplicationContext?context)?{
          ????????...

          ????}
          }

          ZkChildListenerImpl

          /**
          ?*?子節(jié)點(diǎn)事件監(jiān)聽處理類
          ?*/

          public?class?ZkChildListenerImpl?implements?IZkChildListener?{

          ????private?static?Logger?logger?=?LoggerFactory.getLogger(ZkChildListenerImpl.class);

          ????/**
          ?????*?監(jiān)聽子節(jié)點(diǎn)的刪除和新增事件
          ?????*?@param?parentPath?/rpc/serviceName/service
          ?????*?@param?childList
          ?????*?@throws?Exception
          ?????*/

          ????@Override
          ????public?void?handleChildChange(String?parentPath,?List?childList)?throws?Exception?{
          ????????logger.debug("Child?change?parentPath:[{}]?--?childList:[{}]",?parentPath,?childList);
          ????????//?只要子節(jié)點(diǎn)有改動(dòng)就清空緩存
          ????????String[]?arr?=?parentPath.split("/");
          ????????ServerDiscoveryCache.removeAll(arr[2]);
          ????}
          }

          3.3nettyClient支持TCP長連接

          這部分的改動(dòng)最多,先增加新的sendRequest接口。
          添加接口
          實(shí)現(xiàn)類NettyNetClient

          /**
          ?*?@author?2YSP
          ?*?@date?2020/7/25?20:12
          ?*/

          public?class?NettyNetClient?implements?NetClient?{

          ????private?static?Logger?logger?=?LoggerFactory.getLogger(NettyNetClient.class);

          ????private?static?ExecutorService?threadPool?=?new?ThreadPoolExecutor(4,?10,?200,
          ????????????TimeUnit.SECONDS,?new?LinkedBlockingQueue<>(1000),?new?ThreadFactoryBuilder()
          ????????????.setNameFormat("rpcClient-%d")
          ????????????.build());

          ????private?EventLoopGroup?loopGroup?=?new?NioEventLoopGroup(4);

          ????/**
          ?????*?已連接的服務(wù)緩存
          ?????* key:?服務(wù)地址,格式:ip:port
          ?????*/

          ????public?static?Map?connectedServerNodes?=?new?ConcurrentHashMap<>();

          ????@Override
          ????public?byte[]?sendRequest(byte[]?data,?Service?service)?throws?InterruptedException?{
          ??....
          ????????return?respData;
          ????}

          ????@Override
          ????public?RpcResponse?sendRequest(RpcRequest?rpcRequest,?Service?service,?MessageProtocol?messageProtocol)?{

          ????????String?address?=?service.getAddress();
          ????????synchronized?(address)?{
          ????????????if?(connectedServerNodes.containsKey(address))?{
          ????????????????SendHandlerV2?handler?=?connectedServerNodes.get(address);
          ????????????????logger.info("使用現(xiàn)有的連接");
          ????????????????return?handler.sendRequest(rpcRequest);
          ????????????}

          ????????????String[]?addrInfo?=?address.split(":");
          ????????????final?String?serverAddress?=?addrInfo[0];
          ????????????final?String?serverPort?=?addrInfo[1];
          ????????????final?SendHandlerV2?handler?=?new?SendHandlerV2(messageProtocol,?address);
          ????????????threadPool.submit(()?->?{
          ????????????????????????//?配置客戶端
          ????????????????????????Bootstrap?b?=?new?Bootstrap();
          ????????????????????????b.group(loopGroup).channel(NioSocketChannel.class)
          ????????????????????????????????.option(ChannelOption.TCP_NODELAY,?true)
          ????????????????????????????????.handler(new?ChannelInitializer<SocketChannel>()?
          {
          ????????????????????????????????????@Override
          ????????????????????????????????????protected?void?initChannel(SocketChannel?socketChannel)?throws?Exception?{
          ????????????????????????????????????????ChannelPipeline?pipeline?=?socketChannel.pipeline();
          ????????????????????????????????????????pipeline
          ????????????????????????????????????????????????.addLast(handler);
          ????????????????????????????????????}
          ????????????????????????????????});
          ????????????????????????//?啟用客戶端連接
          ????????????????????????ChannelFuture?channelFuture?=?b.connect(serverAddress,?Integer.parseInt(serverPort));
          ????????????????????????channelFuture.addListener(new?ChannelFutureListener()?{
          ????????????????????????????@Override
          ????????????????????????????public?void?operationComplete(ChannelFuture?channelFuture)?throws?Exception?{
          ????????????????????????????????connectedServerNodes.put(address,?handler);
          ????????????????????????????}
          ????????????????????????});
          ????????????????????}
          ????????????);
          ????????????logger.info("使用新的連接。。。");
          ????????????return?handler.sendRequest(rpcRequest);
          ????????}
          ????}
          }

          每次請求都會(huì)調(diào)用sendRequest()方法,用線程池異步和服務(wù)端創(chuàng)建TCP長連接,連接成功后將SendHandlerV2緩存到ConcurrentHashMap中方便復(fù)用,后續(xù)請求的請求地址(ip+port)如果在connectedServerNodes中存在則使用connectedServerNodes中的handler處理不再重新建立連接。
          SendHandlerV2

          /**
          ?*?@author?2YSP
          ?*?@date?2020/8/19?20:06
          ?*/

          public?class?SendHandlerV2?extends?ChannelInboundHandlerAdapter?{

          ????private?static?Logger?logger?=?LoggerFactory.getLogger(SendHandlerV2.class);

          ????/**
          ?????*?等待通道建立最大時(shí)間
          ?????*/

          ????static?final?int?CHANNEL_WAIT_TIME?=?4;
          ????/**
          ?????*?等待響應(yīng)最大時(shí)間
          ?????*/

          ????static?final?int?RESPONSE_WAIT_TIME?=?8;

          ????private?volatile?Channel?channel;

          ????private?String?remoteAddress;

          ????private?static?Map>?requestMap?=?new?ConcurrentHashMap<>();

          ????private?MessageProtocol?messageProtocol;

          ????private?CountDownLatch?latch?=?new?CountDownLatch(1);

          ????public?SendHandlerV2(MessageProtocol?messageProtocol,String?remoteAddress)?{
          ????????this.messageProtocol?=?messageProtocol;
          ????????this.remoteAddress?=?remoteAddress;
          ????}

          ????@Override
          ????public?void?channelRegistered(ChannelHandlerContext?ctx)?throws?Exception?{
          ????????this.channel?=?ctx.channel();
          ????????latch.countDown();
          ????}

          ????@Override
          ????public?void?channelActive(ChannelHandlerContext?ctx)?throws?Exception?{
          ????????logger.debug("Connect?to?server?successfully:{}",?ctx);
          ????}

          ????@Override
          ????public?void?channelRead(ChannelHandlerContext?ctx,?Object?msg)?throws?Exception?{
          ????????logger.debug("Client?reads?message:{}",?msg);
          ????????ByteBuf?byteBuf?=?(ByteBuf)?msg;
          ????????byte[]?resp?=?new?byte[byteBuf.readableBytes()];
          ????????byteBuf.readBytes(resp);
          ????????//?手動(dòng)回收
          ????????ReferenceCountUtil.release(byteBuf);
          ????????RpcResponse?response?=?messageProtocol.unmarshallingResponse(resp);
          ????????RpcFuture?future?=?requestMap.get(response.getRequestId());
          ????????future.setResponse(response);
          ????}

          ????@Override
          ????public?void?exceptionCaught(ChannelHandlerContext?ctx,?Throwable?cause)?throws?Exception?{
          ????????cause.printStackTrace();
          ????????logger.error("Exception?occurred:{}",?cause.getMessage());
          ????????ctx.close();
          ????}

          ????@Override
          ????public?void?channelReadComplete(ChannelHandlerContext?ctx)?throws?Exception?{
          ????????ctx.flush();
          ????}

          ????@Override
          ????public?void?channelInactive(ChannelHandlerContext?ctx)?throws?Exception?{
          ????????super.channelInactive(ctx);
          ????????logger.error("channel?inactive?with?remoteAddress:[{}]",remoteAddress);
          ????????NettyNetClient.connectedServerNodes.remove(remoteAddress);

          ????}

          ????@Override
          ????public?void?userEventTriggered(ChannelHandlerContext?ctx,?Object?evt)?throws?Exception?{
          ????????super.userEventTriggered(ctx,?evt);
          ????}

          ????public?RpcResponse?sendRequest(RpcRequest?request)?{
          ????????RpcResponse?response;
          ????????RpcFuture?future?=?new?RpcFuture<>();
          ????????requestMap.put(request.getRequestId(),?future);
          ????????try?{
          ????????????byte[]?data?=?messageProtocol.marshallingRequest(request);
          ????????????ByteBuf?reqBuf?=?Unpooled.buffer(data.length);
          ????????????reqBuf.writeBytes(data);
          ????????????if?(latch.await(CHANNEL_WAIT_TIME,TimeUnit.SECONDS)){
          ????????????????channel.writeAndFlush(reqBuf);
          ????????????????//?等待響應(yīng)
          ????????????????response?=?future.get(RESPONSE_WAIT_TIME,?TimeUnit.SECONDS);
          ????????????}else?{
          ????????????????throw?new?RpcException("establish?channel?time?out");
          ????????????}
          ????????}?catch?(Exception?e)?{
          ????????????throw?new?RpcException(e.getMessage());
          ????????}?finally?{
          ????????????requestMap.remove(request.getRequestId());
          ????????}
          ????????return?response;
          ????}
          }

          RpcFuture
          package?cn.sp.rpc.client.net;

          import?java.util.concurrent.*;

          /**
          ?*?@author?2YSP
          ?*?@date?2020/8/19?22:31
          ?*/

          public?class?RpcFuture<T>?implements?Future<T>?{

          ????private?T?response;
          ????/**
          ?????*?因?yàn)檎埱蠛晚憫?yīng)是一一對應(yīng)的,所以這里是1
          ?????*/

          ????private?CountDownLatch?countDownLatch?=?new?CountDownLatch(1);
          ????/**
          ?????*?Future的請求時(shí)間,用于計(jì)算Future是否超時(shí)
          ?????*/

          ????private?long?beginTime?=?System.currentTimeMillis();

          ????@Override
          ????public?boolean?cancel(boolean?mayInterruptIfRunning)?{
          ????????return?false;
          ????}

          ????@Override
          ????public?boolean?isCancelled()?{
          ????????return?false;
          ????}

          ????@Override
          ????public?boolean?isDone()?{
          ????????if?(response?!=?null)?{
          ????????????return?true;
          ????????}
          ????????return?false;
          ????}

          ????/**
          ?????*?獲取響應(yīng),直到有結(jié)果才返回
          ?????*?@return
          ?????*?@throws?InterruptedException
          ?????*?@throws?ExecutionException
          ?????*/

          ????@Override
          ????public?T?get()?throws?InterruptedException,?ExecutionException?{
          ????????countDownLatch.await();
          ????????return?response;
          ????}

          ????@Override
          ????public?T?get(long?timeout,?TimeUnit?unit)?throws?InterruptedException,?ExecutionException,?TimeoutException?{
          ????????if?(countDownLatch.await(timeout,unit)){
          ????????????return?response;
          ????????}
          ????????return?null;
          ????}

          ????public?void?setResponse(T?response)?{
          ????????this.response?=?response;
          ????????countDownLatch.countDown();
          ????}

          ????public?long?getBeginTime()?{
          ????????return?beginTime;
          ????}
          }

          此處邏輯,第一次執(zhí)行 SendHandlerV2#sendRequest()?時(shí)channel需要等待通道建立好之后才能發(fā)送請求,所以用CountDownLatch來控制,等待通道建立。自定義Future+requestMap緩存來實(shí)現(xiàn)netty的請求和阻塞等待響應(yīng),RpcRequest對象在創(chuàng)建時(shí)會(huì)生成一個(gè)請求的唯一標(biāo)識requestId,發(fā)送請求前先將RpcFuture緩存到requestMap中,key為requestId,讀取到服務(wù)端的響應(yīng)信息后(channelRead方法),將響應(yīng)結(jié)果放入對應(yīng)的RpcFuture中。SendHandlerV2#channelInactive()?方法中,如果連接的服務(wù)端異常斷開連接了,則及時(shí)清理緩存中對應(yīng)的serverNode。

          四、壓力測試

          測試環(huán)境:
          • (英特爾)Intel(R) Core(TM) i5-6300HQ CPU @ 2.30GHz 4核
          • windows10家庭版(64位)
          • 16G內(nèi)存
          1.本地啟動(dòng)zookeeper 2.本地啟動(dòng)一個(gè)消費(fèi)者,兩個(gè)服務(wù)端,輪詢算法 3.使用ab進(jìn)行壓力測試,4個(gè)線程發(fā)送10000個(gè)請求

          ab?-c?4?-n?10000?http://localhost:8080/test/user?id=1

          測試結(jié)果
          測試結(jié)果
          從圖片可以看出,10000個(gè)請求只用了11s,比之前的130+秒耗時(shí)減少了10倍以上。
          代碼地址:
          https://github.com/2YSP/rpc-spring-boot-starter
          https://github.com/2YSP/rpc-example

          推薦閱讀:

          程序員牛逼的摸魚神器來了?上班也可以在看股票、基金實(shí)時(shí)數(shù)據(jù)~

          一行代碼搞定Spring Boot反爬蟲,防止接口盜刷!

          5T技術(shù)資源大放送!包括但不限于:C/C++,Linux,Python,Java,PHP,人工智能,單片機(jī),樹莓派,等等。在公眾號內(nèi)回復(fù)「2048」,即可免費(fèi)獲取!!

          微信掃描二維碼,關(guān)注我的公眾號

          朕已閱?

          瀏覽 100
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  久久久久久久伊人大香蕉视频 | 久久男人天堂 | 爱爱免费视频 | 亚洲黄色网页 | 国产欧美日韩 |