點(diǎn)擊上方藍(lán)色字體,選擇“設(shè)為星標(biāo)”
回復(fù)”學(xué)習(xí)資料“獲取學(xué)習(xí)寶典
文章來(lái)源:https://lxkaka.wang/kafka-rebalance/
消息隊(duì)列是服務(wù)端必不可少的組件,其中Kafka可以說(shuō)是數(shù)一數(shù)二的選擇,對(duì)于大部分服務(wù)端的同學(xué)來(lái)說(shuō)Kafka也是最熟悉的消息中間件之一。而當(dāng)我們?cè)谏a(chǎn)上遇到kafka的使用問(wèn)題時(shí)想要透過(guò)現(xiàn)象看到問(wèn)題的本質(zhì),從而找到解決問(wèn)題的辦法。這就要求對(duì)kafka的設(shè)計(jì)和實(shí)現(xiàn)有這較為深刻的認(rèn)識(shí)。在這篇文章里我們就以生產(chǎn)實(shí)際的例子來(lái)展開(kāi)討論Kafka在消費(fèi)端中的一個(gè)重要設(shè)計(jì)consumer group的rebalance。只有理解了rebalance我們才能對(duì)消息消費(fèi)過(guò)程有著更全面的掌握。
某一天我們收到消費(fèi)端消費(fèi)嚴(yán)重落后生產(chǎn)的告警。第一時(shí)間相關(guān)同學(xué)去看了consumer group的消費(fèi)曲線監(jiān)控,消費(fèi)速率明顯出現(xiàn)異常。下面這張示意圖展示了這種情況。
我們能清楚的看到整個(gè)消費(fèi)組在消費(fèi)異常的時(shí)間段內(nèi)經(jīng)常出現(xiàn)消費(fèi)停滯的情況如圖上消費(fèi)速率為0。
為什么消費(fèi)會(huì)卡主呢?同時(shí)去看了相關(guān)服務(wù)的日志看到很多err kafka data maybe rebalancing。看了這篇文章后消費(fèi)卡主的問(wèn)題自然就知道答案了。
為了說(shuō)清楚 rebalance 有必要把最相關(guān)的重要概念回顧一下
| Consumer Group
consumer group是kafka提供的可擴(kuò)展且具有容錯(cuò)性的消費(fèi)者機(jī)制。既然是一個(gè)組,那么組內(nèi)必然可以有多個(gè)消費(fèi)者或消費(fèi)者實(shí)例(consumer instance),它們共享一個(gè)公共的 ID,即group ID。
組內(nèi)的所有消費(fèi)者協(xié)調(diào)在一起來(lái)消費(fèi)topic下的所有分區(qū)。總結(jié)一下就是以下幾個(gè)關(guān)鍵點(diǎn)。
1. consumer group下可以有一個(gè)或多個(gè)consumer instance,consumer instance可以是一個(gè)進(jìn)程,也可以是一個(gè)線程
2. group.id是一個(gè)字符串,唯一標(biāo)識(shí)一個(gè)consumer group
3. consumer group訂閱的topic下的每個(gè)分區(qū)只能分配給某個(gè)group下的一個(gè)consumer (當(dāng)然該分區(qū)還可以被分配給其他group)

