從一個消費(fèi)慢的例子深入理解 kafka rebalance
點(diǎn)擊關(guān)注公眾號,Java干貨及時送達(dá)??

文章來源:https://lxkaka.wang/kafka-rebalance/
前 言
我們能清楚的看到整個消費(fèi)組在消費(fèi)異常的時間段內(nèi)經(jīng)常出現(xiàn)消費(fèi)停滯的情況如圖上消費(fèi)速率為0。
為什么消費(fèi)會卡主呢?同時去看了相關(guān)服務(wù)的日志看到很多err kafka data maybe rebalancing。看了這篇文章后消費(fèi)卡主的問題自然就知道答案了。
重要概念
為了說清楚 rebalance 有必要把最相關(guān)的重要概念回顧一下
| Consumer Group
consumer group是kafka提供的可擴(kuò)展且具有容錯性的消費(fèi)者機(jī)制。既然是一個組,那么組內(nèi)必然可以有多個消費(fèi)者或消費(fèi)者實(shí)例(consumer instance),它們共享一個公共的 ID,即group ID。
1. consumer group下可以有一個或多個consumer instance,consumer instance可以是一個進(jìn)程,也可以是一個線程
2. group.id是一個字符串,唯一標(biāo)識一個consumer group
3. consumer group訂閱的topic下的每個分區(qū)只能分配給某個group下的一個consumer (當(dāng)然該分區(qū)還可以被分配給其他group)

| Coordinator
partition-Id(__consumer_offsets) = Math.abs(groupId.hashCode()%groupMetadataTopicPartitionCount)
Rebalance 目的
我們知道topic的partition已經(jīng)根據(jù)策略分配給了consumer group下的各個consumer。
那么當(dāng)有新的consumer加入或者老的consumer離開這個partition與consumer的分配關(guān)系就會發(fā)生變化,如果這個時候不進(jìn)行重新調(diào)配,就可能出現(xiàn)新consumer無partition消費(fèi)或者有partition無消費(fèi)者的情況。
那么這個重新調(diào)配指的就是consumer和partition rebalance。

Rebalance 時機(jī)
Rebalance在以下情況會觸發(fā)
Rebalance 過程
kafka中的重要設(shè)計(jì)也會隨著版本的升級而優(yōu)化。rebalance也不例外,這里我們介紹的kafka rebalance流程以我們的線上版本1.1.1為例。





