手寫RPC框架,理解更透徹,代碼已上傳Github!
閱讀本文大概需要 9 分鐘。
來自:https://www.cnblogs.com/2YSP/p/13545217.html
一、前言
除了Java序列化協議,增加了protobuf和kryo序列化協議,配置即用。 增加多種負載均衡算法(隨機、輪詢、加權輪詢、平滑加權輪詢),配置即用。 客戶端增加本地服務列表緩存,提高性能。 修復高并發情況下,netty導致的內存泄漏問題。 由原來的每個請求建立一次連接,改為建立TCP長連接,并多次復用。 服務端增加線程池提高消息處理能力。
二、介紹

客戶端(ServerA)執行遠程方法時就調用client stub傳遞類名、方法名和參數等信息。 client stub會將參數等信息序列化為二進制流的形式,然后通過Socket發送給服務端(ServerB)。 服務端收到數據包后,server stub 需要進行解析反序列化為類名、方法名和參數等信息。 server stub調用對應的本地方法,并把執行結果返回給客戶端。
服務消費者
服務提供者
注冊中心
監控運維(可選)
三、實現
使用zookeeper作為注冊中心 使用netty作為通信框架 消息編解碼:protostuff、kryo、java spring 使用SPI來根據配置動態選擇負載均衡算法等
3.1 動態負載均衡算法

/**
 * Load-balancing annotation: tags a type with the name of the load-balancing
 * algorithm it provides. Retained at runtime so the name can be read reflectively.
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface LoadBalanceAno {

    /**
     * @return the algorithm name; the empty string when not specified
     */
    String value() default "";
}
/**
?*?輪詢算法
?*/
@LoadBalanceAno(RpcConstant.BALANCE_ROUND)
public?class?FullRoundBalance?implements?LoadBalance?{
????private?static?Logger?logger?=?LoggerFactory.getLogger(FullRoundBalance.class);
????private?volatile?int?index;
????@Override
????public?synchronized?Service?chooseOne(List?services) ?{
????????//?加鎖防止多線程情況下,index超出services.size()
????????if?(index?==?services.size())?{
????????????index?=?0;
????????}
????????return?services.get(index++);
????}
}

