Spring Boot整合RabbitMQ詳細(xì)教程
超全面!Java核心知識(shí)總結(jié)(點(diǎn)擊查看)
超全面!Java核心知識(shí)總結(jié)(點(diǎn)擊查看)
1.首先我們簡(jiǎn)單了解一下消息中間件的應(yīng)用場(chǎng)景
異步處理
場(chǎng)景說(shuō)明:用戶注冊(cè)后,需要發(fā)注冊(cè)郵件和注冊(cè)短信,傳統(tǒng)的做法有兩種1.串行的方式;2.并行的方式
(1)串行方式:將注冊(cè)信息寫(xiě)入數(shù)據(jù)庫(kù)后,發(fā)送注冊(cè)郵件,再發(fā)送注冊(cè)短信,以上三個(gè)任務(wù)全部完成后才返回給客戶端。這有一個(gè)問(wèn)題是,郵件,短信并不是必須的,它只是一個(gè)通知,而這種做法讓客戶端等待沒(méi)有必要等待的東西. 
(2)并行方式:將注冊(cè)信息寫(xiě)入數(shù)據(jù)庫(kù)后,發(fā)送郵件的同時(shí),發(fā)送短信,以上三個(gè)任務(wù)完成后,返回給客戶端,并行的方式能提高處理的時(shí)間。
假設(shè)三個(gè)業(yè)務(wù)節(jié)點(diǎn)分別使用50ms,串行方式使用時(shí)間150ms,并行使用時(shí)間100ms。雖然并性已經(jīng)提高的處理時(shí)間,但是,前面說(shuō)過(guò),郵件和短信對(duì)我正常的使用網(wǎng)站沒(méi)有任何影響,客戶端沒(méi)有必要等著其發(fā)送完成才顯示注冊(cè)成功,應(yīng)該是寫(xiě)入數(shù)據(jù)庫(kù)后就返回.
(3)消息隊(duì)列
引入消息隊(duì)列后,把發(fā)送郵件,短信不是必須的業(yè)務(wù)邏輯異步處理
由此可以看出,引入消息隊(duì)列后,用戶的響應(yīng)時(shí)間就等于寫(xiě)入數(shù)據(jù)庫(kù)的時(shí)間+寫(xiě)入消息隊(duì)列的時(shí)間(可以忽略不計(jì)),引入消息隊(duì)列后處理后,響應(yīng)時(shí)間是串行的3倍,是并行的2倍。
應(yīng)用解耦
場(chǎng)景:雙11是購(gòu)物狂節(jié),用戶下單后,訂單系統(tǒng)需要通知庫(kù)存系統(tǒng),傳統(tǒng)的做法就是訂單系統(tǒng)調(diào)用庫(kù)存系統(tǒng)的接口.
這種做法有一個(gè)缺點(diǎn):
當(dāng)庫(kù)存系統(tǒng)出現(xiàn)故障時(shí),訂單就會(huì)失敗。
訂單系統(tǒng)和庫(kù)存系統(tǒng)高耦合.
引入消息隊(duì)列
訂單系統(tǒng):用戶下單后,訂單系統(tǒng)完成持久化處理,將消息寫(xiě)入消息隊(duì)列,返回用戶訂單下單成功。
庫(kù)存系統(tǒng):訂閱下單的消息,獲取下單消息,進(jìn)行庫(kù)操作。
就算庫(kù)存系統(tǒng)出現(xiàn)故障,消息隊(duì)列也能保證消息的可靠投遞,不會(huì)導(dǎo)致消息丟失。
流量削峰
流量削峰一般在秒殺活動(dòng)中應(yīng)用廣泛
場(chǎng)景:秒殺活動(dòng),一般會(huì)因?yàn)榱髁窟^(guò)大,導(dǎo)致應(yīng)用掛掉,為了解決這個(gè)問(wèn)題,一般在應(yīng)用前端加入消息隊(duì)列。
作用:
1.可以控制活動(dòng)人數(shù),超過(guò)此一定閥值的訂單直接丟棄(我為什么秒殺一次都沒(méi)有成功過(guò)呢^^)
2.可以緩解短時(shí)間的高流量壓垮應(yīng)用(應(yīng)用程序按自己的最大處理能力獲取訂單)
1.用戶的請(qǐng)求,服務(wù)器收到之后,首先寫(xiě)入消息隊(duì)列,加入消息隊(duì)列長(zhǎng)度超過(guò)最大值,則直接拋棄用戶請(qǐng)求或跳轉(zhuǎn)到錯(cuò)誤頁(yè)面.
2.秒殺業(yè)務(wù)根據(jù)消息隊(duì)列中的請(qǐng)求信息,再做后續(xù)處理.
以上內(nèi)容的來(lái)源是:https://blog.csdn.net/whoamiyang/article/details/54954780,在此感謝
2.各種消息中間件性能的比較:
TPS比較 一ZeroMq 最好,RabbitMq 次之, ActiveMq 最差。
持久化消息比較—zeroMq不支持,activeMq和rabbitMq都支持。持久化消息主要是指:MQ down或者M(jìn)Q所在的服務(wù)器down了,消息不會(huì)丟失的機(jī)制。
可靠性、靈活的路由、集群、事務(wù)、高可用的隊(duì)列、消息排序、問(wèn)題追蹤、可視化管理工具、插件系統(tǒng)、社區(qū)—RabbitMq最好,ActiveMq次之,ZeroMq最差。
高并發(fā)—從實(shí)現(xiàn)語(yǔ)言來(lái)看,RabbitMQ最高,原因是它的實(shí)現(xiàn)語(yǔ)言是天生具備高并發(fā)高可用的erlang語(yǔ)言。
綜上所述:RabbitMQ的性能相對(duì)來(lái)說(shuō)更好更全面,是消息中間件的首選。
3.接下來(lái)我們?cè)趕pringboot當(dāng)中整合使用RabbitMQ
第一步:導(dǎo)入maven依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>1.5.2.RELEASE</version>
</dependency>第二步:在application.properties文件當(dāng)中引入RabbitMQ基本的配置信息
#對(duì)于rabbitMQ的支持
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest第三步:編寫(xiě)RabbitConfig類(lèi),類(lèi)里面設(shè)置很多個(gè)EXCHANGE,QUEUE,ROUTINGKEY,是為了接下來(lái)的不同使用場(chǎng)景。
/**
Broker:它提供一種傳輸服務(wù),它的角色就是維護(hù)一條從生產(chǎn)者到消費(fèi)者的路線,保證數(shù)據(jù)能按照指定的方式進(jìn)行傳輸,
Exchange:消息交換機(jī),它指定消息按什么規(guī)則,路由到哪個(gè)隊(duì)列。
Queue:消息的載體,每個(gè)消息都會(huì)被投到一個(gè)或多個(gè)隊(duì)列。
Binding:綁定,它的作用就是把exchange和queue按照路由規(guī)則綁定起來(lái).
Routing Key:路由關(guān)鍵字,exchange根據(jù)這個(gè)關(guān)鍵字進(jìn)行消息投遞。
vhost:虛擬主機(jī),一個(gè)broker里可以有多個(gè)vhost,用作不同用戶的權(quán)限分離。
Producer:消息生產(chǎn)者,就是投遞消息的程序.
Consumer:消息消費(fèi)者,就是接受消息的程序.
Channel:消息通道,在客戶端的每個(gè)連接里,可建立多個(gè)channel.
*/
@Configuration
public class RabbitConfig {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.port}")
private int port;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
public static final String EXCHANGE_A = "my-mq-exchange_A";
public static final String EXCHANGE_B = "my-mq-exchange_B";
public static final String EXCHANGE_C = "my-mq-exchange_C";
public static final String QUEUE_A = "QUEUE_A";
public static final String QUEUE_B = "QUEUE_B";
public static final String QUEUE_C = "QUEUE_C";
public static final String ROUTINGKEY_A = "spring-boot-routingKey_A";
public static final String ROUTINGKEY_B = "spring-boot-routingKey_B";
public static final String ROUTINGKEY_C = "spring-boot-routingKey_C";
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost("/");
connectionFactory.setPublisherConfirms(true);
return connectionFactory;
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
//必須是prototype類(lèi)型
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
return template;
}
}
第四步:編寫(xiě)消息的生產(chǎn)者
@Component
public class MsgProducer implements RabbitTemplate.ConfirmCallback {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
//由于rabbitTemplate的scope屬性設(shè)置為ConfigurableBeanFactory.SCOPE_PROTOTYPE,所以不能自動(dòng)注入
private RabbitTemplate rabbitTemplate;
/**
* 構(gòu)造方法注入rabbitTemplate
*/
@Autowired
public MsgProducer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
rabbitTemplate.setConfirmCallback(this); //rabbitTemplate如果為單例的話,那回調(diào)就是最后設(shè)置的內(nèi)容
}
public void sendMsg(String content) {
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
//把消息放入ROUTINGKEY_A對(duì)應(yīng)的隊(duì)列當(dāng)中去,對(duì)應(yīng)的是隊(duì)列A
rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_A, RabbitConfig.ROUTINGKEY_A, content, correlationId);
}
/**
* 回調(diào)
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
logger.info(" 回調(diào)id:" + correlationData);
if (ack) {
logger.info("消息成功消費(fèi)");
} else {
logger.info("消息消費(fèi)失敗:" + cause);
}
}
}第五步:把交換機(jī),隊(duì)列,通過(guò)路由關(guān)鍵字進(jìn)行綁定,寫(xiě)在RabbitConfig類(lèi)當(dāng)中
/**
* 針對(duì)消費(fèi)者配置
* 1. 設(shè)置交換機(jī)類(lèi)型
* 2. 將隊(duì)列綁定到交換機(jī)
FanoutExchange: 將消息分發(fā)到所有的綁定隊(duì)列,無(wú)routingkey的概念
HeadersExchange :通過(guò)添加屬性key-value匹配
DirectExchange:按照routingkey分發(fā)到指定隊(duì)列
TopicExchange:多關(guān)鍵字匹配
*/
@Bean
public DirectExchange defaultExchange() {
return new DirectExchange(EXCHANGE_A);
}
/**
* 獲取隊(duì)列A
* @return
*/
@Bean
public Queue queueA() {
return new Queue(QUEUE_A, true); //隊(duì)列持久
}
@Bean
public Binding binding() {
return BindingBuilder.bind(queueA()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A);
}一個(gè)交換機(jī)可以綁定多個(gè)消息隊(duì)列,也就是消息通過(guò)一個(gè)交換機(jī),可以分發(fā)到不同的隊(duì)列當(dāng)中去。
@Bean
public Binding binding() {
return BindingBuilder.bind(queueA()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A);
}
@Bean
public Binding bindingB(){
return BindingBuilder.bind(queueB()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_B);
}第六步:編寫(xiě)消息的消費(fèi)者,這一步也是最復(fù)雜的,因?yàn)榭梢跃帉?xiě)出很多不同的需求出來(lái),寫(xiě)法也有很多的不同。
比如一個(gè)生產(chǎn)者,一個(gè)消費(fèi)者
@Component
@RabbitListener(queues = RabbitConfig.QUEUE_A)
public class MsgReceiver {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@RabbitHandler
public void process(String content) {
logger.info("接收處理隊(duì)列A當(dāng)中的消息: " + content);
}
}

