聊聊 Kafka:協(xié)調(diào)者 GroupCoordinator 源碼剖析之 FIND_COORDINATOR
我們上一篇講的是 聊聊 Kafka:協(xié)調(diào)者 GroupCoordinator 源碼剖析之實(shí)例化與啟動(dòng) GroupCoordinator,這一篇我們來講講聊聊 Kafka:協(xié)調(diào)者 GroupCoordinator 源碼剖析之 FIND_COORDINATOR。
四、ApiKeys.FIND_COORDINATOR
我們前面的文章說過,與消費(fèi)組相關(guān)的兩個(gè)組件,一個(gè)是消費(fèi)者客戶端的 ConsumerCoordinator,一個(gè)是 Kafka Broker 服務(wù)端的 GroupCoordinator。ConsumerCoordinator 負(fù)責(zé)與 GroupCoordinator 通信,Broker 啟動(dòng)的時(shí)候,都會(huì)啟動(dòng)一個(gè) GroupCoordinator 實(shí)例,而一個(gè)集群中,會(huì)有多個(gè) Broker,那么如何確定一個(gè)新的 Consumer 加入 Consumer Group 后,到底和哪個(gè) Broker 上的 GroupCoordinator 進(jìn)行交互呢?
這個(gè)問題就就交給服務(wù)端的 ApiKeys.FIND_COORDINATOR 命令來處理。
4.1 客戶端源碼分析

coordinator 即獲取到的 group 節(jié)點(diǎn)對(duì)象,
client.isUnavailable(coordinator) 是在與 group 建立連接,每次判斷 coordinator 不為空且 client 與 group 連接失敗,則將 coordinator 置空,為什么會(huì)這樣呢?很有可能是請(qǐng)求到 group 的信息之后發(fā)現(xiàn)該節(jié)點(diǎn)已下線或者不可用,此時(shí)服務(wù)端很有可能也在進(jìn)行選舉,所以我們需要將 coordinator 清空,待服務(wù)端選舉完成后再次通信。
如果通信一次發(fā)現(xiàn)該 GroupCoordinator 的信息還未獲取到則繼續(xù)重試,直到超時(shí),這里的超時(shí)時(shí)間即為 poll 時(shí)傳入的超時(shí)時(shí)間,這個(gè)時(shí)間設(shè)置貫穿了整個(gè) consume 的運(yùn)行代碼。

我們來看看是如何尋找負(fù)載最小節(jié)點(diǎn)的,首先就是取隨機(jī)數(shù),防止每次都從第一個(gè)節(jié)點(diǎn)連接,如果判斷沒有在途的 request 則直接返回該節(jié)點(diǎn),否則取在途 request 最小的節(jié)點(diǎn),如果該節(jié)點(diǎn)不存在,則依次取連接的節(jié)點(diǎn)、需要重試的節(jié)點(diǎn),如果找到不為 null 的節(jié)點(diǎn)則返回該節(jié)點(diǎn),否則返回 null。
4.2 FindCoordinatorRequest 請(qǐng)求報(bào)文




key_type 有兩種枚舉,一種是 GROUP,另一種是 TRANSACTION,如果 type 為 GROUP 的話那 key 就是 groupId,反之是 transactionId。
4.3 服務(wù)端源碼分析
直接看到 FIND_COORDINATOR 命令調(diào)用的方法 kafka.server.KafkaApis#handleFindCoordinatorRequest

kafka.coordinator.group.GroupMetadataManager#partitionFordef partitionFor(groupId: String): Int = Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount// 記錄 offsets topic 的分區(qū)數(shù)量,這個(gè)字段會(huì)調(diào)用 getGroupMetadataTopicPartitionCount() 進(jìn)行初始化,默認(rèn) 50。private val groupMetadataTopicPartitionCount = getGroupMetadataTopicPartitionCountprivate def getGroupMetadataTopicPartitionCount: Int = {zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicNumPartitions)}val DefaultOffsetsTopicNumPartitions = 50
__consumer_offsets 這個(gè)內(nèi)部 Topic,專門用來存儲(chǔ) Consumer Group 消費(fèi)的情況,內(nèi)置 Topic 初始化時(shí)由 offsets.topic.num.partitions 參數(shù)來決定分區(qū)數(shù),默認(rèn)值是 50。相同 Consumer Group 的 offset 最終會(huì)保存在其中一個(gè)分區(qū)中,而保存在哪個(gè)分區(qū)就由上面這段代碼來決定,可以看到邏輯很簡(jiǎn)單,就是取 groupId 的 hashCode,然后對(duì)總的分區(qū)數(shù)取模。
舉個(gè)例子,假設(shè)一個(gè) GroupId 計(jì)算出來的 hashCode 是 8,之后取模 50 得到 8。那么 partition-8 的 leader 所在的 broker 就是我們要找的那個(gè)節(jié)點(diǎn)。這個(gè) Consumer Group 后面都會(huì)直接在 partition-8 分區(qū)保存位點(diǎn)。
kafka.server.KafkaApis#getOrCreateInternalTopic

首先從當(dāng)前 node 的元數(shù)據(jù)緩存中拿到對(duì)應(yīng) topic 的數(shù)據(jù),如果沒有,則創(chuàng)建。
注意:kafka 創(chuàng)建 topic 是需要時(shí)間的,而這里的實(shí)現(xiàn)方式是往 zk 中寫入數(shù)據(jù)觸發(fā)創(chuàng)建 topic 流程,是一種異步方式,往 zk 中寫入數(shù)據(jù)之后會(huì)返回一個(gè) error,LEADER_NOT_AVAILABLE,待創(chuàng)建 topic 的流程走完,并同步各個(gè)節(jié)點(diǎn) metaData 之后,最后從 metaData 中取到該節(jié)點(diǎn)信息 findCoordinatorRequest 才會(huì)成功返回。
4.4 FindCoordinatorResponse 響應(yīng)報(bào)文


4.5 小結(jié)
總體分析下來,主要流程如下圖所示:

尋找最小負(fù)載節(jié)點(diǎn)信息
向最小負(fù)載節(jié)點(diǎn)發(fā)送 FindCoordinatorRequest
最小負(fù)載節(jié)點(diǎn)處理該請(qǐng)求
首先找到該 groupId 對(duì)應(yīng)的分區(qū)
通過內(nèi)存中緩存的 metaData 獲取該分區(qū)的信息,如果不存在則創(chuàng)建 topic。
返回查找到的分區(qū) leader 信息
最小負(fù)載節(jié)點(diǎn)向 client 響應(yīng) FindCoordinatorResponse
歡迎大家關(guān)注我的公眾號(hào)【老周聊架構(gòu)】,Java后端主流技術(shù)棧的原理、源碼分析、架構(gòu)以及各種互聯(lián)網(wǎng)高并發(fā)、高性能、高可用的解決方案。
