一文理解Kafka如何消息不丟失
本文只聚焦于Kafka系統(tǒng)的消息丟失,如果是生產(chǎn)環(huán)境出現(xiàn)數(shù)據(jù)丟失,排查時要先從鏈路上分段定位,縮小問題范圍。
如果對Kafka不了解的話,可以先看這篇博客《一文快速了解Kafka》。
數(shù)據(jù)丟失的原因
生產(chǎn)者丟失消息的情況
生產(chǎn)者(Producer) 調用send方法發(fā)送消息之后,消息可能因為網(wǎng)絡問題并沒有發(fā)送過去。
解決方法:
不能認為在調用send方法發(fā)送消息之后消息消息發(fā)送成功了。為了確定消息是發(fā)送成功,需要判斷消息發(fā)送的結果。但要注意的是Kafka生產(chǎn)者(Producer) 使用send方法發(fā)送消息實是異步的操作,雖然可以通過get()方法獲取調用結果,但降低業(yè)務服務的吞吐量。優(yōu)化的方式是改為回調函數(shù)的形式。
此外,對于一致性要求不高的業(yè)務場景,可以考慮Producer端設置retries(重試次數(shù))設置一個比較合理的值,一般是3。設置完成之后,當出現(xiàn)網(wǎng)絡問題之后能夠自動重試消息發(fā)送,避免消息丟失。另外,建議將重試間隔設置長一些,因為間隔時間太小,可能一次網(wǎng)絡波動的時間重試全部結束了。
消費者丟失消息的情況
自動提交開啟會存在這樣的問題:當消費者poll到這個消息,還沒進行真正消費的時候,offset被自動提交的同時消費者掛掉了。
解決辦法:
關閉自動提交offset(即:enable.auto.commit為false),每次在真正消費完消息之后,手動提交offset。
但這樣還是會存在消費者剛消費完消息,還沒提交offset,結果宕機了,那么這個消息理論上就會被消費兩次,因此消費端冪等性是需要保證。可以查看博客《一文理解如何實現(xiàn)接口的冪等性》,有這種問題對應的解決方案
Kafka系統(tǒng)內丟失消息的情況
假如leader副本所在的broker突然掛掉,那么就要從follower副本重新選出一個leader,但是leader的數(shù)據(jù)還有一些沒有被follower副本的同步的話,就會造成消息丟失。
解決方法:
為了減少Kafka系統(tǒng)內丟失消息的情況,Kafka需要配置如下幾個參數(shù):
Producer端設置
acks=all。acks的默認值為1,代表消息被leader副本接收之后就算被成功發(fā)送。當配置acks=all代表則所有副本都要接收到該消息之后該消息才算真正成功被發(fā)送。(副本只是將消息存儲在PageCache上的,定期flush到磁盤上的,如果出現(xiàn)斷電或者機器故障等,PageCache上的數(shù)據(jù)就丟失了。但設置設置了acks=all,出現(xiàn)多個副本同時掛掉的概率比Leader掛掉的概率就小很多)topic設置
replication.factor>=3。為了保證leader副本能有follower 副本能同步消息,一般會設置replication.factor>=3。這樣就可以保證每個分區(qū)(partition)至少有3個副本。雖然造成了數(shù)據(jù)冗余,但是帶來了數(shù)據(jù)的安全性。設置
min.insync.replicas>1。一般情況下需要設置min.insync.replicas>1,這樣配置代表消息至少要被寫入到2個副本才算是被成功發(fā)送(默認值為1)。在實際生產(chǎn)中應盡量避免min.insync.replicas值為1,此外,為了保證整個Kafka服務的高可用性,你需要確保replication.factor>min.insync.replicas,否則有一個副本掛掉,整個分區(qū)就無法正常工作了。推薦設置成replication.factor=min.insync.replicas+1。設置
unclean.leader.election.enable=false。即不允許Unclean leader選舉。Producer端設置
retries。配合acks=all,這樣可以保證leader掛掉之后,Producer會重新發(fā)送消息。
Unclean leader選舉:Kafka把不在ISR列表中的存活副本稱為“非同步副本”,這些副本中的消息遠遠落后于leader,如果選舉這種副本作為leader的話就可能造成數(shù)據(jù)丟失。Kafka broker端提供了一個參數(shù)unclean.leader.election.enable,用于控制是否允許非同步副本參與leader選舉;如果開啟,則當ISR為空時就會從這些副本中選舉新的leader,這個過程稱為Unclean leader選舉。
異常導致的數(shù)據(jù)丟失
單條數(shù)據(jù)的長度超過限制會丟失數(shù)據(jù),報kafka.common.MessageSizeTooLargeException異常,導致生產(chǎn)者消息積壓,內存上升。
解決方法:
修改Kafka Broker的配置,修改單條消息的最大長度、單條消息的最大長度等參數(shù)配置。
