SpringBoot集成RabbitMQ
點(diǎn)擊上方藍(lán)色字體,選擇“標(biāo)星公眾號”
優(yōu)質(zhì)文章,第一時(shí)間送達(dá)
? 作者?|? 未夏
來源 |? urlify.cn/MV3qeq
66套java從入門到精通實(shí)戰(zhàn)課程分享
一、介紹
RabbitMQ是一個(gè)實(shí)現(xiàn)了AMQP協(xié)議(Advanced Message Queue Protocol)的消息隊(duì)列
AMQP信息模型如下圖所示:

RoutingKey,生產(chǎn)者分布信息時(shí),指定RoutingKey
BindingKey,表示把隊(duì)列綁定到交換機(jī)的路徑名
當(dāng)一個(gè)消息發(fā)布到 RabbitMQ 后,首先到達(dá)指定的Exchange并從消息中取出RoutingKey,由Exchange判斷同哪個(gè)BindingKey匹配,配對成功后 Exchange 把消息分配給 指定的Queue,消費(fèi)者從 Queue 中得到消息
Exchange有4種類型:
Fanout Exchange:忽略key對比,發(fā)送Message到Exchange下游綁定的所有Queue
Direct Exchange:比較Message的routing key和Queue的binding key,完全匹配時(shí),Message才會發(fā)送到該Queue
Topic Exchange:比較Message的routing key和Queue的binding key,按規(guī)則匹配成功時(shí),Message才會發(fā)送到該Queue(使用?
*?和?#?這2個(gè)通配符。*?- 匹配一個(gè)詞,#?- 匹配 0 個(gè)或多個(gè)詞)默認(rèn)Exchange:比較Message的routing key和Queue的名字,完全匹配時(shí),Message才會發(fā)送到該Queue
消息隊(duì)列是有序的,只有頭部的信息被消費(fèi)后,才能消費(fèi)下一個(gè)消息
二、SpringBoot集成RabbitMQ
2.1引入依賴
?org.springframework.boot
?spring-boot-starter-amqp
2.2配置RabbitMQ
配置主要分為3步:
聲明交換機(jī)
聲明隊(duì)列
將隊(duì)列綁定到隊(duì)列上
@Configuration
public?class?RabbitmqConfig?{
????public?final?static?String?EXCHANGE_TOPIC?=?"topicExchange";
????public?final?static?String?QUEUE_USER?=?"user_Queue";
????public?final?static?String?QUEUE_CITY?=?"city_Queue";
????public?final?static?String?QUEUE_DEVICE?=?"device_Queue";
????public?final?static?String?BINDINGKEY_ONE?=?"topic.a";
????public?final?static?String?BINDINGKEY_OTHER?=?"other.#";
????@Bean
????TopicExchange?topicExchange()?{
????????return?new?TopicExchange(EXCHANGE_TOPIC);
????}
????@Bean
????Queue?userQueue()?{
????????return?new?Queue(QUEUE_USER);
????}
????@Bean
????Queue?cityQueue()?{
????????return?new?Queue(QUEUE_CITY);
????}
????@Bean
????Queue?deviceQueue()?{
????????return?new?Queue(QUEUE_DEVICE);
????}
????@Bean
????Binding?userBinding(@Qualifier("userQueue")?Queue?queue,?TopicExchange?topicExchange)?{
????????return?BindingBuilder.bind(queue).to(topicExchange).with(BINDINGKEY_ONE);
????}
????@Bean
????Binding?cityBinding(@Qualifier("cityQueue")?Queue?queue,?TopicExchange?topicExchange)?{
????????return?BindingBuilder.bind(queue).to(topicExchange).with(BINDINGKEY_OTHER);
????}
????@Bean
????Binding?deviceBinding(@Qualifier("deviceQueue")?Queue?queue,?TopicExchange?topicExchange)?{
????????return?BindingBuilder.bind(queue).to(topicExchange).with(BINDINGKEY_OTHER);
????}
}
2.3生產(chǎn)信息
Spring提供模板方法AmqpTemplate來進(jìn)行信息的發(fā)布、接收
AmqpTemplate的convertAndSend需要指定發(fā)往的交換機(jī)名、RoutingKey以及信息內(nèi)容
下面進(jìn)行簡單封裝:
@Component
public?class?MessageSender?{
????@Autowired
????private?AmqpTemplate?amqpTemplate;
????public?void?send(String?routingkey,?String?message)?{
????????this.amqpTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPIC,?routingkey,?message);
????}
}單元測試:
@ExtendWith(SpringExtension.class)
@SpringBootTest
class?MessageSenderTest?{
????@Autowired
????private?MessageSender?messageSender;
????@Test
????public?void?userQueueTest()?{
????????messageSender.send("topic.a",?"wo?is?user");
????}
????@Test
????public?void?cityQueueTest()?{
????????messageSender.send("other.a",?"wo?is?city");
????}
????@Test
????public?void?deviceQueueTest()?{
????????messageSender.send("other.b",?"wo?is?device");
????}
}
2.4消費(fèi)消息
@RabbitListener注解聲明消息監(jiān)聽器,并指定要監(jiān)聽的隊(duì)列
@RabbitHandler注解聲明消息的具體處理方法
@Component
@RabbitListener(queues?=?RabbitmqConfig.QUEUE_CITY)
public?class?CityQueueListenter?{
????@RabbitHandler
????public?void?handle(String?message)?{
????????System.out.println("city-queue:?"?+?message);
????}
}
三、延時(shí)隊(duì)列
延時(shí)隊(duì)列是一種特殊的消息隊(duì)列
消息隊(duì)列是有序的,只有頭部的信息被消費(fèi)后,才能消費(fèi)下一個(gè)消息。延時(shí)隊(duì)列也遵循這個(gè)規(guī)則,不同于普通隊(duì)列是隊(duì)列里面的某個(gè)信息一旦到達(dá)指定期限將會被提權(quán)進(jìn)行優(yōu)先處理,而不管隊(duì)列前面有多少個(gè)消息在排隊(duì)
3.1利用死信隊(duì)列實(shí)現(xiàn)
RabbitMQ中有一個(gè)高級特性叫TTL(Time To Live),表示一條消息或者該隊(duì)列中的所有消息的最大存活時(shí)間,單位是毫秒
如果一條消息設(shè)置了TTL屬性或者進(jìn)入了設(shè)置TTL屬性的隊(duì)列,那么這條消息如果在TTL設(shè)置的時(shí)間內(nèi)沒有被消費(fèi),則會成為“死信”
“死信”是RabbitMQ中的一種消息機(jī)制,同普通的消息隊(duì)列沒有什么區(qū)別,只不過消息生成者由RabbitMQ充當(dāng)。如果隊(duì)列里的消息出現(xiàn)以下情況:
消息被否定確認(rèn)(使用?
channel.basicNack?或?channel.basicReject?),并且此時(shí)requeue?屬性被設(shè)置為false消息在隊(duì)列的存活時(shí)間超過設(shè)置的TTL時(shí)間
消息隊(duì)列的消息數(shù)量已經(jīng)超過最大隊(duì)列長度
RabbitMQ會將消息從原先的隊(duì)列抽取出來,發(fā)送到死信隊(duì)列里面
如果設(shè)置了隊(duì)列的TTL屬性,那么一旦消息過期,就會被隊(duì)列丟棄;如果消息單獨(dú)設(shè)置TTL屬性,消息即使過期,也不一定會被馬上丟棄,因?yàn)橄⑹欠襁^期是在即將投遞到消費(fèi)者之前判定的,如果當(dāng)前隊(duì)列有嚴(yán)重的消息積壓情況,則已過期的消息也許還能存活較長時(shí)間。
利用死信實(shí)現(xiàn)延時(shí)隊(duì)列:
通過
x-message-ttl為隊(duì)列設(shè)置TTL,一旦消息超時(shí)將被轉(zhuǎn)發(fā)給死信交換機(jī)通過
x-dead-letter-exchange為消息隊(duì)列綁定死信交換機(jī),將消息路由至死信隊(duì)列,然后消費(fèi)者監(jiān)聽死信隊(duì)列
Map?args?=?new?HashMap<>();
//?設(shè)置死信交換機(jī)
args.put("x-dead-letter-exchange",?DEAD_LETTER_EXCHANGE);
//?給成為死信的消息指定Routing?Key
args.put("x-dead-letter-routing-key",?DEAD_LETTER_QUEUEA_ROUTING_KEY);
//?設(shè)置隊(duì)列TTL,?凡進(jìn)入該隊(duì)列的消息超時(shí)后將成為死信
args.put("x-message-ttl",?6000);
//?返回延時(shí)隊(duì)列
return?QueueBuilder.durable(DELAY_QUEUEA_NAME).withArguments(args).build(); 僅在聲明延時(shí)隊(duì)列時(shí)指定上述設(shè)置,其余死信交換機(jī)、死信隊(duì)列以及消費(fèi)消息正常設(shè)置即可
3.2利用插件實(shí)現(xiàn)
為隊(duì)列指定TTL實(shí)現(xiàn)延時(shí)隊(duì)列的缺點(diǎn)是所有消息的過期時(shí)間一樣,不夠多樣化;如果為消息單獨(dú)指定TTL,無法保證消息超時(shí)一定變?yōu)樗佬?/span>
RabbitMQ官方提供一個(gè)插件bbitmq_delayed_message_exchange,解決上述問題
點(diǎn)擊官網(wǎng)下載插件,放到安裝目錄plugins文件夾下,然后轉(zhuǎn)到sbin目錄下執(zhí)行:
rabbitmq-plugins?enable?rabbitmq_delayed_message_exchange利用死信實(shí)現(xiàn)延時(shí)隊(duì)列需要聲明兩個(gè)交換機(jī)、兩個(gè)隊(duì)列以及為其中一個(gè)隊(duì)列設(shè)置TTL等屬性,而使用插件僅需配置一個(gè)CustomExchange類型的交換機(jī)即可:
@Configuration
public?class?RabbitmqConfig?{
????@Bean
????public?CustomExchange?delayExchange()?{
????????Map?args?=?new?HashMap<>();
????????args.put("x-delayed-type",?"direct");
????????return?new?CustomExchange("delayed_exchange",?"x-delayed-message",?true,?false,?args);
????}
????@Bean
????public?Queue?queue()?{
????????Queue?queue?=?new?Queue("delay_queue_1");
????????return?queue;
????}
????@Bean
????public?Binding?binding()?{
????????return?BindingBuilder.bind(queue()).to(delayExchange()).with("delay_queue_1").noargs();
????}
} CustomExchange必須是x-delayed-message類型
消息生產(chǎn)端,AmqpTemplate的convertAndSend方法需要傳入一個(gè)MessagePostProcessor實(shí)例設(shè)置消息的TTL
rabbitTemplate.convertAndSend("delayed_exchange",?"delay_queue_1",?msg,?new?MessagePostProcessor()?{
?@Override
?public?Message?postProcessMessage(Message?message)?throws?AmqpException?{
??message.getMessageProperties().setHeader("x-delay",?TTL);
??return?message;
?}
});消費(fèi)端代碼不用變更
四、參考
圖解RabbitMQ
一文帶你搞定RabbitMQ死信隊(duì)列
一文帶你搞定RabbitMQ延遲隊(duì)列
粉絲福利:Java從入門到入土學(xué)習(xí)路線圖
???

?長按上方微信二維碼?2 秒
感謝點(diǎn)贊支持下哈?
