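Rocketmq Source Code Analysis 11: Consumer Consumption Flow
Note: this series analyzes the source code of RocketMQ 4.8.0. Gitee repository: https://gitee.com/funcy/rocketmq.git.
Continuing from the previous article, we keep analyzing the consumer's consumption flow.
4. Pulling messages: PullMessageService
MQClientInstance#start starts the message-pulling service, PullMessageService. PullMessageService is a subclass of ServiceThread, so starting it creates a new thread. Let's go straight to PullMessageService#run():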
public class PullMessageService extends ServiceThread {
    ...
    private final LinkedBlockingQueue<PullRequest> pullRequestQueue
        = new LinkedBlockingQueue<PullRequest>();

    /**
     * Puts pullRequest into pullRequestQueue
     */
    public void executePullRequestImmediately(final PullRequest pullRequest) {
        try {
            this.pullRequestQueue.put(pullRequest);
        } catch (InterruptedException e) {
            log.error("executePullRequestImmediately pullRequestQueue.put", e);
        }
    }

    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");
        while (!this.isStopped()) {
            try {
                // Take a pullRequest from pullRequestQueue (blocking)
                PullRequest pullRequest = this.pullRequestQueue.take();
                this.pullMessage(pullRequest);
            } catch (InterruptedException ignored) {
            } catch (Exception e) {
                log.error("Pull Message Service Run Method exception", e);
            }
        }
        log.info(this.getServiceName() + " service end");
    }
    ...
}
PullMessageService#run() takes a pullRequest from pullRequestQueue and then calls this.pullMessage(pullRequest) to perform the pull. Note that pullRequestQueue is a LinkedBlockingQueue and is read with the blocking take() method. When the queue is empty, take() blocks the thread until an element arrives.
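The blocking behavior described above can be reproduced with nothing but the JDK. Below is a minimal sketch, not RocketMQ code. MiniPullService and its String "requests" are hypothetical stand-ins for PullMessageService and PullRequest. Stopping the worker via Thread#interrupt is an assumption of the sketch; ServiceThread has its own stop mechanism.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for PullMessageService: one worker thread blocking on take()
class MiniPullService implements Runnable {
    private final LinkedBlockingQueue<String> pullRequestQueue = new LinkedBlockingQueue<>();
    // Requests handled by the worker, in the order it took them
    final List<String> handled = Collections.synchronizedList(new ArrayList<>());

    // Counterpart of executePullRequestImmediately: just enqueue the request
    void executePullRequestImmediately(String pullRequest) {
        try {
            pullRequestQueue.put(pullRequest);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // Parks this thread until a request is available (no busy-waiting)
                String pullRequest = pullRequestQueue.take();
                handled.add(pullRequest); // stands in for this.pullMessage(pullRequest)
            } catch (InterruptedException e) {
                return; // this sketch uses interruption as the stop signal
            }
        }
    }

    // Demo helper: start a worker, feed it requests, wait until all are handled, then stop it
    static List<String> demo(List<String> requests) {
        MiniPullService svc = new MiniPullService();
        Thread worker = new Thread(svc, "pull-service-sketch");
        worker.start();
        for (String r : requests) {
            svc.executePullRequestImmediately(r);
        }
        try {
            long deadline = System.currentTimeMillis() + 5_000;
            while (svc.handled.size() < requests.size() && System.currentTimeMillis() < deadline) {
                Thread.sleep(10);
            }
            worker.interrupt();
            worker.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(svc.handled);
    }
}
```

While the queue is empty, the worker thread uses no CPU. It wakes only when put(...) adds a request, and it handles requests in FIFO order.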
Where do the elements in pullRequestQueue come from? PullMessageService#executePullRequestImmediately calls pullRequestQueue.put(pullRequest) to add them. But who calls PullMessageService#executePullRequestImmediately(...)? We leave that question open for now and answer it when we analyze the rebalance (load-balancing) service.
Back in PullMessageService#run(), the pullRequest is handed on to this.pullMessage(pullRequest). Let's follow it:
private void pullMessage(final PullRequest pullRequest) {
    final MQConsumerInner consumer = this.mQClientFactory
        .selectConsumer(pullRequest.getConsumerGroup());
    if (consumer != null) {
        DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
        // Continue processing
        impl.pullMessage(pullRequest);
    } else {
        log.warn(...);
    }
}
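This method looks up the consumer registered for the request's consumer group. It then delegates to DefaultMQPushConsumerImpl#pullMessage to process the pullRequest further: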
/**
 * Core flow for pulling messages
 * @param pullRequest
 */
public void pullMessage(final PullRequest pullRequest) {
    // A lot of code is omitted here
    ...
    // pullCallback is created here; we won't discuss it just yet
    PullCallback pullCallback = new PullCallback() {
        ...
    };
    int sysFlag = PullSysFlag.buildSysFlag(
        commitOffsetEnable, // commitOffset
        true, // suspend
        subExpression != null, // subscription
        classFilter // class filter
    );
    try {
        // Pull messages
        this.pullAPIWrapper.pullKernelImpl(
            pullRequest.getMessageQueue(),
            subExpression,
            subscriptionData.getExpressionType(),
            subscriptionData.getSubVersion(),
            pullRequest.getNextOffset(),
            this.defaultMQPushConsumer.getPullBatchSize(),
            sysFlag,
            commitOffsetValue,
            BROKER_SUSPEND_MAX_TIME_MILLIS,
            CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
            CommunicationMode.ASYNC,
            pullCallback
        );
    } catch (Exception e) {
        log.error("pullKernelImpl exception", e);
        this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
    }
}
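PullSysFlag.buildSysFlag packs four booleans into one int. Later, pullKernelImpl calls clearCommitOffsetFlag when the chosen broker is a slave. The sketch below shows this bit-flag technique using a hypothetical SysFlagSketch class. The bit assignments (commitOffset = 1, suspend = 2, subscription = 4, classFilter = 8) are an assumption based on my reading of the source, so check PullSysFlag before relying on them.

```java
// Hypothetical re-creation of the PullSysFlag idea; the bit positions are assumed
final class SysFlagSketch {
    static final int FLAG_COMMIT_OFFSET = 1;      // bit 0
    static final int FLAG_SUSPEND = 1 << 1;       // bit 1
    static final int FLAG_SUBSCRIPTION = 1 << 2;  // bit 2
    static final int FLAG_CLASS_FILTER = 1 << 3;  // bit 3

    private SysFlagSketch() {}

    // Pack the four booleans into one int, one bit each
    static int build(boolean commitOffset, boolean suspend, boolean subscription, boolean classFilter) {
        int flag = 0;
        if (commitOffset) flag |= FLAG_COMMIT_OFFSET;
        if (suspend) flag |= FLAG_SUSPEND;
        if (subscription) flag |= FLAG_SUBSCRIPTION;
        if (classFilter) flag |= FLAG_CLASS_FILTER;
        return flag;
    }

    // Clear only the commit-offset bit, leaving the other bits untouched
    static int clearCommitOffset(int flag) {
        return flag & ~FLAG_COMMIT_OFFSET;
    }

    static boolean hasCommitOffset(int flag) {
        return (flag & FLAG_COMMIT_OFFSET) != 0;
    }

    static boolean hasClassFilter(int flag) {
        return (flag & FLAG_CLASS_FILTER) != 0;
    }
}
```

A single int is cheap to carry in a request header. Clearing one bit with `& ~mask` leaves the other options, such as suspend and subscription, intact.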
As you can see, the core of this method is the pull itself, which PullAPIWrapper#pullKernelImpl performs:
public PullResult pullKernelImpl(
    final MessageQueue mq,
    final String subExpression,
    final String expressionType,
    final long subVersion,
    final long offset,
    final int maxNums,
    final int sysFlag,
    final long commitOffset,
    final long brokerSuspendMaxTimeMillis,
    final long timeoutMillis,
    final CommunicationMode communicationMode,
    final PullCallback pullCallback
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // Find the broker
    FindBrokerResult findBrokerResult =
        this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
            this.recalculatePullFromWhichNode(mq), false);
    if (null == findBrokerResult) {
        // Broker not found: refresh the topic route info, then look it up again
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
        findBrokerResult =
            this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                this.recalculatePullFromWhichNode(mq), false);
    }
    if (findBrokerResult != null) {
        {
            // check version
            ...
        }
        int sysFlagInner = sysFlag;
        if (findBrokerResult.isSlave()) {
            // Pulling from a slave: clear the commit-offset flag
            sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
        }
        // Build the request
        PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
        // Many requestHeader.setXxx calls are omitted here
        ...
        String brokerAddr = findBrokerResult.getBrokerAddr();
        if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
            brokerAddr = computePullFromWhichFilterServer(mq.getTopic(), brokerAddr);
        }
        // Pull messages from the broker
        PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
            brokerAddr,
            requestHeader,
            timeoutMillis,
            communicationMode,
            pullCallback);
        return pullResult;
    }
    throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
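The broker lookup in pullKernelImpl follows a common cache-miss pattern:

- look up the broker locally;
- on a miss, refresh the route table from the NameServer once;
- look up again;
- fail only if the broker is still unknown.

Below is a minimal sketch with hypothetical names. BrokerLookupSketch is illustrative, and a Supplier stands in for the NameServer call. The sketch ignores master/slave selection entirely.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sketch of "look up, refresh routes once, look up again, else fail"
class BrokerLookupSketch {
    private final Map<String, String> brokerAddrTable = new HashMap<>(); // brokerName -> address
    private final Supplier<Map<String, String>> nameServer; // stands in for updateTopicRouteInfoFromNameServer
    int refreshCount = 0;

    BrokerLookupSketch(Supplier<Map<String, String>> nameServer) {
        this.nameServer = nameServer;
    }

    String findBrokerAddr(String brokerName) {
        String addr = brokerAddrTable.get(brokerName);
        if (addr == null) {
            // Local miss: refresh the route table once, then retry the lookup
            refreshCount++;
            brokerAddrTable.putAll(nameServer.get());
            addr = brokerAddrTable.get(brokerName);
        }
        if (addr != null) {
            return addr;
        }
        // Still unknown after the refresh: mirrors the MQClientException in pullKernelImpl
        throw new IllegalStateException("The broker[" + brokerName + "] not exist");
    }
}
```

Only misses trigger a refresh. Once the route table is populated, repeated pulls hit the local map without contacting the NameServer.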
This method mainly assembles the pull request. Once the request is assembled, it calls MQClientAPIImpl#pullMessage. Let's take a look inside:
public PullResult pullMessage(
    final String addr,
    final PullMessageRequestHeader requestHeader,
    final long timeoutMillis,
    final CommunicationMode communicationMode,
    final PullCallback pullCallback
) throws RemotingException, MQBrokerException, InterruptedException {
    // The request code is PULL_MESSAGE
    RemotingCommand request = RemotingCommand
        .createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
    // The supported ways of pulling
    switch (communicationMode) {
        case ONEWAY:
            assert false;
            return null;
        case ASYNC:
            // Async: the result is handled by pullCallback
            this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);
            return null;
        case SYNC:
            return this.pullMessageSync(addr, request, timeoutMillis);
        default:
            assert false;
            break;
    }
    return null;
}
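As with sending messages, RocketMQ has three modes for pulling messages:
ONEWAY: not meaningful for a pull. The code hits assert false and simply returns null.
ASYNC: asynchronous. When the pull succeeds or fails, the result is handled in the pullCallback object.
SYNC: synchronous. The pulled messages are returned directly to the caller.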
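The three branches differ only in how the result reaches the caller. Below is a hypothetical sketch; PullMode and ModeDispatchSketch are not RocketMQ types. To keep the sketch self-contained, the "remote call" runs inline.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical mirror of CommunicationMode; not the RocketMQ enum
enum PullMode { ONEWAY, ASYNC, SYNC }

final class ModeDispatchSketch {
    private ModeDispatchSketch() {}

    // remoteCall stands in for the network round trip, callback for PullCallback#onSuccess
    static String pull(PullMode mode, Supplier<String> remoteCall, Consumer<String> callback) {
        switch (mode) {
            case ASYNC:
                // The result goes to the callback; the caller gets null back.
                // (Run inline here; the real client completes it asynchronously.)
                callback.accept(remoteCall.get());
                return null;
            case SYNC:
                // The caller receives the result directly
                return remoteCall.get();
            case ONEWAY:
            default:
                // Not meaningful for a pull: nothing is sent and null is returned
                return null;
        }
    }
}
```

This is why the push consumer ignores the return value of pullKernelImpl. In ASYNC mode the return value is always null, and everything interesting happens in the callback.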
DefaultMQPushConsumerImpl#pullMessage passes CommunicationMode.ASYNC, so we focus on the asynchronous implementation, MQClientAPIImpl#pullMessageAsync:
private void pullMessageAsync(
    final String addr,
    final RemotingCommand request,
    final long timeoutMillis,
    final PullCallback pullCallback
) throws RemotingException, InterruptedException {
    // Asynchronous pull
    this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
        @Override
        public void operationComplete(ResponseFuture responseFuture) {
            // Handle the result of the pull
            RemotingCommand response = responseFuture.getResponseCommand();
            // A response was received
            if (response != null) {
                try {
                    PullResult pullResult = MQClientAPIImpl.this
                        .processPullResponse(response, addr);
                    assert pullResult != null;
                    // Call pullCallback's onSuccess(...)
                    pullCallback.onSuccess(pullResult);
                } catch (Exception e) {
                    // Call pullCallback's onException(...)
                    pullCallback.onException(e);
                }
            } else {
                ...
            }
        }
    });
}
This follows exactly the same pattern as the producer's asynchronous send. It also calls remotingClient.invokeAsync(...), and the result is also handled in an InvokeCallback object. In InvokeCallback#operationComplete, pullCallback's onSuccess(...) is called on success and pullCallback's onException(...) on failure. Next, let's look at what pullCallback contains.
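The success/failure routing in operationComplete can be sketched with a CompletableFuture standing in for ResponseFuture. Everything here is hypothetical, including PullCallbackSketch and AsyncPullSketch. In particular, this sketch assumes that the elided else branch in the original reports the failure through onException.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical callback with the same two methods as PullCallback
interface PullCallbackSketch<T> {
    void onSuccess(T result);
    void onException(Throwable e);
}

final class AsyncPullSketch {
    private AsyncPullSketch() {}

    // response stands in for ResponseFuture, decode for processPullResponse
    static <R, T> void pullAsync(CompletableFuture<R> response, Function<R, T> decode,
                                 PullCallbackSketch<T> callback) {
        response.whenComplete((resp, err) -> {
            if (err == null && resp != null) {
                try {
                    // As in the original, decoding and onSuccess share one try block,
                    // so an exception from either one ends up in onException
                    T result = decode.apply(resp);
                    callback.onSuccess(result);
                } catch (Exception e) {
                    callback.onException(e);
                }
            } else {
                // No usable response (timeout, send failure, ...): report it as an exception
                callback.onException(err != null ? err : new IllegalStateException("no response"));
            }
        });
    }
}
```

For an already-completed future, whenComplete runs the handler immediately on the calling thread. Otherwise, it runs on whichever thread completes the future. The latter corresponds to a network callback thread in the real client.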
The pullCallback object is created in DefaultMQPushConsumerImpl#pullMessage and passed down. Its contents are as follows:
/**
 * Core flow for pulling messages
 * @param pullRequest
 */
public void pullMessage(final PullRequest pullRequest) {
    // Other code omitted; focus on pullCallback
    ...
    // Callback for the pull: once messages are pulled, processing continues here
    PullCallback pullCallback = new PullCallback() {
        @Override
        public void onSuccess(PullResult pullResult) {
            if (pullResult != null) {
                // Decode the binary messages into Java objects and filter them by tag
                pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(
                    pullRequest.getMessageQueue(), pullResult, subscriptionData);
                switch (pullResult.getPullStatus()) {
                    case FOUND:
                        ...
                        long firstMsgOffset = Long.MAX_VALUE;
                        if (pullResult.getMsgFoundList() == null
                            || pullResult.getMsgFoundList().isEmpty()) {
                            DefaultMQPushConsumerImpl.this
                                .executePullRequestImmediately(pullRequest);
                        } else {
                            ...
                            // Consume the messages (orderly or concurrent, depending on the service)
                            DefaultMQPushConsumerImpl.this.consumeMessageService
                                .submitConsumeRequest(
                                    pullResult.getMsgFoundList(),
                                    processQueue,
                                    pullRequest.getMessageQueue(),
                                    dispatchToConsume);
                            // Prepare the next run
                            if (DefaultMQPushConsumerImpl.this
                                .defaultMQPushConsumer.getPullInterval() > 0) {
                                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                    DefaultMQPushConsumerImpl.this
                                        .defaultMQPushConsumer.getPullInterval());
                            } else {
                                DefaultMQPushConsumerImpl.this
                                    .executePullRequestImmediately(pullRequest);
                            }
                        }
                        ...
                        break;
                    // Handling of other statuses omitted
                    ...
                }
            }
        }

        @Override
        public void onException(Throwable e) {
            if (!pullRequest.getMessageQueue().getTopic()
                .startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                log.warn("execute the pull request exception", e);
            }
            // Puts pullRequest back into pullRequestQueue after a delay
            DefaultMQPushConsumerImpl.this
                .executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        }
    };
    // Other code omitted; focus on pullCallback
    ...
}
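PullCallback has two main methods:
onSuccess(...): called when the pull succeeds. It decodes the messages, consumes them, and then prepares the next pullRequest.
onException(...): called when the pull fails. It mainly puts the failed pullRequest back into pullRequestQueue after a delay, so the pull is retried later.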
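Putting the two methods side by side shows the key property: a pullRequest is never dropped. Every path puts it back into the queue, either immediately or after a delay. The sketch below is a hypothetical, heavily simplified decision table. PullCallbackFlowSketch returns descriptive strings instead of performing the actions, and it covers only the FOUND status and the exception path shown above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical decision table for the two PullCallback methods; returns action names only
final class PullCallbackFlowSketch {
    private PullCallbackFlowSketch() {}

    // What onSuccess does for a FOUND result, per the code above
    static List<String> onSuccessFound(List<String> msgs, long pullIntervalMillis) {
        List<String> actions = new ArrayList<>();
        if (msgs == null || msgs.isEmpty()) {
            // Nothing left to consume (for example, everything was filtered out): pull again right away
            actions.add("executePullRequestImmediately");
            return actions;
        }
        actions.add("submitConsumeRequest(" + msgs.size() + ")");
        actions.add(pullIntervalMillis > 0
                ? "executePullRequestLater(" + pullIntervalMillis + ")"
                : "executePullRequestImmediately");
        return actions;
    }

    // What onException does: back off, then put the same request back into the queue
    static List<String> onException(long delayMillis) {
        List<String> actions = new ArrayList<>();
        actions.add("executePullRequestLater(" + delayMillis + ")");
        return actions;
    }
}
```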
Next, let's analyze these two methods in detail.
4.1 Decoding messages
Decoding is handled by PullAPIWrapper#processPullResult, although this method does more than decoding. The code is as follows:
public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
    final SubscriptionData subscriptionData) {
    PullResultExt pullResultExt = (PullResultExt) pullResult;
    this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
    if (PullStatus.FOUND == pullResult.getPullStatus()) {
        // Decode the binary data into objects
        ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
        List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);
        List<MessageExt> msgListFilterAgain = msgList;
        // Filter by tag
        if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
            msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
            for (MessageExt msg : msgList) {
                if (msg.getTags() != null) {
                    // The actual filtering
                    if (subscriptionData.getTagsSet().contains(msg.getTags())) {
                        msgListFilterAgain.add(msg);
                    }
                }
            }
        }
        if (this.hasHook()) {
            FilterMessageContext filterMessageContext = new FilterMessageContext();
            filterMessageContext.setUnitMode(unitMode);
            filterMessageContext.setMsgList(msgListFilterAgain);
            this.executeHook(filterMessageContext);
        }
        // Further process the filtered messages
        for (MessageExt msg : msgListFilterAgain) {
            // Transaction message flag
            String traFlag = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
            if (Boolean.parseBoolean(traFlag)) {
                msg.setTransactionId(msg.getProperty(
                    MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
            }
            // Offsets
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,
                Long.toString(pullResult.getMinOffset()));
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,
                Long.toString(pullResult.getMaxOffset()));
            msg.setBrokerName(mq.getBrokerName());
        }
        pullResultExt.setMsgFoundList(msgListFilterAgain);
    }
    pullResultExt.setMessageBinary(null);
    return pullResult;
}
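This method does three things:
It decodes the binary data into objects, i.e. turns byte[] into List<MessageExt>.
If the consumer subscribed to specific tags, it filters by tag. This is simply a Set#contains() check on each message's tag.
It sets message properties such as TransactionId, the min/max offsets, and BrokerName.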
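The tag filter is an exact string match against the subscribed tag set. The variable name msgListFilterAgain suggests this is a second pass. As I understand it, the broker filters by tag hash code first, and the client then re-checks the exact string to discard hash collisions.

The sketch uses a hypothetical SimpleMsg type in place of MessageExt. An empty tag set keeps everything; as far as I know, that is what a "*" subscription produces.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical minimal message type standing in for MessageExt
final class SimpleMsg {
    final String tags;
    String brokerName;

    SimpleMsg(String tags) {
        this.tags = tags;
    }
}

final class TagFilterSketch {
    private TagFilterSketch() {}

    // Keep messages whose tag is in tagsSet (all of them if tagsSet is empty),
    // then stamp each survivor with the broker name, as processPullResult does
    static List<SimpleMsg> filter(List<SimpleMsg> decoded, Set<String> tagsSet, String brokerName) {
        List<SimpleMsg> kept = decoded;
        if (!tagsSet.isEmpty()) {
            kept = new ArrayList<>(decoded.size());
            for (SimpleMsg msg : decoded) {
                // Untagged messages are dropped when specific tags are subscribed
                if (msg.tags != null && tagsSet.contains(msg.tags)) {
                    kept.add(msg);
                }
            }
        }
        for (SimpleMsg msg : kept) {
            msg.brokerName = brokerName;
        }
        return kept;
    }
}
```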
4.2 Consuming messages
The code that consumes the messages is:
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
    pullResult.getMsgFoundList(),
    processQueue,
    pullRequest.getMessageQueue(),
    dispatchToConsume);
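There are two consumption modes:
ConsumeMessageConcurrentlyService: consumes messages concurrently
ConsumeMessageOrderlyService: consumes messages in order
We'll analyze the difference between them later. Here we use concurrent consumption, so let's look at ConsumeMessageConcurrentlyService#submitConsumeRequest: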
public void submitConsumeRequest(
    final List<MessageExt> msgs,
    final ProcessQueue processQueue,
    final MessageQueue messageQueue,
    final boolean dispatchToConsume) {
    // Max messages per ConsumeRequest: consumeMessageBatchMaxSize (default 1)
    final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
    // The messages fit into one batch: submit them as a single ConsumeRequest
    if (msgs.size() <= consumeBatchSize) {
        ConsumeRequest consumeRequest
            = new ConsumeRequest(msgs, processQueue, messageQueue);
        try {
            // Submit the task
            this.consumeExecutor.submit(consumeRequest);
        } catch (RejectedExecutionException e) {
            this.submitConsumeRequestLater(consumeRequest);
        }
    } else {
        // More than one batch: split into pages of consumeBatchSize, one task per page
        for (int total = 0; total < msgs.size(); ) {
            List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
            for (int i = 0; i < consumeBatchSize; i++, total++) {
                if (total < msgs.size()) {
                    msgThis.add(msgs.get(total));
                } else {
                    break;
                }
            }
            ConsumeRequest consumeRequest
                = new ConsumeRequest(msgThis, processQueue, messageQueue);
            try {
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                // Pool rejected the task: append all remaining messages to this page, resubmit later
                for (; total < msgs.size(); total++) {
                    msgThis.add(msgs.get(total));
                }
                this.submitConsumeRequestLater(consumeRequest);
            }
        }
    }
}
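This method wraps the pulled messages into ConsumeRequest objects and submits them to the thread pool. The batch size is consumeMessageBatchMaxSize, which defaults to 1, not a fixed 32. If there are more messages than the batch size, they are split into pages, and each page is submitted as a separate task. The figure 32 is the default pullBatchSize, i.e. the maximum number of messages fetched per pull.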
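Below is a sketch of the paging logic, with a plain ExecutorService standing in for consumeExecutor. BatchSubmitSketch is hypothetical. It omits the rejection/retry branch and only shows that each page becomes one task. With the default consumeMessageBatchMaxSize of 1, a 32-message pull turns into 32 tasks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch of splitting pulled messages into per-task pages
final class BatchSubmitSketch {
    private BatchSubmitSketch() {}

    // Split msgs into consecutive pages of at most batchSize, like the paging loop above
    static <T> List<List<T>> split(List<T> msgs, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < msgs.size(); start += batchSize) {
            int end = Math.min(start + batchSize, msgs.size());
            batches.add(new ArrayList<>(msgs.subList(start, end)));
        }
        return batches;
    }

    // Submit one task per page to a pool (stand-in for consumeExecutor);
    // returns the total number of messages the tasks handled
    static int submitAll(List<String> msgs, int batchSize, int threads) {
        ExecutorService consumeExecutor = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (List<String> batch : split(msgs, batchSize)) {
                // Each page becomes one task, like one ConsumeRequest
                Callable<Integer> consumeRequest = batch::size;
                futures.add(consumeExecutor.submit(consumeRequest));
            }
            int consumed = 0;
            for (Future<Integer> f : futures) {
                consumed += f.get();
            }
            return consumed;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            consumeExecutor.shutdown();
        }
    }
}
```

Because the pages run on different pool threads, they can finish in any order. The offset handling in processConsumeResult has to cope with that.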
The ConsumeRequest is eventually executed in the thread pool, so let's go straight to its run method:
class ConsumeRequest implements Runnable {
    ...
    @Override
    public void run() {
        ...
        // Get the message listener
        MessageListenerConcurrently listener
            = ConsumeMessageConcurrentlyService.this.messageListener;
        ...
        ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
        try {
            if (msgs != null && !msgs.isEmpty()) {
                for (MessageExt msg : msgs) {
                    MessageAccessor.setConsumeStartTimeStamp(
                        msg, String.valueOf(System.currentTimeMillis()));
                }
            }
            // Hand the messages to the listener for the actual processing
            status = listener.consumeMessage(
                Collections.unmodifiableList(msgs), context);
        } catch (Throwable e) {
            ...
        }
        ...
        // Handle the result
        if (!processQueue.isDropped()) {
            ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
        } else {
            log.warn(...);
        }
    }
}
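The code above has been heavily trimmed to keep only the important parts, which consist of three operations:
Get the current consumer's message listener.
Call the listener's consumeMessage() method.
Process the return value of consumeMessage().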
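The three steps can be sketched as follows, with a java.util.function.Function in place of MessageListenerConcurrently and a hypothetical two-value status enum. The sketch treats an exception or a null return as "retry later". That is an assumption about the elided parts of run(), so check the source before relying on it.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical subset of ConsumeConcurrentlyStatus
enum ConsumeStatusSketch { CONSUME_SUCCESS, RECONSUME_LATER }

final class ConsumeRequestSketch {
    private ConsumeRequestSketch() {}

    // listener stands in for MessageListenerConcurrently#consumeMessage
    static ConsumeStatusSketch run(List<String> msgs, Function<List<String>, ConsumeStatusSketch> listener) {
        ConsumeStatusSketch status = null;
        try {
            // Hand the listener a read-only view so it cannot modify the batch
            status = listener.apply(Collections.unmodifiableList(msgs));
        } catch (Throwable e) {
            // Swallowed here; status stays null
        }
        if (status == null) {
            // Assumption of this sketch: a throwing or null-returning listener means "retry later"
            status = ConsumeStatusSketch.RECONSUME_LATER;
        }
        return status;
    }
}
```

Wrapping the list with Collections.unmodifiableList means that a listener trying to mutate the batch gets an UnsupportedOperationException. Here, that exception leads to RECONSUME_LATER.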
What is this consumer's message listener? In org.apache.rocketmq.example.simple.PushConsumer, the listener is registered like this:
public class PushConsumer {
    public static void main(String[] args)
        throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
        ...
        // Register a listener to receive messages
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                // The messages arrive here
                System.out.printf("%s Receive New Messages: %s %n",
                    Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // Start the consumer
        consumer.start();
    }
}
The MessageListenerConcurrently#consumeMessage(...) method specified here is exactly the one that ConsumeRequest#run() calls.
After MessageListenerConcurrently#consumeMessage(...) returns, its return value is handled by ConsumeMessageConcurrentlyService#processConsumeResult. Here is the key code:
public void processConsumeResult(
    final ConsumeConcurrentlyStatus status,
    final ConsumeConcurrentlyContext context,
    final ConsumeRequest consumeRequest
) {
    // Retry handling omitted; covered in detail when we analyze the retry mechanism
    ...
    long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
    if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
        // Update the offset
        this.defaultMQPushConsumerImpl.getOffsetStore()
            .updateOffset(consumeRequest.getMessageQueue(), offset, true);
    }
}
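This method does two things:
It decides from the returned status whether the messages need to be retried. We won't discuss the retry mechanism here; it will be covered in detail later.
It updates the consumed offset. The update strategy differs between broadcasting mode and clustering mode, which we'll analyze shortly.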
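Why does removeMessage return an offset at all? With concurrent consumption, batches can finish out of order. Committing the offset of the batch that just finished could therefore skip messages that are still being processed.

As I read ProcessQueue, it keeps in-flight messages in a TreeMap keyed by queue offset. removeMessage then returns:

- the smallest offset still present, if messages remain;
- the maximum offset + 1, once nothing is left;
- -1, if the map was already empty (hence the offset >= 0 check).

Below is a simplified, hypothetical sketch without locking or statistics.

```java
import java.util.List;
import java.util.TreeMap;

// Hypothetical, simplified ProcessQueue: in-flight messages keyed by queue offset
final class ProcessQueueSketch {
    private final TreeMap<Long, String> msgTreeMap = new TreeMap<>();
    private long queueOffsetMax = -1;

    void putMessages(List<Long> offsets) {
        for (long off : offsets) {
            msgTreeMap.put(off, "msg@" + off);
            queueOffsetMax = Math.max(queueOffsetMax, off);
        }
    }

    // Remove consumed offsets; return the offset that is safe to commit (-1 if nothing was pending)
    long removeMessage(List<Long> consumedOffsets) {
        long result = -1;
        if (!msgTreeMap.isEmpty()) {
            // If nothing remains afterwards, everything up to queueOffsetMax has been consumed
            result = queueOffsetMax + 1;
            for (long off : consumedOffsets) {
                msgTreeMap.remove(off);
            }
            if (!msgTreeMap.isEmpty()) {
                // Otherwise commit only up to the smallest message still being consumed
                result = msgTreeMap.firstKey();
            }
        }
        return result;
    }
}
```

Suppose offsets 100 to 104 are in flight. Finishing 101 and 102 first still yields 100, because message 100 is unfinished. The committed position can only move past messages that are actually done.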
4.3 Preparing the next pullRequest
Back in DefaultMQPushConsumerImpl#pullMessage, the code that prepares the next run is:
// Prepare the next run
if (DefaultMQPushConsumerImpl.this
    .defaultMQPushConsumer.getPullInterval() > 0) {
    // Issue the pullRequest again after pullInterval milliseconds
    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
        DefaultMQPushConsumerImpl.this
            .defaultMQPushConsumer.getPullInterval());
} else {
    // Issue the pullRequest again right away
    DefaultMQPushConsumerImpl.this
        .executePullRequestImmediately(pullRequest);
}
The two methods are very similar. One re-issues the pullRequest after a delay of pullInterval milliseconds, while the other re-issues it immediately. Let's look at what they do:
// DefaultMQPushConsumerImpl
private void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
    // Delegate to PullMessageService
    this.mQClientFactory.getPullMessageService()
        .executePullRequestLater(pullRequest, timeDelay);
}

// PullMessageService
public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
    if (!isStopped()) {
        // One-shot task, executed after the delay
        this.scheduledExecutorService.schedule(new Runnable() {
            @Override
            public void run() {
                // Calls executePullRequestImmediately(...)
                PullMessageService.this.executePullRequestImmediately(pullRequest);
            }
        }, timeDelay, TimeUnit.MILLISECONDS);
    } else {
        log.warn("PullMessageServiceScheduledThread has shutdown");
    }
}
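The delayed path ultimately uses scheduledExecutorService to call executePullRequestImmediately(...). Note that schedule(...) runs the task only once (it is not a periodic task), after the specified timeDelay. Here, timeDelay is the value of defaultMQPushConsumer.getPullInterval(); in the exception path it is pullTimeDelayMillsWhenException instead.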
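The immediate and delayed paths can be sketched with JDK classes only. RequeueSketch is hypothetical. Its schedule(...) call runs the re-enqueue task once after the delay. Both paths end in the same LinkedBlockingQueue, just as both RocketMQ methods end in executePullRequestImmediately.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the immediate vs delayed re-enqueue paths
final class RequeueSketch {
    private final LinkedBlockingQueue<String> pullRequestQueue = new LinkedBlockingQueue<>();
    // Daemon thread so that a forgotten shutdown() cannot keep the JVM alive
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "pull-scheduler-sketch");
        t.setDaemon(true);
        return t;
    });

    void executePullRequestImmediately(String pullRequest) {
        pullRequestQueue.offer(pullRequest); // the queue is unbounded, so offer always succeeds
    }

    // schedule(...) runs the task exactly once after the delay (unlike scheduleAtFixedRate)
    void executePullRequestLater(String pullRequest, long delayMillis) {
        scheduler.schedule(() -> executePullRequestImmediately(pullRequest), delayMillis, TimeUnit.MILLISECONDS);
    }

    // Same decision as the pullInterval check above: both branches end in the same queue
    void prepareNextPull(String pullRequest, long pullIntervalMillis) {
        if (pullIntervalMillis > 0) {
            executePullRequestLater(pullRequest, pullIntervalMillis);
        } else {
            executePullRequestImmediately(pullRequest);
        }
    }

    // Demo helper: milliseconds until the request appears in the queue, or -1 on timeout
    long millisUntilQueued(String pullRequest, long pullIntervalMillis) {
        long start = System.nanoTime();
        prepareNextPull(pullRequest, pullIntervalMillis);
        try {
            String taken = pullRequestQueue.poll(5, TimeUnit.SECONDS);
            return taken == null ? -1 : TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
    }

    void shutdown() {
        scheduler.shutdown();
    }
}
```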
Whether delayed or immediate, both paths ultimately call PullMessageService#executePullRequestImmediately:
public void executePullRequestImmediately(final PullRequest pullRequest) {
    try {
        this.pullRequestQueue.put(pullRequest);
    } catch (InterruptedException e) {
        log.error("executePullRequestImmediately pullRequestQueue.put", e);
    }
}
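This method simply puts the pullRequest into pullRequestQueue. The PullMessageService thread then takes it from the queue and issues another pull request, which closes the loop.
Due to space limits, this article stops here; we'll continue in the next one.
Given the author's limited expertise, errors are inevitable, and corrections are welcome! Original content takes effort. For commercial reprints, please contact the author for permission; for non-commercial reprints, please cite the source.
This article was first published on the WeChat official account "Java技術探秘". If you like it, feel free to follow the account and explore the world of technology with us!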
