<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          圖文結(jié)合!Redis延遲隊列g(shù)olang高效實踐

          共 14775字,需瀏覽 30分鐘

           ·

          2022-09-02 11:44

          導(dǎo)語?|? 本文主要講述如何使用golang基于Redis實現(xiàn)延遲消息隊列組件。希望對有需求的同學(xué)有所幫助。


          一、背景


          業(yè)務(wù)中經(jīng)常會有這樣的場景:


          • 到期后自動執(zhí)行指定操作;


          • 查詢某個任務(wù)是否完成,未完成等待一定時間再次查詢;


          • 回調(diào)通知,當回調(diào)失敗時,等待后重試;等等還有其他很多類似的場景。


          很多時候我們會直接通過一個本地定時器來幫我們完成這個任務(wù)。如果我們的系統(tǒng)是多實例分布式的,本地定時器就會面臨很多問題,如:怎么保證重復(fù)處理的問題;統(tǒng)一管控的問題等 等。面對本地定時器遇到的問題,我們可以使用分布式延遲隊列來實現(xiàn)。


          這里介紹一種使用golang基于redis實現(xiàn)延遲隊列的具體實踐。



          二、實現(xiàn)原理


          1、使用redis可以通過List類型來實現(xiàn)隊列的功能,通過LPOP,RPUSH來保證先進先出的特性。


          ac2bf5d915bdd46820cd7f4dea9a70ce.webp


          2、針對需要延遲處理的消息可以通過SortedSet有序集合類型來存儲, 消息到期時期使用時間戳,作為member score的值。


          f12b24de2948e26a0c7379e072b60ee7.webp


          3、定時輪訓(xùn)sortedset,使用到期時間戳作為score,通過ZRANGEBYSCORE排序獲取到期的消息,將到期的消息遷移到List隊列中即可。

          5f2cfd99e28d3f8de29543a9561a6bee.webp



          三、消息遷移的原子性


          針對到期消息的往list的遷移需要三個動作:


          • 查詢到期消息;


          • 從sortedset取出到期消息;


          • 將到期消息push到list隊列中。


          那如何保證這個三個操作的 原子性 (要么都成功,要么都失?。?。在redis中有兩種處理方式可以保證多操作間的原子性。


          (一)Transaction


          • MULTI標記一個事務(wù)塊的開始。事務(wù)塊內(nèi)的多條命令會按照先后順序被放進一個隊列當中,最后由EXEC命令原子性(atomic)地執(zhí)行。


          • EXEC執(zhí)行所有事務(wù)塊內(nèi)的命令。



          (二)LuaScript


          Redis使用單個Lua解釋器去運行所有腳本,并且,Redis也保證腳本會以原子性(atomic)的方式執(zhí)行:當某個腳本正在運行的時候,不會有其他腳本或Redis命令被執(zhí)行。這和使用MULTI/EXEC包圍的事務(wù)很類似。在其他別的客戶端看來,腳本的效果(effect)要么是不可見的(not visible),要么就是已完成的(already completed)。注意:編寫的script不能很慢,因為會阻塞其他命令的執(zhí)行。


          Transaction能夠保證多個操作的原子性。LuaScript不僅保證了多操作間的原子性,可以處理更復(fù)雜的邏輯,如保障get、set操作的原子性。這里針對到期消息的遷移是先查詢到期的消息,然后兩個寫操作完成。所以這里使用LuaScript實現(xiàn)更為簡單方便。否則還需要加一個鎖來避免同一個到期消息多次處理的問題。如果遷移到期消息的模塊是單實例(非并發(fā))處理的,不需要加鎖處理。



          四、List,SortedSet性能


          網(wǎng)上針對redis的壓測很多,這里我們使用memtier_benchmark將與延遲隊列使用到相關(guān)的操作進行壓測。使用redis6.0,8核16Glinux服務(wù)器。


          (一)List讀寫性能 LPOP,RPUSH時間復(fù)雜度為O(1)


          6ec6e02108163c69fb8b2d26d2d976f9.webp


          6ec6e02108163c69fb8b2d26d2d976f9.webp



          (二)sortedset 相關(guān)操作的讀寫性能


          • zadd O(M*log(N)), N是有序集的基數(shù),M為成功添加的新成員的數(shù)量。


          8bd902763a44c37fbbdda7eaff3cc5ea.webp


          • ZREMRANGEBYRANKS O(log(N)+M), N為有序集的基數(shù),而M為被移除成員的數(shù)量。


          3b8c59f2bba6aeb568991bfa9c85b1fe.webp


          在熟悉了基于redis實現(xiàn)的延遲隊列的基本方法后,接下來看下使用golang具體的實現(xiàn)。


          五、消息協(xié)議定義


          定義一個消息結(jié)構(gòu)來保存消息:


                
                  
                    // Job
                  
                
                
                  type Job struct {
                
                
                    Id        string `msgpack:"1"` // 任務(wù)id
                
                
                    Topic     string `msgpack:"2"` // 消息名
                
                
                    Delay     int64  `msgpack:"3"` // 延遲時間
                
                
                    Playload  []byte `msgpack:"4"` // 消息體
                
                
                    Timestamp int64  `msgpack:"5"` // 消息投遞時間
                
                
                  }
                
              


          這里使用msgpack實現(xiàn)消息的序列化。messagepack是一個高效的二進制序列化協(xié)議。相比json編碼后的數(shù)據(jù)的體積更小,編解碼的速度更快。redis script也支持messagepack。


          benchmark性能測試:


          4a1a9350fee7e3cf43f315bafa9da08e.webp


          其他編解碼性能對比參考:

          https://github.com/alecthomas/go_serialization_benchmarks



          六、延遲隊列的核心-redis


          基于redis實現(xiàn)分布式延遲隊列,其核心是使用List類型實現(xiàn)隊列功能;使用sortedset實現(xiàn)延遲消息的管理,并且輪詢sortedset將到期的消息遷移到List隊列中,再啟用consumer實例處理List隊列中的消息,就完成了整個延遲隊列的核心處理流程。先來看下針對redis操作的相關(guān)實現(xiàn),這里操作redis的庫使用的是go-redis庫。


          (一)獲取延時消息數(shù)


                
                  
                    // zcard
                  
                
                
                  func zcard(rdb *redis.Client, key string) *redis.IntCmd {
                
                
                    return rdb.ZCard(context.Background(), key)
                
                
                  }
                
              



          (二)獲取等待執(zhí)行的消息數(shù)


                
                  
                    // list 長度
                  
                
                
                  func llen(rdb *redis.Client, key string) *redis.IntCmd {
                
                
                    return rdb.LLen(context.Background(), key)
                
                
                  }
                
              


          這兩個方法屬于metric方法,可以幫助我們了解當前延時隊列的消息積壓情況,為我們對consumer實例的水平擴展提供參考指標。



          (三)發(fā)送可執(zhí)行消息


                
                  
                    // 推送新的job到隊列
                  
                
                
                  func lpush(rdb *redis.Client, key string, value interface{}) error {
                
                
                    return rdb.LPush(context.Background(), key, value).Err()
                
                
                  }
                
              



          (四)發(fā)送延時消息


                
                  
                    // 增加延遲job
                  
                
                
                  func zadd(rdb *redis.Client, key string, value interface{}, delay int) error {
                
                
                    return rdb.ZAdd(
                
                
                      context.Background(),
                
                
                      key,
                
                
                      &redis.Z{Score: float64(delay), Member: value}).Err()
                
                
                  }
                
              



          (五)獲取可執(zhí)行消息


                
                  
                    // 從ready隊列取消息
                  
                
                
                  func rpop(rdb *redis.Client, key string) *redis.StringCmd {
                
                
                    return rdb.RPop(context.Background(), key)
                
                
                  }
                
              



          (六)到期消息遷移到待執(zhí)行隊列 ,這里使用redis script實現(xiàn)。每次都取指定數(shù)量(limit 0 num)的到期消息,時間花費相對穩(wěn)定。也不至于在到期消息突增時,導(dǎo)致redis內(nèi)存占用突增。每次執(zhí)行僅對客戶端返回消息數(shù),從而降低網(wǎng)絡(luò)傳輸。


                
                  
                    // 將到期的job遷移到ready隊列等待執(zhí)行,這里使用redis script實現(xiàn)
                  
                
                
                  func migrateExpiredJobs(rdb *redis.Client, delaykey, readyKey string) error {
                
                
                    script := redis.NewScript(`
                
                
                    local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1], 'limit', 0, 20)
                
                
                    if(next(val) ~= nil) then
                
                
                      redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)
                
                
                      redis.call('rpush', KEYS[2], unpack(val, 1, #val))
                
                
                    end
                
                
                    return #val
                
                
                    `)
                
                
                    return script.Run(context.Background(), rdb, []string{delaykey, readyKey}, time.Now().Unix()).Err()
                
                
                  }
                
              



          七、Product實現(xiàn)


          product功能比較單一,僅實現(xiàn)消息的投遞。這里是對Redis兩個方法RPUSH、ZADD的的封裝。


                
                  
                    // QueueClient
                  
                
                
                  type QueueClient struct {
                
                
                    queue *queue
                
                
                  }
                
              


          QueueClient提供了 兩個操作方法


          • 發(fā)送即時消息,針對非延遲執(zhí)行的消息直接投遞到ready隊列中,等待執(zhí)行;


                
                  
                    // Dispatch
                  
                
                
                  func (c *QueueClient) Dispatch(topic string, payload []byte) error {
                
                
                    return c.queue.Push(&Job{
                
                
                      Topic:     topic,
                
                
                      Playload:  payload,
                
                
                      Delay:     0,
                
                
                      Timestamp: time.Now().Unix(),
                
                
                    })
                
                
                  }
                
              


          • 發(fā)起延遲消息,針對需要延遲執(zhí)行的消息這里支持秒級的延遲消息。這里底層使用的ZADD。


                
                  
                    // DispatchDelaySecond
                  
                
                
                  func (c *QueueClient) DispatchDelaySecond(topic string, payload []byte, delaySec int) error {
                
                
                    return c.queue.DelayJob(&Job{
                
                
                      Topic:     topic,
                
                
                      Playload:  payload,
                
                
                      Delay:     int64(delaySec),
                
                
                      Timestamp: time.Now().Unix(),
                
                
                    })
                
                
                  }
                
              



          八、consumer實現(xiàn)


          在完成消息投遞的相關(guān)方法的實現(xiàn)后,我們來看下如何實現(xiàn)一個穩(wěn)定高效的消息處理框架。


          consumer完成 兩個核心操作


          • 將到期的消息,遷移到可執(zhí)行隊列中;


          • 從可執(zhí)行隊列中取出消息完成相應(yīng)的處理。


          接下來看下consumer queueserver的實現(xiàn)的主要方法。


          bedb384850dd325ff5ad371f2fdb15a6.webp


          1、QueueServer開始執(zhí)行的topic worker處理消息。一個topic一個協(xié)程的處理方式,保證不同的topic之間不相互影響;


                
                  
                    // QueueServer
                  
                
                
                  type QueueServer struct {
                
                
                    queueOption *QueueOption
                
                
                    topicwokers []*TopicWorker
                
                
                    queue       *queue
                
                
                    stopCh      chan struct{}
                
                
                    close       uint32
                
                
                  }
                
                
                  
                    
          // Run 開始處理消息直到收到退出命令 func (s *QueueServer) Run(ctx context.Context) error { // 一個topic 一個協(xié)程處理 for _, topicWorker := range s.topicwokers { go s.processJob(ctx, topicWorker) } // 監(jiān)聽系統(tǒng)信號 go s.watchSystemSignal(ctx) // 等待退出 <-s.stopCh ctxTimeOut, cancalFunc := context.WithTimeout(ctx, time.Second*time.Duration(s.queueOption.CloseWaitTime)) defer cancalFunc() return s.Close(ctxTimeOut) }


          2、定義統(tǒng)一消息處理接口JobHandler,定義了兩個方法:Topic返回隊列名;Execute完成消息的處理,可以實現(xiàn)業(yè)務(wù)處理邏輯。


                
                  
                    // JobHandler
                  
                
                
                  type JobHandler interface {
                
                
                    Topic() string // 返回topic名稱
                
                
                    Execute(context.Context, []byte) error // 處理消息
                
                
                  }
                
              


          通過topic worker來管理每個消息隊列的處理。


                
                  
                    // TopicWorker
                  
                
                
                  type TopicWorker struct {
                
                
                    TopicName   string     // topic 名稱
                
                
                    Handler     JobHandler // 處理job的方法
                
                
                    WorkerCount int        // 并行任務(wù)數(shù)
                
                
                    WorkerPool  *semaphore.Weighted // 通過信號量控制并發(fā)協(xié)程數(shù)
                
                
                  }
                
              


          通過信號量semaphore庫實現(xiàn)協(xié)程的并發(fā)數(shù)控制。如果消息間有順序要求,可以設(shè)置workerCount為1。當server退出的時候,獲取與workerCount相等數(shù)量的信號量等待所有處理消息的協(xié)程執(zhí)行完成。s.queue.GetReadyJob(topic.TopicName) 這個方法的內(nèi)部出了獲取可執(zhí)行消息外,還執(zhí)行了到期消息遷移的方法。


                
                  
                    // GetReadyJob 遷移到期消息,返回可執(zhí)行消息
                  
                
                
                  func (q *queue) GetReadyJob(topic string) (*Job, error) {
                
                
                    migrateExpiredJobs(q.rdb, 
                
                
                                             fmt.Sprintf("%s:%s", topic, DelayJobType),
                
                
                                             fmt.Sprintf("%s:%s", topic, ReadyJobType))
                
                
                    return q.Pop(topic)
                
                
                  }
                
              


                
                  
                    // processJob 處理消息
                  
                
                
                  func (s *QueueServer) processJob(ctx context.Context, topic *TopicWorker) error {
                
                
                    // 循環(huán)獲取消息直到server退出
                
                
                    for {
                
                
                      // 判斷server是否退出
                
                
                      if atomic.LoadUint32(&s.close) == closed {
                
                
                        break
                
                
                      }
                
                
                      // 通過信號量控制并發(fā)的協(xié)程數(shù),正在運行的協(xié)程達到上限就等待
                
                
                      if err := topic.WorkerPool.Acquire(ctx, 1); err != nil {
                
                
                        return err
                
                
                      }
                
                
                      job, err := s.queue.GetReadyJob(topic.TopicName)
                
                
                      if err != nil && err != redis.Nil {
                
                
                        topic.WorkerPool.Release(1)
                
                
                        continue
                
                
                      }
                
                
                      // 沒有要執(zhí)行的job
                
                
                      if job == nil {
                
                
                        topic.WorkerPool.Release(1)
                
                
                        time.Sleep(time.Second * time.Duration(s.queueOption.WaitTime))
                
                
                        continue
                
                
                      }
                
                
                      go func() {
                
                
                        topic.Handler.Execute(ctx, job.Playload)
                
                
                        topic.WorkerPool.Release(1)
                
                
                      }()
                
                
                    }
                
                
                    // 等待所有處理消息的協(xié)程執(zhí)行完成
                
                
                    if err := topic.WorkerPool.Acquire(ctx, int64(topic.WorkerCount)); err != nil {
                
                
                      return err
                
                
                    }
                
                
                    return nil
                
                
                  }
                
                
                  
                    



          九、擴展


          (一)Job錯誤重試


          如果想要給上面的Job處理加上錯誤重試的機制。我們給Job struct加上TryCount字段,當JobHandler 執(zhí)行返回error時,把job放入可執(zhí)行隊列或是延遲集合(等待指定時候后重試)。通過TryCount來限定重試的次數(shù)。超過指定次數(shù)后丟棄消息。



          (二)Job超時重試


          如果想要給上面的Job處理加上超時重試的機制。我們給Job struct加上TryTimeOut字段,當讀取消息的同時把job放入延遲集合(等待到達超時時間后重試)。通過TryCount來限定重試的次數(shù)。超過指定次數(shù)后丟棄消息。執(zhí)行成功時需要從set集合刪除。




          十、總結(jié)


          使用golang基于redis實現(xiàn)延遲隊列的方法如上所述,實現(xiàn)方式很多,但核心基本相同,可能在某些實現(xiàn)細節(jié)上略有差異。比如:


          • 使用單獨的協(xié)程來完成到期消息到可執(zhí)行隊列的遷移;


          • 使用redis stream來實現(xiàn)隊列。


          熟悉php laravel框架的應(yīng)該覺得這個方案相似,本文的實現(xiàn)方案跟laravel里的queue庫實現(xiàn)方案類似,它支持更多的消息驅(qū)動:本地、文件、mysql、redis等。但我們借助golang可以實現(xiàn)的更高效消息處理框架。使用這種方式需要考慮消息丟失時的補償機制。


          參考資料: 1. redis操作: http://doc.redisfans.com/index.html 2.go-redis: https://github.com/go-redis/redis



          ?作者簡介


          cc7ad53d9d2c5e718a4db567c6a75c5c.webp

          王曉林

          騰訊應(yīng)用開發(fā)工程師

          騰訊應(yīng)用開發(fā)工程師,目前負責(zé)游可愛平臺(yka.qq.com)的后臺開發(fā)工作。



          推薦閱讀


          福利
          我為大家整理了一份 從入門到進階的Go學(xué)習(xí)資料禮包 ,包含學(xué)習(xí)建議:入門看什么,進階看什么。 關(guān)注公眾號 「polarisxu」,回復(fù)? ebook ?獲?。贿€可以回復(fù)「進群」,和數(shù)萬 Gopher 交流學(xué)習(xí)。

          瀏覽 37
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  又污又黄又爽的网站 | 东京热无码视频 | 欧美在线导航 | 久久一级精品视频 | 操逼网站免费视频 |