<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          一文理解Kafka重復消費的原因和解決方案

          共 1666字,需瀏覽 4分鐘

           ·

          2021-06-23 12:21

          如果對Kafka不了解的話,可以先看這篇博客《一文快速了解Kafka》。

          在解釋Kafka重復消費出現(xiàn)原因之前,列舉一下Kafka中與消費者有關(guān)的幾個重要配置參數(shù)。

          • enable.auto.commit:表示消費者會周期性自動提交消費的offset。默認值true。

          • auto.commit.interval.ms:在enable.auto.commit為true的情況下, 自動提交的間隔。默認值5秒。

          • max.poll.records:單次消費者拉取的最大數(shù)據(jù)條數(shù),默認值500。

          • max.poll.interval.ms:表示若在閾值時間之內(nèi)消費者沒有消費完上一次poll的消息,consumer client會主動向coordinator發(fā)起LeaveGroup請求,觸發(fā)Rebalance;然后consumer重新發(fā)送JoinGroup請求。

          • session.timeout.ms:group Coordinator檢測consumer發(fā)生崩潰所需的時間。在這個時間內(nèi)如果Coordinator未收到Consumer的任何消息,那Coordinator就認為Consumer掛了。默認值10秒。

          • heartbeat.interval.ms:標識Consumer給Coordinator發(fā)一個心跳包的時間間隔。heartbeat.interval.ms越小,發(fā)的心跳包越多。默認值3秒。

          Group Coordinator

          基于Zookeeper的Rebalance存在不可避免的羊群效應和腦裂問題,如何不用Zookeeper來協(xié)調(diào),而是將失敗探測和Rebalance的邏輯放到一個高可用的中心,那么上述問題就能得以解決。因此Kafka0.9的版本重新設計了Consumer端,設計了Coordinator機制,大大減少了Zookeeper負載。

          對于每一個Consumer Group,Kafka集群為其從Broker集群中選擇一個Broker作為其Coordinator。Coordinator主要做兩件事:

          1. 維持Group成員的組成。這包括加入新的成員,檢測成員的存活性,清除不再存活的成員。

          2. 協(xié)調(diào)Group成員的行為。

          羊群效應:以Zookeeper為例,由于一個被watch的znode變化,導致大量的通知需要被發(fā)送,將會導致在這個通知期間的其他操作提交的延遲。

          重復消費的原因

          原因1:消費者宕機、重啟或者被強行kill進程,導致消費者消費的offset沒有提交。

          原因2:設置enable.auto.commit為true,如果在關(guān)閉消費者進程之前,取消了消費者的訂閱,則有可能部分offset沒提交,下次重啟會重復消費。

          原因3:消費后的數(shù)據(jù),當offset還沒有提交時,Partition就斷開連接。比如,通常會遇到消費的數(shù)據(jù),處理很耗時,導致超過了Kafka的session timeout.ms時間,那么就會觸發(fā)reblance重平衡,此時可能存在消費者offset沒提交,會導致重平衡后重復消費。

          重復消費的解決方法

          1. 提高消費者的處理速度。例如:對消息處理中比較耗時的步驟可通過異步的方式進行處理、利用多線程處理等。在縮短單條消息消費的同時,根據(jù)實際場景可將max.poll.interval.ms值設置大一點,避免不必要的Rebalance??筛鶕?jù)實際消息速率適當調(diào)小max.poll.records的值。

          2. 引入消息去重機制。例如:生成消息時,在消息中加入唯一標識符如消息id等。在消費端,可以保存最近的max.poll.records條消息id到redis或mysql表中,這樣在消費消息時先通過查詢?nèi)ブ睾?,再進行消息的處理。

          3. 保證消費者邏輯冪等??梢圆榭床┛汀?a target="_blank" style="box-sizing: border-box;color: rgb(3, 102, 214);" data-linktype="2">一文理解如何實現(xiàn)接口的冪等性》


          瀏覽 69
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  在线免费日韩 | 一起操成人影视 | 亚洲成人操B视频 | 三级片www | 先锋影音亚洲AV每日资源网站 |