Kafka的心跳處理機(jī)制竟然用到了時(shí)間輪算法?
點(diǎn)擊上方“服務(wù)端思維”,選擇“設(shè)為星標(biāo)”
回復(fù)”669“獲取獨(dú)家整理的精選資料集
回復(fù)”加群“加入全國(guó)服務(wù)端高端社群「后端圈」
? 作者 | 丁威
Broker端與客戶(hù)端的心跳在Kafka中非常的重要,因?yàn)橐坏┰谝粋€(gè)心跳過(guò)期周期內(nèi)(默認(rèn)10s),Broker端的消費(fèi)組組協(xié)調(diào)器(GroupCoordinator)會(huì)把消費(fèi)者從消費(fèi)組中移除,從而觸發(fā)重平衡。在2.4.x以下其版本中,消費(fèi)組一旦進(jìn)入重平衡狀態(tài),該消費(fèi)組內(nèi)所有消費(fèi)者全部暫停消費(fèi),直到重平衡完成。
本文將來(lái)探討Kafka的心跳機(jī)制的具體實(shí)現(xiàn)。本文的組織結(jié)構(gòu)如下:
源碼解讀Kafka心跳機(jī)制 Kafka心跳架構(gòu)設(shè)計(jì)亮點(diǎn)(時(shí)間輪調(diào)度算法實(shí)現(xiàn)原理圖)
溫馨提示:如果大家對(duì)源碼閱讀不感興趣,可以直接跳到本文的第二部分,用流程圖、數(shù)據(jù)結(jié)構(gòu)圖闡述心跳的實(shí)現(xiàn)機(jī)制。
1、源碼分析Kafka心跳機(jī)制
在介紹源碼分析之前介紹筆直的一條源碼分析經(jīng)驗(yàn):找準(zhǔn)入口,了解調(diào)用鏈路。故筆者會(huì)先尋找歸納出Kafka心跳處理的所有入口。
1.1Kafka心跳入口總結(jié)
Kafka心跳包的處理流程如下圖所示:

圖的右邊是kafka心跳在服務(wù)端的核心處理流程,而左邊主要展示kafka中所有的心跳請(qǐng)求,根據(jù)上圖得知Kafka觸發(fā)心跳處理的主要請(qǐng)求分別如下:
KafkaConsume主動(dòng)發(fā)送心跳包 消費(fèi)者會(huì)以3s的頻率向服務(wù)端發(fā)送心跳包,服務(wù)端對(duì)應(yīng)的入口為 KafkaApis的handleHeartbeatRequest方法。
消費(fèi)者加入消費(fèi)組 在消費(fèi)端重平衡過(guò)程中,客戶(hù)端主動(dòng)向其組協(xié)調(diào)器發(fā)起Join_Group(加入消費(fèi)組)時(shí),組協(xié)調(diào)器會(huì)認(rèn)為收到一個(gè)有效的心跳包,服務(wù)端對(duì)應(yīng)的處理入口:KafkaApis的handleJoinGroup方法。
消費(fèi)者獲取隊(duì)列負(fù)載結(jié)果 在重平衡的第二個(gè)階段,消費(fèi)組的Leader在計(jì)算出分區(qū)負(fù)載結(jié)果后會(huì)發(fā)給組協(xié)調(diào)器,消費(fèi)組中的其他成員需要發(fā)生Sync_Group請(qǐng)求獲取負(fù)載結(jié)果,組協(xié)調(diào)器同樣認(rèn)為收到了一個(gè)有效的心跳包。服務(wù)端對(duì)應(yīng)的處理入口:KafkaApis的handleSyncGroupRequest。
消費(fèi)者提交位點(diǎn) 消費(fèi)者組協(xié)調(diào)器收到消費(fèi)者提交位點(diǎn)請(qǐng)求,同樣可以認(rèn)定消費(fèi)者是存活的。位點(diǎn)提交的處理入口:KafkaApis的handlerCommitOffsets方法。
__consumers_offsets主題的ISR的Leader發(fā)生變化
如果__consumers_offsets主題中的各個(gè)分區(qū)Leader發(fā)生變化,與特定分區(qū)的組協(xié)調(diào)器需要重新選舉,與此組協(xié)調(diào)器相關(guān)的消費(fèi)者將觸發(fā)重平衡。
上述任何一種請(qǐng)求,都能表明消費(fèi)端是存活的,故能有效阻止服務(wù)端將客戶(hù)端端心跳設(shè)置為過(guò)期,進(jìn)入下一個(gè)心跳檢測(cè)周期。
上述各個(gè)入口,特別是__consumers_offsets的ISR對(duì)消費(fèi)組的影響,后續(xù)會(huì)專(zhuān)門(mén)展開(kāi)研究,現(xiàn)在我們將重心轉(zhuǎn)移到服務(wù)端是如何處理一個(gè)心跳包的。
1.2 源碼分析Kafka心跳處理機(jī)制
從上面的流程圖可以得出,Kafka收到一個(gè)心跳包后的處理入口為GroupCoordinator的completeAndScheduleNextExpiration方法,核心代碼如下圖所示:

在介紹該方法之前首先介紹一個(gè)該方法的入?yún)⒑x:
GroupMetadata group 消費(fèi)組的元信息。 MemberMetadata member 消費(fèi)者的元信息。 long timeoutMs 心跳超時(shí)時(shí)間,默認(rèn)為10s,這個(gè)參數(shù)是由消費(fèi)端的session.timeout.ms參數(shù)設(shè)置,默認(rèn)為10s。
Step1:為消費(fèi)組設(shè)置唯一標(biāo)識(shí):groupId + "-" + memberId構(gòu)成。
Step2:將hearbeatSatisfied設(shè)置為true,表示該消費(fèi)者收到一個(gè)有效的心跳包。
Step3:收到一個(gè)有效的心跳包,通知定時(shí)調(diào)度器停止本次的心跳過(guò)期檢測(cè)。
Step4:構(gòu)建一個(gè)DelayedHearbeat,進(jìn)入下一個(gè)心跳檢測(cè)周期。
接下來(lái)將分別對(duì)Step3、Step4展開(kāi)詳細(xì)介紹。
1.2.1 心跳檢測(cè)正常處理邏輯
在收到一個(gè)心跳包時(shí),嘗試將本次檢測(cè)設(shè)置成功,具體的實(shí)現(xiàn)由DelayedOperation的checkAndComplete方法,代碼如下:

Kafka使用一個(gè)數(shù)據(jù)結(jié)構(gòu)來(lái)存儲(chǔ)需要跟蹤的所有消費(fèi)者,在這里成為Watch機(jī)制。
實(shí)現(xiàn)要點(diǎn):根據(jù)key獲取WatchList,然后從獲取的WatchList中內(nèi)部的ConcurrentMap中再按照Key獲取對(duì)應(yīng)與當(dāng)前消費(fèi)者對(duì)應(yīng)的Watch。
如果沒(méi)有找到對(duì)應(yīng)消費(fèi)者的Watch,則直接返回,無(wú)需檢測(cè),說(shuō)明已經(jīng)成功檢測(cè)。 如果找到了對(duì)應(yīng)消費(fèi)者的Watch,則執(zhí)行被watch的tryCompleteWatched方法。
Watch的數(shù)據(jù)結(jié)構(gòu)如下:
接下來(lái)重點(diǎn)關(guān)注Watches的tryCompleteWatched方法,該方法的詳細(xì)調(diào)用代碼如下圖所示:
如果消費(fèi)組的狀態(tài)處于Dead 如果消費(fèi)組的狀態(tài)為Pending(消費(fèi)組在重平衡中) hearbeatSatisfied為true,即收到了一個(gè)有效的心跳包。
上述代碼的實(shí)現(xiàn)比較簡(jiǎn)單,這里就不一一羅列,其核心關(guān)鍵點(diǎn)如下:
刪除對(duì)應(yīng)的Watch,表示一次心跳檢測(cè)成功。 Watchs中存儲(chǔ)的對(duì)象是DelayedOperation(Kafka延遲類(lèi)型的父類(lèi))的子類(lèi),在心跳檢測(cè)中具體為DelayedHeartbeat。 最終執(zhí)行DelayedOperation的是TimeTask的cancel方法(取消延遲任務(wù)),就是從延遲調(diào)度中移除自己,表示沒(méi)有超時(shí),結(jié)束本輪的超時(shí)檢測(cè),具體的存儲(chǔ)結(jié)構(gòu),將在下文詳介紹如果開(kāi)啟新一輪心跳檢測(cè)時(shí)再詳細(xì)講解。
為了方便大家閱讀源碼,其主要的調(diào)用時(shí)序圖如下:

