Kafka體系架構(gòu)詳細分解
點擊上方藍色字體,選擇“設(shè)為星標(biāo)”

作者:luozhiyun
地址:http://suo.im/5uYoJ0
基本概念
Kafka 體系架構(gòu)

Kafka 體系架構(gòu)包括若干 Producer、若干 Broker、若干 Consumer,以及一個 ZooKeeper 集群。
在 Kafka 中還有兩個特別重要的概念—主題(Topic)與分區(qū)(Partition)。
Kafka 中的消息以主題為單位進行歸類,生產(chǎn)者負責(zé)將消息發(fā)送到特定的主題(發(fā)送到 Kafka 集群中的每一條消息都要指定一個主題),而消費者負責(zé)訂閱主題并進行消費。
主題是一個邏輯上的概念,它還可以細分為多個分區(qū),一個分區(qū)只屬于單個主題,很多時候也會把分區(qū)稱為主題分區(qū)(Topic-Partition)。
Kafka 為分區(qū)引入了多副本(Replica)機制,通過增加副本數(shù)量可以提升容災(zāi)能力。同一分區(qū)的不同副本中保存的是相同的消息(在同一時刻,副本之間并非完全一樣),副本之間是“一主多從”的關(guān)系,其中 leader 副本負責(zé)處理讀寫請求,follower 副本只負責(zé)與 leader 副本的消息同步。當(dāng) leader 副本出現(xiàn)故障時,從 follower 副本中重新選舉新的 leader 副本對外提供服務(wù)。

如上圖所示,Kafka 集群中有4個 broker,某個主題中有3個分區(qū),且副本因子(即副本個數(shù))也為3,如此每個分區(qū)便有1個 leader 副本和2個 follower 副本。
數(shù)據(jù)同步
分區(qū)中的所有副本統(tǒng)稱為 AR(Assigned Replicas)。所有與 leader 副本保持一定程度同步的副本(包括 leader 副本在內(nèi))組成ISR(In-Sync Replicas),ISR 集合是 AR 集合中的一個子集。
與 leader 副本同步滯后過多的副本(不包括 leader 副本)組成 OSR(Out-of-Sync Replicas),由此可見,AR=ISR+OSR。在正常情況下,所有的 follower 副本都應(yīng)該與 leader 副本保持一定程度的同步,即 AR=ISR,OSR 集合為空。
Leader 副本負責(zé)維護和跟蹤 ISR 集合中所有 follower 副本的滯后狀態(tài),當(dāng) follower 副本落后太多或失效時,leader 副本會把它從 ISR 集合中剔除。默認情況下,當(dāng) leader 副本發(fā)生故障時,只有在 ISR 集合中的副本才有資格被選舉為新的 leader。
HW 是 High Watermark 的縮寫,俗稱高水位,它標(biāo)識了一個特定的消息偏移量(offset),消費者只能拉取到這個 offset 之前的消息。
LEO 是 Log End Offset 的縮寫,它標(biāo)識當(dāng)前日志文件中下一條待寫入消息的 offset。

如上圖所示,第一條消息的 offset(LogStartOffset)為0,最后一條消息的 offset 為8,offset 為9的消息用虛線框表示,代表下一條待寫入的消息。日志文件的 HW 為6,表示消費者只能拉取到 offset 在0至5之間的消息,而 offset 為6的消息對消費者而言是不可見的。
Kafka生產(chǎn)者客戶端的整體結(jié)構(gòu)

