45 張圖深度解析 Netty 架構(gòu)與原理
接下來(lái)我們會(huì)學(xué)習(xí)一個(gè) Netty 系列教程,Netty 系列由「架構(gòu)與原理」,「源碼」,「架構(gòu)」三部分組成,今天我們先來(lái)看看第一部分:Netty 架構(gòu)與原理初探,大綱如下:
前言 1. Netty 基礎(chǔ) 1.4.1. 緩沖區(qū)(Buffer) 1.4.2. 通道(Channel) 1.4.3. 選擇器(Selector) 1.1. Netty 是什么 1.2. Netty 的應(yīng)用場(chǎng)景 1.3. Java 中的網(wǎng)絡(luò) IO 模型 1.4. Java NIO API 簡(jiǎn)單回顧 1.5. 零拷貝技術(shù) 2. Netty 的架構(gòu)與原理 2.2.1. 單 Reactor 單線程模式 2.2.2. 單 Reactor 多線程模式 2.2.3. 主從 Reactor 多線程模式 2.1. 為什么要制造 Netty 2.2. 幾種 Reactor 線程模式 2.3. Netty 的模樣 2.4. 基于 Netty 的 TCP Server/Client 案例 2.5. Netty 的 Handler 組件 2.6. Netty 的 Pipeline 組件 2.7. Netty 的 EventLoopGroup 組件 2.8. Netty 的 TaskQueue 2.9. Netty 的 Future 和 Promise 3. 結(jié)束語(yǔ)
前言
讀者在閱讀本文前最好有 Java 的 IO 編程經(jīng)驗(yàn)(知道 Java 的各種 IO 流),以及 Java 網(wǎng)絡(luò)編程經(jīng)驗(yàn)(用 ServerSocket 和 Socket 寫過(guò) demo),并對(duì) Java NIO 有基本的認(rèn)識(shí)(至少知道 Channel、Buffer、Selector 中的核心屬性和方法,以及三者如何配合使用的),以及 JUC 編程經(jīng)驗(yàn)(至少知道其中的 Future 異步處理機(jī)制),沒(méi)有也沒(méi)關(guān)系,文中多數(shù)會(huì)介紹,不影響整體的理解。
文中對(duì)于 Reactor 的講解使用了幾張來(lái)自網(wǎng)絡(luò)上的深灰色背景的示意圖,但未找到原始出處,文中已標(biāo)注“圖片來(lái)源于網(wǎng)絡(luò)”。
Netty 的設(shè)計(jì)復(fù)雜,接口和類體系龐大,因此我會(huì)從不同的層次對(duì)有些 Netty 中的重要組件反復(fù)描述,以幫助讀者理解。
1. Netty 基礎(chǔ)
基礎(chǔ)好的同學(xué),如果已經(jīng)掌握了 Java NIO 并對(duì) IO 多路復(fù)用的概念有一定的認(rèn)知,可以跳過(guò)本章。
1.1. Netty 是什么
1)Netty 是 JBoss 開(kāi)源項(xiàng)目,是異步的、基于事件驅(qū)動(dòng)的網(wǎng)絡(luò)應(yīng)用框架,它以高性能、高并發(fā)著稱。所謂基于事件驅(qū)動(dòng),說(shuō)得簡(jiǎn)單點(diǎn)就是 Netty 會(huì)根據(jù)客戶端事件(連接、讀、寫等)做出響應(yīng),關(guān)于這點(diǎn),隨著文章的論述的展開(kāi),讀者自然會(huì)明白。
2)Netty 主要用于開(kāi)發(fā)基于 TCP 協(xié)議的網(wǎng)絡(luò) IO 程序(TCP/IP 是網(wǎng)絡(luò)通信的基石,當(dāng)然也是 Netty 的基石,Netty 并沒(méi)有去改變這些底層的網(wǎng)絡(luò)基礎(chǔ)設(shè)施,而是在這之上提供更高層的網(wǎng)絡(luò)基礎(chǔ)設(shè)施),例如高性能服務(wù)器段/客戶端、P2P 程序等。
3)Netty 是基于 Java NIO 構(gòu)建出來(lái)的,Java NIO 又是基于 Linux 提供的高性能 IO 接口/系統(tǒng)調(diào)用構(gòu)建出來(lái)的。關(guān)于 Netty 在網(wǎng)絡(luò)中的地位,下圖可以很好地表達(dá)出來(lái):

1.2. Netty 的應(yīng)用場(chǎng)景
在互聯(lián)網(wǎng)領(lǐng)域,Netty 作為異步高并發(fā)的網(wǎng)絡(luò)組件,常常用于構(gòu)建高性能 RPC 框架,以提升分布式服務(wù)群之間調(diào)用或者數(shù)據(jù)傳輸?shù)牟l(fā)度和速度。例如 Dubbo 的網(wǎng)絡(luò)層就可以(但并非一定)使用 Netty。
一些大數(shù)據(jù)基礎(chǔ)設(shè)施,比如 Hadoop,在處理海量數(shù)據(jù)的時(shí)候,數(shù)據(jù)在多個(gè)計(jì)算節(jié)點(diǎn)之中傳輸,為了提高傳輸性能,也采用 Netty 構(gòu)建性能更高的網(wǎng)絡(luò) IO 層。
在游戲行業(yè),Netty 被用于構(gòu)建高性能的游戲交互服務(wù)器,Netty 提供了 TCP/UDP、HTTP 協(xié)議棧,方便開(kāi)發(fā)者基于 Netty 進(jìn)行私有協(xié)議的開(kāi)發(fā)。
……
Netty 作為成熟的高性能異步通信框架,無(wú)論是應(yīng)用在互聯(lián)網(wǎng)分布式應(yīng)用開(kāi)發(fā)中,還是在大數(shù)據(jù)基礎(chǔ)設(shè)施構(gòu)建中,亦或是用于實(shí)現(xiàn)應(yīng)用層基于公私協(xié)議的服務(wù)器等等,都有出色的表現(xiàn),是一個(gè)極好的輪子。
1.3. Java 中的網(wǎng)絡(luò) IO 模型
Java 中的網(wǎng)絡(luò) IO 模型有三種:BIO、NIO、AIO。
1)BIO:同步的、阻塞式 IO。在這種模型中,服務(wù)器上一個(gè)線程處理一次連接,即客戶端每發(fā)起一個(gè)請(qǐng)求,服務(wù)端都要開(kāi)啟一個(gè)線程專門處理該請(qǐng)求。這種模型對(duì)線程量的耗費(fèi)極大,且線程利用率低,難以承受請(qǐng)求的高并發(fā)。BIO 雖然可以使用線程池+等待隊(duì)列進(jìn)行優(yōu)化,避免使用過(guò)多的線程,但是依然無(wú)法解決線程利用率低的問(wèn)題。

使用 BIO 構(gòu)建 C/S 系統(tǒng)的 Java 編程組件是 ServerSocket 和 Socket。服務(wù)端示例代碼為:
public?static?void?main(String[]?args)?throws?IOException?{
????ExecutorService?threadPool?=?Executors.newCachedThreadPool();
????ServerSocket?serverSocket?=?new?ServerSocket(8080);
????while?(true)?{
????????Socket?socket?=?serverSocket.accept();
????????threadPool.execute(()?->?{
????????????handler(socket);
????????});
????}
}
/**
?*?處理客戶端請(qǐng)求
?*/
private?static?void?handler(Socket?socket)?throws?IOException?{
????byte[]?bytes?=?new?byte[1024];
????InputStream?inputStream?=?socket.getInputStream();
????socket.close();
????while?(true)?{
????????int?read?=?inputStream.read(bytes);
????????if?(read?!=?-1)?{
????????????System.out.println("msg?from?client:?"?+?new?String(bytes,?0,?read));
????????}?else?{
????????????break;
????????}
????}
}
2)NIO:同步的、非阻塞式 IO。在這種模型中,服務(wù)器上一個(gè)線程處理多個(gè)連接,即多個(gè)客戶端請(qǐng)求都會(huì)被注冊(cè)到多路復(fù)用器(后文要講的 Selector)上,多路復(fù)用器會(huì)輪訓(xùn)這些連接,輪訓(xùn)到連接上有 IO 活動(dòng)就進(jìn)行處理。NIO 降低了線程的需求量,提高了線程的利用率。Netty 就是基于 NIO 的(這里有一個(gè)問(wèn)題:前文大力宣揚(yáng) Netty 是一個(gè)異步高性能網(wǎng)絡(luò)應(yīng)用框架,為何這里又說(shuō) Netty 是基于同步的 NIO 的?請(qǐng)讀者跟著文章的描述找尋答案)。

