<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Kafka的心跳處理機(jī)制竟然用到了時(shí)間輪算法?

          共 4525字,需瀏覽 10分鐘

           ·

          2021-11-24 09:06

          點(diǎn)擊上方“服務(wù)端思維”,選擇“設(shè)為星標(biāo)

          回復(fù)”669“獲取獨(dú)家整理的精選資料集

          回復(fù)”加群“加入全國(guó)服務(wù)端高端社群「后端圈」


          ? 作者 | 丁威

          出品?| 中間件興趣圈


          Broker端與客戶(hù)端的心跳在Kafka中非常的重要,因?yàn)橐坏┰谝粋€(gè)心跳過(guò)期周期內(nèi)(默認(rèn)10s),Broker端的消費(fèi)組組協(xié)調(diào)器(GroupCoordinator)會(huì)把消費(fèi)者從消費(fèi)組中移除,從而觸發(fā)重平衡。在2.4.x以下其版本中,消費(fèi)組一旦進(jìn)入重平衡狀態(tài),該消費(fèi)組內(nèi)所有消費(fèi)者全部暫停消費(fèi),直到重平衡完成。

          本文將來(lái)探討Kafka的心跳機(jī)制的具體實(shí)現(xiàn)。本文的組織結(jié)構(gòu)如下:

          • 源碼解讀Kafka心跳機(jī)制
          • Kafka心跳架構(gòu)設(shè)計(jì)亮點(diǎn)(時(shí)間輪調(diào)度算法實(shí)現(xiàn)原理圖)

          溫馨提示:如果大家對(duì)源碼閱讀不感興趣,可以直接跳到本文的第二部分,用流程圖、數(shù)據(jù)結(jié)構(gòu)圖闡述心跳的實(shí)現(xiàn)機(jī)制。

          1、源碼分析Kafka心跳機(jī)制

          在介紹源碼分析之前介紹筆直的一條源碼分析經(jīng)驗(yàn):找準(zhǔn)入口,了解調(diào)用鏈路。故筆者會(huì)先尋找歸納出Kafka心跳處理的所有入口。

          1.1Kafka心跳入口總結(jié)

          Kafka心跳包的處理流程如下圖所示:

          圖的右邊是kafka心跳在服務(wù)端的核心處理流程,而左邊主要展示kafka中所有的心跳請(qǐng)求,根據(jù)上圖得知Kafka觸發(fā)心跳處理的主要請(qǐng)求分別如下:

          1. KafkaConsume主動(dòng)發(fā)送心跳包 消費(fèi)者會(huì)以3s的頻率向服務(wù)端發(fā)送心跳包,服務(wù)端對(duì)應(yīng)的入口為 KafkaApis的handleHeartbeatRequest方法。

          2. 消費(fèi)者加入消費(fèi)組 在消費(fèi)端重平衡過(guò)程中,客戶(hù)端主動(dòng)向其組協(xié)調(diào)器發(fā)起Join_Group(加入消費(fèi)組)時(shí),組協(xié)調(diào)器會(huì)認(rèn)為收到一個(gè)有效的心跳包,服務(wù)端對(duì)應(yīng)的處理入口:KafkaApis的handleJoinGroup方法。

          3. 消費(fèi)者獲取隊(duì)列負(fù)載結(jié)果 在重平衡的第二個(gè)階段,消費(fèi)組的Leader在計(jì)算出分區(qū)負(fù)載結(jié)果后會(huì)發(fā)給組協(xié)調(diào)器,消費(fèi)組中的其他成員需要發(fā)生Sync_Group請(qǐng)求獲取負(fù)載結(jié)果,組協(xié)調(diào)器同樣認(rèn)為收到了一個(gè)有效的心跳包。服務(wù)端對(duì)應(yīng)的處理入口:KafkaApis的handleSyncGroupRequest。

          4. 消費(fèi)者提交位點(diǎn) 消費(fèi)者組協(xié)調(diào)器收到消費(fèi)者提交位點(diǎn)請(qǐng)求,同樣可以認(rèn)定消費(fèi)者是存活的。位點(diǎn)提交的處理入口:KafkaApis的handlerCommitOffsets方法。

          5. __consumers_offsets主題的ISR的Leader發(fā)生變化

            如果__consumers_offsets主題中的各個(gè)分區(qū)Leader發(fā)生變化,與特定分區(qū)的組協(xié)調(diào)器需要重新選舉,與此組協(xié)調(diào)器相關(guān)的消費(fèi)者將觸發(fā)重平衡。

          上述任何一種請(qǐng)求,都能表明消費(fèi)端是存活的,故能有效阻止服務(wù)端將客戶(hù)端端心跳設(shè)置為過(guò)期,進(jìn)入下一個(gè)心跳檢測(cè)周期。

          上述各個(gè)入口,特別是__consumers_offsets的ISR對(duì)消費(fèi)組的影響,后續(xù)會(huì)專(zhuān)門(mén)展開(kāi)研究,現(xiàn)在我們將重心轉(zhuǎn)移到服務(wù)端是如何處理一個(gè)心跳包的。

          1.2 源碼分析Kafka心跳處理機(jī)制

          從上面的流程圖可以得出,Kafka收到一個(gè)心跳包后的處理入口為GroupCoordinator的completeAndScheduleNextExpiration方法,核心代碼如下圖所示:

          在介紹該方法之前首先介紹一個(gè)該方法的入?yún)⒑x:

          • GroupMetadata group 消費(fèi)組的元信息。
          • MemberMetadata member 消費(fèi)者的元信息。
          • long timeoutMs 心跳超時(shí)時(shí)間,默認(rèn)為10s,這個(gè)參數(shù)是由消費(fèi)端的session.timeout.ms參數(shù)設(shè)置,默認(rèn)為10s。

          Step1:為消費(fèi)組設(shè)置唯一標(biāo)識(shí):groupId + "-" + memberId構(gòu)成。

          Step2:將hearbeatSatisfied設(shè)置為true,表示該消費(fèi)者收到一個(gè)有效的心跳包。

          Step3:收到一個(gè)有效的心跳包,通知定時(shí)調(diào)度器停止本次的心跳過(guò)期檢測(cè)。

          Step4:構(gòu)建一個(gè)DelayedHearbeat,進(jìn)入下一個(gè)心跳檢測(cè)周期。

          接下來(lái)將分別對(duì)Step3、Step4展開(kāi)詳細(xì)介紹。

          1.2.1 心跳檢測(cè)正常處理邏輯

          在收到一個(gè)心跳包時(shí),嘗試將本次檢測(cè)設(shè)置成功,具體的實(shí)現(xiàn)由DelayedOperation的checkAndComplete方法,代碼如下:

          Kafka使用一個(gè)數(shù)據(jù)結(jié)構(gòu)來(lái)存儲(chǔ)需要跟蹤的所有消費(fèi)者,在這里成為Watch機(jī)制。

          實(shí)現(xiàn)要點(diǎn):根據(jù)key獲取WatchList,然后從獲取的WatchList中內(nèi)部的ConcurrentMap中再按照Key獲取對(duì)應(yīng)與當(dāng)前消費(fèi)者對(duì)應(yīng)的Watch。

          • 如果沒(méi)有找到對(duì)應(yīng)消費(fèi)者的Watch,則直接返回,無(wú)需檢測(cè),說(shuō)明已經(jīng)成功檢測(cè)。
          • 如果找到了對(duì)應(yīng)消費(fèi)者的Watch,則執(zhí)行被watch的tryCompleteWatched方法。

          Watch的數(shù)據(jù)結(jié)構(gòu)如下:

          接下來(lái)重點(diǎn)關(guān)注Watches的tryCompleteWatched方法,該方法的詳細(xì)調(diào)用代碼如下圖所示:

          這邊先重點(diǎn)介紹一下組協(xié)調(diào)器判斷一次成功的心跳檢測(cè)的三個(gè)標(biāo)準(zhǔn)中滿(mǎn)足一個(gè)即可(GroupCoordinator的tryCompleteHeartbeat方法):
          • 如果消費(fèi)組的狀態(tài)處于Dead
          • 如果消費(fèi)組的狀態(tài)為Pending(消費(fèi)組在重平衡中)
          • hearbeatSatisfied為true,即收到了一個(gè)有效的心跳包。

          上述代碼的實(shí)現(xiàn)比較簡(jiǎn)單,這里就不一一羅列,其核心關(guān)鍵點(diǎn)如下:

          • 刪除對(duì)應(yīng)的Watch,表示一次心跳檢測(cè)成功。
          • Watchs中存儲(chǔ)的對(duì)象是DelayedOperation(Kafka延遲類(lèi)型的父類(lèi))的子類(lèi),在心跳檢測(cè)中具體為DelayedHeartbeat。
          • 最終執(zhí)行DelayedOperation的是TimeTask的cancel方法(取消延遲任務(wù)),就是從延遲調(diào)度中移除自己,表示沒(méi)有超時(shí),結(jié)束本輪的超時(shí)檢測(cè),具體的存儲(chǔ)結(jié)構(gòu),將在下文詳介紹如果開(kāi)啟新一輪心跳檢測(cè)時(shí)再詳細(xì)講解。

          為了方便大家閱讀源碼,其主要的調(diào)用時(shí)序圖如下:

          1.2.2 開(kāi)啟下一輪心跳檢測(cè)
          1.2.2.1將延遲任務(wù)放入時(shí)間輪

          在接受到一個(gè)新的心跳包首先用于清除上一輪設(shè)置的延遲任務(wù),然后需要開(kāi)啟一個(gè)新的延遲任務(wù),接下來(lái)我們將來(lái)具體看看Kafka如何開(kāi)啟新一輪心跳檢測(cè)機(jī)制,**其本質(zhì)上是Kafka的延遲(定時(shí))實(shí)現(xiàn)原理。**代碼入口如下圖所示:

          開(kāi)啟下一輪調(diào)度時(shí)首先將Member的heartbeatSatisfied設(shè)置為false。

          其核心思想是創(chuàng)建一個(gè)心跳延遲任務(wù)DelayedHeartbeat,并對(duì)其檢測(cè)是否完成或者添加Watch,啟動(dòng)心跳延遲或者等待下一個(gè)心跳包的到來(lái)。

          其實(shí)看到這里,我們應(yīng)該能得到一個(gè)關(guān)于Kafka心跳檢測(cè)機(jī)制的實(shí)現(xiàn)思路:

          • 開(kāi)啟一個(gè)延遲任務(wù),延遲檢查時(shí)間為心跳過(guò)期時(shí)間,一旦延遲任務(wù)執(zhí)行,則意外著心跳超時(shí)。
          • 當(dāng)收到一個(gè)心跳包時(shí),需要取消上一次設(shè)置的延遲任務(wù)。
          • 使用循環(huán)使用延遲任務(wù),從而實(shí)現(xiàn)類(lèi)似定時(shí)任務(wù)的效果。

          接下來(lái)我們?cè)敿?xì)探討一下DelayedOperationPurgatory的tryCompleteElseWatch方法,其代碼如下圖所示:

          Step1:嘗試調(diào)用DelayedHeartbeat的tryComplete方法,判斷是否可以判斷完成,這里主要是消費(fèi)組是否為重平衡或者狀態(tài)為Dead,如果上述情況不滿(mǎn)足,則會(huì)返回false,因?yàn)樵诎l(fā)起下一輪心跳包時(shí)已將heartbeatSatisfied設(shè)置為false。

          Step2:為該消費(fèi)者添加到Watch中,表示kafka需要跟蹤該消費(fèi)者的心跳。

          Step3:再次調(diào)用maybeTryComplete方法,再?lài)L試判斷是否該心跳檢測(cè)完成。

          Step4:如果沒(méi)有完成,則該任務(wù)延遲任務(wù)(DelayedHeartbeat)添加到定時(shí)調(diào)度中。

          接下來(lái)將進(jìn)入到Kafka心跳的核心機(jī)制,即延遲任務(wù)的實(shí)現(xiàn)機(jī)制

          每一個(gè)待執(zhí)行的延遲任務(wù)被封裝在TimeTaskEntry中,這個(gè)一個(gè)典型的雙鏈表,數(shù)據(jù)結(jié)構(gòu)說(shuō)明說(shuō)明如下:

          并持有一個(gè)關(guān)鍵字段:該定時(shí)任務(wù)的過(guò)期時(shí)間,等于系統(tǒng)當(dāng)前時(shí)間+過(guò)期時(shí)間,在心跳檢測(cè)場(chǎng)景中默認(rèn)為10s。

          繼續(xù)跟蹤SystemTimer的addTimerTaskEntry,其代碼如下:


          addTimerTaskEntry的核心實(shí)現(xiàn)如下:

          • 嘗試將延遲任務(wù)添加到時(shí)間輪,如果已經(jīng)過(guò)期,則提交到線程池,觸發(fā)心跳過(guò)期的邏輯,提交到線程后,DelayedOperation的run方法會(huì)被調(diào)用,最終onExpiration方法被調(diào)用。

          接下來(lái)重點(diǎn)談一下往時(shí)間輪中添加任務(wù)的具體實(shí)現(xiàn),核心代碼見(jiàn)下圖所示:


          核心實(shí)現(xiàn)要點(diǎn):

          Step1:如果任務(wù)已經(jīng)被取消或者已過(guò)期,返回false。如果返回false,則會(huì)觸發(fā)定時(shí)任務(wù)過(guò)期。

          Step2根據(jù)過(guò)期時(shí)間,放入到時(shí)間輪中指定的位置,時(shí)間輪的數(shù)據(jù)結(jié)構(gòu)如下:

          每一個(gè)格代表一個(gè)時(shí)間間隔,例如200ms,當(dāng)前指針指向的格子,代表該格子中的所有任務(wù)過(guò)期,例如現(xiàn)在要要插入一個(gè)700ms過(guò)期,從當(dāng)前指針的下一格開(kāi)始算起,放入第4格中。

          另外時(shí)間輪的總格子有限,則該時(shí)間輪能計(jì)算的最大時(shí)間是有限的,例如一個(gè)8格的時(shí)間輪,每一格代表200ms,則如果要在2s后過(guò)期,顯然這個(gè)時(shí)間輪無(wú)法存儲(chǔ),通常的解決方案是采用多級(jí)時(shí)間輪,另外一級(jí)的時(shí)間輪,其時(shí)間精度會(huì)更粗。

          結(jié)合上述關(guān)于時(shí)間輪的原理,再去看上述代碼,就顯得容易看懂了。

          Step3:就是處理第一級(jí)時(shí)間輪無(wú)法滿(mǎn)足過(guò)期時(shí)間,則放入到第二級(jí)時(shí)間輪中。

          1.2.2.2 驅(qū)動(dòng)時(shí)間輪

          基于時(shí)間輪算法,除了數(shù)據(jù)按找時(shí)間輪到方向、觸發(fā)時(shí)間存儲(chǔ)在合適的刻度量,還需要驅(qū)動(dòng)時(shí)間輪指針。Kafka中的驅(qū)動(dòng)時(shí)間輪入口為:

          具體實(shí)現(xiàn)代碼如下:

          具體就是將指針處的所有任務(wù)全部拉取出來(lái),執(zhí)行addTimeTaskEntry,其中過(guò)期的任務(wù)將提交到線程池觸發(fā)延遲任務(wù)的執(zhí)行。

          上述代碼看起來(lái)比較簡(jiǎn)單,就不一一介紹,為了方便大家讀懂上面的代碼,我們只需要了解一下kafka采用時(shí)間輪的實(shí)際存儲(chǔ)數(shù)據(jù)結(jié)構(gòu),即能很容易理解上述代碼:

          其核心特點(diǎn):環(huán)形隊(duì)列就是一個(gè)數(shù)組,每一個(gè)元素在Kafka中對(duì)應(yīng)一個(gè)桶,每一個(gè)桶存儲(chǔ)一個(gè)TimerTaskList(鏈表),每次指針指向的TimerTaskList,將該鏈表中的元素代表的任務(wù)全部執(zhí)行。

          2、圖解Kafka心跳架構(gòu)設(shè)計(jì)

          讀起源碼來(lái)說(shuō)或許比較枯燥,接下來(lái)給出Kafka心跳處理的圖解,重點(diǎn)是闡述Kafka時(shí)間輪算法的核心數(shù)據(jù)結(jié)構(gòu)。

          — 本文結(jié)束 —


          ●?漫談設(shè)計(jì)模式在 Spring 框架中的良好實(shí)踐

          ●?顛覆微服務(wù)認(rèn)知:深入思考微服務(wù)的七個(gè)主流觀點(diǎn)

          ●?人人都是 API 設(shè)計(jì)者

          ●?一文講透微服務(wù)下如何保證事務(wù)的一致性

          ●?要黑盒測(cè)試微服務(wù)內(nèi)部服務(wù)間調(diào)用,我該如何實(shí)現(xiàn)?



          關(guān)注我,回復(fù) 「加群」 加入各種主題討論群。



          對(duì)「服務(wù)端思維」有期待,請(qǐng)?jiān)谖哪c(diǎn)個(gè)在看

          喜歡這篇文章,歡迎轉(zhuǎn)發(fā)、分享朋友圈


          在看點(diǎn)這里
          瀏覽 70
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  怡红院院大香蕉 | 国产髙清无码播放 | 麻豆传媒md在线观看视频 | 日韩黄色电影网址网站 | 精品国产内射 |