Netty服務端的新連接接入源碼解析
有道無術,術尚可求也!有術無道,止于術!
經(jīng)過上一章節(jié)的學習,我們基本了解了Netty是如何對IO事件以及異步任務的處理了,今天我們就一起來學習一下,Netty是如何處理新連接接入與數(shù)據(jù)讀取的!
一、源碼尋找
我們上一章節(jié)學到了,當存在IO事件的時候,Netty的反應堆線程會監(jiān)聽這些事件,然后進行處理,忘記的,可以回顧一下上一章節(jié),,我們這里直接進入到:
io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel)
這里的代碼,我們昨天只是做了一個大概的分析,并沒有深入的講解,這一章節(jié)具體分析一下新連接的接入和Channel數(shù)據(jù)的讀取。
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
.........忽略有效性驗證................
try {
int readyOps = k.readyOps();
...................忽略其他 事件的處理邏輯...............
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
我們重點關注當事件存在讀事件或者新連接接入事件的時候,才會進入到這一判斷邏輯,那么由此可見,我們兵丁是要關注unsafe.read() 這一行代碼了!
二、新連接接入源碼分析
首選,我們聲明一下,我們現(xiàn)在一直是按照服務端啟動邏輯進行分析的,那么服務端邏輯分析,對照通道就是NioServerSocketChannel, 我們在創(chuàng)建NioServerSocketChannel的時候初始化過一個Unsafe對象,他是NioMessageUnsafe類型的,如果有疑問的同學可以回顧一下NioServerSocketChannel的初始化過程!
所以,必然,我們這里的unsafe.read(); 就必然進入的是NioMessageUnsafe的read方法:

@Override
public void read() {
..................忽略不必要代碼............
try {
try {
do {
//讀取數(shù)據(jù) 可能是數(shù)據(jù) 也可能是新連接
int localRead = doReadMessages(readBuf);
//如果沒數(shù)據(jù)就跳出
if (localRead == 0) {
break;
}
//-1 就是連接被關閉
if (localRead < 0) {
closed = true;
break;
}
//讀取的連接數(shù)增加
allocHandle.incMessagesRead(localRead);
//每次默認讀取最大16個連接 剩余的后續(xù)去讀
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
//獲取連接數(shù)量或者讀取的數(shù)據(jù)的數(shù)量
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
//開始傳播channelRead屬性
pipeline.fireChannelRead(readBuf.get(i));
}
//清空緩沖區(qū)
readBuf.clear();
allocHandle.readComplete();
//傳播讀取完成事件
pipeline.fireChannelReadComplete();
...................忽略不必要代碼.......................
} finally {
...................忽略不必要代碼.......................
}
}
1. 讀取新連接
int localRead = doReadMessages(readBuf);
這行代碼是讀取新連接的主要邏輯:

@Override
protected int doReadMessages(List<Object> buf) throws Exception {
//調(diào)用JDK ServerSocketChannel獲取新連接 JDK SocketChannel
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
//將客戶端連接直接包裝為 Netty的管道包裝對象 NioSocketChannel
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
................忽略異常處理.............
}
return 0;
}
可以看到這里的邏輯比較簡單,首先,Netty會使用先前保存的JDK 的原生的SocketChannel調(diào)用accept方法進行獲取JDK新連接的管道!
注意此時獲取的管道是JDK NIO的原生的管道對象,和Netty還沒有關系,然后再將JDK NIO原生的Channel包裝為Netty的NioSocketChannel放到緩沖區(qū)里面,注意此時放到緩沖區(qū)里面的對象就是Netty的包裝對象了!包裝完成之后直接返回 ,此時我們的緩沖區(qū)就存在數(shù)據(jù)量,這個數(shù)據(jù)是NioSocketChannel對象!
我們回到主線 read方法,當調(diào)用完doMessage方法之后開始就要處理這個NioSocketChannel了呀!
2. 處理新連接的管道
pipeline.fireChannelRead(readBuf.get(i));
從代碼上看,可以看到,他是把剛剛我們讀到的NioSocketChannel出來往下傳播,這個代碼是在通道內(nèi)傳播,我們前幾節(jié)課講過,此時Pipeline的結構是如圖所示的數(shù)據(jù)結構:

我們看如下代碼:
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
他是從頭節(jié)點開始傳播的,channelRead的傳播是自上而下的,所以就勢必會傳播到 ServerBootstrapAcceptor的邏輯中,所以我們進入到ServerBootstrapAcceptor#channelRead方法:
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//能進入都這一段邏輯的就必定是通道對象,因為只有服務端管道會存在該處理器
final Channel child = (Channel) msg;
//向服務端管道追加childHandler 在構建ServerBootStrap的時候傳入的
child.pipeline().addLast(childHandler);
//在構建ServerBootStrap的時候傳入的
setChannelOptions(child, childOptions, logger);
//在構建ServerBootStrap的時候傳入的
setAttributes(child, childAttrs);
try {
//開始進行注冊,注冊邏輯同NioServerSocketChannel相同
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
我們可以看到這個,先是向通道內(nèi)注冊一些客戶端的參數(shù),然后開始進行注冊Channel, 注冊的時候同NioServerSocketChannel的注冊邏輯一樣,只不過NioSocketChannel的關注事件是OP_READ事件,這里留一個作業(yè),同學們可以自己分析一下NioSocketChannel的創(chuàng)建,分析一下它的注冊邏輯與反應堆邏輯!
三、客戶端數(shù)據(jù)讀取源碼解析
我們還是直接回到
io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel)
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
.........忽略有效性驗證................
try {
int readyOps = k.readyOps();
...................忽略其他 事件的處理邏輯...............
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
上面分析過,負責客戶端新連接的通道是NioSocketChannel,大家自行分析一下內(nèi)部邏輯,與NioServerSocketChannel的相似度90%!
1. 讀取通道數(shù)據(jù)
NioSocketChannel的Unsafe是 NioByteUnsafe, 所以我們直接進入到:
io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read
查看他是如何進行數(shù)據(jù)讀取的:
@Override
public final void read() {
........................忽略........................
//獲取客戶端通道管道
final ChannelPipeline pipeline = pipeline();
//獲取一個內(nèi)存分配器
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
//分配一個ByteBuf緩沖區(qū)
byteBuf = allocHandle.allocate(allocator);
//開始向緩沖區(qū)內(nèi)寫入通道的數(shù)據(jù)
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
// 如果沒有讀取到緩沖區(qū),就釋放緩沖區(qū).
byteBuf.release();
byteBuf = null;
//設置關閉標志
close = allocHandle.lastBytesRead() < 0;
if (close) {
// There is nothing left to read as we received an EOF.
readPending = false;
}
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
//傳播一次readChnnel事件
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
allocHandle.readComplete();
//傳播一次readChnnelComplete事件
pipeline.fireChannelReadComplete();
//關閉通道
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
....................忽略不必要代碼.....................
}
}
我們整體將邏輯分為以下幾個步驟:
獲取一個內(nèi)存分配器,Netty中存在一個專門用于分配ByteBuf的內(nèi)存分配器。這里是將它獲取出來! 使用上一步獲取的內(nèi)存分配器分配一塊緩沖區(qū),用域后續(xù)的使用! 開始讀取通道內(nèi)的數(shù)據(jù)寫入預先分配好的緩沖區(qū)! 讀取數(shù)據(jù)完畢后,將帶有數(shù)據(jù)的緩沖區(qū)調(diào)用pieline的傳播方法進行數(shù)據(jù)的傳播 readChannel方法! 當通道內(nèi)的數(shù)據(jù)被處理完后,傳播一次 channelReadComplete方法
四、總結
在Netty中NioServerSocketChannel與NioSocketChannel的處理中,對于數(shù)據(jù)的讀取擁有不同的處理方法,NioServerSockerChannel主要用于處理新連接的,在初始化的時候就會在通道內(nèi)加入一個新連接接入器
ServerBootstrapAcceptor!NioServerSocketChannel對象在讀取到數(shù)據(jù)后將之包裝為NioSocketChannel對象,然后使用ServerBootstrapAcceptor進行NioSocketChannel的注冊與啟動反應堆線程!
當通道內(nèi)存在數(shù)據(jù)的時候,被NioSockerChannel探測到后,就會先分配一塊緩沖區(qū),將數(shù)據(jù)讀取進預先分配好的緩沖區(qū),然后進行數(shù)據(jù)的向下通道流轉(zhuǎn)(事件觸發(fā))!
才疏學淺,如果文章中理解有誤,歡迎大佬們私聊指正!歡迎關注作者的公眾號,一起進步,一起學習!
??「轉(zhuǎn)發(fā)」和「在看」,是對我最大的支持??
