NIO 知識(shí)點(diǎn)一網(wǎng)打盡

前言
這段時(shí)間也一直在學(xué)習(xí)Netty相關(guān)知識(shí),因?yàn)樯婕爸R(shí)點(diǎn)比較多,也走了不少?gòu)澛?。目前網(wǎng)上關(guān)于Netty學(xué)習(xí)資料玲瑯滿(mǎn)目,不知如何下手,其實(shí)大家都是一樣的,學(xué)習(xí)方法和技巧都是總結(jié)出來(lái)的,我們?cè)跊](méi)有找到很好的方法之前不如按部就班先從基礎(chǔ)開(kāi)始,一般從總分總的漸進(jìn)方式,既觀(guān)森林,又見(jiàn)草木。
Netty是一款提供異步的、事件驅(qū)動(dòng)的網(wǎng)絡(luò)應(yīng)用程序框架和工具,是基于NIO客戶(hù)端、服務(wù)器端的編程框架。所以這里我們先以NIO和依賴(lài)相關(guān)的基礎(chǔ)鋪墊來(lái)進(jìn)行剖析講解,從而作為Netty學(xué)習(xí)之旅的一個(gè)開(kāi)端。
一、網(wǎng)絡(luò)編程基礎(chǔ)回顧
1. Socket
Socket本身有“插座”的意思,不是Java中特有的概念,而是一個(gè)語(yǔ)言無(wú)關(guān)的標(biāo)準(zhǔn),任何可以實(shí)現(xiàn)網(wǎng)絡(luò)編程的編程語(yǔ)言都有Socket。在Linux環(huán)境下,用于表示進(jìn)程間網(wǎng)絡(luò)通信的特殊文件類(lèi)型,其本質(zhì)為內(nèi)核借助緩沖區(qū)形成的偽文件。既然是文件,那么理所當(dāng)然的,我們可以使用文件描述符引用套接字。
與管道類(lèi)似的,Linux系統(tǒng)將其封裝成文件的目的是為了統(tǒng)一接口,使得讀寫(xiě)套接字和讀寫(xiě)文件的操作一致。區(qū)別是管道主要應(yīng)用于本地進(jìn)程間通信,而套接字多應(yīng)用于網(wǎng)絡(luò)進(jìn)程間數(shù)據(jù)的傳遞。
可以這么理解:Socket就是網(wǎng)絡(luò)上的兩個(gè)應(yīng)用程序通過(guò)一個(gè)雙向通信連接實(shí)現(xiàn)數(shù)據(jù)交換的編程接口API。
Socket通信的基本流程具體步驟如下所示:
(1)服務(wù)端通過(guò)Listen開(kāi)啟監(jiān)聽(tīng),等待客戶(hù)端接入。
(2)客戶(hù)端的套接字通過(guò)Connect連接服務(wù)器端的套接字,服務(wù)端通過(guò)Accept接收客戶(hù)端連接。在connect-accept過(guò)程中,操作系統(tǒng)將會(huì)進(jìn)行三次握手。
(3)客戶(hù)端和服務(wù)端通過(guò)write和read發(fā)送和接收數(shù)據(jù),操作系統(tǒng)將會(huì)完成TCP數(shù)據(jù)的確認(rèn)、重發(fā)等步驟。
(4)通過(guò)close關(guān)閉連接,操作系統(tǒng)會(huì)進(jìn)行四次揮手。
針對(duì)Java編程語(yǔ)言,java.net包是網(wǎng)絡(luò)編程的基礎(chǔ)類(lèi)庫(kù)。其中ServerSocket和Socket是網(wǎng)絡(luò)編程的基礎(chǔ)類(lèi)型。
SeverSocket是服務(wù)端應(yīng)用類(lèi)型。Socket是建立連接的類(lèi)型。當(dāng)連接建立成功后,服務(wù)器和客戶(hù)端都會(huì)有一個(gè)Socket對(duì)象示例,可以通過(guò)這個(gè)Socket對(duì)象示例,完成會(huì)話(huà)的所有操作。對(duì)于一個(gè)完整的網(wǎng)絡(luò)連接來(lái)說(shuō),Socket是平等的,沒(méi)有服務(wù)器客戶(hù)端分級(jí)情況。
2. IO模型介紹
對(duì)于一次IO操作,數(shù)據(jù)會(huì)先拷貝到內(nèi)核空間中,然后再?gòu)膬?nèi)核空間拷貝到用戶(hù)空間中,所以一次read操作,會(huì)經(jīng)歷兩個(gè)階段:
(1)等待數(shù)據(jù)準(zhǔn)備
(2)數(shù)據(jù)從內(nèi)核空間拷貝到用戶(hù)空間
基于以上兩個(gè)階段就產(chǎn)生了五種不同的IO模式。
阻塞IO:從進(jìn)程發(fā)起IO操作,一直等待上述兩個(gè)階段完成,此時(shí)兩階段一起阻塞。 非阻塞IO:進(jìn)程一直詢(xún)問(wèn)IO準(zhǔn)備好了沒(méi)有,準(zhǔn)備好了再發(fā)起讀取操作,這時(shí)才把數(shù)據(jù)從內(nèi)核空間拷貝到用戶(hù)空間。第一階段不阻塞但要輪詢(xún),第二階段阻塞。 多路復(fù)用IO:多個(gè)連接使用同一個(gè)select去詢(xún)問(wèn)IO準(zhǔn)備好了沒(méi)有,如果有準(zhǔn)備好了的,就返回有數(shù)據(jù)準(zhǔn)備好了,然后對(duì)應(yīng)的連接再發(fā)起讀取操作,把數(shù)據(jù)從內(nèi)核空間拷貝到用戶(hù)空間。兩階段分開(kāi)阻塞。 信號(hào)驅(qū)動(dòng)IO:進(jìn)程發(fā)起讀取操作會(huì)立即返回,當(dāng)數(shù)據(jù)準(zhǔn)備好了會(huì)以通知的形式告訴進(jìn)程,進(jìn)程再發(fā)起讀取操作,把數(shù)據(jù)從內(nèi)核空間拷貝到用戶(hù)空間。第一階段不阻塞,第二階段阻塞。 異步IO:進(jìn)程發(fā)起讀取操作會(huì)立即返回,等到數(shù)據(jù)準(zhǔn)備好且已經(jīng)拷貝到用戶(hù)空間了再通知進(jìn)程拿數(shù)據(jù)。兩個(gè)階段都不阻塞。
這五種IO模式不難發(fā)現(xiàn)存在這兩對(duì)關(guān)系:同步和異步、阻塞和非阻塞。那么稍微解釋一下:
同步和異步
同步: 同步就是發(fā)起一個(gè)調(diào)用后,被調(diào)用者未處理完請(qǐng)求之前,調(diào)用不返回。 異步: 異步就是發(fā)起一個(gè)調(diào)用后,立刻得到被調(diào)用者的回應(yīng)表示已接收到請(qǐng)求,但是被調(diào)用者并沒(méi)有返回結(jié)果,此時(shí)我們可以處理其他的請(qǐng)求,被調(diào)用者通常依靠事件,回調(diào)等機(jī)制來(lái)通知調(diào)用者其返回結(jié)果。
同步和異步的區(qū)別最大在于異步的話(huà)調(diào)用者不需要等待處理結(jié)果,被調(diào)用者會(huì)通過(guò)回調(diào)等機(jī)制來(lái)通知調(diào)用者其返回結(jié)果。
阻塞和非阻塞
阻塞: 阻塞就是發(fā)起一個(gè)請(qǐng)求,調(diào)用者一直等待請(qǐng)求結(jié)果返回,也就是當(dāng)前線(xiàn)程會(huì)被掛起,無(wú)法從事其他任務(wù),只有當(dāng)條件就緒才能繼續(xù)。 非阻塞: 非阻塞就是發(fā)起一個(gè)請(qǐng)求,調(diào)用者不用一直等著結(jié)果返回,可以先去干其他事情。
阻塞和非阻塞是針對(duì)進(jìn)程在訪(fǎng)問(wèn)數(shù)據(jù)的時(shí)候,根據(jù)IO操作的就緒狀態(tài)來(lái)采取的不同方式,說(shuō)白了是一種讀取或者寫(xiě)入操作方法的實(shí)現(xiàn)方式,阻塞方式下讀取或者寫(xiě)入函數(shù)將一直等待,而非阻塞方式下,讀取或者寫(xiě)入方法會(huì)立即返回一個(gè)狀態(tài)值。
如果組合后的同步阻塞(blocking-IO)簡(jiǎn)稱(chēng)BIO、同步非阻塞(non-blocking-IO)簡(jiǎn)稱(chēng)NIO和異步非阻塞(asynchronous-non-blocking-IO)簡(jiǎn)稱(chēng)AIO又代表什么意思呢?
BIO (同步阻塞I/O模式): 數(shù)據(jù)的讀取寫(xiě)入必須阻塞在一個(gè)線(xiàn)程內(nèi)等待其完成。這里使用那個(gè)經(jīng)典的燒開(kāi)水例子,這里假設(shè)一個(gè)燒開(kāi)水的場(chǎng)景,有一排水壺在燒開(kāi)水,BIO的工作模式就是, 叫一個(gè)線(xiàn)程停留在一個(gè)水壺那,直到這個(gè)水壺?zé)_(kāi),才去處理下一個(gè)水壺。但是實(shí)際上線(xiàn)程在等待水壺?zé)_(kāi)的時(shí)間段什么都沒(méi)有做。 NIO(同步非阻塞): 同時(shí)支持阻塞與非阻塞模式,但這里我們以其同步非阻塞I/O模式來(lái)說(shuō)明,那么什么叫做同步非阻塞?如果還拿燒開(kāi)水來(lái)說(shuō),NIO的做法是叫一個(gè)線(xiàn)程不斷的輪詢(xún)每個(gè)水壺的狀態(tài),看看是否有水壺的狀態(tài)發(fā)生了改變,從而進(jìn)行下一步的操作。 AIO(異步非阻塞I/O模型): 異步非阻塞與同步非阻塞的區(qū)別在哪里?異步非阻塞無(wú)需一個(gè)線(xiàn)程去輪詢(xún)所有IO操作的狀態(tài)改變,在相應(yīng)的狀態(tài)改變后,系統(tǒng)會(huì)通知對(duì)應(yīng)的線(xiàn)程來(lái)處理。對(duì)應(yīng)到燒開(kāi)水中就是,為每個(gè)水壺上面裝了一個(gè)開(kāi)關(guān),水燒開(kāi)之后,水壺會(huì)自動(dòng)通知我水燒開(kāi)了。
java 中的 BIO、NIO和AIO理解為是 Java 語(yǔ)言在操作系統(tǒng)層面對(duì)這三種 IO 模型的封裝。程序員在使用這些 封裝API 的時(shí)候,不需要關(guān)心操作系統(tǒng)層面的知識(shí),也不需要根據(jù)不同操作系統(tǒng)編寫(xiě)不同的代碼,只需要使用Java的API就可以了。由此,為了使讀者對(duì)這三種模型有個(gè)比較具體和遞推式的了解,并且和本文主題NIO有個(gè)清晰的對(duì)比,下面繼續(xù)延伸。
Java BIO
BIO編程方式通常是是Java的上古產(chǎn)品,自JDK 1.0-JDK1.4就有的東西。編程實(shí)現(xiàn)過(guò)程為:首先在服務(wù)端啟動(dòng)一個(gè)ServerSocket來(lái)監(jiān)聽(tīng)網(wǎng)絡(luò)請(qǐng)求,客戶(hù)端啟動(dòng)Socket發(fā)起網(wǎng)絡(luò)請(qǐng)求,默認(rèn)情況下SeverSocket會(huì)建立一個(gè)線(xiàn)程來(lái)處理此請(qǐng)求,如果服務(wù)端沒(méi)有線(xiàn)程可用,客戶(hù)端則會(huì)阻塞等待或遭到拒絕。服務(wù)器實(shí)現(xiàn)模式為一個(gè)連接一個(gè)線(xiàn)程,即客戶(hù)端有連接請(qǐng)求時(shí)服務(wù)器端就需要啟動(dòng)一個(gè)線(xiàn)程進(jìn)行處理。大致結(jié)構(gòu)如下:

