<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Springboot+RabbitMQ死信隊(duì)列

          共 10487字,需瀏覽 21分鐘

           ·

          2021-03-15 09:25

          點(diǎn)擊上方藍(lán)色字體,選擇“標(biāo)星公眾號(hào)”

          優(yōu)質(zhì)文章,第一時(shí)間送達(dá)

          76套java從入門到精通實(shí)戰(zhàn)課程分享

          關(guān)于死信隊(duì)列

          在大多數(shù)的MQ中間件中,都有死信隊(duì)列的概念。死信隊(duì)列同其他的隊(duì)列一樣都是普通的隊(duì)列。在RabbitMQ中并沒(méi)有特定的“死信隊(duì)列”類型,而是通過(guò)配置,將其實(shí)現(xiàn)。

          當(dāng)我們?cè)趧?chuàng)建一個(gè)業(yè)務(wù)的交換機(jī)和隊(duì)列的時(shí)候,可以配置參數(shù),指明另一個(gè)隊(duì)列為當(dāng)前隊(duì)列的死信隊(duì)列,在RabbitMQ中,死信隊(duì)列(嚴(yán)格的說(shuō)應(yīng)該是死信交換機(jī))被稱為DLX Exchange。當(dāng)消息“死掉”后,會(huì)被自動(dòng)路由到DLX Exchange的queue中。


          什么樣的消息會(huì)進(jìn)入死信隊(duì)列?

          1.消息的TTL過(guò)期。

          2.消費(fèi)者對(duì)broker應(yīng)答Nack,并且消息禁止重回隊(duì)列。

          3.Queue隊(duì)列長(zhǎng)度已達(dá)上限。


          場(chǎng)景分析

          以用戶訂單支付為場(chǎng)景。在各大電商平臺(tái)上,訂單的都有待支付時(shí)間,通常為30min。當(dāng)用戶超過(guò)30min未支付訂單,該訂單的狀態(tài)應(yīng)該會(huì)變成“超時(shí)取消”,或類似的狀態(tài)值的改變。

          如果不使用MQ,可以設(shè)計(jì)一個(gè)定時(shí)任務(wù),定時(shí)查詢數(shù)據(jù)庫(kù),判斷訂單的狀態(tài)和支付時(shí)間是否已經(jīng)到期,若到期則修改訂單的狀態(tài)。但顯然,這不是一個(gè)很好的操作,頻繁訪問(wèn)數(shù)據(jù)庫(kù),造成不必要的資源浪費(fèi)。

          使用MQ,我們可以在下單的時(shí)候,當(dāng)訂單數(shù)據(jù)入庫(kù)后,發(fā)送一條Message到Queue中,并設(shè)置過(guò)期時(shí)間為30min或自定義的支付過(guò)期時(shí)間。

             /**
               * 發(fā)送帶有過(guò)期時(shí)間的消息
               */
              @GetMapping("/sendDlx")
              public void sendDlx() {
                  Order order = new Order();
                  order.setItemId(1);
                  order.setStatus(1);
                  rabbitTemplate.convertAndSend(orderExchange, orderRoutingKey, 
                          JSON.toJSONString(order), message -> {
                      message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                      // 模擬,設(shè)置10S后消息過(guò)期
                      message.getMessageProperties().setExpiration("10000");
                      return message;
                  });
              }


          若30min后,還未有消費(fèi)者(下游服務(wù))消費(fèi)這條消息,那么該條消息就會(huì)被路由到死信隊(duì)列中。我們可以設(shè)置一個(gè)監(jiān)聽去監(jiān)聽死信隊(duì)列,當(dāng)收到死信隊(duì)列的消息后,則根據(jù)消息數(shù)據(jù),查詢數(shù)據(jù)庫(kù)訂單狀態(tài)是否還是待支付狀態(tài),若是,則修改成超時(shí)取消。


          代碼實(shí)現(xiàn)

          以下是demo,未做服務(wù)的拆分,因此整個(gè)流程都是單個(gè)服務(wù)實(shí)現(xiàn)的,所以就沒(méi)有下游服務(wù),但并不影響整體業(yè)務(wù)。


          RabbitMQConfig

          將需要的交換機(jī),隊(duì)列,綁定都聲明成SpringBean。Spring會(huì)自動(dòng)創(chuàng)建這些到RabbitMQ服務(wù)中。

          @Value注解部分都是配置文件exchange、queue、routingKey的名稱。

          /**
           * @author wulei
           */
          @Configuration
          public class RabbitConfig {

              @Value("${sunspring.order.exchange}")
              private String orderExchange;

              @Value("${sunspring.order.queue}")
              private String orderQueue;

              @Value("${sunspring.order.routingKey}")
              private String orderRoutingKey;

              @Value("${sunspring.dlx.exchange}")
              private String dlxExchange;

              @Value("${sunspring.dlx.queue}")
              private String dlxQueue;

              @Value("${sunspring.dlx.routingKey}")
              private String dlxRoutingKey;

              /**
               * 聲明死信隊(duì)列
               * @return DirectExchange
               */
              @Bean
              public DirectExchange dlxExchange() {
                  return new DirectExchange(dlxExchange);
              }

              /**
               * 聲明死信隊(duì)列
               * @return Queue
               */
              @Bean
              public Queue dlxQueue() {
                  return new Queue(dlxQueue);
              }

              /**
               * 綁定死信隊(duì)列到死信交換機(jī)
               * @return Binding
               */
              @Bean
              public Binding binding() {
                  return BindingBuilder.bind(dlxQueue())
                          .to(dlxExchange())
                          .with(dlxRoutingKey);
              }

              /**
               * 聲明訂單業(yè)務(wù)交換機(jī)
               * @return DirectExchange
               */
              @Bean
              public DirectExchange orderExchange() {
                  return new DirectExchange(orderExchange);
              }

              /**
               * 聲明訂單業(yè)務(wù)隊(duì)列
               * @return Queue
               */
              @Bean
              public Queue orderQueue() {
                  Map<String,Object> arguments = new HashMap<>(2);
                  // 綁定該隊(duì)列到私信交換機(jī)
                  arguments.put("x-dead-letter-exchange",dlxExchange);
                  arguments.put("x-dead-letter-routing-key",dlxRoutingKey);
                  return new Queue(orderQueue,true,false,false,arguments);
              }

              /**
               * 綁定訂單隊(duì)列到訂單交換機(jī)
               * @return Binding
               */
              @Bean
              public Binding orderBinding() {
                  return BindingBuilder.bind(orderQueue())
                          .to(orderExchange())
                          .with(orderRoutingKey);

              }

          }


          sunspring.order.exchange=sunspring_order_exchange
          sunspring.order.queue=sunspring_order_queue
          sunspring.order.routingKey=sunspring.order

          sunspring.dlx.exchange=sunspring_dlx_exchange
          sunspring.dlx.queue=sunspring.dlx.queue
          sunspring.dlx.routingKey=dlx

          在聲明業(yè)務(wù)隊(duì)列時(shí),創(chuàng)建了一個(gè)Map,并且put了兩個(gè)值,這兩個(gè)值就是死信隊(duì)列的聲明。

          x-dead-letter-exchange:死信交換機(jī)的名稱

          x-dead-letter-routing-key:死信交換機(jī)的路由鍵,因?yàn)閐emo中兩個(gè)交換機(jī)的類型都是direct的,因此路由鍵必須相同。

          /**
               * 聲明訂單業(yè)務(wù)隊(duì)列
               * @return Queue
               */
              @Bean
              public Queue orderQueue() {
                  Map<String,Object> arguments = new HashMap<>(2);
                  // 綁定該隊(duì)列到私信交換機(jī)
                  arguments.put("x-dead-letter-exchange",dlxExchange);
                  arguments.put("x-dead-letter-routing-key",dlxRoutingKey);
                  return new Queue(orderQueue,true,false,false,arguments);
              }


          監(jiān)控頁(yè)面

          在exchange列表中有剛剛創(chuàng)建的業(yè)務(wù)交換機(jī)sunspring_order_exchange和死信交換機(jī)

          sunspring_dlx_exchange


          在Queue列表中,有死信隊(duì)列sunspring_dlx_queue和業(yè)務(wù)隊(duì)列sunspring_order_queue

          并且業(yè)務(wù)隊(duì)列上有DLX標(biāo)記,可見當(dāng)前隊(duì)列已經(jīng)綁定了一個(gè)死信隊(duì)列。DLK表示的路由鍵。


          場(chǎng)景模擬

          生產(chǎn)者

          生產(chǎn)者發(fā)送了一個(gè)過(guò)期時(shí)間為10S的消息。

          message.getMessageProperties().setExpiration(“10000”);

          /**
               * 發(fā)送帶有過(guò)期時(shí)間的消息
               */
              @GetMapping("/sendDlx")
              public void sendDlx() {
                  Order order = new Order();
                  order.setItemId(1);
                  order.setStatus(1);
                  rabbitTemplate.convertAndSend(orderExchange, orderRoutingKey,
                          JSON.toJSONString(order), message -> {
                      message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                      message.getMessageProperties().setExpiration("10000");
                      return message;
                  });
              }

          sunspring_order_queue接受到了一條消息,當(dāng)前消息的狀態(tài)是ready的,表示沒(méi)有任何消費(fèi)者消費(fèi)這條消息。

          10s后,當(dāng)前消息路由到了死信隊(duì)列中,sunspring_order_queue消息數(shù)量變成0,sunspring_dlx_queue數(shù)量變成1。

          消費(fèi)者,設(shè)置死信隊(duì)列監(jiān)聽

          通過(guò)設(shè)置對(duì)死信隊(duì)列的監(jiān)聽,可以發(fā)現(xiàn),在Springboot啟動(dòng)之后,創(chuàng)建了對(duì)RabbitMQ的監(jiān)聽,死信隊(duì)列的消息也立刻被消費(fèi)了。

          因此,我們可以監(jiān)聽死信隊(duì)列,對(duì)未被消費(fèi)的消息進(jìn)行下一步操作。如場(chǎng)景分析中的更改訂單狀態(tài)。

             @RabbitListener(queues = "sunspring.dlx.queue")
              public void dlxListener(Message message,Channel channel) throws IOException {
                  System.out.println(new String(message.getBody()));

                  //對(duì)消息進(jìn)行業(yè)務(wù)處理....
                  channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
              }

          2019-08-20 20:05:05.158  INFO 4420 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [120.27.243.91:5672]
          2019-08-20 20:05:05.224  INFO 4420 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#68ab0936:0/SimpleConnection@74606204 [delegate=amqp://[email protected]:5672/, localPort= 13563]
          {"itemId":1,"status":1}


          ————————————————

          版權(quán)聲明:本文為CSDN博主「小伙子你那什么車啊」的原創(chuàng)文章,遵循CC 4.0 BY-SA版權(quán)協(xié)議,轉(zhuǎn)載請(qǐng)附上原文出處鏈接及本聲明。

          原文鏈接:

          https://blog.csdn.net/shishishi777/article/details/99879419






          粉絲福利:Java從入門到入土學(xué)習(xí)路線圖

          ??????

          ??長(zhǎng)按上方微信二維碼 2 秒


          感謝點(diǎn)贊支持下哈 

          瀏覽 53
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  影音先锋一区二区 | www.俺去啦 | 人人妻人人爱人人操 | 看免费中国黄色视频 | 免费在线黄色视频 |