圖解 Kafka 生產(chǎn)者元數(shù)據(jù)拉取管理全流程
閱讀本文大約需要 60 分鐘。
大家好,我是 華仔, 又跟大家見面了。
在上一篇中,正式開啟了「Kafka的源碼之旅」,主要講述了 KafkaProducer 初始化時用到的核心組件以及消息發(fā)送的核心流程,帶你梳理生產(chǎn)者初始化整體的源碼分析脈絡(luò),并通過「場景驅(qū)動」的方式帶大家一點(diǎn)點(diǎn)的對 Kafka 源碼進(jìn)行深度剖析,一起掌握 Kafka 源碼核心架構(gòu)設(shè)計思想。
今天這篇我們就來聊聊生產(chǎn)者是會如何拉取和管理元數(shù)據(jù)的,帶你梳理生產(chǎn)者元數(shù)據(jù)管理整體的源碼分析脈絡(luò)。
認(rèn)真讀完這篇文章,我相信你會對 Kafka 生產(chǎn)獲取和管理元數(shù)據(jù)源碼有更加深刻的理解。
這篇文章干貨很多,希望你可以耐心讀完。

01 總的概述
消息想從 Producer 端發(fā)送到 Broker 端,必須要先知道 Topic 在 Broker 的分布情況,才能判斷消息該發(fā)往哪些節(jié)點(diǎn),比如:「Topic 對應(yīng)的 Leader 分區(qū)有哪些」、「Leader分區(qū)分布在哪些 Broker 節(jié)點(diǎn)」、「Topic 分區(qū)動態(tài)變更」等等,所以及時獲取元數(shù)據(jù)對生產(chǎn)者正常工作是非常有必要的。
元數(shù)據(jù)獲取涉及的底層組件比較多,主要分為:「KafkaProducer ?主線程加載元數(shù)據(jù)」、「Sender 子線程拉取元數(shù)據(jù)」。
接下來我們逐一分析元數(shù)據(jù)在生產(chǎn)者端是如何被加載和拉取、更新的。為了方便大家理解,所有的源碼只保留骨干。
02 主線程如何加載元數(shù)據(jù)
首先我們來看下 KafkaProducer 主線程是如何加載元數(shù)據(jù)的。
在上一篇中《圖解Kafka生產(chǎn)者初始化核心流程》我們分析知道集群元數(shù)據(jù)的初始化是在 KafkaProducer 主線程的構(gòu)造函數(shù)中來完成的,我們來看一下相關(guān)源碼:
//?初始化?Kafka?集群元數(shù)據(jù),元數(shù)據(jù)會保存到客戶端中,并與服務(wù)端元數(shù)據(jù)保持一致
if?(metadata?!=?null)?{
????this.metadata?=?metadata;
}?else?{
????//?初始化集群元數(shù)據(jù)
????this.metadata?=?new?ProducerMetadata(retryBackoffMs,
??????????//?元數(shù)據(jù)過期時間:默認(rèn)5分鐘
??????????config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
??????????//?topic最大空閑時間,如果在規(guī)定時間沒有被訪問,將從緩存刪除,下次訪問時強(qiáng)制獲取元數(shù)據(jù)
??????????config.getLong(ProducerConfig.METADATA_MAX_IDLE_CONFIG),
??????????logContext,
??????????clusterResourceListeners,
??????????Time.SYSTEM);
????//?啟動metadata的引導(dǎo)程序????
????this.metadata.bootstrap(addresses);
}
從上述源代碼我們可以看出在 KafkaProducer 的構(gòu)造方法中,如果metadata為空就會初始化集群元數(shù)據(jù)類「ProducerMetadata」,然后通過調(diào)用 「this.metadata.bootstrap」這個方法來啟動引導(dǎo)程序,這時 metaData 對象里并沒有具體的元數(shù)據(jù)信息,因?yàn)榭蛻舳诉€沒發(fā)送元數(shù)據(jù)更新的請求,后面會通過喚醒 Sender 線程進(jìn)行發(fā)送請求獲取元數(shù)據(jù)的。
這里的 this.meta 其實(shí)就是 Kafka 內(nèi)存中的一個對象,底層會做一層緩存,因此并不會一直請求 Kafka Broker 端進(jìn)行獲取。
我先給大家總結(jié)下元數(shù)據(jù)初始化及啟動的調(diào)用關(guān)系圖,口說無憑,我們來扒開源碼瞅一瞅,這樣更真實(shí)。

