Kafka 不知道從哪開始學(xué)?看這篇就夠了!
前言
文章已經(jīng)同步到個(gè)人網(wǎng)站:http://xiaoflyfish.cn/
「文章較長(zhǎng),可以點(diǎn)贊在看,謝謝,謝謝」
覺得不錯(cuò),可以關(guān)注一下「公眾號(hào)(月伴飛魚)」,之后會(huì)不定期分享系列文章

基本簡(jiǎn)介
Apache Kafka是由LinkedIn采用Scala和Java開發(fā)的開源流處理軟件平臺(tái),并捐贈(zèng)給了Apache Software Foundation。
該項(xiàng)目旨在提供統(tǒng)一的、高吞吐量、低延遲的平臺(tái)來處理實(shí)時(shí)數(shù)據(jù)流。
Kafka可以通過Kafka Connect連接到外部系統(tǒng),并提供了Kafka Streams。
「Kafka的特性」
Kafka是一種分布式的,基于發(fā)布/訂閱的消息系統(tǒng),主要特性如下:
| 特性 | 分布式 | 「高性能」 | 「持久性和擴(kuò)展性」 |
|---|---|---|---|
| 描述 | 多分區(qū) | 高吞吐量 | 數(shù)據(jù)可持久化 |
| 多副本 | 低延遲 | 容錯(cuò)性 | |
| 多訂閱者 | 高并發(fā) | 支持水平在線擴(kuò)展 | |
| 基于ZooKeeper調(diào)度 | 時(shí)間復(fù)雜度為O(1) | 消息自動(dòng)平衡 |
版本號(hào)
「Kafka版本命名」
我們?cè)诠倬W(wǎng)上下載Kafka時(shí),會(huì)看到這樣的版本:

前面的版本號(hào)是編譯Kafka源代碼的Scala編譯器版本。
Kafka服務(wù)器端的代碼完全由Scala語言編寫,Scala同時(shí)支持面向?qū)ο缶幊毯秃瘮?shù)式編程,用Scala寫成的源代碼編譯之后也是普通的.class文件,因此我們說Scala是JVM系的語言。
真正的Kafka版本號(hào)實(shí)際上是2.1.1。
?那么這個(gè)2.1.1又表示什么呢?
?
前面的2表示大版本號(hào),即Major Version;中間的1表示小版本號(hào)或次版本號(hào),即Minor Version;最后的1表示修訂版本號(hào),也就是Patch號(hào)。
Kafka社區(qū)在發(fā)布1.0.0版本后寫過一篇文章,宣布Kafka版本命名規(guī)則正式從4位演進(jìn)到3位,比如0.11.0.0版本就是4位版本號(hào)。
有個(gè)建議,不論用的是哪個(gè)版本,都請(qǐng)盡量保持服務(wù)器端版本和客戶端版本一致,否則你將損失很多Kafka為你提供的性能優(yōu)化收益。
「版本演進(jìn)」
0.7版本:只提供了最基礎(chǔ)的消息隊(duì)列功能。
0.8版本:引入了副本機(jī)制,至此kafka成為了一個(gè)整整意義上完備的分布式可靠消息隊(duì)列解決方案
0.9.0.0版本:增加了基礎(chǔ)的安全認(rèn)證/權(quán)限功能;使用Java重新了新版本消費(fèi)者API;引入了Kafka Connect組件。
0.11.0.0版本:提供了冪等性Producer API以及事務(wù)API;對(duì)Kafka消息格式做了重構(gòu)。
1.0和2.0版本:主要還是Kafka Streams的各種改進(jìn)
基本概念

「主題」
發(fā)布訂閱的對(duì)象是主題(Topic),可以為每 個(gè)業(yè)務(wù)、每個(gè)應(yīng)用甚至是每類數(shù)據(jù)都創(chuàng)建專屬的主題
「生產(chǎn)者和消費(fèi)者」
向主題發(fā)布消息的客戶端應(yīng)用程序稱為生產(chǎn)者,生產(chǎn)者程序通常持續(xù)不斷地 向一個(gè)或多個(gè)主題發(fā)送消息
訂閱這些主題消息的客戶端應(yīng)用程序就被稱為消費(fèi)者,消費(fèi)者也能夠同時(shí)訂閱多個(gè)主題的消息
「Broker」
集群由多個(gè) Broker 組成,Broker 負(fù)責(zé)接收和處理客戶端發(fā)送過來的請(qǐng)求,以及對(duì)消息進(jìn)行持久化
雖然多個(gè) Broker 進(jìn)程能夠運(yùn)行在同一臺(tái)機(jī)器上,但更常見的做法是將 不同的 Broker 分散運(yùn)行在不同的機(jī)器上,這樣如果集群中某一臺(tái)機(jī)器宕機(jī),即使在它上面 運(yùn)行的所有 Broker 進(jìn)程都掛掉了,其他機(jī)器上的 Broker 也依然能夠?qū)ν馓峁┓?wù)
「?jìng)浞輽C(jī)制」
備份的思想很簡(jiǎn)單,就是把相同的數(shù)據(jù)拷貝到多臺(tái)機(jī)器上,而這些相同的數(shù)據(jù)拷貝被稱為副本
定義了兩類副本:領(lǐng)導(dǎo)者副本和追隨者副本
前者對(duì)外提供服務(wù),這里的對(duì)外指的是與 客戶端程序進(jìn)行交互;而后者只是被動(dòng)地追隨領(lǐng)導(dǎo)者副本而已,不能與外界進(jìn)行交互
「分區(qū)」
分區(qū)機(jī)制指的是將每個(gè)主題劃分成多個(gè)分區(qū),每個(gè)分區(qū)是一組有序的消息日志
生產(chǎn)者生產(chǎn)的每條消息只會(huì)被發(fā)送到一個(gè)分區(qū)中,也就是說如果向一個(gè)雙分區(qū)的主題發(fā)送一條消息,這條消息要么在分區(qū) 0 中,要么在分區(qū) 1 中
每個(gè)分區(qū)下可以配置若干個(gè)副本,其中只能有 1 個(gè)領(lǐng) 導(dǎo)者副本和 N-1 個(gè)追隨者副本
生產(chǎn)者向分區(qū)寫入消息,每條消息在分區(qū)中的位置信息叫位移
「消費(fèi)者組」
多個(gè)消費(fèi)者實(shí)例共同組成一個(gè)組來 消費(fèi)一組主題
這組主題中的每個(gè)分區(qū)都只會(huì)被組內(nèi)的一個(gè)消費(fèi)者實(shí)例消費(fèi),其他消費(fèi)者實(shí)例不能消費(fèi)它
?同時(shí)實(shí)現(xiàn)了傳統(tǒng)消息引擎系統(tǒng)的兩大模型:
?
如果所有實(shí)例都屬于同一個(gè) Group, 那么它實(shí)現(xiàn)的就是消息隊(duì)列模型;
如果所有實(shí)例分別屬于不 同的 Group,那么它實(shí)現(xiàn)的就是發(fā)布/訂閱模型
「Coordinator:協(xié)調(diào)者」
所謂協(xié)調(diào)者,它專門為Consumer Group服務(wù),負(fù)責(zé)為Group執(zhí)行Rebalance以及提供位移管理和組成員管理等。
具體來講,Consumer端應(yīng)用程序在提交位移時(shí),其實(shí)是向Coordinator所在的Broker提交位移,同樣地,當(dāng)Consumer應(yīng)用啟動(dòng)時(shí),也是向Coordinator所在的Broker發(fā)送各種請(qǐng)求,然后由Coordinator負(fù)責(zé)執(zhí)行消費(fèi)者組的注冊(cè)、成員管理記錄等元數(shù)據(jù)管理操作。
所有Broker在啟動(dòng)時(shí),都會(huì)創(chuàng)建和開啟相應(yīng)的Coordinator組件。
也就是說,「所有Broker都有各自的Coordinator組件」。
那么,Consumer Group如何確定為它服務(wù)的Coordinator在哪臺(tái)Broker上呢?
通過Kafka內(nèi)部主題__consumer_offsets。
目前,Kafka為某個(gè)Consumer Group確定Coordinator所在的Broker的算法有2個(gè)步驟。
第1步:確定由
__consumer_offsets主題的哪個(gè)分區(qū)來保存該Group數(shù)據(jù):partitionId=Math.abs(groupId.hashCode() % offsetsTopicPartitionCount)。第2步:找出該分區(qū)Leader副本所在的Broker,該Broker即為對(duì)應(yīng)的Coordinator。
首先,Kafka會(huì)計(jì)算該Group的group.id參數(shù)的哈希值。
比如你有個(gè)Group的group.id設(shè)置成了test-group,那么它的hashCode值就應(yīng)該是627841412。
其次,Kafka會(huì)計(jì)算__consumer_offsets的分區(qū)數(shù),通常是50個(gè)分區(qū),之后將剛才那個(gè)哈希值對(duì)分區(qū)數(shù)進(jìn)行取模加求絕對(duì)值計(jì)算,即abs(627841412 % 50) = 12。
此時(shí),我們就知道了__consumer_offsets主題的分區(qū)12負(fù)責(zé)保存這個(gè)Group的數(shù)據(jù)。
有了分區(qū)號(hào),我們只需要找出__consumer_offsets主題分區(qū)12的Leader副本在哪個(gè)Broker上就可以了,這個(gè)Broker,就是我們要找的Coordinator。
「消費(fèi)者位移:Consumer Offset」
消費(fèi)者消費(fèi)進(jìn)度,每個(gè)消費(fèi)者都有自己的消費(fèi)者位移。
「重平衡:Rebalance」
消費(fèi)者組內(nèi)某個(gè)消費(fèi)者實(shí)例掛掉后,其他消費(fèi)者實(shí)例自動(dòng)重新分配訂閱主題分區(qū)的過程。
Rebalance是Kafka消費(fèi)者端實(shí)現(xiàn)高可用的重要手段。
「AR(Assigned Replicas)」:分區(qū)中的所有副本統(tǒng)稱為AR。
所有消息會(huì)先發(fā)送到leader副本,然后follower副本才能從leader中拉取消息進(jìn)行同步。
但是在同步期間,follower對(duì)于leader而言會(huì)有一定程度的滯后,這個(gè)時(shí)候follower和leader并非完全同步狀態(tài)
「OSR(Out Sync Replicas)」:follower副本與leader副本沒有完全同步或滯后的副本集合
「ISR(In Sync Replicas):「AR中的一個(gè)子集,ISR中的副本都」是與leader保持完全同步的副本」,如果某個(gè)在ISR中的follower副本落后于leader副本太多,則會(huì)被從ISR中移除,否則如果完全同步,會(huì)從OSR中移至ISR集合。
在默認(rèn)情況下,當(dāng)leader副本發(fā)生故障時(shí),只有在ISR集合中的follower副本才有資格被選舉為新leader,而OSR中的副本沒有機(jī)會(huì)(可以通過unclean.leader.election.enable進(jìn)行配置)
「HW(High Watermark)」:高水位,它標(biāo)識(shí)了一個(gè)特定的消息偏移量(offset),消費(fèi)者只能拉取到這個(gè)水位 offset 之前的消息
下圖表示一個(gè)日志文件,這個(gè)日志文件中只有9條消息,第一條消息的offset(LogStartOffset)為0,最有一條消息的offset為8,offset為9的消息使用虛線表示的,代表下一條待寫入的消息。
日志文件的 HW 為6,表示消費(fèi)者只能拉取offset在 0 到 5 之間的消息,offset為6的消息對(duì)消費(fèi)者而言是不可見的。

「LEO(Log End Offset)」:標(biāo)識(shí)當(dāng)前日志文件中下一條待寫入的消息的offset
上圖中offset為9的位置即為當(dāng)前日志文件的 LEO,LEO 的大小相當(dāng)于當(dāng)前日志分區(qū)中最后一條消息的offset值加1
分區(qū) ISR 集合中的每個(gè)副本都會(huì)維護(hù)自身的 LEO ,而 ISR 集合中最小的 LEO 即為分區(qū)的 HW,對(duì)消費(fèi)者而言只能消費(fèi) HW 之前的消息。
系統(tǒng)架構(gòu)
「kafka設(shè)計(jì)思想」
一個(gè)最基本的架構(gòu)是生產(chǎn)者發(fā)布一個(gè)消息到Kafka的一個(gè)Topic ,該Topic的消息存放于的Broker中,消費(fèi)者訂閱這個(gè)Topic,然后從Broker中消費(fèi)消息,下面這個(gè)圖可以更直觀的描述這個(gè)場(chǎng)景:

