45 張圖深度解析 Netty 架構與原理

作為一個學 Java 的,如果沒有研究過 Netty,那么你對 Java 語言的使用和理解僅僅停留在表面水平,會點 SSH 寫幾個 MVC,訪問數(shù)據(jù)庫和緩存,這些只是初等 Java 程序員干的事。如果你要進階,想了解 Java 服務器的深層高階知識,Netty 絕對是一個必須要過的門檻。
接下來我們會學習一個 Netty 系列教程,Netty 系列由「架構與原理」,「源碼」,「架構」三部分組成,今天我們先來看看第一部分:Netty 架構與原理初探,大綱如下:
前言 1. Netty 基礎 1.4.1. 緩沖區(qū)(Buffer) 1.4.2. 通道(Channel) 1.4.3. 選擇器(Selector) 1.1. Netty 是什么 1.2. Netty 的應用場景 1.3. Java 中的網(wǎng)絡 IO 模型 1.4. Java NIO API 簡單回顧 1.5. 零拷貝技術 2. Netty 的架構與原理 2.2.1. 單 Reactor 單線程模式 2.2.2. 單 Reactor 多線程模式 2.2.3. 主從 Reactor 多線程模式 2.1. 為什么要制造 Netty 2.2. 幾種 Reactor 線程模式 2.3. Netty 的模樣 2.4. 基于 Netty 的 TCP Server/Client 案例 2.5. Netty 的 Handler 組件 2.6. Netty 的 Pipeline 組件 2.7. Netty 的 EventLoopGroup 組件 2.8. Netty 的 TaskQueue 2.9. Netty 的 Future 和 Promise 3. 結束語
前言
讀者在閱讀本文前最好有 Java 的 IO 編程經(jīng)驗(知道 Java 的各種 IO 流),以及 Java 網(wǎng)絡編程經(jīng)驗(用 ServerSocket 和 Socket 寫過 demo),并對 Java NIO 有基本的認識(至少知道 Channel、Buffer、Selector 中的核心屬性和方法,以及三者如何配合使用的),以及 JUC 編程經(jīng)驗(至少知道其中的 Future 異步處理機制),沒有也沒關系,文中多數(shù)會介紹,不影響整體的理解。
文中對于 Reactor 的講解使用了幾張來自網(wǎng)絡上的深灰色背景的示意圖,但未找到原始出處,文中已標注“圖片來源于網(wǎng)絡”。
Netty 的設計復雜,接口和類體系龐大,因此我會從不同的層次對有些 Netty 中的重要組件反復描述,以幫助讀者理解。
1. Netty 基礎
基礎好的同學,如果已經(jīng)掌握了 Java NIO 并對 IO 多路復用的概念有一定的認知,可以跳過本章。
1.1. Netty 是什么
1)Netty 是 JBoss 開源項目,是異步的、基于事件驅(qū)動的網(wǎng)絡應用框架,它以高性能、高并發(fā)著稱。所謂基于事件驅(qū)動,說得簡單點就是 Netty 會根據(jù)客戶端事件(連接、讀、寫等)做出響應,關于這點,隨著文章的論述的展開,讀者自然會明白。
2)Netty 主要用于開發(fā)基于 TCP 協(xié)議的網(wǎng)絡 IO 程序(TCP/IP 是網(wǎng)絡通信的基石,當然也是 Netty 的基石,Netty 并沒有去改變這些底層的網(wǎng)絡基礎設施,而是在這之上提供更高層的網(wǎng)絡基礎設施),例如高性能服務器段/客戶端、P2P 程序等。
3)Netty 是基于 Java NIO 構建出來的,Java NIO 又是基于 Linux 提供的高性能 IO 接口/系統(tǒng)調(diào)用構建出來的。關于 Netty 在網(wǎng)絡中的地位,下圖可以很好地表達出來:

1.2. Netty 的應用場景
在互聯(lián)網(wǎng)領域,Netty 作為異步高并發(fā)的網(wǎng)絡組件,常常用于構建高性能 RPC 框架,以提升分布式服務群之間調(diào)用或者數(shù)據(jù)傳輸?shù)牟l(fā)度和速度。例如 Dubbo 的網(wǎng)絡層就可以(但并非一定)使用 Netty。
一些大數(shù)據(jù)基礎設施,比如 Hadoop,在處理海量數(shù)據(jù)的時候,數(shù)據(jù)在多個計算節(jié)點之中傳輸,為了提高傳輸性能,也采用 Netty 構建性能更高的網(wǎng)絡 IO 層。
在游戲行業(yè),Netty 被用于構建高性能的游戲交互服務器,Netty 提供了 TCP/UDP、HTTP 協(xié)議棧,方便開發(fā)者基于 Netty 進行私有協(xié)議的開發(fā)。
……
Netty 作為成熟的高性能異步通信框架,無論是應用在互聯(lián)網(wǎng)分布式應用開發(fā)中,還是在大數(shù)據(jù)基礎設施構建中,亦或是用于實現(xiàn)應用層基于公私協(xié)議的服務器等等,都有出色的表現(xiàn),是一個極好的輪子。
1.3. Java 中的網(wǎng)絡 IO 模型
Java 中的網(wǎng)絡 IO 模型有三種:BIO、NIO、AIO。
1)BIO:同步的、阻塞式 IO。在這種模型中,服務器上一個線程處理一次連接,即客戶端每發(fā)起一個請求,服務端都要開啟一個線程專門處理該請求。這種模型對線程量的耗費極大,且線程利用率低,難以承受請求的高并發(fā)。BIO 雖然可以使用線程池+等待隊列進行優(yōu)化,避免使用過多的線程,但是依然無法解決線程利用率低的問題。

使用 BIO 構建 C/S 系統(tǒng)的 Java 編程組件是 ServerSocket 和 Socket。服務端示例代碼為:
public?static?void?main(String[]?args)?throws?IOException?{
????ExecutorService?threadPool?=?Executors.newCachedThreadPool();
????ServerSocket?serverSocket?=?new?ServerSocket(8080);
????while?(true)?{
????????Socket?socket?=?serverSocket.accept();
????????threadPool.execute(()?->?{
????????????handler(socket);
????????});
????}
}
/**
?*?處理客戶端請求
?*/
private?static?void?handler(Socket?socket)?throws?IOException?{
????byte[]?bytes?=?new?byte[1024];
????InputStream?inputStream?=?socket.getInputStream();
????socket.close();
????while?(true)?{
????????int?read?=?inputStream.read(bytes);
????????if?(read?!=?-1)?{
????????????System.out.println("msg?from?client:?"?+?new?String(bytes,?0,?read));
????????}?else?{
????????????break;
????????}
????}
}
2)NIO:同步的、非阻塞式 IO。在這種模型中,服務器上一個線程處理多個連接,即多個客戶端請求都會被注冊到多路復用器(后文要講的 Selector)上,多路復用器會輪訓這些連接,輪訓到連接上有 IO 活動就進行處理。NIO 降低了線程的需求量,提高了線程的利用率。Netty 就是基于 NIO 的(這里有一個問題:前文大力宣揚 Netty 是一個異步高性能網(wǎng)絡應用框架,為何這里又說 Netty 是基于同步的 NIO 的?請讀者跟著文章的描述找尋答案)。

