<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          面試題:讓你設(shè)計一個延時隊列,說說你的思路

          共 5010字,需瀏覽 11分鐘

           ·

          2022-01-14 22:10

          點擊上方藍(lán)色字體,選擇“設(shè)為星標(biāo)”


          回復(fù)”學(xué)習(xí)資料“獲取學(xué)習(xí)寶典

          【文章來源】https://sourl.cn/pcgvTp

          項目背景





          延遲隊列,它是一種帶有延遲功能的消息隊列,目前工作中有幾處需延時處理的應(yīng)用場景。


          可選技術(shù)參考




          kafka

          考慮前提:由于項目代碼與業(yè)務(wù)方交互大多采用 kafka,所以想是否能自己集成一個 kafka 延遲隊列,直接提供延遲功能,更方便使用。

          大致思路:借鑒 rocketMQ 延遲隊列設(shè)計思想,創(chuàng)建多個topic 用于處理不同的延遲消息,例如延遲一分鐘的任務(wù)消息,讓 topic為 delay-minutes-1 進(jìn)行處理。

          • 發(fā)送延遲消息時不直接發(fā)到目標(biāo)topic,而是發(fā)到一個用于處理延遲消息的topic,例如 delay-minutes-1

          • 寫一段代碼定時拉取 delay-minutes-1 中的消息,將滿足的消息發(fā)到真正的目標(biāo)主題里。

          流程圖:

          解決問題:如何讓延遲消息等待一段時間才發(fā)送到真正的topic里面?

          答:KafkaConsumer 提供了暫停和恢復(fù)的 API 函數(shù),當(dāng)消費者發(fā)現(xiàn)不滿足消費時間條件時,可以先暫停消費者,并把消費偏移量移動到上次位置,進(jìn)行等待下次消費。

          缺點:kafka內(nèi)部改造復(fù)雜度較高,由于要使 consumer 進(jìn)行 pause,還需要額外的做一些健康檢查操作,在狀態(tài)不對時可以報警或者重啟。另外,不支持靈活設(shè)置延時時間。

          rocketMQ

          考慮前提:底層代碼已經(jīng)全部封裝好,直接使用,不用關(guān)心底層代碼,可以實現(xiàn)與業(yè)務(wù)進(jìn)行解藕。

          大致原理思路:

          • RocketMQ將延時隊列的延時時間分為 18 個級別,在發(fā)送 MQ消息的時候只需要設(shè)置 delayLevel,把每種延遲時間段的消息放到同一個隊列中

          • 通過一個定時器進(jìn)行輪詢這些隊列,查看消息是否到期

          流程圖:

          缺點:

          • 使用中間件,盡可能的需要熟讀底層源碼,以便后續(xù)出現(xiàn)問題,快速跟蹤定位。還有能找到適合的擴展點。

          • 定時器采用的timer是單線程運行,如果延遲消息數(shù)量很大的話,可能造成消息到期也沒有發(fā)送出去的情況。

          redis

          考慮前提:Redisson延時隊列,代碼redis已經(jīng)封裝好,可以直接拿來用。redisson.getBlockingQueue() 和 Redission.getDelayQueue()

          大致原理思路:https://zhuanlan.zhihu.com/p/343811173

          三個核心集合結(jié)構(gòu):

          延時隊列:數(shù)據(jù)入隊的隊列

          目標(biāo) blocking 隊列 :到期數(shù)據(jù)待consume

          timeoutSet 過期時間zset:分?jǐn)?shù)值為timeout,輔佐判斷元素是否過期。?

          實現(xiàn) Timer :

          運用了 redis 的 sub/pub 功能,當(dāng)有數(shù)據(jù)put的時候,先把它放到一個zset集合,同時發(fā)布訂閱的key,發(fā)布內(nèi)容為數(shù)據(jù)到期的timeout,此時客戶端開啟了一個延時任務(wù)(HashedWheelTimer),到了時間,從zset分頁取出到期了的數(shù)據(jù),放入 blocking 隊列中。

          缺點:

          • 采用 sub/pub 機制的時候,可能會造成多個客戶端同時開啟一個時間段的延時任務(wù),重復(fù)執(zhí)行,也會有并發(fā)的安全問題,因為涉及的要數(shù)據(jù)加入阻塞隊列,和將當(dāng)前數(shù)據(jù)從zset移除操作。

          • 默認(rèn)是數(shù)據(jù)量小的時候比較穩(wěn)定,數(shù)據(jù)量一大就需要構(gòu)建 cluster模式,這一塊需要自己開發(fā)


          基于Redisson方案進(jìn)行改造思路




          有贊的延時隊列

          https://tech.youzan.com/queuing_delay/

          實現(xiàn)邏輯圖

          各個組件含義:

          job :需要異步處理的任務(wù),是最基本單元,其中屬性包含,自定義唯一jobid,topic任務(wù)類型,delayTime任務(wù)執(zhí)行時間,ttrtime執(zhí)行超時時間,message具體消息內(nèi)容。?

          job pool :用來存放Job 的原信息,是個 map結(jié)構(gòu)

          Delay Bucket :一組以時間為維度的有序隊列(這里只存放 job Id),bucket的數(shù)據(jù)結(jié)構(gòu)就是redis的zset,將其分為多個bucket是為了提高掃描速度,降低消息延遲

          Timer: 實時掃描各個 Bucket,并將delay時間小于等于當(dāng)前時間的job放入到對應(yīng)的 Ready Queue。

          *?自己實現(xiàn)中,此處的Ready Queue?替換一個共同的kafka topic出口:存放處于Ready狀態(tài)的Job,以供客戶端消費程序消費。timer 到時間直接發(fā)送到 kafka ?

          對比 Redisson 改動點

          • 去除原有redisson 延時隊列 sub/pub實現(xiàn)timer思路,采用輪詢 zset 頭部節(jié)點,判斷是否已到過期時間進(jìn)行判斷。

          • 加入線程池概念,加快消息處理,減少延時消息時間誤差。

          • cluster 模式,可用 redis 的 setnx命令實現(xiàn)簡單的分布式鎖,以保證集群中每次只有一個timer thread執(zhí)行。

          個人改動點

          • 做成通用性服務(wù),提供統(tǒng)一的push topic,和統(tǒng)一的pull topic

          整體執(zhí)行流程:

          • 各個業(yè)務(wù)方把任務(wù)發(fā)給入口topic,生成延遲任務(wù),放入某個桶

          • 定時器時刻輪詢各個桶,當(dāng)時間到達(dá),發(fā)送消息任務(wù)到Kafka

          • 消費端可以從?Kafka?共同出口中取到任務(wù),做相應(yīng)的業(yè)務(wù)邏輯

          • 出口topic接收到消息,Kafka確認(rèn)應(yīng)答一次,保證消息不丟失?

          微服務(wù)延時隊列整體架構(gòu)圖

          例子:

          kafka 共同入口 delay_entrance_topic 格式:

          屬性類型是否必須含義
          realTopicNamestring業(yè)務(wù)類型,真實投遞到的topic
          delayTimelong任務(wù)延時時間
          messagestring具體消息內(nèi)容,json字符串

          kafka 共同出口 delay_exit_topic 格式:

          屬性類型是否必須含義
          delayJobIdlong發(fā)送到kafka時,發(fā)送成功應(yīng)答時需取這個字段進(jìn)行后續(xù)操作,業(yè)務(wù)方可不關(guān)注
          realTopicNamestring業(yè)務(wù)類型,真實投遞到的topic,各個業(yè)務(wù)進(jìn)行過濾
          messagestring具體消息內(nèi)容,json字符串

          擴展點




          • 減少延時時間誤差,使用線程池加快輪訓(xùn)判斷時間到期
          • cluster模式,防止其中一臺服務(wù)器掛了無法使用,高可用設(shè)計,使用定時器維護(hù)路由
          • cluseter模式中,timer 代碼邏輯需要設(shè)置分布式鎖,防止多臺服務(wù)器同時執(zhí)行
          • 消息可靠性:保證至少被消費一次,消費不成功,未應(yīng)答,會重新投遞一次。

          可能產(chǎn)生的問題



          消息持久化問題:基于Redis自身的持久化特性,如果Redis數(shù)據(jù)丟失,意味著延遲消息的丟失,不過可以做主備和集群保證。這個可以考慮后續(xù)優(yōu)化將消息持久化到MangoDB中。


          其他延時隊列思路




          Netty 時間輪

          HashedWheelTimer 流程圖

          tickDuration: 每個格子的時間大小,每次轉(zhuǎn)動的時間

          ticksPerWheel:時間輪數(shù)組大小

          HashedWheelBucket:數(shù)組,記錄 header,tail

          HashedWheelTimeOut: 延時任務(wù)載體,放于Bucket 數(shù)組中,屬性有:前后指針,round 數(shù)等

          如果把時間輪看作一個map,那么 tickPerWheel 就為map的size,時間輪開始的時候,會設(shè)置一個 startTime,即每ticket都可算出延時時間,也就是 map 的key,value 為bucket。

          核心代碼,線程 for循環(huán),校驗 此刻的 bucket的鏈表是否到了執(zhí)行時間,到了就立即執(zhí)行,且 ticket+1,往下走。沒有則會sleep一會兒。

          ????????????long?deadline?=?tickDuration?*?(tick?+?1);
          for (;;) { // 相對時間 final long currentTime = System.nanoTime() - startTime;
          long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
          // <=0 說明可以撥動時鐘了 if (sleepTimeMs <= 0) { if (currentTime == Long.MIN_VALUE) { return -Long.MAX_VALUE; } else { return currentTime; } }

          // 這里是為了兼容 Windows 平臺 if (PlatformDependent.isWindows()) { sleepTimeMs = sleepTimeMs / 10 * 10; }
          try { Thread.sleep(sleepTimeMs); } catch (InterruptedException ignored) { if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) { return Long.MIN_VALUE; } } }????????}

          kafka 時間輪

          在普通時間輪的基礎(chǔ)上,以空間換時間的思路,用 DelayQueue 去存儲每個 Bucket,DelayQueue 內(nèi)部有個 PriorityQueue,以每個bucket的延時時間進(jìn)行大小排序,隊首的bucket就為將要執(zhí)行的任務(wù),如果到期了,則可以直接取出執(zhí)行,未到則阻塞。依次循環(huán)取空優(yōu)先隊列。

          其中的比對時間到期,交給底層api去做,Condition.awaitNanos() -> parkNanos() 核心代碼:

          private[this] val reinsert = (timerTaskEntry: TimerTaskEntry) => addTimerTaskEntry(timerTaskEntry)
          /* * Advances the clock if there is an expired bucket. If there isn't any expired bucket when called, * waits up to timeoutMs before giving up. */def advanceClock(timeoutMs: Long): Boolean = { var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS) if (bucket != null) { writeLock.lock() try { while (bucket != null) { //驅(qū)動時間輪 timingWheel.advanceClock(bucket.getExpiration()) //循環(huán)buckek也就是任務(wù)列表,任務(wù)列表一個個繼續(xù)添加進(jìn)時間輪以此來升級或者降級時間輪,把過期任務(wù)找出來執(zhí)行 bucket.flush(reinsert) //這里就是從延遲隊列取出bucket,bucket是有延遲時間的,取出代表該bucket過期,通過bucket能取到bucket包含的任務(wù)列表 bucket = delayQueue.poll() } } finally { writeLock.unlock() } true } else { false }}

          小問題

          對于時間計算方面的問題,底層系統(tǒng)提供的api為什么效率更低呢?

          它應(yīng)該也是循環(huán)檢查到期時間,看到有的同學(xué)說,更推薦使用底層api,原理是一樣的,它為什么就比放在外面要好些呢?如果有知道的同學(xué),也可在評論區(qū)告訴作者,感恩!

          XXL_JOB

          主要有兩個線程:scheduleThread 負(fù)責(zé)把 5s 之后要執(zhí)行的任務(wù),從 db 中掃出來,放到 時間輪 容器中。

          ringThread 負(fù)責(zé)把時針指向的每個到期的任務(wù)鏈表,交由快慢線程,rpc調(diào)用指給調(diào)度器執(zhí)行。

          分布式任務(wù)調(diào)度,多個執(zhí)行器。任務(wù)持久化,任務(wù)統(tǒng)一先入庫,延時也是用的傳統(tǒng)時間輪。?


          總 結(jié)



          兩個非常核心的問題:

          • 一定先給所有的延時任務(wù)排序

          • 比對時間問題,到了任務(wù)執(zhí)行時間取出來

          ?排序找到到期job
          RocektMQ指定level,類似桶排序for 循環(huán)
          HashedWheelTimer數(shù)組,桶排序for 循環(huán)
          kafka 時間輪堆排序,PriorityQueue底層api實現(xiàn),Condition.awaitNanos()-> parkNanos()
          Redisson 延時隊列Zset 跳表實現(xiàn)先是 sub/pub 訂閱功能,客戶端到期從zset中拿數(shù)據(jù),用的是 HashedWheelTimer
          基于有贊延時隊列Zset 跳表實現(xiàn)for循環(huán)遍歷,開啟多個線程,每個bucket一個線程

          ?所以,如果想自己設(shè)計一個延時隊列,關(guān)鍵是確定這兩個核心問題怎么解決,其余的根據(jù)自己的業(yè)務(wù)場景進(jìn)行調(diào)整吧。


          巨人肩膀



          • https://juejin.cn/post/6845166891225317384

          • https://juejin.cn/post/6910068006244581390
          • https://juejin.cn/post/6976412313981026318

          -------------? END??-------------
          掃描下方二維碼,加入技術(shù)群。暗號:加群

          瀏覽 45
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  影音先锋欧美资源 | 国产精品秘 久久久久久99 | 中文字幕第2页在线观看 | 亚洲欧美日本一区 | 青青草天天搞 |