Kafka核心知識總結(jié)!
前言
文章已經(jīng)同步到個人網(wǎng)站:http://xiaoflyfish.cn/
「文章較長,可以點贊在看,謝謝,謝謝」
覺得不錯,可以關(guān)注一下「公眾號(月伴飛魚)」,之后會不定期分享系列文章

基本簡介
Apache Kafka是由LinkedIn采用Scala和Java開發(fā)的開源流處理軟件平臺,并捐贈給了Apache Software Foundation。
該項目旨在提供統(tǒng)一的、高吞吐量、低延遲的平臺來處理實時數(shù)據(jù)流。
Kafka可以通過Kafka Connect連接到外部系統(tǒng),并提供了Kafka Streams。
「Kafka的特性」
Kafka是一種分布式的,基于發(fā)布/訂閱的消息系統(tǒng),主要特性如下:
| 特性 | 分布式 | 「高性能」 | 「持久性和擴展性」 |
|---|---|---|---|
| 描述 | 多分區(qū) | 高吞吐量 | 數(shù)據(jù)可持久化 |
| 多副本 | 低延遲 | 容錯性 | |
| 多訂閱者 | 高并發(fā) | 支持水平在線擴展 | |
| 基于ZooKeeper調(diào)度 | 時間復(fù)雜度為O(1) | 消息自動平衡 |
版本號
「Kafka版本命名」
我們在官網(wǎng)上下載Kafka時,會看到這樣的版本:

前面的版本號是編譯Kafka源代碼的Scala編譯器版本。
Kafka服務(wù)器端的代碼完全由Scala語言編寫,Scala同時支持面向?qū)ο缶幊毯秃瘮?shù)式編程,用Scala寫成的源代碼編譯之后也是普通的.class文件,因此我們說Scala是JVM系的語言。
真正的Kafka版本號實際上是2.1.1。
?那么這個2.1.1又表示什么呢?
?
前面的2表示大版本號,即Major Version;中間的1表示小版本號或次版本號,即Minor Version;最后的1表示修訂版本號,也就是Patch號。
Kafka社區(qū)在發(fā)布1.0.0版本后寫過一篇文章,宣布Kafka版本命名規(guī)則正式從4位演進到3位,比如0.11.0.0版本就是4位版本號。
有個建議,不論用的是哪個版本,都請盡量保持服務(wù)器端版本和客戶端版本一致,否則你將損失很多Kafka為你提供的性能優(yōu)化收益。
「版本演進」
0.7版本:只提供了最基礎(chǔ)的消息隊列功能。
0.8版本:引入了副本機制,至此kafka成為了一個整整意義上完備的分布式可靠消息隊列解決方案
0.9.0.0版本:增加了基礎(chǔ)的安全認(rèn)證/權(quán)限功能;使用Java重新了新版本消費者API;引入了Kafka Connect組件。
0.11.0.0版本:提供了冪等性Producer API以及事務(wù)API;對Kafka消息格式做了重構(gòu)。
1.0和2.0版本:主要還是Kafka Streams的各種改進
基本概念

「主題」
發(fā)布訂閱的對象是主題(Topic),可以為每 個業(yè)務(wù)、每個應(yīng)用甚至是每類數(shù)據(jù)都創(chuàng)建專屬的主題
「生產(chǎn)者和消費者」
向主題發(fā)布消息的客戶端應(yīng)用程序稱為生產(chǎn)者,生產(chǎn)者程序通常持續(xù)不斷地 向一個或多個主題發(fā)送消息
訂閱這些主題消息的客戶端應(yīng)用程序就被稱為消費者,消費者也能夠同時訂閱多個主題的消息
「Broker」
集群由多個 Broker 組成,Broker 負(fù)責(zé)接收和處理客戶端發(fā)送過來的請求,以及對消息進行持久化
雖然多個 Broker 進程能夠運行在同一臺機器上,但更常見的做法是將 不同的 Broker 分散運行在不同的機器上,這樣如果集群中某一臺機器宕機,即使在它上面 運行的所有 Broker 進程都掛掉了,其他機器上的 Broker 也依然能夠?qū)ν馓峁┓?wù)
「備份機制」
備份的思想很簡單,就是把相同的數(shù)據(jù)拷貝到多臺機器上,而這些相同的數(shù)據(jù)拷貝被稱為副本
定義了兩類副本:領(lǐng)導(dǎo)者副本和追隨者副本
前者對外提供服務(wù),這里的對外指的是與 客戶端程序進行交互;而后者只是被動地追隨領(lǐng)導(dǎo)者副本而已,不能與外界進行交互
「分區(qū)」
分區(qū)機制指的是將每個主題劃分成多個分區(qū),每個分區(qū)是一組有序的消息日志
生產(chǎn)者生產(chǎn)的每條消息只會被發(fā)送到一個分區(qū)中,也就是說如果向一個雙分區(qū)的主題發(fā)送一條消息,這條消息要么在分區(qū) 0 中,要么在分區(qū) 1 中
每個分區(qū)下可以配置若干個副本,其中只能有 1 個領(lǐng) 導(dǎo)者副本和 N-1 個追隨者副本
生產(chǎn)者向分區(qū)寫入消息,每條消息在分區(qū)中的位置信息叫位移
「消費者組」
多個消費者實例共同組成一個組來 消費一組主題
這組主題中的每個分區(qū)都只會被組內(nèi)的一個消費者實例消費,其他消費者實例不能消費它
?同時實現(xiàn)了傳統(tǒng)消息引擎系統(tǒng)的兩大模型:
?
如果所有實例都屬于同一個 Group, 那么它實現(xiàn)的就是消息隊列模型;
如果所有實例分別屬于不 同的 Group,那么它實現(xiàn)的就是發(fā)布/訂閱模型
「Coordinator:協(xié)調(diào)者」
所謂協(xié)調(diào)者,它專門為Consumer Group服務(wù),負(fù)責(zé)為Group執(zhí)行Rebalance以及提供位移管理和組成員管理等。
具體來講,Consumer端應(yīng)用程序在提交位移時,其實是向Coordinator所在的Broker提交位移,同樣地,當(dāng)Consumer應(yīng)用啟動時,也是向Coordinator所在的Broker發(fā)送各種請求,然后由Coordinator負(fù)責(zé)執(zhí)行消費者組的注冊、成員管理記錄等元數(shù)據(jù)管理操作。
所有Broker在啟動時,都會創(chuàng)建和開啟相應(yīng)的Coordinator組件。
也就是說,「所有Broker都有各自的Coordinator組件」。
那么,Consumer Group如何確定為它服務(wù)的Coordinator在哪臺Broker上呢?
通過Kafka內(nèi)部主題__consumer_offsets。
目前,Kafka為某個Consumer Group確定Coordinator所在的Broker的算法有2個步驟。
第1步:確定由
__consumer_offsets主題的哪個分區(qū)來保存該Group數(shù)據(jù):partitionId=Math.abs(groupId.hashCode() % offsetsTopicPartitionCount)。第2步:找出該分區(qū)Leader副本所在的Broker,該Broker即為對應(yīng)的Coordinator。
首先,Kafka會計算該Group的group.id參數(shù)的哈希值。
比如你有個Group的group.id設(shè)置成了test-group,那么它的hashCode值就應(yīng)該是627841412。
其次,Kafka會計算__consumer_offsets的分區(qū)數(shù),通常是50個分區(qū),之后將剛才那個哈希值對分區(qū)數(shù)進行取模加求絕對值計算,即abs(627841412 % 50) = 12。
此時,我們就知道了__consumer_offsets主題的分區(qū)12負(fù)責(zé)保存這個Group的數(shù)據(jù)。
有了分區(qū)號,我們只需要找出__consumer_offsets主題分區(qū)12的Leader副本在哪個Broker上就可以了,這個Broker,就是我們要找的Coordinator。
「消費者位移:Consumer Offset」
消費者消費進度,每個消費者都有自己的消費者位移。
「重平衡:Rebalance」
消費者組內(nèi)某個消費者實例掛掉后,其他消費者實例自動重新分配訂閱主題分區(qū)的過程。
Rebalance是Kafka消費者端實現(xiàn)高可用的重要手段。
「AR(Assigned Replicas)」:分區(qū)中的所有副本統(tǒng)稱為AR。
所有消息會先發(fā)送到leader副本,然后follower副本才能從leader中拉取消息進行同步。
但是在同步期間,follower對于leader而言會有一定程度的滯后,這個時候follower和leader并非完全同步狀態(tài)
「OSR(Out Sync Replicas)」:follower副本與leader副本沒有完全同步或滯后的副本集合
「ISR(In Sync Replicas):「AR中的一個子集,ISR中的副本都」是與leader保持完全同步的副本」,如果某個在ISR中的follower副本落后于leader副本太多,則會被從ISR中移除,否則如果完全同步,會從OSR中移至ISR集合。
在默認(rèn)情況下,當(dāng)leader副本發(fā)生故障時,只有在ISR集合中的follower副本才有資格被選舉為新leader,而OSR中的副本沒有機會(可以通過unclean.leader.election.enable進行配置)
「HW(High Watermark)」:高水位,它標(biāo)識了一個特定的消息偏移量(offset),消費者只能拉取到這個水位 offset 之前的消息
下圖表示一個日志文件,這個日志文件中只有9條消息,第一條消息的offset(LogStartOffset)為0,最有一條消息的offset為8,offset為9的消息使用虛線表示的,代表下一條待寫入的消息。
日志文件的 HW 為6,表示消費者只能拉取offset在 0 到 5 之間的消息,offset為6的消息對消費者而言是不可見的。

「LEO(Log End Offset)」:標(biāo)識當(dāng)前日志文件中下一條待寫入的消息的offset
上圖中offset為9的位置即為當(dāng)前日志文件的 LEO,LEO 的大小相當(dāng)于當(dāng)前日志分區(qū)中最后一條消息的offset值加1
分區(qū) ISR 集合中的每個副本都會維護自身的 LEO ,而 ISR 集合中最小的 LEO 即為分區(qū)的 HW,對消費者而言只能消費 HW 之前的消息。
系統(tǒng)架構(gòu)
「kafka設(shè)計思想」
一個最基本的架構(gòu)是生產(chǎn)者發(fā)布一個消息到Kafka的一個Topic ,該Topic的消息存放于的Broker中,消費者訂閱這個Topic,然后從Broker中消費消息,下面這個圖可以更直觀的描述這個場景:

