RabbitMQ之什么是死信隊列

一 什么是死信隊列
當(dāng)一條消息在隊列中出現(xiàn)以下三種情況的時候,該消息就會變成一條死信。
消息被拒絕(basic.reject / basic.nack),并且requeue = false
消息TTL過期
隊列達(dá)到最大長度
當(dāng)消息在一個隊列中變成一個死信之后,如果配置了死信隊列,它將被重新publish到死信交換機,死信交換機將死信投遞到一個隊列上,這個隊列就是死信隊列。
二 實現(xiàn)死信隊列
2.1 原理圖

2.2 創(chuàng)建消費者
創(chuàng)建一個消費者,綁定消費隊列及死信交換機,交換機默認(rèn)為direct模型,死信交換機也是,arguments綁定死信交換機和key。(注解支持的具體參數(shù)文末會附上)
public?class?DirectConsumer?{
????@RabbitListener(bindings?=?{
????????????@QueueBinding(value?=?@Queue(value?=?"javatrip",arguments?=?
????????????????????{@Argument(name="x-dead-letter-exchange",value?=?"deadExchange"),
?????????????????????@Argument(name="x-dead-letter-routing-key",value?=?"deadKey")
????????????????????}),
????????????????????exchange?=?@Exchange(value="javatripDirect"),
????????????????????key?=?{"info","error","warning"}
????????????)
????})
public?void?receive1(String?message,?@Headers?Map?headers,?Channel?channel) throws?Exception{
????System.out.println("消費者1"+message);
}
2.3 創(chuàng)建生產(chǎn)者
public?void?publishMessage(String?message){
????rabbitTemplate.setMandatory(true);
????rabbitTemplate.convertAndSend("javatripDirect","info",message);
}
三 造成死信的三種情況
3.1 拒絕消息,并且禁止重新入隊
設(shè)置yml為手動簽收模式
spring:
??rabbitmq:
????listener:
??????simple:
????????acknowledge-mode:?manual
設(shè)置拒絕消息并禁止重新入隊
Long?deliverTag?=?(Long)?headers.get(AmqpHeaders.DELIVERY_TAG);
channel.basicNack(deliverTag,false,false);
綁定死信隊列
@RabbitListener(bindings?=?{
????@QueueBinding(
????????value?=?@Queue(value?=?"javatripDead"),
????????exchange?=?@Exchange(value?=?"deadExchange"),
????????key?=?"deadKey"
????)
})
public?void?receive2(String?message){
????System.out.println("我是一條死信:"+message);
}
3.2 消息TTL過期
綁定業(yè)務(wù)隊列的時候,增加消息的過期時長,當(dāng)消息過期后,消息將被轉(zhuǎn)發(fā)到死信隊列中。
@RabbitListener(bindings?=?{
????????????@QueueBinding(value?=?@Queue(value?=?"javatrip",arguments?=
????????????????????{@Argument(name="x-dead-letter-exchange",value?=?"deadExchange"),
?????????????????????@Argument(name="x-dead-letter-routing-key",value?=?"deadKey"),
?????????????????????@Argument(name?=?"x-message-ttl",value?=?"3000")
????????????????????}),
????????????????????exchange?=?@Exchange(value="javatripDirect"),
????????????????????key?=?{"info","error","warning"}
????????????)
????})
public?void?receive1(String?message,?@Headers?Map?headers,?Channel?channel) throws?Exception{
????System.out.println("消費者1"+message);
}
3.3 隊列達(dá)到最大長度
設(shè)置消息隊列長度,當(dāng)隊列中的消息達(dá)到最大長度后,繼續(xù)發(fā)送消息,消息將被轉(zhuǎn)發(fā)到死信隊列中。
@RabbitListener(bindings?=?{
????????????@QueueBinding(value?=?@Queue(value?=?"javatrip",arguments?=
????????????????????{@Argument(name="x-dead-letter-exchange",value?=?"deadExchange"),
?????????????????????@Argument(name="x-dead-letter-routing-key",value?=?"deadKey"),
?????????????????????@Argument(name?=?"x-max-length",value?=?"3")
????????????????????}),
????????????????????exchange?=?@Exchange(value="javatripDirect"),
????????????????????key?=?{"info","error","warning"}
????????????)
????})
public?void?receive1(String?message,?@Headers?Map?headers,?Channel?channel) throws?Exception{
????System.out.println("消費者1"+message);
}
四 Spring Boot整合RabbitMQ用到的幾個注解
@QueueBinding作用就是將隊列和交換機進(jìn)行綁定,主要有以下三個參數(shù):
@Target({})
@Retention(RetentionPolicy.RUNTIME)
public?@interface?QueueBinding?{
????/**
?????*?@return?the?queue.
?????*/
????Queue?value();
????/**
?????*?@return?the?exchange.
?????*/
????Exchange?exchange();
????/**
?????*?@return?the?routing?key?or?pattern?for?the?binding.
?????*?Multiple?elements?will?result?in?multiple?bindings.
?????*/
????String[]?key()?default?{};
}
@Queue是聲明隊列及隊列的一些屬性,主要參數(shù)如下:
@Target({})
@Retention(RetentionPolicy.RUNTIME)
public?@interface?Queue?{
????/**
?????*?@return?the?queue?name?or?""?for?a?generated?queue?name?(default).
?????*/
????@AliasFor("name")
????String?value()?default?"";
????/**
?????*?@return?the?queue?name?or?""?for?a?generated?queue?name?(default).
?????*?@since?2.0
?????*/
????@AliasFor("value")
????String?name()?default?"";
????/**
?????*?是否持久化
?????*/
????String?durable()?default?"";
????/**
?????*?是否獨享、排外的.
?????*/
????String?exclusive()?default?"";
????/**
?????*?是否自動刪除;
?????*/
????String?autoDelete()?default?"";
????/**
?????*?隊列的其他屬性參數(shù)
?????*?(1)x-message-ttl:消息的過期時間,單位:毫秒;
?????*(2)x-expires:隊列過期時間,隊列在多長時間未被訪問將被刪除,單位:毫秒;
?????*(3)x-max-length:隊列最大長度,超過該最大值,則將從隊列頭部開始刪除消息;
?????*(4)x-max-length-bytes:隊列消息內(nèi)容占用最大空間,受限于內(nèi)存大小,超過該閾值則從隊列頭部開始刪除消??????*?息;
?????*(5)x-overflow:設(shè)置隊列溢出行為。這決定了當(dāng)達(dá)到隊列的最大長度時消息會發(fā)生什么。有效值是drop-?????????* head、reject-publish或reject-publish-dlx。仲裁隊列類型僅支持drop-head;
?????*(6)x-dead-letter-exchange:死信交換器名稱,過期或被刪除(因隊列長度超長或因空間超出閾值)的消息???????*?可指定發(fā)送到該交換器中;
?????*(7)x-dead-letter-routing-key:死信消息路由鍵,在消息發(fā)送到死信交換器時會使用該路由鍵,如果不設(shè)?????????*?置,則使用消息的原來的路由鍵值
?????*(8)x-single-active-consumer:表示隊列是否是單一活動消費者,true時,注冊的消費組內(nèi)只有一個消費?????*?者消費消息,其他被忽略,false時消息循環(huán)分發(fā)給所有消費者(默認(rèn)false)
?????*(9)x-max-priority:隊列要支持的最大優(yōu)先級數(shù);如果未設(shè)置,隊列將不支持消息優(yōu)先級;
?????*(10)x-queue-mode(Lazy mode):將隊列設(shè)置為延遲模式,在磁盤上保留盡可能多的消息,以減少RAM的使?????????*?用;如果未設(shè)置,隊列將保留內(nèi)存緩存以盡可能快地傳遞消息;
?????*(11)x-queue-master-locator:在集群模式下設(shè)置鏡像隊列的主節(jié)點信息。
?????*/
????Argument[]?arguments()?default?{};
}
@Exchange是聲明交換及交換機的一些屬性,
@Target({})
@Retention(RetentionPolicy.RUNTIME)
public?@interface?Exchange?{
????String?TRUE?=?"true";
????String?FALSE?=?"false";
????/**
?????*?@return?the?exchange?name.
?????*/
????@AliasFor("name")
????String?value()?default?"";
????/**
?????*?@return?the?exchange?name.
?????*?@since?2.0
?????*/
????@AliasFor("value")
????String?name()?default?"";
????/**
?????*?交換機類型,默認(rèn)DIRECT
?????*/
????String?type()?default?ExchangeTypes.DIRECT;
????/**
?????*?是否持久化
?????*/
????String?durable()?default?TRUE;
????/**
?????*?是否自動刪除
?????*/
????String?autoDelete()?default?FALSE;
????/**
?????*?@return?the?arguments?to?apply?when?declaring?this?exchange.
?????*?@since?1.6
?????*/
????Argument[]?arguments()?default?{};
}
推薦閱讀:
喜歡我可以給我設(shè)為星標(biāo)哦


評論
圖片
表情
