Netty 源碼深度解析-EventLoop(1):EventLoop的構(gòu)造

- 前言 -
本文源碼地址:netty-source-code-analysis
本文所使用的netty版本4.1.6.Final:帶注釋的netty源碼
EventLoop在netty中發(fā)揮著驅(qū)動引擎的作用,本文我們以NioEventLoopGroup和NioEventLoop為例著重分析一下EventLoopGroup和EventLoop的創(chuàng)建、一些重要的數(shù)據(jù)結(jié)構(gòu)和netty的一些優(yōu)化。

- NioEventLoopGroup -
咱們以NioEventLoopGroup為例進(jìn)行分析。NioEventLoopGroup有很多構(gòu)造方法,咱們不再一一貼出,只貼出2個關(guān)鍵的構(gòu)造方法。
NioEventLoopGroup( int nThreads, Executor executor, final SelectorProvider selectorProvider)該構(gòu)造方法給出了SelectStrategyFactory的默認(rèn)值為DefaultSelectStrategyFactory.INSTANCE。EventLoop在每次循環(huán)時需要調(diào)用該類的calculateStrategy方法來決定循環(huán)的策略,具體咱們后邊講。
我們通過new NioEventLoopGroup()或者new NioEventLoopGroup(32)創(chuàng)建EventLoopGroup時最終都會調(diào)用到這個構(gòu)造方法。接著又調(diào)用了另一個構(gòu)造方法,咱們跟下去。
public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
這里調(diào)用了父類 MultithreadEventLoopGroup的構(gòu)造方法,咱們繼續(xù)跟下去。
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
MultithreadEventLoopGroup的構(gòu)造方法如下,這里給出了nThreads的默認(rèn)值,為MultithreadEventLoopGroup的靜態(tài)屬性DEFAULT_EVENT_LOOP_THREADS。接著調(diào)用父類MultithreadEventExecutorGroup的構(gòu)造方法。
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}
DEFAULT_EVENT_LOOP_THREADS的賦值在MultithreadEventLoopGroup的靜態(tài)代碼塊中,我們看到該值默認(rèn)為cpu核數(shù)×2。
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
咱們接著跟到MultithreadEventExecutorGroup的構(gòu)造方法,這里繼續(xù)調(diào)用本類的構(gòu)造方法MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args),并且為executor賦默認(rèn)值。
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
}
executor的默認(rèn)值為ThreadPerTaskExecutor,顧名思義就是為每一個任務(wù)創(chuàng)建一個線程,我們看一下它的實現(xiàn),代碼如下,execute方法中為每一個任務(wù)創(chuàng)建了一個線程。
public final class ThreadPerTaskExecutor implements Executor {
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}
我們繼續(xù)跟到MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args),這里接著調(diào)用另一個構(gòu)造方法MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args)并且為chooserFacotry賦默認(rèn)值,chooserFactory咱們在“服務(wù)端的啟動流程”和“客戶端的啟動流程”中均有涉及,作用是在為Channel綁定EventLoop時輪詢地從EventLoopGroup里選擇EventLoop,這里不再贅述。
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
接著看下一個構(gòu)造方法MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args),這里就是創(chuàng)建EventLoopGroup的核心邏輯了。
首先創(chuàng)建了一個EventExecutor數(shù)組,并賦值給children屬性,數(shù)據(jù)長度和nThreads相等,很容易想到,這里應(yīng)該就是保存EventLoop的數(shù)組了。
緊接著循環(huán)調(diào)用newChild方法為數(shù)組的每一個元素賦值。
最后調(diào)用chooserFactory.newChooser(children)為chooser賦值。
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
} finally {
}
}
chooser = chooserFactory.newChooser(children);
}
我們跟著看一下newChild方法,該方法為抽象的,這里newChild方法的實現(xiàn)在NioEventLoopGroup中。好了,到這里咱們看到了NioEventLoop的構(gòu)造方法,繼續(xù)跟進(jìn)去。
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}

