NioServerSocketChannel的綁定源碼解析
有道無(wú)術(shù),術(shù)尚可求也!有術(shù)無(wú)道,止于術(shù)!
前面兩節(jié)課,我們著重分析了 initAndRegister方法,對(duì)通訊通道的創(chuàng)建、初始化以及注冊(cè)到選擇器上有了一個(gè)詳細(xì)的介紹,回想JDK NIO的開發(fā)步驟,我們需要獲取SocketChaennel、獲取選擇器Selector、將通道注冊(cè)到選擇器、綁定端口、處理事件!那么同樣的Netty是基于NIO開發(fā)的,也同樣少不了這幾個(gè)步驟,迄今為止,我們已經(jīng)學(xué)習(xí)了,Selector的創(chuàng)建、SocketChannel的創(chuàng)建、選擇器的注冊(cè),今天我們要學(xué)的就是通道的綁定端口!
一、源碼入口
我們回到:io.netty.bootstrap.AbstractBootstrap#doBind 方法:
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
.............................忽略..............................
if (regFuture.isDone()) {
........................忽略.........................
//進(jìn)行數(shù)據(jù)綁定 通道的注冊(cè) 以及事件的觸發(fā)
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
........................忽略.........................
return promise;
}
}
這幾行代碼相信大家無(wú)比的熟悉,initAndRegister是做Channel的創(chuàng)建、初始化、注冊(cè)的,我們分析完了,下面就是要分析綁定方法了!
話不多少,我們直接進(jìn)入到doBind0方法里面!
private static void doBind0( final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// 在觸發(fā)channelRegistered()之前調(diào)用此方法。給用戶處理程序一個(gè)設(shè)置的機(jī)會(huì)
// 其channelRegistered()實(shí)現(xiàn)中的管道。
channel.eventLoop().execute(() -> {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
});
}
毋庸置疑,我們重點(diǎn)關(guān)注:
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
繼續(xù)跟進(jìn)到bind方法:

@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
//this.bind head
return pipeline.bind(localAddress, promise);
}
我們到這里看到了一行奇怪的代碼,似乎調(diào)用了一個(gè)通道的傳播,我們繼續(xù)跟下去:
@Override
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return tail.bind(localAddress, promise);
}
大家都知道,tail節(jié)點(diǎn)是我們?cè)偻ǖ乐械淖钗膊抗?jié)點(diǎn),大家通過上節(jié)課的分析可知,現(xiàn)在的pipeline是如下結(jié)構(gòu):

我們查看bind方法是 ChannelOutboundInvoker接口下的 ,回想我們分析Netty的整體架構(gòu)圖的時(shí)候,分析過ChannelOutboundInvoker是從后向前傳播的,即從tail節(jié)點(diǎn)向前傳播,最終到Head節(jié)點(diǎn)結(jié)束的,但是TailContext與ServerBootstrapAcceptor都未實(shí)現(xiàn)bind方法,那么我們最終把位置定位到HeadContext的代碼上:(注意,這里不必知道,哎pipeline中是如何傳播的,下面有一章節(jié)是對(duì)pipeline的添加、尋找、注冊(cè)有一個(gè)完整的源碼分析,這里為了同學(xué)們更好的理解,就先不涉及這么多了!)
我們進(jìn)入到:io.netty.channel.DefaultChannelPipeline.HeadContext#bind
@Override
public void bind( ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
unsafe.bind(localAddress, promise);
}