「消息狀態(tài):」 在Kafka中,消息是否被消費(fèi)的狀態(tài)保存在Consumer中,Broker不會(huì)關(guān)心消息是否被消費(fèi)或被誰消費(fèi),Consumer會(huì)記錄一個(gè)offset值(指向partition中下一條將要被消費(fèi)的消息位置),如果offset被錯(cuò)誤設(shè)置可能導(dǎo)致同一條消息被多次消費(fèi)或者消息丟失。
「消息持久化:」 Kafka會(huì)把消息持久化到本地文件系統(tǒng)中,并且具有極高的性能。
「批量發(fā)送:」 Kafka支持以消息集合為單位進(jìn)行批量發(fā)送,以提高效率。
「Push-and-Pull:」 Kafka中的Producer和Consumer采用的是Push-and-Pull模式,即Producer向Broker Push消息,Consumer從Broker Pull消息。
「分區(qū)機(jī)制(Partition):」 Kafka的Broker端支持消息分區(qū),Producer可以決定把消息發(fā)到哪個(gè)Partition,在一個(gè)Partition中消息的順序就是Producer發(fā)送消息的順序,一個(gè)Topic中的Partition數(shù)是可配置的,Partition是Kafka高吞吐量的重要保證。
「系統(tǒng)架構(gòu)」

通常情況下,一個(gè)kafka體系架構(gòu)包括「多個(gè)Producer」、「多個(gè)Consumer」、「多個(gè)broker」以及「一個(gè)Zookeeper集群」。
「Producer」:生產(chǎn)者,負(fù)責(zé)將消息發(fā)送到kafka中。
「Consumer」:消費(fèi)者,負(fù)責(zé)從kafka中拉取消息進(jìn)行消費(fèi)。
「Broker」:Kafka服務(wù)節(jié)點(diǎn),一個(gè)或多個(gè)Broker組成了一個(gè)Kafka集群
「Zookeeper集群」:負(fù)責(zé)管理kafka集群元數(shù)據(jù)以及控制器選舉等。
生產(chǎn)者分區(qū)
「為什么分區(qū)?」
Kafka的消息組織方式實(shí)際上是三級(jí)結(jié)構(gòu):主題-分區(qū)-消息。
主題下的每條消息只會(huì)保存在某一個(gè)分區(qū)中,而不會(huì)在多個(gè)分區(qū)中被保存多份。
其實(shí)分區(qū)的作用就是提供負(fù)載均衡的能力,或者說對(duì)數(shù)據(jù)進(jìn)行分區(qū)的主要原因,就是為了實(shí)現(xiàn)系統(tǒng)的高伸縮性(Scalability)。
不同的分區(qū)能夠被放置到不同節(jié)點(diǎn)的機(jī)器上,而數(shù)據(jù)的讀寫操作也都是針對(duì)分區(qū)這個(gè)粒度而進(jìn)行的,這樣每個(gè)節(jié)點(diǎn)的機(jī)器都能獨(dú)立地執(zhí)行各自分區(qū)的讀寫請(qǐng)求處理,并且,我們還可以通過添加新的節(jié)點(diǎn)機(jī)器來增加整體系統(tǒng)的吞吐量。
「都有哪些分區(qū)策略?」
「所謂分區(qū)策略是決定生產(chǎn)者將消息發(fā)送到哪個(gè)分區(qū)的算法?!?/strong>
Kafka為我們提供了默認(rèn)的分區(qū)策略,同時(shí)它也支持你自定義分區(qū)策略。
「自定義分區(qū)策略」
如果要自定義分區(qū)策略,你需要顯式地配置生產(chǎn)者端的參數(shù)partitioner.class。
在編寫生產(chǎn)者程序時(shí),你可以編寫一個(gè)具體的類實(shí)現(xiàn)org.apache.kafka.clients.producer.Partitioner接口。
這個(gè)接口也很簡(jiǎn)單,只定義了兩個(gè)方法:partition()和close(),通常你只需要實(shí)現(xiàn)最重要的partition方法。
我們來看看這個(gè)方法的方法簽名:
int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
這里的topic、key、keyBytes、value和valueBytes都屬于消息數(shù)據(jù),cluster則是集群信息(比如當(dāng)前Kafka集群共有多少主題、多少Broker等)。
Kafka給你這么多信息,就是希望讓你能夠充分地利用這些信息對(duì)消息進(jìn)行分區(qū),計(jì)算出它要被發(fā)送到哪個(gè)分區(qū)中。
只要你自己的實(shí)現(xiàn)類定義好了partition方法,同時(shí)設(shè)置partitioner.class參數(shù)為你自己實(shí)現(xiàn)類的Full Qualified Name,那么生產(chǎn)者程序就會(huì)按照你的代碼邏輯對(duì)消息進(jìn)行分區(qū)。
「輪詢策略」
也稱Round-robin策略,即順序分配。
比如一個(gè)主題下有3個(gè)分區(qū),那么第一條消息被發(fā)送到分區(qū)0,第二條被發(fā)送到分區(qū)1,第三條被發(fā)送到分區(qū)2,以此類推。當(dāng)生產(chǎn)第4條消息時(shí)又會(huì)重新開始,即將其分配到分區(qū)0
這就是所謂的輪詢策略。輪詢策略是Kafka Java生產(chǎn)者API默認(rèn)提供的分區(qū)策略。
「輪詢策略有非常優(yōu)秀的負(fù)載均衡表現(xiàn),它總是能保證消息最大限度地被平均分配到所有分區(qū)上,故默認(rèn)情況下它是最合理的分區(qū)策略,也是我們最常用的分區(qū)策略之一?!?/strong>
「隨機(jī)策略」
也稱Randomness策略。所謂隨機(jī)就是我們隨意地將消息放置到任意一個(gè)分區(qū)上。
如果要實(shí)現(xiàn)隨機(jī)策略版的partition方法,很簡(jiǎn)單,只需要兩行代碼即可:
List partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());
先計(jì)算出該主題總的分區(qū)數(shù),然后隨機(jī)地返回一個(gè)小于它的正整數(shù)。
本質(zhì)上看隨機(jī)策略也是力求將數(shù)據(jù)均勻地打散到各個(gè)分區(qū),但從實(shí)際表現(xiàn)來看,它要遜于輪詢策略,所以「如果追求數(shù)據(jù)的均勻分布,還是使用輪詢策略比較好」。事實(shí)上,隨機(jī)策略是老版本生產(chǎn)者使用的分區(qū)策略,在新版本中已經(jīng)改為輪詢了。
「按消息鍵保序策略」
Kafka允許為每條消息定義消息鍵,簡(jiǎn)稱為Key。
這個(gè)Key的作用非常大,它可以是一個(gè)有著明確業(yè)務(wù)含義的字符串,比如客戶代碼、部門編號(hào)或是業(yè)務(wù)ID等;也可以用來表征消息元數(shù)據(jù)。
特別是在Kafka不支持時(shí)間戳的年代,在一些場(chǎng)景中,工程師們都是直接將消息創(chuàng)建時(shí)間封裝進(jìn)Key里面的。
一旦消息被定義了Key,那么你就可以保證同一個(gè)Key的所有消息都進(jìn)入到相同的分區(qū)里面,由于每個(gè)分區(qū)下的消息處理都是有順序的,故這個(gè)策略被稱為按消息鍵保序策略
實(shí)現(xiàn)這個(gè)策略的partition方法同樣簡(jiǎn)單,只需要下面兩行代碼即可:
List partitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();
前面提到的Kafka默認(rèn)分區(qū)策略實(shí)際上同時(shí)實(shí)現(xiàn)了兩種策略:如果指定了Key,那么默認(rèn)實(shí)現(xiàn)按消息鍵保序策略;如果沒有指定Key,則使用輪詢策略。
「其他分區(qū)策略」
其實(shí)還有一種比較常見的,即所謂的基于地理位置的分區(qū)策略。
當(dāng)然這種策略一般只針對(duì)那些大規(guī)模的Kafka集群,特別是跨城市、跨國家甚至是跨大洲的集群。
我們可以根據(jù)Broker所在的IP地址實(shí)現(xiàn)定制化的分區(qū)策略。比如下面這段代碼:
List partitions = cluster.partitionsForTopic(topic);
return partitions.stream().filter(p -> isSouth(p.leader().host())).map(PartitionInfo::partition).findAny().get();
我們可以從所有分區(qū)中找出那些Leader副本在南方的所有分區(qū),然后隨機(jī)挑選一個(gè)進(jìn)行消息發(fā)送。
生產(chǎn)者壓縮算法
「Kafka是如何壓縮消息的呢?」
目前Kafka共有兩大類消息格式,社區(qū)分別稱之為V1版本和V2版本。
V2版本是Kafka 0.11.0.0中正式引入的。
不論是哪個(gè)版本,Kafka的消息層次都分為兩層:消息集合以及消息。
一個(gè)消息集合中包含若干條日志項(xiàng),而日志項(xiàng)才是真正封裝消息的地方。
Kafka底層的消息日志由一系列消息集合日志項(xiàng)組成。
Kafka通常不會(huì)直接操作具體的一條條消息,它總是在消息集合這個(gè)層面上進(jìn)行寫入操作。
「那么社區(qū)引入V2版本的目的是什么呢?」
V2版本主要是針對(duì)V1版本的一些弊端做了修正,比如把消息的公共部分抽取出來放到外層消息集合里面,這樣就不用每條消息都保存這些信息了。
舉個(gè)例子:原來在V1版本中,每條消息都需要執(zhí)行CRC校驗(yàn),但有些情況下消息的CRC值是會(huì)發(fā)生變化的。
比如在Broker端可能會(huì)對(duì)消息時(shí)間戳字段進(jìn)行更新,那么重新計(jì)算之后的CRC值也會(huì)相應(yīng)更新;再比如Broker端在執(zhí)行消息格式轉(zhuǎn)換時(shí)(主要是為了兼容老版本客戶端程序),也會(huì)帶來CRC值的變化。
鑒于這些情況,再對(duì)每條消息都執(zhí)行CRC校驗(yàn)就有點(diǎn)沒必要了,不僅浪費(fèi)空間還耽誤CPU時(shí)間,因此在V2版本中,消息的CRC校驗(yàn)工作就被移到了消息集合這一層。
V2版本還有一個(gè)和壓縮息息相關(guān)的改進(jìn),就是保存壓縮消息的方法發(fā)生了變化。
之前V1版本中保存壓縮消息的方法是把多條消息進(jìn)行壓縮然后保存到外層消息的消息體字段中;而V2版本的做法是對(duì)整個(gè)消息集合進(jìn)行壓縮,顯然后者應(yīng)該比前者有更好的壓縮效果。
「何時(shí)壓縮?」
在Kafka中,壓縮可能發(fā)生在兩個(gè)地方:生產(chǎn)者端和Broker端。
生產(chǎn)者程序中配置compression.type參數(shù)即表示啟用指定類型的壓縮算法。
比如下面這段程序代碼展示了如何構(gòu)建一個(gè)開啟GZIP的Producer對(duì)象:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 開啟GZIP壓縮
props.put("compression.type", "gzip");
Producer producer = new KafkaProducer<>(props);
這里比較關(guān)鍵的代碼行是props.put(“compression.type”, “gzip”),它表明該P(yáng)roducer的壓縮算法使用的是GZIP。
這樣Producer啟動(dòng)后生產(chǎn)的每個(gè)消息集合都是經(jīng)GZIP壓縮過的,故而能很好地節(jié)省網(wǎng)絡(luò)傳輸帶寬以及Kafka Broker端的磁盤占用。
有兩種例外情況就可能讓Broker重新壓縮消息:
「情況一:Broker端指定了和Producer端不同的壓縮算法?!?/strong>
一旦你在Broker端設(shè)置了不同的compression.type值,就一定要小心了,因?yàn)榭赡軙?huì)發(fā)生預(yù)料之外的壓縮/解壓縮操作,通常表現(xiàn)為Broker端CPU使用率飆升。
「情況二:Broker端發(fā)生了消息格式轉(zhuǎn)換?!?/strong>
所謂的消息格式轉(zhuǎn)換主要是為了兼容老版本的消費(fèi)者程序。
在一個(gè)生產(chǎn)環(huán)境中,Kafka集群中同時(shí)保存多種版本的消息格式非常常見。
為了兼容老版本的格式,Broker端會(huì)對(duì)新版本消息執(zhí)行向老版本格式的轉(zhuǎn)換。
這個(gè)過程中會(huì)涉及消息的解壓縮和重新壓縮。
一般情況下這種消息格式轉(zhuǎn)換對(duì)性能是有很大影響的,除了這里的壓縮之外,它還讓Kafka喪失了Zero Copy特性。
「何時(shí)解壓縮?」
有壓縮必有解壓縮!通常來說解壓縮發(fā)生在消費(fèi)者程序中,也就是說Producer發(fā)送壓縮消息到Broker后,Broker照單全收并原樣保存起來。當(dāng)Consumer程序請(qǐng)求這部分消息時(shí),Broker依然原樣發(fā)送出去,當(dāng)消息到達(dá)Consumer端后,由Consumer自行解壓縮還原成之前的消息。
「基本過程:Producer端壓縮、Broker端保持、Consumer端解壓縮。」
注意:除了在Consumer端解壓縮,Broker端也會(huì)進(jìn)行解壓縮。
每個(gè)壓縮過的消息集合在Broker端寫入時(shí)都要發(fā)生解壓縮操作,目的就是為了對(duì)消息執(zhí)行各種驗(yàn)證。
我們必須承認(rèn)這種解壓縮對(duì)Broker端性能是有一定影響的,特別是對(duì)CPU的使用率而言。
「各種壓縮算法對(duì)比」
在Kafka 2.1.0版本之前,Kafka支持3種壓縮算法:GZIP、Snappy和LZ4。
從2.1.0開始,Kafka正式支持Zstandard算法(簡(jiǎn)寫為zstd)。
它是Facebook開源的一個(gè)壓縮算法,能夠提供超高的壓縮比。
在實(shí)際使用中,GZIP、Snappy、LZ4和zstd的表現(xiàn)各有千秋。
但對(duì)于Kafka而言,在吞吐量方面:LZ4 > Snappy > zstd和GZIP;而在壓縮比方面,zstd > LZ4 > GZIP > Snappy。
具體到物理資源,使用Snappy算法占用的網(wǎng)絡(luò)帶寬最多,zstd最少;
在CPU使用率方面,各個(gè)算法表現(xiàn)得差不多,只是在壓縮時(shí)Snappy算法使用的CPU較多一些,而在解壓縮時(shí)GZIP算法則可能使用更多的CPU。
「最佳實(shí)踐」
?何時(shí)啟用壓縮是比較合適的時(shí)機(jī)呢?
?
啟用壓縮的一個(gè)條件就是Producer程序運(yùn)行機(jī)器上的CPU資源要很充足。
除了CPU資源充足這一條件,如果你的環(huán)境中帶寬資源有限,那么建議你開啟壓縮。
消費(fèi)者組
「Consumer Group是Kafka提供的可擴(kuò)展且具有容錯(cuò)性的消費(fèi)者機(jī)制」。
既然是一個(gè)組,那么組內(nèi)必然可以有多個(gè)消費(fèi)者或消費(fèi)者實(shí)例,它們共享一個(gè)公共的ID,這個(gè)ID被稱為Group ID。
組內(nèi)的所有消費(fèi)者協(xié)調(diào)在一起來消費(fèi)訂閱主題的所有分區(qū)。
?每個(gè)分區(qū)只能由同一個(gè)消費(fèi)者組內(nèi)的一個(gè)Consumer實(shí)例來消費(fèi)。
?
「Consumer Group三個(gè)特性:」
Consumer Group下可以有一個(gè)或多個(gè)Consumer實(shí)例,這里的實(shí)例可以是一個(gè)單獨(dú)的進(jìn)程,也可以是同一進(jìn)程下的線程。 Group ID是一個(gè)字符串,在一個(gè)Kafka集群中,它標(biāo)識(shí)唯一的一個(gè)Consumer Group。 Consumer Group下所有實(shí)例訂閱的主題的單個(gè)分區(qū),只能分配給組內(nèi)的某個(gè)Consumer實(shí)例消費(fèi),這個(gè)分區(qū)當(dāng)然也可以被其他的Group消費(fèi)。
當(dāng)Consumer Group訂閱了多個(gè)主題后,組內(nèi)的每個(gè)實(shí)例不要求一定要訂閱主題的所有分區(qū),它只會(huì)消費(fèi)部分分區(qū)中的消息。
Consumer Group之間彼此獨(dú)立,互不影響,它們能夠訂閱相同的一組主題而互不干涉。
「Kafka僅僅使用Consumer Group這一種機(jī)制,卻同時(shí)實(shí)現(xiàn)了傳統(tǒng)消息引擎系統(tǒng)的兩大模型」:
如果所有實(shí)例都屬于同一個(gè)Group,那么它實(shí)現(xiàn)的就是消息隊(duì)列模型;
如果所有實(shí)例分別屬于不同的Group,那么它實(shí)現(xiàn)的就是發(fā)布/訂閱模型。
「一個(gè)Group下該有多少個(gè)Consumer實(shí)例呢?」
「理想情況下,Consumer實(shí)例的數(shù)量應(yīng)該等于該Group訂閱主題的分區(qū)總數(shù)?!?/strong>
假設(shè)一個(gè)Consumer Group訂閱了3個(gè)主題,分別是A、B、C,它們的分區(qū)數(shù)依次是1、2、3,那么通常情況下,為該Group設(shè)置6個(gè)Consumer實(shí)例是比較理想的情形,因?yàn)樗茏畲笙薅鹊貙?shí)現(xiàn)高伸縮性。
「針對(duì)Consumer Group,Kafka是怎么管理位移的呢?」
「位移Offset」
老版本的Consumer Group把位移保存在ZooKeeper中。
Apache ZooKeeper是一個(gè)分布式的協(xié)調(diào)服務(wù)框架,Kafka重度依賴它實(shí)現(xiàn)各種各樣的協(xié)調(diào)管理。
將位移保存在ZooKeeper外部系統(tǒng)的做法,最顯而易見的好處就是減少了Kafka Broker端的狀態(tài)保存開銷。
不過,慢慢地發(fā)現(xiàn)了一個(gè)問題,即ZooKeeper這類元框架其實(shí)并不適合進(jìn)行頻繁的寫更新,而Consumer Group的位移更新卻是一個(gè)非常頻繁的操作。
這種大吞吐量的寫操作會(huì)極大地拖慢ZooKeeper集群的性能。
于是,在新版本的Consumer Group中,Kafka社區(qū)重新設(shè)計(jì)了Consumer Group的位移管理方式,采用了將位移保存在Kafka內(nèi)部主題的方法。
這個(gè)內(nèi)部主題就是__consumer_offsets。
消費(fèi)者策略
「第一種是Round」
默認(rèn),也叫輪循,說的是對(duì)于同一組消費(fèi)者來說,使用輪訓(xùn)分配的方式,決定消費(fèi)者消費(fèi)的分區(qū)

