<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Redis 中如何實(shí)現(xiàn)的消息隊(duì)列?實(shí)現(xiàn)的方式有幾種

          共 7156字,需瀏覽 15分鐘

           ·

          2021-08-22 02:00

          Redis 中實(shí)現(xiàn)消息隊(duì)列的方式有幾種

          1、使用 List 類(lèi)型實(shí)現(xiàn)

          2、使用 ZSet 類(lèi)型實(shí)現(xiàn)

          3、使用發(fā)布訂閱者模式實(shí)現(xiàn)消息隊(duì)列;

          4、使用 Stream 實(shí)現(xiàn)消息隊(duì)列。

          幾種消息隊(duì)列具體使用和優(yōu)缺點(diǎn)

          1、List 類(lèi)型實(shí)現(xiàn)的方式最為簡(jiǎn)單和直接,它主要是通過(guò) lpush、rpop 存入和讀取實(shí)現(xiàn)消息隊(duì)列的,如下圖所示:


          lpush 可以把最新的消息存儲(chǔ)到消息隊(duì)列(List 集合)的首部,而 rpop 可以讀取消息隊(duì)列的尾部,這樣就實(shí)現(xiàn)了先進(jìn)先出,如下圖所示:


          優(yōu)點(diǎn):使用 List 實(shí)現(xiàn)消息隊(duì)列的優(yōu)點(diǎn)是消息可以被持久化,List 可以借助 Redis 本身的持久化功能,AOF 或者是 RDB 或混合持久化的方式,用于把數(shù)據(jù)保存至磁盤(pán),這樣當(dāng) Redis 重啟之后,消息不會(huì)丟失。

          缺點(diǎn):但使用 List 同樣存在一定的問(wèn)題,比如消息不支持重復(fù)消費(fèi)、沒(méi)有按照主題訂閱的功能、不支持消費(fèi)消息確認(rèn)等。

          2、ZSet 實(shí)現(xiàn)消息隊(duì)列:它是利用 zadd 和 zrangebyscore 來(lái)實(shí)現(xiàn)存入和讀取消息的。

          優(yōu)點(diǎn):同樣具備持久化的功能

          缺點(diǎn):List 存在的問(wèn)題它也同樣存在,不但如此,使用 ZSet 還不能存儲(chǔ)相同元素的值。因?yàn)樗怯行蚣希行蚣系拇鎯?chǔ)元素值是不能重復(fù)的,但分值可以重復(fù),也就是說(shuō)當(dāng)消息值重復(fù)時(shí),只能存儲(chǔ)一條信息在 ZSet 中。

          3、發(fā)布訂閱:使用發(fā)布和訂閱的類(lèi)型,我們可以實(shí)現(xiàn)主題訂閱的功能,也就是 Pattern Subscribe 的功能。因此我們可以使用一個(gè)消費(fèi)者“queue_*”來(lái)訂閱所有以“queue_”開(kāi)頭的消息隊(duì)列,如下圖所示:


          優(yōu)點(diǎn):可以按照主題訂閱方式

          缺點(diǎn):

          a、無(wú)法持久化保存消息,如果 Redis 服務(wù)器宕機(jī)或重啟,那么所有的消息將會(huì)丟失;

          b、發(fā)布訂閱模式是“發(fā)后既忘”的工作模式,如果有訂閱者離線重連之后就不能消費(fèi)之前的歷史消息;

          c、不支持消費(fèi)者確認(rèn)機(jī)制,穩(wěn)定性不能得到保證,例如當(dāng)消費(fèi)者獲取到消息之后,還沒(méi)來(lái)得及執(zhí)行就宕機(jī)了。因?yàn)闆](méi)有消費(fèi)者確認(rèn)機(jī)制,Redis 就會(huì)誤以為消費(fèi)者已經(jīng)執(zhí)行了,因此就不會(huì)重復(fù)發(fā)送未被正常消費(fèi)的消息了,這樣整體的 Redis 穩(wěn)定性就被沒(méi)有辦法得到保障了。

          4、Stream 類(lèi)型實(shí)現(xiàn):使用 Stream 的 xadd 和 xrange 來(lái)實(shí)現(xiàn)消息的存入和讀取了,并且 Stream 提供了 xack 手動(dòng)確認(rèn)消息消費(fèi)的命令,用它我們就可以實(shí)現(xiàn)消費(fèi)者確認(rèn)的功能了,使用命令如下:

          127.0.0.1:6379> xack mq group1 1580959593553-0

          (integer) 1

          消費(fèi)確認(rèn)增加了消息的可靠性,一般在業(yè)務(wù)處理完成之后,需要執(zhí)行 ack 確認(rèn)消息已經(jīng)被消費(fèi)完成,整個(gè)流程的執(zhí)行如下圖所示:


          其中“Group”為群組,消費(fèi)者也就是接收者需要訂閱到群組才能正常獲取到消息。

          在 Java 代碼中使用 List 實(shí)現(xiàn)消息隊(duì)列會(huì)有什么問(wèn)題?應(yīng)該如何解決?

          先看代碼部分實(shí)現(xiàn):

          import redis.clients.jedis.Jedis;

          publicclass ListMQTest {

          public static void main(String[] args){

          // 啟動(dòng)一個(gè)線程作為消費(fèi)者

          new Thread(() -> consumer()).start();

          // 生產(chǎn)者

          producer();

          }

          /**

          * 生產(chǎn)者

          */


          public static void producer() {

          Jedis jedis = new Jedis("127.0.0.1", 6379);

          // 推送消息

          jedis.lpush("mq", "Hello, List.");

          }

          /**

          * 消費(fèi)者

          */


          public static void consumer() {

          Jedis jedis = new Jedis("127.0.0.1", 6379);

          // 消費(fèi)消息

          while (true) {

          // 獲取消息

          String msg = jedis.rpop("mq");

          if (msg != null) {

          // 接收到了消息

          System.out.println("接收到消息:" + msg);

          }

          }

          }

          }

          可以看出以上消費(fèi)者的實(shí)現(xiàn)是通過(guò) while 無(wú)限循環(huán)來(lái)獲取消息,但如果消息的空閑時(shí)間比較長(zhǎng),一直沒(méi)有新任務(wù),而 while 循環(huán)不會(huì)因此停止,它會(huì)一直執(zhí)行循環(huán)的動(dòng)作,這樣就會(huì)白白浪費(fèi)了系統(tǒng)的資源。

          解決辦法:借助 Redis 中的阻塞讀來(lái)替代 rpop 的方法就可以解決此問(wèn)題

          import redis.clients.jedis.Jedis;

          public class ListMQExample {

          public static void main(String[] args) throws InterruptedException {

          // 消費(fèi)者

          new Thread(() -> bConsumer()).start();

          // 生產(chǎn)者

          producer();

          }

          /**

          * 生產(chǎn)者

          */


          public static void producer() throws InterruptedException {

          Jedis jedis = new Jedis("127.0.0.1", 6379);

          // 推送消息

          jedis.lpush("mq", "Hello, Java.");

          Thread.sleep(1000);

          jedis.lpush("mq", "message 2.");

          Thread.sleep(2000);

          jedis.lpush("mq", "message 3.");

          }

          /**

          * 消費(fèi)者(阻塞版)

          */


          public static void bConsumer() {

          Jedis jedis = new Jedis("127.0.0.1", 6379);

          while (true) {

          // 阻塞讀

          for (String item : jedis.brpop(0,"mq")) {

          // 讀取到相關(guān)數(shù)據(jù),進(jìn)行業(yè)務(wù)處理

          System.out.println(item);

          }

          }

          }

          }

          使用 brpop 替代 rpop 來(lái)讀取最后一條消息,就可以解決 while 循環(huán)在沒(méi)有數(shù)據(jù)的情況下,一直循環(huán)消耗系統(tǒng)資源的情況了。brpop 中的 b 是 blocking 的意思,表示阻塞讀,也就是當(dāng)隊(duì)列沒(méi)有數(shù)據(jù)時(shí),它會(huì)進(jìn)入休眠狀態(tài),當(dāng)有數(shù)據(jù)進(jìn)入隊(duì)列之后,它才會(huì)“蘇醒”過(guò)來(lái)執(zhí)行讀取任務(wù),這樣就可以解決 while 循環(huán)一直執(zhí)行消耗系統(tǒng)資源的問(wèn)題了。

          在程序中如何使用 Stream 來(lái)實(shí)現(xiàn)消息隊(duì)列

          在開(kāi)始實(shí)現(xiàn)消息隊(duì)列之前,我們必須先創(chuàng)建分組才行,因?yàn)橄M(fèi)者需要關(guān)聯(lián)分組信息才能正常運(yùn)行,具體實(shí)現(xiàn)代碼如下:

          import com.google.gson.Gson;

          import redis.clients.jedis.Jedis;

          import redis.clients.jedis.StreamEntry;

          import redis.clients.jedis.StreamEntryID;

          import utils.JedisUtils;

          import java.util.AbstractMap;

          import java.util.HashMap;

          import java.util.List;

          import java.util.Map;

          public class StreamGroupExample {

          private static final String _STREAM_KEY = "mq"; // 流 key

          private static final String _GROUP_NAME = "g1"; // 分組名稱

          private static final String _CONSUMER_NAME = "c1"; // 消費(fèi)者 1 的名稱

          private static final String _CONSUMER2_NAME = "c2"; // 消費(fèi)者 2 的名稱

          public static void main(String[] args) {

          // 生產(chǎn)者

          producer();

          // 創(chuàng)建消費(fèi)組

          createGroup(_STREAM_KEY, _GROUP_NAME);

          // 消費(fèi)者 1

          new Thread(() -> consumer()).start();

          // 消費(fèi)者 2

          new Thread(() -> consumer2()).start();

          }

          /**

          * 創(chuàng)建消費(fèi)分組

          * @param stream 流 key

          * @param groupName 分組名稱

          */


          public static void createGroup(String stream, String groupName) {

          Jedis jedis = JedisUtils.getJedis();

          jedis.xgroupCreate(stream, groupName, new StreamEntryID(), true);

          }

          /**

          * 生產(chǎn)者

          */


          public static void producer() {

          Jedis jedis = JedisUtils.getJedis();

          // 添加消息 1

          Map<String, String> map = new HashMap<>();

          map.put("data", "redis");

          StreamEntryID id = jedis.xadd(_STREAM_KEY, null, map);

          System.out.println("消息添加成功 ID:" + id);

          // 添加消息 2

          Map<String, String> map2 = new HashMap<>();

          map2.put("data", "java");

          StreamEntryID id2 = jedis.xadd(_STREAM_KEY, null, map2);

          System.out.println("消息添加成功 ID:" + id2);

          }

          /**

          * 消費(fèi)者 1

          */


          public static void consumer() {

          Jedis jedis = JedisUtils.getJedis();

          // 消費(fèi)消息

          while (true) {

          // 讀取消息

          Map.Entry<String, StreamEntryID> entry = new AbstractMap.SimpleImmutableEntry<>(_STREAM_KEY,

          new StreamEntryID().UNRECEIVED_ENTRY);

          // 阻塞讀取一條消息(最大阻塞時(shí)間120s)

          List<Map.Entry<String, List<StreamEntry>>> list = jedis.xreadGroup(_GROUP_NAME, _CONSUMER_NAME, 1,

          120 * 1000, true, entry);

          if (list != null && list.size() == 1) {

          // 讀取到消息

          Map<String, String> content = list.get(0).getValue().get(0).getFields(); // 消息內(nèi)容

          System.out.println("Consumer 1 讀取到消息 ID:" + list.get(0).getValue().get(0).getID() +

          " 內(nèi)容:" + new Gson().toJson(content));

          }

          }

          }

          /**

          * 消費(fèi)者 2

          */


          public static void consumer2() {

          Jedis jedis = JedisUtils.getJedis();

          // 消費(fèi)消息

          while (true) {

          // 讀取消息

          Map.Entry<String, StreamEntryID> entry = new AbstractMap.SimpleImmutableEntry<>(_STREAM_KEY,

          new StreamEntryID().UNRECEIVED_ENTRY);

          // 阻塞讀取一條消息(最大阻塞時(shí)間120s)

          List<Map.Entry<String, List<StreamEntry>>> list = jedis.xreadGroup(_GROUP_NAME, _CONSUMER2_NAME, 1,

          120 * 1000, true, entry);

          if (list != null && list.size() == 1) {

          // 讀取到消息

          Map<String, String> content = list.get(0).getValue().get(0).getFields(); // 消息內(nèi)容

          System.out.println("Consumer 2 讀取到消息 ID:" + list.get(0).getValue().get(0).getID() +

          " 內(nèi)容:" + new Gson().toJson(content));

          }

          }

          }

          }

          以上代碼運(yùn)行結(jié)果如下:

          消息添加成功 ID:1580971482344-0

          消息添加成功 ID:1580971482415-0

          Consumer 1 讀取到消息 ID:1580971482344-0 內(nèi)容:{"data":"redis"}

          Consumer 2 讀取到消息 ID:1580971482415-0 內(nèi)容:{"data":"java"}

          其中,jedis.xreadGroup() 方法的第五個(gè)參數(shù) noAck 表示是否自動(dòng)確認(rèn)消息,如果設(shè)置 true 收到消息會(huì)自動(dòng)確認(rèn) (ack) 消息,否則需要手動(dòng)確認(rèn)。

          可以看出,同一個(gè)分組內(nèi)的多個(gè) consumer 會(huì)讀取到不同消息,不同的 consumer 不會(huì)讀取到分組內(nèi)的同一條消息。


          瀏覽 82
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  日本影视91爱爱 | 日韩黄色一级片 | 最近最新MV字幕观看 | 国产一区二区免费播放 | 欧美成人黄色电影网站 |