別再用 Redis List 實(shí)現(xiàn)消息隊(duì)列了,Stream 專為隊(duì)列而生
使用 Redis 的 List 實(shí)現(xiàn)消息隊(duì)列有很多局限性,比如:
沒有良好的 ACK 機(jī)制;
沒有 ConsumerGroup 消費(fèi)組概念;
消息堆積。
List 是線性結(jié)構(gòu),想要查詢指定數(shù)據(jù)需要遍歷整個(gè)列表;
Stream 是 Redis 5.0 引入的一種專門為消息隊(duì)列設(shè)計(jì)的數(shù)據(jù)類型,Stream 是一個(gè)包含 0 個(gè)或者多個(gè)元素的有序隊(duì)列,這些元素根據(jù) ID 的大小進(jìn)行有序排列。
它實(shí)現(xiàn)了大部分消息隊(duì)列的功能:
消息 ID 系列化生成;
消息遍歷;
消息的阻塞和非阻塞讀;
Consumer Groups 消費(fèi)組;
ACK 確認(rèn)機(jī)制。
支持多播。
提供了很多消息隊(duì)列操作命令,并且借鑒 Kafka 的 Consumer Groups 的概念,提供了消費(fèi)組功能。
同時(shí)提供了消息的持久化和主從復(fù)制機(jī)制,客戶端可以訪問任何時(shí)刻的數(shù)據(jù),并且能記住每一個(gè)客戶端的訪問位置,從而保證消息不丟失。
廢話少說,先來看下如何使用,官網(wǎng)文檔詳見:https://redis.io/topics/streams-intro
?
XADD:插入消息
XADD 云嵐宗?* task kill name 蕭炎
"1645936602161-0"
XADD?streamName?id?field?value?[field?value?...]
當(dāng)前毫秒內(nèi)的時(shí)間戳; 順序編號(hào)。從 0 為起始值,用于區(qū)分同一時(shí)間內(nèi)產(chǎn)生的多個(gè)命令。
?
XREAD:讀取消息
XREAD?COUNT?1?BLOCK?0?STREAMS?云嵐宗?0-0
1)?1)?"\xe4\xba\x91\xe5\xb2\x9a\xe5\xae\x97"
???2)?1)?1)?"1645936602161-0"
?????????2)?1)?"task"
????????????2)?"kill"
????????????3)?"name"
????????????4)?"蕭炎"?#?蕭炎
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]COUNT:表示每個(gè)流中最多讀取的元素個(gè)數(shù); BLOCK:阻塞讀取,當(dāng)消息隊(duì)列沒有消息的時(shí)候,則阻塞等待, 0 表示無限等待,單位是毫秒。 ID:消息 ID,在讀取消息的時(shí)候可以指定 ID,并從這個(gè) ID 的下一條消息開始讀取,0-0 則表示從第一個(gè)元素開始讀取。
XREAD?COUNT?1?BLOCK?0?STREAMS?云嵐宗?$
XREAD COUNT 2 BLOCK 0 STREAMS 云嵐宗 0-0 指令的時(shí)候又會(huì)重新讀取到。?
ConsumerGroup

