<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          「查缺補(bǔ)漏」鞏固你的RocketMQ知識(shí)體系

          共 16075字,需瀏覽 33分鐘

           ·

          2020-08-27 17:08

          轉(zhuǎn)自:Kerwin啊??作者:柯小賢

          Windows安裝部署

          下載

          地址:[https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.5.2/rocketmq-all-4.5.2-bin-release.zip]

          選擇‘Binary’進(jìn)行下載

          解壓已下載工程

          配置

          新增系統(tǒng)變量 ROCKETMQ_HOME ?-> ?F:\RocketMQ\rocketmq-4.5.2

          JAVA_HOME ? ? ? ? ? ? -> ?F:\Java_JDK\JDK1.8

          Path 系統(tǒng)變量新增:Maven/bin目錄

          PS:RocketMQ 消息存儲(chǔ)在C:\Users\Administrator\store store目錄中 ?文件占用較大,注意刪除不必要的內(nèi)容

          啟動(dòng)

          start mqnamesrv.cmd

          start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

          Rocket集成可視化監(jiān)控插件

          1. 任意目錄(拉取項(xiàng)目,隨便哪里都行)git clone https://github.com/apache/rocketmq-externals.git

          2. 進(jìn)入‘rocketmq-externals\rocketmq-console\src\main\resources’文件夾,打開‘a(chǎn)pplication.properties’進(jìn)行配置

          3. 其實(shí)就是一個(gè)SpringBoot服務(wù),確定好端口,別重復(fù)即可

            server.port=8100

            rocketmq.config.namesrvAddr=127.0.0.1:9876

          4. 進(jìn)入‘\rocketmq-externals\rocketmq-console’文件夾

            執(zhí)行‘mvn clean package -Dmaven.test.skip=true’,編譯生成target

            java -jar rocketmq-console-ng-1.0.1.jar

          5. 根據(jù)配置地址訪問(wèn):http://127.0.0.1:8100

          Rocket可視化監(jiān)控插件 增加Topic | 自動(dòng)增加Topic(4.5.2版本)

          4.5.2 版本 支持自動(dòng)創(chuàng)建Topic

          4.3.0 版本 必須通過(guò)監(jiān)控程序配置Topic,否則執(zhí)行程序報(bào)錯(cuò),沒有此路由

          SpringBoot集成 RocketMQ

              org.apache.rocketmq    rocketmq-client    4.5.2

          RocketMQ基本概念


          概覽

          基于RocketMQ的分布式系統(tǒng),一般可以分為四個(gè)集群:Name server、broker、producer、consumer

          1. name server

            • 提供輕量級(jí)的服務(wù)發(fā)現(xiàn)和路由服務(wù);

            • 每個(gè)節(jié)點(diǎn)都存放了全部的路由信息和對(duì)應(yīng)的讀寫服務(wù);

            • 存儲(chǔ)支持水平擴(kuò)展

          2. broker

            • 提供滿足TOPIC和QUEUE機(jī)制的消息存儲(chǔ)服務(wù);
            • 有推和拉兩種模式;
            • 通過(guò)2或3拷貝實(shí)現(xiàn)高可用;
            • 提供上億消息的堆積能力;
            • 提供故障恢復(fù)、統(tǒng)計(jì)功能和告警功能;
          3. producer

            • 支持分布式部署,通過(guò)負(fù)載平衡模塊給broker發(fā)消息
            • 支持快速失敗
            • 低延遲
          4. consumer

            1. 支持推和拉兩種模式
            2. 支持集群消費(fèi)和廣播消費(fèi)

          核心模塊


          Name Server

          提供Broker管理;Routing管理(路由管理)

          NameServer,很多時(shí)候稱為命名發(fā)現(xiàn)服務(wù),其在RocketMQ中起著中轉(zhuǎn)承接的作用,是一個(gè)無(wú)狀態(tài)的服務(wù),多個(gè)NameServer之間不通信。任何Producer,Consumer,Broker與所有NameServer通信,向NameServer請(qǐng)求或者發(fā)送數(shù)據(jù)。而且都是單向的,Producer和Consumer請(qǐng)求數(shù)據(jù),Broker發(fā)送數(shù)據(jù)。正是因?yàn)檫@種單向的通信,RocketMQ水平擴(kuò)容變得很容易

          • 提供輕量級(jí)的服務(wù)發(fā)現(xiàn)和路由服務(wù);
          • 每個(gè)節(jié)點(diǎn)都存放了全部的路由信息和對(duì)應(yīng)的讀寫服務(wù);
          • 存儲(chǔ)支持水平擴(kuò)展

          總結(jié):相比于ZooKeeper提供的分布式鎖,發(fā)布和訂閱,數(shù)據(jù)一致性,選舉等,在RocketMQ是不適用的,因此重寫了一套更加輕量級(jí)的發(fā)現(xiàn)服務(wù),主要用以存儲(chǔ) Broker相關(guān)信息以及當(dāng)前Broker上的topic信息,路由信息等

          Broker Server

          提供Remoting Module、客戶端管理、存儲(chǔ)服務(wù)、HA服務(wù)(主從)、索引服務(wù)

          • 提供滿足TOPIC和QUEUE機(jī)制的消息存儲(chǔ)服務(wù);
          • 有推和拉兩種模式;
          • 通過(guò)2或3拷貝實(shí)現(xiàn)高可用;
          • 提供上億消息的堆積能力;
          • 提供故障恢復(fù)、統(tǒng)計(jì)功能和告警功能;

          producer

          • 支持分布式部署,通過(guò)負(fù)載平衡模塊給broker發(fā)消息
          • 支持快速失敗
          • 低延遲

          consumer

          • 支持推和拉兩種模式
          • 支持集群消費(fèi)和廣播消費(fèi)

          核心角色介紹


          生產(chǎn)者

          生產(chǎn)者發(fā)送業(yè)務(wù)系統(tǒng)產(chǎn)生的消息給broker, RocketMQ提供了多種發(fā)送方式:同步的、異步的、單向的


          生產(chǎn)者組

          具有相同角色的生產(chǎn)者被分到一組, 假如原始的生產(chǎn)者在事務(wù)后崩潰,broker會(huì)聯(lián)系 同一生產(chǎn)者組中的不同生產(chǎn)者實(shí)例,繼續(xù)提交或回滾事務(wù)


          消費(fèi)者

          一個(gè)消費(fèi)者從broker拉取信息,并將信息返還給應(yīng)用。為了我們應(yīng)用的正確性,提供了兩種消費(fèi)者類型:

          拉式消費(fèi)者:拉式消費(fèi)者從broker拉取消息,一旦一批消息被拉取,用戶應(yīng)用系統(tǒng)將發(fā)起消費(fèi)過(guò)程。

          推式消費(fèi)者:推式消費(fèi)者,從另一方面講,囊括了消息的拉取、消費(fèi)過(guò)程,并保持了內(nèi)部的其他工作,留下了一個(gè)回調(diào) 接口給終端用戶去實(shí)現(xiàn),實(shí)現(xiàn)在消息到達(dá)時(shí)要執(zhí)行的內(nèi)容。


          消費(fèi)者組

          具有相同角色的消費(fèi)者被組在一起,稱為消費(fèi)者組,它完成了負(fù)載均衡和容錯(cuò)的目標(biāo)

          一個(gè)消費(fèi)組中的消費(fèi)者實(shí)例必須有確定的相同的訂閱topic


          Topic(主題)

          Topic是一個(gè)消息的目錄,在這個(gè)目錄中,生產(chǎn)者傳送消息,消費(fèi)者拉取消息,可以多個(gè)消費(fèi)者訂閱同一個(gè)topic,一個(gè)生產(chǎn)者也可以發(fā)送多個(gè)topic

          PS:RocketMQ 基于發(fā)布訂閱模式,發(fā)布訂閱的核心即 Topic 主題


          Message(消息)

          消息是被傳遞的信息。一個(gè)消息必須有一個(gè)Topic,它可以理解為信件上的地址。一個(gè)消息也可以有一個(gè)可選的tag,和額外的key-value對(duì)。例如:你可以設(shè)置業(yè)務(wù)中的鍵到你的消息中,在broker服務(wù)中查找消息,以便在開發(fā)期間診斷問(wèn)題


          消息隊(duì)列

          Topic被分割成一個(gè)或多個(gè)消息隊(duì)列。隊(duì)列分為3種角色:異步主、同步主、從。如果你不能容忍消息丟失,我們建議你部署同步主,并加一個(gè)從隊(duì)列。如果你容忍丟失,但你希望隊(duì)列總是可用,你可以部署異步主和從隊(duì)列。如果你想最簡(jiǎn)單,你只需要一個(gè)異步主,不需要從隊(duì)列。消息保存磁盤的方式也有兩種,推薦使用的是異步保存,同步保存是昂貴的并會(huì)導(dǎo)致性能損失,如果你想要可靠性,我們推薦你使用同步主+從的方式。


          Tag(標(biāo)簽)

          標(biāo)簽,用另外一個(gè)詞來(lái)說(shuō),就是子主題,為用戶提供額外的靈活性。具有相同Topic的消息可以有不同的tag。


          Broker(隊(duì)列)

          Broker是RocketMQ的一個(gè)主要組件,它接收生產(chǎn)者發(fā)送的消息,存儲(chǔ)它們并準(zhǔn)備處理消費(fèi)者的拉取請(qǐng)求。它也存儲(chǔ)消息相關(guān)的元數(shù)據(jù), 包括消費(fèi)組,消費(fèi)成功的偏移量,主題、隊(duì)列的信息。


          名稱服務(wù)

          名稱服務(wù)主要提供路由信息。生產(chǎn)者/消費(fèi)者客戶端尋找topic,并找到通信的隊(duì)列列表。


          消息的順序

          當(dāng)DefaultMQPushConsumer被使用,你就要決定消費(fèi)消息時(shí),是順序消費(fèi)還是同時(shí)消費(fèi)

          • 順序消費(fèi)

          順序消費(fèi)消息的意思是 消息將按照生產(chǎn)者發(fā)送到隊(duì)列時(shí)的順序被消費(fèi)掉。如果你被強(qiáng)制要求使用全局的順序,你要確保你的topic只有一個(gè)消息隊(duì)列。

          如果指定順序消費(fèi),消息被同時(shí)消費(fèi)的數(shù)量就是訂閱這個(gè)topic的消費(fèi)組的數(shù)量。

          • 同時(shí)消費(fèi)

          當(dāng)同時(shí)消費(fèi)消息時(shí),消息同時(shí)消費(fèi)的最大數(shù)量取決于消費(fèi)客戶端指定的線程池的大小。


          最佳實(shí)踐

          Producer最佳實(shí)踐
          1. 一個(gè)應(yīng)用盡可能用一個(gè) Topic,消息子類型用 tags 來(lái)標(biāo)識(shí),tags 可以由應(yīng)用自由設(shè)置。只有發(fā)送消息設(shè)置了tags,消費(fèi)方在訂閱消息時(shí),才可以利用 tags 在 broker 做消息過(guò)濾。

          2. 每個(gè)消息在業(yè)務(wù)層面的唯一標(biāo)識(shí)碼,要設(shè)置到 keys 字段,方便將來(lái)定位消息丟失問(wèn)題。由于是哈希索引,請(qǐng)務(wù)必保證 key 盡可能唯一,這樣可以避免潛在的哈希沖突。

            消息發(fā)送成功或者失敗,要打印消息日志,務(wù)必要打印 sendresult 和 key 字段。

          3. 對(duì)于消息不可丟失應(yīng)用,務(wù)必要有消息重發(fā)機(jī)制。例如:消息發(fā)送失敗,存儲(chǔ)到數(shù)據(jù)庫(kù),能有定時(shí)程序嘗試重發(fā)或者人工觸發(fā)重發(fā)。

          4. 某些應(yīng)用如果不關(guān)注消息是否發(fā)送成功,請(qǐng)直接使用sendOneWay方法發(fā)送消息。

          Consumer最佳實(shí)踐
          1. 消費(fèi)過(guò)程要做到冪等(即消費(fèi)端去重)
          2. 盡量使用批量方式消費(fèi)方式,可以很大程度上提高消費(fèi)吞吐量。
          3. 優(yōu)化每條消息消費(fèi)過(guò)程

          MQ核心問(wèn)題


          1.消息隊(duì)列適合解決的問(wèn)題

          解決的核心問(wèn)題主要是:異步、解耦、削峰

          但是引入消息隊(duì)列也會(huì)有很多額外的問(wèn)題,比如系統(tǒng)復(fù)雜性會(huì)大大增加,同時(shí)需要解決重復(fù)下發(fā),重復(fù)消費(fèi),消費(fèi)順序,消息丟失,重試機(jī)制等等問(wèn)題,因此不能濫用,合適的場(chǎng)景用合適的技術(shù)


          2.消息模型:主題和隊(duì)列的區(qū)別

          一、消息隊(duì)列的演進(jìn)

          1、初始階段

          最初的消息隊(duì)列,就是一個(gè)嚴(yán)格意義上的隊(duì)列。隊(duì)列是一種數(shù)據(jù)結(jié)構(gòu),先進(jìn)先出,在消息入隊(duì)出隊(duì)過(guò)程中,保證這些消息嚴(yán)格有序。早期的消息隊(duì)列就是按照“隊(duì)列”的數(shù)據(jù)結(jié)構(gòu)設(shè)計(jì)的

          隊(duì)列模型:

          生產(chǎn)者(Producer)發(fā)消息就是入隊(duì)操作,消費(fèi)者(Consumer)收消息就是出隊(duì)也就是刪除操作,服務(wù)端存放消息的容器自然就稱為“隊(duì)列”。

          • 如果有多個(gè)生產(chǎn)者往同一個(gè)隊(duì)列里面發(fā)送消息,這個(gè)隊(duì)列中可以消費(fèi)到的消息,就是這些生產(chǎn)者生產(chǎn)的所有消息的合集。消息的順序就是這些生產(chǎn)者發(fā)送消息的自然順序。
          • 如果有多個(gè)消費(fèi)者接收同一個(gè)隊(duì)列的消息,這些消費(fèi)者之間實(shí)際上是競(jìng)爭(zhēng)的關(guān)系,每個(gè)消費(fèi)者只能收到隊(duì)列中的一部分消息,也就是說(shuō)任何一條消息只能被其中的一個(gè)消費(fèi)者收到。

          2、發(fā)布 - 訂閱模型階段

          如果需要將一份消息數(shù)據(jù)分發(fā)給多個(gè)消費(fèi)者,要求每個(gè)消費(fèi)者都能收到全量的消息,例如,對(duì)于一份訂單數(shù)據(jù),風(fēng)控系統(tǒng)、分析系統(tǒng)、支付系統(tǒng)等都需要接收消息。

          這個(gè)時(shí)候,單個(gè)隊(duì)列就滿足不了需求,一個(gè)可行的解決方式是,為每個(gè)消費(fèi)者創(chuàng)建一個(gè)單獨(dú)的隊(duì)列,讓生產(chǎn)者發(fā)送多份。但是同樣的一份消息數(shù)據(jù)被復(fù)制到多個(gè)隊(duì)列中會(huì)浪費(fèi)資源,更重要的是,生產(chǎn)者必須知道有多少個(gè)消費(fèi)者。為每個(gè)消費(fèi)者單獨(dú)發(fā)送一份消息,這實(shí)際上**違背了消息隊(duì)列“解耦”**這個(gè)設(shè)計(jì)初衷。

          為了解決這個(gè)問(wèn)題,演化出了另外一種消息模型:發(fā)布 - 訂閱模型(Publish-Subscribe Pattern)

          消息的發(fā)送方稱為發(fā)布者(Publisher),消息的接收方稱為訂閱者(Subscriber),服務(wù)端存放消息的容器稱為主題(Topic)。

          • 發(fā)布者將消息發(fā)送到主題中,訂閱者在接收消息之前需要先“訂閱主題”。
          • 每份訂閱中,訂閱者都可以接收到主題的所有消息。

          3、總結(jié):

          • 在很長(zhǎng)的一段時(shí)間,隊(duì)列模式和發(fā)布 - 訂閱模式是并存的。
          • 有些消息隊(duì)列同時(shí)支持這兩種消息模型,比如 ActiveMQ。
          • 對(duì)比這兩種模型,生產(chǎn)者就是發(fā)布者,消費(fèi)者就是訂閱者,隊(duì)列就是主題,并沒有本質(zhì)的區(qū)別。它們最大的區(qū)別是:一份消息數(shù)據(jù)能不能被消費(fèi)多次的問(wèn)題
          • 實(shí)際上,在這種發(fā)布 - 訂閱模型中,如果只有一個(gè)訂閱者,那它和隊(duì)列模型就基本是一樣的了。也就是說(shuō),發(fā)布 - 訂閱模型在功能層面上是可以兼容隊(duì)列模型的。

          二、RabbitMQ 的消息模型

          少數(shù)依然堅(jiān)持使用隊(duì)列模型的產(chǎn)品之一。

          RabbitMQ 使用 Exchange 模塊解決多個(gè)消費(fèi)者的問(wèn)題。Exchange 位于生產(chǎn)者和隊(duì)列之間,生產(chǎn)者并不關(guān)心將消息發(fā)送給哪個(gè)隊(duì)列,而是將消息發(fā)送給 Exchange,由 Exchange 上配置的策略來(lái)決定將消息投遞到哪些隊(duì)列中。

          • 同一份消息如果需要被多個(gè)消費(fèi)者來(lái)消費(fèi),需要配置 Exchange 將消息發(fā)送到多個(gè)隊(duì)列,每個(gè)隊(duì)列中都存放一份完整的消息數(shù)據(jù),可以為一個(gè)消費(fèi)者提供消費(fèi)服務(wù)。

          三、RocketMQ 的消息模型

          RocketMQ 使用的消息模型是標(biāo)準(zhǔn)的發(fā)布 - 訂閱模型。在 RocketMQ 也有隊(duì)列(Queue)這個(gè)概念。

          消息隊(duì)列的消費(fèi)機(jī)制:

          幾乎所有的消息隊(duì)列產(chǎn)品都使用一種非常樸素的“請(qǐng)求 - 確認(rèn)”機(jī)制,確保消息不會(huì)在傳遞過(guò)程中由于網(wǎng)絡(luò)或服務(wù)器故障丟失。

          在生產(chǎn)端,生產(chǎn)者先將消息發(fā)送給服務(wù)端,也就是 Broker,服務(wù)端在收到消息并將消息寫入主題或者隊(duì)列中后,會(huì)給生產(chǎn)者發(fā)送確認(rèn)的響應(yīng)。如果生產(chǎn)者沒有收到服務(wù)端的確認(rèn)或者收到失敗的響應(yīng),則會(huì)重新發(fā)送消息

          在消費(fèi)端,消費(fèi)者在收到消息并完成自己的消費(fèi)業(yè)務(wù)邏輯(比如,將數(shù)據(jù)保存到數(shù)據(jù)庫(kù)中)后,也會(huì)給服務(wù)端發(fā)送消費(fèi)成功的確認(rèn),服務(wù)端只有收到消費(fèi)確認(rèn)后,才認(rèn)為一條消息被成功消費(fèi),否則它會(huì)給消費(fèi)者重新發(fā)送這條消息,直到收到對(duì)應(yīng)的消費(fèi)成功確認(rèn)。

          這個(gè)確認(rèn)機(jī)制很好地保證了消息傳遞過(guò)程中的可靠性,但是,引入這個(gè)機(jī)制在消費(fèi)端帶來(lái)了一個(gè)問(wèn)題:為了確保消息的有序性,在某一條消息被成功消費(fèi)之前,下一條消息是不能被消費(fèi)的,也就是說(shuō),每個(gè)主題在任意時(shí)刻,至多只能有一個(gè)消費(fèi)者實(shí)例在進(jìn)行消費(fèi),那就沒法通過(guò)水平擴(kuò)展消費(fèi)者的數(shù)量來(lái)提升消費(fèi)端總體的消費(fèi)性能

          為了解決這個(gè)問(wèn)題,RocketMQ 在主題下面增加了隊(duì)列的概念:

          • 每個(gè)主題包含多個(gè)隊(duì)列,通過(guò)多個(gè)隊(duì)列來(lái)實(shí)現(xiàn)多實(shí)例并行生產(chǎn)和消費(fèi)。需要注意的是,RocketMQ 只在隊(duì)列上保證消息的有序性,主題層面是無(wú)法保證消息的嚴(yán)格順序的。
          • 生產(chǎn)者會(huì)往所有隊(duì)列發(fā)消息,但不是“同一條消息每個(gè)隊(duì)列都發(fā)一次”,每條消息只會(huì)往某個(gè)隊(duì)列里面發(fā)送一次。
          • 一個(gè)消費(fèi)組,每個(gè)隊(duì)列上只能串行消費(fèi),多個(gè)隊(duì)列加一起就是并行消費(fèi)了,并行度就是隊(duì)列數(shù)量,隊(duì)列數(shù)量越多并行度越大,所以水平擴(kuò)展可以提升消費(fèi)性能。
          • 每隊(duì)列每消費(fèi)組維護(hù)一個(gè)消費(fèi)位置(offset),記錄這個(gè)消費(fèi)組在這個(gè)隊(duì)列上消費(fèi)到哪兒了。
          • 訂閱者是通過(guò)消費(fèi)組(Consumer Group)來(lái)體現(xiàn)的。每個(gè)消費(fèi)組都消費(fèi)主題中一份完整的消息,不同消費(fèi)組之間消費(fèi)進(jìn)度彼此不受影響,也就是說(shuō),一條消息被 Consumer Group1 消費(fèi)過(guò),也會(huì)再給 Consumer Group2 消費(fèi)。
          • 消費(fèi)組中包含多個(gè)消費(fèi)者,同一個(gè)組內(nèi)的消費(fèi)者是競(jìng)爭(zhēng)消費(fèi)的關(guān)系,每個(gè)消費(fèi)者負(fù)責(zé)消費(fèi)組內(nèi)的一部分消息。如果一條消息被消費(fèi)者 Consumer1 消費(fèi)了,那同組的其他消費(fèi)者就不會(huì)再收到這條消息。
          • 由于消息需要被不同的組進(jìn)行多次消費(fèi),所以消費(fèi)完的消息并不會(huì)立即被刪除,這就需要 RocketMQ 為每個(gè)消費(fèi)組在每個(gè)隊(duì)列上維護(hù)一個(gè)消費(fèi)位置(Consumer Offset),這個(gè)位置之前的消息都被消費(fèi)過(guò),之后的消息都沒有被消費(fèi)過(guò),每成功消費(fèi)一條消息,消費(fèi)位置就加一。我們?cè)谑褂孟㈥?duì)列的時(shí)候**,丟消息的原因大多是由于消費(fèi)位置處理不當(dāng)導(dǎo)致的**。

          四、Kafka 的消息模型

          Kafka 的消息模型和 RocketMQ 是完全一樣的,唯一的區(qū)別是,在 Kafka 中,隊(duì)列這個(gè)概念的名稱不一樣,Kafka 中對(duì)應(yīng)的名稱是“分區(qū)(Partition)”,含義和功能是沒有任何區(qū)別的。

          五、總結(jié)

          • 常用的消息隊(duì)列中,RabbitMQ 采用的是隊(duì)列模型,但是它一樣可以實(shí)現(xiàn)發(fā)布 - 訂閱的功能。RocketMQ 和 Kafka 采用的是發(fā)布 - 訂閱模型,并且二者的消息模型是基本一致的。

          3.消息丟失怎么辦? 如何保證消息的可靠性傳輸?

          首先如何驗(yàn)證消息是否丟失?

          • 如果是 IT 基礎(chǔ)設(shè)施比較完善的公司,一般都有分布式鏈路追蹤系統(tǒng),使用類似的追蹤系統(tǒng)可以很方便地追蹤每一條消息。
          • 如果沒有這樣的追蹤系統(tǒng),我們可以利用消息隊(duì)列的有序性來(lái)驗(yàn)證是否有消息丟失

          即保證消息消費(fèi)順序的情況下,根據(jù)消息的序號(hào),在消費(fèi)段判斷是否連續(xù)

          解決方案:

          消息從生產(chǎn)到消費(fèi)的過(guò)程中,可以劃分三個(gè)階段:

          1、生產(chǎn)階段

          消息隊(duì)列通過(guò)最常用的請(qǐng)求確認(rèn)機(jī)制,來(lái)保證消息的可靠傳遞:當(dāng)你代碼調(diào)用發(fā)消息方法時(shí),消息隊(duì)列客戶端會(huì)把消息發(fā)送到Broker,Broker收到消息后,會(huì)給客戶端返回一個(gè)確認(rèn)響應(yīng),表明消息已收到??蛻舳耸盏巾憫?yīng)后,完成了一次正常消息的發(fā)送。

          有些消息隊(duì)列在長(zhǎng)時(shí)間沒收到發(fā)送確認(rèn)響應(yīng)后,會(huì)自動(dòng)重試,如果重試失敗,就會(huì)以返回值或者異常的方式告知用戶。在編寫發(fā)送消息的代碼時(shí),需要注意,正確處理返回值或者捕獲異常,就可以保證這個(gè)階段的消息不會(huì)丟失。

          同步發(fā)送時(shí),只要注意捕獲異常即可。

          異步發(fā)送時(shí),則需要在回調(diào)方法里進(jìn)行檢查。這個(gè)地方需要特別注意,很多丟消息的原因就是,我們使用了異步發(fā)送,卻沒有在回調(diào)中檢查發(fā)送結(jié)果。

          2、存儲(chǔ)階段

          在存儲(chǔ)階段正常情況下,只要Broker在正常運(yùn)行,就不會(huì)出現(xiàn)丟消息的問(wèn)題;但是如果Broker出現(xiàn)故障,比如進(jìn)程死掉或者服務(wù)器宕機(jī),還是可能會(huì)丟失消息的。

          如果對(duì)消息的可靠性要求非常高,可以通過(guò)配置Broker參數(shù)來(lái)避免因?yàn)殄礄C(jī)丟消息:

          • 對(duì)于單個(gè)節(jié)點(diǎn)的 Broker,需要配置 Broker 參數(shù),在收到消息后,將消息寫入磁盤后再給 Producer 返回確認(rèn)響應(yīng),這樣即使發(fā)生宕機(jī),由于消息已經(jīng)被寫入磁盤,就不會(huì)丟失消息,恢復(fù)后還可以繼續(xù)消費(fèi)。例如,在 RocketMQ 中,需要將刷盤方式 flushDiskType 配置為 SYNC_FLUSH 同步刷盤。
          • 對(duì)于 Broker 是由多個(gè)節(jié)點(diǎn)組成的集群,需要將 Broker 集群配置成:至少將消息發(fā)送到 2 個(gè)以上的節(jié)點(diǎn),再給客戶端回復(fù)發(fā)送確認(rèn)響應(yīng)。這樣當(dāng)某個(gè) Broker 宕機(jī)時(shí),其他的 Broker 可以替代宕機(jī)的 Broker,也不會(huì)發(fā)生消息丟失。

          3、消息階段

          消費(fèi)階段采用和生產(chǎn)階段類似的確認(rèn)機(jī)制來(lái)保證消息的可靠傳遞,客戶端從 Broker 拉取消息后,執(zhí)行用戶的消費(fèi)業(yè)務(wù)邏輯,成功后,才會(huì)給 Broker 發(fā)送消費(fèi)確認(rèn)響應(yīng)。如果 Broker 沒有收到消費(fèi)確認(rèn)響應(yīng),下次拉消息的時(shí)候還會(huì)返回同一條消息,確保消息不會(huì)在網(wǎng)絡(luò)傳輸過(guò)程中丟失,也不會(huì)因?yàn)榭蛻舳嗽趫?zhí)行消費(fèi)邏輯中出錯(cuò)導(dǎo)致丟失。

          在編寫消費(fèi)代碼時(shí)需要注意的是:不要在收到消息后就立即發(fā)送消費(fèi)確認(rèn),而是應(yīng)該在執(zhí)行完所有消費(fèi)業(yè)務(wù)邏輯之后,再發(fā)送消費(fèi)確認(rèn)。


          4.處理消費(fèi)過(guò)程中的重復(fù)消息

          在消息傳遞過(guò)程中,如果出現(xiàn)傳遞失敗的情況,發(fā)送方會(huì)執(zhí)行重試,重試過(guò)程中就有可能產(chǎn)生重復(fù)的消息。如果沒有對(duì)重復(fù)消息進(jìn)行處理,就可能導(dǎo)致系統(tǒng)的數(shù)據(jù)出現(xiàn)錯(cuò)誤。

          比如,一個(gè)消費(fèi)訂單消息,統(tǒng)計(jì)下單金額的微服務(wù),如果沒有正確處理重復(fù)消息,那就會(huì)出現(xiàn)重復(fù)統(tǒng)計(jì),導(dǎo)致統(tǒng)計(jì)結(jié)果錯(cuò)誤。

          一、消息重復(fù)的情況必然存在

          在MQTT協(xié)議中,給出了三種傳遞消息時(shí)能夠提供的服務(wù)質(zhì)量標(biāo)準(zhǔn):

          • At most once:至多一次。最多會(huì)被送達(dá)一次,也就是說(shuō)沒有消息可靠性保證,允許丟消息。一般都是一些對(duì)消息可靠性要求不高的監(jiān)控場(chǎng)景使用,比如每分鐘上報(bào)一次機(jī)房溫度數(shù)據(jù),可以接受數(shù)據(jù)少量丟失。
          • At least once:至少一次。至少會(huì)被送達(dá)一次,也就是說(shuō)不允許丟消息,但是允許有少量重復(fù)消息出現(xiàn)。
          • Exactly once:恰好一次。只會(huì)被送達(dá)一次,不允許丟失也不允許重復(fù),這個(gè)是最高等級(jí)。

          這個(gè)服務(wù)質(zhì)量標(biāo)準(zhǔn)不僅適用于 MQTT,對(duì)所有的消息隊(duì)列都是適用的。常用的絕大部分消息隊(duì)列提供的服務(wù)質(zhì)量都是 At least once,包括 RocketMQ、RabbitMQ 和 Kafka 。也就是說(shuō),消息隊(duì)列很難保證消息不重復(fù)。

          注意:Kafka 支持的“Exactly once”和我們剛剛提到的消息傳遞的服務(wù)質(zhì)量標(biāo)準(zhǔn)“Exactly once”是不一樣的,它是 Kafka 提供的另外一個(gè)特性,Kafka 中支持的事務(wù)也和我們通常意義理解的事務(wù)有一定的差異。在 Kafka 中,事務(wù)和 Excactly once 主要是為了配合流計(jì)算使用的特性。

          二、用冪等性解決重復(fù)消息問(wèn)題

          冪等本來(lái)是一個(gè)數(shù)學(xué)上的概念,它的定義是:如果一個(gè)函數(shù)f(x)滿足:f(f(x)) = f(x),則函數(shù)f(x)滿足米冪等性。擴(kuò)展到計(jì)算機(jī)領(lǐng)域,被用來(lái)描述一個(gè)操作、方法或者服務(wù)。

          • 一個(gè)冪等操作的特點(diǎn)是,其任意多次執(zhí)行所產(chǎn)生的影響均與一次執(zhí)行的影響相同。
          • 一個(gè)冪等方法,使用同樣的參數(shù),對(duì)它進(jìn)行多次調(diào)用和一次調(diào)用,對(duì)系統(tǒng)產(chǎn)生的影響是一樣的。所以不用擔(dān)心重復(fù)執(zhí)行會(huì)對(duì)系統(tǒng)造成任何改變。

          舉例:

          1、在不考慮并發(fā)的情況下,“將賬戶 X 的余額設(shè)置為 100 元”,執(zhí)行一次后對(duì)系統(tǒng)的影響是,賬戶 X 的余額變成了 100 元。只要提供的參數(shù) 100 元不變,那即使再執(zhí)行多少次,賬戶 X 的余額始終都是 100 元,不會(huì)變化,這個(gè)操作就是一個(gè)冪等的操作。

          2、“將賬戶 X 的余額加 100 元”,這個(gè)操作它就不是冪等的,每執(zhí)行一次,賬戶余額就會(huì)增加 100 元,執(zhí)行多次和執(zhí)行一次對(duì)系統(tǒng)的影響(也就是賬戶的余額)是不一樣的。

          如果消費(fèi)消息的業(yè)務(wù)邏輯具備冪等性,那就不用擔(dān)心消息重復(fù)的問(wèn)題,因?yàn)橥粭l消息,消費(fèi)一次和消費(fèi)多次對(duì)系統(tǒng)的影響是完全一樣的。消費(fèi)多次等于消費(fèi)一次。從對(duì)系統(tǒng)的影響結(jié)果來(lái)說(shuō):At least once + 冪等消費(fèi) = Exactly once。

          實(shí)現(xiàn)冪等操作最好的方式是,從業(yè)務(wù)邏輯設(shè)計(jì)上入手,將消費(fèi)的業(yè)務(wù)邏輯設(shè)計(jì)成具備冪等性的操作。

          常用的設(shè)計(jì)冪等操作的方法

          (1)利用數(shù)據(jù)庫(kù)的唯一約束實(shí)現(xiàn)冪等

          上面提到的那個(gè)不具備冪等特性的轉(zhuǎn)賬的例子:將賬戶 X 的余額加 100 元。在這個(gè)例子中,我們可以通過(guò)改造業(yè)務(wù)邏輯,讓它具備冪等性。

          首先,我們可以限定,對(duì)于每個(gè)轉(zhuǎn)賬單每個(gè)賬戶只可以執(zhí)行一次變更操作,在分布式系統(tǒng)中,這個(gè)限制實(shí)現(xiàn)的方法非常多,最簡(jiǎn)單的是我們?cè)跀?shù)據(jù)庫(kù)中建一張轉(zhuǎn)賬流水表,這個(gè)表有三個(gè)字段:轉(zhuǎn)賬單 ID、賬戶 ID 和變更金額,然后給轉(zhuǎn)賬單 ID 和賬戶 ID 這兩個(gè)字段聯(lián)合起來(lái)創(chuàng)建一個(gè)唯一約束,這樣對(duì)于相同的轉(zhuǎn)賬單 ID 和賬戶 ID,表里至多只能存在一條記錄。

          這樣,我們消費(fèi)消息的邏輯可以變?yōu)椋骸霸谵D(zhuǎn)賬流水表中增加一條轉(zhuǎn)賬記錄,然后再根據(jù)轉(zhuǎn)賬記錄,異步操作更新用戶余額即可?!痹谵D(zhuǎn)賬流水表增加一條轉(zhuǎn)賬記錄這個(gè)操作中,由于我們?cè)谶@個(gè)表中預(yù)先定義了“賬戶 ID 轉(zhuǎn)賬單 ID”的唯一約束,對(duì)于同一個(gè)轉(zhuǎn)賬單同一個(gè)賬戶只能插入一條記錄,后續(xù)重復(fù)的插入操作都會(huì)失敗,這樣就實(shí)現(xiàn)了一個(gè)冪等的操作。

          基于這個(gè)思路,不光是可以使用關(guān)系型數(shù)據(jù)庫(kù),只要是支持類似“INSERT IF NOT EXIST”語(yǔ)義的存儲(chǔ)類系統(tǒng)都可以用于實(shí)現(xiàn)冪等,比如,你可以用 Redis 的 SETNX 命令來(lái)替代數(shù)據(jù)庫(kù)中的唯一約束,來(lái)實(shí)現(xiàn)冪等消費(fèi)。

          (2)為更新的數(shù)據(jù)設(shè)置前置條件

          給數(shù)據(jù)變更設(shè)置一個(gè)前置條件,如果滿足條件就更新數(shù)據(jù),否則拒絕更新數(shù)據(jù),在更新數(shù)據(jù)的時(shí)候,同時(shí)變更前置條件中需要判斷的數(shù)據(jù)。這樣,重復(fù)執(zhí)行這個(gè)操作時(shí),由于第一次更新數(shù)據(jù)的時(shí)候已經(jīng)變更了前置條件中需要判斷的數(shù)據(jù),不滿足前置條件,則不會(huì)重復(fù)執(zhí)行更新數(shù)據(jù)操作。

          比如,“將賬戶 X 的余額增加 100 元”這個(gè)操作并不滿足冪等性,我們可以把這個(gè)操作加上一個(gè)前置條件,變?yōu)椋骸叭绻~戶 X 當(dāng)前的余額為 500 元,將余額加 100 元”,這個(gè)操作就具備了冪等性。對(duì)應(yīng)到消息隊(duì)列中的使用時(shí),可以在發(fā)消息時(shí)在消息體中帶上當(dāng)前的余額,在消費(fèi)的時(shí)候進(jìn)行判斷數(shù)據(jù)庫(kù)中,當(dāng)前余額是否與消息中的余額相等,只有相等才執(zhí)行變更操作。

          但是,如果我們要更新的數(shù)據(jù)不是數(shù)值,或者我們要做一個(gè)比較復(fù)雜的更新操作怎么辦?用什么作為前置判斷條件呢?更加通用的方法是,給你的數(shù)據(jù)增加一個(gè)版本號(hào)屬性,每次更數(shù)據(jù)前,比較當(dāng)前數(shù)據(jù)的版本號(hào)是否和消息中的版本號(hào)一致,如果不一致就拒絕更新數(shù)據(jù),更新數(shù)據(jù)的同時(shí)將版本號(hào) +1,一樣可以實(shí)現(xiàn)冪等更新。

          (3)記錄并檢查操作

          如果上面提到的兩種實(shí)現(xiàn)冪等方法都不能適用于你的場(chǎng)景,還有一種通用性最強(qiáng),適用范圍最廣的實(shí)現(xiàn)冪等性方法:記錄并檢查操作,也稱為“Token 機(jī)制或者 GUID(全局唯一 ID)機(jī)制”,實(shí)現(xiàn)的思路特別簡(jiǎn)單:在執(zhí)行數(shù)據(jù)更新操作之前,先檢查一下是否執(zhí)行過(guò)這個(gè)更新操作。這種方法適用范圍最廣,但是實(shí)現(xiàn)難度和復(fù)雜度也比較高,一般不推薦使用。

          具體的實(shí)現(xiàn)方法是,在發(fā)送消息時(shí),給每條消息指定一個(gè)全局唯一的 ID,消費(fèi)時(shí),先根據(jù)這個(gè) ID 檢查這條消息是否有被消費(fèi)過(guò),如果沒有消費(fèi)過(guò),才更新數(shù)據(jù),然后將消費(fèi)狀態(tài)置為已消費(fèi)

          在分布式系統(tǒng)中,這個(gè)方法其實(shí)是非常難實(shí)現(xiàn)的。首先,給每個(gè)消息指定一個(gè)全局唯一的 ID 就是一件不那么簡(jiǎn)單的事兒,方法有很多,但都不太好同時(shí)滿足簡(jiǎn)單、高可用和高性能,或多或少都要有些犧牲。更加麻煩的是,在“檢查消費(fèi)狀態(tài),然后更新數(shù)據(jù)并且設(shè)置消費(fèi)狀態(tài)”中,三個(gè)操作必須作為一組操作保證原子性,才能真正實(shí)現(xiàn)冪等,否則就會(huì)出現(xiàn) Bug。

          比如說(shuō),對(duì)于同一條消息:“全局 ID 為 8,操作為:給 ID 為 666 賬戶增加 100 元”,有可能出現(xiàn)這樣的情況:

          • t0 時(shí)刻:Consumer A 收到條消息,檢查消息執(zhí)行狀態(tài),發(fā)現(xiàn)消息未處理過(guò),開始執(zhí)行“賬戶增加 100 元”;
          • t1 時(shí)刻:Consumer B 收到條消息,檢查消息執(zhí)行狀態(tài),發(fā)現(xiàn)消息未處理過(guò),因?yàn)檫@個(gè)時(shí)刻,Consumer A 還未來(lái)得及更新消息執(zhí)行狀態(tài)。

          這樣就會(huì)導(dǎo)致賬戶被錯(cuò)誤地增加了兩次 100 元,這是一個(gè)在分布式系統(tǒng)中非常容易犯的錯(cuò)誤,一定要引以為戒。對(duì)于這個(gè)問(wèn)題,當(dāng)然我們可以用事務(wù)來(lái)實(shí)現(xiàn),也可以用鎖來(lái)實(shí)現(xiàn),但是在分布式系統(tǒng)中,無(wú)論是分布式事務(wù)還是分布式鎖都是比較難解決問(wèn)題。


          5.利用事務(wù)消息實(shí)現(xiàn)分布式事務(wù)

          一、消息事務(wù)

          其實(shí)很多場(chǎng)景下,我們“發(fā)消息”這個(gè)過(guò)程,目的往往是通知另外一個(gè)系統(tǒng)或者模塊去更新數(shù)據(jù),消息隊(duì)列中的“事務(wù)”,主要解決消息生產(chǎn)者和消息消費(fèi)者的數(shù)據(jù)一致性問(wèn)題。

          用戶在電商APP上購(gòu)物時(shí),先把商品加到購(gòu)物車?yán)?,然后幾件商品一起下單,最后支付,完成?gòu)物流程。

          這個(gè)過(guò)程中有一個(gè)需要用到消息隊(duì)列的步驟,訂單系統(tǒng)創(chuàng)建訂單后,發(fā)消息給購(gòu)物車系統(tǒng),將已下單的商品從購(gòu)物車中刪除。因?yàn)閺馁?gòu)物車刪除已下單商品這個(gè)步驟,并不是用戶下單支付這個(gè)主要流程中必要的步驟,使用消息隊(duì)列來(lái)異步清理購(gòu)物車是更加合理。

          對(duì)于訂單系統(tǒng),它創(chuàng)建訂單的過(guò)程實(shí)際執(zhí)行了2個(gè)步驟的操作:

          • 在訂單庫(kù)中插入一條訂單數(shù)據(jù),創(chuàng)建訂單;
          • 發(fā)消息給消息隊(duì)列,消息的內(nèi)容就是剛剛創(chuàng)建的訂單

          對(duì)于購(gòu)物車系統(tǒng):

          • 訂閱相應(yīng)的主題,接收訂單創(chuàng)建的消息,然后清理購(gòu)物車,在購(gòu)物車中刪除訂單的商品。

          在分布式系統(tǒng)中,上面提到的步驟,任何一個(gè)都有可能失敗,如果不做任何處理,那就有可能出現(xiàn)訂單數(shù)據(jù)與購(gòu)物車數(shù)據(jù)不一致的情況,比如:

          • 創(chuàng)建了訂單,沒有清理購(gòu)物車;
          • 訂單沒創(chuàng)建成功,購(gòu)物車?yán)锩娴纳唐穮s被清掉了。

          所以我們需要解決的問(wèn)題為:在上述任意步驟都有可能失敗的情況下,還要保證訂單庫(kù)和購(gòu)物車庫(kù)這兩個(gè)庫(kù)的數(shù)據(jù)一致性。

          二、分布式事務(wù)

          分布式事務(wù)就是要在分布式系統(tǒng)中實(shí)現(xiàn)事務(wù)。在分布式系統(tǒng)中,在保證可用性和不嚴(yán)重犧牲性能的前提下,光是要實(shí)現(xiàn)數(shù)據(jù)的一致性就已經(jīng)非常困難了,顯然實(shí)現(xiàn)嚴(yán)格的分布式事務(wù)是更加不可能完成的任務(wù)。所以目前大家所說(shuō)的分布式事務(wù),更多情況下,是在分布式系統(tǒng)中事務(wù)的不完整實(shí)現(xiàn),在不同的應(yīng)用場(chǎng)景中,有不同的實(shí)現(xiàn),目的都是通過(guò)一些妥協(xié)來(lái)解決實(shí)際問(wèn)題。

          常見的分布式事務(wù)實(shí)現(xiàn):

          • 2PC(Two-phase Commit,也叫二階段提交)
          • TCC(Try-Confirm-Cancel)
          • 事務(wù)消息

          每一種實(shí)現(xiàn)都有其特定的使用場(chǎng)景,也有各自的問(wèn)題,都不是完美的解決方案。

          事務(wù)消息適用的場(chǎng)景主要是那些需要異步更新數(shù)據(jù),并且對(duì)數(shù)據(jù)實(shí)時(shí)性要求不太高的場(chǎng)景。比如在創(chuàng)建訂單后,如果出現(xiàn)短暫的幾秒,購(gòu)物車?yán)锏纳唐窙]有被及時(shí)情況,也不是完全不可接受的,只要最終購(gòu)物車的數(shù)據(jù)和訂單數(shù)據(jù)保持一致就可。

          三、消息隊(duì)列實(shí)現(xiàn)分布式事務(wù)

          事務(wù)消息需要消息隊(duì)列提供相應(yīng)的功能才能實(shí)現(xiàn),kafka和RocketMQ都提供了事務(wù)相關(guān)功能。

          對(duì)于訂單系統(tǒng):

          • 首先,訂單系統(tǒng)在消息隊(duì)列上開啟一個(gè)事務(wù)。
          • 然后訂單系統(tǒng)給消息服務(wù)器發(fā)送一個(gè)“半消息”,這個(gè)半消息不是說(shuō)消息內(nèi)容不完整,它包含的內(nèi)容就是完整的消息內(nèi)容,半消息和普通消息的唯一區(qū)別是,在事務(wù)提交之前,對(duì)于消費(fèi)者來(lái)說(shuō),這個(gè)消息是不可見的。
          • 半消息發(fā)送成功后,訂單系統(tǒng)就可以執(zhí)行本地事務(wù)了,在訂單庫(kù)中創(chuàng)建一條訂單記錄,并提交訂單庫(kù)的數(shù)據(jù)庫(kù)事務(wù)。
          • 然后根據(jù)本地事務(wù)的執(zhí)行結(jié)果決定提交或者回滾事務(wù)消息。如果訂單創(chuàng)建成功,那就提交事務(wù)消息,購(gòu)物車系統(tǒng)就可以消費(fèi)到這條消息繼續(xù)后續(xù)的流程。如果訂單創(chuàng)建失敗,那就回滾事務(wù)消息,購(gòu)物車系統(tǒng)就不會(huì)收到這條消息。這樣就基本實(shí)現(xiàn)了“要么都成功,要么都失敗”的一致性要求。

          對(duì)于購(gòu)物車系統(tǒng):

          • 對(duì)于購(gòu)物車系統(tǒng)收到訂單創(chuàng)建成功消息清理購(gòu)物車這個(gè)操作來(lái)說(shuō),失敗的處理比較簡(jiǎn)單,只要成功執(zhí)行購(gòu)物車清理后再提交消費(fèi)確認(rèn)即可,如果失敗,由于沒有提交消費(fèi)確認(rèn),消息隊(duì)列會(huì)自動(dòng)重試

          如果在第四步提交事務(wù)消息時(shí)失敗了怎么辦?Kafka 和 RocketMQ 給出了 2 種不同的解決方案:

          1、Kafka 的解決方案:

          直接拋出異常,讓用戶自行處理。我們可以在業(yè)務(wù)代碼中反復(fù)重試提交,直到提交成功,或者刪除之前創(chuàng)建的訂單進(jìn)行補(bǔ)償。

          2、RocketMQ 的解決方案:

          在 RocketMQ 中的事務(wù)實(shí)現(xiàn)中,增加了事務(wù)反查的機(jī)制來(lái)解決事務(wù)消息提交失敗的問(wèn)題。如果 Producer 也就是訂單系統(tǒng),在提交或者回滾事務(wù)消息時(shí)發(fā)生網(wǎng)絡(luò)異常,RocketMQ 的 Broker 沒有收到提交或者回滾的請(qǐng)求,Broker 會(huì)定期去 Producer 上反查這個(gè)事務(wù)對(duì)應(yīng)的本地事務(wù)的狀態(tài),然后根據(jù)反查結(jié)果決定提交或者回滾這個(gè)事務(wù)。為了支撐這個(gè)事務(wù)反查機(jī)制,我們的業(yè)務(wù)代碼需要實(shí)現(xiàn)一個(gè)反查本地事務(wù)狀態(tài)的接口,告知 RocketMQ 本地事務(wù)是成功還是失敗。

          綜合上面講的通用事務(wù)消息的實(shí)現(xiàn)和 RocketMQ 的事務(wù)反查機(jī)制,使用 RocketMQ 事務(wù)消息功能實(shí)現(xiàn)分布式事務(wù)的流程如下圖:


          6.消息隊(duì)列中的順序問(wèn)題

          當(dāng)我們說(shuō)順序時(shí),我們?cè)谡f(shuō)什么?

          日常思維中,順序大部分情況會(huì)和時(shí)間關(guān)聯(lián)起來(lái),即時(shí)間的先后表示事件的順序關(guān)系。

          比如事件A發(fā)生在下午3點(diǎn)一刻,而事件B發(fā)生在下午4點(diǎn),那么我們認(rèn)為事件A發(fā)生在事件B之前,他們的順序關(guān)系為先A后B。

          上面的例子之所以成立是因?yàn)樗麄冇邢嗤膮⒖枷担此麄兊臅r(shí)間是對(duì)應(yīng)的同一個(gè)物理時(shí)鐘的時(shí)間。如果A發(fā)生的時(shí)間是北京時(shí)間,而B依賴的時(shí)間是東京時(shí)間,那么先A后B的順序關(guān)系還成立嗎?

          如果沒有一個(gè)絕對(duì)的時(shí)間參考,那么A和B之間還有順序嗎,或者說(shuō)怎么斷定A和B的順序?

          顯而易見的,如果A、B兩個(gè)事件之間如果是有因果關(guān)系的,那么A一定發(fā)生在B之前(前因后果,有因才有果)。相反,在沒有一個(gè)絕對(duì)的時(shí)間的參考的情況下,若A、B之間沒有因果關(guān)系,那么A、B之間就沒有順序關(guān)系。

          那么,我們?cè)谡f(shuō)順序時(shí),其實(shí)說(shuō)的是:

          • 有絕對(duì)時(shí)間參考的情況下,事件的發(fā)生時(shí)間的關(guān)系;
          • 和沒有時(shí)間參考下的,一種由因果關(guān)系推斷出來(lái)的happening before的關(guān)系;

          在分布式環(huán)境中討論順序

          當(dāng)把順序放到分布式環(huán)境(多線程、多進(jìn)程都可以認(rèn)為是一個(gè)分布式的環(huán)境)中去討論時(shí):

          • 同一線程上的事件順序是確定的,可以認(rèn)為他們有相同的時(shí)間作為參考
          • 不同線程間的順序只能通過(guò)因果關(guān)系去推斷

          (點(diǎn)表示事件,波浪線箭頭表示事件間的消息)

          上圖中,進(jìn)程P中的事件順序?yàn)閜1->p2->p3->p4(時(shí)間推斷)。而因?yàn)閜1給進(jìn)程Q的q2發(fā)了消息,那么p1一定在q2之前(因果推斷)。但是無(wú)法確定p1和q1之間的順序關(guān)系。

          推薦閱讀《Time, Clocks, and the Ordering of Events in a Distributed System》,會(huì)透徹的分析分布式系統(tǒng)中的順序問(wèn)題。

          消息中間件中的順序消息

          什么是順序消息

          有了上述的基礎(chǔ)之后,我們回到本篇文章的主題中,聊一聊消息中間件中的順序消息。

          順序消息(FIFO 消息)是 MQ 提供的一種嚴(yán)格按照順序進(jìn)行發(fā)布和消費(fèi)的消息類型。順序消息由兩個(gè)部分組成:順序發(fā)布和順序消費(fèi)。

          順序消息包含兩種類型:

          分區(qū)順序:一個(gè)Partition內(nèi)所有的消息按照先進(jìn)先出的順序進(jìn)行發(fā)布和消費(fèi)

          全局順序:一個(gè)Topic內(nèi)所有的消息按照先進(jìn)先出的順序進(jìn)行發(fā)布和消費(fèi)

          這是阿里云上對(duì)順序消息的定義,把順序消息拆分成了順序發(fā)布和順序消費(fèi)。那么多線程中發(fā)送消息算不算順序發(fā)布?

          如上一部分介紹的,多線程中若沒有因果關(guān)系則沒有順序。那么用戶在多線程中去發(fā)消息就意味著用戶不關(guān)心那些在不同線程中被發(fā)送的消息的順序。即多線程發(fā)送的消息,不同線程間的消息不是順序發(fā)布的,同一線程的消息是順序發(fā)布的。這是需要用戶自己去保障的。

          而對(duì)于順序消費(fèi),則需要保證哪些來(lái)自同一個(gè)發(fā)送線程的消息在消費(fèi)時(shí)是按照相同的順序被處理的(為什么不說(shuō)他們應(yīng)該在一個(gè)線程中被消費(fèi)呢?)。

          全局順序其實(shí)是分區(qū)順序的一個(gè)特例,即使Topic只有一個(gè)分區(qū)(以下不在討論全局順序,因?yàn)槿猪樞驅(qū)⒚媾R性能的問(wèn)題,而且絕大多數(shù)場(chǎng)景都不需要全局順序)。

          如何保證順序

          在MQ的模型中,順序需要由3個(gè)階段去保障:

          1. 消息被發(fā)送時(shí)保持順序
          2. 消息被存儲(chǔ)時(shí)保持和發(fā)送的順序一致
          3. 消息被消費(fèi)時(shí)保持和存儲(chǔ)的順序一致

          發(fā)送時(shí)保持順序意味著對(duì)于有順序要求的消息,用戶應(yīng)該在同一個(gè)線程中采用同步的方式發(fā)送。存儲(chǔ)保持和發(fā)送的順序一致則要求在同一線程中被發(fā)送出來(lái)的消息A和B,存儲(chǔ)時(shí)在空間上A一定在B之前。而消費(fèi)保持和存儲(chǔ)一致則要求消息A、B到達(dá)Consumer之后必須按照先A后B的順序被處理。

          如下圖所示:

          對(duì)于兩個(gè)訂單的消息的原始數(shù)據(jù):a1、b1、b2、a2、a3、b3(絕對(duì)時(shí)間下發(fā)生的順序):

          • 在發(fā)送時(shí),a訂單的消息需要保持a1、a2、a3的順序,b訂單的消息也相同,但是a、b訂單之間的消息沒有順序關(guān)系,這意味著a、b訂單的消息可以在不同的線程中被發(fā)送出去

          • 在存儲(chǔ)時(shí),需要分別保證a、b訂單的消息的順序,但是a、b訂單之間的消息的順序可以不保證


            • a1、b1、b2、a2、a3、b3是可以接受的
            • a1、a2、b1、b2、a3、b3也是可以接受的
            • a1、a3、b1、b2、a2、b3是不能接受的
          • 消費(fèi)時(shí)保證順序的簡(jiǎn)單方式就是“什么都不做”,不對(duì)收到的消息的順序進(jìn)行調(diào)整,即只要一個(gè)分區(qū)的消息只由一個(gè)線程處理即可;當(dāng)然,如果a、b在一個(gè)分區(qū)中,在收到消息后也可以將他們拆分到不同線程中處理,不過(guò)要權(quán)衡一下收益

          開源RocketMQ中順序的實(shí)現(xiàn)

          上圖是RocketMQ順序消息原理的介紹,將不同訂單的消息路由到不同的分區(qū)中。文檔只是給出了Producer順序的處理,Consumer消費(fèi)時(shí)通過(guò)一個(gè)分區(qū)只能有一個(gè)線程消費(fèi)的方式來(lái)保證消息順序,具體實(shí)現(xiàn)如下。

          Producer端

          Producer端確保消息順序唯一要做的事情就是將消息路由到特定的分區(qū),在RocketMQ中,通過(guò)MessageQueueSelector來(lái)實(shí)現(xiàn)分區(qū)的選擇。

          • Listmqs:消息要發(fā)送的Topic下所有的分區(qū)
          • Message msg:消息對(duì)象
          • 額外的參數(shù):用戶可以傳遞自己的參數(shù)

          比如如下實(shí)現(xiàn)就可以保證相同的訂單的消息被路由到相同的分區(qū):

          long orderId = ((Order) object).getOrderId;return mqs.get(orderId % mqs.size());

          Consumer端

          RocketMQ消費(fèi)端有兩種類型:MQPullConsumer和MQPushConsumer。

          MQPullConsumer由用戶控制線程,主動(dòng)從服務(wù)端獲取消息,每次獲取到的是一個(gè)MessageQueue中的消息。PullResult中的List msgFoundList自然和存儲(chǔ)順序一致,用戶需要再拿到這批消息后自己保證消費(fèi)的順序。

          對(duì)于PushConsumer,由用戶注冊(cè)MessageListener來(lái)消費(fèi)消息,在客戶端中需要保證調(diào)用MessageListener時(shí)消息的順序性。RocketMQ中的實(shí)現(xiàn)如下:

          1. PullMessageService單線程的從Broker獲取消息
          2. PullMessageService將消息添加到ProcessQueue中(ProcessMessage是一個(gè)消息的緩存),之后提交一個(gè)消費(fèi)任務(wù)到ConsumeMessageOrderService
          3. ConsumeMessageOrderService多線程執(zhí)行,每個(gè)線程在消費(fèi)消息時(shí)需要拿到MessageQueue的鎖
          4. 拿到鎖之后從ProcessQueue中獲取消息

          保證消費(fèi)順序的核心思想是:

          • 獲取到消息后添加到ProcessQueue中,單線程執(zhí)行,所以ProcessQueue中的消息是順序的
          • 提交的消費(fèi)任務(wù)時(shí)提交的是“對(duì)某個(gè)MQ進(jìn)行一次消費(fèi)”,這次消費(fèi)請(qǐng)求是從ProcessQueue中獲取消息消費(fèi),所以也是順序的(無(wú)論哪個(gè)線程獲取到鎖,都是按照ProcessQueue中消息的順序進(jìn)行消費(fèi))

          順序和異常的關(guān)系

          順序消息需要Producer和Consumer都保證順序。Producer需要保證消息被路由到正確的分區(qū),消息需要保證每個(gè)分區(qū)的數(shù)據(jù)只有一個(gè)線程消息,那么就會(huì)有一些缺陷:

          • 發(fā)送順序消息無(wú)法利用集群的Failover特性,因?yàn)椴荒芨鼡QMessageQueue進(jìn)行重試
          • 因?yàn)榘l(fā)送的路由策略導(dǎo)致的熱點(diǎn)問(wèn)題,可能某一些MessageQueue的數(shù)據(jù)量特別大
          • 消費(fèi)的并行讀依賴于分區(qū)數(shù)量
          • 消費(fèi)失敗時(shí)無(wú)法跳過(guò)

          不能更換MessageQueue重試就需要MessageQueue有自己的副本,通過(guò)Raft、Paxos之類的算法保證有可用的副本,或者通過(guò)其他高可用的存儲(chǔ)設(shè)備來(lái)存儲(chǔ)MessageQueue。

          熱點(diǎn)問(wèn)題好像沒有什么好的解決辦法,只能通過(guò)拆分MessageQueue和優(yōu)化路由方法來(lái)盡量均衡的將消息分配到不同的MessageQueue。

          消費(fèi)并行度理論上不會(huì)有太大問(wèn)題,因?yàn)镸essageQueue的數(shù)量可以調(diào)整。

          消費(fèi)失敗的無(wú)法跳過(guò)是不可避免的,因?yàn)樘^(guò)可能導(dǎo)致后續(xù)的數(shù)據(jù)處理都是錯(cuò)誤的。不過(guò)可以提供一些策略,由用戶根據(jù)錯(cuò)誤類型來(lái)決定是否跳過(guò),并且提供重試隊(duì)列之類的功能,在跳過(guò)之后用戶可以在“其他”地方重新消費(fèi)到這條消息。


          鳴謝

          感謝極客時(shí)間所屬的《消息隊(duì)列高手課鏈接[1]


          最后

          本篇是一篇大合集,中間肯定參考了許多其他人的文章內(nèi)容或圖片,但由于時(shí)間比較久遠(yuǎn),當(dāng)時(shí)并沒有一一記錄,為此表示歉意,如果有作者發(fā)現(xiàn)了自己的文章或圖片,可以私聊我,我會(huì)進(jìn)行補(bǔ)充。

          如果你發(fā)現(xiàn)寫的還不錯(cuò),可以搜索公眾號(hào)「是Kerwin啊」,一起進(jìn)步!

          也可以查看Kerwin的GitHub主頁(yè)[2]

          參考資料

          [1]

          鏈接: https://time.geekbang.org/column/intro/100032301

          [2]

          Kerwin的GitHub主頁(yè): https://github.com/kkzhilu



          往期推薦



          哇,ElasticSearch多字段權(quán)重排序居然可以這么玩

          微服務(wù)的戰(zhàn)爭(zhēng):按什么維度拆分服務(wù)

          一款直擊痛點(diǎn)的優(yōu)秀 HTTP 框架,讓我超高效完成了第三方接口的對(duì)接

          給你的SpringBoot做埋點(diǎn)監(jiān)控--JVM應(yīng)用度量框架Micrometer


          后臺(tái)回復(fù)?學(xué)習(xí)資料?領(lǐng)取學(xué)習(xí)視頻


          如有收獲,點(diǎn)個(gè)在看,誠(chéng)摯感謝

          瀏覽 48
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  亚洲AV成人影视网 | 大香蕉人妻在线 | 国产豆花在线综合 | 中文字幕日本精品5 | 黄片免费看视频 |