基于Redis實(shí)現(xiàn)延時(shí)隊(duì)列服務(wù)
一、背景
訂單下單之后超過30分鐘用戶未支付,需要取消訂單
訂單一些評(píng)論,如果48h用戶未對(duì)商家評(píng)論,系統(tǒng)會(huì)自動(dòng)產(chǎn)生一條默認(rèn)評(píng)論
點(diǎn)我達(dá)訂單下單后,超過一定時(shí)間訂單未派出,需要超時(shí)取消訂單等。。
處理這類需求,比較直接簡(jiǎn)單的方式就是定時(shí)任務(wù)輪訓(xùn)掃表。這種處理方式在數(shù)據(jù)量不大的場(chǎng)景下是完全沒問題,但是當(dāng)數(shù)據(jù)量大的時(shí)候高頻的輪訓(xùn)數(shù)據(jù)庫(kù)就會(huì)比較的耗資源,導(dǎo)致數(shù)據(jù)庫(kù)的慢查或者查詢超時(shí)。所以在處理這類需求時(shí)候,采用了延時(shí)隊(duì)列來(lái)完成。
二、幾種延時(shí)隊(duì)列
1.Java中java.util.concurrent.DelayQueue
優(yōu)點(diǎn):JDK自身實(shí)現(xiàn),使用方便,量小適用
缺點(diǎn):隊(duì)列消息處于jvm內(nèi)存,不支持分布式運(yùn)行和消息持久化
2.Rocketmq延時(shí)隊(duì)列
優(yōu)點(diǎn):消息持久化,分布式
缺點(diǎn):不支持任意時(shí)間精度,只支持特定level的延時(shí)消息
3.Rabbitmq延時(shí)隊(duì)列(TTL+DLX實(shí)現(xiàn))
優(yōu)點(diǎn):消息持久化,分布式
缺點(diǎn):延時(shí)相同的消息必須扔在同一個(gè)隊(duì)列
* 過期延時(shí)消息實(shí)時(shí)獲取
* 高可用性
三、?基于Redis實(shí)現(xiàn)
1.0版本
功能特性
* 實(shí)時(shí)性:存在一定的時(shí)間誤差(定時(shí)任務(wù)間隔)
* 支持指定消息remove
* 高可用性
整體結(jié)構(gòu)

- Delayed Queue是16個(gè)有序隊(duì)列(隊(duì)列支持水平擴(kuò)展),結(jié)構(gòu)為ZSET,value為messages pool中消息ID,score為過期時(shí)間(分為多個(gè)隊(duì)列是為了提高掃描的速度)
- Timed Task定時(shí)任務(wù),負(fù)責(zé)掃描處理每個(gè)隊(duì)列過期消息
?消息結(jié)構(gòu)
* keys:消息過期之后發(fā)送mq的keys
* body:消息過期之后發(fā)送mq的body,提供給消費(fèi)這做具體的消息處理
* delayTime:延時(shí)發(fā)送時(shí)間(默認(rèn),delayTime、expectDate有一個(gè)即可)
* expectDate:期望發(fā)送時(shí)間
流程

注:上圖1、2、3或者2、3是一個(gè)事務(wù)操作
取出過期消息過程是通過一個(gè)外部定時(shí)任務(wù)每隔1min分鐘去查詢隊(duì)列中過期的消息,然后發(fā)送mq && remove
2.0版本
2.0版本在1.0上做了一個(gè)優(yōu)化,廢棄掉了1min定時(shí)任務(wù)觸發(fā)過期消息發(fā)送,采用了java Lock await/singlal方式實(shí)現(xiàn)過期消息的實(shí)時(shí)發(fā)送低延時(shí)

多節(jié)點(diǎn)部署結(jié)構(gòu):

- worker:pull job拉取到的過期消息會(huì)交給一個(gè)worker thread去處理,這樣的好處是處理過期的消息實(shí)時(shí)性更高(pull job不必等去除過期消息全部處理完成在繼續(xù)去拉取新的過期數(shù)據(jù))
- zookeeper coordinate:通過zk的操作來(lái)完成對(duì)隊(duì)列的重新分配工作,daemon thread監(jiān)聽zk節(jié)點(diǎn)的創(chuàng)建和刪除
主要流程:
為每個(gè)分配queue創(chuàng)建一個(gè)pull job 。
pull job首先會(huì)去queue中查詢是否有過期消息:
??? Y:將取出消息交給worker處理
???? N:查詢queue中最后一個(gè)成員(zset結(jié)構(gòu)默認(rèn)按score遞增排序),如果為空,則await;不為空則await(成員score-System.currentTimeMillis())
當(dāng)部署服務(wù)有新增,延時(shí)隊(duì)列服務(wù)會(huì)重新計(jì)算得到當(dāng)前處理隊(duì)列,并將之前創(chuàng)建pull job cancel,為新處理隊(duì)列重新創(chuàng)建pull job。刪除同理。
作者:Simple
來(lái)源:www.cnblogs.com/lylife/p/7881950.html
評(píng)論
圖片
表情
