死信隊(duì)列???
點(diǎn)擊上方藍(lán)色字體,選擇“標(biāo)星公眾號(hào)”
優(yōu)質(zhì)文章,第一時(shí)間送達(dá)
作者 | 無話可說丶
來源 | urlify.cn/NvERz2
死信隊(duì)列的作用
死信交換機(jī)有什么用呢?在創(chuàng)建隊(duì)列的時(shí)候 可以給這個(gè)隊(duì)列附帶一個(gè)交換機(jī), 那么這個(gè)隊(duì)列作廢的消息就會(huì)被重新發(fā)到附帶的交換機(jī),然后讓這個(gè)交換機(jī)重新路由這條消息。
死信消息產(chǎn)生的來源
消息被拒絕(basic.reject或basic.nack)并且requeue=false
消息TTL過期
隊(duì)列達(dá)到最大長(zhǎng)度(隊(duì)列滿了,無法再添加數(shù)據(jù)到mq中)
死信隊(duì)列處理的方式
丟棄,如果不是很重要,可以選擇丟棄
記錄死信入庫(kù),然后做后續(xù)的業(yè)務(wù)分析或處理
通過死信隊(duì)列,由負(fù)責(zé)監(jiān)聽死信的應(yīng)用程序進(jìn)行處理

消息超時(shí)進(jìn)入死信隊(duì)列
通俗的說,就是消息產(chǎn)生之后,因?yàn)樵O(shè)置了超時(shí)時(shí)間,在這段時(shí)間內(nèi)消息沒有被消費(fèi)就會(huì)被扔到死信隊(duì)列里面。
// 交換機(jī)名稱
private static final String DESTINATION_NAME = "rabbitMq_topic";
//消息隊(duì)列
private static final String queueName = "topic_queue";
//routingKey
private static final String routingKey = "topic.#";
//配置死信隊(duì)列
private static final String dlxExchangeName = "dlx.exchange";
private static final String dlxQueueName = "dlx.queue";
private static final String dlxRoutingKey = "#";
@Test
public void producer() throws IOException, TimeoutException {
//獲取連接
Connection connection = MQConnectionUtils.newConnection();
//創(chuàng)建通道
Channel channel = connection.createChannel();
Map<String, Object> arguments = new HashMap<String, Object>(16);
// 為隊(duì)列設(shè)置隊(duì)列交換器
arguments.put("x-dead-letter-exchange", dlxExchangeName);
// 設(shè)置隊(duì)列中的消息 60s 鐘后過期
arguments.put("x-message-ttl", 60000);
//正常生產(chǎn)者綁定交換機(jī) 參數(shù)1 交換機(jī)名稱 參數(shù)2 交換機(jī)類型
channel.exchangeDeclare(DESTINATION_NAME, "topic", true, false, null);
//消費(fèi)聲明隊(duì)列
channel.queueDeclare(queueName, true, false, false, arguments);
//消費(fèi)者隊(duì)列綁定交換機(jī) 綁定路由件 路由鍵
channel.queueBind(queueName, DESTINATION_NAME, routingKey);
String message = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 測(cè)試消息超時(shí),傳遞到死信隊(duì)列";
// 創(chuàng)建死信交換器和隊(duì)列
channel.exchangeDeclare(dlxExchangeName, "topic", true, false, null);
channel.queueDeclare(dlxQueueName, true, false, false, null);
channel.queueBind(dlxQueueName, dlxExchangeName, dlxRoutingKey);
//生產(chǎn)者發(fā)送消息者
channel.basicPublish(DESTINATION_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
System.err.println("消息發(fā)送完成......");
}
只監(jiān)聽了死信隊(duì)列的消息,正常消息無需監(jiān)聽接收
/**
* 監(jiān)聽死信隊(duì)列
*
* @throws IOException
* @throws TimeoutException
* @throws InterruptedException
*/
@Test
public void dlxConsumer() throws IOException, TimeoutException, InterruptedException {
//獲取連接
Connection connection = MQConnectionUtils.newConnection();
//創(chuàng)建通道
Channel channel = connection.createChannel();
System.out.println("死信消費(fèi)者啟動(dòng) ..........");
Thread.sleep(65000);
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("死信隊(duì)列接收到消息:" + new String(body));
System.err.println("deliveryTag:" + envelope.getDeliveryTag());
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(dlxQueueName, consumer);
}
消息被退回
這個(gè)我在之前的整合SpringBoot的時(shí)候有實(shí)驗(yàn)過。
channel.basicNack(envelope.getDeliveryTag(),false,false);
隊(duì)列達(dá)到最大長(zhǎng)度
這個(gè)和消息超時(shí)差不多,只不過是設(shè)置了隊(duì)列的最大容量而已。
只需要把上面的代碼修改一下就可以了。
@Test
public void producer() throws IOException, TimeoutException {
//獲取連接
Connection connection = MQConnectionUtils.newConnection();
//創(chuàng)建通道
Channel channel = connection.createChannel();
Map<String, Object> arguments = new HashMap<String, Object>(16);
// 為隊(duì)列設(shè)置隊(duì)列交換器
arguments.put("x-dead-letter-exchange", dlxExchangeName);
//設(shè)置隊(duì)列長(zhǎng)度為3
arguments.put("x-max-length", 3);
//正常生產(chǎn)者綁定交換機(jī) 參數(shù)1 交換機(jī)名稱 參數(shù)2 交換機(jī)類型
channel.exchangeDeclare(DESTINATION_NAME, "topic", true, false, null);
//消費(fèi)聲明隊(duì)列
channel.queueDeclare(queueName, true, false, false, arguments);
//消費(fèi)者隊(duì)列綁定交換機(jī) 綁定路由件 路由鍵
channel.queueBind(queueName, DESTINATION_NAME, routingKey);
String message = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 測(cè)試消息超時(shí),傳遞到死信隊(duì)列";
// 創(chuàng)建死信交換器和隊(duì)列
channel.exchangeDeclare(dlxExchangeName, "topic", true, false, null);
channel.queueDeclare(dlxQueueName, true, false, false, null);
channel.queueBind(dlxQueueName, dlxExchangeName, dlxRoutingKey);
//生產(chǎn)者發(fā)送消息者
for (int i = 0; i < 5; i++) {
channel.basicPublish(DESTINATION_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, (message+i).getBytes());
}
System.out.println("消息發(fā)送完成......");
}

@Test
public void consumer() throws IOException, TimeoutException, InterruptedException {
//獲取連接
Connection connection = MQConnectionUtils.newConnection();
//創(chuàng)建通道
Channel channel = connection.createChannel();
//此處設(shè)置一次只消費(fèi)1個(gè),且必須是ASK之后的消息才能算
channel.basicQos(1);
System.out.println("消費(fèi)者啟動(dòng) ..........");
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("正常隊(duì)列:" + new String(body));
System.out.println("deliveryTag:" + envelope.getDeliveryTag());
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(queueName, consumer);
}
/**
* 監(jiān)聽死信隊(duì)列
*
* @throws IOException
* @throws TimeoutException
* @throws InterruptedException
*/
@Test
public void dlxConsumer() throws IOException, TimeoutException, InterruptedException {
//獲取連接
Connection connection = MQConnectionUtils.newConnection();
//創(chuàng)建通道
Channel channel = connection.createChannel();
System.out.println("死信消費(fèi)者啟動(dòng) ..........");
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("死信隊(duì)列接收到消息:" + new String(body));
System.err.println("deliveryTag:" + envelope.getDeliveryTag());
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(dlxQueueName, consumer);
}



鋒哥最新SpringCloud分布式電商秒殺課程發(fā)布
??????
??長(zhǎng)按上方微信二維碼 2 秒
感謝點(diǎn)贊支持下哈 
