聊聊 Kafka:Kafka 消息丟失的場景以及最佳實踐
一、前言
大家好,我是老周,有快二十多天沒有更新文章了,很多小伙伴一直在催更。先說明下最近的情況,最近項目上線很忙,沒有時間寫,并且組里有個同事使用 Kafka 不當(dāng),導(dǎo)致線上消息丟失,在修復(fù)一些線上的數(shù)據(jù),人都麻了。事情是這樣,有個 Kafka 消費者實例,部署到線上去,消費到了線上的數(shù)據(jù),而新版本做了新的邏輯,新版本的業(yè)務(wù)邏輯與老版本的業(yè)務(wù)邏輯不兼容,直接導(dǎo)致消費失敗,沒有進行重試操作,關(guān)鍵還提交了 offset。直接這部分數(shù)據(jù)沒有被業(yè)務(wù)處理,導(dǎo)致消息丟失,然后緊急修復(fù)線上數(shù)據(jù)。
剛好這些天忙完了有空,所以記錄一下,同時看是否對大家能起到避免踩坑的作用,能有一些作用,那我寫的也就值了。
我們下面會從以下三個方面來說一下 Kafka 消息丟失的場景以及最佳實踐。
生產(chǎn)者丟失消息
Kafka Broker 服務(wù)端丟失消息
消費者丟失消息
二、Kafka 的三種消息語義
先說 Kafka 消息丟失的場景之前,我們先來說下 Kafka 的三種消息語義,不會還有人不知道吧?這個不應(yīng)該了,消息系統(tǒng)基本上抽象成這以下三種消息語義了:
最多傳遞一次
最少傳遞一次
僅有一次傳遞


三、Kafka 消息丟失的場景
3.1 生產(chǎn)者丟失消息
目前 Kafka Producer 是異步發(fā)送消息的,如果你的 Producer 客戶端使用了
producer.send(msg)方法來發(fā)送消息,方法會立即返回,但此時并不能代表消息已經(jīng)發(fā)送成功了。如果消息在發(fā)送的過程中發(fā)生了網(wǎng)絡(luò)抖動,那么消息可能沒有傳遞到 Broker,那么消息可能會丟失。
如果發(fā)送的消息本身不符合,如大小超過了 Broker 的承受能力等。
3.2 Kafka Broker 服務(wù)端丟失消息
Leader Broker 宕機了,觸發(fā)選舉過程,集群選舉了一個落后 Leader 太多的 Broker 作為 Leader,那么落后的那些消息就會丟失了。
Kafka 為了提升性能,使用頁緩存機制,將消息寫入頁緩存而非直接持久化至磁盤,采用了異步批量刷盤機制,也就是說,按照一定的消息量和時間間隔去刷盤,刷盤的動作由操作系統(tǒng)來調(diào)度的,如果刷盤之前,Broker 宕機了,重啟后在頁緩存的這部分消息則會丟失。
3.3 消費者丟失消息
消費者拉取了消息,并處理了消息,但處理消息異常了導(dǎo)致失敗,并且提交了偏移量,消費者重啟后,會從之前已提交的位移的下一個位置重新開始消費,消費失敗的那些消息不會再次處理,即相當(dāng)于消費者丟失了消息。
消費者拉取了消息,并提交了消費位移,但是在消息處理結(jié)束之前突然發(fā)生了宕機等故障,消費者重啟后,會從之前已提交的位移的下一個位置重新開始消費,之前未處理完成的消息不會再次處理,即相當(dāng)于消費者丟失了消息。
四、最佳實踐
4.1 生產(chǎn)端
不要使用
producer.send(msg),而要使用producer.send(msg, callback)。帶有回調(diào)通知的 send 方法可以針對發(fā)送失敗的消息進行重試處理。設(shè)置
acks = all。代表了你對“已提交”消息的定義。如果設(shè)置成 all,則表明所有副本 Broker 都要接收到消息,該消息才算是“已提交”。這是最高等級的“已提交”定義。
設(shè)置
retries = 3,當(dāng)出現(xiàn)網(wǎng)絡(luò)的瞬時抖動時,消息發(fā)送可能會失敗,此時配置了 retries > 0 的 Producer 能夠自動重試消息發(fā)送,避免消息丟失。
如果重試達到設(shè)定的次數(shù),那么生產(chǎn)者就會放棄重試并返回異常。不過并不是所有的異常都是可以通過重試來解決的,比如消息太大,超過max.request.size參數(shù)配置的值時,這種方式就不可行了。設(shè)置
retry.backoff.ms = 300,合理估算重試的時間間隔,可以避免無效的頻繁重試。
它用來設(shè)定兩次重試之間的時間間隔,避免無效的頻繁重試。在配置retries和retry.backoff.ms之前,最好先估算一下可能的異常恢復(fù)時間,這樣可以設(shè)定總的重試時間大于這個異常恢復(fù)時間,以此來避免生產(chǎn)者過早地放棄重試。
4.2 Broker 端
設(shè)置
unclean.leader.election.enable = false。它控制的是哪些 Broker 有資格競選分區(qū)的 Leader。如果一個 Broker 落后原先的 Leader 太多,那么它一旦成為新的 Leader,必然會造成消息的丟失。故一般都要將該參數(shù)設(shè)置成 false,即不允許這種情況的發(fā)生。
設(shè)置
replication.factor >= 3。其實這里想表述的是,最好將消息多保存幾份,畢竟目前防止消息丟失的主要機制就是冗余。
設(shè)置
min.insync.replicas > 1。這控制的是消息至少要被寫入到多少個副本才算是“已提交”。設(shè)置成大于 1 可以提升消息持久性。在實際環(huán)境中千萬不要使用默認值 1。
確保
replication.factor > min.insync.replicas。如果兩者相等,那么只要有一個副本掛機,整個分區(qū)就無法正常工作了。我們不僅要改善消息的持久性,防止數(shù)據(jù)丟失,還要在不降低可用性的基礎(chǔ)上完成。推薦設(shè)置成replication.factor = min.insync.replicas + 1。
4.3 消費端
確保消息消費完成再提交。最好把它設(shè)置成
enable.auto.commit = false,并采用手動提交位移的方式。這對于單 Consumer 多線程處理的場景而言是至關(guān)重要的。
雖然采用手動提交位移的方式可以解決消費端消息丟失的場景,但同時會存在重復(fù)消費問題,關(guān)于重復(fù)消費問題我們下一篇再講。像我們上面說的那個線上問題,即使你設(shè)置了手動提交,消費異常了同時也提交了位移,還是會存在消息丟失。
Kafka 沒有重試機制不支持消息重試,也沒有死信隊列,因此使用 Kafka 做消息隊列時,需要自己
實現(xiàn)消息重試的功能。這里我先說下大致的思路,后續(xù)有時間再分享代碼出來:創(chuàng)建一個 Topic 作為重試 Topic,用于接收等待重試的消息。
普通 Topic 消費者設(shè)置待重試消息的下一個重試 Topic。
從重試 Topic 獲取待重試消息存儲到 Redis 的 ZSet 中,并以下一次消費時間排序。
定時任務(wù)從 Redis 獲取到達消費時間的消息,并把消息發(fā)送到對應(yīng)的 Topic。
同一個消息重試次數(shù)過多則不再重試。
歡迎大家關(guān)注我的公眾號【老周聊架構(gòu)】,Java后端主流技術(shù)棧的原理、源碼分析、架構(gòu)以及各種互聯(lián)網(wǎng)高并發(fā)、高性能、高可用的解決方案。