比如一個(gè)生產(chǎn)者,多個(gè)消費(fèi)者,可以寫(xiě)多個(gè)消費(fèi)者,并且他們的分發(fā)是負(fù)載均衡的。
@Component
@RabbitListener(queues = RabbitConfig.QUEUE_A)
public class MsgReceiverC_one {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@RabbitHandler
public void process(String content) {
logger.info("處理器one接收處理隊(duì)列A當(dāng)中的消息: " + content);
}
}
@Component
@RabbitListener(queues = RabbitConfig.QUEUE_A)
public class MsgReceiverC_two {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@RabbitHandler
public void process(String content) {
logger.info("處理器two接收處理隊(duì)列A當(dāng)中的消息: " + content);
}
}

另外一種消息處理機(jī)制的寫(xiě)法如下,在RabbitMQConfig類(lèi)里面增加bean:
@Bean
public SimpleMessageListenerContainer messageContainer() {
//加載處理消息A的隊(duì)列
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
//設(shè)置接收多個(gè)隊(duì)列里面的消息,這里設(shè)置接收隊(duì)列A
//假如想一個(gè)消費(fèi)者處理多個(gè)隊(duì)列里面的信息可以如下設(shè)置:
//container.setQueues(queueA(),queueB(),queueC());
container.setQueues(queueA());
container.setExposeListenerChannel(true);
//設(shè)置最大的并發(fā)的消費(fèi)者數(shù)量
container.setMaxConcurrentConsumers(10);
//最小的并發(fā)消費(fèi)者的數(shù)量
container.setConcurrentConsumers(1);
//設(shè)置確認(rèn)模式手工確認(rèn)
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
/**通過(guò)basic.qos方法設(shè)置prefetch_count=1,這樣RabbitMQ就會(huì)使得每個(gè)Consumer在同一個(gè)時(shí)間點(diǎn)最多處理一個(gè)Message,
換句話說(shuō),在接收到該Consumer的ack前,它不會(huì)將新的Message分發(fā)給它 */
channel.basicQos(1);
byte[] body = message.getBody();
logger.info("接收處理隊(duì)列A當(dāng)中的消息:" + new String(body));
/**為了保證永遠(yuǎn)不會(huì)丟失消息,RabbitMQ支持消息應(yīng)答機(jī)制。
當(dāng)消費(fèi)者接收到消息并完成任務(wù)后會(huì)往RabbitMQ服務(wù)器發(fā)送一條確認(rèn)的命令,然后RabbitMQ才會(huì)將消息刪除。*/
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
});
return container;
}下面是當(dāng)一個(gè)消費(fèi)者,處理多個(gè)隊(duì)列里面的信息打印的log

