用 Redis 做一個可靠的延遲隊列
閱讀本文大概需要 7 分鐘。
來自:cnblogs.com/Finley/p/16400287.html
當訂單一直處于未支付狀態(tài)時,如何及時的關閉訂單,并退還庫存? 新創(chuàng)建店鋪,N天內(nèi)沒有上傳商品,系統(tǒng)如何知道該信息,并發(fā)送激活短信?
DelayQueue 等方式實現(xiàn)延時任務。我們在之前的文章中討論過他們的缺陷:比如使用 Redis 過期通知不保證準時、發(fā)送即忘不保證送達,時間輪缺乏持久化機制容易丟失等。持久化: 服務重啟或崩潰不能丟失任務 確認重試機制: 任務處理失敗或超時應該有重試 定時盡量精確
member 投遞時間戳作為 score,使用 zrangebyscore 命令搜索已到投遞時間的消息然后將其發(fā)給消費者。提供 ACK 和重試機制 只需要 Redis 和消費者即可運行,無需其它組件 提供 At-Least-One 投遞語義、并保證消息不會并發(fā)消費
https://github.com/HDT3213/delayqueue
go get github.com/hdt3213/delayqueue 完成安裝。start() 即可:package main
import (
"github.com/go-redis/redis/v8"
"github.com/hdt3213/delayqueue"
"strconv"
"time"
)
func main() {
redisCli := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
})
queue := delayqueue.NewQueue("example-queue", redisCli, func(payload string) bool {
// 注冊處理消息的回調(diào)函數(shù)
// 返回 true 表示已成功消費,返回 false 消息隊列會重新投遞次消息
return true
})
// 發(fā)送延時消息
for i := 0; i < 10; i++ {
err := queue.SendDelayMsg(strconv.Itoa(i), time.Hour, delayqueue.WithRetryCount(3))
if err != nil {
panic(err)
}
}
// 啟動消費協(xié)程
done := queue.StartConsume()
// 阻塞等待消費協(xié)程退出
<-done
}
原理詳解
msgKey: 為了避免兩條內(nèi)容完全相同的消息造成意外的影響,我們將每條消息放到一個字符串類型的鍵中,并分配一個 UUID 作為它的唯一標識。其它數(shù)據(jù)結構中只存儲 UUID 而不存儲完整的消息內(nèi)容。每個 msg 擁有一個獨立的 key 而不是將所有消息放到一個哈希表是為了利用 TTL 機制避免pendingKey: 有序集合類型,member 為消息 ID, score 為投遞時間的 unix 時間戳。readyKey: 列表類型,需要投遞的消息 ID。unAckKey: 有序集合類型,member 為消息 ID, score 為重試時間的 unix 時間戳。retryKey: 列表類型,已到重試時間的消息 IDgarbageKey: 集合類型,用于暫存已達重試上線的消息 IDretryCountKey: 哈希表類型,鍵為消息 ID, 值為剩余的重試次數(shù)

