<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          用 Redis 做一個可靠的延遲隊列

          共 10643字,需瀏覽 22分鐘

           ·

          2022-08-04 11:49

          程序員的成長之路
          互聯(lián)網(wǎng)/程序員/技術/資料共享 
          關注


          閱讀本文大概需要 7 分鐘。

          來自:cnblogs.com/Finley/p/16400287.html

          我們先看看以下業(yè)務場景:
          • 當訂單一直處于未支付狀態(tài)時,如何及時的關閉訂單,并退還庫存?
          • 新創(chuàng)建店鋪,N天內(nèi)沒有上傳商品,系統(tǒng)如何知道該信息,并發(fā)送激活短信?
          上述場景最簡單直接的解決方案是定時掃表。我們假設 10 分鐘未支付則關閉訂單、定時任務設置為 5 分鐘一次,那么一個訂單最晚會在 15 分鐘關閉。高達 5 分鐘的誤差是業(yè)務難以接受的。另一方面頻繁的掃表可能消耗過多數(shù)據(jù)庫資源,影響線上交易吞吐量。
          此外還有朋友使用 Redis 的過期通知、時間輪、Java 的 DelayQueue 等方式實現(xiàn)延時任務。我們在之前的文章中討論過他們的缺陷:比如使用 Redis 過期通知不保證準時、發(fā)送即忘不保證送達,時間輪缺乏持久化機制容易丟失等。
          總結一下,我們對于延時隊列的要求有下列幾條(從重要到不重要排列):
          • 持久化: 服務重啟或崩潰不能丟失任務
          • 確認重試機制: 任務處理失敗或超時應該有重試
          • 定時盡量精確
          最合適的解決方案是使用 Pulsa、RocketMQ 等專業(yè)消息隊列的延時投遞功能。不過引入新的中間件通常存在各種非技術方面的麻煩。Redis 作為廣泛使用的中間件,何不用 Redis 來制作延時隊列呢?
          使用有序集合結構實現(xiàn)延時隊列的方法已經(jīng)廣為人知,無非是將消息作為有序集合的 member 投遞時間戳作為 score,使用 zrangebyscore 命令搜索已到投遞時間的消息然后將其發(fā)給消費者。
          除了基本的延時投遞之外我們的消息隊列具有下列優(yōu)勢:
          • 提供 ACK 和重試機制
          • 只需要 Redis 和消費者即可運行,無需其它組件
          • 提供 At-Least-One 投遞語義、并保證消息不會并發(fā)消費
          本文的完整代碼:
          https://github.com/HDT3213/delayqueue
          可以直接 go get github.com/hdt3213/delayqueue 完成安裝。
          具體使用也非常簡單,只需要注冊處理消息的回調(diào)函數(shù)并調(diào)用 start() 即可:
          package main

          import (
           "github.com/go-redis/redis/v8"
           "github.com/hdt3213/delayqueue"
           "strconv"
           "time"
          )

          func main() {
           redisCli := redis.NewClient(&redis.Options{
            Addr: "127.0.0.1:6379",
           })
           queue := delayqueue.NewQueue("example-queue", redisCli, func(payload string) bool {
            // 注冊處理消息的回調(diào)函數(shù)
                  // 返回 true 表示已成功消費,返回 false 消息隊列會重新投遞次消息
            return true
           })
           // 發(fā)送延時消息
           for i := 0; i < 10; i++ {
            err := queue.SendDelayMsg(strconv.Itoa(i), time.Hour, delayqueue.WithRetryCount(3))
            if err != nil {
             panic(err)
            }
           }

           // 啟動消費協(xié)程
           done := queue.StartConsume()
           // 阻塞等待消費協(xié)程退出
           <-done
          }
          由于數(shù)據(jù)存儲在 redis 中所以我們最多能保證在 redis 無故障且消息隊列相關 key 未被外部篡改的情況下不會丟失消息。

          原理詳解

          消息隊列涉及幾個關鍵的 redis 數(shù)據(jù)結構:
          • msgKey: 為了避免兩條內(nèi)容完全相同的消息造成意外的影響,我們將每條消息放到一個字符串類型的鍵中,并分配一個 UUID 作為它的唯一標識。其它數(shù)據(jù)結構中只存儲 UUID 而不存儲完整的消息內(nèi)容。每個 msg 擁有一個獨立的 key 而不是將所有消息放到一個哈希表是為了利用 TTL 機制避免
          • pendingKey: 有序集合類型,member 為消息 ID, score 為投遞時間的 unix 時間戳。
          • readyKey: 列表類型,需要投遞的消息 ID。
          • unAckKey: 有序集合類型,member 為消息 ID, score 為重試時間的 unix 時間戳。
          • retryKey: 列表類型,已到重試時間的消息 ID
          • garbageKey: 集合類型,用于暫存已達重試上線的消息 ID
          • retryCountKey: 哈希表類型,鍵為消息 ID, 值為剩余的重試次數(shù)
          流程如下圖所示:
          由于我們允許分布式地部署多個消費者,每個消費者都在定時執(zhí)行 lua 腳本,所以多個消費者可能處于上述流程中不同狀態(tài),我們無法預知(或控制)上圖中五個操作發(fā)生的先后順序,也無法控制有多少實例正在執(zhí)行同一個操作。
          因此我們需要保證上圖中五個操作滿足三個條件:
          • 都是原子性的
          • 不會重復處理同一條消息
          • 操作前后消息隊列始終處于正確的狀態(tài)
          只要滿足這三個條件,我們就可以部署多個實例且不需要使用分布式鎖等技術來進行狀態(tài)同步。
          是不是聽起來有點嚇人??? 其實簡單的很,讓我們一起來詳細看看吧~

          pending2ReadyScript

          pending2ReadyScript 使用 zrangebyscore 掃描已到投遞時間的消息ID并把它們移動到 ready 中:
          -- keys: pendingKey, readyKey
          -- argv: currentTime
          local msgs = redis.call('ZRangeByScore'KEYS[1], '0', ARGV[1])  -- 從 pending key 中找出已到投遞時間的消息
          if (#msgs == 0) then return end
          local args2 = {'LPush'KEYS[2]} -- 將他們放入 ready key 中
          for _,v in ipairs(msgs) do
           table.insert(args2, v) 
          end
          redis.call(unpack(args2))
          redis.call('ZRemRangeByScore'KEYS[1], '0', ARGV[1])  -- 從 pending key 中刪除已投遞的消息

          ready2UnackScript

          ready2UnackScript 從 ready 或者 retry 中取出一條消息發(fā)送給消費者并放入 unack 中,類似于 RPopLPush
          -- keys: readyKey/retryKey, unackKey
          -- argv: retryTime
          local msg = redis.call('RPop'KEYS[1])
          if (not msg) then return end
          redis.call('ZAdd'KEYS[2], ARGV[1], msg)
          return msg

          unack2RetryScript

          unack2RetryScript 從 retry 中找出所有已到重試時間的消息并把它們移動到 unack 中:
          -- keys: unackKey, retryCountKey, retryKey, garbageKey
          -- argv: currentTime
          local msgs = redis.call('ZRangeByScore'KEYS[1], '0', ARGV[1])  -- 找到已到重試時間的消息
          if (#msgs == 0) then return end
          local retryCounts = redis.call('HMGet'KEYS[2], unpack(msgs)) -- 查詢剩余重試次數(shù)
          for i,v in ipairs(retryCounts) do
           local k = msgs[i]
           if tonumber(v) > 0 then -- 剩余次數(shù)大于 0
            redis.call("HIncrBy"KEYS[2], k, -1-- 減少剩余重試次數(shù)
            redis.call("LPush"KEYS[3], k) -- 添加到 retry key 中
           else -- 剩余重試次數(shù)為 0
            redis.call("HDel"KEYS[2], k) -- 刪除重試次數(shù)記錄
            redis.call("SAdd"KEYS[4], k) -- 添加到垃圾桶,等待后續(xù)刪除
           end
          end
          redis.call('ZRemRangeByScore'KEYS[1], '0', ARGV[1])  -- 將已處理的消息從 unack key 中刪除
          因為 redis 要求 lua 腳本必須在執(zhí)行前在 KEYS 參數(shù)中聲明自己要訪問的 key, 而我們將每個 msg 有一個獨立的 key,我們在執(zhí)行 unack2RetryScript 之前是不知道哪些 msg key 需要被刪除。所以 lua 腳本只將需要刪除的消息記在 garbage key 中,腳本執(zhí)行完后再通過 del 命令將他們刪除:
          func (q *DelayQueue) garbageCollect() error {
           ctx := context.Background()
           msgIds, err := q.redisCli.SMembers(ctx, q.garbageKey).Result()
           if err != nil {
            return fmt.Errorf("smembers failed: %v", err)
           }
           if len(msgIds) == 0 {
            return nil
           }
           // allow concurrent clean
           msgKeys := make([]string0len(msgIds))
           for _, idStr := range msgIds {
            msgKeys = append(msgKeys, q.genMsgKey(idStr))
           }
           err = q.redisCli.Del(ctx, msgKeys...).Err()
           if err != nil && err != redis.Nil {
            return fmt.Errorf("del msgs failed: %v", err)
           }
           err = q.redisCli.SRem(ctx, q.garbageKey, msgIds).Err()
           if err != nil && err != redis.Nil {
            return fmt.Errorf("remove from garbage key failed: %v", err)
           }
           return nil
          }
          之前提到的 lua 腳本都是原子性執(zhí)行的,不會有其它命令插入其中。gc 函數(shù)由 3 條 redis 命令組成,在執(zhí)行過程中可能會有其它命令插入執(zhí)行過程中,不過考慮到一條消息進入垃圾回收流程之后不會復活所以不需要保證 3 條命令原子性。

          ack

          ack 只需要將消息徹底刪除即可:
          func (q *DelayQueue) ack(idStr string) error {
           ctx := context.Background()
           err := q.redisCli.ZRem(ctx, q.unAckKey, idStr).Err()
           if err != nil {
            return fmt.Errorf("remove from unack failed: %v", err)
           }
           // msg key has ttl, ignore result of delete
           _ = q.redisCli.Del(ctx, q.genMsgKey(idStr)).Err()
           q.redisCli.HDel(ctx, q.retryCountKey, idStr)
           return nil
          }
          否定確認只需要將 unack key 中消息的重試時間改為現(xiàn)在,隨后執(zhí)行的 unack2RetryScript 會立即將它移動到 retry key
          func (q *DelayQueue) nack(idStr string) error {
           ctx := context.Background()
           // update retry time as now, unack2Retry will move it to retry immediately
           err := q.redisCli.ZAdd(ctx, q.unAckKey, &redis.Z{
            Member: idStr,
            Score:  float64(time.Now().Unix()),
           }).Err()
           if err != nil {
            return fmt.Errorf("negative ack failed: %v", err)
           }
           return nil
          }

          consume

          消息隊列的核心邏輯是每秒執(zhí)行一次的 consume 函數(shù),它負責調(diào)用上述腳本將消息轉移到正確的集合中并回調(diào) consumer 來消費消息:
          func (q *DelayQueue) consume() error {
           // 執(zhí)行 pending2ready,將已到時間的消息轉移到 ready
           err := q.pending2Ready()
           if err != nil {
            return err
           }
           // 循環(huán)調(diào)用 ready2Unack 拉取消息進行消費
           var fetchCount uint
           for {
            idStr, err := q.ready2Unack()
            if err == redis.Nil { // consumed all
             break
            }
            if err != nil {
             return err
            }
            fetchCount++
            ack, err := q.callback(idStr)
            if err != nil {
             return err
            }
            if ack {
             err = q.ack(idStr)
            } else {
             err = q.nack(idStr)
            }
            if err != nil {
             return err
            }
            if fetchCount >= q.fetchLimit {
             break
            }
           }
           // 將 nack 或超時的消息放入重試隊列
           err = q.unack2Retry()
           if err != nil {
            return err
           }
              // 清理已達到最大重試次數(shù)的消息
           err = q.garbageCollect()
           if err != nil {
            return err
           }
           // 消費重試隊列
           fetchCount = 0
           for {
            idStr, err := q.retry2Unack()
            if err == redis.Nil { // consumed all
             break
            }
            if err != nil {
             return err
            }
            fetchCount++
            ack, err := q.callback(idStr)
            if err != nil {
             return err
            }
            if ack {
             err = q.ack(idStr)
            } else {
             err = q.nack(idStr)
            }
            if err != nil {
             return err
            }
            if fetchCount >= q.fetchLimit {
             break
            }
           }
           return nil
          }
          至此一個簡單可靠的延時隊列就做好了,何不趕緊開始試用呢?


          <END>

          推薦閱讀:

          知乎:不到 20 人的 IT 公司該去嗎?

          替代ELK:ClickHouse+Kafka+FlieBeat

          互聯(lián)網(wǎng)初中高級大廠面試題(9個G)

          內(nèi)容包含Java基礎、JavaWeb、MySQL性能優(yōu)化、JVM、鎖、百萬并發(fā)、消息隊列、高性能緩存、反射、Spring全家桶原理、微服務、Zookeeper......等技術棧!

          ?戳閱讀原文領取!                                  朕已閱 

          瀏覽 55
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  亚洲色拍视频 | 国产亚洲日韩欧美蝌蚪窝91视频 | 亚洲免费视频在线看 | 五月婷婷AV手机免费观看 | 日本特级AAA |