Rocketmq源碼分析01:搭建源碼調(diào)試環(huán)境
1. 基本架構(gòu)
RocketMQ架構(gòu)上主要分為四部分,如下圖所示:

Producer:消息發(fā)布的角色,支持分布式集群方式部署。Producer通過(guò)MQ的負(fù)載均衡模塊選擇相應(yīng)的Broker集群隊(duì)列進(jìn)行消息投遞,投遞的過(guò)程支持快速失敗并且低延遲。Consumer:消息消費(fèi)的角色,支持分布式集群方式部署。支持以push推,pull拉兩種模式對(duì)消息進(jìn)行消費(fèi)。同時(shí)也支持集群方式和廣播方式的消費(fèi),它提供實(shí)時(shí)消息訂閱機(jī)制,可以滿足大多數(shù)用戶的需求。NameServer:NameServer是一個(gè)非常簡(jiǎn)單的Topic路由注冊(cè)中心,其角色類似Dubbo中的zookeeper,支持Broker的動(dòng)態(tài)注冊(cè)與發(fā)現(xiàn)。主要包括兩個(gè)功能:NameServer通常也是集群的方式部署,各實(shí)例間相互不進(jìn)行信息通訊。Broker是向每一臺(tái)NameServer注冊(cè)自己的路由信息,所以每一個(gè)NameServer實(shí)例上面都保存一份完整的路由信息。當(dāng)某個(gè)NameServer因某種原因下線了,Broker仍然可以向其它NameServer同步其路由信息,Producer,Consumer仍然可以動(dòng)態(tài)感知Broker的路由的信息。Broker管理,NameServer接受Broker集群的注冊(cè)信息并且保存下來(lái)作為路由信息的基本數(shù)據(jù)。然后提供心跳檢測(cè)機(jī)制,檢查Broker是否還存活;路由信息管理,每個(gè) NameServer將保存關(guān)于Broker集群的整個(gè)路由信息和用于客戶端查詢的隊(duì)列信息。然后Producer和Conumser通過(guò)NameServer就可以知道整個(gè)Broker集群的路由信息,從而進(jìn)行消息的投遞和消費(fèi)。BrokerServer:Broker主要負(fù)責(zé)消息的存儲(chǔ)、投遞和查詢以及服務(wù)高可用保證,為了實(shí)現(xiàn)這些功能,Broker包含了以下幾個(gè)重要子模塊:Client Manager:負(fù)責(zé)管理客戶端(Producer/Consumer)和維護(hù)Consumer的Topic訂閱信息Store Service:提供方便簡(jiǎn)單的API接口處理消息存儲(chǔ)到物理硬盤和查詢功能。HA Service:高可用服務(wù),提供Master Broker和Slave Broker之間的數(shù)據(jù)同步功能。Index Service:根據(jù)特定的Message key對(duì)投遞到Broker的消息進(jìn)行索引服務(wù),以提供消息的快速查詢。Remoting Module:整個(gè)Broker的實(shí)體,負(fù)責(zé)處理來(lái)自clients端的請(qǐng)求。

2. 獲取源碼
rocketMq項(xiàng)目的github倉(cāng)庫(kù)為https://github.com/apache/rocketmq.git,由于網(wǎng)絡(luò)原因,我們并不會(huì)直接使用github倉(cāng)庫(kù),而是將其導(dǎo)入到gitee上,只需在gitee創(chuàng)建新倉(cāng)庫(kù)時(shí),選擇導(dǎo)入已有倉(cāng)庫(kù)即可:

導(dǎo)入到gitee后,就可以進(jìn)行checkout了,本文對(duì)應(yīng)的gitee倉(cāng)庫(kù)為https://gitee.com/funcy/rocketmq.git。
checkout源碼到本地后,默認(rèn)是master分支,本人習(xí)慣基于tag創(chuàng)建自己的分支,然后在自己的分支上進(jìn)行分析,rocketMq的tag如下:

