<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          從一個(gè)消費(fèi)慢的例子深入理解 kafka rebalance

          共 7891字,需瀏覽 16分鐘

           ·

          2022-02-22 13:24

          點(diǎn)擊上方藍(lán)色字體,選擇“設(shè)為星標(biāo)”


          回復(fù)”學(xué)習(xí)資料“獲取學(xué)習(xí)寶典



          文章來(lái)源:https://lxkaka.wang/kafka-rebalance/


          前 言



          消息隊(duì)列是服務(wù)端必不可少的組件,其中Kafka可以說(shuō)是數(shù)一數(shù)二的選擇,對(duì)于大部分服務(wù)端的同學(xué)來(lái)說(shuō)Kafka也是最熟悉的消息中間件之一。而當(dāng)我們?cè)谏a(chǎn)上遇到kafka的使用問(wèn)題時(shí)想要透過(guò)現(xiàn)象看到問(wèn)題的本質(zhì),從而找到解決問(wèn)題的辦法。
          這就要求對(duì)kafka的設(shè)計(jì)和實(shí)現(xiàn)有這較為深刻的認(rèn)識(shí)。在這篇文章里我們就以生產(chǎn)實(shí)際的例子來(lái)展開(kāi)討論Kafka在消費(fèi)端中的一個(gè)重要設(shè)計(jì)consumer group的rebalance。
          只有理解了rebalance我們才能對(duì)消息消費(fèi)過(guò)程有著更全面的掌握。
          某一天我們收到消費(fèi)端消費(fèi)嚴(yán)重落后生產(chǎn)的告警。第一時(shí)間相關(guān)同學(xué)去看了consumer group的消費(fèi)曲線監(jiān)控,消費(fèi)速率明顯出現(xiàn)異常。下面這張示意圖展示了這種情況。

          我們能清楚的看到整個(gè)消費(fèi)組在消費(fèi)異常的時(shí)間段內(nèi)經(jīng)常出現(xiàn)消費(fèi)停滯的情況如圖上消費(fèi)速率為0。

          為什么消費(fèi)會(huì)卡主呢?同時(shí)去看了相關(guān)服務(wù)的日志看到很多err kafka data maybe rebalancing。看了這篇文章后消費(fèi)卡主的問(wèn)題自然就知道答案了。


          重要概念



          為了說(shuō)清楚 rebalance 有必要把最相關(guān)的重要概念回顧一下

          | Consumer Group

          consumer group是kafka提供的可擴(kuò)展且具有容錯(cuò)性的消費(fèi)者機(jī)制。既然是一個(gè)組,那么組內(nèi)必然可以有多個(gè)消費(fèi)者或消費(fèi)者實(shí)例(consumer instance),它們共享一個(gè)公共的 ID,即group ID。

          組內(nèi)的所有消費(fèi)者協(xié)調(diào)在一起來(lái)消費(fèi)topic下的所有分區(qū)。總結(jié)一下就是以下幾個(gè)關(guān)鍵點(diǎn)。

          1. consumer group下可以有一個(gè)或多個(gè)consumer instance,consumer instance可以是一個(gè)進(jìn)程,也可以是一個(gè)線程

          2. group.id是一個(gè)字符串,唯一標(biāo)識(shí)一個(gè)consumer group

          3. consumer group訂閱的topic下的每個(gè)分區(qū)只能分配給某個(gè)group下的一個(gè)consumer (當(dāng)然該分區(qū)還可以被分配給其他group)

          | Coordinator

          Group Coordinator是一個(gè)服務(wù),每個(gè)Broker在啟動(dòng)的時(shí)候都會(huì)啟動(dòng)一個(gè)該服務(wù)。Group Coordinator的作用是用來(lái)存儲(chǔ)Group的相關(guān)Meta信息,并將對(duì)應(yīng)Partition的 Offset信息記錄到Kafka內(nèi)置Topic(__consumer_offsets)中。
          每個(gè)Group都會(huì)選擇一個(gè)Coordinator來(lái)完成自己組內(nèi)各Partition的Offset信息,選擇的規(guī)則如下:
          1. 計(jì)算Group對(duì)應(yīng)在__consumer_offsets 上的Partition
          2. 根據(jù)對(duì)應(yīng)的Partition尋找該P(yáng)artition的leader所對(duì)應(yīng)的Broker,該Broker上的 Group Coordinator即就是該Group的Coordinator
          Partition計(jì)算規(guī)則:

          partition-Id(__consumer_offsets) = Math.abs(groupId.hashCode()%groupMetadataTopicPartitionCount)

          其中g(shù)roupMetadataTopicPartitionCount對(duì)應(yīng)offsets.topic.num.partitions參數(shù)值,默認(rèn)值是50個(gè)分區(qū)。


          Rebalance 目的



          我們知道topic的partition已經(jīng)根據(jù)策略分配給了consumer group下的各個(gè)consumer。

          那么當(dāng)有新的consumer加入或者老的consumer離開(kāi)這個(gè)partition與consumer的分配關(guān)系就會(huì)發(fā)生變化,如果這個(gè)時(shí)候不進(jìn)行重新調(diào)配,就可能出現(xiàn)新consumer無(wú)partition消費(fèi)或者有partition無(wú)消費(fèi)者的情況。

          那么這個(gè)重新調(diào)配指的就是consumer和partition rebalance。

          consumer默認(rèn)提供了2種分配策略:
          range策略:將單個(gè)topic的分區(qū)按順序排列,然后把這些分區(qū)劃分成固定大小的分區(qū)段并依次分配給每個(gè)consumer。
          round-robin:把topi c的所有分區(qū)按順序排開(kāi),以輪詢的方式配給每個(gè)consumer。
          下面給一個(gè)簡(jiǎn)單的例子,假設(shè)目前某個(gè)consumer group A有2個(gè)consumer C1和C2,當(dāng)C3加入時(shí),觸發(fā)了rebalance條件,coordinator會(huì)進(jìn)行rebalance,根據(jù)range策略重新分配了partition。


          Rebalance 時(shí)機(jī)



          Rebalance在以下情況會(huì)觸發(fā)

          1. consume group中的成員個(gè)數(shù)發(fā)生變化。例如有新的consumer實(shí)例加入該消費(fèi)組或者離開(kāi)組
          2. 訂閱Topic的分區(qū)數(shù)發(fā)生變化
          3. 取消訂閱Topic或新增訂閱Topic


          Rebalance 過(guò)程



          kafka中的重要設(shè)計(jì)也會(huì)隨著版本的升級(jí)而優(yōu)化。rebalance也不例外,這里我們介紹的kafka rebalance流程以我們的線上版本1.1.1為例。

          1. 當(dāng)前consumer準(zhǔn)備加入consumer group或GroupCoordinator發(fā)生故障轉(zhuǎn)移時(shí),consumer并不知道GroupCoordinator的host和port,所以consumer會(huì)向Kafka集群中的任一broker節(jié)點(diǎn)發(fā)送FindCoordinatorRequest請(qǐng)求,
          收到請(qǐng)求的 broker 節(jié)點(diǎn)會(huì)返回ConsumerMetadataResponse響應(yīng),其中就包含了負(fù)責(zé)管理該Consumer Group的GroupCoordinator的地址
          2. 當(dāng)consumer通過(guò)FindCoordinatorRequest查找到其Consumer Group對(duì)應(yīng)的 GroupCoordinator之后,就會(huì)進(jìn)入Join Group階段
          3. Consumer先向GroupCoordinator發(fā)送JoinGroupRequest請(qǐng)求,其中包含consumer的相關(guān)信息
          4. GroupCoordinator收到JoinGroupRequest后會(huì)暫存該 consumer信息,然后等待全部consumer的JoinGroupRequest請(qǐng)求。
          JoinGroupRequest中的session.timeout.ms和rebalance_timeout_ms(max.poll.interval.ms)決定了consumer如果沒(méi)有響應(yīng)過(guò)多久會(huì)被踢出出組

          5. GroupCoordinator會(huì)根據(jù)全部consumer的JoinGroupRequest請(qǐng)求來(lái)確定 Consumer Group中可用的consumer,從中選取一個(gè)consumer成為Group Leader,
          同時(shí)還會(huì)決定partition分配策略,最后會(huì)將這些信息封裝成JoinGroupResponse?返回給Group Leader Consumer
          6. 每個(gè)consumer 都會(huì)收到JoinGroupResponse?響應(yīng),但是只有Group Leader 收到的JoinGroupResponse響應(yīng)中封裝的所有consumer 信息以及Group Leader信息。
          當(dāng)其中一個(gè)consumer確定了自己的Group Leader后,會(huì)根據(jù)consumer 信息、kafka 集群元數(shù)據(jù)以及partition分配策略計(jì)算partition的分片結(jié)果。
          其他非Group Leader consumer收到 JoinResponse為空響應(yīng),也就不會(huì)進(jìn)行任何操作,只是原地等待

          7. 接下來(lái),所有consumer進(jìn)入Synchronizing Group State階段,所有consumer會(huì)向GroupCoordinator發(fā)送SyncGroupRequest
          Group Leader Consumer的SyncGroupRequest請(qǐng)求包含了partition分配結(jié)果,普通consumer的SyncGroupRequest為空請(qǐng)求

          8. GroupCoordinator接下來(lái)會(huì)將partition分配結(jié)果封裝成SyncGroupResponse返回給所有consumer, consumer收到SyncGroupResponse后進(jìn)行解析,就可以明確partition與consumer的映射關(guān)系

          9. 后續(xù)consumer還是會(huì)與GroupCoordinator保持定期的心跳(heartbeat.interval.ms)。當(dāng)rebalance正在進(jìn)行中coordinator會(huì)通過(guò)hearbeat response告訴consumers是否要rejoin group觸發(fā)。即心跳響應(yīng)中包含 IllegalGeneration異常


          Rebalance 問(wèn)題



          在整個(gè)rebalance的過(guò)程中,所有partition都會(huì)被回收,consumer是無(wú)法消費(fèi)任何 partition的。Join階段會(huì)等待原先組內(nèi)存活的成員發(fā)送JoinGroupRequest過(guò)來(lái),如果原先組內(nèi)的成員因?yàn)闃I(yè)務(wù)處理一直沒(méi)有發(fā)送請(qǐng)求過(guò)來(lái),服務(wù)端就會(huì)一直等待,直到超時(shí)。
          這個(gè)超時(shí)時(shí)間就是max.poll.interval.ms的值,默認(rèn)是5分鐘,因此這種情況下rebalance的耗時(shí)就會(huì)長(zhǎng)達(dá)5分鐘,導(dǎo)致所有消費(fèi)者都無(wú)法進(jìn)行正常消費(fèi),這對(duì)生產(chǎn)來(lái)說(shuō)是個(gè)很大的問(wèn)題。


          Rebalance 改進(jìn)



          | Static Membership

          為了減少因?yàn)閏onsumer短暫不可用造成的rebalance,kafka在2.3版本中引入了Static Membership。
          Static Membership優(yōu)化的核心是:
          1. 在consumer端增加group.instance.id配置(group.instance.id 是 consumer 的唯一標(biāo)識(shí))。如果consumer啟動(dòng)的時(shí)候明確指定了group.instance.id配置值,consumer會(huì)JoinGroup Request中攜帶該值,表示該consumer為static member。為了保證group.instance.id的唯一性,我們可以考慮使用hostname、ip等。
          2. 在GroupCoordinator端會(huì)記錄group.instance.id → member.id的映射關(guān)系,以及已有的 partition 分配關(guān)系。
          當(dāng) GroupCoordinator 收到已知 group.instance.id 的 consumer 的 JoinGroup Request 時(shí),不會(huì)進(jìn)行 rebalance,而是將其原來(lái)對(duì)應(yīng)的 partition 分配給它。
          Static Membership 可以讓 consumer group 只在下面的 4 種情況下進(jìn)行 rebalance:
          - 有新consumer加入consumer group
          -?Group Leader重新加入Group時(shí)
          -?consumer下線時(shí)間超過(guò)閾值
          ? ?session.timeout.ms
          -?GroupCoordinator收到static member的
          ? ?LeaveGroup Request

          這樣的話,在使用Static Membership場(chǎng)景下,只要在consumer重新啟動(dòng)的時(shí)候,不發(fā)送LeaveGroup Request且在session.timeout.ms時(shí)長(zhǎng)內(nèi)重啟成功,就不會(huì)觸發(fā)rebalance。所以,這里推薦設(shè)置一個(gè)足夠consumer重啟的時(shí)長(zhǎng)session.timeout.ms,這樣能有效降低因consumer短暫不可用導(dǎo)致的reblance次數(shù)。

          | Incremental Cooperative Rebalancing

          從名字中我們就能看出這個(gè)版本的rebalance過(guò)程兩個(gè)關(guān)鍵詞增量和協(xié)作,增量指的是原先版本的rebalance被分解成了多次小規(guī)模的rebalance, 協(xié)作自然指的是consumer 之間的關(guān)系。核心思想:

          1. consumer比較新舊兩個(gè)partition分配結(jié)果,只停止消費(fèi)回收(revoke)的partition,對(duì)于兩次都分配給自己的partition,consumer不需要停止消費(fèi)

          2. 通過(guò)多輪的局部 rebalance 來(lái)最終實(shí)現(xiàn)全局的 rebalance

          我們以文章開(kāi)始的例子來(lái)理解一下這個(gè)版本的改進(jìn)

          首先C1 -> {P0, p3}C2 -> {P1} C3 -> {P2}這是consumer和partition的分配關(guān)系,我們假設(shè)C2宕機(jī)超過(guò)了session.timeout.ms, 此時(shí)GroupCoordinator會(huì)觸發(fā)第一輪 rebalance

          | 第一輪 Rebalance

          1. GroupCoordinator會(huì)在下一輪心跳響應(yīng)中通知C1和C3發(fā)起第一輪rebalance

          2. C1和C3會(huì)將自己當(dāng)前正在處理的partition信息封裝到JoinGroupRequest中(metadata字段)發(fā)往GroupCoordinator:

          - C1 發(fā)送的?JoinGroupRequest(assigned: P0、P3)

          - C3 發(fā)送的?JoinGroupRequest(assigned: P2)

          3. 假設(shè)GroupCoordinator在這里選擇1作為Group Leader,GroupCoordinator會(huì)將 partition目前的分配狀態(tài)通過(guò)JoinGroupResponse發(fā)送給C1

          4. C1發(fā)現(xiàn)P1并未出現(xiàn)(處于lost狀態(tài)),此時(shí)C1并不會(huì)立即解決當(dāng)前的不平衡問(wèn)題,返回的partition分配結(jié)果不變(同時(shí)會(huì)攜帶一個(gè)delay時(shí)間,scheduled.rebalance.max.delay.ms,默認(rèn)5分鐘)。

          GroupCoordinator據(jù)C1的SyncGroupRequest,生成SyncGroupResponse返回給兩個(gè)存活的consumer

          - C1 收到的SyncGroup Response(delay,assigned: P0、P3,revoked:

          - C3 收到的SyncGroup Response(delay,assigned: P2,revoked:

          到此為止,第一輪rebalance結(jié)束。整個(gè)rebalance過(guò)程中,C1和C3并不會(huì)停止消費(fèi)。

          |?第二輪 Rebalance

          1. 在scheduled.rebalance.max.delay.ms這個(gè)時(shí)間段內(nèi),C2故障恢復(fù),重新加入到 consumer group時(shí),會(huì)向GroupCoordinator發(fā)送JoinGroup Request,觸發(fā)第二輪的rebalance。

          GroupCoordinator在下一次心跳響應(yīng)中會(huì)通知C1和C3參與第二輪rebalance

          2. C1和C3收到心跳后,發(fā)送JoinGroupRequest參與第二輪rebalance:

          - C1 發(fā)送的JoinGroupRequest(assigned: P0、P3)

          - C3 發(fā)送的JoinGroupRequest(assigned: P2)

          3. 本輪中,C1依舊被選為Group Leader,它檢查delay時(shí)間(scheduled.rebalance.max.delay.ms)是否已經(jīng)到了,如果沒(méi)到,則依舊不會(huì)立即解決當(dāng)前的不平衡問(wèn)題,繼續(xù)返回目前的分配結(jié)果,并且返回的SyncGroupResponse 中更新了delay的剩余時(shí)間(remaining delay = delay - pass_time) 到此為止,第二輪 rebalance結(jié)束。

          整個(gè)rebalance過(guò)程中,C1和C3并不會(huì)停止消費(fèi)。

          -?C1收到的SyncGroup Response(remaining delay,assigned:P0、P3,revoked:)

          -?C2收到的SyncGroup Response(remaining delay,assigned:,revoked:

          -?C3收到的SyncGroup Response(remaining delay,assigned:P2,revoked:

          | 第三輪 Rebalance

          1. 當(dāng)remaining delay時(shí)間到期之后,consumer全部重新送JoinGroupRequest,觸發(fā)第三輪rebalance

          - C1 發(fā)送的JoinGroupRequest(assigned: P0、P3)

          -?C2 發(fā)送的JoinGroupRequest(assigned: )

          -??C3 發(fā)送的JoinGroupRequest(assigned: P2)

          2. 在此次rebalance中,C1依舊被選為Group Leader,它會(huì)發(fā)現(xiàn)delay已經(jīng)到期了,開(kāi)始解決不平衡的問(wèn)題,對(duì)partition進(jìn)行重新分配。最新的分配結(jié)果最終通過(guò)SyncGroupResponse返回到各個(gè)consumer:

          到此為止,第三輪rebalance結(jié)束。整個(gè)rebalance過(guò)程中,C1和C3的消費(fèi)都不會(huì)停止

          - C1收到的SyncGroup Response(assigned:P0、P3,revoked:

          - C2收到的SyncGroup Response(assigned:P1,revoked:

          - C3收到的SyncGroup Response(assigned:P2,revoked:

          下面這張圖展示了上述的Rebalance過(guò)程

          通過(guò)上述我們應(yīng)該對(duì)Kafka的Rebalance有了比較完整的認(rèn)識(shí)。我們現(xiàn)在來(lái)回答文章開(kāi)始提出的消費(fèi)卡主問(wèn)題:消費(fèi)端拿到了異常的消息,這樣的消息業(yè)務(wù)上處理時(shí)間過(guò)超過(guò)了max.poll.interval.ms, 從而觸發(fā)了rebalance, 在rebalance過(guò)程中所有消費(fèi)者都暫停了消費(fèi)。
          為了解決這個(gè)問(wèn)題我們首先優(yōu)化業(yè)務(wù)邏輯盡可能提高處理消息速度,對(duì)異常消息做特殊處理;然后合理的設(shè)置max.poll.interval.ms cover住業(yè)務(wù)的處理時(shí)間。
          為了盡可能減少Rebalance次數(shù)我們也要注意設(shè)置session.timeout.ms和heartbeat.interval.ms的值。一種推薦的方案session.timeout.ms >= 3 * heartbeat.interval.ms, 比如session.timeout.ms = 6s; heartbeat.interval.ms = 2s。這樣consumer如果宕機(jī)且6s之內(nèi)未恢復(fù), Coordinator能夠較快地定位已經(jīng)掛掉的 consumer,把它踢出Group。
          -------------? END??-------------
          掃描下方二維碼,加入技術(shù)群。暗號(hào):加群

          瀏覽 64
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  久久无码高清视频 | 亚洲第一在线观看 | 色色拍拍 | 性国产 | 国操逼网|