RocketMQ消息發(fā)送流程
記得點(diǎn)擊?"歡少的成長之路",?設(shè)為星標(biāo)?
后臺(tái)點(diǎn)擊【聯(lián)系我】,申請加入優(yōu)質(zhì)技術(shù)學(xué)習(xí)社群
大家好,我是Leo。
今天聊一下RocketMQ消息發(fā)送,重試機(jī)制,故障延遲機(jī)制,獲取路由機(jī)制,消息隊(duì)列的選擇
消息發(fā)送
關(guān)系圖
首先放一下Broker Cluster,Broker,Topic,Queue的關(guān)系圖。因?yàn)橄挛闹饕獣?huì)沿著這四塊進(jìn)行梳理

發(fā)送的三種方式
消息發(fā)送的三種方式
同步:發(fā)送者向MQ發(fā)送一條消息后,一直等待服務(wù)器返回成功才繼續(xù)下一個(gè)。 異步:發(fā)送者向MQ發(fā)送一條消息后,通過回調(diào)函數(shù)調(diào)用消息發(fā)布函數(shù)繼續(xù)發(fā)送,主線程立即返回。 單向:發(fā)送者向MQ發(fā)送一條消息后,直接返回,不等待消息服務(wù)器的結(jié)果,也不注冊函數(shù),簡單來說,就是只管發(fā)。其他啥也不管。
源碼
package?org.apache.rocketmq.client.impl;
/**
?*?消息發(fā)送的三種方式
?*/
public?enum?CommunicationMode?{
????//?同步發(fā)送
????SYNC,
????//?異步發(fā)送
????ASYNC,
????//?單向發(fā)送
????ONEWAY,
}
重試機(jī)制
RocketMQ的重試機(jī)制,主要由下列兩個(gè)參數(shù)決定。默認(rèn)重試次數(shù)為2次,重試機(jī)制提高了消息發(fā)送成功的幾率。
/**
?*?同步模式下內(nèi)部嘗試發(fā)送消息的最大次數(shù)
?*/
private?int?retryTimesWhenSendFailed?=?2
/**
?*?異步模式下內(nèi)部嘗試發(fā)送消息的最大次數(shù)
?*/
private?int?retryTimesWhenSendAsyncFailed?=?2;
故障延遲機(jī)制
RocketMQ的故障延遲機(jī)制,主要由下列參數(shù)決定,默認(rèn)是不開啟的。故障延遲機(jī)制,主要體現(xiàn)在集群的時(shí)候,當(dāng)broker發(fā)送錯(cuò)誤時(shí),可以有效的規(guī)避多次發(fā)送消息都發(fā)往一個(gè)broker(queue)的錯(cuò)誤。
/**
?*??默認(rèn)不啟用Broker故障延遲機(jī)制。
?*/
private?boolean?sendLatencyFaultEnable?=?false;
獲取路由信息機(jī)制
消息在發(fā)送時(shí),需要知道,要發(fā)往哪個(gè)broker。首先會(huì)去?brokerAddrTable?中查找當(dāng)前brokerName是否存在在本地的緩存中
如果成功返回brokerName 否則就返回null
/*?Broker?Name?*/???/*?brokerId?*/????/*?address?*/
private?final?ConcurrentMap>?brokerAddrTable;
如果成功一帆風(fēng)順,如果找不到的話,肯定要做一些安全處理。
如果找不到的話,會(huì)通過?tryToFindTopicPublishInfo?函數(shù)嘗試查找主題發(fā)布信息
在? topicPublishInfoTable?緩存中根據(jù)topic名稱查找是否存在如果沒有緩存,會(huì)創(chuàng)建一個(gè)以topic名稱為key,空? TopicPublishInfo?為value到?topicPublishInfoTable?,然后更新到?NameServer?。如果消息的路由信息存在,并且? MessageQueue?不為空 直接返回路由信息否則使用默認(rèn)主題
/*?topic?*/
private?final?ConcurrentMap?topicPublishInfoTable
消息隊(duì)列的選擇
開啟故障延遲
遍歷主題隊(duì)列的消息隊(duì)列,根據(jù)訪問次數(shù)進(jìn)行隨機(jī)自增取模。
如果當(dāng)前消息隊(duì)列是可用的就直接返回。函數(shù)名?isAvailable
如果是不可用的,從失敗的brokeName列表中通過?pickOneAtLeast?函數(shù)選擇一個(gè)可用的broker。拿到brokerName之后,再根據(jù)brokerName反查這個(gè)隊(duì)列的寫隊(duì)列數(shù)
如果小于0說明該broker依據(jù)恢復(fù),從失敗的條目中移出當(dāng)前broker 如果大于0通過? selectOneMessageQueue?函數(shù)選出一個(gè)消息隊(duì)列
/**
?*?失敗的broker列表
?*/
private?final?LatencyFaultTolerance??latencyFaultTolerance;
沒有開啟故障延遲
如果上一次選擇的執(zhí)行發(fā)送消息失敗的broker名稱為空,它會(huì)通過?selectOneMessageQueue()?函數(shù)對當(dāng)前訪問的次數(shù)取絕對值,然后與消息隊(duì)列的大小取模得到一個(gè)下標(biāo),然后從?messageQueueList?中根據(jù)下標(biāo)取出?MessageQueue
如果上一次選擇的執(zhí)行發(fā)送消息失敗的broker名稱不為空,會(huì)遍歷消息隊(duì)列,對當(dāng)前訪問的次數(shù)取絕對值,然后與消息隊(duì)列的大小取模得到一個(gè)下標(biāo)后,拿著下標(biāo)獲取對應(yīng)的?brokerName?并且判斷當(dāng)前的?brokerName?是否與上一次發(fā)送消息失敗的?brokerName?相等,
如果相等就遍歷所有主題內(nèi)的消息隊(duì)列。假如還是沒有找到一個(gè)合適的,就會(huì)隨機(jī)選擇一個(gè) 如果不相等,就把當(dāng)前下標(biāo)的? MessageQueue?返出去。
下圖的json字符串就是?MessageQueue?信息
/**
?*?該主題隊(duì)列的消息隊(duì)列
?*/
private?List??messageQueueList?=?new?ArrayList??();
[
????{
????????"brokerName":?"broker-a",
????????"queueId":?0
????},
????{
????????"brokerName":?"broker-a",
????????"queueId":?1
????},?{
????????"brokerName":?"broker-b",
????????"queueId":?0
????},?{
????????"brokerName":?"broker-b",
????????"queueId":?1
????},?{
????????"brokerName":?"broker-c",
????????"queueId":?0
????},?{
????????"brokerName":?"broker-c",
????????"queueId":?1
????}
]
開啟故障延遲機(jī)制中的可用依據(jù)是:檢查時(shí)間是否到達(dá)了下次可使用的時(shí)間點(diǎn)
如果沒有該機(jī)制,如果broker宕機(jī),由于路由算法中的消息隊(duì)列是按broker排序的,順序選擇,如果上一次根據(jù)路由算法選擇的是宕機(jī)的broker的第一個(gè)隊(duì)列,那么隨后的下次選擇的是宕機(jī)broker的第二個(gè)隊(duì)列,消息發(fā)送很有可能會(huì)失敗,再次引發(fā)重試,帶來不必要的性能損耗。
selectOneMessageQueue()也可以看成是兜底策略-輪詢算法
同步發(fā)送
由上文得知,消息發(fā)送有三種方式。我們先看一下同步發(fā)送主要做了哪些事情。
DefaultMQProducerImpl的send函數(shù)是發(fā)送消息的入口
通過? makeSureStateOK?函數(shù)檢查服務(wù)狀態(tài)是否正常通過? checkMessage?函數(shù)校驗(yàn)?Message?與?DefaultMQProducer?是否符合發(fā)送的規(guī)則校驗(yàn)消息的主題不能等于消息隊(duì)列集合的主題信息以及以上操作是否超時(shí) 校驗(yàn)brokerName是否存在,如果不存在通過? findBrokerAddressInPublish?函數(shù)去nameserver拉取通過? brokerVIPChannel?函數(shù)校驗(yàn)是否使用了vip管道,如果使用了管道在原來的基礎(chǔ)上把?端口-2通過配置信息獲取生成uniqId的算法規(guī)則以及封裝? Message?的實(shí)例信息對? Message 的 body?信息進(jìn)行壓縮獲取當(dāng)前的配置信息,是否啟用事務(wù)。 封裝發(fā)送消息模板權(quán)限信息? SendMessageContext,構(gòu)造請求頭發(fā)送之前,校驗(yàn)一下? Topid?類型是否屬于重試類型消息(這里可以看看下列注釋)通過? CommunicationMode?枚舉類型判斷當(dāng)前是什么發(fā)送方式判斷當(dāng)前是正常指令發(fā)送,還是RPC指令發(fā)送,判斷是否對字段進(jìn)行壓縮處理(簡化壓縮有助于提速序列化速度) 根據(jù)broker地址獲取Netty對應(yīng)的Channel,并遠(yuǎn)程調(diào)用(這里的發(fā)送,用的是Netty框架) 通過? processSendResponse?函數(shù)處理同步返回的參數(shù),如果參數(shù)為0,說明發(fā)送成功。最后封裝?SendResult?返回
第四步中,如果在nameserver拉取不到,說明服務(wù)宕機(jī)了。
第五步中,vip的管道配置從配置文件中的com.rocketmq.sendMessageWithVIPChannel得知
第六步中,批量信息不支持壓縮
第十步中,如果是重試消息,通過獲取自定義重試次數(shù),在請求頭區(qū)分特別處理
第十一步中,因?yàn)檫@里介紹的是同步發(fā)送,就只寫同步發(fā)送流程了,異步,單向會(huì)在下面段落體現(xiàn)出來
第十二步中,通過配置文件中org.apache.rocketmq.client.sendSmartMsg得知字段是否簡化壓縮
異步發(fā)送
聊完同步發(fā)送,我們看一下異步發(fā)送
DefaultMQProducerImpl的send函數(shù)是發(fā)送消息的入口(這里跟同步的區(qū)別是多了一個(gè)?SendCallback)
在生產(chǎn)者生產(chǎn)消息發(fā)送時(shí),通過? ExecutorService?新增一個(gè)異步任務(wù)進(jìn)行發(fā)送(可看下列注釋,可看源碼區(qū))通過? makeSureStateOK?函數(shù)檢查服務(wù)狀態(tài)是否正常通過? checkMessage?函數(shù)校驗(yàn)?Message?與?DefaultMQProducer?是否符合發(fā)送的規(guī)則校驗(yàn)消息的主題不能等于消息隊(duì)列集合的主題信息以及以上操作是否超時(shí) 校驗(yàn)brokerName是否存在,如果不存在通過? findBrokerAddressInPublish?函數(shù)去nameserver拉取通過? brokerVIPChannel?函數(shù)校驗(yàn)是否使用了vip管道,如果使用了管道在原來的基礎(chǔ)上把?端口-2通過配置信息獲取生成uniqId的算法規(guī)則以及封裝? Message?的實(shí)例信息對? Message 的 body?信息進(jìn)行壓縮獲取當(dāng)前的配置信息,是否啟用事務(wù)。 封裝發(fā)送消息模板權(quán)限信息? SendMessageContext,構(gòu)造請求頭發(fā)送之前,校驗(yàn)一下? Topid?類型是否屬于重試類型消息(這里可以看看下列注釋)通過? CommunicationMode?枚舉類型判斷當(dāng)前是什么發(fā)送方式判斷當(dāng)前是正常指令發(fā)送,還是RPC指令發(fā)送,判斷是否對字段進(jìn)行壓縮處理(簡化壓縮有助于提速序列化速度) 根據(jù)broker地址獲取Netty對應(yīng)的Channel,并遠(yuǎn)程調(diào)用(這里的發(fā)送,用的是Netty框架) 通過? processSendResponse?函數(shù)處理并且利用委托?remotingClient.invokeAsync?等待返回的SendResult?結(jié)構(gòu)體上一步驟再插一句,異步發(fā)送會(huì)處理一個(gè)? updateFaultItem?函數(shù)記錄當(dāng)前?不可以時(shí)間/可用時(shí)間?時(shí)間
第一步中 借助 java.util.concurrent.ExecutorService ,實(shí)現(xiàn)一個(gè)線程池達(dá)到可以讓任務(wù)在后臺(tái)執(zhí)行。
第十五步中 RemotingClient的invokeAsync函數(shù)