| Coordinator
Group Coordinator是一個(gè)服務(wù),每個(gè)Broker在啟動(dòng)的時(shí)候都會(huì)啟動(dòng)一個(gè)該服務(wù)。Group Coordinator的作用是用來(lái)存儲(chǔ)Group的相關(guān)Meta信息,并將對(duì)應(yīng)Partition的 Offset信息記錄到Kafka內(nèi)置Topic(__consumer_offsets)中。每個(gè)Group都會(huì)選擇一個(gè)Coordinator來(lái)完成自己組內(nèi)各Partition的Offset信息,選擇的規(guī)則如下:1. 計(jì)算Group對(duì)應(yīng)在__consumer_offsets 上的Partition2. 根據(jù)對(duì)應(yīng)的Partition尋找該P(yáng)artition的leader所對(duì)應(yīng)的Broker,該Broker上的 Group Coordinator即就是該Group的Coordinatorpartition-Id(__consumer_offsets) = Math.abs(groupId.hashCode()%groupMetadataTopicPartitionCount)
其中g(shù)roupMetadataTopicPartitionCount對(duì)應(yīng)offsets.topic.num.partitions參數(shù)值,默認(rèn)值是50個(gè)分區(qū)。
我們知道topic的partition已經(jīng)根據(jù)策略分配給了consumer group下的各個(gè)consumer。
那么當(dāng)有新的consumer加入或者老的consumer離開(kāi)這個(gè)partition與consumer的分配關(guān)系就會(huì)發(fā)生變化,如果這個(gè)時(shí)候不進(jìn)行重新調(diào)配,就可能出現(xiàn)新consumer無(wú)partition消費(fèi)或者有partition無(wú)消費(fèi)者的情況。
那么這個(gè)重新調(diào)配指的就是consumer和partition rebalance。
consumer默認(rèn)提供了2種分配策略:range策略:將單個(gè)topic的分區(qū)按順序排列,然后把這些分區(qū)劃分成固定大小的分區(qū)段并依次分配給每個(gè)consumer。round-robin:把topi c的所有分區(qū)按順序排開(kāi),以輪詢的方式配給每個(gè)consumer。下面給一個(gè)簡(jiǎn)單的例子,假設(shè)目前某個(gè)consumer group A有2個(gè)consumer C1和C2,當(dāng)C3加入時(shí),觸發(fā)了rebalance條件,coordinator會(huì)進(jìn)行rebalance,根據(jù)range策略重新分配了partition。
Rebalance在以下情況會(huì)觸發(fā)
1. consume group中的成員個(gè)數(shù)發(fā)生變化。例如有新的consumer實(shí)例加入該消費(fèi)組或者離開(kāi)組2. 訂閱Topic的分區(qū)數(shù)發(fā)生變化
kafka中的重要設(shè)計(jì)也會(huì)隨著版本的升級(jí)而優(yōu)化。rebalance也不例外,這里我們介紹的kafka rebalance流程以我們的線上版本1.1.1為例。
1. 當(dāng)前consumer準(zhǔn)備加入consumer group或GroupCoordinator發(fā)生故障轉(zhuǎn)移時(shí),consumer并不知道GroupCoordinator的host和port,所以consumer會(huì)向Kafka集群中的任一broker節(jié)點(diǎn)發(fā)送FindCoordinatorRequest請(qǐng)求,收到請(qǐng)求的 broker 節(jié)點(diǎn)會(huì)返回ConsumerMetadataResponse響應(yīng),其中就包含了負(fù)責(zé)管理該Consumer Group的GroupCoordinator的地址2. 當(dāng)consumer通過(guò)FindCoordinatorRequest查找到其Consumer Group對(duì)應(yīng)的 GroupCoordinator之后,就會(huì)進(jìn)入Join Group階段3. Consumer先向GroupCoordinator發(fā)送JoinGroupRequest請(qǐng)求,其中包含consumer的相關(guān)信息4. GroupCoordinator收到JoinGroupRequest后會(huì)暫存該 consumer信息,然后等待全部consumer的JoinGroupRequest請(qǐng)求。JoinGroupRequest中的session.timeout.ms和rebalance_timeout_ms(max.poll.interval.ms)決定了consumer如果沒(méi)有響應(yīng)過(guò)多久會(huì)被踢出出組
5. GroupCoordinator會(huì)根據(jù)全部consumer的JoinGroupRequest請(qǐng)求來(lái)確定 Consumer Group中可用的consumer,從中選取一個(gè)consumer成為Group Leader,同時(shí)還會(huì)決定partition分配策略,最后會(huì)將這些信息封裝成JoinGroupResponse?返回給Group Leader Consumer6. 每個(gè)consumer 都會(huì)收到JoinGroupResponse?響應(yīng),但是只有Group Leader 收到的JoinGroupResponse響應(yīng)中封裝的所有consumer 信息以及Group Leader信息。當(dāng)其中一個(gè)consumer確定了自己的Group Leader后,會(huì)根據(jù)consumer 信息、kafka 集群元數(shù)據(jù)以及partition分配策略計(jì)算partition的分片結(jié)果。其他非Group Leader consumer收到 JoinResponse為空響應(yīng),也就不會(huì)進(jìn)行任何操作,只是原地等待
7. 接下來(lái),所有consumer進(jìn)入Synchronizing Group State階段,所有consumer會(huì)向GroupCoordinator發(fā)送SyncGroupRequest。Group Leader Consumer的SyncGroupRequest請(qǐng)求包含了partition分配結(jié)果,普通consumer的SyncGroupRequest為空請(qǐng)求
8. GroupCoordinator接下來(lái)會(huì)將partition分配結(jié)果封裝成SyncGroupResponse返回給所有consumer, consumer收到SyncGroupResponse后進(jìn)行解析,就可以明確partition與consumer的映射關(guān)系
9. 后續(xù)consumer還是會(huì)與GroupCoordinator保持定期的心跳(heartbeat.interval.ms)。當(dāng)rebalance正在進(jìn)行中coordinator會(huì)通過(guò)hearbeat response告訴consumers是否要rejoin group觸發(fā)。即心跳響應(yīng)中包含 IllegalGeneration異常
在整個(gè)rebalance的過(guò)程中,所有partition都會(huì)被回收,consumer是無(wú)法消費(fèi)任何 partition的。Join階段會(huì)等待原先組內(nèi)存活的成員發(fā)送JoinGroupRequest過(guò)來(lái),如果原先組內(nèi)的成員因?yàn)闃I(yè)務(wù)處理一直沒(méi)有發(fā)送請(qǐng)求過(guò)來(lái),服務(wù)端就會(huì)一直等待,直到超時(shí)。這個(gè)超時(shí)時(shí)間就是max.poll.interval.ms的值,默認(rèn)是5分鐘,因此這種情況下rebalance的耗時(shí)就會(huì)長(zhǎng)達(dá)5分鐘,導(dǎo)致所有消費(fèi)者都無(wú)法進(jìn)行正常消費(fèi),這對(duì)生產(chǎn)來(lái)說(shuō)是個(gè)很大的問(wèn)題。
| Static Membership
為了減少因?yàn)閏onsumer短暫不可用造成的rebalance,kafka在2.3版本中引入了Static Membership。Static Membership優(yōu)化的核心是:1. 在consumer端增加group.instance.id配置(group.instance.id 是 consumer 的唯一標(biāo)識(shí))。如果consumer啟動(dòng)的時(shí)候明確指定了group.instance.id配置值,consumer會(huì)JoinGroup Request中攜帶該值,表示該consumer為static member。為了保證group.instance.id的唯一性,我們可以考慮使用hostname、ip等。2. 在GroupCoordinator端會(huì)記錄group.instance.id → member.id的映射關(guān)系,以及已有的 partition 分配關(guān)系。當(dāng) GroupCoordinator 收到已知 group.instance.id 的 consumer 的 JoinGroup Request 時(shí),不會(huì)進(jìn)行 rebalance,而是將其原來(lái)對(duì)應(yīng)的 partition 分配給它。Static Membership 可以讓 consumer group 只在下面的 4 種情況下進(jìn)行 rebalance:- 有新consumer加入consumer group-?Group Leader重新加入Group時(shí)-?consumer下線時(shí)間超過(guò)閾值-?GroupCoordinator收到static member的這樣的話,在使用Static Membership場(chǎng)景下,只要在consumer重新啟動(dòng)的時(shí)候,不發(fā)送LeaveGroup Request且在session.timeout.ms時(shí)長(zhǎng)內(nèi)重啟成功,就不會(huì)觸發(fā)rebalance。所以,這里推薦設(shè)置一個(gè)足夠consumer重啟的時(shí)長(zhǎng)session.timeout.ms,這樣能有效降低因consumer短暫不可用導(dǎo)致的reblance次數(shù)。
| Incremental Cooperative Rebalancing
從名字中我們就能看出這個(gè)版本的rebalance過(guò)程兩個(gè)關(guān)鍵詞增量和協(xié)作,增量指的是原先版本的rebalance被分解成了多次小規(guī)模的rebalance, 協(xié)作自然指的是consumer 之間的關(guān)系。核心思想:
1. consumer比較新舊兩個(gè)partition分配結(jié)果,只停止消費(fèi)回收(revoke)的partition,對(duì)于兩次都分配給自己的partition,consumer不需要停止消費(fèi)
2. 通過(guò)多輪的局部 rebalance 來(lái)最終實(shí)現(xiàn)全局的 rebalance
我們以文章開(kāi)始的例子來(lái)理解一下這個(gè)版本的改進(jìn)
首先C1 -> {P0, p3}C2 -> {P1} C3 -> {P2}這是consumer和partition的分配關(guān)系,我們假設(shè)C2宕機(jī)超過(guò)了session.timeout.ms, 此時(shí)GroupCoordinator會(huì)觸發(fā)第一輪 rebalance
| 第一輪 Rebalance
1. GroupCoordinator會(huì)在下一輪心跳響應(yīng)中通知C1和C3發(fā)起第一輪rebalance
2. C1和C3會(huì)將自己當(dāng)前正在處理的partition信息封裝到JoinGroupRequest中(metadata字段)發(fā)往GroupCoordinator:
- C1 發(fā)送的?JoinGroupRequest(assigned: P0、P3)
- C3 發(fā)送的?JoinGroupRequest(assigned: P2)
3. 假設(shè)GroupCoordinator在這里選擇1作為Group Leader,GroupCoordinator會(huì)將 partition目前的分配狀態(tài)通過(guò)JoinGroupResponse發(fā)送給C1
4. C1發(fā)現(xiàn)P1并未出現(xiàn)(處于lost狀態(tài)),此時(shí)C1并不會(huì)立即解決當(dāng)前的不平衡問(wèn)題,返回的partition分配結(jié)果不變(同時(shí)會(huì)攜帶一個(gè)delay時(shí)間,scheduled.rebalance.max.delay.ms,默認(rèn)5分鐘)。
GroupCoordinator據(jù)C1的SyncGroupRequest,生成SyncGroupResponse返回給兩個(gè)存活的consumer
- C1 收到的SyncGroup Response(delay,assigned: P0、P3,revoked:)
- C3 收到的SyncGroup Response(delay,assigned: P2,revoked:)
到此為止,第一輪rebalance結(jié)束。整個(gè)rebalance過(guò)程中,C1和C3并不會(huì)停止消費(fèi)。
|?第二輪 Rebalance
1. 在scheduled.rebalance.max.delay.ms這個(gè)時(shí)間段內(nèi),C2故障恢復(fù),重新加入到 consumer group時(shí),會(huì)向GroupCoordinator發(fā)送JoinGroup Request,觸發(fā)第二輪的rebalance。
GroupCoordinator在下一次心跳響應(yīng)中會(huì)通知C1和C3參與第二輪rebalance
2. C1和C3收到心跳后,發(fā)送JoinGroupRequest參與第二輪rebalance:
- C1 發(fā)送的JoinGroupRequest(assigned: P0、P3)
- C3 發(fā)送的JoinGroupRequest(assigned: P2)
3. 本輪中,C1依舊被選為Group Leader,它檢查delay時(shí)間(scheduled.rebalance.max.delay.ms)是否已經(jīng)到了,如果沒(méi)到,則依舊不會(huì)立即解決當(dāng)前的不平衡問(wèn)題,繼續(xù)返回目前的分配結(jié)果,并且返回的SyncGroupResponse 中更新了delay的剩余時(shí)間(remaining delay = delay - pass_time) 到此為止,第二輪 rebalance結(jié)束。
整個(gè)rebalance過(guò)程中,C1和C3并不會(huì)停止消費(fèi)。
-?C1收到的SyncGroup Response(remaining delay,assigned:P0、P3,revoked:)
-?C2收到的SyncGroup Response(remaining delay,assigned:,revoked:)
-?C3收到的SyncGroup Response(remaining delay,assigned:P2,revoked:)
| 第三輪 Rebalance
1. 當(dāng)remaining delay時(shí)間到期之后,consumer全部重新送JoinGroupRequest,觸發(fā)第三輪rebalance
- C1 發(fā)送的JoinGroupRequest(assigned: P0、P3)
-?C2 發(fā)送的JoinGroupRequest(assigned: )
-??C3 發(fā)送的JoinGroupRequest(assigned: P2)
2. 在此次rebalance中,C1依舊被選為Group Leader,它會(huì)發(fā)現(xiàn)delay已經(jīng)到期了,開(kāi)始解決不平衡的問(wèn)題,對(duì)partition進(jìn)行重新分配。最新的分配結(jié)果最終通過(guò)SyncGroupResponse返回到各個(gè)consumer:
到此為止,第三輪rebalance結(jié)束。整個(gè)rebalance過(guò)程中,C1和C3的消費(fèi)都不會(huì)停止
- C1收到的SyncGroup Response(assigned:P0、P3,revoked:)
- C2收到的SyncGroup Response(assigned:P1,revoked:)
- C3收到的SyncGroup Response(assigned:P2,revoked:)
下面這張圖展示了上述的Rebalance過(guò)程

