深入解析分布式消息隊列設計精髓
點擊上方“服務端思維”,選擇“設為星標”
回復”669“獲取獨家整理的精選資料集
回復”加群“加入全國服務端高端社群「后端圈」
作者:vincentchma,騰訊 IEG 后臺開發(fā)工程師
一、消息隊列的演進
分布式消息隊列中間件是是大型分布式系統(tǒng)中常見的中間件。消息隊列主要解決應用耦合、異步消息、流量削鋒等問題,具有高性能、高可用、可伸縮和最終一致性等特點。消息隊列已經(jīng)逐漸成為企業(yè)應用系統(tǒng)內(nèi)部通信的核心手段,使用較多的消息隊列有 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、Pulsar 等,此外,利用數(shù)據(jù)庫(如 Redis、MySQL 等)也可實現(xiàn)消息隊列的部分基本功能。
1.基于 OS 的 MQ
單機消息隊列可以通過操作系統(tǒng)原生的進程間通信機制來實現(xiàn),如消息隊列、共享內(nèi)存等。比如我們可以在共享內(nèi)存中維護一個雙端隊列:

消息產(chǎn)出進程不停地往隊列里添加消息,同時消息消費進程不斷地從隊尾有序地取出這些消息。添加消息的任務我們稱為 producer,而取出并使用消息的任務,我們稱之為 consumer。這種模式在早期單機多進程模式中比較常見, 比如 IO 進程把收到的網(wǎng)絡請求存入本機 MQ,任務處理進程從本機 MQ 中讀取任務并進行處理。
單機 MQ 易于實現(xiàn),但是缺點也很明顯:因為依賴于單機 OS 的 IPC 機制,所以無法實現(xiàn)分布式的消息傳遞,并且消息隊列的容量也受限于單機資源。
2.基于 DB 的 MQ
即使用存儲組件(如 Mysql 、 Redis 等)存儲消息, 然后在消息的生產(chǎn)側和消費側實現(xiàn)消息的生產(chǎn)消費邏輯,從而實現(xiàn) MQ 功能。以 Redis 為例, 可以使用 Redis 自帶的 list 實現(xiàn)。Redis list 使用 lpush 命令,從隊列左邊插入數(shù)據(jù);使用 rpop 命令,從隊列右邊取出數(shù)據(jù)。與單機 MQ 相比, 該方案至少滿足了分布式, 但是仍然帶有很多無法接受的缺陷。
熱 key 性能問題:不論是用 codis 還是 twemproxy 這種集群方案,對某個隊列的讀寫請求最終都會落到同一臺 redis 實例上,并且無法通過擴容來解決問題。如果對某個 list 的并發(fā)讀寫非常高,就產(chǎn)生了無法解決的熱 key,嚴重可能導致系統(tǒng)崩潰 沒有消費確認機制:每當執(zhí)行 rpop 消費一條數(shù)據(jù),那條消息就被從 list 中永久刪除了。如果消費者消費失敗,這條消息也沒法找回了。 不支持多訂閱者:一條消息只能被一個消費者消費,rpop 之后就沒了。如果隊列中存儲的是應用的日志,對于同一條消息,監(jiān)控系統(tǒng)需要消費它來進行可能的報警,BI 系統(tǒng)需要消費它來繪制報表,鏈路追蹤需要消費它來繪制調(diào)用關系……這種場景 redis list 就沒辦法支持了 不支持二次消費:一條消息 rpop 之后就沒了。如果消費者程序運行到一半發(fā)現(xiàn)代碼有 bug,修復之后想從頭再消費一次就不行了。
針對上述缺點,redis 5.0 開始引入 stream 數(shù)據(jù)類型,它是專門設計成為消息隊列的數(shù)據(jù)結構,借鑒了很多 kafka 的設計,但是隨著很多分布式 MQ 組件的出現(xiàn),仍然顯得不夠友好, 畢竟 Redis 天生就不是用來做消息轉發(fā)的。
3. 專用分布式 MQ 中間件
隨著時代的發(fā)展,一個真正的消息隊列,已經(jīng)不僅僅是一個隊列那么簡單了,業(yè)務對 MQ 的吞吐量、擴展性、穩(wěn)定性、可靠性等都提出了嚴苛的要求。因此,專用的分布式消息中間件開始大量出現(xiàn)。常見的有 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、Pulsar 等等。
二、消息隊列設計要點
消息隊列本質(zhì)上是一個消息的轉發(fā)系統(tǒng), 把一次 RPC 就可以直接完成的消息投遞,轉換成多次 RPC 間接完成,這其中包含兩個關鍵環(huán)節(jié):
1.消息轉儲;
2.消息投遞:時機和對象;
基于此,消息隊列的整體設計思路是:
確定整體的數(shù)據(jù)流向:如 producer 發(fā)送給 MQ,MQ 轉發(fā)給 consumer,consumer 回復消費確認,消息刪除、消息備份等。 利用 RPC 將數(shù)據(jù)流串起來,最好基于現(xiàn)有的 RPC 框架,盡量做到無狀態(tài),方便水平擴展。 存儲選型,綜合考慮性能、可靠性和開發(fā)維護成本等諸多因素。 消息投遞,消費模式 push、pull。 消費關系維護,單播、多播等,可以利用 zk、config server 等保存消費關系。 高級特性,如可靠投遞,重復消息,順序消息等, 很多高級特性之間是相互制約的關系,這里要充分結合應用場景做出取舍。

