Spring Boot整合RabbitMQ詳細(xì)教程
點擊上方藍(lán)色字體,選擇“標(biāo)星公眾號”
優(yōu)質(zhì)文章,第一時間送達(dá)
? 作者?|??小小丑年紀(jì)
來源 |? urlify.cn/MZvAry
1.首先我們簡單了解一下消息中間件的應(yīng)用場景
異步處理
場景說明:用戶注冊后,需要發(fā)注冊郵件和注冊短信,傳統(tǒng)的做法有兩種1.串行的方式;2.并行的方式?
(1)串行方式:將注冊信息寫入數(shù)據(jù)庫后,發(fā)送注冊郵件,再發(fā)送注冊短信,以上三個任務(wù)全部完成后才返回給客戶端。這有一個問題是,郵件,短信并不是必須的,它只是一個通知,而這種做法讓客戶端等待沒有必要等待的東西. 
(2)并行方式:將注冊信息寫入數(shù)據(jù)庫后,發(fā)送郵件的同時,發(fā)送短信,以上三個任務(wù)完成后,返回給客戶端,并行的方式能提高處理的時間。?
?
假設(shè)三個業(yè)務(wù)節(jié)點分別使用50ms,串行方式使用時間150ms,并行使用時間100ms。雖然并性已經(jīng)提高的處理時間,但是,前面說過,郵件和短信對我正常的使用網(wǎng)站沒有任何影響,客戶端沒有必要等著其發(fā)送完成才顯示注冊成功,應(yīng)該是寫入數(shù)據(jù)庫后就返回.
(3)消息隊列?
引入消息隊列后,把發(fā)送郵件,短信不是必須的業(yè)務(wù)邏輯異步處理?
?
由此可以看出,引入消息隊列后,用戶的響應(yīng)時間就等于寫入數(shù)據(jù)庫的時間+寫入消息隊列的時間(可以忽略不計),引入消息隊列后處理后,響應(yīng)時間是串行的3倍,是并行的2倍。
?應(yīng)用解耦
場景:雙11是購物狂節(jié),用戶下單后,訂單系統(tǒng)需要通知庫存系統(tǒng),傳統(tǒng)的做法就是訂單系統(tǒng)調(diào)用庫存系統(tǒng)的接口.
?
這種做法有一個缺點:
當(dāng)庫存系統(tǒng)出現(xiàn)故障時,訂單就會失敗。
訂單系統(tǒng)和庫存系統(tǒng)高耦合.?
引入消息隊列?
訂單系統(tǒng):用戶下單后,訂單系統(tǒng)完成持久化處理,將消息寫入消息隊列,返回用戶訂單下單成功。
庫存系統(tǒng):訂閱下單的消息,獲取下單消息,進(jìn)行庫操作。?
就算庫存系統(tǒng)出現(xiàn)故障,消息隊列也能保證消息的可靠投遞,不會導(dǎo)致消息丟失。
流量削峰
流量削峰一般在秒殺活動中應(yīng)用廣泛?
場景:秒殺活動,一般會因為流量過大,導(dǎo)致應(yīng)用掛掉,為了解決這個問題,一般在應(yīng)用前端加入消息隊列。?
作用:?
1.可以控制活動人數(shù),超過此一定閥值的訂單直接丟棄(我為什么秒殺一次都沒有成功過呢^^)?
2.可以緩解短時間的高流量壓垮應(yīng)用(應(yīng)用程序按自己的最大處理能力獲取訂單)?
?
1.用戶的請求,服務(wù)器收到之后,首先寫入消息隊列,加入消息隊列長度超過最大值,則直接拋棄用戶請求或跳轉(zhuǎn)到錯誤頁面.?
2.秒殺業(yè)務(wù)根據(jù)消息隊列中的請求信息,再做后續(xù)處理.
以上內(nèi)容的來源是:https://blog.csdn.net/whoamiyang/article/details/54954780,在此感謝
2.各種消息中間件性能的比較:
TPS比較 一ZeroMq 最好,RabbitMq 次之, ActiveMq 最差。
持久化消息比較—zeroMq不支持,activeMq和rabbitMq都支持。持久化消息主要是指:MQ down或者M(jìn)Q所在的服務(wù)器down了,消息不會丟失的機(jī)制。
可靠性、靈活的路由、集群、事務(wù)、高可用的隊列、消息排序、問題追蹤、可視化管理工具、插件系統(tǒng)、社區(qū)—RabbitMq最好,ActiveMq次之,ZeroMq最差。
高并發(fā)—從實現(xiàn)語言來看,RabbitMQ最高,原因是它的實現(xiàn)語言是天生具備高并發(fā)高可用的erlang語言。
綜上所述:RabbitMQ的性能相對來說更好更全面,是消息中間件的首選。
3.接下來我們在springboot當(dāng)中整合使用RabbitMQ
第一步:導(dǎo)入maven依賴
org.springframework.boot
spring-boot-starter-amqp
1.5.2.RELEASE
第二步:在application.properties文件當(dāng)中引入RabbitMQ基本的配置信息
#對于rabbitMQ的支持
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest第三步:編寫RabbitConfig類,類里面設(shè)置很多個EXCHANGE,QUEUE,ROUTINGKEY,是為了接下來的不同使用場景。
/**
Broker:它提供一種傳輸服務(wù),它的角色就是維護(hù)一條從生產(chǎn)者到消費者的路線,保證數(shù)據(jù)能按照指定的方式進(jìn)行傳輸,
Exchange:消息交換機(jī),它指定消息按什么規(guī)則,路由到哪個隊列。
Queue:消息的載體,每個消息都會被投到一個或多個隊列。
Binding:綁定,它的作用就是把exchange和queue按照路由規(guī)則綁定起來.
Routing Key:路由關(guān)鍵字,exchange根據(jù)這個關(guān)鍵字進(jìn)行消息投遞。
vhost:虛擬主機(jī),一個broker里可以有多個vhost,用作不同用戶的權(quán)限分離。
Producer:消息生產(chǎn)者,就是投遞消息的程序.
Consumer:消息消費者,就是接受消息的程序.
Channel:消息通道,在客戶端的每個連接里,可建立多個channel.
*/
@Configuration
public?class?RabbitConfig?{
?
private?final?Logger?logger?=?LoggerFactory.getLogger(this.getClass());
?
@Value("${spring.rabbitmq.host}")
private?String?host;
?
@Value("${spring.rabbitmq.port}")
private?int?port;
?
@Value("${spring.rabbitmq.username}")
private?String?username;
?
@Value("${spring.rabbitmq.password}")
private?String?password;
?
?
public?static?final?String?EXCHANGE_A?=?"my-mq-exchange_A";
public?static?final?String?EXCHANGE_B?=?"my-mq-exchange_B";
public?static?final?String?EXCHANGE_C?=?"my-mq-exchange_C";
?
?
public?static?final?String?QUEUE_A?=?"QUEUE_A";
public?static?final?String?QUEUE_B?=?"QUEUE_B";
public?static?final?String?QUEUE_C?=?"QUEUE_C";
?
public?static?final?String?ROUTINGKEY_A?=?"spring-boot-routingKey_A";
public?static?final?String?ROUTINGKEY_B?=?"spring-boot-routingKey_B";
public?static?final?String?ROUTINGKEY_C?=?"spring-boot-routingKey_C";
?
@Bean
public?ConnectionFactory?connectionFactory()?{
CachingConnectionFactory?connectionFactory?=?new?CachingConnectionFactory(host,port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost("/");
connectionFactory.setPublisherConfirms(true);
return?connectionFactory;
}
?
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
//必須是prototype類型
public?RabbitTemplate?rabbitTemplate()?{
RabbitTemplate?template?=?new?RabbitTemplate(connectionFactory());
return?template;
}
}
第四步:編寫消息的生產(chǎn)者
@Component
public?class?MsgProducer?implements?RabbitTemplate.ConfirmCallback?{
?
private?final?Logger?logger?=?LoggerFactory.getLogger(this.getClass());
?
//由于rabbitTemplate的scope屬性設(shè)置為ConfigurableBeanFactory.SCOPE_PROTOTYPE,所以不能自動注入
private?RabbitTemplate?rabbitTemplate;
/**
*?構(gòu)造方法注入rabbitTemplate
*/
@Autowired
public?MsgProducer(RabbitTemplate?rabbitTemplate)?{
this.rabbitTemplate?=?rabbitTemplate;
rabbitTemplate.setConfirmCallback(this);?//rabbitTemplate如果為單例的話,那回調(diào)就是最后設(shè)置的內(nèi)容
}
?
public?void?sendMsg(String?content)?{
CorrelationData?correlationId?=?new?CorrelationData(UUID.randomUUID().toString());
//把消息放入ROUTINGKEY_A對應(yīng)的隊列當(dāng)中去,對應(yīng)的是隊列A
rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_A,?RabbitConfig.ROUTINGKEY_A,?content,?correlationId);
}
/**
*?回調(diào)
*/
@Override
public?void?confirm(CorrelationData?correlationData,?boolean?ack,?String?cause)?{
logger.info("?回調(diào)id:"?+?correlationData);
if?(ack)?{
logger.info("消息成功消費");
}?else?{
logger.info("消息消費失敗:"?+?cause);
}
}
}第五步:把交換機(jī),隊列,通過路由關(guān)鍵字進(jìn)行綁定,寫在RabbitConfig類當(dāng)中
/**
*?針對消費者配置
*?1.?設(shè)置交換機(jī)類型
*?2.?將隊列綁定到交換機(jī)
FanoutExchange:?將消息分發(fā)到所有的綁定隊列,無routingkey的概念
HeadersExchange :通過添加屬性key-value匹配
DirectExchange:按照routingkey分發(fā)到指定隊列
TopicExchange:多關(guān)鍵字匹配
*/
@Bean
public?DirectExchange?defaultExchange()?{
return?new?DirectExchange(EXCHANGE_A);
}
/**
*?獲取隊列A
*?@return
*/
@Bean
public?Queue?queueA()?{
return?new?Queue(QUEUE_A,?true);?//隊列持久
}
@Bean
public?Binding?binding()?{
?
return?BindingBuilder.bind(queueA()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A);
}一個交換機(jī)可以綁定多個消息隊列,也就是消息通過一個交換機(jī),可以分發(fā)到不同的隊列當(dāng)中去。
@Bean
public?Binding?binding()?{
return?BindingBuilder.bind(queueA()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A);
}
@Bean
public?Binding?bindingB(){
return?BindingBuilder.bind(queueB()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_B);
}第六步:編寫消息的消費者,這一步也是最復(fù)雜的,因為可以編寫出很多不同的需求出來,寫法也有很多的不同。
? ? 比如一個生產(chǎn)者,一個消費者
@Component
@RabbitListener(queues?=?RabbitConfig.QUEUE_A)
public?class?MsgReceiver?{
?
private?final?Logger?logger?=?LoggerFactory.getLogger(this.getClass());
?
@RabbitHandler
public?void?process(String?content)?{
logger.info("接收處理隊列A當(dāng)中的消息:?"?+?content);
}
?
}

