Rocketmq源碼分析06:broker 消息分發(fā)流程
注:本系列源碼分析基于RocketMq 4.8.0,gitee倉庫鏈接:https://gitee.com/funcy/rocketmq.git.
RocketMq消息處理整個流程如下:

消息接收:消息接收是指接收 producer的消息,處理類是SendMessageProcessor,將消息寫入到commigLog文件后,接收流程處理完畢;消息分發(fā): broker處理消息分發(fā)的類是ReputMessageService,它會啟動一個線程,不斷地將commitLong分到到對應的consumerQueue,這一步操作會寫兩個文件:consumerQueue與indexFile,寫入后,消息分發(fā)流程處理 完畢;消息投遞:消息投遞是指將消息發(fā)往 consumer的流程,consumer會發(fā)起獲取消息的請求,broker收到請求后,調(diào)用PullMessageProcessor類處理,從consumerQueue文件獲取消息,返回給consumer后,投遞流程處理完畢。
以上就是rocketMq處理消息的流程了,接下來我們就從源碼來分析消息分發(fā)的實現(xiàn)。
1. 分發(fā)線程的啟動
消息寫入到commitlog后,接著broker會對這些消息進行分發(fā)操作,這里的分發(fā),是指broker將消息寫入到consumerQueue文件中。
broker消息分發(fā)的操作是在一個單獨的線程中進行的,這里我們來回憶下BrokerController的啟動流程,進入BrokerController#start方法:
public void start() throws Exception {
// 啟動各組件
if (this.messageStore != null) {
this.messageStore.start();
}
...
}
繼續(xù)進入DefaultMessageStore#start方法:
public void start() throws Exception {
...
// 處理 maxPhysicalPosInLogicQueue 的值
long maxPhysicalPosInLogicQueue = commitLog.getMinOffset();
for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
for (ConsumeQueue logic : maps.values()) {
if (logic.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) {
maxPhysicalPosInLogicQueue = logic.getMaxPhysicOffset();
}
}
}
if (maxPhysicalPosInLogicQueue < 0) {
maxPhysicalPosInLogicQueue = 0;
}
if (maxPhysicalPosInLogicQueue < this.commitLog.getMinOffset()) {
maxPhysicalPosInLogicQueue = this.commitLog.getMinOffset();
}
this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
// 消息分發(fā)操作,啟動新線程來處理
this.reputMessageService.start();
...
}
在BrokerController啟動時,會處理maxPhysicalPosInLogicQueue的值,這個值就是分發(fā)commitlog消息的偏移量,之后就啟動ReputMessageService服務(wù)來處理。ReputMessageService是DefaultMessageStore的內(nèi)部類,它是ServiceThread的子類,start()方法如下:
public abstract class ServiceThread implements Runnable {
public void start() {
if (!started.compareAndSet(false, true)) {
return;
}
stopped = false;
this.thread = new Thread(this, getServiceName());
this.thread.setDaemon(isDaemon);
this.thread.start();
}
...
}
這個方法僅僅是處理線程的啟動,我們繼續(xù)看ServiceThread。ServiceThread是Runnable的子類,它的run()方法如下:
class ReputMessageService extends ServiceThread {
@Override
public void run() {
DefaultMessageStore.log.info(...);
while (!this.isStopped()) {
try {
Thread.sleep(1);
// 調(diào)用的是 doReput() 方法
this.doReput();
} catch (Exception e) {
DefaultMessageStore.log.warn(...);
}
}
DefaultMessageStore.log.info(...);
}
}
從ReputMessageService#run()方法來看,該線程會休眠1ms,然后調(diào)用doReput()方法處理,看來doReput()方法就是關(guān)鍵了!
2. 消息分發(fā):DefaultMessageStore.ReputMessageService#doReput
我們進入DefaultMessageStore.ReputMessageService#doReput方法:
private void doReput() {
// 處理 reputFromOffset
if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
log.warn(...);
this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
}
for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
&& this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
break;
}
// 從CommitLog中獲取需要進行轉(zhuǎn)發(fā)的消息
SelectMappedBufferResult result
= DefaultMessageStore.this.commitLog.getData(reputFromOffset);
if (result != null) {
try {
this.reputFromOffset = result.getStartOffset();
for (int readSize = 0; readSize < result.getSize() && doNext; ) {
// 檢驗數(shù)據(jù)
DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog
.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
int size = dispatchRequest.getBufferSize() == -1
? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();
if (dispatchRequest.isSuccess()) {
if (size > 0) {
// 分發(fā)消息
DefaultMessageStore.this.doDispatch(dispatchRequest);
// 長輪詢:如果有消息到了主節(jié)點,并且開啟了長輪詢
if (BrokerRole.SLAVE != DefaultMessageStore.this
.getMessageStoreConfig().getBrokerRole()
&&DefaultMessageStore.this.brokerConfig.isLongPollingEnable()){
// 調(diào)用NotifyMessageArrivingListener的arriving方法
DefaultMessageStore.this.messageArrivingListener.arriving(
dispatchRequest.getTopic(),
dispatchRequest.getQueueId(),
dispatchRequest.getConsumeQueueOffset() + 1,
dispatchRequest.getTagsCode(),
dispatchRequest.getStoreTimestamp(),
dispatchRequest.getBitMap(),
dispatchRequest.getPropertiesMap());
}
...
} else if (size == 0) {
...
}
} else if (!dispatchRequest.isSuccess()) {
...
}
}
} finally {
result.release();
}
} else {
doNext = false;
}
}
}
該方法依舊很長,我們重點關(guān)注與分發(fā)相關(guān)的流程:
commitLog.getData(...):從CommitLog中獲取DispatchRequest需要分發(fā)的消息,參數(shù)reputFromOffset就是消息在文件中的偏移量this.doDispatch(...):分發(fā)操作,就是把消息的相關(guān)寫入ConsumeQueue與IndexFile兩個文件中如果當前節(jié)點為主節(jié)點,且啟用了長輪詢,則調(diào)用 NotifyMessageArrivingListener的arriving方法,在這里會把消息主動投遞到consumer
總的來說,當消息寫入到commitLog后,ReputMessage會根據(jù)上一次分發(fā)消息的偏移量依次從commitLog文件中讀取消息信息,寫入到ConsumeQueue與IndexFile兩個文件中,當然了,這里寫入的只是消息的發(fā)送時間、在commitLog中的位置信息,完整的消息只有commitLog文件才存在。
寫完這兩個文件后,接下來就等待consumer來拉取消息了。當然,consumer主動來拉取可能會導致消息無法實時送達,為解決這個問題,rocketMq給出的解決方案是長輪詢,具體為:如果當前沒有消息,就hold住consumer的請求30s,這30s內(nèi)一旦有消息過來,就及時喚醒consumer的請求,實際將消息發(fā)送出去,就也是NotifyMessageArrivingListener#arriving方法所做的工作,關(guān)于這點我們在分析consumer拉取消息時再詳細分析。
我們再來看看消息分發(fā)消息,進入DefaultMessageStore#doDispatch:
public class DefaultMessageStore implements MessageStore {
private final LinkedList<CommitLogDispatcher> dispatcherList;
/**
* DefaultMessageStore 構(gòu)造方法
*/
public DefaultMessageStore(...) throws IOException {
...
// 消息分發(fā)處理
this.dispatcherList = new LinkedList<>();
// 寫入 ConsumeQueue 文件
this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
// 寫入 Index 文件
this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());
...
}
/**
* 分發(fā)操作
*/
public void doDispatch(DispatchRequest req) {
// 進行分發(fā)操作,dispatcherList 包含兩個對象:
// 1. CommitLogDispatcherBuildConsumeQueue:寫入 ConsumeQueue 文件
// 2. CommitLogDispatcherBuildIndex:寫入 Index 文件
for (CommitLogDispatcher dispatcher : this.dispatcherList) {
dispatcher.dispatch(req);
}
}
}
從整個方法的運行來看,DefaultMessageStore在創(chuàng)建時,會準備兩個CommitLogDispatcher:
CommitLogDispatcherBuildConsumeQueue:處理ConsumeQueue文件的寫入CommitLogDispatcherBuildIndex:處理IndexFile文件的寫入
在DefaultMessageStore#doDispatch方法中,就是對這兩個文件的寫入操作了:
/**
* consumerQueue 文件分發(fā)的構(gòu)建器
*/
class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {
@Override
public void dispatch(DispatchRequest request) {
final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
switch (tranType) {
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
// 將消息在commitLog文件的位置、tags等信息寫入ConsumerQueue文件
DefaultMessageStore.this.putMessagePositionInfo(request);
break;
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
break;
}
}
}
/**
* indexFile 文件分發(fā)的構(gòu)建器
*/
class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {
@Override
public void dispatch(DispatchRequest request) {
if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
DefaultMessageStore.this.indexService.buildIndex(request);
}
}
}
需要注意的是,在這兩個文件中,寫入的僅是消息的位置信息,完整的消息內(nèi)容僅在commitLog中保存。
3. 總結(jié)
本文主要分析了broker消息分發(fā)分發(fā),這里說的分發(fā)流程,是指broker將消息寫入到consumerQueue文件的流程。
在broker啟動時,會啟動一個專門的線程:ReputMessageService,該線程會不停地從comsumer獲取消息,然后將其寫入到consumerQueue文件與IndexFile文件中。
當消息分發(fā)到consumerQueue文件后,接著consumer就可以很方便地從各隊列中獲取消息了,下一篇我們來分析broker是如何響應consumer獲取消息請求的。
限于作者個人水平,文中難免有錯誤之處,歡迎指正!原創(chuàng)不易,商業(yè)轉(zhuǎn)載請聯(lián)系作者獲得授權(quán),非商業(yè)轉(zhuǎn)載請注明出處。
本文首發(fā)于微信公眾號 Java技術(shù)探秘,如果您喜歡本文,歡迎關(guān)注該公眾號,讓我們一起在技術(shù)的世界里探秘吧!
