<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          rocketmq+springboot實(shí)現(xiàn)分布式事務(wù)

          共 12730字,需瀏覽 26分鐘

           ·

          2021-06-07 08:19

          點(diǎn)擊上方藍(lán)色字體,選擇“標(biāo)星公眾號(hào)”

          優(yōu)質(zhì)文章,第一時(shí)間送達(dá)

          1 、執(zhí)行流程

          (1) 發(fā)送方向 MQ 服務(wù)端發(fā)送消息。
          (2) MQ Server 將消息持久化成功之后,向發(fā)送方 ACK 確認(rèn)消息已經(jīng)發(fā)送成功,此時(shí)消息為半消息。
          (3) 發(fā)送方開始執(zhí)行本地事務(wù)邏輯。
          (4) 發(fā)送方根據(jù)本地事務(wù)執(zhí)行結(jié)果向 MQ Server 提交二次確認(rèn)(Commit 或是 Rollback),MQ Server 收到Commit 狀態(tài)則將半消息標(biāo)記為可投遞,訂閱方最終將收到該消息;MQ Server 收到 Rollback 狀態(tài)則刪除半消息,訂閱方將不會(huì)接受該消息。
          (5) 在斷網(wǎng)或者是應(yīng)用重啟的特殊情況下,上述步驟4提交的二次確認(rèn)最終未到達(dá) MQ Server,經(jīng)過固定時(shí)間后MQ Server 將對(duì)該消息發(fā)起消息回查。
          (6) 發(fā)送方收到消息回查后,需要檢查對(duì)應(yīng)消息的本地事務(wù)執(zhí)行的最終結(jié)果。
          (7) 發(fā)送方根據(jù)檢查得到的本地事務(wù)的最終狀態(tài)再次提交二次確認(rèn),MQ Server 仍按照步驟4對(duì)半消息進(jìn)行操作。

          2 、工程

          2.1 pom

          <parent>
                  <groupId>org.springframework.boot</groupId>
                  <artifactId>spring-boot-starter-parent</artifactId>
                  <version>2.3.0.RELEASE</version>
                  <relativePath/> <!-- lookup parent from repository -->
              </parent>
              <properties>
                  <java.version>1.8</java.version>
              </properties>
              <dependencies>
                  <dependency>
                      <groupId>org.springframework.boot</groupId>
                      <artifactId>spring-boot-starter-web</artifactId>
                  </dependency>
                  <dependency>
                      <groupId>org.springframework.boot</groupId>
                      <artifactId>spring-boot-starter-test</artifactId>
                      <scope>test</scope>
                  </dependency>
                  <dependency>
                      <groupId>org.projectlombok</groupId>
                      <artifactId>lombok</artifactId>
                  </dependency>
                  <dependency>
                      <groupId>com.alibaba</groupId>
                      <artifactId>fastjson</artifactId>
                      <version>1.2.71</version>
                  </dependency>
                  <dependency>
                      <groupId>org.apache.commons</groupId>
                      <artifactId>commons-collections4</artifactId>
                      <version>4.2</version>
                  </dependency>
                  <dependency>
                      <groupId>org.apache.commons</groupId>
                      <artifactId>commons-lang3</artifactId>
                  </dependency>
                  <dependency>
                      <groupId>org.springframework.boot</groupId>
                      <artifactId>spring-boot-starter-logging</artifactId>
                  </dependency>
                  <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
                  <dependency>
                      <groupId>org.apache.rocketmq</groupId>
                      <artifactId>rocketmq-spring-boot-starter</artifactId>
                      <version>2.0.1</version>
                  </dependency>

                  <dependency>
                      <groupId>org.apache.rocketmq</groupId>
                      <artifactId>rocketmq-client</artifactId>
                      <version>4.3.2</version>
                  </dependency>
              </dependencies>
              <build>
                  <plugins>
                      <plugin>
                          <groupId>org.springframework.boot</groupId>
                          <artifactId>spring-boot-maven-plugin</artifactId>
                          <version>2.3.0.RELEASE</version>
                      </plugin>
                      <plugin>
                          <groupId>org.apache.maven.plugins</groupId>
                          <artifactId>maven-compiler-plugin</artifactId>
                          <version>3.8.1</version>
                          <configuration>
                              <source>1.8</source>
                              <target>1.8</target>
                          </configuration>
                      </plugin>
                  </plugins>
              </build>

          2.2 application.yml

          rocketmq:
            name-server: 192.168.38.50:9876
            producer:
              group: transcation-group


          2.3 TransactionListenerImpl

          @RocketMQTransactionListener(txProducerGroup = "transaction-producer-group")
          @Slf4j
          public class TransactionListenerImpl implements RocketMQLocalTransactionListener {

              private static Map<String, RocketMQLocalTransactionState> STATE_MAP = new HashMap<>();

              /**
               *  執(zhí)行業(yè)務(wù)邏輯
               */
              @Override
              public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
                  String transId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
                  try {
                      System.out.println("用戶A賬戶減500元.");
                      System.out.println("用戶B賬戶加500元.");
                      STATE_MAP.put(transId, RocketMQLocalTransactionState.COMMIT);
                      return RocketMQLocalTransactionState.COMMIT;
                  } catch (Exception e) {
                      e.printStackTrace();
                  }

                  STATE_MAP.put(transId, RocketMQLocalTransactionState.ROLLBACK);
                  return RocketMQLocalTransactionState.UNKNOWN;

              }

              /**
               * 回查
               */
              @Override
              public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
                  String transId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
                  log.info("回查消息 -> transId ={} , state = {}", transId, STATE_MAP.get(transId));
                  return STATE_MAP.get(transId);
              }
          }


          2.4 SpringTransactionProducer

          @Component
          @Slf4j
          public class SpringTransactionProducer {

              @Autowired
              private RocketMQTemplate rocketMQTemplate;

              /**
               * 發(fā)送消息
               *
               */
              public void sendMsg(String topic, String msg) {
                  Message<String> message = MessageBuilder.withPayload(msg).build();
                  this.rocketMQTemplate.sendMessageInTransaction("transaction-producer-group", topic, message, null);
                  log.info("發(fā)送成功");
              }
          }

          2.5 SpringTxConsumer

          @Component
          @RocketMQMessageListener(topic = "pay_topic",
                  consumerGroup = "transaction-consumer-group",
                  selectorExpression = "*")
          @Slf4j
          public class SpringTxConsumer implements RocketMQListener<String> {

              @Override
              public void onMessage(String msg) {
                  log.info("接收到消息 -> {}", msg);
              }
          }

          2.6 ProducerController

          @RestController
          @RequestMapping("/producer")
          public class ProducerController {

              @Autowired
              private SpringTransactionProducer springTransactionProducer;

              @GetMapping("/sendMsg")
              public String sendMsg() {
                  springTransactionProducer.sendMsg("pay_topic""用戶A賬戶減500元,用戶B賬戶加500元。");
                  return "發(fā)送成功";
              }

          }


          2.7 RocketApplication

          @SpringBootApplication
          public class RocketApplication {

              public static void main(String[] args) {
                  SpringApplication.run(RocketApplication.class);
              }

          }


          3 、測(cè)試

          3.1 正常消費(fèi)測(cè)試

          描述: 正常啟動(dòng)及可。

          3.2 回查代碼測(cè)試

          描述: 執(zhí)行本地事務(wù)時(shí)添加異常,重啟測(cè)試,發(fā)現(xiàn)消費(fèi)者沒有收到消息。



          版權(quán)聲明:本文為博主原創(chuàng)文章,遵循 CC 4.0 BY-SA 版權(quán)協(xié)議,轉(zhuǎn)載請(qǐng)附上原文出處鏈接和本聲明。

          本文鏈接:

          https://blog.csdn.net/qq_34125999/article/details/117339245









          瀏覽 64
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  欧美成人毛片 | 五月婷婷网国产区 | 亚洲无码天堂在线视频 | 经典国产三级在线 | 久草大香蕉视频在线 |