「消息狀態(tài):」 在Kafka中,消息是否被消費的狀態(tài)保存在Consumer中,Broker不會關(guān)心消息是否被消費或被誰消費,Consumer會記錄一個offset值(指向partition中下一條將要被消費的消息位置),如果offset被錯誤設(shè)置可能導(dǎo)致同一條消息被多次消費或者消息丟失。
「消息持久化:」 Kafka會把消息持久化到本地文件系統(tǒng)中,并且具有極高的性能。
「批量發(fā)送:」 Kafka支持以消息集合為單位進行批量發(fā)送,以提高效率。
「Push-and-Pull:」 Kafka中的Producer和Consumer采用的是Push-and-Pull模式,即Producer向Broker Push消息,Consumer從Broker Pull消息。
「分區(qū)機制(Partition):」 Kafka的Broker端支持消息分區(qū),Producer可以決定把消息發(fā)到哪個Partition,在一個Partition中消息的順序就是Producer發(fā)送消息的順序,一個Topic中的Partition數(shù)是可配置的,Partition是Kafka高吞吐量的重要保證。
「系統(tǒng)架構(gòu)」

通常情況下,一個kafka體系架構(gòu)包括「多個Producer」、「多個Consumer」、「多個broker」以及「一個Zookeeper集群」。
「Producer」:生產(chǎn)者,負(fù)責(zé)將消息發(fā)送到kafka中。
「Consumer」:消費者,負(fù)責(zé)從kafka中拉取消息進行消費。
「Broker」:Kafka服務(wù)節(jié)點,一個或多個Broker組成了一個Kafka集群
「Zookeeper集群」:負(fù)責(zé)管理kafka集群元數(shù)據(jù)以及控制器選舉等。
生產(chǎn)者分區(qū)
「為什么分區(qū)?」
Kafka的消息組織方式實際上是三級結(jié)構(gòu):主題-分區(qū)-消息。
主題下的每條消息只會保存在某一個分區(qū)中,而不會在多個分區(qū)中被保存多份。
其實分區(qū)的作用就是提供負(fù)載均衡的能力,或者說對數(shù)據(jù)進行分區(qū)的主要原因,就是為了實現(xiàn)系統(tǒng)的高伸縮性(Scalability)。
不同的分區(qū)能夠被放置到不同節(jié)點的機器上,而數(shù)據(jù)的讀寫操作也都是針對分區(qū)這個粒度而進行的,這樣每個節(jié)點的機器都能獨立地執(zhí)行各自分區(qū)的讀寫請求處理,并且,我們還可以通過添加新的節(jié)點機器來增加整體系統(tǒng)的吞吐量。
「都有哪些分區(qū)策略?」
「所謂分區(qū)策略是決定生產(chǎn)者將消息發(fā)送到哪個分區(qū)的算法。」
Kafka為我們提供了默認(rèn)的分區(qū)策略,同時它也支持你自定義分區(qū)策略。
「自定義分區(qū)策略」
如果要自定義分區(qū)策略,你需要顯式地配置生產(chǎn)者端的參數(shù)partitioner.class。
在編寫生產(chǎn)者程序時,你可以編寫一個具體的類實現(xiàn)org.apache.kafka.clients.producer.Partitioner接口。
這個接口也很簡單,只定義了兩個方法:partition()和close(),通常你只需要實現(xiàn)最重要的partition方法。
我們來看看這個方法的方法簽名:
int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
這里的topic、key、keyBytes、value和valueBytes都屬于消息數(shù)據(jù),cluster則是集群信息(比如當(dāng)前Kafka集群共有多少主題、多少Broker等)。
Kafka給你這么多信息,就是希望讓你能夠充分地利用這些信息對消息進行分區(qū),計算出它要被發(fā)送到哪個分區(qū)中。
只要你自己的實現(xiàn)類定義好了partition方法,同時設(shè)置partitioner.class參數(shù)為你自己實現(xiàn)類的Full Qualified Name,那么生產(chǎn)者程序就會按照你的代碼邏輯對消息進行分區(qū)。
「輪詢策略」
也稱Round-robin策略,即順序分配。
比如一個主題下有3個分區(qū),那么第一條消息被發(fā)送到分區(qū)0,第二條被發(fā)送到分區(qū)1,第三條被發(fā)送到分區(qū)2,以此類推。當(dāng)生產(chǎn)第4條消息時又會重新開始,即將其分配到分區(qū)0
這就是所謂的輪詢策略。輪詢策略是Kafka Java生產(chǎn)者API默認(rèn)提供的分區(qū)策略。
「輪詢策略有非常優(yōu)秀的負(fù)載均衡表現(xiàn),它總是能保證消息最大限度地被平均分配到所有分區(qū)上,故默認(rèn)情況下它是最合理的分區(qū)策略,也是我們最常用的分區(qū)策略之一。」
「隨機策略」
也稱Randomness策略。所謂隨機就是我們隨意地將消息放置到任意一個分區(qū)上。
如果要實現(xiàn)隨機策略版的partition方法,很簡單,只需要兩行代碼即可:
List partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());
先計算出該主題總的分區(qū)數(shù),然后隨機地返回一個小于它的正整數(shù)。
本質(zhì)上看隨機策略也是力求將數(shù)據(jù)均勻地打散到各個分區(qū),但從實際表現(xiàn)來看,它要遜于輪詢策略,所以「如果追求數(shù)據(jù)的均勻分布,還是使用輪詢策略比較好」。事實上,隨機策略是老版本生產(chǎn)者使用的分區(qū)策略,在新版本中已經(jīng)改為輪詢了。
「按消息鍵保序策略」
Kafka允許為每條消息定義消息鍵,簡稱為Key。
這個Key的作用非常大,它可以是一個有著明確業(yè)務(wù)含義的字符串,比如客戶代碼、部門編號或是業(yè)務(wù)ID等;也可以用來表征消息元數(shù)據(jù)。
特別是在Kafka不支持時間戳的年代,在一些場景中,工程師們都是直接將消息創(chuàng)建時間封裝進Key里面的。
一旦消息被定義了Key,那么你就可以保證同一個Key的所有消息都進入到相同的分區(qū)里面,由于每個分區(qū)下的消息處理都是有順序的,故這個策略被稱為按消息鍵保序策略
實現(xiàn)這個策略的partition方法同樣簡單,只需要下面兩行代碼即可:
List partitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();
前面提到的Kafka默認(rèn)分區(qū)策略實際上同時實現(xiàn)了兩種策略:如果指定了Key,那么默認(rèn)實現(xiàn)按消息鍵保序策略;如果沒有指定Key,則使用輪詢策略。
「其他分區(qū)策略」
其實還有一種比較常見的,即所謂的基于地理位置的分區(qū)策略。
當(dāng)然這種策略一般只針對那些大規(guī)模的Kafka集群,特別是跨城市、跨國家甚至是跨大洲的集群。
我們可以根據(jù)Broker所在的IP地址實現(xiàn)定制化的分區(qū)策略。比如下面這段代碼:
List partitions = cluster.partitionsForTopic(topic);
return partitions.stream().filter(p -> isSouth(p.leader().host())).map(PartitionInfo::partition).findAny().get();
我們可以從所有分區(qū)中找出那些Leader副本在南方的所有分區(qū),然后隨機挑選一個進行消息發(fā)送。
生產(chǎn)者壓縮算法
「Kafka是如何壓縮消息的呢?」
目前Kafka共有兩大類消息格式,社區(qū)分別稱之為V1版本和V2版本。
V2版本是Kafka 0.11.0.0中正式引入的。
不論是哪個版本,Kafka的消息層次都分為兩層:消息集合以及消息。
一個消息集合中包含若干條日志項,而日志項才是真正封裝消息的地方。
Kafka底層的消息日志由一系列消息集合日志項組成。
Kafka通常不會直接操作具體的一條條消息,它總是在消息集合這個層面上進行寫入操作。
「那么社區(qū)引入V2版本的目的是什么呢?」
V2版本主要是針對V1版本的一些弊端做了修正,比如把消息的公共部分抽取出來放到外層消息集合里面,這樣就不用每條消息都保存這些信息了。
舉個例子:原來在V1版本中,每條消息都需要執(zhí)行CRC校驗,但有些情況下消息的CRC值是會發(fā)生變化的。
比如在Broker端可能會對消息時間戳字段進行更新,那么重新計算之后的CRC值也會相應(yīng)更新;再比如Broker端在執(zhí)行消息格式轉(zhuǎn)換時(主要是為了兼容老版本客戶端程序),也會帶來CRC值的變化。
鑒于這些情況,再對每條消息都執(zhí)行CRC校驗就有點沒必要了,不僅浪費空間還耽誤CPU時間,因此在V2版本中,消息的CRC校驗工作就被移到了消息集合這一層。
V2版本還有一個和壓縮息息相關(guān)的改進,就是保存壓縮消息的方法發(fā)生了變化。
之前V1版本中保存壓縮消息的方法是把多條消息進行壓縮然后保存到外層消息的消息體字段中;而V2版本的做法是對整個消息集合進行壓縮,顯然后者應(yīng)該比前者有更好的壓縮效果。
「何時壓縮?」
在Kafka中,壓縮可能發(fā)生在兩個地方:生產(chǎn)者端和Broker端。
生產(chǎn)者程序中配置compression.type參數(shù)即表示啟用指定類型的壓縮算法。
比如下面這段程序代碼展示了如何構(gòu)建一個開啟GZIP的Producer對象:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 開啟GZIP壓縮
props.put("compression.type", "gzip");
Producer producer = new KafkaProducer<>(props);
這里比較關(guān)鍵的代碼行是props.put(“compression.type”, “gzip”),它表明該Producer的壓縮算法使用的是GZIP。
這樣Producer啟動后生產(chǎn)的每個消息集合都是經(jīng)GZIP壓縮過的,故而能很好地節(jié)省網(wǎng)絡(luò)傳輸帶寬以及Kafka Broker端的磁盤占用。
有兩種例外情況就可能讓Broker重新壓縮消息:
「情況一:Broker端指定了和Producer端不同的壓縮算法。」
一旦你在Broker端設(shè)置了不同的compression.type值,就一定要小心了,因為可能會發(fā)生預(yù)料之外的壓縮/解壓縮操作,通常表現(xiàn)為Broker端CPU使用率飆升。
「情況二:Broker端發(fā)生了消息格式轉(zhuǎn)換。」
所謂的消息格式轉(zhuǎn)換主要是為了兼容老版本的消費者程序。
在一個生產(chǎn)環(huán)境中,Kafka集群中同時保存多種版本的消息格式非常常見。
為了兼容老版本的格式,Broker端會對新版本消息執(zhí)行向老版本格式的轉(zhuǎn)換。
這個過程中會涉及消息的解壓縮和重新壓縮。
一般情況下這種消息格式轉(zhuǎn)換對性能是有很大影響的,除了這里的壓縮之外,它還讓Kafka喪失了Zero Copy特性。
「何時解壓縮?」
有壓縮必有解壓縮!通常來說解壓縮發(fā)生在消費者程序中,也就是說Producer發(fā)送壓縮消息到Broker后,Broker照單全收并原樣保存起來。當(dāng)Consumer程序請求這部分消息時,Broker依然原樣發(fā)送出去,當(dāng)消息到達(dá)Consumer端后,由Consumer自行解壓縮還原成之前的消息。
「基本過程:Producer端壓縮、Broker端保持、Consumer端解壓縮。」
注意:除了在Consumer端解壓縮,Broker端也會進行解壓縮。
每個壓縮過的消息集合在Broker端寫入時都要發(fā)生解壓縮操作,目的就是為了對消息執(zhí)行各種驗證。
我們必須承認(rèn)這種解壓縮對Broker端性能是有一定影響的,特別是對CPU的使用率而言。
「各種壓縮算法對比」
在Kafka 2.1.0版本之前,Kafka支持3種壓縮算法:GZIP、Snappy和LZ4。
從2.1.0開始,Kafka正式支持Zstandard算法(簡寫為zstd)。
它是Facebook開源的一個壓縮算法,能夠提供超高的壓縮比。
在實際使用中,GZIP、Snappy、LZ4和zstd的表現(xiàn)各有千秋。
但對于Kafka而言,在吞吐量方面:LZ4 > Snappy > zstd和GZIP;而在壓縮比方面,zstd > LZ4 > GZIP > Snappy。
具體到物理資源,使用Snappy算法占用的網(wǎng)絡(luò)帶寬最多,zstd最少;
在CPU使用率方面,各個算法表現(xiàn)得差不多,只是在壓縮時Snappy算法使用的CPU較多一些,而在解壓縮時GZIP算法則可能使用更多的CPU。
「最佳實踐」
?何時啟用壓縮是比較合適的時機呢?
?
啟用壓縮的一個條件就是Producer程序運行機器上的CPU資源要很充足。
除了CPU資源充足這一條件,如果你的環(huán)境中帶寬資源有限,那么建議你開啟壓縮。
消費者組
「Consumer Group是Kafka提供的可擴展且具有容錯性的消費者機制」。
既然是一個組,那么組內(nèi)必然可以有多個消費者或消費者實例,它們共享一個公共的ID,這個ID被稱為Group ID。
組內(nèi)的所有消費者協(xié)調(diào)在一起來消費訂閱主題的所有分區(qū)。
?每個分區(qū)只能由同一個消費者組內(nèi)的一個Consumer實例來消費。
?
「Consumer Group三個特性:」
Consumer Group下可以有一個或多個Consumer實例,這里的實例可以是一個單獨的進程,也可以是同一進程下的線程。 Group ID是一個字符串,在一個Kafka集群中,它標(biāo)識唯一的一個Consumer Group。 Consumer Group下所有實例訂閱的主題的單個分區(qū),只能分配給組內(nèi)的某個Consumer實例消費,這個分區(qū)當(dāng)然也可以被其他的Group消費。
當(dāng)Consumer Group訂閱了多個主題后,組內(nèi)的每個實例不要求一定要訂閱主題的所有分區(qū),它只會消費部分分區(qū)中的消息。
Consumer Group之間彼此獨立,互不影響,它們能夠訂閱相同的一組主題而互不干涉。
「Kafka僅僅使用Consumer Group這一種機制,卻同時實現(xiàn)了傳統(tǒng)消息引擎系統(tǒng)的兩大模型」:
如果所有實例都屬于同一個Group,那么它實現(xiàn)的就是消息隊列模型;
如果所有實例分別屬于不同的Group,那么它實現(xiàn)的就是發(fā)布/訂閱模型。
「一個Group下該有多少個Consumer實例呢?」
「理想情況下,Consumer實例的數(shù)量應(yīng)該等于該Group訂閱主題的分區(qū)總數(shù)。」
假設(shè)一個Consumer Group訂閱了3個主題,分別是A、B、C,它們的分區(qū)數(shù)依次是1、2、3,那么通常情況下,為該Group設(shè)置6個Consumer實例是比較理想的情形,因為它能最大限度地實現(xiàn)高伸縮性。
「針對Consumer Group,Kafka是怎么管理位移的呢?」
「位移Offset」
老版本的Consumer Group把位移保存在ZooKeeper中。
Apache ZooKeeper是一個分布式的協(xié)調(diào)服務(wù)框架,Kafka重度依賴它實現(xiàn)各種各樣的協(xié)調(diào)管理。
將位移保存在ZooKeeper外部系統(tǒng)的做法,最顯而易見的好處就是減少了Kafka Broker端的狀態(tài)保存開銷。
不過,慢慢地發(fā)現(xiàn)了一個問題,即ZooKeeper這類元框架其實并不適合進行頻繁的寫更新,而Consumer Group的位移更新卻是一個非常頻繁的操作。
這種大吞吐量的寫操作會極大地拖慢ZooKeeper集群的性能。
于是,在新版本的Consumer Group中,Kafka社區(qū)重新設(shè)計了Consumer Group的位移管理方式,采用了將位移保存在Kafka內(nèi)部主題的方法。
這個內(nèi)部主題就是__consumer_offsets。
消費者策略
「第一種是Round」
默認(rèn),也叫輪循,說的是對于同一組消費者來說,使用輪訓(xùn)分配的方式,決定消費者消費的分區(qū)

