<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          如何解決 Kafka 消息重復(fù)問(wèn)題!

          共 10304字,需瀏覽 21分鐘

           ·

          2023-01-29 10:50

          一、前言

          數(shù)據(jù)重復(fù)這個(gè)問(wèn)題其實(shí)也是挺正常,全鏈路都有可能會(huì)導(dǎo)致數(shù)據(jù)重復(fù)。

          cb9ae3b09cb6d141e7e9e97af9e83f27.webp

          通常,消息消費(fèi)時(shí)候都會(huì)設(shè)置一定重試次數(shù)來(lái)避免網(wǎng)絡(luò)波動(dòng)造成的影響,同時(shí)帶來(lái)副作用是可能出現(xiàn)消息重復(fù)。

          整理下消息重復(fù)的幾個(gè)場(chǎng)景:

          1.「生產(chǎn)端:」?遇到異常,基本解決措施都是?「重試」

          • 場(chǎng)景一:leader分區(qū)不可用了,拋?LeaderNotAvailableException?異常,等待選出新?leader?分區(qū)。

          • 場(chǎng)景二:Controller?所在?Broker?掛了,拋?NotControllerException?異常,等待?Controller?重新選舉。

          • 場(chǎng)景三:網(wǎng)絡(luò)異常、斷網(wǎng)、網(wǎng)絡(luò)分區(qū)、丟包等,拋?NetworkException?異常,等待網(wǎng)絡(luò)恢復(fù)。

          2.「消費(fèi)端:」?poll?一批數(shù)據(jù),處理完畢還沒(méi)提交?offset?,機(jī)子宕機(jī)重啟了,又會(huì)?poll?上批數(shù)據(jù),再度消費(fèi)就造成了消息重復(fù)。

          怎么解決?

          「先來(lái)了解下消息的三種投遞語(yǔ)義:」

          • 最多一次(at most once):?消息只發(fā)一次,消息可能會(huì)丟失,但絕不會(huì)被重復(fù)發(fā)送。例如:mqtt?中?QoS = 0

          • 至少一次(at least once):?消息至少發(fā)一次,消息不會(huì)丟失,但有可能被重復(fù)發(fā)送。例如:mqtt?中?QoS = 1

          • 精確一次(exactly once):?消息精確發(fā)一次,消息不會(huì)丟失,也不會(huì)被重復(fù)發(fā)送。例如:mqtt?中?QoS = 2

          了解了這三種語(yǔ)義,再來(lái)看如何解決消息重復(fù),即如何實(shí)現(xiàn)精準(zhǔn)一次,可分為三種方法:

          1. Kafka?冪等性?Producer:?保證生產(chǎn)端發(fā)送消息冪等。局限性,是只能保證單分區(qū)且單會(huì)話(重啟后就算新會(huì)話)

          2. Kafka?事務(wù):?保證生產(chǎn)端發(fā)送消息冪等。解決冪等?Producer?的局限性。

          3. 消費(fèi)端冪等:?保證消費(fèi)端接收消息冪等。蔸底方案。

          1)Kafka?冪等性?Producer

          ?

          「冪等性指」:無(wú)論執(zhí)行多少次同樣的運(yùn)算,結(jié)果都是相同的。即一條命令,任意多次執(zhí)行所產(chǎn)生的影響均與一次執(zhí)行的影響相同。

          ?

          「冪等性使用示例:在生產(chǎn)端添加對(duì)應(yīng)配置即可」

                
                Properties?props?=?new?Properties();??
          props.put("enable.idempotence",?ture);?//?1.?設(shè)置冪等??
          props.put("acks",?"all");?//?2.?當(dāng)?enable.idempotence?為?true,這里默認(rèn)為?all??
          props.put("max.in.flight.requests.per.connection",?5);?//?3.?注意

          1.設(shè)置冪等,啟動(dòng)冪等。

          2.配置?acks,注意:一定要設(shè)置?acks=all,否則會(huì)拋異常。

          3.配置max.in.flight.requests.per.connection?需要?<= 5,否則會(huì)拋異常?OutOfOrderSequenceException

          • 0.11 >= Kafka < 1.1,?max.in.flight.request.per.connection = 1

          • Kafka >= 1.1,?max.in.flight.request.per.connection <= 5

          為了更好理解,需要了解下Kafka 冪等機(jī)制:

          8f0a57764184f8f2bc0052b677afadaf.webp

          1.Producer?每次啟動(dòng)后,會(huì)向?Broker?申請(qǐng)一個(gè)全局唯一的?pid。(重啟后?pid?會(huì)變化,這也是弊端之一)

          2.Sequence Numbe:針對(duì)每個(gè)?<Topic, Partition>?都對(duì)應(yīng)一個(gè)從0開(kāi)始單調(diào)遞增的?Sequence,同時(shí)?Broker端會(huì)緩存這個(gè)?seq num

          3.判斷是否重復(fù):?拿?<pid, seq num>?去?Broker?里對(duì)應(yīng)的隊(duì)列?ProducerStateEntry.Queue(默認(rèn)隊(duì)列長(zhǎng)度為 5)查詢是否存在

          • 如果?nextSeq == lastSeq + 1,即?服務(wù)端seq + 1 == 生產(chǎn)傳入seq,則接收。

          • 如果?nextSeq == 0 && lastSeq == Int.MaxValue,即剛初始化,也接收。

          • 反之,要么重復(fù),要么丟消息,均拒絕。

          77e822cfc289eb00e4f25f4cd5bea8ba.webp

          這種設(shè)計(jì)針對(duì)解決了兩個(gè)問(wèn)題:

          1. 「消息重復(fù):」?場(chǎng)景?Broker?保存消息后還沒(méi)發(fā)送?ack?就宕機(jī)了,這時(shí)候?Producer?就會(huì)重試,這就造成消息重復(fù)。

          2. 「消息亂序:」?避免場(chǎng)景,前一條消息發(fā)送失敗而其后一條發(fā)送成功,前一條消息重試后成功,造成的消息亂序。

          那什么時(shí)候該使用冪等:

          1. 如果已經(jīng)使用?acks=all,使用冪等也可以。

          2. 如果已經(jīng)使用?acks=0?或者?acks=1,說(shuō)明你的系統(tǒng)追求高性能,對(duì)數(shù)據(jù)一致性要求不高。不要使用冪等。

          2)Kafka?事務(wù)

          ?

          使用?Kafka?事務(wù)解決冪等的弊端:?jiǎn)螘?huì)話且單分區(qū)冪等。

          Tips:」?這塊篇幅較長(zhǎng),這先稍微提及下使用,之后另起一篇。

          ?

          「事務(wù)使用示例:分為生產(chǎn)端 和 消費(fèi)端」

                
                Properties?props?=?new?Properties();??
          props.put("enable.idempotence",?ture);?//?1.?設(shè)置冪等??
          props.put("acks",?"all");?//?2.?當(dāng)?enable.idempotence?為?true,這里默認(rèn)為?all??
          props.put("max.in.flight.requests.per.connection",?5);?//?3.?最大等待數(shù)??
          props.put("transactional.id",?"my-transactional-id");?//?4.?設(shè)定事務(wù)?id??
          ??
          Producer<String,?String>?producer?=?new?KafkaProducer<String,?String>(props);??
          ??
          //?初始化事務(wù)??
          producer.initTransactions();??
          ??
          try{??
          ????//?開(kāi)始事務(wù)??
          ????producer.beginTransaction();??
          ??
          ????//?發(fā)送數(shù)據(jù)??
          ????producer.send(new?ProducerRecord<String,?String>("Topic",?"Key",?"Value"));??
          ???
          ????//?數(shù)據(jù)發(fā)送及?Offset?發(fā)送均成功的情況下,提交事務(wù)??
          ????producer.commitTransaction();??
          }?catch?(ProducerFencedException?|?OutOfOrderSequenceException?|?AuthorizationException?e)?{??
          ????//?數(shù)據(jù)發(fā)送或者?Offset?發(fā)送出現(xiàn)異常時(shí),終止事務(wù)??
          ????producer.abortTransaction();??
          }?finally?{??
          ????//?關(guān)閉?Producer?和?Consumer??
          ????producer.close();??
          ????consumer.close();??
          }

          這里消費(fèi)端Consumer?需要設(shè)置下配置:isolation.level?參數(shù)

          • read_uncommitted:」?這是默認(rèn)值,表明?Consumer?能夠讀取到?Kafka?寫(xiě)入的任何消息,不論事務(wù)型?Producer?提交事務(wù)還是終止事務(wù),其寫(xiě)入的消息都可以讀取。如果你用了事務(wù)型?Producer,那么對(duì)應(yīng)的?Consumer?就不要使用這個(gè)值。

          • read_committed:」?表明?Consumer?只會(huì)讀取事務(wù)型?Producer?成功提交事務(wù)寫(xiě)入的消息。當(dāng)然了,它也能看到非事務(wù)型?Producer?寫(xiě)入的所有消息。

          3)消費(fèi)端冪等

          ?

          “如何解決消息重復(fù)?” 這個(gè)問(wèn)題,其實(shí)換一種說(shuō)法:就是如何解決消費(fèi)端冪等性問(wèn)題。

          只要消費(fèi)端具備了冪等性,那么重復(fù)消費(fèi)消息的問(wèn)題也就解決了。

          ?

          「典型的方案是使用:消息表,來(lái)去重:」

          0470bcb56641d02e36e03ac595cf419c.webp
          • 上述栗子中,消費(fèi)端拉取到一條消息后,開(kāi)啟事務(wù),將消息Id?新增到本地消息表中,同時(shí)更新訂單信息。

          • 如果消息重復(fù),則新增操作?insert?會(huì)異常,同時(shí)觸發(fā)事務(wù)回滾。

          二、案例:Kafka 冪等性 Producer 使用

          ?

          環(huán)境搭建可參考:https://developer.confluent.io/tutorials/message-ordering/kafka.html#view-all-records-in-the-topic

          ?

          「準(zhǔn)備工作如下:」

          1、Zookeeper:本地使用?Docker?啟動(dòng)

                
                $?docker?run?-d?--name?zookeeper?-p?2181:2181?zookeeper??
          a86dff3689b68f6af7eb3da5a21c2dba06e9623f3c961154a8bbbe3e9991dea4

          2、Kafka:版本?2.7.1,源碼編譯啟動(dòng)(看上文源碼搭建啟動(dòng))

          3、啟動(dòng)生產(chǎn)者:Kafka?源碼中?exmaple?中

          4、啟動(dòng)消息者:可以用?Kafka?提供的腳本

                
                #?舉個(gè)栗子:topic 需要自己去修改??
          $?cd?./kafka-2.7.1-src/bin??
          $?./kafka-console-producer.sh?--broker-list?localhost:9092?--topic?test_topic

          創(chuàng)建topic?:1副本,2 分區(qū)

                
                $?./kafka-topics.sh?--bootstrap-server?localhost:9092?--topic?myTopic?--create?--replication-factor?1?--partitions?2??
          ??
          #?查看??
          $?./kafka-topics.sh?--bootstrap-server?broker:9092?--topic?myTopic?--describe

          「生產(chǎn)者代碼:」

          0a0ac5659c216ca65d30fd70d73daa7b.webp
                
                public?class?KafkaProducerApplication?{??
          ??
          ????private?final?Producer<String,?String>?producer;??
          ????final?String?outTopic;??
          ??
          ????public?KafkaProducerApplication(final?Producer<String,?String>?producer,??
          ????????????????????????????????????final?String?topic)
          ?
          {??
          ????????this.producer?=?producer;??
          ????????outTopic?=?topic;??
          ????}??
          ??
          ????public?void?produce(final?String?message)?{??
          ????????final?String[]?parts?=?message.split("-");??
          ????????final?String?key,?value;??
          ????????if?(parts.length?>?1)?{??
          ????????????key?=?parts[0];??
          ????????????value?=?parts[1];??
          ????????}?else?{??
          ????????????key?=?null;??
          ????????????value?=?parts[0];??
          ????????}??
          ????????final?ProducerRecord<String,?String>?producerRecord??
          ????????????=?new?ProducerRecord<>(outTopic,?key,?value);??
          ????????producer.send(producerRecord,??
          ????????????????(recordMetadata,?e)?->?{??
          ????????????????????if(e?!=?null)?{??
          ????????????????????????e.printStackTrace();??
          ????????????????????}?else?{??
          ????????????????????????System.out.println("key/value?"?+?key?+?"/"?+?value?+?"\twritten?to?topic[partition]?"?+?recordMetadata.topic()?+?"["?+?recordMetadata.partition()?+?"]?at?offset?"?+?recordMetadata.offset());??
          ????????????????????}??
          ????????????????}??
          ????????);??
          ????}??
          ??
          ????public?void?shutdown()?{??
          ????????producer.close();??
          ????}??
          ??
          ????public?static?void?main(String[]?args)?{??
          ??
          ????????final?Properties?props?=?new?Properties();??
          ??
          ????????props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,?"true");??
          ????????props.put(ProducerConfig.ACKS_CONFIG,?"all");??
          ??
          ????????props.put(ProducerConfig.CLIENT_ID_CONFIG,?"myApp");??
          ????????props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,?StringSerializer.class);??
          ????????props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,?StringSerializer.class);??
          ??
          ????????final?String?topic?=?"myTopic";??
          ????????final?Producer<String,?String>?producer?=?new?KafkaProducer<>(props);??
          ????????final?KafkaProducerApplication?producerApp?=?new?KafkaProducerApplication(producer,?topic);??
          ??
          ????????String?filePath?=?"/home/donald/Documents/Code/Source/kafka-2.7.1-src/examples/src/main/java/kafka/examples/input.txt";??
          ????????try?{??
          ????????????List<String>?linesToProduce?=?Files.readAllLines(Paths.get(filePath));??
          ????????????linesToProduce.stream().filter(l?->?!l.trim().isEmpty())??
          ????????????????????.forEach(producerApp::produce);??
          ????????????System.out.println("Offsets?and?timestamps?committed?in?batch?from?"?+?filePath);??
          ????????}?catch?(IOException?e)?{??
          ????????????System.err.printf("Error?reading?file?%s?due?to?%s?%n",?filePath,?e);??
          ????????}?finally?{??
          ????????????producerApp.shutdown();??
          ????????}??
          ????}??
          }

          「啟動(dòng)生產(chǎn)者后,控制臺(tái)輸出如下:」

          8706719777f98bbf185be1dcc3d539e5.webp

          「啟動(dòng)消費(fèi)者:」

                
                $?./kafka-console-consumer.sh?--bootstrap-server?localhost:9092?--topic?myTopic
          84d0842ea24b2461ddce494766663a45.webp

          修改配置 acks

          啟用冪等的情況下,調(diào)整acks?配置,生產(chǎn)者啟動(dòng)后結(jié)果是怎樣的:

          • 修改配置?acks = 1

          • 修改配置?acks = 0

          會(huì)直接報(bào)錯(cuò):

                
                Exception?in?thread?"main"?org.apache.kafka.common.config.ConfigException:?Must?set?acks?to?all?in?order?to?use?the?idempotent?producer.??
          Otherwise?we?cannot?guarantee?idempotence.
          ceecc917126beaf21fb36a17d6756fdf.webp

          修改配置 max.in.flight.requests.per.connection

          「啟用冪等的情況下,調(diào)整此配置,結(jié)果是怎樣的:」

          將 ?max.in.flight.requests.per.connection > 5?會(huì)怎樣?

          053b57e910b7e437f200a8eb642d3df0.webp

          「當(dāng)然會(huì)報(bào)錯(cuò):」

                
                Caused?by:?org.apache.kafka.common.config.ConfigException:?Must?set?max.in.flight.requests.per.connection?to?at?most?5?to?use?the?idempotent?producer.
          314d47ad6900d09c690a312730deaf6f.webp

          作者:格格步入

          來(lái)源:juejin.cn/post/7172897190627508237

              
                
                  
                    

          1.?負(fù)載均衡 LVS vs Nginx 對(duì)比!還傻傻分不清?

          2.?一款強(qiáng)大的開(kāi)源認(rèn)證和訪問(wèn)控制利器:KeyCloak 太牛了!

          3.?MySQL 批量操作,一次插入多少行數(shù)據(jù)效率最高?

          4.?美團(tuán)動(dòng)態(tài)線程池,香啊!

                      

          最近面試BAT,整理一份面試資料 Java面試BATJ通關(guān)手冊(cè) ,覆蓋了Java核心技術(shù)、JVM、Java并發(fā)、SSM、微服務(wù)、數(shù)據(jù)庫(kù)、數(shù)據(jù)結(jié)構(gòu)等等。

          獲取方式:點(diǎn)“ 在看 ”,關(guān)注公眾號(hào)并回復(fù)? Java ?領(lǐng)取,更多內(nèi)容陸續(xù)奉上。

                      

          PS:因公眾號(hào)平臺(tái)更改了推送規(guī)則,如果不想錯(cuò)過(guò)內(nèi)容,記得讀完點(diǎn)一下 在看 ,加個(gè) 星標(biāo) ,這樣每次新文章推送才會(huì)第一時(shí)間出現(xiàn)在你的訂閱列表里。

          點(diǎn)“在看”支持小哈呀,謝謝啦

          瀏覽 99
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产乱码精品一品二品 | 优希麻琴无码一区二区三区 | 国产一级a一级a免费视频 | 精品三级在线观看 | 《精品 模特私拍秘 泄密》学院派 |