面試題:讓你設(shè)計一個延時隊列,說說你的思路
點擊上方藍(lán)色字體,選擇“設(shè)為星標(biāo)”

項目背景
延遲隊列,它是一種帶有延遲功能的消息隊列,目前工作中有幾處需延時處理的應(yīng)用場景。
可選技術(shù)參考
kafka
考慮前提:由于項目代碼與業(yè)務(wù)方交互大多采用 kafka,所以想是否能自己集成一個 kafka 延遲隊列,直接提供延遲功能,更方便使用。
大致思路:借鑒 rocketMQ 延遲隊列設(shè)計思想,創(chuàng)建多個topic 用于處理不同的延遲消息,例如延遲一分鐘的任務(wù)消息,讓 topic為 delay-minutes-1 進(jìn)行處理。
發(fā)送延遲消息時不直接發(fā)到目標(biāo)topic,而是發(fā)到一個用于處理延遲消息的topic,例如 delay-minutes-1
寫一段代碼定時拉取 delay-minutes-1 中的消息,將滿足的消息發(fā)到真正的目標(biāo)主題里。
流程圖:

解決問題:如何讓延遲消息等待一段時間才發(fā)送到真正的topic里面?
答:KafkaConsumer 提供了暫停和恢復(fù)的 API 函數(shù),當(dāng)消費者發(fā)現(xiàn)不滿足消費時間條件時,可以先暫停消費者,并把消費偏移量移動到上次位置,進(jìn)行等待下次消費。
缺點:kafka內(nèi)部改造復(fù)雜度較高,由于要使 consumer 進(jìn)行 pause,還需要額外的做一些健康檢查操作,在狀態(tài)不對時可以報警或者重啟。另外,不支持靈活設(shè)置延時時間。
rocketMQ
考慮前提:底層代碼已經(jīng)全部封裝好,直接使用,不用關(guān)心底層代碼,可以實現(xiàn)與業(yè)務(wù)進(jìn)行解藕。
大致原理思路:
RocketMQ將延時隊列的延時時間分為 18 個級別,在發(fā)送 MQ消息的時候只需要設(shè)置 delayLevel,把每種延遲時間段的消息放到同一個隊列中
通過一個定時器進(jìn)行輪詢這些隊列,查看消息是否到期
流程圖:

缺點:
使用中間件,盡可能的需要熟讀底層源碼,以便后續(xù)出現(xiàn)問題,快速跟蹤定位。還有能找到適合的擴展點。
定時器采用的timer是單線程運行,如果延遲消息數(shù)量很大的話,可能造成消息到期也沒有發(fā)送出去的情況。
redis
考慮前提:Redisson延時隊列,代碼redis已經(jīng)封裝好,可以直接拿來用。redisson.getBlockingQueue() 和 Redission.getDelayQueue()
大致原理思路:https://zhuanlan.zhihu.com/p/343811173
三個核心集合結(jié)構(gòu):
延時隊列:數(shù)據(jù)入隊的隊列
目標(biāo) blocking 隊列 :到期數(shù)據(jù)待consume
timeoutSet 過期時間zset:分?jǐn)?shù)值為timeout,輔佐判斷元素是否過期。?
實現(xiàn) Timer :
運用了 redis 的 sub/pub 功能,當(dāng)有數(shù)據(jù)put的時候,先把它放到一個zset集合,同時發(fā)布訂閱的key,發(fā)布內(nèi)容為數(shù)據(jù)到期的timeout,此時客戶端開啟了一個延時任務(wù)(HashedWheelTimer),到了時間,從zset分頁取出到期了的數(shù)據(jù),放入 blocking 隊列中。
缺點:
采用 sub/pub 機制的時候,可能會造成多個客戶端同時開啟一個時間段的延時任務(wù),重復(fù)執(zhí)行,也會有并發(fā)的安全問題,因為涉及的要數(shù)據(jù)加入阻塞隊列,和將當(dāng)前數(shù)據(jù)從zset移除操作。
默認(rèn)是數(shù)據(jù)量小的時候比較穩(wěn)定,數(shù)據(jù)量一大就需要構(gòu)建 cluster模式,這一塊需要自己開發(fā)
基于Redisson方案進(jìn)行改造思路
有贊的延時隊列
https://tech.youzan.com/queuing_delay/
實現(xiàn)邏輯圖

