RabbitMQ 延遲隊列實現(xiàn)定時任務(wù),這才是正確的方式!
點擊關(guān)注公眾號,Java干貨及時送達(dá)
場景
開發(fā)中經(jīng)常需要用到定時任務(wù),對于商城來說,定時任務(wù)尤其多,比如優(yōu)惠券定時過期、訂單定時關(guān)閉、微信支付2小時未支付關(guān)閉訂單等等,都需要用到定時任務(wù),但是定時任務(wù)本身有一個問題。
一般來說我們都是通過定時輪詢查詢數(shù)據(jù)庫來判斷是否有任務(wù)需要執(zhí)行,也就是說不管怎么樣,我們需要先查詢數(shù)據(jù)庫,而且有些任務(wù)對時間準(zhǔn)確要求比較高的,需要每秒查詢一次,對于系統(tǒng)小倒是無所謂,如果系統(tǒng)本身就大而且數(shù)據(jù)也多的情況下,這就不大現(xiàn)實了,所以需要其他方式的,當(dāng)然實現(xiàn)的方式有多種多樣的,比如Redis實現(xiàn)定時隊列、基于優(yōu)先級隊列的JDK延遲隊列、時間輪等。
因為我們項目中本身就使用到了Rabbitmq,所以基于方便開發(fā)和維護(hù)的原則,我們使用了Rabbitmq延遲隊列來實現(xiàn)定時任務(wù),不知道rabbitmq是什么的和不知道springboot怎么集成Rabbitmq的可以在公從號Java技術(shù)棧查看我之前的文章Spring boot集成RabbitMQ。
Spring Boot 最全基礎(chǔ)教程和示例代碼:https://github.com/javastacks/spring-boot-best-practice
Rabbitmq延遲隊列
Rabbitmq本身是沒有延遲隊列的,只能通過Rabbitmq本身隊列的特性來實現(xiàn),想要Rabbitmq實現(xiàn)延遲隊列,需要使用Rabbitmq的死信交換機(Exchange)和消息的存活時間TTL(Time To Live)
死信交換機
一個消息在滿足如下條件下,會進(jìn)死信交換機,記住這里是交換機而不是隊列,一個交換機可以對應(yīng)很多隊列。
一個消息被Consumer拒收了,并且reject方法的參數(shù)里requeue是false。也就是說不會被再次放在隊列里,被其他消費者使用。
上面的消息的TTL到了,消息過期了。
隊列的長度限制滿了。排在前面的消息會被丟棄或者扔到死信路由上。
死信交換機就是普通的交換機,只是因為我們把過期的消息扔進(jìn)去,所以叫死信交換機,并不是說死信交換機是某種特定的交換機。另外,MySQL 系列面試題和答案全部整理好了,微信搜索Java技術(shù)棧,在后臺發(fā)送:面試,可以在線閱讀。
消息TTL(消息存活時間)
消息的TTL就是消息的存活時間。RabbitMQ可以對隊列和消息分別設(shè)置TTL。對隊列設(shè)置就是隊列沒有消費者連著的保留時間,也可以對每一個單獨的消息做單獨的設(shè)置。超過了這個時間,我們認(rèn)為這個消息就死了,稱之為死信。
byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties properties = new AMQP.BasicProperties();
properties.setExpiration("60000");
channel.basicPublish("my-exchange", "queue-key", properties, messageBodyBytes);
處理流程圖

創(chuàng)建交換機(Exchanges)和隊列(Queues)
創(chuàng)建死信交換機

如圖所示,就是創(chuàng)建一個普通的交換機,這里為了方便區(qū)分,把交換機的名字取為:delay。
創(chuàng)建自動過期消息隊列
這個隊列的主要作用是讓消息定時過期的,比如我們需要2小時候關(guān)閉訂單,我們就需要把消息放進(jìn)這個隊列里面,把消息過期時間設(shè)置為2小時

創(chuàng)建一個一個名為delay_queue1的自動過期的隊列,當(dāng)然圖片上面的參數(shù)并不會讓消息自動過期,因為我們并沒有設(shè)置x-message-ttl參數(shù),如果整個隊列的消息有消息都是相同的,可以設(shè)置。
這里為了靈活,所以并沒有設(shè)置,另外兩個參數(shù)x-dead-letter-exchange代表消息過期后,消息要進(jìn)入的交換機,這里配置的是delay,也就是死信交換機,x-dead-letter-routing-key是配置消息過期后,進(jìn)入死信交換機的routing-key,跟發(fā)送消息的routing-key一個道理,根據(jù)這個key將消息放入不同的隊列。
創(chuàng)建消息處理隊列
這個隊列才是真正處理消息的隊列,所有進(jìn)入這個隊列的消息都會被處理

