深挖 Netty 源碼:時(shí)間輪底層原理分析
本文選自 Doocs 開源社區(qū)旗下“源碼獵人”項(xiàng)目,作者?tydhot[1]。
項(xiàng)目將會(huì)持續(xù)更新,歡迎 Star 關(guān)注。
項(xiàng)目地址:https://github.com/doocs/source-code-hunter
該文所涉及的 Netty 源碼版本為 4.1.6。
HashedWheelTimer 是什么
Netty 的時(shí)間輪?HashedWheelTimer?給出了一個(gè)粗略的定時(shí)器實(shí)現(xiàn),之所以稱之為粗略的實(shí)現(xiàn)是因?yàn)樵摃r(shí)間輪并沒有嚴(yán)格的準(zhǔn)時(shí)執(zhí)行定時(shí)任務(wù),而是在每隔一個(gè)時(shí)間間隔之后的時(shí)間節(jié)點(diǎn)執(zhí)行,并執(zhí)行當(dāng)前時(shí)間節(jié)點(diǎn)之前到期的定時(shí)任務(wù)。
當(dāng)然具體的定時(shí)任務(wù)的時(shí)間執(zhí)行精度可以通過調(diào)節(jié) HashedWheelTimer 構(gòu)造方法的時(shí)間間隔的大小來進(jìn)行調(diào)節(jié),在大多數(shù)網(wǎng)絡(luò)應(yīng)用的情況下,由于 IO 延遲的存在,并不會(huì)嚴(yán)格要求具體的時(shí)間執(zhí)行精度,所以默認(rèn)的 100ms 時(shí)間間隔可以滿足大多數(shù)的情況,不需要再花精力去調(diào)節(jié)該時(shí)間精度。
HashedWheelTimer 的實(shí)現(xiàn)原理
HashedWheelTimer 內(nèi)部的數(shù)據(jù)結(jié)構(gòu)
private final HashedWheelBucket[] wheel;HashedWheelTimer 的主體數(shù)據(jù)結(jié)構(gòu) wheel 是一個(gè)由多個(gè)鏈表所組成的數(shù)組,默認(rèn)情況下該數(shù)組的大小為 512。當(dāng)定時(shí)任務(wù)準(zhǔn)備加入到時(shí)間輪中的時(shí)候,將會(huì)以其等待執(zhí)行的時(shí)間為依據(jù)選擇該數(shù)組上的一個(gè)具體槽位上的鏈表加入。
private HashedWheelTimeout head;private HashedWheelTimeout tail;
在這個(gè) wheel 數(shù)組中,每一個(gè)槽位都是一條由 HashedWheelTimeout 所組成的鏈表,其中鏈表中的每一個(gè)節(jié)點(diǎn)都是一個(gè)等待執(zhí)行的定時(shí)任務(wù)。
HashedWheelTimer 內(nèi)部的線程模型
在 HashedWheelTimer 中,其內(nèi)部是一個(gè)單線程的 worker 線程,通過類似 eventloop 的工作模式進(jìn)行定時(shí)任務(wù)的調(diào)度。
@Overridepublic void run() {// Initialize the startTime.startTime = System.nanoTime();if (startTime == 0) {// We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.startTime = 1;}// Notify the other threads waiting for the initialization at start().startTimeInitialized.countDown();do {final long deadline = waitForNextTick();if (deadline > 0) {transferTimeoutsToBuckets();HashedWheelBucket bucket =wheel[(int) (tick & mask)];bucket.expireTimeouts(deadline);tick++;}} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);// Fill the unprocessedTimeouts so we can return them from stop() method.for (HashedWheelBucket bucket: wheel) {bucket.clearTimeouts(unprocessedTimeouts);}for (;;) {HashedWheelTimeout timeout = timeouts.poll();if (timeout == null) {break;}unprocessedTimeouts.add(timeout);}}
簡單看到 HashedWheelTimer 內(nèi)部的 woker 線程的?run()方法,在其首先會(huì)記錄啟動(dòng)時(shí)間作為 startTime 作為接下來調(diào)度定時(shí)任務(wù)的時(shí)間依據(jù),而之后會(huì)通過 CountDownLatch 來通知所有外部線程當(dāng)前 worker 工作線程已經(jīng)初始化完畢。之后的循環(huán)體便是當(dāng)時(shí)間輪持續(xù)生效的時(shí)間里的具體調(diào)度邏輯。時(shí)間刻度是時(shí)間輪的一個(gè)重要屬性,其默認(rèn)為 100ms,此處的循環(huán)間隔便是時(shí)間輪的時(shí)間刻度,默認(rèn)情況下就是間隔 100ms 進(jìn)行一次調(diào)度循環(huán)。工作線程會(huì)維護(hù)當(dāng)前工作線程具體循環(huán)了多少輪,用于定位具體執(zhí)行觸發(fā)時(shí)間輪數(shù)組上的哪一個(gè)位置上的鏈表。當(dāng)時(shí)間輪準(zhǔn)備 shutdown 的階段,最后的代碼會(huì)對(duì)未執(zhí)行的任務(wù)整理到未執(zhí)行的隊(duì)列中。
由此可見,worker 線程的 run()方法中基本定義了工作線程的整個(gè)生命周期,從初始的初始化到循環(huán)體中的具體調(diào)度,最后到未執(zhí)行任務(wù)的具體清理。整體的調(diào)度邏輯便主要在這里執(zhí)行。值得注意的是,在這里的前提下,每個(gè) HashedWheelTimer 時(shí)間輪都會(huì)有一個(gè)工作線程進(jìn)行調(diào)度,所以不需要在 netty 中在每一個(gè)連接中單獨(dú)使用一個(gè) HashedWheelTimer 來進(jìn)行定時(shí)任務(wù)的調(diào)度,否則可能將對(duì)性能產(chǎn)生影響。
向 HashedWheelTimer 加入一個(gè)定時(shí)任務(wù)的流程
當(dāng)調(diào)用 HashedWheelTimer 的 newTimeout()方法的時(shí)候,即是將定時(shí)任務(wù)加入時(shí)間輪中的 api。
@Overridepublic Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {if (task == null) {throw new NullPointerException("task");}if (unit == null) {throw new NullPointerException("unit");}start();long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);timeouts.add(timeout);return timeout;}
當(dāng)此次是首次向該時(shí)間輪加入定時(shí)任務(wù)的時(shí)候,將會(huì)通過 start()方法開始執(zhí)行上文所述的 worker 工作線程的啟動(dòng)與循環(huán)調(diào)度邏輯,這里暫且不提。之后計(jì)算定時(shí)任務(wù)觸發(fā)時(shí)間相對(duì)于時(shí)間輪初始化時(shí)間的相對(duì)時(shí)間間隔 deadline,并將其包裝為一個(gè)鏈表節(jié)點(diǎn) HashedWheelTimeout ,投入到 timeouts 隊(duì)列中,等待 worker 工作線程在下一輪調(diào)度循環(huán)中將其加入到時(shí)間輪的具體鏈表中等待觸發(fā)執(zhí)行,timeouts 的實(shí)現(xiàn)是一個(gè) mpsc 隊(duì)列,關(guān)于 mpsc 隊(duì)列可以查看此文,這里也符合多生產(chǎn)者單消費(fèi)者的隊(duì)列模型。
HashedWheelTimer 中工作線程的具體調(diào)度
do {final long deadline = waitForNextTick();if (deadline > 0) {transferTimeoutsToBuckets();HashedWheelBucket bucket =wheel[(int) (tick & mask)];bucket.expireTimeouts(deadline);tick++;}} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
在 HashedWheelTimer 中的工作線程 run()方法的主要循環(huán)中,主要分為三個(gè)步驟。
首先 worker 線程會(huì)通過?waitForNextTick()方法根據(jù)時(shí)間輪的時(shí)間刻度等待一輪循環(huán)的開始,在默認(rèn)情況下時(shí)間輪的時(shí)間刻度是 100ms,那么此處 worker 線程也將在這個(gè)方法中 sleep 相應(yīng)的時(shí)間等待下一輪循環(huán)的開始。此處也決定了時(shí)間輪的定時(shí)任務(wù)時(shí)間精度。
當(dāng) worker 線程經(jīng)過相應(yīng)時(shí)間間隔的 sleep 之后,也代表新的一輪調(diào)度開始。此時(shí),會(huì)通過?transferTimeoutsToBuckets()方法將之前剛剛加入到 timeouts 隊(duì)列中的定時(shí)任務(wù)放入到時(shí)間輪具體槽位上的鏈表中。
for (int i = 0; i < 100000; i++) {HashedWheelTimeout timeout = timeouts.poll();if (timeout == null) {// all processedbreak;}if (timeout.state() == HashedWheelTimeout.ST_CANCELLED|| !timeout.compareAndSetState(HashedWheelTimeout.ST_INIT, HashedWheelTimeout.ST_IN_BUCKET)) {timeout.remove();continue;}long calculated = timeout.deadline / tickDuration;long remainingRounds = (calculated - tick) / wheel.length;timeout.remainingRounds = remainingRounds;final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.int stopIndex = (int) (ticks & mask);HashedWheelBucket bucket = wheel[stopIndex];bucket.addTimeout(timeout);}
首先,在每一輪的調(diào)度中,最多只會(huì)從?timeouts?隊(duì)列中定位到時(shí)間輪 100000 個(gè)定時(shí)任務(wù),這也是為了防止在這里耗時(shí)過久導(dǎo)致后面觸發(fā)定時(shí)任務(wù)的延遲。在這里會(huì)不斷從 timeouts 隊(duì)列中獲取剛加入的定時(shí)任務(wù)。
具體的計(jì)算流程便是將定時(shí)任務(wù)相對(duì)于時(shí)間輪初始化時(shí)間的相對(duì)間隔與時(shí)間輪的時(shí)間刻度相除得到相對(duì)于初始化時(shí)間的具體輪數(shù),之后便在減去當(dāng)前輪數(shù)得到還需要遍歷幾遍整個(gè)時(shí)間輪數(shù)組得到 remainingRounds,最后將輪數(shù)與時(shí)間輪數(shù)組長度-1 相與,得到該定時(shí)任務(wù)到底應(yīng)該存放到時(shí)間輪上哪個(gè)位置的鏈表。
用具體的數(shù)組舉個(gè)例子,該時(shí)間輪初始化時(shí)間為 12 點(diǎn),時(shí)間刻度為 1 小時(shí),時(shí)間輪數(shù)組長度為 8,當(dāng)前時(shí)間 13 點(diǎn),當(dāng)向時(shí)間輪加入一個(gè)明天 13 點(diǎn)執(zhí)行的任務(wù)的時(shí)候,首先得到該任務(wù)相對(duì)于初始化的時(shí)間間隔是 25 小時(shí),也就是需要 25 輪調(diào)度,而當(dāng)前 13 點(diǎn),當(dāng)前調(diào)度輪數(shù)為 1,因此還需要 24 輪調(diào)度,就需要再遍歷 3 輪時(shí)間輪,因此 remainingRounds 為 3,再根據(jù) 25 與 8-1 相與的結(jié)果為 1,因此將該定時(shí)任務(wù)放置到時(shí)間輪數(shù)組下標(biāo)為 1 的鏈表上等待被觸發(fā)。
這便是一次完整的定時(shí)任務(wù)加入到時(shí)間輪具體位置的計(jì)算。
在 worker 線程的最后,就需要來具體執(zhí)行定時(shí)任務(wù)了,首先通過當(dāng)前循環(huán)輪數(shù)與時(shí)間輪數(shù)組長度-1 相與的結(jié)果定位具體觸發(fā)時(shí)間輪數(shù)組上哪個(gè)位置上的鏈表,再通過?expireTimeouts()方法依次對(duì)鏈表上的定時(shí)任務(wù)進(jìn)行觸發(fā)執(zhí)行。這里的流程就相對(duì)很簡單,鏈表上的節(jié)點(diǎn)如果 remainingRounds 小于等于 0,那么就可以直接執(zhí)行這個(gè)定時(shí)任務(wù),如果 remainingRounds 大于 0,那么顯然還沒有到達(dá)觸發(fā)的時(shí)間點(diǎn),則將其-1 等待下一輪的調(diào)度之后再進(jìn)行執(zhí)行。在繼續(xù)回到上面的例子,當(dāng) 14 點(diǎn)來臨之時(shí),此時(shí)工作線程將進(jìn)行第 2 輪的調(diào)度,將會(huì)把 2 與 8-1 進(jìn)行相與得到結(jié)果 2,那么當(dāng)前工作線程就會(huì)選擇時(shí)間輪數(shù)組下標(biāo)為 2 的鏈表依次判斷是否需要觸發(fā),如果 remainingRounds 為 0 將會(huì)直接觸發(fā),否則將會(huì)將 remainingRounds-1 等待下一輪的執(zhí)行。
全文完!
希望本文對(duì)大家有所幫助。如果感覺本文有幫助,有勞轉(zhuǎn)發(fā)或點(diǎn)一下“在看”!讓更多人收獲知識(shí)!
引用鏈接
[1]?tydhot:?https://github.com/tydhot
長按識(shí)別下圖二維碼,關(guān)注公眾號(hào)「Doocs 開源社區(qū)」,第一時(shí)間跟你們分享好玩、實(shí)用的技術(shù)文章與業(yè)內(nèi)最新資訊。
