<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          自己寫了個 RPC ,真牛逼!

          共 6657字,需瀏覽 14分鐘

           ·

          2021-11-11 21:41

          點擊上方藍色字體,選擇“標星公眾號”

          優(yōu)質文章,第一時間送達

          RPC(remote procedure call)遠程過程調用

          RPC是為了在分布式應用中,兩臺主機的Java進程進行通信,當A主機調用B主機的方法時,過程簡潔,就像是調用自己進程里的方法一樣。
          RPC框架的職責就是,封裝好底層調用的細節(jié),客戶端只要調用方法,就能夠獲取服務提供者的響應,方便開發(fā)者編寫代碼。
          RPC底層使用的是TCP協(xié)議,服務端和客戶端和點對點通信。

          作用

          在RPC的應用場景中,客戶端調用服務端的代碼

          客戶端需要有相應的api接口,將方法名、方法參數(shù)類型、具體參數(shù)等等都發(fā)送給服務端

          服務端需要有方法的具體實現(xiàn),在接收到客戶端的請求后,根據(jù)信息調用對應的方法,并返回響應給客戶端

          ?

          ?

          流程圖演示

          ?

          ?

          ?

          代碼實現(xiàn)

          首先客戶端要知道服務端的接口,然后封裝一個請求對象,發(fā)送給服務端

          要調用一個方法需要有:方法名、方法參數(shù)類型、具體參數(shù)、執(zhí)行方法的類名

          @Data
          public?class?RpcRequest?{

          ????private?String??methodName;

          ????private?String?className;

          ????private?Class[]?paramType;

          ????private?Object[]?args;
          }

          由服務端返回給客戶端的響應(方法調用結果)也使用一個對象進行封裝

          @Data
          public?class?RpcResponse?{

          ????private?int?code;

          ????private?Object?result;
          }
          • 如果是在多線程調用中,需要具體把每個響應返回給對應的請求,可以加一個ID進行標識

          將對象通過網絡傳輸,需要先進行序列化操作,這里使用的是jackson工具


          ????????????com.fasterxml.jackson.core
          ????????????jackson-databind
          ????????????2.11.4

          public?class?JsonSerialization?{

          ????private?static?ObjectMapper?objectMapper?=?new?ObjectMapper();

          ????static?{
          ????????objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
          ????????objectMapper.disable(SerializationFeature.WRITE_DATE_KEYS_AS_TIMESTAMPS);
          ????????objectMapper.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
          ????????objectMapper.setDateFormat(new?SimpleDateFormat("yyyy-MM-dd?HH:mm:ss"));
          ????????objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
          ????}

          ????public?static?byte[]?serialize(Object?output)?throws?JsonProcessingException?{
          ????????byte[]?bytes?=?objectMapper.writeValueAsBytes(output);
          ????????return?bytes;
          ????}

          ????public?static?Object?deserialize(byte[]?input,Class?clazz)?throws?IOException?{
          ????????Object?parse?=?objectMapper.readValue(input,clazz);
          ????????return?parse;
          ????}
          }
          • 在反序列化過程中,需要指定要轉化的類型,而服務端接收request,客戶端接收response,二者類型是不一樣的,所以在后續(xù)傳輸時指定類型

          有了需要傳輸?shù)臄?shù)據(jù)后,使用Netty開啟網絡服務進行傳輸

          服務端

          綁定端口號,開啟連接

          public?class?ServerNetty?{

          ????public?static?void?connect(int?port)?throws?InterruptedException?{

          ????????EventLoopGroup?workGroup?=?new?NioEventLoopGroup();
          ????????EventLoopGroup?bossGroup?=?new?NioEventLoopGroup();

          ????????ServerBootstrap?bootstrap?=?new?ServerBootstrap();
          ????????bootstrap.channel(NioServerSocketChannel.class)
          ????????????????.group(bossGroup,workGroup)
          ????????????????.childHandler(new?ChannelInitializer()?{
          ????????????????????@Override
          ????????????????????protected?void?initChannel(SocketChannel?ch)?throws?Exception?{
          ????????????????????????/**
          ?????????????????????????*?加入自定義協(xié)議的數(shù)據(jù)處理器,指定接收到的數(shù)據(jù)類型
          ?????????????????????????*?加入服務端處理器
          ?????????????????????????*/
          ????????????????????????ch.pipeline().addLast(new?NettyProtocolHandler(RpcRequest.class));

          ????????????????????????ch.pipeline().addLast(new?ServerHandler());
          ????????????????????}
          ????????????????});

          ????????bootstrap.bind(port).sync();
          ????}
          }

          Netty中綁定了兩個數(shù)據(jù)處理器

          一個是數(shù)據(jù)處理器,服務端接收到請求->調用方法->返回響應,這些過程都在數(shù)據(jù)處理器中執(zhí)行

          public?class?ServerHandler?extends?SimpleChannelInboundHandler?{
          ????@Override
          ????protected?void?channelRead0(ChannelHandlerContext?ctx,?Object?msg)?throws?Exception?{

          ????????RpcRequest?rpcRequest?=?(RpcRequest)msg;

          ????????//?獲取使用反射需要的各個參數(shù)
          ????????String?methodName?=?rpcRequest.getMethodName();
          ????????Class[]?paramTypes?=?rpcRequest.getParamType();
          ????????Object[]?args?=?rpcRequest.getArgs();
          ????????String?className?=?rpcRequest.getClassName();

          ????????//從注冊中心容器中獲取對象
          ????????Object?object?=?Server.hashMap.get(className);

          ????????Method?method?=?object.getClass().getMethod(methodName,paramTypes);
          ????????//反射調用方法
          ???????String?result?=?(String)?method.invoke(object,args);


          ????????//?將響應結果封裝好后發(fā)送回去
          ????????RpcResponse?rpcResponse?=?new?RpcResponse();
          ????????rpcResponse.setCode(200);
          ????????rpcResponse.setResult(result);

          ????????ctx.writeAndFlush(rpcResponse);
          ????}
          }
          • 這里從hash表中獲取對象,有一個預先進行的操作:將有可能被遠程調用的對象放入容器中,等待使用

          一個是自定義的TCP協(xié)議處理器,為了解決TCP的常見問題:因為客戶端發(fā)送的數(shù)據(jù)包和服務端接收數(shù)據(jù)緩沖區(qū)之間,大小不匹配導致的粘包、拆包問題。

          /**
          ?*?網絡傳輸?shù)淖远xTCP協(xié)議
          ?*?發(fā)送時:為傳輸?shù)淖止?jié)流添加兩個魔數(shù)作為頭部,再計算數(shù)據(jù)的長度,將數(shù)據(jù)長度也添加到頭部,最后才是數(shù)據(jù)
          ?*?接收時:識別出兩個魔數(shù)后,下一個就是首部,最后使用長度對應的字節(jié)數(shù)組接收數(shù)據(jù)
          ?*/
          public?class?NettyProtocolHandler?extends?ChannelDuplexHandler?{

          ????private?static?final?byte[]?MAGIC?=?new?byte[]{0x15,0x66};

          ????private?Class?decodeType;

          ????public?NettyProtocolHandler()?{
          ????}

          ????public?NettyProtocolHandler(Class?decodeType){
          ????????this.decodeType?=?decodeType;
          ????}

          ????@Override
          ????public?void?channelRead(ChannelHandlerContext?ctx,?Object?msg)?throws?Exception?{

          ????????ByteBuf?in?=?(ByteBuf)?msg;
          ????????//接收響應對象
          ????????Object?dstObject;

          ????????byte[]?header?=?new?byte[2];
          ????????in.readBytes(header);

          ????????byte[]?lenByte?=?new?byte[4];
          ????????in.readBytes(lenByte);

          ????????int?len?=?ByteUtils.Bytes2Int_BE(lenByte);

          ????????byte[]?object?=?new?byte[len];
          ????????in.readBytes(object);

          ????????dstObject?=?JsonSerialization.deserialize(object,?decodeType);
          ????????//交給下一個數(shù)據(jù)處理器
          ????????ctx.fireChannelRead(dstObject);

          ????}

          ????@Override
          ????public?void?write(ChannelHandlerContext?ctx,?Object?msg,?ChannelPromise?promise)?throws?Exception?{

          ????????ByteBuf?byteBuf?=?Unpooled.buffer();

          ????????//寫入魔數(shù)
          ????????byteBuf.writeBytes(MAGIC);

          ????????byte[]?object?=?JsonSerialization.serialize(msg);

          ????????//數(shù)據(jù)長度轉化為字節(jié)數(shù)組并寫入
          ????????int?len?=?object.length;

          ????????byte[]?bodyLen?=?ByteUtils.int2bytes(len);

          ????????byteBuf.writeBytes(bodyLen);

          ????????//寫入對象
          ????????byteBuf.writeBytes(object);

          ????????ctx.writeAndFlush(byteBuf);
          ????}
          }
          • 這個數(shù)據(jù)處理器是服務端和客戶端都要使用的,就相當于是一個雙方定好傳輸數(shù)據(jù)要遵守的協(xié)議

          • 在這里進行了對象的序列化和反序列化,所以反序列化類型在這個處理器中指定

          • 這里面要將數(shù)據(jù)的長度發(fā)送,需一個將整數(shù)類型轉化為字節(jié)類型的工具

          ?轉化數(shù)據(jù)工具類

          public?class?ByteUtils?{

          ????/**?short2\u5B57\u8282\u6570\u7EC4?*/
          ????public?static?byte[]?short2bytes(short?v)?{
          ????????byte[]?b?=?new?byte[4];
          ????????b[1]?=?(byte)?v;
          ????????b[0]?=?(byte)?(v?>>>?8);
          ????????return?b;
          ????}

          ????/**?int4\u5B57\u8282\u6570\u7EC4?*/
          ????public?static?byte[]?int2bytes(int?v)?{
          ????????byte[]?b?=?new?byte[4];
          ????????b[3]?=?(byte)?v;
          ????????b[2]?=?(byte)?(v?>>>?8);
          ????????b[1]?=?(byte)?(v?>>>?16);
          ????????b[0]?=?(byte)?(v?>>>?24);
          ????????return?b;
          ????}

          ????/**?long8\u5B57\u8282\u6570\u7EC4?*/
          ????public?static?byte[]?long2bytes(long?v)?{
          ????????byte[]?b?=?new?byte[8];
          ????????b[7]?=?(byte)?v;
          ????????b[6]?=?(byte)?(v?>>>?8);
          ????????b[5]?=?(byte)?(v?>>>?16);
          ????????b[4]?=?(byte)?(v?>>>?24);
          ????????b[3]?=?(byte)?(v?>>>?32);
          ????????b[2]?=?(byte)?(v?>>>?40);
          ????????b[1]?=?(byte)?(v?>>>?48);
          ????????b[0]?=?(byte)?(v?>>>?56);
          ????????return?b;
          ????}

          ????/**?\u5B57\u8282\u6570\u7EC4\u8F6C\u5B57\u7B26\u4E32?*/
          ????public?static?String?bytesToHexString(byte[]?bs)?{
          ????????if?(bs?==?null?||?bs.length?==?0)?{
          ????????????return?null;
          ????????}

          ????????StringBuffer?sb?=?new?StringBuffer();
          ????????String?tmp?=?null;
          ????????for?(byte?b?:?bs)?{
          ????????????tmp?=?Integer.toHexString(Byte.toUnsignedInt(b));
          ????????????if?(tmp.length()?????????????????sb.append(0);
          ????????????}
          ????????????sb.append(tmp);
          ????????}
          ????????return?sb.toString();
          ????}

          ????/**
          ?????*?@return
          ?????*/
          ????public?static?int?Bytes2Int_BE(byte[]?bytes)?{
          ????????if(bytes.length?????????????return?-1;
          ????????}
          ????????int?iRst?=?(bytes[0]?<????????iRst?|=?(bytes[1]?<????????iRst?|=?(bytes[2]?<????????iRst?|=?bytes[3]?&?0xFF;
          ????????return?iRst;
          ????}

          ????/**
          ?????*?long\u8F6C8\u5B57\u8282\u6570\u7EC4
          ?????*/
          ????public?static?long?bytes2long(byte[]?b)?{
          ????????ByteBuffer?buffer?=?ByteBuffer.allocate(8);
          ????????buffer.put(b,?0,?b.length);
          ????????buffer.flip();//?need?flip
          ????????return?buffer.getLong();
          ????}
          }

          客戶端

          將Netty的操作封裝了起來,最后返回一個Channle類型,由它進行發(fā)送數(shù)據(jù)的操作

          public?class?ClientNetty?{

          ????public?static?Channel?connect(String?host,int?port)?throws?InterruptedException?{

          ????????InetSocketAddress?address?=?new?InetSocketAddress(host,port);

          ????????EventLoopGroup?workGroup?=?new?NioEventLoopGroup();

          ????????Bootstrap?bootstrap?=?new?Bootstrap();
          ????????????bootstrap.channel(NioSocketChannel.class)
          ????????????????????.group(workGroup)
          ????????????????????.handler(new?ChannelInitializer()?{
          ????????????????????????@Override
          ????????????????????????protected?void?initChannel(SocketChannel?ch)?throws?Exception?{

          ????????????????????????????//自定義協(xié)議handler(客戶端接收的是response)
          ????????????????????????????ch.pipeline().addLast(new?NettyProtocolHandler(RpcResponse.class));
          ????????????????????????????//處理數(shù)據(jù)handler
          ????????????????????????????ch.pipeline().addLast(new?ClientHandler());
          ????????????????????????}
          ????????????????????});

          ????????????Channel?channel?=?bootstrap.connect(address).sync().channel();

          ????????????return?channel;
          ????}
          }

          數(shù)據(jù)處理器負責接收response,并將響應結果放入在future中,future的使用在后續(xù)的動態(tài)代理中

          public?class?ClientHandler?extends?SimpleChannelInboundHandler?{

          ????@Override
          ????protected?void?channelRead0(ChannelHandlerContext?ctx,?Object?msg)?throws?Exception?{

          ????????RpcResponse?rpcResponse?=?(RpcResponse)?msg;

          ????????//服務端正常情況返回碼為200
          ????????if(rpcResponse.getCode()?!=?200){
          ????????????throw?new?Exception();
          ????????}

          ????????//將結果放到future里
          ????????RPCInvocationHandler.future.complete(rpcResponse.getResult());
          ????}

          ????@Override
          ????public?void?exceptionCaught(ChannelHandlerContext?ctx,?Throwable?cause)?throws?Exception?{
          ????????super.exceptionCaught(ctx,?cause);
          ????}
          }


          要讓客戶端在調用遠程方法時像調用本地方法一樣,就需要一個代理對象,供客戶端調用,讓代理對象去調用服務端的實現(xiàn)。

          代理對象構造

          public?class?ProxyFactory?{

          ????public?static?Object?getProxy(Class[]?interfaces){

          ????????return?Proxy.newProxyInstance(ProxyFactory.class.getClassLoader(),
          ????????????????interfaces,
          ????????????????new?RPCInvocationHandler());
          ????}
          }

          客戶端代理對象的方法執(zhí)行

          將request發(fā)送給服務端后,一直阻塞,等到future里面有了結果為止。

          public?class?RPCInvocationHandler?implements?InvocationHandler?{


          ????static?public?CompletableFuture?future;
          ????static?Channel?channel;

          ????static?{
          ????????future?=?new?CompletableFuture();
          ????????//開啟netty網絡服務
          ????????try?{
          ????????????channel?=?ClientNetty.connect("127.0.0.1",8989);
          ????????}?catch?(InterruptedException?e)?{
          ????????????e.printStackTrace();
          ????????}
          ????}

          ????@Override
          ????public?Object?invoke(Object?proxy,?Method?method,?Object[]?args)?throws?Throwable?{

          ????????RpcRequest?rpcRequest?=?new?RpcRequest();

          ????????rpcRequest.setArgs(args);
          ????????rpcRequest.setMethodName(method.getName());
          ????????rpcRequest.setParamType(method.getParameterTypes());
          ????????rpcRequest.setClassName(method.getDeclaringClass().getSimpleName());

          ???????channel.writeAndFlush(rpcRequest);
          ????????//一個阻塞操作,等待網絡傳輸?shù)慕Y果
          ???????String?result?=?(String)?future.get();

          ????????return?result;
          ????}
          }
          • 這里用static修飾future和channle,沒有考慮到客戶端去連接多個服務端和多次遠程調用

          • 可以使用一個hash表,存儲與不同服務端對應的channle,每次調用時從hash表中獲取即可

          • 用hash表存儲與不同request對應的future,每個響應的結果與之對應

          客戶端

          要進行遠程調用需要擁有的接口

          public?interface?OrderService?{

          ????public?String?buy();
          }

          預先的操作和測試代碼

          public?class?Client?{

          ????static?OrderService?orderService;

          ????public?static?void?main(String[]?args)?throws?InterruptedException?{

          ????????//創(chuàng)建一個代理對象給進行遠程調用的類
          ????????orderService?=?(OrderService)?ProxyFactory.getProxy(new?Class[]{OrderService.class});

          ????????String?result?=?orderService.buy();

          ????????System.out.println(result);
          ????}
          }


          服務端

          要接受遠程調用需要擁有的具體實現(xiàn)類

          public?class?OrderImpl?implements?OrderService?{

          ????public?OrderImpl()?{
          ????}

          ????@Override
          ????public?String?buy()?{
          ????????System.out.println("調用buy方法");
          ????????return?"調用buy方法成功";
          ????}
          }

          預先操作和測試代碼

          public?class?Server?{

          ???public?static?HashMap?hashMap?=?new?HashMap<>();

          ????public?static?void?main(String[]?args)?throws?InterruptedException?{
          ????????//開啟netty網絡服務
          ????????ServerNetty.connect(8989);

          ????????//提前將需要開放的服務注冊到hash表中
          ????????hashMap.put("OrderService",new?OrderImpl());

          ????}
          }

          ?

          執(zhí)行結果

          ?

          ?

          ?


          ? 作者?|??劃水的魚dm

          來源 |??cnblogs.com/davidFB/p/15481823.html


          加鋒哥微信:?java1239??
          圍觀鋒哥朋友圈,每天推送Java干貨!

          瀏覽 41
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  天堂在线在线 | 人人妻人人上 | 国产色情性黄 片免费视频 | 99re视频在线播放 | 操爱|