Netty的異步任務處理與Socket事件處理
有道無術,術尚可求也!有術無道,止于術!
經(jīng)過前面幾章的學習,我們基本是明白了Netty通道的創(chuàng)建、注冊、與綁定與JDK NIO的對應關系,如果我們使用的是JDK NIO的方式去開發(fā)一個Socket服務端的時候,此時還缺少了一個重要的環(huán)節(jié),就是循環(huán)處理IO事件!
我們前面不只一次的見到Netty的異步事件,因為我們某些知識還沒有學習到,所以我們都按照同步的方式去獲取的,所以我們本章節(jié)將帶你學習,Netty對于IO事件的處理與異步事件的處理!
我們以綁定為出發(fā)點,由點到面進行分析!
一、源碼入口
我們直接進入到綁定的源碼分析:
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// 在觸發(fā)channelRegistered()之前調(diào)用此方法。給用戶處理程序一個設置的機會
// 其channelRegistered()實現(xiàn)中的管道。
channel.eventLoop().execute(() -> {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
});
}
我們上節(jié)課直接分析的channel.bind方法,而忽略上上面的異步方法,這里我們開始分析異步方法,我們進入到channel.eventLoop().execute()方法:

二、源碼分析
我們前面分析過,每個Channel綁定一個NioEventLoop,而EventLoop又是SingleThreadEventExecutor的子類,所以我們進入到io.netty.util.concurrent.SingleThreadEventExecutor#execute(java.lang.Runnable):
@Override
public void execute(Runnable task) {
ObjectUtil.checkNotNull(task, "task");
execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
}
---------------------------分界線------------------------------------
//繼續(xù)往下追 execute
private void execute(Runnable task, boolean immediate) {
//判斷當前執(zhí)行的線程是不是 NIoEventLoopGroup的線程 這里是false
boolean inEventLoop = inEventLoop();
//將任務加入到隊列
addTask(task);
//這里永遠只能啟動一次 一個eventLoop
if (!inEventLoop) {
//啟動線程
startThread();
.....................................
}
//io.netty.channel.nio.NioEventLoop.selector
if (!addTaskWakesUp && immediate) {
wakeup(inEventLoop);
}
}
我們這里可以分為兩部分:
1. 添加任務
addTask(task);
----------------------------------分界線---------------------------
protected void addTask(Runnable task) {
ObjectUtil.checkNotNull(task, "task");
if (!offerTask(task)) {
reject(task);
}
}
基礎好一點的同學我估計已經(jīng)有點猜到了,單看這個 offerTask有沒有像和隊列相關的操作,我們進入到offerTask方法:
final boolean offerTask(Runnable task) {
...............忽略.................
return taskQueue.offer(task);
}
果不其然,果然是入隊操作,taskQueue是什么呢?