比如一個生產(chǎn)者,多個消費者,可以寫多個消費者,并且他們的分發(fā)是負(fù)載均衡的。
@Component
@RabbitListener(queues?=?RabbitConfig.QUEUE_A)
public?class?MsgReceiverC_one?{
private?final?Logger?logger?=?LoggerFactory.getLogger(this.getClass());
?
@RabbitHandler
public?void?process(String?content)?{
logger.info("處理器one接收處理隊列A當(dāng)中的消息:?"?+?content);
}
}
@Component
@RabbitListener(queues?=?RabbitConfig.QUEUE_A)
public?class?MsgReceiverC_two?{
private?final?Logger?logger?=?LoggerFactory.getLogger(this.getClass());
@RabbitHandler
public?void?process(String?content)?{
logger.info("處理器two接收處理隊列A當(dāng)中的消息:?"?+?content);
}
?
}

另外一種消息處理機(jī)制的寫法如下,在RabbitMQConfig類里面增加bean:
@Bean
public?SimpleMessageListenerContainer?messageContainer()?{
//加載處理消息A的隊列
SimpleMessageListenerContainer?container?=?new?SimpleMessageListenerContainer(connectionFactory());
//設(shè)置接收多個隊列里面的消息,這里設(shè)置接收隊列A
//假如想一個消費者處理多個隊列里面的信息可以如下設(shè)置:
//container.setQueues(queueA(),queueB(),queueC());
container.setQueues(queueA());
container.setExposeListenerChannel(true);
//設(shè)置最大的并發(fā)的消費者數(shù)量
container.setMaxConcurrentConsumers(10);
//最小的并發(fā)消費者的數(shù)量
container.setConcurrentConsumers(1);
//設(shè)置確認(rèn)模式手工確認(rèn)
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setMessageListener(new?ChannelAwareMessageListener()?{
@Override
public?void?onMessage(Message?message,?Channel?channel)?throws?Exception?{
/**通過basic.qos方法設(shè)置prefetch_count=1,這樣RabbitMQ就會使得每個Consumer在同一個時間點最多處理一個Message,
換句話說,在接收到該Consumer的ack前,它不會將新的Message分發(fā)給它?*/
channel.basicQos(1);
byte[]?body?=?message.getBody();
logger.info("接收處理隊列A當(dāng)中的消息:"?+?new?String(body));
/**為了保證永遠(yuǎn)不會丟失消息,RabbitMQ支持消息應(yīng)答機(jī)制。
當(dāng)消費者接收到消息并完成任務(wù)后會往RabbitMQ服務(wù)器發(fā)送一條確認(rèn)的命令,然后RabbitMQ才會將消息刪除。*/
channel.basicAck(message.getMessageProperties().getDeliveryTag(),?false);
}
});
return?container;
}下面是當(dāng)一個消費者,處理多個隊列里面的信息打印的log