1.MQ 基本特性
RPC 通信
MQ 組件要實現(xiàn)和生產(chǎn)者以及消費者進行通信功能, 這里涉及到 RPC 通信問題。消息隊列的 RPC,和普通的 RPC 沒有本質(zhì)區(qū)別。對于負載均衡、服務發(fā)現(xiàn)、序列化協(xié)議等等問題都可以借助現(xiàn)有 RPC 框架來實現(xiàn),避免重復造輪子。
存儲系統(tǒng)
存儲可以做成很多方式。比如存儲在內(nèi)存里,存儲在分布式 KV 里,存儲在磁盤里,存儲在數(shù)據(jù)庫里等等。但歸結起來,主要有持久化和非持久化兩種。
持久化的形式能更大程度地保證消息的可靠性(如斷電等不可抗外力),并且理論上能承載更大限度的消息堆積(外存的空間遠大于內(nèi)存)。但并不是每種消息都需要持久化存儲。很多消息對于投遞性能的要求大于可靠性的要求,且數(shù)量極大(如日志)。這時候,消息不落地直接暫存內(nèi)存,嘗試幾次 failover,最終投遞出去也未嘗不可。常見的消息隊列普遍兩種形式都支持。
從速度來看,理論上,文件系統(tǒng)>分布式 KV(持久化)>分布式文件系統(tǒng)>數(shù)據(jù)庫,而可靠性卻相反。還是要從支持的業(yè)務場景出發(fā)作出最合理的選擇。
高可用
MQ 的高可用,依賴于 RPC 和存儲的高可用。通常 RPC 服務自身都具有服務自動發(fā)現(xiàn),負載均衡等功能,保證了其高可用。存儲的高可用, 例如 Kafka,使用分區(qū)加主備模式,保證每一個分區(qū)內(nèi)的高可用性,也就是每一個分區(qū)至少要有一個備份且需要做數(shù)據(jù)的同步。
推拉模型
push 和 pull 模型各有利弊,兩種模式也都有被市面上成熟的消息中間件選用。
1.慢消費
慢消費是 push 模型最大的致命傷,如果消費者的速度比發(fā)送者的速度慢很多,會出現(xiàn)兩種惡劣的情況:
1.消息在 broker 的堆積。假設這些消息都是有用的無法丟棄的,消息就要一直在 broker 端保存。
2.broker 推送給 consumer 的消息 consumer 無法處理,此時 consumer 只能拒絕或者返回錯誤。
而 pull 模式下,consumer 可以按需消費,不用擔心自己處理不了的消息來騷擾自己,而 broker 堆積消息也會相對簡單,無需記錄每一個要發(fā)送消息的狀態(tài),只需要維護所有消息的隊列和偏移量就可以了。所以對于慢消費,消息量有限且到來的速度不均勻的情況,pull 模式比較合適。
2.消息延遲與忙等
這是 pull 模式最大的短板。由于主動權在消費方,消費方無法準確地決定何時去拉取最新的消息。如果一次 pull 取到消息了還可以繼續(xù)去 pull,如果沒有 pull 取到則需要等待一段時間重新 pull。
消息投放時機
即消費者應該在什么時機消費消息。一般有以下三種方式:
攢夠了一定數(shù)量才投放。 到達了一定時間就投放。 有新的數(shù)據(jù)到來就投放。
至于如何選擇,也要結合具體的業(yè)務場景來決定。比如,對及時性要求高的數(shù)據(jù),可用采用方式 3 來完成。
消息投放對象
不管是 JMS 規(guī)范中的 Topic/Queue,Kafka 里面的 Topic/Partition/ConsumerGroup,還是 AMQP(如 RabbitMQ)的 Exchange 等等, 都是為了維護消息的消費關系而抽象出來的概念。本質(zhì)上,消息的消費無外乎點到點的一對一單播,或一對多廣播。另外比較特殊的情況是組間廣播、組內(nèi)單播。比較通用的設計是,不同的組注冊不同的訂閱,支持組間廣播。組內(nèi)不同的機器,如果注冊一個相同的 ID,則單播;如果注冊不同的 ID(如 IP 地址+端口),則廣播。
例如 pulsar 支持的訂閱模型有:
Exclusive:獨占型,一個訂閱只能有一個消息者消費消息。 Failover:災備型,一個訂閱同時只有一個消費者,可以有多個備份消費者。一旦主消費者故障則備份消費者接管。不會出現(xiàn)同時有兩個活躍的消費者。 Shared:共享型,一個訂閱中同時可以有多個消費者,多個消費者共享 Topic 中的消息。 Key_Shared:鍵共享型,多個消費者各取一部分消息。
通常會在公共存儲上維護廣播關系,如 config server、zookeeper 等。
2.隊列高級特性
常見的高級特性有可靠投遞、消息丟失、消息重復、事務等等,他們并非是 MQ 必備的特性。由于這些特性可能是相互制約的,所以不可能完全兼顧。所以要依照業(yè)務的需求,來仔細衡量各種特性實現(xiàn)的成本、利弊,最終做出最為合理的設計。
可靠投遞
如何保證消息完全不丟失?
直觀的方案是,在任何不可靠操作之前,先將消息落地,然后操作。當失敗或者不知道結果(比如超時)時,消息狀態(tài)是待發(fā)送,定時任務不停輪詢所有待發(fā)送消息,最終一定可以送達。但是,這樣必然導致消息可能會重復,并且在異常情況下,消息延遲較大。
例如:
producer 往 broker 發(fā)送消息之前,需要做一次落地。 請求到 server 后,server 確保數(shù)據(jù)落地后再告訴客戶端發(fā)送成功。 支持廣播的消息隊列需要對每個接收者,持久化一個發(fā)送狀態(tài),直到所有接收者都確認收到,才可刪除消息。
即對于任何不能確認消息已送達的情況,都要重推消息。但是,隨著而來的問題就是消息重復。在消息重復和消息丟失之間,無法兼顧,要結合應用場景做出取舍。
消費確認
當 broker 把消息投遞給消費者后,消費者可以立即確認收到了消息。但是,有些情況消費者可能需要再次接收該消息(比如收到消息、但是處理失敗),即消費者主動要求重發(fā)消息。所以,要允許消費者主動進行消費確認。
順序消息
對于 push 模式,要求支持分區(qū)且單分區(qū)只支持一個消費者消費,并且消費者只有確認一個消息消費后才能 push 另外一個消息,還要發(fā)送者保證發(fā)送順序唯一。
對于 pull 模式,比如 kafka 的做法:
producer 對應 partition,并且單線程。 consumer 對應 partition,消費確認(或批量確認),單線程消費。
但是這樣也只是實現(xiàn)了消息的分區(qū)有序性,并不一定全局有序。總體而言,要求消息有序的 MQ 場景還是比較少的。
三、Kafka
Kafka 是一個分布式發(fā)布訂閱消息系統(tǒng)。它以高吞吐、可持久化、可水平擴展、支持流數(shù)據(jù)處理等多種特性而被廣泛使用(如 Storm、Spark、Flink)。在大數(shù)據(jù)系統(tǒng)中,數(shù)據(jù)需要在各個子系統(tǒng)中高性能、低延遲的不停流轉。傳統(tǒng)的企業(yè)消息系統(tǒng)并不是非常適合大規(guī)模的數(shù)據(jù)處理,但 Kafka 出現(xiàn)了,它可以高效的處理實時消息和離線消息,降低編程復雜度,使得各個子系統(tǒng)可以快速高效的進行數(shù)據(jù)流轉,Kafka 承擔高速數(shù)據(jù)總線的作用。
kafka 基礎概念
BrokerKafka 集群包含一個或多個服務器,這種服務器被稱為 broker。 TopicTopic 在邏輯上可以被認為是一個 queue,每條消費都必須指定它的 Topic,可以簡單理解為必須指明把這條消息放進哪個 queue 里。為了使得 Kafka 的吞吐率可以線性提高,物理上把 Topic 分成一個或多個 Partition,每個 Partition 在物理上對應一個文件夾,該文件夾下存儲這個 Partition 的所有消息和索引文件。 PartitionParition 是物理上的概念,每個 Topic 包含一個或多個 Partition。 Producer負責發(fā)布消息到 Kafka broker。 Consumer消息消費者,向 Kafka broker 讀取消息的客戶端。 Consumer Group每個 Consumer 屬于一個特定的 Consumer Group(可為每個 Consumer 指定 group name,若不指定 group name 則屬于默認的 group)。