NIO 是面向緩沖區(qū)編程的,從緩沖區(qū)讀取數(shù)據(jù)的時候游標在緩沖區(qū)中是可以前后移動的,這就增加了數(shù)據(jù)處理的靈活性。這和面向流的 BIO 只能順序讀取流中數(shù)據(jù)有很大的不同。
Java NIO 的非阻塞模式,使得一個線程從某個通道讀取數(shù)據(jù)的時候,若當前有可用數(shù)據(jù),則該線程進行處理,若當前無可用數(shù)據(jù),則該線程不會保持阻塞等待狀態(tài),而是可以去處理其他工作(比如處理其他通道的讀寫);同樣,一個線程向某個通道寫入數(shù)據(jù)的時候,一旦開始寫入,該線程無需等待寫完即可去處理其他工作(比如處理其他通道的讀寫)。這種特性使得一個線程能夠處理多個客戶端請求,而不是像 BIO 那樣,一個線程只能處理一個請求。
使用 NIO 構建 C/S 系統(tǒng)的 Java 編程組件是 Channel、Buffer、Selector。服務端示例代碼為:
public?static?void?main(String[]?args)?throws?IOException?{
????ServerSocketChannel?serverSocketChannel?=?ServerSocketChannel.open();
????Selector?selector?=?Selector.open();
????//?綁定端口
????serverSocketChannel.socket().bind(new?InetSocketAddress(8080));
????//?設置?serverSocketChannel?為非阻塞模式
????serverSocketChannel.configureBlocking(false);
????//?注冊?serverSocketChannel?到?selector,關注?OP_ACCEPT?事件
????serverSocketChannel.register(selector,?SelectionKey.OP_ACCEPT);
????while?(true)?{
????????//?沒有事件發(fā)生
????????if?(selector.select(1000)?==?0)?{
????????????continue;
????????}
????????//?有事件發(fā)生,找到發(fā)生事件的?Channel?對應的?SelectionKey?的集合
????????Set?selectionKeys?=?selector.selectedKeys();
????????Iterator?iterator?=?selectionKeys.iterator();
????????while?(iterator.hasNext())?{
????????????SelectionKey?selectionKey?=?iterator.next();
????????????//?發(fā)生?OP_ACCEPT?事件,處理連接請求
????????????if?(selectionKey.isAcceptable())?{
????????????????SocketChannel?socketChannel?=?serverSocketChannel.accept();
????????????????//?將?socketChannel?也注冊到?selector,關注?OP_READ
????????????????//?事件,并給?socketChannel?關聯(lián)?Buffer
????????????????socketChannel.register(selector,?SelectionKey.OP_READ,?ByteBuffer.allocate(1024));
????????????}
????????????//?發(fā)生?OP_READ?事件,讀客戶端數(shù)據(jù)
????????????if?(selectionKey.isReadable())?{
????????????????SocketChannel?channel?=?(SocketChannel)?selectionKey.channel();
????????????????ByteBuffer?buffer?=?(ByteBuffer)?selectionKey.attachment();
????????????????channel.read(buffer);
????????????????System.out.println("msg?form?client:?"?+?new?String(buffer.array()));
????????????}
????????????//?手動從集合中移除當前的?selectionKey,防止重復處理事件
????????????iterator.remove();
????????}
????}
}
3)AIO:異步非阻塞式 IO。在這種模型中,由操作系統(tǒng)完成與客戶端之間的 read/write,之后再由操作系統(tǒng)主動通知服務器線程去處理后面的工作,在這個過程中服務器線程不必同步等待 read/write 完成。由于不同的操作系統(tǒng)對 AIO 的支持程度不同,AIO 目前未得到廣泛應用。因此本文對 AIO 不做過多描述。
使用 Java NIO 構建的 IO 程序,它的工作模式是:主動輪訓 IO 事件,IO 事件發(fā)生后程序的線程主動處理 IO 工作,這種模式也叫做 Reactor 模式。使用 Java AIO 構建的 IO 程序,它的工作模式是:將 IO 事件的處理托管給操作系統(tǒng),操作系統(tǒng)完成 IO 工作之后會通知程序的線程去處理后面的工作,這種模式也叫做 Proactor 模式。
本節(jié)最后,討論一下網(wǎng)路 IO 中阻塞、非阻塞、異步、同步這幾個術語的含義和關系:
阻塞:如果線程調(diào)用 read/write 過程,但 read/write 過程沒有就緒或沒有完成,則調(diào)用 read/write 過程的線程會一直等待,這個過程叫做阻塞式讀寫。 非阻塞:如果線程調(diào)用 read/write 過程,但 read/write 過程沒有就緒或沒有完成,調(diào)用 read/write 過程的線程并不會一直等待,而是去處理其他工作,等到 read/write 過程就緒或完成后再回來處理,這個過程叫做非阻塞式讀寫。 異步:read/write 過程托管給操作系統(tǒng)來完成,完成后操作系統(tǒng)會通知(通過回調(diào)或者事件)應用網(wǎng)絡 IO 程序(其中的線程)來進行后續(xù)的處理。 同步:read/write 過程由網(wǎng)絡 IO 程序(其中的線程)來完成。
基于以上含義,可以看出:異步 IO 一定是非阻塞 IO;同步 IO 既可以是阻塞 IO、也可以是非阻塞 IO。
1.4. Java NIO API 簡單回顧
BIO 以流的方式處理數(shù)據(jù),而 NIO 以緩沖區(qū)(也被叫做塊)的方式處理數(shù)據(jù),塊 IO 效率比流 IO 效率高很多。BIO 基于字符流或者字節(jié)流進行操作,而 NIO 基于 Channel 和 Buffer 進行操作,數(shù)據(jù)總是從通道讀取到緩沖區(qū)或者從緩沖區(qū)寫入到通道。Selector 用于監(jiān)聽多個通道上的事件(比如收到連接請求、數(shù)據(jù)達到等等),因此使用單個線程就可以監(jiān)聽多個客戶端通道。如下圖所示:

關于上圖,再進行幾點說明:
一個 Selector 對應一個處理線程 一個 Selector 上可以注冊多個 Channel 每個 Channel 都會對應一個 Buffer(有時候一個 Channel 可以使用多個 Buffer,這時候程序要進行多個 Buffer 的分散和聚集操作),Buffer 的本質(zhì)是一個內(nèi)存塊,底層是一個數(shù)組 Selector 會根據(jù)不同的事件在各個 Channel 上切換 Buffer 是雙向的,既可以讀也可以寫,切換讀寫方向要調(diào)用 Buffer 的 flip()方法 同樣,Channel 也是雙向的,數(shù)據(jù)既可以流入也可以流出
1.4.1. 緩沖區(qū)(Buffer)
緩沖區(qū)(Buffer)本質(zhì)上是一個可讀可寫的內(nèi)存塊,可以理解成一個容器對象,Channel 讀寫文件或者網(wǎng)絡都要經(jīng)由 Buffer。在 Java NIO 中,Buffer 是一個頂層抽象類,它的常用子類有(前綴表示該 Buffer 可以存儲哪種類型的數(shù)據(jù)):
ByteBuffer CharBuffer ShortBuffer IntBuffer LongBuffer DoubleBuffer FloatBuffer
涵蓋了 Java 中除 boolean 之外的所有的基本數(shù)據(jù)類型。其中 ByteBuffer 支持類型化的數(shù)據(jù)存取,即可以往 ByteBuffer 中放 byte 類型數(shù)據(jù)、也可以放 char、int、long、double 等類型的數(shù)據(jù),但讀取的時候要做好類型匹配處理,否則會拋出 BufferUnderflowException。
另外,Buffer 體系中還有一個重要的 MappedByteBuffer(ByteBuffer 的子類),可以讓文件內(nèi)容直接在堆外內(nèi)存中被修改,而如何同步到文件由 NIO 來完成。本文重點不在于此,有興趣的可以去探究一下 MappedByteBuffer 的底層原理。
1.4.2. 通道(Channel)
通道(Channel)是雙向的,可讀可寫。在 Java NIO 中,Buffer 是一個頂層接口,它的常用子類有:
FileChannel:用于文件讀寫 DatagramChannel:用于 UDP 數(shù)據(jù)包收發(fā) ServerSocketChannel:用于服務端 TCP 數(shù)據(jù)包收發(fā) SocketChannel:用于客戶端 TCP 數(shù)據(jù)包收發(fā)
1.4.3. 選擇器(Selector)
選擇器(Selector)是實現(xiàn) IO 多路復用的關鍵,多個 Channel 注冊到某個 Selector 上,當 Channel 上有事件發(fā)生時,Selector 就會取得事件然后調(diào)用線程去處理事件。也就是說只有當連接上真正有讀寫等事件發(fā)生時,線程才會去進行讀寫等操作,這就不必為每個連接都創(chuàng)建一個線程,一個線程可以應對多個連接。這就是 IO 多路復用的要義。
Netty 的 IO 線程 NioEventLoop 聚合了 Selector,可以同時并發(fā)處理成百上千的客戶端連接,后文會展開描述。
在 Java NIO 中,Selector 是一個抽象類,它的常用方法有:
public?abstract?class?Selector?implements?Closeable?{
????......
????
????/**
?????*?得到一個選擇器對象
?????*/
????public?static?Selector?open()?throws?IOException?{
????????return?SelectorProvider.provider().openSelector();
????}
????......
????/**
?????*?返回所有發(fā)生事件的?Channel?對應的?SelectionKey?的集合,通過
?????*?SelectionKey?可以找到對應的?Channel
?????*/
????public?abstract?Set?selectedKeys() ;
????......
????
????/**
?????*?返回所有?Channel?對應的?SelectionKey?的集合,通過?SelectionKey
?????*?可以找到對應的?Channel
?????*/
????public?abstract?Set?keys() ;
????......
????
????/**
?????*?監(jiān)控所有注冊的?Channel,當其中的?Channel?有?IO?操作可以進行時,
?????*?將這些 Channel 對應的 SelectionKey 找到。參數(shù)用于設置超時時間
?????*/
????public?abstract?int?select(long?timeout)?throws?IOException;
????
????/**
????*?無超時時間的?select?過程,一直等待,直到發(fā)現(xiàn)有?Channel?可以進行
????*?IO?操作
????*/
????public?abstract?int?select()?throws?IOException;
????
????/**
????*?立即返回的?select?過程
????*/
????public?abstract?int?selectNow()?throws?IOException;
????......
????
????/**
????*?喚醒?Selector,對無超時時間的?select?過程起作用,終止其等待
????*/
????public?abstract?Selector?wakeup();
????......
}
在上文的使用 Java NIO 編寫的服務端示例代碼中,服務端的工作流程為:
1)當客戶端發(fā)起連接時,會通過 ServerSocketChannel 創(chuàng)建對應的 SocketChannel。
2)調(diào)用 SocketChannel 的注冊方法將 SocketChannel 注冊到 Selector 上,注冊方法返回一個 SelectionKey,該 SelectionKey 會被放入 Selector 內(nèi)部的 SelectionKey 集合中。該 SelectionKey 和 Selector 關聯(lián)(即通過 SelectionKey 可以找到對應的 Selector),也和 SocketChannel 關聯(lián)(即通過 SelectionKey 可以找到對應的 SocketChannel)。
4)Selector 會調(diào)用 select()/select(timeout)/selectNow()方法對內(nèi)部的 SelectionKey 集合關聯(lián)的 SocketChannel 集合進行監(jiān)聽,找到有事件發(fā)生的 SocketChannel 對應的 SelectionKey。
5)通過 SelectionKey 找到有事件發(fā)生的 SocketChannel,完成數(shù)據(jù)處理。
以上過程的相關源碼為:
/**
*?SocketChannel?繼承?AbstractSelectableChannel
*/
public?abstract?class?SocketChannel
????extends?AbstractSelectableChannel
????implements?ByteChannel,?
???????????????ScatteringByteChannel,?
???????????????GatheringByteChannel,?
???????????????NetworkChannel
{
????......
}
public?abstract?class?AbstractSelectableChannel
????extends?SelectableChannel
{
????......
????/**
?????*?AbstractSelectableChannel?中包含注冊方法,SocketChannel?實例
?????*?借助該注冊方法注冊到?Selector?實例上去,該方法返回?SelectionKey
?????*/
????public?final?SelectionKey?register(
????????//?指明注冊到哪個?Selector?實例
????????Selector?sel,?
????????//?ops?是事件代碼,告訴?Selector?應該關注該通道的什么事件
????????int?ops,
????????//?附加信息?attachment
????????Object?att)?throws?ClosedChannelException?{
????????......
????}
????......
}
public?abstract?class?SelectionKey?{
????......
????/**
?????*?獲取該?SelectionKey?對應的?Channel
?????*/
????public?abstract?SelectableChannel?channel();
????/**
?????*?獲取該?SelectionKey?對應的?Selector
?????*/
????public?abstract?Selector?selector();
????......
????
????/**
?????*?事件代碼,上面的?ops?參數(shù)取這里的值
?????*/
????public?static?final?int?OP_READ?=?1?<0;
????public?static?final?int?OP_WRITE?=?1?<2;
????public?static?final?int?OP_CONNECT?=?1?<3;
????public?static?final?int?OP_ACCEPT?=?1?<4;
????......
????
????/**
?????*?檢查該?SelectionKey?對應的?Channel?是否可讀
?????*/
????public?final?boolean?isReadable()?{
????????return?(readyOps()?&?OP_READ)?!=?0;
????}
????/**
?????*?檢查該?SelectionKey?對應的?Channel?是否可寫
?????*/
????public?final?boolean?isWritable()?{
????????return?(readyOps()?&?OP_WRITE)?!=?0;
????}
????/**
?????*?檢查該?SelectionKey?對應的?Channel?是否已經(jīng)建立起?socket?連接
?????*/
????public?final?boolean?isConnectable()?{
????????return?(readyOps()?&?OP_CONNECT)?!=?0;
????}
????/**
?????*?檢查該?SelectionKey?對應的?Channel?是否準備好接受一個新的?socket?連接
?????*/
????public?final?boolean?isAcceptable()?{
????????return?(readyOps()?&?OP_ACCEPT)?!=?0;
????}
????/**
?????*?添加附件(例如?Buffer)
?????*/
????public?final?Object?attach(Object?ob)?{
????????return?attachmentUpdater.getAndSet(this,?ob);
????}
????/**
?????*?獲取附件
?????*/
????public?final?Object?attachment()?{
????????return?attachment;
????}
????......
}
下圖用于輔助讀者理解上面的過程和源碼:

首先說明,本文以 Linux 系統(tǒng)為對象來研究文件 IO 模型和網(wǎng)絡 IO 模型。
1.5. 零拷貝技術
注:本節(jié)討論的是 Linux 系統(tǒng)下的 IO 過程。并且對于零拷貝技術的講解采用了一種淺顯易懂但能觸及其本質(zhì)的方式,因為這個話題,展開來講實在是有太多的細節(jié)要關注。
在“將本地磁盤中文件發(fā)送到網(wǎng)絡中”這一場景中,零拷貝技術是提升 IO 效率的一個利器,為了對比出零拷貝技術的優(yōu)越性,下面依次給出使用直接 IO 技術、內(nèi)存映射文件技術、零拷貝技術實現(xiàn)將本地磁盤文件發(fā)送到網(wǎng)絡中的過程。
1)直接 IO 技術
使用直接 IO 技術實現(xiàn)文件傳輸?shù)倪^程如下圖所示。

上圖中,內(nèi)核緩沖區(qū)是 Linux 系統(tǒng)的 Page Cahe。為了加快磁盤的 IO,Linux 系統(tǒng)會把磁盤上的數(shù)據(jù)以 Page 為單位緩存在操作系統(tǒng)的內(nèi)存里,這里的 Page 是 Linux 系統(tǒng)定義的一個邏輯概念,一個 Page 一般為 4K。
可以看出,整個過程有四次數(shù)據(jù)拷貝,讀進來兩次,寫回去又兩次:磁盤-->內(nèi)核緩沖區(qū)-->Socket 緩沖區(qū)-->網(wǎng)絡。
直接 IO 過程使用的 Linux 系統(tǒng) API 為:
ssize_t?read(int?filedes,?void?*buf,?size_t?nbytes);
ssize_t?write(int?filedes,?void?*buf,?size_t?nbytes);
等函數(shù)。
2)內(nèi)存映射文件技術
使用內(nèi)存映射文件技術實現(xiàn)文件傳輸?shù)倪^程如下圖所示。

可以看出,整個過程有三次數(shù)據(jù)拷貝,不再經(jīng)過應用程序內(nèi)存,直接在內(nèi)核空間中從內(nèi)核緩沖區(qū)拷貝到 Socket 緩沖區(qū)。
內(nèi)存映射文件過程使用的 Linux 系統(tǒng) API 為:
void?*mmap(void?*addr,?size_t?length,?int?prot,?int?flags,?int?fd,?off_t?offset);
3)零拷貝技術
使用零拷貝技術,連內(nèi)核緩沖區(qū)到 Socket 緩沖區(qū)的拷貝也省略了,如下圖所示:

內(nèi)核緩沖區(qū)到 Socket 緩沖區(qū)之間并沒有做數(shù)據(jù)的拷貝,只是一個地址的映射。底層的網(wǎng)卡驅(qū)動程序要讀取數(shù)據(jù)并發(fā)送到網(wǎng)絡上的時候,看似讀取的是 Socket 的緩沖區(qū)中的數(shù)據(jù),其實直接讀的是內(nèi)核緩沖區(qū)中的數(shù)據(jù)。
零拷貝中所謂的“零”指的是內(nèi)存中數(shù)據(jù)拷貝的次數(shù)為 0。
零拷貝過程使用的 Linux 系統(tǒng) API 為:
ssize_t?sendfile(int?out_fd,?int?in_fd,?off_t?*offset,?size_t?count);
在 JDK 中,提供的:
FileChannel.transderTo(long?position,?long?count,?WritableByteChannel?target);
方法實現(xiàn)了零拷貝過程,其中的第三個參數(shù)可以傳入 SocketChannel 實例。例如客戶端使用以上的零拷貝接口向服務器傳輸文件的代碼為:
public?static?void?main(String[]?args)?throws?IOException?{
????SocketChannel?socketChannel?=?SocketChannel.open();
????socketChannel.connect(new?InetSocketAddress("127.0.0.1",?8080));
????String?fileName?=?"test.zip";
????//?得到一個文件?channel
????FileChannel?fileChannel?=?new?FileInputStream(fileName).getChannel();
????
????//?使用零拷貝?IO?技術發(fā)送
????long?transferSize?=?fileChannel.transferTo(0,?fileChannel.size(),?socketChannel);
????System.out.println("file?transfer?done,?size:?"?+?transferSize);
????fileChannel.close();
}
以上部分為第一章,學習 Netty 需要的基礎知識。
2. Netty 的架構與原理
2.1. 為什么要制造 Netty
既然 Java 提供了 NIO,為什么還要制造一個 Netty,主要原因是 Java NIO 有以下幾個缺點:
1)Java NIO 的類庫和 API 龐大繁雜,使用起來很麻煩,開發(fā)工作量大。
2)使用 Java NIO,程序員需要具備高超的 Java 多線程編碼技能,以及非常熟悉網(wǎng)絡編程,比如要處理斷連重連、網(wǎng)絡閃斷、半包讀寫、失敗緩存、網(wǎng)絡擁塞和異常流處理等一系列棘手的工作。
3)Java NIO 存在 Bug,例如 Epoll Bug 會導致 Selector 空輪訓,極大耗費 CPU 資源。
Netty 對于 JDK 自帶的 NIO 的 API 進行了封裝,解決了上述問題,提高了 IO 程序的開發(fā)效率和可靠性,同時 Netty:
1)設計優(yōu)雅,提供阻塞和非阻塞的 Socket;提供靈活可拓展的事件模型;提供高度可定制的線程模型。
2)具備更高的性能和更大的吞吐量,使用零拷貝技術最小化不必要的內(nèi)存復制,減少資源的消耗。
3)提供安全傳輸特性。
4)支持多種主流協(xié)議;預置多種編解碼功能,支持用戶開發(fā)私有協(xié)議。
**注:所謂支持 TCP、UDP、HTTP、WebSocket 等協(xié)議,就是說 Netty 提供了相關的編程類和接口,因此本文后面主要對基于 Netty 的 TCP Server/Client 開發(fā)案例進行講解,以展示 Netty 的核心原理,對于其他協(xié)議 Server/Client 開發(fā)不再給出示例,幫助讀者提升內(nèi)力而非教授花招是我寫作的出發(fā)點 :-) **
下圖為 Netty 官網(wǎng)給出的 Netty 架構圖。

