【死磕NIO】— NIO基礎(chǔ)詳解
Netty 是基于Java NIO 封裝的網(wǎng)絡(luò)通訊框架,只有充分理解了 Java NIO 才能理解好Netty的底層設(shè)計(jì)。Java NIO 由三個(gè)核心組件組件:
Buffer
Channel
Selector
緩沖區(qū) Buffer
Buffer 是一個(gè)數(shù)據(jù)對(duì)象,我們可以把它理解為固定數(shù)量的數(shù)據(jù)的容器,它包含一些要寫入或者讀出的數(shù)據(jù)。
在 Java NIO 中,任何時(shí)候訪問(wèn) NIO 中的數(shù)據(jù),都需要通過(guò)緩沖區(qū)(Buffer)進(jìn)行操作。讀取數(shù)據(jù)時(shí),直接從緩沖區(qū)中讀取,寫入數(shù)據(jù)時(shí),寫入至緩沖區(qū)。NIO 最常用的緩沖區(qū)則是 ByteBuffer。下圖是 Buffer 繼承關(guān)系圖:

每一個(gè) Java 基本類型都對(duì)應(yīng)著一種 Buffer,他們都包含這相同的操作,只不過(guò)是所處理的數(shù)據(jù)類型不同而已。
通道 Channel
Channel 是一個(gè)通道,它就像自來(lái)水管一樣,網(wǎng)絡(luò)數(shù)據(jù)通過(guò) Channel 這根水管讀取和寫入。傳統(tǒng)的 IO 是基于流進(jìn)行操作的,Channle 和類似,但又有些不同:
| 區(qū)別 | 流 | 通過(guò)Channel |
|---|---|---|
| 支持異步 | 不支持 | 支持 |
| 是否可雙向傳輸數(shù)據(jù) | 不能,只能單向 | 可以,既可以從通道讀取數(shù)據(jù),也可以向通道寫入數(shù)據(jù) |
| 是否結(jié)合 Buffer 使用 | 不 | 必須結(jié)合 Buffer 使用 |
| 性能 | 較低 | 較高 |
正如上面說(shuō)到的,Channel 必須要配合 Buffer 一起使用,我們永遠(yuǎn)不可能將數(shù)據(jù)直接寫入到 Channel 中,同樣也不可能直接從 Channel 中讀取數(shù)據(jù)。都是通過(guò)從 Channel 讀取數(shù)據(jù)到 Buffer 中或者從 Buffer 寫入數(shù)據(jù)到 Channel 中,如下:

簡(jiǎn)單點(diǎn)說(shuō),Channel 是數(shù)據(jù)的源頭或者數(shù)據(jù)的目的地,用于向 buffer 提供數(shù)據(jù)或者讀取 buffer 數(shù)據(jù),并且對(duì) I/O 提供異步支持。
下圖是 Channel 的類圖