一個典型的 kafka 集群包含若干 Producer,若干個 Broker(kafka 支持水平擴展)、若干個 Consumer Group,以及一個 zookeeper 集群。Producer 使用 push 模式將消息發(fā)布到 broker。consumer 使用 pull 模式從 broker 訂閱并消費消息。多個 broker 協(xié)同工作,producer 和 consumer 部署在各個業(yè)務邏輯中。kafka 通過 zookeeper 管理集群配置及服務協(xié)同。
這樣就組成了一個高性能的分布式消息發(fā)布和訂閱系統(tǒng)。Kafka 有一個細節(jié)是和其他 mq 中間件不同的點,producer 發(fā)送消息到 broker 的過程是 push,而 consumer 從 broker 消費消息的過程是 pull,主動去拉數(shù)據(jù)。而不是 broker 把數(shù)據(jù)主動發(fā)送給 consumer。
Producer 發(fā)送消息到 broker 時,會根據(jù) Paritition 機制選擇將其存儲到哪一個 Partition。如果 Partition 機制設置合理,所有消息可以均勻分布到不同的 Partition 里,這樣就實現(xiàn)了負載均衡。如果一個 Topic 對應一個文件,那這個文件所在的機器 I/O 將會成為這個 Topic 的性能瓶頸,而有了 Partition 后,不同的消息可以并行寫入不同 broker 的不同 Partition 里,極大的提高了吞吐率。
Kafka 特點
優(yōu)點:
高性能:單機測試能達到 100w tps 低延時:生產(chǎn)和消費的延時都很低,e2e 的延時在正常的 cluster 中也很低 可用性高:replicate+ isr + 選舉 機制保證 工具鏈成熟:監(jiān)控 運維 管理 方案齊全 生態(tài)成熟:大數(shù)據(jù)場景必不可少 kafka stream
不足:
無法彈性擴容:對 partition 的讀寫都在 partition leader 所在的 broker,如果該 broker 壓力過大,也無法通過新增 broker 來解決問題 擴容成本高:集群中新增的 broker 只會處理新 topic,如果要分擔老 topic-partition 的壓力,需要手動遷移 partition,這時會占用大量集群帶寬 消費者新加入和退出會造成整個消費組 rebalance:導致數(shù)據(jù)重復消費,影響消費速度,增加延遲 partition 過多會使得性能顯著下降:ZK 壓力大,broker 上 partition 過多讓磁盤順序寫幾乎退化成隨機寫
高吞吐機制
順序存取
如果把消息以隨機的方式寫入到磁盤,那么磁盤首先要做的就是尋址,也就是定位到數(shù)據(jù)所在的物理地址,在磁盤上就要找到對應柱面、磁頭以及對應的扇區(qū);這個過程相對內(nèi)存來說會消耗大量時間,為了規(guī)避隨機讀寫帶來的時間消耗,kafka 采用順序寫的方式存儲數(shù)據(jù)。
頁緩存
即使是順序存取,但是頻繁的 I/O 操作仍然會造成磁盤的性能瓶頸,所以 kafka 使用了頁緩存和零拷貝技術。當進程準備讀取磁盤上的文件內(nèi)容時, 操作系統(tǒng)會先查看待讀取的數(shù)據(jù)是否在頁緩存中,如果存在則直接返回數(shù)據(jù), 從而避免了對物理磁盤的 I/O 操作;
如果沒有命中, 則操作系統(tǒng)會向磁盤發(fā)起讀取請求并將讀取的數(shù)據(jù)頁存入頁緩存, 之后再將數(shù)據(jù)返回給進程。一個進程需要將數(shù)據(jù)寫入磁盤, 那么操作系統(tǒng)也會檢測數(shù)據(jù)對應的頁是否在頁緩存中,如果不存在, 則會先在頁緩存中添加相應的頁, 最后將數(shù)據(jù)寫入對應的頁。被修改過后的頁也就變成了臟頁, 操作系統(tǒng)會在合適的時間把臟頁中的數(shù)據(jù)寫入磁盤, 以保持數(shù)據(jù)的 一 致性。
Kafka 中大量使用了頁緩存, 這是 Kafka 實現(xiàn)高吞吐的重要因素之 一 。雖然消息都是先被寫入頁緩存,然后由操作系統(tǒng)負責具體的刷盤任務的, 但在 Kafka 中同樣提供了同步刷盤及間斷性強制刷盤(fsync),可以通過參數(shù)來控制。
同步刷盤能夠保證消息的可靠性,避免因為宕機導致頁緩存數(shù)據(jù)還未完成同步時造成的數(shù)據(jù)丟失。但是實際使用上,我們沒必要去考慮這樣的因素以及這種問題帶來的損失,消息可靠性可以由多副本來解決,同步刷盤會帶來性能的影響。
頁緩存的好處:
I/O Scheduler 會將連續(xù)的小塊寫組裝成大塊的物理寫從而提高性能; I/O Scheduler 會嘗試將一些寫操作重新按順序排好,從而減少磁頭移動時間; 充分利用所有空閑內(nèi)存(非 JVM 內(nèi)存); 讀操作可以直接在 Page Cache 內(nèi)進行,如果消費和生產(chǎn)速度相當,甚至不需要通過物理磁盤交換數(shù)據(jù); 如果進程重啟,JVM 內(nèi)的 Cache 會失效,但 Page Cache 仍然可用。
零拷貝
零拷貝技術可以減少 CPU 的上下文切換和數(shù)據(jù)拷貝次數(shù)。
常規(guī)方式