整個生產(chǎn)者客戶端由兩個線程協(xié)調(diào)運行,這兩個線程分別為主線程和 Sender 線程(發(fā)送線程)。
在主線程中由 KafkaProducer 創(chuàng)建消息,然后通過可能的攔截器、序列化器和分區(qū)器的作用之后緩存到消息累加器(RecordAccumulator,也稱為消息收集器)中。Sender 線程負責(zé)從 RecordAccumulator 中獲取消息并將其發(fā)送到 Kafka 中。
RecordAccumulator
RecordAccumulator 主要用來緩存消息以便 Sender 線程可以批量發(fā)送,進而減少網(wǎng)絡(luò)傳輸?shù)馁Y源消耗以提升性能。
主線程中發(fā)送過來的消息都會被追加到 RecordAccumulator 的某個雙端隊列(Deque)中,在 RecordAccumulator 的內(nèi)部為每個分區(qū)都維護了一個雙端隊列。
消息寫入緩存時,追加到雙端隊列的尾部;Sender 讀取消息時,從雙端隊列的頭部讀取。
Sender 從 RecordAccumulator 中獲取緩存的消息之后,會進一步將原本<分區(qū), Deque< ProducerBatch>> 的保存形式轉(zhuǎn)變成
KafkaProducer 要將此消息追加到指定主題的某個分區(qū)所對應(yīng)的 leader 副本之前,首先需要知道主題的分區(qū)數(shù)量,然后經(jīng)過計算得出(或者直接指定)目標(biāo)分區(qū),之后 KafkaProducer 需要知道目標(biāo)分區(qū)的 leader 副本所在的 broker 節(jié)點的地址、端口等信息才能建立連接,最終才能將消息發(fā)送到 Kafka。
所以這里需要一個轉(zhuǎn)換,對于網(wǎng)絡(luò)連接來說,生產(chǎn)者客戶端是與具體的 broker 節(jié)點建立的連接,也就是向具體的 broker 節(jié)點發(fā)送消息,而并不關(guān)心消息屬于哪一個分區(qū)。
InFlightRequests
請求在從 Sender 線程發(fā)往 Kafka 之前還會保存到 InFlightRequests 中,InFlightRequests 保存對象的具體形式為 Map
攔截器
生產(chǎn)者攔截器既可以用來在消息發(fā)送前做一些準(zhǔn)備工作,比如按照某個規(guī)則過濾不符合要求的消息、修改消息的內(nèi)容等,也可以用來在發(fā)送回調(diào)邏輯前做一些定制化的需求,比如統(tǒng)計類工作。
生產(chǎn)者攔截器的使用也很方便,主要是自定義實現(xiàn) org.apache.kafka.clients.producer. ProducerInterceptor 接口。ProducerInterceptor 接口中包含3個方法:
Copypublic ProducerRecordonSend(ProducerRecord record);
public void onAcknowledgement(RecordMetadata metadata, Exception exception);
public void close();
KafkaProducer 在將消息序列化和計算分區(qū)之前會調(diào)用生產(chǎn)者攔截器的 onSend() 方法來對消息進行相應(yīng)的定制化操作。一般來說最好不要修改消息 ProducerRecord 的 topic、key 和 partition 等信息。
KafkaProducer 會在消息被應(yīng)答(Acknowledgement)之前或消息發(fā)送失敗時調(diào)用生產(chǎn)者攔截器的 onAcknowledgement() 方法,優(yōu)先于用戶設(shè)定的 Callback 之前執(zhí)行。這個方法運行在 Producer 的I/O線程中,所以這個方法中實現(xiàn)的代碼邏輯越簡單越好,否則會影響消息的發(fā)送速度。
close() 方法主要用于在關(guān)閉攔截器時執(zhí)行一些資源的清理工作。
序列化器
生產(chǎn)者需要用序列化器(Serializer)把對象轉(zhuǎn)換成字節(jié)數(shù)組才能通過網(wǎng)絡(luò)發(fā)送給 Kafka。而在對側(cè),消費者需要用反序列化器(Deserializer)把從 Kafka 中收到的字節(jié)數(shù)組轉(zhuǎn)換成相應(yīng)的對象。
生產(chǎn)者使用的序列化器和消費者使用的反序列化器是需要一一對應(yīng)的,如果生產(chǎn)者使用了某種序列化器,比如 StringSerializer,而消費者使用了另一種序列化器,比如 IntegerSerializer,那么是無法解析出想要的數(shù)據(jù)的。
序列化器都需要實現(xiàn)org.apache.kafka.common.serialization.Serializer 接口,此接口有3個方法:
Copypublic void configure(Mapconfigs, boolean isKey)
public byte[] serialize(String topic, T data)
public void close()
configure() 方法用來配置當(dāng)前類,serialize() 方法用來執(zhí)行序列化操作。而 close() 方法用來關(guān)閉當(dāng)前的序列化器。
如下:
Copypublic class StringSerializer implements Serializer<String> {
private String encoding = "UTF8";
@Override
public void configure(Mapconfigs, boolean isKey) {
String propertyName = isKey ? "key.serializer.encoding" :
"value.serializer.encoding";
Object encodingValue = configs.get(propertyName);
if (encodingValue == null)
encodingValue = configs.get("serializer.encoding");
if (encodingValue != null && encodingValue instanceof String)
encoding = (String) encodingValue;
}
@Override
public byte[] serialize(String topic, String data) {
try {
if (data == null)
return null;
else
return data.getBytes(encoding);
} catch (UnsupportedEncodingException e) {
throw new SerializationException("Error when serializing " +
"string to byte[] due to unsupported encoding " + encoding);
}
}
@Override
public void close() {
// nothing to do
}
}
configure() 方法,這個方法是在創(chuàng)建 KafkaProducer 實例的時候調(diào)用的,主要用來確定編碼類型。
serialize用來編解碼,如果 Kafka 客戶端提供的幾種序列化器都無法滿足應(yīng)用需求,則可以選擇使用如 Avro、JSON、Thrift、ProtoBuf 和 Protostuff 等通用的序列化工具來實現(xiàn),或者使用自定義類型的序列化器來實現(xiàn)。
分區(qū)器
消息經(jīng)過序列化之后就需要確定它發(fā)往的分區(qū),如果消息 ProducerRecord 中指定了 partition 字段,那么就不需要分區(qū)器的作用,因為 partition 代表的就是所要發(fā)往的分區(qū)號。
如果消息 ProducerRecord 中沒有指定 partition 字段,那么就需要依賴分區(qū)器,根據(jù) key 這個字段來計算 partition 的值。分區(qū)器的作用就是為消息分配分區(qū)。
Kafka 中提供的默認分區(qū)器是 org.apache.kafka.clients.producer.internals.DefaultPartitioner,它實現(xiàn)了 org.apache.kafka.clients.producer.Partitioner 接口,這個接口中定義了2個方法,具體如下所示。
Copypublic int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster);
public void close();
其中 partition() 方法用來計算分區(qū)號,返回值為 int 類型。partition() 方法中的參數(shù)分別表示主題、鍵、序列化后的鍵、值、序列化后的值,以及集群的元數(shù)據(jù)信息,通過這些信息可以實現(xiàn)功能豐富的分區(qū)器。close() 方法在關(guān)閉分區(qū)器的時候用來回收一些資源。
在默認分區(qū)器 DefaultPartitioner 的實現(xiàn)中,close() 是空方法,而在 partition() 方法中定義了主要的分區(qū)分配邏輯。如果 key 不為 null,那么默認的分區(qū)器會對 key 進行哈希,最終根據(jù)得到的哈希值來計算分區(qū)號,擁有相同 key 的消息會被寫入同一個分區(qū)。如果 key 為 null,那么消息將會以輪詢的方式發(fā)往主題內(nèi)的各個可用分區(qū)。
自定義的分區(qū)器,只需同 DefaultPartitioner 一樣實現(xiàn) Partitioner 接口即可。由于每個分區(qū)下的消息處理都是有順序的,我們可以利用自定義分區(qū)器實現(xiàn)在某一系列的key都發(fā)送到一個分區(qū)中,從而實現(xiàn)有序消費。
Broker
Broker處理請求流程