「第二種叫做Range」
對一個消費者組來說決定消費方式是以分區(qū)總數(shù)除以消費者總數(shù)來決定,一般如果不能整除,往往是從頭開始將剩余的分區(qū)分配開

「第三種叫Sticky」
是在0.11.x,新增的,它和前面兩個不是很一樣,它是在Range上的一種升華,且前面兩個當(dāng)同組內(nèi)有新的消費者加入或者舊的消費者退出的時候,會從新開始決定消費者消費方式,但是Sticky,在同組中有新的新的消費者加入或者舊的消費者退出時,不會直接開始新的Range分配,而是保留現(xiàn)有消費者原來的消費策略,將退出的消費者所消費的分區(qū)平均分配給現(xiàn)有消費者,新增消費者同理,同其他現(xiàn)存消費者的消費策略中分離
位移提交
假設(shè)一個分區(qū)中有10條消息,位移分別是0到9。
某個Consumer應(yīng)用已消費了5條消息,這就說明該Consumer消費了位移為0到4的5條消息,此時Consumer的位移是5,指向了下一條消息的位移。
因為Consumer能夠同時消費多個分區(qū)的數(shù)據(jù),所以位移的提交實際上是在分區(qū)粒度上進行的,即「Consumer需要為分配給它的每個分區(qū)提交各自的位移數(shù)據(jù)」。
「位移提交分為自動提交和手動提交;從Consumer端的角度來說,位移提交分為同步提交和異步提交」。
開啟自動提交位移的方法:Consumer端有個參數(shù)enable.auto.commit,把它設(shè)置為true或者壓根不設(shè)置它就可以了。
因為它的默認(rèn)值就是true,即Java Consumer默認(rèn)就是自動提交位移的。
如果啟用了自動提交,Consumer端還有個參數(shù):auto.commit.interval.ms。
它的默認(rèn)值是5秒,表明Kafka每5秒會為你自動提交一次位移。
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "2000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords records = consumer.poll(100);
for (ConsumerRecord record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
上面的第3、第4行代碼,就是開啟自動提交位移的方法。
開啟手動提交位移的方法就是設(shè)置enable.auto.commit為false。
還需要調(diào)用相應(yīng)的API手動提交位移。最簡單的API就是「KafkaConsumer#commitSync()」。
該方法會提交KafkaConsumer#poll()返回的最新位移。
從名字上來看,它是一個同步操作,即該方法會一直等待,直到位移被成功提交才會返回。
如果提交過程中出現(xiàn)異常,該方法會將異常信息拋出。
下面這段代碼展示了commitSync()的使用方法:
while (true) {
ConsumerRecords records =
consumer.poll(Duration.ofSeconds(1));
process(records); // 處理消息
try {
consumer.commitSync();
} catch (CommitFailedException e) {
handle(e); // 處理提交失敗異常
}
}
一旦設(shè)置了enable.auto.commit為true,Kafka會保證在開始調(diào)用poll方法時,提交上次poll返回的所有消息。
從順序上來說,poll方法的邏輯是先提交上一批消息的位移,再處理下一批消息,因此它能保證不出現(xiàn)消費丟失的情況。
但自動提交位移的一個問題在于,「它可能會出現(xiàn)重復(fù)消費」。
而手動提交位移,它的好處就在于更加靈活,你完全能夠把控位移提交的時機和頻率。
但是,它也有一個缺陷,就是在調(diào)用commitSync()時,Consumer程序會處于阻塞狀態(tài),直到遠(yuǎn)端的Broker返回提交結(jié)果,這個狀態(tài)才會結(jié)束。
鑒于這個問題,Kafka社區(qū)為手動提交位移提供了另一個API方法:「KafkaConsumer#commitAsync()」。
從名字上來看它就不是同步的,而是一個異步操作。
調(diào)用commitAsync()之后,它會立即返回,不會阻塞,因此不會影響Consumer應(yīng)用的TPS。
由于它是異步的,Kafka提供了回調(diào)函數(shù)(callback),供你實現(xiàn)提交之后的邏輯,比如記錄日志或處理異常等。
下面這段代碼展示了調(diào)用commitAsync()的方法:
while (true) {
ConsumerRecords records =
consumer.poll(Duration.ofSeconds(1));
process(records); // 處理消息
consumer.commitAsync((offsets, exception) -> {
if (exception != null)
handle(exception);
});
}
commitAsync的問題在于,出現(xiàn)問題時它不會自動重試。
顯然,如果是手動提交,我們需要將commitSync和commitAsync組合使用才能到達(dá)最理想的效果,原因有兩個:
我們可以利用commitSync的自動重試來規(guī)避那些瞬時錯誤,比如網(wǎng)絡(luò)的瞬時抖動,Broker端GC等,因為這些問題都是短暫的,自動重試通常都會成功。 我們不希望程序總處于阻塞狀態(tài),影響TPS。
我們來看一下下面這段代碼,它展示的是如何將兩個API方法結(jié)合使用進行手動提交。
try {
while(true) {
ConsumerRecords records =
consumer.poll(Duration.ofSeconds(1));
process(records); // 處理消息
commitAysnc(); // 使用異步提交規(guī)避阻塞
}
} catch(Exception e) {
handle(e); // 處理異常
} finally {
try {
consumer.commitSync(); // 最后一次提交使用同步阻塞式提交
} finally {
consumer.close();
}
}
這樣一個場景:你的poll方法返回的不是500條消息,而是5000條。
那么,你肯定不想把這5000條消息都處理完之后再提交位移,因為一旦中間出現(xiàn)差錯,之前處理的全部都要重來一遍。
比如前面這個5000條消息的例子,你可能希望每處理完100條消息就提交一次位移,這樣能夠避免大批量的消息重新消費。
Kafka Consumer API為手動提交提供了這樣的方法:commitSync(Map)和commitAsync(Map)。
它們的參數(shù)是一個Map對象,鍵就是TopicPartition,即消費的分區(qū),而值是一個OffsetAndMetadata對象,保存的主要是位移數(shù)據(jù)。
?如何每處理100條消息就提交一次位移呢?
?
在這里,我以commitAsync為例,展示一段代碼,實際上,commitSync的調(diào)用方法和它是一模一樣的。
private Map offsets = new HashMap<>();
int count = 0;
……
while (true) {
ConsumerRecords records =
consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord record: records) {
process(record); // 處理消息
offsets.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1);
if(count % 100 == 0)
consumer.commitAsync(offsets, null); // 回調(diào)處理邏輯是null
count++;
}
}
程序先是創(chuàng)建了一個Map對象,用于保存Consumer消費處理過程中要提交的分區(qū)位移,之后開始逐條處理消息,并構(gòu)造要提交的位移值。
代碼的最后部分是做位移的提交。設(shè)置了一個計數(shù)器,每累計100條消息就統(tǒng)一提交一次位移。
與調(diào)用無參的commitAsync不同,這里調(diào)用了帶Map對象參數(shù)的commitAsync進行細(xì)粒度的位移提交。
這樣,這段代碼就能夠?qū)崿F(xiàn)每處理100條消息就提交一次位移,不用再受poll方法返回的消息總數(shù)的限制了。
重平衡
「(重平衡)Rebalance本質(zhì)上是一種協(xié)議,規(guī)定了一個Consumer Group下的所有Consumer如何達(dá)成一致,來分配訂閱Topic的每個分區(qū)」。
比如某個Group下有20個Consumer實例,它訂閱了一個具有100個分區(qū)的Topic。
正常情況下,Kafka平均會為每個Consumer分配5個分區(qū)。這個分配的過程就叫Rebalance。
「Rebalance的觸發(fā)條件有3個。」
組成員數(shù)發(fā)生變更。比如有新的Consumer實例加入組或者離開組,或是有Consumer實例崩潰被踢出組。 訂閱主題數(shù)發(fā)生變更。Consumer Group可以使用正則表達(dá)式的方式訂閱主題,比如 consumer.subscribe(Pattern.compile(“t.*c”))就表明該Group訂閱所有以字母t開頭、字母c結(jié)尾的主題,在Consumer Group的運行過程中,你新創(chuàng)建了一個滿足這樣條件的主題,那么該Group就會發(fā)生Rebalance。訂閱主題的分區(qū)數(shù)發(fā)生變更。Kafka當(dāng)前只能允許增加一個主題的分區(qū)數(shù),當(dāng)分區(qū)數(shù)增加時,就會觸發(fā)訂閱該主題的所有Group開啟Rebalance。
Rebalance發(fā)生時,Group下所有的Consumer實例都會協(xié)調(diào)在一起共同參與。
「分配策略」
當(dāng)前Kafka默認(rèn)提供了3種分配策略,每種策略都有一定的優(yōu)勢和劣勢,社區(qū)會不斷地完善這些策略,保證提供最公平的分配策略,即每個Consumer實例都能夠得到較為平均的分區(qū)數(shù)。
比如一個Group內(nèi)有10個Consumer實例,要消費100個分區(qū),理想的分配策略自然是每個實例平均得到10個分區(qū)。
這就叫公平的分配策略。
舉個簡單的例子來說明一下Consumer Group發(fā)生Rebalance的過程。
假設(shè)目前某個Consumer Group下有兩個Consumer,比如A和B,當(dāng)?shù)谌齻€成員C加入時,Kafka會觸發(fā)Rebalance,并根據(jù)默認(rèn)的分配策略重新為A、B和C分配分區(qū)
Rebalance之后的分配依然是公平的,即每個Consumer實例都獲得了2個分區(qū)的消費權(quán)。
在Rebalance過程中,所有Consumer實例都會停止消費,等待Rebalance完成,這是Rebalance為人詬病的一個方面。
目前Rebalance的設(shè)計是所有Consumer實例共同參與,全部重新分配所有分區(qū)。
「Coordinator會在什么情況下認(rèn)為某個Consumer實例已掛從而要退組呢?」
當(dāng)Consumer Group完成Rebalance之后,每個Consumer實例都會定期地向Coordinator發(fā)送心跳請求,表明它還存活著。
如果某個Consumer實例不能及時地發(fā)送這些心跳請求,Coordinator就會認(rèn)為該Consumer已經(jīng)死了,從而將其從Group中移除,然后開啟新一輪Rebalance。
Consumer端有個參數(shù),叫session.timeout.ms。
該參數(shù)的默認(rèn)值是10秒,即如果Coordinator在10秒之內(nèi)沒有收到Group下某Consumer實例的心跳,它就會認(rèn)為這個Consumer實例已經(jīng)掛了。
除了這個參數(shù),Consumer還提供了一個允許你控制發(fā)送心跳請求頻率的參數(shù),就是heartbeat.interval.ms。
這個值設(shè)置得越小,Consumer實例發(fā)送心跳請求的頻率就越高。
頻繁地發(fā)送心跳請求會額外消耗帶寬資源,但好處是能夠更加快速地知曉當(dāng)前是否開啟Rebalance,因為,目前Coordinator通知各個Consumer實例開啟Rebalance的方法,就是將REBALANCE_NEEDED標(biāo)志封裝進心跳請求的響應(yīng)體中。
除了以上兩個參數(shù),Consumer端還有一個參數(shù),用于控制Consumer實際消費能力對Rebalance的影響,即max.poll.interval.ms參數(shù)。
它限定了Consumer端應(yīng)用程序兩次調(diào)用poll方法的最大時間間隔。
它的默認(rèn)值是5分鐘,表示你的Consumer程序如果在5分鐘之內(nèi)無法消費完poll方法返回的消息,那么Consumer會主動發(fā)起離開組的請求,Coordinator也會開啟新一輪Rebalance。
「可避免Rebalance的配置」
第一類Rebalance是因為未能及時發(fā)送心跳,導(dǎo)致Consumer被踢出Group而引發(fā)的
因此可以設(shè)置「session.timeout.ms和heartbeat.interval.ms」的值。
設(shè)置 session.timeout.ms= 6s。設(shè)置 heartbeat.interval.ms= 2s。要保證Consumer實例在被判定為dead之前,能夠發(fā)送至少3輪的心跳請求,即 session.timeout.ms >= 3 * heartbeat.interval.ms。
將session.timeout.ms設(shè)置成6s主要是為了讓Coordinator能夠更快地定位已經(jīng)掛掉的Consumer。
「第二類Rebalance是Consumer消費時間過長導(dǎo)致的」。
你要為你的業(yè)務(wù)處理邏輯留下充足的時間,這樣Consumer就不會因為處理這些消息的時間太長而引發(fā)Rebalance了。
ConsumerOffsets
「Kafka將Consumer的位移數(shù)據(jù)作為一條條普通的Kafka消息,提交到__consumer_offsets中。」
「__consumer_offsets的主要作用是保存Kafka消費者的位移信息。」
它要求這個提交過程不僅要實現(xiàn)高持久性,還要支持高頻的寫操作。
__consumer_offsets主題就是普通的Kafka主題。你可以手動地創(chuàng)建它、修改它,甚至是刪除它。
雖說__consumer_offsets主題是一個普通的Kafka主題,但「它的消息格式卻是Kafka自己定義的」,用戶不能修改,也就是說你不能隨意地向這個主題寫消息,因為一旦你寫入的消息不滿足Kafka規(guī)定的格式,那么Kafka內(nèi)部無法成功解析,就會造成Broker的崩潰。
Kafka Consumer有API幫你提交位移,也就是向__consumer_offsets主題寫消息,千萬不要自己寫個Producer隨意向該主題發(fā)送消息。
__consumer_offsets有3種消息格式:
用于保存Consumer Group信息的消息。 用于刪除Group過期位移甚至是刪除Group的消息。 保存了位移值。
第2種格式它有個專屬的名字:tombstone消息,即墓碑消息,也稱delete mark,它的主要特點是它的消息體是null,即空消息體。
一旦某個Consumer Group下的所有Consumer實例都停止了,而且它們的位移數(shù)據(jù)都已被刪除時,Kafka會向__consumer_offsets主題的對應(yīng)分區(qū)寫入tombstone消息,表明要徹底刪除這個Group的信息。
__consumer_offsets是怎么被創(chuàng)建的?
通常來說,「當(dāng)Kafka集群中的第一個Consumer程序啟動時,Kafka會自動創(chuàng)建位移主題」。
「默認(rèn)該主題的分區(qū)數(shù)是50,副本數(shù)是3」。
目前Kafka Consumer提交位移的方式有兩種:「自動提交位移和手動提交位移。」
Consumer端有個參數(shù)叫enable.auto.commit,如果值是true,則Consumer在后臺默默地為你定期提交位移,提交間隔由一個專屬的參數(shù)auto.commit.interval.ms來控制。
自動提交位移有一個顯著的優(yōu)點,就是省事,你不用操心位移提交的事情,就能保證消息消費不會丟失。
但這一點同時也是缺點,喪失了很大的靈活性和可控性,你完全沒法把控Consumer端的位移管理。
Kafka Consumer API為你提供了位移提交的方法,如consumer.commitSync等。
當(dāng)調(diào)用這些方法時,Kafka會向__consumer_offsets主題寫入相應(yīng)的消息。
如果你選擇的是自動提交位移,那么就可能存在一個問題:只要Consumer一直啟動著,它就會無限期地向位移主題寫入消息。
「舉個極端一點的例子。」
假設(shè)Consumer當(dāng)前消費到了某個主題的最新一條消息,位移是100,之后該主題沒有任何新消息產(chǎn)生,故Consumer無消息可消費了,所以位移永遠(yuǎn)保持在100。
由于是自動提交位移,位移主題中會不停地寫入位移=100的消息。
顯然Kafka只需要保留這類消息中的最新一條就可以了,之前的消息都是可以刪除的。
這就要求Kafka必須要有針對位移主題消息特點的消息刪除策略,否則這種消息會越來越多,最終撐爆整個磁盤。
「Compact策略」
Kafka使用「Compact策略」來刪除__consumer_offsets主題中的過期消息,避免該主題無限期膨脹。
比如對于同一個Key的兩條消息M1和M2,如果M1的發(fā)送時間早于M2,那么M1就是過期消息。
Compact的過程就是掃描日志的所有消息,剔除那些過期的消息,然后把剩下的消息整理在一起。
我在這里貼一張來自官網(wǎng)的圖片,來說明Compact過程。

