<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          一口氣說出 6 種實現(xiàn)延時消息的方案

          共 4731字,需瀏覽 10分鐘

           ·

          2022-05-27 16:32

          來源:juejin.cn/post/7052894117105238053

          前言

          Hollis的新書限時折扣中,一本深入講解Java基礎(chǔ)的干貨筆記!

          延時消息(定時消息)指的在分布式異步消息場景下,生產(chǎn)端發(fā)送一條消息,希望在指定延時或者指定時間點被消費端消費到,而不是立刻被消費。

          延時消息適用的業(yè)務(wù)場景非常的廣泛,在分布式系統(tǒng)環(huán)境下,延時消息的功能一般會在下沉到中間件層,通常是 MQ 中內(nèi)置這個功能或者內(nèi)聚成一個公共基礎(chǔ)服務(wù)。

          本文旨在探討常見延時消息的實現(xiàn)方案以及方案設(shè)計的優(yōu)缺點。

          實現(xiàn)方案

          1.基于外部存儲實現(xiàn)的方案

          這里討論的外部存儲指的是在 MQ 本身自帶的存儲以外又引入的其他的存儲系統(tǒng)。

          基于外部存儲的方案本質(zhì)上都是一個套路,將 MQ 和 延時模塊 區(qū)分開來,延時消息模塊是一個獨立的服務(wù)/進程。延時消息先保留到其他存儲介質(zhì)中,然后在消息到期時再投遞到 MQ。

          當然還有一些細節(jié)性的設(shè)計,比如消息進入的延時消息模塊時已經(jīng)到期則直接投遞這類的邏輯,這里不展開討論。

          圖片

          下述方案不同的是,采用了不同的存儲系統(tǒng)。

          基于 數(shù)據(jù)庫(如MySQL)

          基于關(guān)系型數(shù)據(jù)庫(如MySQL)延時消息表的方式來實現(xiàn)。

          CREATE?TABLE?`delay_msg`?(
          ??`id`?bigint?unsigned?NOT?NULL?AUTO_INCREMENT,
          ??`delivery_time`?DATETIME?NOT?NULL?COMMENT?'投遞時間',
          ??`payloads`?blob?COMMENT?'消息內(nèi)容',
          ??PRIMARY?KEY?(`id`),
          ??KEY?`time_index`?(`delivery_time`)
          )

          通過定時線程定時掃描到期的消息,然后進行投遞。定時線程的掃描間隔理論上就是你延時消息的最小時間精度。

          優(yōu)點:

          • 實現(xiàn)簡單;

          缺點:

          • B+Tree索引不適合消息場景的大量寫入;

          基于 RocksDB

          RocksDB 的方案其實就是在上述方案上選擇了比較合適的存儲介質(zhì)。

          RocksDB 在筆者之前的文章中有聊過,LSM 樹更適合大量寫入的場景。滴滴開源的DDMQ中的延時消息模塊 Chronos 就是采用了這個方案。

          DDMQ 這個項目簡單來說就是在 RocketMQ 外面加了一層統(tǒng)一的代理層,在這個代理層就可以做一些功能維度的擴展。延時消息的邏輯就是代理層實現(xiàn)了對延時消息的轉(zhuǎn)發(fā),如果是延時消息,會先投遞到 RocketMQ 中 Chronos 專用的 topic 中。

          延時消息模塊 Chronos 消費得到延時消息轉(zhuǎn)儲到 RocksDB,后面就是類似的邏輯了,定時掃描到期的消息,然后往 RocketMQ 中投遞。

          圖片

          這個方案老實說是一個比較重的方案。因為基于 RocksDB 來實現(xiàn)的話,從數(shù)據(jù)可用性的角度考慮,你還需要自己去處理多副本的數(shù)據(jù)同步等邏輯。

          優(yōu)點:

          • RocksDB LSM 樹很適合消息場景的大量寫入;

          缺點:

          • 實現(xiàn)方案較重,如果你采用這個方案,需要自己實現(xiàn) RocksDB 的數(shù)據(jù)容災(zāi)邏輯;

          基于 Redis

          再來聊聊 Redis 的方案。下面放一個比較完善的方案。

          圖片
          • Messages Pool 所有的延時消息存放,結(jié)構(gòu)為KV結(jié)構(gòu),key為消息ID,value為一個具體的message(這里選擇Redis Hash結(jié)構(gòu)主要是因為hash結(jié)構(gòu)能存儲較大的數(shù)據(jù)量,數(shù)據(jù)較多時候會進行漸進式rehash擴容,并且對于HSET和HGET命令來說時間復(fù)雜度都是O(1))
          • Delayed Queue是16個有序隊列(隊列支持水平擴展),結(jié)構(gòu)為ZSET,value 為 messages pool中消息ID,score為過期時間(分為多個隊列是為了提高掃描的速度)
          • Worker 代表處理線程,通過定時任務(wù)掃描 Delayed Queue 中到期的消息

          這個方案選用 Redis 存儲在我看來有幾點考慮,

          • Redis ZSET 很適合實現(xiàn)延時隊列
          • 性能問題,雖然 ZSET 插入是一個 O(logn) 的操作,但是Redis 基于內(nèi)存操作,并且內(nèi)部做了很多性能方面的優(yōu)化。

          但是這個方案其實也有需要斟酌的地方,上述方案通過創(chuàng)建多個 Delayed Queue 來滿足對于并發(fā)性能的要求,但這也帶來了多個 Delayed Queue 如何在多個節(jié)點情況下均勻分配,并且很可能出現(xiàn)到期消息并發(fā)重復(fù)處理的情況,是否要引入分布式鎖之類的并發(fā)控制設(shè)計?

          在量不大的場景下,上述方案的架構(gòu)其實可以蛻化成主從架構(gòu),只允許主節(jié)點來處理任務(wù),從節(jié)點只做容災(zāi)備份。實現(xiàn)難度更低更可控。

          定時線程檢查的缺陷與改進

          上述幾個方案中,都通過線程定時掃描的方案來獲取到期的消息。

          定時線程的方案在消息量較少的時候,會浪費資源,在消息量非常多的時候,又會出現(xiàn)因為掃描間隔設(shè)置不合理導(dǎo)致延時時間不準確的問題??梢越柚?JDK Timer 類中的思想,通過?wait-notify?來節(jié)省 CPU 資源。

          獲取中最近的延時消息,然后wait(執(zhí)行時間-當前時間),這樣就不需要浪費資源到達時間時會自動響應(yīng),如果有新的消息進入,并且比我們等待的消息還要小,那么直接notify喚醒,重新獲取這個更小的消息,然后又wait,如此循環(huán)。

          2. 開源 MQ 中的實現(xiàn)方案

          再來講講目前自帶延時消息功能的開源MQ,它們是如何實現(xiàn)的

          RocketMQ

          RocketMQ 開源版本支持延時消息,但是只支持 18 個 Level 的延時,并不支持任意時間。只不過這個 Level 在 RocketMQ 中可以自定義的,所幸來說對普通業(yè)務(wù)算是夠用的。默認值為“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18個level。

          通俗的講,設(shè)定了延時 Level 的消息會被暫存在名為SCHEDULE_TOPIC_XXXX的topic中,并根據(jù) level 存入特定的queue,queueId = delayTimeLevel – 1,即一個queue只存相同延時的消息,保證具有相同發(fā)送延時的消息能夠順序消費。?broker會調(diào)度地消費SCHEDULE_TOPIC_XXXX,將消息寫入真實的topic。

          下面是整個實現(xiàn)方案的示意圖,紅色代表投遞延時消息,紫色代表定時調(diào)度到期的延時消息:

          圖片

          優(yōu)點:

          • Level 數(shù)固定,每個 Level 有自己的定時器,開銷不大
          • 將 Level 相同的消息放入到同一個 Queue 中,保證了同一 Level 消息的順序性;不同 Level 放到不同的 Queue 中,保證了投遞的時間準確性;
          • 通過只支持固定的Level,將不同延時消息的排序變成了固定Level Topic 的追加寫操作

          缺點:

          • Level 配置的修改代價太大,固定 Level 不靈活
          • CommitLog 會因為延時消息的存在變得很大

          Pulsar

          Pulsar 支持“任意時間”的延時消息,但實現(xiàn)方式和 RocketMQ 不同。

          通俗的講,Pulsar 的延時消息會直接進入到客戶端發(fā)送指定的 Topic 中,然后在堆外內(nèi)存中創(chuàng)建一個基于時間的優(yōu)先級隊列,來維護延時消息的索引信息。延時時間最短的會放在頭上,時間越長越靠后。在進行消費邏輯時候,再判斷是否有到期需要投遞的消息,如果有就從隊列里面拿出,根據(jù)延時消息的索引查詢到對應(yīng)的消息進行消費。

          如果節(jié)點崩潰,在這個 broker 節(jié)點上的 Topics 會轉(zhuǎn)移到其他可用的 broker 上,上面提到的這個優(yōu)先級隊列也會被重建。

          下面是 Pulsar 公眾號中對于 Pulsar 延時消息的示意圖。

          圖片

          乍一看會覺得這個方案其實非常簡單,還能支持任意時間的消息。但是這個方案有幾個比較大的問題

          • 內(nèi)存開銷:?維護延時消息索引的隊列是放在堆外內(nèi)存中的,并且這個隊列是以訂閱組(Kafka中的消費組)為維度的,比如你這個 Topic 有 N 個訂閱組,那么如果你這個 Topic 使用了延時消息,就會創(chuàng)建 N 個 隊列;并且隨著延時消息的增多,時間跨度的增加,每個隊列的內(nèi)存占用也會上升。(是的,在這個方案下,支持任意的延時消息反而有可能讓這個缺陷更嚴重)
          • 故障轉(zhuǎn)移之后延時消息索引隊列的重建時間開銷:?對于跨度時間長的大規(guī)模延時消息,重建時間可能會到小時級別。(摘自 Pulsar 官方公眾號文章)
          • 存儲開銷:?延時消息的時間跨度會影響到 Pulsar 中已經(jīng)消費的消息數(shù)據(jù)的空間回收。打個比方,你的 Topic 如果業(yè)務(wù)上要求支持一個月跨度的延時消息,然后你發(fā)了一個延時一個月的消息,那么你這個 Topic 中底層的存儲就會保留整整一個月的消息數(shù)據(jù),即使這一個月中99%的正常消息都已經(jīng)消費了。

          對于前面第一點和第二點的問題,社區(qū)也設(shè)計了解決方案,在隊列中加入時間分區(qū),Broker 只加載當前較近的時間片的隊列到內(nèi)存,其余時間片分區(qū)持久化磁盤,示例圖如下圖所示:

          圖片

          但是目前,這個方案并沒有對應(yīng)的實現(xiàn)版本??梢栽趯嶋H使用時,規(guī)定只能使用較小時間跨度的延時消息,來減少前兩點缺陷的影響。另外,因為內(nèi)存中存的并不是延時消息的全量數(shù)據(jù),只是索引,所以可能要積壓上百萬條延時消息才可能對內(nèi)存造成顯著影響,從這個角度來看,官方暫時沒有完善前兩個問題也可以理解了。

          至于第三個問題,估計是比較難解決的,需要在數(shù)據(jù)存儲層將延時消息和正常消息區(qū)分開來,單獨存儲延時消息。

          QMQ

          QMQ提供任意時間的延時/定時消息,你可以指定消息在未來兩年內(nèi)(可配置)任意時間內(nèi)投遞。

          把 QMQ 放到最后,是因為我覺得 QMQ 是目前開源 MQ 中延時消息設(shè)計最合理的。里面設(shè)計的核心簡單來說就是 多級時間輪 + 延時加載 + 延時消息單獨磁盤存儲。

          QMQ的延時/定時消息使用的是兩層 hash wheel 來實現(xiàn)的。第一層位于磁盤上,每個小時為一個刻度(默認為一個小時一個刻度,可以根據(jù)實際情況在配置里進行調(diào)整),每個刻度會生成一個日志文件(schedule log),因為QMQ支持兩年內(nèi)的延時消息(默認支持兩年內(nèi),可以進行配置修改),則最多會生成?2 * 366 * 24 = 17568?個文件(如果需要支持的最大延時時間更短,則生成的文件更少)。

          第二層在內(nèi)存中,當消息的投遞時間即將到來的時候,會將這個小時的消息索引(索引包括消息在schedule log中的offset和size)從磁盤文件加載到內(nèi)存中的hash wheel上,內(nèi)存中的hash wheel則是以500ms為一個刻度。

          圖片

          總結(jié)一下設(shè)計上的亮點:

          • 時間輪算法適合延時/定時消息的場景,省去延時消息的排序,插入刪除操作都是 O(1) 的時間復(fù)雜度;
          • 通過多級時間輪設(shè)計,支持了超大時間跨度的延時消息;
          • 通過延時加載,內(nèi)存中只會有最近要消費的消息,更久的延時消息會被存儲在磁盤中,對內(nèi)存友好;
          • 延時消息單獨存儲(schedule log),不會影響到正常消息的空間回收;

          總結(jié)

          本文匯總了目前業(yè)界常見的延時消息方案,并且討論了各個方案的優(yōu)缺點。希望對讀者有所啟發(fā)。


          我的新書《深入理解Java核心技術(shù)》已經(jīng)上市了,上市后一直蟬聯(lián)京東暢銷榜中,目前正在6折優(yōu)惠中,想要入手的朋友千萬不要錯過哦~長按二維碼即可購買~


          長按掃碼享受6折優(yōu)惠





          往期推薦

          “入侵火狐只花了8秒”


          SpringBoot + Redis 實現(xiàn)接口限流,一個注解的事!


          屢次讓拳頭翻車的ping:作者因車禍英年早逝,千行源碼改變世界




          有道無術(shù),術(shù)可成;有術(shù)無道,止于術(shù)

          歡迎大家關(guān)注Java之道公眾號


          好文章,我在看??

          瀏覽 36
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  97xxxxx | 色婷婷综合久色aⅴ五区最新 | 最新最近日本中文字幕不亚洲 | 日韩A视频 | 亚洲视频日韩精彩动漫一区二区 |