<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          RocketMq架構簡析

          共 6890字,需瀏覽 14分鐘

           ·

          2021-01-05 16:38

          Apache RocketMQ是阿里開源的一款高性能、高吞吐量的分布式消息中間件。

          整體結構


          RocketMQ主要由 Producer、Broker、Consumer 三部分組成,其中Producer 負責生產(chǎn)消息,Consumer 負責消費消息,Broker 負責存儲消息。每個 Broker 可以存儲多個Topic的消息,每個Topic的消息也可以分片存儲于集群中的不同的Broker Group。


          Namesrv


          說道Namesrv首先會想到服務注冊與發(fā)現(xiàn)。分布式服務SOA架構體系中會有服務注冊與發(fā)現(xiàn)中心。主要作用是指導服務調(diào)用方找到服務提供者提供的服務實例。RocketMQ體系中Namesrv主要作用是:為producer和consumer提供關于topic的路由信息。管理broker節(jié)點:監(jiān)控更新broker的實時狀態(tài)。路由注冊、路由刪除(故障剔除)。


          Namesrv充當路由消息的提供者。Namesrv是一個幾乎無狀態(tài)節(jié)點,多個Namesrv實例組成集群,但相互獨立,沒有信息交換。


          1. 路由元信息
          • topicQueueTable:topic 消息隊列路由信息。
          • brokerAddrTable:broker基礎信息。包含broker name,所屬集群名稱,主broker地址等。
          • clusterAddrTable:broker集群信息,存儲集群中所有broker的名稱。
          • brokerLiveTable:broker狀態(tài)信息。
          • filterServerTable:broker上的filterServer列表。filterServer用于消息過濾。
          1. 路由注冊??RocketMQ路由注冊是通過broker與Namesrv的心跳功能實現(xiàn)的。broker啟動時向集群中所有Namesrv發(fā)送心跳包,之后每隔30秒向集群中所有Namesrv發(fā)送心跳包。心跳包中包含:broker集群信息、broker信息、topic配置信息、broker關聯(lián)的FilterServer列表等。如果brokerA為Master。并且brokerA上的topic1的配置信息發(fā)生變化或初次注冊,Namesrv會根據(jù)報文創(chuàng)建或更新Topic路由元數(shù)據(jù),填充topicQueueTable。
          2. 路由刪除??Namesrv收到brokerA的心跳包會更新brokerLiveTable中的brokerA對應的BrokerLiveInfo中的lastUpdateTimestamp。Namesrv每隔10秒掃描brokerLiveTable一次。如果brokerA對應的BrokerLiveInfo 中 lastUpdateTimestamp距當前時間超過 120秒,Namesrv認為brokerA失效,會將brokerA的路由信息移除并關閉與broker的socket連接。更新:topicQueueInfo、brokerAddrTable、brokerLiveTable、filterServerTable等。
          3. 路由發(fā)現(xiàn)??RocketMQ路由發(fā)現(xiàn)是非實時的。當Topic路由信息發(fā)生變化是,Namesrv不會主動推送給客戶端(Producer、Consumer)。而是由客戶端定時到Namesrv拉去最新的路由信息并緩存(包含Topic路由信息)。


          與kafka對比
          kafka 由zookeeper集群提供命名服務(Naming Service)。
          Kafka通過 ZooKeeper 管理集群配置、選舉 Leader 以及在 consumer g


          Broker

          消息中轉(zhuǎn)角色,負責存儲消息、轉(zhuǎn)發(fā)消息。代理服務器在RocketMQ系統(tǒng)中負責接收從生產(chǎn)者發(fā)送來的消息并存儲、同時為消費者的拉取請求作準備。代理服務器也存儲消息相關的元數(shù)據(jù),包括消費者組、消費進度偏移和主題和隊列消息等。
          Broker是以group為單位提供服務。一個group里面分Master和Slave。Master和Slave存儲的數(shù)據(jù)一樣,slave從master同步數(shù)據(jù)(同步雙寫或異步復制看配置)。一個Master可以對應多個Slave,一個Slave只能對應一個Master。Master與Slave的對應關系通過指定相同的BrokerName、不同的BrokerId來定義,BrokerId為0表示Master,非0表示Slave。Master也可以部署多個。broker不必須是物理機或虛擬機:


          每個Broker與Namesrv集群中的所有節(jié)點建立長連接,定時發(fā)送心跳包到所有Namesrv,更新broker信息、topic路由信息等。一個Topic的不同queue(分區(qū))可分布到集群中不同的broker group上。


          與kafka對比:
          kafka和RocketMQ的broker都可以容納多個一個或多個分區(qū)數(shù)據(jù)(kafka分區(qū):partition;RocketMQ分區(qū):queue)
          kafka基于partition(分區(qū)) 做備份/高可用(partition follower)。
          RocketMQ增加了broker group的概念,基于broker(可能包含多個分區(qū))


          Producer

          (消息)生產(chǎn)者。Producer與Namesrv集群中的其中一個節(jié)點(隨機選擇)建立長連接,定期從Name Server取Topic路由信息,并向提供Topic服務的broker master建立長連接,且定時向broker master發(fā)送心跳。Producer完全無狀態(tài),可集群部署。
          Producer負責生產(chǎn)消息,一般由業(yè)務系統(tǒng)負責生產(chǎn)消息。一個消息生產(chǎn)者會把業(yè)務應用系統(tǒng)里產(chǎn)生的消息發(fā)送到broker服務器。RocketMQ提供多種發(fā)送方式,同步發(fā)送、異步發(fā)送、順序發(fā)送、單向發(fā)送。同步和異步方式均需要Broker返回確認信息,單向發(fā)送不需要。


          Consumer

          (消息)消費者 Consumer與Namesrv集群中的其中一個節(jié)點(隨機選擇)建立長連接,定期從Name Server取Topic路由信息,并向提供Topic服務的Master、Slave建立長連接,且定時向Master、Slave發(fā)送心跳。Consumer既可以從Master訂閱消息,也可以從Slave訂閱消息,訂閱規(guī)則由Broker配置決定。
          Consumer負責消費消息,一般是后臺系統(tǒng)負責異步消費。一個消息消費者會從Broker服務器拉取消息、并將其提供給應用程序。從用戶應用的角度而言提供了兩種消費形式:拉取式消費、推動式消費。

          集群模式下:相同Consumer Group的每個Consumer實例平均分攤消息。一個條消息僅能被一個Consumer Group消費一次。

          Producer、Consumer都只需要和集群中一個Namesrv建立長連接。Broker需要向集群中所有的Namesrv發(fā)送心跳包。
          其實很好理解:
          Namesrv集群提供高可用的命名服務。
          Producer、Consumer只需要從其中一臺定期同步路由信息。
          如果Broker只隨機調(diào)一臺發(fā)送心跳包。那么不同的Namesrv保存的路由信息會出現(xiàn)


          消費者類型:


          1. 拉取式消費(Pull Consumer) Consumer消費的一種類型,應用通常主動調(diào)用Consumer的拉消息方法從Broker服務器拉消息、主動權由應用控制。一旦獲取了批量消息,應用就會啟動消費過程。Pull方式里,取消息的過程需要用戶自己寫(包括提交offset等操作)。


          2. 推動式消費(Push Consumer) Consumer消費的一種類型,該模式下Broker收到數(shù)據(jù)后會主動推送給消費端,該消費模式一般實時性較高。Push Consumer原理上也是采取pull模式。實際上就是長輪詢的pull模式。



          一些概念


          1. 主題(Topic) 表示一類消息的集合,每個主題包含若干條消息,每條消息只能屬于一個主題,是RocketMQ進行消息訂閱的基本單位。每個topic可分為若干個分區(qū)(queue)

          2. 生產(chǎn)者組(Producer Group) 同一類Producer的集合,這類Producer發(fā)送同一類消息且發(fā)送邏輯一致。如果發(fā)送的是事務消息且原始生產(chǎn)者在發(fā)送之后崩潰,則Broker服務器會聯(lián)系同一生產(chǎn)者組的其他生產(chǎn)者實例以提交或回溯消費。

          3. 消費者組(Consumer Group) 同一類Consumer的集合,這類Consumer通常消費同一類消息且消費邏輯一致。消費者組使得在消息消費方面,實現(xiàn)負載均衡和容錯的目標變得非常容易。要注意的是,消費者組的消費者實例必須訂閱完全相同的Topic。RocketMQ 支持兩種消息模式:集群消費(Clustering)和廣播消費(Broadcasting)。

          4. 普通順序消息(Normal Ordered Message) 普通順序消費模式下,消費者通過同一個消費隊列收到的消息是有順序的,不同消息隊列收到的消息則可能是無順序的。

          5. 嚴格順序消息(Strictly Ordered Message) 嚴格順序消息模式下,消費者收到的所有消息均是有順序的。

          6. 消息(Message) 消息系統(tǒng)所傳輸信息的物理載體,生產(chǎn)和消費數(shù)據(jù)的最小單位,每條消息必須屬于一個主題。RocketMQ中每個消息擁有唯一的Message ID,且可以攜帶具有業(yè)務標識的Key。系統(tǒng)提供了通過Message ID和Key查詢消息的功能。

          7. 標簽(Tag) 為消息設置的標志,用于同一主題下區(qū)分不同類型的消息。來自同一業(yè)務單元的消息,可以根據(jù)不同業(yè)務目的在同一主題下設置不同標簽。標簽能夠有效地保持代碼的清晰度和連貫性,并優(yōu)化RocketMQ提供的查詢系統(tǒng)。消費者可以根據(jù)Tag實現(xiàn)對不同子主題的不同消費邏輯,實現(xiàn)更好的擴展性。


          關于消息中間件

          消息中間件需要解決的問題:異步化、削峰填谷。

          消息中間件應具備的基礎能力是:消息發(fā)布、訂閱、消費。概念相對簡單這里不過多描述。

          消息中間件的一些重要的機制:


          1. 消息優(yōu)先級(Message Priority;RocketMQ不支持)

          優(yōu)先級是指在一個消息隊列中,每條消息都有不同的優(yōu)先級,一般用整數(shù)來描述,優(yōu)先級高的消息先投遞,如果消息完全在一個內(nèi)存隊列中,那么在投遞前可以按照優(yōu)先級排序,令優(yōu)先級高的先投遞。由于RocketMQ所有消息都是持久化的,所以如果按照優(yōu)先級來排序,開銷會非常大,因此RocketMQ沒有特意支持消息優(yōu)先級,但是可以通過變通的方式實現(xiàn)類似功能,即單獨配置一個優(yōu)先級高的隊列,和一個普通優(yōu)先級的隊列,將不同優(yōu)先級發(fā)送到不同隊列即可。


          2. 順序消息(Message Order)

          消息有序指的是一類消息消費時,能按照發(fā)送的順序來消費。例如:一個訂單產(chǎn)生了3條消息,分別是訂單創(chuàng)建,訂單付款,訂單完成。消費時,要按照這個順序消費才能有意義。但是同時訂單之間是可以并行消費的。RocketMQ可以嚴格的保證消息有序。


          • 投遞消息的順序性:投遞消息的順序性可通過將一組消息投遞到同一分區(qū)實現(xiàn)。例如:借助MessageQueueSelector將對相同訂單的操作消息投放到同一分區(qū)。

          • 消費消息的順序性:RoctetMQ特性保障:特定分區(qū)(queue)中的消息不能同時被同一個消費者組中的多個Consumer消費,以避免重復消費。通過自定義或使用預置的AllocateQueueStrategy可設定分區(qū)的分配策略(哪些分區(qū)分配給哪個消費者消費)。


          3. 高可用、消息可靠性


          3.1 消息持久化

          RocketMQ、Kafka 以文件記錄形式持久化。

          RocketMQ采用了單一的日志文件,即把同1個broker上面所有topic的所有queue的消息,存放在一個文件里面,從而避免了隨機的磁盤寫入。

          如上圖所示,所有消息都存在一個單一的CommitLog文件里面,然后有后臺線程異步的同步到ConsumeQueue,再由Consumer進行消費。


          TODO 同步、異步刷盤。

          TODO RocketMQ充分利用Linux文件系統(tǒng)內(nèi)存cache來提高性能。

          TODO CommitLog index Commitlog segment的大小與頁緩存一致

          RocketMQ消息存儲機制會在后面的文章詳細說明。


          3.2 broker master/salve


          TODO broker group master/salve

          TODO Async/Sync Master;


          4. 高并發(fā)、可擴展 ==> 分布式


          提高并發(fā)效率 => 提高生產(chǎn)、消費并行度=>提高分區(qū)數(shù)量

          RocketMQ、kafka都支持topic數(shù)據(jù)分區(qū)存放、動態(tài)擴展。

          以RocketMQ為例:

          topic創(chuàng)建的時候可以用集群模式去創(chuàng)建(這樣集群里面每個broker的queue的數(shù)量相同),也可以用單個broker模式去創(chuàng)建(這樣每個broker的queue數(shù)量可以不一致)。


          4.1 生產(chǎn)并行度

          RocketMQ的生產(chǎn)并行度是由其自身機制及broker的數(shù)量決定的。這塊后面的文章會詳細分析。


          4.2 消費并行度

          廣播模式下所有消費者會接受并消費當前topic下所有Queue的消息。
          集群模式下,一個queue只分配給一個consumer實例:這是由于拉取消息是consumer主動控制的,如果多個實例同時消費一個queue的消息,會導致同一個消息在不同的實例下被消費多次,所以算法上都是一個queue只分給一個consumer實例,一個consumer實例可以允許同時分到不同的queue。
          Kafka的消費并行度依賴Topic配置的分區(qū)數(shù),如分區(qū)數(shù)為10,那么最多10臺機器來并行消費(每臺機器只能開啟一個線程),或者一臺機器消費(10個線程并行消費)。即消費并行度和分區(qū)數(shù)一致。RocketMQ消費并行度分兩種情況:順序消費方式并行度同卡夫卡完全一致;亂序方式并行度取決于Consumer的線程數(shù),如Topic配置10個隊列,10臺機器消費,每臺機器100個線程,那么并行度為1000。


          4.3 消息隊列分配策略

          Producer使用MessageQueueSelector選擇將消息投放到哪個分區(qū) 使用AllocateMessageQueueStrategy將不同分區(qū)分配給Consumer Group中的不同Consumer。一個分區(qū)(queue)僅允許分配給同一個Consumer Group下的一個Consumer(防止重復消費)。


          MessageQueueSelector


          內(nèi)置實現(xiàn)類:SelectMessageQueueByMachineRoom SelectMessageQueueByHash SelectMessageQueueByRandom



          可以通過實現(xiàn)MessageQueueSelector接口,來自定義Producer投遞消息時選擇分區(qū)的算法。


          AllocateMessageQueueStrategy


          內(nèi)置實現(xiàn)類:

          AllocateMessageQueueAveragely:平均分配算法?

          AllocateMessageQueueAveragelyByCircle:基于環(huán)形平均分配算法

          AllocateMachineRoomNearby:基于機房臨近原則算法

          AllocateMessageQueueByMachineRoom:基于機房分配算法

          AllocateMessageQueueConsistentHash:基于一致性hash算法

          AllocateMessageQueueByConfig:基于配置分配算法

          可以通過實現(xiàn)AllocateMessageQueueStrategy來自定義queue 分配給特定Consumer Group下不同Consumer的策略。


          參考(排名不分先后)


          https://github.com/apache/rocketmq/blob/master/docs/cn/

          https://juejin.im/post/6844903589819875336

          https://jaskey.github.io/blog/2016/12/19/rocketmq-rebalance/

          http://objcoding.com/2019/09/13/kafka-partition-and-rmq-queue/

          http://www.itmuch.com/books/rocketmq


          源:https://juejin.im/post/6844904130822029320

          版權申明:內(nèi)容來源網(wǎng)絡,版權歸原創(chuàng)者所有。除非無法確認,我們都會標明作者及出處,如有侵權煩請告知,我們會立即刪除并表示歉意。謝謝!





          感謝閱讀



          瀏覽 49
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  伊人大香蕉综合 | 天天操天天摸天天碰 | 做爰 视频毛片下载蜜桃 | 国产小电影在线播放 | www.狠狠艹 |