unsafe是NioMessageUnsafe類型的,父類是AbstractNioUnsafe,所以我們進(jìn)入到AbstractNioUnsafe的源碼:
二、源碼解析
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
...............................忽略.................................
//false
boolean wasActive = isActive();
try {
//jdk底層的綁定端口 NioServerSocketChannel
doBind(localAddress);
} catch (Throwable t) {
...............................忽略.................................
return;
}
//isActive true
if (!wasActive && isActive()) {
//觸發(fā) Active事件
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
我們還是忽略到部分分支代碼,看我們的主線代碼,首先會(huì)判斷通道是否是激活狀態(tài):
boolean wasActive = isActive();
此時(shí),通道并沒有綁定端口號(hào),所以此時(shí)返回的是false。
doBind(localAddress);
開始調(diào)用JDK底層的邏輯進(jìn)行通道的綁定,我們進(jìn)入到doBind方法,你們一定要記好,我們初始化的是服務(wù)端,我們給的通道類型是NioServerSocketChannel

@Override
protected void doBind(SocketAddress localAddress) throws Exception {
//Netty 會(huì)根據(jù) JDK 版本的不同,分別調(diào)用 JDK 底層不同的 bind() 方法。
// 我使用的是 JDK8,所以會(huì)調(diào)用 JDK 原生 Channel 的 bind() 方法。
// 執(zhí)行完 doBind() 之后,服務(wù)端 JDK 原生的 Channel 真正已經(jīng)完成端口綁定了。
if (PlatformDependent.javaVersion() >= 7) {
//jdk底層的綁定
javaChannel().bind(localAddress, config.getBacklog());
} else {
//jdk底層的綁定
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
Netty會(huì)根據(jù)JDK版本的不同注冊(cè)的時(shí)候有些微的不一樣,我們以JDK8為例,會(huì)執(zhí)行if分支:
javaChannel().bind(localAddress, config.getBacklog());
這行代碼,相信大家也是無(wú)比的熟悉,這就是JDK NIO的綁定端口的代碼,我們回想下JDK NIO是如何綁定端口的:

上圖的JDK NIO的注冊(cè)方式,兩者代碼是一致的!綁定完成后,我們回到主線代碼:
if (!wasActive && isActive()) {
//觸發(fā) Active事件
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
wasActive屬性是false,因?yàn)橹斑€沒激活,取反為true,此時(shí)通道已經(jīng)綁定成功了,重新調(diào)用isActive(),返回為true,所以整體返回true,走該分支,我們暫且停一下,試想一下,這個(gè)判斷的意義在哪里!
按照之前的分析,這個(gè)判斷的邏輯是,綁定之前沒有激活,綁定之后激活了,只有兩個(gè)條件同時(shí)滿足才會(huì)走這個(gè)分支,這能夠保障該判斷邏輯內(nèi)的邏輯不會(huì)被重復(fù)調(diào)用,只會(huì)再綁定成功后調(diào)用一次!
我們進(jìn)入到邏輯分支,該方法也是異步的,但是沒關(guān)系,我們依舊按照同步的方式分析,有關(guān)異步,我會(huì)在下一節(jié)課完整的分析,在Netty中所有的異步都有一個(gè)相同的執(zhí)行方式!
pipeline.fireChannelActive();
從定義上來(lái)看又是一個(gè)管道的事件傳播,我們進(jìn)入看一下,從什么地方開始傳播的:
@Override
public final ChannelPipeline fireChannelActive() {
AbstractChannelHandlerContext.invokeChannelActive(head);
return this;
}
我們可以看到是從Head節(jié)點(diǎn)開始傳播的,
static void invokeChannelActive(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelActive();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelActive();
}
});
}
}
這里無(wú)論是同步還是異步,都是調(diào)用了 next.invokeChannelActive(); 我們進(jìn)入到源碼邏輯:
private void invokeChannelActive() {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelActive(this);
} catch (Throwable t) {
..............................忽略............................
}
}
..............................忽略............................
}
因?yàn)槲覀兊膆andler是Head類型的,所以:

@Override
public void channelActive(ChannelHandlerContext ctx) {
//傳播事件
ctx.fireChannelActive();
//設(shè)置為讀監(jiān)聽
readIfIsAutoRead();
}
一共兩行代碼,比較簡(jiǎn)單,第一行傳播事件,從頭結(jié)點(diǎn)往下尋找傳播 ChannelActive方法:
ctx.fireChannelActive();
有關(guān)事件的傳播,我會(huì)在pipeline中詳解,這里先記住,會(huì)傳播一個(gè)事件,調(diào)用channelActive方法!

因?yàn)镃hannelActive是ChannelInboundHandler類的方法,Netty整體架構(gòu)課分析過,ChannelInboundHandler屬于正向傳播,即從Head節(jié)點(diǎn)開始到Tail節(jié)點(diǎn)結(jié)束:

//設(shè)置為讀監(jiān)聽
readIfIsAutoRead();
大家回想一下,我們?cè)僮?cè)NioServerSocketChannel的時(shí)候,關(guān)注的是0,即不關(guān)注任何事件,忘記的同學(xué)可以去上一節(jié)課注冊(cè)的源碼解析查看:

但是按道理來(lái)說,以我們JDK NIO的基礎(chǔ),我們新服務(wù)器應(yīng)該關(guān)注的是一個(gè)OP_ACCEPT事件,所以,我們這里就要對(duì)他進(jìn)行一個(gè)更改,讓他關(guān)注新連接事件,我們進(jìn)入到readIfIsAutoRead源碼中:
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
channel.read();
}
}
判斷的邏輯分支默認(rèn)為true

關(guān)于為什么選這個(gè),已經(jīng)前面講了好幾次,這里不做陳述,我們直接進(jìn)入到read源碼中:
@Override
public Channel read() {
pipeline.read();
return this;
}
很明顯,又是一個(gè)事件傳播,我們繼續(xù)跟:
@Override
public final ChannelPipeline read() {
tail.read();
return this;
}
很明顯,該方法是從tail節(jié)點(diǎn)開始傳播,Netty整體架構(gòu)課上說過,read屬于ChannelOutboundInvoker,屬于倒序傳播,該代碼是從tail節(jié)點(diǎn)向上尋找,默認(rèn)實(shí)現(xiàn)是HeadContext實(shí)現(xiàn),我們進(jìn)入到HeadContext:
注意,同學(xué)們有關(guān)事件傳播如何傳播的會(huì)很疑惑,先不要急,先按照我的邏輯走,后面學(xué)習(xí)完pipeline之后,你會(huì)對(duì)如何傳播有一個(gè)及其清晰的認(rèn)識(shí),先按照我的邏輯走!
io.netty.channel.DefaultChannelPipeline.HeadContext#read
@Override
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}
我們進(jìn)入到 unsafe.beginRead();

@Override
public final void beginRead() {
....................忽略..................
try {
doBeginRead();
} catch (final Exception e) {
....................忽略..................
}
}
我們進(jìn)入到 doBeginRead(); 方法中, 注意我們是服務(wù)端默認(rèn)的Unsafe是 AbstractNioMessageChannel類型的:

@Override
protected void doBeginRead() throws Exception {
if (inputShutdown) {
return;
}
super.doBeginRead();
}
調(diào)用父類的doBeginRead方法:
io.netty.channel.nio.AbstractNioChannel#doBeginRead
@Override
protected void doBeginRead() throws Exception {
.............................忽略.............................
final int interestOps = selectionKey.interestOps();
//如果當(dāng)前的讀事件為0 且預(yù)設(shè)的事件不為0進(jìn)入邏輯
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
大家還記得我們?cè)賱?chuàng)建NioServerSocketChannel的時(shí)候,保存的readInterestOp 是什么嗎?我截圖幫助大家回憶一下:

那么if分支邏輯內(nèi)就相當(dāng)于:
selectionKey.interestOps(0 | SelectionKey.OP_ACCEPT);
然后,NioServerSokcetChannel的選擇器就被綁定為關(guān)注連接事件了!
至此,服務(wù)端啟動(dòng)成功??!
三、總結(jié)
調(diào)用JDK原生的方法,給channel綁定一個(gè)端口! 傳播channelActive事件,進(jìn)行方法的回調(diào)! 修改NioServerSocketChannel選擇器默認(rèn)關(guān)注的事件從0變?yōu)镾electionKey.OP_ACCEPT,開始等待客戶端新連接接入! 服務(wù)端啟動(dòng)成功!
才疏學(xué)淺,如果文章中理解有誤,歡迎大佬們私聊指正!歡迎關(guān)注作者的公眾號(hào),一起進(jìn)步,一起學(xué)習(xí)!
??「轉(zhuǎn)發(fā)」和「在看」,是對(duì)我最大的支持??