在Kafka的架構(gòu)中,會有很多客戶端向Broker端發(fā)送請求,Kafka 的 Broker 端有個 SocketServer 組件,用來和客戶端建立連接,然后通過Acceptor線程來進行請求的分發(fā),由于Acceptor不涉及具體的邏輯處理,非常得輕量級,因此有很高的吞吐量。
接著Acceptor 線程采用輪詢的方式將入站請求公平地發(fā)到所有網(wǎng)絡(luò)線程中,網(wǎng)絡(luò)線程池默認大小是 3個,表示每臺 Broker 啟動時會創(chuàng)建 3 個網(wǎng)絡(luò)線程,專門處理客戶端發(fā)送的請求,可以通過Broker 端參數(shù) num.network.threads來進行修改。
那么接下來處理網(wǎng)絡(luò)線程處理流程如下:
當(dāng)網(wǎng)絡(luò)線程拿到請求后,會將請求放入到一個共享請求隊列中。Broker 端還有個 IO 線程池,負責(zé)從該隊列中取出請求,執(zhí)行真正的處理。如果是 PRODUCE 生產(chǎn)請求,則將消息寫入到底層的磁盤日志中;如果是 FETCH 請求,則從磁盤或頁緩存中讀取消息。
IO 線程池處中的線程是執(zhí)行請求邏輯的線程,默認是8,表示每臺 Broker 啟動后自動創(chuàng)建 8 個 IO 線程處理請求,可以通過Broker 端參數(shù) num.io.threads調(diào)整。
Purgatory組件是用來緩存延時請求(Delayed Request)的。比如設(shè)置了 acks=all 的 PRODUCE 請求,一旦設(shè)置了 acks=all,那么該請求就必須等待 ISR 中所有副本都接收了消息后才能返回,此時處理該請求的 IO 線程就必須等待其他 Broker 的寫入結(jié)果。
控制器
在 Kafka 集群中會有一個或多個 broker,其中有一個 broker 會被選舉為控制器(Kafka Controller),它負責(zé)管理整個集群中所有分區(qū)和副本的狀態(tài)。
控制器是如何被選出來的?
Broker 在啟動時,會嘗試去 ZooKeeper 中創(chuàng)建 /controller 節(jié)點。Kafka 當(dāng)前選舉控制器的規(guī)則是:第一個成功創(chuàng)建 /controller 節(jié)點的 Broker 會被指定為控制器。
在ZooKeeper中的 /controller_epoch 節(jié)點中存放的是一個整型的 controller_epoch 值。controller_epoch 用于記錄控制器發(fā)生變更的次數(shù),即記錄當(dāng)前的控制器是第幾代控制器,我們也可以稱之為“控制器的紀(jì)元”。
controller_epoch 的初始值為1,即集群中第一個控制器的紀(jì)元為1,當(dāng)控制器發(fā)生變更時,每選出一個新的控制器就將該字段值加1。Kafka 通過 controller_epoch 來保證控制器的唯一性,進而保證相關(guān)操作的一致性。
每個和控制器交互的請求都會攜帶 controller_epoch 這個字段,如果請求的 controller_epoch 值小于內(nèi)存中的 controller_epoch 值,則認為這個請求是向已經(jīng)過期的控制器所發(fā)送的請求,那么這個請求會被認定為無效的請求。
如果請求的 controller_epoch 值大于內(nèi)存中的 controller_epoch 值,那么說明已經(jīng)有新的控制器當(dāng)選了。
控制器是做什么的?
主題管理(創(chuàng)建、刪除、增加分區(qū))
分區(qū)重分配
Preferred 領(lǐng)導(dǎo)者選舉
Preferred 領(lǐng)導(dǎo)者選舉主要是 Kafka 為了避免部分 Broker 負載過重而提供的一種換 Leader 的方案。集群成員管理(新增 Broker、Broker 主動關(guān)閉、Broker 宕機)
控制器組件會利用 Watch 機制檢查 ZooKeeper 的 /brokers/ids 節(jié)點下的子節(jié)點數(shù)量變更。目前,當(dāng)有新 Broker 啟動后,它會在 /brokers 下創(chuàng)建專屬的 znode 節(jié)點。一旦創(chuàng)建完畢,ZooKeeper 會通過 Watch 機制將消息通知推送給控制器,這樣,控制器就能自動地感知到這個變化,進而開啟后續(xù)的新增 Broker 作業(yè)。數(shù)據(jù)服務(wù)
控制器上保存了最全的集群元數(shù)據(jù)信息。
控制器宕機了怎么辦?
當(dāng)運行中的控制器突然宕機或意外終止時,Kafka 能夠快速地感知到,并立即啟用備用控制器來代替之前失敗的控制器。這個過程就被稱為 Failover,該過程是自動完成的,無需你手動干預(yù)。

