<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Rocketmq源碼分析14:事務(wù)消息

          共 57311字,需瀏覽 115分鐘

           ·

          2021-05-02 15:10

          注:本系列源碼分析基于RocketMq 4.8.0,gitee倉(cāng)庫(kù)鏈接:https://gitee.com/funcy/rocketmq.git.

          rocketMq支持一類特別的消息:事務(wù)消息,本文將從源碼角度分析事務(wù)消息的實(shí)現(xiàn)原理。

          1. demo 準(zhǔn)備

          事務(wù)消息的示例位于org.apache.rocketmq.example.transaction包中,我們先來(lái)看看它的使用:

          1.1 準(zhǔn)備事務(wù)監(jiān)聽器:TransactionListener

          public class TransactionListenerImpl implements TransactionListener {
              private AtomicInteger transactionIndex = new AtomicInteger(0);

              private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

              @Override
              public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                  int value = transactionIndex.getAndIncrement();
                  int status = value % 3;
                  localTrans.put(msg.getTransactionId(), status);
                  return LocalTransactionState.UNKNOW;
              }

              @Override
              public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                  Integer status = localTrans.get(msg.getTransactionId());
                  if (null != status) {
                      switch (status) {
                          case 0:
                              return LocalTransactionState.UNKNOW;
                          case 1:
                              return LocalTransactionState.COMMIT_MESSAGE;
                          case 2:
                              return LocalTransactionState.ROLLBACK_MESSAGE;
                          default:
                              return LocalTransactionState.COMMIT_MESSAGE;
                      }
                  }
                  return LocalTransactionState.COMMIT_MESSAGE;
              }
          }

          TransactionListener是事務(wù)監(jiān)聽接口,它有兩個(gè)方法:

          • executeLocalTransaction(...):執(zhí)行事務(wù),這里是事務(wù)的內(nèi)容
          • checkLocalTransaction(...):檢查事務(wù)的執(zhí)行狀態(tài)

          1.2 事務(wù)消息的producer

          接著就是事務(wù)消息的生產(chǎn)者了,代碼如下:

          public class TransactionProducer {
              public static void main(String[] args) throws MQClientException, InterruptedException {
                  // 這里的 producer 類型是 TransactionMQProducer
                  TransactionMQProducer producer 
                      = new TransactionMQProducer("please_rename_unique_group_name");
                  // 準(zhǔn)備一線程池
                  ExecutorService executorService = new ThreadPoolExecutor(25100, TimeUnit.SECONDS, 
                      new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
                          @Override
                          public Thread newThread(Runnable r) {
                              Thread thread = new Thread(r);
                              thread.setName("client-transaction-msg-check-thread");
                              return thread;
                          }
                      }
                  );

                  producer.setExecutorService(executorService);
                  // 設(shè)置監(jiān)聽
                  TransactionListener transactionListener = new TransactionListenerImpl();
                  producer.setTransactionListener(transactionListener);
                  producer.start();

                  // 發(fā)送事務(wù)消息
                  String[] tags = new String[] {"TagA""TagB""TagC""TagD""TagE"};
                  for (int i = 0; i < 10; i++) {
                      try {
                          Message msg =
                              new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                                  ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                          SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                          System.out.printf("%s%n", sendResult);

                          Thread.sleep(10);
                      } catch (MQClientException | UnsupportedEncodingException e) {
                          e.printStackTrace();
                      }
                  }

                  for (int i = 0; i < 100000; i++) {
                      Thread.sleep(1000);
                  }
                  producer.shutdown();
              }
          }

          與普通消息的producer不同的是,事務(wù)消息的producer類型是TransactionMQProducer,并且需要設(shè)置事務(wù)監(jiān)聽器。

          有了示例demo,接著我們就來(lái)分析這其中的流程了。

          2. 啟動(dòng):TransactionMQProducer#start

          TransactionMQProducer的啟動(dòng)方法為start(),內(nèi)容如下:

          @Override
          public void start() throws MQClientException {
              // 初始化環(huán)境
              this.defaultMQProducerImpl.initTransactionEnv();
              // 調(diào)用父類DefaultMQProducer的方法
              super.start();
          }

          這個(gè)方法先是調(diào)用DefaultMQProducerImpl#initTransactionEnv方法進(jìn)行了一些初始化操作,然后調(diào)用父類DefaultMQProducerstart()方法進(jìn)行啟動(dòng)操作。從這里可以看出,與普通消息的producer啟動(dòng)流程相比,事務(wù)消息的producer僅是多了一步初始化事務(wù)環(huán)境操作。

          我們進(jìn)入DefaultMQProducerImpl#initTransactionEnv方法,看看它做了什么:

          public void initTransactionEnv() {
              TransactionMQProducer producer = (TransactionMQProducer) this.defaultMQProducer;
              // 整個(gè)方法就是對(duì) checkExecutor 進(jìn)行賦值
              if (producer.getExecutorService() != null) {
                  this.checkExecutor = producer.getExecutorService();
              } else {
                  this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(
                      producer.getCheckRequestHoldMax());
                  this.checkExecutor = new ThreadPoolExecutor(
                      producer.getCheckThreadPoolMinSize(),
                      producer.getCheckThreadPoolMaxSize(),
                      1000 * 60,
                      TimeUnit.MILLISECONDS,
                      this.checkRequestQueue);
              }
          }

          從代碼中可以看到,這整個(gè)方法就是對(duì)成員變量checkExecutor進(jìn)行賦值操作。

          3. 發(fā)送消息:TransactionMQProducer#sendMessageInTransaction(...)

          發(fā)送消息的方法為 TransactionMQProducer#sendMessageInTransaction(...),代碼如下:

          public TransactionSendResult sendMessageInTransaction(final Message msg,
              final Object arg)
           throws MQClientException 
          {
              if (null == this.transactionListener) {
                  throw new MQClientException("TransactionListener is null"null);
              }

              msg.setTopic(NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic()));
              return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
          }

          繼續(xù)跟進(jìn),進(jìn)入 DefaultMQProducerImpl#sendMessageInTransaction 方法:

          public TransactionSendResult sendMessageInTransaction(final Message msg,
                  final LocalTransactionExecuter localTransactionExecuter, final Object arg)

                  throws MQClientException 
          {
              // 獲取 TransactionListener
              TransactionListener transactionListener = getCheckListener();
              if (null == localTransactionExecuter && null == transactionListener) {
                  throw new MQClientException("tranExecutor is null"null);
              }

              // 清除延遲級(jí)別,可以看到,事務(wù)消息不支持延遲
              if (msg.getDelayTimeLevel() != 0) {
                  MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
              }

              Validators.checkMessage(msg, this.defaultMQProducer);

              SendResult sendResult = null;
              // 設(shè)置消息屬性,指定消息類型為事務(wù)消息
              MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
              MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, 
                  this.defaultMQProducer.getProducerGroup());
              try {
                  // 發(fā)送消息,按同步模式發(fā)送
                  sendResult = this.send(msg);
              } catch (Exception e) {
                  throw new MQClientException("send message Exception", e);
              }

              LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
              Throwable localException = null;
              // 處理返回值
              switch (sendResult.getSendStatus()) {
                  case SEND_OK: {
                      try {
                          // 這里省略了好多的判斷
                          ...

                          // 發(fā)送成功,執(zhí)行本地事務(wù)
                          localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                      } catch (Throwable e) {
                          log.info("executeLocalTransactionBranch exception", e);
                          log.info(msg.toString());
                          localException = e;
                      }
                  }
                  break;
                  case FLUSH_DISK_TIMEOUT:
                  case FLUSH_SLAVE_TIMEOUT:
                  case SLAVE_NOT_AVAILABLE:
                      localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
                      break;
                  default:
                      break;
              }

              try {
                  // 結(jié)束遠(yuǎn)程事務(wù),注意傳入的 localTransactionState
                  this.endTransaction(sendResult, localTransactionState, localException);
              } catch (Exception e) {
                  log.warn(...);
              }

              // 構(gòu)造返回值
              TransactionSendResult transactionSendResult = new TransactionSendResult();
              ...

              return transactionSendResult;
          }

          這個(gè)方法就是用來(lái)發(fā)送事務(wù)消息的方法了,這里將其中的關(guān)鍵點(diǎn)總結(jié)如下:

          1. 獲取 TransactionListener,這個(gè)TransactionListener就是我們?cè)谑纠?code style="overflow-wrap: break-word;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;font-family: "Operator Mono", Consolas, Monaco, Menlo, monospace;word-break: break-all;color: rgb(145, 109, 213);font-weight: bolder;background-image: none;background-position: initial;background-size: initial;background-repeat: initial;background-attachment: initial;background-origin: initial;background-clip: initial;">demo中調(diào)用producer.setTransactionListener(...)設(shè)置的
          2. 如果延遲級(jí)別不等于0,則將其清除,這就表明事務(wù)消息不支持延遲
          3. 設(shè)置消息屬性,指定消息類型為事務(wù)消息,broker在收到消息時(shí),會(huì)對(duì)事務(wù)消息進(jìn)行特別處理
          4. 發(fā)送消息,發(fā)送方式與普通消息的發(fā)送并不區(qū)別,不過(guò)需要指明的是,這里是按同步模式發(fā)送的
          5. 處理消息的發(fā)送結(jié)果,如果發(fā)送失敗,則將事務(wù)狀態(tài)設(shè)置為ROLLBACK_MESSAGE,表示需要回滾;發(fā)送成功則執(zhí)行本地事務(wù),也就是執(zhí)行transactionListener.executeLocalTransaction(...)方法,方法返回事務(wù)狀態(tài)
          6. 結(jié)束遠(yuǎn)程事務(wù),這一步會(huì)將第5步得到的事務(wù)狀態(tài)發(fā)往broker,接下來(lái)的事就由broker進(jìn)行處理了

          這里我們來(lái)看一眼transactionListener.executeLocalTransaction(...)方法的內(nèi)容:

          public class TransactionListenerImpl implements TransactionListener {
              @Override
              public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                  int value = transactionIndex.getAndIncrement();
                  int status = value % 3;
                  localTrans.put(msg.getTransactionId(), status);
                  return LocalTransactionState.UNKNOW;
              }
              ...
          }

          這是示例demo中的內(nèi)容,在executeLocalTransaction(...)方法中可以返回事務(wù)的執(zhí)行狀態(tài),這個(gè)狀態(tài)非常重要,因?yàn)檫@個(gè)狀態(tài)之后會(huì)發(fā)往brokerbroker會(huì)根據(jù)這個(gè)狀態(tài)來(lái)判斷是要提交還是回滾消息。

          我們?cè)賮?lái)看看結(jié)束事務(wù)的方法DefaultMQProducerImpl#endTransaction

          public void endTransaction(
                  final SendResult sendResult,
                  final LocalTransactionState localTransactionState,
                  final Throwable localException)
           throws RemotingException, 
                  MQBrokerException, InterruptedException, UnknownHostException 
          {
              final MessageId id;
              if (sendResult.getOffsetMsgId() != null) {
                  id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
              } else {
                  id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
              }
              String transactionId = sendResult.getTransactionId();
              // 找到一個(gè)broker地址
              final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(
                  sendResult.getMessageQueue().getBrokerName());
              EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
              requestHeader.setTransactionId(transactionId);
              requestHeader.setCommitLogOffset(id.getOffset());
              // 設(shè)置消息頭,根據(jù)消息狀態(tài)設(shè)置 提交/回滾 標(biāo)識(shí)
              switch (localTransactionState) {
                  case COMMIT_MESSAGE:
                      requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
                      break;
                  case ROLLBACK_MESSAGE:
                      requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
                      break;
                  case UNKNOW:
                      requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
                      break;
                  default:
                      break;
              }

              requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
              requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
              requestHeader.setMsgId(sendResult.getMsgId());
              String remark = localException != null ? ("executeLocalTransactionBranch exception: " + 
                  localException.toString()) : null;
              // 發(fā)送方式為 oneway
              this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, 
                  remark, this.defaultMQProducer.getSendMsgTimeout());
          }

          這個(gè)方法就是向broker發(fā)送結(jié)束事務(wù)操作了,代碼中關(guān)鍵之處有兩點(diǎn):

          1. 根據(jù)localTransactionState狀態(tài)來(lái)設(shè)置事務(wù)提交/回滾的標(biāo)識(shí),localTransactionState的值來(lái)源于事務(wù)消息的發(fā)送結(jié)果,或本地事務(wù)的執(zhí)行結(jié)果
          2. 消息的發(fā)送方式為oneway,這表明rocketMq并不關(guān)心該消息的返回值,為何不關(guān)心呢?因?yàn)槭聞?wù)消息還有個(gè)broker反查機(jī)制,即broker定時(shí)向producer發(fā)送消息反查事務(wù)的狀態(tài),這點(diǎn)本文后面會(huì)分析。

          到這里,producer就處理完事務(wù)消息的發(fā)送流程了,接下來(lái)我們來(lái)看看broker是如何處理事務(wù)相關(guān)消息的。

          4. broker 處理事務(wù)消息

          在上一節(jié)的TransactionMQProducer#sendMessageInTransaction(...)方法中,一共向broker發(fā)送了兩條消息,這里我們來(lái)分析這兩條消息所做的內(nèi)容。

          4.1 處理事務(wù)消息:SendMessageProcessor#asyncSendMessage

          producerbroker發(fā)送事務(wù)消息后,處理流程同普通消息的處理流程一致,本文僅關(guān)注兩者不同之處,在SendMessageProcessor#asyncSendMessage方法中,會(huì)區(qū)分普通消息與事務(wù)消息:

          private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, 
                  RemotingCommand request,
                  SendMessageContext mqtraceContext,
                  SendMessageRequestHeader requestHeader)
           
          {
              // 如果是事務(wù)消息
              if (transFlag != null && Boolean.parseBoolean(transFlag)) {
                  if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
                      response.setCode(ResponseCode.NO_PERMISSION);
                      response.setRemark(
                              "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                                      + "] sending transaction message is forbidden");
                      return CompletableFuture.completedFuture(response);
                  }
                  // 處理事務(wù)消息
                  putMessageResult = this.brokerController.getTransactionalMessageService()
                      .asyncPrepareMessage(msgInner);
              } else {
                  // 發(fā)送普通消息
                  putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
              }
              return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, 
                  responseHeader, mqtraceContext, ctx, queueIdInt);
          }

          繼續(xù)跟進(jìn)TransactionalMessageBridge#asyncPutHalfMessage

          public CompletableFuture<PutMessageResult> asyncPutHalfMessage(
                  MessageExtBrokerInner messageInner)
           
          {
              // parseHalfMessageInner(...):消息轉(zhuǎn)換
              // asyncPutMessage(...):消息存儲(chǔ),就是保存到commitLog中
              return store.asyncPutMessage(parseHalfMessageInner(messageInner));
          }

          這個(gè)方法中有兩個(gè)操作:

          • parseHalfMessageInner(...):消息轉(zhuǎn)換,這個(gè)方法會(huì)將事務(wù)消息暫存到事務(wù)消息的專屬隊(duì)列中
          • asyncPutMessage(...):消息存儲(chǔ),就是保存到commitLog中,這點(diǎn)與普通消息并無(wú)差別

          由于事務(wù)消息存儲(chǔ)與普通消息的存儲(chǔ)并無(wú)差別,因此這里,我們主要來(lái)看看事務(wù)消息的轉(zhuǎn)換過(guò)程,進(jìn)入TransactionalMessageBridge#parseHalfMessageInner方法:

          /**
           * 構(gòu)建消息內(nèi)容
           * @param msgInner
           * @return
           */

          private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
              // 保存原始的topic與queueId
              MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
              MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
                  String.valueOf(msgInner.getQueueId()));
              // 指定新的 topic 與 queue,其實(shí)就是暫存到事務(wù)相關(guān)的queue中
              msgInner.setSysFlag(
                  MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), 
                  MessageSysFlag.TRANSACTION_NOT_TYPE));
              msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
              msgInner.setQueueId(0);
              msgInner.setPropertiesString(MessageDecoder
                  .messageProperties2String(msgInner.getProperties()));
              return msgInner;
          }

          這一步的操作很直觀,就是將消息的topicqueueId保存下來(lái),然后換成事務(wù)專用的topicqueueId,然后存儲(chǔ)到commitLog中,由些,事務(wù)消息的發(fā)送也就結(jié)束了。

          4.2 處理結(jié)束事務(wù)的消息:EndTransactionProcessor#processRequest

          結(jié)束事務(wù)消息的codeEND_TRANSACTION,處理該code的方法為 EndTransactionProcessor#processRequest

          public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
                  RemotingCommandException 
          {
              final RemotingCommand response = RemotingCommand.createResponseCommand(null);
              final EndTransactionRequestHeader requestHeader = (EndTransactionRequestHeader)request
                  .decodeCommandCustomHeader(EndTransactionRequestHeader.class);
              LOGGER.debug("Transaction request:{}", requestHeader);
              if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
                  response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
                  LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");
                  return response;
              }

              ...

              OperationResult result = new OperationResult();
              // 事務(wù)提交操作
              if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
                  // 從commitLog中獲取消息
                  result = this.brokerController.getTransactionalMessageService()
                      .commitMessage(requestHeader);
                  // 如果返回成功
                  if (result.getResponseCode() == ResponseCode.SUCCESS) {
                      RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
                      if (res.getCode() == ResponseCode.SUCCESS) {
                          // 獲取消息,在這里方法里會(huì)處理消息轉(zhuǎn)換操作,即拿到真正要發(fā)送的topic與queue
                          MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
                          // 省略一些setXxx(...)方法
                          ...
                          // 真正的投遞操作
                          RemotingCommand sendResult = sendFinalMessage(msgInner);
                          // 投遞完成,刪除消息,當(dāng)然不是真正地從磁盤上刪除,只是將消息標(biāo)記為刪除
                          if (sendResult.getCode() == ResponseCode.SUCCESS) {
                              this.brokerController.getTransactionalMessageService()
                                  .deletePrepareMessage(result.getPrepareMessage());
                          }
                          return sendResult;
                      }
                      return res;
                  }
              // 事務(wù)回滾操作
              } else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
                  // 獲取消息
                  result = this.brokerController.getTransactionalMessageService()
                      .rollbackMessage(requestHeader);
                  if (result.getResponseCode() == ResponseCode.SUCCESS) {
                      RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
                      if (res.getCode() == ResponseCode.SUCCESS) {
                          // 沒有投遞操作,直接刪除,不是真正地從磁盤上刪除,只是將消息標(biāo)記為刪除
                          this.brokerController.getTransactionalMessageService().deletePrepareMessage(
                              result.getPrepareMessage());
                      }
                      return res;
                  }
              }
              // 并沒有處理 UNKNOW 的操作
              response.setCode(result.getResponseCode());
              response.setRemark(result.getResponseRemark());
              return response;
          }

          在這個(gè)方法里,會(huì)分別處理事務(wù)的提交與回滾操作,

          • 在事務(wù)的提交處理中,可以看到此時(shí)事務(wù)才真正地投遞出去,投遞出去后,會(huì)把原本的事務(wù)消息標(biāo)記為刪除;
          • 在事務(wù)的回滾操作中,直接就把原本的事務(wù)消息標(biāo)識(shí)為刪除了

          我們來(lái)看看事務(wù)消息的真正投遞過(guò)程,進(jìn)入EndTransactionProcessor#sendFinalMessage方法:

          private RemotingCommand sendFinalMessage(MessageExtBrokerInner msgInner) {
              final RemotingCommand response = RemotingCommand.createResponseCommand(null);
              // 投遞操作
              final PutMessageResult putMessageResult 
                  = this.brokerController.getMessageStore().putMessage(msgInner);
              if (putMessageResult != null) {
                  switch (putMessageResult.getPutMessageStatus()) {
                      // 省略結(jié)果的處理
                      ...
                  }
              }
          }

          這里的brokerController.getMessageStore().putMessage(...)操作,就是把消息再一次寫入到commitLog,不過(guò)此時(shí)的topicqueueId就是最初的了,接下來(lái)consumer就能對(duì)其進(jìn)行消費(fèi)了。

          5. 事務(wù)的反查機(jī)制

          前面我們分析了broker是如何處理事務(wù)消息的COMMIT_MESSAGEROLLBACK_MESSAGE狀態(tài),實(shí)際上,事務(wù)消息除了以上兩種狀態(tài)外,還有第三種狀態(tài):UNKNOW,從EndTransactionProcessor#processRequest方法來(lái)看,broker并沒有處理這種狀態(tài)!

          當(dāng)出現(xiàn)UNKNOW狀態(tài)時(shí),rocketMq該怎么辦呢?實(shí)際上,EndTransactionProcessor#processRequest沒有處理UNKNOW狀態(tài),這就表明UNKNOW狀態(tài)的事務(wù)消息既不會(huì)執(zhí)行提交操作,也不會(huì)提交回滾操作,它會(huì)由一個(gè)單獨(dú)的線程來(lái)進(jìn)行操作,這個(gè)線程就是事務(wù)消息的檢查線程。

          5.1 檢查線程的啟動(dòng)

          broker的啟動(dòng)流程中,BrokerController#start會(huì)執(zhí)行這樣一個(gè)方法:

          public void start() throws Exception {
              ...
              if (!messageStoreConfig.isEnableDLegerCommitLog()) {
                  // 啟動(dòng)一些處理器
                  startProcessorByHa(messageStoreConfig.getBrokerRole());
                  handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
                  this.registerBrokerAll(truefalsetrue);
              }
              ...
          }

          /**
           * 在這里會(huì)啟動(dòng) 事務(wù)消息的檢查線程
           */

          private void startProcessorByHa(BrokerRole role) {
              if (BrokerRole.SLAVE != role) {
                  if (this.transactionalMessageCheckService != null) {
                      this.transactionalMessageCheckService.start();
                  }
              }
          }

          在這個(gè)方法里會(huì)TransactionalMessageCheckServicestart()方法,我們先來(lái)看看這個(gè)操作做了什么,然后就來(lái)到了ServiceThread#start方法:

          public abstract class ServiceThread implements Runnable {
              ...
              public void start() {
                  log.info(...);
                  if (!started.compareAndSet(falsetrue)) {
                      return;
                  }
                  stopped = false;
                  this.thread = new Thread(this, getServiceName());
                  this.thread.setDaemon(isDaemon);
                  this.thread.start();
              }
              ...
          }

          可以看到,TransactionalMessageCheckServicestart()方法來(lái)自于ServiceThread,在ServiceThreadstart()方法中,會(huì)啟動(dòng)一個(gè)線程來(lái)處理操作。這里我們直接進(jìn)入TransactionalMessageCheckService#run方法看看這個(gè)線程做了什么:

          @Override
          public void run() {
              log.info("Start transaction check service thread!");
              long checkInterval = brokerController.getBrokerConfig()
                  .getTransactionCheckInterval();
              while (!this.isStopped()) {
                  // 運(yùn)行操作
                  this.waitForRunning(checkInterval);
              }
              log.info("End transaction check service thread!");
          }

          跟進(jìn)ServiceThread#waitForRunning方法:

          protected void waitForRunning(long interval) {
              if (hasNotified.compareAndSet(truefalse)) {
                  // 執(zhí)行操作
                  this.onWaitEnd();
                  return;
              }

              //entry to wait
              waitPoint.reset();

              try {
                  waitPoint.await(interval, TimeUnit.MILLISECONDS);
              } catch (InterruptedException e) {
                  log.error("Interrupted", e);
              } finally {
                  hasNotified.set(false);
                  this.onWaitEnd();
              }
          }

          @Override
          protected void onWaitEnd() {
              long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
              int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
              long begin = System.currentTimeMillis();
              log.info("Begin to check prepare message, begin time:{}", begin);
              // 檢查操作
              this.brokerController.getTransactionalMessageService().check(timeout, checkMax, 
                  this.brokerController.getTransactionalMessageCheckListener());
              log.info("End to check prepare message, consumed time:{}"
                  System.currentTimeMillis() - begin);
          }

          最終會(huì)執(zhí)行到TransactionalMessageServiceImpl#check方法,這個(gè)方法就是用來(lái)處理事務(wù)消息的檢查操作的:

          public void check(long transactionTimeout, int transactionCheckMax,
              AbstractTransactionalMessageCheckListener listener)
           
          {
              try {
                  // 事務(wù)消息的隊(duì)列名
                  String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
                  // 獲取要檢查的消息隊(duì)列
                  Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
                  if (msgQueues == null || msgQueues.size() == 0) {
                      log.warn("The queue of topic is empty :" + topic);
                      return;
                  }
                  log.debug("Check topic={}, queues={}", topic, msgQueues);
                  for (MessageQueue messageQueue : msgQueues) {
                      // 省略了好多的內(nèi)容
                      ...

                      // 從隊(duì)列上獲取事務(wù)消息
                      GetResult getResult = getHalfMsg(messageQueue, i);
                      
                      // 省略了好多的內(nèi)容
                      ...

                      // 檢查事務(wù)狀態(tài)
                      listener.resolveHalfMsg(msgExt);

                      // 依然是省略了好多的內(nèi)容
                      ...
                              
                  }
              } catch (Throwable e) {
                  log.error("Check error", e);
              }
          }

          這個(gè)方法中省略了大量代碼,關(guān)鍵操作就兩個(gè):

          1. 從事務(wù)消息的topic上獲取消息
          2. 檢查消息的事務(wù)狀態(tài)

          5.2 broker發(fā)送檢查消息

          這里直接來(lái)看檢查事務(wù)狀態(tài)的操作,進(jìn)入Broker2Client#checkProducerTransactionState方法:

           public void checkProducerTransactionState(
              final String group,
              final Channel channel,
              final CheckTransactionStateRequestHeader requestHeader,
              final MessageExt messageExt)
           throws Exception 
          {
              RemotingCommand request = RemotingCommand.createRequestCommand(
                  RequestCode.CHECK_TRANSACTION_STATE, requestHeader);
              request.setBody(MessageDecoder.encode(messageExt, false));
              try {
                  // 發(fā)送檢測(cè)消息到producer,code 為 CHECK_TRANSACTION_STATE
                  this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);
              } catch (Exception e) {
                  log.error(...);
              }
          }

          從上面的代碼來(lái)看,檢查消息的codeCHECK_TRANSACTION_STATE,請(qǐng)求方式為Oneway這表明broker并不關(guān)心該消息的返回結(jié)果。

          producer收到broker發(fā)送過(guò)來(lái)的檢查消息后,又會(huì)怎么處理呢?下一小節(jié)我們?cè)俳視浴?/p>

          5.3 producer處理檢查消息

          從上一小節(jié)的分析可知,broker發(fā)送的檢查消息的codeCHECK_TRANSACTION_STATEproducer處理該code的方法為ClientRemotingProcessor#processRequest

          public RemotingCommand processRequest(ChannelHandlerContext ctx,
              RemotingCommand request)
           throws RemotingCommandException 
          {
              switch (request.getCode()) {
                  // 檢查事務(wù)狀態(tài)
                  case RequestCode.CHECK_TRANSACTION_STATE:
                      return this.checkTransactionState(ctx, request);
                  // 省略其他
                  ...
                  default:
                      break;
              }
              return null;
          }

          我們跟進(jìn)ClientRemotingProcessor#checkTransactionState方法:

          public RemotingCommand checkTransactionState(ChannelHandlerContext ctx,
                  RemotingCommand request)
           throws RemotingCommandException 
          {
              final CheckTransactionStateRequestHeader requestHeader = (CheckTransactionStateRequestHeader) 
                  request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);
              final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody());
              final MessageExt messageExt = MessageDecoder.decode(byteBuffer);
              if (messageExt != null) {
                  if (StringUtils.isNotEmpty(this.mqClientFactory.getClientConfig().getNamespace())) {
                      messageExt.setTopic(NamespaceUtil.withoutNamespace(messageExt.getTopic(), 
                          this.mqClientFactory.getClientConfig().getNamespace()));
                  }
                  String transactionId = messageExt
                      .getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                  if (null != transactionId && !"".equals(transactionId)) {
                      messageExt.setTransactionId(transactionId);
                  }
                  final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
                  if (group != null) {
                      // 獲得一個(gè)producer
                      MQProducerInner producer = this.mqClientFactory.selectProducer(group);
                      if (producer != null) {
                          final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                          // 檢查狀態(tài)
                          producer.checkTransactionState(addr, messageExt, requestHeader);
                      } else {
                          log.debug("checkTransactionState, pick producer by group[{}] failed", group);
                      }
                  } else {
                      log.warn("checkTransactionState, pick producer group failed");
                  }
              } else {
                  log.warn("checkTransactionState, decode message failed");
              }

              return null;
          }

          這個(gè)方法雖然有點(diǎn)長(zhǎng),但主要操作就兩個(gè):

          1. 獲得一個(gè)producerthis.mqClientFactory.selectProducer(group)
          2. 檢查事務(wù)狀態(tài):producer.checkTransactionState(...)

          這里我們直接看檢查事務(wù)狀態(tài)的操作,進(jìn)入DefaultMQProducerImpl#checkTransactionState方法:

          public void checkTransactionState(final String addr, final MessageExt msg,
                  final CheckTransactionStateRequestHeader header)
           
          {

              Runnable request = new Runnable() {
                  // 省略了一堆的內(nèi)容
                  ...
              };

              this.checkExecutor.submit(request);
          }

          DefaultMQProducerImpl#checkTransactionState方法中,先是創(chuàng)建了一個(gè)Runnable對(duì)象,然后將該對(duì)象提交到checkExecutor線程池中,在本文的一開始,我們?cè)诜治?code style="overflow-wrap: break-word;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;font-family: "Operator Mono", Consolas, Monaco, Menlo, monospace;word-break: break-all;color: rgb(145, 109, 213);font-weight: bolder;background-image: none;background-position: initial;background-size: initial;background-repeat: initial;background-attachment: initial;background-origin: initial;background-clip: initial;">TransactionMQProducer的啟動(dòng)流程中就提到過(guò),它的賦值在DefaultMQProducerImpl#initTransactionEnv方法,現(xiàn)在看到了它的使用。

          根據(jù)線程池的運(yùn)行流程,它運(yùn)行的內(nèi)容主要就是Runnablerun()方法了,它的run()方法內(nèi)容如下:

          Runnable request = new Runnable() {
              private final String brokerAddr = addr;
              private final MessageExt message = msg;
              private final CheckTransactionStateRequestHeader checkRequestHeader = header;
              private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup();

              @Override
              public void run() {
                  // 獲得 checkListener
                  TransactionCheckListener transactionCheckListener 
                      = DefaultMQProducerImpl.this.checkListener();
                  // 1. 獲取 listener
                  TransactionListener transactionListener = getCheckListener();
                  if (transactionCheckListener != null || transactionListener != null) {
                      LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
                      Throwable exception = null;
                      try {
                          if (transactionCheckListener != null) {
                              localTransactionState = transactionCheckListener
                                  .checkLocalTransactionState(message);
                          } else if (transactionListener != null) {
                              log.debug("Used new check API in transaction message");
                              // 2. 檢查事務(wù)狀態(tài)
                              localTransactionState = transactionListener.checkLocalTransaction(message);
                          } else {
                              log.warn(...);
                          }
                      } catch (Throwable e) {
                          log.error(...);
                          exception = e;
                      }
                      // 處理事務(wù)狀態(tài)
                      this.processTransactionState(localTransactionState, group, exception);
                  } else {
                      log.warn(...);
                  }
              }

              private void processTransactionState(
                  final LocalTransactionState localTransactionState,
                  final String producerGroup,
                  final Throwable exception)
           
          {
                  final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();
                  thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());
                  thisHeader.setProducerGroup(producerGroup);
                  thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());
                  // 設(shè)置檢查標(biāo)記
                  thisHeader.setFromTransactionCheck(true);

                  String uniqueKey = message.getProperties()
                      .get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                  if (uniqueKey == null) {
                      uniqueKey = message.getMsgId();
                  }
                  thisHeader.setMsgId(uniqueKey);
                  thisHeader.setTransactionId(checkRequestHeader.getTransactionId());
                  // 3. 處理事務(wù) 提交/回滾 狀態(tài)
                  switch (localTransactionState) {
                      case COMMIT_MESSAGE:
                          // 設(shè)置提交狀態(tài)為:提交
                          thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
                          break;
                      case ROLLBACK_MESSAGE:
                          // 設(shè)置提交狀態(tài)為:回滾
                          thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
                          log.warn(...);
                          break;
                      case UNKNOW:
                          thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
                          log.warn(...);
                          break;
                      default:
                          break;
                  }

                  String remark = null;
                  if (exception != null) {
                      remark = "checkLocalTransactionState Exception: " 
                          + RemotingHelper.exceptionSimpleDesc(exception);
                  }

                  try {
                      // 4. 發(fā)送消息,結(jié)束事務(wù)
                      DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl()
                          .endTransactionOneway(brokerAddr, thisHeader, remark, 3000);
                  } catch (Exception e) {
                      log.error("endTransactionOneway exception", e);
                  }
              }
          };

          這部分的代碼有點(diǎn)長(zhǎng),不過(guò)關(guān)鍵部分就4點(diǎn):

          1. 獲取 listener,這就是我們一開始在demo里設(shè)置的TransactionListenerImpl
          2. 檢查事務(wù)狀態(tài),這里就是運(yùn)行我們自己指定的方法:TransactionListenerImpl#checkLocalTransaction
          3. 處理事務(wù) 提交/回滾 狀態(tài),這一步就是根據(jù)TransactionListenerImpl#checkLocalTransaction方法的執(zhí)行結(jié)果,來(lái)設(shè)置 提交/回滾 狀態(tài)
          4. broker發(fā)送結(jié)束事務(wù)的消息,這個(gè)同前面DefaultMQProducerImpl#sendMessageInTransaction 方法中的操作是一致的

          6. 總結(jié)

          分析完事務(wù)消息的流程后,我們來(lái)對(duì)整個(gè)流程做個(gè)總結(jié):

          這是官網(wǎng)提供的一張圖,流程如下:

          1. producer 發(fā)送一條“半消息”,broker收到后,返回“ok”,進(jìn)入第2步
          2. 執(zhí)行本地事務(wù),得到執(zhí)行結(jié)果,成功則進(jìn)行第3步,失敗則進(jìn)行第4步
          3. 本地事務(wù)執(zhí)行成功,發(fā)送“commit”消息到broker,此時(shí)第1步發(fā)送的“半消息”才真正投遞出去
          4. 本地事務(wù)執(zhí)行失敗,發(fā)送“rollback”消息到broker,此第1步發(fā)送的“半消息”就取消了,再也不會(huì)進(jìn)行發(fā)送了

          正常情況下,以上4步就滿足事務(wù)消息的流程了,但實(shí)際中可能會(huì)異常情況:第3步或第4步發(fā)送失敗了,導(dǎo)致broker中的半消息遲遲收不到回滾或提交的通知,此時(shí)就會(huì)用到回查機(jī)制:

          1. broker遲遲收不到回滾或提交的通知,發(fā)送一條單向消息給producer,通知producer反查本地的事務(wù)執(zhí)行結(jié)果
          2. producer收到broker的消息后,調(diào)用回查方法,檢查本地事務(wù)狀態(tài)
          3. producer得到本地事務(wù)的狀態(tài),再發(fā)一條單向消息告知broker此前的"半消息"是提交還是回滾

          限于作者個(gè)人水平,文中難免有錯(cuò)誤之處,歡迎指正!原創(chuàng)不易,商業(yè)轉(zhuǎn)載請(qǐng)聯(lián)系作者獲得授權(quán),非商業(yè)轉(zhuǎn)載請(qǐng)注明出處。

          本文首發(fā)于微信公眾號(hào) 「Java技術(shù)探秘」,如果您喜歡本文,歡迎關(guān)注該公眾號(hào),讓我們一起在技術(shù)的世界里探秘吧!


          瀏覽 25
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  日本无码少妇内谢视频 | 色播在线永久免费视频 | 人人草在线 | 豆花视频一区二区三区入口 | 牛牛精品视频 |