Rocketmq源碼分析12:consumer 負載均衡
注:本系列源碼分析基于RocketMq 4.8.0,gitee倉庫鏈接:https://gitee.com/funcy/rocketmq.git.
接上文,繼續(xù)分析consumer消費流程。
5. 如何選擇消息隊列:RebalanceService
讓我們回到PullMessageService#run()方法:
public class PullMessageService extends ServiceThread {
...
private final LinkedBlockingQueue<PullRequest> pullRequestQueue
= new LinkedBlockingQueue<PullRequest>();
/**
* 將 pullRequest 放入 pullRequestQueue 中
*/
public void executePullRequestImmediately(final PullRequest pullRequest) {
try {
this.pullRequestQueue.put(pullRequest);
} catch (InterruptedException e) {
log.error("executePullRequestImmediately pullRequestQueue.put", e);
}
}
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
// 從 pullRequestQueue 獲取一個 pullRequest,阻塞的方式
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}
...
}
PullMessageService線程獲得了pullRequest后,然后就開始了一次又一次的拉起消息的操作,那這個pullRequest最初是在哪里添加進來的呢?這就是本節(jié)要分析的「負載均衡」功能了。
處理負載均衡的線程為RebalanceService,它是在MQClientInstance#start方法中啟動的,我們直接進入其run()方法:
public class RebalanceService extends ServiceThread {
// 省略其他
...
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
this.waitForRunning(waitInterval);
this.mqClientFactory.doRebalance();
}
log.info(this.getServiceName() + " service end");
}
}
在它的run()方法中,僅是調(diào)用了MQClientInstance#doRebalance方法,我們繼續(xù)進入:
public void doRebalance() {
// consumerTable 存放的就是當前 consumer
for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
MQConsumerInner impl = entry.getValue();
if (impl != null) {
try {
impl.doRebalance();
} catch (Throwable e) {
log.error("doRebalance exception", e);
}
}
}
}
在MQClientInstance#doRebalance方法中,會遍歷所有的consumer,然后調(diào)用DefaultMQPushConsumerImpl#doRebalance方法作進一步的處理,consumerTable就是用來保存DefaultMQPushConsumerImpl實例的,繼續(xù)進入DefaultMQPushConsumerImpl#doRebalance方法:
@Override
public void doRebalance() {
if (!this.pause) {
this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
}
}
繼續(xù)跟進,來到RebalanceImpl#doRebalance方法:
public void doRebalance(final boolean isOrder) {
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
// 客戶端負載均衡:根據(jù)主題來處理負載均衡
this.rebalanceByTopic(topic, isOrder);
} catch (Throwable e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}
this.truncateMessageQueueNotMyTopic();
}
/**
* 這就是最張?zhí)幚碡撦d均衡的地方了
*/
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
// 廣播模式:不需要處理負載均衡,每個消費者都要消費,只需要更新負載信息
case BROADCASTING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
if (mqSet != null) {
// 更新負載均衡信息,這里傳入的參數(shù)是mqSet,即所有隊列
boolean changed = this
.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
if (changed) {
this.messageQueueChanged(topic, mqSet, mqSet);
log.info(...);
}
} else {
log.warn(...);
}
break;
}
// 集群模式
case CLUSTERING: {
// 根據(jù)訂閱的主題獲取消息隊列
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
// 客戶端id,根據(jù) topic 與 consumerGroup 獲取所有的 consumerId
List<String> cidAll = this.mQClientFactory
.findConsumerIdList(topic, consumerGroup);
if (null == mqSet) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn(...);
}
}
if (null == cidAll) {
log.warn(...);
}
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
// 排序后才能保證消費者負載策略相對穩(wěn)定
Collections.sort(mqAll);
Collections.sort(cidAll);
// MessageQueue 的負載策略
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
// 按負載策略進行分配,返回當前消費者實際訂閱的messageQueue集合
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
log.error(...);
return;
}
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
// 更新負載均衡信息,傳入?yún)?shù)是 allocateResultSet,即當前consumer分配到的隊列
boolean changed = this
.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {
log.info(...);
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}
break;
}
default:
break;
}
}
RebalanceImpl#rebalanceByTopic方法就是最終處理負載均衡的方法了,在這個方法里會區(qū)分廣播模式與集群模式的處理。
在廣播模式下,一條消息會被同一個消費組中的所有consumer消費,而集群模式下,一條消息只會被同一個消費組下的一個consumer消費。
正是因為如此,廣播模式下并沒有負載均衡可言,直接把所有的隊列都分配給當前consumer處理,然后更新QueueTable的負載均衡信息;而集群模式會先分配當前consumer消費的消息隊列,再更新QueueTable的負載均衡信息。
這里我們來看看集群模式,看看它的操作:
strategy.allocate(...):按負載均衡策略為當前consumer分配隊列updateProcessQueueTableInRebalance(...):更新負載均衡信息。
在rocketMq中,提供了這些負載均衡策略:
AllocateMessageQueueAveragely:平均負載策略,rocketMq默認使用的策略AllocateMessageQueueAveragelyByCircle:環(huán)形平均分配,這個和平均分配唯一的區(qū)別就是,再分隊列的時候,平均隊列是將屬于自己的MessageQueue全部拿走,而環(huán)形平均則是,一人拿一個,拿到的Queue不是連續(xù)的。AllocateMessageQueueByConfig:用戶自定義配置AllocateMessageQueueByMachineRoom:同機房負載策略,這個策略就是當前Consumer只負載處在指定的機房內(nèi)的MessageQueue,brokerName的命名必須要按要求的格式來設置:機房名@brokerNameAllocateMachineRoomNearby:就近機房負載策略,在AllocateMessageQueueByMachineRoom策略中,如果同一機房中只有MessageQueue而沒有consumer,那這個MessageQueue上的消息該如何消費呢?AllocateMachineRoomNearby就是擴充了該功能的處理AllocateMessageQueueConsistentHash:一致性哈希策略
這里我們重點來分析平均負載策略AllocateMessageQueueAveragely:
public List<MessageQueue> allocate(String consumerGroup, String currentCID,
List<MessageQueue> mqAll, List<String> cidAll) {
// 返回值
List<MessageQueue> result = new ArrayList<MessageQueue>();
// 省略一些判斷操作
...
int index = cidAll.indexOf(currentCID);
int mod = mqAll.size() % cidAll.size();
// 1. 消費者數(shù)量大于隊列數(shù)量:averageSize = 1
// 2. 消費者數(shù)量小于等于隊列數(shù)量:averageSize = 隊列數(shù)量 / 消費者數(shù)量,還要處理個+1的操作
int averageSize = mqAll.size() <= cidAll.size()
? 1 : (mod > 0 && index < mod
? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size());
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
int range = Math.min(averageSize, mqAll.size() - startIndex);
for (int i = 0; i < range; i++) {
result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
return result;
}
這個方法中,關鍵的分配方法就在后面幾行,如果只看代碼,會感覺有點暈,這里我舉一個例子來簡單解釋下:
假設:messageQueue一共有6個,consumer有4個,當前consumer的index為1,有了這些前提后,接下來我們就來看它的分配過程了。
計算取余操作:
6 % 4 = 2,這表明messageQueue不能平均分配給每個consumer,接下來就來看看這個余數(shù)2是如何處理的計算每個
consumer平均處理的messageQueue數(shù)量消費者索引 0 1 2 3 處理數(shù)量 2 2 1 1 這里需要注意,如果 consumer數(shù)量大于messageQueue數(shù)量,那每個consumer最多只會分配到一個messageQueue,這種情況下,余數(shù)2不會進行處理,并且有的consumer處理的messageQueue數(shù)量為0,同一個messageQueue不會同時被兩個及以上的consumer消費掉這里的 messageQueue數(shù)量為6,consumer為4,計算得到每個consumer處理的隊列數(shù)最少為1,除此之外,為了實現(xiàn)“平均”,有2個consumer會需要多處理1個messageQueue,按“平均”的分配原則,如果index小于mod,則會分配多1個messageQueue,這里的mod為2,結果如下:分配完每個
consumer處理的messageQueue數(shù)量后,這些messageQueue該如何分配呢?從代碼來看,分配時會先分配完一個consumer,再分配下一個consumer,最終結果就是這樣:隊列 Q0 Q1 Q2 Q3 Q4 Q5 消費者 C1 C1 C2 C2 C4 C5
從圖中可以看到,在6個messageQueue、4個consumer、當前consumer的index為1的情況下,當前consumer會分到2個隊列,分別為Q2/Q3.
將messageQueue分配完成后,接下來就是更新負載信息了,方法為RebalanceImpl#updateProcessQueueTableInRebalance:
private boolean updateProcessQueueTableInRebalance(final String topic,
final Set<MessageQueue> mqSet, final boolean isOrder) {
...
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
if (!this.processQueueTable.containsKey(mq)) {
if (isOrder && !this.lock(mq)) {
log.warn(...);
continue;
}
this.removeDirtyOffset(mq);
ProcessQueue pq = new ProcessQueue();
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
}
// pullRequest 最初產(chǎn)生的地方:mq 不存在,就添加
else {
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
// 添加 pullRequest
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}
// 發(fā)布
this.dispatchPullRequest(pullRequestList);
return changed;
}
這個方法中最最關鍵的就是pullRequestList的添加操作了:先遍歷傳入的MessageQueue,如果當前consumer沒有消費過該messageQueue,則添加一個新的pullRequest到pullRequestList,之后就是發(fā)布pullRequestList了。
看到這里,我們就應該能明白,最初的pullRequest就是在這里產(chǎn)生的,而發(fā)布pullRequestList的操作,就是將pullRequest丟給pullMessageService線程處理了:
/**
* RebalancePushImpl#dispatchPullRequest:發(fā)布pullRequest的操作
*/
public void dispatchPullRequest(List<PullRequest> pullRequestList) {
for (PullRequest pullRequest : pullRequestList) {
// 在這里執(zhí)行pullRequest,其實就是把 pullRequest 添加到
// PullMessageService#pullRequestQueue 中
this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
}
}
限于篇幅,本文就先到這里了,下篇繼續(xù)。
限于作者個人水平,文中難免有錯誤之處,歡迎指正!原創(chuàng)不易,商業(yè)轉(zhuǎn)載請聯(lián)系作者獲得授權,非商業(yè)轉(zhuǎn)載請注明出處。
本文首發(fā)于微信公眾號 「Java技術探秘」,如果您喜歡本文,歡迎關注該公眾號,讓我們一起在技術的世界里探秘吧!
