<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          RabbitMQ之什么是死信隊列

          共 5744字,需瀏覽 12分鐘

           ·

          2020-10-28 04:21

          一 什么是死信隊列

          當(dāng)一條消息在隊列中出現(xiàn)以下三種情況的時候,該消息就會變成一條死信。

          • 消息被拒絕(basic.reject / basic.nack),并且requeue = false

          • 消息TTL過期

          • 隊列達(dá)到最大長度

          當(dāng)消息在一個隊列中變成一個死信之后,如果配置了死信隊列,它將被重新publish到死信交換機,死信交換機將死信投遞到一個隊列上,這個隊列就是死信隊列。

          二 實現(xiàn)死信隊列

          2.1 原理圖

          2.2 創(chuàng)建消費者

          創(chuàng)建一個消費者,綁定消費隊列及死信交換機,交換機默認(rèn)為direct模型,死信交換機也是,arguments綁定死信交換機和key。(注解支持的具體參數(shù)文末會附上)

          public?class?DirectConsumer?{
          ????@RabbitListener(bindings?=?{
          ????????????@QueueBinding(value?=?@Queue(value?=?"javatrip",arguments?=?
          ????????????????????{@Argument(name="x-dead-letter-exchange",value?=?"deadExchange"),
          ?????????????????????@Argument(name="x-dead-letter-routing-key",value?=?"deadKey")
          ????????????????????}),
          ????????????????????exchange?=?@Exchange(value="javatripDirect"),
          ????????????????????key?=?{"info","error","warning"}
          ????????????)
          ????})
          public?void?receive1(String?message,?@Headers?Map?headers,?Channel?channel)throws?Exception{
          ????System.out.println("消費者1"+message);
          }

          2.3 創(chuàng)建生產(chǎn)者

          public?void?publishMessage(String?message){

          ????rabbitTemplate.setMandatory(true);
          ????rabbitTemplate.convertAndSend("javatripDirect","info",message);
          }

          三 造成死信的三種情況

          3.1 拒絕消息,并且禁止重新入隊

          1. 設(shè)置yml為手動簽收模式

          spring:
          ??rabbitmq:
          ????listener:
          ??????simple:
          ????????acknowledge-mode:?manual
          1. 設(shè)置拒絕消息并禁止重新入隊

          Long?deliverTag?=?(Long)?headers.get(AmqpHeaders.DELIVERY_TAG);
          channel.basicNack(deliverTag,false,false);
          1. 綁定死信隊列

          @RabbitListener(bindings?=?{
          ????@QueueBinding(
          ????????value?=?@Queue(value?=?"javatripDead"),
          ????????exchange?=?@Exchange(value?=?"deadExchange"),
          ????????key?=?"deadKey"
          ????)
          })
          public?void?receive2(String?message){
          ????System.out.println("我是一條死信:"+message);
          }

          3.2 消息TTL過期

          綁定業(yè)務(wù)隊列的時候,增加消息的過期時長,當(dāng)消息過期后,消息將被轉(zhuǎn)發(fā)到死信隊列中。

          @RabbitListener(bindings?=?{
          ????????????@QueueBinding(value?=?@Queue(value?=?"javatrip",arguments?=
          ????????????????????{@Argument(name="x-dead-letter-exchange",value?=?"deadExchange"),
          ?????????????????????@Argument(name="x-dead-letter-routing-key",value?=?"deadKey"),
          ?????????????????????@Argument(name?=?"x-message-ttl",value?=?"3000")
          ????????????????????}),
          ????????????????????exchange?=?@Exchange(value="javatripDirect"),
          ????????????????????key?=?{"info","error","warning"}
          ????????????)
          ????})
          public?void?receive1(String?message,?@Headers?Map?headers,?Channel?channel)throws?Exception{
          ????System.out.println("消費者1"+message);
          }

          3.3 隊列達(dá)到最大長度

          設(shè)置消息隊列長度,當(dāng)隊列中的消息達(dá)到最大長度后,繼續(xù)發(fā)送消息,消息將被轉(zhuǎn)發(fā)到死信隊列中。

          @RabbitListener(bindings?=?{
          ????????????@QueueBinding(value?=?@Queue(value?=?"javatrip",arguments?=
          ????????????????????{@Argument(name="x-dead-letter-exchange",value?=?"deadExchange"),
          ?????????????????????@Argument(name="x-dead-letter-routing-key",value?=?"deadKey"),
          ?????????????????????@Argument(name?=?"x-max-length",value?=?"3")
          ????????????????????}),
          ????????????????????exchange?=?@Exchange(value="javatripDirect"),
          ????????????????????key?=?{"info","error","warning"}
          ????????????)
          ????})
          public?void?receive1(String?message,?@Headers?Map?headers,?Channel?channel)throws?Exception{
          ????System.out.println("消費者1"+message);
          }

          四 Spring Boot整合RabbitMQ用到的幾個注解

          1. @QueueBinding作用就是將隊列和交換機進(jìn)行綁定,主要有以下三個參數(shù):

          @Target({})
          @Retention(RetentionPolicy.RUNTIME)
          public?@interface?QueueBinding?{

          ????/**
          ?????*?@return?the?queue.
          ?????*/

          ????Queue?value();

          ????/**
          ?????*?@return?the?exchange.
          ?????*/

          ????Exchange?exchange();

          ????/**
          ?????*?@return?the?routing?key?or?pattern?for?the?binding.
          ?????*?Multiple?elements?will?result?in?multiple?bindings.
          ?????*/

          ????String[]?key()?default?{};
          }
          1. @Queue是聲明隊列及隊列的一些屬性,主要參數(shù)如下:

          @Target({})
          @Retention(RetentionPolicy.RUNTIME)
          public?@interface?Queue?{

          ????/**
          ?????*?@return?the?queue?name?or?""?for?a?generated?queue?name?(default).
          ?????*/

          ????@AliasFor("name")
          ????String?value()?default?"";

          ????/**
          ?????*?@return?the?queue?name?or?""?for?a?generated?queue?name?(default).
          ?????*?@since?2.0
          ?????*/

          ????@AliasFor("value")
          ????String?name()?default?"";

          ????/**
          ?????*?是否持久化
          ?????*/

          ????String?durable()?default?"";

          ????/**
          ?????*?是否獨享、排外的.
          ?????*/

          ????String?exclusive()?default?"";

          ????/**
          ?????*?是否自動刪除;
          ?????*/

          ????String?autoDelete()?default?"";

          ????/**
          ?????*?隊列的其他屬性參數(shù)
          ?????*?(1)x-message-ttl:消息的過期時間,單位:毫秒;
          ?????*(2)x-expires:隊列過期時間,隊列在多長時間未被訪問將被刪除,單位:毫秒;
          ?????*(3)x-max-length:隊列最大長度,超過該最大值,則將從隊列頭部開始刪除消息;
          ?????*(4)x-max-length-bytes:隊列消息內(nèi)容占用最大空間,受限于內(nèi)存大小,超過該閾值則從隊列頭部開始刪除消??????*?息;
          ?????*(5)x-overflow:設(shè)置隊列溢出行為。這決定了當(dāng)達(dá)到隊列的最大長度時消息會發(fā)生什么。有效值是drop-?????????* head、reject-publish或reject-publish-dlx。仲裁隊列類型僅支持drop-head;
          ?????*(6)x-dead-letter-exchange:死信交換器名稱,過期或被刪除(因隊列長度超長或因空間超出閾值)的消息???????*?可指定發(fā)送到該交換器中;
          ?????*(7)x-dead-letter-routing-key:死信消息路由鍵,在消息發(fā)送到死信交換器時會使用該路由鍵,如果不設(shè)?????????*?置,則使用消息的原來的路由鍵值
          ?????*(8)x-single-active-consumer:表示隊列是否是單一活動消費者,true時,注冊的消費組內(nèi)只有一個消費?????*?者消費消息,其他被忽略,false時消息循環(huán)分發(fā)給所有消費者(默認(rèn)false)
          ?????*(9)x-max-priority:隊列要支持的最大優(yōu)先級數(shù);如果未設(shè)置,隊列將不支持消息優(yōu)先級;
          ?????*(10)x-queue-mode(Lazy mode):將隊列設(shè)置為延遲模式,在磁盤上保留盡可能多的消息,以減少RAM的使?????????*?用;如果未設(shè)置,隊列將保留內(nèi)存緩存以盡可能快地傳遞消息;
          ?????*(11)x-queue-master-locator:在集群模式下設(shè)置鏡像隊列的主節(jié)點信息。
          ?????*/

          ????Argument[]?arguments()?default?{};
          }
          1. @Exchange是聲明交換及交換機的一些屬性,

          @Target({})
          @Retention(RetentionPolicy.RUNTIME)
          public?@interface?Exchange?{

          ????String?TRUE?=?"true";

          ????String?FALSE?=?"false";

          ????/**
          ?????*?@return?the?exchange?name.
          ?????*/

          ????@AliasFor("name")
          ????String?value()?default?"";

          ????/**
          ?????*?@return?the?exchange?name.
          ?????*?@since?2.0
          ?????*/

          ????@AliasFor("value")
          ????String?name()?default?"";

          ????/**
          ?????*?交換機類型,默認(rèn)DIRECT
          ?????*/

          ????String?type()?default?ExchangeTypes.DIRECT;

          ????/**
          ?????*?是否持久化
          ?????*/

          ????String?durable()?default?TRUE;

          ????/**
          ?????*?是否自動刪除
          ?????*/

          ????String?autoDelete()?default?FALSE;

          ????/**
          ?????*?@return?the?arguments?to?apply?when?declaring?this?exchange.
          ?????*?@since?1.6
          ?????*/

          ????Argument[]?arguments()?default?{};
          }





          推薦閱讀:


          喜歡我可以給我設(shè)為星標(biāo)哦

          好文章,我“在看”
          瀏覽 56
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  青青草乱伦电影 | 男人天堂| 啪啪啪啪啪啪啪网站 | 久久国产成人精品Av | 欧美成人考逼视频 |