如果要讓 BIO 通信模型能夠同時(shí)處理多個(gè)客戶(hù)端請(qǐng)求,就必須使用多線(xiàn)程(主要原因是 socket.accept()、socket.read()、 socket.write() 涉及的三個(gè)主要函數(shù)都是同步阻塞的),也就是說(shuō)它在接收到客戶(hù)端連接請(qǐng)求之后為每個(gè)客戶(hù)端創(chuàng)建一個(gè)新的線(xiàn)程進(jìn)行鏈路處理,處理完成之后,通過(guò)輸出流返回應(yīng)答給客戶(hù)端,線(xiàn)程銷(xiāo)毀。這就是典型的 一請(qǐng)求一應(yīng)答通信模型 。我們可以設(shè)想一下如果這個(gè)連接不做任何事情的話(huà)就會(huì)造成不必要的線(xiàn)程開(kāi)銷(xiāo),不過(guò)可以通過(guò)線(xiàn)程池機(jī)制改善,線(xiàn)程池還可以讓線(xiàn)程的創(chuàng)建和回收成本相對(duì)較低。使用線(xiàn)程池機(jī)制改善后的 BIO 模型圖如下:

BIO方式適用于連接數(shù)目比較小且固定的架構(gòu),這種方式對(duì)服務(wù)器資源要求比較高,并發(fā)局限于應(yīng)用中,是JDK1.4以前的唯一選擇,但程序直觀(guān)簡(jiǎn)單易懂。Java BIO編程示例網(wǎng)上很多,這里就不進(jìn)行coding舉例了,畢竟后面NIO才是重點(diǎn)。
Java NIO
NIO(New IO或者No-Blocking IO),從JDK1.4 開(kāi)始引入的非阻塞IO,是一種非阻塞+ 同步的通信模式。這里的No Blocking IO用于區(qū)分上面的BIO。
NIO本身想解決 BIO的并發(fā)問(wèn)題,通過(guò)Reactor模式的事件驅(qū)動(dòng)機(jī)制來(lái)達(dá)到Non Blocking的。當(dāng) socket 有流可讀或可寫(xiě)入 socket 時(shí),操作系統(tǒng)會(huì)相應(yīng)的通知應(yīng)用程序進(jìn)行處理,應(yīng)用再將流讀取到緩沖區(qū)或?qū)懭氩僮飨到y(tǒng)。
也就是說(shuō),這個(gè)時(shí)候,已經(jīng)不是一個(gè)連接就 要對(duì)應(yīng)一個(gè)處理線(xiàn)程了,而是有效的請(qǐng)求,對(duì)應(yīng)一個(gè)線(xiàn)程,當(dāng)連接沒(méi)有數(shù)據(jù)時(shí),是沒(méi)有工作線(xiàn)程來(lái)處理的。
當(dāng)一個(gè)連接創(chuàng)建后,不需要對(duì)應(yīng)一個(gè)線(xiàn)程,這個(gè)連接會(huì)被注冊(cè)到 多路復(fù)用器上面,所以所有的連接只需要一個(gè)線(xiàn)程就可以搞定,當(dāng)這個(gè)線(xiàn)程中的多路復(fù)用器 進(jìn)行輪詢(xún)的時(shí)候,發(fā)現(xiàn)連接上有請(qǐng)求的話(huà),才開(kāi)啟一個(gè)線(xiàn)程進(jìn)行處理,也就是一個(gè)請(qǐng)求一個(gè)線(xiàn)程模式。
NIO提供了與傳統(tǒng)BIO模型中的Socket和ServerSocket相對(duì)應(yīng)的SocketChannel和ServerSocketChannel兩種不同的套接字通道實(shí)現(xiàn),如下圖結(jié)構(gòu)所示。這里涉及的Reactor設(shè)計(jì)模式、多路復(fù)用Selector、Buffer等暫時(shí)不用管,后面會(huì)講到。