「第二種叫做Range」
對(duì)一個(gè)消費(fèi)者組來說決定消費(fèi)方式是以分區(qū)總數(shù)除以消費(fèi)者總數(shù)來決定,一般如果不能整除,往往是從頭開始將剩余的分區(qū)分配開

「第三種叫Sticky」
是在0.11.x,新增的,它和前面兩個(gè)不是很一樣,它是在Range上的一種升華,且前面兩個(gè)當(dāng)同組內(nèi)有新的消費(fèi)者加入或者舊的消費(fèi)者退出的時(shí)候,會(huì)從新開始決定消費(fèi)者消費(fèi)方式,但是Sticky,在同組中有新的新的消費(fèi)者加入或者舊的消費(fèi)者退出時(shí),不會(huì)直接開始新的Range分配,而是保留現(xiàn)有消費(fèi)者原來的消費(fèi)策略,將退出的消費(fèi)者所消費(fèi)的分區(qū)平均分配給現(xiàn)有消費(fèi)者,新增消費(fèi)者同理,同其他現(xiàn)存消費(fèi)者的消費(fèi)策略中分離
位移提交
假設(shè)一個(gè)分區(qū)中有10條消息,位移分別是0到9。
某個(gè)Consumer應(yīng)用已消費(fèi)了5條消息,這就說明該Consumer消費(fèi)了位移為0到4的5條消息,此時(shí)Consumer的位移是5,指向了下一條消息的位移。
因?yàn)镃onsumer能夠同時(shí)消費(fèi)多個(gè)分區(qū)的數(shù)據(jù),所以位移的提交實(shí)際上是在分區(qū)粒度上進(jìn)行的,即「Consumer需要為分配給它的每個(gè)分區(qū)提交各自的位移數(shù)據(jù)」。
「位移提交分為自動(dòng)提交和手動(dòng)提交;從Consumer端的角度來說,位移提交分為同步提交和異步提交」。
開啟自動(dòng)提交位移的方法:Consumer端有個(gè)參數(shù)enable.auto.commit,把它設(shè)置為true或者壓根不設(shè)置它就可以了。
因?yàn)樗哪J(rèn)值就是true,即Java Consumer默認(rèn)就是自動(dòng)提交位移的。
如果啟用了自動(dòng)提交,Consumer端還有個(gè)參數(shù):auto.commit.interval.ms。
它的默認(rèn)值是5秒,表明Kafka每5秒會(huì)為你自動(dòng)提交一次位移。
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "2000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords records = consumer.poll(100);
for (ConsumerRecord record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
上面的第3、第4行代碼,就是開啟自動(dòng)提交位移的方法。
開啟手動(dòng)提交位移的方法就是設(shè)置enable.auto.commit為false。
還需要調(diào)用相應(yīng)的API手動(dòng)提交位移。最簡(jiǎn)單的API就是「KafkaConsumer#commitSync()」。
該方法會(huì)提交KafkaConsumer#poll()返回的最新位移。
從名字上來看,它是一個(gè)同步操作,即該方法會(huì)一直等待,直到位移被成功提交才會(huì)返回。
如果提交過程中出現(xiàn)異常,該方法會(huì)將異常信息拋出。
下面這段代碼展示了commitSync()的使用方法:
while (true) {
ConsumerRecords records =
consumer.poll(Duration.ofSeconds(1));
process(records); // 處理消息
try {
consumer.commitSync();
} catch (CommitFailedException e) {
handle(e); // 處理提交失敗異常
}
}
一旦設(shè)置了enable.auto.commit為true,Kafka會(huì)保證在開始調(diào)用poll方法時(shí),提交上次poll返回的所有消息。
從順序上來說,poll方法的邏輯是先提交上一批消息的位移,再處理下一批消息,因此它能保證不出現(xiàn)消費(fèi)丟失的情況。
但自動(dòng)提交位移的一個(gè)問題在于,「它可能會(huì)出現(xiàn)重復(fù)消費(fèi)」。
而手動(dòng)提交位移,它的好處就在于更加靈活,你完全能夠把控位移提交的時(shí)機(jī)和頻率。
但是,它也有一個(gè)缺陷,就是在調(diào)用commitSync()時(shí),Consumer程序會(huì)處于阻塞狀態(tài),直到遠(yuǎn)端的Broker返回提交結(jié)果,這個(gè)狀態(tài)才會(huì)結(jié)束。
鑒于這個(gè)問題,Kafka社區(qū)為手動(dòng)提交位移提供了另一個(gè)API方法:「KafkaConsumer#commitAsync()」。
從名字上來看它就不是同步的,而是一個(gè)異步操作。
調(diào)用commitAsync()之后,它會(huì)立即返回,不會(huì)阻塞,因此不會(huì)影響Consumer應(yīng)用的TPS。
由于它是異步的,Kafka提供了回調(diào)函數(shù)(callback),供你實(shí)現(xiàn)提交之后的邏輯,比如記錄日志或處理異常等。
下面這段代碼展示了調(diào)用commitAsync()的方法:
while (true) {
ConsumerRecords records =
consumer.poll(Duration.ofSeconds(1));
process(records); // 處理消息
consumer.commitAsync((offsets, exception) -> {
if (exception != null)
handle(exception);
});
}
commitAsync的問題在于,出現(xiàn)問題時(shí)它不會(huì)自動(dòng)重試。
顯然,如果是手動(dòng)提交,我們需要將commitSync和commitAsync組合使用才能到達(dá)最理想的效果,原因有兩個(gè):
我們可以利用commitSync的自動(dòng)重試來規(guī)避那些瞬時(shí)錯(cuò)誤,比如網(wǎng)絡(luò)的瞬時(shí)抖動(dòng),Broker端GC等,因?yàn)檫@些問題都是短暫的,自動(dòng)重試通常都會(huì)成功。 我們不希望程序總處于阻塞狀態(tài),影響TPS。
我們來看一下下面這段代碼,它展示的是如何將兩個(gè)API方法結(jié)合使用進(jìn)行手動(dòng)提交。
try {
while(true) {
ConsumerRecords records =
consumer.poll(Duration.ofSeconds(1));
process(records); // 處理消息
commitAysnc(); // 使用異步提交規(guī)避阻塞
}
} catch(Exception e) {
handle(e); // 處理異常
} finally {
try {
consumer.commitSync(); // 最后一次提交使用同步阻塞式提交
} finally {
consumer.close();
}
}
這樣一個(gè)場(chǎng)景:你的poll方法返回的不是500條消息,而是5000條。
那么,你肯定不想把這5000條消息都處理完之后再提交位移,因?yàn)橐坏┲虚g出現(xiàn)差錯(cuò),之前處理的全部都要重來一遍。
比如前面這個(gè)5000條消息的例子,你可能希望每處理完100條消息就提交一次位移,這樣能夠避免大批量的消息重新消費(fèi)。
Kafka Consumer API為手動(dòng)提交提供了這樣的方法:commitSync(Map)和commitAsync(Map)。
它們的參數(shù)是一個(gè)Map對(duì)象,鍵就是TopicPartition,即消費(fèi)的分區(qū),而值是一個(gè)OffsetAndMetadata對(duì)象,保存的主要是位移數(shù)據(jù)。
?如何每處理100條消息就提交一次位移呢?
?
在這里,我以commitAsync為例,展示一段代碼,實(shí)際上,commitSync的調(diào)用方法和它是一模一樣的。
private Map offsets = new HashMap<>();
int count = 0;
……
while (true) {
ConsumerRecords records =
consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord record: records) {
process(record); // 處理消息
offsets.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1);
if(count % 100 == 0)
consumer.commitAsync(offsets, null); // 回調(diào)處理邏輯是null
count++;
}
}
程序先是創(chuàng)建了一個(gè)Map對(duì)象,用于保存Consumer消費(fèi)處理過程中要提交的分區(qū)位移,之后開始逐條處理消息,并構(gòu)造要提交的位移值。
代碼的最后部分是做位移的提交。設(shè)置了一個(gè)計(jì)數(shù)器,每累計(jì)100條消息就統(tǒng)一提交一次位移。
與調(diào)用無參的commitAsync不同,這里調(diào)用了帶Map對(duì)象參數(shù)的commitAsync進(jìn)行細(xì)粒度的位移提交。
這樣,這段代碼就能夠?qū)崿F(xiàn)每處理100條消息就提交一次位移,不用再受poll方法返回的消息總數(shù)的限制了。
重平衡
「(重平衡)Rebalance本質(zhì)上是一種協(xié)議,規(guī)定了一個(gè)Consumer Group下的所有Consumer如何達(dá)成一致,來分配訂閱Topic的每個(gè)分區(qū)」。
比如某個(gè)Group下有20個(gè)Consumer實(shí)例,它訂閱了一個(gè)具有100個(gè)分區(qū)的Topic。
正常情況下,Kafka平均會(huì)為每個(gè)Consumer分配5個(gè)分區(qū)。這個(gè)分配的過程就叫Rebalance。
「Rebalance的觸發(fā)條件有3個(gè)?!?/strong>
組成員數(shù)發(fā)生變更。比如有新的Consumer實(shí)例加入組或者離開組,或是有Consumer實(shí)例崩潰被踢出組。 訂閱主題數(shù)發(fā)生變更。Consumer Group可以使用正則表達(dá)式的方式訂閱主題,比如 consumer.subscribe(Pattern.compile(“t.*c”))就表明該Group訂閱所有以字母t開頭、字母c結(jié)尾的主題,在Consumer Group的運(yùn)行過程中,你新創(chuàng)建了一個(gè)滿足這樣條件的主題,那么該Group就會(huì)發(fā)生Rebalance。訂閱主題的分區(qū)數(shù)發(fā)生變更。Kafka當(dāng)前只能允許增加一個(gè)主題的分區(qū)數(shù),當(dāng)分區(qū)數(shù)增加時(shí),就會(huì)觸發(fā)訂閱該主題的所有Group開啟Rebalance。
Rebalance發(fā)生時(shí),Group下所有的Consumer實(shí)例都會(huì)協(xié)調(diào)在一起共同參與。
「分配策略」
當(dāng)前Kafka默認(rèn)提供了3種分配策略,每種策略都有一定的優(yōu)勢(shì)和劣勢(shì),社區(qū)會(huì)不斷地完善這些策略,保證提供最公平的分配策略,即每個(gè)Consumer實(shí)例都能夠得到較為平均的分區(qū)數(shù)。
比如一個(gè)Group內(nèi)有10個(gè)Consumer實(shí)例,要消費(fèi)100個(gè)分區(qū),理想的分配策略自然是每個(gè)實(shí)例平均得到10個(gè)分區(qū)。
這就叫公平的分配策略。
舉個(gè)簡(jiǎn)單的例子來說明一下Consumer Group發(fā)生Rebalance的過程。
假設(shè)目前某個(gè)Consumer Group下有兩個(gè)Consumer,比如A和B,當(dāng)?shù)谌齻€(gè)成員C加入時(shí),Kafka會(huì)觸發(fā)Rebalance,并根據(jù)默認(rèn)的分配策略重新為A、B和C分配分區(qū)
Rebalance之后的分配依然是公平的,即每個(gè)Consumer實(shí)例都獲得了2個(gè)分區(qū)的消費(fèi)權(quán)。
在Rebalance過程中,所有Consumer實(shí)例都會(huì)停止消費(fèi),等待Rebalance完成,這是Rebalance為人詬病的一個(gè)方面。
目前Rebalance的設(shè)計(jì)是所有Consumer實(shí)例共同參與,全部重新分配所有分區(qū)。
「Coordinator會(huì)在什么情況下認(rèn)為某個(gè)Consumer實(shí)例已掛從而要退組呢?」
當(dāng)Consumer Group完成Rebalance之后,每個(gè)Consumer實(shí)例都會(huì)定期地向Coordinator發(fā)送心跳請(qǐng)求,表明它還存活著。
如果某個(gè)Consumer實(shí)例不能及時(shí)地發(fā)送這些心跳請(qǐng)求,Coordinator就會(huì)認(rèn)為該Consumer已經(jīng)死了,從而將其從Group中移除,然后開啟新一輪Rebalance。
Consumer端有個(gè)參數(shù),叫session.timeout.ms。
該參數(shù)的默認(rèn)值是10秒,即如果Coordinator在10秒之內(nèi)沒有收到Group下某Consumer實(shí)例的心跳,它就會(huì)認(rèn)為這個(gè)Consumer實(shí)例已經(jīng)掛了。
除了這個(gè)參數(shù),Consumer還提供了一個(gè)允許你控制發(fā)送心跳請(qǐng)求頻率的參數(shù),就是heartbeat.interval.ms。
這個(gè)值設(shè)置得越小,Consumer實(shí)例發(fā)送心跳請(qǐng)求的頻率就越高。
頻繁地發(fā)送心跳請(qǐng)求會(huì)額外消耗帶寬資源,但好處是能夠更加快速地知曉當(dāng)前是否開啟Rebalance,因?yàn)?,目前Coordinator通知各個(gè)Consumer實(shí)例開啟Rebalance的方法,就是將REBALANCE_NEEDED標(biāo)志封裝進(jìn)心跳請(qǐng)求的響應(yīng)體中。
除了以上兩個(gè)參數(shù),Consumer端還有一個(gè)參數(shù),用于控制Consumer實(shí)際消費(fèi)能力對(duì)Rebalance的影響,即max.poll.interval.ms參數(shù)。
它限定了Consumer端應(yīng)用程序兩次調(diào)用poll方法的最大時(shí)間間隔。
它的默認(rèn)值是5分鐘,表示你的Consumer程序如果在5分鐘之內(nèi)無法消費(fèi)完poll方法返回的消息,那么Consumer會(huì)主動(dòng)發(fā)起離開組的請(qǐng)求,Coordinator也會(huì)開啟新一輪Rebalance。
「可避免Rebalance的配置」
第一類Rebalance是因?yàn)槲茨芗皶r(shí)發(fā)送心跳,導(dǎo)致Consumer被踢出Group而引發(fā)的
因此可以設(shè)置「session.timeout.ms和heartbeat.interval.ms」的值。
設(shè)置 session.timeout.ms= 6s。設(shè)置 heartbeat.interval.ms= 2s。要保證Consumer實(shí)例在被判定為dead之前,能夠發(fā)送至少3輪的心跳請(qǐng)求,即 session.timeout.ms >= 3 * heartbeat.interval.ms。
將session.timeout.ms設(shè)置成6s主要是為了讓Coordinator能夠更快地定位已經(jīng)掛掉的Consumer。
「第二類Rebalance是Consumer消費(fèi)時(shí)間過長(zhǎng)導(dǎo)致的」。
你要為你的業(yè)務(wù)處理邏輯留下充足的時(shí)間,這樣Consumer就不會(huì)因?yàn)樘幚磉@些消息的時(shí)間太長(zhǎng)而引發(fā)Rebalance了。
ConsumerOffsets
「Kafka將Consumer的位移數(shù)據(jù)作為一條條普通的Kafka消息,提交到__consumer_offsets中。」
「__consumer_offsets的主要作用是保存Kafka消費(fèi)者的位移信息。」
它要求這個(gè)提交過程不僅要實(shí)現(xiàn)高持久性,還要支持高頻的寫操作。
__consumer_offsets主題就是普通的Kafka主題。你可以手動(dòng)地創(chuàng)建它、修改它,甚至是刪除它。
雖說__consumer_offsets主題是一個(gè)普通的Kafka主題,但「它的消息格式卻是Kafka自己定義的」,用戶不能修改,也就是說你不能隨意地向這個(gè)主題寫消息,因?yàn)橐坏┠銓懭氲南⒉粷M足Kafka規(guī)定的格式,那么Kafka內(nèi)部無法成功解析,就會(huì)造成Broker的崩潰。
Kafka Consumer有API幫你提交位移,也就是向__consumer_offsets主題寫消息,千萬不要自己寫個(gè)Producer隨意向該主題發(fā)送消息。
__consumer_offsets有3種消息格式:
用于保存Consumer Group信息的消息。 用于刪除Group過期位移甚至是刪除Group的消息。 保存了位移值。
第2種格式它有個(gè)專屬的名字:tombstone消息,即墓碑消息,也稱delete mark,它的主要特點(diǎn)是它的消息體是null,即空消息體。
一旦某個(gè)Consumer Group下的所有Consumer實(shí)例都停止了,而且它們的位移數(shù)據(jù)都已被刪除時(shí),Kafka會(huì)向__consumer_offsets主題的對(duì)應(yīng)分區(qū)寫入tombstone消息,表明要徹底刪除這個(gè)Group的信息。
__consumer_offsets是怎么被創(chuàng)建的?
通常來說,「當(dāng)Kafka集群中的第一個(gè)Consumer程序啟動(dòng)時(shí),Kafka會(huì)自動(dòng)創(chuàng)建位移主題」。
「默認(rèn)該主題的分區(qū)數(shù)是50,副本數(shù)是3」。
目前Kafka Consumer提交位移的方式有兩種:「自動(dòng)提交位移和手動(dòng)提交位移?!?/strong>
Consumer端有個(gè)參數(shù)叫enable.auto.commit,如果值是true,則Consumer在后臺(tái)默默地為你定期提交位移,提交間隔由一個(gè)專屬的參數(shù)auto.commit.interval.ms來控制。
自動(dòng)提交位移有一個(gè)顯著的優(yōu)點(diǎn),就是省事,你不用操心位移提交的事情,就能保證消息消費(fèi)不會(huì)丟失。
但這一點(diǎn)同時(shí)也是缺點(diǎn),喪失了很大的靈活性和可控性,你完全沒法把控Consumer端的位移管理。
Kafka Consumer API為你提供了位移提交的方法,如consumer.commitSync等。
當(dāng)調(diào)用這些方法時(shí),Kafka會(huì)向__consumer_offsets主題寫入相應(yīng)的消息。
如果你選擇的是自動(dòng)提交位移,那么就可能存在一個(gè)問題:只要Consumer一直啟動(dòng)著,它就會(huì)無限期地向位移主題寫入消息。
「舉個(gè)極端一點(diǎn)的例子?!?/strong>
假設(shè)Consumer當(dāng)前消費(fèi)到了某個(gè)主題的最新一條消息,位移是100,之后該主題沒有任何新消息產(chǎn)生,故Consumer無消息可消費(fèi)了,所以位移永遠(yuǎn)保持在100。
由于是自動(dòng)提交位移,位移主題中會(huì)不停地寫入位移=100的消息。
顯然Kafka只需要保留這類消息中的最新一條就可以了,之前的消息都是可以刪除的。
這就要求Kafka必須要有針對(duì)位移主題消息特點(diǎn)的消息刪除策略,否則這種消息會(huì)越來越多,最終撐爆整個(gè)磁盤。
「Compact策略」
Kafka使用「Compact策略」來刪除__consumer_offsets主題中的過期消息,避免該主題無限期膨脹。
比如對(duì)于同一個(gè)Key的兩條消息M1和M2,如果M1的發(fā)送時(shí)間早于M2,那么M1就是過期消息。
Compact的過程就是掃描日志的所有消息,剔除那些過期的消息,然后把剩下的消息整理在一起。
我在這里貼一張來自官網(wǎng)的圖片,來說明Compact過程。

