<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          從一個消費(fèi)慢的例子深入理解 kafka rebalance

          共 8181字,需瀏覽 17分鐘

           ·

          2022-02-21 21:24

          點(diǎn)擊關(guān)注公眾號,Java干貨及時送達(dá)??

          文章來源:https://lxkaka.wang/kafka-rebalance/


          前 言



          消息隊(duì)列是服務(wù)端必不可少的組件,其中Kafka可以說是數(shù)一數(shù)二的選擇,對于大部分服務(wù)端的同學(xué)來說Kafka也是最熟悉的消息中間件之一。而當(dāng)我們在生產(chǎn)上遇到kafka的使用問題時想要透過現(xiàn)象看到問題的本質(zhì),從而找到解決問題的辦法。
          這就要求對kafka的設(shè)計(jì)和實(shí)現(xiàn)有這較為深刻的認(rèn)識。在這篇文章里我們就以生產(chǎn)實(shí)際的例子來展開討論Kafka在消費(fèi)端中的一個重要設(shè)計(jì)consumer group的rebalance。
          只有理解了rebalance我們才能對消息消費(fèi)過程有著更全面的掌握。
          某一天我們收到消費(fèi)端消費(fèi)嚴(yán)重落后生產(chǎn)的告警。第一時間相關(guān)同學(xué)去看了consumer group的消費(fèi)曲線監(jiān)控,消費(fèi)速率明顯出現(xiàn)異常。下面這張示意圖展示了這種情況。

          我們能清楚的看到整個消費(fèi)組在消費(fèi)異常的時間段內(nèi)經(jīng)常出現(xiàn)消費(fèi)停滯的情況如圖上消費(fèi)速率為0。

          為什么消費(fèi)會卡主呢?同時去看了相關(guān)服務(wù)的日志看到很多err kafka data maybe rebalancing。看了這篇文章后消費(fèi)卡主的問題自然就知道答案了。


          重要概念



          為了說清楚 rebalance 有必要把最相關(guān)的重要概念回顧一下

          | Consumer Group

          consumer group是kafka提供的可擴(kuò)展且具有容錯性的消費(fèi)者機(jī)制。既然是一個組,那么組內(nèi)必然可以有多個消費(fèi)者或消費(fèi)者實(shí)例(consumer instance),它們共享一個公共的 ID,即group ID。

          組內(nèi)的所有消費(fèi)者協(xié)調(diào)在一起來消費(fèi)topic下的所有分區(qū)。總結(jié)一下就是以下幾個關(guān)鍵點(diǎn)。

          1. consumer group下可以有一個或多個consumer instance,consumer instance可以是一個進(jìn)程,也可以是一個線程

          2. group.id是一個字符串,唯一標(biāo)識一個consumer group

          3. consumer group訂閱的topic下的每個分區(qū)只能分配給某個group下的一個consumer (當(dāng)然該分區(qū)還可以被分配給其他group)

          | Coordinator

          Group Coordinator是一個服務(wù),每個Broker在啟動的時候都會啟動一個該服務(wù)。Group Coordinator的作用是用來存儲Group的相關(guān)Meta信息,并將對應(yīng)Partition的 Offset信息記錄到Kafka內(nèi)置Topic(__consumer_offsets)中。
          每個Group都會選擇一個Coordinator來完成自己組內(nèi)各Partition的Offset信息,選擇的規(guī)則如下:
          1. 計(jì)算Group對應(yīng)在__consumer_offsets 上的Partition
          2. 根據(jù)對應(yīng)的Partition尋找該P(yáng)artition的leader所對應(yīng)的Broker,該Broker上的 Group Coordinator即就是該Group的Coordinator
          Partition計(jì)算規(guī)則:

          partition-Id(__consumer_offsets) = Math.abs(groupId.hashCode()%groupMetadataTopicPartitionCount)

          其中g(shù)roupMetadataTopicPartitionCount對應(yīng)offsets.topic.num.partitions參數(shù)值,默認(rèn)值是50個分區(qū)。


          Rebalance 目的



          我們知道topic的partition已經(jīng)根據(jù)策略分配給了consumer group下的各個consumer。

          那么當(dāng)有新的consumer加入或者老的consumer離開這個partition與consumer的分配關(guān)系就會發(fā)生變化,如果這個時候不進(jìn)行重新調(diào)配,就可能出現(xiàn)新consumer無partition消費(fèi)或者有partition無消費(fèi)者的情況。

          那么這個重新調(diào)配指的就是consumer和partition rebalance。

          consumer默認(rèn)提供了2種分配策略:
          range策略:將單個topic的分區(qū)按順序排列,然后把這些分區(qū)劃分成固定大小的分區(qū)段并依次分配給每個consumer。
          round-robin:把topi c的所有分區(qū)按順序排開,以輪詢的方式配給每個consumer。
          下面給一個簡單的例子,假設(shè)目前某個consumer group A有2個consumer C1和C2,當(dāng)C3加入時,觸發(fā)了rebalance條件,coordinator會進(jìn)行rebalance,根據(jù)range策略重新分配了partition。


          Rebalance 時機(jī)



          Rebalance在以下情況會觸發(fā)

          1. consume group中的成員個數(shù)發(fā)生變化。例如有新的consumer實(shí)例加入該消費(fèi)組或者離開組
          2. 訂閱Topic的分區(qū)數(shù)發(fā)生變化
          3. 取消訂閱Topic或新增訂閱Topic


          Rebalance 過程



          kafka中的重要設(shè)計(jì)也會隨著版本的升級而優(yōu)化。rebalance也不例外,這里我們介紹的kafka rebalance流程以我們的線上版本1.1.1為例。

          1. 當(dāng)前consumer準(zhǔn)備加入consumer group或GroupCoordinator發(fā)生故障轉(zhuǎn)移時,consumer并不知道GroupCoordinator的host和port,所以consumer會向Kafka集群中的任一broker節(jié)點(diǎn)發(fā)送FindCoordinatorRequest請求,
          收到請求的 broker 節(jié)點(diǎn)會返回ConsumerMetadataResponse響應(yīng),其中就包含了負(fù)責(zé)管理該Consumer Group的GroupCoordinator的地址
          2. 當(dāng)consumer通過FindCoordinatorRequest查找到其Consumer Group對應(yīng)的 GroupCoordinator之后,就會進(jìn)入Join Group階段
          3. Consumer先向GroupCoordinator發(fā)送JoinGroupRequest請求,其中包含consumer的相關(guān)信息
          4. GroupCoordinator收到JoinGroupRequest后會暫存該 consumer信息,然后等待全部consumer的JoinGroupRequest請求。
          JoinGroupRequest中的session.timeout.ms和rebalance_timeout_ms(max.poll.interval.ms)決定了consumer如果沒有響應(yīng)過多久會被踢出出組

          5. GroupCoordinator會根據(jù)全部consumer的JoinGroupRequest請求來確定 Consumer Group中可用的consumer,從中選取一個consumer成為Group Leader,
          同時還會決定partition分配策略,最后會將這些信息封裝成JoinGroupResponse?返回給Group Leader Consumer
          6. 每個consumer 都會收到JoinGroupResponse?響應(yīng),但是只有Group Leader 收到的JoinGroupResponse響應(yīng)中封裝的所有consumer 信息以及Group Leader信息。
          當(dāng)其中一個consumer確定了自己的Group Leader后,會根據(jù)consumer 信息、kafka 集群元數(shù)據(jù)以及partition分配策略計(jì)算partition的分片結(jié)果。
          其他非Group Leader consumer收到 JoinResponse為空響應(yīng),也就不會進(jìn)行任何操作,只是原地等待

          7. 接下來,所有consumer進(jìn)入Synchronizing Group State階段,所有consumer會向GroupCoordinator發(fā)送SyncGroupRequest
          Group Leader Consumer的SyncGroupRequest請求包含了partition分配結(jié)果,普通consumer的SyncGroupRequest為空請求

          8. GroupCoordinator接下來會將partition分配結(jié)果封裝成SyncGroupResponse返回給所有consumer, consumer收到SyncGroupResponse后進(jìn)行解析,就可以明確partition與consumer的映射關(guān)系

          9. 后續(xù)consumer還是會與GroupCoordinator保持定期的心跳(heartbeat.interval.ms)。當(dāng)rebalance正在進(jìn)行中coordinator會通過hearbeat response告訴consumers是否要rejoin group觸發(fā)。即心跳響應(yīng)中包含 IllegalGeneration異常


          Rebalance 問題



          在整個rebalance的過程中,所有partition都會被回收,consumer是無法消費(fèi)任何 partition的。Join階段會等待原先組內(nèi)存活的成員發(fā)送JoinGroupRequest過來,如果原先組內(nèi)的成員因?yàn)闃I(yè)務(wù)處理一直沒有發(fā)送請求過來,服務(wù)端就會一直等待,直到超時。
          這個超時時間就是max.poll.interval.ms的值,默認(rèn)是5分鐘,因此這種情況下rebalance的耗時就會長達(dá)5分鐘,導(dǎo)致所有消費(fèi)者都無法進(jìn)行正常消費(fèi),這對生產(chǎn)來說是個很大的問題。


          Rebalance 改進(jìn)



          | Static Membership

          為了減少因?yàn)閏onsumer短暫不可用造成的rebalance,kafka在2.3版本中引入了Static Membership。
          Static Membership優(yōu)化的核心是:
          1. 在consumer端增加group.instance.id配置(group.instance.id 是 consumer 的唯一標(biāo)識)。如果consumer啟動的時候明確指定了group.instance.id配置值,consumer會JoinGroup Request中攜帶該值,表示該consumer為static member。為了保證group.instance.id的唯一性,我們可以考慮使用hostname、ip等。
          2. 在GroupCoordinator端會記錄group.instance.id → member.id的映射關(guān)系,以及已有的 partition 分配關(guān)系。
          當(dāng) GroupCoordinator 收到已知 group.instance.id 的 consumer 的 JoinGroup Request 時,不會進(jìn)行 rebalance,而是將其原來對應(yīng)的 partition 分配給它。
          Static Membership 可以讓 consumer group 只在下面的 4 種情況下進(jìn)行 rebalance:
          - 有新consumer加入consumer group
          -?Group Leader重新加入Group時
          -?consumer下線時間超過閾值
          ? ?session.timeout.ms
          -?GroupCoordinator收到static member的
          ? ?LeaveGroup Request

          這樣的話,在使用Static Membership場景下,只要在consumer重新啟動的時候,不發(fā)送LeaveGroup Request且在session.timeout.ms時長內(nèi)重啟成功,就不會觸發(fā)rebalance。所以,這里推薦設(shè)置一個足夠consumer重啟的時長session.timeout.ms,這樣能有效降低因consumer短暫不可用導(dǎo)致的reblance次數(shù)。

          | Incremental Cooperative Rebalancing

          從名字中我們就能看出這個版本的rebalance過程兩個關(guān)鍵詞增量和協(xié)作,增量指的是原先版本的rebalance被分解成了多次小規(guī)模的rebalance, 協(xié)作自然指的是consumer 之間的關(guān)系。核心思想:

          1. consumer比較新舊兩個partition分配結(jié)果,只停止消費(fèi)回收(revoke)的partition,對于兩次都分配給自己的partition,consumer不需要停止消費(fèi)

          2. 通過多輪的局部 rebalance 來最終實(shí)現(xiàn)全局的 rebalance

          我們以文章開始的例子來理解一下這個版本的改進(jìn)

          首先C1 -> {P0, p3}C2 -> {P1} C3 -> {P2}這是consumer和partition的分配關(guān)系,我們假設(shè)C2宕機(jī)超過了session.timeout.ms, 此時GroupCoordinator會觸發(fā)第一輪 rebalance

          | 第一輪 Rebalance

          1. GroupCoordinator會在下一輪心跳響應(yīng)中通知C1和C3發(fā)起第一輪rebalance

          2. C1和C3會將自己當(dāng)前正在處理的partition信息封裝到JoinGroupRequest中(metadata字段)發(fā)往GroupCoordinator:

          - C1 發(fā)送的?JoinGroupRequest(assigned: P0、P3)

          - C3 發(fā)送的?JoinGroupRequest(assigned: P2)

          3. 假設(shè)GroupCoordinator在這里選擇1作為Group Leader,GroupCoordinator會將 partition目前的分配狀態(tài)通過JoinGroupResponse發(fā)送給C1

          4. C1發(fā)現(xiàn)P1并未出現(xiàn)(處于lost狀態(tài)),此時C1并不會立即解決當(dāng)前的不平衡問題,返回的partition分配結(jié)果不變(同時會攜帶一個delay時間,scheduled.rebalance.max.delay.ms,默認(rèn)5分鐘)。

          GroupCoordinator據(jù)C1的SyncGroupRequest,生成SyncGroupResponse返回給兩個存活的consumer

          - C1 收到的SyncGroup Response(delay,assigned: P0、P3,revoked:

          - C3 收到的SyncGroup Response(delay,assigned: P2,revoked:

          到此為止,第一輪rebalance結(jié)束。整個rebalance過程中,C1和C3并不會停止消費(fèi)。

          |?第二輪 Rebalance

          1. 在scheduled.rebalance.max.delay.ms這個時間段內(nèi),C2故障恢復(fù),重新加入到 consumer group時,會向GroupCoordinator發(fā)送JoinGroup Request,觸發(fā)第二輪的rebalance。

          GroupCoordinator在下一次心跳響應(yīng)中會通知C1和C3參與第二輪rebalance

          2. C1和C3收到心跳后,發(fā)送JoinGroupRequest參與第二輪rebalance:

          - C1 發(fā)送的JoinGroupRequest(assigned: P0、P3)

          - C3 發(fā)送的JoinGroupRequest(assigned: P2)

          3. 本輪中,C1依舊被選為Group Leader,它檢查delay時間(scheduled.rebalance.max.delay.ms)是否已經(jīng)到了,如果沒到,則依舊不會立即解決當(dāng)前的不平衡問題,繼續(xù)返回目前的分配結(jié)果,并且返回的SyncGroupResponse 中更新了delay的剩余時間(remaining delay = delay - pass_time) 到此為止,第二輪 rebalance結(jié)束。

          整個rebalance過程中,C1和C3并不會停止消費(fèi)。

          -?C1收到的SyncGroup Response(remaining delay,assigned:P0、P3,revoked:)

          -?C2收到的SyncGroup Response(remaining delay,assigned:,revoked:

          -?C3收到的SyncGroup Response(remaining delay,assigned:P2,revoked:

          | 第三輪 Rebalance

          1. 當(dāng)remaining delay時間到期之后,consumer全部重新送JoinGroupRequest,觸發(fā)第三輪rebalance

          - C1 發(fā)送的JoinGroupRequest(assigned: P0、P3)

          -?C2 發(fā)送的JoinGroupRequest(assigned: )

          -??C3 發(fā)送的JoinGroupRequest(assigned: P2)

          2. 在此次rebalance中,C1依舊被選為Group Leader,它會發(fā)現(xiàn)delay已經(jīng)到期了,開始解決不平衡的問題,對partition進(jìn)行重新分配。最新的分配結(jié)果最終通過SyncGroupResponse返回到各個consumer:

          到此為止,第三輪rebalance結(jié)束。整個rebalance過程中,C1和C3的消費(fèi)都不會停止

          - C1收到的SyncGroup Response(assigned:P0、P3,revoked:

          - C2收到的SyncGroup Response(assigned:P1,revoked:

          - C3收到的SyncGroup Response(assigned:P2,revoked:

          下面這張圖展示了上述的Rebalance過程

          通過上述我們應(yīng)該對Kafka的Rebalance有了比較完整的認(rèn)識。我們現(xiàn)在來回答文章開始提出的消費(fèi)卡主問題:消費(fèi)端拿到了異常的消息,這樣的消息業(yè)務(wù)上處理時間過超過了max.poll.interval.ms, 從而觸發(fā)了rebalance, 在rebalance過程中所有消費(fèi)者都暫停了消費(fèi)。
          為了解決這個問題我們首先優(yōu)化業(yè)務(wù)邏輯盡可能提高處理消息速度,對異常消息做特殊處理;然后合理的設(shè)置max.poll.interval.ms cover住業(yè)務(wù)的處理時間。
          為了盡可能減少Rebalance次數(shù)我們也要注意設(shè)置session.timeout.ms和heartbeat.interval.ms的值。一種推薦的方案session.timeout.ms >= 3 * heartbeat.interval.ms, 比如session.timeout.ms = 6s; heartbeat.interval.ms = 2s。這樣consumer如果宕機(jī)且6s之內(nèi)未恢復(fù), Coordinator能夠較快地定位已經(jīng)掛掉的 consumer,把它踢出Group。

          1.?JetBrains 宣布:IntelliJ 平臺徹底停用 Log4j 組件,建議切換至 java.util.logging

          2.?一年之計(jì):如何構(gòu)建知識體系?

          3.?你在 Docker 中跑 MySQL?恭喜你,可以下崗了!

          4.?PageHelper 使用 ThreadLocal 的線程復(fù)用問題,你用對了嗎?

          最近面試BAT,整理一份面試資料Java面試BATJ通關(guān)手冊,覆蓋了Java核心技術(shù)、JVM、Java并發(fā)、SSM、微服務(wù)、數(shù)據(jù)庫、數(shù)據(jù)結(jié)構(gòu)等等。

          獲取方式:點(diǎn)“在看”,關(guān)注公眾號并回復(fù)?Java?領(lǐng)取,更多內(nèi)容陸續(xù)奉上。

          PS:因公眾號平臺更改了推送規(guī)則,如果不想錯過內(nèi)容,記得讀完點(diǎn)一下在看,加個星標(biāo),這樣每次新文章推送才會第一時間出現(xiàn)在你的訂閱列表里。

          點(diǎn)“在看”支持小哈呀,謝謝啦??

          瀏覽 80
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  99 热在线观看 | 日韩一区二区三区免费播放 | 久久综合五月丁香六月 | 欧美成人性爱在线播放 | 亚洲大香香蕉网 |