RocketMQ進(jìn)階 - 事務(wù)消息

分布式消息選型的時(shí)候是否支持事務(wù)消息是一個(gè)很重要的考量點(diǎn),而目前只有RocketMQ對(duì)事務(wù)消息支持的最好。今天我們來(lái)嘮嘮如何實(shí)現(xiàn)RocketMQ的事務(wù)消息!
Apache RocketMQ在4.3.0版中已經(jīng)支持分布式事務(wù)消息,這里RocketMQ采用了2PC的思想來(lái)實(shí)現(xiàn)了提交事務(wù)消息,同時(shí)增加一個(gè)補(bǔ)償邏輯來(lái)處理二階段超時(shí)或者失敗的消息,如下圖所示。
RocketMQ事務(wù)流程概要
RocketMQ實(shí)現(xiàn)事務(wù)消息主要分為兩個(gè)階段:正常事務(wù)的發(fā)送及提交、事務(wù)信息的補(bǔ)償流程 整體流程為:
正常事務(wù)發(fā)送與提交階段
生產(chǎn)者發(fā)送一個(gè)半消息給MQServer(半消息是指消費(fèi)者暫時(shí)不能消費(fèi)的消息) 服務(wù)端響應(yīng)消息寫(xiě)入結(jié)果,半消息發(fā)送成功 開(kāi)始執(zhí)行本地事務(wù) 根據(jù)本地事務(wù)的執(zhí)行狀態(tài)執(zhí)行Commit或者Rollback操作
事務(wù)信息的補(bǔ)償流程
如果MQServer長(zhǎng)時(shí)間沒(méi)收到本地事務(wù)的執(zhí)行狀態(tài)會(huì)向生產(chǎn)者發(fā)起一個(gè)確認(rèn)回查的操作請(qǐng)求 生產(chǎn)者收到確認(rèn)回查請(qǐng)求后,檢查本地事務(wù)的執(zhí)行狀態(tài) 根據(jù)檢查后的結(jié)果執(zhí)行Commit或者Rollback操作 補(bǔ)償階段主要是用于解決生產(chǎn)者在發(fā)送Commit或者Rollback操作時(shí)發(fā)生超時(shí)或失敗的情況。
RocketMQ事務(wù)流程關(guān)鍵
事務(wù)消息在一階段對(duì)用戶(hù)不可見(jiàn) 事務(wù)消息相對(duì)普通消息最大的特點(diǎn)就是一階段發(fā)送的消息對(duì)用戶(hù)是不可見(jiàn)的,也就是說(shuō)消費(fèi)者不能直接消費(fèi)。這里RocketMQ的實(shí)現(xiàn)方法是原消息的主題與消息消費(fèi)隊(duì)列,然后把主題改成 RMQ_SYS_TRANS_HALF_TOPIC,這樣由于消費(fèi)者沒(méi)有訂閱這個(gè)主題,所以不會(huì)被消費(fèi)。如何處理第二階段的失敗消息?在本地事務(wù)執(zhí)行完成后會(huì)向MQServer發(fā)送Commit或Rollback操作,此時(shí)如果在發(fā)送消息的時(shí)候生產(chǎn)者出故障了,那么要保證這條消息最終被消費(fèi),MQServer會(huì)像服務(wù)端發(fā)送回查請(qǐng)求,確認(rèn)本地事務(wù)的執(zhí)行狀態(tài)。當(dāng)然了rocketmq并不會(huì)無(wú)休止的的信息事務(wù)狀態(tài)回查,默認(rèn)回查15次,如果15次回查還是無(wú)法得知事務(wù)狀態(tài),RocketMQ默認(rèn)回滾該消息。 消息狀態(tài) 事務(wù)消息有三種狀態(tài):
TransactionStatus.CommitTransaction:提交事務(wù)消息,消費(fèi)者可以消費(fèi)此消息 TransactionStatus.RollbackTransaction:回滾事務(wù),它代表該消息將被刪除,不允許被消費(fèi)。 TransactionStatus.Unknown :中間狀態(tài),它代表需要檢查消息隊(duì)列來(lái)確定狀態(tài)。
代碼實(shí)現(xiàn)
首先假設(shè)我們有這樣一個(gè)需求:
用戶(hù)請(qǐng)求訂單微服務(wù) order-service 接口刪除訂單(退貨),刪除訂單后需要發(fā)送消息給用戶(hù)服務(wù) account-service,用戶(hù)微服務(wù)收到消息后會(huì)給用戶(hù)賬戶(hù)增加余額。
這個(gè)需求跟錢(qián)相關(guān),肯定要保證消息的事務(wù)性,接下來(lái)我們根據(jù)上面的原理實(shí)現(xiàn)整個(gè)流程。
基礎(chǔ)配置
生產(chǎn)者order-servcie和account-service都要引入RocketMQ相關(guān)依賴(lài),增加RocketMQ的相關(guān)配置
引入組件
<dependency>
?<groupId>org.apache.rocketmqgroupId>
?<artifactId>rocketmq-spring-boot-starterartifactId>
dependency>
添加配置
#?within?rocketmq
rocketmq:
??name-server:?xxx.xx.x.xx:9876;?xxx.xx.x.xx:9876
??producer:
????group:?cloud-group
發(fā)送半消息
order-service在執(zhí)行刪除訂單操作時(shí)發(fā)送一條半消息給MQServer,發(fā)送半消息主要是使用 rocketMQTemplate.sendMessageInTransaction() 方法,發(fā)送事務(wù)消息。
@Override
public?void?delete(String?orderNo)?{
?Order?order?=?orderMapper.selectByNo(orderNo);
?//如果訂單存在且狀態(tài)為有效,進(jìn)行業(yè)務(wù)處理
?if?(order?!=?null?&&?CloudConstant.VALID_STATUS.equals(order.getStatus()))?{
??String?transactionId?=?UUID.randomUUID().toString();
??//如果可以刪除訂單則發(fā)送消息給rocketmq,讓用戶(hù)中心消費(fèi)消息
??rocketMQTemplate.sendMessageInTransaction("add-amount",
????MessageBuilder.withPayload(
??????UserAddMoneyDTO.builder()
????????.userCode(order.getAccountCode())
????????.amount(order.getAmount())
????????.build()
????)
????.setHeader(RocketMQHeaders.TRANSACTION_ID,?transactionId)
????.setHeader("order_id",order.getId())
????.build()
????,order
??);
?}
}
首先先校驗(yàn)一下訂單狀態(tài),然后發(fā)送消息給MQServer,這個(gè)邏輯大家都看得懂,主要是關(guān)注 sendMessageInTransaction() 方法,源碼如下:
public?TransactionSendResult?sendMessageInTransaction(String?destination,?Message>?message,?Object?arg)?throws?MessagingException?{
?try?{
??if?(((TransactionMQProducer)this.producer).getTransactionListener()?==?null)?{
???throw?new?IllegalStateException("The?rocketMQTemplate?does?not?exist?TransactionListener");
??}?else?{
???org.apache.rocketmq.common.message.Message?rocketMsg?=?this.createRocketMqMessage(destination,?message);
???return?this.producer.sendMessageInTransaction(rocketMsg,?arg);
??}
?}?catch?(MQClientException?var5)?{
??throw?RocketMQUtil.convert(var5);
?}
}
該方法有三個(gè)參數(shù):
destination:目的地(主題),這里發(fā)送給 add-amount這個(gè)主題message:發(fā)送給消費(fèi)者的消息體,需要使用 MessageBuilder.withPayload()來(lái)構(gòu)建消息arg:參數(shù)
注意,這里我們生成了一個(gè)transactionId,并放在header中跟消息一起發(fā)送(這里實(shí)際也可以構(gòu)造成一個(gè)對(duì)象,放在arg里進(jìn)行發(fā)送),作用后面再講!
執(zhí)行本地事務(wù)與回查
MQServer收到半消息后會(huì)告訴生產(chǎn)者order-service確認(rèn)收到半消息,這時(shí)候order-service需要執(zhí)行本地事務(wù),執(zhí)行完本地事務(wù)后再告訴MQServer本地事務(wù)的執(zhí)行狀態(tài),確認(rèn)消息究竟是Commit還是Rollback。如果在告訴MQServer本地執(zhí)行狀態(tài)的時(shí)候出異常了還需要讓MQServer能夠回查到,怎么實(shí)現(xiàn)這一些列操作呢?
RocketMQ提供了 RocketMQLocalTransactionListener 接口,本地事務(wù)監(jiān)聽(tīng)器,這個(gè)接口類(lèi)的實(shí)現(xiàn)如下:
第一個(gè)方法 executeLocalTransaction 為執(zhí)行本地事務(wù);第二個(gè)方法 checkLocalTransaction 為檢查本地事務(wù)的執(zhí)行狀態(tài),也就是回查動(dòng)作。有了這個(gè)接口類(lèi)我們的執(zhí)行邏輯清楚了,但是還有個(gè)問(wèn)題:本地事務(wù)已經(jīng)執(zhí)行完成了,怎么去回查本地事務(wù)的執(zhí)行結(jié)果呢?
我們可以在執(zhí)行本地事務(wù)的時(shí)候同時(shí)生成一個(gè)事務(wù)日志,讓本地事務(wù)與日志事務(wù)在同一個(gè)方法中,同時(shí)添加 @Transactional 注解,保證兩個(gè)操作事務(wù)是一個(gè)原子操作。這樣如果事務(wù)日志表中有這個(gè)本地事務(wù)的信息,那就代表本地事務(wù)執(zhí)行成功,需要Commit,相反如果沒(méi)有對(duì)應(yīng)的事務(wù)日志,則表示沒(méi)執(zhí)行成功,需要Rollback
思路既然理順了,咱們就開(kāi)擼。
首先創(chuàng)建一個(gè)日志表
很簡(jiǎn)單的三個(gè)字段,主要是這個(gè)事務(wù)id,需要根據(jù)這個(gè)事務(wù)id回查事務(wù),還記得我們?cè)诎l(fā)送半消息時(shí)生成的事務(wù)id嗎,就是干這個(gè)用的!在生產(chǎn)者編寫(xiě)方法實(shí)現(xiàn) RocketMQLocalTransactionListener
@Slf4j
@RocketMQTransactionListener
@RequiredArgsConstructor(onConstructor?=?@__(@Autowired))
public?class?AddUserAmountListener?implements?RocketMQLocalTransactionListener?{
????private?final?OrderService?orderService;
????private?final?RocketMqTransactionLogMapper?rocketMqTransactionLogMapper;
????/**
?????*?執(zhí)行本地事務(wù)
?????*/
????@Override
????public?RocketMQLocalTransactionState?executeLocalTransaction(Message?message,?Object?arg)?{
????????log.info("執(zhí)行本地事務(wù)");
????????MessageHeaders?headers?=?message.getHeaders();
????????//獲取事務(wù)ID
????????String?transactionId?=?(String)?headers.get(RocketMQHeaders.TRANSACTION_ID);
????????Integer?orderId?=?Integer.valueOf((String)headers.get("order_id"));
????????log.info("transactionId?is?{},?orderId?is?{}",transactionId,orderId);
????????try{
????????????//執(zhí)行本地事務(wù),并記錄日志
????????????orderService.changeStatuswithRocketMqLog(orderId,?CloudConstant.INVALID_STATUS,transactionId);
????????????//執(zhí)行成功,可以提交事務(wù)
????????????return?RocketMQLocalTransactionState.COMMIT;
????????}catch?(Exception?e){
????????????return?RocketMQLocalTransactionState.ROLLBACK;
????????}
????}
????/**
?????*?本地事務(wù)的檢查,檢查本地事務(wù)是否成功
?????*/
????@Override
????public?RocketMQLocalTransactionState?checkLocalTransaction(Message?message)?{
????????MessageHeaders?headers?=?message.getHeaders();
????????//獲取事務(wù)ID
????????String?transactionId?=?(String)?headers.get(RocketMQHeaders.TRANSACTION_ID);
????????log.info("檢查本地事務(wù),事務(wù)ID:{}",transactionId);
????????//根據(jù)事務(wù)id從日志表檢索
????????QueryWrapper?queryWrapper?=?new?QueryWrapper<>();
????????queryWrapper.eq("transaction_id",transactionId);
????????RocketmqTransactionLog?rocketmqTransactionLog?=?rocketMqTransactionLogMapper.selectOne(queryWrapper);
????????if(null?!=?rocketmqTransactionLog){
????????????return?RocketMQLocalTransactionState.COMMIT;
????????}
????????return?RocketMQLocalTransactionState.ROLLBACK;
????}
}
執(zhí)行本地事務(wù)的方法
@Transactional(rollbackFor?=?RuntimeException.class)
@Override
public?void?changeStatuswithRocketMqLog(Integer?id,String?status,String?transactionId){
????//將訂單狀態(tài)置位無(wú)效
?orderMapper.changeStatus(id,status);
????//插入事務(wù)表
?rocketMqTransactionLogMapper.insert(
???RocketmqTransactionLog.builder()
?????.transactionId(transactionId)
?????.log("執(zhí)行刪除訂單操作")
???.build()
?);
}
這一塊的代碼邏輯都是在生產(chǎn)端,即Order-Server,大家不要搞錯(cuò)了
消費(fèi)消息
Rollback的消息MQServer會(huì)給我們處理,我們只要關(guān)注Commit狀態(tài)時(shí)消費(fèi)端可以正常消費(fèi)即可。在 account-service監(jiān)聽(tīng)消息,如果收到消息則給用戶(hù)賬戶(hù)增加余額。
@Slf4j
@Service
@RocketMQMessageListener(topic?=?"add-amount",consumerGroup?=?"cloud-group")
@RequiredArgsConstructor(onConstructor?=?@__(@Autowired)?)
public?class?AddUserAmountListener?implements?RocketMQListener<UserAddMoneyDTO>?{
????private?final?AccountMapper?accountMapper;
????/**
?????*?收到消息的業(yè)務(wù)邏輯
?????*/
????@Override
????public?void?onMessage(UserAddMoneyDTO?userAddMoneyDTO)?{
????????log.info("received?message:?{}",userAddMoneyDTO);
????????accountMapper.increaseAmount(userAddMoneyDTO.getUserCode(),userAddMoneyDTO.getAmount());
????????log.info("add?money?success");
????}
}
測(cè)試
訂單表有這樣一條記錄,用戶(hù)為jianzh5,amount為200
用戶(hù)表的記錄,執(zhí)行完成后jianzh5的賬戶(hù)應(yīng)該變成250
調(diào)用刪除訂單接口,刪除訂單 
發(fā)送半消息 
執(zhí)行本地事務(wù),并生成事務(wù)日志 
模擬異常情況 在發(fā)送Commit消息的時(shí)候我們用命令殺掉進(jìn)程 taskkill /pid 19748 -t -f,模擬異常!
重新啟動(dòng)order-service,查看是否會(huì)執(zhí)行回查動(dòng)作
MQServer進(jìn)行回查,檢查事務(wù)日志,判斷是否可以提交事務(wù)消費(fèi)者消費(fèi)事務(wù)消息,保證事務(wù)的一致性 
小結(jié)
使用RocketMQ實(shí)現(xiàn)事務(wù)消息的過(guò)程還是很復(fù)雜的,需要好好理解開(kāi)頭的那張圖,只有理解了事務(wù)消息的交互過(guò)程才能編寫(xiě)相應(yīng)的代碼!
收藏?等于白嫖,點(diǎn)贊?才是真情!

1.?人人都能看懂的 6 種限流實(shí)現(xiàn)方案!
3.?大型網(wǎng)站架構(gòu)演化發(fā)展歷程
6. 看完這篇Redis緩存三大問(wèn)題,保你能和面試官互扯
7. 程序員必知的 89 個(gè)操作系統(tǒng)核心概念
8. 深入理解 MySQL:快速學(xué)會(huì)分析SQL執(zhí)行效率
10. Spring Boot 面試,一個(gè)問(wèn)題就干趴下了!

掃碼二維碼關(guān)注我
·end·
—如果本文有幫助,請(qǐng)分享到朋友圈吧—
我們一起愉快的玩耍!