NIO 是面向緩沖區(qū)編程的,從緩沖區(qū)讀取數(shù)據(jù)的時(shí)候游標(biāo)在緩沖區(qū)中是可以前后移動(dòng)的,這就增加了數(shù)據(jù)處理的靈活性。這和面向流的 BIO 只能順序讀取流中數(shù)據(jù)有很大的不同。
Java NIO 的非阻塞模式,使得一個(gè)線程從某個(gè)通道讀取數(shù)據(jù)的時(shí)候,若當(dāng)前有可用數(shù)據(jù),則該線程進(jìn)行處理,若當(dāng)前無(wú)可用數(shù)據(jù),則該線程不會(huì)保持阻塞等待狀態(tài),而是可以去處理其他工作(比如處理其他通道的讀寫);同樣,一個(gè)線程向某個(gè)通道寫入數(shù)據(jù)的時(shí)候,一旦開(kāi)始寫入,該線程無(wú)需等待寫完即可去處理其他工作(比如處理其他通道的讀寫)。這種特性使得一個(gè)線程能夠處理多個(gè)客戶端請(qǐng)求,而不是像 BIO 那樣,一個(gè)線程只能處理一個(gè)請(qǐng)求。
使用 NIO 構(gòu)建 C/S 系統(tǒng)的 Java 編程組件是 Channel、Buffer、Selector。服務(wù)端示例代碼為:
public?static?void?main(String[]?args)?throws?IOException?{
????ServerSocketChannel?serverSocketChannel?=?ServerSocketChannel.open();
????Selector?selector?=?Selector.open();
????//?綁定端口
????serverSocketChannel.socket().bind(new?InetSocketAddress(8080));
????//?設(shè)置?serverSocketChannel?為非阻塞模式
????serverSocketChannel.configureBlocking(false);
????//?注冊(cè)?serverSocketChannel?到?selector,關(guān)注?OP_ACCEPT?事件
????serverSocketChannel.register(selector,?SelectionKey.OP_ACCEPT);
????while?(true)?{
????????//?沒(méi)有事件發(fā)生
????????if?(selector.select(1000)?==?0)?{
????????????continue;
????????}
????????//?有事件發(fā)生,找到發(fā)生事件的?Channel?對(duì)應(yīng)的?SelectionKey?的集合
????????Set?selectionKeys?=?selector.selectedKeys();
????????Iterator?iterator?=?selectionKeys.iterator();
????????while?(iterator.hasNext())?{
????????????SelectionKey?selectionKey?=?iterator.next();
????????????//?發(fā)生?OP_ACCEPT?事件,處理連接請(qǐng)求
????????????if?(selectionKey.isAcceptable())?{
????????????????SocketChannel?socketChannel?=?serverSocketChannel.accept();
????????????????//?將?socketChannel?也注冊(cè)到?selector,關(guān)注?OP_READ
????????????????//?事件,并給?socketChannel?關(guān)聯(lián)?Buffer
????????????????socketChannel.register(selector,?SelectionKey.OP_READ,?ByteBuffer.allocate(1024));
????????????}
????????????//?發(fā)生?OP_READ?事件,讀客戶端數(shù)據(jù)
????????????if?(selectionKey.isReadable())?{
????????????????SocketChannel?channel?=?(SocketChannel)?selectionKey.channel();
????????????????ByteBuffer?buffer?=?(ByteBuffer)?selectionKey.attachment();
????????????????channel.read(buffer);
????????????????System.out.println("msg?form?client:?"?+?new?String(buffer.array()));
????????????}
????????????//?手動(dòng)從集合中移除當(dāng)前的?selectionKey,防止重復(fù)處理事件
????????????iterator.remove();
????????}
????}
}
3)AIO:異步非阻塞式 IO。在這種模型中,由操作系統(tǒng)完成與客戶端之間的 read/write,之后再由操作系統(tǒng)主動(dòng)通知服務(wù)器線程去處理后面的工作,在這個(gè)過(guò)程中服務(wù)器線程不必同步等待 read/write 完成。由于不同的操作系統(tǒng)對(duì) AIO 的支持程度不同,AIO 目前未得到廣泛應(yīng)用。因此本文對(duì) AIO 不做過(guò)多描述。
使用 Java NIO 構(gòu)建的 IO 程序,它的工作模式是:主動(dòng)輪訓(xùn) IO 事件,IO 事件發(fā)生后程序的線程主動(dòng)處理 IO 工作,這種模式也叫做 Reactor 模式。使用 Java AIO 構(gòu)建的 IO 程序,它的工作模式是:將 IO 事件的處理托管給操作系統(tǒng),操作系統(tǒng)完成 IO 工作之后會(huì)通知程序的線程去處理后面的工作,這種模式也叫做 Proactor 模式。
本節(jié)最后,討論一下網(wǎng)路 IO 中阻塞、非阻塞、異步、同步這幾個(gè)術(shù)語(yǔ)的含義和關(guān)系:
阻塞:如果線程調(diào)用 read/write 過(guò)程,但 read/write 過(guò)程沒(méi)有就緒或沒(méi)有完成,則調(diào)用 read/write 過(guò)程的線程會(huì)一直等待,這個(gè)過(guò)程叫做阻塞式讀寫。 非阻塞:如果線程調(diào)用 read/write 過(guò)程,但 read/write 過(guò)程沒(méi)有就緒或沒(méi)有完成,調(diào)用 read/write 過(guò)程的線程并不會(huì)一直等待,而是去處理其他工作,等到 read/write 過(guò)程就緒或完成后再回來(lái)處理,這個(gè)過(guò)程叫做非阻塞式讀寫。 異步:read/write 過(guò)程托管給操作系統(tǒng)來(lái)完成,完成后操作系統(tǒng)會(huì)通知(通過(guò)回調(diào)或者事件)應(yīng)用網(wǎng)絡(luò) IO 程序(其中的線程)來(lái)進(jìn)行后續(xù)的處理。 同步:read/write 過(guò)程由網(wǎng)絡(luò) IO 程序(其中的線程)來(lái)完成。
基于以上含義,可以看出:異步 IO 一定是非阻塞 IO;同步 IO 既可以是阻塞 IO、也可以是非阻塞 IO。
1.4. Java NIO API 簡(jiǎn)單回顧
BIO 以流的方式處理數(shù)據(jù),而 NIO 以緩沖區(qū)(也被叫做塊)的方式處理數(shù)據(jù),塊 IO 效率比流 IO 效率高很多。BIO 基于字符流或者字節(jié)流進(jìn)行操作,而 NIO 基于 Channel 和 Buffer 進(jìn)行操作,數(shù)據(jù)總是從通道讀取到緩沖區(qū)或者從緩沖區(qū)寫入到通道。Selector 用于監(jiān)聽(tīng)多個(gè)通道上的事件(比如收到連接請(qǐng)求、數(shù)據(jù)達(dá)到等等),因此使用單個(gè)線程就可以監(jiān)聽(tīng)多個(gè)客戶端通道。如下圖所示:

關(guān)于上圖,再進(jìn)行幾點(diǎn)說(shuō)明:
一個(gè) Selector 對(duì)應(yīng)一個(gè)處理線程 一個(gè) Selector 上可以注冊(cè)多個(gè) Channel 每個(gè) Channel 都會(huì)對(duì)應(yīng)一個(gè) Buffer(有時(shí)候一個(gè) Channel 可以使用多個(gè) Buffer,這時(shí)候程序要進(jìn)行多個(gè) Buffer 的分散和聚集操作),Buffer 的本質(zhì)是一個(gè)內(nèi)存塊,底層是一個(gè)數(shù)組 Selector 會(huì)根據(jù)不同的事件在各個(gè) Channel 上切換 Buffer 是雙向的,既可以讀也可以寫,切換讀寫方向要調(diào)用 Buffer 的 flip()方法 同樣,Channel 也是雙向的,數(shù)據(jù)既可以流入也可以流出
1.4.1. 緩沖區(qū)(Buffer)
緩沖區(qū)(Buffer)本質(zhì)上是一個(gè)可讀可寫的內(nèi)存塊,可以理解成一個(gè)容器對(duì)象,Channel 讀寫文件或者網(wǎng)絡(luò)都要經(jīng)由 Buffer。在 Java NIO 中,Buffer 是一個(gè)頂層抽象類,它的常用子類有(前綴表示該 Buffer 可以存儲(chǔ)哪種類型的數(shù)據(jù)):
ByteBuffer CharBuffer ShortBuffer IntBuffer LongBuffer DoubleBuffer FloatBuffer
涵蓋了 Java 中除 boolean 之外的所有的基本數(shù)據(jù)類型。其中 ByteBuffer 支持類型化的數(shù)據(jù)存取,即可以往 ByteBuffer 中放 byte 類型數(shù)據(jù)、也可以放 char、int、long、double 等類型的數(shù)據(jù),但讀取的時(shí)候要做好類型匹配處理,否則會(huì)拋出 BufferUnderflowException。
另外,Buffer 體系中還有一個(gè)重要的 MappedByteBuffer(ByteBuffer 的子類),可以讓文件內(nèi)容直接在堆外內(nèi)存中被修改,而如何同步到文件由 NIO 來(lái)完成。本文重點(diǎn)不在于此,有興趣的可以去探究一下 MappedByteBuffer 的底層原理。
1.4.2. 通道(Channel)
通道(Channel)是雙向的,可讀可寫。在 Java NIO 中,Buffer 是一個(gè)頂層接口,它的常用子類有:
FileChannel:用于文件讀寫 DatagramChannel:用于 UDP 數(shù)據(jù)包收發(fā) ServerSocketChannel:用于服務(wù)端 TCP 數(shù)據(jù)包收發(fā) SocketChannel:用于客戶端 TCP 數(shù)據(jù)包收發(fā)
1.4.3. 選擇器(Selector)
選擇器(Selector)是實(shí)現(xiàn) IO 多路復(fù)用的關(guān)鍵,多個(gè) Channel 注冊(cè)到某個(gè) Selector 上,當(dāng) Channel 上有事件發(fā)生時(shí),Selector 就會(huì)取得事件然后調(diào)用線程去處理事件。也就是說(shuō)只有當(dāng)連接上真正有讀寫等事件發(fā)生時(shí),線程才會(huì)去進(jìn)行讀寫等操作,這就不必為每個(gè)連接都創(chuàng)建一個(gè)線程,一個(gè)線程可以應(yīng)對(duì)多個(gè)連接。這就是 IO 多路復(fù)用的要義。
Netty 的 IO 線程 NioEventLoop 聚合了 Selector,可以同時(shí)并發(fā)處理成百上千的客戶端連接,后文會(huì)展開(kāi)描述。
在 Java NIO 中,Selector 是一個(gè)抽象類,它的常用方法有:
public?abstract?class?Selector?implements?Closeable?{
????......
????
????/**
?????*?得到一個(gè)選擇器對(duì)象
?????*/
????public?static?Selector?open()?throws?IOException?{
????????return?SelectorProvider.provider().openSelector();
????}
????......
????/**
?????*?返回所有發(fā)生事件的?Channel?對(duì)應(yīng)的?SelectionKey?的集合,通過(guò)
?????*?SelectionKey?可以找到對(duì)應(yīng)的?Channel
?????*/
????public?abstract?Set?selectedKeys() ;
????......
????
????/**
?????*?返回所有?Channel?對(duì)應(yīng)的?SelectionKey?的集合,通過(guò)?SelectionKey
?????*?可以找到對(duì)應(yīng)的?Channel
?????*/
????public?abstract?Set?keys() ;
????......
????
????/**
?????*?監(jiān)控所有注冊(cè)的?Channel,當(dāng)其中的?Channel?有?IO?操作可以進(jìn)行時(shí),
?????*?將這些 Channel 對(duì)應(yīng)的 SelectionKey 找到。參數(shù)用于設(shè)置超時(shí)時(shí)間
?????*/
????public?abstract?int?select(long?timeout)?throws?IOException;
????
????/**
????*?無(wú)超時(shí)時(shí)間的?select?過(guò)程,一直等待,直到發(fā)現(xiàn)有?Channel?可以進(jìn)行
????*?IO?操作
????*/
????public?abstract?int?select()?throws?IOException;
????
????/**
????*?立即返回的?select?過(guò)程
????*/
????public?abstract?int?selectNow()?throws?IOException;
????......
????
????/**
????*?喚醒?Selector,對(duì)無(wú)超時(shí)時(shí)間的?select?過(guò)程起作用,終止其等待
????*/
????public?abstract?Selector?wakeup();
????......
}
在上文的使用 Java NIO 編寫的服務(wù)端示例代碼中,服務(wù)端的工作流程為:
1)當(dāng)客戶端發(fā)起連接時(shí),會(huì)通過(guò) ServerSocketChannel 創(chuàng)建對(duì)應(yīng)的 SocketChannel。
2)調(diào)用 SocketChannel 的注冊(cè)方法將 SocketChannel 注冊(cè)到 Selector 上,注冊(cè)方法返回一個(gè) SelectionKey,該 SelectionKey 會(huì)被放入 Selector 內(nèi)部的 SelectionKey 集合中。該 SelectionKey 和 Selector 關(guān)聯(lián)(即通過(guò) SelectionKey 可以找到對(duì)應(yīng)的 Selector),也和 SocketChannel 關(guān)聯(lián)(即通過(guò) SelectionKey 可以找到對(duì)應(yīng)的 SocketChannel)。
4)Selector 會(huì)調(diào)用 select()/select(timeout)/selectNow()方法對(duì)內(nèi)部的 SelectionKey 集合關(guān)聯(lián)的 SocketChannel 集合進(jìn)行監(jiān)聽(tīng),找到有事件發(fā)生的 SocketChannel 對(duì)應(yīng)的 SelectionKey。
5)通過(guò) SelectionKey 找到有事件發(fā)生的 SocketChannel,完成數(shù)據(jù)處理。
以上過(guò)程的相關(guān)源碼為:
/**
*?SocketChannel?繼承?AbstractSelectableChannel
*/
public?abstract?class?SocketChannel
????extends?AbstractSelectableChannel
????implements?ByteChannel,?
???????????????ScatteringByteChannel,?
???????????????GatheringByteChannel,?
???????????????NetworkChannel
{
????......
}
public?abstract?class?AbstractSelectableChannel
????extends?SelectableChannel
{
????......
????/**
?????*?AbstractSelectableChannel?中包含注冊(cè)方法,SocketChannel?實(shí)例
?????*?借助該注冊(cè)方法注冊(cè)到?Selector?實(shí)例上去,該方法返回?SelectionKey
?????*/
????public?final?SelectionKey?register(
????????//?指明注冊(cè)到哪個(gè)?Selector?實(shí)例
????????Selector?sel,?
????????//?ops?是事件代碼,告訴?Selector?應(yīng)該關(guān)注該通道的什么事件
????????int?ops,
????????//?附加信息?attachment
????????Object?att)?throws?ClosedChannelException?{
????????......
????}
????......
}
public?abstract?class?SelectionKey?{
????......
????/**
?????*?獲取該?SelectionKey?對(duì)應(yīng)的?Channel
?????*/
????public?abstract?SelectableChannel?channel();
????/**
?????*?獲取該?SelectionKey?對(duì)應(yīng)的?Selector
?????*/
????public?abstract?Selector?selector();
????......
????
????/**
?????*?事件代碼,上面的?ops?參數(shù)取這里的值
?????*/
????public?static?final?int?OP_READ?=?1?<0;
????public?static?final?int?OP_WRITE?=?1?<2;
????public?static?final?int?OP_CONNECT?=?1?<3;
????public?static?final?int?OP_ACCEPT?=?1?<4;
????......
????
????/**
?????*?檢查該?SelectionKey?對(duì)應(yīng)的?Channel?是否可讀
?????*/
????public?final?boolean?isReadable()?{
????????return?(readyOps()?&?OP_READ)?!=?0;
????}
????/**
?????*?檢查該?SelectionKey?對(duì)應(yīng)的?Channel?是否可寫
?????*/
????public?final?boolean?isWritable()?{
????????return?(readyOps()?&?OP_WRITE)?!=?0;
????}
????/**
?????*?檢查該?SelectionKey?對(duì)應(yīng)的?Channel?是否已經(jīng)建立起?socket?連接
?????*/
????public?final?boolean?isConnectable()?{
????????return?(readyOps()?&?OP_CONNECT)?!=?0;
????}
????/**
?????*?檢查該?SelectionKey?對(duì)應(yīng)的?Channel?是否準(zhǔn)備好接受一個(gè)新的?socket?連接
?????*/
????public?final?boolean?isAcceptable()?{
????????return?(readyOps()?&?OP_ACCEPT)?!=?0;
????}
????/**
?????*?添加附件(例如?Buffer)
?????*/
????public?final?Object?attach(Object?ob)?{
????????return?attachmentUpdater.getAndSet(this,?ob);
????}
????/**
?????*?獲取附件
?????*/
????public?final?Object?attachment()?{
????????return?attachment;
????}
????......
}
下圖用于輔助讀者理解上面的過(guò)程和源碼:

首先說(shuō)明,本文以 Linux 系統(tǒng)為對(duì)象來(lái)研究文件 IO 模型和網(wǎng)絡(luò) IO 模型。
1.5. 零拷貝技術(shù)
注:本節(jié)討論的是 Linux 系統(tǒng)下的 IO 過(guò)程。并且對(duì)于零拷貝技術(shù)的講解采用了一種淺顯易懂但能觸及其本質(zhì)的方式,因?yàn)檫@個(gè)話題,展開(kāi)來(lái)講實(shí)在是有太多的細(xì)節(jié)要關(guān)注。
在“將本地磁盤中文件發(fā)送到網(wǎng)絡(luò)中”這一場(chǎng)景中,零拷貝技術(shù)是提升 IO 效率的一個(gè)利器,為了對(duì)比出零拷貝技術(shù)的優(yōu)越性,下面依次給出使用直接 IO 技術(shù)、內(nèi)存映射文件技術(shù)、零拷貝技術(shù)實(shí)現(xiàn)將本地磁盤文件發(fā)送到網(wǎng)絡(luò)中的過(guò)程。
1)直接 IO 技術(shù)
使用直接 IO 技術(shù)實(shí)現(xiàn)文件傳輸?shù)倪^(guò)程如下圖所示。

上圖中,內(nèi)核緩沖區(qū)是 Linux 系統(tǒng)的 Page Cahe。為了加快磁盤的 IO,Linux 系統(tǒng)會(huì)把磁盤上的數(shù)據(jù)以 Page 為單位緩存在操作系統(tǒng)的內(nèi)存里,這里的 Page 是 Linux 系統(tǒng)定義的一個(gè)邏輯概念,一個(gè) Page 一般為 4K。
可以看出,整個(gè)過(guò)程有四次數(shù)據(jù)拷貝,讀進(jìn)來(lái)兩次,寫回去又兩次:磁盤-->內(nèi)核緩沖區(qū)-->Socket 緩沖區(qū)-->網(wǎng)絡(luò)。
直接 IO 過(guò)程使用的 Linux 系統(tǒng) API 為:
ssize_t?read(int?filedes,?void?*buf,?size_t?nbytes);
ssize_t?write(int?filedes,?void?*buf,?size_t?nbytes);
等函數(shù)。
2)內(nèi)存映射文件技術(shù)
使用內(nèi)存映射文件技術(shù)實(shí)現(xiàn)文件傳輸?shù)倪^(guò)程如下圖所示。

可以看出,整個(gè)過(guò)程有三次數(shù)據(jù)拷貝,不再經(jīng)過(guò)應(yīng)用程序內(nèi)存,直接在內(nèi)核空間中從內(nèi)核緩沖區(qū)拷貝到 Socket 緩沖區(qū)。
內(nèi)存映射文件過(guò)程使用的 Linux 系統(tǒng) API 為:
void?*mmap(void?*addr,?size_t?length,?int?prot,?int?flags,?int?fd,?off_t?offset);
3)零拷貝技術(shù)
使用零拷貝技術(shù),連內(nèi)核緩沖區(qū)到 Socket 緩沖區(qū)的拷貝也省略了,如下圖所示:

內(nèi)核緩沖區(qū)到 Socket 緩沖區(qū)之間并沒(méi)有做數(shù)據(jù)的拷貝,只是一個(gè)地址的映射。底層的網(wǎng)卡驅(qū)動(dòng)程序要讀取數(shù)據(jù)并發(fā)送到網(wǎng)絡(luò)上的時(shí)候,看似讀取的是 Socket 的緩沖區(qū)中的數(shù)據(jù),其實(shí)直接讀的是內(nèi)核緩沖區(qū)中的數(shù)據(jù)。
零拷貝中所謂的“零”指的是內(nèi)存中數(shù)據(jù)拷貝的次數(shù)為 0。
零拷貝過(guò)程使用的 Linux 系統(tǒng) API 為:
ssize_t?sendfile(int?out_fd,?int?in_fd,?off_t?*offset,?size_t?count);
在 JDK 中,提供的:
FileChannel.transderTo(long?position,?long?count,?WritableByteChannel?target);
方法實(shí)現(xiàn)了零拷貝過(guò)程,其中的第三個(gè)參數(shù)可以傳入 SocketChannel 實(shí)例。例如客戶端使用以上的零拷貝接口向服務(wù)器傳輸文件的代碼為:
public?static?void?main(String[]?args)?throws?IOException?{
????SocketChannel?socketChannel?=?SocketChannel.open();
????socketChannel.connect(new?InetSocketAddress("127.0.0.1",?8080));
????String?fileName?=?"test.zip";
????//?得到一個(gè)文件?channel
????FileChannel?fileChannel?=?new?FileInputStream(fileName).getChannel();
????
????//?使用零拷貝?IO?技術(shù)發(fā)送
????long?transferSize?=?fileChannel.transferTo(0,?fileChannel.size(),?socketChannel);
????System.out.println("file?transfer?done,?size:?"?+?transferSize);
????fileChannel.close();
}
以上部分為第一章,學(xué)習(xí) Netty 需要的基礎(chǔ)知識(shí)。
2. Netty 的架構(gòu)與原理
2.1. 為什么要制造 Netty
既然 Java 提供了 NIO,為什么還要制造一個(gè) Netty,主要原因是 Java NIO 有以下幾個(gè)缺點(diǎn):
1)Java NIO 的類庫(kù)和 API 龐大繁雜,使用起來(lái)很麻煩,開(kāi)發(fā)工作量大。
2)使用 Java NIO,程序員需要具備高超的 Java 多線程編碼技能,以及非常熟悉網(wǎng)絡(luò)編程,比如要處理斷連重連、網(wǎng)絡(luò)閃斷、半包讀寫、失敗緩存、網(wǎng)絡(luò)擁塞和異常流處理等一系列棘手的工作。
3)Java NIO 存在 Bug,例如 Epoll Bug 會(huì)導(dǎo)致 Selector 空輪訓(xùn),極大耗費(fèi) CPU 資源。
Netty 對(duì)于 JDK 自帶的 NIO 的 API 進(jìn)行了封裝,解決了上述問(wèn)題,提高了 IO 程序的開(kāi)發(fā)效率和可靠性,同時(shí) Netty:
1)設(shè)計(jì)優(yōu)雅,提供阻塞和非阻塞的 Socket;提供靈活可拓展的事件模型;提供高度可定制的線程模型。
2)具備更高的性能和更大的吞吐量,使用零拷貝技術(shù)最小化不必要的內(nèi)存復(fù)制,減少資源的消耗。
3)提供安全傳輸特性。
4)支持多種主流協(xié)議;預(yù)置多種編解碼功能,支持用戶開(kāi)發(fā)私有協(xié)議。
**注:所謂支持 TCP、UDP、HTTP、WebSocket 等協(xié)議,就是說(shuō) Netty 提供了相關(guān)的編程類和接口,因此本文后面主要對(duì)基于 Netty 的 TCP Server/Client 開(kāi)發(fā)案例進(jìn)行講解,以展示 Netty 的核心原理,對(duì)于其他協(xié)議 Server/Client 開(kāi)發(fā)不再給出示例,幫助讀者提升內(nèi)力而非教授花招是我寫作的出發(fā)點(diǎn) :-) **
下圖為 Netty 官網(wǎng)給出的 Netty 架構(gòu)圖。

