<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          徹底看懂RocketMQ事務(wù)實(shí)現(xiàn)原理

          共 7405字,需瀏覽 15分鐘

           ·

          2020-09-25 05:46

          ????????

          面試中經(jīng)常會(huì)問(wèn)到比如RocketMQ的事務(wù)是如何實(shí)現(xiàn)的呢?學(xué)習(xí)框架,我們不僅要熟練使用,更要掌握設(shè)計(jì)及原理,才算熟悉一個(gè)框架。


          1 RocketMQ 事務(wù)使用案例

          public class CreateOrderService {
          @Autowired private OrderDao orderDao; @Autowired private ExecutorService executorService;
          private TransactionMQProducer producer;
          // 初始化transactionListener 和 producer @Init public void init() throws MQClientException { TransactionListener transactionListener = createTransactionListener(); producer = new TransactionMQProducer("myGroup"); producer.setExecutorService(executorService); producer.setTransactionListener(transactionListener); producer.start(); }
          // 創(chuàng)建訂單服務(wù)的請(qǐng)求入口 @PUT @RequestMapping(...) public boolean createOrder(@RequestBody CreateOrderRequest request) { // 根據(jù)創(chuàng)建訂單請(qǐng)求創(chuàng)建一條消息 Message msg = createMessage(request); // 發(fā)送事務(wù)消息 SendResult sendResult = producer.sendMessageInTransaction(msg, request); // 返回:事務(wù)是否成功 return sendResult.getSendStatus() == SendStatus.SEND_OK; }
          private TransactionListener createTransactionListener() { return new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { CreateOrderRequest request = (CreateOrderRequest ) arg; try { // 執(zhí)行本地事務(wù)創(chuàng)建訂單 orderDao.createOrderInDB(request); // 如果沒(méi)拋異常說(shuō)明執(zhí)行成功,提交事務(wù)消息 return LocalTransactionState.COMMIT_MESSAGE; } catch (Throwable t) { // 失敗則直接回滾事務(wù)消息 return LocalTransactionState.ROLLBACK_MESSAGE; } } // 反查本地事務(wù) @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { // 從消息中獲得訂單ID String orderId = msg.getUserProperty("orderId");
          // 去db查詢訂單號(hào)是否存在,若存在則提交事務(wù) // 若不存在,可能是本地事務(wù)失敗了,也可能是本地事務(wù)還在執(zhí)行,所以返回UNKNOW return orderDao.isOrderIdExistsInDB(orderId)? LocalTransactionState.COMMIT_MESSAGE: LocalTransactionState.UNKNOW; } }; }}


          如上案例展示了一個(gè)訂單創(chuàng)建服務(wù),即往db插一條訂單記錄,并發(fā)一條創(chuàng)建訂單的消息,要求寫(xiě)db和發(fā)消息倆個(gè)操作在一個(gè)事務(wù)內(nèi)執(zhí)行。
          首先在init()方法中初始化了transactionListener和發(fā)生RocketMQ事務(wù)消息的變量producer。


          • createOrder()
            真正提供創(chuàng)建訂單服務(wù)的方法,根據(jù)請(qǐng)求的參數(shù)創(chuàng)建一條消息,然后調(diào)用 producer發(fā)事務(wù)消息,并返回事務(wù)執(zhí)行結(jié)果。

          • createTransactionListener()
            在init()方法中調(diào)用,構(gòu)造實(shí)現(xiàn)RocketMQ的TransactionListener接口的匿名類,該接口需要實(shí)現(xiàn)如下兩個(gè)方法:

            • executeLocalTransaction:執(zhí)行本地事務(wù),在這里我們直接把訂單數(shù)據(jù)插入到數(shù)據(jù)庫(kù)中,并返回本地事務(wù)的執(zhí)行結(jié)果。

            • checkLocalTransaction:反查本地事務(wù),上述流程中是在db中查詢訂單號(hào)是否存在,若存在則提交事務(wù),若不存在,可能本地事務(wù)失敗了,也可能本地事務(wù)還在執(zhí)行,所以返回UNKNOW


          這樣便使用RocketMQ的事務(wù)簡(jiǎn)單實(shí)現(xiàn)了一個(gè)創(chuàng)建訂單的分布式事務(wù)。




          2 RocketMQ事務(wù)消息實(shí)現(xiàn)原理





          2.1 Pro端如何發(fā)事務(wù)消息?




          DefaultMQProducerImpl#sendMessageInTransaction

          public TransactionSendResult sendMessageInTransaction(final Message msg,    final LocalTransactionExecuter localTransactionExecuter, final Object arg)    throws MQClientException {    TransactionListener transactionListener = getCheckListener();    if (null == localTransactionExecuter && null == transactionListener) {        throw new MQClientException("tranExecutor is null", null);    }
          // ignore DelayTimeLevel parameter if (msg.getDelayTimeLevel() != 0) { MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL); }
          Validators.checkMessage(msg, this.defaultMQProducer);
          SendResult sendResult = null; // 給待發(fā)送消息添加屬性,表明是一個(gè)事務(wù)消息(即半消息) MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup()); // 像發(fā)送普通消息一樣,把這條事務(wù)消息發(fā)往Broker try { sendResult = this.send(msg); } catch (Exception e) { throw new MQClientException("send message Exception", e); }
          LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; Throwable localException = null; switch (sendResult.getSendStatus()) { // 事務(wù)消息發(fā)送成功 case SEND_OK: { try { if (sendResult.getTransactionId() != null) { msg.putUserProperty("__transactionId__", sendResult.getTransactionId()); } String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); if (null != transactionId && !"".equals(transactionId)) { msg.setTransactionId(transactionId); } if (null != localTransactionExecuter) { localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg); } else if (transactionListener != null) { log.debug("Used new transaction API"); // 開(kāi)始執(zhí)行本地事務(wù) localTransactionState = transactionListener.executeLocalTransaction(msg, arg); } if (null == localTransactionState) { localTransactionState = LocalTransactionState.UNKNOW; }
          if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) { log.info("executeLocalTransactionBranch return {}", localTransactionState); log.info(msg.toString()); } } catch (Throwable e) { log.info("executeLocalTransactionBranch exception", e); log.info(msg.toString()); localException = e; } } break; case FLUSH_DISK_TIMEOUT: case FLUSH_SLAVE_TIMEOUT: case SLAVE_NOT_AVAILABLE: localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE; break; default: break; }
          // 事務(wù)過(guò)程的最后,給Broker發(fā)送提交或回滾事務(wù)的RPC請(qǐng)求。 try { this.endTransaction(sendResult, localTransactionState, localException); } catch (Exception e) { log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e); }
          TransactionSendResult transactionSendResult = new TransactionSendResult(); transactionSendResult.setSendStatus(sendResult.getSendStatus()); transactionSendResult.setMessageQueue(sendResult.getMessageQueue()); transactionSendResult.setMsgId(sendResult.getMsgId()); transactionSendResult.setQueueOffset(sendResult.getQueueOffset()); transactionSendResult.setTransactionId(sendResult.getTransactionId()); transactionSendResult.setLocalTransactionState(localTransactionState); return transactionSendResult;}


          有事務(wù)反查機(jī)制作兜底,該RPC請(qǐng)求即使失敗或丟失,也不會(huì)影響事務(wù)最終的結(jié)果。最后構(gòu)建事務(wù)消息的發(fā)送結(jié)果,并返回。




          2.2 Broker端如何處理事務(wù)消息?



          SendMessageProcessor#asyncSendMessage




          跟進(jìn)去看看真正處理半消息的業(yè)務(wù)邏輯,這段處理邏輯在類


          TransactionalMessageBridge


          • putHalfMessage

          • parseHalfMessageInner



            RocketMQ并非將事務(wù)消息保存至消息中 client 指定的 queue,而是記錄了原始的 topic 和 queue 后,把這個(gè)事務(wù)消息保存在

          設(shè)計(jì)思想
          • 特殊的內(nèi)部 topic:RMQ_SYS_TRANS_HALF_TOPIC

          • 序號(hào)為 0 的 queue


          這套 topic 和 queue 對(duì)消費(fèi)者不可見(jiàn),因此里面的消息也永遠(yuǎn)不會(huì)被消費(fèi)。這就保證在事務(wù)提交成功之前,這個(gè)事務(wù)消息對(duì) Consumer 是消費(fèi)不到的。





          2.3 Broker端如何事務(wù)反查?


          在Broker的TransactionalMessageCheckService服務(wù)中啟動(dòng)了一個(gè)定時(shí)器,定時(shí)從事務(wù)消息queue中讀出所有待反查的事務(wù)消息。


          AbstractTransactionalMessageCheckListener#resolveHalfMsg


          • 針對(duì)每個(gè)需要反查的半消息,Broker會(huì)給對(duì)應(yīng)的Producer發(fā)一個(gè)要求執(zhí)行事務(wù)狀態(tài)反查的RPC請(qǐng)求

          • AbstractTransactionalMessageCheckListener#sendCheckMessage

          • Broker2Client#checkProducerTransactionState

            根據(jù)RPC返回響應(yīng)中的反查結(jié)果,來(lái)決定這個(gè)半消息是需要提交還是回滾,或者后續(xù)繼續(xù)來(lái)反查。


          最后,提交或者回滾事務(wù)。首先把半消息標(biāo)記為已處理


          1. 如果是提交事務(wù),就把半消息從半消息隊(duì)列中復(fù)制到該消息真正的topic和queue中

          2. 如果是回滾事務(wù),什么都不做

          EndTransactionProcessor#processRequest


          最后結(jié)束該事務(wù)。



          3 總結(jié)


          • 整體實(shí)現(xiàn)流程


          RocketMQ是基于兩階段提交來(lái)實(shí)現(xiàn)的事務(wù),把這些事務(wù)消息暫存在一個(gè)特殊的queue中,待事務(wù)提交后再移動(dòng)到業(yè)務(wù)隊(duì)列中。最后,RocketMQ的事務(wù)適用于解決本地事務(wù)和發(fā)消息的數(shù)據(jù)一致性問(wèn)題。


          參考

          • https://juejin.im/post/6844904193526857742

          點(diǎn)個(gè)在看支持我吧,轉(zhuǎn)發(fā)就更好了
          瀏覽 51
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  动漫操逼视频免费看 | 亚洲人爱免费视频 | 三级久久| 菠萝内射视频在线 | 999av |