圖中位移為0、2和3的消息的Key都是K1,Compact之后,分區(qū)只需要保存位移為3的消息,因?yàn)樗亲钚掳l(fā)送的。
「Kafka提供了專門的后臺(tái)線程定期地巡檢待Compact的主題,看看是否存在滿足條件的可刪除數(shù)據(jù)」。
這個(gè)后臺(tái)線程叫Log Cleaner。
很多實(shí)際生產(chǎn)環(huán)境中都出現(xiàn)過位移主題無限膨脹占用過多磁盤空間的問題,如果你的環(huán)境中也有這個(gè)問題,建議你去檢查一下Log Cleaner線程的狀態(tài),通常都是這個(gè)線程掛掉了導(dǎo)致的。
副本機(jī)制
根據(jù)Kafka副本機(jī)制的定義,同一個(gè)分區(qū)下的所有副本保存有相同的消息序列,這些副本分散保存在不同的Broker上,從而能夠?qū)共糠諦roker宕機(jī)帶來的數(shù)據(jù)不可用。
下面展示的是一個(gè)有3臺(tái)Broker的Kafka集群上的副本分布情況。
從這張圖中,我們可以看到,主題1分區(qū)0的3個(gè)副本分散在3臺(tái)Broker上,其他主題分區(qū)的副本也都散落在不同的Broker上,從而實(shí)現(xiàn)數(shù)據(jù)冗余。

