Redis 中如何實(shí)現(xiàn)的消息隊(duì)列?實(shí)現(xiàn)的方式有幾種
Redis 中實(shí)現(xiàn)消息隊(duì)列的方式有幾種
1、使用 List 類(lèi)型實(shí)現(xiàn)
2、使用 ZSet 類(lèi)型實(shí)現(xiàn)
3、使用發(fā)布訂閱者模式實(shí)現(xiàn)消息隊(duì)列;
4、使用 Stream 實(shí)現(xiàn)消息隊(duì)列。
幾種消息隊(duì)列具體使用和優(yōu)缺點(diǎn)
1、List 類(lèi)型實(shí)現(xiàn)的方式最為簡(jiǎn)單和直接,它主要是通過(guò) lpush、rpop 存入和讀取實(shí)現(xiàn)消息隊(duì)列的,如下圖所示:

lpush 可以把最新的消息存儲(chǔ)到消息隊(duì)列(List 集合)的首部,而 rpop 可以讀取消息隊(duì)列的尾部,這樣就實(shí)現(xiàn)了先進(jìn)先出,如下圖所示:

優(yōu)點(diǎn):使用 List 實(shí)現(xiàn)消息隊(duì)列的優(yōu)點(diǎn)是消息可以被持久化,List 可以借助 Redis 本身的持久化功能,AOF 或者是 RDB 或混合持久化的方式,用于把數(shù)據(jù)保存至磁盤(pán),這樣當(dāng) Redis 重啟之后,消息不會(huì)丟失。
缺點(diǎn):但使用 List 同樣存在一定的問(wèn)題,比如消息不支持重復(fù)消費(fèi)、沒(méi)有按照主題訂閱的功能、不支持消費(fèi)消息確認(rèn)等。
2、ZSet 實(shí)現(xiàn)消息隊(duì)列:它是利用 zadd 和 zrangebyscore 來(lái)實(shí)現(xiàn)存入和讀取消息的。
優(yōu)點(diǎn):同樣具備持久化的功能
缺點(diǎn):List 存在的問(wèn)題它也同樣存在,不但如此,使用 ZSet 還不能存儲(chǔ)相同元素的值。因?yàn)樗怯行蚣希行蚣系拇鎯?chǔ)元素值是不能重復(fù)的,但分值可以重復(fù),也就是說(shuō)當(dāng)消息值重復(fù)時(shí),只能存儲(chǔ)一條信息在 ZSet 中。
3、發(fā)布訂閱:使用發(fā)布和訂閱的類(lèi)型,我們可以實(shí)現(xiàn)主題訂閱的功能,也就是 Pattern Subscribe 的功能。因此我們可以使用一個(gè)消費(fèi)者“queue_*”來(lái)訂閱所有以“queue_”開(kāi)頭的消息隊(duì)列,如下圖所示:

優(yōu)點(diǎn):可以按照主題訂閱方式
缺點(diǎn):
a、無(wú)法持久化保存消息,如果 Redis 服務(wù)器宕機(jī)或重啟,那么所有的消息將會(huì)丟失;
b、發(fā)布訂閱模式是“發(fā)后既忘”的工作模式,如果有訂閱者離線重連之后就不能消費(fèi)之前的歷史消息;
c、不支持消費(fèi)者確認(rèn)機(jī)制,穩(wěn)定性不能得到保證,例如當(dāng)消費(fèi)者獲取到消息之后,還沒(méi)來(lái)得及執(zhí)行就宕機(jī)了。因?yàn)闆](méi)有消費(fèi)者確認(rèn)機(jī)制,Redis 就會(huì)誤以為消費(fèi)者已經(jīng)執(zhí)行了,因此就不會(huì)重復(fù)發(fā)送未被正常消費(fèi)的消息了,這樣整體的 Redis 穩(wěn)定性就被沒(méi)有辦法得到保障了。
4、Stream 類(lèi)型實(shí)現(xiàn):使用 Stream 的 xadd 和 xrange 來(lái)實(shí)現(xiàn)消息的存入和讀取了,并且 Stream 提供了 xack 手動(dòng)確認(rèn)消息消費(fèi)的命令,用它我們就可以實(shí)現(xiàn)消費(fèi)者確認(rèn)的功能了,使用命令如下:
127.0.0.1:6379> xack mq group1 1580959593553-0
(integer) 1消費(fèi)確認(rèn)增加了消息的可靠性,一般在業(yè)務(wù)處理完成之后,需要執(zhí)行 ack 確認(rèn)消息已經(jīng)被消費(fèi)完成,整個(gè)流程的執(zhí)行如下圖所示:

其中“Group”為群組,消費(fèi)者也就是接收者需要訂閱到群組才能正常獲取到消息。
在 Java 代碼中使用 List 實(shí)現(xiàn)消息隊(duì)列會(huì)有什么問(wèn)題?應(yīng)該如何解決?
先看代碼部分實(shí)現(xiàn):
import redis.clients.jedis.Jedis;
publicclass ListMQTest {
public static void main(String[] args){
// 啟動(dòng)一個(gè)線程作為消費(fèi)者
new Thread(() -> consumer()).start();
// 生產(chǎn)者
producer();
}
/**
* 生產(chǎn)者
*/
public static void producer() {
Jedis jedis = new Jedis("127.0.0.1", 6379);
// 推送消息
jedis.lpush("mq", "Hello, List.");
}
/**
* 消費(fèi)者
*/
public static void consumer() {
Jedis jedis = new Jedis("127.0.0.1", 6379);
// 消費(fèi)消息
while (true) {
// 獲取消息
String msg = jedis.rpop("mq");
if (msg != null) {
// 接收到了消息
System.out.println("接收到消息:" + msg);
}
}
}
}可以看出以上消費(fèi)者的實(shí)現(xiàn)是通過(guò) while 無(wú)限循環(huán)來(lái)獲取消息,但如果消息的空閑時(shí)間比較長(zhǎng),一直沒(méi)有新任務(wù),而 while 循環(huán)不會(huì)因此停止,它會(huì)一直執(zhí)行循環(huán)的動(dòng)作,這樣就會(huì)白白浪費(fèi)了系統(tǒng)的資源。
解決辦法:借助 Redis 中的阻塞讀來(lái)替代 rpop 的方法就可以解決此問(wèn)題
import redis.clients.jedis.Jedis;
public class ListMQExample {
public static void main(String[] args) throws InterruptedException {
// 消費(fèi)者
new Thread(() -> bConsumer()).start();
// 生產(chǎn)者
producer();
}
/**
* 生產(chǎn)者
*/
public static void producer() throws InterruptedException {
Jedis jedis = new Jedis("127.0.0.1", 6379);
// 推送消息
jedis.lpush("mq", "Hello, Java.");
Thread.sleep(1000);
jedis.lpush("mq", "message 2.");
Thread.sleep(2000);
jedis.lpush("mq", "message 3.");
}
/**
* 消費(fèi)者(阻塞版)
*/
public static void bConsumer() {
Jedis jedis = new Jedis("127.0.0.1", 6379);
while (true) {
// 阻塞讀
for (String item : jedis.brpop(0,"mq")) {
// 讀取到相關(guān)數(shù)據(jù),進(jìn)行業(yè)務(wù)處理
System.out.println(item);
}
}
}
}使用 brpop 替代 rpop 來(lái)讀取最后一條消息,就可以解決 while 循環(huán)在沒(méi)有數(shù)據(jù)的情況下,一直循環(huán)消耗系統(tǒng)資源的情況了。brpop 中的 b 是 blocking 的意思,表示阻塞讀,也就是當(dāng)隊(duì)列沒(méi)有數(shù)據(jù)時(shí),它會(huì)進(jìn)入休眠狀態(tài),當(dāng)有數(shù)據(jù)進(jìn)入隊(duì)列之后,它才會(huì)“蘇醒”過(guò)來(lái)執(zhí)行讀取任務(wù),這樣就可以解決 while 循環(huán)一直執(zhí)行消耗系統(tǒng)資源的問(wèn)題了。
在程序中如何使用 Stream 來(lái)實(shí)現(xiàn)消息隊(duì)列
在開(kāi)始實(shí)現(xiàn)消息隊(duì)列之前,我們必須先創(chuàng)建分組才行,因?yàn)橄M(fèi)者需要關(guān)聯(lián)分組信息才能正常運(yùn)行,具體實(shí)現(xiàn)代碼如下:
import com.google.gson.Gson;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamEntryID;
import utils.JedisUtils;
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class StreamGroupExample {
private static final String _STREAM_KEY = "mq"; // 流 key
private static final String _GROUP_NAME = "g1"; // 分組名稱
private static final String _CONSUMER_NAME = "c1"; // 消費(fèi)者 1 的名稱
private static final String _CONSUMER2_NAME = "c2"; // 消費(fèi)者 2 的名稱
public static void main(String[] args) {
// 生產(chǎn)者
producer();
// 創(chuàng)建消費(fèi)組
createGroup(_STREAM_KEY, _GROUP_NAME);
// 消費(fèi)者 1
new Thread(() -> consumer()).start();
// 消費(fèi)者 2
new Thread(() -> consumer2()).start();
}
/**
* 創(chuàng)建消費(fèi)分組
* @param stream 流 key
* @param groupName 分組名稱
*/
public static void createGroup(String stream, String groupName) {
Jedis jedis = JedisUtils.getJedis();
jedis.xgroupCreate(stream, groupName, new StreamEntryID(), true);
}
/**
* 生產(chǎn)者
*/
public static void producer() {
Jedis jedis = JedisUtils.getJedis();
// 添加消息 1
Map<String, String> map = new HashMap<>();
map.put("data", "redis");
StreamEntryID id = jedis.xadd(_STREAM_KEY, null, map);
System.out.println("消息添加成功 ID:" + id);
// 添加消息 2
Map<String, String> map2 = new HashMap<>();
map2.put("data", "java");
StreamEntryID id2 = jedis.xadd(_STREAM_KEY, null, map2);
System.out.println("消息添加成功 ID:" + id2);
}
/**
* 消費(fèi)者 1
*/
public static void consumer() {
Jedis jedis = JedisUtils.getJedis();
// 消費(fèi)消息
while (true) {
// 讀取消息
Map.Entry<String, StreamEntryID> entry = new AbstractMap.SimpleImmutableEntry<>(_STREAM_KEY,
new StreamEntryID().UNRECEIVED_ENTRY);
// 阻塞讀取一條消息(最大阻塞時(shí)間120s)
List<Map.Entry<String, List<StreamEntry>>> list = jedis.xreadGroup(_GROUP_NAME, _CONSUMER_NAME, 1,
120 * 1000, true, entry);
if (list != null && list.size() == 1) {
// 讀取到消息
Map<String, String> content = list.get(0).getValue().get(0).getFields(); // 消息內(nèi)容
System.out.println("Consumer 1 讀取到消息 ID:" + list.get(0).getValue().get(0).getID() +
" 內(nèi)容:" + new Gson().toJson(content));
}
}
}
/**
* 消費(fèi)者 2
*/
public static void consumer2() {
Jedis jedis = JedisUtils.getJedis();
// 消費(fèi)消息
while (true) {
// 讀取消息
Map.Entry<String, StreamEntryID> entry = new AbstractMap.SimpleImmutableEntry<>(_STREAM_KEY,
new StreamEntryID().UNRECEIVED_ENTRY);
// 阻塞讀取一條消息(最大阻塞時(shí)間120s)
List<Map.Entry<String, List<StreamEntry>>> list = jedis.xreadGroup(_GROUP_NAME, _CONSUMER2_NAME, 1,
120 * 1000, true, entry);
if (list != null && list.size() == 1) {
// 讀取到消息
Map<String, String> content = list.get(0).getValue().get(0).getFields(); // 消息內(nèi)容
System.out.println("Consumer 2 讀取到消息 ID:" + list.get(0).getValue().get(0).getID() +
" 內(nèi)容:" + new Gson().toJson(content));
}
}
}
}以上代碼運(yùn)行結(jié)果如下:
消息添加成功 ID:1580971482344-0
消息添加成功 ID:1580971482415-0
Consumer 1 讀取到消息 ID:1580971482344-0 內(nèi)容:{"data":"redis"}
Consumer 2 讀取到消息 ID:1580971482415-0 內(nèi)容:{"data":"java"}其中,jedis.xreadGroup() 方法的第五個(gè)參數(shù) noAck 表示是否自動(dòng)確認(rèn)消息,如果設(shè)置 true 收到消息會(huì)自動(dòng)確認(rèn) (ack) 消息,否則需要手動(dòng)確認(rèn)。
可以看出,同一個(gè)分組內(nèi)的多個(gè) consumer 會(huì)讀取到不同消息,不同的 consumer 不會(huì)讀取到分組內(nèi)的同一條消息。