單向發(fā)送
單向發(fā)送,與同步發(fā)送相似。與同步發(fā)送不同的是通過 RemotingClient#invokeOneway 函數(shù)委托發(fā)送。


從 invokeOnway進(jìn)入后
如果當(dāng)前addr為空,獲取和創(chuàng)建Nameserver通道 創(chuàng)建成功后,只要通道是活躍的,并且不為空,就利用Netty框架進(jìn)行? writeAndFlush
創(chuàng)建通道時(shí),通過 ReentrantLock 對nameSeverChannel加鎖,超時(shí)時(shí)長為3秒
源碼
DefaultMQProducerImpl#send同步函數(shù)
/**
?*?內(nèi)核同步發(fā)送
?*?@param?msg
?*?@param?mq
?*?@return
?*?@throws?MQClientException
?*?@throws?RemotingException
?*?@throws?MQBrokerException
?*?@throws?InterruptedException
?*/
public?SendResult?send(Message?msg,?MessageQueue?mq)?throws?MQClientException,?RemotingException,?MQBrokerException,?InterruptedException?{
????return?send(msg,?mq,?this.defaultMQProducer.getSendMsgTimeout());
}
/**
?*?內(nèi)核同步發(fā)送下的??send子函數(shù)
?*?@param?msg
?*?@param?mq
?*?@param?timeout
?*?@return
?*?@throws?MQClientException
?*?@throws?RemotingException
?*?@throws?MQBrokerException
?*?@throws?InterruptedException
?*/
public?SendResult?send(Message?msg,?MessageQueue?mq,?long?timeout)?throws?MQClientException,?RemotingException,?MQBrokerException,?InterruptedException?{
????long?beginStartTime?=?System.currentTimeMillis();
????this.makeSureStateOK();
????Validators.checkMessage(msg,?this.defaultMQProducer);
????if?(!msg.getTopic().equals(mq.getTopic()))?{
????????//?消息的主題不等于mq的主題
????????throw?new?MQClientException("message's?topic?not?equal?mq's?topic",?null);
????}
????long?costTime?=?System.currentTimeMillis()?-?beginStartTime;
????if?(timeout?????????throw?new?RemotingTooMuchRequestException("call?timeout");
????}
????return?this.sendKernelImpl(msg,?mq,?CommunicationMode.SYNC,?null,?null,?timeout);
}
DefaultMQProducerImpl#send異步函數(shù)
/**
?*?內(nèi)核異步
?*
?*?@param?msg
?*?@param?mq
?*?@param?sendCallback
?*?@throws?MQClientException
?*?@throws?RemotingException
?*?@throws?InterruptedException
?*/
public?void?send(Message?msg,?MessageQueue?mq,?SendCallback?sendCallback)?throws?MQClientException,?RemotingException,?InterruptedException?{
????send(msg,?mq,?sendCallback,?this.defaultMQProducer.getSendMsgTimeout());
}
/**
?*?內(nèi)核異步發(fā)送下的??send?子函數(shù)
?
?*?@param?msg
?*?@param?mq
?*?@param?sendCallback
?*?@param?timeout??????the?sendCallback?will?be?invoked?at?most?time
?*?@throws?MQClientException
?*?@throws?RemotingException
?*?@throws?InterruptedException
?*/
@Deprecated
public?void?send(final?Message?msg,?final?MessageQueue?mq,?final?SendCallback?sendCallback,?final?long?timeout)?throws?MQClientException,?RemotingException,?InterruptedException?{
????final?long?beginStartTime?=?System.currentTimeMillis();
????ExecutorService?executor?=?this.getAsyncSenderExecutor();
????try?{
????????executor.submit(new?Runnable()?{
????????????@Override
????????????public?void?run()?{
????????????????try?{
????????????????????makeSureStateOK();
????????????????????Validators.checkMessage(msg,?defaultMQProducer);
????????????????????if?(!msg.getTopic().equals(mq.getTopic()))?{
????????????????????????throw?new?MQClientException("message's?topic?not?equal?mq's?topic",?null);
????????????????????}
????????????????????long?costTime?=?System.currentTimeMillis()?-?beginStartTime;
????????????????????if?(timeout?>?costTime)?{
????????????????????????try?{
????????????????????????????sendKernelImpl(msg,?mq,?CommunicationMode.ASYNC,?sendCallback,?null,?timeout?-?costTime);
????????????????????????}?catch?(MQBrokerException?e)?{
????????????????????????????throw?new?MQClientException("unknown?exception",?e);
????????????????????????}
????????????????????}?else?{
????????????????????????sendCallback.onException(new?RemotingTooMuchRequestException("call?timeout"));
????????????????????}
????????????????}?catch?(Exception?e)?{
????????????????????sendCallback.onException(e);
????????????????}
????????????}
????????});
????}?catch?(RejectedExecutionException?e)?{
????????throw?new?MQClientException("executor?rejected?",?e);
????}
}
DefaultMQProducerImpl#sendOneway函數(shù)
/**
?*?內(nèi)核單向發(fā)送
?*/
public?void?sendOneway(Message?msg,?MessageQueue?mq)?throws?MQClientException,?RemotingException,?InterruptedException?{
????this.makeSureStateOK();
????Validators.checkMessage(msg,?this.defaultMQProducer);
????try?{
????????this.sendKernelImpl(msg,?mq,?CommunicationMode.ONEWAY,?null,?null,?this.defaultMQProducer.getSendMsgTimeout());
????}?catch?(MQBrokerException?e)?{
????????throw?new?MQClientException("unknown?exception",?e);
????}
}
往期推薦
圖文并茂!深入了解RocketMQ的內(nèi)存映射機(jī)制
結(jié)尾
單向發(fā)送那里如果有問題,可以私信我。我們一起交流!
關(guān)于整篇的思路與總結(jié)。主要是從RocketMQ的消息發(fā)送入手的,消息發(fā)送主要分三種
同步 異步 單向
從三種方式各自深入源碼進(jìn)行分析得知,同步,單向,異步流程大致相同
異步發(fā)送與同步發(fā)送最大的不同:?異步發(fā)送在同步發(fā)送的基礎(chǔ)上利用ExecutorService 進(jìn)行初始化異步任務(wù)。在執(zhí)行完成之后,還會(huì)有一個(gè)?updateFaultItem?時(shí)間記錄處理。
正常情況下,會(huì)傳一個(gè)false值,false代表沒有問題,會(huì)采用我們自己計(jì)算的時(shí)間戳賦值 異常情況下,會(huì)傳一個(gè)true值,true代表有問題,會(huì)采用默認(rèn)時(shí)間30s賦值
單向又與同步,異步有些不同,單向因?yàn)椴恍枰朗欠癯晒?,所以他把這條發(fā)送請求進(jìn)行委托處理(利用Netty框架Channel的?writeAndFlush)
非常歡迎大家加我個(gè)人微信有關(guān)后端方面的問題我們在群內(nèi)一起討論!?我們下期再見!
歡迎『點(diǎn)贊』、『在看』、『轉(zhuǎn)發(fā)』三連支持一下,下次見~