「副本角色」

在Kafka中,副本分成兩類:領(lǐng)導(dǎo)者副本(Leader Replica)和追隨者副本(Follower Replica)。
每個(gè)分區(qū)在創(chuàng)建時(shí)都要選舉一個(gè)副本,稱為領(lǐng)導(dǎo)者副本,其余的副本自動(dòng)稱為追隨者副本。
在Kafka中,追隨者副本是不對(duì)外提供服務(wù)的。這就是說,任何一個(gè)追隨者副本都不能響應(yīng)消費(fèi)者和生產(chǎn)者的讀寫請(qǐng)求。所有的請(qǐng)求都必須由領(lǐng)導(dǎo)者副本來處理,或者說,所有的讀寫請(qǐng)求都必須發(fā)往領(lǐng)導(dǎo)者副本所在的Broker,由該Broker負(fù)責(zé)處理。
追隨者副本不處理客戶端請(qǐng)求,它唯一的任務(wù)就是從領(lǐng)導(dǎo)者副本「異步拉取」消息,并寫入到自己的提交日志中,從而實(shí)現(xiàn)與領(lǐng)導(dǎo)者副本的同步。
當(dāng)領(lǐng)導(dǎo)者副本掛掉了,或者說領(lǐng)導(dǎo)者副本所在的Broker宕機(jī)時(shí),Kafka依托于ZooKeeper提供的監(jiān)控功能能夠?qū)崟r(shí)感知到,并立即開啟新一輪的領(lǐng)導(dǎo)者選舉,從追隨者副本中選一個(gè)作為新的領(lǐng)導(dǎo)者。老Leader副本重啟回來后,只能作為追隨者副本加入到集群中。
對(duì)于客戶端用戶而言,Kafka的追隨者副本沒有任何作用,Kafka為什么要這樣設(shè)計(jì)呢?
這種副本機(jī)制有兩個(gè)方面的好處。
1.「方便實(shí)現(xiàn)Read-your-writes」。
所謂Read-your-writes,顧名思義就是,當(dāng)你使用生產(chǎn)者API向Kafka成功寫入消息后,馬上使用消費(fèi)者API去讀取剛才生產(chǎn)的消息。
2.「方便實(shí)現(xiàn)單調(diào)讀(Monotonic Reads)」。
假設(shè)當(dāng)前有2個(gè)追隨者副本F1和F2,它們異步地拉取領(lǐng)導(dǎo)者副本數(shù)據(jù)。倘若F1拉取了Leader的最新消息而F2還未及時(shí)拉取,那么,此時(shí)如果有一個(gè)消費(fèi)者先從F1讀取消息之后又從F2拉取消息,它可能會(huì)看到這樣的現(xiàn)象:第一次消費(fèi)時(shí)看到的最新消息在第二次消費(fèi)時(shí)不見了,這就不是單調(diào)讀一致性。
但是,如果所有的讀請(qǐng)求都是由Leader來處理,那么Kafka就很容易實(shí)現(xiàn)單調(diào)讀一致性。
ISR機(jī)制
In-sync Replicas,也就是所謂的ISR副本集合。
ISR中的副本都是與Leader同步的副本,相反,不在ISR中的追隨者副本就被認(rèn)為是與Leader不同步的。
?什么副本能夠進(jìn)入到ISR中呢?
?
Leader副本天然就在ISR中。也就是說,「ISR不只是追隨者副本集合,它必然包括Leader副本。甚至在某些情況下,ISR只有Leader這一個(gè)副本」。
另外,能夠進(jìn)入到ISR的追隨者副本要滿足一定的條件。
「通過Broker端參數(shù)replica.lag.time.max.ms參數(shù)值」。
這個(gè)參數(shù)的含義是Follower副本能夠落后Leader副本的最長(zhǎng)時(shí)間間隔,當(dāng)前默認(rèn)值是10秒。
這就是說,只要一個(gè)Follower副本落后Leader副本的時(shí)間不連續(xù)超過10秒,那么Kafka就認(rèn)為該Follower副本與Leader是同步的,即使此時(shí)Follower副本中保存的消息明顯少于Leader副本中的消息。
Follower副本唯一的工作就是不斷地從Leader副本拉取消息,然后寫入到自己的提交日志中。
倘若該副本后面慢慢地追上了Leader的進(jìn)度,那么它是能夠重新被加回ISR的。
ISR是一個(gè)動(dòng)態(tài)調(diào)整的集合,而非靜態(tài)不變的。
Unclean領(lǐng)導(dǎo)者選舉
「Kafka把所有不在ISR中的存活副本都稱為非同步副本」。
通常來說,非同步副本落后Leader太多,因此,如果選擇這些副本作為新Leader,就可能出現(xiàn)數(shù)據(jù)的丟失。
畢竟,這些副本中保存的消息遠(yuǎn)遠(yuǎn)落后于老Leader中的消息。
在Kafka中,選舉這種副本的過程稱為Unclean領(lǐng)導(dǎo)者選舉。
「Broker端參數(shù)unclean.leader.election.enable控制是否允許Unclean領(lǐng)導(dǎo)者選舉」。
開啟Unclean領(lǐng)導(dǎo)者選舉可能會(huì)造成數(shù)據(jù)丟失,但好處是,它使得分區(qū)Leader副本一直存在,不至于停止對(duì)外提供服務(wù),因此提升了高可用性。反之,禁止Unclean領(lǐng)導(dǎo)者選舉的好處在于維護(hù)了數(shù)據(jù)的一致性,避免了消息丟失,但犧牲了高可用性。
副本選舉
對(duì)于kafka集群中對(duì)于任意的topic的分區(qū)以及副本leader的設(shè)定,都需要考慮到集群整體的負(fù)載能力的平衡性,會(huì)盡量分配每一個(gè)partition的副本leader在不同的broker中,這樣會(huì)避免多個(gè)leader在同一個(gè)broker,導(dǎo)致集群中的broker負(fù)載不平衡
kafka引入了優(yōu)先副本的概念,優(yōu)先副本的意思在AR(分區(qū)中的所有副本)集合列表中的第一個(gè)副本,在理想狀態(tài)下該副本就是該分區(qū)的leader副本
例如kafka集群由3臺(tái)broker組成,創(chuàng)建了一個(gè)名為topic-partitions的topic,設(shè)置partition為3,副本數(shù)為3,partition0中AR列表為 [1,2,0],那么分區(qū)0的優(yōu)先副本為1
kafka使用多副本機(jī)制提高可靠性,但是只有l(wèi)eader副本對(duì)外提供讀寫服務(wù),follow副本只是做消息同步。
「如果一個(gè)分區(qū)的leader副本不可用,就意味著整個(gè)分區(qū)不可用,此時(shí)需要從follower副本中選舉出新的leader副本提供服務(wù)」。
「在創(chuàng)建主題的時(shí)候,該分區(qū)的主題和副本會(huì)盡可能的均勻發(fā)布到kafka的各個(gè)broker上」。
比如我們?cè)诎?個(gè)broker節(jié)點(diǎn)的kafka集群上創(chuàng)建一個(gè)分區(qū)數(shù)為3,副本因子為3的主題topic-partitions時(shí),leader副本會(huì)均勻的分布在3臺(tái)broker節(jié)點(diǎn)上。

「針對(duì)同一個(gè)分區(qū),在同一個(gè)broker節(jié)點(diǎn)上不可能出現(xiàn)它的多個(gè)副本」。
我們可以把leader副本所在的節(jié)點(diǎn)叫作分區(qū)的leader節(jié)點(diǎn),把follower副本所在的節(jié)點(diǎn)叫作follower節(jié)點(diǎn)。
在上面的例子中,分區(qū)0的leader節(jié)點(diǎn)是broker1,分區(qū)1的leader節(jié)點(diǎn)是broker2,分區(qū)2的leader節(jié)點(diǎn)是broker0。
當(dāng)分區(qū)leader節(jié)點(diǎn)發(fā)生故障時(shí),其中的一個(gè)follower節(jié)點(diǎn)就會(huì)選舉為新的leader節(jié)點(diǎn)。
當(dāng)原來leader的節(jié)點(diǎn)恢復(fù)之后,它只能成為一個(gè)follower節(jié)點(diǎn),此時(shí)就導(dǎo)致了集群負(fù)載不均衡。
比如分區(qū)1的leader節(jié)點(diǎn)broker2崩潰了,此時(shí)選舉了在broker1上的分區(qū)1follower節(jié)點(diǎn)作為新的leader節(jié)點(diǎn)。
當(dāng)broker2重新恢復(fù)時(shí),此時(shí)的kafka集群狀態(tài)如下:

