<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          基于Redis實(shí)現(xiàn)延時(shí)隊(duì)列服務(wù)

          共 2179字,需瀏覽 5分鐘

           ·

          2022-01-24 01:49

          一、背景

          在業(yè)務(wù)發(fā)展過程中,會(huì)出現(xiàn)一些需要延時(shí)處理的場(chǎng)景,比如:
          1. 訂單下單之后超過30分鐘用戶未支付,需要取消訂單

          2. 訂單一些評(píng)論,如果48h用戶未對(duì)商家評(píng)論,系統(tǒng)會(huì)自動(dòng)產(chǎn)生一條默認(rèn)評(píng)論

          3. 點(diǎn)我達(dá)訂單下單后,超過一定時(shí)間訂單未派出,需要超時(shí)取消訂單等。。

          處理這類需求,比較直接簡(jiǎn)單的方式就是定時(shí)任務(wù)輪訓(xùn)掃表。這種處理方式在數(shù)據(jù)量不大的場(chǎng)景下是完全沒問題,但是當(dāng)數(shù)據(jù)量大的時(shí)候高頻的輪訓(xùn)數(shù)據(jù)庫(kù)就會(huì)比較的耗資源,導(dǎo)致數(shù)據(jù)庫(kù)的慢查或者查詢超時(shí)。所以在處理這類需求時(shí)候,采用了延時(shí)隊(duì)列來(lái)完成。

          二、幾種延時(shí)隊(duì)列

          延時(shí)隊(duì)列就是一種帶有延遲功能的消息隊(duì)列。下面會(huì)介紹幾種目前已有的延時(shí)隊(duì)列:
          1.Java中java.util.concurrent.DelayQueue
          優(yōu)點(diǎn):JDK自身實(shí)現(xiàn),使用方便,量小適用
          缺點(diǎn):隊(duì)列消息處于jvm內(nèi)存,不支持分布式運(yùn)行和消息持久化
          2.Rocketmq延時(shí)隊(duì)列
          優(yōu)點(diǎn):消息持久化,分布式
          缺點(diǎn):不支持任意時(shí)間精度,只支持特定level的延時(shí)消息
          3.Rabbitmq延時(shí)隊(duì)列(TTL+DLX實(shí)現(xiàn))
          優(yōu)點(diǎn):消息持久化,分布式
          缺點(diǎn):延時(shí)相同的消息必須扔在同一個(gè)隊(duì)列
          根據(jù)自身業(yè)務(wù)和公司情況,如果實(shí)現(xiàn)一個(gè)自己的延時(shí)隊(duì)列服務(wù)需要考慮一下幾點(diǎn):
          * 消息存儲(chǔ)
          * 過期延時(shí)消息實(shí)時(shí)獲取
          * 高可用性


          三、?基于Redis實(shí)現(xiàn)

          1.0版本

          • 功能特性

          * 消息可靠性,消息持久化,消息至少被消費(fèi)一次
          * 實(shí)時(shí)性:存在一定的時(shí)間誤差(定時(shí)任務(wù)間隔)
          * 支持指定消息remove
          * 高可用性
          • 整體結(jié)構(gòu)

          - Messages Pool所有的延時(shí)消息存放,結(jié)構(gòu)為KV結(jié)構(gòu),key為消息ID,value為一個(gè)具體的message(這里選擇Redis Hash結(jié)構(gòu)主要是因?yàn)閔ash結(jié)構(gòu)能存儲(chǔ)較大的數(shù)據(jù)量,數(shù)據(jù)較多時(shí)候會(huì)進(jìn)行漸進(jìn)式rehash擴(kuò)容,并且對(duì)于HSET和HGET命令來(lái)說(shuō)時(shí)間復(fù)雜度都是O(1))
          - Delayed Queue是16個(gè)有序隊(duì)列(隊(duì)列支持水平擴(kuò)展),結(jié)構(gòu)為ZSET,value為messages pool中消息ID,score為過期時(shí)間(分為多個(gè)隊(duì)列是為了提高掃描的速度)
          - Timed Task定時(shí)任務(wù),負(fù)責(zé)掃描處理每個(gè)隊(duì)列過期消息
          • ?消息結(jié)構(gòu)

          每個(gè)延時(shí)消息必須包括以下參數(shù):
          * tags:消息過期之后發(fā)送mq的tags
          * keys:消息過期之后發(fā)送mq的keys
          * body:消息過期之后發(fā)送mq的body,提供給消費(fèi)這做具體的消息處理
          * delayTime:延時(shí)發(fā)送時(shí)間(默認(rèn),delayTime、expectDate有一個(gè)即可)
          * expectDate:期望發(fā)送時(shí)間
          • 流程


          注:上圖1、2、3或者2、3是一個(gè)事務(wù)操作
          取出過期消息過程是通過一個(gè)外部定時(shí)任務(wù)每隔1min分鐘去查詢隊(duì)列中過期的消息,然后發(fā)送mq && remove

          2.0版本

          1.0上有一個(gè)可改進(jìn)的地方就是隊(duì)列中過期的消息是通過定時(shí)任務(wù)觸發(fā)查詢。所有有了2.0
          2.0版本在1.0上做了一個(gè)優(yōu)化,廢棄掉了1min定時(shí)任務(wù)觸發(fā)過期消息發(fā)送,采用了java Lock await/singlal方式實(shí)現(xiàn)過期消息的實(shí)時(shí)發(fā)送低延時(shí)
          • 多節(jié)點(diǎn)部署結(jié)構(gòu):

          - pull job:這里分別為每一個(gè)隊(duì)列創(chuàng)建了一個(gè)pull job thread,功能很簡(jiǎn)單,就是負(fù)責(zé)去隊(duì)列中拉取過期的消息數(shù)據(jù)(這里保證一個(gè)隊(duì)列有且只有一個(gè)pull job)
          - worker:pull job拉取到的過期消息會(huì)交給一個(gè)worker thread去處理,這樣的好處是處理過期的消息實(shí)時(shí)性更高(pull job不必等去除過期消息全部處理完成在繼續(xù)去拉取新的過期數(shù)據(jù))
          - zookeeper coordinate:通過zk的操作來(lái)完成對(duì)隊(duì)列的重新分配工作,daemon thread監(jiān)聽zk節(jié)點(diǎn)的創(chuàng)建和刪除
          • 主要流程:

          服務(wù)啟動(dòng)會(huì)注冊(cè)zk,獲取分配處理的queues,啟動(dòng)后臺(tái)線程監(jiān)聽zk 。
          為每個(gè)分配queue創(chuàng)建一個(gè)pull job 。
          pull job首先會(huì)去queue中查詢是否有過期消息:
          ??? Y:將取出消息交給worker處理
          ???? N:查詢queue中最后一個(gè)成員(zset結(jié)構(gòu)默認(rèn)按score遞增排序),如果為空,則await;不為空則await(成員score-System.currentTimeMillis())
          由于過期消息發(fā)送成功才會(huì)從隊(duì)列中remove,所以pull job會(huì)記錄上一次查詢隊(duì)列的一個(gè)offset,每次獲取到過期消息會(huì)將offset向前偏移,過期消息交給worker處理,當(dāng)worker由于某些異常原因處理失敗會(huì)重置pull job中offset,這樣可以避免消息發(fā)送一次失敗之后沒辦法在繼續(xù)處理(除了新節(jié)點(diǎn)add || remove時(shí)候)。
          當(dāng)部署服務(wù)有新增,延時(shí)隊(duì)列服務(wù)會(huì)重新計(jì)算得到當(dāng)前處理隊(duì)列,并將之前創(chuàng)建pull job cancel,為新處理隊(duì)列重新創(chuàng)建pull job。刪除同理。

          作者:Simple

          來(lái)源:www.cnblogs.com/lylife/p/7881950.html

          瀏覽 24
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  www.AV视频在线观看 | 看高清无码在线视频 | 丁香五月婷婷香 | 天堂视频免费在线观看 | www.久久精品视频 |