通過上述的調(diào)用關(guān)系圖,我們可以看出:
ProducerMetadata 類是 MetaData 的子類。 Metadata 類是元數(shù)據(jù)基類,封裝了元數(shù)據(jù)的具體信息、版本控制、更新標(biāo)識、響應(yīng)解析等。 元數(shù)據(jù)的信息其實(shí)最終是保存在元數(shù)據(jù)緩存即 MetadataCache 中,而 ? 它最核心的是 Cluster , 保存了元數(shù)據(jù)基礎(chǔ)信息。
接下來會挨個類展開來進(jìn)行講解。
02.1 初探 ProducerMetadata
在主線程中初始化了 ProducerMetadata 類的對象,我們先來看看這個類都做了哪些事情。
github 源碼地址如下:
https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMetadata.java
?public?class?ProducerMetadata?extends?Metadata?{
????//?主題元數(shù)據(jù)過期時間,如果在這個時間段內(nèi)未被訪問,它就會從緩存中刪除。
????private?final?long?metadataIdleMs;
????/*?Topics?with?expiry?time?*/
????//?map集合,生產(chǎn)者的元數(shù)據(jù)主題集合,里面保存著
????//?主題和主題過期時間的對應(yīng)關(guān)系,即?topic,?nowMs?+?metadataIdleMs,
????//?過期了的主題會被踢出去。
????private?final?Map?topics?=?new?HashMap<>();
????//?新的主題集合,?set集合,即第一次要發(fā)送的主題
????private?final?Set?newTopics?=?new?HashSet<>();
????private?final?Logger?log;
????private?final?Time?time;
????public?ProducerMetadata(long?refreshBackoffMs,
????????????????????????????long?metadataExpireMs,
????????????????????????????long?metadataIdleMs,
????????????????????????????LogContext?logContext,
????????????????????????????ClusterResourceListeners?clusterResourceListeners,
????????????????????????????Time?time)?{
????????//調(diào)用父類?Metadata?的構(gòu)造函數(shù)
????????super(refreshBackoffMs,?metadataExpireMs,?logContext,?clusterResourceListeners);
????????....
????}
}
我們可以看出這里只是調(diào)用了父類的構(gòu)造函數(shù)進(jìn)行類屬性的初始化,接下來我們深度分析下 ProducerMetadata 類中的幾個比較重要的方法。
02.1.1 add()
?public?synchronized?void?add(String?topic,?long?nowMs)?{
????????//?判斷對象是否為空
????????Objects.requireNonNull(topic,?"topic?cannot?be?null");
????????if?(topics.put(topic,?nowMs?+?metadataIdleMs)?==?null)?{
????????????//?添加主題到新主題集合中
????????????newTopics.add(topic);
????????????//?更新元數(shù)據(jù)標(biāo)記?屬于Metadata類方法,后面小節(jié)分析
????????????requestUpdateForNewTopics();
????????}
?}
該方法主要用來向元數(shù)據(jù)主題集合 topics 中添加主題,主要用在「KafkaProducer 主線程」以及「Sender 子線程」中,我們來看下是如何添加的,具體邏輯如下:
往元數(shù)據(jù)主題集合 topics 中添加主題和對應(yīng)的過期時間(當(dāng)時時間+過期時間段「默認(rèn)值:5分鐘」)。 如果元數(shù)據(jù)主題集合中不存在該主題時,說明是第一次就把該主題添加到新主題集合中。 標(biāo)記要更新新主題元數(shù)據(jù)的屬性字段「lastRefreshMs」 、「needPartialUpdate」 、「requestVersion」,以便后續(xù)喚醒 Sender 線程去服務(wù)端拉取新主題的元數(shù)據(jù)。
此時主題被添加到元數(shù)據(jù)主題主題集合中,但是如果集合里面數(shù)據(jù)過期了該怎么辦?接下來我們看另外一個方法是如何判斷的。
02.1.2 retainTopic()
??public?synchronized?boolean?retainTopic(String?topic,?boolean?isInternal,?long?nowMs)?{
????????//?獲取該主題的過期時間
????????Long?expireMs?=?topics.get(topic);
????????//?如果為空表示該主題不在元數(shù)據(jù)主題集合中
????????if?(expireMs?==?null)?{
????????????return?false;
????????//?判斷該主題是否在新集合中
????????}?else?if?(newTopics.contains(topic))?{
????????????return?true;
????????//?判斷是否超過了過期時間
????????}?else?if?(expireMs?<=?nowMs)?{
????????????log.debug("Removing?unused?topic?{}?from?the?metadata?list,?expiryMs?{}?now?{}",?topic,?expireMs,?nowMs);
????????????//?超過后直接從元數(shù)據(jù)主題集合中刪除該主題
????????????topics.remove(topic);
????????????return?false;
????????}?else?{
????????????return?true;
????????}
??}
該方法用來判斷元數(shù)據(jù)中是否該保留該主題,會在 handleMetadataResponse 即處理元數(shù)據(jù)響應(yīng)結(jié)果的時候進(jìn)行調(diào)用,我們來看下它是如何判斷的。
先判斷元數(shù)據(jù)主題集合中是否存在該主題,如果不存在直接返回false。 然后判斷該主題是否在新主題集合中,如果存在直接返回true。 再判斷該主題是否超過了過期時間,如果超過了,就從元數(shù)據(jù)主題集合中刪除該主題,再請求元數(shù)據(jù)的時候就不用帶上該主題,可以有效的減少網(wǎng)絡(luò)傳輸數(shù)據(jù)大小。
02.1.3 requestUpdateForTopic()
??public?synchronized?int?requestUpdateForTopic(String?topic)???{
?????//?如果新主題集合中存在該主題
?????if?(newTopics.contains(topic))?{
????????//?針對新主題集合標(biāo)記部分更新,并返回版本
????????return?requestUpdateForNewTopics();
?????}?else?{
????????//?全量更新,并返回版本
????????return?requestUpdate();
?????}
??}
該方法用來判斷是全量更新元數(shù)據(jù)還是部分更新元數(shù)據(jù),邏輯相對比較簡單,主要用在 KafkaProducer 主線程元數(shù)據(jù)同步等待時調(diào)用,后續(xù)小節(jié)會詳細(xì)分析。
02.1.4 update()
??public?synchronized?void?update(int?requestVersion,?MetadataResponse?response,?boolean?isPartialUpdate,?long?nowMs)?{
????????//?調(diào)用父類的update方法
????????super.update(requestVersion,?response,?isPartialUpdate,?nowMs);
????????//?如果新主題集合不為空,則遍歷響應(yīng)元數(shù)據(jù)找出已經(jīng)獲取元數(shù)據(jù)的主題,并從新主題集合中刪除
????????if?(!newTopics.isEmpty())?{
????????????for?(MetadataResponse.TopicMetadata?metadata?:?response.topicMetadata())?{
????????????????newTopics.remove(metadata.topic());
????????????}
????????}
????????//?喚醒等待元數(shù)據(jù)更新完成的線程
????????notifyAll();
?}
該方法用來更新生產(chǎn)端元數(shù)據(jù)的,具體邏輯如下:
先調(diào)用父類的 update() 方法。 然后判斷新主題集合是否不為空,如果不為空則遍歷響應(yīng)元數(shù)據(jù)找出已經(jīng)獲取元數(shù)據(jù)的主題,并從新主題集合中刪除。 最后調(diào)用 notifyAll() 來喚醒等待元數(shù)據(jù)更新完成的線程。
從上述 update()方法 中可以看出最后調(diào)用 notifyAll() 來喚醒阻塞的線程, 那么它是如何喚醒的呢,這就是接下來要分析的方法。
02.1.5 awaitUpdate()
??public?synchronized?void?awaitUpdate(final?int?lastVersion,?final?long?timeoutMs)?throws?InterruptedException?{
????????long?currentTimeMs?=?time.milliseconds();
????????long?deadlineMs?=?currentTimeMs?+?timeoutMs?0???Long.MAX_VALUE?:?currentTimeMs?+?timeoutMs;
????????//?通過調(diào)用?time.waitObject?來實(shí)現(xiàn)線程阻塞
????????time.waitObject(this,?()?->?{
????????????//?Throw?fatal?exceptions,?if?there?are?any.?Recoverable?topic?errors?will?be?handled?by?the?caller.
????????????maybeThrowFatalException();
????????????return?updateVersion()?>?lastVersion?||?isClosed();
????????},?deadlineMs);
????????if?(isClosed())
????????????throw?new?KafkaException("Requested?metadata?update?after?close");
??}
該方法用來實(shí)現(xiàn)線程阻塞的功能,用在主線程中如果發(fā)現(xiàn)主題對應(yīng)的元數(shù)據(jù)不存在時,阻塞并等待 Sender 線程將元數(shù)據(jù)更新完成。
重點(diǎn)是調(diào)用了 time.waitObject() 方法來實(shí)現(xiàn)阻塞功能,它的實(shí)現(xiàn)還是有一些技巧的,它的底層通過調(diào)用 SystemTime 包里面的 waitObject() 實(shí)現(xiàn)的,源碼如下:
??public?void?waitObject(Object?obj,?Supplier?condition,?long?deadlineMs) ?throws?InterruptedException?{
????????synchronized?(obj)?{
????????????while?(true)?{
????????????????//?判斷條件是否滿足即元數(shù)據(jù)是否更新成功,成功直接返回
????????????????if?(condition.get())
????????????????????return;
????????????????long?currentTimeMs?=?milliseconds();
????????????????//?超時拋出異常
????????????????if?(currentTimeMs?>=?deadlineMs)
????????????????????throw?new?TimeoutException("Condition?not?satisfied?before?deadline");
????????????????//?調(diào)用?wait?阻塞線程?
????????????????obj.wait(deadlineMs?-?currentTimeMs);
????????????}
????????}
?}
通過一個循環(huán)來判斷條件是否滿足,即元數(shù)據(jù)是否更新成功了,如果成功則跳出循環(huán),釋放鎖。 獲取當(dāng)前時間,判斷是否超時,如果超時后會拋出超時的異常。 如果未超時就調(diào)用 wait 方法來阻塞線程,直到滿足過期時間的條件,解除阻塞。
如果你的項(xiàng)目中也需要類似的功能要實(shí)現(xiàn)一個鎖,可以參考這里的代碼,實(shí)現(xiàn)非常巧妙。
接下來我們來重點(diǎn)分析下元數(shù)據(jù)基類。
02.2 初探 Metadata
首先它是一個線程安全的類,因此Metadata通過synchronized修飾幾乎所有方法來保證線程安全,里面封裝了元數(shù)據(jù)基本信息以及元數(shù)據(jù)的相關(guān)操作,主要被用在生產(chǎn)端的 「KafkaProducer 主線程」、「Sender 子線程」中,對于生產(chǎn)者來說只需要獲取自己發(fā)送的主題集合的元數(shù)據(jù),這樣可以有效降低網(wǎng)絡(luò)傳輸?shù)臄?shù)據(jù)量。因此,它內(nèi)部只維護(hù)部分主題的元數(shù)據(jù)。
github 源碼地址如下:
https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/clients/Metadata.java
https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
我們先來看下這個類的重要字段:
?public?class?Metadata?implements?Closeable?{
????private?final?Logger?log;
????//?請求元數(shù)據(jù)失敗后重試間隔時間,默認(rèn)值:100ms
????private?final?long?refreshBackoffMs;
????//?元數(shù)據(jù)過期時間,默認(rèn)值:5分鐘
????private?final?long?metadataExpireMs;
????//?元數(shù)據(jù)版本號,每次請求服務(wù)端+1,保存在本地內(nèi)存中
????private?int?updateVersion;?
????//?元數(shù)據(jù)加入到新主題集合的版本號
????private?int?requestVersion;?
????//?最后一次更新元數(shù)據(jù)的時間
????private?long?lastRefreshMs;
????//?最后一次成功更新全部主題元數(shù)據(jù)的時間
????private?long?lastSuccessfulRefreshMs;
????//?失敗異常
????private?KafkaException?fatalException;
????//?無效的主題集合
????private?Set?invalidTopics;
????//?無權(quán)限的主題集合
????private?Set?unauthorizedTopics;
????//?元數(shù)據(jù)緩存
????private?MetadataCache?cache?=?MetadataCache.empty();
????//?是否全部主題更新
????private?boolean?needFullUpdate;
????//?是否部分主題更新
????private?boolean?needPartialUpdate;
????//?初始化?Metadata?
????public?Metadata(long?refreshBackoffMs,
????????????????????long?metadataExpireMs,
????????????????????LogContext?logContext,
????????????????????ClusterResourceListeners?clusterResourceListeners)?{
????????this.refreshBackoffMs?=?refreshBackoffMs;
????????this.metadataExpireMs?=?metadataExpireMs;
????????this.lastRefreshMs?=?0L;
????????this.lastSuccessfulRefreshMs?=?0L;
????????this.requestVersion?=?0;
????????this.updateVersion?=?0;
????????this.needFullUpdate?=?false;
????????this.needPartialUpdate?=?false;
????????this.invalidTopics?=?Collections.emptySet();
????????this.unauthorizedTopics?=?Collections.emptySet();
????}
refreshBackoffMs:請求元數(shù)據(jù)失敗后重試間隔時間,默認(rèn)值:100ms。 metadataExpireMs:元數(shù)據(jù)過期時間,默認(rèn)值:5分鐘,時間一到會再次發(fā)送獲取元數(shù)據(jù)的請求。 updateVersion:元數(shù)據(jù)版本號,每次請求服務(wù)端+1,保存在本地內(nèi)存中。 requestVersion:元數(shù)據(jù)加入到新主題集合的版本號,每次+1。 lastRefreshMs:最后一次更新元數(shù)據(jù)的時間。 lastSuccessfulRefreshMs:最后一次成功更新全部主題元數(shù)據(jù)的時間。 fatalException:失敗異常。 invalidTopics:無效的主題集合。 unauthorizedTopics:無權(quán)限的主題集合。 MetadataCache:元數(shù)據(jù)緩存,客戶端真正存儲元數(shù)據(jù)的對象。 needFullUpdate:是否全部主題更新,對生產(chǎn)者來說,全部主題是指最近發(fā)送的主題集合。 needPartialUpdate:是否部分主題更新,對生產(chǎn)者來說,部分主題是指新發(fā)送的主題集合。
介紹完字段后,我們來深度分析下幾個元數(shù)據(jù)用到的重要方法。
02.2.1 bootstrap()
?public?synchronized?void?bootstrap(List?addresses) ?{
????//?是否全部主題更新為true?
????this.needFullUpdate?=?true;
????//?版本更新為1
????this.updateVersion?+=?1;
????//?初始化元數(shù)據(jù)緩存
????this.cache?=?MetadataCache.bootstrap(addresses);
?}
該方法用來引導(dǎo)啟動程序的,即在第一次使用前進(jìn)行初始化的工作。
由于此時生產(chǎn)者剛啟動,本地緩存中的元數(shù)據(jù)是空的,因此先將 needFullUpdate 置為 true,即需要全部主題進(jìn)行更新。 在初始化時將 updateVersion 置為 0,此時將版本更新+1。 調(diào)用元數(shù)據(jù)緩存類 MetadataCache.bootstrap() 初始化元數(shù)據(jù)緩存。
我們來看下元數(shù)據(jù)緩存的啟動程序都做了哪些事情。
02.2.2 MetadataCache.bootstrap()
?static?MetadataCache?bootstrap(List?addresses) ?{
????????...
????????//?因還未獲取元數(shù)據(jù),此時元數(shù)據(jù)緩存都是空集合
????????return?new?MetadataCache(null,?nodes,?Collections.emptyList(),
????????????????Collections.emptySet(),?Collections.emptySet(),?Collections.emptySet(),
????????????????null,?Cluster.bootstrap(addresses));
?}
此時獲取元數(shù)據(jù)被啟動了,但是還未獲取元首,所以元數(shù)據(jù)緩存都是空的集合,接下來我們來分析下元數(shù)據(jù)是如何被更新以及如何解析響應(yīng)的,這里涉及到以下幾個方法。
02.2.3 requestUpdate()
/**
???*?Request?an?update?of?the?current?cluster?metadata?info,?return?the?current?updateVersion?before?the?update
???*/
?public?synchronized?int?requestUpdate()?{
????//?全部主題更新設(shè)置為true
????this.needFullUpdate?=?true;
????//?返回元數(shù)據(jù)版本號
????return?this.updateVersion;
?}
該方法用來設(shè)置全部主題更新的標(biāo)記,上來就先將 needFullUpdate 置為 true,即要全部更新主題,然后返回更新版本。 主要在「ProducerMetadata」、「sender 子線程」、「NetworkClient線程」中使用。
02.2.4 requestUpdateForNewTopics()
?public?synchronized?int?requestUpdateForNewTopics()?{
????????//?重寫上次刷新的時間戳以允許立即更新。
????????this.lastRefreshMs?=?0;
????????//?部分主題更新設(shè)置為true
????????this.needPartialUpdate?=?true;
????????//?元數(shù)據(jù)加入到新主題集合的版本號?+?1
????????this.requestVersion++;
????????//?返回元數(shù)據(jù)版本號
????????return?this.updateVersion;
??}
該方法用來設(shè)置新集合主題更新的標(biāo)記,這里并未真正發(fā)送更新元數(shù)據(jù)的請求,只是設(shè)置標(biāo)識位的值,Kafka必須確保在第一次拉消息前元數(shù)據(jù)是可用的,即必須更新一次元數(shù)據(jù)。
將 lastRefreshMs 設(shè)置為0,即重寫上次刷新的時間戳以允許立即更新。 將需要更新元數(shù)據(jù)的標(biāo)志位 needPartialUpdate 設(shè)置 true。 將元數(shù)據(jù)加入到新主題集合的版本號 requestVersion +1 。 返回元數(shù)據(jù)版本號。
02.2.3 fetch()
/**
??*?Get?the?current?cluster?info?without?blocking
??*/
?public?synchronized?Cluster?fetch()?{
??????//?返回元數(shù)據(jù)緩存
??????return?cache.cluster();
?}
該方法用來獲取集群元數(shù)據(jù)的,默認(rèn)從元數(shù)據(jù)緩存中返回元數(shù)據(jù)。
02.2.4 update()
public?synchronized?void?update(int?requestVersion,?MetadataResponse?response,?boolean?isPartialUpdate,?long?nowMs)?{
????????.....
????????//?是否是部分主題更新標(biāo)記
????????this.needPartialUpdate?=?requestVersion?this.requestVersion;
????????//?最后一次更新元數(shù)據(jù)的時間為當(dāng)前時間
????????this.lastRefreshMs?=?nowMs;
????????//?元數(shù)據(jù)版本號?+1?
????????this.updateVersion?+=?1;
????????//?判斷非部分更新即全部主題更新
????????if?(!isPartialUpdate)?{
????????????//?全部主題更新標(biāo)記為否
????????????this.needFullUpdate?=?false;
????????????//?最后一次成功更新全部主題元數(shù)據(jù)的時間更新為當(dāng)前時間
????????????this.lastSuccessfulRefreshMs?=?nowMs;
????????}
????????String?previousClusterId?=?cache.clusterResource().clusterId();
????????//?解析元數(shù)據(jù)響應(yīng)結(jié)果
????????this.cache?=?handleMetadataResponse(response,?isPartialUpdate,?nowMs);
???????....
}
該方法主要做了幾件事情,具體如下:
設(shè)置是否是部分主題更新 needPartialUpdate,根據(jù)參數(shù):元數(shù)據(jù)加入到新主題集合的版本號 「requestVersion」與「元數(shù)據(jù)中的requestVersion」對比。 設(shè)置最后一次更新元數(shù)據(jù)的時間為當(dāng)前時間。 設(shè)置元數(shù)據(jù)版本號 +1 。 判斷是否全部主題更新,如果是則說明更新全部主題的響應(yīng)已經(jīng)收到了,此時將 needFullUpdate 標(biāo)記為否,lastSuccessfulRefreshMs 更新為當(dāng)前時間。 解析元數(shù)據(jù)響應(yīng)結(jié)果,并設(shè)置緩存。
最后調(diào)用了 handleMetadataResponse 這個重要方法來解析元數(shù)據(jù)響應(yīng),接下來重點(diǎn)分析下它做了些什么。
02.2.5 handleMetadataResponse()
private?MetadataCache?handleMetadataResponse(MetadataResponse?metadataResponse,?boolean?isPartialUpdate,?long?nowMs)?{
????????//?All?encountered?topics.
????????Set?topics?=?new?HashSet<>();
????????//?初始化相關(guān)主題集合
????????Set?internalTopics?=?new?HashSet<>();
????????Set?unauthorizedTopics?=?new?HashSet<>();
????????Set?invalidTopics?=?new?HashSet<>();
????????List?partitions?=?new?ArrayList<>();
????????//?遍歷主題的元數(shù)據(jù)響應(yīng)
????????for?(MetadataResponse.TopicMetadata?metadata?:?metadataResponse.topicMetadata())?{
????????????//?將該主題添加到元數(shù)據(jù)主題集合中
????????????topics.add(metadata.topic());
????????????//?判斷是否保留主題元數(shù)據(jù)
????????????if?(!retainTopic(metadata.topic(),?metadata.isInternal(),?nowMs))
????????????????continue;
????????????//?判斷是否是內(nèi)部主題。
????????????if?(metadata.isInternal())
????????????????internalTopics.add(metadata.topic());
????????????//?判斷是否元數(shù)據(jù)響應(yīng)error為空
????????????if?(metadata.error()?==?Errors.NONE)?{
????????????????//?遍歷分區(qū)信息
????????????????for?(MetadataResponse.PartitionMetadata?partitionMetadata?:?metadata.partitionMetadata())?{
????????????????????....
????????????????????//?判斷分區(qū)元數(shù)據(jù)是否有無效異常
????????????????????if?(partitionMetadata.error.exception()?instanceof?InvalidMetadataException)?{
????????????????????????....
????????????????????????//?標(biāo)記全部主題更新
????????????????????????requestUpdate();
????????????????????}
????????????????}
????????????}?else?{?//?如果元數(shù)據(jù)響應(yīng)有錯誤
????????????????//?判斷是否是無效元數(shù)據(jù)異常
????????????????if?(metadata.error().exception()?instanceof?InvalidMetadataException)?{
????????????????????....
????????????????????//?標(biāo)記全部主題更新
????????????????????requestUpdate();
????????????????}
????????????????//?判斷是否無效主題錯誤
????????????????if?(metadata.error()?==?Errors.INVALID_TOPIC_EXCEPTION)
????????????????????//?將主題添加到無效主題集合中
????????????????????invalidTopics.add(metadata.topic());
????????????????//?判斷是否無權(quán)限主題錯誤
????????????????else?if?(metadata.error()?==?Errors.TOPIC_AUTHORIZATION_FAILED)
????????????????????//?將主題添加到無權(quán)限主題集合中
????????????????????unauthorizedTopics.add(metadata.topic());
????????????}
????????}
????????Map?nodes?=?metadataResponse.brokersById();
????????//?判斷是否部分主題更新
????????if?(isPartialUpdate)
????????????//?如果是則與現(xiàn)在的元數(shù)據(jù)緩存合并在一起
????????????return?this.cache.mergeWith(metadataResponse.clusterId(),?nodes,?partitions,
????????????????unauthorizedTopics,?invalidTopics,?internalTopics,?metadataResponse.controller(),
????????????????(topic,?isInternal)?->?!topics.contains(topic)?&&?retainTopic(topic,?isInternal,?nowMs));
????????else
????????????//?如果是全部主題更新的話,就重新初始化元數(shù)據(jù)緩存
????????????return?new?MetadataCache(metadataResponse.clusterId(),?nodes,?partitions,
????????????????unauthorizedTopics,?invalidTopics,?internalTopics,?metadataResponse.controller());
?}
該方法用來解析元數(shù)據(jù)響應(yīng),我們具體分析下主流程的邏輯:
首先初始化相關(guān)集合:「內(nèi)部主題集合」、「無效主題集合」、「無權(quán)限主題集合」。 遍歷主題的元數(shù)據(jù)響應(yīng)。 將該主題添加到元數(shù)據(jù)主題集合中。 判斷是否保留主題元數(shù)據(jù),如果過期可能就沒必要保留了。 判斷是否是內(nèi)部主題,如果是內(nèi)部主題就添加到內(nèi)部主題集合。 判斷是否元數(shù)據(jù)響應(yīng)error為空的話
如果為空就開始遍歷主題下的分區(qū)信息,更新本地元數(shù)據(jù)緩存。 判斷分區(qū)元數(shù)據(jù)是否有無效異常,如果有則要打出相應(yīng)的日志, 做好需要全部主題更新元數(shù)據(jù)的標(biāo)記,后續(xù)會提醒 Sender 線程去更新元數(shù)據(jù)。 否則如果元數(shù)據(jù)響應(yīng)error不為空的話
先判斷是否是無效元數(shù)據(jù)異常,如果有則做好需要全部主題更新元數(shù)據(jù)的標(biāo)記,后續(xù)會提醒 Sender 線程去更新元數(shù)據(jù)。 判斷是否無效主題錯誤,如果是將主題添加到無效主題集合中。 判斷是否無權(quán)限主題錯誤,如果是將主題添加到無權(quán)限主題集合中。 判斷是否部分主題更新響應(yīng)
如果是則與現(xiàn)在的元數(shù)據(jù)緩存合并在一起。 否則就重新初始化元數(shù)據(jù)緩存。
至此,元數(shù)據(jù)相關(guān)類的重要方法已經(jīng)分析完畢,剩余沒分析到的方法待到后續(xù)場景中進(jìn)行分析,接下來我們看下主線程是如何加載元數(shù)據(jù)的。
02.3 初探 cluster
github 源碼地址如下:
https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/Cluster.java
https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/Node.java
https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java
https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/TopicPartition.java
?public?final?class?Cluster?{
????private?final?boolean?isBootstrapConfigured;
????//?kafka集群中的broker節(jié)點(diǎn)
????private?final?List?nodes;
????//?未授權(quán)的主題集合
????private?final?Set?unauthorizedTopics;
????//?無效的主題集合
????private?final?Set?invalidTopics;
????//?內(nèi)部主題集合
????private?final?Set?internalTopics;
????//?controller?節(jié)點(diǎn)
????private?final?Node?controller;
????//?topic對應(yīng)的?partition?信息字典,存放的?partition?不一定有?Leader?副本,?鍵為topic,值為?partition?信息集合。
????private?final?Map?partitionsByTopicPartition;
????//?鍵為topic,值為可用?partition?信息集合,存放的?partition?一定有?Leader?副本
????private?final?Map>?partitionsByTopic;
????//?鍵為broker的id,值為partition?信息集合
????private?final?Map>?availablePartitionsByTopic;
????//?鍵為broker的id,值為表示該節(jié)點(diǎn)的node實(shí)例
????private?final?Map>?partitionsByNode;
????private?final?Map?nodesById;
????private?final?ClusterResource?clusterResource;
Cluster類基本保存了所有的kafka集群相關(guān)的信息。
Node broker節(jié)點(diǎn)的相關(guān)的信息
?public?class?Node?{
????//?broker?節(jié)點(diǎn)id
????private?final?int?id;
????//?broker?節(jié)點(diǎn)id的字符串
????private?final?String?idString;
????//?broker?節(jié)點(diǎn)的地址用于socket連接
????private?final?String?host;
????//?端口
????private?final?int?port;
????//?broker?節(jié)點(diǎn)的機(jī)架
????private?final?String?rack;
????//?初始化節(jié)點(diǎn)屬性
????public?Node(int?id,?String?host,?int?port,?String?rack)?{
????????this.id?=?id;
????????this.idString?=?Integer.toString(id);
????????this.host?=?host;
????????this.port?=?port;
????????this.rack?=?rack;
????}
?}
PartitionInfo 分區(qū)信息
?public?class?PartitionInfo?{
????//?主題
????private?final?String?topic;
????//?分區(qū)編號id
????private?final?int?partition;
????//?分區(qū)?Leader?副本信息,唯一進(jìn)行通信的節(jié)點(diǎn)
????private?final?Node?leader;
????//?全部副本信息
????private?final?Node[]?replicas;
????//?ISR?副本信息,?follower角色
????private?final?Node[]?inSyncReplicas;
????//?離線副本信息
????private?final?Node[]?offlineReplicas;
????//?初始化分區(qū)信息
????public?PartitionInfo(String?topic,
?????????????????????????int?partition,
?????????????????????????Node?leader,
?????????????????????????Node[]?replicas,
?????????????????????????Node[]?inSyncReplicas,
?????????????????????????Node[]?offlineReplicas)?{
????????this.topic?=?topic;
????????this.partition?=?partition;
????????this.leader?=?leader;
????????this.replicas?=?replicas;
????????this.inSyncReplicas?=?inSyncReplicas;
????????this.offlineReplicas?=?offlineReplicas;
????}
?}
TopicPartition
每個topic和每個分區(qū)組成的唯一索引,代表一個分區(qū)標(biāo)識。
?public?final?class?TopicPartition?implements?Serializable?{
????//?hash值,用來hashCode方法緩存
????private?int?hash?=?0;
????//?分區(qū)編號
????private?final?int?partition;
????//?主題名稱
????private?final?String?topic;
????//?初始化主題分區(qū)索引
????public?TopicPartition(String?topic,?int?partition)?{
????????this.partition?=?partition;
????????this.topic?=?topic;
????}
????//?計算hash值
????public?int?hashCode()?{
????????if?(hash?!=?0)
????????????return?hash;
????????final?int?prime?=?31;
????????int?result?=?1;
????????result?=?prime?*?result?+?partition;
????????result?=?prime?*?result?+?Objects.hashCode(topic);
????????this.hash?=?result;
????????return?result;
????}
?}
綜上映射關(guān)系可以看出,kafka 是以「分區(qū)」為最小管理單元,然后分區(qū)中的 Leader 負(fù)責(zé)交互。「Cluster」代表整個kafka集群的實(shí)體類,「MetaData」的角色相當(dāng)于「Cluster」的在客戶端的一個維護(hù)者。
02.4 主線程加載元數(shù)據(jù)
我們先來看加載元數(shù)據(jù)大體過程。

接下來分析詳細(xì)源碼,首先客戶端可以直接調(diào)用「 producer.send() 」進(jìn)行發(fā)送,底層調(diào)用 doSend() 方法。
//?向?topic?異步地發(fā)送數(shù)據(jù),當(dāng)發(fā)送確認(rèn)后喚起回調(diào)函數(shù)
public?Future?send(ProducerRecord?record,?Callback?callback) ? {
???ProducerRecord?interceptedRecord?=?this.interceptors.onSend(record);
???//?調(diào)用?doSend?發(fā)送,支持回調(diào)函數(shù)
???return?doSend(interceptedRecord,?callback);
}
private?Future?doSend(ProducerRecord?record,?Callback?callback) ? {
????????TopicPartition?tp?=?null;
????????try?{
????????????//?假如?sender?線程為空或者不運(yùn)行,報錯
????????????throwIfProducerClosed();
????????????//?當(dāng)前時間【毫秒】?
????????????long?nowMs?=?time.milliseconds();
????????????ClusterAndWaitTime?clusterAndWaitTime;
????????????try?{
????????????????//?等待元數(shù)據(jù)更新
????????????????clusterAndWaitTime?=?waitOnMetadata(record.topic(),?record.partition(),?nowMs,?maxBlockTimeMs);
????????????}?catch?(KafkaException?e)?{
????????????????if?(metadata.isClosed())
????????????????????throw?new?KafkaException("Producer?closed?while?send?in?progress",?e);
????????????????throw?e;
????????????}
????????????nowMs?+=?clusterAndWaitTime.waitedOnMetadataMs;
????????????long?remainingWaitMs?=?Math.max(0,?maxBlockTimeMs?-?clusterAndWaitTime.waitedOnMetadataMs);
????????????Cluster?cluster?=?clusterAndWaitTime.cluster;
????????????....
????????????//?將消息記錄追加到消息緩沖區(qū)
????????????RecordAccumulator.RecordAppendResult?result?=?accumulator.append(tp,?timestamp,?serializedKey,
????????????????????serializedValue,?headers,?interceptCallback,?remainingWaitMs,?true,?nowMs);
????????????????????
????????????//?當(dāng)批次滿了或者新批次被創(chuàng)建了,開始喚醒sender線程數(shù)據(jù)發(fā)送
????????????if?(result.batchIsFull?||?result.newBatchCreated)?{
????????????????this.sender.wakeup();
????????????}
????????????return?result.future;
????????}?catch?(ApiException?e)?{
????????????....
????????????return?new?FutureFailure(e);
????????}?catch?(InterruptedException?e)?{
????????????....
????????????throw?new?InterruptException(e);
????????}?catch?(KafkaException?e)?{
????????????....
????????????throw?e;
????????}?catch?(Exception?e)?{
????????????....
????????????throw?e;
????????}
?}
從上述發(fā)送的源碼中,可以看出來主線程在發(fā)送消息前需要先獲取元數(shù)據(jù),這樣才能知道消息要發(fā)送到哪些節(jié)點(diǎn)。
中間通過調(diào)用 waitOnMetadata() 獲取元數(shù)據(jù)的相關(guān)信息,為后續(xù)發(fā)送消息提供支持,接下來我們重點(diǎn)分析下 waitOnMetadata() 這個方法。
private?ClusterAndWaitTime?waitOnMetadata(String?topic,?Integer?partition,?long?nowMs,?long?maxWaitMs)?throws?InterruptedException?{
????????//?1.?從元數(shù)據(jù)緩存中獲取元數(shù)據(jù)
????????Cluster?cluster?=?metadata.fetch();
????????//?2.?判斷該主題是否是無效的主題
????????if?(cluster.invalidTopics().contains(topic))
????????????throw?new?InvalidTopicException(topic);
????????//?3.?將該主題放入元數(shù)據(jù)主題列表中
????????metadata.add(topic,?nowMs);
????????//?4.?從元數(shù)據(jù)緩存中獲取主題對應(yīng)的分區(qū)數(shù)
????????Integer?partitionsCount?=?cluster.partitionCountForTopic(topic);
????????//?5.?滿足這些條件后就不用拉取元數(shù)據(jù),直接從元數(shù)據(jù)緩存中返回
????????if?(partitionsCount?!=?null?&&?(partition?==?null?||?partition?????????????//?返回元數(shù)據(jù)信息
????????????return?new?ClusterAndWaitTime(cluster,?0);
????????long?remainingWaitMs?=?maxWaitMs;
????????long?elapsed?=?0;
????????//?6.?不停的輪詢喚醒?sender?線程更新元數(shù)據(jù)
????????do?{
????????????//?7.?將主題?topic及過期時間添加到元數(shù)據(jù)主題列表中
????????????metadata.add(topic,?nowMs?+?elapsed);
????????????//?8.?標(biāo)記元數(shù)據(jù)更新標(biāo)識,獲取元數(shù)據(jù)版本號
????????????int?version?=?metadata.requestUpdateForTopic(topic);
????????????//?9.?喚醒?sender?子線程
????????????sender.wakeup();
????????????try?{
????????????????//?10.?阻塞線程等待元數(shù)據(jù)更新成功
????????????????metadata.awaitUpdate(version,?remainingWaitMs);
????????????}?catch?(TimeoutException?ex)?{
????????????????...
????????????}
????????????//?11.?到這里,應(yīng)該是當(dāng)前cluster更新成功了,再次獲取元數(shù)據(jù)
????????????cluster?=?metadata.fetch();
????????????//?12.?計算等待更新完成元數(shù)據(jù)消耗時間
????????????elapsed?=?time.milliseconds()?-?nowMs;
????????????//?13.?如果超時,拋超時異常
????????????if?(elapsed?>=?maxWaitMs)?{
???????????????...
????????????}
????????????metadata.maybeThrowExceptionForTopic(topic);
????????????remainingWaitMs?=?maxWaitMs?-?elapsed;
????????????//?14.?獲取元數(shù)據(jù)分區(qū)數(shù)
????????????partitionsCount?=?cluster.partitionCountForTopic(topic);
????????}?while?(partitionsCount?==?null?||?(partition?!=?null?&&?partition?>=?partitionsCount));
????????//?15.?返回元數(shù)據(jù)以及消耗時間
????????return?new?ClusterAndWaitTime(cluster,?elapsed);
?}
該方法用來獲取元數(shù)據(jù)以及元數(shù)據(jù)消耗時間,我們具體分析下主流程的邏輯:
從元數(shù)據(jù)緩存中獲取元數(shù)據(jù),如果metadata不存在當(dāng)前topic的元數(shù)據(jù),會觸發(fā)一次強(qiáng)制刷新,metaData中的needUpdate置為true。 判斷該主題是否是無效的主題, 如果是拋異常。 將該主題放入元數(shù)據(jù)的主題列表中,通過 Sender 線程定時更新這些主題的元數(shù)據(jù)。 從元數(shù)據(jù)緩存中獲取主題對應(yīng)的分區(qū)數(shù)。 判斷元數(shù)據(jù)緩存是否能滿足需要,就是說要能夠找到要發(fā)送消息的主題分區(qū),條件是:partitionsCount != null && (partition == null || partition < partitionsCount) ?即「主題對應(yīng)的分區(qū)數(shù)不能為空且發(fā)送的分區(qū)ID要小于主題的分區(qū)數(shù)」。 不停的輪詢喚醒 sender 線程更新元數(shù)據(jù),這里有兩個條件要滿足其一才可以。
partitionsCount == null 「從元數(shù)據(jù)緩存中獲取主題對應(yīng)的分區(qū)數(shù)為空」,這里要發(fā)送的主題連Leader分區(qū)都沒有,可能主題分區(qū)根本不存在也可能沒拉取到最新分區(qū)信息,如果真的不存在就沒必要繼續(xù)直接拋異常。 partition != null && partition >= partitionsCount「分區(qū)不為空且要發(fā)送的分區(qū)ID大于主題分區(qū)數(shù)」,這里說明了主題分區(qū)的數(shù)量增加了,需要重新拉取下獲取最新分區(qū)信息。 將主題 topic及過期時間添加到元數(shù)據(jù)主題列表中 標(biāo)記元數(shù)據(jù)更新標(biāo)識,并獲取當(dāng)前元數(shù)據(jù)版本號,提醒 Sender 線程更新元數(shù)據(jù)。 喚醒 sender 子線程進(jìn)行元數(shù)據(jù)更新以及消息發(fā)送。 阻塞主線程并等待Sender線程更新元數(shù)據(jù)成功。 待 「Sender 子線程更新元數(shù)據(jù)成功」或者「阻塞超時解除阻塞」,再次獲取元數(shù)據(jù)。 計算等待更新完成元數(shù)據(jù)消耗時間。 如果超時,拋超時異常。maxWaitMs 最大1分鐘。 獲取元數(shù)據(jù)分區(qū)數(shù)。 返回元數(shù)據(jù)以及消耗時間。
聰明的讀者可能發(fā)現(xiàn)在生產(chǎn)者中獲取元數(shù)據(jù)都是基于topic的,主要原因就是對于生產(chǎn)者來說,沒必要拉取全部的元數(shù)據(jù),只拉取自己需要的主題元數(shù)據(jù)就可以了。
至此,元數(shù)據(jù)加載已經(jīng)分析完了, 接下來我們看下 Sender 子線程是如何拉取元數(shù)據(jù)的。
03 Sender 線程拉取元數(shù)據(jù)
由于 Sender 線程的深度分析不是本文的重點(diǎn),這里先簡單的給大家?guī)б粠В罄m(xù)會有專門篇章去分析。
github 源碼地址如下:
https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
我們來看看 Sender 線程拉取元數(shù)據(jù)的大體過程。

接下來我們詳細(xì)分析具體的源碼。
03.1 Sender 周期執(zhí)行
通過源碼可以得知 Sender 是一個 Runnable 對象,那整個 Sender 線程執(zhí)行的核心邏輯就在 run() 方法中,run() 方法中的第一段代碼就是循環(huán)調(diào)用 runOnce() 方法:
?public?class?Sender?implements?Runnable?{
?????public?void?run()?{
????????//?running?字段用來標(biāo)識當(dāng)前?Sender?線程是否正常執(zhí)行
????????while?(running)?{
????????????try?{
????????????????//?如果正常運(yùn)行,則執(zhí)行運(yùn)行周期
????????????????runOnce();
????????????}?catch?(Exception?e)?{
????????????????log.error("Uncaught?error?in?kafka?producer?I/O?thread:?",?e);
????????????}
????????}
????}
????
????void?runOnce()?{
????????.....??//?此處省略事務(wù)消息相關(guān)的處理邏輯
????????long?currentTimeMs?=?time.milliseconds();
????????//?創(chuàng)建發(fā)送到?kafka?集群的請求
????????long?pollTimeout?=?sendProducerData(currentTimeMs);
????????//?真正執(zhí)行網(wǎng)絡(luò)IO的地方,會將請求發(fā)送出去,并處理收到的響應(yīng)
????????client.poll(pollTimeout,?currentTimeMs);
????}
?}
上述源碼中的 runOnce() 方法是 Sender 線程一個執(zhí)行的周期,在這個周期中會進(jìn)行一次批量的請求發(fā)送,并進(jìn)行一次響應(yīng)的處理。
接下來我們看里面2個涉及到元數(shù)據(jù)的重要方法:
03.1.1 sendProducerData()
??private?long?sendProducerData(long?now)?{
????????//?從元數(shù)據(jù)緩存中獲取元數(shù)據(jù)
????????Cluster?cluster?=?metadata.fetch();
????????//?通過元數(shù)據(jù)cluster獲取要發(fā)送的節(jié)點(diǎn)?Leader?分區(qū)信息
????????RecordAccumulator.ReadyCheckResult?result?=?this.accumulator.ready(cluster,?now);
????????//?如果主題的?Leader?分區(qū)對應(yīng)的節(jié)點(diǎn)不存在就強(qiáng)制更新元數(shù)據(jù)
????????if?(!result.unknownLeaderTopics.isEmpty())?{
????????????for?(String?topic?:?result.unknownLeaderTopics)
????????????????//?將主題加入元數(shù)據(jù)主題列表
????????????????this.metadata.add(topic,?now);
????????????//?強(qiáng)制標(biāo)記元數(shù)據(jù)更新標(biāo)識
????????????this.metadata.requestUpdate();
????????}
??????.....?//?忽略能發(fā)送的節(jié)點(diǎn)數(shù)據(jù)
??}
該方法用來進(jìn)行 Sender 線程創(chuàng)建請求的核心,這里只分析下元數(shù)據(jù)涉及的部分:
從元數(shù)據(jù)緩存中獲取元數(shù)據(jù)。 通過元數(shù)據(jù)cluster獲取要發(fā)送的節(jié)點(diǎn) Leader 分區(qū)信息。 如果主題的 Leader 分區(qū)對應(yīng)的節(jié)點(diǎn)不存在就強(qiáng)制更新元數(shù)據(jù)。
循環(huán)無 Leader 分區(qū)的主題,將主題加入元數(shù)據(jù)主題列表。 強(qiáng)制標(biāo)記元數(shù)據(jù)更新標(biāo)識。 對于無 Leader 分區(qū)的主題,可能分區(qū)正在選主中,也可能 Leader 分區(qū)所在節(jié)點(diǎn) Crash,所以要強(qiáng)制更新元數(shù)據(jù)保證元數(shù)據(jù)一致性。
03.1.2 client.poll()
public?class?NetworkClient?implements?KafkaClient?{
???public?List?poll(long?timeout,?long?now)? {
????????....
????????//?嘗試更新元數(shù)據(jù)
????????long?metadataTimeout?=?metadataUpdater.maybeUpdate(now);
????????try?{
????????????//?執(zhí)行IO操作
????????????this.selector.poll(Utils.min(timeout,?metadataTimeout,?defaultRequestTimeoutMs));
????????}?catch?(IOException?e)?{
????????????log.error("Unexpected?error?during?I/O",?e);
????????}
????????....
????????List?responses?=?new?ArrayList<>();
????????//?處理更新命令的返回
????????handleCompletedReceives(responses,?updatedNow);
????????return?responses;
???}
}
該方法用來發(fā)送網(wǎng)絡(luò)IO請求,在請求之前先執(zhí)行嘗試更新元數(shù)據(jù)的請求。
03.2 元數(shù)據(jù)更新觸發(fā)時機(jī)
03.2.1 metadataUpdater.maybeUpdate()
?public?class?NetworkClient?implements?KafkaClient?{
????private?NetworkClient(MetadataUpdater?metadataUpdater,
??????????????????????????Metadata?metadata,
??????????????????????????Selectable?selector,
??????????????????????????String?clientId,
??????????????????????????int?maxInFlightRequestsPerConnection,
??????????????????????????long?reconnectBackoffMs,
??????????????????????????long?reconnectBackoffMax,
??????????????????????????int?socketSendBuffer,
??????????????????????????int?socketReceiveBuffer,
??????????????????????????int?defaultRequestTimeoutMs,
??????????????????????????long?connectionSetupTimeoutMs,
??????????????????????????long?connectionSetupTimeoutMaxMs,
??????????????????????????ClientDnsLookup?clientDnsLookup,
??????????????????????????Time?time,
??????????????????????????boolean?discoverBrokerVersions,
??????????????????????????ApiVersions?apiVersions,
??????????????????????????Sensor?throttleTimeSensor,
??????????????????????????LogContext?logContext)?{
???????
????????if?(metadataUpdater?==?null)?{
????????????//?通過這里可以看出?metadataUpdate?實(shí)例化對象
????????????this.metadataUpdater?=?new?DefaultMetadataUpdater(metadata);
????????}?else?{
????????????this.metadataUpdater?=?metadataUpdater;
????????}
????????...
????}?
?}
從初始化函數(shù)中可以得出元數(shù)據(jù)更新組件是調(diào)用 NetworkClient 內(nèi)部類即 DefaultMetadataUpdater,前面已多次提到更新集群元數(shù)據(jù)的場景,而這些更新操作都是在標(biāo)記集群元數(shù)據(jù)是否需要更新,而真正執(zhí)行更新的操作是這里。接下來我們看下這個類的 maybeUpdate() 方法。
03.2.2 public maybeUpdate()
??public?long?maybeUpdate(long?now)?{
??????//?計算下次要更新元數(shù)據(jù)的時間,其中會檢測needUpdate的值、退避時間、是否長時間未更新
??????long?timeToNextMetadataUpdate?=?metadata.timeToNextUpdate(now);
??????//?檢測是否已經(jīng)發(fā)送了元數(shù)據(jù)的請求,如果一個元數(shù)據(jù)的請求,還未從服務(wù)端返回,那么時間設(shè)置為?waitForMetadataFetch(默認(rèn)30s)
??????long?waitForMetadataFetch?=?hasFetchInProgress()???defaultRequestTimeoutMs?:?0;
??????//?計算元數(shù)據(jù)超時時間
??????long?metadataTimeout?=?Math.max(timeToNextMetadataUpdate,?waitForMetadataFetch);
??????if?(metadataTimeout?>?0)?{
??????????return?metadataTimeout;
??????}
??????//?表示需要立即更新,取最空閑的節(jié)點(diǎn)node
??????Node?node?=?leastLoadedNode(now);
??????//?發(fā)送更新元數(shù)據(jù)的請求?????
??????return?maybeUpdate(now,?node);
??}
從這段源碼中可以看出上來先計算下次要更新元數(shù)據(jù)的時間,我們看下這個計算過程。
public?class?Metadata?implements?Closeable?{
??//?計算下次更新元數(shù)據(jù)的時間
??public?synchronized?long?timeToNextUpdate(long?nowMs)?{
??????long?timeToExpire?=?updateRequested()???0?:?Math.max(this.lastSuccessfulRefreshMs?+?this.metadataExpireMs?-?nowMs,?0);
??????return?Math.max(timeToExpire,?timeToAllowUpdate(nowMs));
??}
??
??//?判斷這2個標(biāo)識是否為true
??public?synchronized?boolean?updateRequested()?{
??????return?this.needFullUpdate?||?this.needPartialUpdate;
??}
??//?計算允許更新元數(shù)據(jù)的時機(jī)
??public?synchronized?long?timeToAllowUpdate(long?nowMs)?{
??????return?Math.max(this.lastRefreshMs?+?this.refreshBackoffMs?-?nowMs,?0);
??}
}
元數(shù)據(jù)是否過期 timeToExpire,計算方式:
首先會通過 updateRequested() 方法檢查 Metadata 中的 「needFullUpdate」「needPartialUpdate」,如果這兩個標(biāo)識位為 true,表示 Metadata 需要立即更新, 否則計算上次更新成功的時間距離當(dāng)前時間是否已經(jīng)超過了指定的元數(shù)據(jù)過期時間閾值metadataExpireMs「默認(rèn)5分鐘」。 允許更新的時間點(diǎn) timeToAllowUpdate,計算方式:
上次更新時間 + 退避時間 - 當(dāng)前時間的間隔 「要求上次更新時間與當(dāng)前時間的間隔不能大于退避時間,如果大于則需要等待」。 最后計算這倆值的最大值作為下次更新元數(shù)據(jù)的時間。
分析完元數(shù)據(jù)更新時機(jī),最后調(diào)用 maybeUpdate(now, node) 發(fā)送更新元數(shù)據(jù)的請求。
03.2.3 private maybeUpdate()
?private?long?maybeUpdate(long?now,?Node?node)?{
??????String?nodeConnectionId?=?node.idString();
??????//?判斷當(dāng)前node的狀態(tài)是否可以發(fā)送Request請求
??????if?(canSendRequest(nodeConnectionId,?now))?{
??????????//?構(gòu)建元數(shù)據(jù)請求
??????????Metadata.MetadataRequestAndVersion?requestAndVersion?=?metadata.newMetadataRequestAndVersion(now);
??????????MetadataRequest.Builder?metadataRequest?=?requestAndVersion.requestBuilder;
??????????//?向?nodeConnectionId?發(fā)送元數(shù)據(jù)請求
??????????sendInternalMetadataRequest(metadataRequest,?nodeConnectionId,?now);
??????????inProgress?=?new?InProgressData(requestAndVersion.requestVersion,?requestAndVersion.isPartialUpdate);
??????????return?defaultRequestTimeoutMs;
??????}
??????//?判斷Node是否正在連接
??????if?(isAnyNodeConnecting())?{
??????????return?reconnectBackoffMs;
??????}
??????//?如果存在可用的Node,則嘗試初始化連接
??????if?(connectionStates.canConnect(nodeConnectionId,?now))?{
??????????//?初始化與node的連接
??????????initiateConnect(node,?now);
??????????return?reconnectBackoffMs;
??????}
??????//?阻塞等待有新的節(jié)點(diǎn)可用
??????return?Long.MAX_VALUE;
??}
該方法用來發(fā)送更新元數(shù)據(jù)的請求,具體的實(shí)現(xiàn)邏輯如下:
canSendRequest() 判斷當(dāng)前node的狀態(tài)是否可以發(fā)送Request請求, 如果可以發(fā)送則構(gòu)建元數(shù)據(jù)請求,調(diào)用 sendInternalMetadataRequest() 向 nodeConnectionId 發(fā)送元數(shù)據(jù)請求。 isAnyNodeConnecting() 如果該 node 正在建立連接,則直接返回重新連接超時時間,等待更新成功。 connectionStates.canConnect() 如果存在可用的Node,則嘗試初始化連接,返回重新連接超時時間,等待更新成功。 阻塞等待有新的節(jié)點(diǎn)可用。
從上述源碼可以看出:更新流程一直在重試,直到元數(shù)據(jù)更新成功為止。
Sender 子線程第一次調(diào)用 poll() 方法時,嘗試初始化與 node 的連接。 Sender 子線程第二次調(diào)用 poll() 方法時,發(fā)送 Metadata 請求。 Sender 子線程會阻塞等待一定時間,當(dāng)有響應(yīng)返回時則獲取 metadataResponse,并更新 metadata。
如果元數(shù)據(jù)更新成功以后,KafkaProducer 主線程就不會被阻塞,當(dāng) NetworkClient 接收到服務(wù)端對 Metadata 請求的響應(yīng)后,就會更新 Metadata 信息, 即 poll() 方法后續(xù)的操作。
03.2.4 handleCompletedReceives()
??private?void?handleCompletedReceives(List?responses,?long?now) ?{
????for?(NetworkReceive?receive?:?this.selector.completedReceives())?{
????????....
????????//?如果是MetadataResponse類的響應(yīng),由metadataUpdater來處理
????????if?(req.isInternalRequest?&&?response?instanceof?MetadataResponse)
??????????//?處理成功返回的響應(yīng)信息
??????????metadataUpdater.handleSuccessfulResponse(req.header,?now,?(MetadataResponse)?response);
????????....
???}
?}
該方法用來判斷成功接收服務(wù)端返回的響應(yīng),根據(jù)不同的響應(yīng)返回做不同的操作,這里只看下跟元數(shù)據(jù)更新有關(guān)的邏輯。
03.2.5 handleSuccessfulResponse()
??public?void?handleSuccessfulResponse(RequestHeader?requestHeader,?long?now,?MetadataResponse?response)?{
?????....
????//?Check?if?any?topic's?metadata?failed?to?get?updated
????Map?errors?=?response.errors();
????//?如果返回錯誤,直接報錯
????if?(!errors.isEmpty())
????????log.warn("Error?while?fetching?metadata?with?correlation?id?{}?:?{}",?requestHeader.correlationId(),?errors);
????//?判斷broker信息是否為空,如果為空表示沒有獲得元數(shù)據(jù)
????if?(response.brokers().isEmpty())?{
????????//?更新失敗
????????this.metadata.failedUpdate(now);
????}?else?{
????????//?如果成功?則開始更新元數(shù)據(jù)
????????this.metadata.update(inProgress.requestVersion,?response,?inProgress.isPartialUpdate,?now);
????}
????inProgress?=?null;
?}
?
?//?元數(shù)據(jù)更新失敗
?public?synchronized?void?failedUpdate(long?now)?{
?????//?最后一次更新元數(shù)據(jù)的時間為當(dāng)前時間
?????this.lastRefreshMs?=?now;
?}
該方法用來對成功返回信息進(jìn)行處理,主要是對元數(shù)據(jù)信息的更新。
查看 response 返回信息的 error。 判斷broker信息是否為空
如果為空表示沒有獲得元數(shù)據(jù)即更新失敗了,然后調(diào)用 failedUpdate(now)方法記錄 lastRefreshMs 為當(dāng)前時間,不允許立即更新元數(shù)據(jù)。 如果不為空表示獲取元數(shù)據(jù)成功,則調(diào)用 MetaData.update() 更新元數(shù)據(jù)。
至此,Sender 子線程拉取并更新元數(shù)據(jù)分析完畢。
最后通過一張圖來描述整個元數(shù)據(jù)拉取的全過程:

05 總結(jié)
這里,我們一起來總結(jié)一下這篇文章的重點(diǎn)。
1、通過「場景驅(qū)動」的方式從元數(shù)據(jù)的使用場景出發(fā),拋出主線程加載元數(shù)據(jù)和子線程拉取元數(shù)據(jù)的過程是怎樣的?
2、帶你梳理了「主線程如何加載元數(shù)據(jù)源碼全貌」,包括 ProducerMetadata 類、Metadata 元數(shù)據(jù)基類、cluster類的幾個重要方法源碼分析,最后分析主線程加載元數(shù)據(jù)過程。
3、又帶你梳理了「Sender 線程拉取元數(shù)據(jù)」,包括 Sender 周期執(zhí)行、元數(shù)據(jù)更新觸發(fā)時機(jī)的幾個重要方法源碼分析。
3、最后通過一張元數(shù)據(jù)流程圖來勾勒出元數(shù)據(jù)拉取和更新的全貌。
下一篇我們來深度剖析「Kafka NIO實(shí)現(xiàn)」,大家期待,我們下期見。
如另外歡迎加入我的技術(shù)交流群,關(guān)注本公眾號并添加我個人微信,邀您進(jìn)群。
如果我的文章對你有所幫助,還請幫忙點(diǎn)贊、在看、轉(zhuǎn)發(fā)一下,非常感謝!
堅持總結(jié), 持續(xù)輸出高質(zhì)量文章 ?關(guān)注我: 華仔聊技術(shù)
? ? ? ? ? ? ? ? ? ? ?
? ? ? ? ? ? ? ? ?點(diǎn)個“贊”和“在看”鼓勵一下嘛~