最新版本是4.8.0,我們將基于此tag創(chuàng)建新分支,使用的命令如下:
# 切換到 rocketmq-all-4.8.0
git checkout rocketmq-all-4.8.0
# 基于 rocketmq-all-4.8.0 創(chuàng)建自己的分析,名稱為 rocketmq-all-4.8.0-LEARN
git checkout -b rocketmq-all-4.8.0-LEARN
# 將 rocketmq-all-4.8.0-LEARN 分支推送到遠(yuǎn)程倉(cāng)庫(kù)
git push -u origin rocketmq-all-4.8.0-LEARN
接下來(lái),我們所有的操作都是在rocketmq-all-4.8.0-LEARN分支上進(jìn)行了。
3. 本地啟動(dòng)
拿到代碼后,我們就開(kāi)始進(jìn)行本地啟動(dòng)了,沒(méi)錯(cuò),就是在idea中進(jìn)行啟動(dòng)。
3.1 復(fù)制conf目錄
在啟動(dòng)項(xiàng)目前,我們需要進(jìn)行一些配置,rocketMq項(xiàng)目的配置文件位于rocketmq/distribution模塊下的conf目錄中,直接整個(gè)復(fù)制到rocketmq目錄下:

也不需要改動(dòng),復(fù)制出來(lái)就行了,這些配置的內(nèi)容后面分析源碼時(shí)再講解吧。
3.2 啟動(dòng)nameServer
nameServer的主類為org.apache.rocketmq.namesrv.NamesrvStartup:

如果我們直接運(yùn)行main()方法,會(huì)報(bào)錯(cuò):

報(bào)錯(cuò)信息已經(jīng)很明確了,需要我們配置ROCKETMQ_HOME目錄,我們?cè)?code style="font-size: 14px;overflow-wrap: break-word;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;color: rgb(30, 107, 184);background-color: rgba(27, 31, 35, 0.05);font-family: "Operator Mono", Consolas, Monaco, Menlo, monospace;word-break: break-all;">idea中進(jìn)行配置即可:
打開(kāi)配置界面:

填寫ROCKETMQ_HOME配置:

這里我填寫的是ROCKETMQ_HOME=/Users/chengyan/IdeaProjects/myproject/rocketmq,這個(gè)ROCKETMQ_HOME路徑就是conf文件夾所在的目錄。
填寫好后,就可以啟動(dòng)了:

3.3 啟動(dòng)broker
broker的主類為org.apache.rocketmq.broker.BrokerStartup,啟動(dòng)方式與nameServer很相似,啟動(dòng)前也要配置ROCKETMQ_HOME路徑:

相比于nameServer,這里多配置了啟動(dòng)參數(shù):
-n localhost:9876 autoCreateTopicEnable=true
這個(gè)啟動(dòng)參數(shù)是指定nameServer的地址,以及開(kāi)啟自動(dòng)創(chuàng)建topic的功能。
配置完成之后就可以啟動(dòng)了:

3.4 啟動(dòng)管理后臺(tái)
rocketMq的管理后臺(tái)在另一個(gè)倉(cāng)庫(kù)https://github.com/apache/rocketmq-externals,除了后臺(tái),這個(gè)倉(cāng)庫(kù)還包含了許多的其他模塊:

我們并不需要分析這個(gè)項(xiàng)目,源碼本可以不必下載,但我在找這個(gè)項(xiàng)目的release版本時(shí),發(fā)現(xiàn)并沒(méi)有提供已編譯好的jar包,需要自己構(gòu)建代碼,因此我就再次下載了這個(gè)代碼源碼。當(dāng)然,由于網(wǎng)絡(luò)的原因,這個(gè)項(xiàng)目的源碼也被我導(dǎo)入到了gitee上,地址為https://gitee.com/funcy/rocketmq-externals.git.
這個(gè)項(xiàng)目的代碼我們并不分析,因此直接在master分支上操作即可,
管理后臺(tái)項(xiàng)目為rocketmq-console,主類為org.apache.rocketmq.console.App:

