<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          一碰就頭疼的 Kafka 消息重復(fù)問(wèn)題,立馬解決!

          共 12621字,需瀏覽 26分鐘

           ·

          2023-02-03 23:17

          點(diǎn)擊關(guān)注公眾號(hào),Java干貨及時(shí)送達(dá)

          來(lái)源:juejin.cn/post/7172897190627508237
          • 一、前言
          • 二、案例:Kafka 冪等性 Producer 使用

          一、前言

          數(shù)據(jù)重復(fù)這個(gè)問(wèn)題其實(shí)也是挺正常,全鏈路都有可能會(huì)導(dǎo)致數(shù)據(jù)重復(fù)。

          通常,消息消費(fèi)時(shí)候都會(huì)設(shè)置一定重試次數(shù)來(lái)避免網(wǎng)絡(luò)波動(dòng)造成的影響,同時(shí)帶來(lái)副作用是可能出現(xiàn)消息重復(fù)。

          整理下消息重復(fù)的幾個(gè)場(chǎng)景:
          1. 生產(chǎn)端: 遇到異常,基本解決措施都是 重試
          • 場(chǎng)景一:leader分區(qū)不可用了,拋 LeaderNotAvailableException 異常,等待選出新 leader 分區(qū)。
          • 場(chǎng)景二:Controller 所在 Broker 掛了,拋 NotControllerException 異常,等待 Controller 重新選舉。
          • 場(chǎng)景三:網(wǎng)絡(luò)異常、斷網(wǎng)、網(wǎng)絡(luò)分區(qū)、丟包等,拋 NetworkException 異常,等待網(wǎng)絡(luò)恢復(fù)。
          1. 消費(fèi)端: poll 一批數(shù)據(jù),處理完畢還沒(méi)提交 offset ,機(jī)子宕機(jī)重啟了,又會(huì) poll 上批數(shù)據(jù),再度消費(fèi)就造成了消息重復(fù)。
          怎么解決?

          先來(lái)了解下消息的三種投遞語(yǔ)義:

          • 最多一次( at most once): 消息只發(fā)一次,消息可能會(huì)丟失,但絕不會(huì)被重復(fù)發(fā)送。例如:mqttQoS = 0
          • 至少一次( at least once): 消息至少發(fā)一次,消息不會(huì)丟失,但有可能被重復(fù)發(fā)送。例如:mqttQoS = 1
          • 精確一次( exactly once): 消息精確發(fā)一次,消息不會(huì)丟失,也不會(huì)被重復(fù)發(fā)送。例如:mqttQoS = 2
          了解了這三種語(yǔ)義,再來(lái)看如何解決消息重復(fù),即如何實(shí)現(xiàn)精準(zhǔn)一次,可分為三種方法:
          1. Kafka 冪等性 Producer 保證生產(chǎn)端發(fā)送消息冪等。局限性,是只能保證單分區(qū)且單會(huì)話(重啟后就算新會(huì)話)
          2. Kafka 事務(wù): 保證生產(chǎn)端發(fā)送消息冪等。解決冪等 Producer 的局限性。
          3. 消費(fèi)端冪等:保證消費(fèi)端接收消息冪等。蔸底方案。
          1)Kafka 冪等性 Producer

          冪等性指 :無(wú)論執(zhí)行多少次同樣的運(yùn)算,結(jié)果都是相同的。即一條命令,任意多次執(zhí)行所產(chǎn)生的影響均與一次執(zhí)行的影響相同。

          冪等性使用示例:在生產(chǎn)端添加對(duì)應(yīng)配置即可

          Properties props = new Properties();
          props.put("enable.idempotence", ture); // 1. 設(shè)置冪等
          props.put("acks""all"); // 2. 當(dāng) enable.idempotence 為 true,這里默認(rèn)為 all
          props.put("max.in.flight.requests.per.connection"5); // 3. 注意
          1. 設(shè)置冪等,啟動(dòng)冪等。

          2. 配置 acks,注意:一定要設(shè)置 acks=all,否則會(huì)拋異常。

          3. 配置 max.in.flight.requests.per.connection 需要 <= 5 ,否則會(huì)拋異常 OutOfOrderSequenceException

          • 0.11 >= Kafka < 1.1, max.in.flight.request.per.connection = 1
          • Kafka >= 1.1, max.in.flight.request.per.connection <= 5
          [**為了更好理解,需要了解下\ \Kafka 冪等機(jī)制:](https://mp.weixin.qq.com/s/PiAxqEhkR8g1AOYGGS5Yqw)
          1. Producer 每次啟動(dòng)后,會(huì)向 Broker 申請(qǐng)一個(gè)全局唯一的 pid。(重啟后 pid 會(huì)變化,這也是弊端之一)

          2. Sequence Numbe:針對(duì)每個(gè) <Topic, Partition> 都對(duì)應(yīng)一個(gè)從0開始單調(diào)遞增的 Sequence,同時(shí) Broker端會(huì)緩存這個(gè) seq num

          3. 判斷是否重復(fù):<pid, seq num>Broker 里對(duì)應(yīng)的隊(duì)列 ProducerStateEntry.Queue(默認(rèn)隊(duì)列長(zhǎng)度為 5)查詢是否存在

          • 如果 nextSeq == lastSeq + 1,即 服務(wù)端seq + 1 == 生產(chǎn)傳入seq,則接收。
          • 如果 nextSeq == 0 && lastSeq == Int.MaxValue,即剛初始化,也接收。
          • 反之,要么重復(fù),要么丟消息,均拒絕。
          這種設(shè)計(jì)針對(duì)解決了兩個(gè)問(wèn)題:
          1. 消息重復(fù): 場(chǎng)景 Broker 保存消息后還沒(méi)發(fā)送 ack 就宕機(jī)了,這時(shí)候 Producer 就會(huì)重試,這就造成消息重復(fù)。
          2. 消息亂序: 避免場(chǎng)景,前一條消息發(fā)送失敗而其后一條發(fā)送成功,前一條消息重試后成功,造成的消息亂序。
          那什么時(shí)候該使用冪等:
          1. 如果已經(jīng)使用 acks=all,使用冪等也可以。
          2. 如果已經(jīng)使用 acks=0 或者 acks=1,說(shuō)明你的系統(tǒng)追求高性能,對(duì)數(shù)據(jù)一致性要求不高。不要使用冪等。
          2)Kafka 事務(wù)

          使用 Kafka 事務(wù)解決冪等的弊端:?jiǎn)螘?huì)話且單分區(qū)冪等。

          Tips 這塊篇幅較長(zhǎng),這先稍微提及下使用,之后另起一篇。

          事務(wù)使用示例:分為生產(chǎn)端 和 消費(fèi)端

          Properties props = new Properties();
          props.put("enable.idempotence", ture); // 1. 設(shè)置冪等
          props.put("acks""all"); // 2. 當(dāng) enable.idempotence 為 true,這里默認(rèn)為 all
          props.put("max.in.flight.requests.per.connection"5); // 3. 最大等待數(shù)
          props.put("transactional.id""my-transactional-id"); // 4. 設(shè)定事務(wù) id

          Producer<String, String> producer = new KafkaProducer<String, String>(props);

          // 初始化事務(wù)
          producer.initTransactions();

          try{
              // 開始事務(wù)
              producer.beginTransaction();

              // 發(fā)送數(shù)據(jù)
              producer.send(new ProducerRecord<String, String>("Topic""Key""Value"));
           
              // 數(shù)據(jù)發(fā)送及 Offset 發(fā)送均成功的情況下,提交事務(wù)
              producer.commitTransaction();
          catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
              // 數(shù)據(jù)發(fā)送或者 Offset 發(fā)送出現(xiàn)異常時(shí),終止事務(wù)
              producer.abortTransaction();
          finally {
              // 關(guān)閉 Producer 和 Consumer
              producer.close();
              consumer.close();
          }

          這里消費(fèi)端 Consumer 需要設(shè)置下配置:isolation.level 參數(shù)

          • read_uncommitted 這是默認(rèn)值,表明 Consumer 能夠讀取到 Kafka 寫入的任何消息,不論事務(wù)型 Producer 提交事務(wù)還是終止事務(wù),其寫入的消息都可以讀取。如果你用了事務(wù)型 Producer,那么對(duì)應(yīng)的 Consumer 就不要使用這個(gè)值。
          • read_committed 表明 Consumer 只會(huì)讀取事務(wù)型 Producer 成功提交事務(wù)寫入的消息。當(dāng)然了,它也能看到非事務(wù)型 Producer 寫入的所有消息。
          3)消費(fèi)端冪等

          “如何解決消息重復(fù)?” 這個(gè)問(wèn)題,其實(shí)換一種說(shuō)法:就是如何解決消費(fèi)端冪等性問(wèn)題。

          只要消費(fèi)端具備了冪等性,那么重復(fù)消費(fèi)消息的問(wèn)題也就解決了。

          典型的方案是使用:消息表,來(lái)去重:

          • 上述栗子中,消費(fèi)端拉取到一條消息后,開啟事務(wù),將消息Id 新增到本地消息表中,同時(shí)更新訂單信息。
          • 如果消息重復(fù),則新增操作 insert 會(huì)異常,同時(shí)觸發(fā)事務(wù)回滾。

          基于 Spring Boot + MyBatis Plus + Vue & Element 實(shí)現(xiàn)的后臺(tái)管理系統(tǒng) + 用戶小程序,支持 RBAC 動(dòng)態(tài)權(quán)限、多租戶、數(shù)據(jù)權(quán)限、工作流、三方登錄、支付、短信、商城等功能

          • 項(xiàng)目地址:https://github.com/YunaiV/ruoyi-vue-pro
          • 視頻教程:https://doc.iocoder.cn/video/

          二、案例:Kafka 冪等性 Producer 使用

          環(huán)境搭建可參考:https://developer.confluent.io/tutorials/message-ordering/kafka.html#view-all-records-in-the-topic

          準(zhǔn)備工作如下:

          1、Zookeeper:本地使用 Docker 啟動(dòng)

          $ docker run -d --name zookeeper -p 2181:2181 zookeeper
          a86dff3689b68f6af7eb3da5a21c2dba06e9623f3c961154a8bbbe3e9991dea4

          2、Kafka:版本 2.7.1,源碼編譯啟動(dòng)(看上文源碼搭建啟動(dòng))

          3、啟動(dòng)生產(chǎn)者:Kafka 源碼中 exmaple

          4、啟動(dòng)消息者:可以用 Kafka 提供的腳本


          > 基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 實(shí)現(xiàn)的后臺(tái)管理系統(tǒng) + 用戶小程序,支持 RBAC 動(dòng)態(tài)權(quán)限、多租戶、數(shù)據(jù)權(quán)限、工作流、三方登錄、支付、短信、商城等功能
          >
          > * 項(xiàng)目地址:<https://github.com/YunaiV/yudao-cloud>
          > * 視頻教程:<https://doc.iocoder.cn/video/>

          # 舉個(gè)栗子:topic 需要自己去修改
          $ cd ./kafka-2.7.1-src/bin
          $ ./kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic

          創(chuàng)建 topic 1副本,2 分區(qū)

          $ ./kafka-topics.sh --bootstrap-server localhost:9092 --topic myTopic --create --replication-factor 1 --partitions 2

          # 查看
          $ ./kafka-topics.sh --bootstrap-server broker:9092 --topic myTopic --describe

          生產(chǎn)者代碼:

          public class KafkaProducerApplication {

              private final Producer<String, String> producer;
              final String outTopic;

              public KafkaProducerApplication(final Producer<String, String> producer,
                                              final String topic)
           
          {
                  this.producer = producer;
                  outTopic = topic;
              }

              public void produce(final String message) {
                  final String[] parts = message.split("-");
                  final String key, value;
                  if (parts.length > 1) {
                      key = parts[0];
                      value = parts[1];
                  } else {
                      key = null;
                      value = parts[0];
                  }
                  final ProducerRecord<String, String> producerRecord
                      = new ProducerRecord<>(outTopic, key, value);
                  producer.send(producerRecord,
                          (recordMetadata, e) -> {
                              if(e != null) {
                                  e.printStackTrace();
                              } else {
                                  System.out.println("key/value " + key + "/" + value + "\twritten to topic[partition] " + recordMetadata.topic() + "[" + recordMetadata.partition() + "] at offset " + recordMetadata.offset());
                              }
                          }
                  );
              }

              public void shutdown() {
                  producer.close();
              }

              public static void main(String[] args) {

                  final Properties props = new Properties();

                  props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
                  props.put(ProducerConfig.ACKS_CONFIG, "all");

                  props.put(ProducerConfig.CLIENT_ID_CONFIG, "myApp");
                  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
                  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

                  final String topic = "myTopic";
                  final Producer<String, String> producer = new KafkaProducer<>(props);
                  final KafkaProducerApplication producerApp = new KafkaProducerApplication(producer, topic);

                  String filePath = "/home/donald/Documents/Code/Source/kafka-2.7.1-src/examples/src/main/java/kafka/examples/input.txt";
                  try {
                      List<String> linesToProduce = Files.readAllLines(Paths.get(filePath));
                      linesToProduce.stream().filter(l -> !l.trim().isEmpty())
                              .forEach(producerApp::produce);
                      System.out.println("Offsets and timestamps committed in batch from " + filePath);
                  } catch (IOException e) {
                      System.err.printf("Error reading file %s due to %s %n", filePath, e);
                  } finally {
                      producerApp.shutdown();
                  }
              }
          }

          啟動(dòng)生產(chǎn)者后,控制臺(tái)輸出如下:

          啟動(dòng)消費(fèi)者:

          $ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myTopic
          修改配置 acks
          ``

          啟用冪等的情況下,調(diào)整 acks 配置,生產(chǎn)者啟動(dòng)后結(jié)果是怎樣的:

          • 修改配置 acks = 1
          • 修改配置 acks = 0

          會(huì)直接報(bào)錯(cuò):

          Exception in thread "main" org.apache.kafka.common.config.ConfigException: Must set acks to all in order to use the idempotent producer.
          Otherwise we cannot guarantee idempotence.
          修改配置 max.in.flight.requests.per.connection
          ``

          啟用冪等的情況下,調(diào)整此配置,結(jié)果是怎樣的:

          將  max.in.flight.requests.per.connection > 5 會(huì)怎樣?

          當(dāng)然會(huì)報(bào)錯(cuò):

          Caused by: org.apache.kafka.common.config.ConfigException: Must set max.in.flight.requests.per.connection to at most 5 to use the idempotent producer.

            

          1、社區(qū)糾紛不斷:程序員何苦為難程序員?

          2、該死的單元測(cè)試,寫起來(lái)到底有多痛?

          3、互聯(lián)網(wǎng)人為什么學(xué)不會(huì)擺爛

          4、為什么國(guó)外JetBrains做 IDE 就可以養(yǎng)活自己,國(guó)內(nèi)不行?區(qū)別在哪?

          5、相比高人氣的Rust、Go,為何 Java、C 在工具層面進(jìn)展緩慢?

          6、讓程序員早點(diǎn)下班的《技術(shù)寫作指南》

          點(diǎn)

          點(diǎn)

          點(diǎn)點(diǎn)

          點(diǎn)在看

          瀏覽 33
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  黄色在线观看免费 | 无码六区| 国产高清免费视频在线观看一区 | 中文字幕在线一区 | 伊人狠狠 |