<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Redis Stream實(shí)現(xiàn)消息隊(duì)列中間件

          共 3139字,需瀏覽 7分鐘

           ·

          2022-06-07 22:03

          ?專注于PHP、MySQL、Linux和前端開發(fā),感興趣的感謝點(diǎn)個(gè)關(guān)注喲!文章整理在GitHub,Gitee主要包含的技術(shù)有PHP、Redis、MySQL、JavaScript、HTML&CSS、Linux、Java、Golang、Linux和工具資源等相關(guān)理論知識(shí)、面試題和實(shí)戰(zhàn)內(nèi)容。

          @author:7small7。

          @source:公眾號(hào)-菜鳥成長學(xué)習(xí)筆記。

          @project:微信小程序 程序員面試題大全。

          Redis實(shí)現(xiàn)隊(duì)列功能

          在日常開發(fā)中,很多時(shí)候我們可能會(huì)使用隊(duì)列實(shí)現(xiàn)異步任務(wù)的分發(fā)。例如用戶下單的積分成長值增加、消息發(fā)送等等常見。這種場(chǎng)景可以使用Redis中的list數(shù)據(jù)類型來實(shí)現(xiàn)隊(duì)列功能。但存在不足的幾點(diǎn):

          1. 程序異常如何實(shí)現(xiàn)隊(duì)列消息回滾?

          2. 消息消費(fèi)中,進(jìn)程異常如何保證消息不丟失?

          3. 多消費(fèi)者該如何處理?

          Redis Stream是什么?

          1. Redis Stream 是 Redis 5.0 版本新增加的數(shù)據(jù)結(jié)構(gòu)。

          2. Redis Stream 主要用于消息隊(duì)列(MQ,Message Queue),Redis 本身是有一個(gè) Redis 發(fā)布訂閱 (pub/sub) 來實(shí)現(xiàn)消息隊(duì)列的功能,但它有個(gè)缺點(diǎn)就是消息無法持久化,如果出現(xiàn)網(wǎng)絡(luò)斷開、Redis 宕機(jī)等,消息就會(huì)被丟棄。簡單來說發(fā)布訂閱 (pub/sub) 可以分發(fā)消息,但無法記錄歷史消息。而 Redis Stream 提供了消息的持久化和主備復(fù)制功能,可以讓任何客戶端訪問任何時(shí)刻的數(shù)據(jù),并且能記住每一個(gè)客戶端的訪問位置,還能保證消息不丟失。

          3. Redis Stream 的結(jié)構(gòu)如下所示,它有一個(gè)消息鏈表,將所有加入的消息都串起來,每個(gè)消息都有一個(gè)唯一的 ID 和對(duì)應(yīng)的內(nèi)容:

          組成部分

          1. Redis Stream由消息ID、消費(fèi)者群組和消費(fèi)者三大部分組成。每一次添加消息到Streams中,消息ID會(huì)向后增加。消息ID可以手動(dòng)指定也可以有Redis內(nèi)部自動(dòng)生成。消息ID只能是整數(shù),采用Redis自動(dòng)生成時(shí),組成的部分是當(dāng)前時(shí)間毫秒時(shí)間戳-當(dāng)前毫秒數(shù)生成的序號(hào)。每一個(gè)消費(fèi)者ID對(duì)應(yīng)一個(gè)消費(fèi)者群組,每一個(gè)消費(fèi)者群組下面又存在多個(gè)消費(fèi)者。

          2. 一個(gè)消息隊(duì)列,可以存在多個(gè)消費(fèi)者群組。每個(gè)消費(fèi)者群組之間消息消息是隔離的。

          3. 每個(gè)消費(fèi)者群組消費(fèi)一次消息之后,last_delivered_id會(huì)自動(dòng)往后移動(dòng)。當(dāng)該群組下的消費(fèi)者消費(fèi)消息之后,其他的消費(fèi)者就不能接著消費(fèi)該消息。每一個(gè)消費(fèi)者消費(fèi)之后,嚴(yán)格需要ack進(jìn)行確認(rèn),該消息才會(huì)被標(biāo)識(shí)為真正消費(fèi)。否則Pending_ids[]將記錄未進(jìn)行ack的消息。

          基礎(chǔ)操作

          隊(duì)列操作

          1. 添加隊(duì)列,xadd 隊(duì)列名稱 隊(duì)列id key1 value1 key2 value2 ....
          127.0.0.1:6379>?xadd?stream1?*?id?1?name?lisi?age?12
          "1654413784100-0"

          隊(duì)列ID的組成部分由當(dāng)前的時(shí)間戳-序號(hào)生成組成,并且都只能是整數(shù)。如果當(dāng)前同一個(gè)時(shí)間戳內(nèi)生成多個(gè)ID,則序號(hào)自增的。這種方式也可以解決時(shí)間回?fù)軉栴}。

          1. 隊(duì)列長度
          127.0.0.1:6379>?xlen?stream1
          (integer)?1
          1. 隊(duì)列元素,xrange 隊(duì)列名稱 隊(duì)列起始位置 隊(duì)列結(jié)束位置 [count 讀取個(gè)數(shù)]
          127.0.0.1:6379>?xrange?stream1?-?+
          1)?1)?"1654413784100-0"
          ???2)?1)?"id"
          ??????2)?"1"
          ??????3)?"name"
          ??????4)?"lisi"
          ??????5)?"age"
          ??????6)?"12"

          起始位置實(shí)則會(huì)消息的ID,使用-、+作為起始位置表示從隊(duì)列的開始位置和結(jié)束位置開始讀取。

          消費(fèi)組操作

          插入隊(duì)列數(shù)據(jù)

          首先我們創(chuàng)建一個(gè)隊(duì)列,并向其中插入消息。

          127.0.0.1:6379>?xadd?stream1?*?id?1?name?zhangsan?age?12
          "1654335810726-0"
          127.0.0.1:6379>?xadd?stream1?*?id?2?name?lisi?age?12
          "1654335821262-0"
          127.0.0.1:6379>?xadd?stream1?*?id?3?name?wangwu?age?12
          "1654335829951-0"
          127.0.0.1:6379>?xadd?stream1?*?id?4?name?tony?age?12
          "1654335839247-0"
          127.0.0.1:6379>?xadd?stream1?*?id?5?name?tom?age?12
          "1654335850424-0"

          創(chuàng)建消費(fèi)者群組

          向隊(duì)列創(chuàng)建消費(fèi)群組。xgroup create 隊(duì)列名稱 消費(fèi)者組 消息ID開始位置-消息ID結(jié)束位置

          127.0.0.1:6379>?xgroup?create?stream1?cg1?0-0
          OK
          127.0.0.1:6379>?xgroup?create?stream1?cg2?0-0
          OK

          查看消費(fèi)者群組

          查看消費(fèi)者群組。xinfo groups 隊(duì)列名稱

          127.0.0.1:6379>?xinfo?groups?stream1
          1)??1)?"name"//?群組名稱
          ????2)?"cg1"
          ????3)?"consumers"//?群組下消費(fèi)者數(shù)量
          ????4)?(integer)?0
          ????5)?"pending"//?待確認(rèn)的ack消息數(shù)
          ????6)?(integer)?0
          ????7)?"last-delivered-id"//?消費(fèi)群組最后一次消費(fèi)的消息ID
          ????8)?"0-0"
          ????9)?"entries-read"
          ???10)?(nil)
          ???11)?"lag"
          ???12)?(integer)?5
          2)??1)?"name"
          ????2)?"cg2"
          ????3)?"consumers"
          ????4)?(integer)?0
          ????5)?"pending"
          ????6)?(integer)?0
          ????7)?"last-delivered-id"
          ????8)?"0-0"
          ????9)?"entries-read"
          ???10)?(nil)
          ???11)?"lag"
          ???12)?(integer)?5

          讀取消費(fèi)組信息

          消費(fèi)隊(duì)列消息。xreadgroup group 消費(fèi)者群組名 消費(fèi)者名稱 count 消費(fèi)個(gè)數(shù) streams 隊(duì)列名稱 消息消費(fèi)ID

          127.0.0.1:6379>?xreadgroup?group?cg1?c1?count?1?streams?stream1?>
          1)?1)?"stream1"
          ???1)?1)?1)?"1654335810726-0"
          ?????????1)?1)?"id"
          ????????????1)?"1"
          ????????????2)?"name"
          ????????????3)?"zhangsan"
          ????????????4)?"age"
          ????????????5)?"12"
          127.0.0.1:6379>?xreadgroup?group?cg1?c2?count?1?streams?stream1?>
          1)?1)?"stream1"
          ???2)?1)?1)?"1654335821262-0"
          ?????????2)?1)?"id"
          ????????????2)?"2"
          ????????????3)?"name"
          ????????????4)?"lisi"
          ????????????5)?"age"
          ????????????6)?"12"

          瀏覽 80
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  天天插天天日天天干 | 性猛交XXXXX富婆免费视频 | 大香蕉99热 | 青青草成人免费视频 | 先锋乱伦一区 |