Redis Stream實(shí)現(xiàn)消息隊(duì)列中間件
?專注于PHP、MySQL、Linux和前端開發(fā),感興趣的感謝點(diǎn)個(gè)關(guān)注喲!!!文章整理在GitHub,Gitee主要包含的技術(shù)有PHP、Redis、MySQL、JavaScript、HTML&CSS、Linux、Java、Golang、Linux和工具資源等相關(guān)理論知識(shí)、面試題和實(shí)戰(zhàn)內(nèi)容。
@author:7small7。
@source:公眾號(hào)-菜鳥成長學(xué)習(xí)筆記。
@project:微信小程序 程序員面試題大全。
Redis實(shí)現(xiàn)隊(duì)列功能
在日常開發(fā)中,很多時(shí)候我們可能會(huì)使用隊(duì)列實(shí)現(xiàn)異步任務(wù)的分發(fā)。例如用戶下單的積分成長值增加、消息發(fā)送等等常見。這種場(chǎng)景可以使用Redis中的list數(shù)據(jù)類型來實(shí)現(xiàn)隊(duì)列功能。但存在不足的幾點(diǎn):
程序異常如何實(shí)現(xiàn)隊(duì)列消息回滾?
消息消費(fèi)中,進(jìn)程異常如何保證消息不丟失?
多消費(fèi)者該如何處理?
Redis Stream是什么?
Redis Stream 是 Redis 5.0 版本新增加的數(shù)據(jù)結(jié)構(gòu)。
Redis Stream 主要用于消息隊(duì)列(MQ,Message Queue),Redis 本身是有一個(gè) Redis 發(fā)布訂閱 (pub/sub) 來實(shí)現(xiàn)消息隊(duì)列的功能,但它有個(gè)缺點(diǎn)就是消息無法持久化,如果出現(xiàn)網(wǎng)絡(luò)斷開、Redis 宕機(jī)等,消息就會(huì)被丟棄。簡單來說發(fā)布訂閱 (pub/sub) 可以分發(fā)消息,但無法記錄歷史消息。而 Redis Stream 提供了消息的持久化和主備復(fù)制功能,可以讓任何客戶端訪問任何時(shí)刻的數(shù)據(jù),并且能記住每一個(gè)客戶端的訪問位置,還能保證消息不丟失。
Redis Stream 的結(jié)構(gòu)如下所示,它有一個(gè)消息鏈表,將所有加入的消息都串起來,每個(gè)消息都有一個(gè)唯一的 ID 和對(duì)應(yīng)的內(nèi)容:

組成部分
Redis Stream由消息ID、消費(fèi)者群組和消費(fèi)者三大部分組成。每一次添加消息到Streams中,消息ID會(huì)向后增加。消息ID可以手動(dòng)指定也可以有Redis內(nèi)部自動(dòng)生成。
消息ID只能是整數(shù),采用Redis自動(dòng)生成時(shí),組成的部分是當(dāng)前時(shí)間毫秒時(shí)間戳-當(dāng)前毫秒數(shù)生成的序號(hào)。每一個(gè)消費(fèi)者ID對(duì)應(yīng)一個(gè)消費(fèi)者群組,每一個(gè)消費(fèi)者群組下面又存在多個(gè)消費(fèi)者。一個(gè)消息隊(duì)列,可以存在多個(gè)消費(fèi)者群組。每個(gè)消費(fèi)者群組之間消息消息是隔離的。
每個(gè)消費(fèi)者群組消費(fèi)一次消息之后,
last_delivered_id會(huì)自動(dòng)往后移動(dòng)。當(dāng)該群組下的消費(fèi)者消費(fèi)消息之后,其他的消費(fèi)者就不能接著消費(fèi)該消息。每一個(gè)消費(fèi)者消費(fèi)之后,嚴(yán)格需要ack進(jìn)行確認(rèn),該消息才會(huì)被標(biāo)識(shí)為真正消費(fèi)。否則Pending_ids[]將記錄未進(jìn)行ack的消息。
基礎(chǔ)操作
隊(duì)列操作
添加隊(duì)列, xadd 隊(duì)列名稱 隊(duì)列id key1 value1 key2 value2 ....。
127.0.0.1:6379>?xadd?stream1?*?id?1?name?lisi?age?12
"1654413784100-0"
隊(duì)列ID的組成部分由當(dāng)前的時(shí)間戳-序號(hào)生成組成,并且都只能是整數(shù)。如果當(dāng)前同一個(gè)時(shí)間戳內(nèi)生成多個(gè)ID,則序號(hào)自增的。這種方式也可以解決時(shí)間回?fù)軉栴}。
隊(duì)列長度
127.0.0.1:6379>?xlen?stream1
(integer)?1
隊(duì)列元素, xrange 隊(duì)列名稱 隊(duì)列起始位置 隊(duì)列結(jié)束位置 [count 讀取個(gè)數(shù)]。
127.0.0.1:6379>?xrange?stream1?-?+
1)?1)?"1654413784100-0"
???2)?1)?"id"
??????2)?"1"
??????3)?"name"
??????4)?"lisi"
??????5)?"age"
??????6)?"12"
起始位置實(shí)則會(huì)消息的ID,使用-、+作為起始位置表示從隊(duì)列的開始位置和結(jié)束位置開始讀取。
消費(fèi)組操作
插入隊(duì)列數(shù)據(jù)
首先我們創(chuàng)建一個(gè)隊(duì)列,并向其中插入消息。
127.0.0.1:6379>?xadd?stream1?*?id?1?name?zhangsan?age?12
"1654335810726-0"
127.0.0.1:6379>?xadd?stream1?*?id?2?name?lisi?age?12
"1654335821262-0"
127.0.0.1:6379>?xadd?stream1?*?id?3?name?wangwu?age?12
"1654335829951-0"
127.0.0.1:6379>?xadd?stream1?*?id?4?name?tony?age?12
"1654335839247-0"
127.0.0.1:6379>?xadd?stream1?*?id?5?name?tom?age?12
"1654335850424-0"
創(chuàng)建消費(fèi)者群組
向隊(duì)列創(chuàng)建消費(fèi)群組。xgroup create 隊(duì)列名稱 消費(fèi)者組 消息ID開始位置-消息ID結(jié)束位置。
127.0.0.1:6379>?xgroup?create?stream1?cg1?0-0
OK
127.0.0.1:6379>?xgroup?create?stream1?cg2?0-0
OK
查看消費(fèi)者群組
查看消費(fèi)者群組。xinfo groups 隊(duì)列名稱。
127.0.0.1:6379>?xinfo?groups?stream1
1)??1)?"name"//?群組名稱
????2)?"cg1"
????3)?"consumers"//?群組下消費(fèi)者數(shù)量
????4)?(integer)?0
????5)?"pending"//?待確認(rèn)的ack消息數(shù)
????6)?(integer)?0
????7)?"last-delivered-id"//?消費(fèi)群組最后一次消費(fèi)的消息ID
????8)?"0-0"
????9)?"entries-read"
???10)?(nil)
???11)?"lag"
???12)?(integer)?5
2)??1)?"name"
????2)?"cg2"
????3)?"consumers"
????4)?(integer)?0
????5)?"pending"
????6)?(integer)?0
????7)?"last-delivered-id"
????8)?"0-0"
????9)?"entries-read"
???10)?(nil)
???11)?"lag"
???12)?(integer)?5
讀取消費(fèi)組信息
消費(fèi)隊(duì)列消息。xreadgroup group 消費(fèi)者群組名 消費(fèi)者名稱 count 消費(fèi)個(gè)數(shù) streams 隊(duì)列名稱 消息消費(fèi)ID。
127.0.0.1:6379>?xreadgroup?group?cg1?c1?count?1?streams?stream1?>
1)?1)?"stream1"
???1)?1)?1)?"1654335810726-0"
?????????1)?1)?"id"
????????????1)?"1"
????????????2)?"name"
????????????3)?"zhangsan"
????????????4)?"age"
????????????5)?"12"
127.0.0.1:6379>?xreadgroup?group?cg1?c2?count?1?streams?stream1?>
1)?1)?"stream1"
???2)?1)?1)?"1654335821262-0"
?????????2)?1)?"id"
????????????2)?"2"
????????????3)?"name"
????????????4)?"lisi"
????????????5)?"age"
????????????6)?"12"

