<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          RocketMQ消息發(fā)送流程

          共 7637字,需瀏覽 16分鐘

           ·

          2022-05-22 15:26

          記得點(diǎn)擊?"歡少的成長之路",?設(shè)為星標(biāo)?

          后臺(tái)點(diǎn)擊【聯(lián)系我】,申請加入優(yōu)質(zhì)技術(shù)學(xué)習(xí)社群

          大家好,我是Leo。

          今天聊一下RocketMQ消息發(fā)送,重試機(jī)制,故障延遲機(jī)制,獲取路由機(jī)制,消息隊(duì)列的選擇

          消息發(fā)送

          關(guān)系圖

          首先放一下Broker Cluster,Broker,Topic,Queue的關(guān)系圖。因?yàn)橄挛闹饕獣?huì)沿著這四塊進(jìn)行梳理

          發(fā)送的三種方式

          消息發(fā)送的三種方式

          • 同步:發(fā)送者向MQ發(fā)送一條消息后,一直等待服務(wù)器返回成功才繼續(xù)下一個(gè)。
          • 異步:發(fā)送者向MQ發(fā)送一條消息后,通過回調(diào)函數(shù)調(diào)用消息發(fā)布函數(shù)繼續(xù)發(fā)送,主線程立即返回。
          • 單向:發(fā)送者向MQ發(fā)送一條消息后,直接返回,不等待消息服務(wù)器的結(jié)果,也不注冊函數(shù),簡單來說,就是只管發(fā)。其他啥也不管。

          源碼

          package?org.apache.rocketmq.client.impl;

          /**
          ?*?消息發(fā)送的三種方式
          ?*/

          public?enum?CommunicationMode?{
          ????//?同步發(fā)送
          ????SYNC,
          ????//?異步發(fā)送
          ????ASYNC,
          ????//?單向發(fā)送
          ????ONEWAY,
          }

          重試機(jī)制

          RocketMQ的重試機(jī)制,主要由下列兩個(gè)參數(shù)決定。默認(rèn)重試次數(shù)為2次,重試機(jī)制提高了消息發(fā)送成功的幾率。

          /**
          ?*?同步模式下內(nèi)部嘗試發(fā)送消息的最大次數(shù)
          ?*/

          private?int?retryTimesWhenSendFailed?=?2
          /**
          ?*?異步模式下內(nèi)部嘗試發(fā)送消息的最大次數(shù)
          ?*/

          private?int?retryTimesWhenSendAsyncFailed?=?2;

          故障延遲機(jī)制

          RocketMQ的故障延遲機(jī)制,主要由下列參數(shù)決定,默認(rèn)是不開啟的。故障延遲機(jī)制,主要體現(xiàn)在集群的時(shí)候,當(dāng)broker發(fā)送錯(cuò)誤時(shí),可以有效的規(guī)避多次發(fā)送消息都發(fā)往一個(gè)broker(queue)的錯(cuò)誤。

          /**
          ?*??默認(rèn)不啟用Broker故障延遲機(jī)制。
          ?*/

          private?boolean?sendLatencyFaultEnable?=?false;

          獲取路由信息機(jī)制

          消息在發(fā)送時(shí),需要知道,要發(fā)往哪個(gè)broker。首先會(huì)去?brokerAddrTable?中查找當(dāng)前brokerName是否存在在本地的緩存中

          1. 如果成功返回brokerName
          2. 否則就返回null
          /*?Broker?Name?*/???/*?brokerId?*/????/*?address?*/
          private?final?ConcurrentMap>?brokerAddrTable;

          如果成功一帆風(fēng)順,如果找不到的話,肯定要做一些安全處理。

          如果找不到的話,會(huì)通過?tryToFindTopicPublishInfo?函數(shù)嘗試查找主題發(fā)布信息

          1. 在?topicPublishInfoTable?緩存中根據(jù)topic名稱查找是否存在
          2. 如果沒有緩存,會(huì)創(chuàng)建一個(gè)以topic名稱為key,空?TopicPublishInfo?為value到?topicPublishInfoTable?,然后更新到?NameServer?。
          3. 如果消息的路由信息存在,并且?MessageQueue?不為空 直接返回路由信息
          4. 否則使用默認(rèn)主題
          /*?topic?*/
          private?final?ConcurrentMap?topicPublishInfoTable

          消息隊(duì)列的選擇

          開啟故障延遲

          遍歷主題隊(duì)列的消息隊(duì)列,根據(jù)訪問次數(shù)進(jìn)行隨機(jī)自增取模。

          如果當(dāng)前消息隊(duì)列是可用的就直接返回。函數(shù)名?isAvailable

          如果是不可用的,從失敗的brokeName列表中通過?pickOneAtLeast?函數(shù)選擇一個(gè)可用的broker。拿到brokerName之后,再根據(jù)brokerName反查這個(gè)隊(duì)列的寫隊(duì)列數(shù)

          1. 如果小于0說明該broker依據(jù)恢復(fù),從失敗的條目中移出當(dāng)前broker
          2. 如果大于0通過?selectOneMessageQueue?函數(shù)選出一個(gè)消息隊(duì)列
          /**
          ?*?失敗的broker列表
          ?*/

          private?final?LatencyFaultTolerance??latencyFaultTolerance;

          沒有開啟故障延遲

          如果上一次選擇的執(zhí)行發(fā)送消息失敗的broker名稱為空,它會(huì)通過?selectOneMessageQueue()?函數(shù)對當(dāng)前訪問的次數(shù)取絕對值,然后與消息隊(duì)列的大小取模得到一個(gè)下標(biāo),然后從?messageQueueList?中根據(jù)下標(biāo)取出?MessageQueue

          如果上一次選擇的執(zhí)行發(fā)送消息失敗的broker名稱不為空,會(huì)遍歷消息隊(duì)列,對當(dāng)前訪問的次數(shù)取絕對值,然后與消息隊(duì)列的大小取模得到一個(gè)下標(biāo)后,拿著下標(biāo)獲取對應(yīng)的?brokerName?并且判斷當(dāng)前的?brokerName?是否與上一次發(fā)送消息失敗的?brokerName?相等,

          1. 如果相等就遍歷所有主題內(nèi)的消息隊(duì)列。假如還是沒有找到一個(gè)合適的,就會(huì)隨機(jī)選擇一個(gè)
          2. 如果不相等,就把當(dāng)前下標(biāo)的?MessageQueue?返出去。

          下圖的json字符串就是?MessageQueue?信息

          /**
          ?*?該主題隊(duì)列的消息隊(duì)列
          ?*/

          private?List??messageQueueList?=?new?ArrayList??();

          [
          ????{
          ????????"brokerName":?"broker-a",
          ????????"queueId":?0
          ????},
          ????{
          ????????"brokerName":?"broker-a",
          ????????"queueId":?1
          ????},?{
          ????????"brokerName":?"broker-b",
          ????????"queueId":?0
          ????},?{
          ????????"brokerName":?"broker-b",
          ????????"queueId":?1
          ????},?{
          ????????"brokerName":?"broker-c",
          ????????"queueId":?0
          ????},?{
          ????????"brokerName":?"broker-c",
          ????????"queueId":?1
          ????}
          ]

          開啟故障延遲機(jī)制中的可用依據(jù)是:檢查時(shí)間是否到達(dá)了下次可使用的時(shí)間點(diǎn)

          如果沒有該機(jī)制,如果broker宕機(jī),由于路由算法中的消息隊(duì)列是按broker排序的,順序選擇,如果上一次根據(jù)路由算法選擇的是宕機(jī)的broker的第一個(gè)隊(duì)列,那么隨后的下次選擇的是宕機(jī)broker的第二個(gè)隊(duì)列,消息發(fā)送很有可能會(huì)失敗,再次引發(fā)重試,帶來不必要的性能損耗。

          selectOneMessageQueue()也可以看成是兜底策略-輪詢算法

          同步發(fā)送

          由上文得知,消息發(fā)送有三種方式。我們先看一下同步發(fā)送主要做了哪些事情。

          DefaultMQProducerImpl的send函數(shù)是發(fā)送消息的入口

          1. 通過?makeSureStateOK?函數(shù)檢查服務(wù)狀態(tài)是否正常
          2. 通過?checkMessage?函數(shù)校驗(yàn)?Message?與?DefaultMQProducer?是否符合發(fā)送的規(guī)則
          3. 校驗(yàn)消息的主題不能等于消息隊(duì)列集合的主題信息以及以上操作是否超時(shí)
          4. 校驗(yàn)brokerName是否存在,如果不存在通過?findBrokerAddressInPublish?函數(shù)去nameserver拉取
          5. 通過?brokerVIPChannel?函數(shù)校驗(yàn)是否使用了vip管道,如果使用了管道在原來的基礎(chǔ)上把?端口-2
          6. 通過配置信息獲取生成uniqId的算法規(guī)則以及封裝?Message?的實(shí)例信息
          7. 對?Message 的 body?信息進(jìn)行壓縮
          8. 獲取當(dāng)前的配置信息,是否啟用事務(wù)。
          9. 封裝發(fā)送消息模板權(quán)限信息?SendMessageContext,構(gòu)造請求頭
          10. 發(fā)送之前,校驗(yàn)一下?Topid?類型是否屬于重試類型消息(這里可以看看下列注釋)
          11. 通過?CommunicationMode?枚舉類型判斷當(dāng)前是什么發(fā)送方式
          12. 判斷當(dāng)前是正常指令發(fā)送,還是RPC指令發(fā)送,判斷是否對字段進(jìn)行壓縮處理(簡化壓縮有助于提速序列化速度)
          13. 根據(jù)broker地址獲取Netty對應(yīng)的Channel,并遠(yuǎn)程調(diào)用(這里的發(fā)送,用的是Netty框架)
          14. 通過?processSendResponse?函數(shù)處理同步返回的參數(shù),如果參數(shù)為0,說明發(fā)送成功。最后封裝?SendResult?返回

          第四步中,如果在nameserver拉取不到,說明服務(wù)宕機(jī)了。

          第五步中,vip的管道配置從配置文件中的com.rocketmq.sendMessageWithVIPChannel得知

          第六步中,批量信息不支持壓縮

          第十步中,如果是重試消息,通過獲取自定義重試次數(shù),在請求頭區(qū)分特別處理

          第十一步中,因?yàn)檫@里介紹的是同步發(fā)送,就只寫同步發(fā)送流程了,異步,單向會(huì)在下面段落體現(xiàn)出來

          第十二步中,通過配置文件中org.apache.rocketmq.client.sendSmartMsg得知字段是否簡化壓縮

          異步發(fā)送

          聊完同步發(fā)送,我們看一下異步發(fā)送

          DefaultMQProducerImpl的send函數(shù)是發(fā)送消息的入口(這里跟同步的區(qū)別是多了一個(gè)?SendCallback

          1. 在生產(chǎn)者生產(chǎn)消息發(fā)送時(shí),通過?ExecutorService?新增一個(gè)異步任務(wù)進(jìn)行發(fā)送(可看下列注釋,可看源碼區(qū))
          2. 通過?makeSureStateOK?函數(shù)檢查服務(wù)狀態(tài)是否正常
          3. 通過?checkMessage?函數(shù)校驗(yàn)?Message?與?DefaultMQProducer?是否符合發(fā)送的規(guī)則
          4. 校驗(yàn)消息的主題不能等于消息隊(duì)列集合的主題信息以及以上操作是否超時(shí)
          5. 校驗(yàn)brokerName是否存在,如果不存在通過?findBrokerAddressInPublish?函數(shù)去nameserver拉取
          6. 通過?brokerVIPChannel?函數(shù)校驗(yàn)是否使用了vip管道,如果使用了管道在原來的基礎(chǔ)上把?端口-2
          7. 通過配置信息獲取生成uniqId的算法規(guī)則以及封裝?Message?的實(shí)例信息
          8. 對?Message 的 body?信息進(jìn)行壓縮
          9. 獲取當(dāng)前的配置信息,是否啟用事務(wù)。
          10. 封裝發(fā)送消息模板權(quán)限信息?SendMessageContext,構(gòu)造請求頭
          11. 發(fā)送之前,校驗(yàn)一下?Topid?類型是否屬于重試類型消息(這里可以看看下列注釋)
          12. 通過?CommunicationMode?枚舉類型判斷當(dāng)前是什么發(fā)送方式
          13. 判斷當(dāng)前是正常指令發(fā)送,還是RPC指令發(fā)送,判斷是否對字段進(jìn)行壓縮處理(簡化壓縮有助于提速序列化速度)
          14. 根據(jù)broker地址獲取Netty對應(yīng)的Channel,并遠(yuǎn)程調(diào)用(這里的發(fā)送,用的是Netty框架)
          15. 通過?processSendResponse?函數(shù)處理并且利用委托?remotingClient.invokeAsync?等待返回的SendResult?結(jié)構(gòu)體
          16. 上一步驟再插一句,異步發(fā)送會(huì)處理一個(gè)?updateFaultItem?函數(shù)記錄當(dāng)前?不可以時(shí)間/可用時(shí)間?時(shí)間

          第一步中 借助 java.util.concurrent.ExecutorService ,實(shí)現(xiàn)一個(gè)線程池達(dá)到可以讓任務(wù)在后臺(tái)執(zhí)行。

          第十五步中 RemotingClient的invokeAsync函數(shù)

          單向發(fā)送

          單向發(fā)送,與同步發(fā)送相似。與同步發(fā)送不同的是通過 RemotingClient#invokeOneway 函數(shù)委托發(fā)送。

          從 invokeOnway進(jìn)入后

          1. 如果當(dāng)前addr為空,獲取和創(chuàng)建Nameserver通道
          2. 創(chuàng)建成功后,只要通道是活躍的,并且不為空,就利用Netty框架進(jìn)行?writeAndFlush

          創(chuàng)建通道時(shí),通過 ReentrantLock 對nameSeverChannel加鎖,超時(shí)時(shí)長為3秒

          源碼

          DefaultMQProducerImpl#send同步函數(shù)

          /**
          ?*?內(nèi)核同步發(fā)送
          ?*?@param?msg
          ?*?@param?mq
          ?*?@return
          ?*?@throws?MQClientException
          ?*?@throws?RemotingException
          ?*?@throws?MQBrokerException
          ?*?@throws?InterruptedException
          ?*/

          public?SendResult?send(Message?msg,?MessageQueue?mq)?throws?MQClientException,?RemotingException,?MQBrokerException,?InterruptedException?{
          ????return?send(msg,?mq,?this.defaultMQProducer.getSendMsgTimeout());
          }

          /**
          ?*?內(nèi)核同步發(fā)送下的??send子函數(shù)
          ?*?@param?msg
          ?*?@param?mq
          ?*?@param?timeout
          ?*?@return
          ?*?@throws?MQClientException
          ?*?@throws?RemotingException
          ?*?@throws?MQBrokerException
          ?*?@throws?InterruptedException
          ?*/

          public?SendResult?send(Message?msg,?MessageQueue?mq,?long?timeout)?throws?MQClientException,?RemotingException,?MQBrokerException,?InterruptedException?{
          ????long?beginStartTime?=?System.currentTimeMillis();
          ????this.makeSureStateOK();
          ????Validators.checkMessage(msg,?this.defaultMQProducer);

          ????if?(!msg.getTopic().equals(mq.getTopic()))?{
          ????????//?消息的主題不等于mq的主題
          ????????throw?new?MQClientException("message's?topic?not?equal?mq's?topic",?null);
          ????}

          ????long?costTime?=?System.currentTimeMillis()?-?beginStartTime;
          ????if?(timeout?????????throw?new?RemotingTooMuchRequestException("call?timeout");
          ????}

          ????return?this.sendKernelImpl(msg,?mq,?CommunicationMode.SYNC,?null,?null,?timeout);
          }

          DefaultMQProducerImpl#send異步函數(shù)

          /**
          ?*?內(nèi)核異步
          ?*
          ?*?@param?msg
          ?*?@param?mq
          ?*?@param?sendCallback
          ?*?@throws?MQClientException
          ?*?@throws?RemotingException
          ?*?@throws?InterruptedException
          ?*/

          public?void?send(Message?msg,?MessageQueue?mq,?SendCallback?sendCallback)?throws?MQClientException,?RemotingException,?InterruptedException?{
          ????send(msg,?mq,?sendCallback,?this.defaultMQProducer.getSendMsgTimeout());
          }
          /**
          ?*?內(nèi)核異步發(fā)送下的??send?子函數(shù)
          ?
          ?*?@param?msg
          ?*?@param?mq
          ?*?@param?sendCallback
          ?*?@param?timeout??????the?sendCallback?will?be?invoked?at?most?time
          ?*?@throws?MQClientException
          ?*?@throws?RemotingException
          ?*?@throws?InterruptedException
          ?*/

          @Deprecated
          public?void?send(final?Message?msg,?final?MessageQueue?mq,?final?SendCallback?sendCallback,?final?long?timeout)?throws?MQClientException,?RemotingException,?InterruptedException?{
          ????final?long?beginStartTime?=?System.currentTimeMillis();
          ????ExecutorService?executor?=?this.getAsyncSenderExecutor();
          ????try?{
          ????????executor.submit(new?Runnable()?{
          ????????????@Override
          ????????????public?void?run()?{
          ????????????????try?{
          ????????????????????makeSureStateOK();
          ????????????????????Validators.checkMessage(msg,?defaultMQProducer);

          ????????????????????if?(!msg.getTopic().equals(mq.getTopic()))?{
          ????????????????????????throw?new?MQClientException("message's?topic?not?equal?mq's?topic",?null);
          ????????????????????}
          ????????????????????long?costTime?=?System.currentTimeMillis()?-?beginStartTime;
          ????????????????????if?(timeout?>?costTime)?{
          ????????????????????????try?{
          ????????????????????????????sendKernelImpl(msg,?mq,?CommunicationMode.ASYNC,?sendCallback,?null,?timeout?-?costTime);
          ????????????????????????}?catch?(MQBrokerException?e)?{
          ????????????????????????????throw?new?MQClientException("unknown?exception",?e);
          ????????????????????????}
          ????????????????????}?else?{
          ????????????????????????sendCallback.onException(new?RemotingTooMuchRequestException("call?timeout"));
          ????????????????????}
          ????????????????}?catch?(Exception?e)?{
          ????????????????????sendCallback.onException(e);
          ????????????????}

          ????????????}

          ????????});
          ????}?catch?(RejectedExecutionException?e)?{
          ????????throw?new?MQClientException("executor?rejected?",?e);
          ????}

          }

          DefaultMQProducerImpl#sendOneway函數(shù)

          /**
          ?*?內(nèi)核單向發(fā)送
          ?*/

          public?void?sendOneway(Message?msg,?MessageQueue?mq)?throws?MQClientException,?RemotingException,?InterruptedException?{
          ????this.makeSureStateOK();
          ????Validators.checkMessage(msg,?this.defaultMQProducer);

          ????try?{
          ????????this.sendKernelImpl(msg,?mq,?CommunicationMode.ONEWAY,?null,?null,?this.defaultMQProducer.getSendMsgTimeout());
          ????}?catch?(MQBrokerException?e)?{
          ????????throw?new?MQClientException("unknown?exception",?e);
          ????}
          }

          往期推薦

          2022年4月文章目錄整理

          RocketMQ數(shù)據(jù)壓縮的那套把戲

          圖文并茂!深入理解RocketMQ的刷盤機(jī)制

          圖文并茂!深入了解RocketMQ的過期刪除機(jī)制

          圖文并茂!深入了解RocketMQ的內(nèi)存映射機(jī)制

          結(jié)尾

          單向發(fā)送那里如果有問題,可以私信我。我們一起交流!

          關(guān)于整篇的思路與總結(jié)。主要是從RocketMQ的消息發(fā)送入手的,消息發(fā)送主要分三種

          1. 同步
          2. 異步
          3. 單向

          從三種方式各自深入源碼進(jìn)行分析得知,同步,單向,異步流程大致相同

          異步發(fā)送與同步發(fā)送最大的不同:?異步發(fā)送在同步發(fā)送的基礎(chǔ)上利用ExecutorService 進(jìn)行初始化異步任務(wù)。在執(zhí)行完成之后,還會(huì)有一個(gè)?updateFaultItem?時(shí)間記錄處理。

          1. 正常情況下,會(huì)傳一個(gè)false值,false代表沒有問題,會(huì)采用我們自己計(jì)算的時(shí)間戳賦值
          2. 異常情況下,會(huì)傳一個(gè)true值,true代表有問題,會(huì)采用默認(rèn)時(shí)間30s賦值

          單向又與同步,異步有些不同,單向因?yàn)椴恍枰朗欠癯晒?,所以他把這條發(fā)送請求進(jìn)行委托處理(利用Netty框架Channel的?writeAndFlush

          非常歡迎大家加我個(gè)人微信有關(guān)后端方面的問題我們在群內(nèi)一起討論!?我們下期再見!

          歡迎『點(diǎn)贊』、『在看』、『轉(zhuǎn)發(fā)』三連支持一下,下次見~

          瀏覽 25
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  久久加勅比 | 黄色成人观看网站 | 乱伦性爱视频 | 就爱日五月天 | 久久人妻av |