<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Kafka 3.0新特性全面曝光,真香!

          共 23805字,需瀏覽 48分鐘

           ·

          2022-06-19 22:48

          導(dǎo)語 | kafka3.0的版本已經(jīng)試推行去zk的kafka架構(gòu)了,如果去掉了zk,那么在kafka新的版本當(dāng)中使用什么技術(shù)來代替了zk的位置呢,接下來我們一起來一探究竟,了解kafka的內(nèi)置共識機制和raft算法。


          一、Kafka簡介


          Kafka是一款開源的消息引擎系統(tǒng)。一個典型的Kafka體系架構(gòu)包括若干Producer、若干Broker、若干Consumer,以及一個ZooKeeper集群,如上圖所示。其中ZooKeeper是Kafka用來負(fù)責(zé)集群元數(shù)據(jù)的管理、控制器的選舉等操作的。Producer將消息發(fā)送到Broker,Broker負(fù)責(zé)將收到的消息存儲到磁盤中,而Consumer負(fù)責(zé)從Broker訂閱并消費消息。


          (一)Kafka核心組件


          • producer:消息生產(chǎn)者,就是向broker發(fā)送消息的客戶端。   


          • consumer:消息消費者,就是從broker拉取數(shù)據(jù)的客戶端。


          • consumer group:消費者組,由多個消費者consumer組成。消費者組內(nèi)每個消費者負(fù)責(zé)消費不同的分區(qū),一個分區(qū)只能由同一個消費者組內(nèi)的一個消費者消費;消費者組之間相互獨立,互不影響。所有的消費者都屬于某個消費者組,即消費者組是一個邏輯上的訂閱者。


          • broker:一臺服務(wù)器就是一個broker,一個集群由多個broker組成,一個broker可以有多個topic。


          • topic可以理解為一個隊列,所有的生產(chǎn)者和消費者都是面向topic的。


          • partition:分區(qū),kafka中的topic為了提高拓展性和實現(xiàn)高可用而將它分布到不同的broker中,一個topic可以分為多個partition,每個partition都是有序的,即消息發(fā)送到隊列的順序跟消費時拉取到的順序是一致的。


          • replication:副本。一個topic對應(yīng)的分區(qū)partition可以有多個副本,多個副本中只有一個為leader,其余的為follower。為了保證數(shù)據(jù)的高可用性,leader和follower會盡量均勻的分布在各個broker中,避免了leader所在的服務(wù)器宕機而導(dǎo)致topic不可用的問題。  



          (二)kafka2當(dāng)中zk的作用



          • /admin:主要保存kafka當(dāng)中的核心的重要信息,包括類似于已經(jīng)刪除的topic就會保存在這個路徑下面。


          • /brokers:主要用于保存kafka集群當(dāng)中的broker信息,以及沒被刪除的topic信息。


          • /cluster: 主要用于保存kafka集群的唯一id信息,每個kafka集群都會給分配要給唯一id,以及對應(yīng)的版本號。


          • /config: 集群配置信息。


          • /controller:kafka集群當(dāng)中的控制器信息,控制器組件(Controller),是Apache Kafka的核心組件。它的主要作用是在Apache ZooKeeper的幫助下管理和協(xié)調(diào)整個Kafka集群。


          • /controller_epoch:主要用于保存記錄controller的選舉的次數(shù)。


          • /isr_change_notification:isr列表發(fā)生變更時候的通知,在kafka當(dāng)中由于存在ISR列表變更的情況發(fā)生,為了保證ISR列表更新的及時性,定義了isr_change_notification這個節(jié)點,主要用于通知Controller來及時將ISR列表進(jìn)行變更。


          • /latest_producer_id_block:使用`/latest_producer_id_block`節(jié)點來保存PID塊,主要用于能夠保證生產(chǎn)者的任意寫入請求都能夠得到響應(yīng)。


          • /log_dir_event_notification:主要用于保存當(dāng)broker當(dāng)中某些LogDir出現(xiàn)異常時候,例如磁盤損壞,文件讀寫失敗等異常時候,向ZK當(dāng)中增加一個通知序號,controller監(jiān)聽到這個節(jié)點的變化之后,就會做出對應(yīng)的處理操作。


          以上就是kafka在zk當(dāng)中保留的所有的所有的相關(guān)的元數(shù)據(jù)信息,這些元數(shù)據(jù)信息保證了kafka集群的正常運行。

          二、kafka3的安裝配置


          在kafka3的版本當(dāng)中已經(jīng)徹底去掉了對zk的依賴,如果沒有了zk集群,那么kafka當(dāng)中是如何保存元數(shù)據(jù)信息的呢,這里我們通過kafka3的集群來一探究竟。


          (一)kafka安裝配置核心重要參數(shù)


          • Controller服務(wù)器


          不管是kafka2還是kafka3當(dāng)中,controller控制器都是必不可少的,通過controller控制器來維護(hù)kafka集群的正常運行,例如ISR列表的變更,broker的上線或者下線,topic的創(chuàng)建,分區(qū)的指定等等各種操作都需要依賴于Controller,在kafka2當(dāng)中,controller的選舉需要通過zk來實現(xiàn),我們沒法控制哪些機器選舉成為Controller,而在kafka3當(dāng)中,我們可以通過配置文件來自己指定哪些機器成為Controller,這樣做的好處就是我們可以指定一些配置比較高的機器作為Controller節(jié)點,從而保證controller節(jié)點的穩(wěn)健性。


          被選中的controller節(jié)點參與元數(shù)據(jù)集群的選舉,每個controller節(jié)點要么是Active狀態(tài),或者就是standBy狀態(tài)。

          • Process.Roles


          使用KRaft模式來運行kafka集群的話,我們有一個配置叫做Process.Roles必須配置,這個參數(shù)有以下四個值可以進(jìn)行配置:


          • Process.Roles=Broker, 服務(wù)器在KRaft模式中充當(dāng)Broker。


          • Process.Roles=Controller, 服務(wù)器在KRaft模式下充當(dāng)Controller。


          • Process.Roles=Broker,Controller,服務(wù)器在KRaft模式中同時充當(dāng)Broker和Controller。


          • 如果process.roles沒有設(shè)置。那么集群就假定是運行在ZooKeeper模式下。


          如果需要從zookeeper模式轉(zhuǎn)換成為KRaft模式,那么需要進(jìn)行重新格式化。如果一個節(jié)點同時是Broker和Controller節(jié)點,那么就稱之為組合節(jié)點。


          實際工作當(dāng)中,如果有條件的話,盡量還是將Broker和Controller節(jié)點進(jìn)行分離部署。避免由于服務(wù)器資源不夠的情況導(dǎo)致OOM等一系列的問題



          • Quorum Voters


          通過controller.quorum.voters配置來實習(xí)哪些節(jié)點是Quorum的投票節(jié)點,所有想要成為控制器的節(jié)點,都必須放到這個配置里面。


          每個Broker和每個Controller都必須配置Controller.quorum.voters,該配置當(dāng)中提供的節(jié)點ID必須與提供給服務(wù)器的節(jié)點ID保持一直。


          每個Broker和每個Controller 都必須設(shè)置 controller.quorum.voters。需要注意的是,controller.quorum.voters 配置中提供的節(jié)點ID必須與提供給服務(wù)器的節(jié)點ID匹配。


          比如在Controller1上,node.Id必須設(shè)置為1,以此類推。注意,控制器id不強制要求你從0或1開始。然而,分配節(jié)點ID的最簡單和最不容易混淆的方法是給每個服務(wù)器一個數(shù)字ID,然后從0開始。



          (二)下載并解壓安裝包


          bigdata01下載kafka的安裝包,并進(jìn)行解壓:


          [hadoop@bigdata01 kraft]$ cd /opt/soft/[hadoop@bigdata01 soft]$ wget http://archive.apache.org/dist/kafka/3.1.0/kafka_2.12-3.1.0.tgz[hadoop@bigdata01 soft]$ tar -zxf kafka_2.12-3.1.0.tgz -C /opt/install/


          修改kafka的配置文件broker.properties:


          [hadoop@bigdata01 kafka_2.12-3.1.0]$ cd /opt/install/kafka_2.12-3.1.0/config/kraft/[hadoop@bigdata01 kraft]$ vim broker.properties


          修改編輯內(nèi)容如下:


          node.id=1controller.quorum.voters=1@bigdata01:9093listeners=PLAINTEXT://bigdata01:9092advertised.listeners=PLAINTEXT://bigdata01:9092log.dirs=/opt/install/kafka_2.12-3.1.0/kraftlogs


          創(chuàng)建兩個文件夾:


          [hadoop@bigdata01 kafka_2.12-3.1.0]$ mkdir -p /opt/install/kafka_2.12-3.1.0/kraftlogs[hadoop@bigdata01 kafka_2.12-3.1.0]$ mkdir -p /opt/install/kafka_2.12-3.1.0/topiclogs


          同步安裝包到其他機器上面去。



          (三)服務(wù)器集群啟動


          啟動kafka服務(wù):


          [hadoop@bigdata01 kafka_2.12-3.1.0]$  ./bin/kafka-storage.sh random-uuidYkJwr6RESgSJv-sxa1R1mA[hadoop@bigdata01 kafka_2.12-3.1.0]$  ./bin/kafka-storage.sh format -t YkJwr6RESgSJv-sxa1R1mA -c ./config/kraft/server.propertiesFormatting /opt/install/kafka_2.12-3.1.0/topiclogs[hadoop@bigdata01 kafka_2.12-3.1.0]$ ./bin/kafka-server-start.sh ./config/kraft/server.properties



          (四)創(chuàng)建kafka的topic


          集群啟動成功之后,就可以來創(chuàng)建kafka的topic了,使用以下命令來創(chuàng)建kafka的topic:


          ./bin/kafka-topics.sh --create --topic kafka_test --partitions 3 --replication-factor 2 --bootstrap-server bigdata01:9092,bigdata02:9092,bigdata03:9092



          (五)任意一臺機器查看kafka的topic


          組成集群之后,任意一臺機器就可以通過以下命令來查看到剛才創(chuàng)建的topic了:


          [hadoop@bigdata03 ~]$ cd /opt/install/kafka_2.12-3.1.0/[hadoop@bigdata03 kafka_2.12-3.1.0]$ bin/kafka-topics.sh  --list --bootstrap-server bigdata01:9092,bigdata02:9092,bigdata03:9092



          (六)消息生產(chǎn)與消費


          使用命令行來生產(chǎn)以及消費kafka當(dāng)中的消息:


          [hadoop@bigdata01 kafka_2.12-3.1.0]$ bin/kafka-console-producer.sh --bootstrap-server bigdata01:9092,bigdata02:9092,bigdata03:9092 --topic kafka_test
          [hadoop@bigdata02 kafka_2.12-3.1.0]$ bin/kafka-console-consumer.sh --bootstrap-server bigdata01:9092,bigdata02:9092,bigdata03:9092 --topic kafka_test --from-beginning



          三、Kafka當(dāng)中Raft的介紹


          (一)kafka強依賴zk所引發(fā)的問題


          前面我們已經(jīng)看到了kafka3集群在沒有zk集群的依賴下,也可以正常運行,那么kafka2在zk當(dāng)中保存的各種重要元數(shù)據(jù)信息,在kafka3當(dāng)中如何實現(xiàn)保存的呢?


          kafka一直都是使用zk來管理集群以及所有的topic的元數(shù)據(jù),并且使用了zk的強一致性來選舉集群的controller,controller對整個集群的管理至關(guān)重要,包括分區(qū)的新增,ISR列表的維護(hù),等等很多功能都需要靠controller來實現(xiàn),然后使用zk來維護(hù)kafka的元數(shù)據(jù)也存在很多的問題以及存在性能瓶頸。


          以下是kafka將元數(shù)據(jù)保存在zk當(dāng)中的諸多問題。


          • 元數(shù)據(jù)存取困難


          元數(shù)據(jù)的存取過于困難,每次重新選舉的controller需要把整個集群的元數(shù)據(jù)重新restore,非常的耗時且影響集群的可用性。


          • 元數(shù)據(jù)更新網(wǎng)絡(luò)開銷大


          整個元數(shù)據(jù)的更新操作也是以全量推的方式進(jìn)行,網(wǎng)絡(luò)的開銷也會非常大。


          • 強耦合違背軟件設(shè)計原則


          Zookeeper對于運維來說,維護(hù)Zookeeper也需要一定的開銷,并且kafka強耦合與zk也并不好,還得時刻擔(dān)心zk的宕機問題,違背軟件設(shè)計的高內(nèi)聚,低耦合的原則。


          • 網(wǎng)絡(luò)分區(qū)復(fù)雜度高


          Zookeeper本身并不能兼顧到broker與broker之間通信的狀態(tài),這就會導(dǎo)致網(wǎng)絡(luò)分區(qū)的復(fù)雜度成幾何倍數(shù)增長。


          • zk本身不適合做消息隊列


          zookeeper不適合做消息隊列,因為zookeeper有1M的消息大小限制 zookeeper的children太多會極大的影響性能znode太大也會影響性能 znode太大會導(dǎo)致重啟zkserver耗時10-15分鐘 zookeeper僅使用內(nèi)存作為存儲,所以不能存儲太多東西。


          • 并發(fā)訪問zk問題多


          最好單線程操作zk客戶端,不要并發(fā),臨界、競態(tài)問題太多。


          基于以上各種問題,所以提出了脫離zk的方案,轉(zhuǎn)向自助研發(fā)強一致性的元數(shù)據(jù)解決方案,也就是KIP-500。


          KIP-500議案提出了在Kafka中處理元數(shù)據(jù)的更好方法。基本思想是"Kafka on Kafka",將Kafka的元數(shù)據(jù)存儲在Kafka本身中,無需增加額外的外部存儲比如ZooKeeper等。


          去zookeeper之后的kafka新的架構(gòu)


          在KIP-500中,Kafka控制器會將其元數(shù)據(jù)存儲在Kafka分區(qū)中,而不是存儲在ZooKeeper中。但是,由于控制器依賴于該分區(qū),因此分區(qū)本身不能依賴控制器來進(jìn)行領(lǐng)導(dǎo)者選舉之類的事情。而是,管理該分區(qū)的節(jié)點必須實現(xiàn)自我管理的Raft仲裁。


          在kafka3.0的新的版本當(dāng)中,使用了新的KRaft協(xié)議,使用該協(xié)議來保證在元數(shù)據(jù)仲裁中準(zhǔn)確的復(fù)制元數(shù)據(jù),這個協(xié)議類似于zk當(dāng)中的zab協(xié)議以及類似于Raft協(xié)議,但是KRaft協(xié)議使用的是基于事件驅(qū)動的模式,與ZAB協(xié)議和Raft協(xié)議還有點不一樣


          在kafka3.0之前的的版本當(dāng)中,主要是借助于controller來進(jìn)行l(wèi)eader partition的選舉,而在3.0協(xié)議當(dāng)中,使用了KRaft來實現(xiàn)自己選擇leader,并最終令所有節(jié)點達(dá)成共識,這樣簡化了controller的選舉過程,效果更加高效。



          (二)kakfa3 Raft


          前面我們已經(jīng)知道了在kafka3當(dāng)中可以不用再依賴于zk來保存kafka當(dāng)中的元數(shù)據(jù)了,轉(zhuǎn)而使用Kafka Raft來實現(xiàn)元數(shù)據(jù)的一致性,簡稱KRaft,并且將元數(shù)據(jù)保存在kafka自己的服務(wù)器當(dāng)中,大大提高了kafka的元數(shù)據(jù)管理的性能。


          KRaft運行模式的Kafka集群,不會將元數(shù)據(jù)存儲在Apache ZooKeeper中。即部署新集群的時候,無需部署ZooKeeper集群,因為Kafka將元數(shù)據(jù)存儲在Controller節(jié)點的KRaft Quorum中。KRaft可以帶來很多好處,比如可以支持更多的分區(qū),更快速的切換Controller,也可以避免Controller緩存的元數(shù)據(jù)和Zookeeper存儲的數(shù)據(jù)不一致帶來的一系列問題。


          在新的版本當(dāng)中,控制器Controller節(jié)點我們可以自己進(jìn)行指定,這樣最大的好處就是我們可以自己選擇一些配置比較好的機器成為Controller節(jié)點,而不像在之前的版本當(dāng)中,我們無法指定哪臺機器成為Controller節(jié)點,而且controller節(jié)點與broker節(jié)點可以運行在同一臺機器上,并且控制器controller節(jié)點不再向broker推送更新消息,而是讓Broker從這個Controller Leader節(jié)點進(jìn)行拉去元數(shù)據(jù)的更新。




          (三)如何查看kafka3當(dāng)中的元數(shù)據(jù)信息


          在kafka3當(dāng)中,不再使用zk來保存元數(shù)據(jù)信息了,那么在kafka3當(dāng)中如何查看元數(shù)據(jù)信息呢,我們也可以通過kafka自帶的命令來進(jìn)行查看元數(shù)據(jù)信息,在KRaft中,有兩個命令常用命令腳本,kafka-dump-log.sh和kakfa-metadata-shell.sh需要我們來進(jìn)行關(guān)注,因為我們可以通過這兩個腳本來查看kafka當(dāng)中保存的元數(shù)據(jù)信息。


          • Kafka-dump-log.sh腳本來導(dǎo)出元數(shù)據(jù)信息


          KRaft模式下,所有的元數(shù)據(jù)信息都保存到了一個內(nèi)部的topic上面,叫做@metadata,例如Broker的信息,Topic的信息等,我們都可以去到這個topic上面進(jìn)行查看,我們可以通過kafka-dump-log.sh這個腳本來進(jìn)行查看該topic的信息。


          Kafka-dump-log.sh是一個之前就有的工具,用來查看Topic的的文件內(nèi)容。這工具加了一個參數(shù)--cluster-metadata-decoder用來,查看元數(shù)據(jù)日志,如下所示:


          [hadoop@bigdata01 kafka_2.12-3.1.0]$ cd /opt/install/kafka_2.12-3.1.0[hadoop@bigdata01 kafka_2.12-3.1.0]$ bin/kafka-dump-log.sh  --cluster-metadata-decoder --skip-record-metadata  --files  /opt/install/kafka_2.12-3.1.0/topiclogs/__cluster_metadata-0/00000000000000000000.index,/opt/install/kafka_2.12-3.1.0/topiclogs/__cluster_metadata-0/00000000000000000000.log  >>/opt/metadata.txt



          • kafka-metadata-shell.sh直接查看元數(shù)據(jù)信息


          平時我們用zk的時候,習(xí)慣了用zk命令行查看數(shù)據(jù),簡單快捷。bin目錄下自帶了kafka-metadata-shell.sh工具,可以允許你像zk一樣方便的查看數(shù)據(jù)。


          使用kafka-metadata-shell.sh腳本進(jìn)入kafka的元數(shù)據(jù)客戶端


          [hadoop@bigdata01 kafka_2.12-3.1.0]$ bin/kafka-metadata-shell.sh --snapshot /opt/install/kafka_2.12-3.1.0/topiclogs/__cluster_metadata-0/00000000000000000000.log


          四、Raft算法介紹


          raft算法中文版本翻譯介紹:

          https://github.com/maemual/raft-zh_cn/blob/master/raft-zh_cn.md


          著名的CAP原則又稱CAP定理的提出,真正奠基了分布式系統(tǒng)的誕生,CAP定理指的是在一個分布式系統(tǒng)中,[一致性]、[可用性](Availability)、[分區(qū)容錯性](Partition tolerance),這三個要素最多只能同時實現(xiàn)兩點,不可能三者兼顧(nosql)。


          分布式系統(tǒng)為了提高系統(tǒng)的可靠性,一般都會選擇使用多副本的方式來進(jìn)行實現(xiàn),例如hdfs當(dāng)中數(shù)據(jù)的多副本,kafka集群當(dāng)中分區(qū)的多副本等,但是一旦有了多副本的話,那么久面臨副本之間一致性的問題,而一致性算法就是 用于解決分布式環(huán)境下多副本的數(shù)據(jù)一致性的問題。業(yè)界最著名的一致性算法就是大名鼎鼎的Paxos,但是Paxos比較晦澀難懂,不太容易理解,所以還有一種叫做Raft的算法,更加簡單容易理解的實現(xiàn)了一致性算法。


          (一)Raft協(xié)議的工作原理


          • Raft協(xié)議當(dāng)中的角色分布


          Raft協(xié)議將分布式系統(tǒng)當(dāng)中的角色分為Leader(領(lǐng)導(dǎo)者),F(xiàn)ollower(跟從者)以及Candidate(候選者)


          • Leader:主節(jié)點的角色,主要是接收客戶端請求,并向Follower同步日志,當(dāng)日志同步到過半及以上節(jié)點之后,告訴follower進(jìn)行提交日志。


          • Follower:從節(jié)點的角色,接受并持久化Leader同步的日志,在Leader通知可以提交日志之后,進(jìn)行提交保存的日志。


          • Candidate:Leader選舉過程中的臨時角色。


          • Raft協(xié)議當(dāng)中的底層原理


          Raft協(xié)議當(dāng)中會選舉出Leader節(jié)點,Leader作為主節(jié)點,完全負(fù)責(zé)replicate log的管理。Leader負(fù)責(zé)接受所有客戶端的請求,然后復(fù)制到Follower節(jié)點,如果leader故障,那么follower會重新選舉leader,Raft協(xié)議的一致性,概括主要可以分為以下三個重要部分:


          • Leader選舉


          • 日志復(fù)制


          • 安全性


          其中Leader選舉和日志復(fù)制是Raft協(xié)議當(dāng)中最為重要的。


          Raft協(xié)議要求系統(tǒng)當(dāng)中,任意一個時刻,只有一個leader,正常工作期間,只有Leader和Follower角色,并且Raft協(xié)議采用了類似網(wǎng)絡(luò)租期的方式來進(jìn)行管理維護(hù)整個集群,Raft協(xié)議將時間分為一個個的時間段(term),也叫作任期,每一個任期都會選舉一個Leader來管理維護(hù)整個集群,如果這個時間段的Leader宕機,那么這一個任期結(jié)束,繼續(xù)重新選舉leader。


          Raft算法將時間劃分成為任意不同長度的任期(term)。任期用連續(xù)的數(shù)字進(jìn)行表示。每一個任期的開始都是一次選舉(election),一個或多個候選人會試圖成為領(lǐng)導(dǎo)人。如果一個候選人贏得了選舉,它就會在該任期的剩余時間擔(dān)任領(lǐng)導(dǎo)人。在某些情況下,選票會被瓜分,有可能沒有選出領(lǐng)導(dǎo)人,那么,將會開始另一個任期,并且立刻開始下一次選舉。Raft算法保證在給定的一個任期最多只有一個領(lǐng)導(dǎo)人。

          • Leader選舉的過程


          Raft使用心跳來進(jìn)行觸發(fā)leader選舉,當(dāng)服務(wù)器啟動時,初始化為follower角色。leader向所有Follower發(fā)送周期性心跳,如果Follower在選舉超時間內(nèi)沒有收到Leader的心跳,就會認(rèn)為leader宕機,稍后發(fā)起leader的選舉。


          每個Follower都會有一個倒計時時鐘,是一個隨機的值,表示的是Follower等待成為Leader的時間,倒計時時鐘先跑完,就會當(dāng)選成為Leader,這樣做得好處就是每一個節(jié)點都有機會成為Leader。


          當(dāng)滿足以下三個條件之一時,Quorum中的某個節(jié)點就會觸發(fā)選舉:


          • 向Leader發(fā)送Fetch請求后,在超時閾值quorum.fetch.timeout.ms之后仍然沒有得到Fetch響應(yīng),表示Leader疑似失敗。


          • 從當(dāng)前Leader收到了EndQuorumEpoch請求,表示Leader已退位。


          • Candidate狀態(tài)下,在超時閾值quorum.election.timeout.ms之后仍然沒有收到多數(shù)票,也沒有Candidate贏得選舉,表示此次選舉作廢,重新進(jìn)行選舉。


          具體詳細(xì)過程實現(xiàn)描述如下:


          • 增加節(jié)點本地的current term,切換到candidate狀態(tài)。


          • 自己給自己投一票。


          • 給其他節(jié)點發(fā)送RequestVote RPCs,要求其他節(jié)點也投自己一票。


          • 等待其他節(jié)點的投票回復(fù)。


          整個過程中的投票過程可以用下圖進(jìn)行表述。



          leader節(jié)點選舉的限制


          • 每個節(jié)點只能投一票,投給自己或者投給別人。


          • 候選人所知道的日志信息,一定不能比自己的更少,即能被選舉成為leader節(jié)點,一定包含了所有已經(jīng)提交的日志。


          • 先到先得的原則


          • 數(shù)據(jù)一致性保證(日志復(fù)制機制)


          前面通過選舉機制之后,選舉出來了leader節(jié)點,然后leader節(jié)點對外提供服務(wù),所有的客戶端的請求都會發(fā)送到leader節(jié)點,由leader節(jié)點來調(diào)度這些并發(fā)請求的處理順序,保證所有節(jié)點的狀態(tài)一致,leader會把請求作為日志條目(Log entries)加入到他的日志當(dāng)中,然后并行的向其他服務(wù)器發(fā)起AppendEntries RPC復(fù)制日志條目。當(dāng)這條請求日志被成功復(fù)制到大多數(shù)服務(wù)器上面之后,Leader將這條日志應(yīng)用到它的狀態(tài)機并向客戶端返回執(zhí)行結(jié)果。


          • 客戶端的每個請求都包含被復(fù)制狀態(tài)機執(zhí)行的指令


          • leader將客戶端請求作為一條心得日志添加到日志文件中,然后并行發(fā)起RPC給其他的服務(wù)器,讓他們復(fù)制這條信息到自己的日志文件中保存。


          • 如果這條日志被成功復(fù)制,也就是大部分的follower都保存好了執(zhí)行指令日志,leader就應(yīng)用這條日志到自己的狀態(tài)機中,并返回給客戶端。


          • 如果follower宕機或者運行緩慢或者數(shù)據(jù)丟失,leader會不斷地進(jìn)行重試,直至所有在線的follower都成功復(fù)制了所有的日志條目。


          與維護(hù)Consumer offset的方式類似,脫離ZK之后的Kafka集群將元數(shù)據(jù)視為日志,保存在一個內(nèi)置的Topic中,且該Topic只有一個Partition。



          元數(shù)據(jù)日志的消息格式與普通消息沒有太大不同,但必須攜帶Leader的紀(jì)元值(即之前的Controller epoch):


          Record => Offset LeaderEpoch ControlType Key Value Timestamp


          這樣,F(xiàn)ollower以拉模式復(fù)制Leader日志,就相當(dāng)于以Consumer角色消費元數(shù)據(jù)Topic,符合Kafka原生的語義。


          那么在KRaft協(xié)議中,是如何維護(hù)哪些元數(shù)據(jù)日志已經(jīng)提交——即已經(jīng)成功復(fù)制到多數(shù)的Follower節(jié)點上的呢?Kafka仍然借用了原生副本機制中的概念——high watermark(HW,高水位線)保證日志不會丟失,HW的示意圖如下。



          狀態(tài)機說明


          要讓所有節(jié)點達(dá)成一致性的狀態(tài),大部分都是基于復(fù)制狀態(tài)機來實現(xiàn)的(Replicated state machine)


          簡單來說就是:初始相同的狀態(tài)+相同的輸入過程=相同的結(jié)束狀態(tài),這個其實也好理解,就類似于一對雙胞胎,出生時候就長得一樣,然后吃的喝的用的穿的都一樣,你自然很難分辨。其中最重要的就是一定要注意中間的相同輸入過程,各個不同節(jié)點要以相同且確定性的函數(shù)來處理輸入,而不要引入一個不確定的值。使用replicated log來實現(xiàn)每個節(jié)點都順序的寫入客戶端請求,然后順序的處理客戶端請求,最終就一定能夠達(dá)到最終一致性。


          狀態(tài)機安全性保證


          在安全性方面,KRaft與傳統(tǒng)Raft的選舉安全性、領(lǐng)導(dǎo)者只追加、日志匹配和領(lǐng)導(dǎo)者完全性保證都是幾乎相同的。下面只簡單看看狀態(tài)機安全性是如何保證的,仍然舉論文中的極端例子:



          • 在時刻a,節(jié)點S1是Leader,epoch=2的日志只復(fù)制給了S2就崩潰了。


          • 在時刻b,S5被選舉為Leader,epoch=3的日志還沒來得及復(fù)制,也崩潰了。


          • 在時刻c,S1又被選舉為Leader,繼續(xù)復(fù)制日志,將epoch=2的日志給了S3。此時該日志復(fù)制給了多數(shù)節(jié)點,但還未提交。


          • 在時刻d,S1又崩潰,并且S5重新被選舉為領(lǐng)導(dǎo)者,將epoch=3的日志復(fù)制給S0~S4。


          此時日志與新Leader S5的日志發(fā)生了沖突,如果按上圖中d1的方式處理,消息2就會丟失。傳統(tǒng)Raft協(xié)議的處理方式是:在Leader任期開始時,立刻提交一條空的日志,所以上圖中時刻c的情況不會發(fā)生,而是如同d2一樣先提交epoch=4的日志,連帶提交epoch=2的日志。


          與傳統(tǒng)Raft不同,KRaft附加了一個較強的約束:當(dāng)新的Leader被選舉出來,但還沒有成功提交屬于它的epoch的日志時,不會向前推進(jìn)HW。也就是說,即使上圖中時刻c的情況發(fā)生了,消息2也被視為沒有成功提交,所以按照d1方式處理是安全的。


          日志格式說明


          所有節(jié)點持久化保存在本地的日志,大概就是類似于這個樣子:


          上圖顯示,共有八條日志數(shù)據(jù),其中已經(jīng)提交了7條,提交的日志都將通過狀態(tài)機持久化到本地磁盤當(dāng)中,防止宕機。


          日志復(fù)制的保證機制


          如果兩個節(jié)點不同的日志文件當(dāng)中存儲著相同的索引和任期號,那么他們所存儲的命令是相同的。(原因:leader最多在一個任期里的一個日志索引位置創(chuàng)建一條日志條目,日志條目所在的日志位置從來不會改變)。


          如果不同日志中兩個條目有著相同的索引和任期號,那么他們之前的所有條目都是一樣的(原因:每次RPC發(fā)送附加日志時,leader會把這條日志前面的日志下標(biāo)和任期號一起發(fā)送給follower,如果follower發(fā)現(xiàn)和自己的日志不匹配,那么就拒絕接受這條日志,這個稱之為一致性檢查)


          日志的不正常情況


          一般情況下,Leader和Followers的日志保持一致,因此Append Entries一致性檢查通常不會失敗。然而,Leader崩潰可能會導(dǎo)致日志不一致:舊的Leader可能沒有完全復(fù)制完日志中的所有條目。


          下圖闡述了一些Followers可能和新的Leader日志不同的情況。一個Follower可能會丟失掉Leader上的一些條目,也有可能包含一些Leader沒有的條目,也有可能兩者都會發(fā)生。丟失的或者多出來的條目可能會持續(xù)多個任期。



          如何保證日志的正常復(fù)制


          如果出現(xiàn)了上述leader宕機,導(dǎo)致follower與leader日志不一致的情況,那么就需要進(jìn)行處理,保證follower上的日志與leader上的日志保持一致,leader通過強制follower復(fù)制它的日志來處理不一致的問題,follower與leader不一致的日志會被強制覆蓋。leader為了最大程度的保證日志的一致性,且保證日志最大量,leader會尋找follower與他日志一致的地方,然后覆蓋follower之后的所有日志條目,從而實現(xiàn)日志數(shù)據(jù)的一致性。


          具體的操作就是:leader會從后往前不斷對比,每次Append Entries失敗后嘗試前一個日志條目,直到成功找到每個Follower的日志一致的位置點,然后向該Follower所在位置之后的條目進(jìn)行覆蓋。


          詳細(xì)過程如下:


          • Leader維護(hù)了每個Follower節(jié)點下一次要接收的日志的索引,即nextIndex。


          • Leader選舉成功后將所有Follower的nextIndex設(shè)置為自己的最后一個日志條目+1。


          • Leader將數(shù)據(jù)推送給Follower,如果Follower驗證失?。╪extIndex不匹配),則在下一次推送日志時縮小nextIndex,直到nextIndex驗證通過。


          總結(jié)一下就是:當(dāng)leader和follower日志沖突的時候,leader將校驗 follower最后一條日志是否和leader匹配,如果不匹配,將遞減查詢,直到匹配,匹配后,刪除沖突的日志。這樣就實現(xiàn)了主從日志的一致性。



          (二)Raft協(xié)議算法代碼實現(xiàn)


          前面我們已經(jīng)大致了解了Raft協(xié)議算法的實現(xiàn)原理,如果我們要自己實現(xiàn)一個Raft協(xié)議的算法,其實就是將我們講到的理論知識給翻譯成為代碼的過程,具體的開發(fā)需要考慮的細(xì)節(jié)比較多,代碼量肯定也比較大,好在有人已經(jīng)實現(xiàn)了Raft協(xié)議的算法了,我們可以直接拿過來使用。


          創(chuàng)建maven工程并導(dǎo)入jar包地址如下:


          <dependencies>
          <dependency> <groupId>com.github.wenweihu86.raft</groupId> <artifactId>raft-java-core</artifactId> <version>1.8.0</version> </dependency>
          <dependency> <groupId>com.github.wenweihu86.rpc</groupId> <artifactId>rpc-java</artifactId> <version>1.8.0</version> </dependency>
          <dependency> <groupId>org.rocksdb</groupId> <artifactId>rocksdbjni</artifactId> <version>5.1.4</version> </dependency>
          </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.5.1</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build>


          定義Server端代碼實現(xiàn):


          public class Server1 {    public static void main(String[] args) {        // parse args        // peers, format is "host:port:serverId,host2:port2:serverId2"
          //localhost:16010:1,localhost:16020:2,localhost:16030:3 localhost:16010:1 String servers = "localhost:16010:1,localhost:16020:2,localhost:16030:3";
          // local server RaftMessage.Server localServer = parseServer("localhost:16010:1");
          String[] splitArray = servers.split(","); List<RaftMessage.Server> serverList = new ArrayList<>(); for (String serverString : splitArray) { RaftMessage.Server server = parseServer(serverString); serverList.add(server); }

          // 初始化RPCServer RPCServer server = new RPCServer(localServer.getEndPoint().getPort()); // 設(shè)置Raft選項,比如: // just for test snapshot RaftOptions raftOptions = new RaftOptions(); /* raftOptions.setSnapshotMinLogSize(10 * 1024); raftOptions.setSnapshotPeriodSeconds(30); raftOptions.setMaxSegmentFileSize(1024 * 1024);*/ // 應(yīng)用狀態(tài)機 ExampleStateMachine stateMachine = new ExampleStateMachine(raftOptions.getDataDir()); // 初始化RaftNode RaftNode raftNode = new RaftNode(raftOptions, serverList, localServer, stateMachine); raftNode.getLeaderId(); // 注冊Raft節(jié)點之間相互調(diào)用的服務(wù) RaftConsensusService raftConsensusService = new RaftConsensusServiceImpl(raftNode); server.registerService(raftConsensusService); // 注冊給Client調(diào)用的Raft服務(wù) RaftClientService raftClientService = new RaftClientServiceImpl(raftNode); server.registerService(raftClientService); // 注冊應(yīng)用自己提供的服務(wù) ExampleService exampleService = new ExampleServiceImpl(raftNode, stateMachine); server.registerService(exampleService); // 啟動RPCServer,初始化Raft節(jié)點 server.start(); raftNode.init(); }
          private static RaftMessage.Server parseServer(String serverString) { String[] splitServer = serverString.split(":"); String host = splitServer[0]; Integer port = Integer.parseInt(splitServer[1]); Integer serverId = Integer.parseInt(splitServer[2]); RaftMessage.EndPoint endPoint = RaftMessage.EndPoint.newBuilder() .setHost(host).setPort(port).build(); RaftMessage.Server.Builder serverBuilder = RaftMessage.Server.newBuilder(); RaftMessage.Server server = serverBuilder.setServerId(serverId).setEndPoint(endPoint).build(); return server; }}


          定義客戶端代碼實現(xiàn)如下:


          public class ClientMain {    public static void main(String[] args) {        // parse args        String ipPorts = args[0];        String key = args[1];        String value = null;        if (args.length > 2) {            value = args[2];        }        // init rpc client        RPCClient rpcClient = new RPCClient(ipPorts);        ExampleService exampleService = RPCProxy.getProxy(rpcClient, ExampleService.class);        final JsonFormat.Printer printer = JsonFormat.printer().omittingInsignificantWhitespace();        // set        if (value != null) {            ExampleMessage.SetRequest setRequest = ExampleMessage.SetRequest.newBuilder()                    .setKey(key).setValue(value).build();            ExampleMessage.SetResponse setResponse = exampleService.set(setRequest);            try {                System.out.printf("set request, key=%s value=%s response=%s\n",                        key, value, printer.print(setResponse));            } catch (Exception ex) {                ex.printStackTrace();            }        } else {            // get            ExampleMessage.GetRequest getRequest = ExampleMessage.GetRequest.newBuilder().setKey(key).build();            ExampleMessage.GetResponse getResponse = exampleService.get(getRequest);            try {                String value1 = getResponse.getValue();                System.out.println(value1);                System.out.printf("get request, key=%s, response=%s\n",                        key, printer.print(getResponse));            } catch (Exception ex) {                ex.printStackTrace();            }        }
          rpcClient.stop(); }}


          先啟動服務(wù)端,然后啟動客戶端,就可以將實現(xiàn)客戶端向服務(wù)端發(fā)送消息,并且服務(wù)端會向三臺機器進(jìn)行保存消息了。


          五、Kafka常見問題


          (一)消息隊列模型知道嗎?Kafka是怎么做到支持這兩種模型的?


          對于傳統(tǒng)的消息隊列系統(tǒng)支持兩個模型:


          • 點對點:也就是消息只能被一個消費者消費,消費完后消息刪除。


          • 發(fā)布訂閱:相當(dāng)于廣播模式,消息可以被所有消費者消費。


          kafka其實就是通過Consumer Group同時支持了這兩個模型。如果說所有消費者都屬于一個Group,消息只能被同一個Group內(nèi)的一個消費者消費,那就是點對點模式。如果每個消費者都是一個單獨的Group,那么就是發(fā)布訂閱模式。



          (二)說說Kafka通信過程原理嗎?


          首先kafka broker啟動的時候,會去向Zookeeper注冊自己的ID(創(chuàng)建臨時節(jié)點),這個ID可以配置也可以自動生成,同時會去訂閱Zookeeper的brokers/ids路徑,當(dāng)有新的broker加入或者退出時,可以得到當(dāng)前所有broker信。


          生產(chǎn)者啟動的時候會指定bootstrap.servers,通過指定的broker地址,Kafka就會和這些broker創(chuàng)建TCP連接(通常我們不用配置所有的broker服務(wù)器地址,否則kafka會和配置的所有broker都建立TCP連接)


          隨便連接到任何一臺broker之后,然后再發(fā)送請求獲取元數(shù)據(jù)信息(包含有哪些主題、主題都有哪些分區(qū)、分區(qū)有哪些副本,分區(qū)的Leader副本等信息)


          接著就會創(chuàng)建和所有broker的TCP連接。


          之后就是發(fā)送消息的過程。


          消費者和生產(chǎn)者一樣,也會指定bootstrap.servers屬性,然后選擇一臺broker創(chuàng)建TCP連接,發(fā)送請求找到協(xié)調(diào)者所在的broker。


          然后再和協(xié)調(diào)者broker創(chuàng)建TCP連接,獲取元數(shù)據(jù)。


          根據(jù)分區(qū)Leader節(jié)點所在的broker節(jié)點,和這些broker分別創(chuàng)建連接。


          最后開始消費消息。



          (三)發(fā)送消息時如何選擇分區(qū)的?


          主要有兩種方式:


          • 輪詢,按照順序消息依次發(fā)送到不同的分區(qū)。


          • 隨機,隨機發(fā)送到某個分區(qū)。


          如果消息指定key,那么會根據(jù)消息的key進(jìn)行hash,然后對partition分區(qū)數(shù)量取模,決定落在哪個分區(qū)上,所以,對于相同key的消息來說,總是會發(fā)送到同一個分區(qū)上,也是我們常說的消息分區(qū)有序性。


          很常見的場景就是我們希望下單、支付消息有順序,這樣以訂單ID作為key發(fā)送消息就達(dá)到了分區(qū)有序性的目的。


          如果沒有指定key,會執(zhí)行默認(rèn)的輪詢負(fù)載均衡策略,比如第一條消息落在P0,第二條消息落在P1,然后第三條又在P1。


          除此之外,對于一些特定的業(yè)務(wù)場景和需求,還可以通過實現(xiàn)Partitioner接口,重寫configure和partition方法來達(dá)到自定義分區(qū)的效果。



          (四)為什么需要分區(qū)?有什么好處?


          這個問題很簡單,如果說不分區(qū)的話,我們發(fā)消息寫數(shù)據(jù)都只能保存到一個節(jié)點上,這樣的話就算這個服務(wù)器節(jié)點性能再好最終也支撐不住。

          實際上分布式系統(tǒng)都面臨這個問題,要么收到消息之后進(jìn)行數(shù)據(jù)切分,要么提前切分,kafka正是選擇了前者,通過分區(qū)可以把數(shù)據(jù)均勻地分布到不同的節(jié)點。


          分區(qū)帶來了負(fù)載均衡和橫向擴(kuò)展的能力。


          發(fā)送消息時可以根據(jù)分區(qū)的數(shù)量落在不同的Kafka服務(wù)器節(jié)點上,提升了并發(fā)寫消息的性能,消費消息的時候又和消費者綁定了關(guān)系,可以從不同節(jié)點的不同分區(qū)消費消息,提高了讀消息的能力。


          另外一個就是分區(qū)又引入了副本,冗余的副本保證了Kafka的高可用和高持久性。



          (五)詳細(xì)說說消費者組和消費者重平衡?


          Kafka中的消費者組訂閱topic主題的消息,一般來說消費者的數(shù)量最好要和所有主題分區(qū)的數(shù)量保持一致最好(舉例子用一個主題,實際上當(dāng)然是可以訂閱多個主題)。


          當(dāng)消費者數(shù)量小于分區(qū)數(shù)量的時候,那么必然會有一個消費者消費多個分區(qū)的消息。


          而消費者數(shù)量超過分區(qū)的數(shù)量的時候,那么必然會有消費者沒有分區(qū)可以消費。


          所以,消費者組的好處一方面在上面說到過,可以支持多種消息模型,另外的話根據(jù)消費者和分區(qū)的消費關(guān)系,支撐橫向擴(kuò)容伸縮。



          當(dāng)我們知道消費者如何消費分區(qū)的時候,就顯然會有一個問題出現(xiàn)了,消費者消費的分區(qū)是怎么分配的,有先加入的消費者時候怎么辦?


          舊版本的重平衡過程主要通過ZK監(jiān)聽器的方式來觸發(fā),每個消費者客戶端自己去執(zhí)行分區(qū)分配算法。


          新版本則是通過協(xié)調(diào)者來完成,每一次新的消費者加入都會發(fā)送請求給協(xié)調(diào)者去獲取分區(qū)的分配,這個分區(qū)分配的算法邏輯由協(xié)調(diào)者來完成。


          而重平衡Rebalance就是指的有新消費者加入的情況,比如剛開始我們只有消費者A在消費消息,過了一段時間消費者B和C加入了,這時候分區(qū)就需要重新分配,這就是重平衡,也可以叫做再平衡,但是重平衡的過程和我們的GC時候STW很像,會導(dǎo)致整個消費群組停止工作,重平衡期間都無法消息消息。


          另外,發(fā)生重平衡并不是只有這一種情況,因為消費者和分區(qū)總數(shù)是存在綁定關(guān)系的,上面也說了,消費者數(shù)量最好和所有主題的分區(qū)總數(shù)一樣。


          那只要消費者數(shù)量、主題數(shù)量(比如用的正則訂閱的主題)、分區(qū)數(shù)量任何一個發(fā)生改變,都會觸發(fā)重平衡。


          下面說說重平衡的過程。


          重平衡的機制依賴消費者和協(xié)調(diào)者之間的心跳來維持,消費者會有一個獨立的線程去定時發(fā)送心跳給協(xié)調(diào)者,這個可以通過參數(shù)heartbeat.interval.ms來控制發(fā)送心跳的間隔時間。


          每個消費者第一次加入組的時候都會向協(xié)調(diào)者發(fā)送JoinGroup請求,第一個發(fā)送這個請求的消費者會成為“群主”,協(xié)調(diào)者會返回組成員列表給群主。


          群主執(zhí)行分區(qū)分配策略,然后把分配結(jié)果通過SyncGroup請求發(fā)送給協(xié)調(diào)者,協(xié)調(diào)者收到分區(qū)分配結(jié)果。


          其他組內(nèi)成員也向協(xié)調(diào)者發(fā)送SyncGroup,協(xié)調(diào)者把每個消費者的分區(qū)分配分別響應(yīng)給他們。



          (六)具體講講分區(qū)分配策略?


          主要有3種分配策略


          Range


          對分區(qū)進(jìn)行排序,排序越靠前的分區(qū)能夠分配到更多的分區(qū)。


          比如有3個分區(qū),消費者A排序更靠前,所以能夠分配到P0\P1兩個分區(qū),消費者B就只能分配到一個P2。


          如果是4個分區(qū)的話,那么他們會剛好都是分配到2個。



          但是這個分配策略會有點小問題,他是根據(jù)主題進(jìn)行分配,所以如果消費者組訂閱了多個主題,那就有可能導(dǎo)致分區(qū)分配不均衡。


          比如下圖中兩個主題的P0\P1都被分配給了A,這樣A有4個分區(qū),而B只有2個,如果這樣的主題數(shù)量越多,那么不均衡就越嚴(yán)重。



          RoundRobin


          也就是我們常說的輪詢了,這個就比較簡單了,不畫圖你也能很容易理解。

          這個會根據(jù)所有的主題進(jìn)行輪詢分配,不會出現(xiàn)Range那種主題越多可能導(dǎo)致分區(qū)分配不均衡的問題。


          P0->A,P1->B,P1->A。。。以此類推



          Sticky


          這個從字面看來意思就是粘性策略,大概是這個意思。主要考慮的是在分配均衡的前提下,讓分區(qū)的分配更小的改動。


          比如之前P0\P1分配給消費者A,那么下一次盡量還是分配給A。


          這樣的好處就是連接可以復(fù)用,要消費消息總是要和broker去連接的,如果能夠保持上一次分配的分區(qū)的話,那么就不用頻繁的銷毀創(chuàng)建連接了。



          (七)如何保證消息可靠性?


          • 生產(chǎn)者發(fā)送消息丟失


          kafka支持3種方式發(fā)送消息,這也是常規(guī)的3種方式,發(fā)送后不管結(jié)果、同步發(fā)送、異步發(fā)送,基本上所有的消息隊列都是這樣玩的。


          • 發(fā)送并忘記,直接調(diào)用發(fā)送send方法,不管結(jié)果,雖然可以開啟自動重試,但是肯定會有消息丟失的可能。


          • 同步發(fā)送,同步發(fā)送返回Future對象,我們可以知道發(fā)送結(jié)果,然后進(jìn)行處理。


          • 異步發(fā)送,發(fā)送消息,同時指定一個回調(diào)函數(shù),根據(jù)結(jié)果進(jìn)行相應(yīng)的處理。


          為了保險起見,一般我們都會使用異步發(fā)送帶有回調(diào)的方式進(jìn)行發(fā)送消息,再設(shè)置參數(shù)為發(fā)送消息失敗不停地重試。


          acks=all,這個參數(shù)有可以配置0|1|all。


          0表示生產(chǎn)者寫入消息不管服務(wù)器的響應(yīng),可能消息還在網(wǎng)絡(luò)緩沖區(qū),服務(wù)器根本沒有收到消息,當(dāng)然會丟失消息。


          1表示至少有一個副本收到消息才認(rèn)為成功,一個副本那肯定就是集群的Leader副本了,但是如果剛好Leader副本所在的節(jié)點掛了,F(xiàn)ollower沒有同步這條消息,消息仍然丟失了。


          配置all的話表示所有ISR都寫入成功才算成功,那除非所有ISR里的副本全掛了,消息才會丟失。


          retries=N,設(shè)置一個非常大的值,可以讓生產(chǎn)者發(fā)送消息失敗后不停重試

          Kafka 自身消息丟失。


          kafka因為消息寫入是通過PageCache異步寫入磁盤的,因此仍然存在丟失消息的可能。


          因此針對kafka自身丟失的可能設(shè)置參數(shù):


          replication.factor=N,設(shè)置一個比較大的值,保證至少有2個或者以上的副本。


          min.insync.replicas=N,代表消息如何才能被認(rèn)為是寫入成功,設(shè)置大于1的數(shù),保證至少寫入1個或者以上的副本才算寫入消息成功。


          unclean.leader.election.enable=false,這個設(shè)置意味著沒有完全同步的分區(qū)副本不能成為Leader副本,如果是true的話,那些沒有完全同步Leader的副本成為Leader之后,就會有消息丟失的風(fēng)險。


          • 消費者消息丟失


          消費者丟失的可能就比較簡單,關(guān)閉自動提交位移即可,改為業(yè)務(wù)處理成功手動提交。


          因為重平衡發(fā)生的時候,消費者會去讀取上一次提交的偏移量,自動提交默認(rèn)是每5秒一次,這會導(dǎo)致重復(fù)消費或者丟失消息。


          enable.auto.commit=false,設(shè)置為手動提交。


          還有一個參數(shù)我們可能也需要考慮進(jìn)去的:


          auto.offset.reset=earliest,這個參數(shù)代表沒有偏移量可以提交或者broker上不存在偏移量的時候,消費者如何處理。earliest代表從分區(qū)的開始位置讀取,可能會重復(fù)讀取消息,但是不會丟失,消費方一般我們肯定要自己保證冪等,另外一種latest表示從分區(qū)末尾讀取,那就會有概率丟失消息。

          綜合這幾個參數(shù)設(shè)置,我們就能保證消息不會丟失,保證了可靠性。



          (八)聊聊副本和它的同步原理吧?


          Kafka副本的之前提到過,分為Leader副本和Follower副本,也就是主副本和從副本,和其他的比如Mysql不一樣的是,Kafka中只有Leader副本會對外提供服務(wù),F(xiàn)ollower副本只是單純地和Leader保持?jǐn)?shù)據(jù)同步,作為數(shù)據(jù)冗余容災(zāi)的作用。


          在Kafka中我們把所有副本的集合統(tǒng)稱為AR(Assigned Replicas),和Leader副本保持同步的副本集合稱為ISR(InSyncReplicas)。


          ISR是一個動態(tài)的集合,維持這個集合會通過replica.lag.time.max.ms參數(shù)來控制,這個代表落后Leader副本的最長時間,默認(rèn)值10秒,所以只要Follower副本沒有落后Leader副本超過10秒以上,就可以認(rèn)為是和Leader同步的(簡單可以認(rèn)為就是同步時間差)。


          另外還有兩個關(guān)鍵的概念用于副本之間的同步:


          HW(High Watermark):高水位,也叫做復(fù)制點,表示副本間同步的位置。如下圖所示,0~4綠色表示已經(jīng)提交的消息,這些消息已經(jīng)在副本之間進(jìn)行同步,消費者可以看見這些消息并且進(jìn)行消費,4~6黃色的則是表示未提交的消息,可能還沒有在副本間同步,這些消息對于消費者是不可見的。


          LEO(Log End Offset):下一條待寫入消息的位移



          副本間同步的過程依賴的就是HW和LEO的更新,以他們的值變化來演示副本同步消息的過程,綠色表示Leader副本,黃色表示Follower副本。


          首先,生產(chǎn)者不停地向Leader寫入數(shù)據(jù),這時候Leader的LEO可能已經(jīng)達(dá)到了10,但是HW依然是0,兩個Follower向Leader請求同步數(shù)據(jù),他們的值都是0。



          此時,F(xiàn)ollower再次向Leader拉取數(shù)據(jù),這時候Leader會更新自己的HW值,取Follower中的最小的LEO值來更新。



          之后,Leader響應(yīng)自己的HW給Follower,F(xiàn)ollower更新自己的HW值,因為又拉取到了消息,所以再次更新LEO,流程以此類推。



          (九)Kafka為什么快?


          主要是3個方面:


          • 順序IO


          kafka寫消息到分區(qū)采用追加的方式,也就是順序?qū)懭氪疟P,不是隨機寫入,這個速度比普通的隨機IO快非常多,幾乎可以和網(wǎng)絡(luò)IO的速度相媲美。


          • Page Cache和零拷貝


          kafka在寫入消息數(shù)據(jù)的時候通過mmap內(nèi)存映射的方式,不是真正立刻寫入磁盤,而是利用操作系統(tǒng)的文件緩存PageCache異步寫入,提高了寫入消息的性能,另外在消費消息的時候又通過sendfile實現(xiàn)了零拷貝。


          • 批量處理和壓縮


          Kafka在發(fā)送消息的時候不是一條條的發(fā)送的,而是會把多條消息合并成一個批次進(jìn)行處理發(fā)送,消費消息也是一個道理,一次拉取一批次的消息進(jìn)行消費。


          并且Producer、Broker、Consumer都使用了優(yōu)化后的壓縮算法,發(fā)送和消息消息使用壓縮節(jié)省了網(wǎng)絡(luò)傳輸?shù)拈_銷,Broker存儲使用壓縮則降低了磁盤存儲的空間。


          參考資料:

          1.《深入理解Kafka:核心設(shè)計實踐原理》

          2.狀態(tài)機程序設(shè)計套路

          3.raft算法源碼

          4.https://www.bbsmax.com/A/QW5Y3kaBzm/

          瀏覽 51
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  欧美黑人一级 | 欧美亚国产手机在线视频 | 可以看的免费黄色片 | 丁香五月天堂 | 大香蕉伊人网国产 |