<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          粘包和拆包及Netty解決方案

          共 3391字,需瀏覽 7分鐘

           ·

          2020-12-28 08:25

          點(diǎn)擊上方「藍(lán)字」關(guān)注我們

          在RPC框架中,粘包和拆包問(wèn)題是必須解決一個(gè)問(wèn)題,因?yàn)镽PC框架中,各個(gè)微服務(wù)相互之間都是維系了一個(gè)TCP長(zhǎng)連接,比如dubbo就是一個(gè)全雙工的長(zhǎng)連接。由于微服務(wù)往對(duì)方發(fā)送信息的時(shí)候,所有的請(qǐng)求都是使用的同一個(gè)連接,這樣就會(huì)產(chǎn)生粘包和拆包的問(wèn)題。本文首先會(huì)對(duì)粘包和拆包問(wèn)題進(jìn)行描述,然后介紹其常用的解決方案,最后會(huì)對(duì)Netty提供的幾種解決方案進(jìn)行講解。

          0x01. 粘包和拆包

          產(chǎn)生粘包和拆包問(wèn)題的主要原因是,操作系統(tǒng)在發(fā)送TCP數(shù)據(jù)的時(shí)候,底層會(huì)有一個(gè)緩沖區(qū),例如1024個(gè)字節(jié)大小,如果一次請(qǐng)求發(fā)送的數(shù)據(jù)量比較小,沒(méi)達(dá)到緩沖區(qū)大小,TCP則會(huì)將多個(gè)請(qǐng)求合并為同一個(gè)請(qǐng)求進(jìn)行發(fā)送,這就形成了粘包問(wèn)題;如果一次請(qǐng)求發(fā)送的數(shù)據(jù)量比較大,超過(guò)了緩沖區(qū)大小,TCP就會(huì)將其拆分為多次發(fā)送,這就是拆包,也就是將一個(gè)大的包拆分為多個(gè)小包進(jìn)行發(fā)送。如下圖展示了粘包和拆包的一個(gè)示意圖:

          ?

          上圖中演示了粘包和拆包的三種情況:

          • A和B兩個(gè)包都剛好滿足TCP緩沖區(qū)的大小,或者說(shuō)其等待時(shí)間已經(jīng)達(dá)到TCP等待時(shí)長(zhǎng),從而還是使用兩個(gè)獨(dú)立的包進(jìn)行發(fā)送;

          • A和B兩次請(qǐng)求間隔時(shí)間內(nèi)較短,并且數(shù)據(jù)包較小,因而合并為同一個(gè)包發(fā)送給服務(wù)端;

          • B包比較大,因而將其拆分為兩個(gè)包B_1和B_2進(jìn)行發(fā)送,而這里由于拆分后的B_2比較小,其又與A包合并在一起發(fā)送。


          0x02. 常見(jiàn)解決方案

          對(duì)于粘包和拆包問(wèn)題,常見(jiàn)的解決方案有四種:

          • 客戶端在發(fā)送數(shù)據(jù)包的時(shí)候,每個(gè)包都固定長(zhǎng)度,比如1024個(gè)字節(jié)大小,如果客戶端發(fā)送的數(shù)據(jù)長(zhǎng)度不足1024個(gè)字節(jié),則通過(guò)補(bǔ)充空格的方式補(bǔ)全到指定長(zhǎng)度;

          • 客戶端在每個(gè)包的末尾使用固定的分隔符,例如\r\n,如果一個(gè)包被拆分了,則等待下一個(gè)包發(fā)送過(guò)來(lái)之后找到其中的\r\n,然后對(duì)其拆分后的頭部部分與前一個(gè)包的剩余部分進(jìn)行合并,這樣就得到了一個(gè)完整的包;

          • 將消息分為頭部和消息體,在頭部中保存有當(dāng)前整個(gè)消息的長(zhǎng)度,只有在讀取到足夠長(zhǎng)度的消息之后才算是讀到了一個(gè)完整的消息;

          • 通過(guò)自定義協(xié)議進(jìn)行粘包和拆包的處理。


          0x03. Netty提供的粘包拆包解決方案

          1) FixedLengthFrameDecoder

          對(duì)于使用固定長(zhǎng)度的粘包和拆包場(chǎng)景,可以使用FixedLengthFrameDecoder,該解碼器會(huì)每次讀取固定長(zhǎng)度的消息,如果當(dāng)前讀取到的消息不足指定長(zhǎng)度,那么就會(huì)等待下一個(gè)消息到達(dá)后進(jìn)行補(bǔ)足。其使用也比較簡(jiǎn)單,只需要在構(gòu)造函數(shù)中指定每個(gè)消息的長(zhǎng)度即可。這里需要注意的是,F(xiàn)ixedLengthFrameDecoder只是一個(gè)解碼器,Netty也只提供了一個(gè)解碼器,這是因?yàn)閷?duì)于解碼是需要等待下一個(gè)包的進(jìn)行補(bǔ)全的,代碼相對(duì)復(fù)雜,而對(duì)于編碼器,用戶可以自行編寫(xiě),因?yàn)榫幋a時(shí)只需要將不足指定長(zhǎng)度的部分進(jìn)行補(bǔ)全即可。下面的示例中展示了如何使用FixedLengthFrameDecoder來(lái)進(jìn)行粘包和拆包處理:

          public?class?EchoServer?{

          ??public?void?bind(int?port)?throws?InterruptedException?{
          ????EventLoopGroup?bossGroup?=?new?NioEventLoopGroup();
          ????EventLoopGroup?workerGroup?=?new?NioEventLoopGroup();
          ????try?{
          ??????ServerBootstrap?bootstrap?=?new?ServerBootstrap();
          ??????bootstrap.group(bossGroup,?workerGroup)
          ????????.channel(NioServerSocketChannel.class)
          ????????.option(ChannelOption.SO_BACKLOG,?1024)
          ????????.handler(new?LoggingHandler(LogLevel.INFO))
          ????????.childHandler(new?ChannelInitializer()?{
          ??????????@Override
          ??????????protected?void?initChannel(SocketChannel?ch)?throws?Exception?{
          ????????????//?這里將FixedLengthFrameDecoder添加到pipeline中,指定長(zhǎng)度為20
          ????????????ch.pipeline().addLast(new?FixedLengthFrameDecoder(20));
          ????????????//?將前一步解碼得到的數(shù)據(jù)轉(zhuǎn)碼為字符串
          ????????????ch.pipeline().addLast(new?StringDecoder());
          ????????????//?這里FixedLengthFrameEncoder是我們自定義的,用于將長(zhǎng)度不足20的消息進(jìn)行補(bǔ)全空格
          ????????????ch.pipeline().addLast(new?FixedLengthFrameEncoder(20));
          ????????????//?最終的數(shù)據(jù)處理
          ????????????ch.pipeline().addLast(new?EchoServerHandler());
          ??????????}
          ????????});

          ??????ChannelFuture?future?=?bootstrap.bind(port).sync();
          ??????future.channel().closeFuture().sync();
          ????}?finally?{
          ??????bossGroup.shutdownGracefully();
          ??????workerGroup.shutdownGracefully();
          ????}
          ??}

          ??public?static?void?main(String[]?args)?throws?InterruptedException?{
          ????new?EchoServer().bind(8080);
          ??}
          }

          上面的pipeline中,對(duì)于入棧數(shù)據(jù),這里主要添加了FixedLengthFrameDecoder和StringDecoder,前面一個(gè)用于處理固定長(zhǎng)度的消息的粘包和拆包問(wèn)題,第二個(gè)則是將處理之后的消息轉(zhuǎn)換為字符串。最后由EchoServerHandler處理最終得到的數(shù)據(jù),處理完成后,將處理得到的數(shù)據(jù)交由FixedLengthFrameEncoder處理,該編碼器是我們自定義的實(shí)現(xiàn),主要作用是將長(zhǎng)度不足20的消息進(jìn)行空格補(bǔ)全。下面是FixedLengthFrameEncoder的實(shí)現(xiàn)代碼:

          public?class?FixedLengthFrameEncoder?extends?MessageToByteEncoder<String>?{
          ??private?int?length;

          ??public?FixedLengthFrameEncoder(int?length)?{
          ????this.length?=?length;
          ??}

          ??@Override
          ??protected?void?encode(ChannelHandlerContext?ctx,?String?msg,?ByteBuf?out)
          ??????throws?Exception?
          {
          ????//?對(duì)于超過(guò)指定長(zhǎng)度的消息,這里直接拋出異常
          ????if?(msg.length()?>?length)?{
          ??????throw?new?UnsupportedOperationException(
          ??????????"message?length?is?too?large,?it's?limited?"?+?length);
          ????}

          ????//?如果長(zhǎng)度不足,則進(jìn)行補(bǔ)全
          ????if?(msg.length()???????msg?=?addSpace(msg);
          ????}

          ????ctx.writeAndFlush(Unpooled.wrappedBuffer(msg.getBytes()));
          ??}

          ??//?進(jìn)行空格補(bǔ)全
          ??private?String?addSpace(String?msg)?{
          ????StringBuilder?builder?=?new?StringBuilder(msg);
          ????for?(int?i?=?0;?i???????builder.append("?");
          ????}

          ????return?builder.toString();
          ??}
          }

          這里FixedLengthFrameEncoder實(shí)現(xiàn)了decode()方法,在該方法中,主要是將消息長(zhǎng)度不足20的消息進(jìn)行空格補(bǔ)全。EchoServerHandler的作用主要是打印接收到的消息,然后發(fā)送響應(yīng)給客戶端:

          public?class?EchoServerHandler?extends?SimpleChannelInboundHandler<String>?{

          ??@Override
          ??protected?void?channelRead0(ChannelHandlerContext?ctx,?String?msg)?throws?Exception?{
          ????System.out.println("server?receives?message:?"?+?msg.trim());
          ????ctx.writeAndFlush("hello?client!");
          ??}
          }

          對(duì)于客戶端,其實(shí)現(xiàn)方式基本與服務(wù)端的使用方式類似,只是在最后進(jìn)行消息發(fā)送的時(shí)候與服務(wù)端的處理方式不同。如下是客戶端EchoClient的代碼:

          public?class?EchoClient?{

          ??public?void?connect(String?host,?int?port)?throws?InterruptedException?{
          ????EventLoopGroup?group?=?new?NioEventLoopGroup();
          ????try?{
          ??????Bootstrap?bootstrap?=?new?Bootstrap();
          ??????bootstrap.group(group)
          ????????.channel(NioSocketChannel.class)
          ????????.option(ChannelOption.TCP_NODELAY,?true)
          ????????.handler(new?ChannelInitializer()?{
          ??????????@Override
          ??????????protected?void?initChannel(SocketChannel?ch)?throws?Exception?
          {
          ????????????//?對(duì)服務(wù)端發(fā)送的消息進(jìn)行粘包和拆包處理,由于服務(wù)端發(fā)送的消息已經(jīng)進(jìn)行了空格補(bǔ)全,
          ????????????//?并且長(zhǎng)度為20,因而這里指定的長(zhǎng)度也為20
          ????????????ch.pipeline().addLast(new?FixedLengthFrameDecoder(20));
          ????????????//?將粘包和拆包處理得到的消息轉(zhuǎn)換為字符串
          ????????????ch.pipeline().addLast(new?StringDecoder());
          ????????????//?對(duì)客戶端發(fā)送的消息進(jìn)行空格補(bǔ)全,保證其長(zhǎng)度為20
          ????????????ch.pipeline().addLast(new?FixedLengthFrameEncoder(20));
          ????????????//?客戶端發(fā)送消息給服務(wù)端,并且處理服務(wù)端響應(yīng)的消息
          ????????????ch.pipeline().addLast(new?EchoClientHandler());
          ??????????}
          ????????});

          ??????ChannelFuture?future?=?bootstrap.connect(host,?port).sync();
          ??????future.channel().closeFuture().sync();
          ????}?finally?{
          ??????group.shutdownGracefully();
          ????}
          ??}

          ??public?static?void?main(String[]?args)?throws?InterruptedException?{
          ????new?EchoClient().connect("127.0.0.1",?8080);
          ??}
          }

          對(duì)于客戶端而言,其消息的處理流程其實(shí)與服務(wù)端是相似的,對(duì)于入站消息,需要對(duì)其進(jìn)行粘包和拆包處理,然后將其轉(zhuǎn)碼為字符串,對(duì)于出站消息,則需要將長(zhǎng)度不足20的消息進(jìn)行空格補(bǔ)全。客戶端與服務(wù)端處理的主要區(qū)別在于最后的消息處理handler不一樣,也即這里的EchoClientHandler,如下是該handler的源碼:

          public?class?EchoClientHandler?extends?SimpleChannelInboundHandler<String>?{

          ??@Override
          ??protected?void?channelRead0(ChannelHandlerContext?ctx,?String?msg)?throws?Exception?{
          ????System.out.println("client?receives?message:?"?+?msg.trim());
          ??}

          ??@Override
          ??public?void?channelActive(ChannelHandlerContext?ctx)?throws?Exception?{
          ????ctx.writeAndFlush("hello?server!");
          ??}
          }

          這里客戶端的處理主要是重寫(xiě)了channelActive()和channelRead0()兩個(gè)方法,這兩個(gè)方法的主要作用在于,channelActive()會(huì)在客戶端連接上服務(wù)器時(shí)執(zhí)行,也就是說(shuō),其連上服務(wù)器之后就會(huì)往服務(wù)器發(fā)送消息。而channelRead0()主要是在服務(wù)器發(fā)送響應(yīng)給客戶端時(shí)執(zhí)行,這里主要是打印服務(wù)器的響應(yīng)消息。對(duì)于服務(wù)端而言,前面我們我們可以看到,EchoServerHandler只重寫(xiě)了channelRead0()方法,這是因?yàn)榉?wù)器只需要等待客戶端發(fā)送消息過(guò)來(lái),然后在該方法中進(jìn)行處理,處理完成后直接將響應(yīng)發(fā)送給客戶端。如下是分別啟動(dòng)服務(wù)端和客戶端之后控制臺(tái)打印的數(shù)據(jù):

          //?server
          server?receives?message:?hello?server!
          //?client
          client?receives?message:?hello?client!

          2) LineBasedFrameDecoder與DelimiterBasedFrameDecoder

          對(duì)于通過(guò)分隔符進(jìn)行粘包和拆包問(wèn)題的處理,Netty提供了兩個(gè)編解碼的類,LineBasedFrameDecoder和DelimiterBasedFrameDecoder。這里L(fēng)ineBasedFrameDecoder的作用主要是通過(guò)換行符,即\n或者\(yùn)r\n對(duì)數(shù)據(jù)進(jìn)行處理;而DelimiterBasedFrameDecoder的作用則是通過(guò)用戶指定的分隔符對(duì)數(shù)據(jù)進(jìn)行粘包和拆包處理。同樣的,這兩個(gè)類都是解碼器類,而對(duì)于數(shù)據(jù)的編碼,也即在每個(gè)數(shù)據(jù)包最后添加換行符或者指定分割符的部分需要用戶自行進(jìn)行處理。這里以DelimiterBasedFrameDecoder為例進(jìn)行講解,如下是EchoServer中使用該類的代碼片段,其余部分與前面的例子中的完全一致:

          @Override
          protected?void?initChannel(SocketChannel?ch)?throws?Exception?{
          ????String?delimiter?=?"_$";
          ????//?將delimiter設(shè)置到DelimiterBasedFrameDecoder中,經(jīng)過(guò)該解碼器進(jìn)行處理之后,源數(shù)據(jù)將會(huì)
          ????//?被按照_$進(jìn)行分隔,這里1024指的是分隔的最大長(zhǎng)度,即當(dāng)讀取到1024個(gè)字節(jié)的數(shù)據(jù)之后,若還是未
          ????//?讀取到分隔符,則舍棄當(dāng)前數(shù)據(jù)段,因?yàn)槠浜苡锌赡苁怯捎诖a流紊亂造成的
          ????ch.pipeline().addLast(new?DelimiterBasedFrameDecoder(1024,
          ????????Unpooled.wrappedBuffer(delimiter.getBytes())));
          ????//?將分隔之后的字節(jié)數(shù)據(jù)轉(zhuǎn)換為字符串?dāng)?shù)據(jù)
          ????ch.pipeline().addLast(new?StringDecoder());
          ????//?這是我們自定義的一個(gè)編碼器,主要作用是在返回的響應(yīng)數(shù)據(jù)最后添加分隔符
          ????ch.pipeline().addLast(new?DelimiterBasedFrameEncoder(delimiter));
          ????//?最終處理數(shù)據(jù)并且返回響應(yīng)的handler
          ????ch.pipeline().addLast(new?EchoServerHandler());
          }

          上面pipeline的設(shè)置中,添加的解碼器主要有DelimiterBasedFrameDecoder和StringDecoder,經(jīng)過(guò)這兩個(gè)處理器處理之后,接收到的字節(jié)流就會(huì)被分隔,并且轉(zhuǎn)換為字符串?dāng)?shù)據(jù),最終交由EchoServerHandler處理。這里DelimiterBasedFrameEncoder是我們自定義的編碼器,其主要作用是在返回的響應(yīng)數(shù)據(jù)之后添加分隔符。如下是該編碼器的源碼:

          public?class?DelimiterBasedFrameEncoder?extends?MessageToByteEncoder<String>?{

          ??private?String?delimiter;

          ??public?DelimiterBasedFrameEncoder(String?delimiter)?{
          ????this.delimiter?=?delimiter;
          ??}

          ??@Override
          ??protected?void?encode(ChannelHandlerContext?ctx,?String?msg,?ByteBuf?out)?
          ??????throws?Exception?
          {
          ????//?在響應(yīng)的數(shù)據(jù)后面添加分隔符
          ????ctx.writeAndFlush(Unpooled.wrappedBuffer((msg?+?delimiter).getBytes()));
          ??}
          }

          對(duì)于客戶端而言,這里的處理方式與服務(wù)端類似,其pipeline的添加方式如下:

          @Override
          protected?void?initChannel(SocketChannel?ch)?throws?Exception?{
          ????String?delimiter?=?"_$";
          ????//?對(duì)服務(wù)端返回的消息通過(guò)_$進(jìn)行分隔,并且每次查找的最大大小為1024字節(jié)
          ????ch.pipeline().addLast(new?DelimiterBasedFrameDecoder(1024,?
          ????????Unpooled.wrappedBuffer(delimiter.getBytes())));
          ????//?將分隔之后的字節(jié)數(shù)據(jù)轉(zhuǎn)換為字符串
          ????ch.pipeline().addLast(new?StringDecoder());
          ????//?對(duì)客戶端發(fā)送的數(shù)據(jù)進(jìn)行編碼,這里主要是在客戶端發(fā)送的數(shù)據(jù)最后添加分隔符
          ????ch.pipeline().addLast(new?DelimiterBasedFrameEncoder(delimiter));
          ????//?客戶端發(fā)送數(shù)據(jù)給服務(wù)端,并且處理從服務(wù)端響應(yīng)的數(shù)據(jù)
          ????ch.pipeline().addLast(new?EchoClientHandler());
          }

          這里客戶端的處理方式與服務(wù)端基本一致,關(guān)于這里沒(méi)展示的代碼,其與示例一中的代碼完全一致,這里則不予展示。

          3) LengthFieldBasedFrameDecoder與LengthFieldPrepender

          這里L(fēng)engthFieldBasedFrameDecoder與LengthFieldPrepender需要配合起來(lái)使用,其實(shí)本質(zhì)上來(lái)講,這兩者一個(gè)是解碼,一個(gè)是編碼的關(guān)系。它們處理粘、拆包的主要思想是在生成的數(shù)據(jù)包中添加一個(gè)長(zhǎng)度字段,用于記錄當(dāng)前數(shù)據(jù)包的長(zhǎng)度。LengthFieldBasedFrameDecoder會(huì)按照參數(shù)指定的包長(zhǎng)度偏移量數(shù)據(jù)對(duì)接收到的數(shù)據(jù)進(jìn)行解碼,從而得到目標(biāo)消息體數(shù)據(jù);而LengthFieldPrepender則會(huì)在響應(yīng)的數(shù)據(jù)前面添加指定的字節(jié)數(shù)據(jù),這個(gè)字節(jié)數(shù)據(jù)中保存了當(dāng)前消息體的整體字節(jié)數(shù)據(jù)長(zhǎng)度。LengthFieldBasedFrameDecoder的解碼過(guò)程如下圖所示:

          關(guān)于LengthFieldBasedFrameDecoder,這里需要對(duì)其構(gòu)造函數(shù)參數(shù)進(jìn)行介紹:

          • maxFrameLength:指定了每個(gè)包所能傳遞的最大數(shù)據(jù)包大小;

          • lengthFieldOffset:指定了長(zhǎng)度字段在字節(jié)碼中的偏移量;

          • lengthFieldLength:指定了長(zhǎng)度字段所占用的字節(jié)長(zhǎng)度;

          • lengthAdjustment:對(duì)一些不僅包含有消息頭和消息體的數(shù)據(jù)進(jìn)行消息頭的長(zhǎng)度的調(diào)整,這樣就可以只得到消息體的數(shù)據(jù),這里的lengthAdjustment指定的就是消息頭的長(zhǎng)度;

          • initialBytesToStrip:對(duì)于長(zhǎng)度字段在消息頭中間的情況,可以通過(guò)initialBytesToStrip忽略掉消息頭以及長(zhǎng)度字段占用的字節(jié)。

          這里我們以json序列化為例對(duì)LengthFieldBasedFrameDecoder和LengthFieldPrepender的使用方式進(jìn)行講解。如下是EchoServer的源碼:

          public?class?EchoServer?{

          ??public?void?bind(int?port)?throws?InterruptedException?{
          ????EventLoopGroup?bossGroup?=?new?NioEventLoopGroup();
          ????EventLoopGroup?workerGroup?=?new?NioEventLoopGroup();
          ????try?{
          ??????ServerBootstrap?bootstrap?=?new?ServerBootstrap();
          ??????bootstrap.group(bossGroup,?workerGroup)
          ????????.channel(NioServerSocketChannel.class)
          ????????.option(ChannelOption.SO_BACKLOG,?1024)
          ????????.handler(new?LoggingHandler(LogLevel.INFO))
          ????????.childHandler(new?ChannelInitializer()?{
          ??????????@Override
          ??????????protected?void?initChannel(SocketChannel?ch)?throws?Exception?{
          ????????????//?這里將LengthFieldBasedFrameDecoder添加到pipeline的首位,因?yàn)槠湫枰獙?duì)接收到的數(shù)據(jù)
          ????????????//?進(jìn)行長(zhǎng)度字段解碼,這里也會(huì)對(duì)數(shù)據(jù)進(jìn)行粘包和拆包處理
          ????????????ch.pipeline().addLast(new?LengthFieldBasedFrameDecoder(1024,?0,?2,?0,?2));
          ????????????//?LengthFieldPrepender是一個(gè)編碼器,主要是在響應(yīng)字節(jié)數(shù)據(jù)前面添加字節(jié)長(zhǎng)度字段
          ????????????ch.pipeline().addLast(new?LengthFieldPrepender(2));
          ????????????//?對(duì)經(jīng)過(guò)粘包和拆包處理之后的數(shù)據(jù)進(jìn)行json反序列化,從而得到User對(duì)象
          ????????????ch.pipeline().addLast(new?JsonDecoder());
          ????????????//?對(duì)響應(yīng)數(shù)據(jù)進(jìn)行編碼,主要是將User對(duì)象序列化為json
          ????????????ch.pipeline().addLast(new?JsonEncoder());
          ????????????//?處理客戶端的請(qǐng)求的數(shù)據(jù),并且進(jìn)行響應(yīng)
          ????????????ch.pipeline().addLast(new?EchoServerHandler());
          ??????????}
          ????????});

          ??????ChannelFuture?future?=?bootstrap.bind(port).sync();
          ??????future.channel().closeFuture().sync();
          ????}?finally?{
          ??????bossGroup.shutdownGracefully();
          ??????workerGroup.shutdownGracefully();
          ????}
          ??}

          ??public?static?void?main(String[]?args)?throws?InterruptedException?{
          ????new?EchoServer().bind(8080);
          ??}
          }

          這里EchoServer主要是在pipeline中添加了兩個(gè)編碼器和兩個(gè)解碼器,編碼器主要是負(fù)責(zé)將響應(yīng)的User對(duì)象序列化為json對(duì)象,然后在其字節(jié)數(shù)組前面添加一個(gè)長(zhǎng)度字段的字節(jié)數(shù)組;解碼器主要是對(duì)接收到的數(shù)據(jù)進(jìn)行長(zhǎng)度字段的解碼,然后將其反序列化為一個(gè)User對(duì)象。下面是JsonDecoder的源碼:

          public?class?JsonDecoder?extends?MessageToMessageDecoder<ByteBuf>?{

          ??@Override
          ??protected?void?decode(ChannelHandlerContext?ctx,?ByteBuf?buf,?List?out)?
          ??????throws?Exception?{
          ????byte[]?bytes?=?new?byte[buf.readableBytes()];
          ????buf.readBytes(bytes);
          ????User?user?=?JSON.parseObject(new?String(bytes,?CharsetUtil.UTF_8),?User.class);
          ????out.add(user);
          ??}
          }

          JsonDecoder首先從接收到的數(shù)據(jù)流中讀取字節(jié)數(shù)組,然后將其反序列化為一個(gè)User對(duì)象。下面我們看看JsonEncoder的源碼:

          public?class?JsonEncoder?extends?MessageToByteEncoder<User>?{

          ??@Override
          ??protected?void?encode(ChannelHandlerContext?ctx,?User?user,?ByteBuf?buf)
          ??????throws?Exception?
          {
          ????String?json?=?JSON.toJSONString(user);
          ????ctx.writeAndFlush(Unpooled.wrappedBuffer(json.getBytes()));
          ??}
          }

          JsonEncoder將響應(yīng)得到的User對(duì)象轉(zhuǎn)換為一個(gè)json對(duì)象,然后寫(xiě)入響應(yīng)中。對(duì)于EchoServerHandler,其主要作用就是接收客戶端數(shù)據(jù),并且進(jìn)行響應(yīng),如下是其源碼:

          public?class?EchoServerHandler?extends?SimpleChannelInboundHandler<User>?{

          ??@Override
          ??protected?void?channelRead0(ChannelHandlerContext?ctx,?User?user)?throws?Exception?{
          ????System.out.println("receive?from?client:?"?+?user);
          ????ctx.write(user);
          ??}
          }

          對(duì)于客戶端,其主要邏輯與服務(wù)端的基本類似,這里主要展示其pipeline的添加方式,以及最后發(fā)送請(qǐng)求,并且對(duì)服務(wù)器響應(yīng)進(jìn)行處理的過(guò)程:

          @Override
          protected?void?initChannel(SocketChannel?ch)?throws?Exception?{
          ????ch.pipeline().addLast(new?LengthFieldBasedFrameDecoder(1024,?0,?2,?0,?2));
          ????ch.pipeline().addLast(new?LengthFieldPrepender(2));
          ????ch.pipeline().addLast(new?JsonDecoder());
          ????ch.pipeline().addLast(new?JsonEncoder());
          ????ch.pipeline().addLast(new?EchoClientHandler());
          }
          public?class?EchoClientHandler?extends?SimpleChannelInboundHandler<User>?{

          ??@Override
          ??public?void?channelActive(ChannelHandlerContext?ctx)?throws?Exception?{
          ????ctx.write(getUser());
          ??}

          ??private?User?getUser()?{
          ????User?user?=?new?User();
          ????user.setAge(27);
          ????user.setName("zhangxufeng");
          ????return?user;
          ??}

          ??@Override
          ??protected?void?channelRead0(ChannelHandlerContext?ctx,?User?user)?throws?Exception?{
          ????System.out.println("receive?message?from?server:?"?+?user);
          ??}
          }

          這里客戶端首先會(huì)在連接上服務(wù)器時(shí),往服務(wù)器發(fā)送一個(gè)User對(duì)象數(shù)據(jù),然后在接收到服務(wù)器響應(yīng)之后,會(huì)打印服務(wù)器響應(yīng)的數(shù)據(jù)。

          4) 自定義粘包與拆包器

          對(duì)于粘包與拆包問(wèn)題,其實(shí)前面三種基本上已經(jīng)能夠滿足大多數(shù)情形了,但是對(duì)于一些更加復(fù)雜的協(xié)議,可能有一些定制化的需求。對(duì)于這些場(chǎng)景,其實(shí)本質(zhì)上,我們也不需要手動(dòng)從頭開(kāi)始寫(xiě)一份粘包與拆包處理器,而是通過(guò)繼承LengthFieldBasedFrameDecoder和LengthFieldPrepender來(lái)實(shí)現(xiàn)粘包和拆包的處理。

          如果用戶確實(shí)需要不通過(guò)繼承的方式實(shí)現(xiàn)自己的粘包和拆包處理器,這里可以通過(guò)實(shí)現(xiàn)MessageToByteEncoder和ByteToMessageDecoder來(lái)實(shí)現(xiàn)。這里MessageToByteEncoder的作用是將響應(yīng)數(shù)據(jù)編碼為一個(gè)ByteBuf對(duì)象,而B(niǎo)yteToMessageDecoder則是將接收到的ByteBuf數(shù)據(jù)轉(zhuǎn)換為某個(gè)對(duì)象數(shù)據(jù)。通過(guò)實(shí)現(xiàn)這兩個(gè)抽象類,用戶就可以達(dá)到實(shí)現(xiàn)自定義粘包和拆包處理的目的。如下是這兩個(gè)類及其抽象方法的聲明:

          public?abstract?class?ByteToMessageDecoder?extends?ChannelInboundHandlerAdapter?{
          ????protected?abstract?void?decode(ChannelHandlerContext?ctx,?ByteBuf?in,?List?out)?
          ????????throws?Exception;
          }


          public?abstract?class?MessageToByteEncoder<I>?extends?ChannelOutboundHandlerAdapter?{
          ????protected?abstract?void?encode(ChannelHandlerContext?ctx,?I?msg,?ByteBuf?out)?
          ????????throws?Exception
          ;
          }


          source:https://www.cnblogs.com/barrywxx/p/11532012.html

          掃碼二維碼

          獲取更多精彩

          Java樂(lè)園

          有用!分享+在看?
          瀏覽 63
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                    <th id="afajh"><progress id="afajh"></progress></th>
                    欧美在线精 | 人人操超碰在线观看 | 一区二区三区免费精品 | 老女人性爱视频在线观看 | 狼人狠狠干 |