<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Netty 解決粘包和拆包問題的四種方案

          共 3520字,需瀏覽 8分鐘

           ·

          2020-08-04 21:53


          點(diǎn)擊上方藍(lán)色“程序猿DD”,選擇“設(shè)為星標(biāo)”

          回復(fù)“資源”獲取獨(dú)家整理的學(xué)習(xí)資料!

          來源 |?https://my.oschina.net/zhangxufeng/blog/3023794

          在RPC框架中,粘包和拆包問題是必須解決一個問題,因為RPC框架中,各個微服務(wù)相互之間都是維系了一個TCP長連接,比如dubbo就是一個全雙工的長連接。由于微服務(wù)往對方發(fā)送信息的時候,所有的請求都是使用的同一個連接,這樣就會產(chǎn)生粘包和拆包的問題。本文首先會對粘包和拆包問題進(jìn)行描述,然后介紹其常用的解決方案,最后會對Netty提供的幾種解決方案進(jìn)行講解。這里說明一下,由于oschina將“jie ma qi”認(rèn)定為敏感文字,因而本文統(tǒng)一使用“解碼一器”表示該含義

          1. 粘包和拆包

          產(chǎn)生粘包和拆包問題的主要原因是,操作系統(tǒng)在發(fā)送TCP數(shù)據(jù)的時候,底層會有一個緩沖區(qū),例如1024個字節(jié)大小,如果一次請求發(fā)送的數(shù)據(jù)量比較小,沒達(dá)到緩沖區(qū)大小,TCP則會將多個請求合并為同一個請求進(jìn)行發(fā)送,這就形成了粘包問題;如果一次請求發(fā)送的數(shù)據(jù)量比較大,超過了緩沖區(qū)大小,TCP就會將其拆分為多次發(fā)送,這就是拆包,也就是將一個大的包拆分為多個小包進(jìn)行發(fā)送。如下圖展示了粘包和拆包的一個示意圖:

          上圖中演示了粘包和拆包的三種情況:

          • A和B兩個包都剛好滿足TCP緩沖區(qū)的大小,或者說其等待時間已經(jīng)達(dá)到TCP等待時長,從而還是使用兩個獨(dú)立的包進(jìn)行發(fā)送;

          • A和B兩次請求間隔時間內(nèi)較短,并且數(shù)據(jù)包較小,因而合并為同一個包發(fā)送給服務(wù)端;

          • B包比較大,因而將其拆分為兩個包B_1和B_2進(jìn)行發(fā)送,而這里由于拆分后的B_2比較小,其又與A包合并在一起發(fā)送。

          2. 常見解決方案

          對于粘包和拆包問題,常見的解決方案有四種:

          • 客戶端在發(fā)送數(shù)據(jù)包的時候,每個包都固定長度,比如1024個字節(jié)大小,如果客戶端發(fā)送的數(shù)據(jù)長度不足1024個字節(jié),則通過補(bǔ)充空格的方式補(bǔ)全到指定長度;

          • 客戶端在每個包的末尾使用固定的分隔符,例如\r\n,如果一個包被拆分了,則等待下一個包發(fā)送過來之后找到其中的\r\n,然后對其拆分后的頭部部分與前一個包的剩余部分進(jìn)行合并,這樣就得到了一個完整的包;

          • 將消息分為頭部和消息體,在頭部中保存有當(dāng)前整個消息的長度,只有在讀取到足夠長度的消息之后才算是讀到了一個完整的消息;

          • 通過自定義協(xié)議進(jìn)行粘包和拆包的處理。

          3. Netty提供的粘包拆包解決方案

          3.1 FixedLengthFrameDecoder

          對于使用固定長度的粘包和拆包場景,可以使用FixedLengthFrameDecoder,該解碼一器會每次讀取固定長度的消息,如果當(dāng)前讀取到的消息不足指定長度,那么就會等待下一個消息到達(dá)后進(jìn)行補(bǔ)足。其使用也比較簡單,只需要在構(gòu)造函數(shù)中指定每個消息的長度即可。這里需要注意的是,F(xiàn)ixedLengthFrameDecoder只是一個解碼一器,Netty也只提供了一個解碼一器,這是因為對于解碼是需要等待下一個包的進(jìn)行補(bǔ)全的,代碼相對復(fù)雜,而對于編碼器,用戶可以自行編寫,因為編碼時只需要將不足指定長度的部分進(jìn)行補(bǔ)全即可。下面的示例中展示了如何使用FixedLengthFrameDecoder來進(jìn)行粘包和拆包處理:

          public?class?EchoServer?{

          ??public?void?bind(int?port)?throws?InterruptedException?{
          ????EventLoopGroup?bossGroup?=?new?NioEventLoopGroup();
          ????EventLoopGroup?workerGroup?=?new?NioEventLoopGroup();
          ????try?{
          ??????ServerBootstrap?bootstrap?=?new?ServerBootstrap();
          ??????bootstrap.group(bossGroup,?workerGroup)
          ????????.channel(NioServerSocketChannel.class)
          ????????.option(ChannelOption.SO_BACKLOG,?1024)
          ????????.handler(new?LoggingHandler(LogLevel.INFO))
          ????????.childHandler(new?ChannelInitializer()?{
          ??????????@Override
          ??????????protected?void?initChannel(SocketChannel?ch)?throws?Exception?{
          ????????????//?這里將FixedLengthFrameDecoder添加到pipeline中,指定長度為20
          ????????????ch.pipeline().addLast(new?FixedLengthFrameDecoder(20));
          ????????????//?將前一步解碼得到的數(shù)據(jù)轉(zhuǎn)碼為字符串
          ????????????ch.pipeline().addLast(new?StringDecoder());
          ????????????//?這里FixedLengthFrameEncoder是我們自定義的,用于將長度不足20的消息進(jìn)行補(bǔ)全空格
          ????????????ch.pipeline().addLast(new?FixedLengthFrameEncoder(20));
          ????????????//?最終的數(shù)據(jù)處理
          ????????????ch.pipeline().addLast(new?EchoServerHandler());
          ??????????}
          ????????});

          ??????ChannelFuture?future?=?bootstrap.bind(port).sync();
          ??????future.channel().closeFuture().sync();
          ????}?finally?{
          ??????bossGroup.shutdownGracefully();
          ??????workerGroup.shutdownGracefully();
          ????}
          ??}

          ??public?static?void?main(String[]?args)?throws?InterruptedException?{
          ????new?EchoServer().bind(8080);
          ??}
          }

          上面的pipeline中,對于入棧數(shù)據(jù),這里主要添加了FixedLengthFrameDecoderStringDecoder,前面一個用于處理固定長度的消息的粘包和拆包問題,第二個則是將處理之后的消息轉(zhuǎn)換為字符串。最后由EchoServerHandler處理最終得到的數(shù)據(jù),處理完成后,將處理得到的數(shù)據(jù)交由FixedLengthFrameEncoder處理,該編碼器是我們自定義的實現(xiàn),主要作用是將長度不足20的消息進(jìn)行空格補(bǔ)全。下面是FixedLengthFrameEncoder的實現(xiàn)代碼:

          public?class?FixedLengthFrameEncoder?extends?MessageToByteEncoder<String>?{
          ??private?int?length;

          ??public?FixedLengthFrameEncoder(int?length)?{
          ????this.length?=?length;
          ??}

          ??@Override
          ??protected?void?encode(ChannelHandlerContext?ctx,?String?msg,?ByteBuf?out)
          ??????throws?Exception?
          {
          ????//?對于超過指定長度的消息,這里直接拋出異常
          ????if?(msg.length()?>?length)?{
          ??????throw?new?UnsupportedOperationException(
          ??????????"message?length?is?too?large,?it's?limited?"?+?length);
          ????}

          ????//?如果長度不足,則進(jìn)行補(bǔ)全
          ????if?(msg.length()???????msg?=?addSpace(msg);
          ????}

          ????ctx.writeAndFlush(Unpooled.wrappedBuffer(msg.getBytes()));
          ??}

          ??//?進(jìn)行空格補(bǔ)全
          ??private?String?addSpace(String?msg)?{
          ????StringBuilder?builder?=?new?StringBuilder(msg);
          ????for?(int?i?=?0;?i???????builder.append("?");
          ????}

          ????return?builder.toString();
          ??}
          }

          這里FixedLengthFrameEncoder實現(xiàn)了decode()方法,在該方法中,主要是將消息長度不足20的消息進(jìn)行空格補(bǔ)全。EchoServerHandler的作用主要是打印接收到的消息,然后發(fā)送響應(yīng)給客戶端:

          public?class?EchoServerHandler?extends?SimpleChannelInboundHandler<String>?{

          ??@Override
          ??protected?void?channelRead0(ChannelHandlerContext?ctx,?String?msg)?throws?Exception?{
          ????System.out.println("server?receives?message:?"?+?msg.trim());
          ????ctx.writeAndFlush("hello?client!");
          ??}
          }

          對于客戶端,其實現(xiàn)方式基本與服務(wù)端的使用方式類似,只是在最后進(jìn)行消息發(fā)送的時候與服務(wù)端的處理方式不同。如下是客戶端EchoClient的代碼:

          public?class?EchoClient?{

          ??public?void?connect(String?host,?int?port)?throws?InterruptedException?{
          ????EventLoopGroup?group?=?new?NioEventLoopGroup();
          ????try?{
          ??????Bootstrap?bootstrap?=?new?Bootstrap();
          ??????bootstrap.group(group)
          ????????.channel(NioSocketChannel.class)
          ????????.option(ChannelOption.TCP_NODELAY,?true)
          ????????.handler(new?ChannelInitializer()?{
          ??????????@Override
          ??????????protected?void?initChannel(SocketChannel?ch)?throws?Exception?{
          ????????????//?對服務(wù)端發(fā)送的消息進(jìn)行粘包和拆包處理,由于服務(wù)端發(fā)送的消息已經(jīng)進(jìn)行了空格補(bǔ)全,
          ????????????//?并且長度為20,因而這里指定的長度也為20
          ????????????ch.pipeline().addLast(new?FixedLengthFrameDecoder(20));
          ????????????//?將粘包和拆包處理得到的消息轉(zhuǎn)換為字符串
          ????????????ch.pipeline().addLast(new?StringDecoder());
          ????????????//?對客戶端發(fā)送的消息進(jìn)行空格補(bǔ)全,保證其長度為20
          ????????????ch.pipeline().addLast(new?FixedLengthFrameEncoder(20));
          ????????????//?客戶端發(fā)送消息給服務(wù)端,并且處理服務(wù)端響應(yīng)的消息
          ????????????ch.pipeline().addLast(new?EchoClientHandler());
          ??????????}
          ????????});

          ??????ChannelFuture?future?=?bootstrap.connect(host,?port).sync();
          ??????future.channel().closeFuture().sync();
          ????}?finally?{
          ??????group.shutdownGracefully();
          ????}
          ??}

          ??public?static?void?main(String[]?args)?throws?InterruptedException?{
          ????new?EchoClient().connect("127.0.0.1",?8080);
          ??}
          }

          對于客戶端而言,其消息的處理流程其實與服務(wù)端是相似的,對于入站消息,需要對其進(jìn)行粘包和拆包處理,然后將其轉(zhuǎn)碼為字符串,對于出站消息,則需要將長度不足20的消息進(jìn)行空格補(bǔ)全。客戶端與服務(wù)端處理的主要區(qū)別在于最后的消息處理handler不一樣,也即這里的EchoClientHandler,如下是該handler的源碼:

          public?class?EchoClientHandler?extends?SimpleChannelInboundHandler<String>?{

          ??@Override
          ??protected?void?channelRead0(ChannelHandlerContext?ctx,?String?msg)?throws?Exception?{
          ????System.out.println("client?receives?message:?"?+?msg.trim());
          ??}

          ??@Override
          ??public?void?channelActive(ChannelHandlerContext?ctx)?throws?Exception?{
          ????ctx.writeAndFlush("hello?server!");
          ??}
          }

          這里客戶端的處理主要是重寫了channelActive()channelRead0()兩個方法,這兩個方法的主要作用在于,channelActive()會在客戶端連接上服務(wù)器時執(zhí)行,也就是說,其連上服務(wù)器之后就會往服務(wù)器發(fā)送消息。而channelRead0()主要是在服務(wù)器發(fā)送響應(yīng)給客戶端時執(zhí)行,這里主要是打印服務(wù)器的響應(yīng)消息。對于服務(wù)端而言,前面我們我們可以看到,EchoServerHandler只重寫了channelRead0()方法,這是因為服務(wù)器只需要等待客戶端發(fā)送消息過來,然后在該方法中進(jìn)行處理,處理完成后直接將響應(yīng)發(fā)送給客戶端。如下是分別啟動服務(wù)端和客戶端之后控制臺打印的數(shù)據(jù):

          //?server
          server?receives?message:?hello?server!
          //?client
          client?receives?message:?hello?client!

          3.2 LineBasedFrameDecoder與DelimiterBasedFrameDecoder

          對于通過分隔符進(jìn)行粘包和拆包問題的處理,Netty提供了兩個編解碼的類,LineBasedFrameDecoderDelimiterBasedFrameDecoder。這里LineBasedFrameDecoder的作用主要是通過換行符,即\n或者\r\n對數(shù)據(jù)進(jìn)行處理;而DelimiterBasedFrameDecoder的作用則是通過用戶指定的分隔符對數(shù)據(jù)進(jìn)行粘包和拆包處理。同樣的,這兩個類都是解碼一器類,而對于數(shù)據(jù)的編碼,也即在每個數(shù)據(jù)包最后添加換行符或者指定分割符的部分需要用戶自行進(jìn)行處理。這里以DelimiterBasedFrameDecoder為例進(jìn)行講解,如下是EchoServer中使用該類的代碼片段,其余部分與前面的例子中的完全一致:

          @Override
          protected?void?initChannel(SocketChannel?ch)?throws?Exception?{
          ????String?delimiter?=?"_$";
          ????//?將delimiter設(shè)置到DelimiterBasedFrameDecoder中,經(jīng)過該解碼一器進(jìn)行處理之后,源數(shù)據(jù)將會
          ????//?被按照_$進(jìn)行分隔,這里1024指的是分隔的最大長度,即當(dāng)讀取到1024個字節(jié)的數(shù)據(jù)之后,若還是未
          ????//?讀取到分隔符,則舍棄當(dāng)前數(shù)據(jù)段,因為其很有可能是由于碼流紊亂造成的
          ????ch.pipeline().addLast(new?DelimiterBasedFrameDecoder(1024,
          ????????Unpooled.wrappedBuffer(delimiter.getBytes())));
          ????//?將分隔之后的字節(jié)數(shù)據(jù)轉(zhuǎn)換為字符串?dāng)?shù)據(jù)
          ????ch.pipeline().addLast(new?StringDecoder());
          ????//?這是我們自定義的一個編碼器,主要作用是在返回的響應(yīng)數(shù)據(jù)最后添加分隔符
          ????ch.pipeline().addLast(new?DelimiterBasedFrameEncoder(delimiter));
          ????//?最終處理數(shù)據(jù)并且返回響應(yīng)的handler
          ????ch.pipeline().addLast(new?EchoServerHandler());
          }

          上面pipeline的設(shè)置中,添加的解碼一器主要有DelimiterBasedFrameDecoder和StringDecoder,經(jīng)過這兩個處理器處理之后,接收到的字節(jié)流就會被分隔,并且轉(zhuǎn)換為字符串?dāng)?shù)據(jù),最終交由EchoServerHandler處理。這里DelimiterBasedFrameEncoder是我們自定義的編碼器,其主要作用是在返回的響應(yīng)數(shù)據(jù)之后添加分隔符。如下是該編碼器的源碼:

          public?class?DelimiterBasedFrameEncoder?extends?MessageToByteEncoder<String>?{

          ??private?String?delimiter;

          ??public?DelimiterBasedFrameEncoder(String?delimiter)?{
          ????this.delimiter?=?delimiter;
          ??}

          ??@Override
          ??protected?void?encode(ChannelHandlerContext?ctx,?String?msg,?ByteBuf?out)?
          ??????throws?Exception?
          {
          ????//?在響應(yīng)的數(shù)據(jù)后面添加分隔符
          ????ctx.writeAndFlush(Unpooled.wrappedBuffer((msg?+?delimiter).getBytes()));
          ??}
          }

          對于客戶端而言,這里的處理方式與服務(wù)端類似,其pipeline的添加方式如下:

          @Override
          protected?void?initChannel(SocketChannel?ch)?throws?Exception?{
          ????String?delimiter?=?"_$";
          ????//?對服務(wù)端返回的消息通過_$進(jìn)行分隔,并且每次查找的最大大小為1024字節(jié)
          ????ch.pipeline().addLast(new?DelimiterBasedFrameDecoder(1024,?
          ????????Unpooled.wrappedBuffer(delimiter.getBytes())));
          ????//?將分隔之后的字節(jié)數(shù)據(jù)轉(zhuǎn)換為字符串
          ????ch.pipeline().addLast(new?StringDecoder());
          ????//?對客戶端發(fā)送的數(shù)據(jù)進(jìn)行編碼,這里主要是在客戶端發(fā)送的數(shù)據(jù)最后添加分隔符
          ????ch.pipeline().addLast(new?DelimiterBasedFrameEncoder(delimiter));
          ????//?客戶端發(fā)送數(shù)據(jù)給服務(wù)端,并且處理從服務(wù)端響應(yīng)的數(shù)據(jù)
          ????ch.pipeline().addLast(new?EchoClientHandler());
          }

          這里客戶端的處理方式與服務(wù)端基本一致,關(guān)于這里沒展示的代碼,其與示例一中的代碼完全一致,這里則不予展示。

          3.3 LengthFieldBasedFrameDecoder與LengthFieldPrepender

          這里LengthFieldBasedFrameDecoderLengthFieldPrepender需要配合起來使用,其實本質(zhì)上來講,這兩者一個是解碼,一個是編碼的關(guān)系。它們處理粘拆包的主要思想是在生成的數(shù)據(jù)包中添加一個長度字段,用于記錄當(dāng)前數(shù)據(jù)包的長度。LengthFieldBasedFrameDecoder會按照參數(shù)指定的包長度偏移量數(shù)據(jù)對接收到的數(shù)據(jù)進(jìn)行解碼,從而得到目標(biāo)消息體數(shù)據(jù);而LengthFieldPrepender則會在響應(yīng)的數(shù)據(jù)前面添加指定的字節(jié)數(shù)據(jù),這個字節(jié)數(shù)據(jù)中保存了當(dāng)前消息體的整體字節(jié)數(shù)據(jù)長度。LengthFieldBasedFrameDecoder的解碼過程如下圖所示:

          LengthFieldPrepender的編碼過程如下圖所示:

          關(guān)于LengthFieldBasedFrameDecoder,這里需要對其構(gòu)造函數(shù)參數(shù)進(jìn)行介紹:

          • maxFrameLength:指定了每個包所能傳遞的最大數(shù)據(jù)包大小;

          • lengthFieldOffset:指定了長度字段在字節(jié)碼中的偏移量;

          • lengthFieldLength:指定了長度字段所占用的字節(jié)長度;

          • lengthAdjustment:對一些不僅包含有消息頭和消息體的數(shù)據(jù)進(jìn)行消息頭的長度的調(diào)整,這樣就可以只得到消息體的數(shù)據(jù),這里的lengthAdjustment指定的就是消息頭的長度;

          • initialBytesToStrip:對于長度字段在消息頭中間的情況,可以通過initialBytesToStrip忽略掉消息頭以及長度字段占用的字節(jié)。

          這里我們以json序列化為例對LengthFieldBasedFrameDecoderLengthFieldPrepender的使用方式進(jìn)行講解。如下是EchoServer的源碼:

          public?class?EchoServer?{

          ??public?void?bind(int?port)?throws?InterruptedException?{
          ????EventLoopGroup?bossGroup?=?new?NioEventLoopGroup();
          ????EventLoopGroup?workerGroup?=?new?NioEventLoopGroup();
          ????try?{
          ??????ServerBootstrap?bootstrap?=?new?ServerBootstrap();
          ??????bootstrap.group(bossGroup,?workerGroup)
          ????????.channel(NioServerSocketChannel.class)
          ????????.option(ChannelOption.SO_BACKLOG,?1024)
          ????????.handler(new?LoggingHandler(LogLevel.INFO))
          ????????.childHandler(new?ChannelInitializer()?{
          ??????????@Override
          ??????????protected?void?initChannel(SocketChannel?ch)?throws?Exception?{
          ????????????//?這里將LengthFieldBasedFrameDecoder添加到pipeline的首位,因為其需要對接收到的數(shù)據(jù)
          ????????????//?進(jìn)行長度字段解碼,這里也會對數(shù)據(jù)進(jìn)行粘包和拆包處理
          ????????????ch.pipeline().addLast(new?LengthFieldBasedFrameDecoder(1024,?0,?2,?0,?2));
          ????????????//?LengthFieldPrepender是一個編碼器,主要是在響應(yīng)字節(jié)數(shù)據(jù)前面添加字節(jié)長度字段
          ????????????ch.pipeline().addLast(new?LengthFieldPrepender(2));
          ????????????//?對經(jīng)過粘包和拆包處理之后的數(shù)據(jù)進(jìn)行json反序列化,從而得到User對象
          ????????????ch.pipeline().addLast(new?JsonDecoder());
          ????????????//?對響應(yīng)數(shù)據(jù)進(jìn)行編碼,主要是將User對象序列化為json
          ????????????ch.pipeline().addLast(new?JsonEncoder());
          ????????????//?處理客戶端的請求的數(shù)據(jù),并且進(jìn)行響應(yīng)
          ????????????ch.pipeline().addLast(new?EchoServerHandler());
          ??????????}
          ????????});

          ??????ChannelFuture?future?=?bootstrap.bind(port).sync();
          ??????future.channel().closeFuture().sync();
          ????}?finally?{
          ??????bossGroup.shutdownGracefully();
          ??????workerGroup.shutdownGracefully();
          ????}
          ??}

          ??public?static?void?main(String[]?args)?throws?InterruptedException?{
          ????new?EchoServer().bind(8080);
          ??}
          }

          這里EchoServer主要是在pipeline中添加了兩個編碼器和兩個解碼一器,編碼器主要是負(fù)責(zé)將響應(yīng)的User對象序列化為json對象,然后在其字節(jié)數(shù)組前面添加一個長度字段的字節(jié)數(shù)組;解碼一器主要是對接收到的數(shù)據(jù)進(jìn)行長度字段的解碼,然后將其反序列化為一個User對象。下面是JsonDecoder的源碼:

          public?class?JsonDecoder?extends?MessageToMessageDecoder<ByteBuf>?{

          ??@Override
          ??protected?void?decode(ChannelHandlerContext?ctx,?ByteBuf?buf,?List?out)?
          ??????throws?Exception?{
          ????byte[]?bytes?=?new?byte[buf.readableBytes()];
          ????buf.readBytes(bytes);
          ????User?user?=?JSON.parseObject(new?String(bytes,?CharsetUtil.UTF_8),?User.class);
          ????out.add(user);
          ??}
          }

          JsonDecoder首先從接收到的數(shù)據(jù)流中讀取字節(jié)數(shù)組,然后將其反序列化為一個User對象。下面我們看看JsonEncoder的源碼:

          public?class?JsonEncoder?extends?MessageToByteEncoder<User>?{

          ??@Override
          ??protected?void?encode(ChannelHandlerContext?ctx,?User?user,?ByteBuf?buf)
          ??????throws?Exception?
          {
          ????String?json?=?JSON.toJSONString(user);
          ????ctx.writeAndFlush(Unpooled.wrappedBuffer(json.getBytes()));
          ??}
          }

          JsonEncoder將響應(yīng)得到的User對象轉(zhuǎn)換為一個json對象,然后寫入響應(yīng)中。對于EchoServerHandler,其主要作用就是接收客戶端數(shù)據(jù),并且進(jìn)行響應(yīng),如下是其源碼:

          public?class?EchoServerHandler?extends?SimpleChannelInboundHandler<User>?{

          ??@Override
          ??protected?void?channelRead0(ChannelHandlerContext?ctx,?User?user)?throws?Exception?{
          ????System.out.println("receive?from?client:?"?+?user);
          ????ctx.write(user);
          ??}
          }

          對于客戶端,其主要邏輯與服務(wù)端的基本類似,這里主要展示其pipeline的添加方式,以及最后發(fā)送請求,并且對服務(wù)器響應(yīng)進(jìn)行處理的過程:

          @Override
          protected?void?initChannel(SocketChannel?ch)?throws?Exception?{
          ????ch.pipeline().addLast(new?LengthFieldBasedFrameDecoder(1024,?0,?2,?0,?2));
          ????ch.pipeline().addLast(new?LengthFieldPrepender(2));
          ????ch.pipeline().addLast(new?JsonDecoder());
          ????ch.pipeline().addLast(new?JsonEncoder());
          ????ch.pipeline().addLast(new?EchoClientHandler());
          }
          public?class?EchoClientHandler?extends?SimpleChannelInboundHandler<User>?{

          ??@Override
          ??public?void?channelActive(ChannelHandlerContext?ctx)?throws?Exception?{
          ????ctx.write(getUser());
          ??}

          ??private?User?getUser()?{
          ????User?user?=?new?User();
          ????user.setAge(27);
          ????user.setName("zhangxufeng");
          ????return?user;
          ??}

          ??@Override
          ??protected?void?channelRead0(ChannelHandlerContext?ctx,?User?user)?throws?Exception?{
          ????System.out.println("receive?message?from?server:?"?+?user);
          ??}
          }

          這里客戶端首先會在連接上服務(wù)器時,往服務(wù)器發(fā)送一個User對象數(shù)據(jù),然后在接收到服務(wù)器響應(yīng)之后,會打印服務(wù)器響應(yīng)的數(shù)據(jù)。

          3.4 自定義粘包與拆包器

          對于粘包與拆包問題,其實前面三種基本上已經(jīng)能夠滿足大多數(shù)情形了,但是對于一些更加復(fù)雜的協(xié)議,可能有一些定制化的需求。對于這些場景,其實本質(zhì)上,我們也不需要手動從頭開始寫一份粘包與拆包處理器,而是通過繼承LengthFieldBasedFrameDecoderLengthFieldPrepender來實現(xiàn)粘包和拆包的處理。

          如果用戶確實需要不通過繼承的方式實現(xiàn)自己的粘包和拆包處理器,這里可以通過實現(xiàn)MessageToByteEncoderByteToMessageDecoder來實現(xiàn)。這里MessageToByteEncoder的作用是將響應(yīng)數(shù)據(jù)編碼為一個ByteBuf對象,而ByteToMessageDecoder則是將接收到的ByteBuf數(shù)據(jù)轉(zhuǎn)換為某個對象數(shù)據(jù)。通過實現(xiàn)這兩個抽象類,用戶就可以達(dá)到實現(xiàn)自定義粘包和拆包處理的目的。如下是這兩個類及其抽象方法的聲明:

          public?abstract?class?ByteToMessageDecoder?extends?ChannelInboundHandlerAdapter?{
          ????protected?abstract?void?decode(ChannelHandlerContext?ctx,?ByteBuf?in,?List?out)?
          ????????throws?Exception;
          }
          public?abstract?class?MessageToByteEncoder<I>?extends?ChannelOutboundHandlerAdapter?{
          ????protected?abstract?void?encode(ChannelHandlerContext?ctx,?I?msg,?ByteBuf?out)?
          ????????throws?Exception
          ;
          }

          4. 小結(jié)

          本文首先對粘包和拆包的問題原理進(jìn)行描述,幫助讀者理解粘包和拆包問題所在。然后對處理粘包和拆包的幾種常用解決方案進(jìn)行講解。接著通過輔以示例的方式對Netty提供的幾種解決粘包和拆包問題的解決方案進(jìn)行了詳細(xì)講解。

          往期推薦

          Mybatis 框架下 SQL 注入攻擊的 3 種方式,真是防不勝防!

          漫漫優(yōu)化路,總會錯幾步!記一次接口優(yōu)化!

          SQL 查詢總是先執(zhí)行SELECT語句嗎?你們都錯了!

          曝光 Facebook 內(nèi)部高效工作 PPT 指南

          支付寶的架構(gòu)到底有多牛逼?


          歡迎加入我的知識星球,聊技術(shù)、說職場、侃社會。

          頭發(fā)很多的中年程序員DD和他的朋友們在這里期待你的到來!

          加入方式:長按下方二維碼噢


          我的星球是否適合你?

          點(diǎn)擊閱讀原文看看我們都聊過啥?

          瀏覽 49
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                    <th id="afajh"><progress id="afajh"></progress></th>
                    深爱五月综合网 | 成人网站在线视频三级 | 成人吧欧美色图 | 黄色视频网站免费观看 | 国产黄色直播 |