Netty 解決粘包半包問題

來源:SegmentFault 思否社區(qū)
作者:y豬
一.什么是TCP粘包半包
客戶端發(fā)送數(shù)據(jù)包給服務(wù)端,因服務(wù)端一次讀取到的字節(jié)數(shù)是不確定的,有好幾種情況.
服務(wù)端分兩次讀取到了兩個獨立的數(shù)據(jù)包,沒有粘包和拆包; 服務(wù)端一次接收到了兩個數(shù)據(jù)包,粘合在一起,被稱為 TCP 粘包; 服務(wù)端分兩次讀取到了兩個數(shù)據(jù)包, 第一次讀取到了完整的包和另外一個包的部分內(nèi)容,第二次讀取到了另一個包的剩余內(nèi)容, 這被稱為 TCP 拆包; 服務(wù)端分兩次讀取到了兩個數(shù)據(jù)包, 第一次讀取到了包的部分內(nèi)容 , 第二次讀取到了之前未讀完的包剩余內(nèi)容和另一個包,發(fā)生了拆包和粘包。 服務(wù)端 TCP 接收滑動窗口很小,數(shù)據(jù)包比較大, 即服務(wù)端分多次才能將 包接收完全,發(fā)生多次拆包
二.粘包半包的原因
1.粘包
TCP協(xié)議:本身是 面向連接的可靠地協(xié)議-三次握手機制。
客戶端與服務(wù)器會維持一個連接(Channel) ,在連接不斷開的情況下, 可以將多個數(shù)據(jù)包發(fā)往服務(wù)器,但是發(fā)送的網(wǎng)絡(luò)數(shù)據(jù)包太小, 那么本身會啟用 Nagle 算法(可配置是否啟用) 對較小的數(shù)據(jù)包進行合并(因此,TCP 的網(wǎng)絡(luò)延遲要 UDP 的高些)然后再發(fā)送(超時或者包大小足夠)。
服務(wù)器在接收到消息(數(shù)據(jù)流)的時候就無法區(qū)分哪些數(shù)據(jù)包是客戶端自己分開發(fā)送的,這樣產(chǎn)生了粘包;
服務(wù)器在接收到數(shù)據(jù)后,放到緩沖區(qū)中,如果消息沒有被及時從緩存區(qū)取走,下次在取數(shù)據(jù)的時候可能就會出現(xiàn)一次取出多個
數(shù)據(jù)包的情況,造成粘包現(xiàn)象。
UDP:本身作為無連接的不可靠的傳輸協(xié)議(適合頻繁發(fā)送較小的數(shù)據(jù)包) , 他不會對數(shù)據(jù)包進行合并發(fā)送,直接是一端發(fā)送什么數(shù)據(jù), 直接就發(fā)出去了, 既然他不會對數(shù)據(jù)合并, 每一個數(shù)據(jù)包都是完整的(數(shù)據(jù)+UDP 頭+IP 頭等等發(fā)一 次數(shù)據(jù)封裝一次) 也就沒有粘包了。
2.半包
分包產(chǎn)生的原因:可能是IP分片傳輸導致的, 也可能是傳輸過程中丟失部 分包導致出現(xiàn)的半包, 還有可能就是一個包可能被分成了兩次傳輸, 在取數(shù)據(jù)的時候,先取到了一部分(還可能與接收的緩沖區(qū)大小有關(guān)系) , 總之就是一個數(shù)據(jù)包被分成了多次接收。
更具體的原因有三個, 分別如下。
應(yīng)用程序?qū)懭霐?shù)據(jù)的字節(jié)大小大于套接字發(fā)送緩沖區(qū)的大小 進行 MSS 大小的 TCP 分段。MSS 是最大報文段長度的縮寫。MSS 是 TCP 報文段中的數(shù)據(jù)字段的最大長度。數(shù)據(jù)字段加上 TCP 首部才等于整個的 TCP 報文段。所以 MSS 并不是
TCP 報文段的最大長度, 而是:MSS=TCP 報文段長度-TCP 首部長度
以太網(wǎng)的 payload 大于 MTU 進行 IP 分片。MTU 指:一種通信協(xié)議的某一層上面所能
通過的最大數(shù)據(jù)包大小。如果 IP 層有一個數(shù)據(jù)包要傳, 而且數(shù)據(jù)的長度比鏈路層的 MTU 大,那么 IP 層就會進行分片, 把數(shù)據(jù)包分成托干片, 讓每一片都不超過 MTU。注意, IP 分片可
以發(fā)生在原始發(fā)送端主機上, 也可以發(fā)生在中間路由器上。
3.解決粘包半包問題
由于底層的 TCP 無法理解上層的業(yè)務(wù)數(shù)據(jù), 所以在底層是無法保證數(shù)據(jù)包不被拆分和重組的, 這個問題只能通過上層的應(yīng)用協(xié)議棧設(shè)計來解決。業(yè)界的主流協(xié)議的解決方案,可以歸納如下。
(1) 在包尾增加分割符, 比如回車換行符進行分割, 例如 FTP 協(xié)議;
(2) 消息定長, 例如每個報文的大小為固定長度 200 字節(jié), 如果不夠, 空位補空格;
(3) 將消息分為消息頭和消息體, 消息頭中包含表示消息總長度(或者消息體長度)的字段, 通常設(shè)計思路為消息頭的第一個字段使用 int32 來表示消息的總長度,LengthFieldBasedFrameDecoder。
下面列舉一個包尾增加分隔符的例子:
服務(wù)端程序:
import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.util.CharsetUtil;import java.util.concurrent.atomic.AtomicInteger;/*** 入站處理器*/@ChannelHandler.Sharablepublic class DelimiterServerHandler extends ChannelInboundHandlerAdapter {private AtomicInteger counter = new AtomicInteger(0);private AtomicInteger completeCounter = new AtomicInteger(0);/*** 服務(wù)端讀取到網(wǎng)絡(luò)數(shù)據(jù)后的處理*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf in = (ByteBuf)msg;String request = in.toString(CharsetUtil.UTF_8);System.out.println("Server Accept["+request+"] and the counter is:"+counter.incrementAndGet());String resp = "Hello,"+request+". Welcome to Netty World!"+ DelimiterEchoServer.DELIMITER_SYMBOL;ctx.writeAndFlush(Unpooled.copiedBuffer(resp.getBytes()));}/*** 服務(wù)端讀取完成網(wǎng)絡(luò)數(shù)據(jù)后的處理*/@Overridepublic void channelReadComplete(ChannelHandlerContext ctx)throws Exception {ctx.fireChannelReadComplete();System.out.println("the ReadComplete count is "+completeCounter.incrementAndGet());}/*** 發(fā)生異常后的處理*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}}
import io.netty.bootstrap.ServerBootstrap;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.Channel;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.DelimiterBasedFrameDecoder;import java.net.InetSocketAddress;/*** 服務(wù)端*/public class DelimiterEchoServer {public static final String DELIMITER_SYMBOL = "@~";public static final int PORT = 9997;public static void main(String[] args) throws InterruptedException {DelimiterEchoServer delimiterEchoServer = new DelimiterEchoServer();System.out.println("服務(wù)器即將啟動");delimiterEchoServer.start();}public void start() throws InterruptedException {final DelimiterServerHandler serverHandler = new DelimiterServerHandler();EventLoopGroup group = new NioEventLoopGroup();/*線程組*/try {ServerBootstrap b = new ServerBootstrap();/*服務(wù)端啟動必須*/b.group(group)/*將線程組傳入*/.channel(NioServerSocketChannel.class)/*指定使用NIO進行網(wǎng)絡(luò)傳輸*/.localAddress(new InetSocketAddress(PORT))/*指定服務(wù)器監(jiān)聽端口*//*服務(wù)端每接收到一個連接請求,就會新啟一個socket通信,也就是channel,所以下面這段代碼的作用就是為這個子channel增加handle*/.childHandler(new ChannelInitializerImp());ChannelFuture f = b.bind().sync();/*異步綁定到服務(wù)器,sync()會阻塞直到完成*/System.out.println("服務(wù)器啟動完成,等待客戶端的連接和數(shù)據(jù).....");f.channel().closeFuture().sync();/*阻塞直到服務(wù)器的channel關(guān)閉*/} finally {group.shutdownGracefully().sync();/*優(yōu)雅關(guān)閉線程組*/}}private static class ChannelInitializerImp extends ChannelInitializer{ @Overrideprotected void initChannel(Channel ch) throws Exception {ByteBuf delimiter = Unpooled.copiedBuffer(DELIMITER_SYMBOL.getBytes());ch.pipeline().addLast( new DelimiterBasedFrameDecoder(1024,delimiter));ch.pipeline().addLast(new DelimiterServerHandler());}}}
客戶端程序
import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;import io.netty.util.CharsetUtil;/*** 入站處理器*/public class DelimiterClientHandler extends SimpleChannelInboundHandler{ private AtomicInteger counter = new AtomicInteger(0);/*** 客戶端讀取到網(wǎng)絡(luò)數(shù)據(jù)后的處理*/protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {System.out.println("client Accept["+msg.toString(CharsetUtil.UTF_8)+"] and the counter is:"+counter.incrementAndGet());}/*** 客戶端被通知channel活躍后,做事*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ByteBuf msg = null;String request = "Mark,Lison,Peter,James,Deer"+ DelimiterEchoServer.DELIMITER_SYMBOL;for(int i=0;i<10;i++){msg = Unpooled.buffer(request.length());msg.writeBytes(request.getBytes());ctx.writeAndFlush(msg);}}/*** 發(fā)生異常后的處理*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}}import io.netty.bootstrap.Bootstrap;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.Channel;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.DelimiterBasedFrameDecoder;/*** 客戶端*/public class DelimiterEchoClient {private final String host;public DelimiterEchoClient(String host) {this.host = host;}public void start() throws InterruptedException {EventLoopGroup group = new NioEventLoopGroup();/*線程組*/try {final Bootstrap b = new Bootstrap();;/*客戶端啟動必須*/b.group(group)/*將線程組傳入*/.channel(NioSocketChannel.class)/*指定使用NIO進行網(wǎng)絡(luò)傳輸*/.remoteAddress(new InetSocketAddress(host,DelimiterEchoServer.PORT))/*配置要連接服務(wù)器的ip地址和端口*/.handler(new ChannelInitializerImp());ChannelFuture f = b.connect().sync();System.out.println("已連接到服務(wù)器.....");f.channel().closeFuture().sync();} finally {group.shutdownGracefully().sync();}}private static class ChannelInitializerImp extends ChannelInitializer{ @Overrideprotected void initChannel(Channel ch) throws Exception {ByteBuf delimiter= Unpooled.copiedBuffer(DelimiterEchoServer.DELIMITER_SYMBOL.getBytes());ch.pipeline().addLast( new DelimiterBasedFrameDecoder(1024,delimiter));ch.pipeline().addLast(new DelimiterClientHandler());}}public static void main(String[] args) throws InterruptedException {new DelimiterEchoClient("127.0.0.1").start();}}
關(guān)鍵代碼:
1.建立連接后,客戶端給服務(wù)端發(fā)數(shù)據(jù)包,每次發(fā)送已特殊字符`@~結(jié)尾。
2.服務(wù)端收到數(shù)據(jù)包后經(jīng)過DelimiterBasedFrameDecoder即分隔符基礎(chǔ)框架解碼器解碼為一個個帶有分隔符的數(shù)據(jù)包。
3.再到服務(wù)端的業(yè)務(wù)層處理器DelimiterServerHandler

