<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          硬核 | Kafka 如何解決消息不丟失?

          共 2474字,需瀏覽 5分鐘

           ·

          2021-08-05 14:08

          大家好,我是Tom哥~

          Kafka 消息框架,大家一定不陌生,很多人工作中都有接觸。它的核心思路,通過一個(gè)高性能的MQ服務(wù)來連接生產(chǎn)消費(fèi)兩個(gè)系統(tǒng),達(dá)到系統(tǒng)間的解耦,有很強(qiáng)的擴(kuò)展性。

          你可能會(huì)有疑問,如果中間某一個(gè)環(huán)節(jié)斷掉了,那怎么辦?

          這種情況,我們稱之為消息丟失,會(huì)造成系統(tǒng)間的數(shù)據(jù)不一致。

          那如何解決這個(gè)問題?需要從生產(chǎn)端MQ服務(wù)端消費(fèi)端,三個(gè)維度來處理

          1、生產(chǎn)端

          生產(chǎn)端的職責(zé)就是,確保生產(chǎn)的消息能到達(dá)MQ服務(wù)端,這里我們需要有一個(gè)響應(yīng)來判斷本次的操作是否成功。

          Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)

          比如,上面的代碼就是通過一個(gè)Callback函數(shù),來判斷消息是否發(fā)送成功,如果失敗,我們需要補(bǔ)償處理。

          另外,為了提升發(fā)送時(shí)的靈活性,kafka提供了多種參數(shù),供不同業(yè)務(wù)自己選擇

          1.1 參數(shù) acks

          該參數(shù)表示有多少個(gè)分區(qū)副本收到消息,才認(rèn)為本次發(fā)送是成功的。

          • acks=0,只要發(fā)送消息就認(rèn)為成功,生產(chǎn)端不等待服務(wù)器節(jié)點(diǎn)的響應(yīng)
          • acks=1,表示生產(chǎn)者收到 leader 分區(qū)的響應(yīng)就認(rèn)為發(fā)送成功
          • acks=-1,只有當(dāng) ISR 中的副本全部收到消息時(shí),生產(chǎn)端才會(huì)認(rèn)為是成功的。這種配置是最安全的,但由于同步的節(jié)點(diǎn)較多,吞吐量會(huì)降低。

          1.2 參數(shù) retries

          表示生產(chǎn)端的重試次數(shù),如果重試次數(shù)用完后,還是失敗,會(huì)將消息臨時(shí)存儲(chǔ)在本地磁盤,待服務(wù)恢復(fù)后再重新發(fā)送。建議值 retries=3

          1.3 參數(shù) retry.backoff.m

          消息發(fā)送超時(shí)或失敗后,間隔的重試時(shí)間。一般推薦的設(shè)置時(shí)間是 300 毫秒。

          這里要特別注意一種特殊情況,如果MQ服務(wù)沒有正常響應(yīng),不一定代表消息發(fā)送失敗,也有可能是響應(yīng)時(shí)正好趕上網(wǎng)絡(luò)抖動(dòng),響應(yīng)超時(shí)。

          當(dāng)生產(chǎn)端做完這些,一定能保證消息發(fā)送成功了,但可能發(fā)送多次,這樣就會(huì)導(dǎo)致消息重復(fù),這個(gè)我們后面再講解決方案

          2、MQ服務(wù)端

          MQ服務(wù)端作為消息的存儲(chǔ)介質(zhì),也有可能會(huì)丟失消息。比如:一個(gè)分區(qū)突然掛掉,那么怎么保證這個(gè)分區(qū)的數(shù)據(jù)不丟失,我們會(huì)引入副本概念,通過備份來解決這個(gè)問題。

          具體可設(shè)置哪些參數(shù)?

          2.1 參數(shù) replication.factor

          表示分區(qū)副本的個(gè)數(shù),replication.factor >1 當(dāng)leader 副本掛了,follower副本會(huì)被選舉為leader繼續(xù)提供服務(wù)。

          2.2 參數(shù) min.insync.replicas

          表示 ISR 最少的副本數(shù)量,通常設(shè)置 min.insync.replicas >1,這樣才有可用的follower副本執(zhí)行替換,保證消息不丟失

          2.3 參數(shù) unclean.leader.election.enable

          是否可以把非 ISR 集合中的副本選舉為 leader 副本。

          如果設(shè)置為true,而follower副本的同步消息進(jìn)度落后較多,此時(shí)被選舉為leader,會(huì)導(dǎo)致消息丟失,慎用。

          3、消費(fèi)端

          消費(fèi)端要做的是把消息完整的消費(fèi)處理掉。但是這里面有個(gè)提交位移的步驟。

          有的同學(xué),考慮到業(yè)務(wù)處理消耗時(shí)間較長,會(huì)單獨(dú)啟動(dòng)線程拉取消息存儲(chǔ)到本地內(nèi)存隊(duì)列,然后再搞個(gè)線程池并行處理業(yè)務(wù)邏輯。這樣設(shè)計(jì)有個(gè)風(fēng)險(xiǎn),本地消息如果沒有處理完,服務(wù)器宕機(jī)了,會(huì)造成消息丟失。

          正確的做法:拉取消息 ---  業(yè)務(wù)處理  ---- 提交消費(fèi)位移

          關(guān)于提交位移,kafka提供了集中參數(shù)配置

          參數(shù)  enable.auto.commit

          表示消費(fèi)位移是否自動(dòng)提交。

          如果拉取了消息,業(yè)務(wù)邏輯還沒處理完,提交了消費(fèi)位移但是消費(fèi)端卻掛了,消費(fèi)端恢復(fù)或其他消費(fèi)端接管該分片再也拉取不到這條消息,會(huì)造成消息丟失。所以,我們通常設(shè)置 enable.auto.commit=false,手動(dòng)提交消費(fèi)位移。

          List<String> messages = consumer.poll();
          processMsg(messages);
          consumer.commitOffset();

          這個(gè)方案,會(huì)產(chǎn)生另外一個(gè)問題,我們來看下這個(gè)圖

          拉取了消息4~消息8,業(yè)務(wù)處理后,在提交消費(fèi)位移時(shí),不湊巧系統(tǒng)宕機(jī)了,最后的提交位移并沒有保存到MQ 服務(wù)端,下次拉取消息時(shí),依然是從消息4開始拉取,但是這部分消息已經(jīng)處理過了,這樣便會(huì)導(dǎo)致重復(fù)消費(fèi)。

          如何解決重復(fù)消費(fèi),避免引發(fā)數(shù)據(jù)不一致

          首先,要解決MQ 服務(wù)端的重復(fù)消息。kafka 在  0.11.0 版本后,每條消息都有唯一的message id, MQ服務(wù)采用空間換時(shí)間方式,自動(dòng)對(duì)重復(fù)消息過濾處理,保證接口的冪等性。

          但這個(gè)不能根本上解決消息重復(fù)問題,即使MQ服務(wù)中存儲(chǔ)的消息沒有重復(fù),但消費(fèi)端是采用拉取方式,如果重復(fù)拉取,也會(huì)導(dǎo)致重復(fù)消費(fèi),如何解決這種場(chǎng)景問題?

          方案一:只拉取一次(消費(fèi)者拉取消息后,先提交 offset 后再處理消息),但是如果系統(tǒng)宕機(jī),業(yè)務(wù)處理沒有正常結(jié)束,后面再也拉取不到這些消息,會(huì)導(dǎo)致數(shù)據(jù)不一致,該方案很少采用。

          方案二:允許拉取重復(fù)消息,但是消費(fèi)端自己做冪等性控制。保證只成功消費(fèi)一次

          關(guān)于冪等技術(shù)方案很多,我們可以采用數(shù)據(jù)表Redis緩存存儲(chǔ)處理標(biāo)識(shí),每次拉取到消息,處理前先校驗(yàn)處理狀態(tài),再?zèng)Q定是處理還是丟棄消息。

          推薦閱讀:
          MySQL 開源工具集合
          什么是布隆過濾器?如何解決高并發(fā)緩存穿透問題?
          如何通過Binlog來實(shí)現(xiàn)不同系統(tǒng)間數(shù)據(jù)同步
          高并發(fā)服務(wù)優(yōu)化篇:詳解RPC的一次調(diào)用過程
          如何設(shè)計(jì)一個(gè)高性能的秒殺系統(tǒng)

          關(guān)號(hào)互聯(lián)網(wǎng)全棧架構(gòu)價(jià)

          瀏覽 34
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产一级a一片成人AV | 在线视频日韩 | 人人色免费 | 国产女人AV第一 | 豆花视频操逼 |