<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          SpringBoot+RabbitMQ 方式收發(fā)消息

          共 10047字,需瀏覽 21分鐘

           ·

          2020-09-15 15:30

          本文來源:juejin.im/post/6859152029823008781

          本篇會(huì)和SpringBoot做整合,采用自動(dòng)配置的方式進(jìn)行開發(fā),我們只需要聲明RabbitMQ地址就可以了,關(guān)于各種創(chuàng)建連接關(guān)閉連接的事都由Spring幫我們了~

          交給Spring幫我們管理連接可以讓我們專注于業(yè)務(wù)邏輯,就像聲明式事務(wù)一樣易用,方便又高效。


          祝有好收獲,先贊后看,快樂無限。

          本文代碼:?

          • https://gitee.com/he-erduo/spring-boot-learning-demo
          • https://github.com/he-erduo/spring-boot-learning-demo

          1. ?環(huán)境配置

          第一節(jié)我們先來搞一下環(huán)境的配置,上一篇中我們已經(jīng)引入了自動(dòng)配置的包,我們既然使用了自動(dòng)配置的方式,那RabbitMQ的連接信息我們直接放在配置文件中就行了,就像我們需要用到JDBC連接的時(shí)候去配置一下DataSource一樣。

          如圖所示,我們只需要指明一下連接的IP+端口號(hào)和用戶名密碼就行了,這里我用的是默認(rèn)的用戶名與密碼,不寫的話默認(rèn)也都是guest,端口號(hào)也是默認(rèn)5672。

          主要我們需要看一下手動(dòng)確認(rèn)消息的配置,需要配置成manual才是手動(dòng)確認(rèn),日后還會(huì)有其他的配置項(xiàng),眼下我們配置這一個(gè)就可以了。


          接下來我們要配置一個(gè)Queue,上一篇中我們往一個(gè)名叫erduo的隊(duì)列中發(fā)送消息,當(dāng)時(shí)是我們手動(dòng)定義的此隊(duì)列,這里我們也需要手動(dòng)配置,聲明一個(gè)Bean就可以了。

          @Configuration
          public?class?RabbitmqConfig?{
          ????@Bean
          ????public?Queue?erduo()?{
          ????????//?其三個(gè)參數(shù):durable exclusive autoDelete
          ????????//?一般只設(shè)置一下持久化即可
          ????????return?new?Queue("erduo",true);
          ????}

          }

          就這么簡(jiǎn)單聲明一下就可以了,當(dāng)然了RabbitMQ畢竟是一個(gè)獨(dú)立的組件,如果你在RabbitMQ中通過其他方式已經(jīng)創(chuàng)建過一個(gè)名叫erduo的隊(duì)列了,你這里也可以不聲明,這里起到的一個(gè)效果就是如果你沒有這個(gè)隊(duì)列,會(huì)按照你聲明的方式幫你創(chuàng)建這個(gè)隊(duì)列。

          配置完環(huán)境之后,我們就可以以SpringBoot的方式來編寫生產(chǎn)者和消費(fèi)者了。

          2. ?生產(chǎn)者與RabbitTemplate

          和上一篇的節(jié)奏一樣,我們先來編寫生產(chǎn)者,不過這次我要引入一個(gè)新的工具:RabbitTemplate。

          聽它的這個(gè)名字就知道,又是一個(gè)拿來即用的工具類,Spring家族這點(diǎn)就很舒服,什么東西都給你封裝一遍,讓你用起來更方便更順手。

          RabbitTemplate實(shí)現(xiàn)了標(biāo)準(zhǔn)AmqpTemplate接口,功能大致可以分為發(fā)送消息和接受消息。

          我們這里是在生產(chǎn)者中來用,主要就是使用它的發(fā)送消息功能:sendconvertAndSend方法。

          //?發(fā)送消息到默認(rèn)的Exchange,使用默認(rèn)的routing?key
          void?send(Message?message)?throws?AmqpException;

          //?使用指定的routing?key發(fā)送消息到默認(rèn)的exchange
          void?send(String?routingKey,?Message?message)?throws?AmqpException;

          //?使用指定的routing?key發(fā)送消息到指定的exchange
          void?send(String?exchange,?String?routingKey,?Message?message)?throws?AmqpException;

          send方法是發(fā)送byte數(shù)組的數(shù)據(jù)的模式,這里代表消息內(nèi)容的對(duì)象是Message對(duì)象,它的構(gòu)造方法就是傳入byte數(shù)組數(shù)據(jù),所以我們需要把我們的數(shù)據(jù)轉(zhuǎn)成byte數(shù)組然后構(gòu)造成一個(gè)Message對(duì)象再進(jìn)行發(fā)送。

          //?Object類型,可以傳入POJO
          void?convertAndSend(Object?message)?throws?AmqpException;

          void?convertAndSend(String?routingKey,?Object?message)?throws?AmqpException;

          void?convertAndSend(String?exchange,?String?routingKey,?Object?message)?throws?AmqpException;

          convertAndSend方法是可以傳入POJO對(duì)象作為參數(shù),底層是有一個(gè)MessageConverter幫我們自動(dòng)將數(shù)據(jù)轉(zhuǎn)換成byte類型或String或序列化類型。

          所以這里支持的傳入對(duì)象也只有三種:byte類型,String類型和實(shí)現(xiàn)了Serializable接口的POJO。


          介紹完了,我們可以看一下代碼:

          @Slf4j
          @Component("rabbitProduce")
          public?class?RabbitProduce?{
          ????@Autowired
          ????private?RabbitTemplate?rabbitTemplate;

          ????public?void?send()?{
          ????????String?message?=?"Hello 我是作者和耳朵,歡迎關(guān)注我。"?+?LocalDateTime.now().toString();

          ????????System.out.println("Message?content?:?"?+?message);

          ????????//?指定消息類型
          ????????MessageProperties?props?=?MessagePropertiesBuilder.newInstance()
          ????????????????.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN).build();

          ????????rabbitTemplate.send(Producer.QUEUE_NAME,new?Message(message.getBytes(StandardCharsets.UTF_8),props));
          ????????System.out.println("消息發(fā)送完畢。");
          ????}

          ????public?void?convertAndSend()?{
          ????????User?user?=?new?User();

          ????????System.out.println("Message?content?:?"?+?user);

          ????????rabbitTemplate.convertAndSend(Producer.QUEUE_NAME,user);
          ????????System.out.println("消息發(fā)送完畢。");
          ????}

          }

          這里我特意寫明了兩個(gè)例子,一個(gè)用來測(cè)試send,另一個(gè)用來測(cè)試convertAndSend。

          send方法里我們看下來和之前的代碼是幾乎一樣的,定義一個(gè)消息,然后直接send,但是這個(gè)構(gòu)造消息的構(gòu)造方法可能比我們想的要多一個(gè)參數(shù),我們?cè)瓉碚f的只要把數(shù)據(jù)轉(zhuǎn)成二進(jìn)制數(shù)組放進(jìn)去即可,現(xiàn)在看來還要多放一個(gè)參數(shù)了。

          MessageProperties,是的我們需要多放一個(gè)MessageProperties對(duì)象,從他的名字我們也可以看出它的功能就是附帶一些參數(shù),但是某些參數(shù)是少不了的,不帶不行。

          比如我的代碼這里就是設(shè)置了一下消息的類型,消息的類型有很多種可以是二進(jìn)制類型,文本類型,或者序列化類型,JSON類型,我這里設(shè)置的就是文本類型,指定類型是必須的,也可以為我們拿到消息之后要將消息轉(zhuǎn)換成什么樣的對(duì)象提供一個(gè)參考。

          convertAndSend方法就要簡(jiǎn)單太多,這里我放了一個(gè)User對(duì)象拿來測(cè)試用,直接指定隊(duì)列然后放入這個(gè)對(duì)象即可。

          Tips:User必須實(shí)現(xiàn)Serializable接口,不然的話調(diào)用此方法的時(shí)候會(huì)拋出IllegalArgumentException異常。


          代碼完成之后我們就可以調(diào)用了,這里我寫一個(gè)測(cè)試類進(jìn)行調(diào)用:

          @SpringBootTest
          public?class?RabbitProduceTest?{
          ????@Autowired
          ????private?RabbitProduce?rabbitProduce;

          ????@Test
          ????public?void?sendSimpleMessage()?{
          ????????rabbitProduce.send();
          ????????rabbitProduce.convertAndSend();
          ????}
          }

          效果如下圖~

          同時(shí)在控制臺(tái)使用命令rabbitmqctl.bat list_queues查看隊(duì)列-erduo現(xiàn)在的情況:

          如此一來,我們的生產(chǎn)者測(cè)試就算完成了,現(xiàn)在消息隊(duì)列里兩條消息了,而且消息類型肯定不一樣,一個(gè)是我們?cè)O(shè)置的文本類型,一個(gè)是自動(dòng)設(shè)置的序列化類型。

          3. ?消費(fèi)者與RabbitListener

          既然隊(duì)列里面已經(jīng)有消息了,接下來我們就要看我們?cè)撊绾瓮ㄟ^新的方式拿到消息并消費(fèi)與確認(rèn)了。

          消費(fèi)者這里我們要用到@RabbitListener來幫我們拿到指定隊(duì)列消息,它的用法很簡(jiǎn)單也很復(fù)雜,我們可以先來說簡(jiǎn)單的方式,直接放到方法上,指定監(jiān)聽的隊(duì)列就行了。

          @Slf4j
          @Component("rabbitConsumer")
          public?class?RabbitConsumer?{

          ????@RabbitListener(queues?=?Producer.QUEUE_NAME)
          ????public?void?onMessage(Message?message,?Channel?channel)?throws?Exception?{
          ????????System.out.println("Message?content?:?"?+?message);
          ????????channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
          ????????System.out.println("消息已確認(rèn)");
          ????}

          }

          這段代碼就代表onMessage方法會(huì)處理erduo(Producer.QUEUE_NAME是常量字符串"erduo")隊(duì)列中的消息。

          我們可以看到這個(gè)方法里面有兩個(gè)參數(shù),MessageChannel,如果用不到Channel可以不寫此參數(shù),但是Message消息一定是要的,它代表了消息本身。

          我們可以想想,我們的程序從RabbitMQ之中拉回一條條消息之后,要以怎么樣的方式展示給我們呢?

          沒錯(cuò),就是封裝為一個(gè)個(gè)Message對(duì)象,這里面放入了一條消息的所有信息,數(shù)據(jù)結(jié)構(gòu)是什么樣一會(huì)我一run你就能看到了。

          同時(shí)這里我們使用Channel做一個(gè)消息確認(rèn)的操作,這里的DeliveryTag代表的是這個(gè)消息在隊(duì)列中的序號(hào),這個(gè)信息存放在MessageProperties中。

          4. ?SpringBoot 啟動(dòng)!

          編寫完生產(chǎn)者和消費(fèi)者,同時(shí)已經(jīng)運(yùn)行過生產(chǎn)者往消息隊(duì)列里面放了兩條信息,接下來我們可以直接啟動(dòng)消息,查看消費(fèi)情況:

          在我紅色框線標(biāo)記的地方可以看到,因?yàn)槲覀冇辛讼M(fèi)者所以項(xiàng)目啟動(dòng)后先和RabbitMQ建立了一個(gè)連接進(jìn)行監(jiān)聽隊(duì)列。

          隨后就開始消費(fèi)我們隊(duì)列中的兩條消息:

          第一條信息是contentType=text/plain類型,所以直接就在控制臺(tái)上打印出了具體內(nèi)容。

          第二條信息是contentType=application/x-java-serialized-object,在打印的時(shí)候只打印了一個(gè)內(nèi)存地址+字節(jié)大小。

          不管怎么說,數(shù)據(jù)我們是拿到了,也就是代表我們的消費(fèi)是沒有問題的,同時(shí)也都進(jìn)行了消息確認(rèn)操作,從數(shù)據(jù)上看,整個(gè)消息可以分為兩部分:bodyMessageProperties

          我們可以單獨(dú)使用一個(gè)注解拿到這個(gè)body的內(nèi)容 - @Payload

          @RabbitListener(queues?=?Producer.QUEUE_NAME)
          public?void?onMessage(@Payload?String?body,?Channel?channel)?throws?Exception?{
          ????System.out.println("Message?content?:?"?+?body);
          }

          也可以單獨(dú)使用一個(gè)注解拿到MessageProperties的headers屬性,headers屬性在截圖里也可以看到,只不過是個(gè)空的 - @Headers。

          @RabbitListener(queues?=?Producer.QUEUE_NAME)
          public?void?onMessage(@Payload?String?body,?@Headers?Map?headers)?throws?Exception?{
          ????System.out.println("Message?content?:?"?+?body);
          ????System.out.println("Message?headers?:?"?+?headers);
          }

          這兩個(gè)注解都算是擴(kuò)展知識(shí),我還是更喜歡直接拿到全部,全都要!??!

          上面我們已經(jīng)完成了消息的發(fā)送與消費(fèi),整個(gè)過程我們可以再次回想一下,一切都和我畫的這張圖上一樣的軌跡:

          只不過我們一直沒有指定Exchage一直使用的默認(rèn)路由,希望大家好好記住這張圖。

          5. ?@RabbitListener與@RabbitHandler

          下面再來補(bǔ)一些知識(shí)點(diǎn),有關(guān)@RabbitListener@RabbitHandler

          @RabbitListener上面我們已經(jīng)簡(jiǎn)單的進(jìn)行了使用,稍微擴(kuò)展一下它其實(shí)是可以監(jiān)聽多個(gè)隊(duì)列的,就像這樣:

          @RabbitListener(queues?=?{?"queue1",?"queue2"?})
          public?void?onMessage(Message?message,?Channel?channel)?throws?Exception?{
          ????System.out.println("Message?content?:?"?+?message);
          ????channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
          ????System.out.println("消息已確認(rèn)");
          }

          還有一些其他的特性如綁定之類的,這里不再贅述因?yàn)樘簿幋a了一般用不上。

          下面來說說這節(jié)要主要講的一個(gè)特性:@RabbitListener和@RabbitHandler的搭配使用。

          前面我們沒有提到,@RabbitListener注解其實(shí)是可以注解在類上的,這個(gè)注解在類上標(biāo)志著這個(gè)類監(jiān)聽某個(gè)隊(duì)列或某些隊(duì)列。

          這兩個(gè)注解的搭配使用就要讓@RabbitListener注解在類上,然后用@RabbitHandler注解在方法上,根據(jù)方法參數(shù)的不同自動(dòng)識(shí)別并去消費(fèi),寫個(gè)例子給大家看一看更直觀一些。

          @Slf4j
          @Component("rabbitConsumer")
          @RabbitListener(queues?=?Producer.QUEUE_NAME)
          public?class?RabbitConsumer?{

          ????@RabbitHandler
          ????public?void?onMessage(@Payload?String?message){
          ????????System.out.println("Message?content?:?"?+?message);
          ????}

          ????@RabbitHandler
          ????public?void?onMessage(@Payload?User?user)?{
          ????????System.out.println("Message?content?:?"?+?user);
          ????}
          }

          大家可以看看這個(gè)例子,我們先用@RabbitListener監(jiān)聽erduo隊(duì)列中的消息,然后使用@RabbitHandler注解了兩個(gè)方法。

          • 第一個(gè)方法的body類型是String類型,這就代表著這個(gè)方法只能處理文本類型的消息。
          • 第二個(gè)方法的body類型是User類型,這就代表著這個(gè)方法只能處理序列化類型且為User類型的消息。

          這兩個(gè)方法正好對(duì)應(yīng)著我們第二節(jié)中測(cè)試類會(huì)發(fā)送的兩種消息,所以我們往RabbitMQ中發(fā)送兩條測(cè)試消息,用來測(cè)試這段代碼,看看效果:

          都在控制臺(tái)上如常打印了,如果@RabbitHandler注解的方法中沒有一個(gè)的類型可以和你消息的類型對(duì)的上,比如消息都是byte數(shù)組類型,這里沒有對(duì)應(yīng)的方法去接收,系統(tǒng)就會(huì)在控制臺(tái)不斷的報(bào)錯(cuò),如果你出現(xiàn)這個(gè)情況就證明你類型寫的不正確。

          假設(shè)你的erduo隊(duì)列中會(huì)出現(xiàn)三種類型的消息:byte,文本和序列化,那你就必須要有對(duì)應(yīng)的處理這三種消息的方法,不然消息發(fā)過來的時(shí)候就會(huì)因?yàn)闊o法正確轉(zhuǎn)換而報(bào)錯(cuò)。

          而且使用了@RabbitHandler注解之后就不能再和之前一樣使用Message做接收類型。

          @RabbitHandler
          public?void?onMessage(Message?message,?Channel?channel)?throws?Exception?{
          ????System.out.println("Message?content?:?"?+?message);
          ????channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
          ????System.out.println("消息已確認(rèn)");
          }

          這樣寫的話會(huì)報(bào)類型轉(zhuǎn)換異常的,所以二者選其一。

          同時(shí)上文我的@RabbitHandler沒有進(jìn)行消息確認(rèn),大家可以自己試一下進(jìn)行消息確認(rèn)。

          6. ?消息的序列化轉(zhuǎn)換

          通過上文我們已經(jīng)知道,能被自動(dòng)轉(zhuǎn)換的對(duì)象只有byte[]、Stringjava序列化對(duì)象(實(shí)現(xiàn)了Serializable接口的對(duì)象),但是并不是所有的Java對(duì)象都會(huì)去實(shí)現(xiàn)Serializable接口,而且序列化的過程中使用的是JDK自帶的序列化方法,效率低下。

          所以我們更普遍的做法是:使用Jackson先將數(shù)據(jù)轉(zhuǎn)換成JSON格式發(fā)送給RabbitMQ,再接收消息的時(shí)候再用Jackson將數(shù)據(jù)反序列化出來。

          這樣做可以完美解決上面的痛點(diǎn):消息對(duì)象既不必再去實(shí)現(xiàn)Serializable接口,也有比較高的效率(Jackson序列化效率業(yè)界應(yīng)該是最好的了)。

          默認(rèn)的消息轉(zhuǎn)換方案是消息轉(zhuǎn)換頂層接口-MessageConverter的一個(gè)子類:SimpleMessageConverter,我們?nèi)绻獡Q到另一個(gè)消息轉(zhuǎn)換器只需要替換掉這個(gè)轉(zhuǎn)換器就行了。

          上圖是MessageConverter結(jié)構(gòu)樹的結(jié)構(gòu)樹,可以看到除了SimpleMessageConverter之外還有一個(gè)Jackson2JsonMessageConverter,我們只需要將它定義為Bean,就可以直接使用這個(gè)轉(zhuǎn)換器了。

          @Bean
          ????public?MessageConverter?jackson2JsonMessageConverter()?{
          ????????return?new?Jackson2JsonMessageConverter(jacksonObjectMapper);
          ????}

          這樣就可以了,這里的jacksonObjectMapper可以不傳入,但是默認(rèn)的ObjectMapper方案對(duì)JDK8的時(shí)間日期序列化會(huì)不太友好,具體可以參考我的上一篇文章:從LocalDateTime序列化探討全局一致性序列化,總的來說就是定義了自己的ObjectMapper。

          同時(shí)為了接下來測(cè)試方便,我又定義了一個(gè)專門測(cè)試JSON序列化的隊(duì)列:

          @Bean
          public?Queue?erduoJson()?{
          ????//?其三個(gè)參數(shù):durable exclusive autoDelete
          ????//?一般只設(shè)置一下持久化即可
          ????return?new?Queue("erduo_json",true);
          }

          如此之后就可以進(jìn)行測(cè)試了,先是生產(chǎn)者代碼

          public?void?sendObject()?{
          ????????Client?client?=?new?Client();

          ????????System.out.println("Message?content?:?"?+?client);

          ????????rabbitTemplate.convertAndSend(RabbitJsonConsumer.JSON_QUEUE,client);
          ????????System.out.println("消息發(fā)送完畢。");
          ????}

          我又重新定義了一個(gè)Client對(duì)象,它和之前測(cè)試使用的User對(duì)象成員變量都是一樣的,不一樣的是它沒有實(shí)現(xiàn)Serializable接口。

          同時(shí)為了保留之前的測(cè)試代碼,我又新建了一個(gè)RabbitJsonConsumer,用于測(cè)試JSON序列化的相關(guān)消費(fèi)代碼,里面定義了一個(gè)靜態(tài)變量:JSON_QUEUE = "erduo_json";

          所以這段代碼是將Client對(duì)象作為消息發(fā)送到"erduo_json"隊(duì)列中去,隨后我們?cè)跍y(cè)試類中run一下進(jìn)行一次發(fā)送。

          緊著是消費(fèi)者代碼

          @Slf4j
          @Component("rabbitJsonConsumer")
          @RabbitListener(queues?=?RabbitJsonConsumer.JSON_QUEUE)
          public?class?RabbitJsonConsumer?{
          ????public?static?final?String?JSON_QUEUE?=?"erduo_json";

          ????@RabbitHandler
          ????public?void?onMessage(Client?client,?@Headers?Map?headers,?Channel?channel)?throws?Exception?{
          ????????System.out.println("Message?content?:?"?+?client);
          ????????System.out.println("Message?headers?:?"?+?headers);
          ????????channel.basicAck((Long)?headers.get(AmqpHeaders.DELIVERY_TAG),false);
          ????????System.out.println("消息已確認(rèn)");
          ????}

          }

          有了上文的經(jīng)驗(yàn)之后,這段代碼理解起來也是很簡(jiǎn)單了吧,同時(shí)給出了上一節(jié)沒寫的如何在@RabbitHandler模式下進(jìn)行消息簽收。

          我們直接來看看效果:

          在打印的Headers里面,往后翻可以看到contentType=application/json,這個(gè)contentType是表明了消息的類型,這里正是說明我們新的消息轉(zhuǎn)換器生效了,將所有消息都轉(zhuǎn)換成了JSON類型。

          后記

          這兩篇講完了RabbitMQ的基本收發(fā)消息,包括手動(dòng)配置和自動(dòng)配置的兩種方式,這些大家仔細(xì)研讀之后應(yīng)該會(huì)對(duì)RabbitMQ收發(fā)消息沒什么疑問了~

          不過我們一直以來發(fā)消息時(shí)都是使用默認(rèn)的交換機(jī),下篇將會(huì)講述一下RabbitMQ的幾種交換機(jī)類型,以及其使用方式。

          本文代碼:碼云地址?GitHub地址

          https://gitee.com/he-erduo/spring-boot-learning-demo?

          https://github.com/he-erduo/spring-boot-learning-demo

          推薦閱讀

          InnoDB索引允許NULL對(duì)性能有影響嗎

          Mysql的binlog和relay-log到底長(zhǎng)啥樣?

          漲姿勢(shì):為啥MySQL官方不推薦使用uuid或者雪花id作為主鍵?

          瀏覽 32
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  2021无码在线观看 | 影音先锋成人在线视频 | 日本天堂免费a | 亚洲电影,操 | 中国一级久久毛 |