我們從其中的幾個關鍵詞就能看出 Netty 的強大之處:零拷貝、可拓展事件模型;支持 TCP、UDP、HTTP、WebSocket 等協(xié)議;提供安全傳輸、壓縮、大文件傳輸、編解碼支持等等。
2.2. 幾種 Reactor 線程模式
傳統(tǒng)的 BIO 服務端編程采用“每線程每連接”的處理模型,弊端很明顯,就是面對大量的客戶端并發(fā)連接時,服務端的資源壓力很大;并且線程的利用率很低,如果當前線程沒有數(shù)據(jù)可讀,它會阻塞在 read 操作上。這個模型的基本形態(tài)如下圖所示(圖片來源于網(wǎng)絡)。

BIO 服務端編程采用的是 Reactor 模式(也叫做 Dispatcher 模式,分派模式),Reactor 模式有兩個要義:
1)基于 IO 多路復用技術,多個連接共用一個多路復用器,應用程序的線程無需阻塞等待所有連接,只需阻塞等待多路復用器即可。當某個連接上有新數(shù)據(jù)可以處理時,應用程序的線程從阻塞狀態(tài)返回,開始處理這個連接上的業(yè)務。
2)基于線程池技術復用線程資源,不必為每個連接創(chuàng)建專用的線程,應用程序?qū)⑦B接上的業(yè)務處理任務分配給線程池中的線程進行處理,一個線程可以處理多個連接的業(yè)務。
下圖反應了 Reactor 模式的基本形態(tài)(圖片來源于網(wǎng)絡):

Reactor 模式有兩個核心組成部分:
1)Reactor(圖中的 ServiceHandler):Reactor 在一個單獨的線程中運行,負責監(jiān)聽和分發(fā)事件,分發(fā)給適當?shù)奶幚砭€程來對 IO 事件做出反應。
2)Handlers(圖中的 EventHandler):處理線程執(zhí)行處理方法來響應 I/O 事件,處理線程執(zhí)行的是非阻塞操作。
Reactor 模式就是實現(xiàn)網(wǎng)絡 IO 程序高并發(fā)特性的關鍵。它又可以分為單 Reactor 單線程模式、單 Reactor 多線程模式、主從 Reactor 多線程模式。
2.2.1. 單 Reactor 單線程模式
單 Reactor 單線程模式的基本形態(tài)如下(圖片來源于網(wǎng)絡):

這種模式的基本工作流程為:
1)Reactor 通過 select 監(jiān)聽客戶端請求事件,收到事件之后通過 dispatch 進行分發(fā)
2)如果事件是建立連接的請求事件,則由 Acceptor 通過 accept 處理連接請求,然后創(chuàng)建一個 Handler 對象處理連接建立后的后續(xù)業(yè)務處理。
3)如果事件不是建立連接的請求事件,則由 Reactor 對象分發(fā)給連接對應的 Handler 處理。
4)Handler 會完成 read-->業(yè)務處理-->send 的完整處理流程。
這種模式的優(yōu)點是:模型簡單,沒有多線程、進程通信、競爭的問題,一個線程完成所有的事件響應和業(yè)務處理。當然缺點也很明顯:
1)存在性能問題,只有一個線程,無法完全發(fā)揮多核 CPU 的性能。Handler 在處理某個連接上的業(yè)務時,整個進程無法處理其他連接事件,很容易導致性能瓶頸。
2)存在可靠性問題,若線程意外終止,或者進入死循環(huán),會導致整個系統(tǒng)通信模塊不可用,不能接收和處理外部消息,造成節(jié)點故障。
單 Reactor 單線程模式使用場景為:客戶端的數(shù)量有限,業(yè)務處理非??焖?,比如 Redis 在業(yè)務處理的時間復雜度為 O(1)的情況。
2.2.2. 單 Reactor 多線程模式
單 Reactor 單線程模式的基本形態(tài)如下(圖片來源于網(wǎng)絡):

這種模式的基本工作流程為:
1)Reactor 對象通過 select 監(jiān)聽客戶端請求事件,收到事件后通過 dispatch 進行分發(fā)。
2)如果事件是建立連接的請求事件,則由 Acceptor 通過 accept 處理連接請求,然后創(chuàng)建一個 Handler 對象處理連接建立后的后續(xù)業(yè)務處理。
3)如果事件不是建立連接的請求事件,則由 Reactor 對象分發(fā)給連接對應的 Handler 處理。Handler 只負責響應事件,不做具體的業(yè)務處理,Handler 通過 read 讀取到請求數(shù)據(jù)后,會分發(fā)給后面的 Worker 線程池來處理業(yè)務請求。
4)Worker 線程池會分配獨立線程來完成真正的業(yè)務處理,并將處理結果返回給 Handler。Handler 通過 send 向客戶端發(fā)送響應數(shù)據(jù)。
這種模式的優(yōu)點是可以充分的利用多核 cpu 的處理能力,缺點是多線程數(shù)據(jù)共享和控制比較復雜,Reactor 處理所有的事件的監(jiān)聽和響應,在單線程中運行,面對高并發(fā)場景還是容易出現(xiàn)性能瓶頸。
2.2.3. 主從 Reactor 多線程模式
主從 Reactor 多線程模式的基本形態(tài)如下(第一章圖片來源于網(wǎng)絡,第二章圖片是 JUC 作者 Doug Lea 老師在《Scalable IO in Java》中給出的示意圖,兩張圖表達的含義一樣):


針對單 Reactor 多線程模型中,Reactor 在單個線程中運行,面對高并發(fā)的場景易成為性能瓶頸的缺陷,主從 Reactor 多線程模式讓 Reactor 在多個線程中運行(分成 MainReactor 線程與 SubReactor 線程)。這種模式的基本工作流程為:
1)Reactor 主線程 MainReactor 對象通過 select 監(jiān)聽客戶端連接事件,收到事件后,通過 Acceptor 處理客戶端連接事件。
2)當 Acceptor 處理完客戶端連接事件之后(與客戶端建立好 Socket 連接),MainReactor 將連接分配給 SubReactor。(即:MainReactor 只負責監(jiān)聽客戶端連接請求,和客戶端建立連接之后將連接交由 SubReactor 監(jiān)聽后面的 IO 事件。)
3)SubReactor 將連接加入到自己的連接隊列進行監(jiān)聽,并創(chuàng)建 Handler 對各種事件進行處理。
4)當連接上有新事件發(fā)生的時候,SubReactor 就會調(diào)用對應的 Handler 處理。
5)Handler 通過 read 從連接上讀取請求數(shù)據(jù),將請求數(shù)據(jù)分發(fā)給 Worker 線程池進行業(yè)務處理。
6)Worker 線程池會分配獨立線程來完成真正的業(yè)務處理,并將處理結果返回給 Handler。Handler 通過 send 向客戶端發(fā)送響應數(shù)據(jù)。
7)一個 MainReactor 可以對應多個 SubReactor,即一個 MainReactor 線程可以對應多個 SubReactor 線程。
這種模式的優(yōu)點是:
1)MainReactor 線程與 SubReactor 線程的數(shù)據(jù)交互簡單職責明確,MainReactor 線程只需要接收新連接,SubReactor 線程完成后續(xù)的業(yè)務處理。
2)MainReactor 線程與 SubReactor 線程的數(shù)據(jù)交互簡單, MainReactor 線程只需要把新連接傳給 SubReactor 線程,SubReactor 線程無需返回數(shù)據(jù)。
3)多個 SubReactor 線程能夠應對更高的并發(fā)請求。
這種模式的缺點是編程復雜度較高。但是由于其優(yōu)點明顯,在許多項目中被廣泛使用,包括 Nginx、Memcached、Netty 等。
這種模式也被叫做服務器的 1+M+N 線程模式,即使用該模式開發(fā)的服務器包含一個(或多個,1 只是表示相對較少)連接建立線程+M 個 IO 線程+N 個業(yè)務處理線程。這是業(yè)界成熟的服務器程序設計模式。
2.3. Netty 的模樣
Netty 的設計主要基于主從 Reactor 多線程模式,并做了一定的改進。本節(jié)將使用一種漸進式的描述方式展示 Netty 的模樣,即先給出 Netty 的簡單版本,然后逐漸豐富其細節(jié),直至展示出 Netty 的全貌。
簡單版本的 Netty 的模樣如下:

關于這張圖,作以下幾點說明:
1)BossGroup 線程維護 Selector,ServerSocketChannel 注冊到這個 Selector 上,只關注連接建立請求事件(相當于主 Reactor)。
2)當接收到來自客戶端的連接建立請求事件的時候,通過 ServerSocketChannel.accept 方法獲得對應的 SocketChannel,并封裝成 NioSocketChannel 注冊到 WorkerGroup 線程中的 Selector,每個 Selector 運行在一個線程中(相當于從 Reactor)。
3)當 WorkerGroup 線程中的 Selector 監(jiān)聽到自己感興趣的 IO 事件后,就調(diào)用 Handler 進行處理。
我們給這簡單版的 Netty 添加一些細節(jié):

關于這張圖,作以下幾點說明:
1)有兩組線程池:BossGroup 和 WorkerGroup,BossGroup 中的線程(可以有多個,圖中只畫了一個)專門負責和客戶端建立連接,WorkerGroup 中的線程專門負責處理連接上的讀寫。
2)BossGroup 和 WorkerGroup 含有多個不斷循環(huán)的執(zhí)行事件處理的線程,每個線程都包含一個 Selector,用于監(jiān)聽注冊在其上的 Channel。
3)每個 BossGroup 中的線程循環(huán)執(zhí)行以下三個步驟:
3.1)輪訓注冊在其上的 ServerSocketChannel 的 accept 事件(OP_ACCEPT 事件)
3.2)處理 accept 事件,與客戶端建立連接,生成一個 NioSocketChannel,并將其注冊到 WorkerGroup 中某個線程上的 Selector 上
3.3)再去以此循環(huán)處理任務隊列中的下一個事件
4)每個 WorkerGroup 中的線程循環(huán)執(zhí)行以下三個步驟:
4.1)輪訓注冊在其上的 NioSocketChannel 的 read/write 事件(OP_READ/OP_WRITE 事件)
4.2)在對應的 NioSocketChannel 上處理 read/write 事件
4.3)再去以此循環(huán)處理任務隊列中的下一個事件
我們再來看下終極版的 Netty 的模樣,如下圖所示(圖片來源于網(wǎng)絡):