圖中位移為0、2和3的消息的Key都是K1,Compact之后,分區(qū)只需要保存位移為3的消息,因為它是最新發(fā)送的。
「Kafka提供了專門的后臺線程定期地巡檢待Compact的主題,看看是否存在滿足條件的可刪除數(shù)據(jù)」。
這個后臺線程叫Log Cleaner。
很多實際生產(chǎn)環(huán)境中都出現(xiàn)過位移主題無限膨脹占用過多磁盤空間的問題,如果你的環(huán)境中也有這個問題,建議你去檢查一下Log Cleaner線程的狀態(tài),通常都是這個線程掛掉了導(dǎo)致的。
副本機制
根據(jù)Kafka副本機制的定義,同一個分區(qū)下的所有副本保存有相同的消息序列,這些副本分散保存在不同的Broker上,從而能夠?qū)共糠諦roker宕機帶來的數(shù)據(jù)不可用。
下面展示的是一個有3臺Broker的Kafka集群上的副本分布情況。
從這張圖中,我們可以看到,主題1分區(qū)0的3個副本分散在3臺Broker上,其他主題分區(qū)的副本也都散落在不同的Broker上,從而實現(xiàn)數(shù)據(jù)冗余。

「副本角色」

在Kafka中,副本分成兩類:領(lǐng)導(dǎo)者副本(Leader Replica)和追隨者副本(Follower Replica)。
每個分區(qū)在創(chuàng)建時都要選舉一個副本,稱為領(lǐng)導(dǎo)者副本,其余的副本自動稱為追隨者副本。
在Kafka中,追隨者副本是不對外提供服務(wù)的。這就是說,任何一個追隨者副本都不能響應(yīng)消費者和生產(chǎn)者的讀寫請求。所有的請求都必須由領(lǐng)導(dǎo)者副本來處理,或者說,所有的讀寫請求都必須發(fā)往領(lǐng)導(dǎo)者副本所在的Broker,由該Broker負(fù)責(zé)處理。
追隨者副本不處理客戶端請求,它唯一的任務(wù)就是從領(lǐng)導(dǎo)者副本「異步拉取」消息,并寫入到自己的提交日志中,從而實現(xiàn)與領(lǐng)導(dǎo)者副本的同步。
當(dāng)領(lǐng)導(dǎo)者副本掛掉了,或者說領(lǐng)導(dǎo)者副本所在的Broker宕機時,Kafka依托于ZooKeeper提供的監(jiān)控功能能夠?qū)崟r感知到,并立即開啟新一輪的領(lǐng)導(dǎo)者選舉,從追隨者副本中選一個作為新的領(lǐng)導(dǎo)者。老Leader副本重啟回來后,只能作為追隨者副本加入到集群中。
對于客戶端用戶而言,Kafka的追隨者副本沒有任何作用,Kafka為什么要這樣設(shè)計呢?
這種副本機制有兩個方面的好處。
1.「方便實現(xiàn)Read-your-writes」。
所謂Read-your-writes,顧名思義就是,當(dāng)你使用生產(chǎn)者API向Kafka成功寫入消息后,馬上使用消費者API去讀取剛才生產(chǎn)的消息。
2.「方便實現(xiàn)單調(diào)讀(Monotonic Reads)」。
假設(shè)當(dāng)前有2個追隨者副本F1和F2,它們異步地拉取領(lǐng)導(dǎo)者副本數(shù)據(jù)。倘若F1拉取了Leader的最新消息而F2還未及時拉取,那么,此時如果有一個消費者先從F1讀取消息之后又從F2拉取消息,它可能會看到這樣的現(xiàn)象:第一次消費時看到的最新消息在第二次消費時不見了,這就不是單調(diào)讀一致性。
但是,如果所有的讀請求都是由Leader來處理,那么Kafka就很容易實現(xiàn)單調(diào)讀一致性。
ISR機制
In-sync Replicas,也就是所謂的ISR副本集合。
ISR中的副本都是與Leader同步的副本,相反,不在ISR中的追隨者副本就被認(rèn)為是與Leader不同步的。
?什么副本能夠進入到ISR中呢?
?
Leader副本天然就在ISR中。也就是說,「ISR不只是追隨者副本集合,它必然包括Leader副本。甚至在某些情況下,ISR只有Leader這一個副本」。
另外,能夠進入到ISR的追隨者副本要滿足一定的條件。
「通過Broker端參數(shù)replica.lag.time.max.ms參數(shù)值」。
這個參數(shù)的含義是Follower副本能夠落后Leader副本的最長時間間隔,當(dāng)前默認(rèn)值是10秒。
這就是說,只要一個Follower副本落后Leader副本的時間不連續(xù)超過10秒,那么Kafka就認(rèn)為該Follower副本與Leader是同步的,即使此時Follower副本中保存的消息明顯少于Leader副本中的消息。
Follower副本唯一的工作就是不斷地從Leader副本拉取消息,然后寫入到自己的提交日志中。
倘若該副本后面慢慢地追上了Leader的進度,那么它是能夠重新被加回ISR的。
ISR是一個動態(tài)調(diào)整的集合,而非靜態(tài)不變的。
Unclean領(lǐng)導(dǎo)者選舉
「Kafka把所有不在ISR中的存活副本都稱為非同步副本」。
通常來說,非同步副本落后Leader太多,因此,如果選擇這些副本作為新Leader,就可能出現(xiàn)數(shù)據(jù)的丟失。
畢竟,這些副本中保存的消息遠(yuǎn)遠(yuǎn)落后于老Leader中的消息。
在Kafka中,選舉這種副本的過程稱為Unclean領(lǐng)導(dǎo)者選舉。
「Broker端參數(shù)unclean.leader.election.enable控制是否允許Unclean領(lǐng)導(dǎo)者選舉」。
開啟Unclean領(lǐng)導(dǎo)者選舉可能會造成數(shù)據(jù)丟失,但好處是,它使得分區(qū)Leader副本一直存在,不至于停止對外提供服務(wù),因此提升了高可用性。反之,禁止Unclean領(lǐng)導(dǎo)者選舉的好處在于維護了數(shù)據(jù)的一致性,避免了消息丟失,但犧牲了高可用性。
副本選舉
對于kafka集群中對于任意的topic的分區(qū)以及副本leader的設(shè)定,都需要考慮到集群整體的負(fù)載能力的平衡性,會盡量分配每一個partition的副本leader在不同的broker中,這樣會避免多個leader在同一個broker,導(dǎo)致集群中的broker負(fù)載不平衡
kafka引入了優(yōu)先副本的概念,優(yōu)先副本的意思在AR(分區(qū)中的所有副本)集合列表中的第一個副本,在理想狀態(tài)下該副本就是該分區(qū)的leader副本
例如kafka集群由3臺broker組成,創(chuàng)建了一個名為topic-partitions的topic,設(shè)置partition為3,副本數(shù)為3,partition0中AR列表為 [1,2,0],那么分區(qū)0的優(yōu)先副本為1
kafka使用多副本機制提高可靠性,但是只有l(wèi)eader副本對外提供讀寫服務(wù),follow副本只是做消息同步。
「如果一個分區(qū)的leader副本不可用,就意味著整個分區(qū)不可用,此時需要從follower副本中選舉出新的leader副本提供服務(wù)」。
「在創(chuàng)建主題的時候,該分區(qū)的主題和副本會盡可能的均勻發(fā)布到kafka的各個broker上」。
比如我們在包含3個broker節(jié)點的kafka集群上創(chuàng)建一個分區(qū)數(shù)為3,副本因子為3的主題topic-partitions時,leader副本會均勻的分布在3臺broker節(jié)點上。