應用程序一次常規(guī)的數(shù)據(jù)請求過程,發(fā)生了 4 次拷貝,2 次 DMA 和 2 次 CPU,而 CPU 發(fā)生了 4 次的切換。(DMA 簡單理解就是,在進行 I/O 設備和內(nèi)存的數(shù)據(jù)傳輸?shù)臅r候,數(shù)據(jù)搬運的工作全部交給 DMA 控制器,而 CPU 不再參與任何與數(shù)據(jù)搬運相關的事情)
零拷貝的方式

通過零拷貝優(yōu)化,CPU 只發(fā)生了 2 次的上下文切換和 3 次數(shù)據(jù)拷貝。
批量發(fā)送
Kafka 允許進行批量發(fā)送消息,先將消息緩存在內(nèi)存中,然后一次請求批量發(fā)送出去,這種策略將大大減少服務端的 I/O 次數(shù)。
數(shù)據(jù)壓縮
Kafka 還支持對消息集合進行壓縮,Producer 可以通過 GZIP 或 Snappy 格式對消息集合進行壓縮,Producer 壓縮之后,在 Consumer 需進行解壓,雖然增加了 CPU 的工作,但在對大數(shù)據(jù)處理上,瓶頸在網(wǎng)絡上而不是 CPU,所以這個成本很值得。
高可用機制
副本
Producer 在發(fā)布消息到某個 Partition 時,先通過 ZooKeeper 找到該 Partition 的 Leader,然后無論該 Topic 的 Replication Factor 為多少,Producer 只將該消息發(fā)送到該 Partition 的 Leader。Leader 會將該消息寫入其本地 Log。
每個 Follower 都從 Leader pull 數(shù)據(jù)。這種方式上,F(xiàn)ollower 存儲的數(shù)據(jù)順序與 Leader 保持一致。Follower 在收到該消息后,向 Leader 發(fā)送 ACK, 并把消息寫入其 Log。一旦 Leader 收到了 ISR 中的所有 Replica 的 ACK,該消息就被認為已經(jīng) commit 了,Leader 將增加 HW 并且向 Producer 發(fā)送 ACK。
為了提高性能,每個 Follower 在接收到數(shù)據(jù)后就立馬向 Leader 發(fā)送 ACK,而非等到數(shù)據(jù)寫入 Log 中。因此,對于已經(jīng) commit 的消息,Kafka 只能保證它被存于多個 Replica 的內(nèi)存中,而不能保證它們被持久化到磁盤中,也就不能完全保證異常發(fā)生后該條消息一定能被 Consumer 消費。Consumer 讀消息也是從 Leader 讀取,只有被 commit 過的消息才會暴露給 Consumer。Kafka Replication 的數(shù)據(jù)流如下圖所示:

對于 Kafka 而言,定義一個 Broker 是否“活著”包含兩個條件:
一是它必須維護與 ZooKeeper 的 session(這個通過 ZooKeeper 的 Heartbeat 機制來實現(xiàn))。 二是 Follower 必須能夠及時將 Leader 的消息復制過來,不能“落后太多”。
Leader 會跟蹤與其保持同步的 Replica 列表,該列表稱為 ISR(即 in-sync Replica)。如果一個 Follower 宕機,或者落后太多,Leader 將把它從 ISR 中移除。這里所描述的“落后太多”指 Follower 復制的消息落后于 Leader 后的條數(shù)超過預定值或者 Follower 超過一定時間未向 Leader 發(fā)送 fetch 請求。Kafka 的復制機制既不是完全的同步復制,也不是單純的異步復制。
完全同步復制要求所有能工作的 Follower 都復制完,這條消息才會被認為 commit,這種復制方式極大的影響了吞吐率(高吞吐率是 Kafka 非常重要的一個特性)。異步復制方式下,F(xiàn)ollower 異步的從 Leader 復制數(shù)據(jù),數(shù)據(jù)只要被 Leader 寫入 log 就被認為已經(jīng) commit,這種情況下如果 Follower 都復制完都落后于 Leader,而如果 Leader 突然宕機,則會丟失數(shù)據(jù)。而 Kafka 的這種使用 ISR 的方式則很好的均衡了確保數(shù)據(jù)不丟失以及吞吐率。Follower 可以批量的從 Leader 復制數(shù)據(jù),這樣極大的提高復制性能(批量寫磁盤),極大減少了 Follower 與 Leader 的差距。
一條消息只有被 ISR 里的所有 Follower 都從 Leader 復制過去才會被認為已提交。這樣就避免了部分數(shù)據(jù)被寫進了 Leader,還沒來得及被任何 Follower 復制就宕機了,而造成數(shù)據(jù)丟失(Consumer 無法消費這些數(shù)據(jù))。而對于 Producer 而言,它可以選擇是否等待消息 commit。這種機制確保了只要 ISR 有一個或以上的 Follower,一條被 commit 的消息就不會丟失。
故障恢復
Leader 故障
leader 發(fā)生故障后,會從 ISR 中選出一個新的 leader,之后,為保證多個副本之間的數(shù)據(jù)一致性,其余的 follower 會先將各自的 log 文件高于 HW 的部分截掉,然后從新的 leader 同步數(shù)據(jù)。注意:這只能保證副本之間的數(shù)據(jù)一致性,并不能保證數(shù)據(jù)不丟失或者不重復。
Kafka 在 ZooKeeper 中動態(tài)維護了一個 ISR(in-sync replicas),這個 ISR 里的所有 Replica 都跟上了 leader,只有 ISR 里的成員才有被選為 Leader 的可能。在這種模式下,對于 f+1 個 Replica,一個 Partition 能在保證不丟失已經(jīng) commit 的消息的前提下容忍 f 個 Replica 的失敗。

