<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          《RabbitMQ》什么是死信隊(duì)列

          共 5767字,需瀏覽 12分鐘

           ·

          2020-08-13 09:49

          一 什么是死信隊(duì)列

          當(dāng)一條消息在隊(duì)列中出現(xiàn)以下三種情況的時(shí)候,該消息就會(huì)變成一條死信。

          • 消息被拒絕(basic.reject / basic.nack),并且requeue = false

          • 消息TTL過(guò)期

          • 隊(duì)列達(dá)到最大長(zhǎng)度

          當(dāng)消息在一個(gè)隊(duì)列中變成一個(gè)死信之后,如果配置了死信隊(duì)列,它將被重新publish到死信交換機(jī),死信交換機(jī)將死信投遞到一個(gè)隊(duì)列上,這個(gè)隊(duì)列就是死信隊(duì)列。

          二 實(shí)現(xiàn)死信隊(duì)列

          2.1 原理圖

          2.2 創(chuàng)建消費(fèi)者

          創(chuàng)建一個(gè)消費(fèi)者,綁定消費(fèi)隊(duì)列及死信交換機(jī),交換機(jī)默認(rèn)為direct模型,死信交換機(jī)也是,arguments綁定死信交換機(jī)和key。(注解支持的具體參數(shù)文末會(huì)附上)

          public?class?DirectConsumer?{
          ????@RabbitListener(bindings?=?{
          ????????????@QueueBinding(value?=?@Queue(value?=?"javatrip",arguments?=?
          ????????????????????{@Argument(name="x-dead-letter-exchange",value?=?"deadExchange"),
          ?????????????????????@Argument(name="x-dead-letter-routing-key",value?=?"deadKey")
          ????????????????????}),
          ????????????????????exchange?=?@Exchange(value="javatripDirect"),
          ????????????????????key?=?{"info","error","warning"}
          ????????????)
          ????})
          public?void?receive1(String?message,?@Headers?Map?headers,?Channel?channel)throws?Exception{
          ????System.out.println("消費(fèi)者1"+message);
          }

          2.3 創(chuàng)建生產(chǎn)者

          public?void?publishMessage(String?message){

          ????rabbitTemplate.setMandatory(true);
          ????rabbitTemplate.convertAndSend("javatripDirect","info",message);
          }

          三 造成死信的三種情況

          3.1 拒絕消息,并且禁止重新入隊(duì)

          1. 設(shè)置yml為手動(dòng)簽收模式

          spring:
          ??rabbitmq:
          ????listener:
          ??????simple:
          ????????acknowledge-mode:?manual
          1. 設(shè)置拒絕消息并禁止重新入隊(duì)

          Long?deliverTag?=?(Long)?headers.get(AmqpHeaders.DELIVERY_TAG);
          channel.basicNack(deliverTag,false,false);
          1. 綁定死信隊(duì)列

          @RabbitListener(bindings?=?{
          ????@QueueBinding(
          ????????value?=?@Queue(value?=?"javatripDead"),
          ????????exchange?=?@Exchange(value?=?"deadExchange"),
          ????????key?=?"deadKey"
          ????)
          })
          public?void?receive2(String?message){
          ????System.out.println("我是一條死信:"+message);
          }

          3.2 消息TTL過(guò)期

          綁定業(yè)務(wù)隊(duì)列的時(shí)候,增加消息的過(guò)期時(shí)長(zhǎng),當(dāng)消息過(guò)期后,消息將被轉(zhuǎn)發(fā)到死信隊(duì)列中。

          @RabbitListener(bindings?=?{
          ????????????@QueueBinding(value?=?@Queue(value?=?"javatrip",arguments?=
          ????????????????????{@Argument(name="x-dead-letter-exchange",value?=?"deadExchange"),
          ?????????????????????@Argument(name="x-dead-letter-routing-key",value?=?"deadKey"),
          ?????????????????????@Argument(name?=?"x-message-ttl",value?=?"3000")
          ????????????????????}),
          ????????????????????exchange?=?@Exchange(value="javatripDirect"),
          ????????????????????key?=?{"info","error","warning"}
          ????????????)
          ????})
          public?void?receive1(String?message,?@Headers?Map?headers,?Channel?channel)throws?Exception{
          ????System.out.println("消費(fèi)者1"+message);
          }

          3.3 隊(duì)列達(dá)到最大長(zhǎng)度

          設(shè)置消息隊(duì)列長(zhǎng)度,當(dāng)隊(duì)列中的消息達(dá)到最大長(zhǎng)度后,繼續(xù)發(fā)送消息,消息將被轉(zhuǎn)發(fā)到死信隊(duì)列中。

          @RabbitListener(bindings?=?{
          ????????????@QueueBinding(value?=?@Queue(value?=?"javatrip",arguments?=
          ????????????????????{@Argument(name="x-dead-letter-exchange",value?=?"deadExchange"),
          ?????????????????????@Argument(name="x-dead-letter-routing-key",value?=?"deadKey"),
          ?????????????????????@Argument(name?=?"x-max-length",value?=?"3")
          ????????????????????}),
          ????????????????????exchange?=?@Exchange(value="javatripDirect"),
          ????????????????????key?=?{"info","error","warning"}
          ????????????)
          ????})
          public?void?receive1(String?message,?@Headers?Map?headers,?Channel?channel)throws?Exception{
          ????System.out.println("消費(fèi)者1"+message);
          }

          四 Spring Boot整合RabbitMQ用到的幾個(gè)注解

          1. @QueueBinding作用就是將隊(duì)列和交換機(jī)進(jìn)行綁定,主要有以下三個(gè)參數(shù):

          @Target({})
          @Retention(RetentionPolicy.RUNTIME)
          public?@interface?QueueBinding?{

          ????/**
          ?????*?@return?the?queue.
          ?????*/

          ????Queue?value();

          ????/**
          ?????*?@return?the?exchange.
          ?????*/

          ????Exchange?exchange();

          ????/**
          ?????*?@return?the?routing?key?or?pattern?for?the?binding.
          ?????*?Multiple?elements?will?result?in?multiple?bindings.
          ?????*/

          ????String[]?key()?default?{};
          }
          1. @Queue是聲明隊(duì)列及隊(duì)列的一些屬性,主要參數(shù)如下:

          @Target({})
          @Retention(RetentionPolicy.RUNTIME)
          public?@interface?Queue?{

          ????/**
          ?????*?@return?the?queue?name?or?""?for?a?generated?queue?name?(default).
          ?????*/

          ????@AliasFor("name")
          ????String?value()?default?"";

          ????/**
          ?????*?@return?the?queue?name?or?""?for?a?generated?queue?name?(default).
          ?????*?@since?2.0
          ?????*/

          ????@AliasFor("value")
          ????String?name()?default?"";

          ????/**
          ?????*?是否持久化
          ?????*/

          ????String?durable()?default?"";

          ????/**
          ?????*?是否獨(dú)享、排外的.
          ?????*/

          ????String?exclusive()?default?"";

          ????/**
          ?????*?是否自動(dòng)刪除;
          ?????*/

          ????String?autoDelete()?default?"";

          ????/**
          ?????*?隊(duì)列的其他屬性參數(shù)
          ?????*?(1)x-message-ttl:消息的過(guò)期時(shí)間,單位:毫秒;
          ?????*(2)x-expires:隊(duì)列過(guò)期時(shí)間,隊(duì)列在多長(zhǎng)時(shí)間未被訪問(wèn)將被刪除,單位:毫秒;
          ?????*(3)x-max-length:隊(duì)列最大長(zhǎng)度,超過(guò)該最大值,則將從隊(duì)列頭部開(kāi)始刪除消息;
          ?????*(4)x-max-length-bytes:隊(duì)列消息內(nèi)容占用最大空間,受限于內(nèi)存大小,超過(guò)該閾值則從隊(duì)列頭部開(kāi)始刪除消??????*?息;
          ?????*(5)x-overflow:設(shè)置隊(duì)列溢出行為。這決定了當(dāng)達(dá)到隊(duì)列的最大長(zhǎng)度時(shí)消息會(huì)發(fā)生什么。有效值是drop-?????????* head、reject-publish或reject-publish-dlx。仲裁隊(duì)列類型僅支持drop-head;
          ?????*(6)x-dead-letter-exchange:死信交換器名稱,過(guò)期或被刪除(因隊(duì)列長(zhǎng)度超長(zhǎng)或因空間超出閾值)的消息???????*?可指定發(fā)送到該交換器中;
          ?????*(7)x-dead-letter-routing-key:死信消息路由鍵,在消息發(fā)送到死信交換器時(shí)會(huì)使用該路由鍵,如果不設(shè)?????????*?置,則使用消息的原來(lái)的路由鍵值
          ?????*(8)x-single-active-consumer:表示隊(duì)列是否是單一活動(dòng)消費(fèi)者,true時(shí),注冊(cè)的消費(fèi)組內(nèi)只有一個(gè)消費(fèi)?????*?者消費(fèi)消息,其他被忽略,false時(shí)消息循環(huán)分發(fā)給所有消費(fèi)者(默認(rèn)false)
          ?????*(9)x-max-priority:隊(duì)列要支持的最大優(yōu)先級(jí)數(shù);如果未設(shè)置,隊(duì)列將不支持消息優(yōu)先級(jí);
          ?????*(10)x-queue-mode(Lazy mode):將隊(duì)列設(shè)置為延遲模式,在磁盤上保留盡可能多的消息,以減少RAM的使?????????*?用;如果未設(shè)置,隊(duì)列將保留內(nèi)存緩存以盡可能快地傳遞消息;
          ?????*(11)x-queue-master-locator:在集群模式下設(shè)置鏡像隊(duì)列的主節(jié)點(diǎn)信息。
          ?????*/

          ????Argument[]?arguments()?default?{};
          }
          1. @Exchange是聲明交換及交換機(jī)的一些屬性,

          @Target({})
          @Retention(RetentionPolicy.RUNTIME)
          public?@interface?Exchange?{

          ????String?TRUE?=?"true";

          ????String?FALSE?=?"false";

          ????/**
          ?????*?@return?the?exchange?name.
          ?????*/

          ????@AliasFor("name")
          ????String?value()?default?"";

          ????/**
          ?????*?@return?the?exchange?name.
          ?????*?@since?2.0
          ?????*/

          ????@AliasFor("value")
          ????String?name()?default?"";

          ????/**
          ?????*?交換機(jī)類型,默認(rèn)DIRECT
          ?????*/

          ????String?type()?default?ExchangeTypes.DIRECT;

          ????/**
          ?????*?是否持久化
          ?????*/

          ????String?durable()?default?TRUE;

          ????/**
          ?????*?是否自動(dòng)刪除
          ?????*/

          ????String?autoDelete()?default?FALSE;

          ????/**
          ?????*?@return?the?arguments?to?apply?when?declaring?this?exchange.
          ?????*?@since?1.6
          ?????*/

          ????Argument[]?arguments()?default?{};
          }



          < END >

          往期精選
          ?《RabbitMQ》如何保證消息不被重復(fù)消費(fèi)
          ?《RabbitMQ》如何保證消息的可靠性
          ??一文搞懂TCP和UDP的區(qū)別
          ??程序員接私活的19個(gè)平臺(tái)
          ??Spring AOP實(shí)現(xiàn)原理
          ??Spring IOC實(shí)現(xiàn)原理

          瀏覽 78
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  有码一区二区 | 操日本熟女| 久色影视| 97精品人妻一区二区 | 色黄在线观看 |