「針對同一個分區(qū),在同一個broker節(jié)點上不可能出現(xiàn)它的多個副本」。
我們可以把leader副本所在的節(jié)點叫作分區(qū)的leader節(jié)點,把follower副本所在的節(jié)點叫作follower節(jié)點。
在上面的例子中,分區(qū)0的leader節(jié)點是broker1,分區(qū)1的leader節(jié)點是broker2,分區(qū)2的leader節(jié)點是broker0。
當(dāng)分區(qū)leader節(jié)點發(fā)生故障時,其中的一個follower節(jié)點就會選舉為新的leader節(jié)點。
當(dāng)原來leader的節(jié)點恢復(fù)之后,它只能成為一個follower節(jié)點,此時就導(dǎo)致了集群負(fù)載不均衡。
比如分區(qū)1的leader節(jié)點broker2崩潰了,此時選舉了在broker1上的分區(qū)1follower節(jié)點作為新的leader節(jié)點。
當(dāng)broker2重新恢復(fù)時,此時的kafka集群狀態(tài)如下:

可以看到,此時broker1上負(fù)載更大,而broker2上沒有負(fù)載。
「為了解決上述負(fù)載不均衡的情況,kafka支持了優(yōu)先副本選舉,優(yōu)先副本指的是一個分區(qū)所在的AR集合的第一個副本」。
比如上面的分區(qū)1,它的AR集合是[2,0,1],表示分區(qū)1的優(yōu)先副本就是在broker2上。
理想情況下,優(yōu)先副本應(yīng)該就是leader副本,kafka保證了優(yōu)先副本的均衡分布,而這與broker節(jié)點宕機與否沒有關(guān)系。
「優(yōu)先副本選舉就是對分區(qū)leader副本進行選舉的時候,盡可能讓優(yōu)先副本成為leader副本」,針對上述的情況,只要再觸發(fā)一次優(yōu)先副本選舉就能保證分區(qū)負(fù)載均衡。
kafka支持自動優(yōu)先副本選舉功能,默認(rèn)每5分鐘觸發(fā)一次優(yōu)先副本選舉操作。
網(wǎng)絡(luò)通信模型

