<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Redis實現(xiàn)消息隊列的4種方案

          共 4114字,需瀏覽 9分鐘

           ·

          2020-10-28 20:53


          Redis作為內(nèi)存中的數(shù)據(jù)結(jié)構(gòu)存儲,常用作數(shù)據(jù)庫、緩存和消息代理。它支持數(shù)據(jù)結(jié)構(gòu),如 字符串,散列,列表,集合,帶有范圍查詢的排序集(sorted sets),位圖(bitmaps),超級日志(hyperloglogs),具有半徑查詢和流的地理空間索引。Redis具有內(nèi)置復(fù)制,Lua腳本,LRU驅(qū)逐,事務(wù)和不同級別的磁盤持久性,并通過Redis Sentinel和Redis Cluster自動分區(qū)。


          為了實現(xiàn)其出色的性能,Redis使用內(nèi)存數(shù)據(jù)集(in-memory dataset)。


          MQ應(yīng)用有很多,比如ActiveMQ,RabbitMQ,Kafka等,但是也可以基于redis來實現(xiàn),可以降低系統(tǒng)的維護成本和實現(xiàn)復(fù)雜度,本篇介紹redis中實現(xiàn)消息隊列的幾種方案。


          ? ?1. 基于List的 LPUSH+BRPOP 的實現(xiàn)
          ? ?2. PUB/SUB,訂閱/發(fā)布模式
          ? ?3. 基于Sorted-Set的實現(xiàn)
          ? ?4. 基于Stream類型的實現(xiàn)


          基于異步消息隊列List lpush-brpop(rpush-blpop)


          使用rpush和lpush操作入隊列,lpop和rpop操作出隊列。


          List支持多個生產(chǎn)者和消費者并發(fā)進出消息,每個消費者拿到都是不同的列表元素。


          但是當隊列為空時,lpop和rpop會一直空輪訓(xùn),消耗資源;所以引入阻塞讀blpop和brpop(b代表blocking),阻塞讀在隊列沒有數(shù)據(jù)的時候進入休眠狀態(tài),?一旦數(shù)據(jù)到來則立刻醒過來,消息延遲幾乎為零。


          ?注意


          你以為上面的方案很完美?還有個問題需要解決:空閑連接的問題。
          ?

          如果線程一直阻塞在那里,Redis客戶端的連接就成了閑置連接,閑置過久,服務(wù)器一般會主動斷開連接,減少閑置資源占用,這個時候blpop和brpop或拋出異常,所以在編寫客戶端消費者的時候要小心,如果捕獲到異常,還有重試。

          ?

          缺點:


          • 做消費者確認ACK麻煩,不能保證消費者消費消息后是否成功處理的問題(宕機或處理異常等),通常需要維護一個Pending列表,保證消息處理確認。

          • 不能做廣播模式,如pub/sub,消息發(fā)布/訂閱模型

          • 不能重復(fù)消費,一旦消費就會被刪除

          • 不支持分組消費


          PUB/SUB,訂閱/發(fā)布模式


          • SUBSCRIBE,用于訂閱信道

          • PUBLISH,向信道發(fā)送消息

          • UNSUBSCRIBE,取消訂閱


          此模式允許生產(chǎn)者只生產(chǎn)一次消息,由中間件負責(zé)將消息復(fù)制到多個消息隊列,每個消息隊列由對應(yīng)的消費組消費。


          優(yōu)點


          • 典型的廣播模式,一個消息可以發(fā)布到多個消費者

          • 多信道訂閱,消費者可以同時訂閱多個信道,從而接收多類消息

          • 消息即時發(fā)送,消息不用等待消費者讀取,消費者會自動接收到信道發(fā)布的消息


          缺點


          • 消息一旦發(fā)布,不能接收。換句話就是發(fā)布時若客戶端不在線,則消息丟失,不能尋回

          • 不能保證每個消費者接收的時間是一致的

          • 若消費者客戶端出現(xiàn)消息積壓,到一定程度,會被強制斷開,導(dǎo)致消息意外丟失。通常發(fā)生在消息的生產(chǎn)遠大于消費速度時


          可見,Pub/Sub 模式不適合做消息存儲,消息積壓類的業(yè)務(wù),而是擅長處理廣播,即時通訊,即時反饋的業(yè)務(wù)。


          基于Sorted-Set的實現(xiàn)


          Sortes Set(有序列表),類似于java的SortedSet和HashMap的結(jié)合體,一方面她是一個set,保證內(nèi)部value的唯一性,另一方面它可以給每個value賦予一個score,代表這個value的排序權(quán)重。內(nèi)部實現(xiàn)是“跳躍表”。


          有序集合的方案是在自己確定消息順I(yè)D時比較常用,使用集合成員的Score來作為消息ID,保證順序,還可以保證消息ID的單調(diào)遞增。通常可以使用時間戳+序號的方案。確保了消息ID的單調(diào)遞增,利用SortedSet的依據(jù)


          Score排序的特征,就可以制作一個有序的消息隊列了。


          優(yōu)點


          就是可以自定義消息ID,在消息ID有意義時,比較重要。
          ?

          缺點
          ?

          缺點也明顯,不允許重復(fù)消息(因為是集合),同時消息ID確定有錯誤會導(dǎo)致消息的順序出錯。


          基于Stream類型的實現(xiàn)


          Stream為redis 5.0后新增的數(shù)據(jù)結(jié)構(gòu)。支持多播的可持久化消息隊列,實現(xiàn)借鑒了Kafka設(shè)計。

          Redis Stream的結(jié)構(gòu)如上圖所示,它有一個消息鏈表,將所有加入的消息都串起來,每個消息都有一個唯一的ID和對應(yīng)的內(nèi)容。消息是持久化的,Redis重啟后,內(nèi)容還在。

          每個Stream都有唯一的名稱,它就是Redis的key,在我們首次使用xadd指令追加消息時自動創(chuàng)建。


          每個Stream都可以掛多個消費組,每個消費組會有個游標last_delivered_id在Stream數(shù)組之上往前移動,表示當前消費組已經(jīng)消費到哪條消息了。每個消費組都有一個Stream內(nèi)唯一的名稱,消費組不會自動創(chuàng)建,它需要單獨的指令xgroup create進行創(chuàng)建,需要指定從Stream的某個消息ID開始消費,這個ID用來初始化last_delivered_id變量。


          每個消費組(Consumer Group)的狀態(tài)都是獨立的,相互不受影響。也就是說同一份Stream內(nèi)部的消息會被每個消費組都消費到。


          同一個消費組(Consumer Group)可以掛接多個消費者(Consumer),這些消費者之間是競爭關(guān)系,任意一個消費者讀取了消息都會使游標last_delivered_id往前移動。每個消費者者有一個組內(nèi)唯一名稱。


          消費者(Consumer)內(nèi)部會有個狀態(tài)變量pending_ids,它記錄了當前已經(jīng)被客戶端讀取的消息,但是還沒有ack。如果客戶端沒有ack,這個變量里面的消息ID會越來越多,一旦某個消息被ack,它就開始減少。這個pending_ids變量在Redis官方被稱之為PEL,也就是Pending Entries List,這是一個很核心的數(shù)據(jù)結(jié)構(gòu),它用來確保客戶端至少消費了消息一次,而不會在網(wǎng)絡(luò)傳輸?shù)闹型緛G失了沒處理。


          增刪改查


          1. xadd 追加消息

          2. xdel 刪除消息,這里的刪除僅僅是設(shè)置了標志位,不影響消息總長度

          3. xrange 獲取消息列表,會自動過濾已經(jīng)刪除的消息

          4. xlen 消息長度

          5. del 刪除Stream


          獨立消費


          我們可以在不定義消費組的情況下進行Stream消息的獨立消費,當Stream沒有新消息時,甚至可以阻塞等待。Redis設(shè)計了一個單獨的消費指令xread,可以將Stream當成普通的消息隊列(list)來使用。使用xread時,我們可以完全忽略消費組(Consumer Group)的存在,就好比Stream就是一個普通的列表(list)。


          創(chuàng)建消費組


          Stream通過xgroup create指令創(chuàng)建消費組(Consumer Group),需要傳遞起始消息ID參數(shù)用來初始化last_delivered_id變量。


          消費


          Stream提供了xreadgroup指令可以進行消費組的組內(nèi)消費,需要提供消費組名稱、消費者名稱和起始消息ID。它同xread一樣,也可以阻塞等待新消息。讀到新消息后,對應(yīng)的消息ID就會進入消費者的PEL(正在處理的消息)結(jié)構(gòu)里,客戶端處理完畢后使用xack指令通知服務(wù)器,本條消息已經(jīng)處理完畢,該消息ID就會從PEL中移除。


          Stream消息太多怎么辦


          讀者很容易想到,要是消息積累太多,Stream的鏈表豈不是很長,內(nèi)容會不會爆掉就是個問題了。xdel指令又不會刪除消息,它只是給消息做了個標志位。


          Redis自然考慮到了這一點,所以它提供了一個定長Stream功能。
          在xadd的指令提供一個定長長度maxlen,就可以將老的消息干掉,確保最多不超過指定長度。


          127.0.0.1:6379> xlen?codehole
          (integer) 5
          127.0.0.1:6379> xadd?codehole?maxlen?3 * name?xiaorui?age?1
          1527855160273-0
          127.0.0.1:6379> xlen?codehole
          (integer) 3


          我們看到Stream的長度被砍掉了。


          消息如果忘記ACK會怎樣


          Stream在每個消費者結(jié)構(gòu)中保存了正在處理中的消息ID列表PEL,如果消費者收到了消息處理完了但是沒有回復(fù)ack,就會導(dǎo)致PEL列表不斷增長,如果有很多消費組的話,那么這個PEL占用的內(nèi)存就會放大。


          PEL如何避免消息丟失


          在客戶端消費者讀取Stream消息時,Redis服務(wù)器將消息回復(fù)給客戶端的過程中,客戶端突然斷開了連接,消息就丟失了。但是PEL里已經(jīng)保存了發(fā)出去的消息ID。待客戶端重新連上之后,可以再次收到PEL中的消息ID列表。不過此時xreadgroup的起始消息必須是任意有效的消息ID,一般將參數(shù)設(shè)為0-0,表示讀取所有的PEL消息以及自last_delivered_id之后的新消息。


          分區(qū)Partition


          Redis沒有原生支持分區(qū)的能力,想要使用分區(qū),需要分配多個Stream,然后在客戶端使用一定的策略來講消息放入不同的stream。


          結(jié)論


          Stream的消費模型借鑒了kafka的消費分組的概念,它彌補了Redis Pub/Sub不能持久化消息的缺陷。但是它又不同于kafka,kafka的消息可以分partition,而Stream不行。如果非要分parition的話,得在客戶端做,提供不同的Stream名稱,對消息進行hash取模來選擇往哪個Stream里塞。如果讀者稍微研究過Redis作者的另一個開源項目Disque的話,這極可能是作者意識到Disque項目的活躍程度不夠,所以將Disque的內(nèi)容移植到了Redis里面。這只是本人的猜測,未必是作者的初衷。如果讀者有什么不同的想法,可以在評論區(qū)一起參與討論。


          原文鏈接:cdeason.cn/2019/08/25/Redis實現(xiàn)消息隊列的方案/




          瀏覽 68
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产l精品久久久久久久久久 | 国产精品久久久久久久免费大片 | 欧美激情性爱网站 | 台湾精品无码 | 无码一区.一区 |