我們從其中的幾個(gè)關(guān)鍵詞就能看出 Netty 的強(qiáng)大之處:零拷貝、可拓展事件模型;支持 TCP、UDP、HTTP、WebSocket 等協(xié)議;提供安全傳輸、壓縮、大文件傳輸、編解碼支持等等。
2.2. 幾種 Reactor 線程模式
傳統(tǒng)的 BIO 服務(wù)端編程采用“每線程每連接”的處理模型,弊端很明顯,就是面對(duì)大量的客戶端并發(fā)連接時(shí),服務(wù)端的資源壓力很大;并且線程的利用率很低,如果當(dāng)前線程沒(méi)有數(shù)據(jù)可讀,它會(huì)阻塞在 read 操作上。這個(gè)模型的基本形態(tài)如下圖所示(圖片來(lái)源于網(wǎng)絡(luò))。

BIO 服務(wù)端編程采用的是 Reactor 模式(也叫做 Dispatcher 模式,分派模式),Reactor 模式有兩個(gè)要義:
1)基于 IO 多路復(fù)用技術(shù),多個(gè)連接共用一個(gè)多路復(fù)用器,應(yīng)用程序的線程無(wú)需阻塞等待所有連接,只需阻塞等待多路復(fù)用器即可。當(dāng)某個(gè)連接上有新數(shù)據(jù)可以處理時(shí),應(yīng)用程序的線程從阻塞狀態(tài)返回,開(kāi)始處理這個(gè)連接上的業(yè)務(wù)。
2)基于線程池技術(shù)復(fù)用線程資源,不必為每個(gè)連接創(chuàng)建專用的線程,應(yīng)用程序?qū)⑦B接上的業(yè)務(wù)處理任務(wù)分配給線程池中的線程進(jìn)行處理,一個(gè)線程可以處理多個(gè)連接的業(yè)務(wù)。
下圖反應(yīng)了 Reactor 模式的基本形態(tài)(圖片來(lái)源于網(wǎng)絡(luò)):

Reactor 模式有兩個(gè)核心組成部分:
1)Reactor(圖中的 ServiceHandler):Reactor 在一個(gè)單獨(dú)的線程中運(yùn)行,負(fù)責(zé)監(jiān)聽(tīng)和分發(fā)事件,分發(fā)給適當(dāng)?shù)奶幚砭€程來(lái)對(duì) IO 事件做出反應(yīng)。
2)Handlers(圖中的 EventHandler):處理線程執(zhí)行處理方法來(lái)響應(yīng) I/O 事件,處理線程執(zhí)行的是非阻塞操作。
Reactor 模式就是實(shí)現(xiàn)網(wǎng)絡(luò) IO 程序高并發(fā)特性的關(guān)鍵。它又可以分為單 Reactor 單線程模式、單 Reactor 多線程模式、主從 Reactor 多線程模式。
2.2.1. 單 Reactor 單線程模式
單 Reactor 單線程模式的基本形態(tài)如下(圖片來(lái)源于網(wǎng)絡(luò)):

這種模式的基本工作流程為:
1)Reactor 通過(guò) select 監(jiān)聽(tīng)客戶端請(qǐng)求事件,收到事件之后通過(guò) dispatch 進(jìn)行分發(fā)
2)如果事件是建立連接的請(qǐng)求事件,則由 Acceptor 通過(guò) accept 處理連接請(qǐng)求,然后創(chuàng)建一個(gè) Handler 對(duì)象處理連接建立后的后續(xù)業(yè)務(wù)處理。
3)如果事件不是建立連接的請(qǐng)求事件,則由 Reactor 對(duì)象分發(fā)給連接對(duì)應(yīng)的 Handler 處理。
4)Handler 會(huì)完成 read-->業(yè)務(wù)處理-->send 的完整處理流程。
這種模式的優(yōu)點(diǎn)是:模型簡(jiǎn)單,沒(méi)有多線程、進(jìn)程通信、競(jìng)爭(zhēng)的問(wèn)題,一個(gè)線程完成所有的事件響應(yīng)和業(yè)務(wù)處理。當(dāng)然缺點(diǎn)也很明顯:
1)存在性能問(wèn)題,只有一個(gè)線程,無(wú)法完全發(fā)揮多核 CPU 的性能。Handler 在處理某個(gè)連接上的業(yè)務(wù)時(shí),整個(gè)進(jìn)程無(wú)法處理其他連接事件,很容易導(dǎo)致性能瓶頸。
2)存在可靠性問(wèn)題,若線程意外終止,或者進(jìn)入死循環(huán),會(huì)導(dǎo)致整個(gè)系統(tǒng)通信模塊不可用,不能接收和處理外部消息,造成節(jié)點(diǎn)故障。
單 Reactor 單線程模式使用場(chǎng)景為:客戶端的數(shù)量有限,業(yè)務(wù)處理非常快速,比如 Redis 在業(yè)務(wù)處理的時(shí)間復(fù)雜度為 O(1)的情況。
2.2.2. 單 Reactor 多線程模式
單 Reactor 單線程模式的基本形態(tài)如下(圖片來(lái)源于網(wǎng)絡(luò)):

這種模式的基本工作流程為:
1)Reactor 對(duì)象通過(guò) select 監(jiān)聽(tīng)客戶端請(qǐng)求事件,收到事件后通過(guò) dispatch 進(jìn)行分發(fā)。
2)如果事件是建立連接的請(qǐng)求事件,則由 Acceptor 通過(guò) accept 處理連接請(qǐng)求,然后創(chuàng)建一個(gè) Handler 對(duì)象處理連接建立后的后續(xù)業(yè)務(wù)處理。
3)如果事件不是建立連接的請(qǐng)求事件,則由 Reactor 對(duì)象分發(fā)給連接對(duì)應(yīng)的 Handler 處理。Handler 只負(fù)責(zé)響應(yīng)事件,不做具體的業(yè)務(wù)處理,Handler 通過(guò) read 讀取到請(qǐng)求數(shù)據(jù)后,會(huì)分發(fā)給后面的 Worker 線程池來(lái)處理業(yè)務(wù)請(qǐng)求。
4)Worker 線程池會(huì)分配獨(dú)立線程來(lái)完成真正的業(yè)務(wù)處理,并將處理結(jié)果返回給 Handler。Handler 通過(guò) send 向客戶端發(fā)送響應(yīng)數(shù)據(jù)。
這種模式的優(yōu)點(diǎn)是可以充分的利用多核 cpu 的處理能力,缺點(diǎn)是多線程數(shù)據(jù)共享和控制比較復(fù)雜,Reactor 處理所有的事件的監(jiān)聽(tīng)和響應(yīng),在單線程中運(yùn)行,面對(duì)高并發(fā)場(chǎng)景還是容易出現(xiàn)性能瓶頸。
2.2.3. 主從 Reactor 多線程模式
主從 Reactor 多線程模式的基本形態(tài)如下(第一章圖片來(lái)源于網(wǎng)絡(luò),第二章圖片是 JUC 作者 Doug Lea 老師在《Scalable IO in Java》中給出的示意圖,兩張圖表達(dá)的含義一樣):