/**
?*?@author?2YSP
?*?@date?2020/7/26?15:13
?*/
@ConfigurationProperties(prefix?=?"sp.rpc")
public?class?RpcConfig?{
????/**
?????*?服務(wù)注冊中心地址
?????*/
????private?String?registerAddress?=?"127.0.0.1:2181";
????/**
?????*?服務(wù)暴露端口
?????*/
????private?Integer?serverPort?=?9999;
????/**
?????*?服務(wù)協(xié)議
?????*/
????private?String?protocol?=?"java";
????/**
?????*?負(fù)載均衡算法
?????*/
????private?String?loadBalance?=?"random";
????/**
?????*?權(quán)重,默認(rèn)為1
?????*/
????private?Integer?weight?=?1;
???//?省略getter?setter
}
/**
?????*?使用spi匹配符合配置的負(fù)載均衡算法
?????*
?????*?@param?name
?????*?@return
?????*/
????private?LoadBalance?getLoadBalance(String?name)?{
????????ServiceLoader?loader?=?ServiceLoader.load(LoadBalance.class);
????????Iterator?iterator?=?loader.iterator();
????????while?(iterator.hasNext())?{
????????????LoadBalance?loadBalance?=?iterator.next();
????????????LoadBalanceAno?ano?=?loadBalance.getClass().getAnnotation(LoadBalanceAno.class);
????????????Assert.notNull(ano,?"load?balance?name?can?not?be?empty!");
????????????if?(name.equals(ano.value()))?{
????????????????return?loadBalance;
????????????}
????????}
????????throw?new?RpcException("invalid?load?balance?config");
????}
?@Bean
????public?ClientProxyFactory?proxyFactory(@Autowired?RpcConfig?rpcConfig)?{
????????ClientProxyFactory?clientProxyFactory?=?new?ClientProxyFactory();
????????//?設(shè)置服務(wù)發(fā)現(xiàn)著
????????clientProxyFactory.setServerDiscovery(new???????????ZookeeperServerDiscovery(rpcConfig.getRegisterAddress()));
????????//?設(shè)置支持的協(xié)議
????????Map?supportMessageProtocols?=?buildSupportMessageProtocols();
????????clientProxyFactory.setSupportMessageProtocols(supportMessageProtocols);
????????//?設(shè)置負(fù)載均衡算法
????????LoadBalance?loadBalance?=?getLoadBalance(rpcConfig.getLoadBalance());
????????clientProxyFactory.setLoadBalance(loadBalance);
????????//?設(shè)置網(wǎng)絡(luò)層實(shí)現(xiàn)
????????clientProxyFactory.setNetClient(new?NettyNetClient());
????????return?clientProxyFactory;
????}
3.2 本地服務列表緩存
/**
?*?服務(wù)發(fā)現(xiàn)本地緩存
?*/
public?class?ServerDiscoveryCache?{
????/**
?????*?key:?serviceName
?????*/
????private?static?final?Map>?SERVER_MAP?=?new?ConcurrentHashMap<>();
????/**
?????*?客戶端注入的遠(yuǎn)程服務(wù)service?class
?????*/
????public?static?final?List?SERVICE_CLASS_NAMES?=?new?ArrayList<>();
????public?static?void?put(String?serviceName,?List?serviceList) ?{
????????SERVER_MAP.put(serviceName,?serviceList);
????}
????/**
?????*?去除指定的值
?????*?@param?serviceName
?????*?@param?service
?????*/
????public?static?void?remove(String?serviceName,?Service?service)?{
????????SERVER_MAP.computeIfPresent(serviceName,?(key,?value)?->
????????????????value.stream().filter(o?->?!o.toString().equals(service.toString())).collect(Collectors.toList())
????????);
????}
????public?static?void?removeAll(String?serviceName)?{
????????SERVER_MAP.remove(serviceName);
????}
????public?static?boolean?isEmpty(String?serviceName)?{
????????return?SERVER_MAP.get(serviceName)?==?null?||?SERVER_MAP.get(serviceName).size()?==?0;
????}
????public?static?List?get(String?serviceName)? {
????????return?SERVER_MAP.get(serviceName);
????}
}
/**
?????*?根據(jù)服務(wù)名獲取可用的服務(wù)地址列表
?????*?@param?serviceName
?????*?@return
?????*/
????private?List?getServiceList(String?serviceName)? {
????????List?services;
????????synchronized?(serviceName){
????????????if?(ServerDiscoveryCache.isEmpty(serviceName))?{
????????????????services?=?serverDiscovery.findServiceList(serviceName);
????????????????if?(services?==?null?||?services.size()?==?0)?{
????????????????????throw?new?RpcException("No?provider?available!");
????????????????}
????????????????ServerDiscoveryCache.put(serviceName,?services);
????????????}?else?{
????????????????services?=?ServerDiscoveryCache.get(serviceName);
????????????}
????????}
????????return?services;
????}
/**
?*?Rpc處理者,支持服務(wù)啟動(dòng)暴露,自動(dòng)注入Service
?*?@author?2YSP
?*?@date?2020/7/26?14:46
?*/
public?class?DefaultRpcProcessor?implements?ApplicationListener<ContextRefreshedEvent>?{
???
????@Override
????public?void?onApplicationEvent(ContextRefreshedEvent?event)?{
????????//?Spring啟動(dòng)完畢過后會(huì)收到一個(gè)事件通知
????????if?(Objects.isNull(event.getApplicationContext().getParent())){
????????????ApplicationContext?context?=?event.getApplicationContext();
????????????//?開啟服務(wù)
????????????startServer(context);
????????????//?注入Service
????????????injectService(context);
????????}
????}
????private?void?injectService(ApplicationContext?context)?{
????????String[]?names?=?context.getBeanDefinitionNames();
????????for(String?name?:?names){
????????????Class>?clazz?=?context.getType(name);
????????????if?(Objects.isNull(clazz)){
????????????????continue;
????????????}
????????????Field[]?declaredFields?=?clazz.getDeclaredFields();
????????????for(Field?field?:?declaredFields){
????????????????//?找出標(biāo)記了InjectService注解的屬性
????????????????InjectService?injectService?=?field.getAnnotation(InjectService.class);
????????????????if?(injectService?==?null){
????????????????????continue;
????????????????}
????????????????Class>?fieldClass?=?field.getType();
????????????????Object?object?=?context.getBean(name);
????????????????field.setAccessible(true);
????????????????try?{
????????????????????field.set(object,clientProxyFactory.getProxy(fieldClass));
????????????????}?catch?(IllegalAccessException?e)?{
????????????????????e.printStackTrace();
????????????????}
????//?添加本地服務(wù)緩存
????????????????ServerDiscoveryCache.SERVICE_CLASS_NAMES.add(fieldClass.getName());
????????????}
????????}
????????//?注冊子節(jié)點(diǎn)監(jiān)聽
????????if?(clientProxyFactory.getServerDiscovery()?instanceof?ZookeeperServerDiscovery){
????????????ZookeeperServerDiscovery?serverDiscovery?=?(ZookeeperServerDiscovery)?clientProxyFactory.getServerDiscovery();
????????????ZkClient?zkClient?=?serverDiscovery.getZkClient();
????????????ServerDiscoveryCache.SERVICE_CLASS_NAMES.forEach(name?->{
????????????????String?servicePath?=?RpcConstant.ZK_SERVICE_PATH?+?RpcConstant.PATH_DELIMITER?+?name?+?"/service";
????????????????zkClient.subscribeChildChanges(servicePath,?new?ZkChildListenerImpl());
????????????});
????????????logger.info("subscribe?service?zk?node?successfully");
????????}
????}
????private?void?startServer(ApplicationContext?context)?{
????????...
????}
}
/**
?*?子節(jié)點(diǎn)事件監(jiān)聽處理類
?*/
public?class?ZkChildListenerImpl?implements?IZkChildListener?{
????private?static?Logger?logger?=?LoggerFactory.getLogger(ZkChildListenerImpl.class);
????/**
?????*?監(jiān)聽子節(jié)點(diǎn)的刪除和新增事件
?????*?@param?parentPath?/rpc/serviceName/service
?????*?@param?childList
?????*?@throws?Exception
?????*/
????@Override
????public?void?handleChildChange(String?parentPath,?List?childList) ?throws?Exception?{
????????logger.debug("Child?change?parentPath:[{}]?--?childList:[{}]",?parentPath,?childList);
????????//?只要子節(jié)點(diǎn)有改動(dòng)就清空緩存
????????String[]?arr?=?parentPath.split("/");
????????ServerDiscoveryCache.removeAll(arr[2]);
????}
}
3.3 nettyClient 支持 TCP 長連接

