全網(wǎng)最深入的RocketMQ Consumer 學習筆記

本文來源:http://r6d.cn/Zz4w
學習一下RocketMQ?- 消費者的原理和使用??
消費模式
消息消費有兩種模式:

1、并發(fā)消費
并發(fā)消費是默認的處理方法,一個消費者使用線程池技術,可以并發(fā)消費多條消息,提升機器的資源利用率。默認配置是 20 個線程,所以一臺機器默認情況下,同一瞬間可以消費 20 個消息。關注公眾后碼猿技術專欄獲取更多面試資源。
其中 ConsumeMessageConcurrentlyService 的構造函數(shù)如下:
public?ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl?defaultMQPushConsumerImpl,
????????MessageListenerConcurrently?messageListener)?{
????????this.defaultMQPushConsumerImpl?=?defaultMQPushConsumerImpl;
????????this.messageListener?=?messageListener;
????????this.defaultMQPushConsumer?=?this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
????????this.consumerGroup?=?this.defaultMQPushConsumer.getConsumerGroup();
????????this.consumeRequestQueue?=?new?LinkedBlockingQueue();
????????this.consumeExecutor?=?new?ThreadPoolExecutor(
????????????this.defaultMQPushConsumer.getConsumeThreadMin(),
????????????this.defaultMQPushConsumer.getConsumeThreadMax(),
????????????1000?*?60,
????????????TimeUnit.MILLISECONDS,
????????????this.consumeRequestQueue,
????????????new?ThreadFactoryImpl("ConsumeMessageThread_"));
????????this.scheduledExecutorService?=?Executors.newSingleThreadScheduledExecutor(new?ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
????????this.cleanExpireMsgExecutors?=?Executors.newSingleThreadScheduledExecutor(new?ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));
????}
2、順序消費
有些業(yè)務場景,消息的消費需要順序性,例如購物時,下訂單、庫存校驗、支付、發(fā)送物流,雖然都屬于「購物」這個場景的子任務,但他們之間是有順序性的。如果它們業(yè)務處理通過消息解耦,那消息消費也得要有順序性。
RocketMQ 的做法就是分區(qū)有序性,首先需要發(fā)送者,將有順序的消息發(fā)往 Topic 下同一個 MessageQueue,然后消費者,順序地一個一個進行消費,消費失敗將會一直重試,前面消息消費完成才能進行下一個,所以需要在業(yè)務上確保消息失敗機制,避免消息阻塞。
冪等消費
在 RocketMQ 的設計中,是不保證消息的冪等性,這時候需要業(yè)務方自行保證,重復消費消費不會對數(shù)據(jù)造成影響,從數(shù)學意義上來說,f(x) = f(f(x)),多次計算的結果都是一致的。
RocketMQ 保證存儲在 Broker 的消息最少投遞一次,該特性保證消息一定會被消費,但由于網(wǎng)絡抖動或者其它場景,導致一條消息可能被消費多次。
在相同業(yè)務類型的消息中,這里需要考慮兩個場景
并發(fā)消費 消息消費超時后重復投遞
第一個場景很好理解,一條相同類型的消息被不同的消費者同時拉取,可能是不同發(fā)送者同時發(fā)送的,例如喜聞樂見的 A B 轉(zhuǎn)賬問題。關注公眾后碼猿技術專欄獲取更多面試資源。
第二個場景比較難遇到,默認情況,消息處理超過 15 分鐘后,將會重新投遞消費,如果原來服務器 A 還在處理中,重新投遞的消息被服務器 B 拉取了;另一種就是手動重發(fā)消息,通過控制臺可以重新發(fā)送一模一樣的消息,MessageID 和消息體跟之前一樣,這兩種情況下也會造成消息重復消費。
于是設計上,考慮了使用 Redis 做分布式鎖,通過競爭鎖來避免同時消息,以及用 Redis 暫存消費狀態(tài),設計如下:

注意點:
1、鎖資源 key 的組裝規(guī)則(【消費組】+【:】+【主題 topic】+【:】+【messageId 或者 messageKey】
2、鎖對應的狀態(tài)流轉(zhuǎn)(Processing or Successed)
3、避免處理耗時超過鎖 expire 時間,導致其它服務器訂閱消息并成功消費。加入一個定時線程池,搶到鎖資源后,組裝定時任務,進行【續(xù)時】
4、任務成功后,修改狀態(tài)為【Successed】,失效時間訂為 1h;失敗情況,清理掉所有鎖資源和定時任務,返回失敗重試策略
5、根據(jù) Redis 中保存的狀態(tài),過濾重復的消息
在消息 SDK 代碼實現(xiàn)上,通過裝飾器模式,將 MessageConsumer 包裝起來,在業(yè)務邏輯不需改動太大情況下,動態(tài)增加了冪等消費的功能。
負載均衡

上圖展示了,在一個 pullRequestQueue,可能獲取到多個消息 MessageExt,然后每個消息將會進入消費線程池中消費。
Consumer 端使用 RebalanceImpl 來實現(xiàn)負載均衡,所以想要理解拉取消息的流程,需要重點查看它實現(xiàn)。
Consumer 實例啟動時,在工廠 MQClientInstance 中能夠看到 new RebalanceService(this);,啟動了一個后臺線程,每隔 20s 進行重平衡操作 mqClientFactory.doRebalance()
同樣按照消費者的消費模式,重平衡邏輯處理分成兩個 switch 分支,接下來討論的是『并發(fā)消費』邏輯
一、獲取 MessageQueue 列表
在 RebalanceImpl 維護了一份 map 結構的本地緩存 topicSubscribeInfoTable,以 topic 維度保存了對應的 MesssageQueue 列表
Set?mqSet?=?this.topicSubscribeInfoTable.get(topic);
二、獲取在線的消費者終端列表
List?cidAll?=?this.mQClientFactory.findConsumerIdList(topic,?consumerGroup);
findConsumerIdList 方法接受兩個參數(shù):Topic 主題和 ConsumerGroup 消費組
底層通過發(fā)送 RequestCode.GET_CONSUMER_LIST_BY_GROUP 請求碼的 RemotingCommand 到 Broker 查詢在線消費者列表,拿到結果后反序列化
三、分配 MessageQueue
根據(jù)分配策略,確定當前消費者實例要從哪些 MessageQueue 獲取消息
List?allocateResult?=?
????strategy.allocate(this.consumerGroup,
??????????????????????this.mQClientFactory.getClientId(),
??????????????????????mqAll,
??????????????????????cidAll);

默認分配策略是平均分配,取當前下標 index,隊列數(shù)取余機器數(shù) mod,然后按照區(qū)間給當前應用分配。
例如有 8 個隊列,2 臺在線服務器,那平均消費 4 個隊列,4 4 分配;3 臺服務器的,按照 3 3 2 分配。
注意:
目前遇到很多業(yè)務團隊,在開發(fā)過程中,使用了相同的分組名,但是訂閱信息不一致,例如之前已經(jīng)部署了兩臺應用,本期開發(fā)時,新增了 Topic 后,反饋有些消息無法消費,查看 Topic 消費情況表現(xiàn)如下:

根本原因就是前面說的 MessageQueue 平均分配后,之前的應用沒有訂閱新 Topic,于是這些消息狀態(tài)一直處于 Not Consumed Yet, 解決方法就是統(tǒng)一訂閱信息或者更換 ConsumerGroup 進行測試。
四、刷新本地緩存 & 構建請求列表
接下來,會根據(jù)前面分配的消息隊列,來構建獲取消息的請求 pullRequest 隊列
“org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance
private?boolean?updateProcessQueueTableInRebalance(final?String?topic,?final?Set?mqSet,?final?boolean?isOrder)?{
????????boolean?changed?=?false;
????????Iterator>?it?=?this.processQueueTable.entrySet().iterator();
????????while?(it.hasNext())?{
????????????Entry?next?=?it.next();
????????????MessageQueue?mq?=?next.getKey();
????????????ProcessQueue?pq?=?next.getValue();
????????????....
????????}
????????
????????List?pullRequestList?=?new?ArrayList();
????????for?(MessageQueue?mq?:?mqSet)?{
????????????if?(!this.processQueueTable.containsKey(mq))?{
????????????????this.removeDirtyOffset(mq);
????????????????ProcessQueue?pq?=?new?ProcessQueue();
????????????????long?nextOffset?=?this.computePullFromWhere(mq);
????????????????if?(nextOffset?>=?0)?{
????????????????????ProcessQueue?pre?=?this.processQueueTable.putIfAbsent(mq,?pq);
????????????????????PullRequest?pullRequest?=?new?PullRequest();
????????????????????pullRequest.setConsumerGroup(consumerGroup);
????????????????????pullRequest.setNextOffset(nextOffset);
????????????????????pullRequest.setMessageQueue(mq);
????????????????????pullRequest.setProcessQueue(pq);
????????????????????pullRequestList.add(pullRequest);
????????????????????changed?=?true;
????????????????????}
????????????????}?
????????????}
????????}
????}
????this.dispatchPullRequest(pullRequestList);
????return?changed;
}
@Override
public?void?dispatchPullRequest(List?pullRequestList)?{
????for?(PullRequest?pullRequest?:?pullRequestList)?{
????????this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
????}
}
public?void?executePullRequestImmediately(final?PullRequest?pullRequest)?{
????this.pullRequestQueue.put(pullRequest);
}
updateProcessQueueTableInRebalance 的作用:更新訂閱關系
①、消費節(jié)點上下線
②、Topic 的隊列分區(qū)參數(shù)調(diào)整
以上兩種行為,將會影響到消息訂閱的分配,所以需要客戶端在消費消息前,先確定自己被分配到哪幾個 MessageQueue,在構建 PullRequest 時,參數(shù)中帶上監(jiān)聽的 queueId。
最后,為過濾后的消息隊列集合(mqSet)中的每個 MessageQueue 創(chuàng)建一個 ProcessQueue 對象,并存入 RebalanceImpl 的 processQueueTable 隊列中。
接著構建 PullRequest,并調(diào)用 dispatchPullRequest 方法,將拉取消息的請求放入到 pullRequestQueue 隊列中,等待后面的 PullMessageService 取出來調(diào)用。
五、后臺線程不停從 Broker 拉取消息
后臺線程是:PullMessageService
“org.apache.rocketmq.client.impl.consumer.PullMessageService#run
@Override
public?void?run()?{
????while?(!this.isStopped())?{
????????try?{
????????????PullRequest?pullRequest?=?this.pullRequestQueue.take();
????????????this.pullMessage(pullRequest);
????????}?catch?(InterruptedException?ignored)?{
????????}?catch?(Exception?e)?{
????????????log.error("Pull?Message?Service?Run?Method?exception",?e);
????????}
????}
}
pullRequestQueue 請求隊列,就是前面重平衡服務,構建好放入該隊列中的,然后在 PullMessageService 中的 run 方法,使用 while 死循環(huán),不停的去 Broker 請求新消息
六、消息消費
在獲取消息時,會注冊一個回調(diào)接口,具體入口在 MQConsumerInner,然后在 PullCallback 里調(diào)用 messageListener 進行消費,也就是我們寫的業(yè)務處理邏輯。
在正常消費完成后,將 pullRequest 重新放回拉取消息的任務隊列中,等待 PullMessageService 的下一次獲取,拉取新消息。
正常消費,業(yè)務處理沒有異常的話,將會返回 ConsumeReturnType.SUCCESS 表示成功確認,消費位點也能繼續(xù)前進。
消費失敗將會觸發(fā)補償機制
ConsumeMessageConcurrentlyService
并發(fā)模式下,它會將消息投遞到 %retry 隊列,更新當前位點,讓后面的消息繼續(xù)消費,如果該消息一直失敗,默認最多重試 16 次就會丟到死信隊列中。
ConsumeMessageOrderlyService
順序模式需要注意下,出現(xiàn)失敗它不會投遞到重試隊列,而是將一直在本地重試,直到消費成功為止,所以有可能出現(xiàn)某個 MessageQueue 消費卡住,并且后面消息都不能消費的場景,注意捕獲業(yè)務處理異常。
“org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
public?void?pullMessage(final?PullRequest?pullRequest)?{
????final?ProcessQueue?processQueue?=?pullRequest.getProcessQueue();
????...
????final?SubscriptionData?subscriptionData?=?this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
????//?構建回調(diào)接口
????PullCallback?pullCallback?=?new?PullCallback()?{
????????@Override
????????public?void?onSuccess(PullResult?pullResult)?{
????????????if?(pullResult?!=?null)?{
????????????????pullResult?=?DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(),?pullResult,?subscriptionData);
????????????????switch?(pullResult.getPullStatus())?{
????????????????????case?FOUND:
????????????????????????...
????????????????????????//?這一步消息消費,進入設定的消費邏輯?messageListener
????????????????????????if?(pullResult.getMsgFoundList()?==?null?||?pullResult.getMsgFoundList().isEmpty())?{
????????????????????????????DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
????????????????????????}?else?{
????????????????????????????firstMsgOffset?=?pullResult.getMsgFoundList().get(0).getQueueOffset();
????????????????????????????DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
????????????????????????????????pullRequest.getMessageQueue().getTopic(),?pullResult.getMsgFoundList().size());
????????????????????????????boolean?dispatchToConsume?=?processQueue.putMessage(pullResult.getMsgFoundList());
????????????????????????????DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(),?processQueue,?pullRequest.getMessageQueue(),?dispatchToConsume);
????????????????????????????//?是否稍后才消費,重新扔回到請求隊列中
????????????????????????????if?(DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval()?>?0)?{
????????????????????????????????DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,?DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
????????????????????????????}?else?{
????????????????????????????????DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
????????????????????????????}
????????????????????????}
????????????????????????break;
????????????...
????????????????????default:
????????????????????????break;
????????????????}
????????????}
????????}
????????@Override
????????public?void?onException(Throwable?e)?{
????????????DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,?pullTimeDelayMillsWhenException);
????????}
????};
????...
????try?{
????????//?從?Broker?端獲取消息
????????this.pullAPIWrapper.pullKernelImpl(
????????????pullRequest.getMessageQueue(),
????????????subExpression,
????????????subscriptionData.getExpressionType(),
????????????subscriptionData.getSubVersion(),
????????????pullRequest.getNextOffset(),
????????????this.defaultMQPushConsumer.getPullBatchSize(),
????????????sysFlag,
????????????commitOffsetValue,
????????????BROKER_SUSPEND_MAX_TIME_MILLIS,
????????????CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
????????????CommunicationMode.ASYNC,
????????????pullCallback
????????);
????}?catch?(Exception?e)?{
????????this.executePullRequestLater(pullRequest,?pullTimeDelayMillsWhenException);
????}
}
獲取消息的這個方法中,有兩個核心部分
①、構建消費回調(diào)函數(shù)
②、從 Broker 端獲取新消息
回調(diào)接口中,設定了對新消息的處理邏輯,包括順序消息的特殊處理,還有是否需要等待一段時間才消費,真正執(zhí)行業(yè)務方設定的消費邏輯在 DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest) 或 DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest 中。
然后將回調(diào)函數(shù)作為參數(shù),放入 this.pullAPIWrapper.pullKernelImpl 方法中,接收消息后,執(zhí)行回調(diào)函數(shù)來處理消息。
到這一步為止,從消息獲取到消息消費,執(zhí)行本地業(yè)務邏輯的基本流程就基本了解清楚,后面的狀態(tài)確認以及位點 offset 更新,感興趣的可以再去跟蹤一下。
小結
消費者的深入學習分成以下幾部分
消費模式 冪等消費概念 負載均衡
記錄了并發(fā)模式和廣播模式的區(qū)別,使用上需要注意的細節(jié)。
跟大家分享了一下在原生 RMQ 不支持冪等消費,同時不需要業(yè)務方做過多改造的情況下,通過封裝 SDK,在里面實現(xiàn)冪等消費的方案。
最后梳理了一下消費者如何重平衡、構建拉取消息的請求最后消費消息的代碼過程。
其中還有很多細節(jié)點還需要去了解,例如重平衡 doReblance 階段,出現(xiàn)服務器上下線,還處于消費的 MessageQueue 如何處理(看了一下有加鎖 lock 操作,避免兩臺服務器同時操作同一個隊列)的代碼如何實現(xiàn)等等
最后,MQ 的學習之旅,從點到面,還需要繼續(xù)學習,之后再見??~

