Netty 的編碼 解碼 案例
0x01:半包粘包
例如發(fā)送兩個數(shù)據(jù)包給服務器,由于服務端一次讀取到的字節(jié)數(shù)不一定的分
沒有半包和拆包:服務器分兩次讀取到兩個地理的數(shù)據(jù)包,這個情況沒有拆包和粘包的情況
粘包:服務器一次收到兩個數(shù)據(jù)包,在一起收到的
拆包:第一次讀取到完成的第一個包和第二個包的一部分內容,第二次讀取到第二個包的剩余內容
整包:第一次讀取到第一包的部分內容,第二次讀取到第一個包的剩余部分和第二個包的全部
多次拆包:如果接收滑窗非常小,數(shù)據(jù)量大的時候發(fā)生多次發(fā)送的接收的情況
為什么會出現(xiàn)半包和粘包
1、HTTP 中有一個 Nagle 算法,每個報文都是一段的,使用網(wǎng)絡發(fā)送發(fā)現(xiàn)網(wǎng)絡效率低,然后 HTTP 設置一個算法,設置到一定程度發(fā),所以出現(xiàn)一些延時,提高銷量,所以形成了粘包
2、HTTP緩沖區(qū)引起的,報文段大的時候的時候直接弄在一起發(fā)送過去。
怎么解決
不斷的從 TCP 的緩沖區(qū)中讀取數(shù)據(jù),每次讀取完成都需要判斷是否是一個完整的數(shù)據(jù)包
如果是讀取的數(shù)據(jù)不足以拼接成一個完整的業(yè)務數(shù)據(jù)包,那就保留該數(shù)據(jù),繼續(xù)從 TCP 緩沖區(qū)中讀取,直到得到一個完整的數(shù)據(jù)包
定長
分隔符
基于長度的變長包如果當前督導的數(shù)據(jù)加上已經(jīng)讀取到的數(shù)據(jù)足以拼接成一個數(shù)據(jù)包,那就講已經(jīng)讀取的數(shù)據(jù)拼接本次讀取的數(shù)據(jù),構成一個完整的業(yè)務數(shù)據(jù)包傳遞到業(yè)務邏輯上,多余的數(shù)據(jù)保留,方便下次的讀取或者數(shù)據(jù)鏈接。
0x02:Netty常用的編碼器
LineBasedFrameDecoder
回車換行編碼器
配合StringDecoder
DelimiterBasedFrameDecoder
分隔符解碼器
FixedLengthFrameDecoder
固定長度解碼器
LengthFieldBasedFrameDecoder
不能超過1024個字節(jié)不然會報錯
基于'長度'解碼器(私有協(xié)議最常用)
0x03:拆包的類
ByteToMessageDecoder
自解析
LengthFieldPrepender
長度編碼器
Netty拆包的基類 - ByteToMessageDecoder
內部維護了一個數(shù)據(jù)累積器cumulation,每次讀取到數(shù)據(jù)都會不斷累加,然后嘗試對累加到
的數(shù)據(jù)進行拆包,拆成一個完整的業(yè)務數(shù)據(jù)包
每次都將讀取到的數(shù)據(jù)通過內存拷貝的方式, 累積到cumulation中
調用子類的 decode 方法對累積的數(shù)據(jù)嘗試進行拆包
LengthFieldBasedFrameDecoder
參數(shù)說明
maxFrameLength:包的最大長度
lengthFieldOffset:長度屬性的起始位(偏移位),包中存放長度屬性字段的起始位置
lengthFieldLength:長度屬性的長度
lengthAdjustment:長度調節(jié)值,在總長被定義為包含包頭長度時,修正信息長度
initialBytesToStrip:跳過的字節(jié)數(shù),根據(jù)需要跳過lengthFieldLength個字節(jié),以便接收端直接接受到不含“長度屬性”的內容
LengthFieldPrepender 編碼器
參數(shù)說明
lengthFieldLength:長度屬性的字節(jié)長度
lengthIncludesLengthFieldLength:false,長度字節(jié)不算在總長度中,true,算到總長度中
編解碼器的作用就是講原始字節(jié)數(shù)據(jù)與自定義的消息對象進行互轉
Decoder(解碼器)
Encoder(編碼器)
支持業(yè)界主流的序列化框架
Protobuf
Jboss Marshalling
Java Serialization
解碼1拆包:把整個 ByteBuf 數(shù)據(jù),分成一個個 ByteBuf,每個表示一個包
解碼2反序列化:把每個包的 ByteBuf 字節(jié)數(shù)組轉成 java object
package com.demo;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
public class StickyDemoClient {
public static void main(String[] args) throws Exception {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
}
}
new StickyDemoClient().connect(port, "127.0.0.1");
}
public void connect(int port, String host) throws Exception {
// 工作線程組
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
//ch.pipeline().addLast("framer", new FixedLengthFrameDecoder(139));
// ch.pipeline().addLast("framer", new StickyDemoDecodeHandlerV2(
// Unpooled.wrappedBuffer(new byte[] { '#' })));
ch.pipeline().addLast("framer", new DelimiterBasedFrameDecoder(8192,
Unpooled.wrappedBuffer(new byte[] { '#' })));
ch.pipeline().addLast(new StickyDemoClientHandler());
}
});
// 發(fā)起異步連接操作
ChannelFuture f = b.connect(host, port).sync();
// 等待客戶端鏈路關閉
f.channel().closeFuture().sync();
} finally {
// 優(yōu)雅退出,釋放線程池資源
group.shutdownGracefully();
}
}
}
package com.demo;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
public class StickyDemoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
private static String[] alphabets = {"A", "B", "C", "D", "E", "F", "G", "H", "I",
"J", "K", "L", "M", "N", "O", "P"};
@Override
public void channelActive(ChannelHandlerContext ctx) {
for(int i=0; i<10; i++) {
StringBuilder builder = new StringBuilder();
builder.append("這是第");
builder.append(i);
builder.append("條消息, 內容是:");
for(int j=0; j<100; j++) {
builder.append(alphabets[i]);
}
builder.append("......");
builder.append("#");
System.out.println(builder.toString().getBytes().length);
ctx.writeAndFlush(Unpooled.copiedBuffer(builder.toString(),
CharsetUtil.UTF_8));
}
}
@Override
public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) {
System.out.println("客戶端接收到消息:" + in.toString(CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
package com.demo;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.ArrayList;
import java.util.List;
public class StickyDemoDecodeHandler extends ChannelInboundHandlerAdapter {
//存放待拆包數(shù)據(jù)的緩沖區(qū)
private ByteBuf cache;
private int frameLength;
public StickyDemoDecodeHandler(int length) {
this.frameLength = length;
}
static ByteBuf expandCache(ByteBufAllocator alloc, ByteBuf cache, int readable) {
ByteBuf oldCache = cache;
cache = alloc.buffer(oldCache.readableBytes() + readable);
cache.writeBytes(oldCache);
oldCache.release();
return cache;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf data = (ByteBuf) msg;
try {
//讀取每一個消息,創(chuàng)建緩沖區(qū)
if (cache == null) {
cache = ctx.alloc().buffer(1024);
} else {
//如果現(xiàn)有的緩沖區(qū)容量太小,無法容納原有數(shù)據(jù)+新讀入的數(shù)據(jù),就擴容(重新創(chuàng)建一個大的,并把數(shù)據(jù)拷貝過去)
if (cache.writerIndex() > cache.maxCapacity() - data.readableBytes()) {
cache = expandCache(ctx.alloc(), cache, data.readableBytes());
}
}
//把新的數(shù)據(jù)讀入緩沖區(qū)
cache.writeBytes(data);
//每次讀取frameLength(定長)的數(shù)據(jù),做為一個包,存儲起來
List<ByteBuf> output = new ArrayList<>();
while (cache.readableBytes() >= frameLength) {
output.add(cache.readBytes(frameLength));
}
//還有部分數(shù)據(jù)不夠一個包,10, 15, 一個10個,還剩5個
if (cache.isReadable()) {
cache.discardReadBytes();
}
for (int i = 0; i < output.size(); i++) {
ctx.fireChannelRead(output.get(i));
}
} finally {
data.release();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
package com.demo;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.ArrayList;
import java.util.List;
public class StickyDemoDecodeHandlerV2 extends ChannelInboundHandlerAdapter {
private ByteBuf cache;
private byte delimiter; //包分隔符
public StickyDemoDecodeHandlerV2(ByteBuf delimiter) {
if (delimiter == null) {
throw new NullPointerException("delimiter");
}
if (!delimiter.isReadable()) {
throw new IllegalArgumentException("empty delimiter");
}
this.delimiter = delimiter.readByte();
;
}
static ByteBuf expandCache(ByteBufAllocator alloc, ByteBuf cache, int readable) {
ByteBuf oldCache = cache;
cache = alloc.buffer(oldCache.readableBytes() + readable);
cache.writeBytes(oldCache);
oldCache.release();
return cache;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf data = (ByteBuf) msg;
try {
if (cache == null) {
cache = ctx.alloc().buffer(1024);
} else {
if (cache.writerIndex() > cache.maxCapacity() - data.readableBytes()) {
cache = expandCache(ctx.alloc(), cache, data.readableBytes());
}
}
cache.writeBytes(data);
List<ByteBuf> output = new ArrayList<>();
int frameIndex = 0;
int frameEndIndex = 0;
int length = cache.readableBytes();
while (frameIndex <= length) {
frameEndIndex = cache.indexOf(frameIndex + 1, length, delimiter);
if (frameEndIndex == -1) {
cache.discardReadBytes();
break;
}
output.add(cache.readBytes(frameEndIndex - frameIndex));
cache.skipBytes(1);
frameIndex = frameEndIndex + 1;
}
if (cache.isReadable()) {
cache.discardReadBytes();
}
for (int i = 0; i < output.size(); i++) {
ctx.fireChannelRead(output.get(i));
}
} finally {
data.release();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
package com.demo;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
public class StickyDemoServer {
public static void main(String[] args) throws Exception {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
// 采用默認值
}
}
new StickyDemoServer().bind(port);
}
public void bind(int port) throws Exception {
// 第一步:
// 配置服務端的NIO線程組
// 主線程組, 用于接受客戶端的連接,但是不做任何具體業(yè)務處理,像老板一樣,負責接待客戶,不具體服務客戶
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// 工作線程組, 老板線程組會把任務丟給他,讓手下線程組去做任務,服務客戶
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 類ServerBootstrap用于配置Server相關參數(shù),并啟動Server
ServerBootstrap b = new ServerBootstrap();
// 鏈式調用
// 配置parentGroup和childGroup
b.group(bossGroup, workerGroup)
// 配置Server通道
.channel(NioServerSocketChannel.class)
// 配置通道的ChannelPipeline
.childHandler(new ChildChannelHandler());
// 綁定端口,并啟動server,同時設置啟動方式為同步
ChannelFuture f = b.bind(port).sync();
System.out.println(StickyDemoServer.class.getName() + " 啟動成功,在地址[" + f.channel().localAddress() + "]上等待客戶請求......");
// 等待服務端監(jiān)聽端口關閉
f.channel().closeFuture().sync();
} finally {
// 優(yōu)雅退出,釋放線程池資源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//ch.pipeline().addLast("framer", new FixedLengthFrameDecoder(139));
ch.pipeline().addLast("framer", new DelimiterBasedFrameDecoder(8192, Unpooled.wrappedBuffer(new byte[] { '#' })));
//ch.pipeline().addLast("framer", new StickyDemoDecodeHandler(139));
// ch.pipeline().addLast("framer", new StickyDemoDecodeHandlerV2(
// Unpooled.wrappedBuffer(new byte[] { '#' })));
ch.pipeline().addLast(new StickyDemoServerHandler());
}
}
}
package com.demo;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
public class StickyDemoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf in = (ByteBuf) msg;
System.out.println(
"服務器接收到消息:" + in.toString(CharsetUtil.UTF_8));
ctx.write(in);
// ctx.write(Unpooled.copiedBuffer("#", CharsetUtil.UTF_8));
//compositeBuffer.addComponent(in);
// ByteBuf buf = ctx.alloc().directBuffer();
// buf.writeBytes("#".getBytes());
// CompositeByteBuf compositeBuffer = ctx.alloc().compositeBuffer();
// compositeBuffer.addComponents(true, in, buf);
// ctx.write(compositeBuffer);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx)
throws Exception {
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
.addListener(ChannelFutureListener.CLOSE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
source:https://www.yuque.com/yangxinlei/lodfss/nguvm0

喜歡,在看
