<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          聊聊 Kafka:Kafka 消息丟失的場景以及最佳實踐

          共 2706字,需瀏覽 6分鐘

           ·

          2022-05-10 09:34

          一、前言

          大家好,我是老周,有快二十多天沒有更新文章了,很多小伙伴一直在催更。先說明下最近的情況,最近項目上線很忙,沒有時間寫,并且組里有個同事使用 Kafka 不當(dāng),導(dǎo)致線上消息丟失,在修復(fù)一些線上的數(shù)據(jù),人都麻了。事情是這樣,有個 Kafka 消費者實例,部署到線上去,消費到了線上的數(shù)據(jù),而新版本做了新的邏輯,新版本的業(yè)務(wù)邏輯與老版本的業(yè)務(wù)邏輯不兼容,直接導(dǎo)致消費失敗,沒有進行重試操作,關(guān)鍵還提交了 offset。直接這部分數(shù)據(jù)沒有被業(yè)務(wù)處理,導(dǎo)致消息丟失,然后緊急修復(fù)線上數(shù)據(jù)。

          剛好這些天忙完了有空,所以記錄一下,同時看是否對大家能起到避免踩坑的作用,能有一些作用,那我寫的也就值了。

          我們下面會從以下三個方面來說一下 Kafka 消息丟失的場景以及最佳實踐。

          • 生產(chǎn)者丟失消息

          • Kafka Broker 服務(wù)端丟失消息

          • 消費者丟失消息

          二、Kafka 的三種消息語義

          先說 Kafka 消息丟失的場景之前,我們先來說下 Kafka 的三種消息語義,不會還有人不知道吧?這個不應(yīng)該了,消息系統(tǒng)基本上抽象成這以下三種消息語義了:

          • 最多傳遞一次

          • 最少傳遞一次

          • 僅有一次傳遞

          三、Kafka 消息丟失的場景

          3.1 生產(chǎn)者丟失消息

          • 目前 Kafka Producer 是異步發(fā)送消息的,如果你的 Producer 客戶端使用了 producer.send(msg) 方法來發(fā)送消息,方法會立即返回,但此時并不能代表消息已經(jīng)發(fā)送成功了。

          • 如果消息在發(fā)送的過程中發(fā)生了網(wǎng)絡(luò)抖動,那么消息可能沒有傳遞到 Broker,那么消息可能會丟失。

          • 如果發(fā)送的消息本身不符合,如大小超過了 Broker 的承受能力等。

          3.2 Kafka Broker 服務(wù)端丟失消息

          • Leader Broker 宕機了,觸發(fā)選舉過程,集群選舉了一個落后 Leader 太多的 Broker 作為 Leader,那么落后的那些消息就會丟失了。

          • Kafka 為了提升性能,使用頁緩存機制,將消息寫入頁緩存而非直接持久化至磁盤,采用了異步批量刷盤機制,也就是說,按照一定的消息量和時間間隔去刷盤,刷盤的動作由操作系統(tǒng)來調(diào)度的,如果刷盤之前,Broker 宕機了,重啟后在頁緩存的這部分消息則會丟失。

          3.3 消費者丟失消息

          • 消費者拉取了消息,并處理了消息,但處理消息異常了導(dǎo)致失敗,并且提交了偏移量,消費者重啟后,會從之前已提交的位移的下一個位置重新開始消費,消費失敗的那些消息不會再次處理,即相當(dāng)于消費者丟失了消息。

          • 消費者拉取了消息,并提交了消費位移,但是在消息處理結(jié)束之前突然發(fā)生了宕機等故障,消費者重啟后,會從之前已提交的位移的下一個位置重新開始消費,之前未處理完成的消息不會再次處理,即相當(dāng)于消費者丟失了消息。

          四、最佳實踐

          4.1 生產(chǎn)端

          • 不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。帶有回調(diào)通知的 send 方法可以針對發(fā)送失敗的消息進行重試處理。

          • 設(shè)acks = all代表了你對“已提交”消息的定義。如果設(shè)置成 all,則表明所有副本 Broker 都要接收到消息,該消息才算是“已提交”。這是最高等級的“已提交”定義。


          • 設(shè)置 retries = 3,當(dāng)出現(xiàn)網(wǎng)絡(luò)的瞬時抖動時,消息發(fā)送可能會失敗,此時配置了 retries > 0 的 Producer 能夠自動重試消息發(fā)送,避免消息丟失。



            如果重試達到設(shè)定的次數(shù),那么生產(chǎn)者就會放棄重試并返回異常。不過并不是所有的異常都是可以通過重試來解決的,比如消息太大,超過max.request.size參數(shù)配置的值時,這種方式就不可行了。

          • 設(shè)置 retry.backoff.ms = 300,合理估算重試的時間間隔,可以避免無效的頻繁重試。



            它用來設(shè)定兩次重試之間的時間間隔,避免無效的頻繁重試。在配置 retriesretry.backoff.ms之前,最好先估算一下可能的異常恢復(fù)時間,這樣可以設(shè)定總的重試時間大于這個異常恢復(fù)時間,以此來避免生產(chǎn)者過早地放棄重試。

          4.2 Broker 端

          • 設(shè)置 unclean.leader.election.enable = false。它控制的是哪些 Broker 有資格競選分區(qū)的 Leader。如果一個 Broker 落后原先的 Leader 太多,那么它一旦成為新的 Leader,必然會造成消息的丟失。故一般都要將該參數(shù)設(shè)置成 false,即不允許這種情況的發(fā)生。



          • 設(shè)置 replication.factor >= 3。其實這里想表述的是,最好將消息多保存幾份,畢竟目前防止消息丟失的主要機制就是冗余。



          • 設(shè)置 min.insync.replicas > 1。這控制的是消息至少要被寫入到多少個副本才算是“已提交”。設(shè)置成大于 1 可以提升消息持久性。在實際環(huán)境中千萬不要使用默認值 1。



          • 確保 replication.factor > min.insync.replicas。如果兩者相等,那么只要有一個副本掛機,整個分區(qū)就無法正常工作了。我們不僅要改善消息的持久性,防止數(shù)據(jù)丟失,還要在不降低可用性的基礎(chǔ)上完成。推薦設(shè)置成 replication.factor = min.insync.replicas + 1

          4.3 消費端

          • 確保消息消費完成再提交。最好把它設(shè)置成 enable.auto.commit = false,并采用手動提交位移的方式。這對于單 Consumer 多線程處理的場景而言是至關(guān)重要的。



            雖然采用手動提交位移的方式可以解決消費端消息丟失的場景,但同時會存在重復(fù)消費問題,關(guān)于重復(fù)消費問題我們下一篇再講。

          • 像我們上面說的那個線上問題,即使你設(shè)置了手動提交,消費異常了同時也提交了位移,還是會存在消息丟失。

            Kafka 沒有重試機制不支持消息重試,也沒有死信隊列,因此使用 Kafka 做消息隊列時,需要自己
            實現(xiàn)消息重試的功能。這里我先說下大致的思路,后續(xù)有時間再分享代碼出來:

            • 創(chuàng)建一個 Topic 作為重試 Topic,用于接收等待重試的消息。

            • 普通 Topic 消費者設(shè)置待重試消息的下一個重試 Topic。

            • 從重試 Topic 獲取待重試消息存儲到 Redis 的 ZSet 中,并以下一次消費時間排序。

            • 定時任務(wù)從 Redis 獲取到達消費時間的消息,并把消息發(fā)送到對應(yīng)的 Topic。

            • 同一個消息重試次數(shù)過多則不再重試。


          歡迎大家關(guān)構(gòu)Java術(shù)棧的原理、源碼分析、架構(gòu)以及各種互聯(lián)網(wǎng)高并發(fā)、高性能、高可用的解決方案。

          瀏覽 72
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产一级免费在线观看 | 美女被肏视频网站 | 欧美日本黄色一级视频 | 自拍偷拍第六页 | 丰满老熟女一级AA片色欲 |