- NioEventLoop -
來看NioEventLoop的構(gòu)造方法,這里NioEventLoop保存了3個屬性。
provider:selector的工廠類,從provider可以得到selector。selector: 調(diào)用provider.openSelector()得到的selector,這里netty做了優(yōu)化,咱們后邊講。selectStrategy:這個的calculateStrategy方法返回值,決定了EventLoop的下一步動作,咱們也是后邊講。
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
selector = openSelector();
selectStrategy = strategy;
}
繼續(xù)跟到父類SingleThreadEventLoop的構(gòu)造方法,這里初始化了一個屬性tailTasks,具體有什么用,咱們一會兒說,先記住這里有一個任務(wù)隊列叫tailTasks。
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
tailTasks = newTaskQueue(maxPendingTasks);
}
繼續(xù)跟到父類SingleThreadEventExecutor的構(gòu)造方法,這里有幾個屬性賦值。
addTaskWakesUp:這個是用來喚醒阻塞在taskQueue上的EventLoop線程的,在NioEventLoop中EventLoop不會阻塞在taskQueue上,這里用處不是很大,可以先不研究它。maxPendingTasks:后面緊接著就用到了,任務(wù)隊列的最大長度。executor:真正生成線程的類,默認(rèn)是ThreadPerTaskExecutor。taskQueue:任務(wù)隊列,還記得上邊已經(jīng)有一個tailTask隊列了嗎,這里是另外一個隊列。rejectedExecutionHandler:拒絕策略。
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
this.executor = ObjectUtil.checkNotNull(executor, "executor");
taskQueue = newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
繼續(xù)跟到父類AbstractScheduledEventExecutor的構(gòu)造方法,這里什么也沒干,又繼續(xù)調(diào)用父類的構(gòu)造方法,但是我們注意到該類中也有一個隊列scheduledTaskQueue,顧名思義是定時任務(wù)隊列,只是該隊列沒有在構(gòu)造方法中初始化,等到有定時任務(wù)加入時才初始化。
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
Queue<ScheduledFutureTask<?>> scheduledTaskQueue;
protected AbstractScheduledEventExecutor(EventExecutorGroup parent) {
super(parent);
}
}
繼續(xù)跟到父類AbstractEventExecutor的構(gòu)造方法,這里為parent賦值,即該EventLoop所屬的EventLoopGroup。
protected AbstractEventExecutor(EventExecutorGroup parent) {
this.parent = parent;
}
- Netty 的優(yōu)化 -
3.1 對selector的優(yōu)化
我們回到NioEventLoop類中的openSelector方法,這個方法在干什么呢,它在通過反射替換sun.nio.ch.SelectorImpl類的兩個屬性selectedKeys和publicSelectedKeys,將這兩個屬性都替換成了SelectedSelectionKeySet類的實例。為什么要這么換呢,咱們接著往下看。
private Selector openSelector() {
final Selector selector;
try {
selector = provider.openSelector();
} catch (IOException e) {
}
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
return Class.forName(
"sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader());
} catch (ClassNotFoundException e) {
} catch (SecurityException e) {
}
}
});
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
selectedKeysField.setAccessible(true);
publicSelectedKeysField.setAccessible(true);
selectedKeysField.set(selector, selectedKeySet);
publicSelectedKeysField.set(selector, selectedKeySet);
return null;
} catch (NoSuchFieldException e) {
} catch (IllegalAccessException e) {
} catch (RuntimeException e) {
}
}
});
if (maybeException instanceof Exception) {
} else {
//將selectedKeySet保存到selectedKeys屬性中
selectedKeys = selectedKeySet;
}
return selector;
}
sun.nio.ch.SelectorImpl類中的selectedKeys和publicSelectedKeys的作用是保存有興趣事件發(fā)生的Selectionkey,其中publicSelectedKeys是selectedKeys的包裝類,Util.ungrowableSet(this.selectedKeys),看方法名ungrowableSet表示包裝之后這個set就不能添加元素了,但是可以刪除元素。也就是說jdk內(nèi)部使用selectedKeys(注意這是個protected字段)進(jìn)行添加和刪除操作,而暴露給用戶的是publicSelectedKeys只能進(jìn)行刪除操作(看selectedKeys方法)。
默認(rèn)實現(xiàn)用的是HashSet,
public abstract class SelectorImpl extends AbstractSelector {
protected Set<SelectionKey> selectedKeys = new HashSet();
protected HashSet<SelectionKey> keys = new HashSet();
private Set<SelectionKey> publicKeys;
private Set<SelectionKey> publicSelectedKeys;
protected SelectorImpl(SelectorProvider var1) {
super(var1);
if (Util.atBugLevel("1.4")) {
this.publicKeys = this.keys;
this.publicSelectedKeys = this.selectedKeys;
} else {
this.publicKeys = Collections.unmodifiableSet(this.keys);
this.publicSelectedKeys = Util.ungrowableSet(this.selectedKeys);
}
}
public Set<SelectionKey> selectedKeys() {
if (!this.isOpen() && !Util.atBugLevel("1.4")) {
throw new ClosedSelectorException();
} else {
return this.publicSelectedKeys;
}
}
}
再來看看SelectedSelectionKeySet,這里用的數(shù)據(jù)結(jié)構(gòu)是數(shù)組,很顯然netty這里的優(yōu)化是因為數(shù)組的元素添加和遍歷操作比HashSet更快。有同學(xué)對這里有兩個數(shù)組的存在表示疑問,大家不用操心這個問題了,因為我也沒看懂為什么會有兩個數(shù)組,還要交替使用。netty在后來的版本中做了優(yōu)化,只用一個數(shù)組就實現(xiàn)了。所以這里只要知道netty用數(shù)組進(jìn)行優(yōu)化就可以了。
那么又一個問題來了,為什么jdk里邊不用數(shù)組或者List,而用HashSet呢,那是因為selector的select方法每次調(diào)用都會把已經(jīng)準(zhǔn)備好的SelectionKey放入selectedKeys中,如果用戶在第一次調(diào)用select方法之后沒有處理相應(yīng)Channel的事件的,也沒有刪除selectedKeys中的元素,那么再次調(diào)用select之后,相同的SelectionKey會再次加入selectedKeys中,如果selectedKeys使用數(shù)組或者List實現(xiàn)將起不到去重的效果。
另一方面,如果使用List或者數(shù)組,刪除成本也比較高。
而netty的實現(xiàn)中保證不會在連續(xù)調(diào)用兩次select方法之間不刪除selectedKeys中的元素,而且netty直接將selectedKeys暴露出來,在刪除的時候可以直接將數(shù)據(jù)中對應(yīng)索引的元素設(shè)置為null。
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
private SelectionKey[] keysA;
private int keysASize;
private SelectionKey[] keysB;
private int keysBSize;
private boolean isA = true;
}
3.2 對任務(wù)隊列的優(yōu)化
在SingleThreadEventExecutor的構(gòu)造方法中taskQueue屬性通過調(diào)用newTaskQueue方法產(chǎn)生,newTaskQueue方法在NioEventLoop中被覆蓋了,我們看一下。
這里被優(yōu)化成了MpscQueue,全稱是MultiProducerSingleConsumerQueue,即多生產(chǎn)者單消費(fèi)者隊列,感興趣的同學(xué)自己去看一下,既然單獨(dú)做一個這樣的隊列出來,在當(dāng)前場景下自然是比BlockingQueue性能更好了。
為什么可以做這樣的優(yōu)化呢,很顯然,這里的隊列消費(fèi)者只有一個那就是EventLoop,而生產(chǎn)者會有多個。并且,這里有一行注釋This event loop never calls takeTask(),這就說明這個MultiProducerSingleConsumerQueue并沒有阻塞的take方法,而正好NioEventLoop也不需要調(diào)用阻塞的take方法,正好適合。
@Override
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
// This event loop never calls takeTask()
return PlatformDependent.newMpscQueue(maxPendingTasks);
}
同樣的,tailTask這個隊列也被優(yōu)化成了MpscQueue。我們看一下scheduledTaskQueue是什么隊列,答案在AbstractScheduledEventExecutor#scheduledTaskQueue方法中,我們看到scheduledTaskQueue僅僅是一個普通的優(yōu)先級隊列,甚至都不是一個線程安全的阻塞隊列。
Queue<ScheduledFutureTask<?>> scheduledTaskQueue() {
if (scheduledTaskQueue == null) {
scheduledTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>();
}
return scheduledTaskQueue;
}
為什么呢,為什么tailTasks和taskQueue是MpscQueue,而scheduledTaskQueue是非線程安全的隊列呢?可能是因為沒有的帶優(yōu)先級功能的MpsQueue吧。
scheduledTaskQueue是非線程安全的隊列,會不會有多線程安全問題呢,答案是不會。我們看一下AbstractScheduledEventExecutor#schedule(io.netty.util.concurrent.ScheduledFutureTask<V>)方法,這里在添加定時任務(wù)時,如果是非EventLoop線程調(diào)用,則會發(fā)起一個異步調(diào)用,最終往scheduledTaskQueue添加定時任務(wù)的還是EventLoop線程。所以呢,這里又有另一個優(yōu)化,那就是可以延遲初始化scheduledTaskQueue。

- 總結(jié) -
畫重點(diǎn)來了,本文的重點(diǎn)就這兩個。
每個 EventLoop中有3個隊列,分別是tailTasks、taskQueue和scheduledTaskQueue。而且tailTasks和taskQueue隊列在NioEventLoop中的被優(yōu)化為MultiProducerSingleConsumerQueue。netty對 selector中的selectedKeys做了優(yōu)化,從HashSet替換為SelectedSelectionKeySet,SelectedSelectionKeySet是用數(shù)組實現(xiàn)的Set,添加元素和遍歷效率更高。
作者:王建新,轉(zhuǎn)轉(zhuǎn)架構(gòu)部資深Java工程師,主要負(fù)責(zé)服務(wù)治理、RPC框架、分布式調(diào)用跟蹤、監(jiān)控系統(tǒng)等。愛技術(shù)、愛學(xué)習(xí),歡迎聯(lián)系交流。
來源公眾號:種代碼

