<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          面試官問: 如何保證 MQ消息是有序的?

          共 3188字,需瀏覽 7分鐘

           ·

          2021-08-20 13:06

          大家好,我是Tom哥~


          為了系統(tǒng)間解耦,我們通常會(huì)引入MQ框架,大家各司其職共同完成上下游的業(yè)務(wù)流程。

          大致過程:

          • 生產(chǎn)端,創(chuàng)建一條消息,通過網(wǎng)絡(luò)發(fā)送到MQ Server
          • MQ將 消息存儲(chǔ)在topic 的一個(gè)分區(qū)
          • 消費(fèi)端,從分區(qū)中拉取消息,消費(fèi)處理


          但現(xiàn)實(shí)往往不一樣!MQ 架構(gòu)設(shè)計(jì)要滿足高并發(fā)、高性能、高可用等指標(biāo)


          單分區(qū),達(dá)不到我們的吞吐量要求,我們考慮采用多分區(qū)架構(gòu)設(shè)計(jì),正所謂 ”三個(gè)臭皮匠賽過一個(gè)諸葛亮“,多分區(qū)可以有效分?jǐn)側(cè)謮毫Γ嵘w系統(tǒng)性能。



          兩臺(tái) MQ機(jī)器,組成一個(gè)集群,原先一個(gè)分區(qū)存儲(chǔ)6條消息,現(xiàn)在分?jǐn)偟絻蓚€(gè)分區(qū),每個(gè)分區(qū)各存儲(chǔ)3條消息,性能比上面那個(gè)提升一倍。

          貌似可以滿足我們的需求,但任何事情都有兩面性!


          我們看看下面業(yè)務(wù)場景:

          一個(gè)用戶在電商網(wǎng)站上下訂單到交易完成,中間會(huì)經(jīng)歷一系列動(dòng)作,訂單的狀態(tài)也會(huì)隨之變化,一個(gè)訂單會(huì)產(chǎn)生多條MQ消息,下單、付款、發(fā)貨、買家確認(rèn)收貨,消費(fèi)端需要嚴(yán)格按照業(yè)務(wù)狀態(tài)機(jī)的順序處理,否則,就會(huì)出現(xiàn)業(yè)務(wù)問題。

          我們發(fā)現(xiàn),消息帶上了狀態(tài),不再是一個(gè)個(gè)獨(dú)立的個(gè)體,有了上下文依賴關(guān)系!

          對(duì)于這個(gè)問題,突然想到HTTP協(xié)議,其本身也是無狀態(tài)的,也就是說前后兩次請(qǐng)求沒有關(guān)聯(lián),但有些業(yè)務(wù)功能有登錄要求,那怎么解決?

          引入Cookie機(jī)制,每次請(qǐng)求客戶端額外傳輸一些數(shù)據(jù),來達(dá)到上下文關(guān)聯(lián)。


          回到MQ的消息順序問題,我們要如何解決?



          答案:各退一步,保證局部有序。

          比如上面的電商例子,只要保證一個(gè)訂單的多條狀態(tài)消息在同一個(gè)分區(qū),便可以滿足業(yè)務(wù)需求,這個(gè)方案可以覆蓋大部分的業(yè)務(wù)場景。

          這里面只需要有一個(gè)路由策略組件,由它決定消息該放到哪個(gè)分區(qū)中!

          考慮到市面MQ開源框架很多,常見的如:Kafka、Pulsar、RabbitMQ、RocketMQ 等,API方法略有區(qū)別,但設(shè)計(jì)思路是相通的。


          接下來,我們以 RocketMQ 為例:


          生產(chǎn)端提供了一個(gè)接口 MessageQueueSelector

          public interface MessageQueueSelector {
             MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
          }

          接口內(nèi)定義一個(gè)select方法,具體參數(shù)含義:

          • mqs:該Topic下所有的隊(duì)列分片
          • msg:待發(fā)送的消息
          • arg:發(fā)送消息時(shí)傳遞的參數(shù)

          關(guān)于MessageQueueSelector接口,RocketMQ 框架提供了三個(gè)默認(rèn)實(shí)現(xiàn)類:

          • 1、SelectMessageQueueByHash:

          arg參數(shù)的hashcode的絕對(duì)值,然后對(duì)mqs.size()取余,得到目標(biāo)隊(duì)列在mqs的下標(biāo)

          • 2、SelectMessageQueueByRandom:

          對(duì)mqs.size()值取隨機(jī)數(shù)作為目標(biāo)隊(duì)列在mqs的下標(biāo)

          • 3、SelectMessageQueueByMachineRoom

          返回null


          特別注意:

          雖然保證了單個(gè)分片的消息有序,但每個(gè)分片的消費(fèi)者只能是單線程處理,因?yàn)槎嗑€程無法控制消費(fèi)順序。這個(gè)可能會(huì)損失一些性能。


          這里又引出另一個(gè)問題,如何保證一個(gè)隊(duì)列只能有一個(gè)消費(fèi)端呢?

          1、

          org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance

          • 遍歷一個(gè)topic下所有的MessageQueue
          • isOrder && !this.lock(mq) 嘗試對(duì)它加鎖,確保一個(gè)MessageQueue只能被一個(gè)消費(fèi)者處理


          2、將PullRequest對(duì)象放入PullMessageServicepullRequestQueue隊(duì)列中

          public void dispatchPullRequest(List<PullRequest> pullRequestList) {
              for (PullRequest pullRequest : pullRequestList) {
                  this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
                  log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
              }
          }


          3、org.apache.rocketmq.client.impl.consumer.PullMessageService#run

          • PullMessageService 是一個(gè)Runnable線程任務(wù)
          • 無限循環(huán),從隊(duì)列中拉取、處理消息


          另一個(gè)問題,如何保證一個(gè)隊(duì)列,只有一個(gè)線程在處理消息呢?


          1、 DefaultMQPushConsumerImpl#pullMessage

          • ConsumeMessageService 中有兩個(gè)實(shí)現(xiàn)類,因?yàn)槲覀冇邢M(fèi)順序要求,會(huì)選擇ConsumeMessageOrderlyService來處理業(yè)務(wù)


          2、 ConsumeMessageOrderlyService.ConsumeRequest


          • ConcurrentMap中獲取messageQueue對(duì)應(yīng)的鎖對(duì)象
          • 通過 synchronized 關(guān)鍵字,線程來搶占鎖,互斥關(guān)系,從而保證了一個(gè)MessageQueue只能有一個(gè)線程并發(fā)處理


          繼續(xù)往下看,如果擴(kuò)容了怎么辦?

          原來有6個(gè)分區(qū),order_id_1的消息在MessageQueue6 中,此時(shí)擴(kuò)容一倍,現(xiàn)在12個(gè)分區(qū),order_id_1訂單后面產(chǎn)生的消息可能路由到了MessageQueue8 中,同一個(gè)訂單的消息分布在兩個(gè)分區(qū)中,無法保證順序。

          我們能做的是,先將存量消息處理完,再擴(kuò)容。如果是在線業(yè)務(wù),可以搞個(gè)臨時(shí)topic,先將消息暫時(shí)堆積,待擴(kuò)容后,按新的路由規(guī)則重新發(fā)送。


          順序消息,如果某條失敗了怎么辦?會(huì)不會(huì)一直阻塞?

          1、如果失敗,不會(huì)提交消費(fèi)位移,系統(tǒng)會(huì)自動(dòng)重試(有重試上限),此時(shí)會(huì)阻塞后面的消息消費(fèi),直到這條消息處理完

          2、如果這個(gè)消息達(dá)到重試上限,依然失敗,會(huì)進(jìn)入死信隊(duì)列,可以繼續(xù)處理后面的消息




          關(guān)于我:前阿里P7技術(shù)專家,出過專利,競賽拿過獎(jiǎng),CSDN博客專家,負(fù)責(zé)過電商交易、社區(qū)生鮮、互聯(lián)網(wǎng)金融等業(yè)務(wù),多年團(tuán)隊(duì)管理經(jīng)驗(yàn)。


          推薦閱讀:
          MySQL 開源工具集合
          什么是布隆過濾器?如何解決高并發(fā)緩存穿透問題?
          如何通過Binlog來實(shí)現(xiàn)不同系統(tǒng)間數(shù)據(jù)同步
          高并發(fā)服務(wù)優(yōu)化篇:詳解RPC的一次調(diào)用過程
          如何設(shè)計(jì)一個(gè)高性能的秒殺系統(tǒng)

          關(guān)號(hào)互聯(lián)網(wǎng)全棧架構(gòu)價(jià)


          瀏覽 21
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  日韩欧美一道本 | 波多野吉衣中文字幕 | 豆花精品在线 | 囯产精品久久久久久 | 黑人大屌三 。p 黄片免费观看永久 |