Rocketmq源碼分析10:consumer 啟動(dòng)流程
注:本系列源碼分析基于RocketMq 4.8.0,gitee倉(cāng)庫(kù)鏈接:https://gitee.com/funcy/rocketmq.git.
前面分析了producer發(fā)送消息的流程,本文我們來(lái)分析consumer消費(fèi)消息的流程。
consumer消費(fèi)消息的demo為org.apache.rocketmq.example.simple.PushConsumer,代碼如下:
public class PushConsumer {
public static void main(String[] args)
throws InterruptedException, MQClientException {
String nameServer = "localhost:9876";
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
consumer.setNamesrvAddr(nameServer);
consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//wrong time format 2017_0422_221800
consumer.setConsumeTimestamp("20181109221800");
// 注冊(cè)監(jiān)聽(tīng)器,監(jiān)聽(tīng)消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
// 這里獲得了消息
System.out.printf("%s Receive New Messages: %s %n",
Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 啟動(dòng)
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
consumer使用起來(lái)還是挺簡(jiǎn)單的,先是創(chuàng)建了一個(gè)DefaultMQPushConsumer對(duì)象,然后配置了一些屬性,比較關(guān)鍵的就是注冊(cè)消息監(jiān)聽(tīng)器(在這個(gè)監(jiān)聽(tīng)器里會(huì)獲取消息),之后就調(diào)用start()方法啟動(dòng)consumer.
接下來(lái)我們就來(lái)分析這塊的消費(fèi)過(guò)程。
1. 構(gòu)造方法:DefaultMQPushConsumer
consumer的處理類為DefaultMQPushConsumer,我們先來(lái)看看DefaultMQPushConsumer的構(gòu)造方法:
public DefaultMQPushConsumer(final String consumerGroup) {
// 這里指定了隊(duì)列分配策略
this(null, consumerGroup, null, new AllocateMessageQueueAveragely());
}
public DefaultMQPushConsumer(final String namespace, final String consumerGroup,
RPCHook rpcHook, AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
this.consumerGroup = consumerGroup;
this.namespace = namespace;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
}
在構(gòu)造方法中,就只是做了一些成員變量的賦值操作,比較關(guān)鍵的是分配消息隊(duì)列的策略:allocateMessageQueueStrategy,如果指定,默認(rèn)就使用AllocateMessageQueueAveragely,即從各隊(duì)列平均獲取消息。
2. 啟動(dòng)consumer:DefaultMQPushConsumer#start
consumer的啟動(dòng)方法為DefaultMQPushConsumer#start,代碼如下:
public void start() throws MQClientException {
setConsumerGroup(
NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
// 啟動(dòng)
this.defaultMQPushConsumerImpl.start();
// 消息軌跡相關(guān)內(nèi)容,我們不關(guān)注
if (null != traceDispatcher) {
...
}
}
繼續(xù)進(jìn)入DefaultMQPushConsumerImpl#start:
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
log.info(...);
this.serviceState = ServiceState.START_FAILED;
this.checkConfig();
this.copySubscription();
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPushConsumer.changeInstanceNameToPID();
}
// 客戶端
this.mQClientFactory = MQClientManager.getInstance()
.getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
// 設(shè)置負(fù)載均衡相關(guān)屬性
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(
this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
this.pullAPIWrapper = new PullAPIWrapper(mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
// 消息模式:廣播模式存在本地,集群模式存在遠(yuǎn)程(broker)
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
// 加載消費(fèi)信息的偏移量
this.offsetStore.load();
// 根據(jù)客戶端實(shí)例化不同的consumeMessageService:順序消息與并發(fā)消息
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService = new ConsumeMessageOrderlyService(this,
(MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService = new ConsumeMessageConcurrentlyService(this,
(MessageListenerConcurrently) this.getMessageListenerInner());
}
this.consumeMessageService.start();
// 注冊(cè)消費(fèi)組
boolean registerOK = mQClientFactory.registerConsumer(
this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown(
defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
throw new MQClientException(...);
}
// 啟動(dòng)
mQClientFactory.start();
log.info(...);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException(...);
default:
break;
}
// 更新 topic 的信息,從nameServer獲取數(shù)據(jù)
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
this.mQClientFactory.checkClientInBroker();
// 發(fā)送心跳,發(fā)送到所有的broker
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
// 負(fù)載均衡
this.mQClientFactory.rebalanceImmediately();
}
這個(gè)方法比較長(zhǎng),整個(gè)consumer的啟動(dòng)流程都在這里了,咱們挑重點(diǎn)說(shuō),來(lái)總結(jié)下這個(gè)方法做了什么。
獲取客戶端 mQClientFactory,類型為org.apache.rocketmq.client.impl.factory.MQClientInstance,如果對(duì)producer還有印象的話,我們就會(huì)發(fā)現(xiàn),producer中的mQClientFactory的類型也是它區(qū)分廣播模式與集群模式的 offsetStore,所謂的offsetStore,就是一存儲(chǔ)器,用來(lái)存儲(chǔ)當(dāng)前消費(fèi)者消費(fèi)信息的偏移量。在廣播模式中,該偏移量保存在本地文件中,而在集群模式中,該偏移量保存在遠(yuǎn)程broker中,廣播模式與集群模式,我們后面再詳細(xì)分析根據(jù)客戶端實(shí)例化不同的 consumeMessageService,這里用來(lái)區(qū)分順序消息與并發(fā)消息,依然是后面再分析啟動(dòng) mQClientFactory,也就是啟動(dòng)客戶端更新 topic信息、發(fā)送心跳信息到broker、處理負(fù)載均衡功能
以上就是DefaultMQPushConsumerImpl#start方法所做的的主要工作了。實(shí)際上,上面的1,2,3點(diǎn)都是一些配置工作,這些配置對(duì)應(yīng)的服務(wù)是在mQClientFactory.start()方法中啟動(dòng)的,我們繼續(xù)。
3. 啟動(dòng)mQClientFactory:MQClientInstance#start
我們來(lái)看看mQClientFactory的啟動(dòng)流程,進(jìn)入MQClientInstance#start:
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// 獲取 nameServer 的地址
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// 啟動(dòng)客戶端的遠(yuǎn)程服務(wù),這個(gè)方法會(huì)配置netty客戶端
this.mQClientAPIImpl.start();
// 啟動(dòng)定時(shí)任務(wù)
this.startScheduledTask();
// pull服務(wù),僅對(duì)consumer啟作用
this.pullMessageService.start();
// 啟動(dòng)負(fù)載均衡服務(wù),僅對(duì)consumer啟作用
this.rebalanceService.start();
// 啟用內(nèi)部的 producer
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case START_FAILED:
throw new MQClientException(...);
default:
break;
}
}
}
在producer的啟動(dòng)過(guò)程中,也會(huì)調(diào)用這個(gè)方法,前面我們已經(jīng)分析過(guò)了一波了,這次我們?cè)?code style="overflow-wrap: break-word;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;font-family: "Operator Mono", Consolas, Monaco, Menlo, monospace;word-break: break-all;color: rgb(145, 109, 213);font-weight: bolder;background-image: none;background-position: initial;background-size: initial;background-repeat: initial;background-attachment: initial;background-origin: initial;background-clip: initial;">consumer的角度再來(lái)分析這個(gè)方法。
該方法所做的工作如下:
獲取 nameServer的地址啟動(dòng)客戶端的遠(yuǎn)程服務(wù),這個(gè)方法會(huì)配置 netty客戶端啟動(dòng)定時(shí)任務(wù) 啟動(dòng)拉取消息服務(wù) 啟動(dòng)負(fù)載均衡服務(wù)
上面的1,2與producer的流程并無(wú)區(qū)別,就不再分析了,我們來(lái)看看定時(shí)任務(wù)的啟動(dòng),進(jìn)入方法MQClientInstance#startScheduledTask:
private void startScheduledTask() {
...
// 持久化消費(fèi)者的消費(fèi)偏移量,每5秒一次
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
// 省略其他定時(shí)任務(wù)
...
}
這個(gè)方法中還啟動(dòng)了其他一些定時(shí)任務(wù),這里我們重點(diǎn)關(guān)注執(zhí)行MQClientInstance#persistAllConsumerOffset()方法的定時(shí)任務(wù),該定時(shí)任務(wù)會(huì)持久化當(dāng)前消費(fèi)者消費(fèi)消息的偏移量,在本節(jié)我們先對(duì)這個(gè)定時(shí)任務(wù)有個(gè)印象,在分析偏移量持久化一節(jié)再詳細(xì)分析持久化流程。
我們?cè)倩氐?code style="overflow-wrap: break-word;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;font-family: "Operator Mono", Consolas, Monaco, Menlo, monospace;word-break: break-all;color: rgb(145, 109, 213);font-weight: bolder;background-image: none;background-position: initial;background-size: initial;background-repeat: initial;background-attachment: initial;background-origin: initial;background-clip: initial;">MQClientInstance#start的流程,第4與第5步,主要是啟動(dòng)了兩個(gè)服務(wù):pullMessageService與rebalanceService,這個(gè)類的信息如下:
/**
* PullMessageService
*/
public class PullMessageService extends ServiceThread {
...
}
/**
* RebalanceService
*/
public class RebalanceService extends ServiceThread {
...
}
這兩個(gè)類都是ServiceThread的子類,這兩個(gè)類的start()方法也都是來(lái)自于ServiceThread:
public abstract class ServiceThread implements Runnable {
// 省略其他代碼
...
/**
* start() 方法
*/
public void start() {
log.info(...);
if (!started.compareAndSet(false, true)) {
return;
}
stopped = false;
this.thread = new Thread(this, getServiceName());
this.thread.setDaemon(isDaemon);
this.thread.start();
}
}
從代碼來(lái)看,ServiceThread實(shí)現(xiàn)了Runnable接口,在其start()方法中,啟動(dòng)了一個(gè)線程,線程的執(zhí)行邏輯正是來(lái)自于其子類的run()方法,因此我們要看pullMessageService與rebalanceService的start()方法執(zhí)行邏輯,只需要看對(duì)應(yīng)類的run()方法即可。
到此為止,consumer的啟動(dòng)就已經(jīng)完成了,各項(xiàng)服務(wù)也啟動(dòng)起來(lái)了,而consumer拉取消息也正是由這些服務(wù)的配合處理的,接下來(lái)我們就來(lái)分析這些服務(wù)做了什么。
限于篇幅,本文就先到這里了,下篇繼續(xù)。
限于作者個(gè)人水平,文中難免有錯(cuò)誤之處,歡迎指正!原創(chuàng)不易,商業(yè)轉(zhuǎn)載請(qǐng)聯(lián)系作者獲得授權(quán),非商業(yè)轉(zhuǎn)載請(qǐng)注明出處。
本文首發(fā)于微信公眾號(hào) 「Java技術(shù)探秘」,如果您喜歡本文,歡迎關(guān)注該公眾號(hào),讓我們一起在技術(shù)的世界里探秘吧!