1.2.2 開(kāi)啟下一輪心跳檢測(cè)
1.2.2.1將延遲任務(wù)放入時(shí)間輪
在接受到一個(gè)新的心跳包首先用于清除上一輪設(shè)置的延遲任務(wù),然后需要開(kāi)啟一個(gè)新的延遲任務(wù),接下來(lái)我們將來(lái)具體看看Kafka如何開(kāi)啟新一輪心跳檢測(cè)機(jī)制,**其本質(zhì)上是Kafka的延遲(定時(shí))實(shí)現(xiàn)原理。**代碼入口如下圖所示:

開(kāi)啟下一輪調(diào)度時(shí)首先將Member的heartbeatSatisfied設(shè)置為false。
其核心思想是創(chuàng)建一個(gè)心跳延遲任務(wù)DelayedHeartbeat,并對(duì)其檢測(cè)是否完成或者添加Watch,啟動(dòng)心跳延遲或者等待下一個(gè)心跳包的到來(lái)。
其實(shí)看到這里,我們應(yīng)該能得到一個(gè)關(guān)于Kafka心跳檢測(cè)機(jī)制的實(shí)現(xiàn)思路:
開(kāi)啟一個(gè)延遲任務(wù),延遲檢查時(shí)間為心跳過(guò)期時(shí)間,一旦延遲任務(wù)執(zhí)行,則意外著心跳超時(shí)。 當(dāng)收到一個(gè)心跳包時(shí),需要取消上一次設(shè)置的延遲任務(wù)。 使用循環(huán)使用延遲任務(wù),從而實(shí)現(xiàn)類(lèi)似定時(shí)任務(wù)的效果。
接下來(lái)我們?cè)敿?xì)探討一下DelayedOperationPurgatory的tryCompleteElseWatch方法,其代碼如下圖所示:

Step1:嘗試調(diào)用DelayedHeartbeat的tryComplete方法,判斷是否可以判斷完成,這里主要是消費(fèi)組是否為重平衡或者狀態(tài)為Dead,如果上述情況不滿(mǎn)足,則會(huì)返回false,因?yàn)樵诎l(fā)起下一輪心跳包時(shí)已將heartbeatSatisfied設(shè)置為false。
Step2:為該消費(fèi)者添加到Watch中,表示kafka需要跟蹤該消費(fèi)者的心跳。
Step3:再次調(diào)用maybeTryComplete方法,再?lài)L試判斷是否該心跳檢測(cè)完成。
Step4:如果沒(méi)有完成,則該任務(wù)延遲任務(wù)(DelayedHeartbeat)添加到定時(shí)調(diào)度中。
接下來(lái)將進(jìn)入到Kafka心跳的核心機(jī)制,即延遲任務(wù)的實(shí)現(xiàn)機(jī)制。


并持有一個(gè)關(guān)鍵字段:該定時(shí)任務(wù)的過(guò)期時(shí)間,等于系統(tǒng)當(dāng)前時(shí)間+過(guò)期時(shí)間,在心跳檢測(cè)場(chǎng)景中默認(rèn)為10s。
繼續(xù)跟蹤SystemTimer的addTimerTaskEntry,其代碼如下:

addTimerTaskEntry的核心實(shí)現(xiàn)如下:
嘗試將延遲任務(wù)添加到時(shí)間輪,如果已經(jīng)過(guò)期,則提交到線程池,觸發(fā)心跳過(guò)期的邏輯,提交到線程后,DelayedOperation的run方法會(huì)被調(diào)用,最終onExpiration方法被調(diào)用。

接下來(lái)重點(diǎn)談一下往時(shí)間輪中添加任務(wù)的具體實(shí)現(xiàn),核心代碼見(jiàn)下圖所示:

核心實(shí)現(xiàn)要點(diǎn):
Step1:如果任務(wù)已經(jīng)被取消或者已過(guò)期,返回false。如果返回false,則會(huì)觸發(fā)定時(shí)任務(wù)過(guò)期。
Step2根據(jù)過(guò)期時(shí)間,放入到時(shí)間輪中指定的位置,時(shí)間輪的數(shù)據(jù)結(jié)構(gòu)如下:

每一個(gè)格代表一個(gè)時(shí)間間隔,例如200ms,當(dāng)前指針指向的格子,代表該格子中的所有任務(wù)過(guò)期,例如現(xiàn)在要要插入一個(gè)700ms過(guò)期,從當(dāng)前指針的下一格開(kāi)始算起,放入第4格中。
另外時(shí)間輪的總格子有限,則該時(shí)間輪能計(jì)算的最大時(shí)間是有限的,例如一個(gè)8格的時(shí)間輪,每一格代表200ms,則如果要在2s后過(guò)期,顯然這個(gè)時(shí)間輪無(wú)法存儲(chǔ),通常的解決方案是采用多級(jí)時(shí)間輪,另外一級(jí)的時(shí)間輪,其時(shí)間精度會(huì)更粗。
結(jié)合上述關(guān)于時(shí)間輪的原理,再去看上述代碼,就顯得容易看懂了。
Step3:就是處理第一級(jí)時(shí)間輪無(wú)法滿(mǎn)足過(guò)期時(shí)間,則放入到第二級(jí)時(shí)間輪中。
1.2.2.2 驅(qū)動(dòng)時(shí)間輪
基于時(shí)間輪算法,除了數(shù)據(jù)按找時(shí)間輪到方向、觸發(fā)時(shí)間存儲(chǔ)在合適的刻度量,還需要驅(qū)動(dòng)時(shí)間輪指針。Kafka中的驅(qū)動(dòng)時(shí)間輪入口為:

具體實(shí)現(xiàn)代碼如下:

上述代碼看起來(lái)比較簡(jiǎn)單,就不一一介紹,為了方便大家讀懂上面的代碼,我們只需要了解一下kafka采用時(shí)間輪的實(shí)際存儲(chǔ)數(shù)據(jù)結(jié)構(gòu),即能很容易理解上述代碼:

其核心特點(diǎn):環(huán)形隊(duì)列就是一個(gè)數(shù)組,每一個(gè)元素在Kafka中對(duì)應(yīng)一個(gè)桶,每一個(gè)桶存儲(chǔ)一個(gè)TimerTaskList(鏈表),每次指針指向的TimerTaskList,將該鏈表中的元素代表的任務(wù)全部執(zhí)行。
2、圖解Kafka心跳架構(gòu)設(shè)計(jì)
讀起源碼來(lái)說(shuō)或許比較枯燥,接下來(lái)給出Kafka心跳處理的圖解,重點(diǎn)是闡述Kafka時(shí)間輪算法的核心數(shù)據(jù)結(jié)構(gòu)。

— 本文結(jié)束 —

●?漫談設(shè)計(jì)模式在 Spring 框架中的良好實(shí)踐
關(guān)注我,回復(fù) 「加群」 加入各種主題討論群。
對(duì)「服務(wù)端思維」有期待,請(qǐng)?jiān)谖哪c(diǎn)個(gè)在看
喜歡這篇文章,歡迎轉(zhuǎn)發(fā)、分享朋友圈


