徹底看懂RocketMQ事務(wù)實(shí)現(xiàn)原理

面試中經(jīng)常會(huì)問(wèn)到比如RocketMQ的事務(wù)是如何實(shí)現(xiàn)的呢?學(xué)習(xí)框架,我們不僅要熟練使用,更要掌握設(shè)計(jì)及原理,才算熟悉一個(gè)框架。
1 RocketMQ 事務(wù)使用案例
public class CreateOrderService {private OrderDao orderDao;private ExecutorService executorService;private TransactionMQProducer producer;// 初始化transactionListener 和 producerpublic void init() throws MQClientException {TransactionListener transactionListener = createTransactionListener();producer = new TransactionMQProducer("myGroup");producer.setExecutorService(executorService);producer.setTransactionListener(transactionListener);producer.start();}// 創(chuàng)建訂單服務(wù)的請(qǐng)求入口(...)public boolean createOrder(@RequestBody CreateOrderRequest request) {// 根據(jù)創(chuàng)建訂單請(qǐng)求創(chuàng)建一條消息Message msg = createMessage(request);// 發(fā)送事務(wù)消息SendResult sendResult = producer.sendMessageInTransaction(msg, request);// 返回:事務(wù)是否成功return sendResult.getSendStatus() == SendStatus.SEND_OK;}private TransactionListener createTransactionListener() {return new TransactionListener() {public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {CreateOrderRequest request = (CreateOrderRequest ) arg;try {// 執(zhí)行本地事務(wù)創(chuàng)建訂單orderDao.createOrderInDB(request);// 如果沒(méi)拋異常說(shuō)明執(zhí)行成功,提交事務(wù)消息return LocalTransactionState.COMMIT_MESSAGE;} catch (Throwable t) {// 失敗則直接回滾事務(wù)消息return LocalTransactionState.ROLLBACK_MESSAGE;}}// 反查本地事務(wù)public LocalTransactionState checkLocalTransaction(MessageExt msg) {// 從消息中獲得訂單IDString orderId = msg.getUserProperty("orderId");// 去db查詢訂單號(hào)是否存在,若存在則提交事務(wù)// 若不存在,可能是本地事務(wù)失敗了,也可能是本地事務(wù)還在執(zhí)行,所以返回UNKNOWreturn orderDao.isOrderIdExistsInDB(orderId)?LocalTransactionState.COMMIT_MESSAGE: LocalTransactionState.UNKNOW;}};}}
如上案例展示了一個(gè)訂單創(chuàng)建服務(wù),即往db插一條訂單記錄,并發(fā)一條創(chuàng)建訂單的消息,要求寫(xiě)db和發(fā)消息倆個(gè)操作在一個(gè)事務(wù)內(nèi)執(zhí)行。
首先在init()方法中初始化了transactionListener和發(fā)生RocketMQ事務(wù)消息的變量producer。
createOrder()
真正提供創(chuàng)建訂單服務(wù)的方法,根據(jù)請(qǐng)求的參數(shù)創(chuàng)建一條消息,然后調(diào)用 producer發(fā)事務(wù)消息,并返回事務(wù)執(zhí)行結(jié)果。createTransactionListener()
在init()方法中調(diào)用,構(gòu)造實(shí)現(xiàn)RocketMQ的TransactionListener接口的匿名類,該接口需要實(shí)現(xiàn)如下兩個(gè)方法:executeLocalTransaction:執(zhí)行本地事務(wù),在這里我們直接把訂單數(shù)據(jù)插入到數(shù)據(jù)庫(kù)中,并返回本地事務(wù)的執(zhí)行結(jié)果。

checkLocalTransaction:反查本地事務(wù),上述流程中是在db中查詢訂單號(hào)是否存在,若存在則提交事務(wù),若不存在,可能本地事務(wù)失敗了,也可能本地事務(wù)還在執(zhí)行,所以返回UNKNOW

這樣便使用RocketMQ的事務(wù)簡(jiǎn)單實(shí)現(xiàn)了一個(gè)創(chuàng)建訂單的分布式事務(wù)。
2 RocketMQ事務(wù)消息實(shí)現(xiàn)原理
2.1 Pro端如何發(fā)事務(wù)消息?
DefaultMQProducerImpl#sendMessageInTransaction
public TransactionSendResult sendMessageInTransaction(final Message msg,final LocalTransactionExecuter localTransactionExecuter, final Object arg)throws MQClientException {TransactionListener transactionListener = getCheckListener();if (null == localTransactionExecuter && null == transactionListener) {throw new MQClientException("tranExecutor is null", null);}// ignore DelayTimeLevel parameterif (msg.getDelayTimeLevel() != 0) {MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);}Validators.checkMessage(msg, this.defaultMQProducer);SendResult sendResult = null;// 給待發(fā)送消息添加屬性,表明是一個(gè)事務(wù)消息(即半消息)MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());// 像發(fā)送普通消息一樣,把這條事務(wù)消息發(fā)往Brokertry {sendResult = this.send(msg);} catch (Exception e) {throw new MQClientException("send message Exception", e);}LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;Throwable localException = null;switch (sendResult.getSendStatus()) {// 事務(wù)消息發(fā)送成功case SEND_OK: {try {if (sendResult.getTransactionId() != null) {msg.putUserProperty("__transactionId__", sendResult.getTransactionId());}String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);if (null != transactionId && !"".equals(transactionId)) {msg.setTransactionId(transactionId);}if (null != localTransactionExecuter) {localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);} else if (transactionListener != null) {log.debug("Used new transaction API");// 開(kāi)始執(zhí)行本地事務(wù)localTransactionState = transactionListener.executeLocalTransaction(msg, arg);}if (null == localTransactionState) {localTransactionState = LocalTransactionState.UNKNOW;}if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {log.info("executeLocalTransactionBranch return {}", localTransactionState);log.info(msg.toString());}} catch (Throwable e) {log.info("executeLocalTransactionBranch exception", e);log.info(msg.toString());localException = e;}}break;case FLUSH_DISK_TIMEOUT:case FLUSH_SLAVE_TIMEOUT:case SLAVE_NOT_AVAILABLE:localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;break;default:break;}// 事務(wù)過(guò)程的最后,給Broker發(fā)送提交或回滾事務(wù)的RPC請(qǐng)求。try {this.endTransaction(sendResult, localTransactionState, localException);} catch (Exception e) {log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);}TransactionSendResult transactionSendResult = new TransactionSendResult();transactionSendResult.setSendStatus(sendResult.getSendStatus());transactionSendResult.setMessageQueue(sendResult.getMessageQueue());transactionSendResult.setMsgId(sendResult.getMsgId());transactionSendResult.setQueueOffset(sendResult.getQueueOffset());transactionSendResult.setTransactionId(sendResult.getTransactionId());transactionSendResult.setLocalTransactionState(localTransactionState);return transactionSendResult;}
有事務(wù)反查機(jī)制作兜底,該RPC請(qǐng)求即使失敗或丟失,也不會(huì)影響事務(wù)最終的結(jié)果。最后構(gòu)建事務(wù)消息的發(fā)送結(jié)果,并返回。
2.2 Broker端如何處理事務(wù)消息?
SendMessageProcessor#asyncSendMessage

跟進(jìn)去看看真正處理半消息的業(yè)務(wù)邏輯,這段處理邏輯在類
TransactionalMessageBridge
putHalfMessage

parseHalfMessageInner

RocketMQ并非將事務(wù)消息保存至消息中 client 指定的 queue,而是記錄了原始的 topic 和 queue 后,把這個(gè)事務(wù)消息保存在


特殊的內(nèi)部 topic:RMQ_SYS_TRANS_HALF_TOPIC
序號(hào)為 0 的 queue
這套 topic 和 queue 對(duì)消費(fèi)者不可見(jiàn),因此里面的消息也永遠(yuǎn)不會(huì)被消費(fèi)。這就保證在事務(wù)提交成功之前,這個(gè)事務(wù)消息對(duì) Consumer 是消費(fèi)不到的。
2.3 Broker端如何事務(wù)反查?
在Broker的TransactionalMessageCheckService服務(wù)中啟動(dòng)了一個(gè)定時(shí)器,定時(shí)從事務(wù)消息queue中讀出所有待反查的事務(wù)消息。
AbstractTransactionalMessageCheckListener#resolveHalfMsg
針對(duì)每個(gè)需要反查的半消息,Broker會(huì)給對(duì)應(yīng)的Producer發(fā)一個(gè)要求執(zhí)行事務(wù)狀態(tài)反查的RPC請(qǐng)求
AbstractTransactionalMessageCheckListener#sendCheckMessage

Broker2Client#checkProducerTransactionState

根據(jù)RPC返回響應(yīng)中的反查結(jié)果,來(lái)決定這個(gè)半消息是需要提交還是回滾,或者后續(xù)繼續(xù)來(lái)反查。
最后,提交或者回滾事務(wù)。首先把半消息標(biāo)記為已處理
如果是提交事務(wù),就把半消息從半消息隊(duì)列中復(fù)制到該消息真正的topic和queue中
如果是回滾事務(wù),什么都不做
EndTransactionProcessor#processRequest
最后結(jié)束該事務(wù)。
3 總結(jié)
整體實(shí)現(xiàn)流程

RocketMQ是基于兩階段提交來(lái)實(shí)現(xiàn)的事務(wù),把這些事務(wù)消息暫存在一個(gè)特殊的queue中,待事務(wù)提交后再移動(dòng)到業(yè)務(wù)隊(duì)列中。最后,RocketMQ的事務(wù)適用于解決本地事務(wù)和發(fā)消息的數(shù)據(jù)一致性問(wèn)題。
參考
https://juejin.im/post/6844904193526857742

