Rocketmq源碼分析14:事務(wù)消息
注:本系列源碼分析基于RocketMq 4.8.0,gitee倉(cāng)庫(kù)鏈接:https://gitee.com/funcy/rocketmq.git.
rocketMq支持一類特別的消息:事務(wù)消息,本文將從源碼角度分析事務(wù)消息的實(shí)現(xiàn)原理。
1. demo 準(zhǔn)備
事務(wù)消息的示例位于org.apache.rocketmq.example.transaction包中,我們先來(lái)看看它的使用:
1.1 準(zhǔn)備事務(wù)監(jiān)聽器:TransactionListener
public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer status = localTrans.get(msg.getTransactionId());
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
default:
return LocalTransactionState.COMMIT_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
TransactionListener是事務(wù)監(jiān)聽接口,它有兩個(gè)方法:
executeLocalTransaction(...):執(zhí)行事務(wù),這里是事務(wù)的內(nèi)容checkLocalTransaction(...):檢查事務(wù)的執(zhí)行狀態(tài)
1.2 事務(wù)消息的producer
接著就是事務(wù)消息的生產(chǎn)者了,代碼如下:
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
// 這里的 producer 類型是 TransactionMQProducer
TransactionMQProducer producer
= new TransactionMQProducer("please_rename_unique_group_name");
// 準(zhǔn)備一線程池
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
}
);
producer.setExecutorService(executorService);
// 設(shè)置監(jiān)聽
TransactionListener transactionListener = new TransactionListenerImpl();
producer.setTransactionListener(transactionListener);
producer.start();
// 發(fā)送事務(wù)消息
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
}
與普通消息的producer不同的是,事務(wù)消息的producer類型是TransactionMQProducer,并且需要設(shè)置事務(wù)監(jiān)聽器。
有了示例demo,接著我們就來(lái)分析這其中的流程了。
2. 啟動(dòng):TransactionMQProducer#start
TransactionMQProducer的啟動(dòng)方法為start(),內(nèi)容如下:
@Override
public void start() throws MQClientException {
// 初始化環(huán)境
this.defaultMQProducerImpl.initTransactionEnv();
// 調(diào)用父類DefaultMQProducer的方法
super.start();
}
這個(gè)方法先是調(diào)用DefaultMQProducerImpl#initTransactionEnv方法進(jìn)行了一些初始化操作,然后調(diào)用父類DefaultMQProducer的start()方法進(jìn)行啟動(dòng)操作。從這里可以看出,與普通消息的producer啟動(dòng)流程相比,事務(wù)消息的producer僅是多了一步初始化事務(wù)環(huán)境操作。
我們進(jìn)入DefaultMQProducerImpl#initTransactionEnv方法,看看它做了什么:
public void initTransactionEnv() {
TransactionMQProducer producer = (TransactionMQProducer) this.defaultMQProducer;
// 整個(gè)方法就是對(duì) checkExecutor 進(jìn)行賦值
if (producer.getExecutorService() != null) {
this.checkExecutor = producer.getExecutorService();
} else {
this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(
producer.getCheckRequestHoldMax());
this.checkExecutor = new ThreadPoolExecutor(
producer.getCheckThreadPoolMinSize(),
producer.getCheckThreadPoolMaxSize(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.checkRequestQueue);
}
}
從代碼中可以看到,這整個(gè)方法就是對(duì)成員變量checkExecutor進(jìn)行賦值操作。
3. 發(fā)送消息:TransactionMQProducer#sendMessageInTransaction(...)
發(fā)送消息的方法為 TransactionMQProducer#sendMessageInTransaction(...),代碼如下:
public TransactionSendResult sendMessageInTransaction(final Message msg,
final Object arg) throws MQClientException {
if (null == this.transactionListener) {
throw new MQClientException("TransactionListener is null", null);
}
msg.setTopic(NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic()));
return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
}
繼續(xù)跟進(jìn),進(jìn)入 DefaultMQProducerImpl#sendMessageInTransaction 方法:
public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter localTransactionExecuter, final Object arg)
throws MQClientException {
// 獲取 TransactionListener
TransactionListener transactionListener = getCheckListener();
if (null == localTransactionExecuter && null == transactionListener) {
throw new MQClientException("tranExecutor is null", null);
}
// 清除延遲級(jí)別,可以看到,事務(wù)消息不支持延遲
if (msg.getDelayTimeLevel() != 0) {
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
}
Validators.checkMessage(msg, this.defaultMQProducer);
SendResult sendResult = null;
// 設(shè)置消息屬性,指定消息類型為事務(wù)消息
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP,
this.defaultMQProducer.getProducerGroup());
try {
// 發(fā)送消息,按同步模式發(fā)送
sendResult = this.send(msg);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
// 處理返回值
switch (sendResult.getSendStatus()) {
case SEND_OK: {
try {
// 這里省略了好多的判斷
...
// 發(fā)送成功,執(zhí)行本地事務(wù)
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
} catch (Throwable e) {
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
}
}
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}
try {
// 結(jié)束遠(yuǎn)程事務(wù),注意傳入的 localTransactionState
this.endTransaction(sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn(...);
}
// 構(gòu)造返回值
TransactionSendResult transactionSendResult = new TransactionSendResult();
...
return transactionSendResult;
}
這個(gè)方法就是用來(lái)發(fā)送事務(wù)消息的方法了,這里將其中的關(guān)鍵點(diǎn)總結(jié)如下:
獲取 TransactionListener,這個(gè)TransactionListener就是我們?cè)谑纠?code style="overflow-wrap: break-word;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;font-family: "Operator Mono", Consolas, Monaco, Menlo, monospace;word-break: break-all;color: rgb(145, 109, 213);font-weight: bolder;background-image: none;background-position: initial;background-size: initial;background-repeat: initial;background-attachment: initial;background-origin: initial;background-clip: initial;">demo中調(diào)用producer.setTransactionListener(...)設(shè)置的如果延遲級(jí)別不等于0,則將其清除,這就表明事務(wù)消息不支持延遲 設(shè)置消息屬性,指定消息類型為事務(wù)消息, broker在收到消息時(shí),會(huì)對(duì)事務(wù)消息進(jìn)行特別處理發(fā)送消息,發(fā)送方式與普通消息的發(fā)送并不區(qū)別,不過(guò)需要指明的是,這里是按同步模式發(fā)送的 處理消息的發(fā)送結(jié)果,如果發(fā)送失敗,則將事務(wù)狀態(tài)設(shè)置為 ROLLBACK_MESSAGE,表示需要回滾;發(fā)送成功則執(zhí)行本地事務(wù),也就是執(zhí)行transactionListener.executeLocalTransaction(...)方法,方法返回事務(wù)狀態(tài)結(jié)束遠(yuǎn)程事務(wù),這一步會(huì)將第5步得到的事務(wù)狀態(tài)發(fā)往 broker,接下來(lái)的事就由broker進(jìn)行處理了
這里我們來(lái)看一眼transactionListener.executeLocalTransaction(...)方法的內(nèi)容:
public class TransactionListenerImpl implements TransactionListener {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
return LocalTransactionState.UNKNOW;
}
...
}
這是示例demo中的內(nèi)容,在executeLocalTransaction(...)方法中可以返回事務(wù)的執(zhí)行狀態(tài),這個(gè)狀態(tài)非常重要,因?yàn)檫@個(gè)狀態(tài)之后會(huì)發(fā)往broker,broker會(huì)根據(jù)這個(gè)狀態(tài)來(lái)判斷是要提交還是回滾消息。
我們?cè)賮?lái)看看結(jié)束事務(wù)的方法DefaultMQProducerImpl#endTransaction:
public void endTransaction(
final SendResult sendResult,
final LocalTransactionState localTransactionState,
final Throwable localException) throws RemotingException,
MQBrokerException, InterruptedException, UnknownHostException {
final MessageId id;
if (sendResult.getOffsetMsgId() != null) {
id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
} else {
id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
}
String transactionId = sendResult.getTransactionId();
// 找到一個(gè)broker地址
final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(
sendResult.getMessageQueue().getBrokerName());
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
requestHeader.setTransactionId(transactionId);
requestHeader.setCommitLogOffset(id.getOffset());
// 設(shè)置消息頭,根據(jù)消息狀態(tài)設(shè)置 提交/回滾 標(biāo)識(shí)
switch (localTransactionState) {
case COMMIT_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
case ROLLBACK_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
break;
case UNKNOW:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
break;
default:
break;
}
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
requestHeader.setMsgId(sendResult.getMsgId());
String remark = localException != null ? ("executeLocalTransactionBranch exception: " +
localException.toString()) : null;
// 發(fā)送方式為 oneway
this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader,
remark, this.defaultMQProducer.getSendMsgTimeout());
}
這個(gè)方法就是向broker發(fā)送結(jié)束事務(wù)操作了,代碼中關(guān)鍵之處有兩點(diǎn):
根據(jù) localTransactionState狀態(tài)來(lái)設(shè)置事務(wù)提交/回滾的標(biāo)識(shí),localTransactionState的值來(lái)源于事務(wù)消息的發(fā)送結(jié)果,或本地事務(wù)的執(zhí)行結(jié)果消息的發(fā)送方式為 oneway,這表明rocketMq并不關(guān)心該消息的返回值,為何不關(guān)心呢?因?yàn)槭聞?wù)消息還有個(gè)broker反查機(jī)制,即broker定時(shí)向producer發(fā)送消息反查事務(wù)的狀態(tài),這點(diǎn)本文后面會(huì)分析。
到這里,producer就處理完事務(wù)消息的發(fā)送流程了,接下來(lái)我們來(lái)看看broker是如何處理事務(wù)相關(guān)消息的。
4. broker 處理事務(wù)消息
在上一節(jié)的TransactionMQProducer#sendMessageInTransaction(...)方法中,一共向broker發(fā)送了兩條消息,這里我們來(lái)分析這兩條消息所做的內(nèi)容。
4.1 處理事務(wù)消息:SendMessageProcessor#asyncSendMessage
producer向broker發(fā)送事務(wù)消息后,處理流程同普通消息的處理流程一致,本文僅關(guān)注兩者不同之處,在SendMessageProcessor#asyncSendMessage方法中,會(huì)區(qū)分普通消息與事務(wù)消息:
private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx,
RemotingCommand request,
SendMessageContext mqtraceContext,
SendMessageRequestHeader requestHeader) {
// 如果是事務(wù)消息
if (transFlag != null && Boolean.parseBoolean(transFlag)) {
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return CompletableFuture.completedFuture(response);
}
// 處理事務(wù)消息
putMessageResult = this.brokerController.getTransactionalMessageService()
.asyncPrepareMessage(msgInner);
} else {
// 發(fā)送普通消息
putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
}
return handlePutMessageResultFuture(putMessageResult, response, request, msgInner,
responseHeader, mqtraceContext, ctx, queueIdInt);
}
繼續(xù)跟進(jìn)TransactionalMessageBridge#asyncPutHalfMessage
public CompletableFuture<PutMessageResult> asyncPutHalfMessage(
MessageExtBrokerInner messageInner) {
// parseHalfMessageInner(...):消息轉(zhuǎn)換
// asyncPutMessage(...):消息存儲(chǔ),就是保存到commitLog中
return store.asyncPutMessage(parseHalfMessageInner(messageInner));
}
這個(gè)方法中有兩個(gè)操作:
parseHalfMessageInner(...):消息轉(zhuǎn)換,這個(gè)方法會(huì)將事務(wù)消息暫存到事務(wù)消息的專屬隊(duì)列中asyncPutMessage(...):消息存儲(chǔ),就是保存到commitLog中,這點(diǎn)與普通消息并無(wú)差別
由于事務(wù)消息存儲(chǔ)與普通消息的存儲(chǔ)并無(wú)差別,因此這里,我們主要來(lái)看看事務(wù)消息的轉(zhuǎn)換過(guò)程,進(jìn)入TransactionalMessageBridge#parseHalfMessageInner方法:
/**
* 構(gòu)建消息內(nèi)容
* @param msgInner
* @return
*/
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
// 保存原始的topic與queueId
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
String.valueOf(msgInner.getQueueId()));
// 指定新的 topic 與 queue,其實(shí)就是暫存到事務(wù)相關(guān)的queue中
msgInner.setSysFlag(
MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(),
MessageSysFlag.TRANSACTION_NOT_TYPE));
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
msgInner.setQueueId(0);
msgInner.setPropertiesString(MessageDecoder
.messageProperties2String(msgInner.getProperties()));
return msgInner;
}
這一步的操作很直觀,就是將消息的topic與queueId保存下來(lái),然后換成事務(wù)專用的topic與queueId,然后存儲(chǔ)到commitLog中,由些,事務(wù)消息的發(fā)送也就結(jié)束了。
4.2 處理結(jié)束事務(wù)的消息:EndTransactionProcessor#processRequest
結(jié)束事務(wù)消息的code為END_TRANSACTION,處理該code的方法為 EndTransactionProcessor#processRequest:
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final EndTransactionRequestHeader requestHeader = (EndTransactionRequestHeader)request
.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
LOGGER.debug("Transaction request:{}", requestHeader);
if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");
return response;
}
...
OperationResult result = new OperationResult();
// 事務(wù)提交操作
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
// 從commitLog中獲取消息
result = this.brokerController.getTransactionalMessageService()
.commitMessage(requestHeader);
// 如果返回成功
if (result.getResponseCode() == ResponseCode.SUCCESS) {
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
// 獲取消息,在這里方法里會(huì)處理消息轉(zhuǎn)換操作,即拿到真正要發(fā)送的topic與queue
MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
// 省略一些setXxx(...)方法
...
// 真正的投遞操作
RemotingCommand sendResult = sendFinalMessage(msgInner);
// 投遞完成,刪除消息,當(dāng)然不是真正地從磁盤上刪除,只是將消息標(biāo)記為刪除
if (sendResult.getCode() == ResponseCode.SUCCESS) {
this.brokerController.getTransactionalMessageService()
.deletePrepareMessage(result.getPrepareMessage());
}
return sendResult;
}
return res;
}
// 事務(wù)回滾操作
} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
// 獲取消息
result = this.brokerController.getTransactionalMessageService()
.rollbackMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
// 沒有投遞操作,直接刪除,不是真正地從磁盤上刪除,只是將消息標(biāo)記為刪除
this.brokerController.getTransactionalMessageService().deletePrepareMessage(
result.getPrepareMessage());
}
return res;
}
}
// 并沒有處理 UNKNOW 的操作
response.setCode(result.getResponseCode());
response.setRemark(result.getResponseRemark());
return response;
}
在這個(gè)方法里,會(huì)分別處理事務(wù)的提交與回滾操作,
在事務(wù)的提交處理中,可以看到此時(shí)事務(wù)才真正地投遞出去,投遞出去后,會(huì)把原本的事務(wù)消息標(biāo)記為刪除; 在事務(wù)的回滾操作中,直接就把原本的事務(wù)消息標(biāo)識(shí)為刪除了
我們來(lái)看看事務(wù)消息的真正投遞過(guò)程,進(jìn)入EndTransactionProcessor#sendFinalMessage方法:
private RemotingCommand sendFinalMessage(MessageExtBrokerInner msgInner) {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
// 投遞操作
final PutMessageResult putMessageResult
= this.brokerController.getMessageStore().putMessage(msgInner);
if (putMessageResult != null) {
switch (putMessageResult.getPutMessageStatus()) {
// 省略結(jié)果的處理
...
}
}
}
這里的brokerController.getMessageStore().putMessage(...)操作,就是把消息再一次寫入到commitLog,不過(guò)此時(shí)的topic與queueId就是最初的了,接下來(lái)consumer就能對(duì)其進(jìn)行消費(fèi)了。
5. 事務(wù)的反查機(jī)制
前面我們分析了broker是如何處理事務(wù)消息的COMMIT_MESSAGE與ROLLBACK_MESSAGE狀態(tài),實(shí)際上,事務(wù)消息除了以上兩種狀態(tài)外,還有第三種狀態(tài):UNKNOW,從EndTransactionProcessor#processRequest方法來(lái)看,broker并沒有處理這種狀態(tài)!
當(dāng)出現(xiàn)UNKNOW狀態(tài)時(shí),rocketMq該怎么辦呢?實(shí)際上,EndTransactionProcessor#processRequest沒有處理UNKNOW狀態(tài),這就表明UNKNOW狀態(tài)的事務(wù)消息既不會(huì)執(zhí)行提交操作,也不會(huì)提交回滾操作,它會(huì)由一個(gè)單獨(dú)的線程來(lái)進(jìn)行操作,這個(gè)線程就是事務(wù)消息的檢查線程。
5.1 檢查線程的啟動(dòng)
在broker的啟動(dòng)流程中,BrokerController#start會(huì)執(zhí)行這樣一個(gè)方法:
public void start() throws Exception {
...
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
// 啟動(dòng)一些處理器
startProcessorByHa(messageStoreConfig.getBrokerRole());
handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
this.registerBrokerAll(true, false, true);
}
...
}
/**
* 在這里會(huì)啟動(dòng) 事務(wù)消息的檢查線程
*/
private void startProcessorByHa(BrokerRole role) {
if (BrokerRole.SLAVE != role) {
if (this.transactionalMessageCheckService != null) {
this.transactionalMessageCheckService.start();
}
}
}
在這個(gè)方法里會(huì)TransactionalMessageCheckService的start()方法,我們先來(lái)看看這個(gè)操作做了什么,然后就來(lái)到了ServiceThread#start方法:
public abstract class ServiceThread implements Runnable {
...
public void start() {
log.info(...);
if (!started.compareAndSet(false, true)) {
return;
}
stopped = false;
this.thread = new Thread(this, getServiceName());
this.thread.setDaemon(isDaemon);
this.thread.start();
}
...
}
可以看到,TransactionalMessageCheckService的start()方法來(lái)自于ServiceThread,在ServiceThread的start()方法中,會(huì)啟動(dòng)一個(gè)線程來(lái)處理操作。這里我們直接進(jìn)入TransactionalMessageCheckService#run方法看看這個(gè)線程做了什么:
@Override
public void run() {
log.info("Start transaction check service thread!");
long checkInterval = brokerController.getBrokerConfig()
.getTransactionCheckInterval();
while (!this.isStopped()) {
// 運(yùn)行操作
this.waitForRunning(checkInterval);
}
log.info("End transaction check service thread!");
}
跟進(jìn)ServiceThread#waitForRunning方法:
protected void waitForRunning(long interval) {
if (hasNotified.compareAndSet(true, false)) {
// 執(zhí)行操作
this.onWaitEnd();
return;
}
//entry to wait
waitPoint.reset();
try {
waitPoint.await(interval, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.error("Interrupted", e);
} finally {
hasNotified.set(false);
this.onWaitEnd();
}
}
@Override
protected void onWaitEnd() {
long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
long begin = System.currentTimeMillis();
log.info("Begin to check prepare message, begin time:{}", begin);
// 檢查操作
this.brokerController.getTransactionalMessageService().check(timeout, checkMax,
this.brokerController.getTransactionalMessageCheckListener());
log.info("End to check prepare message, consumed time:{}",
System.currentTimeMillis() - begin);
}
最終會(huì)執(zhí)行到TransactionalMessageServiceImpl#check方法,這個(gè)方法就是用來(lái)處理事務(wù)消息的檢查操作的:
public void check(long transactionTimeout, int transactionCheckMax,
AbstractTransactionalMessageCheckListener listener) {
try {
// 事務(wù)消息的隊(duì)列名
String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
// 獲取要檢查的消息隊(duì)列
Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
if (msgQueues == null || msgQueues.size() == 0) {
log.warn("The queue of topic is empty :" + topic);
return;
}
log.debug("Check topic={}, queues={}", topic, msgQueues);
for (MessageQueue messageQueue : msgQueues) {
// 省略了好多的內(nèi)容
...
// 從隊(duì)列上獲取事務(wù)消息
GetResult getResult = getHalfMsg(messageQueue, i);
// 省略了好多的內(nèi)容
...
// 檢查事務(wù)狀態(tài)
listener.resolveHalfMsg(msgExt);
// 依然是省略了好多的內(nèi)容
...
}
} catch (Throwable e) {
log.error("Check error", e);
}
}
這個(gè)方法中省略了大量代碼,關(guān)鍵操作就兩個(gè):
從事務(wù)消息的 topic上獲取消息檢查消息的事務(wù)狀態(tài)
5.2 broker發(fā)送檢查消息
這里直接來(lái)看檢查事務(wù)狀態(tài)的操作,進(jìn)入Broker2Client#checkProducerTransactionState方法:
public void checkProducerTransactionState(
final String group,
final Channel channel,
final CheckTransactionStateRequestHeader requestHeader,
final MessageExt messageExt) throws Exception {
RemotingCommand request = RemotingCommand.createRequestCommand(
RequestCode.CHECK_TRANSACTION_STATE, requestHeader);
request.setBody(MessageDecoder.encode(messageExt, false));
try {
// 發(fā)送檢測(cè)消息到producer,code 為 CHECK_TRANSACTION_STATE
this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);
} catch (Exception e) {
log.error(...);
}
}
從上面的代碼來(lái)看,檢查消息的code為CHECK_TRANSACTION_STATE,請(qǐng)求方式為Oneway這表明broker并不關(guān)心該消息的返回結(jié)果。
producer收到broker發(fā)送過(guò)來(lái)的檢查消息后,又會(huì)怎么處理呢?下一小節(jié)我們?cè)俳視浴?/p>
5.3 producer處理檢查消息
從上一小節(jié)的分析可知,broker發(fā)送的檢查消息的code為CHECK_TRANSACTION_STATE,producer處理該code的方法為ClientRemotingProcessor#processRequest:
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
switch (request.getCode()) {
// 檢查事務(wù)狀態(tài)
case RequestCode.CHECK_TRANSACTION_STATE:
return this.checkTransactionState(ctx, request);
// 省略其他
...
default:
break;
}
return null;
}
我們跟進(jìn)ClientRemotingProcessor#checkTransactionState方法:
public RemotingCommand checkTransactionState(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final CheckTransactionStateRequestHeader requestHeader = (CheckTransactionStateRequestHeader)
request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);
final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody());
final MessageExt messageExt = MessageDecoder.decode(byteBuffer);
if (messageExt != null) {
if (StringUtils.isNotEmpty(this.mqClientFactory.getClientConfig().getNamespace())) {
messageExt.setTopic(NamespaceUtil.withoutNamespace(messageExt.getTopic(),
this.mqClientFactory.getClientConfig().getNamespace()));
}
String transactionId = messageExt
.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
messageExt.setTransactionId(transactionId);
}
final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
if (group != null) {
// 獲得一個(gè)producer
MQProducerInner producer = this.mqClientFactory.selectProducer(group);
if (producer != null) {
final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
// 檢查狀態(tài)
producer.checkTransactionState(addr, messageExt, requestHeader);
} else {
log.debug("checkTransactionState, pick producer by group[{}] failed", group);
}
} else {
log.warn("checkTransactionState, pick producer group failed");
}
} else {
log.warn("checkTransactionState, decode message failed");
}
return null;
}
這個(gè)方法雖然有點(diǎn)長(zhǎng),但主要操作就兩個(gè):
獲得一個(gè) producer:this.mqClientFactory.selectProducer(group)檢查事務(wù)狀態(tài): producer.checkTransactionState(...)
這里我們直接看檢查事務(wù)狀態(tài)的操作,進(jìn)入DefaultMQProducerImpl#checkTransactionState方法:
public void checkTransactionState(final String addr, final MessageExt msg,
final CheckTransactionStateRequestHeader header) {
Runnable request = new Runnable() {
// 省略了一堆的內(nèi)容
...
};
this.checkExecutor.submit(request);
}
在DefaultMQProducerImpl#checkTransactionState方法中,先是創(chuàng)建了一個(gè)Runnable對(duì)象,然后將該對(duì)象提交到checkExecutor線程池中,在本文的一開始,我們?cè)诜治?code style="overflow-wrap: break-word;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;font-family: "Operator Mono", Consolas, Monaco, Menlo, monospace;word-break: break-all;color: rgb(145, 109, 213);font-weight: bolder;background-image: none;background-position: initial;background-size: initial;background-repeat: initial;background-attachment: initial;background-origin: initial;background-clip: initial;">TransactionMQProducer的啟動(dòng)流程中就提到過(guò),它的賦值在DefaultMQProducerImpl#initTransactionEnv方法,現(xiàn)在看到了它的使用。
根據(jù)線程池的運(yùn)行流程,它運(yùn)行的內(nèi)容主要就是Runnable的run()方法了,它的run()方法內(nèi)容如下:
Runnable request = new Runnable() {
private final String brokerAddr = addr;
private final MessageExt message = msg;
private final CheckTransactionStateRequestHeader checkRequestHeader = header;
private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup();
@Override
public void run() {
// 獲得 checkListener
TransactionCheckListener transactionCheckListener
= DefaultMQProducerImpl.this.checkListener();
// 1. 獲取 listener
TransactionListener transactionListener = getCheckListener();
if (transactionCheckListener != null || transactionListener != null) {
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable exception = null;
try {
if (transactionCheckListener != null) {
localTransactionState = transactionCheckListener
.checkLocalTransactionState(message);
} else if (transactionListener != null) {
log.debug("Used new check API in transaction message");
// 2. 檢查事務(wù)狀態(tài)
localTransactionState = transactionListener.checkLocalTransaction(message);
} else {
log.warn(...);
}
} catch (Throwable e) {
log.error(...);
exception = e;
}
// 處理事務(wù)狀態(tài)
this.processTransactionState(localTransactionState, group, exception);
} else {
log.warn(...);
}
}
private void processTransactionState(
final LocalTransactionState localTransactionState,
final String producerGroup,
final Throwable exception) {
final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();
thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());
thisHeader.setProducerGroup(producerGroup);
thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());
// 設(shè)置檢查標(biāo)記
thisHeader.setFromTransactionCheck(true);
String uniqueKey = message.getProperties()
.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (uniqueKey == null) {
uniqueKey = message.getMsgId();
}
thisHeader.setMsgId(uniqueKey);
thisHeader.setTransactionId(checkRequestHeader.getTransactionId());
// 3. 處理事務(wù) 提交/回滾 狀態(tài)
switch (localTransactionState) {
case COMMIT_MESSAGE:
// 設(shè)置提交狀態(tài)為:提交
thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
case ROLLBACK_MESSAGE:
// 設(shè)置提交狀態(tài)為:回滾
thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
log.warn(...);
break;
case UNKNOW:
thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
log.warn(...);
break;
default:
break;
}
String remark = null;
if (exception != null) {
remark = "checkLocalTransactionState Exception: "
+ RemotingHelper.exceptionSimpleDesc(exception);
}
try {
// 4. 發(fā)送消息,結(jié)束事務(wù)
DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl()
.endTransactionOneway(brokerAddr, thisHeader, remark, 3000);
} catch (Exception e) {
log.error("endTransactionOneway exception", e);
}
}
};
這部分的代碼有點(diǎn)長(zhǎng),不過(guò)關(guān)鍵部分就4點(diǎn):
獲取 listener,這就是我們一開始在demo里設(shè)置的TransactionListenerImpl檢查事務(wù)狀態(tài),這里就是運(yùn)行我們自己指定的方法: TransactionListenerImpl#checkLocalTransaction處理事務(wù) 提交/回滾 狀態(tài),這一步就是根據(jù) TransactionListenerImpl#checkLocalTransaction方法的執(zhí)行結(jié)果,來(lái)設(shè)置提交/回滾狀態(tài)向 broker發(fā)送結(jié)束事務(wù)的消息,這個(gè)同前面DefaultMQProducerImpl#sendMessageInTransaction方法中的操作是一致的
6. 總結(jié)
分析完事務(wù)消息的流程后,我們來(lái)對(duì)整個(gè)流程做個(gè)總結(jié):