可以看到,此時(shí)broker1上負(fù)載更大,而broker2上沒有負(fù)載。
「為了解決上述負(fù)載不均衡的情況,kafka支持了優(yōu)先副本選舉,優(yōu)先副本指的是一個(gè)分區(qū)所在的AR集合的第一個(gè)副本」。
比如上面的分區(qū)1,它的AR集合是[2,0,1],表示分區(qū)1的優(yōu)先副本就是在broker2上。
理想情況下,優(yōu)先副本應(yīng)該就是leader副本,kafka保證了優(yōu)先副本的均衡分布,而這與broker節(jié)點(diǎn)宕機(jī)與否沒有關(guān)系。
「優(yōu)先副本選舉就是對(duì)分區(qū)leader副本進(jìn)行選舉的時(shí)候,盡可能讓優(yōu)先副本成為leader副本」,針對(duì)上述的情況,只要再觸發(fā)一次優(yōu)先副本選舉就能保證分區(qū)負(fù)載均衡。
kafka支持自動(dòng)優(yōu)先副本選舉功能,默認(rèn)每5分鐘觸發(fā)一次優(yōu)先副本選舉操作。
網(wǎng)絡(luò)通信模型

Broker 中有個(gè)Acceptor(mainReactor)監(jiān)聽新連接的到來,與新連接建連之后輪詢選擇一個(gè)Processor(subReactor)管理這個(gè)連接。
而Processor會(huì)監(jiān)聽其管理的連接,當(dāng)事件到達(dá)之后,讀取封裝成Request,并將Request放入共享請(qǐng)求隊(duì)列中。
然后IO線程池不斷的從該隊(duì)列中取出請(qǐng)求,執(zhí)行真正的處理。處理完之后將響應(yīng)發(fā)送到對(duì)應(yīng)的Processor的響應(yīng)隊(duì)列中,然后由Processor將Response返還給客戶端。
每個(gè)listener只有一個(gè)Acceptor線程,因?yàn)樗皇亲鳛樾逻B接建連再分發(fā),沒有過多的邏輯,很輕量。
Processor 在Kafka中稱之為網(wǎng)絡(luò)線程,默認(rèn)網(wǎng)絡(luò)線程池有3個(gè)線程,對(duì)應(yīng)的參數(shù)是num.network.threads,并且可以根據(jù)實(shí)際的業(yè)務(wù)動(dòng)態(tài)增減。
還有個(gè) IO 線程池,即KafkaRequestHandlerPool,執(zhí)行真正的處理,對(duì)應(yīng)的參數(shù)是num.io.threads,默認(rèn)值是 8。
IO線程處理完之后會(huì)將Response放入對(duì)應(yīng)的Processor中,由Processor將響應(yīng)返還給客戶端。
可以看到網(wǎng)絡(luò)線程和IO線程之間利用的經(jīng)典的生產(chǎn)者 - 消費(fèi)者模式,不論是用于處理Request的共享請(qǐng)求隊(duì)列,還是IO處理完返回的Response。
冪等性
「冪等性Producer」
在Kafka中,Producer默認(rèn)不是冪等性的,但我們可以創(chuàng)建冪等性Producer。
它其實(shí)是0.11.0.0版本引入的新功能,在此之前,Kafka向分區(qū)發(fā)送數(shù)據(jù)時(shí),可能會(huì)出現(xiàn)同一條消息被發(fā)送了多次,導(dǎo)致消息重復(fù)的情況。
在0.11之后,指定Producer冪等性的方法很簡(jiǎn)單,僅需要設(shè)置一個(gè)參數(shù)即可,即
props.put(“enable.idempotence”, ture),
或props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)。
enable.idempotence被設(shè)置成true后,Producer自動(dòng)升級(jí)成冪等性Producer,其他所有的代碼邏輯都不需要改變。
Kafka自動(dòng)幫你做消息的重復(fù)去重。
底層具體的原理很簡(jiǎn)單,就是經(jīng)典的用空間去換時(shí)間的優(yōu)化思路,即在Broker端多保存一些字段。
當(dāng)Producer發(fā)送了具有相同字段值的消息后,Broker能夠自動(dòng)知曉這些消息已經(jīng)重復(fù)了,于是可以在后臺(tái)默默地把它們丟棄掉。
「冪等性Producer的作用范圍」
首先,它只能保證單分區(qū)上的冪等性,即一個(gè)冪等性Producer能夠保證某個(gè)主題的一個(gè)分區(qū)上不出現(xiàn)重復(fù)消息,它無法實(shí)現(xiàn)多個(gè)分區(qū)的冪等性。
其次,它只能實(shí)現(xiàn)單會(huì)話上的冪等性,不能實(shí)現(xiàn)跨會(huì)話的冪等性。
這里的會(huì)話,你可以理解為Producer進(jìn)程的一次運(yùn)行,當(dāng)你重啟了Producer進(jìn)程之后,這種冪等性保證就喪失了。
事務(wù)
Kafka自0.11版本開始也提供了對(duì)事務(wù)的支持,目前主要是在read committed隔離級(jí)別上做事情。
它能保證多條消息原子性地寫入到目標(biāo)分區(qū),同時(shí)也能保證Consumer只能看到事務(wù)成功提交的消息。
「事務(wù)型Producer」
事務(wù)型Producer能夠保證將消息原子性地寫入到多個(gè)分區(qū)中。
這批消息要么全部寫入成功,要么全部失敗,另外,事務(wù)型Producer也不懼進(jìn)程的重啟。
Producer重啟回來后,Kafka依然保證它們發(fā)送消息的精確一次處理。
設(shè)置事務(wù)型Producer的方法也很簡(jiǎn)單,滿足兩個(gè)要求即可:
和冪等性Producer一樣,開啟 enable.idempotence = true。設(shè)置Producer端參數(shù) transactional. id,最好為其設(shè)置一個(gè)有意義的名字。
此外,你還需要在Producer代碼中做一些調(diào)整,如這段代碼所示:
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(record1);
producer.send(record2);
producer.commitTransaction();
} catch (KafkaException e) {
producer.abortTransaction();
}
和普通Producer代碼相比,事務(wù)型Producer的顯著特點(diǎn)是調(diào)用了一些事務(wù)API,如initTransaction、beginTransaction、commitTransaction和abortTransaction,它們分別對(duì)應(yīng)事務(wù)的初始化、事務(wù)開始、事務(wù)提交以及事務(wù)終止。
這段代碼能夠保證Record1和Record2被當(dāng)作一個(gè)事務(wù)統(tǒng)一提交到Kafka,要么它們?nèi)刻峤怀晒?,要么全部寫入失敗?/p>
實(shí)際上即使寫入失敗,Kafka也會(huì)把它們寫入到底層的日志中,也就是說Consumer還是會(huì)看到這些消息。
有一個(gè)isolation.level參數(shù),這個(gè)參數(shù)有兩個(gè)取值:
read_uncommitted:這是默認(rèn)值,表明Consumer能夠讀取到Kafka寫入的任何消息,不論事務(wù)型Producer提交事務(wù)還是終止事務(wù),其寫入的消息都可以讀取,如果你用了事務(wù)型Producer,那么對(duì)應(yīng)的Consumer就不要使用這個(gè)值。read_committed:表明Consumer只會(huì)讀取事務(wù)型Producer成功提交事務(wù)寫入的消息,它也能看到非事務(wù)型Producer寫入的所有消息。
攔截器
「Kafka攔截器分為生產(chǎn)者攔截器和消費(fèi)者攔截器」。
生產(chǎn)者攔截器允許你在發(fā)送消息前以及消息提交成功后植入你的攔截器邏輯;
而消費(fèi)者攔截器支持在消費(fèi)消息前以及提交位移后編寫特定邏輯。
可以將一組攔截器串連成一個(gè)大的攔截器,Kafka會(huì)按照添加順序依次執(zhí)行攔截器邏輯。
當(dāng)前Kafka攔截器的設(shè)置方法是通過參數(shù)配置完成的,生產(chǎn)者和消費(fèi)者兩端有一個(gè)相同的參數(shù)interceptor.classes,它指定的是一組類的列表,每個(gè)類就是特定邏輯的攔截器實(shí)現(xiàn)類。
Properties props = new Properties();
List interceptors = new ArrayList<>();
interceptors.add("com.yourcompany.kafkaproject.interceptors.AddTimestampInterceptor"); // 攔截器1
interceptors.add("com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor"); // 攔截器2
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
……
?怎么編寫AddTimeStampInterceptor和UpdateCounterInterceptor類呢?
?
這兩個(gè)類以及你自己編寫的所有Producer端攔截器實(shí)現(xiàn)類都要繼承org.apache.kafka.clients.producer.ProducerInterceptor接口。
該接口是Kafka提供的,里面有兩個(gè)核心的方法。
onSend:該方法會(huì)在消息發(fā)送之前被調(diào)用。 onAcknowledgement:該方法會(huì)在消息成功提交或發(fā)送失敗之后被調(diào)用。onAcknowledgement的調(diào)用要早于callback的調(diào)用。值得注意的是,這個(gè)方法和onSend不是在同一個(gè)線程中被調(diào)用的,因此如果你在這兩個(gè)方法中調(diào)用了某個(gè)共享可變對(duì)象,一定要保證線程安全。
同理,指定消費(fèi)者攔截器也是同樣的方法,只是具體的實(shí)現(xiàn)類要實(shí)現(xiàn)org.apache.kafka.clients.consumer.ConsumerInterceptor接口,這里面也有兩個(gè)核心方法。
onConsume:該方法在消息返回給Consumer程序之前調(diào)用。 onCommit:Consumer在提交位移之后調(diào)用該方法。通常你可以在該方法中做一些記賬類的動(dòng)作,比如打日志等。
一定要注意的是,「指定攔截器類時(shí)要指定它們的全限定名」。
通俗點(diǎn)說就是要把完整包名也加上,不要只有一個(gè)類名在那里,并且還要保證你的Producer程序能夠正確加載你的攔截器類。
控制器
「控制器組件(Controller),它的主要作用是在Apache ZooKeeper的幫助下管理和協(xié)調(diào)整個(gè)Kafka集群」。
集群中任意一臺(tái)Broker都能充當(dāng)控制器的角色,但是,在運(yùn)行過程中,只能有一個(gè)Broker成為控制器,行使其管理和協(xié)調(diào)的職責(zé)。
Kafka控制器大量使用ZooKeeper的Watch功能實(shí)現(xiàn)對(duì)集群的協(xié)調(diào)管理。
「控制器是如何被選出來的」
實(shí)際上,Broker在啟動(dòng)時(shí),會(huì)嘗試去ZooKeeper中創(chuàng)建/controller節(jié)點(diǎn)。
Kafka當(dāng)前選舉控制器的規(guī)則是:「第一個(gè)成功創(chuàng)建/controller節(jié)點(diǎn)的Broker會(huì)被指定為控制器」。
「控制器是做什么的」
控制器的職責(zé)大致可以分為5種:
1.「主題管理(創(chuàng)建、刪除、增加分區(qū))」
控制器幫助我們完成對(duì)Kafka主題的創(chuàng)建、刪除以及分區(qū)增加的操作。
2.「分區(qū)重分配」
3.「Preferred領(lǐng)導(dǎo)者選舉」
Preferred領(lǐng)導(dǎo)者選舉主要是Kafka為了避免部分Broker負(fù)載過重而提供的一種換Leader的方案。
4.「集群成員管理(新增Broker、Broker主動(dòng)關(guān)閉、Broker宕機(jī))」
包括自動(dòng)檢測(cè)新增Broker、Broker主動(dòng)關(guān)閉及被動(dòng)宕機(jī)。
這種自動(dòng)檢測(cè)是依賴于Watch功能和ZooKeeper臨時(shí)節(jié)點(diǎn)組合實(shí)現(xiàn)的。
比如,控制器組件會(huì)利用「Watch機(jī)制」檢查ZooKeeper的/brokers/ids節(jié)點(diǎn)下的子節(jié)點(diǎn)數(shù)量變更。
目前,當(dāng)有新Broker啟動(dòng)后,它會(huì)在/brokers下創(chuàng)建專屬的znode節(jié)點(diǎn)。
一旦創(chuàng)建完畢,ZooKeeper會(huì)通過Watch機(jī)制將消息通知推送給控制器,這樣,控制器就能自動(dòng)地感知到這個(gè)變化,進(jìn)而開啟后續(xù)的新增Broker作業(yè)。
偵測(cè)Broker存活性則是依賴于剛剛提到的另一個(gè)機(jī)制:「臨時(shí)節(jié)點(diǎn)」。
每個(gè)Broker啟動(dòng)后,會(huì)在/brokers/ids下創(chuàng)建一個(gè)臨時(shí)znode。
當(dāng)Broker宕機(jī)或主動(dòng)關(guān)閉后,該Broker與ZooKeeper的會(huì)話結(jié)束,這個(gè)znode會(huì)被自動(dòng)刪除。
同理,ZooKeeper的Watch機(jī)制將這一變更推送給控制器,這樣控制器就能知道有Broker關(guān)閉或宕機(jī)了,從而進(jìn)行善后。
5.「數(shù)據(jù)服務(wù)」
控制器上保存了最全的集群元數(shù)據(jù)信息,其他所有Broker會(huì)定期接收控制器發(fā)來的元數(shù)據(jù)更新請(qǐng)求,從而更新其內(nèi)存中的緩存數(shù)據(jù)。
「控制器故障轉(zhuǎn)移(Failover)」
「故障轉(zhuǎn)移指的是,當(dāng)運(yùn)行中的控制器突然宕機(jī)或意外終止時(shí),Kafka能夠快速地感知到,并立即啟用備用控制器來代替之前失敗的控制器」。這個(gè)過程就被稱為Failover,該過程是自動(dòng)完成的,無需你手動(dòng)干預(yù)。

