rabbitMQ幾大常用消息隊(duì)列模型以及springboot集成rabbitMQ
????????????????????????????????點(diǎn)擊上方藍(lán)色字體,選擇“標(biāo)星公眾號(hào)”
優(yōu)質(zhì)文章,第一時(shí)間送達(dá)
76套java從入門(mén)到精通實(shí)戰(zhàn)課程分享
1.MQ引言
1.1 什么是MQ
MQ(Message Quene) : 翻譯為?消息隊(duì)列,通過(guò)典型的?生產(chǎn)者和消費(fèi)者模型,生產(chǎn)者不斷向消息隊(duì)列中生產(chǎn)消息,消費(fèi)者不斷的從隊(duì)列中獲取消息。因?yàn)橄⒌纳a(chǎn)和消費(fèi)都是異步的,而且只關(guān)心消息的發(fā)送和接收,沒(méi)有業(yè)務(wù)邏輯的侵入,輕松的實(shí)現(xiàn)系統(tǒng)間解耦。別名為?消息中間件?通過(guò)利用高效可靠的消息傳遞機(jī)制進(jìn)行平臺(tái)無(wú)關(guān)的數(shù)據(jù)交流,并基于數(shù)據(jù)通信來(lái)進(jìn)行分布式系統(tǒng)的集成。
1.2 MQ有哪些
當(dāng)今市面上有很多主流的消息中間件,如老牌的ActiveMQ、RabbitMQ,炙手可熱的Kafka,阿里巴巴自主開(kāi)發(fā)RocketMQ等。
1.3 不同MQ特點(diǎn)
#?1.ActiveMQ
? ActiveMQ 是Apache出品,最流行的,能力強(qiáng)勁的開(kāi)源消息總線。它是一個(gè)完全支持JMS規(guī)范的的消息中間件。豐富的API,多種集群架構(gòu)模式讓ActiveMQ在業(yè)界成為老牌的消息中間件,在中小型企業(yè)頗受歡迎!
#?2.Kafka
? Kafka是LinkedIn開(kāi)源的分布式發(fā)布-訂閱消息系統(tǒng),目前歸屬于Apache頂級(jí)項(xiàng)目。Kafka主要特點(diǎn)是基于Pull的模式來(lái)處理消息消費(fèi),
??追求高吞吐量,一開(kāi)始的目的就是用于日志收集和傳輸。0.8版本開(kāi)始支持復(fù)制,不支持事務(wù),對(duì)消息的重復(fù)、丟失、錯(cuò)誤沒(méi)有嚴(yán)格要求,
??適合產(chǎn)生大量數(shù)據(jù)的互聯(lián)網(wǎng)服務(wù)的數(shù)據(jù)收集業(yè)務(wù)。
#?3.RocketMQ
? RocketMQ是阿里開(kāi)源的消息中間件,它是純Java開(kāi)發(fā),具有高吞吐量、高可用性、適合大規(guī)模分布式系統(tǒng)應(yīng)用的特點(diǎn)。RocketMQ思路起
??源于Kafka,但并不是Kafka的一個(gè)Copy,它對(duì)消息的可靠傳輸及事務(wù)性做了優(yōu)化,目前在阿里集團(tuán)被廣泛應(yīng)用于交易、充值、流計(jì)算、消
??息推送、日志流式處理、binglog分發(fā)等場(chǎng)景。
#?4.RabbitMQ
? RabbitMQ是使用Erlang語(yǔ)言開(kāi)發(fā)的開(kāi)源消息隊(duì)列系統(tǒng),基于AMQP協(xié)議來(lái)實(shí)現(xiàn)。AMQP的主要特征是面向消息、隊(duì)列、路由(包括點(diǎn)對(duì)點(diǎn)和
??發(fā)布/訂閱)、可靠性、安全。AMQP協(xié)議更多用在企業(yè)系統(tǒng)內(nèi)對(duì)數(shù)據(jù)一致性、穩(wěn)定性和可靠性要求很高的場(chǎng)景,對(duì)性能和吞吐量的要求還在
??其次。
??
RabbitMQ比Kafka可靠,Kafka更適合IO高吞吐的處理,一般應(yīng)用在大數(shù)據(jù)日志處理或?qū)?shí)時(shí)性(少量延遲),可靠性(少量丟數(shù)據(jù))要求稍低的場(chǎng)景使用,比如ELK日志收集。
2.RabbitMQ 的引言
2.1 RabbitMQ
基于
AMQP協(xié)議,erlang語(yǔ)言開(kāi)發(fā),是部署最廣泛的開(kāi)源消息中間件,是最受歡迎的開(kāi)源消息中間件之一。
[
官網(wǎng): https://www.rabbitmq.com/
官方教程: https://www.rabbitmq.com/#getstarted
?#?AMQP?協(xié)議
?? AMQP(advanced message queuing protocol)`在2003年時(shí)被提出,最早用于解決金融領(lǐng)不同平臺(tái)之間的消息傳遞交互問(wèn)題。顧名思義,AMQP是一種協(xié)議,更準(zhǔn)確的說(shuō)是一種binary wire-level protocol(鏈接協(xié)議)。這是其和JMS的本質(zhì)差別,AMQP不從API層進(jìn)行限定,而是直接定義網(wǎng)絡(luò)交換的數(shù)據(jù)格式。這使得實(shí)現(xiàn)了AMQP的provider天然性就是跨平臺(tái)的。

3.1 第一種模型(直連)

在上圖的模型中,有以下概念:
P:生產(chǎn)者,也就是要發(fā)送消息的程序
C:消費(fèi)者:消息的接受者,會(huì)一直等待消息到來(lái)。
queue:消息隊(duì)列,圖中紅色部分。類似一個(gè)郵箱,可以緩存消息;生產(chǎn)者向其中投遞消息,消費(fèi)者從其中取出消息。
1. 開(kāi)發(fā)生產(chǎn)者
??//創(chuàng)建連接工廠
??ConnectionFactory?connectionFactory?=?new?ConnectionFactory();
??connectionFactory.setHost("10.15.0.9");
??connectionFactory.setPort(5672);
??connectionFactory.setUsername("ems");
??connectionFactory.setPassword("123");
??connectionFactory.setVirtualHost("/ems");
??Connection?connection?=?connectionFactory.newConnection();
??//創(chuàng)建通道
??Channel?channel?=?connection.createChannel();
??//參數(shù)1:?是否持久化??參數(shù)2:是否獨(dú)占隊(duì)列?參數(shù)3:是否自動(dòng)刪除??參數(shù)4:其他屬性
??channel.queueDeclare("hello",true,false,false,null);
??channel.basicPublish("","hello",?null,"hello?rabbitmq".getBytes());
??channel.close();
??connection.close();
2. 開(kāi)發(fā)消費(fèi)者
??//創(chuàng)建連接工廠
??ConnectionFactory?connectionFactory?=?new?ConnectionFactory();
??connectionFactory.setHost("127.0.0.1");
??connectionFactory.setPort(5672);
??connectionFactory.setUsername("ems");
??connectionFactory.setPassword("123");
??connectionFactory.setVirtualHost("/ems");
??Connection?connection?=?connectionFactory.newConnection();
??Channel?channel?=?connection.createChannel();
??channel.queueDeclare("hello",?true,?false,?false,?null);
??channel.basicConsume("hello",true,new?DefaultConsumer(channel){
????@Override
????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
??????System.out.println(new?String(body));
????}
??});
3. 參數(shù)的說(shuō)明
??channel.queueDeclare("hello",true,false,false,null);
??'參數(shù)1':用來(lái)聲明通道對(duì)應(yīng)的隊(duì)列
??'參數(shù)2':用來(lái)指定是否持久化隊(duì)列
??'參數(shù)3':用來(lái)指定是否獨(dú)占隊(duì)列
??'參數(shù)4':用來(lái)指定是否自動(dòng)刪除隊(duì)列
??'參數(shù)5':對(duì)隊(duì)列的額外配置
3.2 第二種模型(work quene)
Work queues,也被稱為(Task queues),任務(wù)模型。當(dāng)消息處理比較耗時(shí)的時(shí)候,可能生產(chǎn)消息的速度會(huì)遠(yuǎn)遠(yuǎn)大于消息的消費(fèi)速度。長(zhǎng)此以往,消息就會(huì)堆積越來(lái)越多,無(wú)法及時(shí)處理。此時(shí)就可以使用work 模型:讓多個(gè)消費(fèi)者綁定到一個(gè)隊(duì)列,共同消費(fèi)隊(duì)列中的消息。隊(duì)列中的消息一旦消費(fèi),就會(huì)消失,因此任務(wù)是不會(huì)被重復(fù)執(zhí)行的。

角色:
P:生產(chǎn)者:任務(wù)的發(fā)布者
C1:消費(fèi)者-1,領(lǐng)取任務(wù)并且完成任務(wù),假設(shè)完成速度較慢
C2:消費(fèi)者-2:領(lǐng)取任務(wù)并完成任務(wù),假設(shè)完成速度快
1. 開(kāi)發(fā)生產(chǎn)者
channel.queueDeclare("hello",?true,?false,?false,?null);
for?(int?i?=?0;?i?10;?i++)?{
??channel.basicPublish("",?"hello",?null,?(i+"====>:我是消息").getBytes());
}
2.開(kāi)發(fā)消費(fèi)者-1
channel.queueDeclare("hello",true,false,false,null);
channel.basicConsume("hello",true,new?DefaultConsumer(channel){
??@Override
??public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????System.out.println("消費(fèi)者1:?"+new?String(body));
??}
});
3.開(kāi)發(fā)消費(fèi)者-2
channel.queueDeclare("hello",true,false,false,null);
channel.basicConsume("hello",true,new?DefaultConsumer(channel){
??@Override
??public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????try?{
??????Thread.sleep(1000);???//處理消息比較慢?一秒處理一個(gè)消息
????}?catch?(InterruptedException?e)?{
??????e.printStackTrace();
????}
????System.out.println("消費(fèi)者2:?"+new?String(body));??
??}
});
4.測(cè)試結(jié)果


總結(jié):默認(rèn)情況下,RabbitMQ將按順序?qū)⒚總€(gè)消息發(fā)送給下一個(gè)使用者。平均而言,每個(gè)消費(fèi)者都會(huì)收到相同數(shù)量的消息。這種分發(fā)消息的方式稱為循環(huán)。
5.消息自動(dòng)確認(rèn)機(jī)制
Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With our current code, once RabbitMQ delivers a message to the consumer it immediately marks it for deletion. In this case, if you kill a worker we will lose the message it was just processing. We’ll also lose all the messages that were dispatched to this particular worker but were not yet handled.
But we don’t want to lose any tasks. If a worker dies, we’d like the task to be delivered to another worker.
channel.basicQos(1);//一次只接受一條未確認(rèn)的消息
//參數(shù)2:關(guān)閉自動(dòng)確認(rèn)消息
channel.basicConsume("hello",false,new?DefaultConsumer(channel){
??@Override
??public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????System.out.println("消費(fèi)者1:?"+new?String(body));
????channel.basicAck(envelope.getDeliveryTag(),false);//手動(dòng)確認(rèn)消息
??}
});
設(shè)置通道一次只能消費(fèi)一個(gè)消息
關(guān)閉消息的自動(dòng)確認(rèn),開(kāi)啟手動(dòng)確認(rèn)消息


3.3 第三種模型(fanout)
fanout 扇出 也稱為廣播

在廣播模式下,消息發(fā)送流程是這樣的:
可以有多個(gè)消費(fèi)者
每個(gè)消費(fèi)者有自己的queue(隊(duì)列)
每個(gè)隊(duì)列都要綁定到Exchange(交換機(jī))
生產(chǎn)者發(fā)送的消息,只能發(fā)送到交換機(jī),交換機(jī)來(lái)決定要發(fā)給哪個(gè)隊(duì)列,生產(chǎn)者無(wú)法決定。
交換機(jī)把消息發(fā)送給綁定過(guò)的所有隊(duì)列
隊(duì)列的消費(fèi)者都能拿到消息。實(shí)現(xiàn)一條消息被多個(gè)消費(fèi)者消費(fèi)
1. 開(kāi)發(fā)生產(chǎn)者
//聲明交換機(jī)
channel.exchangeDeclare("logs","fanout");//廣播?一條消息多個(gè)消費(fèi)者同時(shí)消費(fèi)
//發(fā)布消息
channel.basicPublish("logs","",null,"hello".getBytes());
2. 開(kāi)發(fā)消費(fèi)者-1
//綁定交換機(jī)
channel.exchangeDeclare("logs","fanout");
//創(chuàng)建臨時(shí)隊(duì)列
String?queue?=?channel.queueDeclare().getQueue();
//將臨時(shí)隊(duì)列綁定exchange
channel.queueBind(queue,"logs","");
//處理消息
channel.basicConsume(queue,true,new?DefaultConsumer(channel){
??@Override
??public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????System.out.println("消費(fèi)者1:?"+new?String(body));
??}
});
3. 開(kāi)發(fā)消費(fèi)者-2
//綁定交換機(jī)
channel.exchangeDeclare("logs","fanout");
//創(chuàng)建臨時(shí)隊(duì)列
String?queue?=?channel.queueDeclare().getQueue();
//將臨時(shí)隊(duì)列綁定exchange
channel.queueBind(queue,"logs","");
//處理消息
channel.basicConsume(queue,true,new?DefaultConsumer(channel){
??@Override
??public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????System.out.println("消費(fèi)者2:?"+new?String(body));
??}
});
4.開(kāi)發(fā)消費(fèi)者-3
//綁定交換機(jī)
channel.exchangeDeclare("logs","fanout");
//創(chuàng)建臨時(shí)隊(duì)列
String?queue?=?channel.queueDeclare().getQueue();
//將臨時(shí)隊(duì)列綁定exchange
channel.queueBind(queue,"logs","");
//處理消息
channel.basicConsume(queue,true,new?DefaultConsumer(channel){
??@Override
??public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????System.out.println("消費(fèi)者3:?"+new?String(body));
??}
});
5. 測(cè)試結(jié)果



3.4 第四種模型(Routing)
3.4.1 Routing 之訂閱模型-Direct(直連)
在Fanout模式中,一條消息,會(huì)被所有訂閱的隊(duì)列都消費(fèi)。但是,在某些場(chǎng)景下,我們希望不同的消息被不同的隊(duì)列消費(fèi)。這時(shí)就要用到Direct類型的Exchange。
在Direct模型下:
隊(duì)列與交換機(jī)的綁定,不能是任意綁定了,而是要指定一個(gè)
RoutingKey(路由key)消息的發(fā)送方在 向 Exchange發(fā)送消息時(shí),也必須指定消息的?
RoutingKey。Exchange不再把消息交給每一個(gè)綁定的隊(duì)列,而是根據(jù)消息的
Routing Key進(jìn)行判斷,只有隊(duì)列的Routingkey與消息的?Routing key完全一致,才會(huì)接收到消息
流程:

圖解:
P:生產(chǎn)者,向Exchange發(fā)送消息,發(fā)送消息時(shí),會(huì)指定一個(gè)routing key。
X:Exchange(交換機(jī)),接收生產(chǎn)者的消息,然后把消息遞交給 與routing key完全匹配的隊(duì)列
C1:消費(fèi)者,其所在隊(duì)列指定了需要routing key 為 error 的消息
C2:消費(fèi)者,其所在隊(duì)列指定了需要routing key 為 info、error、warning 的消息
1. 開(kāi)發(fā)生產(chǎn)者
//聲明交換機(jī)??參數(shù)1:交換機(jī)名稱?參數(shù)2:交換機(jī)類型?基于指令的Routing?key轉(zhuǎn)發(fā)
channel.exchangeDeclare("logs_direct","direct");
String?key?=?"";
//發(fā)布消息
channel.basicPublish("logs_direct",key,null,("指定的route?key"+key+"的消息").getBytes());
2.開(kāi)發(fā)消費(fèi)者-1
?//聲明交換機(jī)
channel.exchangeDeclare("logs_direct","direct");
//創(chuàng)建臨時(shí)隊(duì)列
String?queue?=?channel.queueDeclare().getQueue();
//綁定隊(duì)列和交換機(jī)
channel.queueBind(queue,"logs_direct","error");
channel.queueBind(queue,"logs_direct","info");
channel.queueBind(queue,"logs_direct","warn");
//消費(fèi)消息
channel.basicConsume(queue,true,new?DefaultConsumer(channel){
??@Override
??public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????System.out.println("消費(fèi)者1:?"+new?String(body));
??}
});
3.開(kāi)發(fā)消費(fèi)者-2
//聲明交換機(jī)
channel.exchangeDeclare("logs_direct","direct");
//創(chuàng)建臨時(shí)隊(duì)列
String?queue?=?channel.queueDeclare().getQueue();
//綁定隊(duì)列和交換機(jī)
channel.queueBind(queue,"logs_direct","error");
//消費(fèi)消息
channel.basicConsume(queue,true,new?DefaultConsumer(channel){
??@Override
??public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????System.out.println("消費(fèi)者2:?"+new?String(body));
??}
});
4.測(cè)試生產(chǎn)者發(fā)送Route key為error的消息時(shí)


5.測(cè)試生產(chǎn)者發(fā)送Route key為info的消息時(shí)


3.4.2 Routing 之訂閱模型-Topic
Topic類型的Exchange與Direct相比,都是可以根據(jù)RoutingKey把消息路由到不同的隊(duì)列。只不過(guò)Topic類型Exchange可以讓隊(duì)列在綁定Routing key?的時(shí)候使用通配符!這種模型Routingkey?一般都是由一個(gè)或多個(gè)單詞組成,多個(gè)單詞之間以”.”分割,例如:?item.insert

#?統(tǒng)配符
??*?(star)?can?substitute?for?exactly?one?word.????匹配不多不少恰好1個(gè)詞
??#?(hash)?can?substitute?for?zero?or?more?words.??匹配一個(gè)或多個(gè)詞
#?如:
??audit.#????匹配audit.irs.corporate或者?audit.irs?等
????audit.*???只能匹配?audit.irs
1.開(kāi)發(fā)生產(chǎn)者
//生命交換機(jī)和交換機(jī)類型?topic?使用動(dòng)態(tài)路由(通配符方式)
channel.exchangeDeclare("topics","topic");
String?routekey?=?"user.save";//動(dòng)態(tài)路由key
//發(fā)布消息
channel.basicPublish("topics",routekey,null,("這是路由中的動(dòng)態(tài)訂閱模型,route?key:?["+routekey+"]").getBytes());
2.開(kāi)發(fā)消費(fèi)者-1
Routing Key中使用*通配符方式
?//聲明交換機(jī)
channel.exchangeDeclare("topics","topic");
//創(chuàng)建臨時(shí)隊(duì)列
String?queue?=?channel.queueDeclare().getQueue();
//綁定隊(duì)列與交換機(jī)并設(shè)置獲取交換機(jī)中動(dòng)態(tài)路由
channel.queueBind(queue,"topics","user.*");
//消費(fèi)消息
channel.basicConsume(queue,true,new?DefaultConsumer(channel){
??@Override
??public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????System.out.println("消費(fèi)者1:?"+new?String(body));
??}
});
3.開(kāi)發(fā)消費(fèi)者-2
Routing Key中使用#通配符方式
//聲明交換機(jī)
channel.exchangeDeclare("topics","topic");
//創(chuàng)建臨時(shí)隊(duì)列
String?queue?=?channel.queueDeclare().getQueue();
//綁定隊(duì)列與交換機(jī)并設(shè)置獲取交換機(jī)中動(dòng)態(tài)路由
channel.queueBind(queue,"topics","user.#");
//消費(fèi)消息
channel.basicConsume(queue,true,new?DefaultConsumer(channel){
??@Override
??public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????System.out.println("消費(fèi)者2:?"+new?String(body));
??}
});
4.測(cè)試結(jié)果


4. SpringBoot中使用RabbitMQ
4.0 搭建初始環(huán)境
1. 引入依賴
??org.springframework.boot
??spring-boot-starter-amqp
2. 配置配置文件
spring:
??application:
????name:?springboot_rabbitmq
??rabbitmq:
????host:?10.15.0.9
????port:?5672
????username:?ems
????password:?123
????virtual-host:?/ems
RabbitTemplate?用來(lái)簡(jiǎn)化操作 使用時(shí)候直接在項(xiàng)目中注入即可使用
4.1 第一種hello world模型使用
1、開(kāi)發(fā)生產(chǎn)者
@Autowired
private?RabbitTemplate?rabbitTemplate;
@Test
public?void?testHello(){
??rabbitTemplate.convertAndSend("hello","hello?world");
}2、開(kāi)發(fā)消費(fèi)者
@Component
@RabbitListener(queuesToDeclare?=?@Queue("hello"))
public?class?HelloCustomer?{
????@RabbitHandler
????public?void?receive1(String?message){
????????System.out.println("message?=?"?+?message);
????}
}
4.2 第二種work模型使用
1、開(kāi)發(fā)生產(chǎn)者
@Autowired
private?RabbitTemplate?rabbitTemplate;
@Test
public?void?testWork(){
??for?(int?i?=?0;?i?10;?i++)?{
????rabbitTemplate.convertAndSend("work","hello?work!");
??}
}2、開(kāi)發(fā)消費(fèi)者
@Component
public?class?WorkCustomer?{
????@RabbitListener(queuesToDeclare?=?@Queue("work"))
????public?void?receive1(String?message){
????????System.out.println("work?message1?=?"?+?message);
????}
????@RabbitListener(queuesToDeclare?=?@Queue("work"))
????public?void?receive2(String?message){
????????System.out.println("work?message2?=?"?+?message);
????}
}
說(shuō)明:默認(rèn)在Spring AMQP實(shí)現(xiàn)中Work這種方式就是公平調(diào)度,如果需要實(shí)現(xiàn)能者多勞需要額外配置
4.3 Fanout 廣播模型
1、開(kāi)發(fā)生產(chǎn)者
@Autowired
private?RabbitTemplate?rabbitTemplate;
@Test
public?void?testFanout()?throws?InterruptedException?{
??rabbitTemplate.convertAndSend("logs","","這是日志廣播");
}2、開(kāi)發(fā)消費(fèi)者
@Component
public?class?FanoutCustomer?{
????@RabbitListener(bindings?=?@QueueBinding(
????????????value?=?@Queue,
????????????exchange?=?@Exchange(name="logs",type?=?"fanout")
????))
????public?void?receive1(String?message){
????????System.out.println("message1?=?"?+?message);
????}
????@RabbitListener(bindings?=?@QueueBinding(
????????????value?=?@Queue,?//創(chuàng)建臨時(shí)隊(duì)列
????????????exchange?=?@Exchange(name="logs",type?=?"fanout")??//綁定交換機(jī)類型
????))
????public?void?receive2(String?message){
????????System.out.println("message2?=?"?+?message);
????}
}
4.4 Route 路由模型
1、開(kāi)發(fā)生產(chǎn)者
@Autowired
private?RabbitTemplate?rabbitTemplate;
@Test
public?void?testDirect(){
??rabbitTemplate.convertAndSend("directs","error","error?的日志信息");
}2、開(kāi)發(fā)消費(fèi)者
@Component
public?class?DirectCustomer?{
????@RabbitListener(bindings?={
????????????@QueueBinding(
????????????????????value?=?@Queue(),
????????????????????key={"info","error"},
????????????????????exchange?=?@Exchange(type?=?"direct",name="directs")
????????????)})
????public?void?receive1(String?message){
????????System.out.println("message1?=?"?+?message);
????}
????@RabbitListener(bindings?={
????????????@QueueBinding(
????????????????????value?=?@Queue(),
????????????????????key={"error"},
????????????????????exchange?=?@Exchange(type?=?"direct",name="directs")
????????????)})
????public?void?receive2(String?message){
????????System.out.println("message2?=?"?+?message);
????}
}
4.5 Topic 訂閱模型(動(dòng)態(tài)路由模型)
1、開(kāi)發(fā)生產(chǎn)者
@Autowired
private?RabbitTemplate?rabbitTemplate;
//topic
@Test
public?void?testTopic(){
??rabbitTemplate.convertAndSend("topics","user.save.findAll","user.save.findAll?的消息");
}2、開(kāi)發(fā)消費(fèi)者
@Component
public?class?TopCustomer?{
????@RabbitListener(bindings?=?{
????????????@QueueBinding(
????????????????????value?=?@Queue,
????????????????????key?=?{"user.*"},
????????????????????exchange?=?@Exchange(type?=?"topic",name?=?"topics")
????????????)
????})
????public?void?receive1(String?message){
????????System.out.println("message1?=?"?+?message);
????}
????@RabbitListener(bindings?=?{
????????????@QueueBinding(
????????????????????value?=?@Queue,
????????????????????key?=?{"user.#"},
????????????????????exchange?=?@Exchange(type?=?"topic",name?=?"topics")
????????????)
????})
????public?void?receive2(String?message){
????????System.out.println("message2?=?"?+?message);
????}
}
版權(quán)聲明:本文為博主原創(chuàng)文章,遵循 CC 4.0 BY-SA 版權(quán)協(xié)議,轉(zhuǎn)載請(qǐng)附上原文出處鏈接和本聲明。
本文鏈接:
https://blog.csdn.net/weixin_45827693/article/details/113408116
粉絲福利:Java從入門(mén)到入土學(xué)習(xí)路線圖
??????

??長(zhǎng)按上方微信二維碼?2 秒
感謝點(diǎn)贊支持下哈?