Channel 為最頂層接口,所有子 Channel 都實(shí)現(xiàn)了該接口,它主要用于 I/O 操作的連接。定義如下:
public interface Channel extends Closeable {
/**
* 判斷此通道是否處于打開狀態(tài)。
*/
public boolean isOpen();
/**
*關(guān)閉此通道。
*/
public void close() throws IOException;
}
最為重要的Channel實(shí)現(xiàn)類為:
FileChannel:一個(gè)用來(lái)寫、讀、映射和操作文件的通道
DatagramChannel:能通過(guò) UDP 讀寫網(wǎng)絡(luò)中的數(shù)據(jù)
SocketChannel: 能通過(guò) TCP 讀寫網(wǎng)絡(luò)中的數(shù)據(jù)
ServerSocketChannel:可以監(jiān)聽新進(jìn)來(lái)的 TCP 連接,像 Web 服務(wù)器那樣。對(duì)每一個(gè)新進(jìn)來(lái)的連接都會(huì)創(chuàng)建一個(gè) SocketChannel
多路復(fù)用器 Selector
多路復(fù)用器 Selector,它是 Java NIO 編程的基礎(chǔ),它提供了選擇已經(jīng)就緒的任務(wù)的能力。從底層來(lái)看,Selector 提供了詢問(wèn)通道是否已經(jīng)準(zhǔn)備好執(zhí)行每個(gè) I/O 操作的能力。簡(jiǎn)單來(lái)講,Selector 會(huì)不斷地輪詢注冊(cè)在其上的 Channel,如果某個(gè) Channel 上面發(fā)生了讀或者寫事件,這個(gè) Channel 就處于就緒狀態(tài),會(huì)被 Selector 輪詢出來(lái),然后通過(guò) SelectionKey 可以獲取就緒 Channel 的集合,進(jìn)行后續(xù)的 I/O 操作。
Selector 允許一個(gè)線程處理多個(gè) Channel ,也就是說(shuō)只要一個(gè)線程復(fù)雜 Selector 的輪詢,就可以處理成千上萬(wàn)個(gè) Channel ,相比于多線程來(lái)處理勢(shì)必會(huì)減少線程的上下文切換問(wèn)題。下圖是一個(gè) Selector 連接三個(gè) Channel :

