《RabbitMQ》什么是死信隊(duì)列
一 什么是死信隊(duì)列
當(dāng)一條消息在隊(duì)列中出現(xiàn)以下三種情況的時(shí)候,該消息就會(huì)變成一條死信。
消息被拒絕(basic.reject / basic.nack),并且requeue = false
消息TTL過(guò)期
隊(duì)列達(dá)到最大長(zhǎng)度
當(dāng)消息在一個(gè)隊(duì)列中變成一個(gè)死信之后,如果配置了死信隊(duì)列,它將被重新publish到死信交換機(jī),死信交換機(jī)將死信投遞到一個(gè)隊(duì)列上,這個(gè)隊(duì)列就是死信隊(duì)列。
二 實(shí)現(xiàn)死信隊(duì)列
2.1 原理圖

2.2 創(chuàng)建消費(fèi)者
創(chuàng)建一個(gè)消費(fèi)者,綁定消費(fèi)隊(duì)列及死信交換機(jī),交換機(jī)默認(rèn)為direct模型,死信交換機(jī)也是,arguments綁定死信交換機(jī)和key。(注解支持的具體參數(shù)文末會(huì)附上)
public?class?DirectConsumer?{
????@RabbitListener(bindings?=?{
????????????@QueueBinding(value?=?@Queue(value?=?"javatrip",arguments?=?
????????????????????{@Argument(name="x-dead-letter-exchange",value?=?"deadExchange"),
?????????????????????@Argument(name="x-dead-letter-routing-key",value?=?"deadKey")
????????????????????}),
????????????????????exchange?=?@Exchange(value="javatripDirect"),
????????????????????key?=?{"info","error","warning"}
????????????)
????})
public?void?receive1(String?message,?@Headers?Map?headers,?Channel?channel) throws?Exception{
????System.out.println("消費(fèi)者1"+message);
}
2.3 創(chuàng)建生產(chǎn)者
public?void?publishMessage(String?message){
????rabbitTemplate.setMandatory(true);
????rabbitTemplate.convertAndSend("javatripDirect","info",message);
}
三 造成死信的三種情況
3.1 拒絕消息,并且禁止重新入隊(duì)
設(shè)置yml為手動(dòng)簽收模式
spring:
??rabbitmq:
????listener:
??????simple:
????????acknowledge-mode:?manual
設(shè)置拒絕消息并禁止重新入隊(duì)
Long?deliverTag?=?(Long)?headers.get(AmqpHeaders.DELIVERY_TAG);
channel.basicNack(deliverTag,false,false);
綁定死信隊(duì)列
@RabbitListener(bindings?=?{
????@QueueBinding(
????????value?=?@Queue(value?=?"javatripDead"),
????????exchange?=?@Exchange(value?=?"deadExchange"),
????????key?=?"deadKey"
????)
})
public?void?receive2(String?message){
????System.out.println("我是一條死信:"+message);
}
3.2 消息TTL過(guò)期
綁定業(yè)務(wù)隊(duì)列的時(shí)候,增加消息的過(guò)期時(shí)長(zhǎng),當(dāng)消息過(guò)期后,消息將被轉(zhuǎn)發(fā)到死信隊(duì)列中。
@RabbitListener(bindings?=?{
????????????@QueueBinding(value?=?@Queue(value?=?"javatrip",arguments?=
????????????????????{@Argument(name="x-dead-letter-exchange",value?=?"deadExchange"),
?????????????????????@Argument(name="x-dead-letter-routing-key",value?=?"deadKey"),
?????????????????????@Argument(name?=?"x-message-ttl",value?=?"3000")
????????????????????}),
????????????????????exchange?=?@Exchange(value="javatripDirect"),
????????????????????key?=?{"info","error","warning"}
????????????)
????})
public?void?receive1(String?message,?@Headers?Map?headers,?Channel?channel) throws?Exception{
????System.out.println("消費(fèi)者1"+message);
}
3.3 隊(duì)列達(dá)到最大長(zhǎng)度
設(shè)置消息隊(duì)列長(zhǎng)度,當(dāng)隊(duì)列中的消息達(dá)到最大長(zhǎng)度后,繼續(xù)發(fā)送消息,消息將被轉(zhuǎn)發(fā)到死信隊(duì)列中。
@RabbitListener(bindings?=?{
????????????@QueueBinding(value?=?@Queue(value?=?"javatrip",arguments?=
????????????????????{@Argument(name="x-dead-letter-exchange",value?=?"deadExchange"),
?????????????????????@Argument(name="x-dead-letter-routing-key",value?=?"deadKey"),
?????????????????????@Argument(name?=?"x-max-length",value?=?"3")
????????????????????}),
????????????????????exchange?=?@Exchange(value="javatripDirect"),
????????????????????key?=?{"info","error","warning"}
????????????)
????})
public?void?receive1(String?message,?@Headers?Map?headers,?Channel?channel) throws?Exception{
????System.out.println("消費(fèi)者1"+message);
}
四 Spring Boot整合RabbitMQ用到的幾個(gè)注解
@QueueBinding作用就是將隊(duì)列和交換機(jī)進(jìn)行綁定,主要有以下三個(gè)參數(shù):
@Target({})
@Retention(RetentionPolicy.RUNTIME)
public?@interface?QueueBinding?{
????/**
?????*?@return?the?queue.
?????*/
????Queue?value();
????/**
?????*?@return?the?exchange.
?????*/
????Exchange?exchange();
????/**
?????*?@return?the?routing?key?or?pattern?for?the?binding.
?????*?Multiple?elements?will?result?in?multiple?bindings.
?????*/
????String[]?key()?default?{};
}
@Queue是聲明隊(duì)列及隊(duì)列的一些屬性,主要參數(shù)如下:
@Target({})
@Retention(RetentionPolicy.RUNTIME)
public?@interface?Queue?{
????/**
?????*?@return?the?queue?name?or?""?for?a?generated?queue?name?(default).
?????*/
????@AliasFor("name")
????String?value()?default?"";
????/**
?????*?@return?the?queue?name?or?""?for?a?generated?queue?name?(default).
?????*?@since?2.0
?????*/
????@AliasFor("value")
????String?name()?default?"";
????/**
?????*?是否持久化
?????*/
????String?durable()?default?"";
????/**
?????*?是否獨(dú)享、排外的.
?????*/
????String?exclusive()?default?"";
????/**
?????*?是否自動(dòng)刪除;
?????*/
????String?autoDelete()?default?"";
????/**
?????*?隊(duì)列的其他屬性參數(shù)
?????*?(1)x-message-ttl:消息的過(guò)期時(shí)間,單位:毫秒;
?????*(2)x-expires:隊(duì)列過(guò)期時(shí)間,隊(duì)列在多長(zhǎng)時(shí)間未被訪問(wèn)將被刪除,單位:毫秒;
?????*(3)x-max-length:隊(duì)列最大長(zhǎng)度,超過(guò)該最大值,則將從隊(duì)列頭部開(kāi)始刪除消息;
?????*(4)x-max-length-bytes:隊(duì)列消息內(nèi)容占用最大空間,受限于內(nèi)存大小,超過(guò)該閾值則從隊(duì)列頭部開(kāi)始刪除消??????*?息;
?????*(5)x-overflow:設(shè)置隊(duì)列溢出行為。這決定了當(dāng)達(dá)到隊(duì)列的最大長(zhǎng)度時(shí)消息會(huì)發(fā)生什么。有效值是drop-?????????* head、reject-publish或reject-publish-dlx。仲裁隊(duì)列類型僅支持drop-head;
?????*(6)x-dead-letter-exchange:死信交換器名稱,過(guò)期或被刪除(因隊(duì)列長(zhǎng)度超長(zhǎng)或因空間超出閾值)的消息???????*?可指定發(fā)送到該交換器中;
?????*(7)x-dead-letter-routing-key:死信消息路由鍵,在消息發(fā)送到死信交換器時(shí)會(huì)使用該路由鍵,如果不設(shè)?????????*?置,則使用消息的原來(lái)的路由鍵值
?????*(8)x-single-active-consumer:表示隊(duì)列是否是單一活動(dòng)消費(fèi)者,true時(shí),注冊(cè)的消費(fèi)組內(nèi)只有一個(gè)消費(fèi)?????*?者消費(fèi)消息,其他被忽略,false時(shí)消息循環(huán)分發(fā)給所有消費(fèi)者(默認(rèn)false)
?????*(9)x-max-priority:隊(duì)列要支持的最大優(yōu)先級(jí)數(shù);如果未設(shè)置,隊(duì)列將不支持消息優(yōu)先級(jí);
?????*(10)x-queue-mode(Lazy mode):將隊(duì)列設(shè)置為延遲模式,在磁盤上保留盡可能多的消息,以減少RAM的使?????????*?用;如果未設(shè)置,隊(duì)列將保留內(nèi)存緩存以盡可能快地傳遞消息;
?????*(11)x-queue-master-locator:在集群模式下設(shè)置鏡像隊(duì)列的主節(jié)點(diǎn)信息。
?????*/
????Argument[]?arguments()?default?{};
}
@Exchange是聲明交換及交換機(jī)的一些屬性,
@Target({})
@Retention(RetentionPolicy.RUNTIME)
public?@interface?Exchange?{
????String?TRUE?=?"true";
????String?FALSE?=?"false";
????/**
?????*?@return?the?exchange?name.
?????*/
????@AliasFor("name")
????String?value()?default?"";
????/**
?????*?@return?the?exchange?name.
?????*?@since?2.0
?????*/
????@AliasFor("value")
????String?name()?default?"";
????/**
?????*?交換機(jī)類型,默認(rèn)DIRECT
?????*/
????String?type()?default?ExchangeTypes.DIRECT;
????/**
?????*?是否持久化
?????*/
????String?durable()?default?TRUE;
????/**
?????*?是否自動(dòng)刪除
?????*/
????String?autoDelete()?default?FALSE;
????/**
?????*?@return?the?arguments?to?apply?when?declaring?this?exchange.
?????*?@since?1.6
?????*/
????Argument[]?arguments()?default?{};
}