最開始時(shí),Broker 0是控制器。當(dāng)Broker 0宕機(jī)后,ZooKeeper通過Watch機(jī)制感知到并刪除了/controller臨時(shí)節(jié)點(diǎn)。
之后,所有存活的Broker開始競(jìng)選新的控制器身份。
Broker 3最終贏得了選舉,成功地在ZooKeeper上重建了/controller節(jié)點(diǎn)。
之后,Broker 3會(huì)從ZooKeeper中讀取集群元數(shù)據(jù)信息,并初始化到自己的緩存中。
至此,控制器的Failover完成,可以行使正常的工作職責(zé)了。
日志存儲(chǔ)
Kafka中的消息是以主題為基本單位進(jìn)行歸類的,每個(gè)主題在邏輯上相互獨(dú)立。
每個(gè)主題又可以分為一個(gè)或多個(gè)分區(qū),在不考慮副本的情況下,一個(gè)分區(qū)會(huì)對(duì)應(yīng)一個(gè)日志。
但設(shè)計(jì)者考慮到隨著時(shí)間推移,日志文件會(huì)不斷擴(kuò)大,因此為了防止Log過大,設(shè)計(jì)者引入了日志分段(LogSegment)的概念,將Log切分為多個(gè)LogSegment,便于后續(xù)的消息維護(hù)和清理工作。
下圖描繪了主題、分區(qū)、副本、Log、LogSegment五者之間的關(guān)系。

「LogSegment」
在Kafka中,每個(gè)Log對(duì)象又可以劃分為多個(gè)LogSegment文件,每個(gè)LogSegment文件包括一個(gè)日志數(shù)據(jù)文件和兩個(gè)索引文件(偏移量索引文件和消息時(shí)間戳索引文件)。
其中,每個(gè)LogSegment中的日志數(shù)據(jù)文件大小均相等(該日志數(shù)據(jù)文件的大小可以通過在Kafka Broker的config/server.properties配置文件的中的「log.segment.bytes」進(jìn)行設(shè)置,默認(rèn)為1G大?。?073741824字節(jié)),在順序?qū)懭胂r(shí)如果超出該設(shè)定的閾值,將會(huì)創(chuàng)建一組新的日志數(shù)據(jù)和索引文件)。

常用參數(shù)
「broker端配置」
broker.id
每個(gè) kafka broker 都有一個(gè)唯一的標(biāo)識(shí)來表示,這個(gè)唯一的標(biāo)識(shí)符即是 broker.id,它的默認(rèn)值是 0。
這個(gè)值在 kafka 集群中必須是唯一的,這個(gè)值可以任意設(shè)定,
port
如果使用配置樣本來啟動(dòng) kafka,它會(huì)監(jiān)聽 9092 端口,修改 port 配置參數(shù)可以把它設(shè)置成任意的端口。
要注意,如果使用 1024 以下的端口,需要使用 root 權(quán)限啟動(dòng) kakfa。
zookeeper.connect
用于保存 broker 元數(shù)據(jù)的 Zookeeper 地址是通過 zookeeper.connect 來指定的。
比如可以這么指定 localhost:2181 表示這個(gè) Zookeeper 是運(yùn)行在本地 2181 端口上的。
我們也可以通過 比如我們可以通過 zk1:2181,zk2:2181,zk3:2181 來指定 zookeeper.connect 的多個(gè)參數(shù)值。
該配置參數(shù)是用冒號(hào)分割的一組 hostname:port/path 列表,其含義如下
hostname 是 Zookeeper 服務(wù)器的機(jī)器名或者 ip 地址。
port 是 Zookeeper 客戶端的端口號(hào)
/path 是可選擇的 Zookeeper 路徑,Kafka 路徑是使用了
chroot環(huán)境,如果不指定默認(rèn)使用跟路徑。
?如果你有兩套 Kafka 集群,假設(shè)分別叫它們 kafka1 和 kafka2,那么兩套集群的
?zookeeper.connect參數(shù)可以這樣指定:zk1:2181,zk2:2181,zk3:2181/kafka1和zk1:2181,zk2:2181,zk3:2181/kafka2
log.dirs
Kafka 把所有的消息都保存到磁盤上,存放這些日志片段的目錄是通過 log.dirs 來制定的,它是用一組逗號(hào)來分割的本地系統(tǒng)路徑,log.dirs 是沒有默認(rèn)值的,「你必須手動(dòng)指定他的默認(rèn)值」。
其實(shí)還有一個(gè)參數(shù)是 log.dir,這個(gè)配置是沒有 s 的,默認(rèn)情況下只用配置 log.dirs 就好了,比如你可以通過 /home/kafka1,/home/kafka2,/home/kafka3 這樣來配置這個(gè)參數(shù)的值。
auto.create.topics.enable
默認(rèn)情況下,kafka 會(huì)自動(dòng)創(chuàng)建主題
auto.create.topics.enable參數(shù)建議最好設(shè)置成 false,即不允許自動(dòng)創(chuàng)建 Topic。
「主題相關(guān)配置」
num.partitions
num.partitions 參數(shù)指定了新創(chuàng)建的主題需要包含多少個(gè)分區(qū),該參數(shù)的默認(rèn)值是 1。
default.replication.factor
這個(gè)參數(shù)比較簡(jiǎn)單,它表示 kafka保存消息的副本數(shù)。
log.retention.ms
Kafka 通常根據(jù)時(shí)間來決定數(shù)據(jù)可以保留多久。
默認(rèn)使用log.retention.hours參數(shù)來配置時(shí)間,默認(rèn)是 168 個(gè)小時(shí),也就是一周。
除此之外,還有兩個(gè)參數(shù)log.retention.minutes 和log.retentiion.ms 。
這三個(gè)參數(shù)作用是一樣的,都是決定消息多久以后被刪除,推薦使用log.retention.ms。
message.max.bytes
broker 通過設(shè)置 message.max.bytes 參數(shù)來限制單個(gè)消息的大小,默認(rèn)是 1000 000, 也就是 1MB,如果生產(chǎn)者嘗試發(fā)送的消息超過這個(gè)大小,不僅消息不會(huì)被接收,還會(huì)收到 broker 返回的錯(cuò)誤消息。
retention.ms
規(guī)定了該主題消息被保存的時(shí)常,默認(rèn)是7天,即該主題只能保存7天的消息,一旦設(shè)置了這個(gè)值,它會(huì)覆蓋掉 Broker 端的全局參數(shù)值。
消息丟失問題
「生產(chǎn)者程序丟失數(shù)據(jù)」
目前Kafka Producer是異步發(fā)送消息的,也就是說如果你調(diào)用的是producer.send(msg)這個(gè)API,那么它通常會(huì)立即返回,但此時(shí)你不能認(rèn)為消息發(fā)送已成功完成。
如果用這個(gè)方式,可能會(huì)有哪些因素導(dǎo)致消息沒有發(fā)送成功呢?
其實(shí)原因有很多,例如網(wǎng)絡(luò)抖動(dòng),導(dǎo)致消息壓根就沒有發(fā)送到Broker端;或者消息本身不合格導(dǎo)致Broker拒絕接收(比如消息太大了,超過了Broker的承受能力)等。
實(shí)際上,解決此問題的方法非常簡(jiǎn)單:Producer永遠(yuǎn)要使用帶有回調(diào)通知的發(fā)送API,也就是說不要使用producer.send(msg),而要使用producer.send(msg, callback)。
它能準(zhǔn)確地告訴你消息是否真的提交成功了。
一旦出現(xiàn)消息提交失敗的情況,你就可以有針對(duì)性地進(jìn)行處理。
「消費(fèi)者程序丟失數(shù)據(jù)」
Consumer端丟失數(shù)據(jù)主要體現(xiàn)在Consumer端要消費(fèi)的消息不見了。
下面這張圖它清晰地展示了Consumer端的位移數(shù)據(jù)。

比如對(duì)于Consumer A而言,它當(dāng)前的位移值就是9;Consumer B的位移值是11。
Consumer程序從Kafka獲取到消息后開啟了多個(gè)線程異步處理消息,而Consumer程序自動(dòng)地向前更新位移。
假如其中某個(gè)線程運(yùn)行失敗了,它負(fù)責(zé)的消息沒有被成功處理,但位移已經(jīng)被更新了,因此這條消息對(duì)于Consumer而言實(shí)際上是丟失了。
這里的關(guān)鍵在于Consumer自動(dòng)提交位移。
這個(gè)問題的解決方案也很簡(jiǎn)單:
「如果是多線程異步處理消費(fèi)消息,Consumer程序不要開啟自動(dòng)提交位移,而是要應(yīng)用程序手動(dòng)提交位移」。
最佳實(shí)踐
總結(jié)Kafka無消息丟失的配置:
不要使用 producer.send(msg),而要使用producer.send(msg, callback),一定要使用帶有回調(diào)通知的send方法。設(shè)置 acks = all,acks是Producer的一個(gè)參數(shù),代表了你對(duì)已提交消息的定義,如果設(shè)置成all,則表明所有副本Broker都要接收到消息,該消息才算是已提交。設(shè)置retries為一個(gè)較大的值。這里的retries同樣是Producer的參數(shù),對(duì)應(yīng)前面提到的Producer自動(dòng)重試,當(dāng)出現(xiàn)網(wǎng)絡(luò)的瞬時(shí)抖動(dòng)時(shí),消息發(fā)送可能會(huì)失敗,此時(shí)配置了 retries > 0的Producer能夠自動(dòng)重試消息發(fā)送,避免消息丟失。設(shè)置 unclean.leader.election.enable = false,這是Broker端的參數(shù),它控制的是哪些Broker有資格競(jìng)選分區(qū)的Leader,如果一個(gè)Broker落后原先的Leader太多,那么它一旦成為新的Leader,必然會(huì)造成消息的丟失,故一般都要將該參數(shù)設(shè)置成false,即不允許這種情況的發(fā)生。設(shè)置 replication.factor >= 3,這也是Broker端的參數(shù),將消息多保存幾份,目前防止消息丟失的主要機(jī)制就是冗余。設(shè)置 min.insync.replicas > 1,這依然是Broker端參數(shù),控制的是消息至少要被寫入到多少個(gè)副本才算是已提交,設(shè)置成大于1可以提升消息持久性,在實(shí)際環(huán)境中千萬不要使用默認(rèn)值1。確保 replication.factor > min.insync.replicas,如果兩者相等,那么只要有一個(gè)副本掛機(jī),整個(gè)分區(qū)就無法正常工作了,我們不僅要改善消息的持久性,防止數(shù)據(jù)丟失,還要在不降低可用性的基礎(chǔ)上完成,推薦設(shè)置成replication.factor = min.insync.replicas + 1。確保消息消費(fèi)完成再提交,Consumer端有個(gè)參數(shù) enable.auto.commit,最好把它設(shè)置成false,并采用手動(dòng)提交位移的方式。
重復(fù)消費(fèi)問題
「消費(fèi)重復(fù)的場(chǎng)景」
在enable.auto.commit 默認(rèn)值true情況下,出現(xiàn)重復(fù)消費(fèi)的場(chǎng)景有以下幾種:
?consumer 在消費(fèi)過程中,應(yīng)用進(jìn)程被強(qiáng)制kill掉或發(fā)生異常退出。
?
例如在一次poll 500條消息后,消費(fèi)到200條時(shí),進(jìn)程被強(qiáng)制kill消費(fèi)到offset未提交,或出現(xiàn)異常退出導(dǎo)致消費(fèi)到offset未提交。
下次重啟時(shí),依然會(huì)重新拉取500消息,造成之前消費(fèi)到200條消息重復(fù)消費(fèi)了兩次。
解決方案:在發(fā)生異常時(shí)正確處理未提交的offset
「消費(fèi)者消費(fèi)時(shí)間過長(zhǎng)」
max.poll.interval.ms參數(shù)定義了兩次poll的最大間隔,它的默認(rèn)值是 5 分鐘,表示你的 Consumer 程序如果在 5 分鐘之內(nèi)無法消費(fèi)完 poll 方法返回的消息,那么 Consumer 會(huì)主動(dòng)發(fā)起離開組的請(qǐng)求,Coordinator 也會(huì)開啟新一輪 Rebalance。
舉例:?jiǎn)未卫?1條消息,每條消息耗時(shí)30s,11條消息耗時(shí)5分鐘30秒,由于max.poll.interval.ms 默認(rèn)值5分鐘,所以消費(fèi)者無法在5分鐘內(nèi)消費(fèi)完,consumer會(huì)離開組,導(dǎo)致rebalance。
在消費(fèi)完11條消息后,consumer會(huì)重新連接broker,再次rebalance,因?yàn)樯洗蜗M(fèi)的offset未提交,再次拉取的消息是之前消費(fèi)過的消息,造成重復(fù)消費(fèi)。
「解決方案:」
1、提高消費(fèi)能力,提高單條消息的處理速度;根據(jù)實(shí)際場(chǎng)景可講max.poll.interval.ms值設(shè)置大一點(diǎn),避免不必要的rebalance;可適當(dāng)減小max.poll.records的值,默認(rèn)值是500,可根據(jù)實(shí)際消息速率適當(dāng)調(diào)小。
2、生成消息時(shí),可加入唯一標(biāo)識(shí)符如消息id,在消費(fèi)端,保存最近的1000條消息id存入到redis或mysql中,消費(fèi)的消息時(shí)通過前置去重。
消息順序問題
我們都知道kafka的topic是無序的,但是一個(gè)topic包含多個(gè)partition,每個(gè)partition內(nèi)部是有序的