?
Fanout Exchange
Fanout 就是我們熟悉的廣播模式,給Fanout交換機(jī)發(fā)送消息,綁定了這個交換機(jī)的所有隊列都收到這個消息。
//配置fanout_exchange
@Bean
FanoutExchange?fanoutExchange()?{
return?new?FanoutExchange(RabbitConfig.FANOUT_EXCHANGE);
}
?
//把所有的隊列都綁定到這個交換機(jī)上去
@Bean
Binding?bindingExchangeA(Queue?queueA,FanoutExchange?fanoutExchange)?{
return?BindingBuilder.bind(queueA).to(fanoutExchange);
}
@Bean
Binding?bindingExchangeB(Queue?queueB,?FanoutExchange?fanoutExchange)?{
return?BindingBuilder.bind(queueB).to(fanoutExchange);
}
@Bean
Binding?bindingExchangeC(Queue?queueC,?FanoutExchange?fanoutExchange)?{
return?BindingBuilder.bind(queueC).to(fanoutExchange);
}消息發(fā)送,這里不設(shè)置routing_key,因為設(shè)置了也無效,發(fā)送端的routing_key寫任何字符都會被忽略。
public?void?sendAll(String?content)?{
rabbitTemplate.convertAndSend("fanoutExchange","",?content);
}消息處理的結(jié)果如下所示:

?
粉絲福利:Java從入門到入土學(xué)習(xí)路線圖
???

?長按上方微信二維碼?2 秒
感謝點贊支持下哈?