實(shí)例
服務(wù)端
public class NIOServer {
/*接受數(shù)據(jù)緩沖區(qū)*/
private ByteBuffer sendbuffer = ByteBuffer.allocate(1024);
/*發(fā)送數(shù)據(jù)緩沖區(qū)*/
private ByteBuffer receivebuffer = ByteBuffer.allocate(1024);
private Selector selector;
public NIOServer(int port) throws IOException {
// 打開服務(wù)器套接字通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 服務(wù)器配置為非阻塞
serverSocketChannel.configureBlocking(false);
// 檢索與此通道關(guān)聯(lián)的服務(wù)器套接字
ServerSocket serverSocket = serverSocketChannel.socket();
// 進(jìn)行服務(wù)的綁定
serverSocket.bind(new InetSocketAddress(port));
// 通過(guò)open()方法找到Selector
selector = Selector.open();
// 注冊(cè)到selector,等待連接
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("Server Start----:");
}
private void listen() throws IOException {
while (true) {
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
iterator.remove();
handleKey(selectionKey);
}
}
}
private void handleKey(SelectionKey selectionKey) throws IOException {
// 接受請(qǐng)求
ServerSocketChannel server = null;
SocketChannel client = null;
String receiveText;
String sendText;
int count=0;
// 測(cè)試此鍵的通道是否已準(zhǔn)備好接受新的套接字連接。
if (selectionKey.isAcceptable()) {
// 返回為之創(chuàng)建此鍵的通道。
server = (ServerSocketChannel) selectionKey.channel();
// 接受到此通道套接字的連接。
// 此方法返回的套接字通道(如果有)將處于阻塞模式。
client = server.accept();
// 配置為非阻塞
client.configureBlocking(false);
// 注冊(cè)到selector,等待連接
client.register(selector, SelectionKey.OP_READ);
} else if (selectionKey.isReadable()) {
// 返回為之創(chuàng)建此鍵的通道。
client = (SocketChannel) selectionKey.channel();
//將緩沖區(qū)清空以備下次讀取
receivebuffer.clear();
//讀取服務(wù)器發(fā)送來(lái)的數(shù)據(jù)到緩沖區(qū)中
count = client.read(receivebuffer);
if (count > 0) {
receiveText = new String( receivebuffer.array(),0,count);
System.out.println("服務(wù)器端接受客戶端數(shù)據(jù)--:"+receiveText);
client.register(selector, SelectionKey.OP_WRITE);
}
} else if (selectionKey.isWritable()) {
//將緩沖區(qū)清空以備下次寫入
sendbuffer.clear();
// 返回為之創(chuàng)建此鍵的通道。
client = (SocketChannel) selectionKey.channel();
sendText="message from server--";
//向緩沖區(qū)中輸入數(shù)據(jù)
sendbuffer.put(sendText.getBytes());
//將緩沖區(qū)各標(biāo)志復(fù)位,因?yàn)橄蚶锩鎝ut了數(shù)據(jù)標(biāo)志被改變要想從中讀取數(shù)據(jù)發(fā)向服務(wù)器,就要復(fù)位
sendbuffer.flip();
//輸出到通道
client.write(sendbuffer);
System.out.println("服務(wù)器端向客戶端發(fā)送數(shù)據(jù)--:"+sendText);
client.register(selector, SelectionKey.OP_READ);
}
}
public static void main(String[] args) throws IOException {
int port = 8080;
NIOServer server = new NIOServer(port);
server.listen();
}
}
客戶端
public class NIOClient {
/*接受數(shù)據(jù)緩沖區(qū)*/
private static ByteBuffer sendbuffer = ByteBuffer.allocate(1024);
/*發(fā)送數(shù)據(jù)緩沖區(qū)*/
private static ByteBuffer receivebuffer = ByteBuffer.allocate(1024);
public static void main(String[] args) throws IOException {
// 打開socket通道
SocketChannel socketChannel = SocketChannel.open();
// 設(shè)置為非阻塞方式
socketChannel.configureBlocking(false);
// 打開選擇器
Selector selector = Selector.open();
// 注冊(cè)連接服務(wù)端socket動(dòng)作
socketChannel.register(selector, SelectionKey.OP_CONNECT);
// 連接
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080));
Set<SelectionKey> selectionKeys;
Iterator<SelectionKey> iterator;
SelectionKey selectionKey;
SocketChannel client;
String receiveText;
String sendText;
int count=0;
while (true) {
//選擇一組鍵,其相應(yīng)的通道已為 I/O 操作準(zhǔn)備就緒。
//此方法執(zhí)行處于阻塞模式的選擇操作。
selector.select();
//返回此選擇器的已選擇鍵集。
selectionKeys = selector.selectedKeys();
//System.out.println(selectionKeys.size());
iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
selectionKey = iterator.next();
if (selectionKey.isConnectable()) {
System.out.println("client connect");
client = (SocketChannel) selectionKey.channel();
// 判斷此通道上是否正在進(jìn)行連接操作。
// 完成套接字通道的連接過(guò)程。
if (client.isConnectionPending()) {
client.finishConnect();
System.out.println("完成連接!");
sendbuffer.clear();
sendbuffer.put("Hello,Server".getBytes());
sendbuffer.flip();
client.write(sendbuffer);
}
client.register(selector, SelectionKey.OP_READ);
} else if (selectionKey.isReadable()) {
client = (SocketChannel) selectionKey.channel();
//將緩沖區(qū)清空以備下次讀取
receivebuffer.clear();
//讀取服務(wù)器發(fā)送來(lái)的數(shù)據(jù)到緩沖區(qū)中
count=client.read(receivebuffer);
if(count>0){
receiveText = new String( receivebuffer.array(),0,count);
System.out.println("客戶端接受服務(wù)器端數(shù)據(jù)--:"+receiveText);
client.register(selector, SelectionKey.OP_WRITE);
}
} else if (selectionKey.isWritable()) {
sendbuffer.clear();
client = (SocketChannel) selectionKey.channel();
sendText = "message from client--";
sendbuffer.put(sendText.getBytes());
//將緩沖區(qū)各標(biāo)志復(fù)位,因?yàn)橄蚶锩鎝ut了數(shù)據(jù)標(biāo)志被改變要想從中讀取數(shù)據(jù)發(fā)向服務(wù)器,就要復(fù)位
sendbuffer.flip();
client.write(sendbuffer);
System.out.println("客戶端向服務(wù)器端發(fā)送數(shù)據(jù)--:"+sendText);
client.register(selector, SelectionKey.OP_READ);
}
}
selectionKeys.clear();
}
}
}
運(yùn)行結(jié)果


