MQ之-RocketMq系列一

官方定義:
Apache RocketMQ is a distributed messaging and streaming platform with low latency, high performance and reliability, trillion-level capacity and flexible scalability
核心組件
生產(chǎn)者 : 消息的發(fā)送者,
消費者 : 消息接收者
生產(chǎn)者組 : 一類生產(chǎn)者
消費者組 :一類消費者
NameServer :管理Broker、Topic
Broker :存儲和收發(fā)消息
Topic :一類消息的統(tǒng)稱
核心流程
每個Broker與NameServer集群中的所有節(jié)點建立長連接,定時注冊Topic信息到所有NameServer
Producer與NameServer集群中的其中一個節(jié)點(隨機(jī)選擇)建立長連接,定期從NameServer獲取Topic路由信息,并向提供Topic 服務(wù)的Master建立長連接,且定時向Master發(fā)送心跳。Producer完全無狀態(tài),可集群部署
Consumer與NameServer集群中的其中一個節(jié)點(隨機(jī)選擇)建立長連接,定期從NameServer獲取Topic路由信息,并向提供Topic服務(wù)的Master、Slave建立長連接,且定時向Master、Slave發(fā)送心跳。Consumer既可以從Master訂閱消息,也可以從Slave訂閱消息,消費者在向Master拉取消息時,Master服務(wù)器會根據(jù)拉取偏移量與最大偏移量的距離(判斷是否讀老消息,產(chǎn)生讀I/O),以及從服務(wù)器是否可讀等因素建議下一次是從Master還 是Slave拉取
如圖:

高級特性
延遲消息
是指消息發(fā)送到broker后,不會立即被消費,等待特定時間投遞給真正的topic。broker有配置項messageDelayLevel,默認(rèn)值為“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18個level。可以配置自定義messageDelayLevel。注意,messageDelayLevel是broker的屬性,不屬于某個topic。發(fā)消息時,設(shè)置delayLevel等級即可:msg.setDelayLevel(level)。level有以下三種情況:
level == 0,消息為非延遲消息
1<=level<=maxLevel,消息延遲特定時間,例如level==1,延遲1s
level > maxLevel,則level== maxLevel,例如level==20,延遲2h
定時消息會暫存在名為SCHEDULE_TOPIC_XXXX的topic中,并根據(jù)delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一個queue只存相同延遲的消息,保證具有相同發(fā)送延遲的消息能夠順序消費。broker會調(diào)度消費SCHEDULE_TOPIC_XXXX,將消息寫入真實的topic。需要注意的是,定時消息會在第一次寫入和調(diào)度寫入真實topic時都會計數(shù),因此發(fā)送數(shù)量、tps都會變高。
查看SCHEDULE_TOPIC_XXXX 主題信息:

順序消息
全局有序
需要把topic 的隊列設(shè)置成一個,product 和 consumer 并發(fā)數(shù)也要是一個。
部分有序
要保證部分消息有序,需要發(fā)送端和消費端配合處理。在發(fā)送端,要做到把同一業(yè)務(wù)ID的消息發(fā)送到同一個Message Queue;在消費過程中,要做到從同一個Message Queue讀取的消息不被并發(fā)處理,這樣才能達(dá)到部分有序。消費端通過使用MessageListenerOrderly類來解決單Message Queue的消息被并發(fā)處理的問題
事務(wù)消息

消息存儲
提升消息寫入CommitLog 的速度至關(guān)重要,因為這個部分的性能提升會直接提升Broker處理消息寫入的吞吐量,比如你寫入一條消息到CommitLog 磁盤文件假設(shè)需要10ms,那么每個線程每秒可以處理100個寫入消息,假設(shè)有100個線程,每秒只能處理1萬個寫入消息請求。但是如果把寫入性能優(yōu)化為只需要1ms,那么每個線程每秒可以處理1000個消息寫入,此時100個線程每秒可以處理10萬個寫入消息的請求,Broker是基于OS操作系統(tǒng)的PageCache和順序?qū)憙蓚€機(jī)制,來提升寫入CommitLog文件的性能的。
消費者從OS Cache 讀取ConsumeQueue中的offset在從OS Cache 或 磁盤讀取消息,因為OS Cache放不下所有commitLog所以,如果你的消費者機(jī)器一直快速的拉取和消費,跟上了product寫入broker的消息速率,那么每次拉取幾乎都是最近剛寫入commitLog的數(shù)據(jù),幾乎都在OS Cache 里面。
Topic 下的每個MessageQueue 都會有一系列的ConsumeQueue文件,ConsumeQueue 也是基于OS Cache,存儲的是一條消息對應(yīng)在commitLog文件中的offset偏移量,實際上在ConsumeQueue中存儲的每條數(shù)據(jù)不只是消息在CommitLog中的offset偏移量,還包含了消息的長度以及tag hashcode,一條數(shù)據(jù)是20個字節(jié),每個ConsumeQueue文件保存30萬條數(shù)據(jù)大概每個文件是5.72MB。
如圖:

刷盤機(jī)制
RocketMQ 的所有消息都是持久化的,先寫入系統(tǒng) PageCache,然后刷盤,可以保證內(nèi)存與磁盤都有一份數(shù)據(jù), 訪問時,直接從內(nèi)存讀取。消息在通過Producer寫入RocketMQ的時候,有兩種寫磁盤方式,分布式同步刷盤和異步刷盤。
同步刷盤
同步刷盤與異步刷盤的唯一區(qū)別是異步刷盤寫完 PageCache直接返回,而同步刷盤需要等待刷盤完成才返回, 同步刷盤流程如下:
(1). 寫入 PageCache后,線程等待,通知刷盤線程刷盤。
(2). 刷盤線程刷盤后,喚醒前端等待線程,可能是一批線程。
(3). 前端等待線程向用戶返回成功
異步刷盤
Product 發(fā)送消息給Broker,Broker將消息寫入OS PageCache中就返回ACK給Product,有丟消息的可能。
零拷貝

Netty 擴(kuò)展
RocketMQ的RPC通信采用Netty組件作為底層通信庫,同樣也遵循了Reactor多線程模型,同時又在這之上做了一些擴(kuò)展和優(yōu)化。
一個Reactor主線程負(fù)責(zé)監(jiān)聽TCP網(wǎng)絡(luò)連接請求,建立好連接,創(chuàng)建SocketChannel,并注冊到selector上。rocketMq會自動根據(jù)OS類型選擇NIO和Epoll,然后監(jiān)聽真正的網(wǎng)絡(luò)數(shù)據(jù)。拿到網(wǎng)絡(luò)數(shù)據(jù)后,再交給Reactor線程池,在真正執(zhí)行業(yè)務(wù)邏輯之前需要進(jìn)行SSL驗證、編解碼、空閑檢查、網(wǎng)絡(luò)連接管理,這些工作交給Worker線程池去做。處理業(yè)務(wù)的操作交給業(yè)務(wù)線程池執(zhí)行,根據(jù)RomotingComman的業(yè)務(wù)請求碼code去processorTable這個本地緩存變量中找到對應(yīng)的processor,然后封裝成task任務(wù)后,提交給對應(yīng)的業(yè)務(wù)processor處理線程池來執(zhí)行。

源碼環(huán)境搭建
源碼拉取 https://github.com/apache/rocketmq 版本選擇的是4.5.1
導(dǎo)入idea
執(zhí)行安裝 mvn clean install -Dmaven.test.skip=true
在項目根目錄下創(chuàng)建文件夾conf和dataDir,從 distribution項目下 拷貝 broker.conf 和 logback_broker.xml 和logback_namesrv.xml 放入conf 目錄下。
啟動NameServer

控制臺打印,說明 NameServer 啟動成功。
The Name Server boot success. serializeType=JSON啟動Broker 將conf文件夾下的broker.conf 文件修改配置如下
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# namesrvAddr地址
namesrvAddr=127.0.0.1:9876
# 啟用自動創(chuàng)建主題
autoCreateTopicEnable=true
# 存儲路徑
storePathRootDir=D:\\study\\source-code\\rocketmq-rocketmq-all-4.5.1\\dataDir
# commitLog路徑
storePathCommitLog=D:\\study\\source-code\\rocketmq-rocketmq-all-4.5.1\\dataDir\\commitlog
# 消息隊列存儲路徑
storePathConsumeQueue=D:\\study\\source-code\\rocketmq-rocketmq-all-4.5.1\\dataDir\\consumequeue
# 消息索引存儲路徑
storePathIndex=D:\\study\\source-code\\rocketmq-rocketmq-all-4.5.1\\dataDir\\index
# checkpoint文件路徑
storeCheckpoint=D:\\study\\source-code\\rocketmq-rocketmq-all-4.5.1\\dataDir\\checkpoint
# abort文件存儲路徑
abortFile=D:\\study\\source-code\\rocketmq-rocketmq-all-4.5.1\\dataDir\\abort
配置 broker.conf 和 ROCKETMQ_HOME , 并啟動 BrokerStartup 。