/**
?*?@author?2YSP
?*?@date?2020/7/25?20:12
?*/
public?class?NettyNetClient?implements?NetClient?{
????private?static?Logger?logger?=?LoggerFactory.getLogger(NettyNetClient.class);
????private?static?ExecutorService?threadPool?=?new?ThreadPoolExecutor(4,?10,?200,
????????????TimeUnit.SECONDS,?new?LinkedBlockingQueue<>(1000),?new?ThreadFactoryBuilder()
????????????.setNameFormat("rpcClient-%d")
????????????.build());
????private?EventLoopGroup?loopGroup?=?new?NioEventLoopGroup(4);
????/**
?????*?已連接的服務(wù)緩存
?????* key:?服務(wù)地址,格式:ip:port
?????*/
????public?static?Map?connectedServerNodes?=?new?ConcurrentHashMap<>();
????@Override
????public?byte[]?sendRequest(byte[]?data,?Service?service)?throws?InterruptedException?{
??....
????????return?respData;
????}
????@Override
????public?RpcResponse?sendRequest(RpcRequest?rpcRequest,?Service?service,?MessageProtocol?messageProtocol)?{
????????String?address?=?service.getAddress();
????????synchronized?(address)?{
????????????if?(connectedServerNodes.containsKey(address))?{
????????????????SendHandlerV2?handler?=?connectedServerNodes.get(address);
????????????????logger.info("使用現(xiàn)有的連接");
????????????????return?handler.sendRequest(rpcRequest);
????????????}
????????????String[]?addrInfo?=?address.split(":");
????????????final?String?serverAddress?=?addrInfo[0];
????????????final?String?serverPort?=?addrInfo[1];
????????????final?SendHandlerV2?handler?=?new?SendHandlerV2(messageProtocol,?address);
????????????threadPool.submit(()?->?{
????????????????????????//?配置客戶端
????????????????????????Bootstrap?b?=?new?Bootstrap();
????????????????????????b.group(loopGroup).channel(NioSocketChannel.class)
????????????????????????????????.option(ChannelOption.TCP_NODELAY,?true)
????????????????????????????????.handler(new?ChannelInitializer<SocketChannel>()?{
????????????????????????????????????@Override
????????????????????????????????????protected?void?initChannel(SocketChannel?socketChannel)?throws?Exception?{
????????????????????????????????????????ChannelPipeline?pipeline?=?socketChannel.pipeline();
????????????????????????????????????????pipeline
????????????????????????????????????????????????.addLast(handler);
????????????????????????????????????}
????????????????????????????????});
????????????????????????//?啟用客戶端連接
????????????????????????ChannelFuture?channelFuture?=?b.connect(serverAddress,?Integer.parseInt(serverPort));
????????????????????????channelFuture.addListener(new?ChannelFutureListener()?{
????????????????????????????@Override
????????????????????????????public?void?operationComplete(ChannelFuture?channelFuture)?throws?Exception?{
????????????????????????????????connectedServerNodes.put(address,?handler);
????????????????????????????}
????????????????????????});
????????????????????}
????????????);
????????????logger.info("使用新的連接。。。");
????????????return?handler.sendRequest(rpcRequest);
????????}
????}
}
/**
?*?@author?2YSP
?*?@date?2020/8/19?20:06
?*/
public?class?SendHandlerV2?extends?ChannelInboundHandlerAdapter?{
????private?static?Logger?logger?=?LoggerFactory.getLogger(SendHandlerV2.class);
????/**
?????*?等待通道建立最大時(shí)間
?????*/
????static?final?int?CHANNEL_WAIT_TIME?=?4;
????/**
?????*?等待響應(yīng)最大時(shí)間
?????*/
????static?final?int?RESPONSE_WAIT_TIME?=?8;
????private?volatile?Channel?channel;
????private?String?remoteAddress;
????private?static?Map>?requestMap?=?new?ConcurrentHashMap<>();
????private?MessageProtocol?messageProtocol;
????private?CountDownLatch?latch?=?new?CountDownLatch(1);
????public?SendHandlerV2(MessageProtocol?messageProtocol,String?remoteAddress)?{
????????this.messageProtocol?=?messageProtocol;
????????this.remoteAddress?=?remoteAddress;
????}
????@Override
????public?void?channelRegistered(ChannelHandlerContext?ctx)?throws?Exception?{
????????this.channel?=?ctx.channel();
????????latch.countDown();
????}
????@Override
????public?void?channelActive(ChannelHandlerContext?ctx)?throws?Exception?{
????????logger.debug("Connect?to?server?successfully:{}",?ctx);
????}
????@Override
????public?void?channelRead(ChannelHandlerContext?ctx,?Object?msg)?throws?Exception?{
????????logger.debug("Client?reads?message:{}",?msg);
????????ByteBuf?byteBuf?=?(ByteBuf)?msg;
????????byte[]?resp?=?new?byte[byteBuf.readableBytes()];
????????byteBuf.readBytes(resp);
????????//?手動(dòng)回收
????????ReferenceCountUtil.release(byteBuf);
????????RpcResponse?response?=?messageProtocol.unmarshallingResponse(resp);
????????RpcFuture?future?=?requestMap.get(response.getRequestId());
????????future.setResponse(response);
????}
????@Override
????public?void?exceptionCaught(ChannelHandlerContext?ctx,?Throwable?cause)?throws?Exception?{
????????cause.printStackTrace();
????????logger.error("Exception?occurred:{}",?cause.getMessage());
????????ctx.close();
????}
????@Override
????public?void?channelReadComplete(ChannelHandlerContext?ctx)?throws?Exception?{
????????ctx.flush();
????}
????@Override
????public?void?channelInactive(ChannelHandlerContext?ctx)?throws?Exception?{
????????super.channelInactive(ctx);
????????logger.error("channel?inactive?with?remoteAddress:[{}]",remoteAddress);
????????NettyNetClient.connectedServerNodes.remove(remoteAddress);
????}
????@Override
????public?void?userEventTriggered(ChannelHandlerContext?ctx,?Object?evt)?throws?Exception?{
????????super.userEventTriggered(ctx,?evt);
????}
????public?RpcResponse?sendRequest(RpcRequest?request)?{
????????RpcResponse?response;
????????RpcFuture?future?=?new?RpcFuture<>();
????????requestMap.put(request.getRequestId(),?future);
????????try?{
????????????byte[]?data?=?messageProtocol.marshallingRequest(request);
????????????ByteBuf?reqBuf?=?Unpooled.buffer(data.length);
????????????reqBuf.writeBytes(data);
????????????if?(latch.await(CHANNEL_WAIT_TIME,TimeUnit.SECONDS)){
????????????????channel.writeAndFlush(reqBuf);
????????????????//?等待響應(yīng)
????????????????response?=?future.get(RESPONSE_WAIT_TIME,?TimeUnit.SECONDS);
????????????}else?{
????????????????throw?new?RpcException("establish?channel?time?out");
????????????}
????????}?catch?(Exception?e)?{
????????????throw?new?RpcException(e.getMessage());
????????}?finally?{
????????????requestMap.remove(request.getRequestId());
????????}
????????return?response;
????}
}
package?cn.sp.rpc.client.net;
import?java.util.concurrent.*;
/**
 * Minimal {@link Future} whose result is supplied from outside via
 * {@link #setResponse(Object)}. Cancellation is not supported.
 *
 * @author 2YSP
 * @date 2020/8/19 22:31
 */