通過(guò)上述我們應(yīng)該對(duì)Kafka的Rebalance有了比較完整的認(rèn)識(shí)。我們現(xiàn)在來(lái)回答文章開(kāi)始提出的消費(fèi)卡主問(wèn)題:消費(fèi)端拿到了異常的消息,這樣的消息業(yè)務(wù)上處理時(shí)間過(guò)超過(guò)了max.poll.interval.ms, 從而觸發(fā)了rebalance, 在rebalance過(guò)程中所有消費(fèi)者都暫停了消費(fèi)。為了解決這個(gè)問(wèn)題我們首先優(yōu)化業(yè)務(wù)邏輯盡可能提高處理消息速度,對(duì)異常消息做特殊處理;然后合理的設(shè)置max.poll.interval.ms cover住業(yè)務(wù)的處理時(shí)間。為了盡可能減少Rebalance次數(shù)我們也要注意設(shè)置session.timeout.ms和heartbeat.interval.ms的值。一種推薦的方案session.timeout.ms >= 3 * heartbeat.interval.ms, 比如session.timeout.ms = 6s; heartbeat.interval.ms = 2s。這樣consumer如果宕機(jī)且6s之內(nèi)未恢復(fù), Coordinator能夠較快地定位已經(jīng)掛掉的 consumer,把它踢出Group。-------------? END??-------------掃描下方二維碼,加入技術(shù)群。暗號(hào):加群