關于這張圖,作以下幾點說明:
1)Netty 抽象出兩組線程池:BossGroup 和 WorkerGroup,也可以叫做 BossNioEventLoopGroup 和 WorkerNioEventLoopGroup。每個線程池中都有 NioEventLoop 線程。BossGroup 中的線程專門負責和客戶端建立連接,WorkerGroup 中的線程專門負責處理連接上的讀寫。BossGroup 和 WorkerGroup 的類型都是 NioEventLoopGroup。
2)NioEventLoopGroup 相當于一個事件循環(huán)組,這個組中含有多個事件循環(huán),每個事件循環(huán)就是一個 NioEventLoop。
3)NioEventLoop 表示一個不斷循環(huán)的執(zhí)行事件處理的線程,每個 NioEventLoop 都包含一個 Selector,用于監(jiān)聽注冊在其上的 Socket 網(wǎng)絡連接(Channel)。
4)NioEventLoopGroup 可以含有多個線程,即可以含有多個 NioEventLoop。
5)每個 BossNioEventLoop 中循環(huán)執(zhí)行以下三個步驟:
5.1)select:輪訓注冊在其上的 ServerSocketChannel 的 accept 事件(OP_ACCEPT 事件)
5.2)processSelectedKeys:處理 accept 事件,與客戶端建立連接,生成一個 NioSocketChannel,并將其注冊到某個 WorkerNioEventLoop 上的 Selector 上
5.3)runAllTasks:再去以此循環(huán)處理任務隊列中的其他任務
6)每個 WorkerNioEventLoop 中循環(huán)執(zhí)行以下三個步驟:
6.1)select:輪訓注冊在其上的 NioSocketChannel 的 read/write 事件(OP_READ/OP_WRITE 事件)
6.2)processSelectedKeys:在對應的 NioSocketChannel 上處理 read/write 事件
6.3)runAllTasks:再去以此循環(huán)處理任務隊列中的其他任務
7)在以上兩個processSelectedKeys步驟中,會使用 Pipeline(管道),Pipeline 中引用了 Channel,即通過 Pipeline 可以獲取到對應的 Channel,Pipeline 中維護了很多的處理器(攔截處理器、過濾處理器、自定義處理器等)。這里暫時不詳細展開講解 Pipeline。
2.4. 基于 Netty 的 TCP Server/Client 案例
下面我們寫點代碼來加深理解 Netty 的模樣。下面兩段代碼分別是基于 Netty 的 TCP Server 和 TCP Client。
服務端代碼為:
/**
?*?需要的依賴:
?*?
?*?io.netty
?*?netty-all
?*?4.1.52.Final
?*?
?*/
public?static?void?main(String[]?args)?throws?InterruptedException?{
????//?創(chuàng)建?BossGroup?和?WorkerGroup
????//?1.?bossGroup?只處理連接請求
????//?2.?業(yè)務處理由?workerGroup?來完成
????EventLoopGroup?bossGroup?=?new?NioEventLoopGroup();
????EventLoopGroup?workerGroup?=?new?NioEventLoopGroup();
????try?{
????????//?創(chuàng)建服務器端的啟動對象
????????ServerBootstrap?bootstrap?=?new?ServerBootstrap();
????????//?配置參數(shù)
????????bootstrap
????????????????//?設置線程組
????????????????.group(bossGroup,?workerGroup)
????????????????//?說明服務器端通道的實現(xiàn)類(便于?Netty?做反射處理)
????????????????.channel(NioServerSocketChannel.class)
????????????????//?設置等待連接的隊列的容量(當客戶端連接請求速率大
?????????????//?于?NioServerSocketChannel?接收速率的時候,會使用
????????????????//?該隊列做緩沖)
????????????????//?option()方法用于給服務端的?ServerSocketChannel
????????????????//?添加配置
????????????????.option(ChannelOption.SO_BACKLOG,?128)
????????????????//?設置連接?;?br>????????????????//?childOption()方法用于給服務端?ServerSocketChannel
????????????????//?接收到的?SocketChannel?添加配置
????????????????.childOption(ChannelOption.SO_KEEPALIVE,?true)
????????????????//?handler()方法用于給?BossGroup?設置業(yè)務處理器
????????????????//?childHandler()方法用于給?WorkerGroup?設置業(yè)務處理器
????????????????.childHandler(
????????????????????????//?創(chuàng)建一個通道初始化對象
????????????????????????new?ChannelInitializer<SocketChannel>()?{
????????????????????????????//?向?Pipeline?添加業(yè)務處理器
????????????????????????????@Override
????????????????????????????protected?void?initChannel(
????????????????????????????????????SocketChannel?socketChannel
????????????????????????????)?throws?Exception?{
????????????????????????????????socketChannel.pipeline().addLast(
????????????????????????????????????????new?NettyServerHandler()
????????????????????????????????);
????????????????????????????????
????????????????????????????????//?可以繼續(xù)調(diào)用?socketChannel.pipeline().addLast()
????????????????????????????????//?添加更多?Handler
????????????????????????????}
????????????????????????}
????????????????);
????????System.out.println("server?is?ready...");
????????//?綁定端口,啟動服務器,生成一個?channelFuture?對象,
????????//?ChannelFuture?涉及到?Netty?的異步模型,后面展開講
????????ChannelFuture?channelFuture?=?bootstrap.bind(8080).sync();
????????//?對通道關閉進行監(jiān)聽
????????channelFuture.channel().closeFuture().sync();
????}?finally?{
????????bossGroup.shutdownGracefully();
????????workerGroup.shutdownGracefully();
????}
}
/**
?*?自定義一個?Handler,需要繼承?Netty?規(guī)定好的某個?HandlerAdapter(規(guī)范)
?*?InboundHandler?用于處理數(shù)據(jù)流入本端(服務端)的?IO?事件
?*?InboundHandler?用于處理數(shù)據(jù)流出本端(服務端)的?IO?事件
?*/
static?class?NettyServerHandler?extends?ChannelInboundHandlerAdapter?{
????/**
?????*?當通道有數(shù)據(jù)可讀時執(zhí)行
?????*
?????*?@param?ctx?上下文對象,可以從中取得相關聯(lián)的?Pipeline、Channel、客戶端地址等
?????*?@param?msg?客戶端發(fā)送的數(shù)據(jù)
?????*?@throws?Exception
?????*/
????@Override
????public?void?channelRead(ChannelHandlerContext?ctx,?Object?msg)
????????????throws?Exception?{
????????//?接收客戶端發(fā)來的數(shù)據(jù)
????????System.out.println("client?address:?"
????????????????+?ctx.channel().remoteAddress());
????????//?ByteBuf?是?Netty?提供的類,比?NIO?的?ByteBuffer?性能更高
????????ByteBuf?byteBuf?=?(ByteBuf)?msg;
????????System.out.println("data?from?client:?"
????????????????+?byteBuf.toString(CharsetUtil.UTF_8));
????}
????/**
?????*?數(shù)據(jù)讀取完畢后執(zhí)行
?????*
?????*?@param?ctx?上下文對象
?????*?@throws?Exception
?????*/
????@Override
????public?void?channelReadComplete(ChannelHandlerContext?ctx)
????????????throws?Exception?{
????????//?發(fā)送響應給客戶端
????????ctx.writeAndFlush(
????????????????//?Unpooled?類是?Netty?提供的專門操作緩沖區(qū)的工具
????????????????//?類,copiedBuffer?方法返回的?ByteBuf?對象類似于
????????????????//?NIO?中的?ByteBuffer,但性能更高
????????????????Unpooled.copiedBuffer(
????????????????????????"hello?client!?i?have?got?your?data.",
????????????????????????CharsetUtil.UTF_8
????????????????)
????????);
????}
????/**
?????*?發(fā)生異常時執(zhí)行
?????*
?????*?@param?ctx???上下文對象
?????*?@param?cause?異常對象
?????*?@throws?Exception
?????*/
????@Override
????public?void?exceptionCaught(ChannelHandlerContext?ctx,?Throwable?cause)
????????????throws?Exception?{
????????//?關閉與客戶端的?Socket?連接
????????ctx.channel().close();
????}
}
客戶端端代碼為:
/**
?*?需要的依賴:
?*?
?*?io.netty
?*?netty-all
?*?4.1.52.Final
?*?
?*/
public?static?void?main(String[]?args)?throws?InterruptedException?{
????//?客戶端只需要一個事件循環(huán)組,可以看做?BossGroup
????EventLoopGroup?eventLoopGroup?=?new?NioEventLoopGroup();
????try?{
????????//?創(chuàng)建客戶端的啟動對象
????????Bootstrap?bootstrap?=?new?Bootstrap();
????????//?配置參數(shù)
????????bootstrap
????????????????//?設置線程組
????????????????.group(eventLoopGroup)
????????????????//?說明客戶端通道的實現(xiàn)類(便于?Netty?做反射處理)
????????????????.channel(NioSocketChannel.class)
????????????????//?handler()方法用于給?BossGroup?設置業(yè)務處理器
????????????????.handler(
????????????????????????//?創(chuàng)建一個通道初始化對象
????????????????????????new?ChannelInitializer<SocketChannel>()?{
????????????????????????????//?向?Pipeline?添加業(yè)務處理器
????????????????????????????@Override
????????????????????????????protected?void?initChannel(
????????????????????????????????????SocketChannel?socketChannel
????????????????????????????)?throws?Exception?{
????????????????????????????????socketChannel.pipeline().addLast(
????????????????????????????????????????new?NettyClientHandler()
????????????????????????????????);
????????????????????????????????
????????????????????????????????//?可以繼續(xù)調(diào)用?socketChannel.pipeline().addLast()
????????????????????????????????//?添加更多?Handler
????????????????????????????}
????????????????????????}
????????????????);
????????System.out.println("client?is?ready...");
????????//?啟動客戶端去連接服務器端,ChannelFuture?涉及到?Netty?的異步模型,后面展開講
????????ChannelFuture?channelFuture?=?bootstrap.connect(
????????????????"127.0.0.1",
????????????????8080).sync();
????????//?對通道關閉進行監(jiān)聽
????????channelFuture.channel().closeFuture().sync();
????}?finally?{
????????eventLoopGroup.shutdownGracefully();
????}
}
/**
?*?自定義一個?Handler,需要繼承?Netty?規(guī)定好的某個?HandlerAdapter(規(guī)范)
?*?InboundHandler?用于處理數(shù)據(jù)流入本端(客戶端)的?IO?事件
?*?InboundHandler?用于處理數(shù)據(jù)流出本端(客戶端)的?IO?事件
?*/
static?class?NettyClientHandler?extends?ChannelInboundHandlerAdapter?{
????/**
?????*?通道就緒時執(zhí)行
?????*
?????*?@param?ctx?上下文對象
?????*?@throws?Exception
?????*/
????@Override
????public?void?channelActive(ChannelHandlerContext?ctx)
????????????throws?Exception?{
????????//?向服務器發(fā)送數(shù)據(jù)
????????ctx.writeAndFlush(
????????????????//?Unpooled?類是?Netty?提供的專門操作緩沖區(qū)的工具
????????????????//?類,copiedBuffer?方法返回的?ByteBuf?對象類似于
????????????????//?NIO?中的?ByteBuffer,但性能更高
????????????????Unpooled.copiedBuffer(
????????????????????????"hello?server!",
????????????????????????CharsetUtil.UTF_8
????????????????)
????????);
????}
????/**
?????*?當通道有數(shù)據(jù)可讀時執(zhí)行
?????*
?????*?@param?ctx?上下文對象
?????*?@param?msg?服務器端發(fā)送的數(shù)據(jù)
?????*?@throws?Exception
?????*/
????@Override
????public?void?channelRead(ChannelHandlerContext?ctx,?Object?msg)
????????????throws?Exception?{
????????//?接收服務器端發(fā)來的數(shù)據(jù)
????????System.out.println("server?address:?"
????????????????+?ctx.channel().remoteAddress());
????????//?ByteBuf?是?Netty?提供的類,比?NIO?的?ByteBuffer?性能更高
????????ByteBuf?byteBuf?=?(ByteBuf)?msg;
????????System.out.println("data?from?server:?"
????????????????+?byteBuf.toString(CharsetUtil.UTF_8));
????}
????/**
?????*?發(fā)生異常時執(zhí)行
?????*
?????*?@param?ctx???上下文對象
?????*?@param?cause?異常對象
?????*?@throws?Exception
?????*/
????@Override
????public?void?exceptionCaught(ChannelHandlerContext?ctx,?Throwable?cause)
????????????throws?Exception?{
????????//?關閉與服務器端的?Socket?連接
????????ctx.channel().close();
????}
}
什么?你覺得使用 Netty 編程難度和工作量更大了?不會吧不會吧,你要知道,你通過這么兩段簡短的代碼得到了一個基于主從 Reactor 多線程模式的服務器,一個高吞吐量和并發(fā)量的服務器,一個異步處理服務器……你還要怎樣?
對上面的兩段代碼,作以下簡單說明:
1)Bootstrap 和 ServerBootstrap 分別是客戶端和服務器端的引導類,一個 Netty 應用程序通常由一個引導類開始,主要是用來配置整個 Netty 程序、設置業(yè)務處理類(Handler)、綁定端口、發(fā)起連接等。
2)客戶端創(chuàng)建一個 NioSocketChannel 作為客戶端通道,去連接服務器。
3)服務端首先創(chuàng)建一個 NioServerSocketChannel 作為服務器端通道,每當接收一個客戶端連接就產(chǎn)生一個 NioSocketChannel 應對該客戶端。
4)使用 Channel 構建網(wǎng)絡 IO 程序的時候,不同的協(xié)議、不同的阻塞類型和 Netty 中不同的 Channel 對應,常用的 Channel 有:
NioSocketChannel:非阻塞的 TCP 客戶端 Channel(本案例的客戶端使用的 Channel)
NioServerSocketChannel:非阻塞的 TCP 服務器端 Channel(本案例的服務器端使用的 Channel)
NioDatagramChannel:非阻塞的 UDP Channel
NioSctpChannel:非阻塞的 SCTP 客戶端 Channel
NioSctpServerChannel:非阻塞的 SCTP 服務器端 Channel
......
啟動服務端和客戶端代碼,調(diào)試以上的服務端代碼,發(fā)現(xiàn):
1)默認情況下 BossGroup 和 WorkerGroup 都包含 16 個線程(NioEventLoop),這是因為我的 PC 是 8 核的 NioEventLoop 的數(shù)量=coreNum*2。這 16 個線程相當于主 Reactor。