Broker 中有個Acceptor(mainReactor)監(jiān)聽新連接的到來,與新連接建連之后輪詢選擇一個Processor(subReactor)管理這個連接。
而Processor會監(jiān)聽其管理的連接,當(dāng)事件到達(dá)之后,讀取封裝成Request,并將Request放入共享請求隊列中。
然后IO線程池不斷的從該隊列中取出請求,執(zhí)行真正的處理。處理完之后將響應(yīng)發(fā)送到對應(yīng)的Processor的響應(yīng)隊列中,然后由Processor將Response返還給客戶端。
每個listener只有一個Acceptor線程,因為它只是作為新連接建連再分發(fā),沒有過多的邏輯,很輕量。
Processor 在Kafka中稱之為網(wǎng)絡(luò)線程,默認(rèn)網(wǎng)絡(luò)線程池有3個線程,對應(yīng)的參數(shù)是num.network.threads,并且可以根據(jù)實際的業(yè)務(wù)動態(tài)增減。
還有個 IO 線程池,即KafkaRequestHandlerPool,執(zhí)行真正的處理,對應(yīng)的參數(shù)是num.io.threads,默認(rèn)值是 8。
IO線程處理完之后會將Response放入對應(yīng)的Processor中,由Processor將響應(yīng)返還給客戶端。
可以看到網(wǎng)絡(luò)線程和IO線程之間利用的經(jīng)典的生產(chǎn)者 - 消費者模式,不論是用于處理Request的共享請求隊列,還是IO處理完返回的Response。
冪等性
「冪等性Producer」
在Kafka中,Producer默認(rèn)不是冪等性的,但我們可以創(chuàng)建冪等性Producer。
它其實是0.11.0.0版本引入的新功能,在此之前,Kafka向分區(qū)發(fā)送數(shù)據(jù)時,可能會出現(xiàn)同一條消息被發(fā)送了多次,導(dǎo)致消息重復(fù)的情況。
在0.11之后,指定Producer冪等性的方法很簡單,僅需要設(shè)置一個參數(shù)即可,即
props.put(“enable.idempotence”, ture),
或props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)。
enable.idempotence被設(shè)置成true后,Producer自動升級成冪等性Producer,其他所有的代碼邏輯都不需要改變。
Kafka自動幫你做消息的重復(fù)去重。
底層具體的原理很簡單,就是經(jīng)典的用空間去換時間的優(yōu)化思路,即在Broker端多保存一些字段。
當(dāng)Producer發(fā)送了具有相同字段值的消息后,Broker能夠自動知曉這些消息已經(jīng)重復(fù)了,于是可以在后臺默默地把它們丟棄掉。
「冪等性Producer的作用范圍」
首先,它只能保證單分區(qū)上的冪等性,即一個冪等性Producer能夠保證某個主題的一個分區(qū)上不出現(xiàn)重復(fù)消息,它無法實現(xiàn)多個分區(qū)的冪等性。
其次,它只能實現(xiàn)單會話上的冪等性,不能實現(xiàn)跨會話的冪等性。
這里的會話,你可以理解為Producer進程的一次運行,當(dāng)你重啟了Producer進程之后,這種冪等性保證就喪失了。
事務(wù)
Kafka自0.11版本開始也提供了對事務(wù)的支持,目前主要是在read committed隔離級別上做事情。
它能保證多條消息原子性地寫入到目標(biāo)分區(qū),同時也能保證Consumer只能看到事務(wù)成功提交的消息。
「事務(wù)型Producer」
事務(wù)型Producer能夠保證將消息原子性地寫入到多個分區(qū)中。
這批消息要么全部寫入成功,要么全部失敗,另外,事務(wù)型Producer也不懼進程的重啟。
Producer重啟回來后,Kafka依然保證它們發(fā)送消息的精確一次處理。
設(shè)置事務(wù)型Producer的方法也很簡單,滿足兩個要求即可:
和冪等性Producer一樣,開啟 enable.idempotence = true。設(shè)置Producer端參數(shù) transactional. id,最好為其設(shè)置一個有意義的名字。
此外,你還需要在Producer代碼中做一些調(diào)整,如這段代碼所示:
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(record1);
producer.send(record2);
producer.commitTransaction();
} catch (KafkaException e) {
producer.abortTransaction();
}
和普通Producer代碼相比,事務(wù)型Producer的顯著特點是調(diào)用了一些事務(wù)API,如initTransaction、beginTransaction、commitTransaction和abortTransaction,它們分別對應(yīng)事務(wù)的初始化、事務(wù)開始、事務(wù)提交以及事務(wù)終止。
這段代碼能夠保證Record1和Record2被當(dāng)作一個事務(wù)統(tǒng)一提交到Kafka,要么它們?nèi)刻峤怀晒Γ慈繉懭胧 ?/p>
實際上即使寫入失敗,Kafka也會把它們寫入到底層的日志中,也就是說Consumer還是會看到這些消息。
有一個isolation.level參數(shù),這個參數(shù)有兩個取值:
read_uncommitted:這是默認(rèn)值,表明Consumer能夠讀取到Kafka寫入的任何消息,不論事務(wù)型Producer提交事務(wù)還是終止事務(wù),其寫入的消息都可以讀取,如果你用了事務(wù)型Producer,那么對應(yīng)的Consumer就不要使用這個值。read_committed:表明Consumer只會讀取事務(wù)型Producer成功提交事務(wù)寫入的消息,它也能看到非事務(wù)型Producer寫入的所有消息。
攔截器
「Kafka攔截器分為生產(chǎn)者攔截器和消費者攔截器」。
生產(chǎn)者攔截器允許你在發(fā)送消息前以及消息提交成功后植入你的攔截器邏輯;
而消費者攔截器支持在消費消息前以及提交位移后編寫特定邏輯。
可以將一組攔截器串連成一個大的攔截器,Kafka會按照添加順序依次執(zhí)行攔截器邏輯。
當(dāng)前Kafka攔截器的設(shè)置方法是通過參數(shù)配置完成的,生產(chǎn)者和消費者兩端有一個相同的參數(shù)interceptor.classes,它指定的是一組類的列表,每個類就是特定邏輯的攔截器實現(xiàn)類。
Properties props = new Properties();
List interceptors = new ArrayList<>();
interceptors.add("com.yourcompany.kafkaproject.interceptors.AddTimestampInterceptor"); // 攔截器1
interceptors.add("com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor"); // 攔截器2
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
……
?怎么編寫AddTimeStampInterceptor和UpdateCounterInterceptor類呢?
?
這兩個類以及你自己編寫的所有Producer端攔截器實現(xiàn)類都要繼承org.apache.kafka.clients.producer.ProducerInterceptor接口。
該接口是Kafka提供的,里面有兩個核心的方法。
onSend:該方法會在消息發(fā)送之前被調(diào)用。 onAcknowledgement:該方法會在消息成功提交或發(fā)送失敗之后被調(diào)用。onAcknowledgement的調(diào)用要早于callback的調(diào)用。值得注意的是,這個方法和onSend不是在同一個線程中被調(diào)用的,因此如果你在這兩個方法中調(diào)用了某個共享可變對象,一定要保證線程安全。
同理,指定消費者攔截器也是同樣的方法,只是具體的實現(xiàn)類要實現(xiàn)org.apache.kafka.clients.consumer.ConsumerInterceptor接口,這里面也有兩個核心方法。
onConsume:該方法在消息返回給Consumer程序之前調(diào)用。 onCommit:Consumer在提交位移之后調(diào)用該方法。通常你可以在該方法中做一些記賬類的動作,比如打日志等。
一定要注意的是,「指定攔截器類時要指定它們的全限定名」。
通俗點說就是要把完整包名也加上,不要只有一個類名在那里,并且還要保證你的Producer程序能夠正確加載你的攔截器類。
控制器
「控制器組件(Controller),它的主要作用是在Apache ZooKeeper的幫助下管理和協(xié)調(diào)整個Kafka集群」。
集群中任意一臺Broker都能充當(dāng)控制器的角色,但是,在運行過程中,只能有一個Broker成為控制器,行使其管理和協(xié)調(diào)的職責(zé)。
Kafka控制器大量使用ZooKeeper的Watch功能實現(xiàn)對集群的協(xié)調(diào)管理。
「控制器是如何被選出來的」
實際上,Broker在啟動時,會嘗試去ZooKeeper中創(chuàng)建/controller節(jié)點。
Kafka當(dāng)前選舉控制器的規(guī)則是:「第一個成功創(chuàng)建/controller節(jié)點的Broker會被指定為控制器」。
「控制器是做什么的」
控制器的職責(zé)大致可以分為5種:
1.「主題管理(創(chuàng)建、刪除、增加分區(qū))」
控制器幫助我們完成對Kafka主題的創(chuàng)建、刪除以及分區(qū)增加的操作。
2.「分區(qū)重分配」
3.「Preferred領(lǐng)導(dǎo)者選舉」
Preferred領(lǐng)導(dǎo)者選舉主要是Kafka為了避免部分Broker負(fù)載過重而提供的一種換Leader的方案。
4.「集群成員管理(新增Broker、Broker主動關(guān)閉、Broker宕機)」
包括自動檢測新增Broker、Broker主動關(guān)閉及被動宕機。
這種自動檢測是依賴于Watch功能和ZooKeeper臨時節(jié)點組合實現(xiàn)的。
比如,控制器組件會利用「Watch機制」檢查ZooKeeper的/brokers/ids節(jié)點下的子節(jié)點數(shù)量變更。
目前,當(dāng)有新Broker啟動后,它會在/brokers下創(chuàng)建專屬的znode節(jié)點。
一旦創(chuàng)建完畢,ZooKeeper會通過Watch機制將消息通知推送給控制器,這樣,控制器就能自動地感知到這個變化,進而開啟后續(xù)的新增Broker作業(yè)。
偵測Broker存活性則是依賴于剛剛提到的另一個機制:「臨時節(jié)點」。
每個Broker啟動后,會在/brokers/ids下創(chuàng)建一個臨時znode。
當(dāng)Broker宕機或主動關(guān)閉后,該Broker與ZooKeeper的會話結(jié)束,這個znode會被自動刪除。
同理,ZooKeeper的Watch機制將這一變更推送給控制器,這樣控制器就能知道有Broker關(guān)閉或宕機了,從而進行善后。
5.「數(shù)據(jù)服務(wù)」
控制器上保存了最全的集群元數(shù)據(jù)信息,其他所有Broker會定期接收控制器發(fā)來的元數(shù)據(jù)更新請求,從而更新其內(nèi)存中的緩存數(shù)據(jù)。
「控制器故障轉(zhuǎn)移(Failover)」
「故障轉(zhuǎn)移指的是,當(dāng)運行中的控制器突然宕機或意外終止時,Kafka能夠快速地感知到,并立即啟用備用控制器來代替之前失敗的控制器」。這個過程就被稱為Failover,該過程是自動完成的,無需你手動干預(yù)。