NIO 方式適用于連接數(shù)目多且連接比較短(輕操作)的架構(gòu),比如聊天服務(wù)器,并發(fā)局 限于應(yīng)用中,編程復(fù)雜,JDK1.4 開(kāi)始支持。同時(shí),NIO和普通IO的區(qū)別主要可以從存儲(chǔ)數(shù)據(jù)的載體、是否阻塞等來(lái)區(qū)分:

Java AIO
與 NIO 不同,當(dāng)進(jìn)行讀寫(xiě)操作時(shí),只須直接調(diào)用 API 的 read 或 write 方法即可。這兩種方法均為異步的,對(duì)于讀操作而言,當(dāng)有流可讀取時(shí),操作系統(tǒng)會(huì)將可讀的流傳入 read 方 法的緩沖區(qū),并通知應(yīng)用程序;對(duì)于寫(xiě)操作而言,當(dāng)操作系統(tǒng)將 write 方法傳遞的流寫(xiě)入完畢時(shí),操作系統(tǒng)主動(dòng)通知應(yīng)用程序。即可以理解為,read/write 方法都是異步的,完成后會(huì)主動(dòng)調(diào)用回調(diào)函數(shù)。在 JDK7 中,提供了異步文件通道和異步套接字通道的實(shí)現(xiàn),這部分內(nèi)容被稱(chēng)作 NIO.
AIO 方式使用于連接數(shù)目多且連接比較長(zhǎng)(重操作)的架構(gòu),比如相冊(cè)服務(wù)器,充分調(diào)用 OS 參與并發(fā)操作,編程比較復(fù)雜,JDK7 開(kāi)始支持。
目前來(lái)說(shuō) AIO 的應(yīng)用還不是很廣泛,Netty 之前也嘗試使用過(guò) AIO,不過(guò)又放棄了。
二、NIO核心組件介紹
1. Channel
在NIO中,基本所有的IO操作都是從Channel開(kāi)始的,Channel通過(guò)Buffer(緩沖區(qū))進(jìn)行讀寫(xiě)操作。
read()表示讀取通道中數(shù)據(jù)到緩沖區(qū),write()表示把緩沖區(qū)數(shù)據(jù)寫(xiě)入到通道。

