Kafka 3.0新特性全面曝光,真香!
導(dǎo)語 | kafka3.0的版本已經(jīng)試推行去zk的kafka架構(gòu)了,如果去掉了zk,那么在kafka新的版本當(dāng)中使用什么技術(shù)來代替了zk的位置呢,接下來我們一起來一探究竟,了解kafka的內(nèi)置共識機制和raft算法。
一、Kafka簡介
Kafka是一款開源的消息引擎系統(tǒng)。一個典型的Kafka體系架構(gòu)包括若干Producer、若干Broker、若干Consumer,以及一個ZooKeeper集群,如上圖所示。其中ZooKeeper是Kafka用來負(fù)責(zé)集群元數(shù)據(jù)的管理、控制器的選舉等操作的。Producer將消息發(fā)送到Broker,Broker負(fù)責(zé)將收到的消息存儲到磁盤中,而Consumer負(fù)責(zé)從Broker訂閱并消費消息。
(一)Kafka核心組件
producer:消息生產(chǎn)者,就是向broker發(fā)送消息的客戶端。
consumer:消息消費者,就是從broker拉取數(shù)據(jù)的客戶端。
consumer group:消費者組,由多個消費者consumer組成。消費者組內(nèi)每個消費者負(fù)責(zé)消費不同的分區(qū),一個分區(qū)只能由同一個消費者組內(nèi)的一個消費者消費;消費者組之間相互獨立,互不影響。所有的消費者都屬于某個消費者組,即消費者組是一個邏輯上的訂閱者。
broker:一臺服務(wù)器就是一個broker,一個集群由多個broker組成,一個broker可以有多個topic。
topic:可以理解為一個隊列,所有的生產(chǎn)者和消費者都是面向topic的。
partition:分區(qū),kafka中的topic為了提高拓展性和實現(xiàn)高可用而將它分布到不同的broker中,一個topic可以分為多個partition,每個partition都是有序的,即消息發(fā)送到隊列的順序跟消費時拉取到的順序是一致的。
replication:副本。一個topic對應(yīng)的分區(qū)partition可以有多個副本,多個副本中只有一個為leader,其余的為follower。為了保證數(shù)據(jù)的高可用性,leader和follower會盡量均勻的分布在各個broker中,避免了leader所在的服務(wù)器宕機而導(dǎo)致topic不可用的問題。
(二)kafka2當(dāng)中zk的作用

/admin:主要保存kafka當(dāng)中的核心的重要信息,包括類似于已經(jīng)刪除的topic就會保存在這個路徑下面。
/brokers:主要用于保存kafka集群當(dāng)中的broker信息,以及沒被刪除的topic信息。
/cluster: 主要用于保存kafka集群的唯一id信息,每個kafka集群都會給分配要給唯一id,以及對應(yīng)的版本號。
/config: 集群配置信息。
/controller:kafka集群當(dāng)中的控制器信息,控制器組件(Controller),是Apache Kafka的核心組件。它的主要作用是在Apache ZooKeeper的幫助下管理和協(xié)調(diào)整個Kafka集群。
/controller_epoch:主要用于保存記錄controller的選舉的次數(shù)。
/isr_change_notification:isr列表發(fā)生變更時候的通知,在kafka當(dāng)中由于存在ISR列表變更的情況發(fā)生,為了保證ISR列表更新的及時性,定義了isr_change_notification這個節(jié)點,主要用于通知Controller來及時將ISR列表進(jìn)行變更。
/latest_producer_id_block:使用`/latest_producer_id_block`節(jié)點來保存PID塊,主要用于能夠保證生產(chǎn)者的任意寫入請求都能夠得到響應(yīng)。
/log_dir_event_notification:主要用于保存當(dāng)broker當(dāng)中某些LogDir出現(xiàn)異常時候,例如磁盤損壞,文件讀寫失敗等異常時候,向ZK當(dāng)中增加一個通知序號,controller監(jiān)聽到這個節(jié)點的變化之后,就會做出對應(yīng)的處理操作。
以上就是kafka在zk當(dāng)中保留的所有的所有的相關(guān)的元數(shù)據(jù)信息,這些元數(shù)據(jù)信息保證了kafka集群的正常運行。
二、kafka3的安裝配置
在kafka3的版本當(dāng)中已經(jīng)徹底去掉了對zk的依賴,如果沒有了zk集群,那么kafka當(dāng)中是如何保存元數(shù)據(jù)信息的呢,這里我們通過kafka3的集群來一探究竟。
(一)kafka安裝配置核心重要參數(shù)
Controller服務(wù)器
不管是kafka2還是kafka3當(dāng)中,controller控制器都是必不可少的,通過controller控制器來維護(hù)kafka集群的正常運行,例如ISR列表的變更,broker的上線或者下線,topic的創(chuàng)建,分區(qū)的指定等等各種操作都需要依賴于Controller,在kafka2當(dāng)中,controller的選舉需要通過zk來實現(xiàn),我們沒法控制哪些機器選舉成為Controller,而在kafka3當(dāng)中,我們可以通過配置文件來自己指定哪些機器成為Controller,這樣做的好處就是我們可以指定一些配置比較高的機器作為Controller節(jié)點,從而保證controller節(jié)點的穩(wěn)健性。
被選中的controller節(jié)點參與元數(shù)據(jù)集群的選舉,每個controller節(jié)點要么是Active狀態(tài),或者就是standBy狀態(tài)。