其實創(chuàng)建 BossGroup 和 WorkerGroup 的時候可以指定 NioEventLoop 數(shù)量,如下:
EventLoopGroup?bossGroup?=?new?NioEventLoopGroup(1);
EventLoopGroup?workerGroup?=?new?NioEventLoopGroup(16);
這樣就能更好地分配線程資源。
2)每一個 NioEventLoop 包含如下的屬性(比如自己的 Selector、任務隊列、執(zhí)行器等):

3)將代碼斷在服務端的 NettyServerHandler.channelRead 上:

可以看到 ctx 中包含的屬性如下:

可以看到:
當前 ChannelHandlerContext ctx 是位于 ChannelHandlerContext 責任鏈中的一環(huán),可以看到其 next、prev 屬性
當前 ChannelHandlerContext ctx 包含一個 Handler
當前 ChannelHandlerContext ctx 包含一個 Pipeline
Pipeline 本質(zhì)上是一個雙向循環(huán)列表,可以看到其 tail、head 屬性
Pipeline 中包含一個 Channel,Channel 中又包含了該 Pipeline,兩者互相引用
……
從下一節(jié)開始,我將深入剖析以上兩段代碼,向讀者展示 Netty 的更多細節(jié)。
2.5. Netty 的 Handler 組件
無論是服務端代碼中自定義的 NettyServerHandler 還是客戶端代碼中自定義的 NettyClientHandler,都繼承于 ChannelInboundHandlerAdapter,ChannelInboundHandlerAdapter 又繼承于 ChannelHandlerAdapter,ChannelHandlerAdapter 又實現(xiàn)了 ChannelHandler:
public?class?ChannelInboundHandlerAdapter?
????extends?ChannelHandlerAdapter?
????implements?ChannelInboundHandler?{
????......
public?abstract?class?ChannelHandlerAdapter?
????implements?ChannelHandler?{
????......
因此無論是服務端代碼中自定義的 NettyServerHandler 還是客戶端代碼中自定義的 NettyClientHandler,都可以統(tǒng)稱為 ChannelHandler。
Netty 中的 ChannelHandler 的作用是,在當前 ChannelHandler 中處理 IO 事件,并將其傳遞給 ChannelPipeline 中下一個 ChannelHandler 處理,因此多個 ChannelHandler 形成一個責任鏈,責任鏈位于 ChannelPipeline 中。
數(shù)據(jù)在基于 Netty 的服務器或客戶端中的處理流程是:讀取數(shù)據(jù)-->解碼數(shù)據(jù)-->處理數(shù)據(jù)-->編碼數(shù)據(jù)-->發(fā)送數(shù)據(jù)。其中的每個過程都用得到 ChannelHandler 責任鏈。

Netty 中的 ChannelHandler 體系如下(第一張圖來源于網(wǎng)絡):


其中:
ChannelInboundHandler 用于處理入站 IO 事件 ChannelOutboundHandler 用于處理出站 IO 事件 ChannelInboundHandlerAdapter 用于處理入站 IO 事件 ChannelOutboundHandlerAdapter 用于處理出站 IO 事件
ChannelPipeline 提供了 ChannelHandler 鏈的容器。以客戶端應用程序為例,如果事件的方向是從客戶端到服務器的,我們稱事件是出站的,那么客戶端發(fā)送給服務器的數(shù)據(jù)會通過 Pipeline 中的一系列 ChannelOutboundHandler 進行處理;如果事件的方向是從服務器到客戶端的,我們稱事件是入站的,那么服務器發(fā)送給客戶端的數(shù)據(jù)會通過 Pipeline 中的一系列 ChannelInboundHandler 進行處理。

無論是服務端代碼中自定義的 NettyServerHandler 還是客戶端代碼中自定義的 NettyClientHandler,都繼承于 ChannelInboundHandlerAdapter,ChannelInboundHandlerAdapter 提供的方法如下:

從方法名字可以看出,它們在不同的事件發(fā)生后被觸發(fā),例如注冊 Channel 時執(zhí)行 channelRegistred()、添加 ChannelHandler 時執(zhí)行 handlerAdded()、收到入站數(shù)據(jù)時執(zhí)行 channelRead()、入站數(shù)據(jù)讀取完畢后執(zhí)行 channelReadComplete()等等。
2.6. Netty 的 Pipeline 組件
上一節(jié)說到,Netty 的 ChannelPipeline,它維護了一個 ChannelHandler 責任鏈,負責攔截或者處理 inbound(入站)和 outbound(出站)的事件和操作。這一節(jié)給出更深層次的描述。
ChannelPipeline 實現(xiàn)了一種高級形式的攔截過濾器模式,使用戶可以完全控制事件的處理方式,以及 Channel 中各個 ChannelHandler 如何相互交互。
每個 Netty Channel 包含了一個 ChannelPipeline(其實 Channel 和 ChannelPipeline 互相引用),而 ChannelPipeline 又維護了一個由 ChannelHandlerContext 構成的雙向循環(huán)列表,其中的每一個 ChannelHandlerContext 都包含一個 ChannelHandler。(前文描述的時候為了簡便,直接說 ChannelPipeline 包含了一個 ChannelHandler 責任鏈,這里給出完整的細節(jié)。)
如下圖所示(圖片來源于網(wǎng)絡):

還記得下面這張圖嗎?這是上文中基于 Netty 的 Server 程序的調(diào)試截圖,可以從中看到 ChannelHandlerContext 中包含了哪些成分:

ChannelHandlerContext 除了包含 ChannelHandler 之外,還關聯(lián)了對應的 Channel 和 Pipeline??梢赃@么來講:ChannelHandlerContext、ChannelHandler、Channel、ChannelPipeline 這幾個組件之間互相引用,互為各自的屬性,你中有我、我中有你。
在處理入站事件的時候,入站事件及數(shù)據(jù)會從 Pipeline 中的雙向鏈表的頭 ChannelHandlerContext 流向尾 ChannelHandlerContext,并依次在其中每個 ChannelInboundHandler(例如解碼 Handler)中得到處理;出站事件及數(shù)據(jù)會從 Pipeline 中的雙向鏈表的尾 ChannelHandlerContext 流向頭 ChannelHandlerContext,并依次在其中每個 ChannelOutboundHandler(例如編碼 Handler)中得到處理。

2.7. Netty 的 EventLoopGroup 組件
在基于 Netty 的 TCP Server 代碼中,包含了兩個 EventLoopGroup——bossGroup 和 workerGroup,EventLoopGroup 是一組 EventLoop 的抽象。
追蹤 Netty 的 EventLoop 的繼承鏈,可以發(fā)現(xiàn) EventLoop 最終繼承于 JUC Executor,因此 EventLoop 本質(zhì)就是一個 JUC Executor,即線程,JUC Executor 的源碼為:
public?interface?Executor?{
????/**
?????*?Executes?the?given?command?at?some?time?in?the?future.
?????*/
????void?execute(Runnable?command);
}
Netty 為了更好地利用多核 CPU 的性能,一般會有多個 EventLoop 同時工作,每個 EventLoop 維護著一個 Selector 實例,Selector 實例監(jiān)聽注冊其上的 Channel 的 IO 事件。
EventLoopGroup 含有一個 next 方法,它的作用是按照一定規(guī)則從 Group 中選取一個 EventLoop 處理 IO 事件。
在服務端,通常 Boss EventLoopGroup 只包含一個 Boss EventLoop(單線程),該 EventLoop 維護者一個注冊了 ServerSocketChannel 的 Selector 實例。該 EventLoop 不斷輪詢 Selector 得到 OP_ACCEPT 事件(客戶端連接事件),然后將接收到的 SocketChannel 交給 Worker EventLoopGroup,Worker EventLoopGroup 會通過 next()方法選取一個 Worker EventLoop 并將這個 SocketChannel 注冊到其中的 Selector 上,由這個 Worker EventLoop 負責該 SocketChannel 上后續(xù)的 IO 事件處理。整個過程如下圖所示:

2.8. Netty 的 TaskQueue
在 Netty 的每一個 NioEventLoop 中都有一個 TaskQueue,設計它的目的是在任務提交的速度大于線程的處理速度的時候起到緩沖作用?;蛘哂糜诋惒降靥幚?Selector 監(jiān)聽到的 IO 事件。

Netty 中的任務隊列有三種使用場景:
1)處理用戶程序的自定義普通任務的時候
2)處理用戶程序的自定義定時任務的時候
3)非當前 Reactor 線程調(diào)用當前 Channel 的各種方法的時候。
對于第一種場景,舉個例子,2.4 節(jié)的基于 Netty 編寫的服務端的 Handler 中,假如 channelRead 方法中執(zhí)行的過程很耗時,那么以下的阻塞式處理方式無疑會降低當前 NioEventLoop 的并發(fā)度:
/**
?*?當通道有數(shù)據(jù)可讀時執(zhí)行
?*
?*?@param?ctx?上下文對象
?*?@param?msg?客戶端發(fā)送的數(shù)據(jù)
?*?@throws?Exception
?*/
@Override
public?void?channelRead(ChannelHandlerContext?ctx,?Object?msg)
????????throws?Exception?{
????//?借助休眠模擬耗時操作
????Thread.sleep(LONG_TIME);
????ByteBuf?byteBuf?=?(ByteBuf)?msg;
????System.out.println("data?from?client:?"
????????????+?byteBuf.toString(CharsetUtil.UTF_8));
}
改進方法就是借助任務隊列,代碼如下:
/**
?*?當通道有數(shù)據(jù)可讀時執(zhí)行
?*
?*?@param?ctx?上下文對象
?*?@param?msg?客戶端發(fā)送的數(shù)據(jù)
?*?@throws?Exception
?*/
@Override
public?void?channelRead(ChannelHandlerContext?ctx,?Object?msg)
????????throws?Exception?{
????//?假如這里的處理非常耗時,那么就需要借助任務隊列異步執(zhí)行
????final?Object?finalMsg?=?msg;
????//?通過?ctx.channel().eventLoop().execute()將耗時
????//?操作放入任務隊列異步執(zhí)行
????ctx.channel().eventLoop().execute(new?Runnable()?{
????????public?void?run()?{
????????????//?借助休眠模擬耗時操作
????????????try?{
????????????????Thread.sleep(LONG_TIME);
????????????}?catch?(InterruptedException?e)?{
????????????????e.printStackTrace();
????????????}
????????????ByteBuf?byteBuf?=?(ByteBuf)?finalMsg;
????????????System.out.println("data?from?client:?"
????????????????????+?byteBuf.toString(CharsetUtil.UTF_8));
????????}
????});
????
????//?可以繼續(xù)調(diào)用?ctx.channel().eventLoop().execute()
????//?將更多操作放入隊列
????
????System.out.println("return?right?now.");
}
斷點跟蹤這個函數(shù)的執(zhí)行,可以發(fā)現(xiàn)該耗時任務確實被放入的當前 NioEventLoop 的 taskQueue 中了。

