<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          基于Redis實現(xiàn)延時隊列服務

          共 2467字,需瀏覽 5分鐘

           ·

          2022-01-23 17:57

          一、背景

          在業(yè)務發(fā)展過程中,會出現(xiàn)一些需要延時處理的場景,比如:
          1. 訂單下單之后超過30分鐘用戶未支付,需要取消訂單

          2. 訂單一些評論,如果48h用戶未對商家評論,系統(tǒng)會自動產(chǎn)生一條默認評論

          3. 點我達訂單下單后,超過一定時間訂單未派出,需要超時取消訂單等。。

          處理這類需求,比較直接簡單的方式就是定時任務輪訓掃表。這種處理方式在數(shù)據(jù)量不大的場景下是完全沒問題,但是當數(shù)據(jù)量大的時候高頻的輪訓數(shù)據(jù)庫就會比較的耗資源,導致數(shù)據(jù)庫的慢查或者查詢超時。所以在處理這類需求時候,采用了延時隊列來完成。

          二、幾種延時隊列

          延時隊列就是一種帶有延遲功能的消息隊列。下面會介紹幾種目前已有的延時隊列:
          1.Java中java.util.concurrent.DelayQueue
          優(yōu)點:JDK自身實現(xiàn),使用方便,量小適用
          缺點:隊列消息處于jvm內(nèi)存,不支持分布式運行和消息持久化
          2.Rocketmq延時隊列
          優(yōu)點:消息持久化,分布式
          缺點:不支持任意時間精度,只支持特定level的延時消息
          3.Rabbitmq延時隊列(TTL+DLX實現(xiàn))
          優(yōu)點:消息持久化,分布式
          缺點:延時相同的消息必須扔在同一個隊列
          根據(jù)自身業(yè)務和公司情況,如果實現(xiàn)一個自己的延時隊列服務需要考慮一下幾點:
          * 消息存儲
          * 過期延時消息實時獲取
          * 高可用性


          三、?基于Redis實現(xiàn)

          1.0版本

          • 功能特性

          * 消息可靠性,消息持久化,消息至少被消費一次
          * 實時性:存在一定的時間誤差(定時任務間隔)
          * 支持指定消息remove
          * 高可用性
          • 整體結構

          - Messages Pool所有的延時消息存放,結構為KV結構,key為消息ID,value為一個具體的message(這里選擇Redis Hash結構主要是因為hash結構能存儲較大的數(shù)據(jù)量,數(shù)據(jù)較多時候會進行漸進式rehash擴容,并且對于HSET和HGET命令來說時間復雜度都是O(1))
          - Delayed Queue是16個有序隊列(隊列支持水平擴展),結構為ZSET,value為messages pool中消息ID,score為過期時間(分為多個隊列是為了提高掃描的速度)
          - Timed Task定時任務,負責掃描處理每個隊列過期消息
          • ?消息結構

          每個延時消息必須包括以下參數(shù):
          * tags:消息過期之后發(fā)送mq的tags
          * keys:消息過期之后發(fā)送mq的keys
          * body:消息過期之后發(fā)送mq的body,提供給消費這做具體的消息處理
          * delayTime:延時發(fā)送時間(默認,delayTime、expectDate有一個即可)
          * expectDate:期望發(fā)送時間
          • 流程


          注:上圖1、2、3或者2、3是一個事務操作
          取出過期消息過程是通過一個外部定時任務每隔1min分鐘去查詢隊列中過期的消息,然后發(fā)送mq && remove

          2.0版本

          1.0上有一個可改進的地方就是隊列中過期的消息是通過定時任務觸發(fā)查詢。所有有了2.0
          2.0版本在1.0上做了一個優(yōu)化,廢棄掉了1min定時任務觸發(fā)過期消息發(fā)送,采用了java Lock await/singlal方式實現(xiàn)過期消息的實時發(fā)送低延時
          • 多節(jié)點部署結構:

          - pull job:這里分別為每一個隊列創(chuàng)建了一個pull job thread,功能很簡單,就是負責去隊列中拉取過期的消息數(shù)據(jù)(這里保證一個隊列有且只有一個pull job)
          - worker:pull job拉取到的過期消息會交給一個worker thread去處理,這樣的好處是處理過期的消息實時性更高(pull job不必等去除過期消息全部處理完成在繼續(xù)去拉取新的過期數(shù)據(jù))
          - zookeeper coordinate:通過zk的操作來完成對隊列的重新分配工作,daemon thread監(jiān)聽zk節(jié)點的創(chuàng)建和刪除
          • 主要流程:

          服務啟動會注冊zk,獲取分配處理的queues,啟動后臺線程監(jiān)聽zk 。
          為每個分配queue創(chuàng)建一個pull job 。
          pull job首先會去queue中查詢是否有過期消息:
          ??? Y:將取出消息交給worker處理
          ???? N:查詢queue中最后一個成員(zset結構默認按score遞增排序),如果為空,則await;不為空則await(成員score-System.currentTimeMillis())
          由于過期消息發(fā)送成功才會從隊列中remove,所以pull job會記錄上一次查詢隊列的一個offset,每次獲取到過期消息會將offset向前偏移,過期消息交給worker處理,當worker由于某些異常原因處理失敗會重置pull job中offset,這樣可以避免消息發(fā)送一次失敗之后沒辦法在繼續(xù)處理(除了新節(jié)點add || remove時候)。
          當部署服務有新增,延時隊列服務會重新計算得到當前處理隊列,并將之前創(chuàng)建pull job cancel,為新處理隊列重新創(chuàng)建pull job。刪除同理。

          作者:Simple

          來源:www.cnblogs.com/lylife/p/7881950.html

          版權申明:內(nèi)容來源網(wǎng)絡,僅供分享學習,版權歸原創(chuàng)者所有。除非無法確認,我們都會標明作者及出處,如有侵權煩請告知,我們會立即刪除并表示歉意。謝謝!


          瀏覽 31
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  人人操手机观看 | 国产高清乱伦片 | 丁香六月婷婷综合激情欧美 | 国内精品视频 | 人妻中文视频免费 |