Process.Roles
使用KRaft模式來運行kafka集群的話,我們有一個配置叫做Process.Roles必須配置,這個參數(shù)有以下四個值可以進(jìn)行配置:
Process.Roles=Broker, 服務(wù)器在KRaft模式中充當(dāng)Broker。
Process.Roles=Controller, 服務(wù)器在KRaft模式下充當(dāng)Controller。
Process.Roles=Broker,Controller,服務(wù)器在KRaft模式中同時充當(dāng)Broker和Controller。
如果process.roles沒有設(shè)置。那么集群就假定是運行在ZooKeeper模式下。
如果需要從zookeeper模式轉(zhuǎn)換成為KRaft模式,那么需要進(jìn)行重新格式化。如果一個節(jié)點同時是Broker和Controller節(jié)點,那么就稱之為組合節(jié)點。
實際工作當(dāng)中,如果有條件的話,盡量還是將Broker和Controller節(jié)點進(jìn)行分離部署。避免由于服務(wù)器資源不夠的情況導(dǎo)致OOM等一系列的問題
Quorum Voters
通過controller.quorum.voters配置來實習(xí)哪些節(jié)點是Quorum的投票節(jié)點,所有想要成為控制器的節(jié)點,都必須放到這個配置里面。
每個Broker和每個Controller都必須配置Controller.quorum.voters,該配置當(dāng)中提供的節(jié)點ID必須與提供給服務(wù)器的節(jié)點ID保持一直。
每個Broker和每個Controller 都必須設(shè)置 controller.quorum.voters。需要注意的是,controller.quorum.voters 配置中提供的節(jié)點ID必須與提供給服務(wù)器的節(jié)點ID匹配。
比如在Controller1上,node.Id必須設(shè)置為1,以此類推。注意,控制器id不強制要求你從0或1開始。然而,分配節(jié)點ID的最簡單和最不容易混淆的方法是給每個服務(wù)器一個數(shù)字ID,然后從0開始。
(二)下載并解壓安裝包
bigdata01下載kafka的安裝包,并進(jìn)行解壓:
[hadoop@bigdata01 kraft]$ cd /opt/soft/[hadoop@bigdata01 soft]$ wget http://archive.apache.org/dist/kafka/3.1.0/kafka_2.12-3.1.0.tgz[hadoop@bigdata01 soft]$ tar -zxf kafka_2.12-3.1.0.tgz -C /opt/install/
修改kafka的配置文件broker.properties:
[hadoop@bigdata01 kafka_2.12-3.1.0]$ cd /opt/install/kafka_2.12-3.1.0/config/kraft/[hadoop@bigdata01 kraft]$ vim broker.properties
修改編輯內(nèi)容如下:
node.id=1controller.quorum.voters=1:9093listeners=PLAINTEXT://bigdata01:9092advertised.listeners=PLAINTEXT://bigdata01:9092log.dirs=/opt/install/kafka_2.12-3.1.0/kraftlogs
創(chuàng)建兩個文件夾:
[hadoop@bigdata01 kafka_2.12-3.1.0]$ mkdir -p /opt/install/kafka_2.12-3.1.0/kraftlogs[hadoop@bigdata01 kafka_2.12-3.1.0]$ mkdir -p /opt/install/kafka_2.12-3.1.0/topiclogs
同步安裝包到其他機器上面去。
(三)服務(wù)器集群啟動
啟動kafka服務(wù):
[hadoop@bigdata01 kafka_2.12-3.1.0]$ ./bin/kafka-storage.sh random-uuidYkJwr6RESgSJv-sxa1R1mA[hadoop@bigdata01 kafka_2.12-3.1.0]$ ./bin/kafka-storage.sh format -t YkJwr6RESgSJv-sxa1R1mA -c ./config/kraft/server.propertiesFormatting /opt/install/kafka_2.12-3.1.0/topiclogs[hadoop@bigdata01 kafka_2.12-3.1.0]$ ./bin/kafka-server-start.sh ./config/kraft/server.properties
(四)創(chuàng)建kafka的topic
集群啟動成功之后,就可以來創(chuàng)建kafka的topic了,使用以下命令來創(chuàng)建kafka的topic:
./bin/kafka-topics.sh --create --topic kafka_test --partitions 3 --replication-factor 2 --bootstrap-server bigdata01:9092,bigdata02:9092,bigdata03:9092(五)任意一臺機器查看kafka的topic
組成集群之后,任意一臺機器就可以通過以下命令來查看到剛才創(chuàng)建的topic了:
[hadoop@bigdata03 ~]$ cd /opt/install/kafka_2.12-3.1.0/[hadoop@bigdata03 kafka_2.12-3.1.0]$ bin/kafka-topics.sh --list --bootstrap-server bigdata01:9092,bigdata02:9092,bigdata03:9092
(六)消息生產(chǎn)與消費
使用命令行來生產(chǎn)以及消費kafka當(dāng)中的消息:
[hadoop kafka_2.12-3.1.0]$ bin/kafka-console-producer.sh --bootstrap-server bigdata01:9092,bigdata02:9092,bigdata03:9092 --topic kafka_test[hadoop kafka_2.12-3.1.0]$ bin/kafka-console-consumer.sh --bootstrap-server bigdata01:9092,bigdata02:9092,bigdata03:9092 --topic kafka_test --from-beginning
三、Kafka當(dāng)中Raft的介紹
(一)kafka強依賴zk所引發(fā)的問題
前面我們已經(jīng)看到了kafka3集群在沒有zk集群的依賴下,也可以正常運行,那么kafka2在zk當(dāng)中保存的各種重要元數(shù)據(jù)信息,在kafka3當(dāng)中如何實現(xiàn)保存的呢?
kafka一直都是使用zk來管理集群以及所有的topic的元數(shù)據(jù),并且使用了zk的強一致性來選舉集群的controller,controller對整個集群的管理至關(guān)重要,包括分區(qū)的新增,ISR列表的維護(hù),等等很多功能都需要靠controller來實現(xiàn),然后使用zk來維護(hù)kafka的元數(shù)據(jù)也存在很多的問題以及存在性能瓶頸。
以下是kafka將元數(shù)據(jù)保存在zk當(dāng)中的諸多問題。
元數(shù)據(jù)存取困難
元數(shù)據(jù)的存取過于困難,每次重新選舉的controller需要把整個集群的元數(shù)據(jù)重新restore,非常的耗時且影響集群的可用性。
元數(shù)據(jù)更新網(wǎng)絡(luò)開銷大
整個元數(shù)據(jù)的更新操作也是以全量推的方式進(jìn)行,網(wǎng)絡(luò)的開銷也會非常大。
強耦合違背軟件設(shè)計原則
Zookeeper對于運維來說,維護(hù)Zookeeper也需要一定的開銷,并且kafka強耦合與zk也并不好,還得時刻擔(dān)心zk的宕機問題,違背軟件設(shè)計的高內(nèi)聚,低耦合的原則。
網(wǎng)絡(luò)分區(qū)復(fù)雜度高
Zookeeper本身并不能兼顧到broker與broker之間通信的狀態(tài),這就會導(dǎo)致網(wǎng)絡(luò)分區(qū)的復(fù)雜度成幾何倍數(shù)增長。
zk本身不適合做消息隊列
zookeeper不適合做消息隊列,因為zookeeper有1M的消息大小限制 zookeeper的children太多會極大的影響性能znode太大也會影響性能 znode太大會導(dǎo)致重啟zkserver耗時10-15分鐘 zookeeper僅使用內(nèi)存作為存儲,所以不能存儲太多東西。
并發(fā)訪問zk問題多
最好單線程操作zk客戶端,不要并發(fā),臨界、競態(tài)問題太多。
基于以上各種問題,所以提出了脫離zk的方案,轉(zhuǎn)向自助研發(fā)強一致性的元數(shù)據(jù)解決方案,也就是KIP-500。
KIP-500議案提出了在Kafka中處理元數(shù)據(jù)的更好方法。基本思想是"Kafka on Kafka",將Kafka的元數(shù)據(jù)存儲在Kafka本身中,無需增加額外的外部存儲比如ZooKeeper等。
去zookeeper之后的kafka新的架構(gòu)