對于第二種場景,舉個例子,2.4 節(jié)的基于 Netty 編寫的服務端的 Handler 中,假如 channelRead 方法中執(zhí)行的過程并不需要立即執(zhí)行,而是要定時執(zhí)行,那么代碼可以這樣寫:
/**
?*?當通道有數(shù)據(jù)可讀時執(zhí)行
?*
?*?@param?ctx?上下文對象
?*?@param?msg?客戶端發(fā)送的數(shù)據(jù)
?*?@throws?Exception
?*/
@Override
public?void?channelRead(ChannelHandlerContext?ctx,?Object?msg)
????????throws?Exception?{
????final?Object?finalMsg?=?msg;
????//?通過?ctx.channel().eventLoop().schedule()將操作
????//?放入任務隊列定時執(zhí)行(5min?之后才進行處理)
????ctx.channel().eventLoop().schedule(new?Runnable()?{
????????public?void?run()?{
????????????ByteBuf?byteBuf?=?(ByteBuf)?finalMsg;
????????????System.out.println("data?from?client:?"
????????????????????+?byteBuf.toString(CharsetUtil.UTF_8));
????????}
????},?5,?TimeUnit.MINUTES);
????
????//?可以繼續(xù)調(diào)用?ctx.channel().eventLoop().schedule()
????//?將更多操作放入隊列
????System.out.println("return?right?now.");
}
斷點跟蹤這個函數(shù)的執(zhí)行,可以發(fā)現(xiàn)該定時任務確實被放入的當前 NioEventLoop 的 scheduleTasjQueue 中了。

對于第三種場景,舉個例子,比如在基于 Netty 構建的推送系統(tǒng)的業(yè)務線程中,要根據(jù)用戶標識,找到對應的 SocketChannel 引用,然后調(diào)用 write 方法向該用戶推送消息,這時候就會將這一 write 任務放在任務隊列中,write 任務最終被異步消費。這種情形是對前兩種情形的應用,且涉及的業(yè)務內(nèi)容太多,不再給出示例代碼,讀者有興趣可以自行完成,這里給出以下提示:

2.9. Netty 的 Future 和 Promise
Netty**對使用者提供的多數(shù) IO 接口(即 Netty Channel 中的 IO 方法)**是異步的(即都立即返回一個 Netty Future,而 IO 過程異步進行),因此,調(diào)用者調(diào)用 IO 操作后是不能直接拿到調(diào)用結果的。要想得到 IO 操作結果,可以借助 Netty 的 Future(上面代碼中的 ChannelFuture 就繼承了 Netty Future,Netty Future 又繼承了 JUC Future)查詢執(zhí)行狀態(tài)、等待執(zhí)行結果、獲取執(zhí)行結果等,使用過 JUC Future 接口的同學會非常熟悉這個機制,這里不再展開描述了。也可以通過 Netty Future 的 addListener()添加一個回調(diào)方法來異步處理 IO 結果,如下:
//?啟動客戶端去連接服務器端
//?由于?bootstrap.connect()是一個異步操作,因此用.sync()等待
//?這個異步操作完成
final?ChannelFuture?channelFuture?=?bootstrap.connect(
????????"127.0.0.1",
????????8080).sync();
channelFuture.addListener(new?ChannelFutureListener()?{
????/**
?????*?回調(diào)方法,上面的?bootstrap.connect()操作執(zhí)行完之后觸發(fā)
?????*/
????public?void?operationComplete(ChannelFuture?future)
????????????throws?Exception?{
????????if?(channelFuture.isSuccess())?{
????????????System.out.println("client?has?connected?to?server!");
????????????//?TODO?其他處理
????????}?else?{
????????????System.out.println("connect?to?serverfail!");
????????????//?TODO?其他處理
????????}
????}
});
Netty Future 提供的接口有:

注:會有一些資料給出這樣的描述:“Netty 中所有的 IO 操作都是異步的”,這顯然是錯誤的。Netty 基于 Java NIO,Java NIO 是同步非阻塞 IO。Netty 基于 Java NIO 做了封裝,向使用者提供了異步特性的接口,因此本文說 Netty**對使用者提供的多數(shù) IO 接口(即 Netty Channel 中的 IO 方法)**是異步的。例如在 io.netty.channel.ChannelOutboundInvoker(Netty Channel 的 IO 方法多繼承于此)提供的多數(shù) IO 接口都返回 Netty Future:
Promise 是可寫的 Future,F(xiàn)uture 自身并沒有寫操作相關的接口,Netty 通過 Promise 對 Future 進行擴展,用于設置 IO 操作的結果。Future 繼承了 Future,相關的接口定義如下圖所示,相比于上圖 Future 的接口,它多出了一些 setXXX 方法:

Netty 發(fā)起 IO 寫操作的時候,會創(chuàng)建一個新的 Promise 對象,例如調(diào)用 ChannelHandlerContext 的 write(Object object)方法時,會創(chuàng)建一個新的 ChannelPromise,相關代碼如下:
@Override
public?ChannelFuture?write(Object?msg)?{
????return?write(msg,?newPromise());
}
......
@Override
public?ChannelPromise?newPromise()?{
????return?new?DefaultChannelPromise(channel(),?executor());
}
......
當 IO 操作發(fā)生異?;蛘咄瓿蓵r,通過 Promise.setSuccess()或者 Promise.setFailure()設置結果,并通知所有 Listener。關于 Netty 的 Future/Promise 的工作原理,我將在下一篇文章中進行源碼級的解析。
3. 結束語
我想,到此為止,讀者再次看到這幅 Netty 的架構圖會有不一樣的感覺。它變得簡潔、生動、優(yōu)雅,因為你已經(jīng)熟知了它的細節(jié)和運作流程。

參考資料:
Netty官網(wǎng)文檔,https://netty.io/wiki/all-documents.html 《Netty權威指南(第一版)》,李林鋒 《Netty in Action》,Norman Maurer 《Scalable IO in Java》,Doug Lea 尚硅谷Netty系列教程,韓順平主講
最后歡迎大家關注我的公號,加我好友:「GG_Stone」,一起交流,共同進步!

往期推薦
2020-11-30
2020-12-01
2020-11-27