針對(duì)單 Reactor 多線程模型中,Reactor 在單個(gè)線程中運(yùn)行,面對(duì)高并發(fā)的場(chǎng)景易成為性能瓶頸的缺陷,主從 Reactor 多線程模式讓 Reactor 在多個(gè)線程中運(yùn)行(分成 MainReactor 線程與 SubReactor 線程)。這種模式的基本工作流程為:
1)Reactor 主線程 MainReactor 對(duì)象通過(guò) select 監(jiān)聽(tīng)客戶端連接事件,收到事件后,通過(guò) Acceptor 處理客戶端連接事件。
2)當(dāng) Acceptor 處理完客戶端連接事件之后(與客戶端建立好 Socket 連接),MainReactor 將連接分配給 SubReactor。(即:MainReactor 只負(fù)責(zé)監(jiān)聽(tīng)客戶端連接請(qǐng)求,和客戶端建立連接之后將連接交由 SubReactor 監(jiān)聽(tīng)后面的 IO 事件。)
3)SubReactor 將連接加入到自己的連接隊(duì)列進(jìn)行監(jiān)聽(tīng),并創(chuàng)建 Handler 對(duì)各種事件進(jìn)行處理。
4)當(dāng)連接上有新事件發(fā)生的時(shí)候,SubReactor 就會(huì)調(diào)用對(duì)應(yīng)的 Handler 處理。
5)Handler 通過(guò) read 從連接上讀取請(qǐng)求數(shù)據(jù),將請(qǐng)求數(shù)據(jù)分發(fā)給 Worker 線程池進(jìn)行業(yè)務(wù)處理。
6)Worker 線程池會(huì)分配獨(dú)立線程來(lái)完成真正的業(yè)務(wù)處理,并將處理結(jié)果返回給 Handler。Handler 通過(guò) send 向客戶端發(fā)送響應(yīng)數(shù)據(jù)。
7)一個(gè) MainReactor 可以對(duì)應(yīng)多個(gè) SubReactor,即一個(gè) MainReactor 線程可以對(duì)應(yīng)多個(gè) SubReactor 線程。
這種模式的優(yōu)點(diǎn)是:
1)MainReactor 線程與 SubReactor 線程的數(shù)據(jù)交互簡(jiǎn)單職責(zé)明確,MainReactor 線程只需要接收新連接,SubReactor 線程完成后續(xù)的業(yè)務(wù)處理。
2)MainReactor 線程與 SubReactor 線程的數(shù)據(jù)交互簡(jiǎn)單, MainReactor 線程只需要把新連接傳給 SubReactor 線程,SubReactor 線程無(wú)需返回?cái)?shù)據(jù)。
3)多個(gè) SubReactor 線程能夠應(yīng)對(duì)更高的并發(fā)請(qǐng)求。
這種模式的缺點(diǎn)是編程復(fù)雜度較高。但是由于其優(yōu)點(diǎn)明顯,在許多項(xiàng)目中被廣泛使用,包括 Nginx、Memcached、Netty 等。
這種模式也被叫做服務(wù)器的 1+M+N 線程模式,即使用該模式開(kāi)發(fā)的服務(wù)器包含一個(gè)(或多個(gè),1 只是表示相對(duì)較少)連接建立線程+M 個(gè) IO 線程+N 個(gè)業(yè)務(wù)處理線程。這是業(yè)界成熟的服務(wù)器程序設(shè)計(jì)模式。
2.3. Netty 的模樣
Netty 的設(shè)計(jì)主要基于主從 Reactor 多線程模式,并做了一定的改進(jìn)。本節(jié)將使用一種漸進(jìn)式的描述方式展示 Netty 的模樣,即先給出 Netty 的簡(jiǎn)單版本,然后逐漸豐富其細(xì)節(jié),直至展示出 Netty 的全貌。
簡(jiǎn)單版本的 Netty 的模樣如下:

關(guān)于這張圖,作以下幾點(diǎn)說(shuō)明:
1)BossGroup 線程維護(hù) Selector,ServerSocketChannel 注冊(cè)到這個(gè) Selector 上,只關(guān)注連接建立請(qǐng)求事件(相當(dāng)于主 Reactor)。
2)當(dāng)接收到來(lái)自客戶端的連接建立請(qǐng)求事件的時(shí)候,通過(guò) ServerSocketChannel.accept 方法獲得對(duì)應(yīng)的 SocketChannel,并封裝成 NioSocketChannel 注冊(cè)到 WorkerGroup 線程中的 Selector,每個(gè) Selector 運(yùn)行在一個(gè)線程中(相當(dāng)于從 Reactor)。
3)當(dāng) WorkerGroup 線程中的 Selector 監(jiān)聽(tīng)到自己感興趣的 IO 事件后,就調(diào)用 Handler 進(jìn)行處理。
我們給這簡(jiǎn)單版的 Netty 添加一些細(xì)節(jié):

關(guān)于這張圖,作以下幾點(diǎn)說(shuō)明:
1)有兩組線程池:BossGroup 和 WorkerGroup,BossGroup 中的線程(可以有多個(gè),圖中只畫了一個(gè))專門負(fù)責(zé)和客戶端建立連接,WorkerGroup 中的線程專門負(fù)責(zé)處理連接上的讀寫。
2)BossGroup 和 WorkerGroup 含有多個(gè)不斷循環(huán)的執(zhí)行事件處理的線程,每個(gè)線程都包含一個(gè) Selector,用于監(jiān)聽(tīng)注冊(cè)在其上的 Channel。
3)每個(gè) BossGroup 中的線程循環(huán)執(zhí)行以下三個(gè)步驟:
3.1)輪訓(xùn)注冊(cè)在其上的 ServerSocketChannel 的 accept 事件(OP_ACCEPT 事件)
3.2)處理 accept 事件,與客戶端建立連接,生成一個(gè) NioSocketChannel,并將其注冊(cè)到 WorkerGroup 中某個(gè)線程上的 Selector 上
3.3)再去以此循環(huán)處理任務(wù)隊(duì)列中的下一個(gè)事件
4)每個(gè) WorkerGroup 中的線程循環(huán)執(zhí)行以下三個(gè)步驟:
4.1)輪訓(xùn)注冊(cè)在其上的 NioSocketChannel 的 read/write 事件(OP_READ/OP_WRITE 事件)
4.2)在對(duì)應(yīng)的 NioSocketChannel 上處理 read/write 事件
4.3)再去以此循環(huán)處理任務(wù)隊(duì)列中的下一個(gè)事件
我們?cè)賮?lái)看下終極版的 Netty 的模樣,如下圖所示(圖片來(lái)源于網(wǎng)絡(luò)):