Channel有好多實(shí)現(xiàn)類(lèi),這里有三個(gè)最常用:
SocketChannel:一個(gè)客戶(hù)端發(fā)起TCP連接的ChannelServerSocketChannel:一個(gè)服務(wù)端監(jiān)聽(tīng)新連接的TCP Channel,對(duì)于每一個(gè)新的Client連接,都會(huì)建立一個(gè)對(duì)應(yīng)的SocketChannelFileChannel:從文件中讀寫(xiě)數(shù)據(jù)
其中SocketChannel和ServerSocketChannel是網(wǎng)絡(luò)編程中最常用的,一會(huì)在最后的示例代碼中會(huì)有講解到具體用法。
2. Buffer
概念
Buffer也被成為內(nèi)存緩沖區(qū),本質(zhì)上就是內(nèi)存中的一塊,我們可以將數(shù)據(jù)寫(xiě)入這塊內(nèi)存,之后從這塊內(nèi)存中讀取數(shù)據(jù)。也可以將這塊內(nèi)存封裝成NIO Buffer對(duì)象,并提供一組常用的方法,方便我們對(duì)該塊內(nèi)存進(jìn)行讀寫(xiě)操作。
Buffer在java.nio中被定義為抽象類(lèi):

我們可以將Buffer理解為一個(gè)數(shù)組的封裝,我們最常用的ByteBuffer對(duì)應(yīng)的數(shù)據(jù)結(jié)構(gòu)就是byte[]
屬性
Buffer中有4個(gè)非常重要的屬性:capacity、limit、position、mark

capacity屬性:容量,Buffer能夠容納的數(shù)據(jù)元素的最大值,在Buffer初始化創(chuàng)建的時(shí)候被賦值,而且不能被修改。

上圖中,初始化Buffer的容量為8(圖中從0~7,共8個(gè)元素),所以capacity = 8
limit屬性:代表Buffer可讀可寫(xiě)的上限。寫(xiě)模式下: limit代表能寫(xiě)入數(shù)據(jù)的上限位置,這個(gè)時(shí)候limit = capacity讀模式下:在Buffer完成所有數(shù)據(jù)寫(xiě)入后,通過(guò)調(diào)用flip()方法,切換到讀模式,此時(shí)limit等于Buffer中實(shí)際已經(jīng)寫(xiě)入的數(shù)據(jù)大小。因?yàn)?code style="font-size: 14px;overflow-wrap: break-word;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;background-color: rgba(27, 31, 35, 0.05);font-family: "Operator Mono", Consolas, Monaco, Menlo, monospace;word-break: break-all;color: rgb(239, 112, 96);">Buffer可能沒(méi)有被寫(xiě)滿(mǎn),所以limit<=capacityposition屬性:代表讀取或者寫(xiě)入Buffer的位置。默認(rèn)為0。寫(xiě)模式下:每往 Buffer中寫(xiě)入一個(gè)值,position就會(huì)自動(dòng)加1,代表下一次寫(xiě)入的位置。讀模式下:每往 Buffer中讀取一個(gè)值,position就自動(dòng)加1,代表下一次讀取的位置。

