<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Netty 解決粘包半包問題

          共 6200字,需瀏覽 13分鐘

           ·

          2020-08-19 11:15


          來源:SegmentFault 思否社區(qū)

          作者:y豬




          一.什么是TCP粘包半包


          客戶端發(fā)送數(shù)據(jù)包給服務(wù)端,因服務(wù)端一次讀取到的字節(jié)數(shù)是不確定的,有好幾種情況.


          1. 服務(wù)端分兩次讀取到了兩個獨立的數(shù)據(jù)包,沒有粘包和拆包;
          2. 服務(wù)端一次接收到了兩個數(shù)據(jù)包,粘合在一起,被稱為 TCP 粘包;
          3. 服務(wù)端分兩次讀取到了兩個數(shù)據(jù)包, 第一次讀取到了完整的包和另外一個包的部分內(nèi)容,第二次讀取到了另一個包的剩余內(nèi)容, 這被稱為 TCP 拆包;
          4. 服務(wù)端分兩次讀取到了兩個數(shù)據(jù)包, 第一次讀取到了包的部分內(nèi)容 , 第二次讀取到了之前未讀完的包剩余內(nèi)容和另一個包,發(fā)生了拆包和粘包。
          5. 服務(wù)端 TCP 接收滑動窗口很小,數(shù)據(jù)包比較大, 即服務(wù)端分多次才能將 包接收完全,發(fā)生多次拆包




          二.粘包半包的原因


          1.粘包


          TCP協(xié)議:本身是 面向連接的可靠地協(xié)議-三次握手機制。


          客戶端與服務(wù)器會維持一個連接(Channel) ,在連接不斷開的情況下, 可以將多個數(shù)據(jù)包發(fā)往服務(wù)器,但是發(fā)送的網(wǎng)絡(luò)數(shù)據(jù)包太小, 那么本身會啟用 Nagle 算法(可配置是否啟用) 對較小的數(shù)據(jù)包進行合并(因此,TCP 的網(wǎng)絡(luò)延遲要 UDP 的高些)然后再發(fā)送(超時或者包大小足夠)。


          服務(wù)器在接收到消息(數(shù)據(jù)流)的時候就無法區(qū)分哪些數(shù)據(jù)包是客戶端自己分開發(fā)送的,這樣產(chǎn)生了粘包;


          服務(wù)器在接收到數(shù)據(jù)后,放到緩沖區(qū)中,如果消息沒有被及時從緩存區(qū)取走,下次在取數(shù)據(jù)的時候可能就會出現(xiàn)一次取出多個


          數(shù)據(jù)包的情況,造成粘包現(xiàn)象。


          UDP:本身作為無連接的不可靠的傳輸協(xié)議(適合頻繁發(fā)送較小的數(shù)據(jù)包) , 他不會對數(shù)據(jù)包進行合并發(fā)送,直接是一端發(fā)送什么數(shù)據(jù), 直接就發(fā)出去了, 既然他不會對數(shù)據(jù)合并, 每一個數(shù)據(jù)包都是完整的(數(shù)據(jù)+UDP 頭+IP 頭等等發(fā)一 次數(shù)據(jù)封裝一次) 也就沒有粘包了。


          2.半包


          分包產(chǎn)生的原因:可能是IP分片傳輸導致的, 也可能是傳輸過程中丟失部 分包導致出現(xiàn)的半包, 還有可能就是一個包可能被分成了兩次傳輸, 在取數(shù)據(jù)的時候,先取到了一部分(還可能與接收的緩沖區(qū)大小有關(guān)系) , 總之就是一個數(shù)據(jù)包被分成了多次接收。


          更具體的原因有三個, 分別如下。


          1. 應(yīng)用程序?qū)懭霐?shù)據(jù)的字節(jié)大小大于套接字發(fā)送緩沖區(qū)的大小
          2. 進行 MSS 大小的 TCP 分段。MSS 是最大報文段長度的縮寫。MSS 是 TCP 報文段中的數(shù)據(jù)字段的最大長度。數(shù)據(jù)字段加上 TCP 首部才等于整個的 TCP 報文段。所以 MSS 并不是


          TCP 報文段的最大長度, 而是:MSS=TCP 報文段長度-TCP 首部長度


          1. 以太網(wǎng)的 payload 大于 MTU 進行 IP 分片。MTU 指:一種通信協(xié)議的某一層上面所能


          通過的最大數(shù)據(jù)包大小。如果 IP 層有一個數(shù)據(jù)包要傳, 而且數(shù)據(jù)的長度比鏈路層的 MTU 大,那么 IP 層就會進行分片, 把數(shù)據(jù)包分成托干片, 讓每一片都不超過 MTU。注意, IP 分片可


          以發(fā)生在原始發(fā)送端主機上, 也可以發(fā)生在中間路由器上。


          3.解決粘包半包問題


          由于底層的 TCP 無法理解上層的業(yè)務(wù)數(shù)據(jù), 所以在底層是無法保證數(shù)據(jù)包不被拆分和重組的, 這個問題只能通過上層的應(yīng)用協(xié)議棧設(shè)計來解決。業(yè)界的主流協(xié)議的解決方案,可以歸納如下。


          (1) 在包尾增加分割符, 比如回車換行符進行分割, 例如 FTP 協(xié)議;
          (2) 消息定長, 例如每個報文的大小為固定長度 200 字節(jié), 如果不夠, 空位補空格;
          (3) 將消息分為消息頭和消息體, 消息頭中包含表示消息總長度(或者消息體長度)的字段, 通常設(shè)計思路為消息頭的第一個字段使用 int32 來表示消息的總長度,LengthFieldBasedFrameDecoder。


          下面列舉一個包尾增加分隔符的例子:

          服務(wù)端程序:


          import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.util.CharsetUtil;
          import java.util.concurrent.atomic.AtomicInteger;
          /** * 入站處理器 */@ChannelHandler.Sharablepublic class DelimiterServerHandler extends ChannelInboundHandlerAdapter {
          private AtomicInteger counter = new AtomicInteger(0); private AtomicInteger completeCounter = new AtomicInteger(0);
          /*** 服務(wù)端讀取到網(wǎng)絡(luò)數(shù)據(jù)后的處理*/ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf in = (ByteBuf)msg; String request = in.toString(CharsetUtil.UTF_8); System.out.println("Server Accept["+request +"] and the counter is:"+counter.incrementAndGet()); String resp = "Hello,"+request+". Welcome to Netty World!" + DelimiterEchoServer.DELIMITER_SYMBOL; ctx.writeAndFlush(Unpooled.copiedBuffer(resp.getBytes())); }
          /*** 服務(wù)端讀取完成網(wǎng)絡(luò)數(shù)據(jù)后的處理*/ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelReadComplete(); System.out.println("the ReadComplete count is " +completeCounter.incrementAndGet()); }
          /*** 發(fā)生異常后的處理*/ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }}


          import io.netty.bootstrap.ServerBootstrap;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.Channel;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.DelimiterBasedFrameDecoder;
          import java.net.InetSocketAddress;
          /** * 服務(wù)端 */public class DelimiterEchoServer {
          public static final String DELIMITER_SYMBOL = "@~"; public static final int PORT = 9997;
          public static void main(String[] args) throws InterruptedException { DelimiterEchoServer delimiterEchoServer = new DelimiterEchoServer(); System.out.println("服務(wù)器即將啟動"); delimiterEchoServer.start(); }
          public void start() throws InterruptedException { final DelimiterServerHandler serverHandler = new DelimiterServerHandler(); EventLoopGroup group = new NioEventLoopGroup();/*線程組*/ try { ServerBootstrap b = new ServerBootstrap();/*服務(wù)端啟動必須*/ b.group(group)/*將線程組傳入*/ .channel(NioServerSocketChannel.class)/*指定使用NIO進行網(wǎng)絡(luò)傳輸*/ .localAddress(new InetSocketAddress(PORT))/*指定服務(wù)器監(jiān)聽端口*/ /*服務(wù)端每接收到一個連接請求,就會新啟一個socket通信,也就是channel, 所以下面這段代碼的作用就是為這個子channel增加handle*/ .childHandler(new ChannelInitializerImp()); ChannelFuture f = b.bind().sync();/*異步綁定到服務(wù)器,sync()會阻塞直到完成*/ System.out.println("服務(wù)器啟動完成,等待客戶端的連接和數(shù)據(jù)....."); f.channel().closeFuture().sync();/*阻塞直到服務(wù)器的channel關(guān)閉*/ } finally { group.shutdownGracefully().sync();/*優(yōu)雅關(guān)閉線程組*/ } }
          private static class ChannelInitializerImp extends ChannelInitializer {
          @Override protected void initChannel(Channel ch) throws Exception { ByteBuf delimiter = Unpooled.copiedBuffer(DELIMITER_SYMBOL .getBytes()); ch.pipeline().addLast( new DelimiterBasedFrameDecoder(1024, delimiter)); ch.pipeline().addLast(new DelimiterServerHandler()); } }
          }


          客戶端程序


          import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;import io.netty.util.CharsetUtil;

          /** * 入站處理器 */public class DelimiterClientHandler extends SimpleChannelInboundHandler {
          private AtomicInteger counter = new AtomicInteger(0);
          /*** 客戶端讀取到網(wǎng)絡(luò)數(shù)據(jù)后的處理*/ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { System.out.println("client Accept["+msg.toString(CharsetUtil.UTF_8) +"] and the counter is:"+counter.incrementAndGet()); }
          /*** 客戶端被通知channel活躍后,做事*/ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ByteBuf msg = null; String request = "Mark,Lison,Peter,James,Deer" + DelimiterEchoServer.DELIMITER_SYMBOL; for(int i=0;i<10;i++){ msg = Unpooled.buffer(request.length()); msg.writeBytes(request.getBytes()); ctx.writeAndFlush(msg); } }
          /*** 發(fā)生異常后的處理*/ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }}


          import io.netty.bootstrap.Bootstrap;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.Channel;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.DelimiterBasedFrameDecoder;
          /** * 客戶端 */public class DelimiterEchoClient {
          private final String host;
          public DelimiterEchoClient(String host) { this.host = host; }
          public void start() throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup();/*線程組*/ try { final Bootstrap b = new Bootstrap();;/*客戶端啟動必須*/ b.group(group)/*將線程組傳入*/ .channel(NioSocketChannel.class)/*指定使用NIO進行網(wǎng)絡(luò)傳輸*/ .remoteAddress(new InetSocketAddress(host,DelimiterEchoServer.PORT))/*配置要連接服務(wù)器的ip地址和端口*/ .handler(new ChannelInitializerImp()); ChannelFuture f = b.connect().sync(); System.out.println("已連接到服務(wù)器....."); f.channel().closeFuture().sync(); } finally { group.shutdownGracefully().sync(); } }
          private static class ChannelInitializerImp extends ChannelInitializer {
          @Override protected void initChannel(Channel ch) throws Exception { ByteBuf delimiter = Unpooled.copiedBuffer(DelimiterEchoServer.DELIMITER_SYMBOL .getBytes()); ch.pipeline().addLast( new DelimiterBasedFrameDecoder(1024, delimiter)); ch.pipeline().addLast(new DelimiterClientHandler()); } }
          public static void main(String[] args) throws InterruptedException { new DelimiterEchoClient("127.0.0.1").start(); }}


          關(guān)鍵代碼:


          1.建立連接后,客戶端給服務(wù)端發(fā)數(shù)據(jù)包,每次發(fā)送已特殊字符`@~結(jié)尾。


          2.服務(wù)端收到數(shù)據(jù)包后經(jīng)過DelimiterBasedFrameDecoder即分隔符基礎(chǔ)框架解碼器解碼為一個個帶有分隔符的數(shù)據(jù)包。


          3.再到服務(wù)端的業(yè)務(wù)層處理器DelimiterServerHandler





          點擊左下角閱讀原文,到?SegmentFault 思否社區(qū)?和文章作者展開更多互動和交流。

          -?END -

          瀏覽 35
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  黑人操亚洲比大片 | 91人妻无码精品蜜桃 | 日韩无码电影网站 | 性感美女视频一二三 | 亚洲va国产 |