在KIP-500中,Kafka控制器會將其元數(shù)據(jù)存儲在Kafka分區(qū)中,而不是存儲在ZooKeeper中。但是,由于控制器依賴于該分區(qū),因此分區(qū)本身不能依賴控制器來進(jìn)行領(lǐng)導(dǎo)者選舉之類的事情。而是,管理該分區(qū)的節(jié)點必須實現(xiàn)自我管理的Raft仲裁。
在kafka3.0的新的版本當(dāng)中,使用了新的KRaft協(xié)議,使用該協(xié)議來保證在元數(shù)據(jù)仲裁中準(zhǔn)確的復(fù)制元數(shù)據(jù),這個協(xié)議類似于zk當(dāng)中的zab協(xié)議以及類似于Raft協(xié)議,但是KRaft協(xié)議使用的是基于事件驅(qū)動的模式,與ZAB協(xié)議和Raft協(xié)議還有點不一樣
在kafka3.0之前的的版本當(dāng)中,主要是借助于controller來進(jìn)行l(wèi)eader partition的選舉,而在3.0協(xié)議當(dāng)中,使用了KRaft來實現(xiàn)自己選擇leader,并最終令所有節(jié)點達(dá)成共識,這樣簡化了controller的選舉過程,效果更加高效。
(二)kakfa3 Raft
前面我們已經(jīng)知道了在kafka3當(dāng)中可以不用再依賴于zk來保存kafka當(dāng)中的元數(shù)據(jù)了,轉(zhuǎn)而使用Kafka Raft來實現(xiàn)元數(shù)據(jù)的一致性,簡稱KRaft,并且將元數(shù)據(jù)保存在kafka自己的服務(wù)器當(dāng)中,大大提高了kafka的元數(shù)據(jù)管理的性能。
KRaft運行模式的Kafka集群,不會將元數(shù)據(jù)存儲在Apache ZooKeeper中。即部署新集群的時候,無需部署ZooKeeper集群,因為Kafka將元數(shù)據(jù)存儲在Controller節(jié)點的KRaft Quorum中。KRaft可以帶來很多好處,比如可以支持更多的分區(qū),更快速的切換Controller,也可以避免Controller緩存的元數(shù)據(jù)和Zookeeper存儲的數(shù)據(jù)不一致帶來的一系列問題。
在新的版本當(dāng)中,控制器Controller節(jié)點我們可以自己進(jìn)行指定,這樣最大的好處就是我們可以自己選擇一些配置比較好的機器成為Controller節(jié)點,而不像在之前的版本當(dāng)中,我們無法指定哪臺機器成為Controller節(jié)點,而且controller節(jié)點與broker節(jié)點可以運行在同一臺機器上,并且控制器controller節(jié)點不再向broker推送更新消息,而是讓Broker從這個Controller Leader節(jié)點進(jìn)行拉去元數(shù)據(jù)的更新。

(三)如何查看kafka3當(dāng)中的元數(shù)據(jù)信息
在kafka3當(dāng)中,不再使用zk來保存元數(shù)據(jù)信息了,那么在kafka3當(dāng)中如何查看元數(shù)據(jù)信息呢,我們也可以通過kafka自帶的命令來進(jìn)行查看元數(shù)據(jù)信息,在KRaft中,有兩個命令常用命令腳本,kafka-dump-log.sh和kakfa-metadata-shell.sh需要我們來進(jìn)行關(guān)注,因為我們可以通過這兩個腳本來查看kafka當(dāng)中保存的元數(shù)據(jù)信息。
Kafka-dump-log.sh腳本來導(dǎo)出元數(shù)據(jù)信息
KRaft模式下,所有的元數(shù)據(jù)信息都保存到了一個內(nèi)部的topic上面,叫做@metadata,例如Broker的信息,Topic的信息等,我們都可以去到這個topic上面進(jìn)行查看,我們可以通過kafka-dump-log.sh這個腳本來進(jìn)行查看該topic的信息。
Kafka-dump-log.sh是一個之前就有的工具,用來查看Topic的的文件內(nèi)容。這工具加了一個參數(shù)--cluster-metadata-decoder用來,查看元數(shù)據(jù)日志,如下所示:
[hadoop@bigdata01 kafka_2.12-3.1.0]$ cd /opt/install/kafka_2.12-3.1.0[hadoop@bigdata01 kafka_2.12-3.1.0]$ bin/kafka-dump-log.sh --cluster-metadata-decoder --skip-record-metadata --files /opt/install/kafka_2.12-3.1.0/topiclogs/__cluster_metadata-0/00000000000000000000.index,/opt/install/kafka_2.12-3.1.0/topiclogs/__cluster_metadata-0/00000000000000000000.log /opt/metadata.txt
kafka-metadata-shell.sh直接查看元數(shù)據(jù)信息
平時我們用zk的時候,習(xí)慣了用zk命令行查看數(shù)據(jù),簡單快捷。bin目錄下自帶了kafka-metadata-shell.sh工具,可以允許你像zk一樣方便的查看數(shù)據(jù)。
使用kafka-metadata-shell.sh腳本進(jìn)入kafka的元數(shù)據(jù)客戶端
[hadoop@bigdata01 kafka_2.12-3.1.0]$ bin/kafka-metadata-shell.sh --snapshot /opt/install/kafka_2.12-3.1.0/topiclogs/__cluster_metadata-0/00000000000000000000.log四、Raft算法介紹
raft算法中文版本翻譯介紹:
https://github.com/maemual/raft-zh_cn/blob/master/raft-zh_cn.md
著名的CAP原則又稱CAP定理的提出,真正奠基了分布式系統(tǒng)的誕生,CAP定理指的是在一個分布式系統(tǒng)中,[一致性]、[可用性](Availability)、[分區(qū)容錯性](Partition tolerance),這三個要素最多只能同時實現(xiàn)兩點,不可能三者兼顧(nosql)。
分布式系統(tǒng)為了提高系統(tǒng)的可靠性,一般都會選擇使用多副本的方式來進(jìn)行實現(xiàn),例如hdfs當(dāng)中數(shù)據(jù)的多副本,kafka集群當(dāng)中分區(qū)的多副本等,但是一旦有了多副本的話,那么久面臨副本之間一致性的問題,而一致性算法就是 用于解決分布式環(huán)境下多副本的數(shù)據(jù)一致性的問題。業(yè)界最著名的一致性算法就是大名鼎鼎的Paxos,但是Paxos比較晦澀難懂,不太容易理解,所以還有一種叫做Raft的算法,更加簡單容易理解的實現(xiàn)了一致性算法。
(一)Raft協(xié)議的工作原理
Raft協(xié)議當(dāng)中的角色分布
Raft協(xié)議將分布式系統(tǒng)當(dāng)中的角色分為Leader(領(lǐng)導(dǎo)者),F(xiàn)ollower(跟從者)以及Candidate(候選者)
Leader:主節(jié)點的角色,主要是接收客戶端請求,并向Follower同步日志,當(dāng)日志同步到過半及以上節(jié)點之后,告訴follower進(jìn)行提交日志。
Follower:從節(jié)點的角色,接受并持久化Leader同步的日志,在Leader通知可以提交日志之后,進(jìn)行提交保存的日志。
Candidate:Leader選舉過程中的臨時角色。