這是官網(wǎng)提供的一張圖,流程如下:
producer發(fā)送一條“半消息”,broker收到后,返回“ok”,進(jìn)入第2步執(zhí)行本地事務(wù),得到執(zhí)行結(jié)果,成功則進(jìn)行第3步,失敗則進(jìn)行第4步 本地事務(wù)執(zhí)行成功,發(fā)送“commit”消息到 broker,此時(shí)第1步發(fā)送的“半消息”才真正投遞出去本地事務(wù)執(zhí)行失敗,發(fā)送“rollback”消息到 broker,此第1步發(fā)送的“半消息”就取消了,再也不會(huì)進(jìn)行發(fā)送了
正常情況下,以上4步就滿足事務(wù)消息的流程了,但實(shí)際中可能會(huì)異常情況:第3步或第4步發(fā)送失敗了,導(dǎo)致broker中的半消息遲遲收不到回滾或提交的通知,此時(shí)就會(huì)用到回查機(jī)制:
broker遲遲收不到回滾或提交的通知,發(fā)送一條單向消息給producer,通知producer反查本地的事務(wù)執(zhí)行結(jié)果producer收到broker的消息后,調(diào)用回查方法,檢查本地事務(wù)狀態(tài)producer得到本地事務(wù)的狀態(tài),再發(fā)一條單向消息告知broker此前的"半消息"是提交還是回滾
限于作者個(gè)人水平,文中難免有錯(cuò)誤之處,歡迎指正!原創(chuàng)不易,商業(yè)轉(zhuǎn)載請(qǐng)聯(lián)系作者獲得授權(quán),非商業(yè)轉(zhuǎn)載請(qǐng)注明出處。
本文首發(fā)于微信公眾號(hào) 「Java技術(shù)探秘」,如果您喜歡本文,歡迎關(guān)注該公眾號(hào),讓我們一起在技術(shù)的世界里探秘吧!