消息隊列的名字為delay_queue2
消息隊列綁定到交換機
進(jìn)入交換機詳情頁面,將創(chuàng)建的2個隊列(delayqueue1和delayqueue2)綁定到交換機上面

自動過期消息隊列的routing key 設(shè)置為delay
綁定delayqueue2

delay*queue2 的key要設(shè)置為創(chuàng)建自動過期的隊列的x-dead-letter-routing-key參數(shù),這樣當(dāng)消息過期的時候就可以自動把消息放入delay_queue2這個隊列中了
綁定后的管理頁面如下圖:

當(dāng)然這個綁定也可以使用代碼來實現(xiàn),只是為了直觀表現(xiàn),所以本文使用的管理平臺來操作。
發(fā)送消息
String msg = "hello word"; MessageProperties messageProperties = newMessageProperties(); messageProperties.setExpiration("6000"); messageProperties.setCorrelationId(UUID.randomUUID().toString().getBytes()); Message message = newMessage(msg.getBytes(), messageProperties); rabbitTemplate.convertAndSend("delay", "delay",message);
主要的代碼就是
messageProperties.setExpiration("6000");
設(shè)置了讓消息6秒后過期
注意:因為要讓消息自動過期,所以一定不能設(shè)置delay_queue1的監(jiān)聽,不能讓這個隊列里面的消息被接受到,否則消息一旦被消費,就不存在過期了。另外,RabbitMQ 系列面試題和答案全部整理好了,微信搜索Java技術(shù)棧,在后臺發(fā)送:面試,可以在線閱讀。
接收消息
接收消息配置好delay_queue2的監(jiān)聽就好了
package wang.raye.rabbitmq.demo1;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DelayQueue {
/** 消息交換機的名字*/
public static final String EXCHANGE = "delay";
/** 隊列key1*/
public static final String ROUTINGKEY1 = "delay";
/** 隊列key2*/
public static final String ROUTINGKEY2 = "delay_key";
/**
* 配置鏈接信息
* @return
*/
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("120.76.237.8",5672);
connectionFactory.setUsername("kberp");
connectionFactory.setPassword("kberp");
connectionFactory.setVirtualHost("/");
connectionFactory.setPublisherConfirms(true); // 必須要設(shè)置
return connectionFactory;
}
/**
* 配置消息交換機
* 針對消費者配置
FanoutExchange: 將消息分發(fā)到所有的綁定隊列,無routingkey的概念
HeadersExchange :通過添加屬性key-value匹配
DirectExchange:按照routingkey分發(fā)到指定隊列
TopicExchange:多關(guān)鍵字匹配
*/
@Bean
public DirectExchange defaultExchange() {
return new DirectExchange(EXCHANGE, true, false);
}
/**
* 配置消息隊列2
* 針對消費者配置
* @return
*/
@Bean
public Queue queue() {
return new Queue("delay_queue2", true); //隊列持久
}
/**
* 將消息隊列2與交換機綁定
* 針對消費者配置
* @return
*/
@Bean
@Autowired
public Binding binding() {
return BindingBuilder.bind(queue()).to(defaultExchange()).with(DelayQueue.ROUTINGKEY2);
}
/**
* 接受消息的監(jiān)聽,這個監(jiān)聽會接受消息隊列1的消息
* 針對消費者配置
* @return
*/
@Bean
@Autowired
public SimpleMessageListenerContainer messageContainer2(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
container.setQueues(queue());
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設(shè)置確認(rèn)模式手工確認(rèn)
container.setMessageListener(new ChannelAwareMessageListener() {
public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception {
byte[] body = message.getBody();
System.out.println("delay_queue2 收到消息 : " + new String(body));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //確認(rèn)消息成功消費
}
});
return container;
}
}
總結(jié)
基于Rabbitmq實現(xiàn)定時任務(wù),就是將消息設(shè)置一個過期時間,放入一個沒有讀取的隊列中,讓消息過期后自動轉(zhuǎn)入另外一個隊列中,監(jiān)控這個隊列消息的監(jiān)聽處來處理定時任務(wù)具體的操作。最后,關(guān)注公眾號Java技術(shù)棧,在后臺回復(fù):面試,可以獲取我整理的 Java、RabbitMQ 系列面試題和答案,非常齊全。
原文鏈接:https://blog.csdn.net/wantnrun/article/details/80401641
版權(quán)聲明:本文為CSDN博主「RayeWang」的原創(chuàng)文章,遵循CC 4.0 BY-SA版權(quán)協(xié)議,轉(zhuǎn)載請附上原文出處鏈接及本聲明。






關(guān)注Java技術(shù)??锤喔韶?/strong>