Raft協(xié)議當(dāng)中的底層原理
Raft協(xié)議當(dāng)中會選舉出Leader節(jié)點,Leader作為主節(jié)點,完全負(fù)責(zé)replicate log的管理。Leader負(fù)責(zé)接受所有客戶端的請求,然后復(fù)制到Follower節(jié)點,如果leader故障,那么follower會重新選舉leader,Raft協(xié)議的一致性,概括主要可以分為以下三個重要部分:
Leader選舉
日志復(fù)制
安全性
其中Leader選舉和日志復(fù)制是Raft協(xié)議當(dāng)中最為重要的。
Raft協(xié)議要求系統(tǒng)當(dāng)中,任意一個時刻,只有一個leader,正常工作期間,只有Leader和Follower角色,并且Raft協(xié)議采用了類似網(wǎng)絡(luò)租期的方式來進(jìn)行管理維護(hù)整個集群,Raft協(xié)議將時間分為一個個的時間段(term),也叫作任期,每一個任期都會選舉一個Leader來管理維護(hù)整個集群,如果這個時間段的Leader宕機,那么這一個任期結(jié)束,繼續(xù)重新選舉leader。
Raft算法將時間劃分成為任意不同長度的任期(term)。任期用連續(xù)的數(shù)字進(jìn)行表示。每一個任期的開始都是一次選舉(election),一個或多個候選人會試圖成為領(lǐng)導(dǎo)人。如果一個候選人贏得了選舉,它就會在該任期的剩余時間擔(dān)任領(lǐng)導(dǎo)人。在某些情況下,選票會被瓜分,有可能沒有選出領(lǐng)導(dǎo)人,那么,將會開始另一個任期,并且立刻開始下一次選舉。Raft算法保證在給定的一個任期最多只有一個領(lǐng)導(dǎo)人。

Leader選舉的過程
Raft使用心跳來進(jìn)行觸發(fā)leader選舉,當(dāng)服務(wù)器啟動時,初始化為follower角色。leader向所有Follower發(fā)送周期性心跳,如果Follower在選舉超時間內(nèi)沒有收到Leader的心跳,就會認(rèn)為leader宕機,稍后發(fā)起leader的選舉。
每個Follower都會有一個倒計時時鐘,是一個隨機的值,表示的是Follower等待成為Leader的時間,倒計時時鐘先跑完,就會當(dāng)選成為Leader,這樣做得好處就是每一個節(jié)點都有機會成為Leader。

當(dāng)滿足以下三個條件之一時,Quorum中的某個節(jié)點就會觸發(fā)選舉:
向Leader發(fā)送Fetch請求后,在超時閾值quorum.fetch.timeout.ms之后仍然沒有得到Fetch響應(yīng),表示Leader疑似失敗。
從當(dāng)前Leader收到了EndQuorumEpoch請求,表示Leader已退位。
Candidate狀態(tài)下,在超時閾值quorum.election.timeout.ms之后仍然沒有收到多數(shù)票,也沒有Candidate贏得選舉,表示此次選舉作廢,重新進(jìn)行選舉。
具體詳細(xì)過程實現(xiàn)描述如下:
增加節(jié)點本地的current term,切換到candidate狀態(tài)。
自己給自己投一票。
給其他節(jié)點發(fā)送RequestVote RPCs,要求其他節(jié)點也投自己一票。
等待其他節(jié)點的投票回復(fù)。
整個過程中的投票過程可以用下圖進(jìn)行表述。

leader節(jié)點選舉的限制
每個節(jié)點只能投一票,投給自己或者投給別人。
候選人所知道的日志信息,一定不能比自己的更少,即能被選舉成為leader節(jié)點,一定包含了所有已經(jīng)提交的日志。
先到先得的原則
數(shù)據(jù)一致性保證(日志復(fù)制機制)
前面通過選舉機制之后,選舉出來了leader節(jié)點,然后leader節(jié)點對外提供服務(wù),所有的客戶端的請求都會發(fā)送到leader節(jié)點,由leader節(jié)點來調(diào)度這些并發(fā)請求的處理順序,保證所有節(jié)點的狀態(tài)一致,leader會把請求作為日志條目(Log entries)加入到他的日志當(dāng)中,然后并行的向其他服務(wù)器發(fā)起AppendEntries RPC復(fù)制日志條目。當(dāng)這條請求日志被成功復(fù)制到大多數(shù)服務(wù)器上面之后,Leader將這條日志應(yīng)用到它的狀態(tài)機并向客戶端返回執(zhí)行結(jié)果。
客戶端的每個請求都包含被復(fù)制狀態(tài)機執(zhí)行的指令
leader將客戶端請求作為一條心得日志添加到日志文件中,然后并行發(fā)起RPC給其他的服務(wù)器,讓他們復(fù)制這條信息到自己的日志文件中保存。
如果這條日志被成功復(fù)制,也就是大部分的follower都保存好了執(zhí)行指令日志,leader就應(yīng)用這條日志到自己的狀態(tài)機中,并返回給客戶端。
如果follower宕機或者運行緩慢或者數(shù)據(jù)丟失,leader會不斷地進(jìn)行重試,直至所有在線的follower都成功復(fù)制了所有的日志條目。

與維護(hù)Consumer offset的方式類似,脫離ZK之后的Kafka集群將元數(shù)據(jù)視為日志,保存在一個內(nèi)置的Topic中,且該Topic只有一個Partition。

元數(shù)據(jù)日志的消息格式與普通消息沒有太大不同,但必須攜帶Leader的紀(jì)元值(即之前的Controller epoch):
Record => Offset LeaderEpoch ControlType Key Value Timestamp這樣,F(xiàn)ollower以拉模式復(fù)制Leader日志,就相當(dāng)于以Consumer角色消費元數(shù)據(jù)Topic,符合Kafka原生的語義。
那么在KRaft協(xié)議中,是如何維護(hù)哪些元數(shù)據(jù)日志已經(jīng)提交——即已經(jīng)成功復(fù)制到多數(shù)的Follower節(jié)點上的呢?Kafka仍然借用了原生副本機制中的概念——high watermark(HW,高水位線)保證日志不會丟失,HW的示意圖如下。