Redis Stream 的結(jié)構(gòu)如上圖所示。有一個(gè)消息鏈表,每個(gè)消息都有一個(gè)唯一的 ID 和對應(yīng)的內(nèi)容; 消息持久化; 每個(gè)消費(fèi)組的狀態(tài)是獨(dú)立的,不不影響,同一份的 Stream 消息會(huì)被所有的消費(fèi)組消費(fèi); 一個(gè)消費(fèi)組可以由多個(gè)消費(fèi)者組成,消費(fèi)者之間是競爭關(guān)系,任意一個(gè)消費(fèi)者讀取了消息都會(huì)使 last_deliverd_id 往前移動(dòng); 每個(gè)消費(fèi)者有一個(gè) pending_ids 變量,用于記錄當(dāng)前消費(fèi)者讀取了但是還沒 ack 的消息。它用來保證消息至少被客戶端消費(fèi)了一次。
XGROUP用于創(chuàng)建、銷毀和管理消費(fèi)者組。 XREADGROUP通過消費(fèi)組從流中讀取數(shù)據(jù)。 XACK是允許消費(fèi)者將待處理消息標(biāo)記為已正確處理的命令。
創(chuàng)建消費(fèi)組
XGROUP CREATE 指令創(chuàng)建消費(fèi)組 (Consumer Group),需要傳遞起始消息 ID 參數(shù)用來初始化 last_delivered_id 變量。XADD?bossStream?*?name?zhangsan?age?26
XADD?bossStream?*?name?lisi?age?2
XADD?bossStream?*?name?bigold?age?40
#?語法如下
#?XGROUP?CREATE?stream?group?start_id
XGROUP?CREATE?bossStream?青龍門?0-0?MKSTREAM
XGROUP?CREATE?bossStream?六扇門?0-0?MKSTREAM
stream:指定隊(duì)列的名字; group:指定消費(fèi)組名字; start_id:指定消費(fèi)組在 Stream 中的起始 ID,它決定了消費(fèi)者組從哪個(gè) ID 之后開始讀取消息, 0-0從第一條開始讀取,$表示從最后一條向后開始讀取,只接收新消息。MKSTREAM:默認(rèn)情況下,XGROUP CREATE命令在目標(biāo)流不存在時(shí)返回錯(cuò)誤。可以使用可選 MKSTREAM子命令作為 之后的最后一個(gè)參數(shù)來自動(dòng)創(chuàng)建流。
讀取消息
consumer1 從bossStream 阻塞讀取一條消息:XREADGROUP?GROUP?青龍門?consumer1?COUNT?1?BLOCK?0?STREAMS?bossStream?>
1)?1)?"bossStream"
???2)?1)?1)?"1645957821396-0"
?????????2)?1)?"name"
????????????2)?"zhangsan"
????????????3)?"age"
????????????4)?"26"
XREADGROUP?GROUP?groupName?consumerName?[COUNT?n]?[BLOCK?ms]?STREAMS?streamName?[stream?...]?id?[id?...]
XREAD 大同小異,區(qū)別在于新增 GROUP groupName consumerName 選項(xiàng)。>:命令的最后參數(shù)>,表示從尚未被消費(fèi)的消息開始讀取;BLOCK:阻塞讀取;
consumer2 執(zhí)行讀取操作:XREADGROUP?GROUP?青龍門?consumer2?COUNT?1?BLOCK?0?STREAMS?bossStream?>
1)?1)?"bossStream"
???2)?1)?1)?"1645957838700-0"
?????????2)?1)?"name"
????????????2)?"lisi"
????????????3)?"age"
????????????4)?"2"
consumer2 不能再讀取到 zhangsan 了,而是讀取下一條 lisi 因?yàn)檫@條消息已經(jīng)被 consumer1 讀取了。
XPENDING 查看已讀未確認(rèn)消息
XREADGROUP GROUP groupName consumerName 讀取消息,但是沒有給 Stream 發(fā)送 XACK 命令,消息依然保留。bossStream 中的 消費(fèi)組「青龍門」中各個(gè)消費(fèi)者已讀取未確認(rèn)的消息信息:XPENDING?bossStream?青龍門
1)?(integer)?2
2)?"1645957821396-0"
3)?"1645957838700-0"
4)?1)?1)?"consumer1"
??????2)?"1"
???2)?1)?"consumer2"
??????2)?"1"
1)未確認(rèn)消息條數(shù);2) ~ 3)青龍門中所有消費(fèi)者讀取的消息最小和最大 ID;
consumer1讀取了哪些數(shù)據(jù),使用以下命令:XPENDING?bossStream?青龍門?-?+?10?consumer1
1)?1)?"1645957821396-0"
???2)?"consumer1"
???3)?(integer)?3758384
???4)?(integer)?1
ACK 確認(rèn)
XACK?bossStream?青龍門?1645957821396-0?1645957838700-0
(integer)?2
XACK key group-key ID [ID ...]
?
使用 Redisson 實(shí)戰(zhàn)
<dependency>
??<groupId>org.redissongroupId>
??<artifactId>redisson-spring-boot-starterartifactId>
??<version>3.16.7version>
dependency>
spring:
??application:
????name:?redission
??redis:
????host:?127.0.0.1
????port:?6379
????ssl:?false
@Slf4j
@Service
public?class?QueueService?{
????@Autowired
????private?RedissonClient?redissonClient;
????/**
?????*?發(fā)送消息到隊(duì)列
?????*
?????*?@param?message
?????*/
????public?void?sendMessage(String?message)?{
????????RStream?stream?=?redissonClient.getStream("sensor#4921");
????????stream.add("speed",?"19");
????????stream.add("velocity",?"39%");
????????stream.add("temperature",?"10C");
????}
????/**
?????*?消費(fèi)者消費(fèi)消息
?????*
?????*?@param?message
?????*/
????public?void?consumerMessage(String?message)?{
????????RStream?stream?=?redissonClient.getStream("sensor#4921");
????????stream.createGroup("sensors_data",?StreamMessageId.ALL);
????????Map>?messages?=?stream.readGroup("sensors_data",?"consumer_1");
????????for?(Map.Entry>?entry?:?messages.entrySet())?{
??????????Map?msg?=?entry.getValue();
??????????System.out.println(msg);
??????????stream.ack("sensors_data",?entry.getKey());
????????}
????}
}
有道無術(shù),術(shù)可成;有術(shù)無道,止于術(shù)
歡迎大家關(guān)注Java之道公眾號(hào)
好文章,我在看??