從上圖就能很清晰看出,讀寫(xiě)模式下capacity、limit、position的關(guān)系了。
mark屬性:代表標(biāo)記,通過(guò)mark()方法,記錄當(dāng)前position值,將position值賦值給mark,在后續(xù)的寫(xiě)入或讀取過(guò)程中,可以通過(guò)reset()方法恢復(fù)當(dāng)前position為mark記錄的值。
這幾個(gè)重要屬性講完,我們可以再來(lái)回顧下:
0 <= mark <= position <= limit <= capacity
現(xiàn)在應(yīng)該很清晰這幾個(gè)屬性的關(guān)系了~
Buffer常見(jiàn)操作
創(chuàng)建Buffer
allocate(int capacity)
ByteBuffer?buffer?=?ByteBuffer.allocate(1024);
int?count?=?channel.read(buffer);
例子中創(chuàng)建的ByteBuffer是基于堆內(nèi)存的一個(gè)對(duì)象。
wrap(array)
wrap方法可以將數(shù)組包裝成一個(gè)Buffer對(duì)象:
ByteBuffer?buffer?=?ByteBuffer.wrap("hello?world".getBytes());
channel.write(buffer);
allocateDirect(int capacity)
通過(guò)allocateDirect方法也可以快速實(shí)例化一個(gè)Buffer對(duì)象,和allocate很相似,這里區(qū)別的是allocateDirect創(chuàng)建的是基于堆外內(nèi)存的對(duì)象。
堆外內(nèi)存不在JVM堆上,不受GC的管理。堆外內(nèi)存進(jìn)行一些底層系統(tǒng)的IO操作時(shí),效率會(huì)更高。
Buffer寫(xiě)操作
Buffer寫(xiě)入可以通過(guò)put()和channel.read(buffer)兩種方式寫(xiě)入。
通常我們NIO的讀操作的時(shí)候,都是從Channel中讀取數(shù)據(jù)寫(xiě)入Buffer,這個(gè)對(duì)應(yīng)的是Buffer的寫(xiě)操作。
Buffer讀操作
Buffer讀取可以通過(guò)get()和channel.write(buffer)兩種方式讀入。
還是同上,我們對(duì)Buffer的讀入操作,反過(guò)來(lái)說(shuō)就是對(duì)Channel的寫(xiě)操作。讀取Buffer中的數(shù)據(jù)然后寫(xiě)入Channel中。

其他常見(jiàn)方法
rewind():重置position位置為0,可以重新讀取和寫(xiě)入buffer,一般該方法適用于讀操作,可以理解為對(duì)buffer的重復(fù)讀。
public?final?Buffer?rewind()?{
????position?=?0;
????mark?=?-1;
????return?this;
}
flip():很常用的一個(gè)方法,一般在寫(xiě)模式切換到讀模式的時(shí)候會(huì)經(jīng)常用到。也會(huì)將position設(shè)置為0,然后設(shè)置limit等于原來(lái)寫(xiě)入的position。
public?final?Buffer?flip()?{
????limit?=?position;
????position?=?0;
????mark?=?-1;
????return?this;
}
clear():重置buffer中的數(shù)據(jù),該方法主要是針對(duì)于寫(xiě)模式,因?yàn)閘imit設(shè)置為了capacity,讀模式下會(huì)出問(wèn)題。
public?final?Buffer?clear()?{
????position?=?0;
????limit?=?capacity;
????mark?=?-1;
????return?this;
}
mark()&reset():mark()方法是保存當(dāng)前position到變量markz中,然后通過(guò)reset()方法恢復(fù)當(dāng)前position為mark,實(shí)現(xiàn)代碼很簡(jiǎn)單,如下:
public?final?Buffer?mark()?{
????mark?=?position;
????return?this;
}
public?final?Buffer?reset()?{
????int?m?=?mark;
????if?(m?0)
????????throw?new?InvalidMarkException();
????position?=?m;
????return?this;
}
常用的讀寫(xiě)方法可以用一張圖總結(jié)一下:

3. Selector
概念
Selector是NIO中最為重要的組件之一,我們常常說(shuō)的多路復(fù)用器就是指的Selector組件。Selector組件用于輪詢(xún)一個(gè)或多個(gè)NIO Channel的狀態(tài)是否處于可讀、可寫(xiě)。通過(guò)輪詢(xún)的機(jī)制就可以管理多個(gè)Channel,也就是說(shuō)可以管理多個(gè)網(wǎng)絡(luò)連接。

輪詢(xún)機(jī)制
首先,需要將Channel注冊(cè)到Selector上,這樣Selector才知道需要管理哪些Channel 接著Selector會(huì)不斷輪詢(xún)其上注冊(cè)的Channel,如果某個(gè)Channel發(fā)生了讀或?qū)懙臅r(shí)間,這個(gè)Channel就會(huì)被Selector輪詢(xún)出來(lái),然后通過(guò)SelectionKey可以獲取就緒的Channel集合,進(jìn)行后續(xù)的IO操作。

