<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          MQ之-RocketMq系列一

          共 4209字,需瀏覽 9分鐘

           ·

          2022-08-25 02:13

          官方定義:

          Apache RocketMQ is a distributed messaging and streaming platform with low latency,  high performance and reliability, trillion-level capacity and flexible  scalability

          核心組件

          • 生產(chǎn)者 : 消息的發(fā)送者,

          • 消費者 : 消息接收者

          • 生產(chǎn)者組 : 一類生產(chǎn)者

          • 消費者組 :一類消費者

          • NameServer :管理Broker、Topic

          • Broker :存儲和收發(fā)消息

          • Topic :一類消息的統(tǒng)稱

          核心流程

          1. 每個Broker與NameServer集群中的所有節(jié)點建立長連接,定時注冊Topic信息到所有NameServer

          2. Producer與NameServer集群中的其中一個節(jié)點(隨機(jī)選擇)建立長連接,定期從NameServer獲取Topic路由信息,并向提供Topic 服務(wù)的Master建立長連接,且定時向Master發(fā)送心跳。Producer完全無狀態(tài),可集群部署

          3. Consumer與NameServer集群中的其中一個節(jié)點(隨機(jī)選擇)建立長連接,定期從NameServer獲取Topic路由信息,并向提供Topic服務(wù)的Master、Slave建立長連接,且定時向Master、Slave發(fā)送心跳。Consumer既可以從Master訂閱消息,也可以從Slave訂閱消息,消費者在向Master拉取消息時,Master服務(wù)器會根據(jù)拉取偏移量與最大偏移量的距離(判斷是否讀老消息,產(chǎn)生讀I/O),以及從服務(wù)器是否可讀等因素建議下一次是從Master還 是Slave拉取

          如圖:

          高級特性

          延遲消息

          是指消息發(fā)送到broker后,不會立即被消費,等待特定時間投遞給真正的topic。broker有配置項messageDelayLevel,默認(rèn)值為“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18個level。可以配置自定義messageDelayLevel。注意,messageDelayLevel是broker的屬性,不屬于某個topic。發(fā)消息時,設(shè)置delayLevel等級即可:msg.setDelayLevel(level)。level有以下三種情況:

          • level == 0,消息為非延遲消息

          • 1<=level<=maxLevel,消息延遲特定時間,例如level==1,延遲1s

          • level > maxLevel,則level== maxLevel,例如level==20,延遲2h

          定時消息會暫存在名為SCHEDULE_TOPIC_XXXX的topic中,并根據(jù)delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一個queue只存相同延遲的消息,保證具有相同發(fā)送延遲的消息能夠順序消費。broker會調(diào)度消費SCHEDULE_TOPIC_XXXX,將消息寫入真實的topic。需要注意的是,定時消息會在第一次寫入和調(diào)度寫入真實topic時都會計數(shù),因此發(fā)送數(shù)量、tps都會變高。

          查看SCHEDULE_TOPIC_XXXX 主題信息:


          順序消息
          全局有序

          需要把topic 的隊列設(shè)置成一個,product 和 consumer 并發(fā)數(shù)也要是一個。

          部分有序

          要保證部分消息有序,需要發(fā)送端和消費端配合處理。在發(fā)送端,要做到把同一業(yè)務(wù)ID的消息發(fā)送到同一個Message Queue;在消費過程中,要做到從同一個Message Queue讀取的消息不被并發(fā)處理,這樣才能達(dá)到部分有序。消費端通過使用MessageListenerOrderly類來解決單Message Queue的消息被并發(fā)處理的問題


          事務(wù)消息

          消息存儲
          1. 提升消息寫入CommitLog 的速度至關(guān)重要,因為這個部分的性能提升會直接提升Broker處理消息寫入的吞吐量,比如你寫入一條消息到CommitLog 磁盤文件假設(shè)需要10ms,那么每個線程每秒可以處理100個寫入消息,假設(shè)有100個線程,每秒只能處理1萬個寫入消息請求。但是如果把寫入性能優(yōu)化為只需要1ms,那么每個線程每秒可以處理1000個消息寫入,此時100個線程每秒可以處理10萬個寫入消息的請求,Broker是基于OS操作系統(tǒng)的PageCache和順序?qū)憙蓚€機(jī)制,來提升寫入CommitLog文件的性能的。

          1. 消費者從OS Cache 讀取ConsumeQueue中的offset在從OS Cache 或 磁盤讀取消息,因為OS Cache放不下所有commitLog所以,如果你的消費者機(jī)器一直快速的拉取和消費,跟上了product寫入broker的消息速率,那么每次拉取幾乎都是最近剛寫入commitLog的數(shù)據(jù),幾乎都在OS Cache 里面。

          1. Topic 下的每個MessageQueue 都會有一系列的ConsumeQueue文件,ConsumeQueue 也是基于OS Cache,存儲的是一條消息對應(yīng)在commitLog文件中的offset偏移量,實際上在ConsumeQueue中存儲的每條數(shù)據(jù)不只是消息在CommitLog中的offset偏移量,還包含了消息的長度以及tag hashcode,一條數(shù)據(jù)是20個字節(jié),每個ConsumeQueue文件保存30萬條數(shù)據(jù)大概每個文件是5.72MB。

          如圖:



          刷盤機(jī)制

          RocketMQ 的所有消息都是持久化的,先寫入系統(tǒng) PageCache,然后刷盤,可以保證內(nèi)存與磁盤都有一份數(shù)據(jù), 訪問時,直接從內(nèi)存讀取。消息在通過Producer寫入RocketMQ的時候,有兩種寫磁盤方式,分布式同步刷盤和異步刷盤。

          同步刷盤

          同步刷盤與異步刷盤的唯一區(qū)別是異步刷盤寫完 PageCache直接返回,而同步刷盤需要等待刷盤完成才返回, 同步刷盤流程如下:

          (1). 寫入 PageCache后,線程等待,通知刷盤線程刷盤。

          (2). 刷盤線程刷盤后,喚醒前端等待線程,可能是一批線程。

          (3). 前端等待線程向用戶返回成功

          異步刷盤

          Product 發(fā)送消息給Broker,Broker將消息寫入OS PageCache中就返回ACK給Product,有丟消息的可能。

          零拷貝

          Netty 擴(kuò)展

          RocketMQ的RPC通信采用Netty組件作為底層通信庫,同樣也遵循了Reactor多線程模型,同時又在這之上做了一些擴(kuò)展和優(yōu)化。

          一個Reactor主線程負(fù)責(zé)監(jiān)聽TCP網(wǎng)絡(luò)連接請求,建立好連接,創(chuàng)建SocketChannel,并注冊到selector上。rocketMq會自動根據(jù)OS類型選擇NIO和Epoll,然后監(jiān)聽真正的網(wǎng)絡(luò)數(shù)據(jù)。拿到網(wǎng)絡(luò)數(shù)據(jù)后,再交給Reactor線程池,在真正執(zhí)行業(yè)務(wù)邏輯之前需要進(jìn)行SSL驗證、編解碼、空閑檢查、網(wǎng)絡(luò)連接管理,這些工作交給Worker線程池去做。處理業(yè)務(wù)的操作交給業(yè)務(wù)線程池執(zhí)行,根據(jù)RomotingComman的業(yè)務(wù)請求碼code去processorTable這個本地緩存變量中找到對應(yīng)的processor,然后封裝成task任務(wù)后,提交給對應(yīng)的業(yè)務(wù)processor處理線程池來執(zhí)行。


          源碼環(huán)境搭建

          1. 源碼拉取 https://github.com/apache/rocketmq 版本選擇的是4.5.1

          2. 導(dǎo)入idea

          3. 執(zhí)行安裝 mvn clean install -Dmaven.test.skip=true

          4. 在項目根目錄下創(chuàng)建文件夾conf和dataDir,從 distribution項目下 拷貝 broker.conf 和 logback_broker.xml 和logback_namesrv.xml 放入conf 目錄下。

          5. 啟動NameServer

          控制臺打印,說明 NameServer 啟動成功。

          The Name Server boot success. serializeType=JSON
          1. 啟動Broker 將conf文件夾下的broker.conf 文件修改配置如下

            brokerClusterName = DefaultCluster
            brokerName = broker-a
            brokerId = 0
            deleteWhen = 04
            fileReservedTime = 48
            brokerRole = ASYNC_MASTER
            flushDiskType = ASYNC_FLUSH
            # namesrvAddr地址
            namesrvAddr=127.0.0.1:9876
            # 啟用自動創(chuàng)建主題
            autoCreateTopicEnable=true
            # 存儲路徑
            storePathRootDir=D:\\study\\source-code\\rocketmq-rocketmq-all-4.5.1\\dataDir
            # commitLog路徑
            storePathCommitLog=D:\\study\\source-code\\rocketmq-rocketmq-all-4.5.1\\dataDir\\commitlog
            # 消息隊列存儲路徑
            storePathConsumeQueue=D:\\study\\source-code\\rocketmq-rocketmq-all-4.5.1\\dataDir\\consumequeue
            # 消息索引存儲路徑
            storePathIndex=D:\\study\\source-code\\rocketmq-rocketmq-all-4.5.1\\dataDir\\index
            # checkpoint文件路徑
            storeCheckpoint=D:\\study\\source-code\\rocketmq-rocketmq-all-4.5.1\\dataDir\\checkpoint
            # abort文件存儲路徑
            abortFile=D:\\study\\source-code\\rocketmq-rocketmq-all-4.5.1\\dataDir\\abort
          1. 配置 broker.conf 和 ROCKETMQ_HOME , 并啟動 BrokerStartup 。


          瀏覽 68
          點贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  爆乳操逼| 青草免费视频 | 91最新地址 | 精品av国产日韩一区二区 | 蜜臀久久99精品久久宅男 |