狀態(tài)機說明:
要讓所有節(jié)點達(dá)成一致性的狀態(tài),大部分都是基于復(fù)制狀態(tài)機來實現(xiàn)的(Replicated state machine)
簡單來說就是:初始相同的狀態(tài)+相同的輸入過程=相同的結(jié)束狀態(tài),這個其實也好理解,就類似于一對雙胞胎,出生時候就長得一樣,然后吃的喝的用的穿的都一樣,你自然很難分辨。其中最重要的就是一定要注意中間的相同輸入過程,各個不同節(jié)點要以相同且確定性的函數(shù)來處理輸入,而不要引入一個不確定的值。使用replicated log來實現(xiàn)每個節(jié)點都順序的寫入客戶端請求,然后順序的處理客戶端請求,最終就一定能夠達(dá)到最終一致性。
狀態(tài)機安全性保證:
在安全性方面,KRaft與傳統(tǒng)Raft的選舉安全性、領(lǐng)導(dǎo)者只追加、日志匹配和領(lǐng)導(dǎo)者完全性保證都是幾乎相同的。下面只簡單看看狀態(tài)機安全性是如何保證的,仍然舉論文中的極端例子:

在時刻a,節(jié)點S1是Leader,epoch=2的日志只復(fù)制給了S2就崩潰了。
在時刻b,S5被選舉為Leader,epoch=3的日志還沒來得及復(fù)制,也崩潰了。
在時刻c,S1又被選舉為Leader,繼續(xù)復(fù)制日志,將epoch=2的日志給了S3。此時該日志復(fù)制給了多數(shù)節(jié)點,但還未提交。
在時刻d,S1又崩潰,并且S5重新被選舉為領(lǐng)導(dǎo)者,將epoch=3的日志復(fù)制給S0~S4。
此時日志與新Leader S5的日志發(fā)生了沖突,如果按上圖中d1的方式處理,消息2就會丟失。傳統(tǒng)Raft協(xié)議的處理方式是:在Leader任期開始時,立刻提交一條空的日志,所以上圖中時刻c的情況不會發(fā)生,而是如同d2一樣先提交epoch=4的日志,連帶提交epoch=2的日志。
與傳統(tǒng)Raft不同,KRaft附加了一個較強的約束:當(dāng)新的Leader被選舉出來,但還沒有成功提交屬于它的epoch的日志時,不會向前推進(jìn)HW。也就是說,即使上圖中時刻c的情況發(fā)生了,消息2也被視為沒有成功提交,所以按照d1方式處理是安全的。
日志格式說明:
所有節(jié)點持久化保存在本地的日志,大概就是類似于這個樣子:

上圖顯示,共有八條日志數(shù)據(jù),其中已經(jīng)提交了7條,提交的日志都將通過狀態(tài)機持久化到本地磁盤當(dāng)中,防止宕機。
日志復(fù)制的保證機制
如果兩個節(jié)點不同的日志文件當(dāng)中存儲著相同的索引和任期號,那么他們所存儲的命令是相同的。(原因:leader最多在一個任期里的一個日志索引位置創(chuàng)建一條日志條目,日志條目所在的日志位置從來不會改變)。
如果不同日志中兩個條目有著相同的索引和任期號,那么他們之前的所有條目都是一樣的(原因:每次RPC發(fā)送附加日志時,leader會把這條日志前面的日志下標(biāo)和任期號一起發(fā)送給follower,如果follower發(fā)現(xiàn)和自己的日志不匹配,那么就拒絕接受這條日志,這個稱之為一致性檢查)
日志的不正常情況
一般情況下,Leader和Followers的日志保持一致,因此Append Entries一致性檢查通常不會失敗。然而,Leader崩潰可能會導(dǎo)致日志不一致:舊的Leader可能沒有完全復(fù)制完日志中的所有條目。
下圖闡述了一些Followers可能和新的Leader日志不同的情況。一個Follower可能會丟失掉Leader上的一些條目,也有可能包含一些Leader沒有的條目,也有可能兩者都會發(fā)生。丟失的或者多出來的條目可能會持續(xù)多個任期。

