SpringBoot整合rabbitMq實(shí)現(xiàn)消息延時(shí)發(fā)送
點(diǎn)擊上方藍(lán)色字體,選擇“標(biāo)星公眾號(hào)”
優(yōu)質(zhì)文章,第一時(shí)間送達(dá)
? 作者?|??山陰路的秋天
來源 |? urlify.cn/VRfei2
66套java從入門到精通實(shí)戰(zhàn)課程分享
實(shí)現(xiàn)思路:利用mq的ttl設(shè)置消息失效時(shí)間?當(dāng)達(dá)到設(shè)置時(shí)間后通過交換機(jī)到達(dá)死信隊(duì)列中,消費(fèi)者端綁定讀取死信隊(duì)列中信息來達(dá)到延時(shí)發(fā)送消息的功能。
demo 如下:
(1)在pom.xml?中引入rabbitMq相關(guān)包
????????
????????????org.springframework.boot
????????????spring-boot-starter-amqp
????????(2)創(chuàng)建rabbitMq連接的工具類
public?class?MqConnectionUtil?{
????public?static?Connection?getConnection()?throws?Exception?{
????????ConnectionFactory?factory?=?new?ConnectionFactory();
????????factory.setHost("localhost");
????????factory.setPort(5672);
????????factory.setVirtualHost("virtualHost_wl");
????????factory.setUsername("admin");
????????factory.setPassword("admin");
????????Connection?connection?=?factory.newConnection();
????????return?connection;
????}
}
(3)創(chuàng)建生產(chǎn)者發(fā)送消息
public?class?Send?{
????private?final?static?String?QUEUE_NAME?=?"msg_ttl_queue";
????public?static?void?main(String[]?argv)?throws?Exception?{
????????//獲取連接
????????Connection?connection?=?MqConnectionUtil.getConnection();
????????//創(chuàng)建通道
????????Channel?channel?=?connection.createChannel();
????????//設(shè)置延時(shí)隊(duì)列
????????HashMap?args?=?new?HashMap<>();
????????args.put("x-dead-letter-exchange",?"delay-exchange");
????????args.put("x-dead-letter-routing-key",?"msg_ttl_routingKey");
????????channel.queueDeclare("delay_queue",?true,?false,?false,?args);
????????//聲明隊(duì)列
????????channel.queueDeclare(QUEUE_NAME,?true,?false,?false,?null);
????????//綁定路由(delay-queue隊(duì)列消息失效之后轉(zhuǎn)發(fā)到msg_ttl_queue中)
????????channel.queueBind(QUEUE_NAME,?"delay-exchange",?"msg_ttl_routingKey");
????????//設(shè)置延時(shí)屬性
????????AMQP.BasicProperties.Builder?builder?=?new?AMQP.BasicProperties.Builder();
????????//設(shè)置延時(shí)時(shí)間?1分鐘
????????AMQP.BasicProperties?properties?=?builder.expiration("60000").deliveryMode(2).build();
????????//消息內(nèi)容
????????SimpleDateFormat?sft?=?new?SimpleDateFormat("yyyy-MM-dd?HH:mm:ss");
????????String?message?=?"生產(chǎn)者創(chuàng)建時(shí)間:?"?+?sft.format(new?Date());
????????channel.basicPublish("",?"delay_queue",?properties,?message.getBytes());
????????System.out.println("Sent:?'"?+?message?+?"'");
????????//關(guān)閉通道和連接
????????channel.close();
????????connection.close();
????}
}
(4)創(chuàng)建對(duì)應(yīng)的消費(fèi)者接受消息
public?class?Receive?{
????private?final?static?String?QUEUE_NAME?=?"msg_ttl_queue";
????public?static?void?main(String[]?argv)?throws?Exception?{
????????//獲取連接
????????Connection?connection?=?MqConnectionUtil.getConnection();
????????//創(chuàng)建通道
????????Channel?channel?=?connection.createChannel();
????????//?聲明隊(duì)列
????????channel.queueDeclare(QUEUE_NAME,?true,?false,?false,?null);
????????//?定義隊(duì)列的消費(fèi)者
????????MyConsumer?myConsumer?=?new?MyConsumer(channel);
????????//?監(jiān)聽隊(duì)列
????????channel.basicConsume(QUEUE_NAME,?true,?myConsumer);
????}
}
自定義消費(fèi)者類為:
public?class?MyConsumer?extends?DefaultConsumer?{
????public?MyConsumer(Channel?channel)?{
????????super(channel);
????}
????@Override
????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????????SimpleDateFormat?sft?=?new?SimpleDateFormat("yyyy-MM-dd?HH:mm:ss");
????????System.out.println("消費(fèi)者接受時(shí)間:"?+?sft.format(new?Date()));
????????System.out.println("consumerTag:?"?+?consumerTag);
????????System.out.println("envelope:?"?+?envelope);
????????System.out.println("properties:?"?+?properties);
????????System.out.println("body:?"?+?new?String(body));
????}
}
生產(chǎn)者發(fā)送消息到延時(shí)隊(duì)列:

?
?到達(dá)設(shè)定的失效時(shí)間1分鐘后到達(dá)指定的隊(duì)列中:

?
?設(shè)置的延時(shí)交換機(jī)為:

?
?
控制臺(tái)打印信息為:

粉絲福利:實(shí)戰(zhàn)springboot+CAS單點(diǎn)登錄系統(tǒng)視頻教程免費(fèi)領(lǐng)取
???
?長按上方微信二維碼?2 秒即可獲取資料感謝點(diǎn)贊支持下哈?![]()
評(píng)論
圖片
表情
