自己寫了個 RPC ,真牛逼!
點擊上方藍色字體,選擇“標星公眾號”
優(yōu)質文章,第一時間送達
RPC(remote procedure call)遠程過程調用
RPC是為了在分布式應用中,兩臺主機的Java進程進行通信,當A主機調用B主機的方法時,過程簡潔,就像是調用自己進程里的方法一樣。
RPC框架的職責就是,封裝好底層調用的細節(jié),客戶端只要調用方法,就能夠獲取服務提供者的響應,方便開發(fā)者編寫代碼。
RPC底層使用的是TCP協(xié)議,服務端和客戶端和點對點通信。
作用
在RPC的應用場景中,客戶端調用服務端的代碼
客戶端需要有相應的api接口,將方法名、方法參數(shù)類型、具體參數(shù)等等都發(fā)送給服務端
服務端需要有方法的具體實現(xiàn),在接收到客戶端的請求后,根據(jù)信息調用對應的方法,并返回響應給客戶端
?
?
流程圖演示
?

?
?
代碼實現(xiàn)
首先客戶端要知道服務端的接口,然后封裝一個請求對象,發(fā)送給服務端
要調用一個方法需要有:方法名、方法參數(shù)類型、具體參數(shù)、執(zhí)行方法的類名
@Data
public?class?RpcRequest?{
????private?String??methodName;
????private?String?className;
????private?Class[]?paramType;
????private?Object[]?args;
}
由服務端返回給客戶端的響應(方法調用結果)也使用一個對象進行封裝
@Data
public?class?RpcResponse?{
????private?int?code;
????private?Object?result;
}
如果是在多線程調用中,需要具體把每個響應返回給對應的請求,可以加一個ID進行標識
將對象通過網絡傳輸,需要先進行序列化操作,這里使用的是jackson工具
????????????com.fasterxml.jackson.core
????????????jackson-databind
????????????2.11.4
public?class?JsonSerialization?{
????private?static?ObjectMapper?objectMapper?=?new?ObjectMapper();
????static?{
????????objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
????????objectMapper.disable(SerializationFeature.WRITE_DATE_KEYS_AS_TIMESTAMPS);
????????objectMapper.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
????????objectMapper.setDateFormat(new?SimpleDateFormat("yyyy-MM-dd?HH:mm:ss"));
????????objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
????}
????public?static?byte[]?serialize(Object?output)?throws?JsonProcessingException?{
????????byte[]?bytes?=?objectMapper.writeValueAsBytes(output);
????????return?bytes;
????}
????public?static?Object?deserialize(byte[]?input,Class?clazz)?throws?IOException?{
????????Object?parse?=?objectMapper.readValue(input,clazz);
????????return?parse;
????}
}在反序列化過程中,需要指定要轉化的類型,而服務端接收request,客戶端接收response,二者類型是不一樣的,所以在后續(xù)傳輸時指定類型
有了需要傳輸?shù)臄?shù)據(jù)后,使用Netty開啟網絡服務進行傳輸
服務端
綁定端口號,開啟連接
public?class?ServerNetty?{
????public?static?void?connect(int?port)?throws?InterruptedException?{
????????EventLoopGroup?workGroup?=?new?NioEventLoopGroup();
????????EventLoopGroup?bossGroup?=?new?NioEventLoopGroup();
????????ServerBootstrap?bootstrap?=?new?ServerBootstrap();
????????bootstrap.channel(NioServerSocketChannel.class)
????????????????.group(bossGroup,workGroup)
????????????????.childHandler(new?ChannelInitializer()?{
????????????????????@Override
????????????????????protected?void?initChannel(SocketChannel?ch)?throws?Exception?{
????????????????????????/**
?????????????????????????*?加入自定義協(xié)議的數(shù)據(jù)處理器,指定接收到的數(shù)據(jù)類型
?????????????????????????*?加入服務端處理器
?????????????????????????*/
????????????????????????ch.pipeline().addLast(new?NettyProtocolHandler(RpcRequest.class));
????????????????????????ch.pipeline().addLast(new?ServerHandler());
????????????????????}
????????????????});
????????bootstrap.bind(port).sync();
????}
}
Netty中綁定了兩個數(shù)據(jù)處理器
一個是數(shù)據(jù)處理器,服務端接收到請求->調用方法->返回響應,這些過程都在數(shù)據(jù)處理器中執(zhí)行
public?class?ServerHandler?extends?SimpleChannelInboundHandler?{
????@Override
????protected?void?channelRead0(ChannelHandlerContext?ctx,?Object?msg)?throws?Exception?{
????????RpcRequest?rpcRequest?=?(RpcRequest)msg;
????????//?獲取使用反射需要的各個參數(shù)
????????String?methodName?=?rpcRequest.getMethodName();
????????Class[]?paramTypes?=?rpcRequest.getParamType();
????????Object[]?args?=?rpcRequest.getArgs();
????????String?className?=?rpcRequest.getClassName();
????????//從注冊中心容器中獲取對象
????????Object?object?=?Server.hashMap.get(className);
????????Method?method?=?object.getClass().getMethod(methodName,paramTypes);
????????//反射調用方法
???????String?result?=?(String)?method.invoke(object,args);
????????//?將響應結果封裝好后發(fā)送回去
????????RpcResponse?rpcResponse?=?new?RpcResponse();
????????rpcResponse.setCode(200);
????????rpcResponse.setResult(result);
????????ctx.writeAndFlush(rpcResponse);
????}
}
這里從hash表中獲取對象,有一個預先進行的操作:將有可能被遠程調用的對象放入容器中,等待使用
一個是自定義的TCP協(xié)議處理器,為了解決TCP的常見問題:因為客戶端發(fā)送的數(shù)據(jù)包和服務端接收數(shù)據(jù)緩沖區(qū)之間,大小不匹配導致的粘包、拆包問題。
/**
?*?網絡傳輸?shù)淖远xTCP協(xié)議
?*?發(fā)送時:為傳輸?shù)淖止?jié)流添加兩個魔數(shù)作為頭部,再計算數(shù)據(jù)的長度,將數(shù)據(jù)長度也添加到頭部,最后才是數(shù)據(jù)
?*?接收時:識別出兩個魔數(shù)后,下一個就是首部,最后使用長度對應的字節(jié)數(shù)組接收數(shù)據(jù)
?*/
public?class?NettyProtocolHandler?extends?ChannelDuplexHandler?{
????private?static?final?byte[]?MAGIC?=?new?byte[]{0x15,0x66};
????private?Class?decodeType;
????public?NettyProtocolHandler()?{
????}
????public?NettyProtocolHandler(Class?decodeType){
????????this.decodeType?=?decodeType;
????}
????@Override
????public?void?channelRead(ChannelHandlerContext?ctx,?Object?msg)?throws?Exception?{
????????ByteBuf?in?=?(ByteBuf)?msg;
????????//接收響應對象
????????Object?dstObject;
????????byte[]?header?=?new?byte[2];
????????in.readBytes(header);
????????byte[]?lenByte?=?new?byte[4];
????????in.readBytes(lenByte);
????????int?len?=?ByteUtils.Bytes2Int_BE(lenByte);
????????byte[]?object?=?new?byte[len];
????????in.readBytes(object);
????????dstObject?=?JsonSerialization.deserialize(object,?decodeType);
????????//交給下一個數(shù)據(jù)處理器
????????ctx.fireChannelRead(dstObject);
????}
????@Override
????public?void?write(ChannelHandlerContext?ctx,?Object?msg,?ChannelPromise?promise)?throws?Exception?{
????????ByteBuf?byteBuf?=?Unpooled.buffer();
????????//寫入魔數(shù)
????????byteBuf.writeBytes(MAGIC);
????????byte[]?object?=?JsonSerialization.serialize(msg);
????????//數(shù)據(jù)長度轉化為字節(jié)數(shù)組并寫入
????????int?len?=?object.length;
????????byte[]?bodyLen?=?ByteUtils.int2bytes(len);
????????byteBuf.writeBytes(bodyLen);
????????//寫入對象
????????byteBuf.writeBytes(object);
????????ctx.writeAndFlush(byteBuf);
????}
}
這個數(shù)據(jù)處理器是服務端和客戶端都要使用的,就相當于是一個雙方定好傳輸數(shù)據(jù)要遵守的協(xié)議
在這里進行了對象的序列化和反序列化,所以反序列化類型在這個處理器中指定
這里面要將數(shù)據(jù)的長度發(fā)送,需一個將整數(shù)類型轉化為字節(jié)類型的工具
?轉化數(shù)據(jù)工具類
public?class?ByteUtils?{
????/**?short2\u5B57\u8282\u6570\u7EC4?*/
????public?static?byte[]?short2bytes(short?v)?{
????????byte[]?b?=?new?byte[4];
????????b[1]?=?(byte)?v;
????????b[0]?=?(byte)?(v?>>>?8);
????????return?b;
????}
????/**?int4\u5B57\u8282\u6570\u7EC4?*/
????public?static?byte[]?int2bytes(int?v)?{
????????byte[]?b?=?new?byte[4];
????????b[3]?=?(byte)?v;
????????b[2]?=?(byte)?(v?>>>?8);
????????b[1]?=?(byte)?(v?>>>?16);
????????b[0]?=?(byte)?(v?>>>?24);
????????return?b;
????}
????/**?long8\u5B57\u8282\u6570\u7EC4?*/
????public?static?byte[]?long2bytes(long?v)?{
????????byte[]?b?=?new?byte[8];
????????b[7]?=?(byte)?v;
????????b[6]?=?(byte)?(v?>>>?8);
????????b[5]?=?(byte)?(v?>>>?16);
????????b[4]?=?(byte)?(v?>>>?24);
????????b[3]?=?(byte)?(v?>>>?32);
????????b[2]?=?(byte)?(v?>>>?40);
????????b[1]?=?(byte)?(v?>>>?48);
????????b[0]?=?(byte)?(v?>>>?56);
????????return?b;
????}
????/**?\u5B57\u8282\u6570\u7EC4\u8F6C\u5B57\u7B26\u4E32?*/
????public?static?String?bytesToHexString(byte[]?bs)?{
????????if?(bs?==?null?||?bs.length?==?0)?{
????????????return?null;
????????}
????????StringBuffer?sb?=?new?StringBuffer();
????????String?tmp?=?null;
????????for?(byte?b?:?bs)?{
????????????tmp?=?Integer.toHexString(Byte.toUnsignedInt(b));
????????????if?(tmp.length()?2)?{
????????????????sb.append(0);
????????????}
????????????sb.append(tmp);
????????}
????????return?sb.toString();
????}
????/**
?????*?@return
?????*/
????public?static?int?Bytes2Int_BE(byte[]?bytes)?{
????????if(bytes.length?4){
????????????return?-1;
????????}
????????int?iRst?=?(bytes[0]?<24)?&?0xFF;
????????iRst?|=?(bytes[1]?<16)?&?0xFF;
????????iRst?|=?(bytes[2]?<8)?&?0xFF;
????????iRst?|=?bytes[3]?&?0xFF;
????????return?iRst;
????}
????/**
?????*?long\u8F6C8\u5B57\u8282\u6570\u7EC4
?????*/
????public?static?long?bytes2long(byte[]?b)?{
????????ByteBuffer?buffer?=?ByteBuffer.allocate(8);
????????buffer.put(b,?0,?b.length);
????????buffer.flip();//?need?flip
????????return?buffer.getLong();
????}
}
客戶端
將Netty的操作封裝了起來,最后返回一個Channle類型,由它進行發(fā)送數(shù)據(jù)的操作
public?class?ClientNetty?{
????public?static?Channel?connect(String?host,int?port)?throws?InterruptedException?{
????????InetSocketAddress?address?=?new?InetSocketAddress(host,port);
????????EventLoopGroup?workGroup?=?new?NioEventLoopGroup();
????????Bootstrap?bootstrap?=?new?Bootstrap();
????????????bootstrap.channel(NioSocketChannel.class)
????????????????????.group(workGroup)
????????????????????.handler(new?ChannelInitializer()?{
????????????????????????@Override
????????????????????????protected?void?initChannel(SocketChannel?ch)?throws?Exception?{
????????????????????????????//自定義協(xié)議handler(客戶端接收的是response)
????????????????????????????ch.pipeline().addLast(new?NettyProtocolHandler(RpcResponse.class));
????????????????????????????//處理數(shù)據(jù)handler
????????????????????????????ch.pipeline().addLast(new?ClientHandler());
????????????????????????}
????????????????????});
????????????Channel?channel?=?bootstrap.connect(address).sync().channel();
????????????return?channel;
????}
}
數(shù)據(jù)處理器負責接收response,并將響應結果放入在future中,future的使用在后續(xù)的動態(tài)代理中
public?class?ClientHandler?extends?SimpleChannelInboundHandler?{
????@Override
????protected?void?channelRead0(ChannelHandlerContext?ctx,?Object?msg)?throws?Exception?{
????????RpcResponse?rpcResponse?=?(RpcResponse)?msg;
????????//服務端正常情況返回碼為200
????????if(rpcResponse.getCode()?!=?200){
????????????throw?new?Exception();
????????}
????????//將結果放到future里
????????RPCInvocationHandler.future.complete(rpcResponse.getResult());
????}
????@Override
????public?void?exceptionCaught(ChannelHandlerContext?ctx,?Throwable?cause)?throws?Exception?{
????????super.exceptionCaught(ctx,?cause);
????}
}
要讓客戶端在調用遠程方法時像調用本地方法一樣,就需要一個代理對象,供客戶端調用,讓代理對象去調用服務端的實現(xiàn)。
代理對象構造
public?class?ProxyFactory?{
????public?static?Object?getProxy(Class>[]?interfaces){
????????return?Proxy.newProxyInstance(ProxyFactory.class.getClassLoader(),
????????????????interfaces,
????????????????new?RPCInvocationHandler());
????}
}
客戶端代理對象的方法執(zhí)行
將request發(fā)送給服務端后,一直阻塞,等到future里面有了結果為止。
public?class?RPCInvocationHandler?implements?InvocationHandler?{
????static?public?CompletableFuture?future;
????static?Channel?channel;
????static?{
????????future?=?new?CompletableFuture();
????????//開啟netty網絡服務
????????try?{
????????????channel?=?ClientNetty.connect("127.0.0.1",8989);
????????}?catch?(InterruptedException?e)?{
????????????e.printStackTrace();
????????}
????}
????@Override
????public?Object?invoke(Object?proxy,?Method?method,?Object[]?args)?throws?Throwable?{
????????RpcRequest?rpcRequest?=?new?RpcRequest();
????????rpcRequest.setArgs(args);
????????rpcRequest.setMethodName(method.getName());
????????rpcRequest.setParamType(method.getParameterTypes());
????????rpcRequest.setClassName(method.getDeclaringClass().getSimpleName());
???????channel.writeAndFlush(rpcRequest);
????????//一個阻塞操作,等待網絡傳輸?shù)慕Y果
???????String?result?=?(String)?future.get();
????????return?result;
????}
}
這里用static修飾future和channle,沒有考慮到客戶端去連接多個服務端和多次遠程調用
可以使用一個hash表,存儲與不同服務端對應的channle,每次調用時從hash表中獲取即可
用hash表存儲與不同request對應的future,每個響應的結果與之對應
客戶端
要進行遠程調用需要擁有的接口
public?interface?OrderService?{
????public?String?buy();
}預先的操作和測試代碼
public?class?Client?{
????static?OrderService?orderService;
????public?static?void?main(String[]?args)?throws?InterruptedException?{
????????//創(chuàng)建一個代理對象給進行遠程調用的類
????????orderService?=?(OrderService)?ProxyFactory.getProxy(new?Class[]{OrderService.class});
????????String?result?=?orderService.buy();
????????System.out.println(result);
????}
}
服務端
要接受遠程調用需要擁有的具體實現(xiàn)類
public?class?OrderImpl?implements?OrderService?{
????public?OrderImpl()?{
????}
????@Override
????public?String?buy()?{
????????System.out.println("調用buy方法");
????????return?"調用buy方法成功";
????}
}
預先操作和測試代碼
public?class?Server?{
???public?static?HashMap?hashMap?=?new?HashMap<>();
????public?static?void?main(String[]?args)?throws?InterruptedException?{
????????//開啟netty網絡服務
????????ServerNetty.connect(8989);
????????//提前將需要開放的服務注冊到hash表中
????????hashMap.put("OrderService",new?OrderImpl());
????}
}
?
執(zhí)行結果

?
?
?
? 作者?|??劃水的魚dm
來源 |??cnblogs.com/davidFB/p/15481823.html