關(guān)于這張圖,作以下幾點(diǎn)說(shuō)明:
1)Netty 抽象出兩組線程池:BossGroup 和 WorkerGroup,也可以叫做 BossNioEventLoopGroup 和 WorkerNioEventLoopGroup。每個(gè)線程池中都有 NioEventLoop 線程。BossGroup 中的線程專門負(fù)責(zé)和客戶端建立連接,WorkerGroup 中的線程專門負(fù)責(zé)處理連接上的讀寫。BossGroup 和 WorkerGroup 的類型都是 NioEventLoopGroup。
2)NioEventLoopGroup 相當(dāng)于一個(gè)事件循環(huán)組,這個(gè)組中含有多個(gè)事件循環(huán),每個(gè)事件循環(huán)就是一個(gè) NioEventLoop。
3)NioEventLoop 表示一個(gè)不斷循環(huán)的執(zhí)行事件處理的線程,每個(gè) NioEventLoop 都包含一個(gè) Selector,用于監(jiān)聽(tīng)注冊(cè)在其上的 Socket 網(wǎng)絡(luò)連接(Channel)。
4)NioEventLoopGroup 可以含有多個(gè)線程,即可以含有多個(gè) NioEventLoop。
5)每個(gè) BossNioEventLoop 中循環(huán)執(zhí)行以下三個(gè)步驟:
5.1)select:輪訓(xùn)注冊(cè)在其上的 ServerSocketChannel 的 accept 事件(OP_ACCEPT 事件)
5.2)processSelectedKeys:處理 accept 事件,與客戶端建立連接,生成一個(gè) NioSocketChannel,并將其注冊(cè)到某個(gè) WorkerNioEventLoop 上的 Selector 上
5.3)runAllTasks:再去以此循環(huán)處理任務(wù)隊(duì)列中的其他任務(wù)
6)每個(gè) WorkerNioEventLoop 中循環(huán)執(zhí)行以下三個(gè)步驟:
6.1)select:輪訓(xùn)注冊(cè)在其上的 NioSocketChannel 的 read/write 事件(OP_READ/OP_WRITE 事件)
6.2)processSelectedKeys:在對(duì)應(yīng)的 NioSocketChannel 上處理 read/write 事件
6.3)runAllTasks:再去以此循環(huán)處理任務(wù)隊(duì)列中的其他任務(wù)
7)在以上兩個(gè)processSelectedKeys步驟中,會(huì)使用 Pipeline(管道),Pipeline 中引用了 Channel,即通過(guò) Pipeline 可以獲取到對(duì)應(yīng)的 Channel,Pipeline 中維護(hù)了很多的處理器(攔截處理器、過(guò)濾處理器、自定義處理器等)。這里暫時(shí)不詳細(xì)展開(kāi)講解 Pipeline。
2.4. 基于 Netty 的 TCP Server/Client 案例
下面我們寫點(diǎn)代碼來(lái)加深理解 Netty 的模樣。下面兩段代碼分別是基于 Netty 的 TCP Server 和 TCP Client。
服務(wù)端代碼為:
/**
?*?需要的依賴:
?*?
?*?io.netty
?*?netty-all
?*?4.1.52.Final
?*?
?*/
public?static?void?main(String[]?args)?throws?InterruptedException?{
????//?創(chuàng)建?BossGroup?和?WorkerGroup
????//?1.?bossGroup?只處理連接請(qǐng)求
????//?2.?業(yè)務(wù)處理由?workerGroup?來(lái)完成
????EventLoopGroup?bossGroup?=?new?NioEventLoopGroup();
????EventLoopGroup?workerGroup?=?new?NioEventLoopGroup();
????try?{
????????//?創(chuàng)建服務(wù)器端的啟動(dòng)對(duì)象
????????ServerBootstrap?bootstrap?=?new?ServerBootstrap();
????????//?配置參數(shù)
????????bootstrap
????????????????//?設(shè)置線程組
????????????????.group(bossGroup,?workerGroup)
????????????????//?說(shuō)明服務(wù)器端通道的實(shí)現(xiàn)類(便于?Netty?做反射處理)
????????????????.channel(NioServerSocketChannel.class)
????????????????//?設(shè)置等待連接的隊(duì)列的容量(當(dāng)客戶端連接請(qǐng)求速率大
?????????????//?于?NioServerSocketChannel?接收速率的時(shí)候,會(huì)使用
????????????????//?該隊(duì)列做緩沖)
????????????????//?option()方法用于給服務(wù)端的?ServerSocketChannel
????????????????//?添加配置
????????????????.option(ChannelOption.SO_BACKLOG,?128)
????????????????//?設(shè)置連接保活
????????????????//?childOption()方法用于給服務(wù)端?ServerSocketChannel
????????????????//?接收到的?SocketChannel?添加配置
????????????????.childOption(ChannelOption.SO_KEEPALIVE,?true)
????????????????//?handler()方法用于給?BossGroup?設(shè)置業(yè)務(wù)處理器
????????????????//?childHandler()方法用于給?WorkerGroup?設(shè)置業(yè)務(wù)處理器
????????????????.childHandler(
????????????????????????//?創(chuàng)建一個(gè)通道初始化對(duì)象
????????????????????????new?ChannelInitializer<SocketChannel>()?{
????????????????????????????//?向?Pipeline?添加業(yè)務(wù)處理器
????????????????????????????@Override
????????????????????????????protected?void?initChannel(
????????????????????????????????????SocketChannel?socketChannel
????????????????????????????)?throws?Exception?{
????????????????????????????????socketChannel.pipeline().addLast(
????????????????????????????????????????new?NettyServerHandler()
????????????????????????????????);
????????????????????????????????
????????????????????????????????//?可以繼續(xù)調(diào)用?socketChannel.pipeline().addLast()
????????????????????????????????//?添加更多?Handler
????????????????????????????}
????????????????????????}
????????????????);
????????System.out.println("server?is?ready...");
????????//?綁定端口,啟動(dòng)服務(wù)器,生成一個(gè)?channelFuture?對(duì)象,
????????//?ChannelFuture?涉及到?Netty?的異步模型,后面展開(kāi)講
????????ChannelFuture?channelFuture?=?bootstrap.bind(8080).sync();
????????//?對(duì)通道關(guān)閉進(jìn)行監(jiān)聽(tīng)
????????channelFuture.channel().closeFuture().sync();
????}?finally?{
????????bossGroup.shutdownGracefully();
????????workerGroup.shutdownGracefully();
????}
}
/**
?*?自定義一個(gè)?Handler,需要繼承?Netty?規(guī)定好的某個(gè)?HandlerAdapter(規(guī)范)
?*?InboundHandler?用于處理數(shù)據(jù)流入本端(服務(wù)端)的?IO?事件
?*?InboundHandler?用于處理數(shù)據(jù)流出本端(服務(wù)端)的?IO?事件
?*/
static?class?NettyServerHandler?extends?ChannelInboundHandlerAdapter?{
????/**
?????*?當(dāng)通道有數(shù)據(jù)可讀時(shí)執(zhí)行
?????*
?????*?@param?ctx?上下文對(duì)象,可以從中取得相關(guān)聯(lián)的?Pipeline、Channel、客戶端地址等
?????*?@param?msg?客戶端發(fā)送的數(shù)據(jù)
?????*?@throws?Exception
?????*/
????@Override
????public?void?channelRead(ChannelHandlerContext?ctx,?Object?msg)
????????????throws?Exception?{
????????//?接收客戶端發(fā)來(lái)的數(shù)據(jù)
????????System.out.println("client?address:?"
????????????????+?ctx.channel().remoteAddress());
????????//?ByteBuf?是?Netty?提供的類,比?NIO?的?ByteBuffer?性能更高
????????ByteBuf?byteBuf?=?(ByteBuf)?msg;
????????System.out.println("data?from?client:?"
????????????????+?byteBuf.toString(CharsetUtil.UTF_8));
????}
????/**
?????*?數(shù)據(jù)讀取完畢后執(zhí)行
?????*
?????*?@param?ctx?上下文對(duì)象
?????*?@throws?Exception
?????*/
????@Override
????public?void?channelReadComplete(ChannelHandlerContext?ctx)
????????????throws?Exception?{
????????//?發(fā)送響應(yīng)給客戶端
????????ctx.writeAndFlush(
????????????????//?Unpooled?類是?Netty?提供的專門操作緩沖區(qū)的工具
????????????????//?類,copiedBuffer?方法返回的?ByteBuf?對(duì)象類似于
????????????????//?NIO?中的?ByteBuffer,但性能更高
????????????????Unpooled.copiedBuffer(
????????????????????????"hello?client!?i?have?got?your?data.",
????????????????????????CharsetUtil.UTF_8
????????????????)
????????);
????}
????/**
?????*?發(fā)生異常時(shí)執(zhí)行
?????*
?????*?@param?ctx???上下文對(duì)象
?????*?@param?cause?異常對(duì)象
?????*?@throws?Exception
?????*/
????@Override
????public?void?exceptionCaught(ChannelHandlerContext?ctx,?Throwable?cause)
????????????throws?Exception?{
????????//?關(guān)閉與客戶端的?Socket?連接
????????ctx.channel().close();
????}
}
客戶端端代碼為:
/**
?*?需要的依賴:
?*?
?*?io.netty
?*?netty-all
?*?4.1.52.Final
?*?
?*/
public?static?void?main(String[]?args)?throws?InterruptedException?{
????//?客戶端只需要一個(gè)事件循環(huán)組,可以看做?BossGroup
????EventLoopGroup?eventLoopGroup?=?new?NioEventLoopGroup();
????try?{
????????//?創(chuàng)建客戶端的啟動(dòng)對(duì)象
????????Bootstrap?bootstrap?=?new?Bootstrap();
????????//?配置參數(shù)
????????bootstrap
????????????????//?設(shè)置線程組
????????????????.group(eventLoopGroup)
????????????????//?說(shuō)明客戶端通道的實(shí)現(xiàn)類(便于?Netty?做反射處理)
????????????????.channel(NioSocketChannel.class)
????????????????//?handler()方法用于給?BossGroup?設(shè)置業(yè)務(wù)處理器
????????????????.handler(
????????????????????????//?創(chuàng)建一個(gè)通道初始化對(duì)象
????????????????????????new?ChannelInitializer<SocketChannel>()?{
????????????????????????????//?向?Pipeline?添加業(yè)務(wù)處理器
????????????????????????????@Override
????????????????????????????protected?void?initChannel(
????????????????????????????????????SocketChannel?socketChannel
????????????????????????????)?throws?Exception?{
????????????????????????????????socketChannel.pipeline().addLast(
????????????????????????????????????????new?NettyClientHandler()
????????????????????????????????);
????????????????????????????????
????????????????????????????????//?可以繼續(xù)調(diào)用?socketChannel.pipeline().addLast()
????????????????????????????????//?添加更多?Handler
????????????????????????????}
????????????????????????}
????????????????);
????????System.out.println("client?is?ready...");
????????//?啟動(dòng)客戶端去連接服務(wù)器端,ChannelFuture?涉及到?Netty?的異步模型,后面展開(kāi)講
????????ChannelFuture?channelFuture?=?bootstrap.connect(
????????????????"127.0.0.1",
????????????????8080).sync();
????????//?對(duì)通道關(guān)閉進(jìn)行監(jiān)聽(tīng)
????????channelFuture.channel().closeFuture().sync();
????}?finally?{
????????eventLoopGroup.shutdownGracefully();
????}
}
/**
?*?自定義一個(gè)?Handler,需要繼承?Netty?規(guī)定好的某個(gè)?HandlerAdapter(規(guī)范)
?*?InboundHandler?用于處理數(shù)據(jù)流入本端(客戶端)的?IO?事件
?*?InboundHandler?用于處理數(shù)據(jù)流出本端(客戶端)的?IO?事件
?*/
static?class?NettyClientHandler?extends?ChannelInboundHandlerAdapter?{
????/**
?????*?通道就緒時(shí)執(zhí)行
?????*
?????*?@param?ctx?上下文對(duì)象
?????*?@throws?Exception
?????*/
????@Override
????public?void?channelActive(ChannelHandlerContext?ctx)
????????????throws?Exception?{
????????//?向服務(wù)器發(fā)送數(shù)據(jù)
????????ctx.writeAndFlush(
????????????????//?Unpooled?類是?Netty?提供的專門操作緩沖區(qū)的工具
????????????????//?類,copiedBuffer?方法返回的?ByteBuf?對(duì)象類似于
????????????????//?NIO?中的?ByteBuffer,但性能更高
????????????????Unpooled.copiedBuffer(
????????????????????????"hello?server!",
????????????????????????CharsetUtil.UTF_8
????????????????)
????????);
????}
????/**
?????*?當(dāng)通道有數(shù)據(jù)可讀時(shí)執(zhí)行
?????*
?????*?@param?ctx?上下文對(duì)象
?????*?@param?msg?服務(wù)器端發(fā)送的數(shù)據(jù)
?????*?@throws?Exception
?????*/
????@Override
????public?void?channelRead(ChannelHandlerContext?ctx,?Object?msg)
????????????throws?Exception?{
????????//?接收服務(wù)器端發(fā)來(lái)的數(shù)據(jù)
????????System.out.println("server?address:?"
????????????????+?ctx.channel().remoteAddress());
????????//?ByteBuf?是?Netty?提供的類,比?NIO?的?ByteBuffer?性能更高
????????ByteBuf?byteBuf?=?(ByteBuf)?msg;
????????System.out.println("data?from?server:?"
????????????????+?byteBuf.toString(CharsetUtil.UTF_8));
????}
????/**
?????*?發(fā)生異常時(shí)執(zhí)行
?????*
?????*?@param?ctx???上下文對(duì)象
?????*?@param?cause?異常對(duì)象
?????*?@throws?Exception
?????*/
????@Override
????public?void?exceptionCaught(ChannelHandlerContext?ctx,?Throwable?cause)
????????????throws?Exception?{
????????//?關(guān)閉與服務(wù)器端的?Socket?連接
????????ctx.channel().close();
????}
}
什么?你覺(jué)得使用 Netty 編程難度和工作量更大了?不會(huì)吧不會(huì)吧,你要知道,你通過(guò)這么兩段簡(jiǎn)短的代碼得到了一個(gè)基于主從 Reactor 多線程模式的服務(wù)器,一個(gè)高吞吐量和并發(fā)量的服務(wù)器,一個(gè)異步處理服務(wù)器……你還要怎樣?
對(duì)上面的兩段代碼,作以下簡(jiǎn)單說(shuō)明:
1)Bootstrap 和 ServerBootstrap 分別是客戶端和服務(wù)器端的引導(dǎo)類,一個(gè) Netty 應(yīng)用程序通常由一個(gè)引導(dǎo)類開(kāi)始,主要是用來(lái)配置整個(gè) Netty 程序、設(shè)置業(yè)務(wù)處理類(Handler)、綁定端口、發(fā)起連接等。
2)客戶端創(chuàng)建一個(gè) NioSocketChannel 作為客戶端通道,去連接服務(wù)器。
3)服務(wù)端首先創(chuàng)建一個(gè) NioServerSocketChannel 作為服務(wù)器端通道,每當(dāng)接收一個(gè)客戶端連接就產(chǎn)生一個(gè) NioSocketChannel 應(yīng)對(duì)該客戶端。
4)使用 Channel 構(gòu)建網(wǎng)絡(luò) IO 程序的時(shí)候,不同的協(xié)議、不同的阻塞類型和 Netty 中不同的 Channel 對(duì)應(yīng),常用的 Channel 有:
NioSocketChannel:非阻塞的 TCP 客戶端 Channel(本案例的客戶端使用的 Channel)
NioServerSocketChannel:非阻塞的 TCP 服務(wù)器端 Channel(本案例的服務(wù)器端使用的 Channel)
NioDatagramChannel:非阻塞的 UDP Channel
NioSctpChannel:非阻塞的 SCTP 客戶端 Channel
NioSctpServerChannel:非阻塞的 SCTP 服務(wù)器端 Channel
......
啟動(dòng)服務(wù)端和客戶端代碼,調(diào)試以上的服務(wù)端代碼,發(fā)現(xiàn):
1)默認(rèn)情況下 BossGroup 和 WorkerGroup 都包含 16 個(gè)線程(NioEventLoop),這是因?yàn)槲业?PC 是 8 核的 NioEventLoop 的數(shù)量=coreNum*2。這 16 個(gè)線程相當(dāng)于主 Reactor。