最開始時,Broker 0是控制器。當(dāng)Broker 0宕機后,ZooKeeper通過Watch機制感知到并刪除了/controller臨時節(jié)點。
之后,所有存活的Broker開始競選新的控制器身份。
Broker 3最終贏得了選舉,成功地在ZooKeeper上重建了/controller節(jié)點。
之后,Broker 3會從ZooKeeper中讀取集群元數(shù)據(jù)信息,并初始化到自己的緩存中。
至此,控制器的Failover完成,可以行使正常的工作職責(zé)了。
日志存儲
Kafka中的消息是以主題為基本單位進行歸類的,每個主題在邏輯上相互獨立。
每個主題又可以分為一個或多個分區(qū),在不考慮副本的情況下,一個分區(qū)會對應(yīng)一個日志。
但設(shè)計者考慮到隨著時間推移,日志文件會不斷擴大,因此為了防止Log過大,設(shè)計者引入了日志分段(LogSegment)的概念,將Log切分為多個LogSegment,便于后續(xù)的消息維護和清理工作。
下圖描繪了主題、分區(qū)、副本、Log、LogSegment五者之間的關(guān)系。

「LogSegment」
在Kafka中,每個Log對象又可以劃分為多個LogSegment文件,每個LogSegment文件包括一個日志數(shù)據(jù)文件和兩個索引文件(偏移量索引文件和消息時間戳索引文件)。
其中,每個LogSegment中的日志數(shù)據(jù)文件大小均相等(該日志數(shù)據(jù)文件的大小可以通過在Kafka Broker的config/server.properties配置文件的中的「log.segment.bytes」進行設(shè)置,默認(rèn)為1G大小(1073741824字節(jié)),在順序?qū)懭胂r如果超出該設(shè)定的閾值,將會創(chuàng)建一組新的日志數(shù)據(jù)和索引文件)。

常用參數(shù)
「broker端配置」
broker.id
每個 kafka broker 都有一個唯一的標(biāo)識來表示,這個唯一的標(biāo)識符即是 broker.id,它的默認(rèn)值是 0。
這個值在 kafka 集群中必須是唯一的,這個值可以任意設(shè)定,
port
如果使用配置樣本來啟動 kafka,它會監(jiān)聽 9092 端口,修改 port 配置參數(shù)可以把它設(shè)置成任意的端口。
要注意,如果使用 1024 以下的端口,需要使用 root 權(quán)限啟動 kakfa。
zookeeper.connect
用于保存 broker 元數(shù)據(jù)的 Zookeeper 地址是通過 zookeeper.connect 來指定的。
比如可以這么指定 localhost:2181 表示這個 Zookeeper 是運行在本地 2181 端口上的。
我們也可以通過 比如我們可以通過 zk1:2181,zk2:2181,zk3:2181 來指定 zookeeper.connect 的多個參數(shù)值。
該配置參數(shù)是用冒號分割的一組 hostname:port/path 列表,其含義如下
hostname 是 Zookeeper 服務(wù)器的機器名或者 ip 地址。
port 是 Zookeeper 客戶端的端口號
/path 是可選擇的 Zookeeper 路徑,Kafka 路徑是使用了
chroot環(huán)境,如果不指定默認(rèn)使用跟路徑。
?如果你有兩套 Kafka 集群,假設(shè)分別叫它們 kafka1 和 kafka2,那么兩套集群的
?zookeeper.connect參數(shù)可以這樣指定:zk1:2181,zk2:2181,zk3:2181/kafka1和zk1:2181,zk2:2181,zk3:2181/kafka2
log.dirs
Kafka 把所有的消息都保存到磁盤上,存放這些日志片段的目錄是通過 log.dirs 來制定的,它是用一組逗號來分割的本地系統(tǒng)路徑,log.dirs 是沒有默認(rèn)值的,「你必須手動指定他的默認(rèn)值」。
其實還有一個參數(shù)是 log.dir,這個配置是沒有 s 的,默認(rèn)情況下只用配置 log.dirs 就好了,比如你可以通過 /home/kafka1,/home/kafka2,/home/kafka3 這樣來配置這個參數(shù)的值。
auto.create.topics.enable
默認(rèn)情況下,kafka 會自動創(chuàng)建主題
auto.create.topics.enable參數(shù)建議最好設(shè)置成 false,即不允許自動創(chuàng)建 Topic。
「主題相關(guān)配置」
num.partitions
num.partitions 參數(shù)指定了新創(chuàng)建的主題需要包含多少個分區(qū),該參數(shù)的默認(rèn)值是 1。
default.replication.factor
這個參數(shù)比較簡單,它表示 kafka保存消息的副本數(shù)。
log.retention.ms
Kafka 通常根據(jù)時間來決定數(shù)據(jù)可以保留多久。
默認(rèn)使用log.retention.hours參數(shù)來配置時間,默認(rèn)是 168 個小時,也就是一周。
除此之外,還有兩個參數(shù)log.retention.minutes 和log.retentiion.ms 。
這三個參數(shù)作用是一樣的,都是決定消息多久以后被刪除,推薦使用log.retention.ms。
message.max.bytes
broker 通過設(shè)置 message.max.bytes 參數(shù)來限制單個消息的大小,默認(rèn)是 1000 000, 也就是 1MB,如果生產(chǎn)者嘗試發(fā)送的消息超過這個大小,不僅消息不會被接收,還會收到 broker 返回的錯誤消息。
retention.ms
規(guī)定了該主題消息被保存的時常,默認(rèn)是7天,即該主題只能保存7天的消息,一旦設(shè)置了這個值,它會覆蓋掉 Broker 端的全局參數(shù)值。
消息丟失問題
「生產(chǎn)者程序丟失數(shù)據(jù)」
目前Kafka Producer是異步發(fā)送消息的,也就是說如果你調(diào)用的是producer.send(msg)這個API,那么它通常會立即返回,但此時你不能認(rèn)為消息發(fā)送已成功完成。
如果用這個方式,可能會有哪些因素導(dǎo)致消息沒有發(fā)送成功呢?
其實原因有很多,例如網(wǎng)絡(luò)抖動,導(dǎo)致消息壓根就沒有發(fā)送到Broker端;或者消息本身不合格導(dǎo)致Broker拒絕接收(比如消息太大了,超過了Broker的承受能力)等。
實際上,解決此問題的方法非常簡單:Producer永遠(yuǎn)要使用帶有回調(diào)通知的發(fā)送API,也就是說不要使用producer.send(msg),而要使用producer.send(msg, callback)。
它能準(zhǔn)確地告訴你消息是否真的提交成功了。
一旦出現(xiàn)消息提交失敗的情況,你就可以有針對性地進行處理。
「消費者程序丟失數(shù)據(jù)」
Consumer端丟失數(shù)據(jù)主要體現(xiàn)在Consumer端要消費的消息不見了。
下面這張圖它清晰地展示了Consumer端的位移數(shù)據(jù)。

比如對于Consumer A而言,它當(dāng)前的位移值就是9;Consumer B的位移值是11。
Consumer程序從Kafka獲取到消息后開啟了多個線程異步處理消息,而Consumer程序自動地向前更新位移。
假如其中某個線程運行失敗了,它負(fù)責(zé)的消息沒有被成功處理,但位移已經(jīng)被更新了,因此這條消息對于Consumer而言實際上是丟失了。
這里的關(guān)鍵在于Consumer自動提交位移。
這個問題的解決方案也很簡單:
「如果是多線程異步處理消費消息,Consumer程序不要開啟自動提交位移,而是要應(yīng)用程序手動提交位移」。
最佳實踐
總結(jié)Kafka無消息丟失的配置:
不要使用 producer.send(msg),而要使用producer.send(msg, callback),一定要使用帶有回調(diào)通知的send方法。設(shè)置 acks = all,acks是Producer的一個參數(shù),代表了你對已提交消息的定義,如果設(shè)置成all,則表明所有副本Broker都要接收到消息,該消息才算是已提交。設(shè)置retries為一個較大的值。這里的retries同樣是Producer的參數(shù),對應(yīng)前面提到的Producer自動重試,當(dāng)出現(xiàn)網(wǎng)絡(luò)的瞬時抖動時,消息發(fā)送可能會失敗,此時配置了 retries > 0的Producer能夠自動重試消息發(fā)送,避免消息丟失。設(shè)置 unclean.leader.election.enable = false,這是Broker端的參數(shù),它控制的是哪些Broker有資格競選分區(qū)的Leader,如果一個Broker落后原先的Leader太多,那么它一旦成為新的Leader,必然會造成消息的丟失,故一般都要將該參數(shù)設(shè)置成false,即不允許這種情況的發(fā)生。設(shè)置 replication.factor >= 3,這也是Broker端的參數(shù),將消息多保存幾份,目前防止消息丟失的主要機制就是冗余。設(shè)置 min.insync.replicas > 1,這依然是Broker端參數(shù),控制的是消息至少要被寫入到多少個副本才算是已提交,設(shè)置成大于1可以提升消息持久性,在實際環(huán)境中千萬不要使用默認(rèn)值1。確保 replication.factor > min.insync.replicas,如果兩者相等,那么只要有一個副本掛機,整個分區(qū)就無法正常工作了,我們不僅要改善消息的持久性,防止數(shù)據(jù)丟失,還要在不降低可用性的基礎(chǔ)上完成,推薦設(shè)置成replication.factor = min.insync.replicas + 1。確保消息消費完成再提交,Consumer端有個參數(shù) enable.auto.commit,最好把它設(shè)置成false,并采用手動提交位移的方式。
重復(fù)消費問題
「消費重復(fù)的場景」
在enable.auto.commit 默認(rèn)值true情況下,出現(xiàn)重復(fù)消費的場景有以下幾種:
?consumer 在消費過程中,應(yīng)用進程被強制kill掉或發(fā)生異常退出。
?
例如在一次poll 500條消息后,消費到200條時,進程被強制kill消費到offset未提交,或出現(xiàn)異常退出導(dǎo)致消費到offset未提交。
下次重啟時,依然會重新拉取500消息,造成之前消費到200條消息重復(fù)消費了兩次。
解決方案:在發(fā)生異常時正確處理未提交的offset
「消費者消費時間過長」
max.poll.interval.ms參數(shù)定義了兩次poll的最大間隔,它的默認(rèn)值是 5 分鐘,表示你的 Consumer 程序如果在 5 分鐘之內(nèi)無法消費完 poll 方法返回的消息,那么 Consumer 會主動發(fā)起離開組的請求,Coordinator 也會開啟新一輪 Rebalance。
舉例:單次拉取11條消息,每條消息耗時30s,11條消息耗時5分鐘30秒,由于max.poll.interval.ms 默認(rèn)值5分鐘,所以消費者無法在5分鐘內(nèi)消費完,consumer會離開組,導(dǎo)致rebalance。
在消費完11條消息后,consumer會重新連接broker,再次rebalance,因為上次消費的offset未提交,再次拉取的消息是之前消費過的消息,造成重復(fù)消費。
「解決方案:」
1、提高消費能力,提高單條消息的處理速度;根據(jù)實際場景可講max.poll.interval.ms值設(shè)置大一點,避免不必要的rebalance;可適當(dāng)減小max.poll.records的值,默認(rèn)值是500,可根據(jù)實際消息速率適當(dāng)調(diào)小。
2、生成消息時,可加入唯一標(biāo)識符如消息id,在消費端,保存最近的1000條消息id存入到redis或mysql中,消費的消息時通過前置去重。
消息順序問題
我們都知道kafka的topic是無序的,但是一個topic包含多個partition,每個partition內(nèi)部是有序的