public class RpcFuture<T> implements Future<T> {

    private T response;

    /**
     * Requests and responses map one to one, hence a count of 1.
     */
    private final CountDownLatch countDownLatch = new CountDownLatch(1);

    /**
     * Creation time of the future, usable to work out whether it has timed out.
     */
    private final long beginTime = System.currentTimeMillis();

    /**
     * Cancellation is not supported.
     *
     * @return always false
     */
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return false;
    }

    @Override
    public boolean isCancelled() {
        return false;
    }

    /**
     * @return true once {@link #setResponse(Object)} has been called, even with a null response
     */
    @Override
    public boolean isDone() {
        // Based on the latch rather than on "response != null", so that a future
        // completed with a null response is still reported as done.
        return countDownLatch.getCount() == 0;
    }

    /**
     * Waits until a response has been set and returns it.
     *
     * @return the response
     * @throws InterruptedException if the waiting thread is interrupted
     * @throws ExecutionException   never thrown by this implementation
     */
    @Override
    public T get() throws InterruptedException, ExecutionException {
        countDownLatch.await();
        return response;
    }

    /**
     * Waits at most the given time for a response.
     *
     * @return the response, or null when the wait timed out (this implementation
     *         returns null instead of throwing {@link TimeoutException})
     * @throws InterruptedException if the waiting thread is interrupted
     */
    @Override
    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        if (countDownLatch.await(timeout, unit)) {
            return response;
        }
        return null;
    }

    /**
     * Completes the future and wakes up any thread blocked in {@code get}.
     */
    public void setResponse(T response) {
        this.response = response;
        countDownLatch.countDown();
    }

    public long getBeginTime() {
        return beginTime;
    }
}
四、壓力測試
(英特爾)Intel(R) Core(TM) i5-6300HQ CPU @ 2.30GHz 4核 windows10家庭版(64位) 16G內存
ab -c 4 -n 10000 http://localhost:8080/test/user?id=1

https://github.com/2YSP/rpc-spring-boot-starter
推薦閱讀:
程序員牛逼的摸魚神器來了?上班也可以在看股票、基金實(shí)時(shí)數(shù)據(jù)~
微信掃描二維碼,關注我的公眾號
朕已閱?