其實(shí)創(chuàng)建 BossGroup 和 WorkerGroup 的時(shí)候可以指定 NioEventLoop 數(shù)量,如下:
EventLoopGroup?bossGroup?=?new?NioEventLoopGroup(1);
EventLoopGroup?workerGroup?=?new?NioEventLoopGroup(16);
這樣就能更好地分配線程資源。
2)每一個(gè) NioEventLoop 包含如下的屬性(比如自己的 Selector、任務(wù)隊(duì)列、執(zhí)行器等):

3)將代碼斷在服務(wù)端的 NettyServerHandler.channelRead 上:

可以看到 ctx 中包含的屬性如下:

可以看到:
當(dāng)前 ChannelHandlerContext ctx 是位于 ChannelHandlerContext 責(zé)任鏈中的一環(huán),可以看到其 next、prev 屬性
當(dāng)前 ChannelHandlerContext ctx 包含一個(gè) Handler
當(dāng)前 ChannelHandlerContext ctx 包含一個(gè) Pipeline
Pipeline 本質(zhì)上是一個(gè)雙向循環(huán)列表,可以看到其 tail、head 屬性
Pipeline 中包含一個(gè) Channel,Channel 中又包含了該 Pipeline,兩者互相引用
……
從下一節(jié)開(kāi)始,我將深入剖析以上兩段代碼,向讀者展示 Netty 的更多細(xì)節(jié)。
2.5. Netty 的 Handler 組件
無(wú)論是服務(wù)端代碼中自定義的 NettyServerHandler 還是客戶端代碼中自定義的 NettyClientHandler,都繼承于 ChannelInboundHandlerAdapter,ChannelInboundHandlerAdapter 又繼承于 ChannelHandlerAdapter,ChannelHandlerAdapter 又實(shí)現(xiàn)了 ChannelHandler:
public?class?ChannelInboundHandlerAdapter?
????extends?ChannelHandlerAdapter?
????implements?ChannelInboundHandler?{
????......
public?abstract?class?ChannelHandlerAdapter?
????implements?ChannelHandler?{
????......
因此無(wú)論是服務(wù)端代碼中自定義的 NettyServerHandler 還是客戶端代碼中自定義的 NettyClientHandler,都可以統(tǒng)稱為 ChannelHandler。
Netty 中的 ChannelHandler 的作用是,在當(dāng)前 ChannelHandler 中處理 IO 事件,并將其傳遞給 ChannelPipeline 中下一個(gè) ChannelHandler 處理,因此多個(gè) ChannelHandler 形成一個(gè)責(zé)任鏈,責(zé)任鏈位于 ChannelPipeline 中。
數(shù)據(jù)在基于 Netty 的服務(wù)器或客戶端中的處理流程是:讀取數(shù)據(jù)-->解碼數(shù)據(jù)-->處理數(shù)據(jù)-->編碼數(shù)據(jù)-->發(fā)送數(shù)據(jù)。其中的每個(gè)過(guò)程都用得到 ChannelHandler 責(zé)任鏈。

Netty 中的 ChannelHandler 體系如下(第一張圖來(lái)源于網(wǎng)絡(luò)):


其中:
ChannelInboundHandler 用于處理入站 IO 事件 ChannelOutboundHandler 用于處理出站 IO 事件 ChannelInboundHandlerAdapter 用于處理入站 IO 事件 ChannelOutboundHandlerAdapter 用于處理出站 IO 事件
ChannelPipeline 提供了 ChannelHandler 鏈的容器。以客戶端應(yīng)用程序?yàn)槔绻录姆较蚴菑目蛻舳说椒?wù)器的,我們稱事件是出站的,那么客戶端發(fā)送給服務(wù)器的數(shù)據(jù)會(huì)通過(guò) Pipeline 中的一系列 ChannelOutboundHandler 進(jìn)行處理;如果事件的方向是從服務(wù)器到客戶端的,我們稱事件是入站的,那么服務(wù)器發(fā)送給客戶端的數(shù)據(jù)會(huì)通過(guò) Pipeline 中的一系列 ChannelInboundHandler 進(jìn)行處理。

無(wú)論是服務(wù)端代碼中自定義的 NettyServerHandler 還是客戶端代碼中自定義的 NettyClientHandler,都繼承于 ChannelInboundHandlerAdapter,ChannelInboundHandlerAdapter 提供的方法如下:

從方法名字可以看出,它們?cè)诓煌氖录l(fā)生后被觸發(fā),例如注冊(cè) Channel 時(shí)執(zhí)行 channelRegistred()、添加 ChannelHandler 時(shí)執(zhí)行 handlerAdded()、收到入站數(shù)據(jù)時(shí)執(zhí)行 channelRead()、入站數(shù)據(jù)讀取完畢后執(zhí)行 channelReadComplete()等等。
2.6. Netty 的 Pipeline 組件
上一節(jié)說(shuō)到,Netty 的 ChannelPipeline,它維護(hù)了一個(gè) ChannelHandler 責(zé)任鏈,負(fù)責(zé)攔截或者處理 inbound(入站)和 outbound(出站)的事件和操作。這一節(jié)給出更深層次的描述。
ChannelPipeline 實(shí)現(xiàn)了一種高級(jí)形式的攔截過(guò)濾器模式,使用戶可以完全控制事件的處理方式,以及 Channel 中各個(gè) ChannelHandler 如何相互交互。
每個(gè) Netty Channel 包含了一個(gè) ChannelPipeline(其實(shí) Channel 和 ChannelPipeline 互相引用),而 ChannelPipeline 又維護(hù)了一個(gè)由 ChannelHandlerContext 構(gòu)成的雙向循環(huán)列表,其中的每一個(gè) ChannelHandlerContext 都包含一個(gè) ChannelHandler。(前文描述的時(shí)候?yàn)榱撕?jiǎn)便,直接說(shuō) ChannelPipeline 包含了一個(gè) ChannelHandler 責(zé)任鏈,這里給出完整的細(xì)節(jié)。)
如下圖所示(圖片來(lái)源于網(wǎng)絡(luò)):

還記得下面這張圖嗎?這是上文中基于 Netty 的 Server 程序的調(diào)試截圖,可以從中看到 ChannelHandlerContext 中包含了哪些成分:

ChannelHandlerContext 除了包含 ChannelHandler 之外,還關(guān)聯(lián)了對(duì)應(yīng)的 Channel 和 Pipeline。可以這么來(lái)講:ChannelHandlerContext、ChannelHandler、Channel、ChannelPipeline 這幾個(gè)組件之間互相引用,互為各自的屬性,你中有我、我中有你。
在處理入站事件的時(shí)候,入站事件及數(shù)據(jù)會(huì)從 Pipeline 中的雙向鏈表的頭 ChannelHandlerContext 流向尾 ChannelHandlerContext,并依次在其中每個(gè) ChannelInboundHandler(例如解碼 Handler)中得到處理;出站事件及數(shù)據(jù)會(huì)從 Pipeline 中的雙向鏈表的尾 ChannelHandlerContext 流向頭 ChannelHandlerContext,并依次在其中每個(gè) ChannelOutboundHandler(例如編碼 Handler)中得到處理。

2.7. Netty 的 EventLoopGroup 組件
在基于 Netty 的 TCP Server 代碼中,包含了兩個(gè) EventLoopGroup——bossGroup 和 workerGroup,EventLoopGroup 是一組 EventLoop 的抽象。
追蹤 Netty 的 EventLoop 的繼承鏈,可以發(fā)現(xiàn) EventLoop 最終繼承于 JUC Executor,因此 EventLoop 本質(zhì)就是一個(gè) JUC Executor,即線程,JUC Executor 的源碼為:
public?interface?Executor?{
????/**
?????*?Executes?the?given?command?at?some?time?in?the?future.
?????*/
????void?execute(Runnable?command);
}
Netty 為了更好地利用多核 CPU 的性能,一般會(huì)有多個(gè) EventLoop 同時(shí)工作,每個(gè) EventLoop 維護(hù)著一個(gè) Selector 實(shí)例,Selector 實(shí)例監(jiān)聽(tīng)注冊(cè)其上的 Channel 的 IO 事件。
EventLoopGroup 含有一個(gè) next 方法,它的作用是按照一定規(guī)則從 Group 中選取一個(gè) EventLoop 處理 IO 事件。
在服務(wù)端,通常 Boss EventLoopGroup 只包含一個(gè) Boss EventLoop(單線程),該 EventLoop 維護(hù)者一個(gè)注冊(cè)了 ServerSocketChannel 的 Selector 實(shí)例。該 EventLoop 不斷輪詢 Selector 得到 OP_ACCEPT 事件(客戶端連接事件),然后將接收到的 SocketChannel 交給 Worker EventLoopGroup,Worker EventLoopGroup 會(huì)通過(guò) next()方法選取一個(gè) Worker EventLoop 并將這個(gè) SocketChannel 注冊(cè)到其中的 Selector 上,由這個(gè) Worker EventLoop 負(fù)責(zé)該 SocketChannel 上后續(xù)的 IO 事件處理。整個(gè)過(guò)程如下圖所示:

2.8. Netty 的 TaskQueue
在 Netty 的每一個(gè) NioEventLoop 中都有一個(gè) TaskQueue,設(shè)計(jì)它的目的是在任務(wù)提交的速度大于線程的處理速度的時(shí)候起到緩沖作用。或者用于異步地處理 Selector 監(jiān)聽(tīng)到的 IO 事件。

Netty 中的任務(wù)隊(duì)列有三種使用場(chǎng)景:
1)處理用戶程序的自定義普通任務(wù)的時(shí)候
2)處理用戶程序的自定義定時(shí)任務(wù)的時(shí)候
3)非當(dāng)前 Reactor 線程調(diào)用當(dāng)前 Channel 的各種方法的時(shí)候。
對(duì)于第一種場(chǎng)景,舉個(gè)例子,2.4 節(jié)的基于 Netty 編寫的服務(wù)端的 Handler 中,假如 channelRead 方法中執(zhí)行的過(guò)程很耗時(shí),那么以下的阻塞式處理方式無(wú)疑會(huì)降低當(dāng)前 NioEventLoop 的并發(fā)度:
/**
?*?當(dāng)通道有數(shù)據(jù)可讀時(shí)執(zhí)行
?*
?*?@param?ctx?上下文對(duì)象
?*?@param?msg?客戶端發(fā)送的數(shù)據(jù)
?*?@throws?Exception
?*/
@Override
public?void?channelRead(ChannelHandlerContext?ctx,?Object?msg)
????????throws?Exception?{
????//?借助休眠模擬耗時(shí)操作
????Thread.sleep(LONG_TIME);
????ByteBuf?byteBuf?=?(ByteBuf)?msg;
????System.out.println("data?from?client:?"
????????????+?byteBuf.toString(CharsetUtil.UTF_8));
}
改進(jìn)方法就是借助任務(wù)隊(duì)列,代碼如下:
/**
?*?當(dāng)通道有數(shù)據(jù)可讀時(shí)執(zhí)行
?*
?*?@param?ctx?上下文對(duì)象
?*?@param?msg?客戶端發(fā)送的數(shù)據(jù)
?*?@throws?Exception
?*/
@Override
public?void?channelRead(ChannelHandlerContext?ctx,?Object?msg)
????????throws?Exception?{
????//?假如這里的處理非常耗時(shí),那么就需要借助任務(wù)隊(duì)列異步執(zhí)行
????final?Object?finalMsg?=?msg;
????//?通過(guò)?ctx.channel().eventLoop().execute()將耗時(shí)
????//?操作放入任務(wù)隊(duì)列異步執(zhí)行
????ctx.channel().eventLoop().execute(new?Runnable()?{
????????public?void?run()?{
????????????//?借助休眠模擬耗時(shí)操作
????????????try?{
????????????????Thread.sleep(LONG_TIME);
????????????}?catch?(InterruptedException?e)?{
????????????????e.printStackTrace();
????????????}
????????????ByteBuf?byteBuf?=?(ByteBuf)?finalMsg;
????????????System.out.println("data?from?client:?"
????????????????????+?byteBuf.toString(CharsetUtil.UTF_8));
????????}
????});
????
????//?可以繼續(xù)調(diào)用?ctx.channel().eventLoop().execute()
????//?將更多操作放入隊(duì)列
????
????System.out.println("return?right?now.");
}
斷點(diǎn)跟蹤這個(gè)函數(shù)的執(zhí)行,可以發(fā)現(xiàn)該耗時(shí)任務(wù)確實(shí)被放入的當(dāng)前 NioEventLoop 的 taskQueue 中了。

對(duì)于第二種場(chǎng)景,舉個(gè)例子,2.4 節(jié)的基于 Netty 編寫的服務(wù)端的 Handler 中,假如 channelRead 方法中執(zhí)行的過(guò)程并不需要立即執(zhí)行,而是要定時(shí)執(zhí)行,那么代碼可以這樣寫:
/**
?*?當(dāng)通道有數(shù)據(jù)可讀時(shí)執(zhí)行
?*
?*?@param?ctx?上下文對(duì)象
?*?@param?msg?客戶端發(fā)送的數(shù)據(jù)
?*?@throws?Exception
?*/
@Override
public?void?channelRead(ChannelHandlerContext?ctx,?Object?msg)
????????throws?Exception?{
????final?Object?finalMsg?=?msg;
????//?通過(guò)?ctx.channel().eventLoop().schedule()將操作
????//?放入任務(wù)隊(duì)列定時(shí)執(zhí)行(5min?之后才進(jìn)行處理)
????ctx.channel().eventLoop().schedule(new?Runnable()?{
????????public?void?run()?{
????????????ByteBuf?byteBuf?=?(ByteBuf)?finalMsg;
????????????System.out.println("data?from?client:?"
????????????????????+?byteBuf.toString(CharsetUtil.UTF_8));
????????}
????},?5,?TimeUnit.MINUTES);
????
????//?可以繼續(xù)調(diào)用?ctx.channel().eventLoop().schedule()
????//?將更多操作放入隊(duì)列
????System.out.println("return?right?now.");
}
斷點(diǎn)跟蹤這個(gè)函數(shù)的執(zhí)行,可以發(fā)現(xiàn)該定時(shí)任務(wù)確實(shí)被放入的當(dāng)前 NioEventLoop 的 scheduleTasjQueue 中了。

對(duì)于第三種場(chǎng)景,舉個(gè)例子,比如在基于 Netty 構(gòu)建的推送系統(tǒng)的業(yè)務(wù)線程中,要根據(jù)用戶標(biāo)識(shí),找到對(duì)應(yīng)的 SocketChannel 引用,然后調(diào)用 write 方法向該用戶推送消息,這時(shí)候就會(huì)將這一 write 任務(wù)放在任務(wù)隊(duì)列中,write 任務(wù)最終被異步消費(fèi)。這種情形是對(duì)前兩種情形的應(yīng)用,且涉及的業(yè)務(wù)內(nèi)容太多,不再給出示例代碼,讀者有興趣可以自行完成,這里給出以下提示:

2.9. Netty 的 Future 和 Promise
Netty**對(duì)使用者提供的多數(shù) IO 接口(即 Netty Channel 中的 IO 方法)**是異步的(即都立即返回一個(gè) Netty Future,而 IO 過(guò)程異步進(jìn)行),因此,調(diào)用者調(diào)用 IO 操作后是不能直接拿到調(diào)用結(jié)果的。要想得到 IO 操作結(jié)果,可以借助 Netty 的 Future(上面代碼中的 ChannelFuture 就繼承了 Netty Future,Netty Future 又繼承了 JUC Future)查詢執(zhí)行狀態(tài)、等待執(zhí)行結(jié)果、獲取執(zhí)行結(jié)果等,使用過(guò) JUC Future 接口的同學(xué)會(huì)非常熟悉這個(gè)機(jī)制,這里不再展開(kāi)描述了。也可以通過(guò) Netty Future 的 addListener()添加一個(gè)回調(diào)方法來(lái)異步處理 IO 結(jié)果,如下:
//?啟動(dòng)客戶端去連接服務(wù)器端
//?由于?bootstrap.connect()是一個(gè)異步操作,因此用.sync()等待
//?這個(gè)異步操作完成
final?ChannelFuture?channelFuture?=?bootstrap.connect(
????????"127.0.0.1",
????????8080).sync();
channelFuture.addListener(new?ChannelFutureListener()?{
????/**
?????*?回調(diào)方法,上面的?bootstrap.connect()操作執(zhí)行完之后觸發(fā)
?????*/
????public?void?operationComplete(ChannelFuture?future)
????????????throws?Exception?{
????????if?(channelFuture.isSuccess())?{
????????????System.out.println("client?has?connected?to?server!");
????????????//?TODO?其他處理
????????}?else?{
????????????System.out.println("connect?to?serverfail!");
????????????//?TODO?其他處理
????????}
????}
});
Netty Future 提供的接口有:

注:會(huì)有一些資料給出這樣的描述:“Netty 中所有的 IO 操作都是異步的”,這顯然是錯(cuò)誤的。Netty 基于 Java NIO,Java NIO 是同步非阻塞 IO。Netty 基于 Java NIO 做了封裝,向使用者提供了異步特性的接口,因此本文說(shuō) Netty**對(duì)使用者提供的多數(shù) IO 接口(即 Netty Channel 中的 IO 方法)**是異步的。例如在 io.netty.channel.ChannelOutboundInvoker(Netty Channel 的 IO 方法多繼承于此)提供的多數(shù) IO 接口都返回 Netty Future:
Promise 是可寫的 Future,F(xiàn)uture 自身并沒(méi)有寫操作相關(guān)的接口,Netty 通過(guò) Promise 對(duì) Future 進(jìn)行擴(kuò)展,用于設(shè)置 IO 操作的結(jié)果。Future 繼承了 Future,相關(guān)的接口定義如下圖所示,相比于上圖 Future 的接口,它多出了一些 setXXX 方法:

Netty 發(fā)起 IO 寫操作的時(shí)候,會(huì)創(chuàng)建一個(gè)新的 Promise 對(duì)象,例如調(diào)用 ChannelHandlerContext 的 write(Object object)方法時(shí),會(huì)創(chuàng)建一個(gè)新的 ChannelPromise,相關(guān)代碼如下:
@Override
public?ChannelFuture?write(Object?msg)?{
????return?write(msg,?newPromise());
}
......
@Override
public?ChannelPromise?newPromise()?{
????return?new?DefaultChannelPromise(channel(),?executor());
}
......
當(dāng) IO 操作發(fā)生異常或者完成時(shí),通過(guò) Promise.setSuccess()或者 Promise.setFailure()設(shè)置結(jié)果,并通知所有 Listener。關(guān)于 Netty 的 Future/Promise 的工作原理,我將在下一篇文章中進(jìn)行源碼級(jí)的解析。
3. 結(jié)束語(yǔ)
我想,到此為止,讀者再次看到這幅 Netty 的架構(gòu)圖會(huì)有不一樣的感覺(jué)。它變得簡(jiǎn)潔、生動(dòng)、優(yōu)雅,因?yàn)槟阋呀?jīng)熟知了它的細(xì)節(jié)和運(yùn)作流程。

參考資料:
Netty官網(wǎng)文檔,https://netty.io/wiki/all-documents.html 《Netty權(quán)威指南(第一版)》,李林鋒 《Netty in Action》,Norman Maurer 《Scalable IO in Java》,Doug Lea 尚硅谷Netty系列教程,韓順平主講