如何保證日志的正常復(fù)制
如果出現(xiàn)了上述leader宕機,導(dǎo)致follower與leader日志不一致的情況,那么就需要進(jìn)行處理,保證follower上的日志與leader上的日志保持一致,leader通過強制follower復(fù)制它的日志來處理不一致的問題,follower與leader不一致的日志會被強制覆蓋。leader為了最大程度的保證日志的一致性,且保證日志最大量,leader會尋找follower與他日志一致的地方,然后覆蓋follower之后的所有日志條目,從而實現(xiàn)日志數(shù)據(jù)的一致性。
具體的操作就是:leader會從后往前不斷對比,每次Append Entries失敗后嘗試前一個日志條目,直到成功找到每個Follower的日志一致的位置點,然后向該Follower所在位置之后的條目進(jìn)行覆蓋。
詳細(xì)過程如下:
Leader維護(hù)了每個Follower節(jié)點下一次要接收的日志的索引,即nextIndex。
Leader選舉成功后將所有Follower的nextIndex設(shè)置為自己的最后一個日志條目+1。
Leader將數(shù)據(jù)推送給Follower,如果Follower驗證失?。╪extIndex不匹配),則在下一次推送日志時縮小nextIndex,直到nextIndex驗證通過。
總結(jié)一下就是:當(dāng)leader和follower日志沖突的時候,leader將校驗 follower最后一條日志是否和leader匹配,如果不匹配,將遞減查詢,直到匹配,匹配后,刪除沖突的日志。這樣就實現(xiàn)了主從日志的一致性。
(二)Raft協(xié)議算法代碼實現(xiàn)
前面我們已經(jīng)大致了解了Raft協(xié)議算法的實現(xiàn)原理,如果我們要自己實現(xiàn)一個Raft協(xié)議的算法,其實就是將我們講到的理論知識給翻譯成為代碼的過程,具體的開發(fā)需要考慮的細(xì)節(jié)比較多,代碼量肯定也比較大,好在有人已經(jīng)實現(xiàn)了Raft協(xié)議的算法了,我們可以直接拿過來使用。
創(chuàng)建maven工程并導(dǎo)入jar包地址如下:
<dependencies><dependency><groupId>com.github.wenweihu86.raft</groupId><artifactId>raft-java-core</artifactId><version>1.8.0</version></dependency><dependency><groupId>com.github.wenweihu86.rpc</groupId><artifactId>rpc-java</artifactId><version>1.8.0</version></dependency><dependency><groupId>org.rocksdb</groupId><artifactId>rocksdbjni</artifactId><version>5.1.4</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.5.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin></plugins></build>
定義Server端代碼實現(xiàn):
public class Server1 {public static void main(String[] args) {// parse args// peers, format is "host:port:serverId,host2:port2:serverId2"//localhost:16010:1,localhost:16020:2,localhost:16030:3 localhost:16010:1String servers = "localhost:16010:1,localhost:16020:2,localhost:16030:3";// local serverRaftMessage.Server localServer = parseServer("localhost:16010:1");String[] splitArray = servers.split(",");List<RaftMessage.Server> serverList = new ArrayList<>();for (String serverString : splitArray) {RaftMessage.Server server = parseServer(serverString);serverList.add(server);}// 初始化RPCServerRPCServer server = new RPCServer(localServer.getEndPoint().getPort());// 設(shè)置Raft選項,比如:// just for test snapshotRaftOptions raftOptions = new RaftOptions();/* raftOptions.setSnapshotMinLogSize(10 * 1024);raftOptions.setSnapshotPeriodSeconds(30);raftOptions.setMaxSegmentFileSize(1024 * 1024);*/// 應(yīng)用狀態(tài)機ExampleStateMachine stateMachine = new ExampleStateMachine(raftOptions.getDataDir());// 初始化RaftNodeRaftNode raftNode = new RaftNode(raftOptions, serverList, localServer, stateMachine);raftNode.getLeaderId();// 注冊Raft節(jié)點之間相互調(diào)用的服務(wù)RaftConsensusService raftConsensusService = new RaftConsensusServiceImpl(raftNode);server.registerService(raftConsensusService);// 注冊給Client調(diào)用的Raft服務(wù)RaftClientService raftClientService = new RaftClientServiceImpl(raftNode);server.registerService(raftClientService);// 注冊應(yīng)用自己提供的服務(wù)ExampleService exampleService = new ExampleServiceImpl(raftNode, stateMachine);server.registerService(exampleService);// 啟動RPCServer,初始化Raft節(jié)點server.start();raftNode.init();}private static RaftMessage.Server parseServer(String serverString) {String[] splitServer = serverString.split(":");String host = splitServer[0];Integer port = Integer.parseInt(splitServer[1]);Integer serverId = Integer.parseInt(splitServer[2]);RaftMessage.EndPoint endPoint = RaftMessage.EndPoint.newBuilder().setHost(host).setPort(port).build();RaftMessage.Server.Builder serverBuilder = RaftMessage.Server.newBuilder();RaftMessage.Server server = serverBuilder.setServerId(serverId).setEndPoint(endPoint).build();return server;}}
定義客戶端代碼實現(xiàn)如下:
public class ClientMain {public static void main(String[] args) {// parse argsString ipPorts = args[0];String key = args[1];String value = null;if (args.length > 2) {value = args[2];}// init rpc clientRPCClient rpcClient = new RPCClient(ipPorts);ExampleService exampleService = RPCProxy.getProxy(rpcClient, ExampleService.class);final JsonFormat.Printer printer = JsonFormat.printer().omittingInsignificantWhitespace();// setif (value != null) {ExampleMessage.SetRequest setRequest = ExampleMessage.SetRequest.newBuilder().setKey(key).setValue(value).build();ExampleMessage.SetResponse setResponse = exampleService.set(setRequest);try {System.out.printf("set request, key=%s value=%s response=%s\n",key, value, printer.print(setResponse));} catch (Exception ex) {ex.printStackTrace();}} else {// getExampleMessage.GetRequest getRequest = ExampleMessage.GetRequest.newBuilder().setKey(key).build();ExampleMessage.GetResponse getResponse = exampleService.get(getRequest);try {String value1 = getResponse.getValue();System.out.println(value1);System.out.printf("get request, key=%s, response=%s\n",key, printer.print(getResponse));} catch (Exception ex) {ex.printStackTrace();}}rpcClient.stop();}}
先啟動服務(wù)端,然后啟動客戶端,就可以將實現(xiàn)客戶端向服務(wù)端發(fā)送消息,并且服務(wù)端會向三臺機器進(jìn)行保存消息了。
五、Kafka常見問題
(一)消息隊列模型知道嗎?Kafka是怎么做到支持這兩種模型的?
對于傳統(tǒng)的消息隊列系統(tǒng)支持兩個模型:
點對點:也就是消息只能被一個消費者消費,消費完后消息刪除。
發(fā)布訂閱:相當(dāng)于廣播模式,消息可以被所有消費者消費。
kafka其實就是通過Consumer Group同時支持了這兩個模型。如果說所有消費者都屬于一個Group,消息只能被同一個Group內(nèi)的一個消費者消費,那就是點對點模式。如果每個消費者都是一個單獨的Group,那么就是發(fā)布訂閱模式。
(二)說說Kafka通信過程原理嗎?
首先kafka broker啟動的時候,會去向Zookeeper注冊自己的ID(創(chuàng)建臨時節(jié)點),這個ID可以配置也可以自動生成,同時會去訂閱Zookeeper的brokers/ids路徑,當(dāng)有新的broker加入或者退出時,可以得到當(dāng)前所有broker信。
生產(chǎn)者啟動的時候會指定bootstrap.servers,通過指定的broker地址,Kafka就會和這些broker創(chuàng)建TCP連接(通常我們不用配置所有的broker服務(wù)器地址,否則kafka會和配置的所有broker都建立TCP連接)
隨便連接到任何一臺broker之后,然后再發(fā)送請求獲取元數(shù)據(jù)信息(包含有哪些主題、主題都有哪些分區(qū)、分區(qū)有哪些副本,分區(qū)的Leader副本等信息)
接著就會創(chuàng)建和所有broker的TCP連接。
之后就是發(fā)送消息的過程。
消費者和生產(chǎn)者一樣,也會指定bootstrap.servers屬性,然后選擇一臺broker創(chuàng)建TCP連接,發(fā)送請求找到協(xié)調(diào)者所在的broker。
然后再和協(xié)調(diào)者broker創(chuàng)建TCP連接,獲取元數(shù)據(jù)。
根據(jù)分區(qū)Leader節(jié)點所在的broker節(jié)點,和這些broker分別創(chuàng)建連接。
最后開始消費消息。
(三)發(fā)送消息時如何選擇分區(qū)的?
主要有兩種方式:
輪詢,按照順序消息依次發(fā)送到不同的分區(qū)。
隨機,隨機發(fā)送到某個分區(qū)。
如果消息指定key,那么會根據(jù)消息的key進(jìn)行hash,然后對partition分區(qū)數(shù)量取模,決定落在哪個分區(qū)上,所以,對于相同key的消息來說,總是會發(fā)送到同一個分區(qū)上,也是我們常說的消息分區(qū)有序性。
很常見的場景就是我們希望下單、支付消息有順序,這樣以訂單ID作為key發(fā)送消息就達(dá)到了分區(qū)有序性的目的。
如果沒有指定key,會執(zhí)行默認(rèn)的輪詢負(fù)載均衡策略,比如第一條消息落在P0,第二條消息落在P1,然后第三條又在P1。
除此之外,對于一些特定的業(yè)務(wù)場景和需求,還可以通過實現(xiàn)Partitioner接口,重寫configure和partition方法來達(dá)到自定義分區(qū)的效果。
(四)為什么需要分區(qū)?有什么好處?
這個問題很簡單,如果說不分區(qū)的話,我們發(fā)消息寫數(shù)據(jù)都只能保存到一個節(jié)點上,這樣的話就算這個服務(wù)器節(jié)點性能再好最終也支撐不住。
實際上分布式系統(tǒng)都面臨這個問題,要么收到消息之后進(jìn)行數(shù)據(jù)切分,要么提前切分,kafka正是選擇了前者,通過分區(qū)可以把數(shù)據(jù)均勻地分布到不同的節(jié)點。
分區(qū)帶來了負(fù)載均衡和橫向擴(kuò)展的能力。
發(fā)送消息時可以根據(jù)分區(qū)的數(shù)量落在不同的Kafka服務(wù)器節(jié)點上,提升了并發(fā)寫消息的性能,消費消息的時候又和消費者綁定了關(guān)系,可以從不同節(jié)點的不同分區(qū)消費消息,提高了讀消息的能力。
另外一個就是分區(qū)又引入了副本,冗余的副本保證了Kafka的高可用和高持久性。
(五)詳細(xì)說說消費者組和消費者重平衡?
Kafka中的消費者組訂閱topic主題的消息,一般來說消費者的數(shù)量最好要和所有主題分區(qū)的數(shù)量保持一致最好(舉例子用一個主題,實際上當(dāng)然是可以訂閱多個主題)。
當(dāng)消費者數(shù)量小于分區(qū)數(shù)量的時候,那么必然會有一個消費者消費多個分區(qū)的消息。
而消費者數(shù)量超過分區(qū)的數(shù)量的時候,那么必然會有消費者沒有分區(qū)可以消費。
所以,消費者組的好處一方面在上面說到過,可以支持多種消息模型,另外的話根據(jù)消費者和分區(qū)的消費關(guān)系,支撐橫向擴(kuò)容伸縮。

當(dāng)我們知道消費者如何消費分區(qū)的時候,就顯然會有一個問題出現(xiàn)了,消費者消費的分區(qū)是怎么分配的,有先加入的消費者時候怎么辦?
舊版本的重平衡過程主要通過ZK監(jiān)聽器的方式來觸發(fā),每個消費者客戶端自己去執(zhí)行分區(qū)分配算法。
新版本則是通過協(xié)調(diào)者來完成,每一次新的消費者加入都會發(fā)送請求給協(xié)調(diào)者去獲取分區(qū)的分配,這個分區(qū)分配的算法邏輯由協(xié)調(diào)者來完成。
而重平衡Rebalance就是指的有新消費者加入的情況,比如剛開始我們只有消費者A在消費消息,過了一段時間消費者B和C加入了,這時候分區(qū)就需要重新分配,這就是重平衡,也可以叫做再平衡,但是重平衡的過程和我們的GC時候STW很像,會導(dǎo)致整個消費群組停止工作,重平衡期間都無法消息消息。
另外,發(fā)生重平衡并不是只有這一種情況,因為消費者和分區(qū)總數(shù)是存在綁定關(guān)系的,上面也說了,消費者數(shù)量最好和所有主題的分區(qū)總數(shù)一樣。
那只要消費者數(shù)量、主題數(shù)量(比如用的正則訂閱的主題)、分區(qū)數(shù)量任何一個發(fā)生改變,都會觸發(fā)重平衡。
下面說說重平衡的過程。
重平衡的機制依賴消費者和協(xié)調(diào)者之間的心跳來維持,消費者會有一個獨立的線程去定時發(fā)送心跳給協(xié)調(diào)者,這個可以通過參數(shù)heartbeat.interval.ms來控制發(fā)送心跳的間隔時間。
每個消費者第一次加入組的時候都會向協(xié)調(diào)者發(fā)送JoinGroup請求,第一個發(fā)送這個請求的消費者會成為“群主”,協(xié)調(diào)者會返回組成員列表給群主。
群主執(zhí)行分區(qū)分配策略,然后把分配結(jié)果通過SyncGroup請求發(fā)送給協(xié)調(diào)者,協(xié)調(diào)者收到分區(qū)分配結(jié)果。
其他組內(nèi)成員也向協(xié)調(diào)者發(fā)送SyncGroup,協(xié)調(diào)者把每個消費者的分區(qū)分配分別響應(yīng)給他們。
(六)具體講講分區(qū)分配策略?
主要有3種分配策略
Range
對分區(qū)進(jìn)行排序,排序越靠前的分區(qū)能夠分配到更多的分區(qū)。
比如有3個分區(qū),消費者A排序更靠前,所以能夠分配到P0\P1兩個分區(qū),消費者B就只能分配到一個P2。
如果是4個分區(qū)的話,那么他們會剛好都是分配到2個。

但是這個分配策略會有點小問題,他是根據(jù)主題進(jìn)行分配,所以如果消費者組訂閱了多個主題,那就有可能導(dǎo)致分區(qū)分配不均衡。
比如下圖中兩個主題的P0\P1都被分配給了A,這樣A有4個分區(qū),而B只有2個,如果這樣的主題數(shù)量越多,那么不均衡就越嚴(yán)重。

RoundRobin
也就是我們常說的輪詢了,這個就比較簡單了,不畫圖你也能很容易理解。
這個會根據(jù)所有的主題進(jìn)行輪詢分配,不會出現(xiàn)Range那種主題越多可能導(dǎo)致分區(qū)分配不均衡的問題。
P0->A,P1->B,P1->A。。。以此類推

Sticky
這個從字面看來意思就是粘性策略,大概是這個意思。主要考慮的是在分配均衡的前提下,讓分區(qū)的分配更小的改動。
比如之前P0\P1分配給消費者A,那么下一次盡量還是分配給A。
這樣的好處就是連接可以復(fù)用,要消費消息總是要和broker去連接的,如果能夠保持上一次分配的分區(qū)的話,那么就不用頻繁的銷毀創(chuàng)建連接了。
(七)如何保證消息可靠性?
生產(chǎn)者發(fā)送消息丟失
kafka支持3種方式發(fā)送消息,這也是常規(guī)的3種方式,發(fā)送后不管結(jié)果、同步發(fā)送、異步發(fā)送,基本上所有的消息隊列都是這樣玩的。
發(fā)送并忘記,直接調(diào)用發(fā)送send方法,不管結(jié)果,雖然可以開啟自動重試,但是肯定會有消息丟失的可能。
同步發(fā)送,同步發(fā)送返回Future對象,我們可以知道發(fā)送結(jié)果,然后進(jìn)行處理。
異步發(fā)送,發(fā)送消息,同時指定一個回調(diào)函數(shù),根據(jù)結(jié)果進(jìn)行相應(yīng)的處理。
為了保險起見,一般我們都會使用異步發(fā)送帶有回調(diào)的方式進(jìn)行發(fā)送消息,再設(shè)置參數(shù)為發(fā)送消息失敗不停地重試。
acks=all,這個參數(shù)有可以配置0|1|all。
0表示生產(chǎn)者寫入消息不管服務(wù)器的響應(yīng),可能消息還在網(wǎng)絡(luò)緩沖區(qū),服務(wù)器根本沒有收到消息,當(dāng)然會丟失消息。
1表示至少有一個副本收到消息才認(rèn)為成功,一個副本那肯定就是集群的Leader副本了,但是如果剛好Leader副本所在的節(jié)點掛了,F(xiàn)ollower沒有同步這條消息,消息仍然丟失了。
配置all的話表示所有ISR都寫入成功才算成功,那除非所有ISR里的副本全掛了,消息才會丟失。
retries=N,設(shè)置一個非常大的值,可以讓生產(chǎn)者發(fā)送消息失敗后不停重試
Kafka 自身消息丟失。
kafka因為消息寫入是通過PageCache異步寫入磁盤的,因此仍然存在丟失消息的可能。
因此針對kafka自身丟失的可能設(shè)置參數(shù):
replication.factor=N,設(shè)置一個比較大的值,保證至少有2個或者以上的副本。
min.insync.replicas=N,代表消息如何才能被認(rèn)為是寫入成功,設(shè)置大于1的數(shù),保證至少寫入1個或者以上的副本才算寫入消息成功。
unclean.leader.election.enable=false,這個設(shè)置意味著沒有完全同步的分區(qū)副本不能成為Leader副本,如果是true的話,那些沒有完全同步Leader的副本成為Leader之后,就會有消息丟失的風(fēng)險。
消費者消息丟失
消費者丟失的可能就比較簡單,關(guān)閉自動提交位移即可,改為業(yè)務(wù)處理成功手動提交。
因為重平衡發(fā)生的時候,消費者會去讀取上一次提交的偏移量,自動提交默認(rèn)是每5秒一次,這會導(dǎo)致重復(fù)消費或者丟失消息。
enable.auto.commit=false,設(shè)置為手動提交。
還有一個參數(shù)我們可能也需要考慮進(jìn)去的:
auto.offset.reset=earliest,這個參數(shù)代表沒有偏移量可以提交或者broker上不存在偏移量的時候,消費者如何處理。earliest代表從分區(qū)的開始位置讀取,可能會重復(fù)讀取消息,但是不會丟失,消費方一般我們肯定要自己保證冪等,另外一種latest表示從分區(qū)末尾讀取,那就會有概率丟失消息。
綜合這幾個參數(shù)設(shè)置,我們就能保證消息不會丟失,保證了可靠性。
(八)聊聊副本和它的同步原理吧?
Kafka副本的之前提到過,分為Leader副本和Follower副本,也就是主副本和從副本,和其他的比如Mysql不一樣的是,Kafka中只有Leader副本會對外提供服務(wù),F(xiàn)ollower副本只是單純地和Leader保持?jǐn)?shù)據(jù)同步,作為數(shù)據(jù)冗余容災(zāi)的作用。
在Kafka中我們把所有副本的集合統(tǒng)稱為AR(Assigned Replicas),和Leader副本保持同步的副本集合稱為ISR(InSyncReplicas)。
ISR是一個動態(tài)的集合,維持這個集合會通過replica.lag.time.max.ms參數(shù)來控制,這個代表落后Leader副本的最長時間,默認(rèn)值10秒,所以只要Follower副本沒有落后Leader副本超過10秒以上,就可以認(rèn)為是和Leader同步的(簡單可以認(rèn)為就是同步時間差)。
另外還有兩個關(guān)鍵的概念用于副本之間的同步:
HW(High Watermark):高水位,也叫做復(fù)制點,表示副本間同步的位置。如下圖所示,0~4綠色表示已經(jīng)提交的消息,這些消息已經(jīng)在副本之間進(jìn)行同步,消費者可以看見這些消息并且進(jìn)行消費,4~6黃色的則是表示未提交的消息,可能還沒有在副本間同步,這些消息對于消費者是不可見的。
LEO(Log End Offset):下一條待寫入消息的位移

副本間同步的過程依賴的就是HW和LEO的更新,以他們的值變化來演示副本同步消息的過程,綠色表示Leader副本,黃色表示Follower副本。
首先,生產(chǎn)者不停地向Leader寫入數(shù)據(jù),這時候Leader的LEO可能已經(jīng)達(dá)到了10,但是HW依然是0,兩個Follower向Leader請求同步數(shù)據(jù),他們的值都是0。

此時,F(xiàn)ollower再次向Leader拉取數(shù)據(jù),這時候Leader會更新自己的HW值,取Follower中的最小的LEO值來更新。

之后,Leader響應(yīng)自己的HW給Follower,F(xiàn)ollower更新自己的HW值,因為又拉取到了消息,所以再次更新LEO,流程以此類推。

(九)Kafka為什么快?
主要是3個方面:
順序IO
kafka寫消息到分區(qū)采用追加的方式,也就是順序?qū)懭氪疟P,不是隨機寫入,這個速度比普通的隨機IO快非常多,幾乎可以和網(wǎng)絡(luò)IO的速度相媲美。
Page Cache和零拷貝
kafka在寫入消息數(shù)據(jù)的時候通過mmap內(nèi)存映射的方式,不是真正立刻寫入磁盤,而是利用操作系統(tǒng)的文件緩存PageCache異步寫入,提高了寫入消息的性能,另外在消費消息的時候又通過sendfile實現(xiàn)了零拷貝。
批量處理和壓縮
Kafka在發(fā)送消息的時候不是一條條的發(fā)送的,而是會把多條消息合并成一個批次進(jìn)行處理發(fā)送,消費消息也是一個道理,一次拉取一批次的消息進(jìn)行消費。
并且Producer、Broker、Consumer都使用了優(yōu)化后的壓縮算法,發(fā)送和消息消息使用壓縮節(jié)省了網(wǎng)絡(luò)傳輸?shù)拈_銷,Broker存儲使用壓縮則降低了磁盤存儲的空間。
參考資料:
1.《深入理解Kafka:核心設(shè)計實踐原理》
2.狀態(tài)機程序設(shè)計套路
3.raft算法源碼
4.https://www.bbsmax.com/A/QW5Y3kaBzm/
