<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Spring Boot整合RabbitMQ詳細(xì)教程

          共 10009字,需瀏覽 21分鐘

           ·

          2021-03-19 12:26

          超全面!Java核心知識(shí)總結(jié)(點(diǎn)擊查看)

          超全面!Java核心知識(shí)總結(jié)(點(diǎn)擊查看)

          來(lái)源 |  urlify.cn/MZvAry


          1.首先我們簡(jiǎn)單了解一下消息中間件的應(yīng)用場(chǎng)景

          異步處理

          場(chǎng)景說(shuō)明:用戶注冊(cè)后,需要發(fā)注冊(cè)郵件和注冊(cè)短信,傳統(tǒng)的做法有兩種1.串行的方式;2.并行的方式 
          (1)串行方式:將注冊(cè)信息寫(xiě)入數(shù)據(jù)庫(kù)后,發(fā)送注冊(cè)郵件,再發(fā)送注冊(cè)短信,以上三個(gè)任務(wù)全部完成后才返回給客戶端。這有一個(gè)問(wèn)題是,郵件,短信并不是必須的,它只是一個(gè)通知,而這種做法讓客戶端等待沒(méi)有必要等待的東西.

          (2)并行方式:將注冊(cè)信息寫(xiě)入數(shù)據(jù)庫(kù)后,發(fā)送郵件的同時(shí),發(fā)送短信,以上三個(gè)任務(wù)完成后,返回給客戶端,并行的方式能提高處理的時(shí)間。 
           
          假設(shè)三個(gè)業(yè)務(wù)節(jié)點(diǎn)分別使用50ms,串行方式使用時(shí)間150ms,并行使用時(shí)間100ms。雖然并性已經(jīng)提高的處理時(shí)間,但是,前面說(shuō)過(guò),郵件和短信對(duì)我正常的使用網(wǎng)站沒(méi)有任何影響,客戶端沒(méi)有必要等著其發(fā)送完成才顯示注冊(cè)成功,應(yīng)該是寫(xiě)入數(shù)據(jù)庫(kù)后就返回.
          (3)消息隊(duì)列 
          引入消息隊(duì)列后,把發(fā)送郵件,短信不是必須的業(yè)務(wù)邏輯異步處理 
           
          由此可以看出,引入消息隊(duì)列后,用戶的響應(yīng)時(shí)間就等于寫(xiě)入數(shù)據(jù)庫(kù)的時(shí)間+寫(xiě)入消息隊(duì)列的時(shí)間(可以忽略不計(jì)),引入消息隊(duì)列后處理后,響應(yīng)時(shí)間是串行的3倍,是并行的2倍。

           應(yīng)用解耦

          場(chǎng)景:雙11是購(gòu)物狂節(jié),用戶下單后,訂單系統(tǒng)需要通知庫(kù)存系統(tǒng),傳統(tǒng)的做法就是訂單系統(tǒng)調(diào)用庫(kù)存系統(tǒng)的接口.
           
          這種做法有一個(gè)缺點(diǎn):

          • 當(dāng)庫(kù)存系統(tǒng)出現(xiàn)故障時(shí),訂單就會(huì)失敗。

          • 訂單系統(tǒng)和庫(kù)存系統(tǒng)高耦合. 
            引入消息隊(duì)列 

          • 訂單系統(tǒng):用戶下單后,訂單系統(tǒng)完成持久化處理,將消息寫(xiě)入消息隊(duì)列,返回用戶訂單下單成功。

          • 庫(kù)存系統(tǒng):訂閱下單的消息,獲取下單消息,進(jìn)行庫(kù)操作。 
            就算庫(kù)存系統(tǒng)出現(xiàn)故障,消息隊(duì)列也能保證消息的可靠投遞,不會(huì)導(dǎo)致消息丟失。

          流量削峰

          流量削峰一般在秒殺活動(dòng)中應(yīng)用廣泛 
          場(chǎng)景:秒殺活動(dòng),一般會(huì)因?yàn)榱髁窟^(guò)大,導(dǎo)致應(yīng)用掛掉,為了解決這個(gè)問(wèn)題,一般在應(yīng)用前端加入消息隊(duì)列。 
          作用: 
          1.可以控制活動(dòng)人數(shù),超過(guò)此一定閥值的訂單直接丟棄(我為什么秒殺一次都沒(méi)有成功過(guò)呢^^) 
          2.可以緩解短時(shí)間的高流量壓垮應(yīng)用(應(yīng)用程序按自己的最大處理能力獲取訂單) 
           
          1.用戶的請(qǐng)求,服務(wù)器收到之后,首先寫(xiě)入消息隊(duì)列,加入消息隊(duì)列長(zhǎng)度超過(guò)最大值,則直接拋棄用戶請(qǐng)求或跳轉(zhuǎn)到錯(cuò)誤頁(yè)面. 

          2.秒殺業(yè)務(wù)根據(jù)消息隊(duì)列中的請(qǐng)求信息,再做后續(xù)處理.

          以上內(nèi)容的來(lái)源是:https://blog.csdn.net/whoamiyang/article/details/54954780,在此感謝

          2.各種消息中間件性能的比較:

          TPS比較 一ZeroMq 最好,RabbitMq 次之, ActiveMq 最差。

          持久化消息比較—zeroMq不支持,activeMq和rabbitMq都支持。持久化消息主要是指:MQ down或者M(jìn)Q所在的服務(wù)器down了,消息不會(huì)丟失的機(jī)制。

          可靠性、靈活的路由、集群、事務(wù)、高可用的隊(duì)列、消息排序、問(wèn)題追蹤、可視化管理工具、插件系統(tǒng)、社區(qū)—RabbitMq最好,ActiveMq次之,ZeroMq最差。

          高并發(fā)—從實(shí)現(xiàn)語(yǔ)言來(lái)看,RabbitMQ最高,原因是它的實(shí)現(xiàn)語(yǔ)言是天生具備高并發(fā)高可用的erlang語(yǔ)言。

          綜上所述:RabbitMQ的性能相對(duì)來(lái)說(shuō)更好更全面,是消息中間件的首選。

          3.接下來(lái)我們?cè)趕pringboot當(dāng)中整合使用RabbitMQ

          第一步:導(dǎo)入maven依賴

          <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-amqp</artifactId>
          <version>1.5.2.RELEASE</version>
          </dependency>

          第二步:在application.properties文件當(dāng)中引入RabbitMQ基本的配置信息

          #對(duì)于rabbitMQ的支持
          spring.rabbitmq.host=127.0.0.1
          spring.rabbitmq.port=5672
          spring.rabbitmq.username=guest
          spring.rabbitmq.password=guest

          第三步:編寫(xiě)RabbitConfig類(lèi),類(lèi)里面設(shè)置很多個(gè)EXCHANGE,QUEUE,ROUTINGKEY,是為了接下來(lái)的不同使用場(chǎng)景。

          /**
          Broker:它提供一種傳輸服務(wù),它的角色就是維護(hù)一條從生產(chǎn)者到消費(fèi)者的路線,保證數(shù)據(jù)能按照指定的方式進(jìn)行傳輸,
          Exchange:消息交換機(jī),它指定消息按什么規(guī)則,路由到哪個(gè)隊(duì)列。
          Queue:消息的載體,每個(gè)消息都會(huì)被投到一個(gè)或多個(gè)隊(duì)列。
          Binding:綁定,它的作用就是把exchange和queue按照路由規(guī)則綁定起來(lái).
          Routing Key:路由關(guān)鍵字,exchange根據(jù)這個(gè)關(guān)鍵字進(jìn)行消息投遞。
          vhost:虛擬主機(jī),一個(gè)broker里可以有多個(gè)vhost,用作不同用戶的權(quán)限分離。
          Producer:消息生產(chǎn)者,就是投遞消息的程序.
          Consumer:消息消費(fèi)者,就是接受消息的程序.
          Channel:消息通道,在客戶端的每個(gè)連接里,可建立多個(gè)channel.
          */
          @Configuration
          public class RabbitConfig {
           
          private final Logger logger = LoggerFactory.getLogger(this.getClass());
           
          @Value("${spring.rabbitmq.host}")
          private String host;
           
          @Value("${spring.rabbitmq.port}")
          private int port;
           
          @Value("${spring.rabbitmq.username}")
          private String username;
           
          @Value("${spring.rabbitmq.password}")
          private String password;
           
           
          public static final String EXCHANGE_A = "my-mq-exchange_A";
          public static final String EXCHANGE_B = "my-mq-exchange_B";
          public static final String EXCHANGE_C = "my-mq-exchange_C";
           
           
          public static final String QUEUE_A = "QUEUE_A";
          public static final String QUEUE_B = "QUEUE_B";
          public static final String QUEUE_C = "QUEUE_C";
           
          public static final String ROUTINGKEY_A = "spring-boot-routingKey_A";
          public static final String ROUTINGKEY_B = "spring-boot-routingKey_B";
          public static final String ROUTINGKEY_C = "spring-boot-routingKey_C";
           
          @Bean
          public ConnectionFactory connectionFactory() {
          CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);
          connectionFactory.setUsername(username);
          connectionFactory.setPassword(password);
          connectionFactory.setVirtualHost("/");
          connectionFactory.setPublisherConfirms(true);
          return connectionFactory;
          }
           
          @Bean
          @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
          //必須是prototype類(lèi)型
          public RabbitTemplate rabbitTemplate() {
          RabbitTemplate template = new RabbitTemplate(connectionFactory());
          return template;
          }
          }

          第四步:編寫(xiě)消息的生產(chǎn)者

          @Component
          public class MsgProducer implements RabbitTemplate.ConfirmCallback {
           
          private final Logger logger = LoggerFactory.getLogger(this.getClass());
           
          //由于rabbitTemplate的scope屬性設(shè)置為ConfigurableBeanFactory.SCOPE_PROTOTYPE,所以不能自動(dòng)注入
          private RabbitTemplate rabbitTemplate;
          /**
          * 構(gòu)造方法注入rabbitTemplate
          */
          @Autowired
          public MsgProducer(RabbitTemplate rabbitTemplate) {
          this.rabbitTemplate = rabbitTemplate;
          rabbitTemplate.setConfirmCallback(this); //rabbitTemplate如果為單例的話,那回調(diào)就是最后設(shè)置的內(nèi)容
          }
           
          public void sendMsg(String content) {
          CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
          //把消息放入ROUTINGKEY_A對(duì)應(yīng)的隊(duì)列當(dāng)中去,對(duì)應(yīng)的是隊(duì)列A
          rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_A, RabbitConfig.ROUTINGKEY_A, content, correlationId);
          }
          /**
          * 回調(diào)
          */
          @Override
          public void confirm(CorrelationData correlationData, boolean ack, String cause) {
          logger.info(" 回調(diào)id:" + correlationData);
          if (ack) {
          logger.info("消息成功消費(fèi)");
          else {
          logger.info("消息消費(fèi)失敗:" + cause);
          }
          }
          }

          第五步:把交換機(jī),隊(duì)列,通過(guò)路由關(guān)鍵字進(jìn)行綁定,寫(xiě)在RabbitConfig類(lèi)當(dāng)中

          /**
          * 針對(duì)消費(fèi)者配置
          * 1. 設(shè)置交換機(jī)類(lèi)型
          * 2. 將隊(duì)列綁定到交換機(jī)
          FanoutExchange: 將消息分發(fā)到所有的綁定隊(duì)列,無(wú)routingkey的概念
          HeadersExchange :通過(guò)添加屬性key-value匹配
          DirectExchange:按照routingkey分發(fā)到指定隊(duì)列
          TopicExchange:多關(guān)鍵字匹配
          */
          @Bean
          public DirectExchange defaultExchange() {
          return new DirectExchange(EXCHANGE_A);
          }
          /**
          * 獲取隊(duì)列A
          * @return
          */
          @Bean
          public Queue queueA() {
          return new Queue(QUEUE_A, true); //隊(duì)列持久
          }
          @Bean
          public Binding binding() {
           
          return BindingBuilder.bind(queueA()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A);
          }

          一個(gè)交換機(jī)可以綁定多個(gè)消息隊(duì)列,也就是消息通過(guò)一個(gè)交換機(jī),可以分發(fā)到不同的隊(duì)列當(dāng)中去。

          @Bean
          public Binding binding() {
          return BindingBuilder.bind(queueA()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A);
          }
          @Bean
          public Binding bindingB(){
          return BindingBuilder.bind(queueB()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_B);
          }

          第六步:編寫(xiě)消息的消費(fèi)者,這一步也是最復(fù)雜的,因?yàn)榭梢跃帉?xiě)出很多不同的需求出來(lái),寫(xiě)法也有很多的不同。

              比如一個(gè)生產(chǎn)者,一個(gè)消費(fèi)者

          @Component
          @RabbitListener(queues = RabbitConfig.QUEUE_A)
          public class MsgReceiver {
           
          private final Logger logger = LoggerFactory.getLogger(this.getClass());
           
          @RabbitHandler
          public void process(String content) {
          logger.info("接收處理隊(duì)列A當(dāng)中的消息: " + content);
          }
           
          }

          比如一個(gè)生產(chǎn)者,多個(gè)消費(fèi)者,可以寫(xiě)多個(gè)消費(fèi)者,并且他們的分發(fā)是負(fù)載均衡的。

          @Component
          @RabbitListener(queues = RabbitConfig.QUEUE_A)
          public class MsgReceiverC_one {
          private final Logger logger = LoggerFactory.getLogger(this.getClass());
           
          @RabbitHandler
          public void process(String content) {
          logger.info("處理器one接收處理隊(duì)列A當(dāng)中的消息: " + content);
          }
          }
          @Component
          @RabbitListener(queues = RabbitConfig.QUEUE_A)
          public class MsgReceiverC_two {
          private final Logger logger = LoggerFactory.getLogger(this.getClass());
          @RabbitHandler
          public void process(String content) {
          logger.info("處理器two接收處理隊(duì)列A當(dāng)中的消息: " + content);
          }
           
          }


          另外一種消息處理機(jī)制的寫(xiě)法如下,在RabbitMQConfig類(lèi)里面增加bean:

          @Bean
          public SimpleMessageListenerContainer messageContainer() {
          //加載處理消息A的隊(duì)列
          SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
          //設(shè)置接收多個(gè)隊(duì)列里面的消息,這里設(shè)置接收隊(duì)列A
          //假如想一個(gè)消費(fèi)者處理多個(gè)隊(duì)列里面的信息可以如下設(shè)置:
          //container.setQueues(queueA(),queueB(),queueC());
          container.setQueues(queueA());
          container.setExposeListenerChannel(true);
          //設(shè)置最大的并發(fā)的消費(fèi)者數(shù)量
          container.setMaxConcurrentConsumers(10);
          //最小的并發(fā)消費(fèi)者的數(shù)量
          container.setConcurrentConsumers(1);
          //設(shè)置確認(rèn)模式手工確認(rèn)
          container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
          container.setMessageListener(new ChannelAwareMessageListener() {
          @Override
          public void onMessage(Message message, Channel channel) throws Exception {
          /**通過(guò)basic.qos方法設(shè)置prefetch_count=1,這樣RabbitMQ就會(huì)使得每個(gè)Consumer在同一個(gè)時(shí)間點(diǎn)最多處理一個(gè)Message,
          換句話說(shuō),在接收到該Consumer的ack前,它不會(huì)將新的Message分發(fā)給它 */
          channel.basicQos(1);
          byte[] body = message.getBody();
          logger.info("接收處理隊(duì)列A當(dāng)中的消息:" + new String(body));
          /**為了保證永遠(yuǎn)不會(huì)丟失消息,RabbitMQ支持消息應(yīng)答機(jī)制。
          當(dāng)消費(fèi)者接收到消息并完成任務(wù)后會(huì)往RabbitMQ服務(wù)器發(fā)送一條確認(rèn)的命令,然后RabbitMQ才會(huì)將消息刪除。*/
          channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
          }
          });
          return container;
          }

          下面是當(dāng)一個(gè)消費(fèi)者,處理多個(gè)隊(duì)列里面的信息打印的log

           

          Fanout Exchange

          Fanout 就是我們熟悉的廣播模式,給Fanout交換機(jī)發(fā)送消息,綁定了這個(gè)交換機(jī)的所有隊(duì)列都收到這個(gè)消息。

          //配置fanout_exchange
          @Bean
          FanoutExchange fanoutExchange() {
          return new FanoutExchange(RabbitConfig.FANOUT_EXCHANGE);
          }
           
          //把所有的隊(duì)列都綁定到這個(gè)交換機(jī)上去
          @Bean
          Binding bindingExchangeA(Queue queueA,FanoutExchange fanoutExchange) {
          return BindingBuilder.bind(queueA).to(fanoutExchange);
          }
          @Bean
          Binding bindingExchangeB(Queue queueB, FanoutExchange fanoutExchange) {
          return BindingBuilder.bind(queueB).to(fanoutExchange);
          }
          @Bean
          Binding bindingExchangeC(Queue queueC, FanoutExchange fanoutExchange) {
          return BindingBuilder.bind(queueC).to(fanoutExchange);
          }

          消息發(fā)送,這里不設(shè)置routing_key,因?yàn)樵O(shè)置了也無(wú)效,發(fā)送端的routing_key寫(xiě)任何字符都會(huì)被忽略。

          public void sendAll(String content) {
          rabbitTemplate.convertAndSend("fanoutExchange","", content);
          }

          消息處理的結(jié)果如下所示:


          如有文章對(duì)你有幫助,

          在看”和轉(zhuǎn)發(fā)是對(duì)我最大的支持!


          推薦 GitHub 書(shū)籍倉(cāng)庫(kù)
          https://github.com/ebooklist/awesome-ebooks-list

          整理了大部分常用 技術(shù)書(shū)籍PDF,持續(xù)更新中... 你需要的技術(shù)書(shū)籍,這里可能都有...


          點(diǎn)擊文末“閱讀原文”可直達(dá)

          整理不易,麻煩各位小伙伴在GitHub中來(lái)個(gè)Star支持一下

          瀏覽 53
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  无码欧美成人 | 91成人电影免费 | 亚洲中国无码 | 超碰自拍9 | 欧美精品久久久久久 |