在啟動(dòng)前,我們需要修改下application.properties的配置,找到rocketmq.config.namesrvAddr配置,添加nameServer的ip與端口,這里我們連接的是本地應(yīng)用,直接填寫localhost:9876:
...
rocketmq.config.namesrvAddr=localhost:9876
...
啟動(dòng),結(jié)果如下:

訪問(wèn)http://localhost:8080,結(jié)果如下:

可以看到broker已經(jīng)出現(xiàn)在cluster列表中了,這就表明啟動(dòng)成功了。
4. 收發(fā)消息測(cè)試
rocketMq項(xiàng)目的example模塊下有大量的測(cè)試示例,我們選擇其一進(jìn)行消息收發(fā)測(cè)試。
4.1 啟動(dòng)Consumer
我們先找到org.apache.rocketmq.example.simple.PushConsumer,代碼如下:
public class PushConsumer {
public static void main(String[] args)
throws InterruptedException, MQClientException {
String nameServer = "localhost:9876";
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
consumer.setNamesrvAddr(nameServer);
consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//wrong time format 2017_0422_221800
consumer.setConsumeTimestamp("20181109221800");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n",
Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
這個(gè)Consumer監(jiān)聽(tīng)的topic是TopicTest,后面我們就會(huì)往這個(gè)topic發(fā)送消息。另外,需要注意nameServer的配置,我們是在本地啟動(dòng)的nameServer,因此這里配置的是localhost:9876。
運(yùn)行main()方法,結(jié)果如下:

4.2 啟動(dòng)Producer
我們找到 org.apache.rocketmq.example.simple.Producer 類,代碼如下:
public class Producer {
public static void main(String[] args)
throws MQClientException, InterruptedException {
String nameServer = "localhost:9876";
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr(nameServer);
producer.start();
for (int i = 0; i < 10; i++)
try {
{
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}
同樣地,這里使用的是的nameServer地址是localhost:9876,topic 是TopicTest,運(yùn)行,結(jié)果如下:

再回過(guò)頭看看PushConsumer的控制臺(tái):

可以看到,Producer發(fā)送消息成功了,PushConsumer也成功獲取到消息了。
4.3 異常分析
如圖所示:

如果出現(xiàn)異常:
org.apache.rocketmq.client.exception.MQClientException:
No route info of this topic: TopicTest
這表明當(dāng)前broker中沒(méi)有TopicTest的topic,這時(shí)我們可以手動(dòng)創(chuàng)建topic,也可以在啟動(dòng)時(shí)指定autoCreateTopicEnable=true.
如果是按上面步驟進(jìn)行的,請(qǐng)確認(rèn)下org.apache.rocketmq.broker.BrokerStartup是否配置啟動(dòng)參數(shù)
-n localhost:9876 autoCreateTopicEnable=true
配置方式就按3.3節(jié)的方式配置就行了。
5. 總結(jié)
本文主要介紹了rocketMq的基本架構(gòu),通過(guò)源碼展示了rocketMq的啟動(dòng)方式,最后通過(guò)rocketMq項(xiàng)目下example模塊中的測(cè)試代碼展示了消息的收發(fā)過(guò)程。
總的來(lái)說(shuō),本文還是在準(zhǔn)備源碼分析的環(huán)境,下篇文章開(kāi)始,我們就正式開(kāi)始rocketMq的源碼分析了。
限于作者個(gè)人水平,文中難免有錯(cuò)誤之處,歡迎指正!原創(chuàng)不易,商業(yè)轉(zhuǎn)載請(qǐng)聯(lián)系作者獲得授權(quán),非商業(yè)轉(zhuǎn)載請(qǐng)注明出處。
本文首發(fā)于微信公眾號(hào) Java技術(shù)探秘,如果您喜歡本文,歡迎關(guān)注該公眾號(hào),讓我們一起在技術(shù)的世界里探秘吧!