我們再初始化NioEventLoop的源碼分析學習的時候,學習到,我們會創(chuàng)建兩個MpscQ隊列(多生產(chǎn)者,單消費者),這個taskQueue就是當時我們創(chuàng)建的一個任務隊列,這里面將我們提交的異步任務追加到隊列里面!
返回異步任務是不是被追加到隊列里面了,如果隊列滿了,或者其他原因追加失敗的話,會返回false,就會執(zhí)行reject方法:
protected final void reject(Runnable task) {
rejectedExecutionHandler.rejected(task, this);
}
這個拒絕策略同樣是我們再創(chuàng)建NioEventLoop的時候創(chuàng)建保存的,給大家留一個作業(yè),去追一下這個拒絕策略,判斷一下當發(fā)生了添加異步任務失敗之后,會發(fā)生什么呢?
2. 啟動消費線程
startThread();
-----------------------------分割線-------------------------
/**
* 啟動線程
*/
private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
//啟動線程
doStartThread();
success = true;
} finally {
if (!success) {
STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
}
注意,這里有個CAS操作 STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED); 判斷消費線程是不是已經(jīng)啟動,如果已經(jīng)啟動就不進入這個邏輯,如果沒啟動就進入這個邏輯!我們第一次調(diào)用,肯定沒啟動,進入這個邏輯:
doStartThread();
----------------------------分割線---------------------------
private void doStartThread() {
assert thread == null;
//創(chuàng)建一條線程并啟動
//這個線程又EventLoop
executor.execute(new Runnable() {
@Override
public void run() {
//保存當前線程 給線程賦值的就是這里
thread = Thread.currentThread();
...........................忽略........................
try {
//進行實際的啟動
//io.netty.channel.nio.NioEventLoop.run
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
...........................忽略........................
}
}
...........................忽略........................
}
...........................忽略........................
}
代碼比較長,我們只分析主線邏輯:
thread = Thread.currentThread();
首先保存了一下當前線程到成員變量,這個分支不是很重要,后面有時間進行分析!
SingleThreadEventExecutor.this.run();
這個就是處理異步任務的代碼,我們進入到run方法查看:

@Override
protected void run() {
int selectCnt = 0;
for (;;) {
try {
int strategy;
try {
//存在任務就返回IO時間的數(shù)量,不存在任務就返回select阻塞等待事件發(fā)生
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
//如果不存在異步任務 就進行事件選擇
case SelectStrategy.SELECT:
//下一個定時任務的截至時間 當不存在任務的時候就返回-1
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE;
}
nextWakeupNanos.set(curDeadlineNanos);
try {
//不存在任務就去阻塞獲取IO事件
if (!hasTasks()) {
strategy = select(curDeadlineNanos);
}
} finally {
nextWakeupNanos.lazySet(AWAKE);
}
default:
}
} catch (IOException e) {
//替換一個選擇器
rebuildSelector0();
//選擇次數(shù)重置為0
selectCnt = 0;
//處理循環(huán)異常 主要處理方式就是睡眠一會讓程序主動釋放CPU
handleLoopException(e);
continue;
}
//本次循環(huán)次數(shù)+1
selectCnt++;
cancelledKeys = 0;
needsToSelectAgain = false;
//這里是默認值 50
final int ioRatio = this.ioRatio;
boolean ranTasks;
//不會進這個分支
if (ioRatio == 100) {
try {
if (strategy > 0) {
processSelectedKeys();
}
} finally {
// Ensure we always run tasks.
ranTasks = runAllTasks();
}
//當存在I/O事件的時候
} else if (strategy > 0) {
//記錄一下當前的時間
final long ioStartTime = System.nanoTime();
try {
//處理IO事件
processSelectedKeys();
} finally {
//計算處理IO事件耗費的事件
final long ioTime = System.nanoTime() - ioStartTime;
//里面的時間是計算處理異步任務的時間盡量保持為1:1
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
//沒有IO事件的話就處理異步任務
ranTasks = runAllTasks(0);
}
if (ranTasks || strategy > 0) {
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
//沒有空輪詢的話三次一清空
selectCnt = 0;
//如果空輪詢的次數(shù)超過默認的512次 就處理空輪詢BUG的選擇器
} else if (unexpectedSelectorWakeup(selectCnt)) {
//空輪詢被處理后清空 輪詢次數(shù)
selectCnt = 0;
}
} catch (CancelledKeyException e) {
...................忽略........................
} finally {
...................忽略........................
}
}
}
這主線邏輯分為三個:如何解決IO事件、如何處理異步任務、如何解決空輪詢BUG!!分支代碼關注一下注釋,這里分析下主線代碼:
I. I/O事件的處理
processSelectedKeys();
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
selectedKeys是我們在創(chuàng)建NIOEventLoop的時候,會創(chuàng)建一個優(yōu)化后的的SelectorKeySet集合,使用數(shù)組來實現(xiàn)的,大家忘記的話,可以會看一下NioEventLoop的初始化源碼篇!
當你沒有禁用優(yōu)化的時候,就會進入到if分支,我們查看if內(nèi)部代碼的源碼:
private void processSelectedKeysOptimized() {
//開始遍歷所有的主鍵
for (int i = 0; i < selectedKeys.size; ++i) {
//獲取事件
final SelectionKey k = selectedKeys.keys[i];
//將該位置的數(shù)據(jù)制空
selectedKeys.keys[i] = null;
//獲取之間注冊NioServerSocketChannel的時候,綁定的Channel對象
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
//開始進行IO事件處理
processSelectedKey(k, (AbstractNioChannel) a);
} else {
.........................忽略............................
}
.........................忽略............................
}
}
獲取事件集合中的每一個key,同時獲取之前綁定的NioServerSocketChannel,然后調(diào)用processSelectedKey處理這個事件:
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
//當key失效之后,就關閉通道
....................忽略....................
}
try {
//獲取當前事件的key 掩碼
int readyOps = k.readyOps();
//是否包含連接事件
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
//獲取包含的事件
int ops = k.interestOps();
//剔除OP_CONNECT事件
ops &= ~SelectionKey.OP_CONNECT;
//重新更新關注的事件
k.interestOps(ops);
//傳播 connect事件
unsafe.finishConnect();
}
//如果當前返回的關注事件的掩碼包含 OP_WRITE的話
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
//開始向通道內(nèi)刷新數(shù)據(jù)
ch.unsafe().forceFlush();
}
//如果當前的事件掩碼包含讀、新連接接入事件 或者 不關注任何事件的時候 傳播read事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { //傳播read事件 可能是新連接接入也可能有數(shù)據(jù)可讀
unsafe.read();
}
} catch (CancelledKeyException ignored) {
//發(fā)生異常關閉通道
unsafe.close(unsafe.voidPromise());
}
}
大家可以看到,里面的處理基本和我們對于JDK NIO的處理一致,就是判斷各種事件然后進行對應的處理!
II、異步任務的處理
runAllTasks();
protected boolean runAllTasks() {
assert inEventLoop();
boolean fetchedAll;
boolean ranAtLeastOne = false;
do {
//合并任務 將定時任務的隊列里面的任務拉去出來,和異步任務的隊列進行合并
fetchedAll = fetchFromScheduledTaskQueue();
//開始執(zhí)行全部的任務
if (runAllTasksFrom(taskQueue)) {
ranAtLeastOne = true;
}
} while (!fetchedAll);
if (ranAtLeastOne) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
}
afterRunningAllTasks();
return ranAtLeastOne;
}
這里就是異步任務的被執(zhí)行的地方,這里分為兩個步驟:1. 合并任務 2.執(zhí)行taskQueue異步任務 3.執(zhí)行tailQueue異步任務!
合并任務
fetchedAll = fetchFromScheduledTaskQueue();Netty在我們學習中已經(jīng)知道了兩種隊列,一種是taskQueue隊列,一種是tailQueue隊列,現(xiàn)在又出現(xiàn)了第三種隊列:
scheduledTaskQueue,他是一個專門存放定時任務的對隊列,這里的合并任務就是將即將要執(zhí)行的任務合并到taskQueue中等待執(zhí)行!這行代碼執(zhí)行完畢后,所有即將要執(zhí)行的任務都被添加在了taskQueue隊列中,等待后續(xù)的執(zhí)行!
執(zhí)行taskQueue異步任務
//注意這里傳入的是合并完成后額taskQueue
runAllTasksFrom(taskQueue)上述代碼將對應的任務全部集中到了taskQueue隊列中后們這里開始消費taskQueue隊列進行執(zhí)行!我們可以適當?shù)目匆幌略创a:
protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
//從taskQueue隊列中彈出一個任務
Runnable task = pollTaskFrom(taskQueue);
if (task == null) {
return false;
}
for (;;) {
//執(zhí)行任務 調(diào)用run方法
safeExecute(task);
//繼續(xù)彈出任務
task = pollTaskFrom(taskQueue);
//如果彈出的任務為空
if (task == null) {
//直接返回
return true;
}
}
}執(zhí)行tailQueue異步任務
afterRunningAllTasks();這里開始執(zhí)行tailQueue節(jié)點的任務,可以看到,tailQueue節(jié)點的任務執(zhí)行優(yōu)先級低于上述兩種隊列!

