<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          RocketMQ進(jìn)階 - 事務(wù)消息

          共 7291字,需瀏覽 15分鐘

           ·

          2020-08-20 08:17


          分布式消息選型的時(shí)候是否支持事務(wù)消息是一個(gè)很重要的考量點(diǎn),而目前只有RocketMQ對(duì)事務(wù)消息支持的最好。今天我們來(lái)嘮嘮如何實(shí)現(xiàn)RocketMQ的事務(wù)消息!

          Apache RocketMQ在4.3.0版中已經(jīng)支持分布式事務(wù)消息,這里RocketMQ采用了2PC的思想來(lái)實(shí)現(xiàn)了提交事務(wù)消息,同時(shí)增加一個(gè)補(bǔ)償邏輯來(lái)處理二階段超時(shí)或者失敗的消息,如下圖所示。

          RocketMQ事務(wù)流程概要

          RocketMQ實(shí)現(xiàn)事務(wù)消息主要分為兩個(gè)階段:正常事務(wù)的發(fā)送及提交、事務(wù)信息的補(bǔ)償流程 整體流程為:

          • 正常事務(wù)發(fā)送與提交階段
          1. 生產(chǎn)者發(fā)送一個(gè)半消息給MQServer(半消息是指消費(fèi)者暫時(shí)不能消費(fèi)的消息)
          2. 服務(wù)端響應(yīng)消息寫(xiě)入結(jié)果,半消息發(fā)送成功
          3. 開(kāi)始執(zhí)行本地事務(wù)
          4. 根據(jù)本地事務(wù)的執(zhí)行狀態(tài)執(zhí)行Commit或者Rollback操作
          • 事務(wù)信息的補(bǔ)償流程
          1. 如果MQServer長(zhǎng)時(shí)間沒(méi)收到本地事務(wù)的執(zhí)行狀態(tài)會(huì)向生產(chǎn)者發(fā)起一個(gè)確認(rèn)回查的操作請(qǐng)求
          2. 生產(chǎn)者收到確認(rèn)回查請(qǐng)求后,檢查本地事務(wù)的執(zhí)行狀態(tài)
          3. 根據(jù)檢查后的結(jié)果執(zhí)行Commit或者Rollback操作 補(bǔ)償階段主要是用于解決生產(chǎn)者在發(fā)送Commit或者Rollback操作時(shí)發(fā)生超時(shí)或失敗的情況。

          RocketMQ事務(wù)流程關(guān)鍵

          1. 事務(wù)消息在一階段對(duì)用戶(hù)不可見(jiàn) 事務(wù)消息相對(duì)普通消息最大的特點(diǎn)就是一階段發(fā)送的消息對(duì)用戶(hù)是不可見(jiàn)的,也就是說(shuō)消費(fèi)者不能直接消費(fèi)。這里RocketMQ的實(shí)現(xiàn)方法是原消息的主題與消息消費(fèi)隊(duì)列,然后把主題改成 RMQ_SYS_TRANS_HALF_TOPIC ,這樣由于消費(fèi)者沒(méi)有訂閱這個(gè)主題,所以不會(huì)被消費(fèi)。
          2. 如何處理第二階段的失敗消息?在本地事務(wù)執(zhí)行完成后會(huì)向MQServer發(fā)送Commit或Rollback操作,此時(shí)如果在發(fā)送消息的時(shí)候生產(chǎn)者出故障了,那么要保證這條消息最終被消費(fèi),MQServer會(huì)像服務(wù)端發(fā)送回查請(qǐng)求,確認(rèn)本地事務(wù)的執(zhí)行狀態(tài)。當(dāng)然了rocketmq并不會(huì)無(wú)休止的的信息事務(wù)狀態(tài)回查,默認(rèn)回查15次,如果15次回查還是無(wú)法得知事務(wù)狀態(tài),RocketMQ默認(rèn)回滾該消息。
          3. 消息狀態(tài) 事務(wù)消息有三種狀態(tài):
          • TransactionStatus.CommitTransaction:提交事務(wù)消息,消費(fèi)者可以消費(fèi)此消息
          • TransactionStatus.RollbackTransaction:回滾事務(wù),它代表該消息將被刪除,不允許被消費(fèi)。
          • TransactionStatus.Unknown :中間狀態(tài),它代表需要檢查消息隊(duì)列來(lái)確定狀態(tài)。

          代碼實(shí)現(xiàn)

          首先假設(shè)我們有這樣一個(gè)需求:

          用戶(hù)請(qǐng)求訂單微服務(wù) order-service 接口刪除訂單(退貨),刪除訂單后需要發(fā)送消息給用戶(hù)服務(wù) account-service,用戶(hù)微服務(wù)收到消息后會(huì)給用戶(hù)賬戶(hù)增加余額。

          這個(gè)需求跟錢(qián)相關(guān),肯定要保證消息的事務(wù)性,接下來(lái)我們根據(jù)上面的原理實(shí)現(xiàn)整個(gè)流程。

          基礎(chǔ)配置

          生產(chǎn)者order-servcie和account-service都要引入RocketMQ相關(guān)依賴(lài),增加RocketMQ的相關(guān)配置

          • 引入組件
          <dependency>
          ?<groupId>org.apache.rocketmqgroupId>
          ?<artifactId>rocketmq-spring-boot-starterartifactId>
          dependency>
          • 添加配置
          #?within?rocketmq
          rocketmq:
          ??name-server:?xxx.xx.x.xx:9876;?xxx.xx.x.xx:9876
          ??producer:
          ????group:?cloud-group

          發(fā)送半消息

          order-service在執(zhí)行刪除訂單操作時(shí)發(fā)送一條半消息給MQServer,發(fā)送半消息主要是使用 rocketMQTemplate.sendMessageInTransaction() 方法,發(fā)送事務(wù)消息。

          @Override
          public?void?delete(String?orderNo)?{
          ?Order?order?=?orderMapper.selectByNo(orderNo);
          ?//如果訂單存在且狀態(tài)為有效,進(jìn)行業(yè)務(wù)處理
          ?if?(order?!=?null?&&?CloudConstant.VALID_STATUS.equals(order.getStatus()))?{
          ??String?transactionId?=?UUID.randomUUID().toString();
          ??//如果可以刪除訂單則發(fā)送消息給rocketmq,讓用戶(hù)中心消費(fèi)消息
          ??rocketMQTemplate.sendMessageInTransaction("add-amount",
          ????MessageBuilder.withPayload(
          ??????UserAddMoneyDTO.builder()
          ????????.userCode(order.getAccountCode())
          ????????.amount(order.getAmount())
          ????????.build()
          ????)
          ????.setHeader(RocketMQHeaders.TRANSACTION_ID,?transactionId)
          ????.setHeader("order_id",order.getId())
          ????.build()
          ????,order
          ??);
          ?}
          }

          首先先校驗(yàn)一下訂單狀態(tài),然后發(fā)送消息給MQServer,這個(gè)邏輯大家都看得懂,主要是關(guān)注 sendMessageInTransaction() 方法,源碼如下:

          public?TransactionSendResult?sendMessageInTransaction(String?destination,?Message?message,?Object?arg)?throws?MessagingException?{
          ?try?{
          ??if?(((TransactionMQProducer)this.producer).getTransactionListener()?==?null)?{
          ???throw?new?IllegalStateException("The?rocketMQTemplate?does?not?exist?TransactionListener");
          ??}?else?{
          ???org.apache.rocketmq.common.message.Message?rocketMsg?=?this.createRocketMqMessage(destination,?message);
          ???return?this.producer.sendMessageInTransaction(rocketMsg,?arg);
          ??}
          ?}?catch?(MQClientException?var5)?{
          ??throw?RocketMQUtil.convert(var5);
          ?}
          }

          該方法有三個(gè)參數(shù):

          • destination:目的地(主題),這里發(fā)送給 add-amount 這個(gè)主題
          • message:發(fā)送給消費(fèi)者的消息體,需要使用 MessageBuilder.withPayload() 來(lái)構(gòu)建消息
          • arg:參數(shù)

          注意,這里我們生成了一個(gè)transactionId,并放在header中跟消息一起發(fā)送(這里實(shí)際也可以構(gòu)造成一個(gè)對(duì)象,放在arg里進(jìn)行發(fā)送),作用后面再講!

          執(zhí)行本地事務(wù)與回查

          MQServer收到半消息后會(huì)告訴生產(chǎn)者order-service確認(rèn)收到半消息,這時(shí)候order-service需要執(zhí)行本地事務(wù),執(zhí)行完本地事務(wù)后再告訴MQServer本地事務(wù)的執(zhí)行狀態(tài),確認(rèn)消息究竟是Commit還是Rollback。如果在告訴MQServer本地執(zhí)行狀態(tài)的時(shí)候出異常了還需要讓MQServer能夠回查到,怎么實(shí)現(xiàn)這一些列操作呢?

          RocketMQ提供了 RocketMQLocalTransactionListener 接口,本地事務(wù)監(jiān)聽(tīng)器,這個(gè)接口類(lèi)的實(shí)現(xiàn)如下:

          第一個(gè)方法 executeLocalTransaction 為執(zhí)行本地事務(wù);第二個(gè)方法 checkLocalTransaction 為檢查本地事務(wù)的執(zhí)行狀態(tài),也就是回查動(dòng)作。有了這個(gè)接口類(lèi)我們的執(zhí)行邏輯清楚了,但是還有個(gè)問(wèn)題:本地事務(wù)已經(jīng)執(zhí)行完成了,怎么去回查本地事務(wù)的執(zhí)行結(jié)果呢?

          我們可以在執(zhí)行本地事務(wù)的時(shí)候同時(shí)生成一個(gè)事務(wù)日志,讓本地事務(wù)與日志事務(wù)在同一個(gè)方法中,同時(shí)添加 @Transactional 注解,保證兩個(gè)操作事務(wù)是一個(gè)原子操作。這樣如果事務(wù)日志表中有這個(gè)本地事務(wù)的信息,那就代表本地事務(wù)執(zhí)行成功,需要Commit,相反如果沒(méi)有對(duì)應(yīng)的事務(wù)日志,則表示沒(méi)執(zhí)行成功,需要Rollback

          思路既然理順了,咱們就開(kāi)擼。

          • 首先創(chuàng)建一個(gè)日志表很簡(jiǎn)單的三個(gè)字段,主要是這個(gè)事務(wù)id,需要根據(jù)這個(gè)事務(wù)id回查事務(wù),還記得我們?cè)诎l(fā)送半消息時(shí)生成的事務(wù)id嗎,就是干這個(gè)用的!
          • 在生產(chǎn)者編寫(xiě)方法實(shí)現(xiàn) RocketMQLocalTransactionListener
          @Slf4j
          @RocketMQTransactionListener
          @RequiredArgsConstructor(onConstructor?=?@__(@Autowired))
          public?class?AddUserAmountListener?implements?RocketMQLocalTransactionListener?{
          ????private?final?OrderService?orderService;
          ????private?final?RocketMqTransactionLogMapper?rocketMqTransactionLogMapper;
          ????/**
          ?????*?執(zhí)行本地事務(wù)
          ?????*/

          ????@Override
          ????public?RocketMQLocalTransactionState?executeLocalTransaction(Message?message,?Object?arg)?{
          ????????log.info("執(zhí)行本地事務(wù)");
          ????????MessageHeaders?headers?=?message.getHeaders();
          ????????//獲取事務(wù)ID
          ????????String?transactionId?=?(String)?headers.get(RocketMQHeaders.TRANSACTION_ID);
          ????????Integer?orderId?=?Integer.valueOf((String)headers.get("order_id"));
          ????????log.info("transactionId?is?{},?orderId?is?{}",transactionId,orderId);

          ????????try{
          ????????????//執(zhí)行本地事務(wù),并記錄日志
          ????????????orderService.changeStatuswithRocketMqLog(orderId,?CloudConstant.INVALID_STATUS,transactionId);
          ????????????//執(zhí)行成功,可以提交事務(wù)
          ????????????return?RocketMQLocalTransactionState.COMMIT;
          ????????}catch?(Exception?e){
          ????????????return?RocketMQLocalTransactionState.ROLLBACK;
          ????????}
          ????}

          ????/**
          ?????*?本地事務(wù)的檢查,檢查本地事務(wù)是否成功
          ?????*/

          ????@Override
          ????public?RocketMQLocalTransactionState?checkLocalTransaction(Message?message)?{

          ????????MessageHeaders?headers?=?message.getHeaders();
          ????????//獲取事務(wù)ID
          ????????String?transactionId?=?(String)?headers.get(RocketMQHeaders.TRANSACTION_ID);
          ????????log.info("檢查本地事務(wù),事務(wù)ID:{}",transactionId);
          ????????//根據(jù)事務(wù)id從日志表檢索
          ????????QueryWrapper?queryWrapper?=?new?QueryWrapper<>();
          ????????queryWrapper.eq("transaction_id",transactionId);
          ????????RocketmqTransactionLog?rocketmqTransactionLog?=?rocketMqTransactionLogMapper.selectOne(queryWrapper);
          ????????if(null?!=?rocketmqTransactionLog){
          ????????????return?RocketMQLocalTransactionState.COMMIT;
          ????????}
          ????????return?RocketMQLocalTransactionState.ROLLBACK;
          ????}
          }
          • 執(zhí)行本地事務(wù)的方法
          @Transactional(rollbackFor?=?RuntimeException.class)
          @Override
          public?void?changeStatuswithRocketMqLog(Integer?id,String?status,String?transactionId){
          ????//將訂單狀態(tài)置位無(wú)效
          ?orderMapper.changeStatus(id,status);
          ????//插入事務(wù)表
          ?rocketMqTransactionLogMapper.insert(
          ???RocketmqTransactionLog.builder()
          ?????.transactionId(transactionId)
          ?????.log("執(zhí)行刪除訂單操作")
          ???.build()
          ?);
          }

          這一塊的代碼邏輯都是在生產(chǎn)端,即Order-Server,大家不要搞錯(cuò)了

          消費(fèi)消息

          Rollback的消息MQServer會(huì)給我們處理,我們只要關(guān)注Commit狀態(tài)時(shí)消費(fèi)端可以正常消費(fèi)即可。在 account-service監(jiān)聽(tīng)消息,如果收到消息則給用戶(hù)賬戶(hù)增加余額。

          @Slf4j
          @Service
          @RocketMQMessageListener(topic?=?"add-amount",consumerGroup?=?"cloud-group")
          @RequiredArgsConstructor(onConstructor?=?@__(@Autowired)?)
          public?class?AddUserAmountListener?implements?RocketMQListener<UserAddMoneyDTO>?{
          ????private?final?AccountMapper?accountMapper;
          ????/**
          ?????*?收到消息的業(yè)務(wù)邏輯
          ?????*/

          ????@Override
          ????public?void?onMessage(UserAddMoneyDTO?userAddMoneyDTO)?{
          ????????log.info("received?message:?{}",userAddMoneyDTO);
          ????????accountMapper.increaseAmount(userAddMoneyDTO.getUserCode(),userAddMoneyDTO.getAmount());
          ????????log.info("add?money?success");
          ????}
          }

          測(cè)試

          訂單表有這樣一條記錄,用戶(hù)為jianzh5,amount為200

          用戶(hù)表的記錄,執(zhí)行完成后jianzh5的賬戶(hù)應(yīng)該變成250

          • 調(diào)用刪除訂單接口,刪除訂單
          • 發(fā)送半消息
          • 執(zhí)行本地事務(wù),并生成事務(wù)日志
          • 模擬異常情況 在發(fā)送Commit消息的時(shí)候我們用命令殺掉進(jìn)程 taskkill /pid 19748 -t -f,模擬異常!
          • 重新啟動(dòng)order-service,查看是否會(huì)執(zhí)行回查動(dòng)作MQServer進(jìn)行回查,檢查事務(wù)日志,判斷是否可以提交事務(wù)
          • 消費(fèi)者消費(fèi)事務(wù)消息,保證事務(wù)的一致性

          小結(jié)

          使用RocketMQ實(shí)現(xiàn)事務(wù)消息的過(guò)程還是很復(fù)雜的,需要好好理解開(kāi)頭的那張圖,只有理解了事務(wù)消息的交互過(guò)程才能編寫(xiě)相應(yīng)的代碼!



          如果本文對(duì)你有幫助,
          別忘記給我個(gè)三連:
          點(diǎn)贊,轉(zhuǎn)發(fā),評(píng)論
          咱們下期見(jiàn)!

          收藏?等于白嫖點(diǎn)贊?才是真情!


          1.?人人都能看懂的 6 種限流實(shí)現(xiàn)方案!

          2.?一個(gè)空格引發(fā)的“慘案“

          3.?大型網(wǎng)站架構(gòu)演化發(fā)展歷程

          4.?Java語(yǔ)言“坑爹”排行榜TOP 10

          5. 我是一個(gè)Java類(lèi)(附帶精彩吐槽)

          6. 看完這篇Redis緩存三大問(wèn)題,保你能和面試官互扯

          7. 程序員必知的 89 個(gè)操作系統(tǒng)核心概念

          8. 深入理解 MySQL:快速學(xué)會(huì)分析SQL執(zhí)行效率

          9. API 接口設(shè)計(jì)規(guī)范

          10. Spring Boot 面試,一個(gè)問(wèn)題就干趴下了!



          掃碼二維碼關(guān)注我


          ·end·

          —如果本文有幫助,請(qǐng)分享到朋友圈吧—

          我們一起愉快的玩耍!



          你點(diǎn)的每個(gè)贊,我都認(rèn)真當(dāng)成了喜歡

          瀏覽 43
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  亚洲小说区图片区都市 | 精品亚洲韩国 | 99在线热播 | 欧美国产三级一区小说 | 日本亚洲欧洲在线观看 |