各個組件含義:
job :需要異步處理的任務(wù),是最基本單元,其中屬性包含,自定義唯一jobid,topic任務(wù)類型,delayTime任務(wù)執(zhí)行時間,ttrtime執(zhí)行超時時間,message具體消息內(nèi)容。?
job pool :用來存放Job 的原信息,是個 map結(jié)構(gòu)
Delay Bucket :一組以時間為維度的有序隊列(這里只存放 job Id),bucket的數(shù)據(jù)結(jié)構(gòu)就是redis的zset,將其分為多個bucket是為了提高掃描速度,降低消息延遲
Timer: 實時掃描各個 Bucket,并將delay時間小于等于當(dāng)前時間的job放入到對應(yīng)的 Ready Queue。
*?自己實現(xiàn)中,此處的Ready Queue?替換一個共同的kafka topic出口:存放處于Ready狀態(tài)的Job,以供客戶端消費程序消費。timer 到時間直接發(fā)送到 kafka ?
對比 Redisson 改動點
去除原有redisson 延時隊列 sub/pub實現(xiàn)timer思路,采用輪詢 zset 頭部節(jié)點,判斷是否已到過期時間進(jìn)行判斷。
加入線程池概念,加快消息處理,減少延時消息時間誤差。
cluster 模式,可用 redis 的 setnx命令實現(xiàn)簡單的分布式鎖,以保證集群中每次只有一個timer thread執(zhí)行。
個人改動點
做成通用性服務(wù),提供統(tǒng)一的push topic,和統(tǒng)一的pull topic
整體執(zhí)行流程:
各個業(yè)務(wù)方把任務(wù)發(fā)給入口topic,生成延遲任務(wù),放入某個桶
定時器時刻輪詢各個桶,當(dāng)時間到達(dá),發(fā)送消息任務(wù)到Kafka
消費端可以從?Kafka?共同出口中取到任務(wù),做相應(yīng)的業(yè)務(wù)邏輯
出口topic接收到消息,Kafka確認(rèn)應(yīng)答一次,保證消息不丟失?
微服務(wù)延時隊列整體架構(gòu)圖

例子:
kafka 共同入口 delay_entrance_topic 格式:
| 屬性 | 類型 | 是否必須 | 含義 |
|---|---|---|---|
| realTopicName | string | 是 | 業(yè)務(wù)類型,真實投遞到的topic |
| delayTime | long | 是 | 任務(wù)延時時間 |
| message | string | 是 | 具體消息內(nèi)容,json字符串 |
kafka 共同出口 delay_exit_topic 格式:
| 屬性 | 類型 | 是否必須 | 含義 |
|---|---|---|---|
| delayJobId | long | 是 | 發(fā)送到kafka時,發(fā)送成功應(yīng)答時需取這個字段進(jìn)行后續(xù)操作,業(yè)務(wù)方可不關(guān)注 |
| realTopicName | string | 是 | 業(yè)務(wù)類型,真實投遞到的topic,各個業(yè)務(wù)進(jìn)行過濾 |
| message | string | 是 | 具體消息內(nèi)容,json字符串 |
擴展點
減少延時時間誤差,使用線程池加快輪訓(xùn)判斷時間到期 cluster模式,防止其中一臺服務(wù)器掛了無法使用,高可用設(shè)計,使用定時器維護(hù)路由 cluseter模式中,timer 代碼邏輯需要設(shè)置分布式鎖,防止多臺服務(wù)器同時執(zhí)行 消息可靠性:保證至少被消費一次,消費不成功,未應(yīng)答,會重新投遞一次。
可能產(chǎn)生的問題
消息持久化問題:基于Redis自身的持久化特性,如果Redis數(shù)據(jù)丟失,意味著延遲消息的丟失,不過可以做主備和集群保證。這個可以考慮后續(xù)優(yōu)化將消息持久化到MangoDB中。
其他延時隊列思路
Netty 時間輪
HashedWheelTimer 流程圖