Fanout Exchange
Fanout 就是我們熟悉的廣播模式,給Fanout交換機(jī)發(fā)送消息,綁定了這個(gè)交換機(jī)的所有隊(duì)列都收到這個(gè)消息。
//配置fanout_exchange
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange(RabbitConfig.FANOUT_EXCHANGE);
}
//把所有的隊(duì)列都綁定到這個(gè)交換機(jī)上去
@Bean
Binding bindingExchangeA(Queue queueA,FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queueA).to(fanoutExchange);
}
@Bean
Binding bindingExchangeB(Queue queueB, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queueB).to(fanoutExchange);
}
@Bean
Binding bindingExchangeC(Queue queueC, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queueC).to(fanoutExchange);
}消息發(fā)送,這里不設(shè)置routing_key,因?yàn)樵O(shè)置了也無(wú)效,發(fā)送端的routing_key寫(xiě)任何字符都會(huì)被忽略。
public void sendAll(String content) {
rabbitTemplate.convertAndSend("fanoutExchange","", content);
}消息處理的結(jié)果如下所示:

如有文章對(duì)你有幫助,
“在看”和轉(zhuǎn)發(fā)是對(duì)我最大的支持!
推薦, GitHub 書(shū)籍倉(cāng)庫(kù) https://github.com/ebooklist/awesome-ebooks-list 整理了大部分常用 技術(shù)書(shū)籍PDF,持續(xù)更新中... 你需要的技術(shù)書(shū)籍,這里可能都有...
點(diǎn)擊文末“閱讀原文”可直達(dá)
整理不易,麻煩各位小伙伴在GitHub中來(lái)個(gè)Star支持一下

