Rocketmq源碼分析08:producer 啟動(dòng)流程
注:本系列源碼分析基于RocketMq 4.8.0,gitee倉(cāng)庫(kù)鏈接:https://gitee.com/funcy/rocketmq.git.
本文我們來(lái)分析rocketMq producer 發(fā)送消息的流程.
producer發(fā)送消息的示例在org.apache.rocketmq.example.simple.Producer類(lèi)中,代碼如下:
public class Producer {
public static void main(String[] args)
throws MQClientException, InterruptedException {
String nameServer = "localhost:9876";
// 1. 創(chuàng)建 DefaultMQProducer 對(duì)象
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr(nameServer);
// 2. 啟動(dòng) producer
producer.start();
for (int i = 0; i < 1; i++)
try {
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 3. 發(fā)送消息
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}
以上代碼分三步走:
創(chuàng)建 DefaultMQProducer對(duì)象啟動(dòng) producer發(fā)送消息
接下來(lái)我們的分析也按這三步進(jìn)行。
1. DefaultMQProducer構(gòu)造方法
DefaultMQProducer構(gòu)造方法代碼如下:
public DefaultMQProducer(final String producerGroup) {
// 繼續(xù)調(diào)用
this(null, producerGroup, null);
}
/**
* 最終調(diào)用的構(gòu)造方法
*/
public DefaultMQProducer(final String namespace,
final String producerGroup, RPCHook rpcHook) {
this.namespace = namespace;
this.producerGroup = producerGroup;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}
這個(gè)方法就是簡(jiǎn)單地賦了值,然后創(chuàng)建了DefaultMQProducerImpl實(shí)例,我們繼續(xù)看DefaultMQProducerImpl的構(gòu)造方法:
public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
this.defaultMQProducer = defaultMQProducer;
this.rpcHook = rpcHook;
// 異步發(fā)送的隊(duì)列
this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);
// 處理異步發(fā)送的線(xiàn)程池
this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.asyncSenderThreadPoolQueue,
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
}
});
}
這個(gè)構(gòu)造方法依然還是處理賦值操作,并沒(méi)做什么實(shí)質(zhì)性?xún)?nèi)容,就不繼續(xù)深究了。
2. DefaultMQProducer#start:?jiǎn)?dòng)producer
接著我們來(lái)看看producer的啟動(dòng)流程,進(jìn)入DefaultMQProducer#start方法:
public void start() throws MQClientException {
this.setProducerGroup(withNamespace(this.producerGroup));
// 調(diào)用 defaultMQProducerImpl 的 start() 方法
this.defaultMQProducerImpl.start();
// 消息軌跡相關(guān),我們不關(guān)注
if (null != traceDispatcher) {
...
}
}
這個(gè)方法先是調(diào)用了defaultMQProducerImpl#start方法,然后處理消息軌跡相關(guān)操作,關(guān)于rocketMq消息軌跡相關(guān)內(nèi)容,本文就不過(guò)多探討了,我們將目光聚集于DefaultMQProducerImpl#start(boolean)方法:
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// 檢查一些配置信息
this.checkConfig();
// 修改當(dāng)前的 instanceName 為當(dāng)前進(jìn)程id
if (!this.defaultMQProducer.getProducerGroup()
.equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
// 獲取mq實(shí)例
this.mQClientFactory = MQClientManager.getInstance()
.getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
// 注冊(cè) mqClient 實(shí)例
boolean registerOK = mQClientFactory.registerProducer(
this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException(...);
}
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(),
new TopicPublishInfo());
// 啟動(dòng)實(shí)例
if (startFactory) {
mQClientFactory.start();
}
log.info(...);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException(...);
default:
break;
}
// 發(fā)送心跳到所有的broker
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
// 定時(shí)掃描異步請(qǐng)求的返回結(jié)果
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
RequestFutureTable.scanExpiredRequest();
} catch (Throwable e) {
log.error("scan RequestFutureTable exception", e);
}
}
}, 1000 * 3, 1000);
}
這個(gè)方法并不復(fù)雜相關(guān)內(nèi)容都已經(jīng)作了注釋?zhuān)@里重點(diǎn)提出3個(gè)方法:
mQClientFactory.start():執(zhí)行方法為MQClientInstance#start,這個(gè)方法里會(huì)啟動(dòng)一些組件,我們稍后會(huì)分析。mQClientFactory.sendHeartbeatToAllBrokerWithLock():發(fā)送心跳到所有的broker,最終執(zhí)行的方法為MQClientAPIImpl#sendHearbeat:
這里是與public int sendHearbeat(
final String addr,
final HeartbeatData heartbeatData,
final long timeoutMillis
) throws RemotingException, MQBrokerException, InterruptedException {
// request 的 code 為 HEART_BEAT
RemotingCommand request = RemotingCommand
.createRequestCommand(RequestCode.HEART_BEAT, null);
request.setLanguage(clientConfig.getLanguage());
request.setBody(heartbeatData.encode());
// 異步調(diào)用
RemotingCommand response = this.remotingClient
.invokeSync(addr, request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return response.getVersion();
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}broker通信,request的code為HEART_BEAT,后面的分析中我們會(huì)看到,producer也會(huì)同nameServer通信。定時(shí)掃描異步請(qǐng)求的返回結(jié)果:最終執(zhí)行的方法為 RequestFutureTable.scanExpiredRequest(),關(guān)于該方法的內(nèi)容,我們?cè)诜治?code style="overflow-wrap: break-word;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;font-family: "Operator Mono", Consolas, Monaco, Menlo, monospace;word-break: break-all;color: rgb(145, 109, 213);font-weight: bolder;background-image: none;background-position: initial;background-size: initial;background-repeat: initial;background-attachment: initial;background-origin: initial;background-clip: initial;">producer發(fā)送異步消息時(shí)再分析。
2.1 MQClientInstance#start:?jiǎn)?dòng)MQClientInstance
接下來(lái)我們來(lái)看看MQClientInstance的啟動(dòng),方法為MQClientInstance#start,代碼如下:
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// 獲取 nameServer 的地址
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// 啟動(dòng)遠(yuǎn)程服務(wù),這個(gè)方法只是裝配了netty客戶(hù)端相關(guān)配置
// 注意:1. 這里是netty客戶(hù)端,2. 這里并沒(méi)有創(chuàng)建連接
this.mQClientAPIImpl.start();
// 啟動(dòng)定時(shí)任務(wù)
this.startScheduledTask();
// pull服務(wù),僅對(duì)consumer啟作用
this.pullMessageService.start();
// 啟動(dòng)負(fù)載均衡服務(wù),僅對(duì)consumer啟作用
this.rebalanceService.start();
// 啟用內(nèi)部的 producer
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case START_FAILED:
throw new MQClientException(...);
default:
break;
}
}
}
這個(gè)方法進(jìn)行的操作在注釋中已經(jīng)說(shuō)明得很清楚了,接下來(lái)我們對(duì)以上的部分操作做進(jìn)一步分析。
1. mQClientAPIImpl.start():配置netty客戶(hù)端
這里調(diào)用的是NettyRemotingClient#start方法,代碼如下:
@Override
public void start() {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyClientConfig.getClientWorkerThreads(),
new ThreadFactory() {
...
});
// 這里使用的是Bootstrap而非ServerBootstrap,表示這是netty客戶(hù)端
Bootstrap handler = this.bootstrap
.group(this.eventLoopGroupWorker)
.channel(NioSocketChannel.class)
.option(...)
// 省略各種option
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 省略pipeline的裝配
...
}
});
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
...
}
}, 1000 * 3, 1000);
if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}
}
對(duì)于這個(gè)方法,說(shuō)明有兩個(gè)點(diǎn):
方法里使用的是 Bootstrap而非ServerBootstrap,表示這是netty客戶(hù)端整個(gè)方法中并沒(méi)有創(chuàng)建連接
2. startScheduledTask():?jiǎn)?dòng)定時(shí)任務(wù)
啟動(dòng)定時(shí)任務(wù)的方法為MQClientInstance#startScheduledTask,代碼如下:
private void startScheduledTask() {
if (null == this.clientConfig.getNamesrvAddr()) {
// 定時(shí)獲取 nameServer 的地址
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
} catch (Exception e) {
log.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
// 定時(shí)更新topic的路由信息
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
// 定時(shí)發(fā)送心跳信息
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.cleanOfflineBroker();
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
} catch (Exception e) {
log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
}
}
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
// 持久化消息者的消費(fèi)偏移量,可以放在本地文件,也可以推送到 broker
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
// 調(diào)整線(xiàn)程池的線(xiàn)程數(shù)量,并沒(méi)有用上
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.adjustThreadPool();
} catch (Exception e) {
log.error("ScheduledTask adjustThreadPool exception", e);
}
}
}, 1, 1, TimeUnit.MINUTES);
}
這里共有5個(gè)定時(shí)任務(wù):
定時(shí)獲取 nameServer的地址,MQClientInstance#start一開(kāi)始會(huì)調(diào)用MQClientAPIImpl#fetchNameServerAddr獲取nameServer,這里也調(diào)用了這個(gè)方法定時(shí)更新 topic的路由信息,這里會(huì)去nameServer獲取路由信息,之后再分析定時(shí)發(fā)送心跳信息到 nameServer,在DefaultMQProducerImpl#start(boolean)中,我們也提到了向nameServer發(fā)送心跳信息,兩處調(diào)用的是同一個(gè)方法持久化消費(fèi)者的消費(fèi)偏移量,這個(gè)僅對(duì)消費(fèi)者 consumer有效,后面分析消費(fèi)者時(shí)再作分析調(diào)整線(xiàn)程池的線(xiàn)程數(shù)量,不過(guò)追蹤到最后,發(fā)現(xiàn)這個(gè)并沒(méi)有生效,就不多說(shuō)了
這里我們重點(diǎn)來(lái)看topic路由信息的獲取,我們經(jīng)過(guò)對(duì)MQClientInstance#updateTopicRouteInfoFromNameServer()的一路追蹤,我們來(lái)到了MQClientAPIImpl#getTopicRouteInfoFromNameServer(...)
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
boolean allowTopicNotExist) throws MQClientException, InterruptedException,
RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
requestHeader.setTopic(topic);
// 發(fā)送請(qǐng)求的 code 為 GET_ROUTEINFO_BY_TOPIC
RemotingCommand request = RemotingCommand
.createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
...
case ResponseCode.SUCCESS: {
byte[] body = response.getBody();
if (body != null) {
return TopicRouteData.decode(body, TopicRouteData.class);
}
}
...
}
...
}
這里發(fā)送向NameServer發(fā)送消息的code是GET_ROUTEINFO_BY_TOPIC,這點(diǎn)在前面分析nameServer的消息處理時(shí)也分析過(guò)了,并且還分析了當(dāng)消息送達(dá)nameServer后,nameServer是如何返回topic數(shù)據(jù)的,遺忘的小伙伴可以看下之前分析nameServer的文章。
限于篇幅,本文就先到這里了,本文主要是分析producer啟動(dòng)流程,下一篇文章將分析消息發(fā)送流程。
限于作者個(gè)人水平,文中難免有錯(cuò)誤之處,歡迎指正!原創(chuàng)不易,商業(yè)轉(zhuǎn)載請(qǐng)聯(lián)系作者獲得授權(quán),非商業(yè)轉(zhuǎn)載請(qǐng)注明出處。
本文首發(fā)于微信公眾號(hào) 「Java技術(shù)探秘」,如果您喜歡本文,歡迎關(guān)注該公眾號(hào),讓我們一起在技術(shù)的世界里探秘吧!
