<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          RabbitMQ實現(xiàn)延時消息的兩種方法

          共 14340字,需瀏覽 29分鐘

           ·

          2021-05-16 12:07

          點擊上方藍色字體,選擇“標星公眾號”

          優(yōu)質(zhì)文章,第一時間送達

            作者 |  javalank

          來源 |  urlify.cn/JRZFjq

          76套java從入門到精通實戰(zhàn)課程分享

          1、死信隊列

          1.1消息什么時候變?yōu)樗佬?dead-letter)

          1. 消息被否定接收,消費者使用basic.reject 或者 basic.nack并且requeue 重回隊列屬性設(shè)為false。

          2. 消息在隊列里得時間超過了該消息設(shè)置的過期時間(TTL)。

          3. 消息隊列到達了它的最大長度,之后再收到的消息。

          1.2死信隊列的原理

          當一個消息再隊列里變?yōu)樗佬艜r,它會被重新publish到另一個exchange交換機上,這個exchange就為DLX。因此我們只需要在聲明正常的業(yè)務(wù)隊列時添加一個可選的"x-dead-letter-exchange"參數(shù),值為死信交換機,死信就會被rabbitmq重新publish到配置的這個交換機上,我們接著監(jiān)聽這個交換機就可以了。

          1.3 代碼實現(xiàn)

          1. 引入amqp依賴

          2. 聲明交換機,隊列

          package com.lank.demo.config;

          import org.springframework.amqp.core.*;
          import org.springframework.context.annotation.Bean;
          import org.springframework.context.annotation.Configuration;
          import java.util.HashMap;
          import java.util.Map;

          /**
           * @author lank
           * @since 2020/12/14 10:44
           */
          @Configuration
          public class RabbitmqConfig {

              //死信交換機,隊列,路由相關(guān)配置
              public static final String DLK_EXCHANGE = "dlk.exchange";
              public static final String DLK_ROUTEKEY = "dlk.routeKey";
              public static final String DLK_QUEUE = "dlk.queue";

              //業(yè)務(wù)交換機,隊列,路由相關(guān)配置
              public static final String DEMO_EXCHANGE = "demo.exchange";
              public static final String DEMO_QUEUE = "demo.queue";
              public static final String DEMO_ROUTEKEY = "demo.routeKey";

              //延時插件DelayedMessagePlugin的交換機,隊列,路由相關(guān)配置
              public static final String DMP_EXCHANGE = "dmp.exchange";
              public static final String DMP_ROUTEKEY = "dmp.routeKey";
              public static final String DMP_QUEUE = "dmp.queue";

              @Bean
              public DirectExchange demoExchange(){
                  return new DirectExchange(DEMO_EXCHANGE,true,false);
              }

              @Bean
              public Queue demoQueue(){
                  //只需要在聲明業(yè)務(wù)隊列時添加x-dead-letter-exchange,值為死信交換機
                  Map<String,Object> map = new HashMap<>(1);
                  map.put("x-dead-letter-exchange",DLK_EXCHANGE);
                  //該參數(shù)x-dead-letter-routing-key可以修改該死信的路由key,不設(shè)置則使用原消息的路由key
                  map.put("x-dead-letter-routing-key",DLK_ROUTEKEY);
                  return new Queue(DEMO_QUEUE,true,false,false,map);
              }

              @Bean
              public Binding demoBind(){
                  return BindingBuilder.bind(demoQueue()).to(demoExchange()).with(DEMO_ROUTEKEY);
              }

              @Bean
              public DirectExchange dlkExchange(){
                  return new DirectExchange(DLK_EXCHANGE,true,false);
              }

              @Bean
              public Queue dlkQueue(){
                  return new Queue(DLK_QUEUE,true,false,false);
              }

              @Bean
              public Binding dlkBind(){
                  return BindingBuilder.bind(dlkQueue()).to(dlkExchange()).with(DLK_ROUTEKEY);
              }


              //延遲插件使用
              //1、聲明一個類型為x-delayed-message的交換機
              //2、參數(shù)添加一個x-delayed-type值為交換機的類型用于路由key的映射
              @Bean
              public CustomExchange dmpExchange(){
                  Map<String, Object> arguments = new HashMap<>(1);
                  arguments.put("x-delayed-type""direct");
                  return new CustomExchange(DMP_EXCHANGE,"x-delayed-message",true,false,arguments);
              }

              @Bean
              public Queue dmpQueue(){
                  return new Queue(DMP_QUEUE,true,false,false);
              }

              @Bean
              public Binding dmpBind(){
                  return BindingBuilder.bind(dmpQueue()).to(dmpExchange()).with(DMP_ROUTEKEY).noargs();
              }
              

          }
          1. 聲明一個類用于發(fā)送帶過期時間的消息

          package com.lank.demo.rabbitmq;

          import com.lank.demo.config.RabbitmqConfig;
          import lombok.extern.slf4j.Slf4j;
          import org.springframework.amqp.AmqpException;
          import org.springframework.amqp.core.Message;
          import org.springframework.amqp.core.MessagePostProcessor;
          import org.springframework.amqp.rabbit.core.RabbitTemplate;
          import org.springframework.beans.factory.annotation.Autowired;
          import org.springframework.stereotype.Component;

          /**
           * @author lank
           * @since 2020/12/14 10:33
           */
          @Component
          @Slf4j
          public class MessageSender {

              @Autowired
              private RabbitTemplate rabbitTemplate;

              //使用死信隊列發(fā)送消息方法封裝
              public void send(String message,Integer time){
                  String ttl = String.valueOf(time*1000);
                  //exchange和routingKey都為業(yè)務(wù)的就可以,只需要設(shè)置消息的過期時間
                  rabbitTemplate.convertAndSend(RabbitmqConfig.DEMO_EXCHANGE, RabbitmqConfig.DEMO_ROUTEKEY,message, new MessagePostProcessor() {
                      @Override
                      public Message postProcessMessage(Message message) throws AmqpException {
                          //設(shè)置消息的過期時間,是以毫秒為單位的
                          message.getMessageProperties().setExpiration(ttl);
                          return message;
                      }
                  });
                  log.info("使用死信隊列消息:{}發(fā)送成功,過期時間:{}秒。",message,time);
              }

              //使用延遲插件發(fā)送消息方法封裝
              public void send2(String message,Integer time){
                  rabbitTemplate.convertAndSend(RabbitmqConfig.DMP_EXCHANGE, RabbitmqConfig.DMP_ROUTEKEY,message, new MessagePostProcessor() {
                      @Override
                      public Message postProcessMessage(Message message) throws AmqpException {
                      //使用延遲插件只需要在消息的header中添加x-delay屬性,值為過期時間,單位毫秒
                          message.getMessageProperties().setHeader("x-delay",time*1000);
                          return message;
                      }
                  });
                  log.info("使用延遲插件發(fā)送消息:{}發(fā)送成功,過期時間:{}秒。",message,time);
              }
          }
          1. 編寫一個類用于消費消息

          package com.lank.demo.rabbitmq;

          import com.lank.demo.config.RabbitmqConfig;
          import lombok.extern.slf4j.Slf4j;
          import org.springframework.amqp.core.Message;
          import org.springframework.amqp.rabbit.annotation.RabbitHandler;
          import org.springframework.amqp.rabbit.annotation.RabbitListener;
          import org.springframework.stereotype.Component;

          /**
           * @author lank
           * @since 2020/12/15 15:57
           */

          @Component
          @Slf4j
          public class MessageReceiver {

              @RabbitHandler
              @RabbitListener(queues = RabbitmqConfig.DLK_QUEUE)
              public void onMessage(Message message){
                  log.info("使用死信隊列,收到消息:{}",new String(message.getBody()));
              }

              @RabbitHandler
              @RabbitListener(queues = RabbitmqConfig.DMP_QUEUE)
              public void onMessage2(Message message){
                  log.info("使用延遲插件,收到消息:{}",new String(message.getBody()));
              }
          }
          1. 編寫Controller調(diào)用發(fā)送消息方法測試結(jié)果

          package com.lank.demo.controller;
          import com.lank.demo.rabbitmq.MessageSender;
          import org.springframework.beans.factory.annotation.Autowired;
          import org.springframework.web.bind.annotation.GetMapping;
          import org.springframework.web.bind.annotation.RequestParam;
          import org.springframework.web.bind.annotation.RestController;

          /**
           * @author lank
           * @since 2020/12/14 17:05
           */
          @RestController
          public class MessageController {

              @Autowired
              public MessageSender messageSender;

              //死信隊列controller
              @GetMapping("/send")
              public String send(@RequestParam String msg,Integer time){
                  messageSender.send(msg,time);
                  return "ok";
              }

              //延遲插件controller
              @GetMapping("/send2")
              public String sendByPlugin(@RequestParam String msg,Integer time){
                  messageSender.send2(msg,time);
                  return "ok";
              }

          }
          1. 配置文件application.properties

          server.port=4399
          #virtual-host使用默認的/就好,如果需要/demo需自己在控制臺添加
          spring.rabbitmq.virtual-host=/demo
          spring.rabbitmq.host=localhost
          spring.rabbitmq.port=5672
          spring.rabbitmq.username=guest
          spring.rabbitmq.password=guest
          1. 啟動項目,打開rabbitmq控制臺,可以看到交換機和隊列已經(jīng)創(chuàng)建好。

          2. 在瀏覽器中請求http://localhost:4399/send?msg=hello&time=5,從控制臺的輸出來看,剛好5s后接收到消息。

          2020-12-16 22:47:28.071  INFO 13304 --- [nio-4399-exec-1] c.l.rabbitmqdlk.rabbitmq.MessageSender   : 使用死信隊列消息:hello發(fā)送成功,過期時間:5秒。
          2020-12-16 22:47:33.145  INFO 13304 --- [ntContainer#0-1] c.l.r.rabbitmq.MessageReceiver           : 使用死信隊列,收到消息:hello

          1.4死信隊列的一個小注意點

          當我往死信隊列中發(fā)送兩條不同過期時間的消息時,如果先發(fā)送的消息A的過期時間大于后發(fā)送的消息B的過期時間時,由于消息的順序消費,消息B過期后并不會立即重新publish到死信交換機,而是會等到消息A過期后一起被消費。

          依次發(fā)送兩個請求http://localhost:4399/send?msg=消息A&time=30和http://localhost:4399/send?msg=消息B&time=10,消息A先發(fā)送,過期時間30S,消息B后發(fā)送,過期時間10S,我們想要的結(jié)果應(yīng)該是10S收到消息B,30S后收到消息A,但結(jié)果并不是,控制臺輸出如下:

          2020-12-16 22:54:47.339  INFO 13304 --- [nio-4399-exec-5] c.l.rabbitmqdlk.rabbitmq.MessageSender   : 使用死信隊列消息:消息A發(fā)送成功,過期時間:30秒。
          2020-12-16 22:54:54.278 INFO 13304 --- [nio-4399-exec-6] c.l.rabbitmqdlk.rabbitmq.MessageSender   : 使用死信隊列消息:消息B發(fā)送成功,過期時間:10秒。
          2020-12-16 22:55:17.356  INFO 13304 --- [ntContainer#0-1] c.l.r.rabbitmq.MessageReceiver           : 使用死信隊列,收到消息:消息A
          2020-12-16 22:55:17.357  INFO 13304 --- [ntContainer#0-1] c.l.r.rabbitmq.MessageReceiver           : 使用死信隊列,收到消息:消息B

          消息A30S后被成功消費,緊接著消息B被消費。因此當我們使用死信隊列時應(yīng)該注意是否消息的過期時間都是一樣的,比如訂單超過10分鐘未支付修改其狀態(tài)。如果當一個隊列各個消息的過期時間不一致時,使用死信隊列就可能達不到延時的作用。這時候我們可以使用延時插件來實現(xiàn)這需求。

          2 、延時插件

          RabbitMQ Delayed Message Plugin是一個rabbitmq的插件,所以使用前需要安裝它,可以參考的GitHub地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

          2.1如何實現(xiàn)

          1. 安裝好插件后只需要聲明一個類型type為"x-delayed-message"的exchange,并且在其可選參數(shù)下配置一個key為"x-delayed-typ",值為交換機類型(topic/direct/fanout)的屬性。

          2. 聲明一個隊列綁定到該交換機

          3. 在發(fā)送消息的時候消息的header里添加一個key為"x-delay",值為過期時間的屬性,單位毫秒。

          4. 代碼就在上面,配置類為DMP開頭的,發(fā)送消息的方法為send2()。

          5. 啟動后在rabbitmq控制臺可以看到一個類型為x-delayed-message的交換機。

          6. 繼續(xù)在瀏覽器中發(fā)送兩個請求http://localhost:4399/send2?msg=消息A&time=30和http://localhost:4399/send2?msg=消息B&time=10,控制臺輸出如下,不會出現(xiàn)死信隊列出現(xiàn)的問題:

          2020-12-16 23:31:19.819  INFO 13304 --- [nio-4399-exec-9] c.l.rabbitmqdlk.rabbitmq.MessageSender   : 使用延遲插件發(fā)送消息:消息A發(fā)送成功,過期時間:30秒。
          2020-12-16 23:31:27.673 INFO 13304 --- [io-4399-exec-10] c.l.rabbitmqdlk.rabbitmq.MessageSender   : 使用延遲插件發(fā)送消息:消息B發(fā)送成功,過期時間:10秒。
          2020-12-16 23:31:37.833  INFO 13304 --- [ntContainer#1-1] c.l.r.rabbitmq.MessageReceiver           : 使用延遲插件,收到消息:消息B
          2020-12-16 23:31:49.917  INFO 13304 --- [ntContainer#1-1] c.l.r.rabbitmq.MessageReceiver           : 使用延遲插件,收到消息:消息A

          死信交換機官網(wǎng)介紹:https://www.rabbitmq.com/dlx.html
          延時插件GitHub:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange






          鋒哥最新SpringCloud分布式電商秒殺課程發(fā)布

          ??????

          ??長按上方微信二維碼 2 秒





          感謝點贊支持下哈 

          瀏覽 61
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  免费看黄色一级视频 | 黄色电影在线视频 | 九色视频免费看 | A黄色视频| 男人av资源在线 欧美操逼免费观看 |