tickDuration: 每個格子的時間大小,每次轉(zhuǎn)動的時間
ticksPerWheel:時間輪數(shù)組大小
HashedWheelBucket:數(shù)組,記錄 header,tail
HashedWheelTimeOut: 延時任務(wù)載體,放于Bucket 數(shù)組中,屬性有:前后指針,round 數(shù)等
如果把時間輪看作一個map,那么 tickPerWheel 就為map的size,時間輪開始的時候,會設(shè)置一個 startTime,即每ticket都可算出延時時間,也就是 map 的key,value 為bucket。
核心代碼,線程 for循環(huán),校驗 此刻的 bucket的鏈表是否到了執(zhí)行時間,到了就立即執(zhí)行,且 ticket+1,往下走。沒有則會sleep一會兒。
????????????long?deadline?=?tickDuration?*?(tick?+?1);for (;;) {// 相對時間final long currentTime = System.nanoTime() - startTime;long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;// <=0 說明可以撥動時鐘了if (sleepTimeMs <= 0) {if (currentTime == Long.MIN_VALUE) {return -Long.MAX_VALUE;} else {return currentTime;}}// 這里是為了兼容 Windows 平臺if (PlatformDependent.isWindows()) {sleepTimeMs = sleepTimeMs / 10 * 10;}try {Thread.sleep(sleepTimeMs);} catch (InterruptedException ignored) {if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {return Long.MIN_VALUE;}}}????????}
kafka 時間輪
在普通時間輪的基礎(chǔ)上,以空間換時間的思路,用 DelayQueue 去存儲每個 Bucket,DelayQueue 內(nèi)部有個 PriorityQueue,以每個bucket的延時時間進(jìn)行大小排序,隊首的bucket就為將要執(zhí)行的任務(wù),如果到期了,則可以直接取出執(zhí)行,未到則阻塞。依次循環(huán)取空優(yōu)先隊列。
其中的比對時間到期,交給底層api去做,Condition.awaitNanos() -> parkNanos() 核心代碼:
private[this] val reinsert = (timerTaskEntry: TimerTaskEntry) => addTimerTaskEntry(timerTaskEntry)/** Advances the clock if there is an expired bucket. If there isn't any expired bucket when called,* waits up to timeoutMs before giving up.*/def advanceClock(timeoutMs: Long): Boolean = {var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)if (bucket != null) {writeLock.lock()try {while (bucket != null) {//驅(qū)動時間輪timingWheel.advanceClock(bucket.getExpiration())//循環(huán)buckek也就是任務(wù)列表,任務(wù)列表一個個繼續(xù)添加進(jìn)時間輪以此來升級或者降級時間輪,把過期任務(wù)找出來執(zhí)行bucket.flush(reinsert)//這里就是從延遲隊列取出bucket,bucket是有延遲時間的,取出代表該bucket過期,通過bucket能取到bucket包含的任務(wù)列表bucket = delayQueue.poll()}} finally {writeLock.unlock()}true} else {false}}
小問題
對于時間計算方面的問題,底層系統(tǒng)提供的api為什么效率更低呢?
它應(yīng)該也是循環(huán)檢查到期時間,看到有的同學(xué)說,更推薦使用底層api,原理是一樣的,它為什么就比放在外面要好些呢?如果有知道的同學(xué),也可在評論區(qū)告訴作者,感恩!
XXL_JOB
主要有兩個線程:scheduleThread 負(fù)責(zé)把 5s 之后要執(zhí)行的任務(wù),從 db 中掃出來,放到 時間輪 容器中。
ringThread 負(fù)責(zé)把時針指向的每個到期的任務(wù)鏈表,交由快慢線程,rpc調(diào)用指給調(diào)度器執(zhí)行。
分布式任務(wù)調(diào)度,多個執(zhí)行器。任務(wù)持久化,任務(wù)統(tǒng)一先入庫,延時也是用的傳統(tǒng)時間輪。?
總 結(jié)
兩個非常核心的問題:
一定先給所有的延時任務(wù)排序
比對時間問題,到了任務(wù)執(zhí)行時間取出來
| ? | 排序 | 找到到期job |
|---|---|---|
| RocektMQ | 指定level,類似桶排序 | for 循環(huán) |
| HashedWheelTimer | 數(shù)組,桶排序 | for 循環(huán) |
| kafka 時間輪 | 堆排序,PriorityQueue | 底層api實現(xiàn),Condition.awaitNanos()-> parkNanos() |
| Redisson 延時隊列 | Zset 跳表實現(xiàn) | 先是 sub/pub 訂閱功能,客戶端到期從zset中拿數(shù)據(jù),用的是 HashedWheelTimer |
| 基于有贊延時隊列 | Zset 跳表實現(xiàn) | for循環(huán)遍歷,開啟多個線程,每個bucket一個線程 |
?所以,如果想自己設(shè)計一個延時隊列,關(guān)鍵是確定這兩個核心問題怎么解決,其余的根據(jù)自己的業(yè)務(wù)場景進(jìn)行調(diào)整吧。
巨人肩膀
https://juejin.cn/post/6845166891225317384
https://juejin.cn/post/6910068006244581390 https://juejin.cn/post/6976412313981026318
