<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          全網(wǎng)最深入的RocketMQ Consumer 學習筆記

          共 10976字,需瀏覽 22分鐘

           ·

          2021-02-06 12:12


          本文來源:http://r6d.cn/Zz4w

          學習一下RocketMQ?- 消費者的原理和使用??


          消費模式

          消息消費有兩種模式:

          1、并發(fā)消費

          并發(fā)消費是默認的處理方法,一個消費者使用線程池技術,可以并發(fā)消費多條消息,提升機器的資源利用率。默認配置是 20 個線程,所以一臺機器默認情況下,同一瞬間可以消費 20 個消息。關注公眾后碼猿技術專欄獲取更多面試資源。

          其中 ConsumeMessageConcurrentlyService 的構造函數(shù)如下:

          public?ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl?defaultMQPushConsumerImpl,
          ????????MessageListenerConcurrently?messageListener)?{
          ????????this.defaultMQPushConsumerImpl?=?defaultMQPushConsumerImpl;
          ????????this.messageListener?=?messageListener;

          ????????this.defaultMQPushConsumer?=?this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
          ????????this.consumerGroup?=?this.defaultMQPushConsumer.getConsumerGroup();
          ????????this.consumeRequestQueue?=?new?LinkedBlockingQueue();

          ????????this.consumeExecutor?=?new?ThreadPoolExecutor(
          ????????????this.defaultMQPushConsumer.getConsumeThreadMin(),
          ????????????this.defaultMQPushConsumer.getConsumeThreadMax(),
          ????????????1000?*?60,
          ????????????TimeUnit.MILLISECONDS,
          ????????????this.consumeRequestQueue,
          ????????????new?ThreadFactoryImpl("ConsumeMessageThread_"));

          ????????this.scheduledExecutorService?=?Executors.newSingleThreadScheduledExecutor(new?ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
          ????????this.cleanExpireMsgExecutors?=?Executors.newSingleThreadScheduledExecutor(new?ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));
          ????}

          2、順序消費

          有些業(yè)務場景,消息的消費需要順序性,例如購物時,下訂單、庫存校驗、支付、發(fā)送物流,雖然都屬于「購物」這個場景的子任務,但他們之間是有順序性的。如果它們業(yè)務處理通過消息解耦,那消息消費也得要有順序性。

          RocketMQ 的做法就是分區(qū)有序性,首先需要發(fā)送者,將有順序的消息發(fā)往 Topic 下同一個 MessageQueue,然后消費者,順序地一個一個進行消費,消費失敗將會一直重試,前面消息消費完成才能進行下一個,所以需要在業(yè)務上確保消息失敗機制,避免消息阻塞。


          冪等消費

          RocketMQ 的設計中,是不保證消息的冪等性,這時候需要業(yè)務方自行保證,重復消費消費不會對數(shù)據(jù)造成影響,從數(shù)學意義上來說,f(x) = f(f(x)),多次計算的結果都是一致的。

          RocketMQ 保證存儲在 Broker 的消息最少投遞一次,該特性保證消息一定會被消費,但由于網(wǎng)絡抖動或者其它場景,導致一條消息可能被消費多次。

          在相同業(yè)務類型的消息中,這里需要考慮兩個場景

          • 并發(fā)消費
          • 消息消費超時后重復投遞

          第一個場景很好理解,一條相同類型的消息被不同的消費者同時拉取,可能是不同發(fā)送者同時發(fā)送的,例如喜聞樂見的 A B 轉(zhuǎn)賬問題。關注公眾后碼技術專欄獲取更多面試資源。

          第二個場景比較難遇到,默認情況,消息處理超過 15 分鐘后,將會重新投遞消費,如果原來服務器 A 還在處理中,重新投遞的消息被服務器 B 拉取了;另一種就是手動重發(fā)消息,通過控制臺可以重新發(fā)送一模一樣的消息,MessageID 和消息體跟之前一樣,這兩種情況下也會造成消息重復消費。

          于是設計上,考慮了使用 Redis 做分布式鎖,通過競爭鎖來避免同時消息,以及用 Redis 暫存消費狀態(tài),設計如下:

          注意點:

          1、鎖資源 key 的組裝規(guī)則(【消費組】+【:】+【主題 topic】+【:】+【messageId 或者 messageKey】

          2、鎖對應的狀態(tài)流轉(zhuǎn)(Processing or Successed)

          3、避免處理耗時超過鎖 expire 時間,導致其它服務器訂閱消息并成功消費。加入一個定時線程池,搶到鎖資源后,組裝定時任務,進行【續(xù)時】

          4、任務成功后,修改狀態(tài)為【Successed】,失效時間訂為 1h;失敗情況,清理掉所有鎖資源和定時任務,返回失敗重試策略

          5、根據(jù) Redis 中保存的狀態(tài),過濾重復的消息

          在消息 SDK 代碼實現(xiàn)上,通過裝飾器模式,將 MessageConsumer 包裝起來,在業(yè)務邏輯不需改動太大情況下,動態(tài)增加了冪等消費的功能。


          負載均衡

          上圖展示了,在一個 pullRequestQueue,可能獲取到多個消息 MessageExt,然后每個消息將會進入消費線程池中消費。

          Consumer 端使用 RebalanceImpl 來實現(xiàn)負載均衡,所以想要理解拉取消息的流程,需要重點查看它實現(xiàn)。

          Consumer 實例啟動時,在工廠 MQClientInstance 中能夠看到 new RebalanceService(this);,啟動了一個后臺線程,每隔 20s 進行重平衡操作 mqClientFactory.doRebalance()

          同樣按照消費者的消費模式,重平衡邏輯處理分成兩個 switch 分支,接下來討論的是『并發(fā)消費』邏輯

          一、獲取 MessageQueue 列表

          RebalanceImpl 維護了一份 map 結構的本地緩存 topicSubscribeInfoTable,以 topic 維度保存了對應的 MesssageQueue 列表

          Set?mqSet?=?this.topicSubscribeInfoTable.get(topic);

          二、獲取在線的消費者終端列表

          List?cidAll?=?this.mQClientFactory.findConsumerIdList(topic,?consumerGroup);

          findConsumerIdList 方法接受兩個參數(shù):Topic 主題和 ConsumerGroup 消費組

          底層通過發(fā)送 RequestCode.GET_CONSUMER_LIST_BY_GROUP 請求碼的 RemotingCommandBroker 查詢在線消費者列表,拿到結果后反序列化

          三、分配 MessageQueue

          根據(jù)分配策略,確定當前消費者實例要從哪些 MessageQueue 獲取消息

          List?allocateResult?=?
          ????strategy.allocate(this.consumerGroup,
          ??????????????????????this.mQClientFactory.getClientId(),
          ??????????????????????mqAll,
          ??????????????????????cidAll);

          默認分配策略是平均分配,取當前下標 index,隊列數(shù)取余機器數(shù) mod,然后按照區(qū)間給當前應用分配。

          例如有 8 個隊列,2 臺在線服務器,那平均消費 4 個隊列,4 4 分配;3 臺服務器的,按照 3 3 2 分配。

          注意:

          目前遇到很多業(yè)務團隊,在開發(fā)過程中,使用了相同的分組名,但是訂閱信息不一致,例如之前已經(jīng)部署了兩臺應用,本期開發(fā)時,新增了 Topic 后,反饋有些消息無法消費,查看 Topic 消費情況表現(xiàn)如下:

          根本原因就是前面說的 MessageQueue 平均分配后,之前的應用沒有訂閱新 Topic,于是這些消息狀態(tài)一直處于 Not Consumed Yet, 解決方法就是統(tǒng)一訂閱信息或者更換 ConsumerGroup 進行測試。

          四、刷新本地緩存 & 構建請求列表

          接下來,會根據(jù)前面分配的消息隊列,來構建獲取消息的請求 pullRequest 隊列

          org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance

          private?boolean?updateProcessQueueTableInRebalance(final?String?topic,?final?Set?mqSet,?final?boolean?isOrder)?{
          ????????boolean?changed?=?false;
          ????????Iterator>?it?=?this.processQueueTable.entrySet().iterator();
          ????????while?(it.hasNext())?{
          ????????????Entry?next?=?it.next();
          ????????????MessageQueue?mq?=?next.getKey();
          ????????????ProcessQueue?pq?=?next.getValue();
          ????????????....
          ????????}
          ????????
          ????????List?pullRequestList?=?new?ArrayList();
          ????????for?(MessageQueue?mq?:?mqSet)?{
          ????????????if?(!this.processQueueTable.containsKey(mq))?{
          ????????????????this.removeDirtyOffset(mq);
          ????????????????ProcessQueue?pq?=?new?ProcessQueue();
          ????????????????long?nextOffset?=?this.computePullFromWhere(mq);
          ????????????????if?(nextOffset?>=?0)?{
          ????????????????????ProcessQueue?pre?=?this.processQueueTable.putIfAbsent(mq,?pq);
          ????????????????????PullRequest?pullRequest?=?new?PullRequest();
          ????????????????????pullRequest.setConsumerGroup(consumerGroup);
          ????????????????????pullRequest.setNextOffset(nextOffset);
          ????????????????????pullRequest.setMessageQueue(mq);
          ????????????????????pullRequest.setProcessQueue(pq);
          ????????????????????pullRequestList.add(pullRequest);
          ????????????????????changed?=?true;
          ????????????????????}
          ????????????????}?
          ????????????}
          ????????}
          ????}
          ????this.dispatchPullRequest(pullRequestList);
          ????return?changed;
          }

          @Override
          public?void?dispatchPullRequest(List?pullRequestList)?{
          ????for?(PullRequest?pullRequest?:?pullRequestList)?{
          ????????this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
          ????}
          }

          public?void?executePullRequestImmediately(final?PullRequest?pullRequest)?{
          ????this.pullRequestQueue.put(pullRequest);
          }

          updateProcessQueueTableInRebalance 的作用:更新訂閱關系

          ①、消費節(jié)點上下線 ②、Topic 的隊列分區(qū)參數(shù)調(diào)整

          以上兩種行為,將會影響到消息訂閱的分配,所以需要客戶端在消費消息前,先確定自己被分配到哪幾個 MessageQueue,在構建 PullRequest 時,參數(shù)中帶上監(jiān)聽的 queueId

          最后,為過濾后的消息隊列集合(mqSet)中的每個 MessageQueue 創(chuàng)建一個 ProcessQueue 對象,并存入 RebalanceImplprocessQueueTable 隊列中。

          接著構建 PullRequest,并調(diào)用 dispatchPullRequest 方法,將拉取消息的請求放入到 pullRequestQueue 隊列中,等待后面的 PullMessageService 取出來調(diào)用。

          五、后臺線程不停從 Broker 拉取消息

          后臺線程是:PullMessageService

          org.apache.rocketmq.client.impl.consumer.PullMessageService#run

          @Override
          public?void?run()?{
          ????while?(!this.isStopped())?{
          ????????try?{
          ????????????PullRequest?pullRequest?=?this.pullRequestQueue.take();
          ????????????this.pullMessage(pullRequest);
          ????????}?catch?(InterruptedException?ignored)?{
          ????????}?catch?(Exception?e)?{
          ????????????log.error("Pull?Message?Service?Run?Method?exception",?e);
          ????????}
          ????}
          }

          pullRequestQueue 請求隊列,就是前面重平衡服務,構建好放入該隊列中的,然后在 PullMessageService 中的 run 方法,使用 while 死循環(huán),不停的去 Broker 請求新消息

          六、消息消費

          在獲取消息時,會注冊一個回調(diào)接口,具體入口在 MQConsumerInner,然后在 PullCallback 里調(diào)用 messageListener 進行消費,也就是我們寫的業(yè)務處理邏輯。

          在正常消費完成后,將 pullRequest 重新放回拉取消息的任務隊列中,等待 PullMessageService 的下一次獲取,拉取新消息。

          正常消費,業(yè)務處理沒有異常的話,將會返回 ConsumeReturnType.SUCCESS 表示成功確認,消費位點也能繼續(xù)前進。

          消費失敗將會觸發(fā)補償機制

          • ConsumeMessageConcurrentlyService

          并發(fā)模式下,它會將消息投遞到 %retry 隊列,更新當前位點,讓后面的消息繼續(xù)消費,如果該消息一直失敗,默認最多重試 16 次就會丟到死信隊列中。

          • ConsumeMessageOrderlyService

          順序模式需要注意下,出現(xiàn)失敗它不會投遞到重試隊列,而是將一直在本地重試,直到消費成功為止,所以有可能出現(xiàn)某個 MessageQueue 消費卡住,并且后面消息都不能消費的場景,注意捕獲業(yè)務處理異常。

          org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage

          public?void?pullMessage(final?PullRequest?pullRequest)?{
          ????final?ProcessQueue?processQueue?=?pullRequest.getProcessQueue();
          ????...
          ????final?SubscriptionData?subscriptionData?=?this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
          ????//?構建回調(diào)接口
          ????PullCallback?pullCallback?=?new?PullCallback()?{
          ????????@Override
          ????????public?void?onSuccess(PullResult?pullResult)?{
          ????????????if?(pullResult?!=?null)?{
          ????????????????pullResult?=?DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(),?pullResult,?subscriptionData);
          ????????????????switch?(pullResult.getPullStatus())?{
          ????????????????????case?FOUND:
          ????????????????????????...
          ????????????????????????//?這一步消息消費,進入設定的消費邏輯?messageListener
          ????????????????????????if?(pullResult.getMsgFoundList()?==?null?||?pullResult.getMsgFoundList().isEmpty())?{
          ????????????????????????????DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
          ????????????????????????}?else?{
          ????????????????????????????firstMsgOffset?=?pullResult.getMsgFoundList().get(0).getQueueOffset();
          ????????????????????????????DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
          ????????????????????????????????pullRequest.getMessageQueue().getTopic(),?pullResult.getMsgFoundList().size());
          ????????????????????????????boolean?dispatchToConsume?=?processQueue.putMessage(pullResult.getMsgFoundList());
          ????????????????????????????DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(),?processQueue,?pullRequest.getMessageQueue(),?dispatchToConsume);
          ????????????????????????????//?是否稍后才消費,重新扔回到請求隊列中
          ????????????????????????????if?(DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval()?>?0)?{
          ????????????????????????????????DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,?DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
          ????????????????????????????}?else?{
          ????????????????????????????????DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
          ????????????????????????????}
          ????????????????????????}
          ????????????????????????break;
          ????????????...
          ????????????????????default:
          ????????????????????????break;
          ????????????????}
          ????????????}
          ????????}
          ????????@Override
          ????????public?void?onException(Throwable?e)?{
          ????????????DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,?pullTimeDelayMillsWhenException);
          ????????}
          ????};
          ????...
          ????try?{
          ????????//?從?Broker?端獲取消息
          ????????this.pullAPIWrapper.pullKernelImpl(
          ????????????pullRequest.getMessageQueue(),
          ????????????subExpression,
          ????????????subscriptionData.getExpressionType(),
          ????????????subscriptionData.getSubVersion(),
          ????????????pullRequest.getNextOffset(),
          ????????????this.defaultMQPushConsumer.getPullBatchSize(),
          ????????????sysFlag,
          ????????????commitOffsetValue,
          ????????????BROKER_SUSPEND_MAX_TIME_MILLIS,
          ????????????CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
          ????????????CommunicationMode.ASYNC,
          ????????????pullCallback
          ????????);
          ????}?catch?(Exception?e)?{
          ????????this.executePullRequestLater(pullRequest,?pullTimeDelayMillsWhenException);
          ????}
          }

          獲取消息的這個方法中,有兩個核心部分

          ①、構建消費回調(diào)函數(shù) ②、從 Broker 端獲取新消息

          回調(diào)接口中,設定了對新消息的處理邏輯,包括順序消息的特殊處理,還有是否需要等待一段時間才消費,真正執(zhí)行業(yè)務方設定的消費邏輯在 DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest)DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest 中。

          然后將回調(diào)函數(shù)作為參數(shù),放入 this.pullAPIWrapper.pullKernelImpl 方法中,接收消息后,執(zhí)行回調(diào)函數(shù)來處理消息。

          到這一步為止,從消息獲取到消息消費,執(zhí)行本地業(yè)務邏輯的基本流程就基本了解清楚,后面的狀態(tài)確認以及位點 offset 更新,感興趣的可以再去跟蹤一下。


          小結

          消費者的深入學習分成以下幾部分

          • 消費模式
          • 冪等消費概念
          • 負載均衡

          記錄了并發(fā)模式和廣播模式的區(qū)別,使用上需要注意的細節(jié)。

          跟大家分享了一下在原生 RMQ 不支持冪等消費,同時不需要業(yè)務方做過多改造的情況下,通過封裝 SDK,在里面實現(xiàn)冪等消費的方案。

          最后梳理了一下消費者如何重平衡、構建拉取消息的請求最后消費消息的代碼過程。

          其中還有很多細節(jié)點還需要去了解,例如重平衡 doReblance 階段,出現(xiàn)服務器上下線,還處于消費的 MessageQueue 如何處理(看了一下有加鎖 lock 操作,避免兩臺服務器同時操作同一個隊列)的代碼如何實現(xiàn)等等

          最后,MQ 的學習之旅,從點到面,還需要繼續(xù)學習,之后再見??~


          嘿,你在看嗎?
          瀏覽 89
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  超碰人人在线观看 | gogo高清无码 | 俺也去网av | 亚洲日韩色图 | 久久AV电影 |