<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          死信隊(duì)列???

          共 13191字,需瀏覽 27分鐘

           ·

          2021-03-14 18:19

          點(diǎn)擊上方藍(lán)色字體,選擇“標(biāo)星公眾號(hào)”

          優(yōu)質(zhì)文章,第一時(shí)間送達(dá)

            作者 |  無話可說丶

          來源 |  urlify.cn/NvERz2

          死信隊(duì)列的作用

          死信交換機(jī)有什么用呢?在創(chuàng)建隊(duì)列的時(shí)候 可以給這個(gè)隊(duì)列附帶一個(gè)交換機(jī), 那么這個(gè)隊(duì)列作廢的消息就會(huì)被重新發(fā)到附帶的交換機(jī),然后讓這個(gè)交換機(jī)重新路由這條消息。

          死信消息產(chǎn)生的來源

          • 消息被拒絕(basic.reject或basic.nack)并且requeue=false

          • 消息TTL過期

          • 隊(duì)列達(dá)到最大長(zhǎng)度(隊(duì)列滿了,無法再添加數(shù)據(jù)到mq中)

          死信隊(duì)列處理的方式

          • 丟棄,如果不是很重要,可以選擇丟棄

          • 記錄死信入庫(kù),然后做后續(xù)的業(yè)務(wù)分析或處理

          • 通過死信隊(duì)列,由負(fù)責(zé)監(jiān)聽死信的應(yīng)用程序進(jìn)行處理

          消息超時(shí)進(jìn)入死信隊(duì)列

          通俗的說,就是消息產(chǎn)生之后,因?yàn)樵O(shè)置了超時(shí)時(shí)間,在這段時(shí)間內(nèi)消息沒有被消費(fèi)就會(huì)被扔到死信隊(duì)列里面。

           // 交換機(jī)名稱
              private static final String DESTINATION_NAME = "rabbitMq_topic";
              //消息隊(duì)列
              private static final String queueName = "topic_queue";
              //routingKey
              private static final String routingKey = "topic.#";

              //配置死信隊(duì)列
              private static final String dlxExchangeName = "dlx.exchange";
              private static final String dlxQueueName = "dlx.queue";
              private static final String dlxRoutingKey = "#";

              @Test
              public void producer() throws IOException, TimeoutException {
                  //獲取連接
                  Connection connection = MQConnectionUtils.newConnection();
                  //創(chuàng)建通道
                  Channel channel = connection.createChannel();
                  Map<String, Object> arguments = new HashMap<String, Object>(16);
                  // 為隊(duì)列設(shè)置隊(duì)列交換器
                  arguments.put("x-dead-letter-exchange", dlxExchangeName);
                  // 設(shè)置隊(duì)列中的消息 60s 鐘后過期
                  arguments.put("x-message-ttl", 60000);
                  //正常生產(chǎn)者綁定交換機(jī) 參數(shù)1 交換機(jī)名稱 參數(shù)2 交換機(jī)類型
                  channel.exchangeDeclare(DESTINATION_NAME, "topic"truefalse, null);
                  //消費(fèi)聲明隊(duì)列
                  channel.queueDeclare(queueName, truefalsefalse, arguments);
                  //消費(fèi)者隊(duì)列綁定交換機(jī) 綁定路由件 路由鍵
                  channel.queueBind(queueName, DESTINATION_NAME, routingKey);

                  String message = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 測(cè)試消息超時(shí),傳遞到死信隊(duì)列";

                  // 創(chuàng)建死信交換器和隊(duì)列
                  channel.exchangeDeclare(dlxExchangeName, "topic"truefalse, null);
                  channel.queueDeclare(dlxQueueName, truefalsefalse, null);
                  channel.queueBind(dlxQueueName, dlxExchangeName, dlxRoutingKey);

                  //生產(chǎn)者發(fā)送消息者
                  channel.basicPublish(DESTINATION_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
                  System.err.println("消息發(fā)送完成......");

              }

          只監(jiān)聽了死信隊(duì)列的消息,正常消息無需監(jiān)聽接收

              /**
               * 監(jiān)聽死信隊(duì)列
               *
               * @throws IOException
               * @throws TimeoutException
               * @throws InterruptedException
               */
              @Test
              public void dlxConsumer() throws IOException, TimeoutException, InterruptedException {
                  //獲取連接
                  Connection connection = MQConnectionUtils.newConnection();
                  //創(chuàng)建通道
                  Channel channel = connection.createChannel();
                  System.out.println("死信消費(fèi)者啟動(dòng) ..........");
                  Thread.sleep(65000);
                  com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
                      @Override
                      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                          System.err.println("死信隊(duì)列接收到消息:" + new String(body));
                          System.err.println("deliveryTag:" + envelope.getDeliveryTag());
                          channel.basicAck(envelope.getDeliveryTag(), false);
                      }
                  };
                  channel.basicConsume(dlxQueueName, consumer);
              }

          消息被退回

          這個(gè)我在之前的整合SpringBoot的時(shí)候有實(shí)驗(yàn)過。

          channel.basicNack(envelope.getDeliveryTag(),false,false);

          隊(duì)列達(dá)到最大長(zhǎng)度

          這個(gè)和消息超時(shí)差不多,只不過是設(shè)置了隊(duì)列的最大容量而已。
          只需要把上面的代碼修改一下就可以了。

              @Test
              public void producer() throws IOException, TimeoutException {
                  //獲取連接
                  Connection connection = MQConnectionUtils.newConnection();
                  //創(chuàng)建通道
                  Channel channel = connection.createChannel();

                  Map<String, Object> arguments = new HashMap<String, Object>(16);
                  // 為隊(duì)列設(shè)置隊(duì)列交換器
                  arguments.put("x-dead-letter-exchange", dlxExchangeName);
                  //設(shè)置隊(duì)列長(zhǎng)度為3
                  arguments.put("x-max-length", 3);
                  //正常生產(chǎn)者綁定交換機(jī) 參數(shù)1 交換機(jī)名稱 參數(shù)2 交換機(jī)類型
                  channel.exchangeDeclare(DESTINATION_NAME, "topic"truefalse, null);
                  //消費(fèi)聲明隊(duì)列
                  channel.queueDeclare(queueName, truefalsefalse, arguments);
                  //消費(fèi)者隊(duì)列綁定交換機(jī) 綁定路由件 路由鍵
                  channel.queueBind(queueName, DESTINATION_NAME, routingKey);

                  String message = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 測(cè)試消息超時(shí),傳遞到死信隊(duì)列";

                  // 創(chuàng)建死信交換器和隊(duì)列
                  channel.exchangeDeclare(dlxExchangeName, "topic"truefalse, null);
                  channel.queueDeclare(dlxQueueName, truefalsefalse, null);
                  channel.queueBind(dlxQueueName, dlxExchangeName, dlxRoutingKey);

                  //生產(chǎn)者發(fā)送消息者
                  for (int i = 0; i < 5; i++) {
                      channel.basicPublish(DESTINATION_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, (message+i).getBytes());
                  }
                  System.out.println("消息發(fā)送完成......");
              }


           @Test
              public void consumer() throws IOException, TimeoutException, InterruptedException {
                  //獲取連接
                  Connection connection = MQConnectionUtils.newConnection();
                  //創(chuàng)建通道
                  Channel channel = connection.createChannel();
                  //此處設(shè)置一次只消費(fèi)1個(gè),且必須是ASK之后的消息才能算
                  channel.basicQos(1);
                  System.out.println("消費(fèi)者啟動(dòng) ..........");
                  com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
                      @Override
                      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                          System.out.println("正常隊(duì)列:" + new String(body));
                          System.out.println("deliveryTag:" + envelope.getDeliveryTag());
                          channel.basicAck(envelope.getDeliveryTag(), false);
                      }
                  };
                  channel.basicConsume(queueName, consumer);
              }

              /**
               * 監(jiān)聽死信隊(duì)列
               *
               * @throws IOException
               * @throws TimeoutException
               * @throws InterruptedException
               */
              @Test
              public void dlxConsumer() throws IOException, TimeoutException, InterruptedException {
                  //獲取連接
                  Connection connection = MQConnectionUtils.newConnection();
                  //創(chuàng)建通道
                  Channel channel = connection.createChannel();
                  System.out.println("死信消費(fèi)者啟動(dòng) ..........");
                  com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
                      @Override
                      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                          System.err.println("死信隊(duì)列接收到消息:" + new String(body));
                          System.err.println("deliveryTag:" + envelope.getDeliveryTag());
                          channel.basicAck(envelope.getDeliveryTag(), false);
                      }
                  };
                  channel.basicConsume(dlxQueueName, consumer);
              }





          鋒哥最新SpringCloud分布式電商秒殺課程發(fā)布

          ??????

          ??長(zhǎng)按上方微信二維碼 2 秒





          感謝點(diǎn)贊支持下哈 

          瀏覽 111
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  91无码精品国产 | 黄色小视频在线 | 蜜臀尤物一区二区三区直播 | 三区在线播放 | 男人天堂国产精品 |