「亂序場景1」
因為一個topic可以有多個partition,kafka只能保證partition內(nèi)部有序
「解決方案」
1、可以設(shè)置topic,有且只有一個partition
2、根據(jù)業(yè)務(wù)需要,需要順序的 指定為同一個partition
3、根據(jù)業(yè)務(wù)需要,比如同一個訂單,使用同一個key,可以保證分配到同一個partition上
「亂序場景2」
對于同一業(yè)務(wù)進入了同一個消費者組之后,用了多線程來處理消息,會導(dǎo)致消息的亂序
「解決方案」
消費者內(nèi)部根據(jù)線程數(shù)量創(chuàng)建等量的內(nèi)存隊列,對于需要順序的一系列業(yè)務(wù)數(shù)據(jù),根據(jù)key或者業(yè)務(wù)數(shù)據(jù),放到同一個內(nèi)存隊列中,然后線程從對應(yīng)的內(nèi)存隊列中取出并操作

「通過設(shè)置相同key來保證消息有序性,會有一點缺陷:」
例如消息發(fā)送設(shè)置了重試機制,并且異步發(fā)送,消息A和B設(shè)置相同的key,業(yè)務(wù)上A先發(fā),B后發(fā),由于網(wǎng)絡(luò)或者其他原因A發(fā)送失敗,B發(fā)送成功;A由于發(fā)送失敗就會重試且重試成功,這時候消息順序B在前A在后,與業(yè)務(wù)發(fā)送順序不一致,如果需要解決這個問題,需要設(shè)置參數(shù)max.in.flight.requests.per.connection=1,其含義是限制客戶端在單個連接上能夠發(fā)送的未響應(yīng)請求的個數(shù),設(shè)置此值是1表示kafka broker在響應(yīng)請求之前client不能再向同一個broker發(fā)送請求,這個參數(shù)默認(rèn)值是5
?官方文檔說明,這個參數(shù)如果大于1,由于重試消息順序可能重排
?
高性能原因
「順序讀寫」
kafka的消息是不斷追加到文件中的,這個特性使kafka可以充分利用磁盤的順序讀寫性能
順序讀寫不需要硬盤磁頭的尋道時間,只需很少的扇區(qū)旋轉(zhuǎn)時間,所以速度遠(yuǎn)快于隨機讀寫
Kafka 可以配置異步刷盤,不開啟同步刷盤,異步刷盤不需要等寫入磁盤后返回消息投遞的 ACK,所以它提高了消息發(fā)送的吞吐量,降低了請求的延時
「零拷貝」
傳統(tǒng)的 IO 流程,需要先把數(shù)據(jù)拷貝到內(nèi)核緩沖區(qū),再從內(nèi)核緩沖拷貝到用戶空間,應(yīng)用程序處理完成以后,再拷貝回內(nèi)核緩沖區(qū)
這個過程中發(fā)生了多次數(shù)據(jù)拷貝
為了減少不必要的拷貝,Kafka 依賴 Linux 內(nèi)核提供的 Sendfile 系統(tǒng)調(diào)用
在 Sendfile 方法中,數(shù)據(jù)在內(nèi)核緩沖區(qū)完成輸入和輸出,不需要拷貝到用戶空間處理,這也就避免了重復(fù)的數(shù)據(jù)拷貝
在具體的操作中,Kafka 把所有的消息都存放在單獨的文件里,在消息投遞時直接通過 Sendfile 方法發(fā)送文件,減少了上下文切換,因此大大提高了性能
「MMAP技術(shù)」
除了 Sendfile 之外,還有一種零拷貝的實現(xiàn)技術(shù),即 Memory Mapped Files
Kafka 使用 Memory Mapped Files 完成內(nèi)存映射,Memory Mapped Files 對文件的操作不是 write/read,而是直接對內(nèi)存地址的操作,如果是調(diào)用文件的 read 操作,則把數(shù)據(jù)先讀取到內(nèi)核空間中,然后再復(fù)制到用戶空間,但 MMAP可以將文件直接映射到用戶態(tài)的內(nèi)存空間,省去了用戶空間到內(nèi)核空間復(fù)制的開銷
Producer生產(chǎn)的數(shù)據(jù)持久化到broker,采用mmap文件映射,實現(xiàn)順序的快速寫入
Customer從broker讀取數(shù)據(jù),采用sendfile,將磁盤文件讀到OS內(nèi)核緩沖區(qū)后,直接轉(zhuǎn)到socket buffer進行網(wǎng)絡(luò)發(fā)送。
「批量發(fā)送讀取」
Kafka 的批量包括批量寫入、批量發(fā)布等。它在消息投遞時會將消息緩存起來,然后批量發(fā)送
同樣,消費端在消費消息時,也不是一條一條處理的,而是批量進行拉取,提高了消息的處理速度
「數(shù)據(jù)壓縮」
Kafka還支持對消息集合進行壓縮,Producer可以通過GZIP或Snappy格式對消息集合進行壓縮
壓縮的好處就是減少傳輸?shù)臄?shù)據(jù)量,減輕對網(wǎng)絡(luò)傳輸?shù)膲毫?/p>
Producer壓縮之后,在Consumer需進行解壓,雖然增加了CPU的工作,但在對大數(shù)據(jù)處理上,瓶頸在網(wǎng)絡(luò)上而不是CPU,所以這個成本很值得
「分區(qū)機制」
kafka中的topic中的內(nèi)容可以被分為多partition存在,每個partition又分為多個段segment,所以每次操作都是針對一小部分做操作,很輕便,并且增加并行操作的能力
常見面試題
「Kafka是Push還是Pull模式?」
Kafka最初考慮的問題是,customer應(yīng)該從brokes拉取消息還是brokers將消息推送到consumer。
在這方面,Kafka遵循了一種大部分消息系統(tǒng)共同的傳統(tǒng)的設(shè)計:producer將消息推送到broker,consumer從broker拉取消息。
push模式由broker決定消息推送的速率,對于不同消費速率的consumer就不太好處理了。
消息系統(tǒng)都致力于讓consumer以最大的速率最快速的消費消息,push模式下,當(dāng)broker推送的速率遠(yuǎn)大于consumer消費的速率時,consumer恐怕就要崩潰了。
?Kafka中的Producer和Consumer采用的是Push-and-Pull模式,即Producer向Broker Push消息,Consumer從Broker Pull消息。
?
Pull模式的一個好處是consumer可以自主決定是否批量的從broker拉取數(shù)據(jù)。
Pull有個缺點是,如果broker沒有可供消費的消息,將導(dǎo)致consumer不斷在循環(huán)中輪詢,直到新消息到達(dá)。
「Kafka如何保證高可用?」
「Kafk的使用場景」
業(yè)界Kafka實際應(yīng)用場景
?異步通信
?
消息中間件在異步通信中用的最多,很多業(yè)務(wù)流程中,如果所有步驟都同步進行可能會導(dǎo)致核心流程耗時非常長,更重要的是所有步驟都同步進行一旦非核心步驟失敗會導(dǎo)致核心流程整體失敗,因此在很多業(yè)務(wù)流程中Kafka就充當(dāng)了異步通信角色。
?日志同步
?
大規(guī)模分布式系統(tǒng)中的機器非常多而且分散在不同機房中,分布式系統(tǒng)帶來的一個明顯問題就是業(yè)務(wù)日志的查看、追蹤和分析等行為變得十分困難,對于集群規(guī)模在百臺以上的系統(tǒng),查詢線上日志很恐怖。
為了應(yīng)對這種場景統(tǒng)一日志系統(tǒng)應(yīng)運而生,日志數(shù)據(jù)都是海量數(shù)據(jù),通常為了不給系統(tǒng)帶來額外負(fù)擔(dān)一般會采用異步上報,這里Kafka以其高吞吐量在日志處理中得到了很好的應(yīng)用。
?實時計算
?
隨著據(jù)量的增加,離線的計算會越來越慢,難以滿足用戶在某些場景下的實時性要求,因此很多解決方案中引入了實時計算。
很多時候,即使是海量數(shù)據(jù),我們也希望即時去查看一些數(shù)據(jù)指標(biāo),實時流計算應(yīng)運而生。
實時流計算有兩個特點,一個是實時,隨時可以看數(shù)據(jù);另一個是流。