「亂序場(chǎng)景1」
因?yàn)橐粋€(gè)topic可以有多個(gè)partition,kafka只能保證partition內(nèi)部有序
「解決方案」
1、可以設(shè)置topic,有且只有一個(gè)partition
2、根據(jù)業(yè)務(wù)需要,需要順序的 指定為同一個(gè)partition
3、根據(jù)業(yè)務(wù)需要,比如同一個(gè)訂單,使用同一個(gè)key,可以保證分配到同一個(gè)partition上
「亂序場(chǎng)景2」
對(duì)于同一業(yè)務(wù)進(jìn)入了同一個(gè)消費(fèi)者組之后,用了多線程來處理消息,會(huì)導(dǎo)致消息的亂序
「解決方案」
消費(fèi)者內(nèi)部根據(jù)線程數(shù)量創(chuàng)建等量的內(nèi)存隊(duì)列,對(duì)于需要順序的一系列業(yè)務(wù)數(shù)據(jù),根據(jù)key或者業(yè)務(wù)數(shù)據(jù),放到同一個(gè)內(nèi)存隊(duì)列中,然后線程從對(duì)應(yīng)的內(nèi)存隊(duì)列中取出并操作

「通過設(shè)置相同key來保證消息有序性,會(huì)有一點(diǎn)缺陷:」
例如消息發(fā)送設(shè)置了重試機(jī)制,并且異步發(fā)送,消息A和B設(shè)置相同的key,業(yè)務(wù)上A先發(fā),B后發(fā),由于網(wǎng)絡(luò)或者其他原因A發(fā)送失敗,B發(fā)送成功;A由于發(fā)送失敗就會(huì)重試且重試成功,這時(shí)候消息順序B在前A在后,與業(yè)務(wù)發(fā)送順序不一致,如果需要解決這個(gè)問題,需要設(shè)置參數(shù)max.in.flight.requests.per.connection=1,其含義是限制客戶端在單個(gè)連接上能夠發(fā)送的未響應(yīng)請(qǐng)求的個(gè)數(shù),設(shè)置此值是1表示kafka broker在響應(yīng)請(qǐng)求之前client不能再向同一個(gè)broker發(fā)送請(qǐng)求,這個(gè)參數(shù)默認(rèn)值是5
?官方文檔說明,這個(gè)參數(shù)如果大于1,由于重試消息順序可能重排
?
高性能原因
「順序讀寫」
kafka的消息是不斷追加到文件中的,這個(gè)特性使kafka可以充分利用磁盤的順序讀寫性能
順序讀寫不需要硬盤磁頭的尋道時(shí)間,只需很少的扇區(qū)旋轉(zhuǎn)時(shí)間,所以速度遠(yuǎn)快于隨機(jī)讀寫
Kafka 可以配置異步刷盤,不開啟同步刷盤,異步刷盤不需要等寫入磁盤后返回消息投遞的 ACK,所以它提高了消息發(fā)送的吞吐量,降低了請(qǐng)求的延時(shí)
「零拷貝」
傳統(tǒng)的 IO 流程,需要先把數(shù)據(jù)拷貝到內(nèi)核緩沖區(qū),再從內(nèi)核緩沖拷貝到用戶空間,應(yīng)用程序處理完成以后,再拷貝回內(nèi)核緩沖區(qū)
這個(gè)過程中發(fā)生了多次數(shù)據(jù)拷貝
為了減少不必要的拷貝,Kafka 依賴 Linux 內(nèi)核提供的 Sendfile 系統(tǒng)調(diào)用
在 Sendfile 方法中,數(shù)據(jù)在內(nèi)核緩沖區(qū)完成輸入和輸出,不需要拷貝到用戶空間處理,這也就避免了重復(fù)的數(shù)據(jù)拷貝
在具體的操作中,Kafka 把所有的消息都存放在單獨(dú)的文件里,在消息投遞時(shí)直接通過 Sendfile 方法發(fā)送文件,減少了上下文切換,因此大大提高了性能
「MMAP技術(shù)」
除了 Sendfile 之外,還有一種零拷貝的實(shí)現(xiàn)技術(shù),即 Memory Mapped Files
Kafka 使用 Memory Mapped Files 完成內(nèi)存映射,Memory Mapped Files 對(duì)文件的操作不是 write/read,而是直接對(duì)內(nèi)存地址的操作,如果是調(diào)用文件的 read 操作,則把數(shù)據(jù)先讀取到內(nèi)核空間中,然后再復(fù)制到用戶空間,但 MMAP可以將文件直接映射到用戶態(tài)的內(nèi)存空間,省去了用戶空間到內(nèi)核空間復(fù)制的開銷
Producer生產(chǎn)的數(shù)據(jù)持久化到broker,采用mmap文件映射,實(shí)現(xiàn)順序的快速寫入
Customer從broker讀取數(shù)據(jù),采用sendfile,將磁盤文件讀到OS內(nèi)核緩沖區(qū)后,直接轉(zhuǎn)到socket buffer進(jìn)行網(wǎng)絡(luò)發(fā)送。
「批量發(fā)送讀取」
Kafka 的批量包括批量寫入、批量發(fā)布等。它在消息投遞時(shí)會(huì)將消息緩存起來,然后批量發(fā)送
同樣,消費(fèi)端在消費(fèi)消息時(shí),也不是一條一條處理的,而是批量進(jìn)行拉取,提高了消息的處理速度
「數(shù)據(jù)壓縮」
Kafka還支持對(duì)消息集合進(jìn)行壓縮,Producer可以通過GZIP或Snappy格式對(duì)消息集合進(jìn)行壓縮
壓縮的好處就是減少傳輸?shù)臄?shù)據(jù)量,減輕對(duì)網(wǎng)絡(luò)傳輸?shù)膲毫?/p>
Producer壓縮之后,在Consumer需進(jìn)行解壓,雖然增加了CPU的工作,但在對(duì)大數(shù)據(jù)處理上,瓶頸在網(wǎng)絡(luò)上而不是CPU,所以這個(gè)成本很值得
「分區(qū)機(jī)制」
kafka中的topic中的內(nèi)容可以被分為多partition存在,每個(gè)partition又分為多個(gè)段segment,所以每次操作都是針對(duì)一小部分做操作,很輕便,并且增加并行操作的能力
常見面試題
「Kafka是Push還是Pull模式?」
Kafka最初考慮的問題是,customer應(yīng)該從brokes拉取消息還是brokers將消息推送到consumer。
在這方面,Kafka遵循了一種大部分消息系統(tǒng)共同的傳統(tǒng)的設(shè)計(jì):producer將消息推送到broker,consumer從broker拉取消息。
push模式由broker決定消息推送的速率,對(duì)于不同消費(fèi)速率的consumer就不太好處理了。
消息系統(tǒng)都致力于讓consumer以最大的速率最快速的消費(fèi)消息,push模式下,當(dāng)broker推送的速率遠(yuǎn)大于consumer消費(fèi)的速率時(shí),consumer恐怕就要崩潰了。
?Kafka中的Producer和Consumer采用的是Push-and-Pull模式,即Producer向Broker Push消息,Consumer從Broker Pull消息。
?
Pull模式的一個(gè)好處是consumer可以自主決定是否批量的從broker拉取數(shù)據(jù)。
Pull有個(gè)缺點(diǎn)是,如果broker沒有可供消費(fèi)的消息,將導(dǎo)致consumer不斷在循環(huán)中輪詢,直到新消息到達(dá)。
「Kafka如何保證高可用?」
「Kafk的使用場(chǎng)景」
業(yè)界Kafka實(shí)際應(yīng)用場(chǎng)景
?異步通信
?
消息中間件在異步通信中用的最多,很多業(yè)務(wù)流程中,如果所有步驟都同步進(jìn)行可能會(huì)導(dǎo)致核心流程耗時(shí)非常長(zhǎng),更重要的是所有步驟都同步進(jìn)行一旦非核心步驟失敗會(huì)導(dǎo)致核心流程整體失敗,因此在很多業(yè)務(wù)流程中Kafka就充當(dāng)了異步通信角色。
?日志同步
?
大規(guī)模分布式系統(tǒng)中的機(jī)器非常多而且分散在不同機(jī)房中,分布式系統(tǒng)帶來的一個(gè)明顯問題就是業(yè)務(wù)日志的查看、追蹤和分析等行為變得十分困難,對(duì)于集群規(guī)模在百臺(tái)以上的系統(tǒng),查詢線上日志很恐怖。
為了應(yīng)對(duì)這種場(chǎng)景統(tǒng)一日志系統(tǒng)應(yīng)運(yùn)而生,日志數(shù)據(jù)都是海量數(shù)據(jù),通常為了不給系統(tǒng)帶來額外負(fù)擔(dān)一般會(huì)采用異步上報(bào),這里Kafka以其高吞吐量在日志處理中得到了很好的應(yīng)用。
?實(shí)時(shí)計(jì)算
?
隨著據(jù)量的增加,離線的計(jì)算會(huì)越來越慢,難以滿足用戶在某些場(chǎng)景下的實(shí)時(shí)性要求,因此很多解決方案中引入了實(shí)時(shí)計(jì)算。
很多時(shí)候,即使是海量數(shù)據(jù),我們也希望即時(shí)去查看一些數(shù)據(jù)指標(biāo),實(shí)時(shí)流計(jì)算應(yīng)運(yùn)而生。
實(shí)時(shí)流計(jì)算有兩個(gè)特點(diǎn),一個(gè)是實(shí)時(shí),隨時(shí)可以看數(shù)據(jù);另一個(gè)是流。