image-20210503101059511 @Override
protected void afterRunningAllTasks() {
//注意這里傳入的是 tailQueue
runAllTasksFrom(tailTasks);
}
//繼續(xù)往下看源碼
protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
//彈出任務
Runnable task = pollTaskFrom(taskQueue);
if (task == null) {
return false;
}
for (;;) {
//執(zhí)行任務
safeExecute(task);
//再次彈出任務
task = pollTaskFrom(taskQueue);
if (task == null) {
//任務執(zhí)行完畢 返回true
return true;
}
}
}這里就不作過多講解了,這里和上面的邏輯基本一致,只是執(zhí)行的qeueb不是一個!
III、解決臭名昭著的JDK空輪詢BUG
可能大家大家都知道,JDK NIO在事件循環(huán)判斷的時候可能會出現(xiàn)空輪詢的BUG,導致CPU100%,雖然Oracle官方宣稱空輪詢的BUG已經(jīng)解決了,但是后續(xù)經(jīng)過一些公司實際的業(yè)務上證明并沒有解決,只是出現(xiàn)幾率小了點,Netty事實上并沒有解決這個空輪詢BUG只是用另外一種比較巧妙的方法規(guī)避開了,我們一起學習下:
首先,我們先想一下,我們?nèi)绾螖喽ㄎ覀兊某绦蚩赡馨l(fā)生了空輪詢的BUG,學習過NIO的都知道,我們會調(diào)用一個selector.select()進行阻塞等待有完成的事件發(fā)生,當selet方法阻塞解除的時候,就證明一定有我么感興趣的事件發(fā)生,但是當我們發(fā)現(xiàn)select方法解除了阻塞,但是事件數(shù)量卻為0的時候,我們就認為可能出現(xiàn)了空輪詢的BUG!
但是IO數(shù)量為0并不是一定出現(xiàn)了空輪詢的BUG,也可能外部調(diào)用了markUp方法,所以我們不能每一次出現(xiàn)事件數(shù)量為0的時候都認為程序出現(xiàn)了空輪詢BUG,所以我們就需要有一個記錄它出現(xiàn)該類異常情況發(fā)生的次數(shù),當發(fā)生的次數(shù)達到了我們設置的閾值,就證明它可能發(fā)生了空輪詢的BUG,這個時候需要處理這個空輪詢的BUG!
那么如何處理呢? 我們?nèi)蝿瞻l(fā)生空輪詢問題是因為(JDK官方認為,這個Linux Epoll告訴JDK有事件了,但是JDK獲取事件的時候獲取了一個空,所以JDK只能返回一個0)所以就發(fā)生了空輪詢:
JDK官方給出的解決方案
Netty是使用的第三種,拋棄舊的選擇器,重建一個新的選擇器,然后替換舊的選擇器,我們一起看下源碼!
我們看看Netty是如何做的,我們回到io.netty.channel.nio.NioEventLoop#run源碼:
我還是,為了方便講解,把這段代碼貼出來省略和空輪詢無關的代碼(完整代碼見上):
@Override
protected void run() {
int selectCnt = 0;
for (;;) {
........................忽略進行事件選擇的代碼...................
//本次循環(huán)次數(shù)+1
selectCnt++;
....................忽略事件處理和異步任務執(zhí)行的代碼................
//當處理的異步任務或者IO事件的數(shù)量大于0,證明沒有發(fā)生空輪詢
if (ranTasks || strategy > 0) {
//每隔三次打印一次日志
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
//沒有空輪詢的話清空
selectCnt = 0;
//如果出現(xiàn)異步任務為空 IO事件為空的話就會進入到這個邏輯
} else if (unexpectedSelectorWakeup(selectCnt)) {
//空輪詢被處理后清空 輪詢次數(shù)
selectCnt = 0;
}
} catch (CancelledKeyException e) {
...................忽略........................
} finally {
...................忽略........................
}
}
可以仔細的看一下 上述代碼的注釋,我們進入到 unexpectedSelectorWakeup(selectCnt) 方法:
private boolean unexpectedSelectorWakeup(int selectCnt) {
..............忽略日志打印................
if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
//判斷異常情況的次數(shù)是不是超過了預設的512次
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
//開始重新構建一個selector
rebuildSelector();
return true;
}
return false;
}
我們讀源碼到這里,可以知道,當異常執(zhí)行的次數(shù)超過了閾值 512次,就會調(diào)用一個 rebuildSelector方法,我們點進去看一下:
public void rebuildSelector() {
if (!inEventLoop()) {
execute(new Runnable() {
@Override
public void run() {
rebuildSelector0();
}
});
return;
}
rebuildSelector0();
}
我們按照慣例,按照同步方法調(diào)用 rebuildSelector0();
private void rebuildSelector0() {
//獲取原始的選擇器
final Selector oldSelector = selector;
//聲明一個新的選擇器
final SelectorTuple newSelectorTuple;
if (oldSelector == null) {
return;
}
try {
//創(chuàng)建一個新的選擇器,賦值給新的選擇器變量
newSelectorTuple = openSelector();
} catch (Exception e) {
logger.warn("Failed to create a new Selector.", e);
return;
}
int nChannels = 0;
//開始遍歷舊的選擇器,將舊選擇器的IO事件的key,綁定到新創(chuàng)建的選擇器上
for (SelectionKey key: oldSelector.keys()) {
//獲取舊選擇器的管道
Object a = key.attachment();
try {
//如果key失效了,就跳過!
if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
continue;
}
//獲取對應關注的事件掩碼
int interestOps = key.interestOps();
//將舊key置為失效
key.cancel();
//重新將管道綁定到新的選擇器上
SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
//替換管道里面保存的選擇器事件主鍵
if (a instanceof AbstractNioChannel) {
// Update SelectionKey
((AbstractNioChannel) a).selectionKey = newKey;
}
nChannels ++;
} catch (Exception e) {
...............省略...............
}
}
//重新保存新的優(yōu)化后的選擇器和原始選擇器
selector = newSelectorTuple.selector;
unwrappedSelector = newSelectorTuple.unwrappedSelector;
try {
//關閉舊的選擇器
oldSelector.close();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
...............省略..................
}
}
...............省略..................
}
我們從上述代碼可以看到,Netty處理空輪詢的問題的策略是,當發(fā)現(xiàn)你可能發(fā)生空輪詢的次數(shù)超過了512次的時候,就直接重新獲取一個新的選擇器,然后將舊的選擇器直接替換掉,這樣空輪詢的BUG也就很輕易的解決了!
三、總結
每一個EventLoop都會啟動一條永久運行的線程,用于處理異步任務和IO事件,我們稱之為Reactor線程。 如果存在IO事件的話,會先處理IO事件! Reactor線程會先將定時任務里面的任務合并到taskqueue里面,然后執(zhí)行!taskQueue執(zhí)行完畢后執(zhí)行tailQueue隊列的任務! 如果空輪詢的次數(shù)發(fā)生了512次,就認為發(fā)生了空輪詢的BUG,就會拋棄原來的選擇器,重建一個新的選擇器,將舊選擇器上的事件全部綁定到新的選擇器上,然后將舊選擇器刪除!
才疏學淺,如果文章中理解有誤,歡迎大佬們私聊指正!歡迎關注作者的公眾號,一起進步,一起學習!
??「轉發(fā)」和「在看」,是對我最大的支持??
