<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Netty 的編碼 解碼 案例

          共 29633字,需瀏覽 60分鐘

           ·

          2021-07-15 00:46

          0x01:半包粘包

          例如發(fā)送兩個數(shù)據(jù)包給服務器,由于服務端一次讀取到的字節(jié)數(shù)不一定的分

          沒有半包和拆包:服務器分兩次讀取到兩個地理的數(shù)據(jù)包,這個情況沒有拆包和粘包的情況

          • 粘包:服務器一次收到兩個數(shù)據(jù)包,在一起收到的

          • 拆包:第一次讀取到完成的第一個包和第二個包的一部分內容,第二次讀取到第二個包的剩余內容

          • 整包:第一次讀取到第一包的部分內容,第二次讀取到第一個包的剩余部分和第二個包的全部

          • 多次拆包:如果接收滑窗非常小,數(shù)據(jù)量大的時候發(fā)生多次發(fā)送的接收的情況

          為什么會出現(xiàn)半包和粘包

          1、HTTP 中有一個 Nagle 算法,每個報文都是一段的,使用網(wǎng)絡發(fā)送發(fā)現(xiàn)網(wǎng)絡效率低,然后 HTTP 設置一個算法,設置到一定程度發(fā),所以出現(xiàn)一些延時,提高銷量,所以形成了粘包

          2、HTTP緩沖區(qū)引起的,報文段大的時候的時候直接弄在一起發(fā)送過去。

          怎么解決

          不斷的從 TCP 的緩沖區(qū)中讀取數(shù)據(jù),每次讀取完成都需要判斷是否是一個完整的數(shù)據(jù)包

          如果是讀取的數(shù)據(jù)不足以拼接成一個完整的業(yè)務數(shù)據(jù)包,那就保留該數(shù)據(jù),繼續(xù)從 TCP 緩沖區(qū)中讀取,直到得到一個完整的數(shù)據(jù)包

          定長
          分隔符
          基于長度的變長包

          如果當前督導的數(shù)據(jù)加上已經(jīng)讀取到的數(shù)據(jù)足以拼接成一個數(shù)據(jù)包,那就講已經(jīng)讀取的數(shù)據(jù)拼接本次讀取的數(shù)據(jù),構成一個完整的業(yè)務數(shù)據(jù)包傳遞到業(yè)務邏輯上,多余的數(shù)據(jù)保留,方便下次的讀取或者數(shù)據(jù)鏈接。


          0x02:Netty常用的編碼器

          • LineBasedFrameDecoder

          回車換行編碼器
          配合StringDecoder
          • DelimiterBasedFrameDecoder

          分隔符解碼器
          • FixedLengthFrameDecoder

          固定長度解碼器
          • LengthFieldBasedFrameDecoder

          不能超過1024個字節(jié)不然會報錯
          基于'長度'解碼器(私有協(xié)議最常用)


          0x03:拆包的類

          • ByteToMessageDecoder

          自解析
          • LengthFieldPrepender

          長度編碼器
          • Netty拆包的基類 - ByteToMessageDecoder

          內部維護了一個數(shù)據(jù)累積器cumulation,每次讀取到數(shù)據(jù)都會不斷累加,然后嘗試對累加到
          的數(shù)據(jù)進行拆包,拆成一個完整的業(yè)務數(shù)據(jù)包
          每次都將讀取到的數(shù)據(jù)通過內存拷貝的方式, 累積到cumulation中

          調用子類的 decode 方法對累積的數(shù)據(jù)嘗試進行拆包

          • LengthFieldBasedFrameDecoder

          參數(shù)說明

          maxFrameLength:包的最大長度
          lengthFieldOffset:長度屬性的起始位(偏移位),包中存放長度屬性字段的起始位置
          lengthFieldLength:長度屬性的長度 
          lengthAdjustment:長度調節(jié)值,在總長被定義為包含包頭長度時,修正信息長度
          initialBytesToStrip:跳過的字節(jié)數(shù),根據(jù)需要跳過lengthFieldLength個字節(jié),以便接收端直接接受到不含“長度屬性”的內容
          • LengthFieldPrepender 編碼器

          參數(shù)說明

          lengthFieldLength:長度屬性的字節(jié)長度
          lengthIncludesLengthFieldLength:false,長度字節(jié)不算在總長度中,true,算到總長度中

          編解碼器的作用就是講原始字節(jié)數(shù)據(jù)與自定義的消息對象進行互轉

          Decoder(解碼器)
          Encoder(編碼器)

          支持業(yè)界主流的序列化框架

          Protobuf
          Jboss Marshalling
          Java Serialization

          解碼1拆包:把整個 ByteBuf 數(shù)據(jù),分成一個個 ByteBuf,每個表示一個包

          解碼2反序列化:把每個包的 ByteBuf 字節(jié)數(shù)組轉成 java object

          package com.demo;

          import io.netty.bootstrap.Bootstrap;
          import io.netty.buffer.Unpooled;
          import io.netty.channel.ChannelFuture;
          import io.netty.channel.ChannelInitializer;
          import io.netty.channel.ChannelOption;
          import io.netty.channel.EventLoopGroup;
          import io.netty.channel.nio.NioEventLoopGroup;
          import io.netty.channel.socket.SocketChannel;
          import io.netty.channel.socket.nio.NioSocketChannel;
          import io.netty.handler.codec.DelimiterBasedFrameDecoder;

          public class StickyDemoClient {

              public static void main(String[] args) throws Exception {
                  int port = 8080;
                  if (args != null && args.length > 0) {
                      try {
                          port = Integer.valueOf(args[0]);
                      } catch (NumberFormatException e) {
                       }
                  }
                  new StickyDemoClient().connect(port, "127.0.0.1");
              }

              public void connect(int port, String host) throws Exception {
                  // 工作線程組 
                  EventLoopGroup group = new NioEventLoopGroup();
                  try {
                      Bootstrap b = new Bootstrap();
                      b.group(group).channel(NioSocketChannel.class)
                              .option(ChannelOption.TCP_NODELAY, true)
                              .handler(new ChannelInitializer<SocketChannel>() {
                                  @Override
                                  public void initChannel(SocketChannel ch) throws Exception {
                                      //ch.pipeline().addLast("framer", new FixedLengthFrameDecoder(139));
                                     // ch.pipeline().addLast("framer", new StickyDemoDecodeHandlerV2(
                                  //       Unpooled.wrappedBuffer(new byte[] { '#' })));
                                      ch.pipeline().addLast("framer"new DelimiterBasedFrameDecoder(8192,
                                              Unpooled.wrappedBuffer(new byte[] { '#' })));

                                      ch.pipeline().addLast(new StickyDemoClientHandler());

                                  }
                              });

                      // 發(fā)起異步連接操作
                      ChannelFuture f = b.connect(host, port).sync();

                      // 等待客戶端鏈路關閉
                      f.channel().closeFuture().sync();
                  } finally {
                      // 優(yōu)雅退出,釋放線程池資源
                      group.shutdownGracefully();
                  }
              }
          }


          package com.demo;

          import io.netty.buffer.ByteBuf;
          import io.netty.buffer.Unpooled;
          import io.netty.channel.ChannelHandlerContext;
          import io.netty.channel.SimpleChannelInboundHandler;
          import io.netty.util.CharsetUtil;

          public class StickyDemoClientHandler extends SimpleChannelInboundHandler<ByteBuf{

              private static String[] alphabets = {"A""B""C""D""E""F""G""H""I",
                                            "J""K""L""M""N""O""P"};

              @Override
              public void channelActive(ChannelHandlerContext ctx) {
                  for(int i=0; i<10; i++) {
                      StringBuilder builder = new StringBuilder();
                      builder.append("這是第");
                      builder.append(i);
                      builder.append("條消息, 內容是:");
                      for(int j=0; j<100; j++) {
                          builder.append(alphabets[i]);
                      }
                      builder.append("......");
                      builder.append("#");


                      System.out.println(builder.toString().getBytes().length);

                      ctx.writeAndFlush(Unpooled.copiedBuffer(builder.toString(),
                              CharsetUtil.UTF_8));
                  }
              }

              @Override
              public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) {
                  System.out.println("客戶端接收到消息:" + in.toString(CharsetUtil.UTF_8));
              }

              @Override
              public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
                  cause.printStackTrace();
                  ctx.close();
              }
          }


          package com.demo;

          import io.netty.buffer.ByteBuf;
          import io.netty.buffer.ByteBufAllocator;
          import io.netty.channel.ChannelHandlerContext;
          import io.netty.channel.ChannelInboundHandlerAdapter;

          import java.util.ArrayList;
          import java.util.List;

          public class StickyDemoDecodeHandler extends ChannelInboundHandlerAdapter {

              //存放待拆包數(shù)據(jù)的緩沖區(qū)
              private ByteBuf cache;
              private int frameLength;

              public StickyDemoDecodeHandler(int length) {
                  this.frameLength = length;
              }

              static ByteBuf expandCache(ByteBufAllocator alloc, ByteBuf cache, int readable) {
                  ByteBuf oldCache = cache;
                  cache = alloc.buffer(oldCache.readableBytes() + readable);
                  cache.writeBytes(oldCache);
                  oldCache.release();
                  return cache;
              }

              @Override
              public void channelRead(ChannelHandlerContext ctx, Object msg) {

                  ByteBuf data = (ByteBuf) msg;
                  try {
                      //讀取每一個消息,創(chuàng)建緩沖區(qū)
                      if (cache == null) {
                          cache = ctx.alloc().buffer(1024);
                      } else {
                          //如果現(xiàn)有的緩沖區(qū)容量太小,無法容納原有數(shù)據(jù)+新讀入的數(shù)據(jù),就擴容(重新創(chuàng)建一個大的,并把數(shù)據(jù)拷貝過去)
                          if (cache.writerIndex() > cache.maxCapacity() - data.readableBytes()) {
                              cache = expandCache(ctx.alloc(), cache, data.readableBytes());
                          }
                      }
                      //把新的數(shù)據(jù)讀入緩沖區(qū)
                      cache.writeBytes(data);

                      //每次讀取frameLength(定長)的數(shù)據(jù),做為一個包,存儲起來 
                      List<ByteBuf> output = new ArrayList<>();
                      while (cache.readableBytes() >= frameLength) {
                          output.add(cache.readBytes(frameLength));
                      }

                      //還有部分數(shù)據(jù)不夠一個包,10, 15, 一個10個,還剩5個
                      if (cache.isReadable()) {
                          cache.discardReadBytes();
                      }

                      for (int i = 0; i < output.size(); i++) {
                          ctx.fireChannelRead(output.get(i));
                      }
                  } finally {
                      data.release();
                  }

              }


              @Override
              public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
                  cause.printStackTrace();
                  ctx.close();
              }
          }


          package com.demo;

          import io.netty.buffer.ByteBuf;
          import io.netty.buffer.ByteBufAllocator;
          import io.netty.channel.ChannelHandlerContext;
          import io.netty.channel.ChannelInboundHandlerAdapter;

          import java.util.ArrayList;
          import java.util.List;

          public class StickyDemoDecodeHandlerV2 extends ChannelInboundHandlerAdapter {
              private ByteBuf cache;
              private byte delimiter; //包分隔符

              public StickyDemoDecodeHandlerV2(ByteBuf delimiter) {
                  if (delimiter == null) {
                      throw new NullPointerException("delimiter");
                  }
                  if (!delimiter.isReadable()) {
                      throw new IllegalArgumentException("empty delimiter");
                  }

                  this.delimiter =  delimiter.readByte();
                  ;
              }

              static ByteBuf expandCache(ByteBufAllocator alloc, ByteBuf cache, int readable) {
                  ByteBuf oldCache = cache;
                  cache = alloc.buffer(oldCache.readableBytes() + readable);
                  cache.writeBytes(oldCache);
                  oldCache.release();
                  return cache;
              }

              @Override
              public void channelRead(ChannelHandlerContext ctx, Object msg) {

                  ByteBuf data = (ByteBuf) msg;
                  try {
                      if (cache == null) {
                          cache = ctx.alloc().buffer(1024);
                      } else {
                          if (cache.writerIndex() > cache.maxCapacity() - data.readableBytes()) {
                              cache = expandCache(ctx.alloc(), cache, data.readableBytes());
                          }
                      }
                      cache.writeBytes(data);

                      List<ByteBuf> output = new ArrayList<>();

                      int frameIndex = 0;
                      int frameEndIndex = 0;
                      int length = cache.readableBytes();
                      while (frameIndex <= length) {
                          frameEndIndex = cache.indexOf(frameIndex + 1, length, delimiter);

                          if (frameEndIndex == -1) {
                              cache.discardReadBytes();
                              break;
                          }

                          output.add(cache.readBytes(frameEndIndex - frameIndex));
                          cache.skipBytes(1);
                          frameIndex = frameEndIndex + 1;

                      }

                      if (cache.isReadable()) {
                          cache.discardReadBytes();
                      }

                      for (int i = 0; i < output.size(); i++) {
                          ctx.fireChannelRead(output.get(i));
                      }
                  } finally {
                      data.release();
                  }

              }

              @Override
              public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
                  cause.printStackTrace();
                  ctx.close();
              }

          }


          package com.demo;

          import io.netty.bootstrap.ServerBootstrap;
          import io.netty.buffer.Unpooled;
          import io.netty.channel.ChannelFuture;
          import io.netty.channel.ChannelInitializer;
          import io.netty.channel.EventLoopGroup;
          import io.netty.channel.nio.NioEventLoopGroup;
          import io.netty.channel.socket.SocketChannel;
          import io.netty.channel.socket.nio.NioServerSocketChannel;
          import io.netty.handler.codec.DelimiterBasedFrameDecoder;

          public class StickyDemoServer {

              public static void main(String[] args) throws Exception {
                  int port = 8080;
                  if (args != null && args.length > 0) {
                      try {
                          port = Integer.valueOf(args[0]);
                      } catch (NumberFormatException e) {
                          // 采用默認值
                      }
                  }
                  new StickyDemoServer().bind(port);
              }

              public void bind(int port) throws Exception {
                  // 第一步:
                  // 配置服務端的NIO線程組
                  // 主線程組, 用于接受客戶端的連接,但是不做任何具體業(yè)務處理,像老板一樣,負責接待客戶,不具體服務客戶
                  EventLoopGroup bossGroup = new NioEventLoopGroup(1);
                  // 工作線程組, 老板線程組會把任務丟給他,讓手下線程組去做任務,服務客戶
                  EventLoopGroup workerGroup = new NioEventLoopGroup();
                  try {
                      // 類ServerBootstrap用于配置Server相關參數(shù),并啟動Server
                      ServerBootstrap b = new ServerBootstrap();

                      // 鏈式調用
                      // 配置parentGroup和childGroup
                      b.group(bossGroup, workerGroup)
                              // 配置Server通道
                              .channel(NioServerSocketChannel.class)
                              // 配置通道的ChannelPipeline
                              .childHandler(new ChildChannelHandler());

                      // 綁定端口,并啟動server,同時設置啟動方式為同步
                      ChannelFuture f = b.bind(port).sync();

                      System.out.println(StickyDemoServer.class.getName() + " 啟動成功,在地址[" + f.channel().localAddress() + "]上等待客戶請求......");

                      // 等待服務端監(jiān)聽端口關閉
                      f.channel().closeFuture().sync();
                  } finally {
                      // 優(yōu)雅退出,釋放線程池資源
                      bossGroup.shutdownGracefully();
                      workerGroup.shutdownGracefully();
                  }
              }

              private class ChildChannelHandler extends ChannelInitializer<SocketChannel{
                  @Override
                  protected void initChannel(SocketChannel ch) throws Exception {
                      //ch.pipeline().addLast("framer", new FixedLengthFrameDecoder(139));
                      ch.pipeline().addLast("framer"new DelimiterBasedFrameDecoder(8192, Unpooled.wrappedBuffer(new byte[] { '#' })));
                       //ch.pipeline().addLast("framer", new StickyDemoDecodeHandler(139));
                      // ch.pipeline().addLast("framer", new StickyDemoDecodeHandlerV2(
                      //       Unpooled.wrappedBuffer(new byte[] { '#' })));

                      ch.pipeline().addLast(new StickyDemoServerHandler());
                  }
              }
          }


          package com.demo;

           import io.netty.buffer.ByteBuf;
          import io.netty.buffer.Unpooled;
          import io.netty.channel.ChannelFutureListener;
          import io.netty.channel.ChannelHandlerContext;
          import io.netty.channel.ChannelInboundHandlerAdapter;
          import io.netty.util.CharsetUtil;

           public class StickyDemoServerHandler extends ChannelInboundHandlerAdapter {

              @Override
              public void channelRead(ChannelHandlerContext ctx, Object msg) {
                  ByteBuf in = (ByteBuf) msg;
                  System.out.println(
                          "服務器接收到消息:" + in.toString(CharsetUtil.UTF_8));
                  ctx.write(in);
                 // ctx.write(Unpooled.copiedBuffer("#", CharsetUtil.UTF_8));
                  //compositeBuffer.addComponent(in);
                 // ByteBuf buf =  ctx.alloc().directBuffer();
                 // buf.writeBytes("#".getBytes());
                 // CompositeByteBuf compositeBuffer = ctx.alloc().compositeBuffer();
                //  compositeBuffer.addComponents(true, in, buf);


                 // ctx.write(compositeBuffer);
              }

              @Override
              public void channelReadComplete(ChannelHandlerContext ctx)
                      throws Exception 
          {
                  ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
                          .addListener(ChannelFutureListener.CLOSE);
              }

              @Override
              public void exceptionCaught(ChannelHandlerContext ctx,
                  Throwable cause)
           
          {
                  cause.printStackTrace();
                  ctx.close();
              }
          }


          source:https://www.yuque.com/yangxinlei/lodfss/nguvm0

          喜歡,在看

          瀏覽 59
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产av激情无码 国产av最新福利 | 美女被艹视频网站 | 成人免费视频网 | 欧美性爱视频一区二区三区 | 欧美18禁黄免费网站 |