<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          springboot整合ActiveMQ實現(xiàn)異步交易了解下?

          共 13415字,需瀏覽 27分鐘

           ·

          2021-05-28 14:04

          首先說明下,今天的內(nèi)容是實打?qū)嵉母韶洠液芏喙揪€上環(huán)境就是這么搞的

          前言

          前段時間,我們分享了ActiveMQ的一些基本知識,介紹了ActiveMQ的簡單部署和基本用法,演示了java環(huán)境使用ActiveMQ收發(fā)消息的簡單操作,但當時只講了ActiveMQdemo也不是web項目,距離我們實際應(yīng)用確實也比較遠,為了讓各位小伙伴更夠更直觀地了解ActiveMQ的實際應(yīng)用場景,體會到異步交易的魅力,今天我們通過一個小小的demo,來看下springbootActiveMQ的整合應(yīng)用。

          今天的核心知識點就兩個:

          • Springboot異步交易
          • springboot整合ActiveMQ

          好了,話不多說,我們直接開始。

          正文

          我們的內(nèi)容,是以文件異步導(dǎo)出業(yè)務(wù)為例寫的一些業(yè)務(wù)代碼。我先簡單說下業(yè)務(wù)處理過程,第一步,用戶發(fā)起文件導(dǎo)出請求,后端接收到前端請求后,驗證請求參數(shù),并發(fā)起異步文件導(dǎo)出交易,交易發(fā)起成功后返回結(jié)果。

          第二步,導(dǎo)出成功后,用戶可以在文件下載中心進行下載。

          為了演示方便,我把所有數(shù)據(jù)都存放在reids里面了,一般實際項目中會把文件信息存放在數(shù)據(jù)庫中,處理成功后才會放進緩存。項目的完整源碼附在文末,有興趣的小伙伴自己去看。

          啟用JMS

          創(chuàng)建項目,我們這里就不介紹,到今天還不會搭建springboot開發(fā)環(huán)境,確實該面壁思過了。項目創(chuàng)建完成后,在springboot入口加上如下配置啟用jms(java message servic):

          @EnableJms

          引入依賴

          除了spring-boot-starter-web,這里我們還需要引入如下依賴:

          <dependency>
               <groupId>org.springframework.boot</groupId>
               <artifactId>spring-boot-starter-activemq</artifactId>
          </dependency>

          <dependency>
              <groupId>org.apache.activemq</groupId>
              <artifactId>activemq-pool</artifactId>
              <version>5.15.10</version>
          </dependency>
          <!-- 在2.X版本,spring.activemq.pool.enabled=true時,需依賴該jar -->
          <dependency>
              <groupId>org.messaginghub</groupId>
              <artifactId>pooled-jms</artifactId>
              <version>1.0.3</version>
          </dependency>

          這里簡單說明下,第一個依賴是activemqstarter,是activemq組件的核心依賴,所有的組件都是基于他展開的;

          第二個依賴是activemq的連接池,類似于數(shù)據(jù)庫連接池;

          第三個依賴是activemq自動配置類依賴的包。

          后面兩個依賴是可選的,如果你啟用了activeMQ連接池(spring.activemq.pool.enabled=true時),那你就必須依賴,沒有依賴的話,sprinbgoot啟動會報錯:

          主要原因是activemq的自動配置時依賴了這個包,沒有這個包Jms的連接工廠是無法被初始化的:

          有興趣的小伙伴可以自己把這個依賴先拿掉試下。

          添加配置

          完成上面的工作,我們要啟動本地的ActvieMQ服務(wù),然后添加ActvieMQ配置信息:

          spring.activemq.broker-url=tcp://127.0.0.1:61616
          spring.activemq.pool.enabled=true
          spring.activemq.pool.max-connections=100

          如果不需要連接池,后面兩個配置可以直接拿掉。

          消息發(fā)送接口

          發(fā)送接口 就是消息的生產(chǎn)者,springboot提供了消息的模板類(JmsMessagingTemplate),我們可以通過Autowired注入使用:

          @Service
          public class JmsSendService {
              @Autowired
              private JmsMessagingTemplate jmsTemplate;

              public void sendMessage(String queueName, String message) {
                  jmsTemplate.convertAndSend(queueName, message);
              }
          }

          ActiveMQ支持有返回值和無返回值兩種會話形式,你可以根據(jù)自己的需要選擇,JmsMessagingTemplate都是支持的,提供的模板方法也比較豐富:

          這里我們只用到了convertAndSend,字面意思就是方法的意思,object是消息內(nèi)容,destination是消息隊列名稱,看下源碼你就知道,方法內(nèi)部會把我們的消息內(nèi)容轉(zhuǎn)換成Message對象,當然如果你有特殊需求,你也可以自己組裝Message,只是過程比較繁瑣,簡單業(yè)務(wù)的話,用我這種方式就比較簡便了。

          如果你需要接收返回值,那你可以調(diào)用sendAndReceive(Message<T> var)接口來實現(xiàn),但是需要你自己定義自己的Message<T>,需要實現(xiàn)Message<T>接口。

          class StringMessage implements Message<String{

              private String payload;
              private MessageHeaders messageHeaders;

              public StringMessage(String payload) {
                  this.payload = payload;
              }

              @Override
              public String getPayload() {
                  return this.payload;
              }

              @Override
              public MessageHeaders getHeaders() {
                  return this.messageHeaders;
              }
          }

          調(diào)用sendAndReceive

          public String sendAndReceive(String queueName, String message) {
                  Message<?> messageBack = jmsTemplate.sendAndReceive(queueName, new StringMessage(message));
                  return (String)messageBack.getPayload();
              }

          JmsMessagingTemplate其實就是springboot抽象出來的一個通用的消息發(fā)送模板,它理論上是可以支持所有mq的,只需要官方提供starter即可,對開發(fā)者來說,確實比較友好,只需要修改配置,剩下的就不用管了,很方便有木有。

          這里是servicce層的實現(xiàn)過程:

          /**
               * 文件導(dǎo)出
               * @param name
               * @param userId
               * @return
               */

          public JSONObject export(String userId, String name) {

              JSONObject result = new JSONObject();
              result.put("userId", userId);
              result.put("type"0);
              String uuId = UUIDUtil.getUUId();
              result.put("fileId", uuId);
              result.put("name", name);
              // 異步導(dǎo)出文件
              doExport(result);
              result.put("success"true);
              result.put("code"0);
              result.put("message""數(shù)據(jù)導(dǎo)出提交成功,請稍后到文件中心下載!");
              return result;
          }

          springboot異步交易

          導(dǎo)出文件方法doExport內(nèi)部,我們使用了多線程異步交易,這樣的好處是把業(yè)務(wù)邏輯都放進異步交易中處理,可以將響應(yīng)結(jié)果更快地呈現(xiàn)給用戶,讓接口響應(yīng)更快。這里我們插個樓,講一些springboot異步線程池的用法。

          啟用異步交易

          springboot啟動異步交易很簡單,只需要在項目入口加上@EnableAsync即可

          添加異步線程池配置

          配置線程池大小

          @Configuration
          public class ExcuterConfig {
              @Bean
              public TaskExecutor taskExecutor() {
                  ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
                  executor.setCorePoolSize(10);
                  executor.setMaxPoolSize(150);
                  executor.setQueueCapacity(500);
                  return executor;
              }
          }

          使用線程池

          這里的taskExecutor就是我們前面配置的方法名。這里的異步線程和mq的異步交易是不一樣的。線程池大小是固定的,當所有線程被阻塞,線程池隊列也被占滿,有新的交易進來時,線程池會因為資源耗盡報錯,這時候后續(xù)業(yè)務(wù)是無法正常處理的;但是mq基本上是不存在阻塞資源耗盡的情況的(除非資源耗盡),特別是對于不需要有返回指定的交易,它只是一個消息倉庫,只要消息不被消費,消息是可以一直存在的,也不會超時。

          @Async("taskExecutor")
              void doExport(JSONObject jsonObject) {
                  try {
                      String name = jsonObject.getString("name");
                      if (StringUtils.hasLength(name)) {
                          String userId = jsonObject.getString("userId");
                          String uuId = jsonObject.getString("fileId");
                          // 其他數(shù)據(jù)校驗,這里通過睡眠模擬
                          Thread.sleep(1000L);
                          // 組裝保存文件信息
                          jsonObject.put("type"0);
                          jsonObject.put("isDownload"false);
                          jsonObject.put("createTime", System.currentTimeMillis());
                          // 保存文件數(shù)據(jù),實際業(yè)務(wù)中,這部分應(yīng)該是存在數(shù)據(jù)庫里的,這里為了演示方便,直接存在數(shù)據(jù)庫里了
                          redisUtil.setString(String.format("fileExport.%s.%s", userId, uuId), jsonObject.toJSONString());
                          // 發(fā)送文件導(dǎo)出業(yè)務(wù)消息
                          jmsSendService.sendMessage("file-export-queue", jsonObject.toJSONString());
                      }
                  } catch (Exception e) {
                      logger.error("數(shù)據(jù)導(dǎo)出錯誤", e);
                  }
              }

          消息接收消費

          這里主要是通過@JmsListener創(chuàng)建了一個消息監(jiān)聽器,監(jiān)聽ActiveMQ指定隊列的狀態(tài),當有新的消息進來時,該方法會被執(zhí)行。方法內(nèi)部是我們要異步業(yè)務(wù)處理過程。針對不同的業(yè)務(wù)類別,你可以指定不同的隊列名稱,但是同一個業(yè)務(wù)的發(fā)送方和消費者必須是相同的隊列名稱,否則是無法被消費的。

           @JmsListener(destination = "file-export-queue",  containerFactory = "jmsListenerContainerFactory")
              public void testMq(String message) {
                  logger.info("文件導(dǎo)出業(yè)務(wù)入?yún)ⅲ簕}", message);
                  JSONObject messageJsonObject = JSON.parseObject(message);
                  Integer type = messageJsonObject.getInteger("type");
                  if (type == 0) {
                      Object fileId = messageJsonObject.get("fileId");
                      Object userId = messageJsonObject.get("userId");
                      String filePath = String.format("./%s.txt", fileId);
                      messageJsonObject.put("path", filePath);
                      String fileKey = String.format("fileExport.%s.%s", userId, fileId);
                      // 查詢數(shù)據(jù)
                      List<String> dataList = Lists.newArrayList("張三""歷史""周三");
                      try(FileOutputStream fileOutputStream = new FileOutputStream(filePath)) {
                          for (String s : dataList) {
                              fileOutputStream.write(s.getBytes(StandardCharsets.UTF_8));
                              fileOutputStream.write("\n".getBytes());
                          }
                          redisUtil.setString(fileKey, messageJsonObject.toJSONString());
                      } catch (Exception e) {
                          logger.error("文件導(dǎo)出失敗", e);
                      }
                  }

          測試

          到這里,我們就可以簡單測試下了,我寫了一個頁面,兩個接口。

          接口

          @GetMapping("/file/{user_id}/export")
              public JSONObject fileExport(@PathVariable("user_id") String userId,
                                           @RequestParam String name) 
          {
                  return fileService.export(userId, name);
              }

              @GetMapping("/file/{user_id}/download/{file_id}")
              public JSONObject download(@PathVariable("user_id") String userId,
                                         @PathVariable("file_id") String fileId,
                                         HttpServletResponse response) 
          {
                  return fileService.download(userId, fileId, response);
              }

          頁面

          這里名稱隨便輸,數(shù)據(jù)是寫死的。導(dǎo)出請求提交成功后,會返回文件id,我把文件id展示在頁面上,點擊鏈接就可以下載

          總結(jié)

          今天的內(nèi)容從整體上來看,還是比較簡單的,主要是springboot已經(jīng)把好多配置工作搞好了,我們只需要簡單配置即可。但是過程還是有點艱辛的,官方?jīng)]有提供相關(guān)文檔,網(wǎng)上的教程我又不想?yún)⒖迹圆攘撕枚嗫樱ǖ臅r間也有點長,但是結(jié)局還是比較完美的,所有需求都實現(xiàn)了,而且還讓我積累了整合經(jīng)驗。但是在整合過程中,我發(fā)現(xiàn)對于ActiveMQ配置這塊,我還是比較迷,大部分的配置都不清楚,所以未來這塊還需要深入去研究下。

          最后,希望有興趣的小伙伴最好自己動手實踐下,畢竟實踐出真知,眼睛會了,手不見得會……

          今天分享內(nèi)容的源碼:

          https://github.com/Syske/learning-dome-code/tree/dev/springboot-activemq-demo
          - END -


          瀏覽 52
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  日日夜夜精品视品 | 亚洲AV无码精品 | 欧美亚在线| 日本成人大香蕉视频在线观看 | 9·1成长视频蘑菇视频大全 |