<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          如何解決 Kafka 消息重復(fù)問(wèn)題!

          共 14064字,需瀏覽 29分鐘

           ·

          2023-02-04 18:39

          程序員的成長(zhǎng)之路
          互聯(lián)網(wǎng)/程序員/技術(shù)/資料共享 
          關(guān)注


          閱讀本文大概需要 7 分鐘。

          來(lái)自:juejin.cn/post/7172897190627508237

          一、前言

          數(shù)據(jù)重復(fù)這個(gè)問(wèn)題其實(shí)也是挺正常,全鏈路都有可能會(huì)導(dǎo)致數(shù)據(jù)重復(fù)。
          通常,消息消費(fèi)時(shí)候都會(huì)設(shè)置一定重試次數(shù)來(lái)避免網(wǎng)絡(luò)波動(dòng)造成的影響,同時(shí)帶來(lái)副作用是可能出現(xiàn)消息重復(fù)。

          整理下消息重復(fù)的幾個(gè)場(chǎng)景:

          1.「生產(chǎn)端:」 遇到異常,基本解決措施都是 「重試」
          • 場(chǎng)景一:leader分區(qū)不可用了,拋 LeaderNotAvailableException 異常,等待選出新 leader 分區(qū)。

          • 場(chǎng)景二:Controller 所在 Broker 掛了,拋 NotControllerException 異常,等待 Controller 重新選舉。

          • 場(chǎng)景三:網(wǎng)絡(luò)異常、斷網(wǎng)、網(wǎng)絡(luò)分區(qū)、丟包等,拋 NetworkException 異常,等待網(wǎng)絡(luò)恢復(fù)。

          2.「消費(fèi)端:」 poll 一批數(shù)據(jù),處理完畢還沒(méi)提交 offset ,機(jī)子宕機(jī)重啟了,又會(huì) poll 上批數(shù)據(jù),再度消費(fèi)就造成了消息重復(fù)。

          怎么解決?

          「先來(lái)了解下消息的三種投遞語(yǔ)義:」
          • 最多一次(at most once): 消息只發(fā)一次,消息可能會(huì)丟失,但絕不會(huì)被重復(fù)發(fā)送。例如:mqtt 中 QoS = 0

          • 至少一次(at least once): 消息至少發(fā)一次,消息不會(huì)丟失,但有可能被重復(fù)發(fā)送。例如:mqtt 中 QoS = 1

          • 精確一次(exactly once): 消息精確發(fā)一次,消息不會(huì)丟失,也不會(huì)被重復(fù)發(fā)送。例如:mqtt 中 QoS = 2

          了解了這三種語(yǔ)義,再來(lái)看如何解決消息重復(fù),即如何實(shí)現(xiàn)精準(zhǔn)一次,可分為三種方法:

          1. Kafka 冪等性 Producer: 保證生產(chǎn)端發(fā)送消息冪等。局限性,是只能保證單分區(qū)且單會(huì)話(重啟后就算新會(huì)話)

          2. Kafka 事務(wù): 保證生產(chǎn)端發(fā)送消息冪等。解決冪等 Producer 的局限性。

          3. 消費(fèi)端冪等: 保證消費(fèi)端接收消息冪等。蔸底方案。

          1)Kafka 冪等性 Producer

          ?
          「冪等性指」:無(wú)論執(zhí)行多少次同樣的運(yùn)算,結(jié)果都是相同的。即一條命令,任意多次執(zhí)行所產(chǎn)生的影響均與一次執(zhí)行的影響相同。
          ?
          「冪等性使用示例:在生產(chǎn)端添加對(duì)應(yīng)配置即可」

          Properties props = new Properties();  
          props.put("enable.idempotence", ture); // 1. 設(shè)置冪等  
          props.put("acks""all"); // 2. 當(dāng) enable.idempotence 為 true,這里默認(rèn)為 all  
          props.put("max.in.flight.requests.per.connection", 5); // 3. 注意

          1.設(shè)置冪等,啟動(dòng)冪等。
          2.配置 acks,注意:一定要設(shè)置 acks=all,否則會(huì)拋異常。
          3.配置max.in.flight.requests.per.connection 需要 <= 5,否則會(huì)拋異常 OutOfOrderSequenceException
          • 0.11 >= Kafka < 1.1max.in.flight.request.per.connection = 1

          • Kafka >= 1.1max.in.flight.request.per.connection <= 5

          為了更好理解,需要了解下Kafka 冪等機(jī)制:

          1.Producer 每次啟動(dòng)后,會(huì)向 Broker 申請(qǐng)一個(gè)全局唯一的 pid。(重啟后 pid 會(huì)變化,這也是弊端之一)
          2.Sequence Numbe:針對(duì)每個(gè) <Topic, Partition> 都對(duì)應(yīng)一個(gè)從0開(kāi)始單調(diào)遞增的 Sequence,同時(shí) Broker端會(huì)緩存這個(gè) seq num
          3.判斷是否重復(fù): 拿 <pid, seq num> 去 Broker 里對(duì)應(yīng)的隊(duì)列 ProducerStateEntry.Queue(默認(rèn)隊(duì)列長(zhǎng)度為 5)查詢是否存在
          • 如果 nextSeq == lastSeq + 1,即 服務(wù)端seq + 1 == 生產(chǎn)傳入seq,則接收。

          • 如果 nextSeq == 0 && lastSeq == Int.MaxValue,即剛初始化,也接收。

          • 反之,要么重復(fù),要么丟消息,均拒絕。

          這種設(shè)計(jì)針對(duì)解決了兩個(gè)問(wèn)題:

          1. 「消息重復(fù):」 場(chǎng)景 Broker 保存消息后還沒(méi)發(fā)送 ack 就宕機(jī)了,這時(shí)候 Producer 就會(huì)重試,這就造成消息重復(fù)。

          2. 「消息亂序:」 避免場(chǎng)景,前一條消息發(fā)送失敗而其后一條發(fā)送成功,前一條消息重試后成功,造成的消息亂序。

          那什么時(shí)候該使用冪等:

          1. 如果已經(jīng)使用 acks=all,使用冪等也可以。

          2. 如果已經(jīng)使用 acks=0 或者 acks=1,說(shuō)明你的系統(tǒng)追求高性能,對(duì)數(shù)據(jù)一致性要求不高。不要使用冪等。

          2)Kafka 事務(wù)

          ?
          使用 Kafka 事務(wù)解決冪等的弊端:?jiǎn)螘?huì)話且單分區(qū)冪等。
          Tips:」 這塊篇幅較長(zhǎng),這先稍微提及下使用,之后另起一篇。
          ?
          「事務(wù)使用示例:分為生產(chǎn)端 和 消費(fèi)端」

          Properties props = new Properties();  
          props.put("enable.idempotence", ture); // 1. 設(shè)置冪等  
          props.put("acks""all"); // 2. 當(dāng) enable.idempotence 為 true,這里默認(rèn)為 all  
          props.put("max.in.flight.requests.per.connection"5); // 3. 最大等待數(shù)  
          props.put("transactional.id""my-transactional-id"); // 4. 設(shè)定事務(wù) id  
            
          Producer<String, String> producer = new KafkaProducer<String, String>(props);  
            
          // 初始化事務(wù)  
          producer.initTransactions();  
            
          try{  
              // 開(kāi)始事務(wù)  
              producer.beginTransaction();  
            
              // 發(fā)送數(shù)據(jù)  
              producer.send(new ProducerRecord<String, String>("Topic""Key""Value"));  
             
              // 數(shù)據(jù)發(fā)送及 Offset 發(fā)送均成功的情況下,提交事務(wù)  
              producer.commitTransaction();  
          catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {  
              // 數(shù)據(jù)發(fā)送或者 Offset 發(fā)送出現(xiàn)異常時(shí),終止事務(wù)  
              producer.abortTransaction();  
          finally {  
              // 關(guān)閉 Producer 和 Consumer  
              producer.close();  
              consumer.close();  
          }

          這里消費(fèi)端Consumer 需要設(shè)置下配置:isolation.level 參數(shù)
          • read_uncommitted:」 這是默認(rèn)值,表明 Consumer 能夠讀取到 Kafka 寫(xiě)入的任何消息,不論事務(wù)型 Producer 提交事務(wù)還是終止事務(wù),其寫(xiě)入的消息都可以讀取。如果你用了事務(wù)型 Producer,那么對(duì)應(yīng)的 Consumer 就不要使用這個(gè)值。

          • read_committed:」 表明 Consumer 只會(huì)讀取事務(wù)型 Producer 成功提交事務(wù)寫(xiě)入的消息。當(dāng)然了,它也能看到非事務(wù)型 Producer 寫(xiě)入的所有消息。

          3)消費(fèi)端冪等

          ?
          “如何解決消息重復(fù)?” 這個(gè)問(wèn)題,其實(shí)換一種說(shuō)法:就是如何解決消費(fèi)端冪等性問(wèn)題。
          只要消費(fèi)端具備了冪等性,那么重復(fù)消費(fèi)消息的問(wèn)題也就解決了。
          ?
          「典型的方案是使用:消息表,來(lái)去重:」
          • 上述栗子中,消費(fèi)端拉取到一條消息后,開(kāi)啟事務(wù),將消息Id 新增到本地消息表中,同時(shí)更新訂單信息。

          • 如果消息重復(fù),則新增操作 insert 會(huì)異常,同時(shí)觸發(fā)事務(wù)回滾。

          二、案例:Kafka 冪等性 Producer 使用

          ?
          環(huán)境搭建可參考:https://developer.confluent.io/tutorials/message-ordering/kafka.html#view-all-records-in-the-topic
          ?
          「準(zhǔn)備工作如下:」
          1、Zookeeper:本地使用 Docker 啟動(dòng)

          $ docker run -d --name zookeeper -p 2181:2181 zookeeper  
          a86dff3689b68f6af7eb3da5a21c2dba06e9623f3c961154a8bbbe3e9991dea4

          2、Kafka:版本 2.7.1,源碼編譯啟動(dòng)(看上文源碼搭建啟動(dòng))
          3、啟動(dòng)生產(chǎn)者:Kafka 源碼中 exmaple 中
          4、啟動(dòng)消息者:可以用 Kafka 提供的腳本

          # 舉個(gè)栗子:topic 需要自己去修改  
          cd ./kafka-2.7.1-src/bin  
          $ ./kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic

          創(chuàng)建topic :1副本,2 分區(qū)

          $ ./kafka-topics.sh --bootstrap-server localhost:9092 --topic myTopic --create --replication-factor 1 --partitions 2  
            
          # 查看  
          $ ./kafka-topics.sh --bootstrap-server broker:9092 --topic myTopic --describe

                     「生產(chǎn)者代碼:」

          public class KafkaProducerApplication {  
            
              private final Producer<String, String> producer;  
              final String outTopic;  
            
              public KafkaProducerApplication(final Producer<String, String> producer,  
                                              final String topic)
           
          {  
                  this.producer = producer;  
                  outTopic = topic;  
              }  
            
              public void produce(final String message) {  
                  final String[] parts = message.split("-");  
                  final String key, value;  
                  if (parts.length > 1) {  
                      key = parts[0];  
                      value = parts[1];  
                  } else {  
                      key = null;  
                      value = parts[0];  
                  }  
                  final ProducerRecord<String, String> producerRecord  
                      = new ProducerRecord<>(outTopic, key, value);  
                  producer.send(producerRecord,  
                          (recordMetadata, e) -> {  
                              if(e != null) {  
                                  e.printStackTrace();  
                              } else {  
                                  System.out.println("key/value " + key + "/" + value + "\twritten to topic[partition] " + recordMetadata.topic() + "[" + recordMetadata.partition() + "] at offset " + recordMetadata.offset());  
                              }  
                          }  
                  );  
              }  
            
              public void shutdown() {  
                  producer.close();  
              }  
            
              public static void main(String[] args) {  
            
                  final Properties props = new Properties();  
            
                  props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");  
                  props.put(ProducerConfig.ACKS_CONFIG, "all");  
            
                  props.put(ProducerConfig.CLIENT_ID_CONFIG, "myApp");  
                  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);  
                  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);  
            
                  final String topic = "myTopic";  
                  final Producer<String, String> producer = new KafkaProducer<>(props);  
                  final KafkaProducerApplication producerApp = new KafkaProducerApplication(producer, topic);  
            
                  String filePath = "/home/donald/Documents/Code/Source/kafka-2.7.1-src/examples/src/main/java/kafka/examples/input.txt";  
                  try {  
                      List<String> linesToProduce = Files.readAllLines(Paths.get(filePath));  
                      linesToProduce.stream().filter(l -> !l.trim().isEmpty())  
                              .forEach(producerApp::produce);  
                      System.out.println("Offsets and timestamps committed in batch from " + filePath);  
                  } catch (IOException e) {  
                      System.err.printf("Error reading file %s due to %s %n", filePath, e);  
                  } finally {  
                      producerApp.shutdown();  
                  }  
              }  
          }

          「啟動(dòng)生產(chǎn)者后,控制臺(tái)輸出如下:」
          「啟動(dòng)消費(fèi)者:」

          $ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myTopic

          修改配置 acks

          啟用冪等的情況下,調(diào)整acks 配置,生產(chǎn)者啟動(dòng)后結(jié)果是怎樣的:
          • 修改配置 acks = 1

          • 修改配置 acks = 0

          會(huì)直接報(bào)錯(cuò):

          Exception in thread "main" org.apache.kafka.common.config.ConfigException: Must set acks to all in order to use the idempotent producer.  
          Otherwise we cannot guarantee idempotence.

          修改配置 max.in.flight.requests.per.connection

          「啟用冪等的情況下,調(diào)整此配置,結(jié)果是怎樣的:」
          將  max.in.flight.requests.per.connection > 5 會(huì)怎樣?
          「當(dāng)然會(huì)報(bào)錯(cuò):」

          Caused by: org.apache.kafka.common.config.ConfigException: Must set max.in.flight.requests.per.connection to at most 5 to use the idempotent producer.

          <END>

          推薦閱讀:

          誓死要將Notepad++拉下馬,大佬推出了一款國(guó)產(chǎn)開(kāi)源編輯器.....

          Linux服務(wù)器存在大量log日志,如何快速定位錯(cuò)誤?

          互聯(lián)網(wǎng)初中高級(jí)大廠面試題(9個(gè)G)

          內(nèi)容包含Java基礎(chǔ)、JavaWeb、MySQL性能優(yōu)化、JVM、鎖、百萬(wàn)并發(fā)、消息隊(duì)列、高性能緩存、反射、Spring全家桶原理、微服務(wù)、Zookeeper......等技術(shù)棧!

          ?戳閱讀原文領(lǐng)取!                                  朕已閱 

          瀏覽 23
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  黄色成人免费在线播放 | 天天干天天艹天天日 | 亚洲操逼123 | 天唐操逼在线观看 | 三级毛骗免费看电影 |