Rebalance 問題
Rebalance 改進(jìn)
| Static Membership
這樣的話,在使用Static Membership場景下,只要在consumer重新啟動的時候,不發(fā)送LeaveGroup Request且在session.timeout.ms時長內(nèi)重啟成功,就不會觸發(fā)rebalance。所以,這里推薦設(shè)置一個足夠consumer重啟的時長session.timeout.ms,這樣能有效降低因consumer短暫不可用導(dǎo)致的reblance次數(shù)。
| Incremental Cooperative Rebalancing
從名字中我們就能看出這個版本的rebalance過程兩個關(guān)鍵詞增量和協(xié)作,增量指的是原先版本的rebalance被分解成了多次小規(guī)模的rebalance, 協(xié)作自然指的是consumer 之間的關(guān)系。核心思想:
1. consumer比較新舊兩個partition分配結(jié)果,只停止消費(fèi)回收(revoke)的partition,對于兩次都分配給自己的partition,consumer不需要停止消費(fèi)
2. 通過多輪的局部 rebalance 來最終實(shí)現(xiàn)全局的 rebalance
我們以文章開始的例子來理解一下這個版本的改進(jìn)
首先C1 -> {P0, p3}C2 -> {P1} C3 -> {P2}這是consumer和partition的分配關(guān)系,我們假設(shè)C2宕機(jī)超過了session.timeout.ms, 此時GroupCoordinator會觸發(fā)第一輪 rebalance
| 第一輪 Rebalance
1. GroupCoordinator會在下一輪心跳響應(yīng)中通知C1和C3發(fā)起第一輪rebalance
2. C1和C3會將自己當(dāng)前正在處理的partition信息封裝到JoinGroupRequest中(metadata字段)發(fā)往GroupCoordinator:
- C1 發(fā)送的?JoinGroupRequest(assigned: P0、P3)
- C3 發(fā)送的?JoinGroupRequest(assigned: P2)
3. 假設(shè)GroupCoordinator在這里選擇1作為Group Leader,GroupCoordinator會將 partition目前的分配狀態(tài)通過JoinGroupResponse發(fā)送給C1
4. C1發(fā)現(xiàn)P1并未出現(xiàn)(處于lost狀態(tài)),此時C1并不會立即解決當(dāng)前的不平衡問題,返回的partition分配結(jié)果不變(同時會攜帶一個delay時間,scheduled.rebalance.max.delay.ms,默認(rèn)5分鐘)。
GroupCoordinator據(jù)C1的SyncGroupRequest,生成SyncGroupResponse返回給兩個存活的consumer
- C1 收到的SyncGroup Response(delay,assigned: P0、P3,revoked:)
- C3 收到的SyncGroup Response(delay,assigned: P2,revoked:)
到此為止,第一輪rebalance結(jié)束。整個rebalance過程中,C1和C3并不會停止消費(fèi)。
|?第二輪 Rebalance
1. 在scheduled.rebalance.max.delay.ms這個時間段內(nèi),C2故障恢復(fù),重新加入到 consumer group時,會向GroupCoordinator發(fā)送JoinGroup Request,觸發(fā)第二輪的rebalance。
GroupCoordinator在下一次心跳響應(yīng)中會通知C1和C3參與第二輪rebalance
2. C1和C3收到心跳后,發(fā)送JoinGroupRequest參與第二輪rebalance:
- C1 發(fā)送的JoinGroupRequest(assigned: P0、P3)
- C3 發(fā)送的JoinGroupRequest(assigned: P2)
3. 本輪中,C1依舊被選為Group Leader,它檢查delay時間(scheduled.rebalance.max.delay.ms)是否已經(jīng)到了,如果沒到,則依舊不會立即解決當(dāng)前的不平衡問題,繼續(xù)返回目前的分配結(jié)果,并且返回的SyncGroupResponse 中更新了delay的剩余時間(remaining delay = delay - pass_time) 到此為止,第二輪 rebalance結(jié)束。
整個rebalance過程中,C1和C3并不會停止消費(fèi)。
-?C1收到的SyncGroup Response(remaining delay,assigned:P0、P3,revoked:)
-?C2收到的SyncGroup Response(remaining delay,assigned:,revoked:)
-?C3收到的SyncGroup Response(remaining delay,assigned:P2,revoked:)
| 第三輪 Rebalance
1. 當(dāng)remaining delay時間到期之后,consumer全部重新送JoinGroupRequest,觸發(fā)第三輪rebalance
- C1 發(fā)送的JoinGroupRequest(assigned: P0、P3)
-?C2 發(fā)送的JoinGroupRequest(assigned: )
-?C3 發(fā)送的JoinGroupRequest(assigned: P2)
2. 在此次rebalance中,C1依舊被選為Group Leader,它會發(fā)現(xiàn)delay已經(jīng)到期了,開始解決不平衡的問題,對partition進(jìn)行重新分配。最新的分配結(jié)果最終通過SyncGroupResponse返回到各個consumer:
到此為止,第三輪rebalance結(jié)束。整個rebalance過程中,C1和C3的消費(fèi)都不會停止
- C1收到的SyncGroup Response(assigned:P0、P3,revoked:)
- C2收到的SyncGroup Response(assigned:P1,revoked:)
- C3收到的SyncGroup Response(assigned:P2,revoked:)
下面這張圖展示了上述的Rebalance過程
1.?JetBrains 宣布:IntelliJ 平臺徹底停用 Log4j 組件,建議切換至 java.util.logging
最近面試BAT,整理一份面試資料《Java面試BATJ通關(guān)手冊》,覆蓋了Java核心技術(shù)、JVM、Java并發(fā)、SSM、微服務(wù)、數(shù)據(jù)庫、數(shù)據(jù)結(jié)構(gòu)等等。
獲取方式:點(diǎn)“在看”,關(guān)注公眾號并回復(fù)?Java?領(lǐng)取,更多內(nèi)容陸續(xù)奉上。
PS:因公眾號平臺更改了推送規(guī)則,如果不想錯過內(nèi)容,記得讀完點(diǎn)一下“在看”,加個“星標(biāo)”,這樣每次新文章推送才會第一時間出現(xiàn)在你的訂閱列表里。
點(diǎn)“在看”支持小哈呀,謝謝啦??

