springboot整合ActiveMQ實現(xiàn)異步交易了解下?

首先說明下,今天的內(nèi)容是實打?qū)嵉母韶洠液芏喙揪€上環(huán)境就是這么搞的
前言
前段時間,我們分享了ActiveMQ的一些基本知識,介紹了ActiveMQ的簡單部署和基本用法,演示了java環(huán)境使用ActiveMQ收發(fā)消息的簡單操作,但當時只講了ActiveMQ,demo也不是web項目,距離我們實際應(yīng)用確實也比較遠,為了讓各位小伙伴更夠更直觀地了解ActiveMQ的實際應(yīng)用場景,體會到異步交易的魅力,今天我們通過一個小小的demo,來看下springboot和ActiveMQ的整合應(yīng)用。
今天的核心知識點就兩個:
Springboot異步交易springboot整合ActiveMQ
好了,話不多說,我們直接開始。
正文
我們的內(nèi)容,是以文件異步導(dǎo)出業(yè)務(wù)為例寫的一些業(yè)務(wù)代碼。我先簡單說下業(yè)務(wù)處理過程,第一步,用戶發(fā)起文件導(dǎo)出請求,后端接收到前端請求后,驗證請求參數(shù),并發(fā)起異步文件導(dǎo)出交易,交易發(fā)起成功后返回結(jié)果。
第二步,導(dǎo)出成功后,用戶可以在文件下載中心進行下載。

為了演示方便,我把所有數(shù)據(jù)都存放在reids里面了,一般實際項目中會把文件信息存放在數(shù)據(jù)庫中,處理成功后才會放進緩存。項目的完整源碼附在文末,有興趣的小伙伴自己去看。
啟用JMS
創(chuàng)建項目,我們這里就不介紹,到今天還不會搭建springboot開發(fā)環(huán)境,確實該面壁思過了。項目創(chuàng)建完成后,在springboot入口加上如下配置啟用jms(java message servic):
@EnableJms
引入依賴
除了spring-boot-starter-web,這里我們還需要引入如下依賴:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.15.10</version>
</dependency>
<!-- 在2.X版本,spring.activemq.pool.enabled=true時,需依賴該jar -->
<dependency>
<groupId>org.messaginghub</groupId>
<artifactId>pooled-jms</artifactId>
<version>1.0.3</version>
</dependency>
這里簡單說明下,第一個依賴是activemq的starter,是activemq組件的核心依賴,所有的組件都是基于他展開的;
第二個依賴是activemq的連接池,類似于數(shù)據(jù)庫連接池;
第三個依賴是activemq自動配置類依賴的包。
后面兩個依賴是可選的,如果你啟用了activeMQ連接池(spring.activemq.pool.enabled=true時),那你就必須依賴,沒有依賴的話,sprinbgoot啟動會報錯:

主要原因是activemq的自動配置時依賴了這個包,沒有這個包Jms的連接工廠是無法被初始化的:


有興趣的小伙伴可以自己把這個依賴先拿掉試下。
添加配置
完成上面的工作,我們要啟動本地的ActvieMQ服務(wù),然后添加ActvieMQ配置信息:
spring.activemq.broker-url=tcp://127.0.0.1:61616
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=100
如果不需要連接池,后面兩個配置可以直接拿掉。
消息發(fā)送接口
發(fā)送接口 就是消息的生產(chǎn)者,springboot提供了消息的模板類(JmsMessagingTemplate),我們可以通過Autowired注入使用:
@Service
public class JmsSendService {
@Autowired
private JmsMessagingTemplate jmsTemplate;
public void sendMessage(String queueName, String message) {
jmsTemplate.convertAndSend(queueName, message);
}
}
ActiveMQ支持有返回值和無返回值兩種會話形式,你可以根據(jù)自己的需要選擇,JmsMessagingTemplate都是支持的,提供的模板方法也比較豐富:

這里我們只用到了convertAndSend,字面意思就是方法的意思,object是消息內(nèi)容,destination是消息隊列名稱,看下源碼你就知道,方法內(nèi)部會把我們的消息內(nèi)容轉(zhuǎn)換成Message對象,當然如果你有特殊需求,你也可以自己組裝Message,只是過程比較繁瑣,簡單業(yè)務(wù)的話,用我這種方式就比較簡便了。
如果你需要接收返回值,那你可以調(diào)用sendAndReceive(Message<T> var)接口來實現(xiàn),但是需要你自己定義自己的Message<T>,需要實現(xiàn)Message<T>接口。
class StringMessage implements Message<String> {
private String payload;
private MessageHeaders messageHeaders;
public StringMessage(String payload) {
this.payload = payload;
}
@Override
public String getPayload() {
return this.payload;
}
@Override
public MessageHeaders getHeaders() {
return this.messageHeaders;
}
}
調(diào)用sendAndReceive:
public String sendAndReceive(String queueName, String message) {
Message<?> messageBack = jmsTemplate.sendAndReceive(queueName, new StringMessage(message));
return (String)messageBack.getPayload();
}
JmsMessagingTemplate其實就是springboot抽象出來的一個通用的消息發(fā)送模板,它理論上是可以支持所有mq的,只需要官方提供starter即可,對開發(fā)者來說,確實比較友好,只需要修改配置,剩下的就不用管了,很方便有木有。
這里是servicce層的實現(xiàn)過程:
/**
* 文件導(dǎo)出
* @param name
* @param userId
* @return
*/
public JSONObject export(String userId, String name) {
JSONObject result = new JSONObject();
result.put("userId", userId);
result.put("type", 0);
String uuId = UUIDUtil.getUUId();
result.put("fileId", uuId);
result.put("name", name);
// 異步導(dǎo)出文件
doExport(result);
result.put("success", true);
result.put("code", 0);
result.put("message", "數(shù)據(jù)導(dǎo)出提交成功,請稍后到文件中心下載!");
return result;
}
springboot異步交易
導(dǎo)出文件方法doExport內(nèi)部,我們使用了多線程異步交易,這樣的好處是把業(yè)務(wù)邏輯都放進異步交易中處理,可以將響應(yīng)結(jié)果更快地呈現(xiàn)給用戶,讓接口響應(yīng)更快。這里我們插個樓,講一些springboot異步線程池的用法。
啟用異步交易
springboot啟動異步交易很簡單,只需要在項目入口加上@EnableAsync即可
添加異步線程池配置
配置線程池大小
@Configuration
public class ExcuterConfig {
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(150);
executor.setQueueCapacity(500);
return executor;
}
}
使用線程池
這里的taskExecutor就是我們前面配置的方法名。這里的異步線程和mq的異步交易是不一樣的。線程池大小是固定的,當所有線程被阻塞,線程池隊列也被占滿,有新的交易進來時,線程池會因為資源耗盡報錯,這時候后續(xù)業(yè)務(wù)是無法正常處理的;但是mq基本上是不存在阻塞資源耗盡的情況的(除非資源耗盡),特別是對于不需要有返回指定的交易,它只是一個消息倉庫,只要消息不被消費,消息是可以一直存在的,也不會超時。
@Async("taskExecutor")
void doExport(JSONObject jsonObject) {
try {
String name = jsonObject.getString("name");
if (StringUtils.hasLength(name)) {
String userId = jsonObject.getString("userId");
String uuId = jsonObject.getString("fileId");
// 其他數(shù)據(jù)校驗,這里通過睡眠模擬
Thread.sleep(1000L);
// 組裝保存文件信息
jsonObject.put("type", 0);
jsonObject.put("isDownload", false);
jsonObject.put("createTime", System.currentTimeMillis());
// 保存文件數(shù)據(jù),實際業(yè)務(wù)中,這部分應(yīng)該是存在數(shù)據(jù)庫里的,這里為了演示方便,直接存在數(shù)據(jù)庫里了
redisUtil.setString(String.format("fileExport.%s.%s", userId, uuId), jsonObject.toJSONString());
// 發(fā)送文件導(dǎo)出業(yè)務(wù)消息
jmsSendService.sendMessage("file-export-queue", jsonObject.toJSONString());
}
} catch (Exception e) {
logger.error("數(shù)據(jù)導(dǎo)出錯誤", e);
}
}
消息接收消費
這里主要是通過@JmsListener創(chuàng)建了一個消息監(jiān)聽器,監(jiān)聽ActiveMQ指定隊列的狀態(tài),當有新的消息進來時,該方法會被執(zhí)行。方法內(nèi)部是我們要異步業(yè)務(wù)處理過程。針對不同的業(yè)務(wù)類別,你可以指定不同的隊列名稱,但是同一個業(yè)務(wù)的發(fā)送方和消費者必須是相同的隊列名稱,否則是無法被消費的。
@JmsListener(destination = "file-export-queue", containerFactory = "jmsListenerContainerFactory")
public void testMq(String message) {
logger.info("文件導(dǎo)出業(yè)務(wù)入?yún)ⅲ簕}", message);
JSONObject messageJsonObject = JSON.parseObject(message);
Integer type = messageJsonObject.getInteger("type");
if (type == 0) {
Object fileId = messageJsonObject.get("fileId");
Object userId = messageJsonObject.get("userId");
String filePath = String.format("./%s.txt", fileId);
messageJsonObject.put("path", filePath);
String fileKey = String.format("fileExport.%s.%s", userId, fileId);
// 查詢數(shù)據(jù)
List<String> dataList = Lists.newArrayList("張三", "歷史", "周三");
try(FileOutputStream fileOutputStream = new FileOutputStream(filePath)) {
for (String s : dataList) {
fileOutputStream.write(s.getBytes(StandardCharsets.UTF_8));
fileOutputStream.write("\n".getBytes());
}
redisUtil.setString(fileKey, messageJsonObject.toJSONString());
} catch (Exception e) {
logger.error("文件導(dǎo)出失敗", e);
}
}
測試
到這里,我們就可以簡單測試下了,我寫了一個頁面,兩個接口。
接口
@GetMapping("/file/{user_id}/export")
public JSONObject fileExport(@PathVariable("user_id") String userId,
@RequestParam String name) {
return fileService.export(userId, name);
}
@GetMapping("/file/{user_id}/download/{file_id}")
public JSONObject download(@PathVariable("user_id") String userId,
@PathVariable("file_id") String fileId,
HttpServletResponse response) {
return fileService.download(userId, fileId, response);
}
頁面
這里名稱隨便輸,數(shù)據(jù)是寫死的。導(dǎo)出請求提交成功后,會返回文件id,我把文件id展示在頁面上,點擊鏈接就可以下載

總結(jié)
今天的內(nèi)容從整體上來看,還是比較簡單的,主要是springboot已經(jīng)把好多配置工作搞好了,我們只需要簡單配置即可。但是過程還是有點艱辛的,官方?jīng)]有提供相關(guān)文檔,網(wǎng)上的教程我又不想?yún)⒖迹圆攘撕枚嗫樱ǖ臅r間也有點長,但是結(jié)局還是比較完美的,所有需求都實現(xiàn)了,而且還讓我積累了整合經(jīng)驗。但是在整合過程中,我發(fā)現(xiàn)對于ActiveMQ配置這塊,我還是比較迷,大部分的配置都不清楚,所以未來這塊還需要深入去研究下。
最后,希望有興趣的小伙伴最好自己動手實踐下,畢竟實踐出真知,眼睛會了,手不見得會……
今天分享內(nèi)容的源碼:
https://github.com/Syske/learning-dome-code/tree/dev/springboot-activemq-demo
- END -