LEO:每個副本最大的 offset。
HW:消費者能見到的最大的 offset,ISR 隊列中最小的 LEO。
Follower 故障
follower 發(fā)生故障后會被臨時踢出 ISR 集合,待該 follower 恢復后,follower 會 讀取本地磁盤記錄的上次的 HW,并將 log 文件高于 HW 的部分截取掉,從 HW 開始向 leader 進行同步數(shù)據(jù)操作。等該 follower 的 LEO 大于等于該 partition 的 HW,即 follower 追上 leader 后,就可以重新加入 ISR 了。
擴展性
由于 Broker 存儲著特定分區(qū)的數(shù)據(jù), 因此,不管是 Broker 還是分區(qū)的擴縮容,都是比較復雜的,屬于典型的“有狀態(tài)服務”擴縮容問題。接下來,我們看一下 Pulsar 是怎么針對 kafka 的不足進行優(yōu)化的。
四、Pulsar
Apache Pulsar 是 Apache 軟件基金會頂級項目,是下一代云原生分布式消息流平臺,集消息、存儲、輕量化函數(shù)式計算為一體。采用計算與存儲分離架構設計,支持多租戶、持久化存儲、多機房跨區(qū)域數(shù)據(jù)復制,具有強一致性、高吞吐、低延時及高可擴展性等流數(shù)據(jù)存儲特性。在消息領域,Pulsar 是第一個將存儲計算分離云原生架構落地的開源項目。
服務和存儲分離

在 kafka 的基礎上,把數(shù)據(jù)存儲功能從 Broker 中分離出來,Broker 僅面向生產(chǎn)者、消費者提供數(shù)據(jù)讀寫能力,但其自身并不存儲數(shù)據(jù)。而在 Broker 層下面使用 Bookie 作為存儲層,承擔具體的數(shù)據(jù)存儲職責。在 Pulsar 中,broker 的含義和 kafka 中的 broker 是一致的,就是一個運行的 Pulsar 實例, 提供多個分區(qū)的讀寫服務。由于 broker 層不在承擔數(shù)據(jù)存儲職責,使得 Broker 層成為無狀態(tài)服務。這樣一來,Broker 的擴縮容就變得非常簡單。
相比之下,服務存儲集于一體的 Kafka 就非常難以擴容。
Broker 和 Bookie 相互獨立,方便實現(xiàn)獨立的擴展以及獨立的容錯 Broker 無狀態(tài),便于快速上、下線,更加適合于云原生場景 分區(qū)存儲不受限于單個節(jié)點存儲容量 Bookie 數(shù)據(jù)分布均勻

分片存儲

1.在 Kafka 分區(qū)(Partition)概念的基礎上,按照時間或大小,把分區(qū)切分成分片(Segment)。
2.同一個分區(qū)的分片,分散存儲在集群中所有的 Bookie 節(jié)點上。
3.同一個分片,擁有多個副本,副本數(shù)量可以指定,存儲于不同的 Bookie 節(jié)點。
Pulsar 性能
和 Kafka 一樣,Pulsar 也使用了順序讀寫和零拷貝等技術來提高系統(tǒng)的性能。
此外,Pulsar 還設計了分層緩存機制,在服務層和存儲層都做了分層緩存,來提高性能。
生產(chǎn)者發(fā)送消息時,調(diào)用 Bookie 層寫入消息時,同時將消息寫入 broker 緩存中。 實時消費時(追尾讀),首先從 broker 緩存中讀取數(shù)據(jù),避免從持久層 bookie 中讀取,從而降低投遞延遲。 讀取歷史消息(追趕讀)場景中,bookie 會將磁盤消息讀入 bookie 讀緩存中,從而避免每次都讀取磁盤數(shù)據(jù),降低讀取延時。

Pulsar 擴展性
分片存儲解決了分區(qū)容量受單節(jié)點存儲空間限制的問題,當容量不夠時,可以通過擴容 Bookie 節(jié)點的方式支撐更多的分區(qū)數(shù)據(jù),也解決了分區(qū)數(shù)據(jù)傾斜問題,數(shù)據(jù)可以均勻的分配在 Bookie 節(jié)點上。
Broker 和 Bookie 靈活的容錯以及無縫的擴容能力讓 Apache Pulsar 具備非常高的可用性,實現(xiàn)了無限制的分區(qū)存儲。

