聊聊 Kafka:Kafka 如何保證一致性
一、前言
在如今的分布式環(huán)境時(shí)代,任何一款中間件產(chǎn)品,大多都有一套機(jī)制去保證一致性的,Kafka 作為一個(gè)商業(yè)級(jí)消息中間件,消息一致性的重要性可想而知,那 Kafka 如何保證一致性的呢?本文從高水位更新機(jī)制、副本同步機(jī)制以及 Leader Epoch 幾個(gè)方面去介紹 Kafka 是如何保證一致性的。
二、HW 和 LEO
要想 Kafka 保證一致性,我們必須先了解 HW(High Watermark)高水位和 LEO(Log End Offset)日志末端位移,看下面這張圖你就清晰了:

高水位的作用:
定義消息可見性,即用來標(biāo)識(shí)分區(qū)下的哪些消息是可以被消費(fèi)者消費(fèi)的。
幫助 Kafka 完成副本同步
這里我們不討論 Kafka 事務(wù),因?yàn)槭聞?wù)機(jī)制會(huì)影響消費(fèi)者所能看到的消息的范圍,它不只是簡(jiǎn)單依賴高水位來判斷。它依靠一個(gè)名為 LSO(Log Stable Offset)的位移值來判斷事務(wù)型消費(fèi)者的可見性。
日志末端位移的作用:
副本寫入下一條消息的位移值
數(shù)字 15 所在的方框是虛線,這就說明,這個(gè)副本當(dāng)前只有 15 條消息,位移值是從 0 到 14,下一條新消息的位移是 15。
介于高水位和 LEO 之間的消息就屬于未提交消息。這也反映出一個(gè)事實(shí),那就是:同一個(gè)副本對(duì)象,其高水位值不會(huì)大于 LEO 值。
高水位和 LEO 是副本對(duì)象的兩個(gè)重要屬性。Kafka 所有副本都有對(duì)應(yīng)的高水位和 LEO 值,而不僅僅是 Leader 副本。只不過 Leader 副本比較特殊,Kafka 使用 Leader 副本的高水位來定義所在分區(qū)的高水位。換句話說,分區(qū)的高水位就是其 Leader 副本的高水位。
三、HW 和 LEO 的更新機(jī)制
現(xiàn)在,我們知道了每個(gè)副本對(duì)象都保存了一組高水位值和 LEO 值,但實(shí)際上,在 Leader 副本所在的 Broker 上,還保存了其他 Follower 副本的 LEO 值,請(qǐng)看下圖:

從圖中可以看出,Broker 0 上保存了某分區(qū)的 Leader 副本和所有 Follower 副本的 LEO 值,而 Broker 1 上僅僅保存了該分區(qū)的某個(gè) Follower 副本。Kafka 把 Broker 0 上保存的這些 Follower 副本又稱為遠(yuǎn)程副本(Remote Replica)。Kafka 副本機(jī)制在運(yùn)行過程中,會(huì)更新 Broker 1 上 Follower 副本的高水位和 LEO 值,同時(shí)也會(huì)更新 Broker 0 上 Leader 副本的高水位和 LEO 以及所有遠(yuǎn)程副本的 LEO,但它不會(huì)更新遠(yuǎn)程副本的高水位值,也就是我在圖中標(biāo)記為灰色的部分。
這里你可能就困惑了?
為啥 Leader 副本所在的 Broker 上,還保存了其他 Follower 副本的 LEO 值?
為啥 Leader 副本所在的 Broker 上不會(huì)更新 Follower 副本 HW?
別著急,老周帶你看下源碼:
在 kafka.cluster.Partition#makeLeader 中:


Leader 副本所在的 Broker 上只有重置更新遠(yuǎn)程副本的 LEO,并沒有遠(yuǎn)程副本的 HW。
這里你又可能會(huì)問了?
為什么要在 Broker 0 上保存這些遠(yuǎn)程副本呢?
Broker 0 不會(huì)更新遠(yuǎn)程副本 HW,那遠(yuǎn)程副本的 HW 的更新機(jī)制又是怎樣的呢?
Broker 0 上保存這些遠(yuǎn)程副本的主要作用是,幫助 Leader 副本確定其高水位,也就是分區(qū)高水位。
第二個(gè)問題我們直接來看下 HW 和 LEO 被更新的時(shí)機(jī):

3.1 Leader 副本
處理生產(chǎn)者請(qǐng)求的邏輯如下:
寫入消息到本地磁盤
更新分區(qū)高水位值
獲取 Leader 副本所在 Broker 端保存的所有遠(yuǎn)程副本 LEO 值(LEO-1,LEO-2,……,LEO-n)
獲取 Leader 副本高水位值:currentHW
更新 currentHW = max{currentHW, min(LEO-1, LEO-2, ……,LEO-n)}
處理 Follower 副本拉取消息的邏輯如下:
讀取磁盤(或頁(yè)緩存)中的消息數(shù)據(jù)
使用 Follower 副本發(fā)送請(qǐng)求中的位移值更新遠(yuǎn)程副本 LEO 值
更新分區(qū)高水位值(具體步驟與處理生產(chǎn)者請(qǐng)求的步驟相同)
3.2 Follower 副本
從 Leader 拉取消息的處理邏輯如下:
寫入消息到本地磁盤
更新 LEO 值
更新高水位值
獲取 Leader 發(fā)送的高水位值:currentHW
獲取步驟 2 中更新過的 LEO 值:currentLEO
更新高水位為 min(currentHW, currentLEO)
四、副本同步機(jī)制
搞清楚了上面 HW 和 LEO 的更新機(jī)制后,我們舉一個(gè)單分區(qū)且有兩個(gè)副本的主題來演示下 Kafka 副本同步的全流程。
當(dāng)生產(chǎn)者發(fā)送一條消息時(shí),Leader 和 Follower 副本對(duì)應(yīng)的 HW 和 LEO 是怎么被更新的呢?
首先是初始狀態(tài)。下面這張圖中的 remote LEO 就是剛才的遠(yuǎn)程副本的 LEO 值。在初始狀態(tài)時(shí),所有值都是 0。

當(dāng)生產(chǎn)者給主題分區(qū)發(fā)送一條消息后,狀態(tài)變更為:

此時(shí),Leader 副本成功將消息寫入了本地磁盤,故 LEO 值被更新為 1。
Follower 再次嘗試從 Leader 拉取消息。和之前不同的是,這次有消息可以拉取了,因此狀態(tài)進(jìn)一步變更為:

這時(shí),F(xiàn)ollower 副本也成功地更新 LEO 為 1。此時(shí),Leader 和 Follower 副本的 LEO 都是 1,但各自的高水位依然是 0,還沒有被更新。它們需要在下一輪的拉取中被更新,如下圖所示:

在新一輪的拉取請(qǐng)求中,由于位移值是 0 的消息已經(jīng)拉取成功,因此 Follower 副本這次請(qǐng)求拉取的是位移值 =1 的消息。Leader 副本接收到此請(qǐng)求后,更新遠(yuǎn)程副本 LEO 為 1,然后更新 Leader 高水位為 1。做完這些之后,它會(huì)將當(dāng)前已更新過的高水位值 1 發(fā)送給 Follower 副本。Follower 副本接收到以后,也將自己的高水位值更新成 1。至此,一次完整的消息同步周期就結(jié)束了。事實(shí)上,Kafka 就是利用這樣的機(jī)制,實(shí)現(xiàn)了 Leader 和 Follower 副本之間的同步。
五、Leader Epoch 機(jī)制
上面的副本同步機(jī)制似乎很完美,我們不妨來思考下這種場(chǎng)景:
從剛才的分析中,我們知道,F(xiàn)ollower 副本的高水位更新需要一輪額外的拉取請(qǐng)求才能實(shí)現(xiàn)。如果把上面那個(gè)例子擴(kuò)展到多個(gè) Follower 副本,情況可能更糟,也許需要多輪拉取請(qǐng)求。也就是說,Leader 副本高水位更新和 Follower 副本高水位更新在時(shí)間上是存在錯(cuò)配的。這種錯(cuò)配是很多“數(shù)據(jù)丟失”或“數(shù)據(jù)不一致”問題的根源。基于此,社區(qū)在 0.11 版本正式引入了 Leader Epoch 概念,來規(guī)避因高水位更新錯(cuò)配導(dǎo)致的各種不一致問題。
所謂 Leader Epoch,我們大致可以認(rèn)為是 Leader 版本。它由兩部分?jǐn)?shù)據(jù)組成。
Epoch。一個(gè)單調(diào)增加的版本號(hào)。每當(dāng)副本領(lǐng)導(dǎo)權(quán)發(fā)生變更時(shí),都會(huì)增加該版本號(hào)。小版本號(hào)的 Leader 被認(rèn)為是過期 Leader,不能再行使 Leader 權(quán)力。起始位移(Start Offset)。Leader 副本在該 Epoch 值上寫入的首條消息的位移。
我舉個(gè)例子來說明一下 Leader Epoch。假設(shè)現(xiàn)在有兩個(gè) Leader Epoch<0, 0> 和 <1, 120>,那么,第一個(gè) Leader Epoch 表示版本號(hào)是 0,這個(gè)版本的 Leader 從位移 0 開始保存消息,一共保存了 120 條消息。之后,Leader 發(fā)生了變更,版本號(hào)增加到 1,新版本的起始位移是 120。
Kafka Broker 會(huì)在內(nèi)存中為每個(gè)分區(qū)都緩存 Leader Epoch 數(shù)據(jù),同時(shí)它還會(huì)定期地將這些信息持久化到一個(gè) checkpoint 文件中。當(dāng) Leader 副本寫入消息到磁盤時(shí),Broker 會(huì)嘗試更新這部分緩存。如果該 Leader 是首次寫入消息,那么 Broker 會(huì)向緩存中增加一個(gè) Leader Epoch 條目,否則就不做更新。這樣,每次有 Leader 變更時(shí),新的 Leader 副本會(huì)查詢這部分緩存,取出對(duì)應(yīng)的 Leader Epoch 的起始位移,以避免數(shù)據(jù)丟失和不一致的情況。
源碼在 org.apache.kafka.raft.LeaderState 中:

Kafka Broker 會(huì)在內(nèi)存中為每個(gè)分區(qū)都緩存 Leader Epoch 數(shù)據(jù):

同時(shí)它還會(huì)定期地將這些信息持久化到一個(gè) checkpoint 文件中:
org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState#write

接下來,我們來看一個(gè)實(shí)際的例子,它展示的是 Leader Epoch 是如何防止數(shù)據(jù)丟失的。請(qǐng)先看下圖:

開始時(shí),副本 A 和副本 B 都處于正常狀態(tài),A 是 Leader 副本。某個(gè)使用了默認(rèn) acks 設(shè)置的生產(chǎn)者程序向 A 發(fā)送了兩條消息,A 全部寫入成功,此時(shí) Kafka 會(huì)通知生產(chǎn)者說兩條消息全部發(fā)送成功。
現(xiàn)在我們假設(shè) Leader 和 Follower 都寫入了這兩條消息,而且 Leader 副本的高水位也已經(jīng)更新了,但 Follower 副本高水位還未更新——這是可能出現(xiàn)的。還記得吧,F(xiàn)ollower 端高水位的更新與 Leader 端有時(shí)間錯(cuò)配。倘若此時(shí)副本 B 所在的 Broker 宕機(jī),當(dāng)它重啟回來后,副本 B 會(huì)執(zhí)行日志截?cái)嗖僮鳎瑢?LEO 值調(diào)整為之前的高水位值,也就是 1。這就是說,位移值為 1 的那條消息被副本 B 從磁盤中刪除,此時(shí)副本 B 的底層磁盤文件中只保存有 1 條消息,即位移值為 0 的那條消息。
當(dāng)執(zhí)行完截?cái)嗖僮骱螅北?B 開始從 A 拉取消息,執(zhí)行正常的消息同步。如果就在這個(gè)節(jié)骨眼上,副本 A 所在的 Broker 宕機(jī)了,那么 Kafka 就別無(wú)選擇,只能讓副本 B 成為新的 Leader,此時(shí),當(dāng) A 回來后,需要執(zhí)行相同的日志截?cái)嗖僮鳎磳⒏咚徽{(diào)整為與 B 相同的值,也就是 1。這樣操作之后,位移值為 1 的那條消息就從這兩個(gè)副本中被永遠(yuǎn)地抹掉了。這就是這張圖要展示的數(shù)據(jù)丟失場(chǎng)景。
嚴(yán)格來說,這個(gè)場(chǎng)景發(fā)生的前提是 Broker 端參數(shù) min.insync.replicas 設(shè)置為 1。此時(shí)一旦消息被寫入到 Leader 副本的磁盤,就會(huì)被認(rèn)為是“已提交狀態(tài)”,但現(xiàn)有的時(shí)間錯(cuò)配問題導(dǎo)致 Follower 端的高水位更新是有滯后的。如果在這個(gè)短暫的滯后時(shí)間窗口內(nèi),接連發(fā)生 Broker 宕機(jī),那么這類數(shù)據(jù)的丟失就是不可避免的。
現(xiàn)在,我們來看下如何利用 Leader Epoch 機(jī)制來規(guī)避這種數(shù)據(jù)丟失。請(qǐng)看下圖:

場(chǎng)景和之前大致是類似的,只不過引用 Leader Epoch 機(jī)制后,F(xiàn)ollower 副本 B 重啟回來后,需要向 A 發(fā)送一個(gè)特殊的請(qǐng)求去獲取 Leader 的 LEO 值。在這個(gè)例子中,該值為 2。當(dāng)獲知到 Leader LEO=2 后,B 發(fā)現(xiàn)該 LEO 值不比它自己的 LEO 值小,而且緩存中也沒有保存任何起始位移值 > 2 的 Epoch 條目,因此 B 無(wú)需執(zhí)行任何日志截?cái)嗖僮鳌_@是對(duì)高水位機(jī)制的一個(gè)明顯改進(jìn),即副本是否執(zhí)行日志截?cái)嗖辉僖蕾囉诟咚贿M(jìn)行判斷。
現(xiàn)在,副本 A 宕機(jī)了,B 成為 Leader。同樣地,當(dāng) A 重啟回來后,執(zhí)行與 B 相同的邏輯判斷,發(fā)現(xiàn)也不用執(zhí)行日志截?cái)啵链宋灰浦禐?1 的那條消息在兩個(gè)副本中均得到保留。后面當(dāng)生產(chǎn)者程序向 B 寫入新消息時(shí),副本 B 所在的 Broker 緩存中,會(huì)生成新的 Leader Epoch 條目:[Epoch=1, Offset=2]。之后,副本 B 會(huì)使用這個(gè)條目幫助判斷后續(xù)是否執(zhí)行日志截?cái)嗖僮鳌_@樣,通過 Leader Epoch 機(jī)制,Kafka 完美地規(guī)避了這種數(shù)據(jù)丟失場(chǎng)景。
歡迎大家關(guān)注我的公眾號(hào)【老周聊架構(gòu)】,Java后端主流技術(shù)棧的原理、源碼分析、架構(gòu)以及各種互聯(lián)網(wǎng)高并發(fā)、高性能、高可用的解決方案。