都是原子性的 不會重復處理同一條消息 操作前后消息隊列始終處于正確的狀態(tài)
pending2ReadyScript
pending2ReadyScript 使用 zrangebyscore 掃描已到投遞時間的消息ID并把它們移動到 ready 中:-- keys: pendingKey, readyKey
-- argv: currentTime
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1]) -- 從 pending key 中找出已到投遞時間的消息
if (#msgs == 0) then return end
local args2 = {'LPush', KEYS[2]} -- 將他們放入 ready key 中
for _,v in ipairs(msgs) do
table.insert(args2, v)
end
redis.call(unpack(args2))
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1]) -- 從 pending key 中刪除已投遞的消息
ready2UnackScript
ready2UnackScript 從 ready 或者 retry 中取出一條消息發(fā)送給消費者并放入 unack 中,類似于 RPopLPush:-- keys: readyKey/retryKey, unackKey
-- argv: retryTime
local msg = redis.call('RPop', KEYS[1])
if (not msg) then return end
redis.call('ZAdd', KEYS[2], ARGV[1], msg)
return msg
unack2RetryScript
unack2RetryScript 從 retry 中找出所有已到重試時間的消息并把它們移動到 unack 中:-- keys: unackKey, retryCountKey, retryKey, garbageKey
-- argv: currentTime
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1]) -- 找到已到重試時間的消息
if (#msgs == 0) then return end
local retryCounts = redis.call('HMGet', KEYS[2], unpack(msgs)) -- 查詢剩余重試次數(shù)
for i,v in ipairs(retryCounts) do
local k = msgs[i]
if tonumber(v) > 0 then -- 剩余次數(shù)大于 0
redis.call("HIncrBy", KEYS[2], k, -1) -- 減少剩余重試次數(shù)
redis.call("LPush", KEYS[3], k) -- 添加到 retry key 中
else -- 剩余重試次數(shù)為 0
redis.call("HDel", KEYS[2], k) -- 刪除重試次數(shù)記錄
redis.call("SAdd", KEYS[4], k) -- 添加到垃圾桶,等待后續(xù)刪除
end
end
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1]) -- 將已處理的消息從 unack key 中刪除
KEYS 參數(shù)中聲明自己要訪問的 key, 而我們將每個 msg 有一個獨立的 key,我們在執(zhí)行 unack2RetryScript 之前是不知道哪些 msg key 需要被刪除。所以 lua 腳本只將需要刪除的消息記在 garbage key 中,腳本執(zhí)行完后再通過 del 命令將他們刪除:func (q *DelayQueue) garbageCollect() error {
ctx := context.Background()
msgIds, err := q.redisCli.SMembers(ctx, q.garbageKey).Result()
if err != nil {
return fmt.Errorf("smembers failed: %v", err)
}
if len(msgIds) == 0 {
return nil
}
// allow concurrent clean
msgKeys := make([]string, 0, len(msgIds))
for _, idStr := range msgIds {
msgKeys = append(msgKeys, q.genMsgKey(idStr))
}
err = q.redisCli.Del(ctx, msgKeys...).Err()
if err != nil && err != redis.Nil {
return fmt.Errorf("del msgs failed: %v", err)
}
err = q.redisCli.SRem(ctx, q.garbageKey, msgIds).Err()
if err != nil && err != redis.Nil {
return fmt.Errorf("remove from garbage key failed: %v", err)
}
return nil
}
ack
func (q *DelayQueue) ack(idStr string) error {
ctx := context.Background()
err := q.redisCli.ZRem(ctx, q.unAckKey, idStr).Err()
if err != nil {
return fmt.Errorf("remove from unack failed: %v", err)
}
// msg key has ttl, ignore result of delete
_ = q.redisCli.Del(ctx, q.genMsgKey(idStr)).Err()
q.redisCli.HDel(ctx, q.retryCountKey, idStr)
return nil
}
unack key 中消息的重試時間改為現(xiàn)在,隨后執(zhí)行的 unack2RetryScript 會立即將它移動到 retry keyfunc (q *DelayQueue) nack(idStr string) error {
ctx := context.Background()
// update retry time as now, unack2Retry will move it to retry immediately
err := q.redisCli.ZAdd(ctx, q.unAckKey, &redis.Z{
Member: idStr,
Score: float64(time.Now().Unix()),
}).Err()
if err != nil {
return fmt.Errorf("negative ack failed: %v", err)
}
return nil
}
consume
consume 函數(shù),它負責調(diào)用上述腳本將消息轉移到正確的集合中并回調(diào) consumer 來消費消息:func (q *DelayQueue) consume() error {
// 執(zhí)行 pending2ready,將已到時間的消息轉移到 ready
err := q.pending2Ready()
if err != nil {
return err
}
// 循環(huán)調(diào)用 ready2Unack 拉取消息進行消費
var fetchCount uint
for {
idStr, err := q.ready2Unack()
if err == redis.Nil { // consumed all
break
}
if err != nil {
return err
}
fetchCount++
ack, err := q.callback(idStr)
if err != nil {
return err
}
if ack {
err = q.ack(idStr)
} else {
err = q.nack(idStr)
}
if err != nil {
return err
}
if fetchCount >= q.fetchLimit {
break
}
}
// 將 nack 或超時的消息放入重試隊列
err = q.unack2Retry()
if err != nil {
return err
}
// 清理已達到最大重試次數(shù)的消息
err = q.garbageCollect()
if err != nil {
return err
}
// 消費重試隊列
fetchCount = 0
for {
idStr, err := q.retry2Unack()
if err == redis.Nil { // consumed all
break
}
if err != nil {
return err
}
fetchCount++
ack, err := q.callback(idStr)
if err != nil {
return err
}
if ack {
err = q.ack(idStr)
} else {
err = q.nack(idStr)
}
if err != nil {
return err
}
if fetchCount >= q.fetchLimit {
break
}
}
return nil
}
推薦閱讀:
替代ELK:ClickHouse+Kafka+FlieBeat
互聯(lián)網(wǎng)初中高級大廠面試題(9個G) 內(nèi)容包含Java基礎、JavaWeb、MySQL性能優(yōu)化、JVM、鎖、百萬并發(fā)、消息隊列、高性能緩存、反射、Spring全家桶原理、微服務、Zookeeper......等技術棧!
?戳閱讀原文領取! 朕已閱
評論
圖片
表情


