<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Spring Boot整合RabbitMQ詳細(xì)教程

          共 8186字,需瀏覽 17分鐘

           ·

          2020-12-28 03:28

          點擊上方藍(lán)色字體,選擇“標(biāo)星公眾號”

          優(yōu)質(zhì)文章,第一時間送達(dá)

          ? 作者?|??小小丑年紀(jì)

          來源 |? urlify.cn/MZvAry

          66套java從入門到精通實戰(zhàn)課程分享

          1.首先我們簡單了解一下消息中間件的應(yīng)用場景

          異步處理

          場景說明:用戶注冊后,需要發(fā)注冊郵件和注冊短信,傳統(tǒng)的做法有兩種1.串行的方式;2.并行的方式?
          (1)串行方式:將注冊信息寫入數(shù)據(jù)庫后,發(fā)送注冊郵件,再發(fā)送注冊短信,以上三個任務(wù)全部完成后才返回給客戶端。這有一個問題是,郵件,短信并不是必須的,它只是一個通知,而這種做法讓客戶端等待沒有必要等待的東西.

          (2)并行方式:將注冊信息寫入數(shù)據(jù)庫后,發(fā)送郵件的同時,發(fā)送短信,以上三個任務(wù)完成后,返回給客戶端,并行的方式能提高處理的時間。?
          ?
          假設(shè)三個業(yè)務(wù)節(jié)點分別使用50ms,串行方式使用時間150ms,并行使用時間100ms。雖然并性已經(jīng)提高的處理時間,但是,前面說過,郵件和短信對我正常的使用網(wǎng)站沒有任何影響,客戶端沒有必要等著其發(fā)送完成才顯示注冊成功,應(yīng)該是寫入數(shù)據(jù)庫后就返回.
          (3)消息隊列?
          引入消息隊列后,把發(fā)送郵件,短信不是必須的業(yè)務(wù)邏輯異步處理?
          ?
          由此可以看出,引入消息隊列后,用戶的響應(yīng)時間就等于寫入數(shù)據(jù)庫的時間+寫入消息隊列的時間(可以忽略不計),引入消息隊列后處理后,響應(yīng)時間是串行的3倍,是并行的2倍。

          ?應(yīng)用解耦

          場景:雙11是購物狂節(jié),用戶下單后,訂單系統(tǒng)需要通知庫存系統(tǒng),傳統(tǒng)的做法就是訂單系統(tǒng)調(diào)用庫存系統(tǒng)的接口.
          ?
          這種做法有一個缺點:

          • 當(dāng)庫存系統(tǒng)出現(xiàn)故障時,訂單就會失敗。

          • 訂單系統(tǒng)和庫存系統(tǒng)高耦合.?
            引入消息隊列?

          • 訂單系統(tǒng):用戶下單后,訂單系統(tǒng)完成持久化處理,將消息寫入消息隊列,返回用戶訂單下單成功。

          • 庫存系統(tǒng):訂閱下單的消息,獲取下單消息,進(jìn)行庫操作。?
            就算庫存系統(tǒng)出現(xiàn)故障,消息隊列也能保證消息的可靠投遞,不會導(dǎo)致消息丟失。

          流量削峰

          流量削峰一般在秒殺活動中應(yīng)用廣泛?
          場景:秒殺活動,一般會因為流量過大,導(dǎo)致應(yīng)用掛掉,為了解決這個問題,一般在應(yīng)用前端加入消息隊列。?
          作用:?
          1.可以控制活動人數(shù),超過此一定閥值的訂單直接丟棄(我為什么秒殺一次都沒有成功過呢^^)?
          2.可以緩解短時間的高流量壓垮應(yīng)用(應(yīng)用程序按自己的最大處理能力獲取訂單)?
          ?
          1.用戶的請求,服務(wù)器收到之后,首先寫入消息隊列,加入消息隊列長度超過最大值,則直接拋棄用戶請求或跳轉(zhuǎn)到錯誤頁面.?

          2.秒殺業(yè)務(wù)根據(jù)消息隊列中的請求信息,再做后續(xù)處理.

          以上內(nèi)容的來源是:https://blog.csdn.net/whoamiyang/article/details/54954780,在此感謝

          2.各種消息中間件性能的比較:

          TPS比較 一ZeroMq 最好,RabbitMq 次之, ActiveMq 最差。

          持久化消息比較—zeroMq不支持,activeMq和rabbitMq都支持。持久化消息主要是指:MQ down或者M(jìn)Q所在的服務(wù)器down了,消息不會丟失的機(jī)制。

          可靠性、靈活的路由、集群、事務(wù)、高可用的隊列、消息排序、問題追蹤、可視化管理工具、插件系統(tǒng)、社區(qū)—RabbitMq最好,ActiveMq次之,ZeroMq最差。

          高并發(fā)—從實現(xiàn)語言來看,RabbitMQ最高,原因是它的實現(xiàn)語言是天生具備高并發(fā)高可用的erlang語言。

          綜上所述:RabbitMQ的性能相對來說更好更全面,是消息中間件的首選。

          3.接下來我們在springboot當(dāng)中整合使用RabbitMQ

          第一步:導(dǎo)入maven依賴


          org.springframework.boot
          spring-boot-starter-amqp
          1.5.2.RELEASE

          第二步:在application.properties文件當(dāng)中引入RabbitMQ基本的配置信息

          #對于rabbitMQ的支持
          spring.rabbitmq.host=127.0.0.1
          spring.rabbitmq.port=5672
          spring.rabbitmq.username=guest
          spring.rabbitmq.password=guest

          第三步:編寫RabbitConfig類,類里面設(shè)置很多個EXCHANGE,QUEUE,ROUTINGKEY,是為了接下來的不同使用場景。

          /**
          Broker:它提供一種傳輸服務(wù),它的角色就是維護(hù)一條從生產(chǎn)者到消費者的路線,保證數(shù)據(jù)能按照指定的方式進(jìn)行傳輸,
          Exchange:消息交換機(jī),它指定消息按什么規(guī)則,路由到哪個隊列。
          Queue:消息的載體,每個消息都會被投到一個或多個隊列。
          Binding:綁定,它的作用就是把exchange和queue按照路由規(guī)則綁定起來.
          Routing Key:路由關(guān)鍵字,exchange根據(jù)這個關(guān)鍵字進(jìn)行消息投遞。
          vhost:虛擬主機(jī),一個broker里可以有多個vhost,用作不同用戶的權(quán)限分離。
          Producer:消息生產(chǎn)者,就是投遞消息的程序.
          Consumer:消息消費者,就是接受消息的程序.
          Channel:消息通道,在客戶端的每個連接里,可建立多個channel.
          */
          @Configuration
          public?class?RabbitConfig?{
          ?
          private?final?Logger?logger?=?LoggerFactory.getLogger(this.getClass());
          ?
          @Value("${spring.rabbitmq.host}")
          private?String?host;
          ?
          @Value("${spring.rabbitmq.port}")
          private?int?port;
          ?
          @Value("${spring.rabbitmq.username}")
          private?String?username;
          ?
          @Value("${spring.rabbitmq.password}")
          private?String?password;
          ?
          ?
          public?static?final?String?EXCHANGE_A?=?"my-mq-exchange_A";
          public?static?final?String?EXCHANGE_B?=?"my-mq-exchange_B";
          public?static?final?String?EXCHANGE_C?=?"my-mq-exchange_C";
          ?
          ?
          public?static?final?String?QUEUE_A?=?"QUEUE_A";
          public?static?final?String?QUEUE_B?=?"QUEUE_B";
          public?static?final?String?QUEUE_C?=?"QUEUE_C";
          ?
          public?static?final?String?ROUTINGKEY_A?=?"spring-boot-routingKey_A";
          public?static?final?String?ROUTINGKEY_B?=?"spring-boot-routingKey_B";
          public?static?final?String?ROUTINGKEY_C?=?"spring-boot-routingKey_C";
          ?
          @Bean
          public?ConnectionFactory?connectionFactory()?{
          CachingConnectionFactory?connectionFactory?=?new?CachingConnectionFactory(host,port);
          connectionFactory.setUsername(username);
          connectionFactory.setPassword(password);
          connectionFactory.setVirtualHost("/");
          connectionFactory.setPublisherConfirms(true);
          return?connectionFactory;
          }
          ?
          @Bean
          @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
          //必須是prototype類型
          public?RabbitTemplate?rabbitTemplate()?{
          RabbitTemplate?template?=?new?RabbitTemplate(connectionFactory());
          return?template;
          }
          }

          第四步:編寫消息的生產(chǎn)者

          @Component
          public?class?MsgProducer?implements?RabbitTemplate.ConfirmCallback?{
          ?
          private?final?Logger?logger?=?LoggerFactory.getLogger(this.getClass());
          ?
          //由于rabbitTemplate的scope屬性設(shè)置為ConfigurableBeanFactory.SCOPE_PROTOTYPE,所以不能自動注入
          private?RabbitTemplate?rabbitTemplate;
          /**
          *?構(gòu)造方法注入rabbitTemplate
          */
          @Autowired
          public?MsgProducer(RabbitTemplate?rabbitTemplate)?{
          this.rabbitTemplate?=?rabbitTemplate;
          rabbitTemplate.setConfirmCallback(this);?//rabbitTemplate如果為單例的話,那回調(diào)就是最后設(shè)置的內(nèi)容
          }
          ?
          public?void?sendMsg(String?content)?{
          CorrelationData?correlationId?=?new?CorrelationData(UUID.randomUUID().toString());
          //把消息放入ROUTINGKEY_A對應(yīng)的隊列當(dāng)中去,對應(yīng)的是隊列A
          rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_A,?RabbitConfig.ROUTINGKEY_A,?content,?correlationId);
          }
          /**
          *?回調(diào)
          */
          @Override
          public?void?confirm(CorrelationData?correlationData,?boolean?ack,?String?cause)?{
          logger.info("?回調(diào)id:"?+?correlationData);
          if?(ack)?{
          logger.info("消息成功消費");
          }?else?{
          logger.info("消息消費失敗:"?+?cause);
          }
          }
          }

          第五步:把交換機(jī),隊列,通過路由關(guān)鍵字進(jìn)行綁定,寫在RabbitConfig類當(dāng)中

          /**
          *?針對消費者配置
          *?1.?設(shè)置交換機(jī)類型
          *?2.?將隊列綁定到交換機(jī)
          FanoutExchange:?將消息分發(fā)到所有的綁定隊列,無routingkey的概念
          HeadersExchange :通過添加屬性key-value匹配
          DirectExchange:按照routingkey分發(fā)到指定隊列
          TopicExchange:多關(guān)鍵字匹配
          */
          @Bean
          public?DirectExchange?defaultExchange()?{
          return?new?DirectExchange(EXCHANGE_A);
          }
          /**
          *?獲取隊列A
          *?@return
          */
          @Bean
          public?Queue?queueA()?{
          return?new?Queue(QUEUE_A,?true);?//隊列持久
          }
          @Bean
          public?Binding?binding()?{
          ?
          return?BindingBuilder.bind(queueA()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A);
          }

          一個交換機(jī)可以綁定多個消息隊列,也就是消息通過一個交換機(jī),可以分發(fā)到不同的隊列當(dāng)中去。

          @Bean
          public?Binding?binding()?{
          return?BindingBuilder.bind(queueA()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A);
          }
          @Bean
          public?Binding?bindingB(){
          return?BindingBuilder.bind(queueB()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_B);
          }

          第六步:編寫消息的消費者,這一步也是最復(fù)雜的,因為可以編寫出很多不同的需求出來,寫法也有很多的不同。

          ? ? 比如一個生產(chǎn)者,一個消費者

          @Component
          @RabbitListener(queues?=?RabbitConfig.QUEUE_A)
          public?class?MsgReceiver?{
          ?
          private?final?Logger?logger?=?LoggerFactory.getLogger(this.getClass());
          ?
          @RabbitHandler
          public?void?process(String?content)?{
          logger.info("接收處理隊列A當(dāng)中的消息:?"?+?content);
          }
          ?
          }

          比如一個生產(chǎn)者,多個消費者,可以寫多個消費者,并且他們的分發(fā)是負(fù)載均衡的。

          @Component
          @RabbitListener(queues?=?RabbitConfig.QUEUE_A)
          public?class?MsgReceiverC_one?{
          private?final?Logger?logger?=?LoggerFactory.getLogger(this.getClass());
          ?
          @RabbitHandler
          public?void?process(String?content)?{
          logger.info("處理器one接收處理隊列A當(dāng)中的消息:?"?+?content);
          }
          }
          @Component
          @RabbitListener(queues?=?RabbitConfig.QUEUE_A)
          public?class?MsgReceiverC_two?{
          private?final?Logger?logger?=?LoggerFactory.getLogger(this.getClass());
          @RabbitHandler
          public?void?process(String?content)?{
          logger.info("處理器two接收處理隊列A當(dāng)中的消息:?"?+?content);
          }
          ?
          }


          另外一種消息處理機(jī)制的寫法如下,在RabbitMQConfig類里面增加bean:

          @Bean
          public?SimpleMessageListenerContainer?messageContainer()?{
          //加載處理消息A的隊列
          SimpleMessageListenerContainer?container?=?new?SimpleMessageListenerContainer(connectionFactory());
          //設(shè)置接收多個隊列里面的消息,這里設(shè)置接收隊列A
          //假如想一個消費者處理多個隊列里面的信息可以如下設(shè)置:
          //container.setQueues(queueA(),queueB(),queueC());
          container.setQueues(queueA());
          container.setExposeListenerChannel(true);
          //設(shè)置最大的并發(fā)的消費者數(shù)量
          container.setMaxConcurrentConsumers(10);
          //最小的并發(fā)消費者的數(shù)量
          container.setConcurrentConsumers(1);
          //設(shè)置確認(rèn)模式手工確認(rèn)
          container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
          container.setMessageListener(new?ChannelAwareMessageListener()?{
          @Override
          public?void?onMessage(Message?message,?Channel?channel)?throws?Exception?{
          /**通過basic.qos方法設(shè)置prefetch_count=1,這樣RabbitMQ就會使得每個Consumer在同一個時間點最多處理一個Message,
          換句話說,在接收到該Consumer的ack前,它不會將新的Message分發(fā)給它?*/
          channel.basicQos(1);
          byte[]?body?=?message.getBody();
          logger.info("接收處理隊列A當(dāng)中的消息:"?+?new?String(body));
          /**為了保證永遠(yuǎn)不會丟失消息,RabbitMQ支持消息應(yīng)答機(jī)制。
          當(dāng)消費者接收到消息并完成任務(wù)后會往RabbitMQ服務(wù)器發(fā)送一條確認(rèn)的命令,然后RabbitMQ才會將消息刪除。*/
          channel.basicAck(message.getMessageProperties().getDeliveryTag(),?false);
          }
          });
          return?container;
          }

          下面是當(dāng)一個消費者,處理多個隊列里面的信息打印的log

          ?

          Fanout Exchange

          Fanout 就是我們熟悉的廣播模式,給Fanout交換機(jī)發(fā)送消息,綁定了這個交換機(jī)的所有隊列都收到這個消息。

          //配置fanout_exchange
          @Bean
          FanoutExchange?fanoutExchange()?{
          return?new?FanoutExchange(RabbitConfig.FANOUT_EXCHANGE);
          }
          ?
          //把所有的隊列都綁定到這個交換機(jī)上去
          @Bean
          Binding?bindingExchangeA(Queue?queueA,FanoutExchange?fanoutExchange)?{
          return?BindingBuilder.bind(queueA).to(fanoutExchange);
          }
          @Bean
          Binding?bindingExchangeB(Queue?queueB,?FanoutExchange?fanoutExchange)?{
          return?BindingBuilder.bind(queueB).to(fanoutExchange);
          }
          @Bean
          Binding?bindingExchangeC(Queue?queueC,?FanoutExchange?fanoutExchange)?{
          return?BindingBuilder.bind(queueC).to(fanoutExchange);
          }

          消息發(fā)送,這里不設(shè)置routing_key,因為設(shè)置了也無效,發(fā)送端的routing_key寫任何字符都會被忽略。

          public?void?sendAll(String?content)?{
          rabbitTemplate.convertAndSend("fanoutExchange","",?content);
          }

          消息處理的結(jié)果如下所示:

          ?





          粉絲福利:Java從入門到入土學(xué)習(xí)路線圖

          ???

          ?長按上方微信二維碼?2 秒


          感謝點贊支持下哈?

          瀏覽 50
          點贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  91精品综合久久久久久五月丁香 | 香蕉视频日本免费色老板 | 无码在线豆花 | 后入极品在线 | 日日爱av |