Socket粘包問題終極解決方案—Netty版(2W字)!

作者 | 王磊
來源 | Java中文社群(ID:javacn666)
上一篇我們寫了《Socket粘包問題的3種解決方案》,但沒想到評(píng)論區(qū)竟然炸了。介于大家的熱情討論,以及不同的反饋意見,本文就來做一個(gè)擴(kuò)展和延伸,試圖找到問題的最優(yōu)解,以及消息通訊的最優(yōu)解決方案。
在正式開始之前,我們先對(duì)上篇評(píng)論中的幾個(gè)典型問題做一個(gè)簡(jiǎn)單的回復(fù),不感興趣的朋友可直接劃過。
問題一:TCP存在粘包問題嗎?
先說答案:TCP 本身并沒有粘包和半包一說,因?yàn)?TCP 本質(zhì)上只是一個(gè)傳輸控制協(xié)議(Transmission Control Protocol,TCP),它是一種面向連接的、可靠的、基于字節(jié)流的傳輸層通信協(xié)議,由 IETF 的 RFC 793 定義。
所謂的協(xié)議本質(zhì)上是一個(gè)約定,就好比 Java 編程約定使用駝峰命名法一樣,約定的意義是為了讓通訊雙方,能夠正常的進(jìn)行消息互換的,那粘包和半包問題又是如何產(chǎn)生的呢?
這是因?yàn)樵?TCP 的交互中,數(shù)據(jù)是以字節(jié)流的形式進(jìn)行傳輸?shù)模傲鳌钡膫鬏斒菦]有邊界的,因?yàn)闆]有邊界所以就不能區(qū)分消息的歸屬,從而就會(huì)產(chǎn)生粘包和半包問題(粘包和半包的定義,詳見上一篇)。所以說 TCP 協(xié)議本身并不存在粘包和半包問題,只是在使用中如果不能有效的確定流的邊界就會(huì)產(chǎn)生粘包和半包問題。
問題二:分隔符是最優(yōu)解決方案?
坦白的說,經(jīng)過評(píng)論區(qū)大家的耐心“開導(dǎo)”,我也意識(shí)到了以結(jié)束符作為最終的解決方案存在一定的局限性,比如當(dāng)一條消息中間如果出現(xiàn)了結(jié)束符就會(huì)造成半包的問題,所以如果是復(fù)雜的字符串要對(duì)內(nèi)容進(jìn)行編碼和解碼處理,這樣才能保證結(jié)束符的正確性。
問題三:Socket 高效嗎?
這個(gè)問題的答案是否定的,其實(shí)上文在開頭已經(jīng)描述了應(yīng)用場(chǎng)景:「?jìng)鹘y(tǒng)的 Socket 編程」,學(xué)習(xí)它的意義就在于理解更早期更底層的一些知識(shí),當(dāng)然作為補(bǔ)充本文會(huì)提供更加高效的消息通訊方案——Netty 通訊。
聊完了以上問題,接下來咱們先來補(bǔ)充一下上篇文章中提到的,將消息分為消息頭和消息體的代碼實(shí)現(xiàn)。
一、封裝消息頭和消息體
在開始寫服務(wù)器端和客戶端之前,咱們先來編寫一個(gè)消息的封裝類,使用它可以將消息封裝成消息頭和消息體,如下圖所示:
消息頭中存儲(chǔ)消息體的長(zhǎng)度,從而確定了消息的邊界,便解決粘包和半包問題。
1.消息封裝類
消息的封裝類中提供了兩個(gè)方法:一個(gè)是將消息轉(zhuǎn)換成消息頭 + 消息體的方法,另一個(gè)是讀取消息頭的方法,具體實(shí)現(xiàn)代碼如下:
/**
?*?消息封裝類
?*/
class?SocketPacket?{
????//?消息頭存儲(chǔ)的長(zhǎng)度(占?8?字節(jié))
????static?final?int?HEAD_SIZE?=?8;
????/**
?????*?將協(xié)議封裝為:協(xié)議頭?+?協(xié)議體
?????*?@param?context?消息體(String?類型)
?????*?@return?byte[]
?????*/
????public?byte[]?toBytes(String?context)?{
????????//?協(xié)議體?byte?數(shù)組
????????byte[]?bodyByte?=?context.getBytes();
????????int?bodyByteLength?=?bodyByte.length;
????????//?最終封裝對(duì)象
????????byte[]?result?=?new?byte[HEAD_SIZE?+?bodyByteLength];
????????//?借助?NumberFormat?將?int?轉(zhuǎn)換為?byte[]
????????NumberFormat?numberFormat?=?NumberFormat.getNumberInstance();
????????numberFormat.setMinimumIntegerDigits(HEAD_SIZE);
????????numberFormat.setGroupingUsed(false);
????????//?協(xié)議頭?byte?數(shù)組
????????byte[]?headByte?=?numberFormat.format(bodyByteLength).getBytes();
????????//?封裝協(xié)議頭
????????System.arraycopy(headByte,?0,?result,?0,?HEAD_SIZE);
????????//?封裝協(xié)議體
????????System.arraycopy(bodyByte,?0,?result,?HEAD_SIZE,?bodyByteLength);
????????return?result;
????}
????/**
?????*?獲取消息頭的內(nèi)容(也就是消息體的長(zhǎng)度)
?????*?@param?inputStream
?????*?@return
?????*/
????public?int?getHeader(InputStream?inputStream)?throws?IOException?{
????????int?result?=?0;
????????byte[]?bytes?=?new?byte[HEAD_SIZE];
????????inputStream.read(bytes,?0,?HEAD_SIZE);
????????//?得到消息體的字節(jié)長(zhǎng)度
????????result?=?Integer.valueOf(new?String(bytes));
????????return?result;
????}
}
2.編寫客戶端
接下來我們來定義客戶端,在客戶端中我們添加一組待發(fā)送的消息,隨機(jī)給服務(wù)器端發(fā)送一個(gè)消息,實(shí)現(xiàn)代碼如下:
/**
?*?客戶端
?*/
class?MySocketClient?{
????public?static?void?main(String[]?args)?throws?IOException?{
????????//?啟動(dòng)?Socket?并嘗試連接服務(wù)器
????????Socket?socket?=?new?Socket("127.0.0.1",?9093);
????????//?發(fā)送消息合集(隨機(jī)發(fā)送一條消息)
????????final?String[]?message?=?{"Hi,Java.",?"Hi,SQL~",?"關(guān)注公眾號(hào)|Java中文社群."};
????????//?創(chuàng)建協(xié)議封裝對(duì)象
????????SocketPacket?socketPacket?=?new?SocketPacket();
????????try?(OutputStream?outputStream?=?socket.getOutputStream())?{
????????????//?給服務(wù)器端發(fā)送?10?次消息
????????????for?(int?i?=?0;?i?10;?i++)?{
????????????????//?隨機(jī)發(fā)送一條消息
????????????????String?msg?=?message[new?Random().nextInt(message.length)];
????????????????//?將內(nèi)容封裝為:協(xié)議頭+協(xié)議體
????????????????byte[]?bytes?=?socketPacket.toBytes(msg);
????????????????//?發(fā)送消息
????????????????outputStream.write(bytes,?0,?bytes.length);
????????????????outputStream.flush();
????????????}
????????}
????}
}
3.編寫服務(wù)器端
服務(wù)器端我們使用線程池來處理每個(gè)客戶端的業(yè)務(wù)請(qǐng)求,實(shí)現(xiàn)代碼如下:
/**
?*?服務(wù)器端
?*/
class?MySocketServer?{
????public?static?void?main(String[]?args)?throws?IOException?{
????????//?創(chuàng)建?Socket?服務(wù)器端
????????ServerSocket?serverSocket?=?new?ServerSocket(9093);
????????//?獲取客戶端連接
????????Socket?clientSocket?=?serverSocket.accept();
????????//?使用線程池處理更多的客戶端
????????ThreadPoolExecutor?threadPool?=?new?ThreadPoolExecutor(100,?150,?100,
????????????????TimeUnit.SECONDS,?new?LinkedBlockingQueue<>(1000));
????????threadPool.submit(()?->?{
????????????//?客戶端消息處理
????????????processMessage(clientSocket);
????????});
????}
????/**
?????*?客戶端消息處理
?????*?@param?clientSocket
?????*/
????private?static?void?processMessage(Socket?clientSocket)?{
????????//?Socket?封裝對(duì)象
????????SocketPacket?socketPacket?=?new?SocketPacket();
????????//?獲取客戶端發(fā)送的消息對(duì)象
????????try?(InputStream?inputStream?=?clientSocket.getInputStream())?{
????????????while?(true)?{
????????????????//?獲取消息頭(也就是消息體的長(zhǎng)度)
????????????????int?bodyLength?=?socketPacket.getHeader(inputStream);
????????????????//?消息體?byte?數(shù)組
????????????????byte[]?bodyByte?=?new?byte[bodyLength];
????????????????//?每次實(shí)際讀取字節(jié)數(shù)
????????????????int?readCount?=?0;
????????????????//?消息體賦值下標(biāo)
????????????????int?bodyIndex?=?0;
????????????????//?循環(huán)接收消息頭中定義的長(zhǎng)度
????????????????while?(bodyIndex?<=?(bodyLength?-?1)?&&
????????????????????????(readCount?=?inputStream.read(bodyByte,?bodyIndex,?bodyLength))?!=?-1)?{
????????????????????bodyIndex?+=?readCount;
????????????????}
????????????????bodyIndex?=?0;
????????????????//?成功接收到客戶端的消息并打印
????????????????System.out.println("接收到客戶端的信息:"?+?new?String(bodyByte));
????????????}
????????}?catch?(IOException?ioException)?{
????????????System.out.println(ioException.getMessage());
????????}
????}
}
以上程序的執(zhí)行結(jié)果如下:
從上述結(jié)果可以看出,消息通訊正常,客戶端和服務(wù)器端的交互中并沒有出現(xiàn)粘包和半包的問題。
二、使用 Netty 實(shí)現(xiàn)高效通訊
以上的內(nèi)容都是針對(duì)傳統(tǒng) Socket 編程的,但要實(shí)現(xiàn)更加高效的通訊和連接對(duì)象的復(fù)用就要使用 NIO(Non-Blocking IO,非阻塞 IO)或者 AIO(Asynchronous IO,異步非阻塞 IO)了。
傳統(tǒng)的 Socket 編程是 BIO(Blocking IO,同步阻塞 IO),它和 NIO 和 AIO ?的區(qū)別如下:
BIO 來自傳統(tǒng)的 java.io 包,它是基于流模型實(shí)現(xiàn)的,交互的方式是同步、阻塞方式,也就是說在讀入輸入流或者輸出流時(shí),在讀寫動(dòng)作完成之前,線程會(huì)一直阻塞在那里,它們之間的調(diào)用是可靠的線性順序。它的優(yōu)點(diǎn)就是代碼比較簡(jiǎn)單、直觀;缺點(diǎn)就是 IO 的效率和擴(kuò)展性很低,容易成為應(yīng)用性能瓶頸。 NIO 是 Java 1.4 引入的 java.nio 包,提供了 Channel、Selector、Buffer 等新的抽象,可以構(gòu)建多路復(fù)用的、同步非阻塞 IO 程序,同時(shí)提供了更接近操作系統(tǒng)底層高性能的數(shù)據(jù)操作方式。 AIO 是 Java 1.7 之后引入的包,是 NIO 的升級(jí)版本,提供了異步非堵塞的 IO 操作方式,因此人們叫它 AIO(Asynchronous IO),異步 IO 是基于事件和回調(diào)機(jī)制實(shí)現(xiàn)的,也就是應(yīng)用操作之后會(huì)直接返回,不會(huì)堵塞在那里,當(dāng)后臺(tái)處理完成,操作系統(tǒng)會(huì)通知相應(yīng)的線程進(jìn)行后續(xù)的操作。
PS:AIO 可以看作是 NIO 的升級(jí),它也叫 NIO 2。
傳統(tǒng) Socket 的通訊流程:
NIO 的通訊流程:
使用 Netty 替代傳統(tǒng) NIO 編程
NIO 的設(shè)計(jì)思路雖然很好,但它的代碼編寫比較麻煩,比如 Buffer 的使用和 Selector 的編寫等。并且在面對(duì)斷線重連、包丟失和粘包等復(fù)雜問題時(shí)手動(dòng)處理的成本都很大,因此我們通常會(huì)使用 Netty 框架來替代傳統(tǒng)的 NIO。
Netty 是什么?
Netty 是一個(gè)異步、事件驅(qū)動(dòng)的用來做高性能、高可靠性的網(wǎng)絡(luò)應(yīng)用框架,使用它可以快速輕松地開發(fā)網(wǎng)絡(luò)應(yīng)用程序,極大的簡(jiǎn)化了網(wǎng)絡(luò)編程的復(fù)雜度。
Netty 主要優(yōu)點(diǎn)有以下幾個(gè):
框架設(shè)計(jì)優(yōu)雅,底層模型隨意切換適應(yīng)不同的網(wǎng)絡(luò)協(xié)議要求; 提供很多標(biāo)準(zhǔn)的協(xié)議、安全、編碼解碼的支持; 簡(jiǎn)化了 NIO 使用中的諸多不便; 社區(qū)非常活躍,很多開源框架中都使用了 Netty 框架,如 Dubbo、RocketMQ、Spark 等。
Netty 主要包含以下 3 個(gè)部分,如下圖所示:
這 3 個(gè)部分的功能介紹如下。
1. Core 核心層
Core 核心層是 Netty 最精華的內(nèi)容,它提供了底層網(wǎng)絡(luò)通信的通用抽象和實(shí)現(xiàn),包括可擴(kuò)展的事件模型、通用的通信 API、支持零拷貝的 ByteBuf 等。
2. Protocol Support 協(xié)議支持層
協(xié)議支持層基本上覆蓋了主流協(xié)議的編解碼實(shí)現(xiàn),如 HTTP、SSL、Protobuf、壓縮、大文件傳輸、WebSocket、文本、二進(jìn)制等主流協(xié)議,此外 Netty 還支持自定義應(yīng)用層協(xié)議。Netty 豐富的協(xié)議支持降低了用戶的開發(fā)成本,基于 Netty 我們可以快速開發(fā) HTTP、WebSocket 等服務(wù)。
3. Transport Service 傳輸服務(wù)層
傳輸服務(wù)層提供了網(wǎng)絡(luò)傳輸能力的定義和實(shí)現(xiàn)方法。它支持 Socket、HTTP 隧道、虛擬機(jī)管道等傳輸方式。Netty 對(duì) TCP、UDP 等數(shù)據(jù)傳輸做了抽象和封裝,用戶可以更聚焦在業(yè)務(wù)邏輯實(shí)現(xiàn)上,而不必關(guān)系底層數(shù)據(jù)傳輸?shù)募?xì)節(jié)。
Netty 使用
對(duì) Netty 有了大概的認(rèn)識(shí)之后,接下來我們用 Netty 來編寫一個(gè)基礎(chǔ)的通訊服務(wù)器,它包含兩個(gè)端:服務(wù)器端和客戶端,客戶端負(fù)責(zé)發(fā)送消息,服務(wù)器端負(fù)責(zé)接收并打印消息,具體的實(shí)現(xiàn)步驟如下。
1.添加 Netty 框架
首先我們需要先添加 Netty 框架的支持,如果是 Maven 項(xiàng)目添加如下配置即可:
<dependency>
????<groupId>io.nettygroupId>
????<artifactId>netty-allartifactId>
????<version>4.1.56.Finalversion>
dependency>
Netty 版本說明
Netty 的 3.x 和 4.x 為主流的穩(wěn)定版本,而最新的 5.x 已經(jīng)是放棄的測(cè)試版了,因此推薦使用 Netty 4.x 的最新穩(wěn)定版。
2. 服務(wù)器端實(shí)現(xiàn)代碼
按照官方的推薦,這里將服務(wù)器端的代碼分為以下 3 個(gè)部分:
MyNettyServer:服務(wù)器端的核心業(yè)務(wù)代碼; ServerInitializer:服務(wù)器端通道(Channel)初始化; ServerHandler:服務(wù)器端接收到信息之后的處理邏輯。
PS:Channel 字面意思為“通道”,它是網(wǎng)絡(luò)通信的載體。Channel 提供了基本的 API 用于網(wǎng)絡(luò) I/O 操作,如 register、bind、connect、read、write、flush 等。Netty 自己實(shí)現(xiàn)的 Channel 是以 JDK NIO Channel 為基礎(chǔ)的,相比較于 JDK NIO,Netty 的 Channel 提供了更高層次的抽象,同時(shí)屏蔽了底層 Socket 的復(fù)雜性,賦予了 Channel 更加強(qiáng)大的功能,你在使用 Netty 時(shí)基本不需要再與 Java Socket 類直接打交道。
服務(wù)器端的實(shí)現(xiàn)代碼如下:
//?定義服務(wù)器的端口號(hào)
static?final?int?PORT?=?8007;
/**
?*?服務(wù)器端
?*/
static?class?MyNettyServer?{
????public?static?void?main(String[]?args)?{
????????//?創(chuàng)建一個(gè)線程組,用來負(fù)責(zé)接收客戶端連接
????????EventLoopGroup?bossGroup?=?new?NioEventLoopGroup();
????????//?創(chuàng)建另一個(gè)線程組,用來負(fù)責(zé)?I/O?的讀寫
????????EventLoopGroup?workerGroup?=?new?NioEventLoopGroup();
????????try?{
????????????//?創(chuàng)建一個(gè)?Server?實(shí)例(可理解為?Netty?的入門類)
????????????ServerBootstrap?b?=?new?ServerBootstrap();
????????????//?將兩個(gè)線程池設(shè)置到?Server?實(shí)例
????????????b.group(bossGroup,?workerGroup)
????????????????????//?設(shè)置?Netty?通道的類型為?NioServerSocket(非阻塞?I/O?Socket?服務(wù)器)
????????????????????.channel(NioServerSocketChannel.class)
????????????????????//?設(shè)置建立連接之后的執(zhí)行器(ServerInitializer?是我創(chuàng)建的一個(gè)自定義類)
????????????????????.childHandler(new?ServerInitializer());
????????????//?綁定端口并且進(jìn)行同步
????????????ChannelFuture?future?=?b.bind(PORT).sync();
????????????//?對(duì)關(guān)閉通道進(jìn)行監(jiān)聽
????????????future.channel().closeFuture().sync();
????????}?catch?(InterruptedException?e)?{
????????????e.printStackTrace();
????????}?finally?{
????????????//?資源關(guān)閉
????????????bossGroup.shutdownGracefully();
????????????workerGroup.shutdownGracefully();
????????}
????}
}
/**
?*?服務(wù)端通道初始化
?*/
static?class?ServerInitializer?extends?ChannelInitializer<SocketChannel>?{
????//?字符串編碼器和解碼器
????private?static?final?StringDecoder?DECODER?=?new?StringDecoder();
????private?static?final?StringEncoder?ENCODER?=?new?StringEncoder();
????//?服務(wù)器端連接之后的執(zhí)行器(自定義的類)
????private?static?final?ServerHandler?SERVER_HANDLER?=?new?ServerHandler();
????/**
?????*?初始化通道的具體執(zhí)行方法
?????*/
????@Override
????public?void?initChannel(SocketChannel?ch)?{
????????//?通道?Channel?設(shè)置
????????ChannelPipeline?pipeline?=?ch.pipeline();
????????//?設(shè)置(字符串)編碼器和解碼器
????????pipeline.addLast(DECODER);
????????pipeline.addLast(ENCODER);
????????//?服務(wù)器端連接之后的執(zhí)行器,接收到消息之后的業(yè)務(wù)處理
????????pipeline.addLast(SERVER_HANDLER);
????}
}
/**
?*?服務(wù)器端接收到消息之后的業(yè)務(wù)處理類
?*/
static?class?ServerHandler?extends?SimpleChannelInboundHandler<String>?{
????/**
?????*?讀取到客戶端的消息
?????*/
????@Override
????public?void?channelRead0(ChannelHandlerContext?ctx,?String?request)?{
????????if?(!request.isEmpty())?{
????????????System.out.println("接到客戶端的消息:"?+?request);
????????}
????}
????/**
?????*?數(shù)據(jù)讀取完畢
?????*/
????@Override
????public?void?channelReadComplete(ChannelHandlerContext?ctx)?{
????????ctx.flush();
????}
????/**
?????*?異常處理,打印異常并關(guān)閉通道
?????*/
????@Override
????public?void?exceptionCaught(ChannelHandlerContext?ctx,?Throwable?cause)?{
????????cause.printStackTrace();
????????ctx.close();
????}
}
3.客戶端實(shí)現(xiàn)代碼
客戶端的代碼實(shí)現(xiàn)也是分為以下 3 個(gè)部分:
MyNettyClient:客戶端核心業(yè)務(wù)代碼; ClientInitializer:客戶端通道初始化; ClientHandler:接收到消息之后的處理邏輯。
客戶端的實(shí)現(xiàn)代碼如下:
/**
?*?客戶端
?*/
static?class?MyNettyClient?{
????public?static?void?main(String[]?args)?{
????????//?創(chuàng)建事件循環(huán)線程組(客戶端的線程組只有一個(gè))
????????EventLoopGroup?group?=?new?NioEventLoopGroup();
????????try?{
????????????//?Netty?客戶端啟動(dòng)對(duì)象
????????????Bootstrap?b?=?new?Bootstrap();
????????????//?設(shè)置啟動(dòng)參數(shù)
????????????b.group(group)
????????????????????//?設(shè)置通道類型
????????????????????.channel(NioSocketChannel.class)
????????????????????//?設(shè)置啟動(dòng)執(zhí)行器(負(fù)責(zé)啟動(dòng)事件的業(yè)務(wù)執(zhí)行,ClientInitializer?為自定義的類)
????????????????????.handler(new?ClientInitializer());
????????????//?連接服務(wù)器端并同步通道
????????????Channel?ch?=?b.connect("127.0.0.1",?8007).sync().channel();
????????????//?發(fā)送消息
????????????ChannelFuture?lastWriteFuture?=?null;
????????????//?給服務(wù)器端發(fā)送?10?條消息
????????????for?(int?i?=?0;?i?10;?i++)?{
????????????????//?發(fā)送給服務(wù)器消息
????????????????lastWriteFuture?=?ch.writeAndFlush("Hi,Java.");
????????????}
????????????//?在關(guān)閉通道之前,同步刷新所有的消息
????????????if?(lastWriteFuture?!=?null)?{
????????????????lastWriteFuture.sync();
????????????}
????????}?catch?(InterruptedException?e)?{
????????????e.printStackTrace();
????????}?finally?{
????????????//?釋放資源
????????????group.shutdownGracefully();
????????}
????}
}
/**
?*?客戶端通道初始化類
?*/
static?class?ClientInitializer?extends?ChannelInitializer<SocketChannel>?{
????//?字符串編碼器和解碼器
????private?static?final?StringDecoder?DECODER?=?new?StringDecoder();
????private?static?final?StringEncoder?ENCODER?=?new?StringEncoder();
????//?客戶端連接成功之后業(yè)務(wù)處理
????private?static?final?ClientHandler?CLIENT_HANDLER?=?new?ClientHandler();
????/**
?????*?初始化客戶端通道
?????*/
????@Override
????public?void?initChannel(SocketChannel?ch)?{
????????ChannelPipeline?pipeline?=?ch.pipeline();
????????//?設(shè)置(字符串)編碼器和解碼器
????????pipeline.addLast(DECODER);
????????pipeline.addLast(ENCODER);
????????//?客戶端連接成功之后的業(yè)務(wù)處理
????????pipeline.addLast(CLIENT_HANDLER);
????}
}
/**
?*?客戶端連接成功之后的業(yè)務(wù)處理
?*/
static?class?ClientHandler?extends?SimpleChannelInboundHandler<String>?{
????/**
?????*?讀取到服務(wù)器端的消息
?????*/
????@Override
????protected?void?channelRead0(ChannelHandlerContext?ctx,?String?msg)?{
????????System.err.println("接到服務(wù)器的消息:"?+?msg);
????}
????/**
?????*?異常處理,打印異常并關(guān)閉通道
?????*/
????@Override
????public?void?exceptionCaught(ChannelHandlerContext?ctx,?Throwable?cause)?{
????????cause.printStackTrace();
????????ctx.close();
????}
}
從以上代碼可以看出,我們代碼實(shí)現(xiàn)的功能是,客戶端給服務(wù)器端發(fā)送 10 條消息。
編寫完上述代碼之后,我們就可以啟動(dòng)服務(wù)器端和客戶端了,啟動(dòng)之后,它們的執(zhí)行結(jié)果如下:
從上述結(jié)果中可以看出,雖然客戶端和服務(wù)器端實(shí)現(xiàn)了通信,但在 Netty 的使用中依然存在粘包的問題,服務(wù)器端一次收到了 10 條消息,而不是每次只收到一條消息,因此接下來我們要解決掉 Netty 中的粘包問題。
三、解決 Netty 粘包問題
在 Netty 中,解決粘包問題的常用方案有以下 3 種:
設(shè)置固定大小的消息長(zhǎng)度,如果長(zhǎng)度不足則使用空字符彌補(bǔ),它的缺點(diǎn)比較明顯,比較消耗網(wǎng)絡(luò)流量,因此不建議使用; 使用分隔符來確定消息的邊界,從而避免粘包和半包問題的產(chǎn)生; 將消息分為消息頭和消息體,在頭部中保存有當(dāng)前整個(gè)消息的長(zhǎng)度,只有在讀取到足夠長(zhǎng)度的消息之后才算是讀到了一個(gè)完整的消息。
接下來我們分別來看后兩種推薦的解決方案。
1.使用分隔符解決粘包問題
在 Netty 中提供了 DelimiterBasedFrameDecoder 類用來以特殊符號(hào)作為消息的結(jié)束符,從而解決粘包和半包的問題。
它的核心實(shí)現(xiàn)代碼是在初始化通道(Channel)時(shí),通過設(shè)置 DelimiterBasedFrameDecoder 來分隔消息,需要在客戶端和服務(wù)器端都進(jìn)行設(shè)置,具體實(shí)現(xiàn)代碼如下。
服務(wù)器端核心實(shí)現(xiàn)代碼如下:
/**
?*?服務(wù)端通道初始化
?*/
static?class?ServerInitializer?extends?ChannelInitializer<SocketChannel>?{
????//?字符串編碼器和解碼器
????private?static?final?StringDecoder?DECODER?=?new?StringDecoder();
????private?static?final?StringEncoder?ENCODER?=?new?StringEncoder();
????//?服務(wù)器端連接之后的執(zhí)行器(自定義的類)
????private?static?final?ServerHandler?SERVER_HANDLER?=?new?ServerHandler();
????/**
?????*?初始化通道的具體執(zhí)行方法
?????*/
????@Override
????public?void?initChannel(SocketChannel?ch)?{
????????//?通道?Channel?設(shè)置
????????ChannelPipeline?pipeline?=?ch.pipeline();
????????// 19 行:設(shè)置結(jié)尾分隔符【核心代碼】(參數(shù)1:為消息的最大長(zhǎng)度,可自定義;參數(shù)2:分隔符[此處以換行符為分隔符])
????????pipeline.addLast(new?DelimiterBasedFrameDecoder(1024,?Delimiters.lineDelimiter()));
????????//?設(shè)置(字符串)編碼器和解碼器
????????pipeline.addLast(DECODER);
????????pipeline.addLast(ENCODER);
????????//?服務(wù)器端連接之后的執(zhí)行器,接收到消息之后的業(yè)務(wù)處理
????????pipeline.addLast(SERVER_HANDLER);
????}
}
核心代碼為第 19 行,代碼中已經(jīng)備注了方法的含義,這里就不再贅述。
客戶端的核心實(shí)現(xiàn)代碼如下:
/**
?*?客戶端通道初始化類
?*/
static?class?ClientInitializer?extends?ChannelInitializer<SocketChannel>?{
????//?字符串編碼器和解碼器
????private?static?final?StringDecoder?DECODER?=?new?StringDecoder();
????private?static?final?StringEncoder?ENCODER?=?new?StringEncoder();
????//?客戶端連接成功之后業(yè)務(wù)處理
????private?static?final?ClientHandler?CLIENT_HANDLER?=?new?ClientHandler();
????/**
?????*?初始化客戶端通道
?????*/
????@Override
????public?void?initChannel(SocketChannel?ch)?{
????????ChannelPipeline?pipeline?=?ch.pipeline();
????????// 17 行:設(shè)置結(jié)尾分隔符【核心代碼】(參數(shù)1:為消息的最大長(zhǎng)度,可自定義;參數(shù)2:分隔符[此處以換行符為分隔符])
????????pipeline.addLast(new?DelimiterBasedFrameDecoder(1024,?Delimiters.lineDelimiter()));
????????//?設(shè)置(字符串)編碼器和解碼器
????????pipeline.addLast(DECODER);
????????pipeline.addLast(ENCODER);
????????//?客戶端連接成功之后的業(yè)務(wù)處理
????????pipeline.addLast(CLIENT_HANDLER);
????}
}
完整的服務(wù)器端和客戶端的實(shí)現(xiàn)代碼如下:
import?io.netty.bootstrap.Bootstrap;
import?io.netty.bootstrap.ServerBootstrap;
import?io.netty.channel.*;
import?io.netty.channel.nio.NioEventLoopGroup;
import?io.netty.channel.socket.SocketChannel;
import?io.netty.channel.socket.nio.NioServerSocketChannel;
import?io.netty.channel.socket.nio.NioSocketChannel;
import?io.netty.handler.codec.DelimiterBasedFrameDecoder;
import?io.netty.handler.codec.Delimiters;
import?io.netty.handler.codec.string.StringDecoder;
import?io.netty.handler.codec.string.StringEncoder;
public?class?NettyExample?{
????//?定義服務(wù)器的端口號(hào)
????static?final?int?PORT?=?8007;
????/**
?????*?服務(wù)器端
?????*/
????static?class?MyNettyServer?{
????????public?static?void?main(String[]?args)?{
????????????//?創(chuàng)建一個(gè)線程組,用來負(fù)責(zé)接收客戶端連接
????????????EventLoopGroup?bossGroup?=?new?NioEventLoopGroup();
????????????//?創(chuàng)建另一個(gè)線程組,用來負(fù)責(zé)?I/O?的讀寫
????????????EventLoopGroup?workerGroup?=?new?NioEventLoopGroup();
????????????try?{
????????????????//?創(chuàng)建一個(gè)?Server?實(shí)例(可理解為?Netty?的入門類)
????????????????ServerBootstrap?b?=?new?ServerBootstrap();
????????????????//?將兩個(gè)線程池設(shè)置到?Server?實(shí)例
????????????????b.group(bossGroup,?workerGroup)
????????????????????????//?設(shè)置?Netty?通道的類型為?NioServerSocket(非阻塞?I/O?Socket?服務(wù)器)
????????????????????????.channel(NioServerSocketChannel.class)
????????????????????????//?設(shè)置建立連接之后的執(zhí)行器(ServerInitializer?是我創(chuàng)建的一個(gè)自定義類)
????????????????????????.childHandler(new?ServerInitializer());
????????????????//?綁定端口并且進(jìn)行同步
????????????????ChannelFuture?future?=?b.bind(PORT).sync();
????????????????//?對(duì)關(guān)閉通道進(jìn)行監(jiān)聽
????????????????future.channel().closeFuture().sync();
????????????}?catch?(InterruptedException?e)?{
????????????????e.printStackTrace();
????????????}?finally?{
????????????????//?資源關(guān)閉
????????????????bossGroup.shutdownGracefully();
????????????????workerGroup.shutdownGracefully();
????????????}
????????}
????}
????/**
?????*?服務(wù)端通道初始化
?????*/
????static?class?ServerInitializer?extends?ChannelInitializer<SocketChannel>?{
????????//?字符串編碼器和解碼器
????????private?static?final?StringDecoder?DECODER?=?new?StringDecoder();
????????private?static?final?StringEncoder?ENCODER?=?new?StringEncoder();
????????//?服務(wù)器端連接之后的執(zhí)行器(自定義的類)
????????private?static?final?ServerHandler?SERVER_HANDLER?=?new?ServerHandler();
????????/**
?????????*?初始化通道的具體執(zhí)行方法
?????????*/
????????@Override
????????public?void?initChannel(SocketChannel?ch)?{
????????????//?通道?Channel?設(shè)置
????????????ChannelPipeline?pipeline?=?ch.pipeline();
????????????//?設(shè)置結(jié)尾分隔符
????????????pipeline.addLast(new?DelimiterBasedFrameDecoder(1024,?Delimiters.lineDelimiter()));
????????????//?設(shè)置(字符串)編碼器和解碼器
????????????pipeline.addLast(DECODER);
????????????pipeline.addLast(ENCODER);
????????????//?服務(wù)器端連接之后的執(zhí)行器,接收到消息之后的業(yè)務(wù)處理
????????????pipeline.addLast(SERVER_HANDLER);
????????}
????}
????/**
?????*?服務(wù)器端接收到消息之后的業(yè)務(wù)處理類
?????*/
????static?class?ServerHandler?extends?SimpleChannelInboundHandler<String>?{
????????/**
?????????*?讀取到客戶端的消息
?????????*/
????????@Override
????????public?void?channelRead0(ChannelHandlerContext?ctx,?String?request)?{
????????????if?(!request.isEmpty())?{
????????????????System.out.println("接到客戶端的消息:"?+?request);
????????????}
????????}
????????/**
?????????*?數(shù)據(jù)讀取完畢
?????????*/
????????@Override
????????public?void?channelReadComplete(ChannelHandlerContext?ctx)?{
????????????ctx.flush();
????????}
????????/**
?????????*?異常處理,打印異常并關(guān)閉通道
?????????*/
????????@Override
????????public?void?exceptionCaught(ChannelHandlerContext?ctx,?Throwable?cause)?{
????????????cause.printStackTrace();
????????????ctx.close();
????????}
????}
????/**
?????*?客戶端
?????*/
????static?class?MyNettyClient?{
????????public?static?void?main(String[]?args)?{
????????????//?創(chuàng)建事件循環(huán)線程組(客戶端的線程組只有一個(gè))
????????????EventLoopGroup?group?=?new?NioEventLoopGroup();
????????????try?{
????????????????//?Netty?客戶端啟動(dòng)對(duì)象
????????????????Bootstrap?b?=?new?Bootstrap();
????????????????//?設(shè)置啟動(dòng)參數(shù)
????????????????b.group(group)
????????????????????????//?設(shè)置通道類型
????????????????????????.channel(NioSocketChannel.class)
????????????????????????//?設(shè)置啟動(dòng)執(zhí)行器(負(fù)責(zé)啟動(dòng)事件的業(yè)務(wù)執(zhí)行,ClientInitializer?為自定義的類)
????????????????????????.handler(new?ClientInitializer());
????????????????//?連接服務(wù)器端并同步通道
????????????????Channel?ch?=?b.connect("127.0.0.1",?PORT).sync().channel();
????????????????//?發(fā)送消息
????????????????ChannelFuture?lastWriteFuture?=?null;
????????????????//?給服務(wù)器端發(fā)送?10?條消息
????????????????for?(int?i?=?0;?i?10;?i++)?{
????????????????????//?發(fā)送給服務(wù)器消息
????????????????????lastWriteFuture?=?ch.writeAndFlush("Hi,Java.\n");
????????????????}
????????????????//?在關(guān)閉通道之前,同步刷新所有的消息
????????????????if?(lastWriteFuture?!=?null)?{
????????????????????lastWriteFuture.sync();
????????????????}
????????????}?catch?(InterruptedException?e)?{
????????????????e.printStackTrace();
????????????}?finally?{
????????????????//?釋放資源
????????????????group.shutdownGracefully();
????????????}
????????}
????}
????/**
?????*?客戶端通道初始化類
?????*/
????static?class?ClientInitializer?extends?ChannelInitializer<SocketChannel>?{
????????//?字符串編碼器和解碼器
????????private?static?final?StringDecoder?DECODER?=?new?StringDecoder();
????????private?static?final?StringEncoder?ENCODER?=?new?StringEncoder();
????????//?客戶端連接成功之后業(yè)務(wù)處理
????????private?static?final?ClientHandler?CLIENT_HANDLER?=?new?ClientHandler();
????????/**
?????????*?初始化客戶端通道
?????????*/
????????@Override
????????public?void?initChannel(SocketChannel?ch)?{
????????????ChannelPipeline?pipeline?=?ch.pipeline();
????????????//?設(shè)置結(jié)尾分隔符
????????????pipeline.addLast(new?DelimiterBasedFrameDecoder(1024,?Delimiters.lineDelimiter()));
????????????//?設(shè)置(字符串)編碼器和解碼器
????????????pipeline.addLast(DECODER);
????????????pipeline.addLast(ENCODER);
????????????//?客戶端連接成功之后的業(yè)務(wù)處理
????????????pipeline.addLast(CLIENT_HANDLER);
????????}
????}
????/**
?????*?客戶端連接成功之后的業(yè)務(wù)處理
?????*/
????static?class?ClientHandler?extends?SimpleChannelInboundHandler<String>?{
????????/**
?????????*?讀取到服務(wù)器端的消息
?????????*/
????????@Override
????????protected?void?channelRead0(ChannelHandlerContext?ctx,?String?msg)?{
????????????System.err.println("接到服務(wù)器的消息:"?+?msg);
????????}
????????/**
?????????*?異常處理,打印異常并關(guān)閉通道
?????????*/
????????@Override
????????public?void?exceptionCaught(ChannelHandlerContext?ctx,?Throwable?cause)?{
????????????cause.printStackTrace();
????????????ctx.close();
????????}
????}
}
最終的執(zhí)行結(jié)果如下圖所示:
從上述結(jié)果中可以看出,Netty 可以正常使用了,它已經(jīng)不存在粘包和半包問題了。
2.封裝消息解決粘包問題
此解決方案的核心是將消息分為消息頭 + 消息體,在消息頭中保存消息體的長(zhǎng)度,從而確定一條消息的邊界,這樣就避免了粘包和半包問題了,它的實(shí)現(xiàn)過程如下圖所示:
在 Netty 中可以通過 LengthFieldPrepender(編碼)和 LengthFieldBasedFrameDecoder(解碼)兩個(gè)類實(shí)現(xiàn)消息的封裝。和上一個(gè)解決方案類似,我們需要分別在服務(wù)器端和客戶端通過設(shè)置通道(Channel)來解決粘包問題。
服務(wù)器端的核心代碼如下:
/**
?*?服務(wù)端通道初始化
?*/
static?class?ServerInitializer?extends?ChannelInitializer<SocketChannel>?{
????//?字符串編碼器和解碼器
????private?static?final?StringDecoder?DECODER?=?new?StringDecoder();
????private?static?final?StringEncoder?ENCODER?=?new?StringEncoder();
????//?服務(wù)器端連接之后的執(zhí)行器(自定義的類)
????private?static?final?NettyExample.ServerHandler?SERVER_HANDLER?=?new?NettyExample.ServerHandler();
????/**
?????*?初始化通道的具體執(zhí)行方法
?????*/
????@Override
????public?void?initChannel(SocketChannel?ch)?{
????????//?通道?Channel?設(shè)置
????????ChannelPipeline?pipeline?=?ch.pipeline();
????????// 18 行:消息解碼:讀取消息頭和消息體
????????pipeline.addLast(new?LengthFieldBasedFrameDecoder(1024,?0,?4,?0,?4));
????????// 20?行:消息編碼:將消息封裝為消息頭和消息體,在消息前添加消息體的長(zhǎng)度
????????pipeline.addLast(new?LengthFieldPrepender(4));
????????//?設(shè)置(字符串)編碼器和解碼器
????????pipeline.addLast(DECODER);
????????pipeline.addLast(ENCODER);
????????//?服務(wù)器端連接之后的執(zhí)行器,接收到消息之后的業(yè)務(wù)處理
????????pipeline.addLast(SERVER_HANDLER);
????}
}
其中核心代碼是 18 行和 20 行,通過 LengthFieldPrepender 實(shí)現(xiàn)編碼(將消息打包成消息頭 + 消息體),通過 LengthFieldBasedFrameDecoder 實(shí)現(xiàn)解碼(從封裝的消息中取出消息的內(nèi)容)。
LengthFieldBasedFrameDecoder 的參數(shù)說明如下:
參數(shù) 1:maxFrameLength - 發(fā)送的數(shù)據(jù)包最大長(zhǎng)度; 參數(shù) 2:lengthFieldOffset - 長(zhǎng)度域偏移量,指的是長(zhǎng)度域位于整個(gè)數(shù)據(jù)包字節(jié)數(shù)組中的下標(biāo); 參數(shù) 3:lengthFieldLength - 長(zhǎng)度域自己的字節(jié)數(shù)長(zhǎng)度; 參數(shù) 4:lengthAdjustment – 長(zhǎng)度域的偏移量矯正。如果長(zhǎng)度域的值,除了包含有效數(shù)據(jù)域的長(zhǎng)度外,還包含了其他域(如長(zhǎng)度域自身)長(zhǎng)度,那么,就需要進(jìn)行矯正。矯正的值為:包長(zhǎng) - 長(zhǎng)度域的值 – 長(zhǎng)度域偏移 – 長(zhǎng)度域長(zhǎng); 參數(shù) 5:initialBytesToStrip – 丟棄的起始字節(jié)數(shù)。丟棄處于有效數(shù)據(jù)前面的字節(jié)數(shù)量。比如前面有 4 個(gè)節(jié)點(diǎn)的長(zhǎng)度域,則它的值為 4。
LengthFieldBasedFrameDecoder(1024,0,4,0,4) 的意思是:數(shù)據(jù)包最大長(zhǎng)度為 1024,長(zhǎng)度域占首部的四個(gè)字節(jié),在讀數(shù)據(jù)的時(shí)候去掉首部四個(gè)字節(jié)(即長(zhǎng)度域)。
客戶端的核心實(shí)現(xiàn)代碼如下:
/**
?*?客戶端通道初始化類
?*/
static?class?ClientInitializer?extends?ChannelInitializer<SocketChannel>?{
????//?字符串編碼器和解碼器
????private?static?final?StringDecoder?DECODER?=?new?StringDecoder();
????private?static?final?StringEncoder?ENCODER?=?new?StringEncoder();
????//?客戶端連接成功之后業(yè)務(wù)處理
????private?static?final?NettyExample.ClientHandler?CLIENT_HANDLER?=?new?NettyExample.ClientHandler();
????/**
?????*?初始化客戶端通道
?????*/
????@Override
????public?void?initChannel(SocketChannel?ch)?{
????????ChannelPipeline?pipeline?=?ch.pipeline();
????????//?消息解碼:讀取消息頭和消息體
????????pipeline.addLast(new?LengthFieldBasedFrameDecoder(1024,?0,?4,?0,?4));
????????//?消息編碼:將消息封裝為消息頭和消息體,在響應(yīng)字節(jié)數(shù)據(jù)前面添加消息體長(zhǎng)度
????????pipeline.addLast(new?LengthFieldPrepender(4));
????????//?設(shè)置(字符串)編碼器和解碼器
????????pipeline.addLast(DECODER);
????????pipeline.addLast(ENCODER);
????????//?客戶端連接成功之后的業(yè)務(wù)處理
????????pipeline.addLast(CLIENT_HANDLER);
????}
}
完整的服務(wù)器端和客戶端的實(shí)現(xiàn)代碼如下:
import?io.netty.bootstrap.Bootstrap;
import?io.netty.bootstrap.ServerBootstrap;
import?io.netty.channel.*;
import?io.netty.channel.nio.NioEventLoopGroup;
import?io.netty.channel.socket.SocketChannel;
import?io.netty.channel.socket.nio.NioServerSocketChannel;
import?io.netty.channel.socket.nio.NioSocketChannel;
import?io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import?io.netty.handler.codec.LengthFieldPrepender;
import?io.netty.handler.codec.string.StringDecoder;
import?io.netty.handler.codec.string.StringEncoder;
/**
?*?通過封裝?Netty?來解決粘包
?*/
public?class?NettyExample?{
????//?定義服務(wù)器的端口號(hào)
????static?final?int?PORT?=?8007;
????/**
?????*?服務(wù)器端
?????*/
????static?class?MyNettyServer?{
????????public?static?void?main(String[]?args)?{
????????????//?創(chuàng)建一個(gè)線程組,用來負(fù)責(zé)接收客戶端連接
????????????EventLoopGroup?bossGroup?=?new?NioEventLoopGroup();
????????????//?創(chuàng)建另一個(gè)線程組,用來負(fù)責(zé)?I/O?的讀寫
????????????EventLoopGroup?workerGroup?=?new?NioEventLoopGroup();
????????????try?{
????????????????//?創(chuàng)建一個(gè)?Server?實(shí)例(可理解為?Netty?的入門類)
????????????????ServerBootstrap?b?=?new?ServerBootstrap();
????????????????//?將兩個(gè)線程池設(shè)置到?Server?實(shí)例
????????????????b.group(bossGroup,?workerGroup)
????????????????????????//?設(shè)置?Netty?通道的類型為?NioServerSocket(非阻塞?I/O?Socket?服務(wù)器)
????????????????????????.channel(NioServerSocketChannel.class)
????????????????????????//?設(shè)置建立連接之后的執(zhí)行器(ServerInitializer?是我創(chuàng)建的一個(gè)自定義類)
????????????????????????.childHandler(new?NettyExample.ServerInitializer());
????????????????//?綁定端口并且進(jìn)行同步
????????????????ChannelFuture?future?=?b.bind(PORT).sync();
????????????????//?對(duì)關(guān)閉通道進(jìn)行監(jiān)聽
????????????????future.channel().closeFuture().sync();
????????????}?catch?(InterruptedException?e)?{
????????????????e.printStackTrace();
????????????}?finally?{
????????????????//?資源關(guān)閉
????????????????bossGroup.shutdownGracefully();
????????????????workerGroup.shutdownGracefully();
????????????}
????????}
????}
????/**
?????*?服務(wù)端通道初始化
?????*/
????static?class?ServerInitializer?extends?ChannelInitializer<SocketChannel>?{
????????//?字符串編碼器和解碼器
????????private?static?final?StringDecoder?DECODER?=?new?StringDecoder();
????????private?static?final?StringEncoder?ENCODER?=?new?StringEncoder();
????????//?服務(wù)器端連接之后的執(zhí)行器(自定義的類)
????????private?static?final?NettyExample.ServerHandler?SERVER_HANDLER?=?new?NettyExample.ServerHandler();
????????/**
?????????*?初始化通道的具體執(zhí)行方法
?????????*/
????????@Override
????????public?void?initChannel(SocketChannel?ch)?{
????????????//?通道?Channel?設(shè)置
????????????ChannelPipeline?pipeline?=?ch.pipeline();
????????????//?消息解碼:讀取消息頭和消息體
????????????pipeline.addLast(new?LengthFieldBasedFrameDecoder(1024,?0,?4,?0,?4));
????????????//?消息編碼:將消息封裝為消息頭和消息體,在響應(yīng)字節(jié)數(shù)據(jù)前面添加消息體長(zhǎng)度
????????????pipeline.addLast(new?LengthFieldPrepender(4));
????????????//?設(shè)置(字符串)編碼器和解碼器
????????????pipeline.addLast(DECODER);
????????????pipeline.addLast(ENCODER);
????????????//?服務(wù)器端連接之后的執(zhí)行器,接收到消息之后的業(yè)務(wù)處理
????????????pipeline.addLast(SERVER_HANDLER);
????????}
????}
????/**
?????*?服務(wù)器端接收到消息之后的業(yè)務(wù)處理類
?????*/
????static?class?ServerHandler?extends?SimpleChannelInboundHandler<String>?{
????????/**
?????????*?讀取到客戶端的消息
?????????*/
????????@Override
????????public?void?channelRead0(ChannelHandlerContext?ctx,?String?request)?{
????????????if?(!request.isEmpty())?{
????????????????System.out.println("接到客戶端的消息:"?+?request);
????????????}
????????}
????????/**
?????????*?數(shù)據(jù)讀取完畢
?????????*/
????????@Override
????????public?void?channelReadComplete(ChannelHandlerContext?ctx)?{
????????????ctx.flush();
????????}
????????/**
?????????*?異常處理,打印異常并關(guān)閉通道
?????????*/
????????@Override
????????public?void?exceptionCaught(ChannelHandlerContext?ctx,?Throwable?cause)?{
????????????cause.printStackTrace();
????????????ctx.close();
????????}
????}
????/**
?????*?客戶端
?????*/
????static?class?MyNettyClient?{
????????public?static?void?main(String[]?args)?{
????????????//?創(chuàng)建事件循環(huán)線程組(客戶端的線程組只有一個(gè))
????????????EventLoopGroup?group?=?new?NioEventLoopGroup();
????????????try?{
????????????????//?Netty?客戶端啟動(dòng)對(duì)象
????????????????Bootstrap?b?=?new?Bootstrap();
????????????????//?設(shè)置啟動(dòng)參數(shù)
????????????????b.group(group)
????????????????????????//?設(shè)置通道類型
????????????????????????.channel(NioSocketChannel.class)
????????????????????????//?設(shè)置啟動(dòng)執(zhí)行器(負(fù)責(zé)啟動(dòng)事件的業(yè)務(wù)執(zhí)行,ClientInitializer?為自定義的類)
????????????????????????.handler(new?NettyExample.ClientInitializer());
????????????????//?連接服務(wù)器端并同步通道
????????????????Channel?ch?=?b.connect("127.0.0.1",?PORT).sync().channel();
????????????????//?發(fā)送消息
????????????????ChannelFuture?lastWriteFuture?=?null;
????????????????//?給服務(wù)器端發(fā)送?10?條消息
????????????????for?(int?i?=?0;?i?10;?i++)?{
????????????????????//?發(fā)送給服務(wù)器消息
????????????????????lastWriteFuture?=?ch.writeAndFlush("Hi,Java.\n");
????????????????}
????????????????//?在關(guān)閉通道之前,同步刷新所有的消息
????????????????if?(lastWriteFuture?!=?null)?{
????????????????????lastWriteFuture.sync();
????????????????}
????????????}?catch?(InterruptedException?e)?{
????????????????e.printStackTrace();
????????????}?finally?{
????????????????//?釋放資源
????????????????group.shutdownGracefully();
????????????}
????????}
????}
????/**
?????*?客戶端通道初始化類
?????*/
????static?class?ClientInitializer?extends?ChannelInitializer<SocketChannel>?{
????????//?字符串編碼器和解碼器
????????private?static?final?StringDecoder?DECODER?=?new?StringDecoder();
????????private?static?final?StringEncoder?ENCODER?=?new?StringEncoder();
????????//?客戶端連接成功之后業(yè)務(wù)處理
????????private?static?final?NettyExample.ClientHandler?CLIENT_HANDLER?=?new?NettyExample.ClientHandler();
????????/**
?????????*?初始化客戶端通道
?????????*/
????????@Override
????????public?void?initChannel(SocketChannel?ch)?{
????????????ChannelPipeline?pipeline?=?ch.pipeline();
????????????//?消息解碼:讀取消息頭和消息體
????????????pipeline.addLast(new?LengthFieldBasedFrameDecoder(1024,?0,?4,?0,?4));
????????????//?消息編碼:將消息封裝為消息頭和消息體,在響應(yīng)字節(jié)數(shù)據(jù)前面添加消息體長(zhǎng)度
????????????pipeline.addLast(new?LengthFieldPrepender(4));
????????????//?設(shè)置(字符串)編碼器和解碼器
????????????pipeline.addLast(DECODER);
????????????pipeline.addLast(ENCODER);
????????????//?客戶端連接成功之后的業(yè)務(wù)處理
????????????pipeline.addLast(CLIENT_HANDLER);
????????}
????}
????/**
?????*?客戶端連接成功之后的業(yè)務(wù)處理
?????*/
????static?class?ClientHandler?extends?SimpleChannelInboundHandler<String>?{
????????/**
?????????*?讀取到服務(wù)器端的消息
?????????*/
????????@Override
????????protected?void?channelRead0(ChannelHandlerContext?ctx,?String?msg)?{
????????????System.err.println("接到服務(wù)器的消息:"?+?msg);
????????}
????????/**
?????????*?異常處理,打印異常并關(guān)閉通道
?????????*/
????????@Override
????????public?void?exceptionCaught(ChannelHandlerContext?ctx,?Throwable?cause)?{
????????????cause.printStackTrace();
????????????ctx.close();
????????}
????}
}
以上程序的執(zhí)行結(jié)果為:
四、總結(jié)
本文提供了傳統(tǒng) Socket 通訊將消息分為消息頭和消息體的具體代碼實(shí)現(xiàn),然而傳統(tǒng)的 Socket 在性能和復(fù)用性上表現(xiàn)一般,為了更加高效的實(shí)現(xiàn)通訊,我們可以使用 Netty 框架來替代傳統(tǒng)的 Socket 和 NIO 編程,但 Netty 在使用時(shí)依然會(huì)出現(xiàn)粘包的問題,于是我們提供了兩種最常見的解決方案:通過分隔符或?qū)⒎庋b消息的解決方案,其中最后一種解決方案的使用更加廣泛。
參考 & 鳴謝
《Netty 核心原理剖析與 RPC 實(shí)踐》

往期推薦

Socket粘包問題的3種解決方案,最后一種最完美!

文件寫入的6種方法,這種方法性能最好

SpringBoot集成Google開源圖片處理框架,賊好用!