Broker 擴展
在 Pulsar 中 Broker 是無狀態(tài)的,可以通過增加節(jié)點的方式實現(xiàn)快速擴容。當需要支持更多的消費者或生產(chǎn)者時,可以簡單地添加更多的 Broker 節(jié)點來滿足業(yè)務需求。Pulsar 支持自動的分區(qū)負載均衡,在 Broker 節(jié)點的資源使用率達到閾值時,會將負載遷移到負載較低的 Broker 節(jié)點。新增 Broker 節(jié)點時,分區(qū)也將在 Brokers 中做平衡遷移,一些分區(qū)的所有權會轉移到新的 Broker 節(jié)點。
Bookie 擴展
存儲層的擴容,通過增加 Bookie 節(jié)點來實現(xiàn)。通過資源感知和數(shù)據(jù)放置策略,流量將自動切換到新的 Apache Bookie 中,整個過程不會涉及到不必要的數(shù)據(jù)搬遷。即擴容時,不會將舊數(shù)據(jù)從現(xiàn)有存儲節(jié)點重新復制到新存儲節(jié)點。

如圖所示,起始狀態(tài)有四個存儲節(jié)點,Bookie1, Bookie2, Bookie3, Bookie4,以 Topic1-Part2 為例,當這個分區(qū)的最新的存儲分片是 SegmentX 時,對存儲層擴容,添加了新的 Bookie 節(jié)點,BookieX,BookieY,那么當存儲分片滾動之后,新生成的存儲分片, SegmentX+1,SegmentX+2,會優(yōu)先選擇新的 Bookie 節(jié)點(BookieX,BookieY)來保存數(shù)據(jù)。
Pulsar 可用性
Broker 容錯
如下圖,假設當存儲分片滾動到 SegmentX 時,Broker2 節(jié)點失敗。此時生產(chǎn)者和消費者向其他的 Broker 發(fā)起請求,這個過程會觸發(fā)分區(qū)的所有權轉移,即將 Broker2 擁有的分區(qū) Topic1-Part2 的所有權轉移到其他的 Broker(Broker3)。
由于數(shù)據(jù)存儲和數(shù)據(jù)服務分離,所以新 Broker 接管分區(qū)的所有權時,它不需要復制 Partiton 的數(shù)據(jù)。新的分區(qū) Owner(Broker3)會產(chǎn)生一個新的分片 SegmentX+1, 如果有新數(shù)據(jù)到來,會存儲在新的分片 Segment x+1 上,不會影響分區(qū)的可用性。
即當某個 Broker 實例故障時,整個集群的消息存儲能力仍然完好。此時,集群只是喪失了特定分區(qū)的消息服務,只需要把這些分區(qū)的服務權限分配給其他 Broker 即可。
注意,和 Kafka 一樣, Pulsar 的一個分區(qū)仍然只能由一個 Broker 提供服務,否則無法保證消息的分區(qū)有序性。

Bookie 容錯
如下圖,假設 Bookie 2 上的 Segment 4 損壞。Bookie Auditor 會檢測到這個錯誤并進行復制修復。Bookie 中的副本修復是 Segment 級別的多對多快速修復,BookKeeper 可以從 Bookie 3 和 Bookie 4 讀取 Segment 4 中的消息,并在 Bookie 1 處修復 Segment 4。如果是 Bookie 節(jié)點故障,這個 Bookie 節(jié)點上所有的 Segment 會按照上述方式復制到其他的 Bookie 節(jié)點。
所有的副本修復都在后臺進行,對 Broker 和應用透明,Broker 會產(chǎn)生新的 Segment 來處理寫入請求,不會影響分區(qū)的可用性。

Pulsar 其他特性
基于上述的設計特點,Pulsar 提供了很多特性。
讀寫分離
Pulsar 另外一個有吸引力的特性是提供了讀寫分離的能力,讀寫分離保證了在有大量滯后消費(磁盤 IO 會增加)時,不會影響服務的正常運行,尤其是不會影響到數(shù)據(jù)的寫入。讀寫分離的能力由 Bookie 提供,簡單說一下 Bookie 存儲涉及到的概念:
Journals:Journal 文件包含了 Bookie 事務日志,在 Ledger (可以認為是分片的一部分) 更新之前,Journal 保證描述更新的事務寫入到 Non-volatile 的存儲介質(zhì)上; Entry logger:Entry 日志文件管理寫入的 Entry,來自不同 ledger 的 entry 會被聚合然后順序寫入; Index files:每個 Ledger 都有一個對應的索引文件,記錄數(shù)據(jù)在 Entry 日志文件中的 Offset 信息。
Entry 的讀寫入過程下圖所示,數(shù)據(jù)的寫入流程:
數(shù)據(jù)首先會寫入 Journal,寫入 Journal 的數(shù)據(jù)會實時落到磁盤; 然后,數(shù)據(jù)寫入到 Memtable ,Memtable 是讀寫緩存; 寫入 Memtable 之后,對寫入請求進行響應; Memtable 寫滿之后,會 Flush 到 Entry Logger 和 Index cache,Entry Logger 中保存了數(shù)據(jù),Index cache 保存了數(shù)據(jù)的索引信息,然后由后臺線程將 Entry Logger 和 Index cache 數(shù)據(jù)落到磁盤。
數(shù)據(jù)的讀取流程:
如果是 Tailing read 請求,直接從 Memtable 中讀取 Entry; 如果是 Catch-up read(滯后消費)請求,先讀取 Index 信息,然后索引從 Entry Logger 文件讀取 Entry。

