<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Spring Boot快速集成RocketMQ實(shí)戰(zhàn)教程

          共 8404字,需瀏覽 17分鐘

           ·

          2020-12-26 22:13

          前言

          RocketMQ是目前主流的消息中間件之一,并且自身就支持分布式功能。最初由阿里巴巴團(tuán)隊(duì)開(kāi)發(fā),并且經(jīng)歷過(guò)雙十一等海量消息場(chǎng)景的考驗(yàn),后捐贈(zèng)給Apache開(kāi)源基金會(huì),這也是為什么我們經(jīng)常聽(tīng)說(shuō)RocketMQ是阿里巴巴的消息中間件,項(xiàng)目卻在Apache的頂級(jí)項(xiàng)目中。

          網(wǎng)絡(luò)上通過(guò)SpringBoot集成RocketMQ的教程很多,但大多數(shù)都無(wú)法做到快速、通用的進(jìn)行集成。本篇文章帶大家快速完成基于Spring Boot的集成使用,同時(shí)針對(duì)一些集成過(guò)程中的概念和使用方法以實(shí)例進(jìn)行講解。

          RocketMQ的部署

          關(guān)于RocketMQ的部署,通常有單Master模式、多Master模式、多Master多Slave模式(異步復(fù)制或同步雙寫(xiě))等。

          本文重點(diǎn)介紹RocketMQ的集成部分,就不再這里講解如何部署Master的部署過(guò)程,讀者學(xué)習(xí)時(shí)只需部署單機(jī)模式或基于Docker部署即可。

          依賴集成

          首先創(chuàng)建一個(gè)SpringBoot項(xiàng)目,為了方便通過(guò)瀏覽器訪問(wèn)測(cè)試,引入web對(duì)應(yīng)的starter。

          <parent>    <groupId>org.springframework.bootgroupId>    <artifactId>spring-boot-starter-parentartifactId>    <version>2.4.0version>    <relativePath/> parent><dependencies>    <dependency>        <groupId>org.springframework.bootgroupId>        <artifactId>spring-boot-starter-webartifactId>    dependency>dependencies>

          上面的依賴以及可以完成一個(gè)基于SpringBoot的web項(xiàng)目了。下面需要集成RocketMQ的依賴。

          在此步驟中有兩個(gè)選擇,一個(gè)就是直接引入RocketMQ的依賴,比如:

          <dependency>    <groupId>org.apache.rocketmqgroupId>    <artifactId>rocketmq-clientartifactId>    <version>4.3.0version>dependency>

          但此種方式需要進(jìn)行大量的配置及實(shí)例化操作,并不能夠達(dá)到快速集成、方便使用的目的。

          這里我們采用RocketMQ官方提供的基于spring的集成。項(xiàng)目的源碼及依賴使用位于GitHub上:https://github.com/apache/rocketmq-spring?。

          在該項(xiàng)目的ReadMe中已經(jīng)清晰的描述了如何引入依賴:

          <dependency>    <groupId>org.apache.rocketmqgroupId>    <artifactId>rocketmq-spring-boot-starterartifactId>    <version>${RELEASE.VERSION}version>dependency>

          我們只需按照說(shuō)明,引入對(duì)應(yīng)的依賴即可,這里采用2.1.1版本。因此,引入依賴文件如下:

          <dependency>    <groupId>org.apache.rocketmqgroupId>    <artifactId>rocketmq-spring-boot-starterartifactId>    <version>2.1.1version>dependency>

          引入依賴之后,剩下的就是配置文件的配置和使用了。

          配置文件

          我們知道,SpringBoot默認(rèn)的starter內(nèi)置了很多配置文件,直接通過(guò)yml文件進(jìn)行配置即可使用。這里引入了rocketmq的starter,雖然并不是官方的,但使用方式基本一致。

          在yml文件中配置如下參數(shù):

          rocketmq:
          name-server: 127.0.0.1:9876
          producer:
          group: test-group

          name-server參數(shù)對(duì)應(yīng)的就是部署的RocketMQ的Nameserver服務(wù),如果有多個(gè)的話用英文分號(hào)(;)進(jìn)行分割。

          如果使用的是SpringBoot2.0+的框架或者是JDK10,可將name-server改成nameServer。否則,可能會(huì)出現(xiàn)一些奇怪的bug。

          上面是簡(jiǎn)化了的最基礎(chǔ)的配置,其他的配置均采用默認(rèn)配置,如果需要定制化配置,可對(duì)具體參數(shù)按照統(tǒng)一形式進(jìn)行配置。

          生產(chǎn)者示例

          當(dāng)完成了上面的集成,生產(chǎn)者使用其實(shí)非常簡(jiǎn)單,只需要在使用的地方注入RocketMQTemplate對(duì)象,然后調(diào)用其對(duì)應(yīng)的發(fā)送方法即可。簡(jiǎn)單示例如下:

          @Componentpublic class TestSendService {
          @Resource private RocketMQTemplate rocketMQTemplate;
          public void send() { rocketMQTemplate.send("test-topic-1", MessageBuilder.withPayload("Hello, World! I'm from spring message").build());????}}

          但如果是在項(xiàng)目中,這樣每次使用都注入一個(gè)RocketMQTemplate并不符合面向?qū)ο蟮乃枷?,而且RocketMQTemplate還提供了多個(gè)常用的方法,比如同步、異步、直接發(fā)送等模式。

          我們可以將其封裝成為一個(gè)通用的Service,這樣其他服務(wù)只需注入對(duì)應(yīng)的Service,調(diào)用公共的方法即可,并且注明每個(gè)方法的使用場(chǎng)景。

          抽象出來(lái)的Service接口如下:

          /** * Rocket MQ 對(duì)應(yīng)服務(wù)封裝 * **/public interface RocketMqService {
          /** * 同步發(fā)送消息
          *

          * 當(dāng)發(fā)送的消息很重要是,且對(duì)響應(yīng)時(shí)間不敏感的時(shí)候采用sync方式; * * @param mqMsg 發(fā)送消息實(shí)體類 */ void send(MqMsg mqMsg);
          /** * 異步發(fā)送消息,異步返回消息結(jié)果
          *

          * 當(dāng)發(fā)送的消息很重要,且對(duì)響應(yīng)時(shí)間非常敏感的時(shí)候采用async方式; * * @param mqMsg 發(fā)送消息實(shí)體類 */ void asyncSend(MqMsg mqMsg);
          /** * 直接發(fā)送發(fā)送消息,不關(guān)心返回結(jié)果,容易消息丟失,適合日志收集、不精確統(tǒng)計(jì)等消息發(fā)送;
          *

          * 當(dāng)發(fā)送的消息不重要時(shí),采用one-way方式,以提高吞吐量; * * @param mqMsg 發(fā)送消息實(shí)體類 */ void syncSendOrderly(MqMsg mqMsg);
          }

          定義了不同類型消息發(fā)送的方法,同時(shí)在注釋部分說(shuō)明具體方法的使用場(chǎng)景。其中將發(fā)送的參數(shù)封裝為MqMsg對(duì)象,MqMsg的結(jié)構(gòu)如下:

          public class MqMsg {
          /** * 一級(jí)消息:消息topic */ private String topic;
          /** * 二級(jí)消息:消息topic對(duì)應(yīng)的tags */ private String tags;
          /** * 消息內(nèi)容 */ private String content; // 省略getter/setter方法}

          其中,topic為消息的主題,content為消息的內(nèi)容,具體內(nèi)容可根據(jù)生產(chǎn)者和消費(fèi)者之間進(jìn)行協(xié)定。

          針對(duì)上述的接口,提供具體的方法實(shí)現(xiàn):

          @Service("rocketMqService")public class RocketMqServiceImpl implements RocketMqService {
          private static final Logger log = LoggerFactory.getLogger(RocketMqServiceImpl.class);
          @Resource private RocketMQTemplate rocketMQTemplate;
          @Override public void send(MqMsg mqMsg) { log.info("send發(fā)送消息到mqMsg={}", mqMsg); rocketMQTemplate.send(mqMsg.getTopic() + ":" + mqMsg.getTags(), MessageBuilder.withPayload(mqMsg.getContent()).build()); }
          @Override public void asyncSend(MqMsg mqMsg) { log.info("asyncSend發(fā)送消息到mqMsg={}", mqMsg); rocketMQTemplate.asyncSend(mqMsg.getTopic() + ":" + mqMsg.getTags(), mqMsg.getContent(), new SendCallback() { @Override public void onSuccess(SendResult sendResult) { // 成功不做日志記錄或處理 }
          @Override public void onException(Throwable throwable) { log.info("mqMsg={}消息發(fā)送失敗", mqMsg); } }); }
          @Override public void syncSendOrderly(MqMsg mqMsg) { log.info("syncSendOrderly發(fā)送消息到mqMsg={}", mqMsg); rocketMQTemplate.sendOneWay(mqMsg.getTopic() + ":" + mqMsg.getTags(), mqMsg.getContent()); }}

          其中異步發(fā)送方法asyncSend的異步返回結(jié)果中可以根據(jù)具體的業(yè)務(wù)場(chǎng)景進(jìn)行針對(duì)性的處理。

          上述方法中均默認(rèn)使用了tag對(duì)topic進(jìn)行分類。如果具體的業(yè)務(wù)中不需要tag,則可對(duì)上述方法中拼接的冒號(hào)+tag部分去除。本實(shí)例使用tag進(jìn)行分類,方便用到時(shí)可以借鑒。

          完成了上面的封裝之后,在業(yè)務(wù)使用場(chǎng)景中只需注入對(duì)應(yīng)的Service即可,后面測(cè)試時(shí)我們會(huì)進(jìn)行演示。

          消費(fèi)者示例

          關(guān)于消費(fèi)者我們可以直接實(shí)現(xiàn)RocketMQListener接口,然后通過(guò)@RocketMQMessageListener注解來(lái)匹配目標(biāo)消息。

          這里為了演示統(tǒng)一topic下不同的tag的使用方法,分兩個(gè)消費(fèi)者來(lái)進(jìn)行演示,直接看代碼:

          /** * 消息隊(duì)列消費(fèi)端使用示例 * **/@Service@RocketMQMessageListener(topic = MqTopicConstant.DEMO_TOPIC        , consumerGroup = MqTopicConstant.DEMO_CONSUMER_GROUP_REGISTERED        , selectorExpression = MqTopicConstant.DEMO_TAG_REGISTERED)public class MqRegisteredListenerDemo implements RocketMQListener<String> {
          private static final Logger log = LoggerFactory.getLogger(MqRegisteredListenerDemo.class);
          @Override public void onMessage(String message) { log.info("received registered message: {}", message); }}

          上面的消費(fèi)者監(jiān)聽(tīng)器是針對(duì)DEMO_TAG_REGISTERED(自定義變量)這個(gè)tag來(lái)進(jìn)行處理的,如果不需要tag,可去掉selectorExpression元素配置即可。@RocketMQMessageListener注解里selectorExpression默認(rèn)是*,也就是接收topic下全部消息。

          上面用到了常量類MqTopicConstant,主要是統(tǒng)一定義對(duì)應(yīng)的topic和tag,內(nèi)容如下:

          /** * 消息隊(duì)列相關(guān)常亮配置,包括group、topic、tag * **/public class MqTopicConstant {
          /** * 示例消息隊(duì)列,topic1個(gè) */ public static final String DEMO_TOPIC = "test-top-1";
          /** * 注冊(cè)tag */ public static final String DEMO_TAG_REGISTERED = "registered";
          /** * 修改tag */ public static final String DEMO_TAG_MODIFY = "modify";
          /** * consumer group */ public static final String DEMO_CONSUMER_GROUP_REGISTERED = "consumer_test-top-1_registered";
          public static final String DEMO_CONSUMER_GROUP_MODIFY = "consumer_test-top-1_modify";
          }

          下面再來(lái)看看第二個(gè)監(jiān)聽(tīng)“更新”功能的tag代碼實(shí)現(xiàn):

          /** * 消息隊(duì)列消費(fèi)端使用示例 * **/@Service@RocketMQMessageListener(topic = MqTopicConstant.DEMO_TOPIC        , consumerGroup = MqTopicConstant.DEMO_CONSUMER_GROUP_MODIFY        , selectorExpression = MqTopicConstant.DEMO_TAG_MODIFY)public class MqModifyListenerDemo implements RocketMQListener<String> {
          private static final Logger log = LoggerFactory.getLogger(MqRegisteredListenerDemo.class);
          @Override public void onMessage(String message) { log.info("received modify message: {}", message); }}

          這個(gè)消費(fèi)者與第一個(gè)不同的地方有兩個(gè),均在注解部分。其中topic兩個(gè)是相同的,selectorExpression一個(gè)針對(duì)注冊(cè)功能的tag進(jìn)行過(guò)濾,一個(gè)針對(duì)修改信息的功能進(jìn)行過(guò)濾。而消費(fèi)組consumerGroup對(duì)應(yīng)的值不能相同,否則啟動(dòng)時(shí)會(huì)拋出異常。

          經(jīng)過(guò)上述步驟,已經(jīng)完成了生產(chǎn)者、消費(fèi)者的配置。下面就同一個(gè)測(cè)試來(lái)驗(yàn)證服務(wù)的執(zhí)行情況。

          測(cè)試驗(yàn)證

          定義一個(gè)Controller,用于外部請(qǐng)求,觸發(fā)用戶注冊(cè)和修改操作的消息發(fā)送。關(guān)于測(cè)試也可以使用單元測(cè)試,但需要讓線層阻塞,防止消費(fèi)者還沒(méi)接收到消息時(shí),單元測(cè)試已經(jīng)執(zhí)行完成。

          @RestController@RequestMapping("/demo")public class DemoController {
          @Resource private RocketMqService rocketMqService;
          @GetMapping("/send") public void send() { MqMsg mqMsg = new MqMsg(); mqMsg.setTopic(MqTopicConstant.DEMO_TOPIC); mqMsg.setTags(MqTopicConstant.DEMO_TAG_REGISTERED);
          // 此處可為其他VO對(duì)象,替換掉Map Map<String, String> userInfo = new HashMap<>(); userInfo.put("username", "zhangsan"); userInfo.put("age", "12"); // 此處可封裝為json等格式 mqMsg.setContent(userInfo.toString()); // 第一個(gè)發(fā)送注冊(cè)消息 rocketMqService.asyncSend(mqMsg);
          mqMsg.setTags(MqTopicConstant.DEMO_TAG_MODIFY); userInfo.put("age", "18"); // 此處可封裝為json等格式 mqMsg.setContent(userInfo.toString()); // 發(fā)送修改消息 rocketMqService.asyncSend(mqMsg); }}

          在上述測(cè)試中第一部分發(fā)送了注冊(cè)用戶的消息,第二部分針對(duì)注冊(cè)的消息進(jìn)行了修改,又發(fā)送了一個(gè)消息。消息內(nèi)容直接將Map轉(zhuǎn)換為字符串了,在實(shí)戰(zhàn)的過(guò)程中可根據(jù)雙方協(xié)商,比如采用Json或其他序列化方法。

          然后在瀏覽器訪問(wèn):http://localhost:8080/demo/send?,觸發(fā)消息的發(fā)送。

          此時(shí)查看控制臺(tái),幾乎在瞬間,就可以看到如下日志信息:

          2020-11-24 19:31:17.900  INFO 92401 --- [nio-8080-exec-1] c.e.r.service.impl.RocketMqServiceImpl   : asyncSend發(fā)送消息到mqMsg=com.example.rocketmq.vo.MqMsg@7cda781f2020-11-24 19:31:17.906  INFO 92401 --- [nio-8080-exec-1] c.e.r.service.impl.RocketMqServiceImpl   : asyncSend發(fā)送消息到mqMsg=com.example.rocketmq.vo.MqMsg@7cda781f2020-11-24 19:31:17.942  INFO 92401 --- [MessageThread_1] c.e.r.listener.MqRegisteredListenerDemo  : received registered message: {age=12, username=zhangsan}2020-11-24 19:31:17.942  INFO 92401 --- [MessageThread_1] c.e.r.listener.MqRegisteredListenerDemo  : received modify message: {age=18, username=zhangsan}

          很顯然,消費(fèi)者已經(jīng)成功接收到消息,并且同一個(gè)topic,根據(jù)不同的tag分別進(jìn)行處理。至此,一個(gè)完整是示例演示完畢。

          小結(jié)

          關(guān)于消息隊(duì)列,還是其他很多方法都位于RocketMQTemplate當(dāng)中,根據(jù)業(yè)務(wù)需要可以繼續(xù)封裝對(duì)應(yīng)的Service。關(guān)于tag也還有不同的使用方式,大家可基于該文提供的基本框架和基本思路進(jìn)一步完善。

          瀏覽 56
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  免费看天堂的逼 | 丁香五月婷婷激情网 | 一本色道久久综合无码人妻软件 | 成人免费视频 国产免费麻豆网站 | 欧美操逼视屏 |