RabbitMQ 延遲隊(duì)列實(shí)現(xiàn)定時(shí)任務(wù)的正確姿勢(shì),你學(xué)會(huì)了么?
閱讀本文大概需要 5.5 分鐘。
來(lái)自:blog.csdn.net/wantnrun/article/details/80401641
場(chǎng)景
Rabbitmq延遲隊(duì)列
死信交換機(jī)
一個(gè)消息被Consumer拒收了,并且reject方法的參數(shù)里requeue是false。也就是說(shuō)不會(huì)被再次放在隊(duì)列里,被其他消費(fèi)者使用。 上面的消息的TTL到了,消息過(guò)期了。 隊(duì)列的長(zhǎng)度限制滿了。排在前面的消息會(huì)被丟棄或者扔到死信路由上。
消息TTL(消息存活時(shí)間)
byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties properties = new AMQP.BasicProperties();
properties.setExpiration("60000");
channel.basicPublish("my-exchange", "queue-key", properties, messageBodyBytes);
當(dāng)上面的消息扔到隊(duì)列中后,過(guò)了60秒,如果沒(méi)有被消費(fèi),它就死了。不會(huì)被消費(fèi)者消費(fèi)到。這個(gè)消息后面的,沒(méi)有“死掉”的消息對(duì)頂上來(lái),被消費(fèi)者消費(fèi)。死信在隊(duì)列中并不會(huì)被刪除和釋放,它會(huì)被統(tǒng)計(jì)到隊(duì)列的消息數(shù)中去

創(chuàng)建交換機(jī)(Exchanges)和隊(duì)列(Queues)
創(chuàng)建死信交換機(jī)

創(chuàng)建自動(dòng)過(guò)期消息隊(duì)列

x-dead-letter-exchange代表消息過(guò)期后,消息要進(jìn)入的交換機(jī),這里配置的是delay,也就是死信交換機(jī),x-dead-letter-routing-key是配置消息過(guò)期后,進(jìn)入死信交換機(jī)的routing-key,跟發(fā)送消息的routing-key一個(gè)道理,根據(jù)這個(gè)key將消息放入不同的隊(duì)列創(chuàng)建消息處理隊(duì)列

消息隊(duì)列綁定到交換機(jī)



String msg = "hello word";
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration("6000");
messageProperties.setCorrelationId(UUID.randomUUID().toString().getBytes());
Message message = new Message(msg.getBytes(), messageProperties);
rabbitTemplate.convertAndSend("delay", "delay",message);
messageProperties.setExpiration("6000");
注意:因?yàn)橐屜⒆詣?dòng)過(guò)期,所以一定不能設(shè)置delay_queue1的監(jiān)聽(tīng),不能讓這個(gè)隊(duì)列里面的消息被接受到,否則消息一旦被消費(fèi),就不存在過(guò)期了
package wang.raye.rabbitmq.demo1;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DelayQueue {
/** 消息交換機(jī)的名字*/
public static final String EXCHANGE = "delay";
/** 隊(duì)列key1*/
public static final String ROUTINGKEY1 = "delay";
/** 隊(duì)列key2*/
public static final String ROUTINGKEY2 = "delay_key";
/**
* 配置鏈接信息
* @return
*/
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("120.76.237.8",5672);
connectionFactory.setUsername("kberp");
connectionFactory.setPassword("kberp");
connectionFactory.setVirtualHost("/");
connectionFactory.setPublisherConfirms(true); // 必須要設(shè)置
return connectionFactory;
}
/**
* 配置消息交換機(jī)
* 針對(duì)消費(fèi)者配置
FanoutExchange: 將消息分發(fā)到所有的綁定隊(duì)列,無(wú)routingkey的概念
HeadersExchange :通過(guò)添加屬性key-value匹配
DirectExchange:按照routingkey分發(fā)到指定隊(duì)列
TopicExchange:多關(guān)鍵字匹配
*/
@Bean
public DirectExchange defaultExchange() {
return new DirectExchange(EXCHANGE, true, false);
}
/**
* 配置消息隊(duì)列2
* 針對(duì)消費(fèi)者配置
* @return
*/
@Bean
public Queue queue() {
return new Queue("delay_queue2", true); //隊(duì)列持久
}
/**
* 將消息隊(duì)列2與交換機(jī)綁定
* 針對(duì)消費(fèi)者配置
* @return
*/
@Bean
@Autowired
public Binding binding() {
return BindingBuilder.bind(queue()).to(defaultExchange()).with(DelayQueue.ROUTINGKEY2);
}
/**
* 接受消息的監(jiān)聽(tīng),這個(gè)監(jiān)聽(tīng)會(huì)接受消息隊(duì)列1的消息
* 針對(duì)消費(fèi)者配置
* @return
*/
@Bean
@Autowired
public SimpleMessageListenerContainer messageContainer2(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
container.setQueues(queue());
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設(shè)置確認(rèn)模式手工確認(rèn)
container.setMessageListener(new ChannelAwareMessageListener() {
public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception {
byte[] body = message.getBody();
System.out.println("delay_queue2 收到消息 : " + new String(body));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //確認(rèn)消息成功消費(fèi)
}
});
return container;
}
}
總結(jié)
推薦閱讀:
剛進(jìn)美團(tuán),就被各種Code Review,真的有必要嗎?
別再用 kill -9 了,這才是微服務(wù)上下線的正確姿勢(shì)!
最近面試BAT,整理一份面試資料《Java面試BATJ通關(guān)手冊(cè)》,覆蓋了Java核心技術(shù)、JVM、Java并發(fā)、SSM、微服務(wù)、數(shù)據(jù)庫(kù)、數(shù)據(jù)結(jié)構(gòu)等等。
朕已閱 
評(píng)論
圖片
表情