一般在進行 Bookie 的配置時,會將 Journal 和 Ledger 存儲磁盤進行隔離,減少 Ledger 對于 Journal 寫入的影響,并且推薦 Journal 使用性能較好的 SSD 磁盤,讀寫分離主要體現(xiàn)在:
寫入 Entry 時,Journal 中的數(shù)據(jù)需要實時寫到磁盤,Ledger 的數(shù)據(jù)不需要實時落盤,通過后臺線程批量落盤,因此寫入的性能主要受到 Journal 磁盤的影響; 讀取 Entry 時,首先從 Memtable 讀取,命中則返回;如果不命中,再從 Ledger 磁盤中讀取,所以對于 Catch-up read 的場景,讀取數(shù)據(jù)會影響 Ledger 磁盤的 IO,對 Journal 磁盤沒有影響,也就不會影響到數(shù)據(jù)的寫入。
所以,數(shù)據(jù)寫入是主要是受 Journal 磁盤的負載影響,不會受 Ledger 磁盤的影響。另外,Segment 存儲的多個副本都可以提供讀取服務,相比于主從副本的設計,Apache Pulsar 可以提供更好的數(shù)據(jù)讀取能力。
通過以上分析,Apache Pulsar 使用 Apache BookKeeper 作為數(shù)據(jù)存儲,可以帶來下列的收益:
支持將多個 Ledger 的數(shù)據(jù)寫入到同一個 Entry logger 文件,可以避免分區(qū)膨脹帶來的性能下降問題 支持讀寫分離,可以在滯后消費場景導致磁盤 IO 上升時,保證數(shù)據(jù)寫入的不受影響 支持全副本讀取,可以充分利用存儲副本的數(shù)據(jù)讀取能力
多種消費模型
Pulsar 提供了多種訂閱方式來消費消息,分為三種類型:獨占(Exclusive),故障切換(Failover)或共享(Share)。
Exclusive 獨占訂閱 :在任何時間,一個消費者組(訂閱)中有且只有一個消費者來消費 Topic 中的消息。 Failover 故障切換:多個消費者(Consumer)可以附加到同一訂閱。但是,一個訂閱中的所有消費者,只會有一個消費者被選為該訂閱的主消費者。其他消費者將被指定為故障轉移消費者。當主消費者斷開連接時,分區(qū)將被重新分配給其中一個故障轉移消費者,而新分配的消費者將成為新的主消費者。發(fā)生這種情況時,所有未確認(ack)的消息都將傳遞給新的主消費者。 Share 共享訂閱:使用共享訂閱,在同一個訂閱背后,用戶按照應用的需求掛載任意多的消費者。訂閱中的所有消息以循環(huán)分發(fā)形式發(fā)送給訂閱背后的多個消費者,并且一個消息僅傳遞給一個消費者。
當消費者斷開連接時,所有傳遞給它但是未被確認(ack)的消息將被重新分配和組織,以便發(fā)送給該訂閱上剩余的剩余消費者。

多種 ACK 模型
消息確認(ACK)的目的就是保證當發(fā)生故障后,消費者能夠從上一次停止的地方恢復消費,保證既不會丟失消息,也不會重復處理已經(jīng)確認(ACK)的消息。在 Pulsar 中,每個訂閱中都使用一個專門的數(shù)據(jù)結構–游標(Cursor)來跟蹤訂閱中的每條消息的確認(ACK)狀態(tài)。每當消費者在分區(qū)上確認消息時,游標都會更新。
Pulsar 提供兩種消息確認方法:
單條確認(Individual Ack),單獨確認一條消息。被確認后的消息將不會被重新傳遞 累積確認(Cumulative Ack),通過累積確認,消費者只需要確認它收到的最后一條消息

上圖說明了單條確認和累積確認的差異(灰色框中的消息被確認并且不會被重新傳遞)。對于累計確認,M12 之前的消息被標記為 Acked。對于單獨進行 ACK,僅確認消息 M7 和 M12, 在消費者失敗的情況下,除了 M7 和 M12 之外,其他所有消息將被重新傳送。
— 本文結束 —

關注我,回復 「加群」 加入各種主題討論群。
對「服務端思維」有期待,請在文末點個在看
喜歡這篇文章,歡迎轉發(fā)、分享朋友圈