屬性操作
創(chuàng)建Selector
通過(guò)open()方法,我們可以創(chuàng)建一個(gè)Selector對(duì)象。
Selector?selector?=?Selector.open();
注冊(cè)Channel到Selector中
我們需要將Channel注冊(cè)到Selector中,才能夠被Selector管理。
channel.configureBlocking(false);
SelectionKey?key?=?channel.register(selector,?SelectionKey.OP_READ);
某個(gè)Channel要注冊(cè)到Selector中,那么該Channel必須是非阻塞,所有上面代碼中有個(gè)configureBlocking()的配置操作。
在register(Selector selector, int interestSet)方法的第二個(gè)參數(shù),標(biāo)識(shí)一個(gè)interest集合,意思是Selector對(duì)哪些事件感興趣,可以監(jiān)聽(tīng)四種不同類(lèi)型的事件:
public?static?final?int?OP_READ?=?1?<0;
public?static?final?int?OP_WRITE?=?1?<;
public?static?final?int?OP_CONNECT?=?1?<3;
public?static?final?int?OP_ACCEPT?=?1?<4;
Connect事件:連接完成事件( TCP 連接 ),僅適用于客戶(hù)端,對(duì)應(yīng) SelectionKey.OP_CONNECT。Accept事件:接受新連接事件,僅適用于服務(wù)端,對(duì)應(yīng) SelectionKey.OP_ACCEPT 。Read事件:讀事件,適用于兩端,對(duì)應(yīng) SelectionKey.OP_READ ,表示 Buffer 可讀。Write事件:寫(xiě)時(shí)間,適用于兩端,對(duì)應(yīng) SelectionKey.OP_WRITE ,表示 Buffer 可寫(xiě)。
Channel觸發(fā)了一個(gè)事件,表明該時(shí)間已經(jīng)準(zhǔn)備就緒:
一個(gè)Client Channel成功連接到另一個(gè)服務(wù)器,成為“連接就緒” 一個(gè)Server Socket準(zhǔn)備好接收新進(jìn)入的接,稱(chēng)為“接收就緒” 一個(gè)有數(shù)據(jù)可讀的Channel,稱(chēng)為“讀就緒” 一個(gè)等待寫(xiě)數(shù)據(jù)的Channel,稱(chēng)為”寫(xiě)就緒“
當(dāng)然,Selector是可以同時(shí)對(duì)多個(gè)事件感興趣的,我們使用或運(yùn)算即可組合多個(gè)事件:
int?interestSet?=?SelectionKey.OP_READ?|?SelectionKey.OP_WRITE;
Selector其他一些操作
選擇Channel
public?abstract?int?select()?throws?IOException;
public?abstract?int?select(long?timeout)?throws?IOException;
public?abstract?int?selectNow()?throws?IOException;
當(dāng)Selector執(zhí)行select()方法就會(huì)產(chǎn)生阻塞,等到注冊(cè)在其上的Channel準(zhǔn)備就緒就會(huì)立即返回,返回準(zhǔn)備就緒的數(shù)量。
select(long timeout)則是在select()的基礎(chǔ)上增加了超時(shí)機(jī)制。selectNow()立即返回,不產(chǎn)生阻塞。
有一點(diǎn)非常需要注意: select 方法返回的 int 值,表示有多少 Channel 已經(jīng)就緒。
自上次調(diào)用select 方法后有多少 Channel 變成就緒狀態(tài)。如果調(diào)用 select 方法,因?yàn)橛幸粋€(gè) Channel 變成就緒狀態(tài)則返回了 1 ;
若再次調(diào)用 select 方法,如果另一個(gè) Channel 就緒了,它會(huì)再次返回1。
獲取可操作的Channel
Set?selectedKeys?=?selector.selectedKeys();
當(dāng)有新增就緒的Channel,調(diào)用select()方法,就會(huì)將key添加到Set集合中。
三、代碼示例
前面鋪墊了這么多,主要是想讓大家能夠看懂NIO代碼示例,也方便后續(xù)大家來(lái)自己手寫(xiě)NIO 網(wǎng)絡(luò)編程的程序。創(chuàng)建NIO服務(wù)端的主要步驟如下:
1.?打開(kāi)ServerSocketChannel,監(jiān)聽(tīng)客戶(hù)端連接
2.?綁定監(jiān)聽(tīng)端口,設(shè)置連接為非阻塞模式
3.?創(chuàng)建Reactor線(xiàn)程,創(chuàng)建多路復(fù)用器并啟動(dòng)線(xiàn)程
4.?將ServerSocketChannel注冊(cè)到Reactor線(xiàn)程中的Selector上,監(jiān)聽(tīng)ACCEPT事件
5.?Selector輪詢(xún)準(zhǔn)備就緒的key
6.?Selector監(jiān)聽(tīng)到新的客戶(hù)端接入,處理新的接入請(qǐng)求,完成TCP三次握手,建立物理鏈路
7.?設(shè)置客戶(hù)端鏈路為非阻塞模式
8.?將新接入的客戶(hù)端連接注冊(cè)到Reactor線(xiàn)程的Selector上,監(jiān)聽(tīng)讀操作,讀取客戶(hù)端發(fā)送的網(wǎng)絡(luò)消息
9.?異步讀取客戶(hù)端消息到緩沖區(qū)
10.對(duì)Buffer編解碼,處理半包消息,將解碼成功的消息封裝成Task
11.將應(yīng)答消息編碼為Buffer,調(diào)用SocketChannel的write將消息異步發(fā)送給客戶(hù)端
NIOServer.java :
public?class?NIOServer?{
????private?static?Selector?selector;
????public?static?void?main(String[]?args)?{
????????init();
????????listen();
????}
????private?static?void?init()?{
????????ServerSocketChannel?serverSocketChannel?=?null;
????????try?{
????????????selector?=?Selector.open();
????????????serverSocketChannel?=?ServerSocketChannel.open();
????????????serverSocketChannel.configureBlocking(false);
????????????serverSocketChannel.socket().bind(new?InetSocketAddress(9000));
????????????serverSocketChannel.register(selector,?SelectionKey.OP_ACCEPT);
????????????System.out.println("NioServer?啟動(dòng)完成");
????????}?catch?(IOException?e)?{
????????????e.printStackTrace();
????????}
????}
????private?static?void?listen()?{
????????while?(true)?{
????????????try?{
????????????????selector.select();
????????????????Iterator?keysIterator?=?selector.selectedKeys().iterator();
????????????????while?(keysIterator.hasNext())?{
????????????????????SelectionKey?key?=?keysIterator.next();
????????????????????keysIterator.remove();
????????????????????handleRequest(key);
????????????????}
????????????}?catch?(Throwable?t)?{
????????????????t.printStackTrace();
????????????}
????????}
????}
????private?static?void?handleRequest(SelectionKey?key)?throws?IOException?{
????????SocketChannel?channel?=?null;
????????try?{
????????????if?(key.isAcceptable())?{
????????????????ServerSocketChannel?serverSocketChannel?=?(ServerSocketChannel)?key.channel();
????????????????channel?=?serverSocketChannel.accept();
????????????????channel.configureBlocking(false);
????????????????System.out.println("接受新的?Channel");
????????????????channel.register(selector,?SelectionKey.OP_READ);
????????????}
????????????if?(key.isReadable())?{
????????????????channel?=?(SocketChannel)?key.channel();
????????????????ByteBuffer?buffer?=?ByteBuffer.allocate(1024);
????????????????int?count?=?channel.read(buffer);
????????????????if?(count?>?0)?{
????????????????????System.out.println("服務(wù)端接收請(qǐng)求:"?+?new?String(buffer.array(),?0,?count));
????????????????????channel.register(selector,?SelectionKey.OP_WRITE);
????????????????}
????????????}
????????????if?(key.isWritable())?{
????????????????ByteBuffer?buffer?=?ByteBuffer.allocate(1024);
????????????????buffer.put("收到".getBytes());
????????????????buffer.flip();
????????????????channel?=?(SocketChannel)?key.channel();
????????????????channel.write(buffer);
????????????????channel.register(selector,?SelectionKey.OP_READ);
????????????}
????????}?catch?(Throwable?t)?{
????????????t.printStackTrace();
????????????if?(channel?!=?null)?{
????????????????channel.close();
????????????}
????????}
????}
}
NIOClient.java:
public?class?NIOClient?{
????public?static?void?main(String[]?args)?{
????????new?Worker().start();
????}
????static?class?Worker?extends?Thread?{
????????@Override
????????public?void?run()?{
????????????SocketChannel?channel?=?null;
????????????Selector?selector?=?null;
????????????try?{
????????????????channel?=?SocketChannel.open();
????????????????channel.configureBlocking(false);
????????????????selector?=?Selector.open();
????????????????channel.register(selector,?SelectionKey.OP_CONNECT);
????????????????channel.connect(new?InetSocketAddress(9000));
????????????????while?(true)?{
????????????????????selector.select();
????????????????????Iterator?keysIterator?=?selector.selectedKeys().iterator();
????????????????????while?(keysIterator.hasNext())?{
????????????????????????SelectionKey?key?=?keysIterator.next();
????????????????????????keysIterator.remove();
????????????????????????if?(key.isConnectable())?{
????????????????????????????System.out.println();
????????????????????????????channel?=?(SocketChannel)?key.channel();
????????????????????????????if?(channel.isConnectionPending())?{
????????????????????????????????channel.finishConnect();
????????????????????????????????ByteBuffer?buffer?=?ByteBuffer.allocate(1024);
????????????????????????????????buffer.put("你好".getBytes());
????????????????????????????????buffer.flip();
????????????????????????????????channel.write(buffer);
????????????????????????????}
????????????????????????????channel.register(selector,?SelectionKey.OP_READ);
????????????????????????}
????????????????????????if?(key.isReadable())?{
????????????????????????????channel?=?(SocketChannel)?key.channel();
????????????????????????????ByteBuffer?buffer?=?ByteBuffer.allocate(1024);
????????????????????????????int?len?=?channel.read(buffer);
????????????????????????????if?(len?>?0)?{
????????????????????????????????System.out.println("["?+?Thread.currentThread().getName()
????????????????????????????????????????+?"]收到響應(yīng):"?+?new?String(buffer.array(),?0,?len));
????????????????????????????????Thread.sleep(5000);
????????????????????????????????channel.register(selector,?SelectionKey.OP_WRITE);
????????????????????????????}
????????????????????????}
????????????????????????if(key.isWritable())?{
????????????????????????????ByteBuffer?buffer?=?ByteBuffer.allocate(1024);
????????????????????????????buffer.put("你好".getBytes());
????????????????????????????buffer.flip();
????????????????????????????channel?=?(SocketChannel)?key.channel();
????????????????????????????channel.write(buffer);
????????????????????????????channel.register(selector,?SelectionKey.OP_READ);
????????????????????????}
????????????????????}
????????????????}
????????????}?catch?(Exception?e)?{
????????????????e.printStackTrace();
????????????}?finally{
????????????????if(channel?!=?null){
????????????????????try?{
????????????????????????channel.close();
????????????????????}?catch?(IOException?e)?{
????????????????????????e.printStackTrace();
????????????????????}
????????????????}
????????????????if(selector?!=?null){
????????????????????try?{
????????????????????????selector.close();
????????????????????}?catch?(IOException?e)?{
????????????????????????e.printStackTrace();
????????????????????}
????????????????}
????????????}
????????}
????}
}
打印結(jié)果:
//?Server端
NioServer?啟動(dòng)完成
接受新的?Channel
服務(wù)端接收請(qǐng)求:你好
服務(wù)端接收請(qǐng)求:你好
服務(wù)端接收請(qǐng)求:你好
//?Client端
[Thread-0]收到響應(yīng):收到
[Thread-0]收到響應(yīng):收到
[Thread-0]收到響應(yīng):收到
四、總結(jié)
回顧一下使用 NIO 開(kāi)發(fā)服務(wù)端程序的步驟:
創(chuàng)建 ServerSocketChannel和業(yè)務(wù)處理線(xiàn)程池。綁定監(jiān)聽(tīng)端口,并配置為非阻塞模式。 創(chuàng)建 Selector,將之前創(chuàng)建的ServerSocketChannel注冊(cè)到Selector上,監(jiān)聽(tīng)SelectionKey.OP_ACCEPT。循環(huán)執(zhí)行 Selector.select()`` 方法,輪詢(xún)就緒的Channel`。輪詢(xún)就緒的 Channel時(shí),如果是處于OP_ACCEPT狀態(tài),說(shuō)明是新的客戶(hù)端接入,調(diào)用ServerSocketChannel.accept接收新的客戶(hù)端。設(shè)置新接入的 SocketChannel為非阻塞模式,并注冊(cè)到Selector上,監(jiān)聽(tīng)OP_READ。如果輪詢(xún)的 Channel狀態(tài)是OP_READ,說(shuō)明有新的就緒數(shù)據(jù)包需要讀取,則構(gòu)造ByteBuffer對(duì)象,讀取數(shù)據(jù)。
那從這些步驟中基本知道開(kāi)發(fā)者需要熟悉的知識(shí)點(diǎn)有:
jdk-nio提供的幾個(gè)關(guān)鍵類(lèi):Selector,SocketChannel,ServerSocketChannel,FileChannel,ByteBuffer,SelectionKey需要知道網(wǎng)絡(luò)知識(shí):tcp粘包拆包 、網(wǎng)絡(luò)閃斷、包體溢出及重復(fù)發(fā)送等 需要知道 linux底層實(shí)現(xiàn),如何正確的關(guān)閉channel,如何退出注銷(xiāo)selector,如何避免selector太過(guò)于頻繁需要知道如何讓 client端獲得server端的返回值,然后才返回給前端,需要如何等待或在怎樣作熔斷機(jī)制需要知道對(duì)象序列化,及序列化算法 省略等等,因?yàn)槲乙呀?jīng)有點(diǎn)不舒服了,作為程序員的我習(xí)慣了舒舒服服簡(jiǎn)單的API,不用太知道底層細(xì)節(jié),就能寫(xiě)出比較健壯和沒(méi)有Bug的代碼...
NIO 原生 API 的弊端 :
① NIO 組件復(fù)雜 : 使用原生 NIO 開(kāi)發(fā)服務(wù)器端與客戶(hù)端 , 需要涉及到 服務(wù)器套接字通道 ( ServerSocketChannel ) , 套接字通道 ( SocketChannel ) , 選擇器 ( Selector ) , 緩沖區(qū) ( ByteBuffer ) 等組件 , 這些組件的原理 和API 都要熟悉 , 才能進(jìn)行 NIO 的開(kāi)發(fā)與調(diào)試 , 之后還需要針對(duì)應(yīng)用進(jìn)行調(diào)試優(yōu)化
② NIO 開(kāi)發(fā)基礎(chǔ) : NIO 門(mén)檻略高 , 需要開(kāi)發(fā)者掌握多線(xiàn)程、網(wǎng)絡(luò)編程等才能開(kāi)發(fā)并且優(yōu)化 NIO 網(wǎng)絡(luò)通信的應(yīng)用程序
③ 原生 API 開(kāi)發(fā)網(wǎng)絡(luò)通信模塊的基本的傳輸處理 : 網(wǎng)絡(luò)傳輸不光是實(shí)現(xiàn)服務(wù)器端和客戶(hù)端的數(shù)據(jù)傳輸功能 , 還要處理各種異常情況 , 如 連接斷開(kāi)重連機(jī)制 , 網(wǎng)絡(luò)堵塞處理 , 異常處理 , 粘包處理 , 拆包處理 , 緩存機(jī)制 等方面的問(wèn)題 , 這是所有成熟的網(wǎng)絡(luò)應(yīng)用程序都要具有的功能 , 否則只能說(shuō)是入門(mén)級(jí)的 Demo
④ NIO BUG : NIO 本身存在一些 BUG , 如 Epoll , 導(dǎo)致 選擇器 ( Selector ) 空輪詢(xún) , 在 JDK 1.7 中還沒(méi)有解決
Netty 在 NIO 的基礎(chǔ)上 , 封裝了 Java 原生的 NIO API , 解決了上述哪些問(wèn)題呢 ?
相比 Java NIO,使用 Netty 開(kāi)發(fā)程序,都簡(jiǎn)化了哪些步驟呢?...等等這系列問(wèn)題也都是我們要問(wèn)的問(wèn)題。不過(guò)因?yàn)檫@篇只是介紹NIO相關(guān)知識(shí),沒(méi)有介紹Netty API的使用,所以介紹Netty API使用簡(jiǎn)單開(kāi)發(fā)門(mén)檻低等優(yōu)點(diǎn)有點(diǎn)站不住腳。那么就留到后面跟大家一起開(kāi)啟Netty學(xué)習(xí)之旅,探討人人說(shuō)好的Netty到底是不是江湖傳言的那么好。
一起期待后續(xù)的Netty之旅吧!
高清思維導(dǎo)圖原件(xmind/pdf/jpg)可以關(guān)注公眾號(hào)回復(fù)nio 即可。
完
? ? ? ?
???覺(jué)得不錯(cuò),點(diǎn)個(gè)在看~