消費者
消費組
在Kafka中,每個消費者都有一個對應(yīng)的消費組。當(dāng)消息發(fā)布到主題后,只會被投遞給訂閱它的每個消費組中的一個消費者。每個消費者只能消費所分配到的分區(qū)中的消息。而每一個分區(qū)只能被一個消費組中的一個消費者所消費。

如上圖所示,我們可以設(shè)置兩個消費者組來實現(xiàn)廣播消息的作用,消費組A和組B都可以接受到生產(chǎn)者發(fā)送過來的消息。
消費者與消費組這種模型可以讓整體的消費能力具備橫向伸縮性,我們可以增加(或減少)消費者的個數(shù)來提高(或降低)整體的消費能力。對于分區(qū)數(shù)固定的情況,一味地增加消費者并不會讓消費能力一直得到提升,如果消費者過多,出現(xiàn)了消費者的個數(shù)大于分區(qū)個數(shù)的情況,就會有消費者分配不到任何分區(qū)。
如下:一共有8個消費者,7個分區(qū),那么最后的消費者C7由于分配不到任何分區(qū)而無法消費任何消息。
消費端分區(qū)分配策略
Kafka 提供了消費者客戶端參數(shù) partition.assignment.strategy 來設(shè)置消費者與訂閱主題之間的分區(qū)分配策略。
RangeAssignor分配策略
默認情況下,采用 RangeAssignor 分配策略。
RangeAssignor 分配策略的原理是按照消費者總數(shù)和分區(qū)總數(shù)進行整除運算來獲得一個跨度,然后將分區(qū)按照跨度進行平均分配,以保證分區(qū)盡可能均勻地分配給所有的消費者。對于每一個主題,RangeAssignor 策略會將消費組內(nèi)所有訂閱這個主題的消費者按照名稱的字典序排序,然后為每個消費者劃分固定的分區(qū)范圍,如果不夠平均分配,那么字典序靠前的消費者會被多分配一個分區(qū)。
假設(shè)消費組內(nèi)有2個消費者 C0 和 C1,都訂閱了主題 t0 和 t1,并且每個主題都有4個分區(qū),那么訂閱的所有分區(qū)可以標(biāo)識為:t0p0、t0p1、t0p2、t0p3、t1p0、t1p1、t1p2、t1p3。最終的分配結(jié)果為:
Copy消費者C0:t0p0、t0p1、t1p0、t1p1
消費者C1:t0p2、t0p3、t1p2、t1p3
假設(shè)上面例子中2個主題都只有3個分區(qū),那么訂閱的所有分區(qū)可以標(biāo)識為:t0p0、t0p1、t0p2、t1p0、t1p1、t1p2。最終的分配結(jié)果為:
Copy消費者C0:t0p0、t0p1、t1p0、t1p1
消費者C1:t0p2、t1p2
可以明顯地看到這樣的分配并不均勻。
RoundRobinAssignor分配策略
RoundRobinAssignor 分配策略的原理是將消費組內(nèi)所有消費者及消費者訂閱的所有主題的分區(qū)按照字典序排序,然后通過輪詢方式逐個將分區(qū)依次分配給每個消費者。
如果同一個消費組內(nèi)所有的消費者的訂閱信息都是相同的,那么 RoundRobinAssignor 分配策略的分區(qū)分配會是均勻的。
如果同一個消費組內(nèi)的消費者訂閱的信息是不相同的,那么在執(zhí)行分區(qū)分配的時候就不是完全的輪詢分配,有可能導(dǎo)致分區(qū)分配得不均勻。
假設(shè)消費組內(nèi)有3個消費者(C0、C1 和 C2),t0、t0、t1、t2主題分別有1、2、3個分區(qū),即整個消費組訂閱了 t0p0、t1p0、t1p1、t2p0、t2p1、t2p2 這6個分區(qū)。
具體而言,消費者 C0 訂閱的是主題 t0,消費者 C1 訂閱的是主題 t0 和 t1,消費者 C2 訂閱的是主題 t0、t1 和 t2,那么最終的分配結(jié)果為:
Copy消費者C0:t0p0
消費者C1:t1p0
消費者C2:t1p1、t2p0、t2p1、t2p2
可以看 到 RoundRobinAssignor 策略也不是十分完美,這樣分配其實并不是最優(yōu)解,因為完全可以將分區(qū) t1p1 分配給消費者 C1。
StickyAssignor分配策略
這種分配策略,它主要有兩個目的:
分區(qū)的分配要盡可能均勻。
分區(qū)的分配盡可能與上次分配的保持相同。
假設(shè)消費組內(nèi)有3個消費者(C0、C1 和 C2),它們都訂閱了4個主題(t0、t1、t2、t3),并且每個主題有2個分區(qū)。也就是說,整個消費組訂閱了 t0p0、t0p1、t1p0、t1p1、t2p0、t2p1、t3p0、t3p1 這8個分區(qū)。最終的分配結(jié)果如下:
Copy消費者C0:t0p0、t1p1、t3p0
消費者C1:t0p1、t2p0、t3p1
消費者C2:t1p0、t2p1
再假設(shè)此時消費者 C1 脫離了消費組,那么分配結(jié)果為:
Copy消費者C0:t0p0、t1p1、t3p0、t2p0
消費者C2:t1p0、t2p1、t0p1、t3p1
StickyAssignor 分配策略如同其名稱中的“sticky”一樣,讓分配策略具備一定的“黏性”,盡可能地讓前后兩次分配相同,進而減少系統(tǒng)資源的損耗及其他異常情況的發(fā)生。
再均衡(Rebalance)
再均衡是指分區(qū)的所屬權(quán)從一個消費者轉(zhuǎn)移到另一消費者的行為,它為消費組具備高可用性和伸縮性提供保障,使我們可以既方便又安全地刪除消費組內(nèi)的消費者或往消費組內(nèi)添加消費者。
弊端:
在再均衡發(fā)生期間,消費組內(nèi)的消費者是無法讀取消息的。
Rebalance 很慢。如果一個消費者組里面有幾百個 Consumer 實例,Rebalance 一次要幾個小時。
在進行再均衡的時候消費者當(dāng)前的狀態(tài)也會丟失。比如消費者消費完某個分區(qū)中的一部分消息時還沒有來得及提交消費位移就發(fā)生了再均衡操作,之后這個分區(qū)又被分配給了消費組內(nèi)的另一個消費者,原來被消費完的那部分消息又被重新消費一遍,也就是發(fā)生了重復(fù)消費。
Rebalance 發(fā)生的時機有三個:
組成員數(shù)量發(fā)生變化
訂閱主題數(shù)量發(fā)生變化
訂閱主題的分區(qū)數(shù)發(fā)生變化
后兩類通常是業(yè)務(wù)的變動調(diào)整所導(dǎo)致的,我們一般不可控制,我們主要說說因為組成員數(shù)量變化而引發(fā)的 Rebalance 該如何避免。
當(dāng) Consumer Group 完成 Rebalance 之后,每個 Consumer 實例都會定期地向 Coordinator 發(fā)送心跳請求,表明它還存活著。如果某個 Consumer 實例不能及時地發(fā)送這些心跳請求,Coordinator 就會認為該 Consumer 已經(jīng)“死”了,從而將其從 Group 中移除,然后開啟新一輪 Rebalance。
Consumer端可以設(shè)置session.timeout.ms,默認是10s,表示如果 Coordinator 在 10 秒之內(nèi)沒有收到 Group 下某 Consumer 實例的心跳,它就會認為這個 Consumer 實例已經(jīng)掛了。
Consumer端還可以設(shè)置heartbeat.interval.ms,表示發(fā)送心跳請求的頻率。
以及max.poll.interval.ms?參數(shù),它限定了 Consumer 端應(yīng)用程序兩次調(diào)用 poll 方法的最大時間間隔。它的默認值是 5 分鐘,表示你的 Consumer 程序如果在 5 分鐘之內(nèi)無法消費完 poll 方法返回的消息,那么 Consumer 會主動發(fā)起“離開組”的請求,Coordinator 也會開啟新一輪 Rebalance。
所以知道了上面幾個參數(shù)后,我們就可以避免以下兩個問題:
非必要 Rebalance 是因為未能及時發(fā)送心跳,導(dǎo)致 Consumer 被“踢出”Group 而引發(fā)的。
所以我們在生產(chǎn)環(huán)境中可以這么設(shè)置:設(shè)置 session.timeout.ms = 6s。
設(shè)置 heartbeat.interval.ms = 2s。
必要 Rebalance 是 Consumer 消費時間過長導(dǎo)致的。如何消費任務(wù)時間達到8分鐘,而max.poll.interval.ms設(shè)置為5分鐘,那么也會發(fā)生Rebalance,所以如果有比較重的任務(wù)的話,可以適當(dāng)調(diào)整這個參數(shù)。
Consumer 端的頻繁的 Full GC導(dǎo)致的長時間停頓,從而引發(fā)了 Rebalance。
消費者組再平衡全流程
重平衡過程是靠消費者端的心跳線程(Heartbeat Thread),通知到其他消費者實例的。
當(dāng)協(xié)調(diào)者決定開啟新一輪重平衡后,它會將“REBALANCE_IN_PROGRESS”封裝進心跳請求的響應(yīng)中,發(fā)還給消費者實例。當(dāng)消費者實例發(fā)現(xiàn)心跳響應(yīng)中包含了“REBALANCE_IN_PROGRESS”,就能立馬知道重平衡又開始了,這就是重平衡的通知機制。
所以,實際上heartbeat.interval.ms不止是設(shè)置了心跳的間隔時間,還可以控制重平衡通知的頻率。
消費者組狀態(tài)機
重平衡一旦開啟,Broker 端的協(xié)調(diào)者組件就要完成整個重平衡流程,Kafka 設(shè)計了一套消費者組狀態(tài)機(State Machine)來實現(xiàn)。
Kafka 為消費者組定義了 5 種狀態(tài),它們分別是:Empty、Dead、PreparingRebalance、CompletingRebalance 和 Stable。

