Spring Boot快速集成RocketMQ實(shí)戰(zhàn)教程
前言
RocketMQ是目前主流的消息中間件之一,并且自身就支持分布式功能。最初由阿里巴巴團(tuán)隊(duì)開(kāi)發(fā),并且經(jīng)歷過(guò)雙十一等海量消息場(chǎng)景的考驗(yàn),后捐贈(zèng)給Apache開(kāi)源基金會(huì),這也是為什么我們經(jīng)常聽(tīng)說(shuō)RocketMQ是阿里巴巴的消息中間件,項(xiàng)目卻在Apache的頂級(jí)項(xiàng)目中。
網(wǎng)絡(luò)上通過(guò)SpringBoot集成RocketMQ的教程很多,但大多數(shù)都無(wú)法做到快速、通用的進(jìn)行集成。本篇文章帶大家快速完成基于Spring Boot的集成使用,同時(shí)針對(duì)一些集成過(guò)程中的概念和使用方法以實(shí)例進(jìn)行講解。
RocketMQ的部署
關(guān)于RocketMQ的部署,通常有單Master模式、多Master模式、多Master多Slave模式(異步復(fù)制或同步雙寫(xiě))等。
本文重點(diǎn)介紹RocketMQ的集成部分,就不再這里講解如何部署Master的部署過(guò)程,讀者學(xué)習(xí)時(shí)只需部署單機(jī)模式或基于Docker部署即可。
依賴集成
首先創(chuàng)建一個(gè)SpringBoot項(xiàng)目,為了方便通過(guò)瀏覽器訪問(wèn)測(cè)試,引入web對(duì)應(yīng)的starter。
<parent><groupId>org.springframework.bootgroupId><artifactId>spring-boot-starter-parentartifactId><version>2.4.0version><relativePath/>parent><dependencies><dependency><groupId>org.springframework.bootgroupId><artifactId>spring-boot-starter-webartifactId>dependency>dependencies>
上面的依賴以及可以完成一個(gè)基于SpringBoot的web項(xiàng)目了。下面需要集成RocketMQ的依賴。
在此步驟中有兩個(gè)選擇,一個(gè)就是直接引入RocketMQ的依賴,比如:
<dependency><groupId>org.apache.rocketmqgroupId><artifactId>rocketmq-clientartifactId><version>4.3.0version>dependency>
但此種方式需要進(jìn)行大量的配置及實(shí)例化操作,并不能夠達(dá)到快速集成、方便使用的目的。
這里我們采用RocketMQ官方提供的基于spring的集成。項(xiàng)目的源碼及依賴使用位于GitHub上:https://github.com/apache/rocketmq-spring?。
在該項(xiàng)目的ReadMe中已經(jīng)清晰的描述了如何引入依賴:
<dependency><groupId>org.apache.rocketmqgroupId><artifactId>rocketmq-spring-boot-starterartifactId><version>${RELEASE.VERSION}version>dependency>
我們只需按照說(shuō)明,引入對(duì)應(yīng)的依賴即可,這里采用2.1.1版本。因此,引入依賴文件如下:
<dependency><groupId>org.apache.rocketmqgroupId><artifactId>rocketmq-spring-boot-starterartifactId><version>2.1.1version>dependency>
引入依賴之后,剩下的就是配置文件的配置和使用了。
配置文件
我們知道,SpringBoot默認(rèn)的starter內(nèi)置了很多配置文件,直接通過(guò)yml文件進(jìn)行配置即可使用。這里引入了rocketmq的starter,雖然并不是官方的,但使用方式基本一致。
在yml文件中配置如下參數(shù):
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: test-group
name-server參數(shù)對(duì)應(yīng)的就是部署的RocketMQ的Nameserver服務(wù),如果有多個(gè)的話用英文分號(hào)(;)進(jìn)行分割。
如果使用的是SpringBoot2.0+的框架或者是JDK10,可將name-server改成nameServer。否則,可能會(huì)出現(xiàn)一些奇怪的bug。
上面是簡(jiǎn)化了的最基礎(chǔ)的配置,其他的配置均采用默認(rèn)配置,如果需要定制化配置,可對(duì)具體參數(shù)按照統(tǒng)一形式進(jìn)行配置。
生產(chǎn)者示例
當(dāng)完成了上面的集成,生產(chǎn)者使用其實(shí)非常簡(jiǎn)單,只需要在使用的地方注入RocketMQTemplate對(duì)象,然后調(diào)用其對(duì)應(yīng)的發(fā)送方法即可。簡(jiǎn)單示例如下:
public class TestSendService {private RocketMQTemplate rocketMQTemplate;public void send() {rocketMQTemplate.send("test-topic-1",MessageBuilder.withPayload("Hello, World! I'm from spring message").build());????}}
但如果是在項(xiàng)目中,這樣每次使用都注入一個(gè)RocketMQTemplate并不符合面向?qū)ο蟮乃枷?,而且RocketMQTemplate還提供了多個(gè)常用的方法,比如同步、異步、直接發(fā)送等模式。
我們可以將其封裝成為一個(gè)通用的Service,這樣其他服務(wù)只需注入對(duì)應(yīng)的Service,調(diào)用公共的方法即可,并且注明每個(gè)方法的使用場(chǎng)景。
抽象出來(lái)的Service接口如下:
/*** Rocket MQ 對(duì)應(yīng)服務(wù)封裝***/public interface RocketMqService {/*** 同步發(fā)送消息** 當(dāng)發(fā)送的消息很重要是,且對(duì)響應(yīng)時(shí)間不敏感的時(shí)候采用sync方式;** @param mqMsg 發(fā)送消息實(shí)體類*/void send(MqMsg mqMsg);/*** 異步發(fā)送消息,異步返回消息結(jié)果** 當(dāng)發(fā)送的消息很重要,且對(duì)響應(yīng)時(shí)間非常敏感的時(shí)候采用async方式;** @param mqMsg 發(fā)送消息實(shí)體類*/void asyncSend(MqMsg mqMsg);/*** 直接發(fā)送發(fā)送消息,不關(guān)心返回結(jié)果,容易消息丟失,適合日志收集、不精確統(tǒng)計(jì)等消息發(fā)送;** 當(dāng)發(fā)送的消息不重要時(shí),采用one-way方式,以提高吞吐量;** @param mqMsg 發(fā)送消息實(shí)體類*/void syncSendOrderly(MqMsg mqMsg);}
定義了不同類型消息發(fā)送的方法,同時(shí)在注釋部分說(shuō)明具體方法的使用場(chǎng)景。其中將發(fā)送的參數(shù)封裝為MqMsg對(duì)象,MqMsg的結(jié)構(gòu)如下:
public class MqMsg {/*** 一級(jí)消息:消息topic*/private String topic;/*** 二級(jí)消息:消息topic對(duì)應(yīng)的tags*/private String tags;/*** 消息內(nèi)容*/private String content;// 省略getter/setter方法}
其中,topic為消息的主題,content為消息的內(nèi)容,具體內(nèi)容可根據(jù)生產(chǎn)者和消費(fèi)者之間進(jìn)行協(xié)定。
針對(duì)上述的接口,提供具體的方法實(shí)現(xiàn):
("rocketMqService")public class RocketMqServiceImpl implements RocketMqService {private static final Logger log = LoggerFactory.getLogger(RocketMqServiceImpl.class);private RocketMQTemplate rocketMQTemplate;public void send(MqMsg mqMsg) {log.info("send發(fā)送消息到mqMsg={}", mqMsg);rocketMQTemplate.send(mqMsg.getTopic() + ":" + mqMsg.getTags(),MessageBuilder.withPayload(mqMsg.getContent()).build());}public void asyncSend(MqMsg mqMsg) {log.info("asyncSend發(fā)送消息到mqMsg={}", mqMsg);rocketMQTemplate.asyncSend(mqMsg.getTopic() + ":" + mqMsg.getTags(), mqMsg.getContent(),new SendCallback() {public void onSuccess(SendResult sendResult) {// 成功不做日志記錄或處理}public void onException(Throwable throwable) {log.info("mqMsg={}消息發(fā)送失敗", mqMsg);}});}public void syncSendOrderly(MqMsg mqMsg) {log.info("syncSendOrderly發(fā)送消息到mqMsg={}", mqMsg);rocketMQTemplate.sendOneWay(mqMsg.getTopic() + ":" + mqMsg.getTags(), mqMsg.getContent());}}
其中異步發(fā)送方法asyncSend的異步返回結(jié)果中可以根據(jù)具體的業(yè)務(wù)場(chǎng)景進(jìn)行針對(duì)性的處理。
上述方法中均默認(rèn)使用了tag對(duì)topic進(jìn)行分類。如果具體的業(yè)務(wù)中不需要tag,則可對(duì)上述方法中拼接的冒號(hào)+tag部分去除。本實(shí)例使用tag進(jìn)行分類,方便用到時(shí)可以借鑒。
完成了上面的封裝之后,在業(yè)務(wù)使用場(chǎng)景中只需注入對(duì)應(yīng)的Service即可,后面測(cè)試時(shí)我們會(huì)進(jìn)行演示。
消費(fèi)者示例
關(guān)于消費(fèi)者我們可以直接實(shí)現(xiàn)RocketMQListener接口,然后通過(guò)@RocketMQMessageListener注解來(lái)匹配目標(biāo)消息。
這里為了演示統(tǒng)一topic下不同的tag的使用方法,分兩個(gè)消費(fèi)者來(lái)進(jìn)行演示,直接看代碼:
/*** 消息隊(duì)列消費(fèi)端使用示例***/(topic = MqTopicConstant.DEMO_TOPIC, consumerGroup = MqTopicConstant.DEMO_CONSUMER_GROUP_REGISTERED, selectorExpression = MqTopicConstant.DEMO_TAG_REGISTERED)public class MqRegisteredListenerDemo implements RocketMQListener<String> {private static final Logger log = LoggerFactory.getLogger(MqRegisteredListenerDemo.class);public void onMessage(String message) {log.info("received registered message: {}", message);}}
上面的消費(fèi)者監(jiān)聽(tīng)器是針對(duì)DEMO_TAG_REGISTERED(自定義變量)這個(gè)tag來(lái)進(jìn)行處理的,如果不需要tag,可去掉selectorExpression元素配置即可。@RocketMQMessageListener注解里selectorExpression默認(rèn)是*,也就是接收topic下全部消息。
上面用到了常量類MqTopicConstant,主要是統(tǒng)一定義對(duì)應(yīng)的topic和tag,內(nèi)容如下:
/*** 消息隊(duì)列相關(guān)常亮配置,包括group、topic、tag***/public class MqTopicConstant {/*** 示例消息隊(duì)列,topic1個(gè)*/public static final String DEMO_TOPIC = "test-top-1";/*** 注冊(cè)tag*/public static final String DEMO_TAG_REGISTERED = "registered";/*** 修改tag*/public static final String DEMO_TAG_MODIFY = "modify";/*** consumer group*/public static final String DEMO_CONSUMER_GROUP_REGISTERED = "consumer_test-top-1_registered";public static final String DEMO_CONSUMER_GROUP_MODIFY = "consumer_test-top-1_modify";}
下面再來(lái)看看第二個(gè)監(jiān)聽(tīng)“更新”功能的tag代碼實(shí)現(xiàn):
/*** 消息隊(duì)列消費(fèi)端使用示例***/(topic = MqTopicConstant.DEMO_TOPIC, consumerGroup = MqTopicConstant.DEMO_CONSUMER_GROUP_MODIFY, selectorExpression = MqTopicConstant.DEMO_TAG_MODIFY)public class MqModifyListenerDemo implements RocketMQListener<String> {private static final Logger log = LoggerFactory.getLogger(MqRegisteredListenerDemo.class);public void onMessage(String message) {log.info("received modify message: {}", message);}}
這個(gè)消費(fèi)者與第一個(gè)不同的地方有兩個(gè),均在注解部分。其中topic兩個(gè)是相同的,selectorExpression一個(gè)針對(duì)注冊(cè)功能的tag進(jìn)行過(guò)濾,一個(gè)針對(duì)修改信息的功能進(jìn)行過(guò)濾。而消費(fèi)組consumerGroup對(duì)應(yīng)的值不能相同,否則啟動(dòng)時(shí)會(huì)拋出異常。
經(jīng)過(guò)上述步驟,已經(jīng)完成了生產(chǎn)者、消費(fèi)者的配置。下面就同一個(gè)測(cè)試來(lái)驗(yàn)證服務(wù)的執(zhí)行情況。
測(cè)試驗(yàn)證
定義一個(gè)Controller,用于外部請(qǐng)求,觸發(fā)用戶注冊(cè)和修改操作的消息發(fā)送。關(guān)于測(cè)試也可以使用單元測(cè)試,但需要讓線層阻塞,防止消費(fèi)者還沒(méi)接收到消息時(shí),單元測(cè)試已經(jīng)執(zhí)行完成。
("/demo")public class DemoController {private RocketMqService rocketMqService;("/send")public void send() {MqMsg mqMsg = new MqMsg();mqMsg.setTopic(MqTopicConstant.DEMO_TOPIC);mqMsg.setTags(MqTopicConstant.DEMO_TAG_REGISTERED);// 此處可為其他VO對(duì)象,替換掉MapMap<String, String> userInfo = new HashMap<>();userInfo.put("username", "zhangsan");userInfo.put("age", "12");// 此處可封裝為json等格式mqMsg.setContent(userInfo.toString());// 第一個(gè)發(fā)送注冊(cè)消息rocketMqService.asyncSend(mqMsg);mqMsg.setTags(MqTopicConstant.DEMO_TAG_MODIFY);userInfo.put("age", "18");// 此處可封裝為json等格式mqMsg.setContent(userInfo.toString());// 發(fā)送修改消息rocketMqService.asyncSend(mqMsg);}}
在上述測(cè)試中第一部分發(fā)送了注冊(cè)用戶的消息,第二部分針對(duì)注冊(cè)的消息進(jìn)行了修改,又發(fā)送了一個(gè)消息。消息內(nèi)容直接將Map轉(zhuǎn)換為字符串了,在實(shí)戰(zhàn)的過(guò)程中可根據(jù)雙方協(xié)商,比如采用Json或其他序列化方法。
然后在瀏覽器訪問(wèn):http://localhost:8080/demo/send?,觸發(fā)消息的發(fā)送。
此時(shí)查看控制臺(tái),幾乎在瞬間,就可以看到如下日志信息:
2020-11-24 19:31:17.900 INFO 92401 --- [nio-8080-exec-1] c.e.r.service.impl.RocketMqServiceImpl : asyncSend發(fā)送消息到mqMsg=com.example.rocketmq.vo.MqMsg@7cda781f2020-11-24 19:31:17.906 INFO 92401 --- [nio-8080-exec-1] c.e.r.service.impl.RocketMqServiceImpl : asyncSend發(fā)送消息到mqMsg=com.example.rocketmq.vo.MqMsg@7cda781f2020-11-24 19:31:17.942 INFO 92401 --- [MessageThread_1] c.e.r.listener.MqRegisteredListenerDemo : received registered message: {age=12, username=zhangsan}2020-11-24 19:31:17.942 INFO 92401 --- [MessageThread_1] c.e.r.listener.MqRegisteredListenerDemo : received modify message: {age=18, username=zhangsan}
很顯然,消費(fèi)者已經(jīng)成功接收到消息,并且同一個(gè)topic,根據(jù)不同的tag分別進(jìn)行處理。至此,一個(gè)完整是示例演示完畢。
小結(jié)
關(guān)于消息隊(duì)列,還是其他很多方法都位于RocketMQTemplate當(dāng)中,根據(jù)業(yè)務(wù)需要可以繼續(xù)封裝對(duì)應(yīng)的Service。關(guān)于tag也還有不同的使用方式,大家可基于該文提供的基本框架和基本思路進(jìn)一步完善。