狀態(tài)機的各個狀態(tài)流轉(zhuǎn):
當(dāng)有新成員加入或已有成員退出時,消費者組的狀態(tài)從 Stable 直接跳到 PreparingRebalance 狀態(tài),此時,所有現(xiàn)存成員就必須重新申請加入組。當(dāng)所有成員都退出組后,消費者組狀態(tài)變更為 Empty。Kafka 定期自動刪除過期位移的條件就是,組要處于 Empty 狀態(tài)。因此,如果你的消費者組停掉了很長時間(超過 7 天),那么 Kafka 很可能就把該組的位移數(shù)據(jù)刪除了。
組協(xié)調(diào)器(GroupCoordinator)
GroupCoordinator 是 Kafka 服務(wù)端中用于管理消費組的組件。協(xié)調(diào)器最重要的職責(zé)就是負責(zé)執(zhí)行消費者再均衡的操作。
消費者端重平衡流程
在消費者端,重平衡分為兩個步驟:分別是加入組和等待領(lǐng)導(dǎo)者消費者(Leader Consumer)分配方案。即JoinGroup 請求和 SyncGroup 請求。
加入組
當(dāng)組內(nèi)成員加入組時,它會向協(xié)調(diào)器發(fā)送 JoinGroup 請求。在該請求中,每個成員都要將自己訂閱的主題上報,這樣協(xié)調(diào)器就能收集到所有成員的訂閱信息。選擇消費組領(lǐng)導(dǎo)者
一旦收集了全部成員的 JoinGroup 請求后,協(xié)調(diào)者會從這些成員中選擇一個擔(dān)任這個消費者組的領(lǐng)導(dǎo)者。
這里的領(lǐng)導(dǎo)者是具體的消費者實例,它既不是副本,也不是協(xié)調(diào)器。領(lǐng)導(dǎo)者消費者的任務(wù)是收集所有成員的訂閱信息,然后根據(jù)這些信息,制定具體的分區(qū)消費分配方案。選舉分區(qū)分配策略
這個分區(qū)分配的選舉是根據(jù)消費組內(nèi)的各個消費者投票來決定的。
協(xié)調(diào)器會收集各個消費者支持的所有分配策略,組成候選集 candidates。每個消費者從候選集 candidates 中找出第一個自身支持的策略,為這個策略投上一票。計算候選集中各個策略的選票數(shù),選票數(shù)最多的策略即為當(dāng)前消費組的分配策略。
如果有消費者并不支持選出的分配策略,那么就會報出異常 IllegalArgumentException:Member does not support protocol。


發(fā)送 SyncGroup 請求
協(xié)調(diào)器會把消費者組訂閱信息封裝進 JoinGroup 請求的響應(yīng)體中,然后發(fā)給領(lǐng)導(dǎo)者,由領(lǐng)導(dǎo)者統(tǒng)一做出分配方案,然后領(lǐng)導(dǎo)者發(fā)送 SyncGroup 請求給協(xié)調(diào)器。
響應(yīng)SyncGroup
組內(nèi)所有的消費者都會發(fā)送一個 SyncGroup 請求,只不過不是領(lǐng)導(dǎo)者的請求內(nèi)容為空,然后就會接收到一個SyncGroup響應(yīng),接受訂閱信息。
文章不錯?點個【在看】吧!??



