<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          圖解 Kafka 生產(chǎn)者元數(shù)據(jù)拉取管理全流程

          共 27266字,需瀏覽 55分鐘

           ·

          2022-06-08 17:19

          閱讀本文大約需要 60 分鐘。

          大家好,我是 華仔, 又跟大家見面了。

          在上一篇中,正式開啟了「Kafka的源碼之旅」,主要講述了 KafkaProducer 初始化時用到的核心組件以及消息發(fā)送的核心流程,帶你梳理生產(chǎn)者初始化整體的源碼分析脈絡(luò),并通過「場景驅(qū)動」的方式帶大家一點(diǎn)點(diǎn)的對 Kafka 源碼進(jìn)行深度剖析,一起掌握 Kafka 源碼核心架構(gòu)設(shè)計思想。

          今天這篇我們就來聊聊生產(chǎn)者是會如何拉取和管理元數(shù)據(jù)的,帶你梳理生產(chǎn)者元數(shù)據(jù)管理整體的源碼分析脈絡(luò)。

          認(rèn)真讀完這篇文章,我相信你會對 Kafka 生產(chǎn)獲取和管理元數(shù)據(jù)源碼有更加深刻的理解。

          這篇文章干貨很多,希望你可以耐心讀完。

          01 總的概述

          消息想從 Producer 端發(fā)送到 Broker 端,必須要先知道 Topic 在 Broker 的分布情況,才能判斷消息該發(fā)往哪些節(jié)點(diǎn),比如:「Topic 對應(yīng)的 Leader 分區(qū)有哪些」、「Leader分區(qū)分布在哪些 Broker 節(jié)點(diǎn)」、「Topic 分區(qū)動態(tài)變更」等等,所以及時獲取元數(shù)據(jù)對生產(chǎn)者正常工作是非常有必要的。

          元數(shù)據(jù)獲取涉及的底層組件比較多,主要分為:「KafkaProducer ?主線程加載元數(shù)據(jù)」、「Sender 子線程拉取元數(shù)據(jù)」。

          接下來我們逐一分析元數(shù)據(jù)在生產(chǎn)者端是如何被加載和拉取、更新的。為了方便大家理解,所有的源碼只保留骨干。

          02 主線程如何加載元數(shù)據(jù)

          首先我們來看下 KafkaProducer 主線程是如何加載元數(shù)據(jù)的

          在上一篇中《圖解Kafka生產(chǎn)者初始化核心流程》我們分析知道集群元數(shù)據(jù)的初始化是在 KafkaProducer 主線程的構(gòu)造函數(shù)中來完成的,我們來看一下相關(guān)源碼:

          //?初始化?Kafka?集群元數(shù)據(jù),元數(shù)據(jù)會保存到客戶端中,并與服務(wù)端元數(shù)據(jù)保持一致
          if?(metadata?!=?null)?{
          ????this.metadata?=?metadata;
          }?else?{
          ????//?初始化集群元數(shù)據(jù)
          ????this.metadata?=?new?ProducerMetadata(retryBackoffMs,
          ??????????//?元數(shù)據(jù)過期時間:默認(rèn)5分鐘
          ??????????config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
          ??????????//?topic最大空閑時間,如果在規(guī)定時間沒有被訪問,將從緩存刪除,下次訪問時強(qiáng)制獲取元數(shù)據(jù)
          ??????????config.getLong(ProducerConfig.METADATA_MAX_IDLE_CONFIG),
          ??????????logContext,
          ??????????clusterResourceListeners,
          ??????????Time.SYSTEM);
          ????//?啟動metadata的引導(dǎo)程序????
          ????this.metadata.bootstrap(addresses);
          }

          從上述源代碼我們可以看出在 KafkaProducer 的構(gòu)造方法中,如果metadata為空就會初始化集群元數(shù)據(jù)類「ProducerMetadata」,然后通過調(diào)用 「this.metadata.bootstrap」這個方法來啟動引導(dǎo)程序,這時 metaData 對象里并沒有具體的元數(shù)據(jù)信息,因?yàn)榭蛻舳诉€沒發(fā)送元數(shù)據(jù)更新的請求,后面會通過喚醒 Sender 線程進(jìn)行發(fā)送請求獲取元數(shù)據(jù)的。

          這里的 this.meta 其實(shí)就是 Kafka 內(nèi)存中的一個對象,底層會做一層緩存,因此并不會一直請求 Kafka Broker 端進(jìn)行獲取。

          我先給大家總結(jié)下元數(shù)據(jù)初始化及啟動的調(diào)用關(guān)系圖,口說無憑,我們來扒開源碼瞅一瞅,這樣更真實(shí)。

          通過上述的調(diào)用關(guān)系圖,我們可以看出:

          1. ProducerMetadata 類是 MetaData 的子類。
          2. Metadata 類是元數(shù)據(jù)基類,封裝了元數(shù)據(jù)的具體信息、版本控制、更新標(biāo)識、響應(yīng)解析等。
          3. 元數(shù)據(jù)的信息其實(shí)最終是保存在元數(shù)據(jù)緩存即 MetadataCache 中,而 ? 它最核心的是 Cluster , 保存了元數(shù)據(jù)基礎(chǔ)信息。

          接下來會挨個類展開來進(jìn)行講解。

          02.1 初探 ProducerMetadata

          在主線程中初始化了 ProducerMetadata 類的對象,我們先來看看這個類都做了哪些事情。

          github 源碼地址如下:

          https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMetadata.java

          ?public?class?ProducerMetadata?extends?Metadata?{
          ????//?主題元數(shù)據(jù)過期時間,如果在這個時間段內(nèi)未被訪問,它就會從緩存中刪除。
          ????private?final?long?metadataIdleMs;

          ????/*?Topics?with?expiry?time?*/
          ????//?map集合,生產(chǎn)者的元數(shù)據(jù)主題集合,里面保存著
          ????//?主題和主題過期時間的對應(yīng)關(guān)系,即?topic,?nowMs?+?metadataIdleMs,
          ????//?過期了的主題會被踢出去。
          ????private?final?Map?topics?=?new?HashMap<>();
          ????//?新的主題集合,?set集合,即第一次要發(fā)送的主題
          ????private?final?Set?newTopics?=?new?HashSet<>();
          ????private?final?Logger?log;
          ????private?final?Time?time;
          ????public?ProducerMetadata(long?refreshBackoffMs,
          ????????????????????????????long?metadataExpireMs,
          ????????????????????????????long?metadataIdleMs,
          ????????????????????????????LogContext?logContext,
          ????????????????????????????ClusterResourceListeners?clusterResourceListeners,
          ????????????????????????????Time?time)
          ?
          {
          ????????//調(diào)用父類?Metadata?的構(gòu)造函數(shù)
          ????????super(refreshBackoffMs,?metadataExpireMs,?logContext,?clusterResourceListeners);
          ????????....
          ????}
          }

          我們可以看出這里只是調(diào)用了父類的構(gòu)造函數(shù)進(jìn)行類屬性的初始化,接下來我們深度分析下 ProducerMetadata 類中的幾個比較重要的方法。

          02.1.1 add()

          ?public?synchronized?void?add(String?topic,?long?nowMs)?{
          ????????//?判斷對象是否為空
          ????????Objects.requireNonNull(topic,?"topic?cannot?be?null");
          ????????if?(topics.put(topic,?nowMs?+?metadataIdleMs)?==?null)?{
          ????????????//?添加主題到新主題集合中
          ????????????newTopics.add(topic);
          ????????????//?更新元數(shù)據(jù)標(biāo)記?屬于Metadata類方法,后面小節(jié)分析
          ????????????requestUpdateForNewTopics();
          ????????}
          ?}

          該方法主要用來向元數(shù)據(jù)主題集合 topics 中添加主題,主要用在「KafkaProducer 主線程」以及「Sender 子線程」中,我們來看下是如何添加的,具體邏輯如下:

          1. 往元數(shù)據(jù)主題集合 topics 中添加主題和對應(yīng)的過期時間(當(dāng)時時間+過期時間段「默認(rèn)值:5分鐘」)。
          2. 如果元數(shù)據(jù)主題集合中不存在該主題時,說明是第一次就把該主題添加到新主題集合中。
          3. 標(biāo)記要更新新主題元數(shù)據(jù)的屬性字段「lastRefreshMs」 、「needPartialUpdate」 、「requestVersion」,以便后續(xù)喚醒 Sender 線程去服務(wù)端拉取新主題的元數(shù)據(jù)。

          此時主題被添加到元數(shù)據(jù)主題主題集合中,但是如果集合里面數(shù)據(jù)過期了該怎么辦?接下來我們看另外一個方法是如何判斷的。

          02.1.2 retainTopic()

          ??public?synchronized?boolean?retainTopic(String?topic,?boolean?isInternal,?long?nowMs)?{
          ????????//?獲取該主題的過期時間
          ????????Long?expireMs?=?topics.get(topic);
          ????????//?如果為空表示該主題不在元數(shù)據(jù)主題集合中
          ????????if?(expireMs?==?null)?{
          ????????????return?false;
          ????????//?判斷該主題是否在新集合中
          ????????}?else?if?(newTopics.contains(topic))?{
          ????????????return?true;
          ????????//?判斷是否超過了過期時間
          ????????}?else?if?(expireMs?<=?nowMs)?{
          ????????????log.debug("Removing?unused?topic?{}?from?the?metadata?list,?expiryMs?{}?now?{}",?topic,?expireMs,?nowMs);
          ????????????//?超過后直接從元數(shù)據(jù)主題集合中刪除該主題
          ????????????topics.remove(topic);
          ????????????return?false;
          ????????}?else?{
          ????????????return?true;
          ????????}
          ??}

          該方法用來判斷元數(shù)據(jù)中是否該保留該主題,會在 handleMetadataResponse 即處理元數(shù)據(jù)響應(yīng)結(jié)果的時候進(jìn)行調(diào)用,我們來看下它是如何判斷的。

          1. 先判斷元數(shù)據(jù)主題集合中是否存在該主題,如果不存在直接返回false。
          2. 然后判斷該主題是否在新主題集合中,如果存在直接返回true。
          3. 再判斷該主題是否超過了過期時間,如果超過了,就從元數(shù)據(jù)主題集合中刪除該主題,再請求元數(shù)據(jù)的時候就不用帶上該主題,可以有效的減少網(wǎng)絡(luò)傳輸數(shù)據(jù)大小。

          02.1.3 requestUpdateForTopic()

          ??public?synchronized?int?requestUpdateForTopic(String?topic)???{
          ?????//?如果新主題集合中存在該主題
          ?????if?(newTopics.contains(topic))?{
          ????????//?針對新主題集合標(biāo)記部分更新,并返回版本
          ????????return?requestUpdateForNewTopics();
          ?????}?else?{
          ????????//?全量更新,并返回版本
          ????????return?requestUpdate();
          ?????}
          ??}

          該方法用來判斷是全量更新元數(shù)據(jù)還是部分更新元數(shù)據(jù),邏輯相對比較簡單,主要用在 KafkaProducer 主線程元數(shù)據(jù)同步等待時調(diào)用,后續(xù)小節(jié)會詳細(xì)分析。

          02.1.4 update()

          ??public?synchronized?void?update(int?requestVersion,?MetadataResponse?response,?boolean?isPartialUpdate,?long?nowMs)?{
          ????????//?調(diào)用父類的update方法
          ????????super.update(requestVersion,?response,?isPartialUpdate,?nowMs);
          ????????//?如果新主題集合不為空,則遍歷響應(yīng)元數(shù)據(jù)找出已經(jīng)獲取元數(shù)據(jù)的主題,并從新主題集合中刪除
          ????????if?(!newTopics.isEmpty())?{
          ????????????for?(MetadataResponse.TopicMetadata?metadata?:?response.topicMetadata())?{
          ????????????????newTopics.remove(metadata.topic());
          ????????????}
          ????????}
          ????????//?喚醒等待元數(shù)據(jù)更新完成的線程
          ????????notifyAll();
          ?}

          該方法用來更新生產(chǎn)端元數(shù)據(jù)的,具體邏輯如下:

          1. 先調(diào)用父類的 update() 方法。
          2. 然后判斷新主題集合是否不為空,如果不為空則遍歷響應(yīng)元數(shù)據(jù)找出已經(jīng)獲取元數(shù)據(jù)的主題,并從新主題集合中刪除。
          3. 最后調(diào)用 notifyAll() 來喚醒等待元數(shù)據(jù)更新完成的線程。

          從上述 update()方法 中可以看出最后調(diào)用 notifyAll() 來喚醒阻塞的線程, 那么它是如何喚醒的呢,這就是接下來要分析的方法。

          02.1.5 awaitUpdate()

          ??public?synchronized?void?awaitUpdate(final?int?lastVersion,?final?long?timeoutMs)?throws?InterruptedException?{
          ????????long?currentTimeMs?=?time.milliseconds();
          ????????long?deadlineMs?=?currentTimeMs?+?timeoutMs?0???Long.MAX_VALUE?:?currentTimeMs?+?timeoutMs;
          ????????//?通過調(diào)用?time.waitObject?來實(shí)現(xiàn)線程阻塞
          ????????time.waitObject(this,?()?->?{
          ????????????//?Throw?fatal?exceptions,?if?there?are?any.?Recoverable?topic?errors?will?be?handled?by?the?caller.
          ????????????maybeThrowFatalException();
          ????????????return?updateVersion()?>?lastVersion?||?isClosed();
          ????????},?deadlineMs);
          ????????if?(isClosed())
          ????????????throw?new?KafkaException("Requested?metadata?update?after?close");
          ??}

          該方法用來實(shí)現(xiàn)線程阻塞的功能,用在主線程中如果發(fā)現(xiàn)主題對應(yīng)的元數(shù)據(jù)不存在時,阻塞并等待 Sender 線程將元數(shù)據(jù)更新完成。

          重點(diǎn)是調(diào)用了 time.waitObject() 方法來實(shí)現(xiàn)阻塞功能,它的實(shí)現(xiàn)還是有一些技巧的,它的底層通過調(diào)用 SystemTime 包里面的 waitObject() 實(shí)現(xiàn)的,源碼如下:

          ??public?void?waitObject(Object?obj,?Supplier?condition,?long?deadlineMs)?throws?InterruptedException?{
          ????????synchronized?(obj)?{
          ????????????while?(true)?{
          ????????????????//?判斷條件是否滿足即元數(shù)據(jù)是否更新成功,成功直接返回
          ????????????????if?(condition.get())
          ????????????????????return;
          ????????????????long?currentTimeMs?=?milliseconds();
          ????????????????//?超時拋出異常
          ????????????????if?(currentTimeMs?>=?deadlineMs)
          ????????????????????throw?new?TimeoutException("Condition?not?satisfied?before?deadline");
          ????????????????//?調(diào)用?wait?阻塞線程?
          ????????????????obj.wait(deadlineMs?-?currentTimeMs);
          ????????????}
          ????????}
          ?}
          1. 通過一個循環(huán)來判斷條件是否滿足,即元數(shù)據(jù)是否更新成功了,如果成功則跳出循環(huán),釋放鎖。
          2. 獲取當(dāng)前時間,判斷是否超時,如果超時后會拋出超時的異常。
          3. 如果未超時就調(diào)用 wait 方法來阻塞線程,直到滿足過期時間的條件,解除阻塞。

          如果你的項(xiàng)目中也需要類似的功能要實(shí)現(xiàn)一個鎖,可以參考這里的代碼,實(shí)現(xiàn)非常巧妙。

          接下來我們來重點(diǎn)分析下元數(shù)據(jù)基類。

          02.2 初探 Metadata

          首先它是一個線程安全的類,因此Metadata通過synchronized修飾幾乎所有方法來保證線程安全,里面封裝了元數(shù)據(jù)基本信息以及元數(shù)據(jù)的相關(guān)操作,主要被用在生產(chǎn)端的 「KafkaProducer 主線程」、「Sender 子線程」中,對于生產(chǎn)者來說只需要獲取自己發(fā)送的主題集合的元數(shù)據(jù),這樣可以有效降低網(wǎng)絡(luò)傳輸?shù)臄?shù)據(jù)量。因此,它內(nèi)部只維護(hù)部分主題的元數(shù)據(jù)。

          github 源碼地址如下:

          https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/clients/Metadata.java

          https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java

          我們先來看下這個類的重要字段

          ?public?class?Metadata?implements?Closeable?{
          ????private?final?Logger?log;
          ????//?請求元數(shù)據(jù)失敗后重試間隔時間,默認(rèn)值:100ms
          ????private?final?long?refreshBackoffMs;
          ????//?元數(shù)據(jù)過期時間,默認(rèn)值:5分鐘
          ????private?final?long?metadataExpireMs;
          ????//?元數(shù)據(jù)版本號,每次請求服務(wù)端+1,保存在本地內(nèi)存中
          ????private?int?updateVersion;?
          ????//?元數(shù)據(jù)加入到新主題集合的版本號
          ????private?int?requestVersion;?
          ????//?最后一次更新元數(shù)據(jù)的時間
          ????private?long?lastRefreshMs;
          ????//?最后一次成功更新全部主題元數(shù)據(jù)的時間
          ????private?long?lastSuccessfulRefreshMs;
          ????//?失敗異常
          ????private?KafkaException?fatalException;
          ????//?無效的主題集合
          ????private?Set?invalidTopics;
          ????//?無權(quán)限的主題集合
          ????private?Set?unauthorizedTopics;
          ????//?元數(shù)據(jù)緩存
          ????private?MetadataCache?cache?=?MetadataCache.empty();
          ????//?是否全部主題更新
          ????private?boolean?needFullUpdate;
          ????//?是否部分主題更新
          ????private?boolean?needPartialUpdate;
          ????//?初始化?Metadata?
          ????public?Metadata(long?refreshBackoffMs,
          ????????????????????long?metadataExpireMs,
          ????????????????????LogContext?logContext,
          ????????????????????ClusterResourceListeners?clusterResourceListeners)
          ?
          {
          ????????this.refreshBackoffMs?=?refreshBackoffMs;
          ????????this.metadataExpireMs?=?metadataExpireMs;
          ????????this.lastRefreshMs?=?0L;
          ????????this.lastSuccessfulRefreshMs?=?0L;
          ????????this.requestVersion?=?0;
          ????????this.updateVersion?=?0;
          ????????this.needFullUpdate?=?false;
          ????????this.needPartialUpdate?=?false;
          ????????this.invalidTopics?=?Collections.emptySet();
          ????????this.unauthorizedTopics?=?Collections.emptySet();
          ????}
          1. refreshBackoffMs:請求元數(shù)據(jù)失敗后重試間隔時間,默認(rèn)值:100ms。
          2. metadataExpireMs:元數(shù)據(jù)過期時間,默認(rèn)值:5分鐘,時間一到會再次發(fā)送獲取元數(shù)據(jù)的請求。
          3. updateVersion:元數(shù)據(jù)版本號,每次請求服務(wù)端+1,保存在本地內(nèi)存中。
          4. requestVersion:元數(shù)據(jù)加入到新主題集合的版本號,每次+1。
          5. lastRefreshMs:最后一次更新元數(shù)據(jù)的時間。
          6. lastSuccessfulRefreshMs:最后一次成功更新全部主題元數(shù)據(jù)的時間。
          7. fatalException:失敗異常。
          8. invalidTopics:無效的主題集合。
          9. unauthorizedTopics:無權(quán)限的主題集合。
          10. MetadataCache:元數(shù)據(jù)緩存,客戶端真正存儲元數(shù)據(jù)的對象。
          11. needFullUpdate:是否全部主題更新,對生產(chǎn)者來說,全部主題是指最近發(fā)送的主題集合。
          12. needPartialUpdate:是否部分主題更新,對生產(chǎn)者來說,部分主題是指新發(fā)送的主題集合。

          介紹完字段后,我們來深度分析下幾個元數(shù)據(jù)用到的重要方法。

          02.2.1 bootstrap()

          ?public?synchronized?void?bootstrap(List?addresses)?{
          ????//?是否全部主題更新為true?
          ????this.needFullUpdate?=?true;
          ????//?版本更新為1
          ????this.updateVersion?+=?1;
          ????//?初始化元數(shù)據(jù)緩存
          ????this.cache?=?MetadataCache.bootstrap(addresses);
          ?}

          該方法用來引導(dǎo)啟動程序的,即在第一次使用前進(jìn)行初始化的工作。

          1. 由于此時生產(chǎn)者剛啟動,本地緩存中的元數(shù)據(jù)是空的,因此先將 needFullUpdate 置為 true,即需要全部主題進(jìn)行更新。
          2. 在初始化時將 updateVersion 置為 0,此時將版本更新+1。
          3. 調(diào)用元數(shù)據(jù)緩存類 MetadataCache.bootstrap() 初始化元數(shù)據(jù)緩存。

          我們來看下元數(shù)據(jù)緩存的啟動程序都做了哪些事情。

          02.2.2 MetadataCache.bootstrap()

          ?static?MetadataCache?bootstrap(List?addresses)?{
          ????????...
          ????????//?因還未獲取元數(shù)據(jù),此時元數(shù)據(jù)緩存都是空集合
          ????????return?new?MetadataCache(null,?nodes,?Collections.emptyList(),
          ????????????????Collections.emptySet(),?Collections.emptySet(),?Collections.emptySet(),
          ????????????????null,?Cluster.bootstrap(addresses));
          ?}

          此時獲取元數(shù)據(jù)被啟動了,但是還未獲取元首,所以元數(shù)據(jù)緩存都是空的集合,接下來我們來分析下元數(shù)據(jù)是如何被更新以及如何解析響應(yīng)的,這里涉及到以下幾個方法。

          02.2.3 requestUpdate()

          /**
          ???*?Request?an?update?of?the?current?cluster?metadata?info,?return?the?current?updateVersion?before?the?update
          ???*/

          ?public?synchronized?int?requestUpdate()?{
          ????//?全部主題更新設(shè)置為true
          ????this.needFullUpdate?=?true;
          ????//?返回元數(shù)據(jù)版本號
          ????return?this.updateVersion;
          ?}

          該方法用來設(shè)置全部主題更新的標(biāo)記,上來就先將 needFullUpdate 置為 true,即要全部更新主題,然后返回更新版本。 主要在「ProducerMetadata」、「sender 子線程」、「NetworkClient線程」中使用。

          02.2.4 requestUpdateForNewTopics()

          ?public?synchronized?int?requestUpdateForNewTopics()?{
          ????????//?重寫上次刷新的時間戳以允許立即更新。
          ????????this.lastRefreshMs?=?0;
          ????????//?部分主題更新設(shè)置為true
          ????????this.needPartialUpdate?=?true;
          ????????//?元數(shù)據(jù)加入到新主題集合的版本號?+?1
          ????????this.requestVersion++;
          ????????//?返回元數(shù)據(jù)版本號
          ????????return?this.updateVersion;
          ??}

          該方法用來設(shè)置新集合主題更新的標(biāo)記,這里并未真正發(fā)送更新元數(shù)據(jù)的請求,只是設(shè)置標(biāo)識位的值,Kafka必須確保在第一次拉消息前元數(shù)據(jù)是可用的,即必須更新一次元數(shù)據(jù)。

          1. 將 lastRefreshMs 設(shè)置為0,即重寫上次刷新的時間戳以允許立即更新。
          2. 將需要更新元數(shù)據(jù)的標(biāo)志位 needPartialUpdate 設(shè)置 true。
          3. 將元數(shù)據(jù)加入到新主題集合的版本號 requestVersion +1 。
          4. 返回元數(shù)據(jù)版本號。

          02.2.3 fetch()

          /**
          ??*?Get?the?current?cluster?info?without?blocking
          ??*/

          ?public?synchronized?Cluster?fetch()?{
          ??????//?返回元數(shù)據(jù)緩存
          ??????return?cache.cluster();
          ?}

          該方法用來獲取集群元數(shù)據(jù)的,默認(rèn)從元數(shù)據(jù)緩存中返回元數(shù)據(jù)。

          02.2.4 update()

          public?synchronized?void?update(int?requestVersion,?MetadataResponse?response,?boolean?isPartialUpdate,?long?nowMs)?{
          ????????.....
          ????????//?是否是部分主題更新標(biāo)記
          ????????this.needPartialUpdate?=?requestVersion?this.requestVersion;
          ????????//?最后一次更新元數(shù)據(jù)的時間為當(dāng)前時間
          ????????this.lastRefreshMs?=?nowMs;
          ????????//?元數(shù)據(jù)版本號?+1?
          ????????this.updateVersion?+=?1;
          ????????//?判斷非部分更新即全部主題更新
          ????????if?(!isPartialUpdate)?{
          ????????????//?全部主題更新標(biāo)記為否
          ????????????this.needFullUpdate?=?false;
          ????????????//?最后一次成功更新全部主題元數(shù)據(jù)的時間更新為當(dāng)前時間
          ????????????this.lastSuccessfulRefreshMs?=?nowMs;
          ????????}
          ????????String?previousClusterId?=?cache.clusterResource().clusterId();
          ????????//?解析元數(shù)據(jù)響應(yīng)結(jié)果
          ????????this.cache?=?handleMetadataResponse(response,?isPartialUpdate,?nowMs);
          ???????....
          }

          該方法主要做了幾件事情,具體如下:

          1. 設(shè)置是否是部分主題更新 needPartialUpdate,根據(jù)參數(shù):元數(shù)據(jù)加入到新主題集合的版本號 「requestVersion」與「元數(shù)據(jù)中的requestVersion」對比。
          2. 設(shè)置最后一次更新元數(shù)據(jù)的時間為當(dāng)前時間。
          3. 設(shè)置元數(shù)據(jù)版本號 +1 。
          4. 判斷是否全部主題更新,如果是則說明更新全部主題的響應(yīng)已經(jīng)收到了,此時將 needFullUpdate 標(biāo)記為否,lastSuccessfulRefreshMs 更新為當(dāng)前時間。
          5. 解析元數(shù)據(jù)響應(yīng)結(jié)果,并設(shè)置緩存。

          最后調(diào)用了 handleMetadataResponse 這個重要方法來解析元數(shù)據(jù)響應(yīng),接下來重點(diǎn)分析下它做了些什么。

          02.2.5 handleMetadataResponse()

          private?MetadataCache?handleMetadataResponse(MetadataResponse?metadataResponse,?boolean?isPartialUpdate,?long?nowMs)?{
          ????????//?All?encountered?topics.
          ????????Set?topics?=?new?HashSet<>();
          ????????//?初始化相關(guān)主題集合
          ????????Set?internalTopics?=?new?HashSet<>();
          ????????Set?unauthorizedTopics?=?new?HashSet<>();
          ????????Set?invalidTopics?=?new?HashSet<>();
          ????????List?partitions?=?new?ArrayList<>();
          ????????//?遍歷主題的元數(shù)據(jù)響應(yīng)
          ????????for?(MetadataResponse.TopicMetadata?metadata?:?metadataResponse.topicMetadata())?{
          ????????????//?將該主題添加到元數(shù)據(jù)主題集合中
          ????????????topics.add(metadata.topic());
          ????????????//?判斷是否保留主題元數(shù)據(jù)
          ????????????if?(!retainTopic(metadata.topic(),?metadata.isInternal(),?nowMs))
          ????????????????continue;
          ????????????//?判斷是否是內(nèi)部主題。
          ????????????if?(metadata.isInternal())
          ????????????????internalTopics.add(metadata.topic());
          ????????????//?判斷是否元數(shù)據(jù)響應(yīng)error為空
          ????????????if?(metadata.error()?==?Errors.NONE)?{
          ????????????????//?遍歷分區(qū)信息
          ????????????????for?(MetadataResponse.PartitionMetadata?partitionMetadata?:?metadata.partitionMetadata())?{
          ????????????????????....
          ????????????????????//?判斷分區(qū)元數(shù)據(jù)是否有無效異常
          ????????????????????if?(partitionMetadata.error.exception()?instanceof?InvalidMetadataException)?{
          ????????????????????????....
          ????????????????????????//?標(biāo)記全部主題更新
          ????????????????????????requestUpdate();
          ????????????????????}
          ????????????????}
          ????????????}?else?{?//?如果元數(shù)據(jù)響應(yīng)有錯誤
          ????????????????//?判斷是否是無效元數(shù)據(jù)異常
          ????????????????if?(metadata.error().exception()?instanceof?InvalidMetadataException)?{
          ????????????????????....
          ????????????????????//?標(biāo)記全部主題更新
          ????????????????????requestUpdate();
          ????????????????}
          ????????????????//?判斷是否無效主題錯誤
          ????????????????if?(metadata.error()?==?Errors.INVALID_TOPIC_EXCEPTION)
          ????????????????????//?將主題添加到無效主題集合中
          ????????????????????invalidTopics.add(metadata.topic());
          ????????????????//?判斷是否無權(quán)限主題錯誤
          ????????????????else?if?(metadata.error()?==?Errors.TOPIC_AUTHORIZATION_FAILED)
          ????????????????????//?將主題添加到無權(quán)限主題集合中
          ????????????????????unauthorizedTopics.add(metadata.topic());
          ????????????}
          ????????}

          ????????Map?nodes?=?metadataResponse.brokersById();
          ????????//?判斷是否部分主題更新
          ????????if?(isPartialUpdate)
          ????????????//?如果是則與現(xiàn)在的元數(shù)據(jù)緩存合并在一起
          ????????????return?this.cache.mergeWith(metadataResponse.clusterId(),?nodes,?partitions,
          ????????????????unauthorizedTopics,?invalidTopics,?internalTopics,?metadataResponse.controller(),
          ????????????????(topic,?isInternal)?->?!topics.contains(topic)?&&?retainTopic(topic,?isInternal,?nowMs));
          ????????else
          ????????????//?如果是全部主題更新的話,就重新初始化元數(shù)據(jù)緩存
          ????????????return?new?MetadataCache(metadataResponse.clusterId(),?nodes,?partitions,
          ????????????????unauthorizedTopics,?invalidTopics,?internalTopics,?metadataResponse.controller());
          ?}

          該方法用來解析元數(shù)據(jù)響應(yīng),我們具體分析下主流程的邏輯:

          1. 首先初始化相關(guān)集合:「內(nèi)部主題集合」、「無效主題集合」、「無權(quán)限主題集合」。
          2. 遍歷主題的元數(shù)據(jù)響應(yīng)。
          3. 將該主題添加到元數(shù)據(jù)主題集合中。
          4. 判斷是否保留主題元數(shù)據(jù),如果過期可能就沒必要保留了。
          5. 判斷是否是內(nèi)部主題,如果是內(nèi)部主題就添加到內(nèi)部主題集合。
          6. 判斷是否元數(shù)據(jù)響應(yīng)error為空的話
            • 如果為空就開始遍歷主題下的分區(qū)信息,更新本地元數(shù)據(jù)緩存。
            • 判斷分區(qū)元數(shù)據(jù)是否有無效異常,如果有則要打出相應(yīng)的日志,
            • 做好需要全部主題更新元數(shù)據(jù)的標(biāo)記,后續(xù)會提醒 Sender 線程去更新元數(shù)據(jù)。
          7. 否則如果元數(shù)據(jù)響應(yīng)error不為空的話
            • 先判斷是否是無效元數(shù)據(jù)異常,如果有則做好需要全部主題更新元數(shù)據(jù)的標(biāo)記,后續(xù)會提醒 Sender 線程去更新元數(shù)據(jù)。
            • 判斷是否無效主題錯誤,如果是將主題添加到無效主題集合中。
            • 判斷是否無權(quán)限主題錯誤,如果是將主題添加到無權(quán)限主題集合中。
          8. 判斷是否部分主題更新響應(yīng)
            • 如果是則與現(xiàn)在的元數(shù)據(jù)緩存合并在一起。
            • 否則就重新初始化元數(shù)據(jù)緩存。

          至此,元數(shù)據(jù)相關(guān)類的重要方法已經(jīng)分析完畢,剩余沒分析到的方法待到后續(xù)場景中進(jìn)行分析,接下來我們看下主線程是如何加載元數(shù)據(jù)的。

          02.3 初探 cluster

          github 源碼地址如下:

          https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/Cluster.java

          https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/Node.java

          https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java

          https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/TopicPartition.java

          ?public?final?class?Cluster?{
          ????private?final?boolean?isBootstrapConfigured;
          ????//?kafka集群中的broker節(jié)點(diǎn)
          ????private?final?List?nodes;
          ????//?未授權(quán)的主題集合
          ????private?final?Set?unauthorizedTopics;
          ????//?無效的主題集合
          ????private?final?Set?invalidTopics;
          ????//?內(nèi)部主題集合
          ????private?final?Set?internalTopics;
          ????//?controller?節(jié)點(diǎn)
          ????private?final?Node?controller;
          ????//?topic對應(yīng)的?partition?信息字典,存放的?partition?不一定有?Leader?副本,?鍵為topic,值為?partition?信息集合。
          ????private?final?Map?partitionsByTopicPartition;
          ????//?鍵為topic,值為可用?partition?信息集合,存放的?partition?一定有?Leader?副本
          ????private?final?Map>?partitionsByTopic;
          ????//?鍵為broker的id,值為partition?信息集合
          ????private?final?Map>?availablePartitionsByTopic;
          ????//?鍵為broker的id,值為表示該節(jié)點(diǎn)的node實(shí)例
          ????private?final?Map>?partitionsByNode;
          ????private?final?Map?nodesById;
          ????private?final?ClusterResource?clusterResource;

          Cluster類基本保存了所有的kafka集群相關(guān)的信息。

          Node broker節(jié)點(diǎn)的相關(guān)的信息

          ?public?class?Node?{
          ????//?broker?節(jié)點(diǎn)id
          ????private?final?int?id;
          ????//?broker?節(jié)點(diǎn)id的字符串
          ????private?final?String?idString;
          ????//?broker?節(jié)點(diǎn)的地址用于socket連接
          ????private?final?String?host;
          ????//?端口
          ????private?final?int?port;
          ????//?broker?節(jié)點(diǎn)的機(jī)架
          ????private?final?String?rack;
          ????//?初始化節(jié)點(diǎn)屬性
          ????public?Node(int?id,?String?host,?int?port,?String?rack)?{
          ????????this.id?=?id;
          ????????this.idString?=?Integer.toString(id);
          ????????this.host?=?host;
          ????????this.port?=?port;
          ????????this.rack?=?rack;
          ????}
          ?}

          PartitionInfo 分區(qū)信息

          ?public?class?PartitionInfo?{
          ????//?主題
          ????private?final?String?topic;
          ????//?分區(qū)編號id
          ????private?final?int?partition;
          ????//?分區(qū)?Leader?副本信息,唯一進(jìn)行通信的節(jié)點(diǎn)
          ????private?final?Node?leader;
          ????//?全部副本信息
          ????private?final?Node[]?replicas;
          ????//?ISR?副本信息,?follower角色
          ????private?final?Node[]?inSyncReplicas;
          ????//?離線副本信息
          ????private?final?Node[]?offlineReplicas;
          ????//?初始化分區(qū)信息
          ????public?PartitionInfo(String?topic,
          ?????????????????????????int?partition,
          ?????????????????????????Node?leader,
          ?????????????????????????Node[]?replicas,
          ?????????????????????????Node[]?inSyncReplicas,
          ?????????????????????????Node[]?offlineReplicas)
          ?
          {
          ????????this.topic?=?topic;
          ????????this.partition?=?partition;
          ????????this.leader?=?leader;
          ????????this.replicas?=?replicas;
          ????????this.inSyncReplicas?=?inSyncReplicas;
          ????????this.offlineReplicas?=?offlineReplicas;
          ????}
          ?}

          TopicPartition

          每個topic和每個分區(qū)組成的唯一索引,代表一個分區(qū)標(biāo)識。

          ?public?final?class?TopicPartition?implements?Serializable?{
          ????//?hash值,用來hashCode方法緩存
          ????private?int?hash?=?0;
          ????//?分區(qū)編號
          ????private?final?int?partition;
          ????//?主題名稱
          ????private?final?String?topic;
          ????//?初始化主題分區(qū)索引
          ????public?TopicPartition(String?topic,?int?partition)?{
          ????????this.partition?=?partition;
          ????????this.topic?=?topic;
          ????}
          ????//?計算hash值
          ????public?int?hashCode()?{
          ????????if?(hash?!=?0)
          ????????????return?hash;
          ????????final?int?prime?=?31;
          ????????int?result?=?1;
          ????????result?=?prime?*?result?+?partition;
          ????????result?=?prime?*?result?+?Objects.hashCode(topic);
          ????????this.hash?=?result;
          ????????return?result;
          ????}
          ?}

          綜上映射關(guān)系可以看出,kafka 是以「分區(qū)」為最小管理單元,然后分區(qū)中的 Leader 負(fù)責(zé)交互。「Cluster」代表整個kafka集群的實(shí)體類,「MetaData」的角色相當(dāng)于「Cluster」的在客戶端的一個維護(hù)者。

          02.4 主線程加載元數(shù)據(jù)

          我們先來看加載元數(shù)據(jù)大體過程。

          接下來分析詳細(xì)源碼,首先客戶端可以直接調(diào)用「 producer.send() 」進(jìn)行發(fā)送,底層調(diào)用 doSend() 方法。

          //?向?topic?異步地發(fā)送數(shù)據(jù),當(dāng)發(fā)送確認(rèn)后喚起回調(diào)函數(shù)
          public?Future?send(ProducerRecord?record,?Callback?callback)?{
          ???ProducerRecord?interceptedRecord?=?this.interceptors.onSend(record);
          ???//?調(diào)用?doSend?發(fā)送,支持回調(diào)函數(shù)
          ???return?doSend(interceptedRecord,?callback);
          }

          private?Future?doSend(ProducerRecord?record,?Callback?callback)?{
          ????????TopicPartition?tp?=?null;
          ????????try?{
          ????????????//?假如?sender?線程為空或者不運(yùn)行,報錯
          ????????????throwIfProducerClosed();
          ????????????//?當(dāng)前時間【毫秒】?
          ????????????long?nowMs?=?time.milliseconds();
          ????????????ClusterAndWaitTime?clusterAndWaitTime;
          ????????????try?{
          ????????????????//?等待元數(shù)據(jù)更新
          ????????????????clusterAndWaitTime?=?waitOnMetadata(record.topic(),?record.partition(),?nowMs,?maxBlockTimeMs);
          ????????????}?catch?(KafkaException?e)?{
          ????????????????if?(metadata.isClosed())
          ????????????????????throw?new?KafkaException("Producer?closed?while?send?in?progress",?e);
          ????????????????throw?e;
          ????????????}
          ????????????nowMs?+=?clusterAndWaitTime.waitedOnMetadataMs;
          ????????????long?remainingWaitMs?=?Math.max(0,?maxBlockTimeMs?-?clusterAndWaitTime.waitedOnMetadataMs);
          ????????????Cluster?cluster?=?clusterAndWaitTime.cluster;
          ????????????....
          ????????????//?將消息記錄追加到消息緩沖區(qū)
          ????????????RecordAccumulator.RecordAppendResult?result?=?accumulator.append(tp,?timestamp,?serializedKey,
          ????????????????????serializedValue,?headers,?interceptCallback,?remainingWaitMs,?true,?nowMs);
          ????????????????????
          ????????????//?當(dāng)批次滿了或者新批次被創(chuàng)建了,開始喚醒sender線程數(shù)據(jù)發(fā)送
          ????????????if?(result.batchIsFull?||?result.newBatchCreated)?{
          ????????????????this.sender.wakeup();
          ????????????}
          ????????????return?result.future;
          ????????}?catch?(ApiException?e)?{
          ????????????....
          ????????????return?new?FutureFailure(e);
          ????????}?catch?(InterruptedException?e)?{
          ????????????....
          ????????????throw?new?InterruptException(e);
          ????????}?catch?(KafkaException?e)?{
          ????????????....
          ????????????throw?e;
          ????????}?catch?(Exception?e)?{
          ????????????....
          ????????????throw?e;
          ????????}
          ?}

          從上述發(fā)送的源碼中,可以看出來主線程在發(fā)送消息前需要先獲取元數(shù)據(jù),這樣才能知道消息要發(fā)送到哪些節(jié)點(diǎn)。

          中間通過調(diào)用 waitOnMetadata() 獲取元數(shù)據(jù)的相關(guān)信息,為后續(xù)發(fā)送消息提供支持,接下來我們重點(diǎn)分析下 waitOnMetadata() 這個方法。

          private?ClusterAndWaitTime?waitOnMetadata(String?topic,?Integer?partition,?long?nowMs,?long?maxWaitMs)?throws?InterruptedException?{
          ????????//?1.?從元數(shù)據(jù)緩存中獲取元數(shù)據(jù)
          ????????Cluster?cluster?=?metadata.fetch();
          ????????//?2.?判斷該主題是否是無效的主題
          ????????if?(cluster.invalidTopics().contains(topic))
          ????????????throw?new?InvalidTopicException(topic);
          ????????//?3.?將該主題放入元數(shù)據(jù)主題列表中
          ????????metadata.add(topic,?nowMs);
          ????????//?4.?從元數(shù)據(jù)緩存中獲取主題對應(yīng)的分區(qū)數(shù)
          ????????Integer?partitionsCount?=?cluster.partitionCountForTopic(topic);
          ????????//?5.?滿足這些條件后就不用拉取元數(shù)據(jù),直接從元數(shù)據(jù)緩存中返回
          ????????if?(partitionsCount?!=?null?&&?(partition?==?null?||?partition?????????????//?返回元數(shù)據(jù)信息
          ????????????return?new?ClusterAndWaitTime(cluster,?0);

          ????????long?remainingWaitMs?=?maxWaitMs;
          ????????long?elapsed?=?0;
          ????????//?6.?不停的輪詢喚醒?sender?線程更新元數(shù)據(jù)
          ????????do?{
          ????????????//?7.?將主題?topic及過期時間添加到元數(shù)據(jù)主題列表中
          ????????????metadata.add(topic,?nowMs?+?elapsed);
          ????????????//?8.?標(biāo)記元數(shù)據(jù)更新標(biāo)識,獲取元數(shù)據(jù)版本號
          ????????????int?version?=?metadata.requestUpdateForTopic(topic);
          ????????????//?9.?喚醒?sender?子線程
          ????????????sender.wakeup();
          ????????????try?{
          ????????????????//?10.?阻塞線程等待元數(shù)據(jù)更新成功
          ????????????????metadata.awaitUpdate(version,?remainingWaitMs);
          ????????????}?catch?(TimeoutException?ex)?{
          ????????????????...
          ????????????}
          ????????????//?11.?到這里,應(yīng)該是當(dāng)前cluster更新成功了,再次獲取元數(shù)據(jù)
          ????????????cluster?=?metadata.fetch();
          ????????????//?12.?計算等待更新完成元數(shù)據(jù)消耗時間
          ????????????elapsed?=?time.milliseconds()?-?nowMs;
          ????????????//?13.?如果超時,拋超時異常
          ????????????if?(elapsed?>=?maxWaitMs)?{
          ???????????????...
          ????????????}
          ????????????metadata.maybeThrowExceptionForTopic(topic);
          ????????????remainingWaitMs?=?maxWaitMs?-?elapsed;
          ????????????//?14.?獲取元數(shù)據(jù)分區(qū)數(shù)
          ????????????partitionsCount?=?cluster.partitionCountForTopic(topic);
          ????????}?while?(partitionsCount?==?null?||?(partition?!=?null?&&?partition?>=?partitionsCount));
          ????????//?15.?返回元數(shù)據(jù)以及消耗時間
          ????????return?new?ClusterAndWaitTime(cluster,?elapsed);
          ?}

          該方法用來獲取元數(shù)據(jù)以及元數(shù)據(jù)消耗時間,我們具體分析下主流程的邏輯:

          1. 從元數(shù)據(jù)緩存中獲取元數(shù)據(jù),如果metadata不存在當(dāng)前topic的元數(shù)據(jù),會觸發(fā)一次強(qiáng)制刷新,metaData中的needUpdate置為true。
          2. 判斷該主題是否是無效的主題, 如果是拋異常。
          3. 將該主題放入元數(shù)據(jù)的主題列表中,通過 Sender 線程定時更新這些主題的元數(shù)據(jù)。
          4. 從元數(shù)據(jù)緩存中獲取主題對應(yīng)的分區(qū)數(shù)。
          5. 判斷元數(shù)據(jù)緩存是否能滿足需要,就是說要能夠找到要發(fā)送消息的主題分區(qū),條件是:partitionsCount != null && (partition == null || partition < partitionsCount) ?即「主題對應(yīng)的分區(qū)數(shù)不能為空且發(fā)送的分區(qū)ID要小于主題的分區(qū)數(shù)」。
          6. 不停的輪詢喚醒 sender 線程更新元數(shù)據(jù),這里有兩個條件要滿足其一才可以。
            • partitionsCount == null 「從元數(shù)據(jù)緩存中獲取主題對應(yīng)的分區(qū)數(shù)為空」,這里要發(fā)送的主題連Leader分區(qū)都沒有,可能主題分區(qū)根本不存在也可能沒拉取到最新分區(qū)信息,如果真的不存在就沒必要繼續(xù)直接拋異常。
            • partition != null && partition >= partitionsCount「分區(qū)不為空且要發(fā)送的分區(qū)ID大于主題分區(qū)數(shù)」,這里說明了主題分區(qū)的數(shù)量增加了,需要重新拉取下獲取最新分區(qū)信息。
          7. 將主題 topic及過期時間添加到元數(shù)據(jù)主題列表中
          8. 標(biāo)記元數(shù)據(jù)更新標(biāo)識,并獲取當(dāng)前元數(shù)據(jù)版本號,提醒 Sender 線程更新元數(shù)據(jù)。
          9. 喚醒 sender 子線程進(jìn)行元數(shù)據(jù)更新以及消息發(fā)送。
          10. 阻塞主線程并等待Sender線程更新元數(shù)據(jù)成功。
          11. 待 「Sender 子線程更新元數(shù)據(jù)成功」或者「阻塞超時解除阻塞」,再次獲取元數(shù)據(jù)。
          12. 計算等待更新完成元數(shù)據(jù)消耗時間。
          13. 如果超時,拋超時異常。maxWaitMs 最大1分鐘。
          14. 獲取元數(shù)據(jù)分區(qū)數(shù)。
          15. 返回元數(shù)據(jù)以及消耗時間。

          聰明的讀者可能發(fā)現(xiàn)在生產(chǎn)者中獲取元數(shù)據(jù)都是基于topic的,主要原因就是對于生產(chǎn)者來說,沒必要拉取全部的元數(shù)據(jù),只拉取自己需要的主題元數(shù)據(jù)就可以了

          至此,元數(shù)據(jù)加載已經(jīng)分析完了, 接下來我們看下 Sender 子線程是如何拉取元數(shù)據(jù)的。

          03 Sender 線程拉取元數(shù)據(jù)

          由于 Sender 線程的深度分析不是本文的重點(diǎn),這里先簡單的給大家?guī)б粠В罄m(xù)會有專門篇章去分析。

          github 源碼地址如下:

          https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java

          https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java

          我們來看看 Sender 線程拉取元數(shù)據(jù)的大體過程。

          接下來我們詳細(xì)分析具體的源碼。

          03.1 Sender 周期執(zhí)行

          通過源碼可以得知 Sender 是一個 Runnable 對象,那整個 Sender 線程執(zhí)行的核心邏輯就在 run() 方法中,run() 方法中的第一段代碼就是循環(huán)調(diào)用 runOnce() 方法:

          ?public?class?Sender?implements?Runnable?{
          ?????public?void?run()?{
          ????????//?running?字段用來標(biāo)識當(dāng)前?Sender?線程是否正常執(zhí)行
          ????????while?(running)?{
          ????????????try?{
          ????????????????//?如果正常運(yùn)行,則執(zhí)行運(yùn)行周期
          ????????????????runOnce();
          ????????????}?catch?(Exception?e)?{
          ????????????????log.error("Uncaught?error?in?kafka?producer?I/O?thread:?",?e);
          ????????????}
          ????????}
          ????}
          ????
          ????void?runOnce()?{
          ????????.....??//?此處省略事務(wù)消息相關(guān)的處理邏輯
          ????????long?currentTimeMs?=?time.milliseconds();
          ????????//?創(chuàng)建發(fā)送到?kafka?集群的請求
          ????????long?pollTimeout?=?sendProducerData(currentTimeMs);
          ????????//?真正執(zhí)行網(wǎng)絡(luò)IO的地方,會將請求發(fā)送出去,并處理收到的響應(yīng)
          ????????client.poll(pollTimeout,?currentTimeMs);
          ????}
          ?}

          上述源碼中的 runOnce() 方法是 Sender 線程一個執(zhí)行的周期,在這個周期中會進(jìn)行一次批量的請求發(fā)送,并進(jìn)行一次響應(yīng)的處理

          接下來我們看里面2個涉及到元數(shù)據(jù)的重要方法:

          03.1.1 sendProducerData()

          ??private?long?sendProducerData(long?now)?{
          ????????//?從元數(shù)據(jù)緩存中獲取元數(shù)據(jù)
          ????????Cluster?cluster?=?metadata.fetch();
          ????????//?通過元數(shù)據(jù)cluster獲取要發(fā)送的節(jié)點(diǎn)?Leader?分區(qū)信息
          ????????RecordAccumulator.ReadyCheckResult?result?=?this.accumulator.ready(cluster,?now);
          ????????//?如果主題的?Leader?分區(qū)對應(yīng)的節(jié)點(diǎn)不存在就強(qiáng)制更新元數(shù)據(jù)
          ????????if?(!result.unknownLeaderTopics.isEmpty())?{
          ????????????for?(String?topic?:?result.unknownLeaderTopics)
          ????????????????//?將主題加入元數(shù)據(jù)主題列表
          ????????????????this.metadata.add(topic,?now);
          ????????????//?強(qiáng)制標(biāo)記元數(shù)據(jù)更新標(biāo)識
          ????????????this.metadata.requestUpdate();
          ????????}
          ??????.....?//?忽略能發(fā)送的節(jié)點(diǎn)數(shù)據(jù)
          ??}

          該方法用來進(jìn)行 Sender 線程創(chuàng)建請求的核心,這里只分析下元數(shù)據(jù)涉及的部分:

          1. 從元數(shù)據(jù)緩存中獲取元數(shù)據(jù)。
          2. 通過元數(shù)據(jù)cluster獲取要發(fā)送的節(jié)點(diǎn) Leader 分區(qū)信息。
          3. 如果主題的 Leader 分區(qū)對應(yīng)的節(jié)點(diǎn)不存在就強(qiáng)制更新元數(shù)據(jù)。
            • 循環(huán)無 Leader 分區(qū)的主題,將主題加入元數(shù)據(jù)主題列表。
            • 強(qiáng)制標(biāo)記元數(shù)據(jù)更新標(biāo)識。
            • 對于無 Leader 分區(qū)的主題,可能分區(qū)正在選主中,也可能 Leader 分區(qū)所在節(jié)點(diǎn) Crash,所以要強(qiáng)制更新元數(shù)據(jù)保證元數(shù)據(jù)一致性。

          03.1.2 client.poll()

          public?class?NetworkClient?implements?KafkaClient?{
          ???public?List?poll(long?timeout,?long?now)?{
          ????????....
          ????????//?嘗試更新元數(shù)據(jù)
          ????????long?metadataTimeout?=?metadataUpdater.maybeUpdate(now);
          ????????try?{
          ????????????//?執(zhí)行IO操作
          ????????????this.selector.poll(Utils.min(timeout,?metadataTimeout,?defaultRequestTimeoutMs));
          ????????}?catch?(IOException?e)?{
          ????????????log.error("Unexpected?error?during?I/O",?e);
          ????????}
          ????????....
          ????????List?responses?=?new?ArrayList<>();
          ????????//?處理更新命令的返回
          ????????handleCompletedReceives(responses,?updatedNow);
          ????????return?responses;
          ???}
          }

          該方法用來發(fā)送網(wǎng)絡(luò)IO請求,在請求之前先執(zhí)行嘗試更新元數(shù)據(jù)的請求

          03.2 元數(shù)據(jù)更新觸發(fā)時機(jī)

          03.2.1 metadataUpdater.maybeUpdate()

          ?public?class?NetworkClient?implements?KafkaClient?{
          ????private?NetworkClient(MetadataUpdater?metadataUpdater,
          ??????????????????????????Metadata?metadata,
          ??????????????????????????Selectable?selector,
          ??????????????????????????String?clientId,
          ??????????????????????????int?maxInFlightRequestsPerConnection,
          ??????????????????????????long?reconnectBackoffMs,
          ??????????????????????????long?reconnectBackoffMax,
          ??????????????????????????int?socketSendBuffer,
          ??????????????????????????int?socketReceiveBuffer,
          ??????????????????????????int?defaultRequestTimeoutMs,
          ??????????????????????????long?connectionSetupTimeoutMs,
          ??????????????????????????long?connectionSetupTimeoutMaxMs,
          ??????????????????????????ClientDnsLookup?clientDnsLookup,
          ??????????????????????????Time?time,
          ??????????????????????????boolean?discoverBrokerVersions,
          ??????????????????????????ApiVersions?apiVersions,
          ??????????????????????????Sensor?throttleTimeSensor,
          ??????????????????????????LogContext?logContext)
          ?
          {
          ???????
          ????????if?(metadataUpdater?==?null)?{
          ????????????//?通過這里可以看出?metadataUpdate?實(shí)例化對象
          ????????????this.metadataUpdater?=?new?DefaultMetadataUpdater(metadata);
          ????????}?else?{
          ????????????this.metadataUpdater?=?metadataUpdater;
          ????????}
          ????????...
          ????}?
          ?}

          從初始化函數(shù)中可以得出元數(shù)據(jù)更新組件是調(diào)用 NetworkClient 內(nèi)部類即 DefaultMetadataUpdater,前面已多次提到更新集群元數(shù)據(jù)的場景,而這些更新操作都是在標(biāo)記集群元數(shù)據(jù)是否需要更新,而真正執(zhí)行更新的操作是這里。接下來我們看下這個類的 maybeUpdate() 方法。

          03.2.2 public maybeUpdate()

          ??public?long?maybeUpdate(long?now)?{
          ??????//?計算下次要更新元數(shù)據(jù)的時間,其中會檢測needUpdate的值、退避時間、是否長時間未更新
          ??????long?timeToNextMetadataUpdate?=?metadata.timeToNextUpdate(now);
          ??????//?檢測是否已經(jīng)發(fā)送了元數(shù)據(jù)的請求,如果一個元數(shù)據(jù)的請求,還未從服務(wù)端返回,那么時間設(shè)置為?waitForMetadataFetch(默認(rèn)30s)
          ??????long?waitForMetadataFetch?=?hasFetchInProgress()???defaultRequestTimeoutMs?:?0;
          ??????//?計算元數(shù)據(jù)超時時間
          ??????long?metadataTimeout?=?Math.max(timeToNextMetadataUpdate,?waitForMetadataFetch);
          ??????if?(metadataTimeout?>?0)?{
          ??????????return?metadataTimeout;
          ??????}
          ??????//?表示需要立即更新,取最空閑的節(jié)點(diǎn)node
          ??????Node?node?=?leastLoadedNode(now);
          ??????//?發(fā)送更新元數(shù)據(jù)的請求?????
          ??????return?maybeUpdate(now,?node);
          ??}

          從這段源碼中可以看出上來先計算下次要更新元數(shù)據(jù)的時間,我們看下這個計算過程。

          public?class?Metadata?implements?Closeable?{
          ??//?計算下次更新元數(shù)據(jù)的時間
          ??public?synchronized?long?timeToNextUpdate(long?nowMs)?{
          ??????long?timeToExpire?=?updateRequested()???0?:?Math.max(this.lastSuccessfulRefreshMs?+?this.metadataExpireMs?-?nowMs,?0);
          ??????return?Math.max(timeToExpire,?timeToAllowUpdate(nowMs));
          ??}
          ??
          ??//?判斷這2個標(biāo)識是否為true
          ??public?synchronized?boolean?updateRequested()?{
          ??????return?this.needFullUpdate?||?this.needPartialUpdate;
          ??}
          ??//?計算允許更新元數(shù)據(jù)的時機(jī)
          ??public?synchronized?long?timeToAllowUpdate(long?nowMs)?{
          ??????return?Math.max(this.lastRefreshMs?+?this.refreshBackoffMs?-?nowMs,?0);
          ??}
          }
          1. 元數(shù)據(jù)是否過期 timeToExpire,計算方式:
            • 首先會通過 updateRequested() 方法檢查 Metadata 中的 「needFullUpdate」「needPartialUpdate」,如果這兩個標(biāo)識位為 true,表示 Metadata 需要立即更新,
            • 否則計算上次更新成功的時間距離當(dāng)前時間是否已經(jīng)超過了指定的元數(shù)據(jù)過期時間閾值metadataExpireMs「默認(rèn)5分鐘」。
          2. 允許更新的時間點(diǎn) timeToAllowUpdate,計算方式:
            • 上次更新時間 + 退避時間 - 當(dāng)前時間的間隔 「要求上次更新時間與當(dāng)前時間的間隔不能大于退避時間,如果大于則需要等待」。
          3. 最后計算這倆值的最大值作為下次更新元數(shù)據(jù)的時間。

          分析完元數(shù)據(jù)更新時機(jī),最后調(diào)用 maybeUpdate(now, node) 發(fā)送更新元數(shù)據(jù)的請求。

          03.2.3 private maybeUpdate()

          ?private?long?maybeUpdate(long?now,?Node?node)?{
          ??????String?nodeConnectionId?=?node.idString();
          ??????//?判斷當(dāng)前node的狀態(tài)是否可以發(fā)送Request請求
          ??????if?(canSendRequest(nodeConnectionId,?now))?{
          ??????????//?構(gòu)建元數(shù)據(jù)請求
          ??????????Metadata.MetadataRequestAndVersion?requestAndVersion?=?metadata.newMetadataRequestAndVersion(now);
          ??????????MetadataRequest.Builder?metadataRequest?=?requestAndVersion.requestBuilder;
          ??????????//?向?nodeConnectionId?發(fā)送元數(shù)據(jù)請求
          ??????????sendInternalMetadataRequest(metadataRequest,?nodeConnectionId,?now);
          ??????????inProgress?=?new?InProgressData(requestAndVersion.requestVersion,?requestAndVersion.isPartialUpdate);
          ??????????return?defaultRequestTimeoutMs;
          ??????}
          ??????//?判斷Node是否正在連接
          ??????if?(isAnyNodeConnecting())?{
          ??????????return?reconnectBackoffMs;
          ??????}
          ??????//?如果存在可用的Node,則嘗試初始化連接
          ??????if?(connectionStates.canConnect(nodeConnectionId,?now))?{
          ??????????//?初始化與node的連接
          ??????????initiateConnect(node,?now);
          ??????????return?reconnectBackoffMs;
          ??????}
          ??????//?阻塞等待有新的節(jié)點(diǎn)可用
          ??????return?Long.MAX_VALUE;
          ??}

          該方法用來發(fā)送更新元數(shù)據(jù)的請求,具體的實(shí)現(xiàn)邏輯如下:

          1. canSendRequest() 判斷當(dāng)前node的狀態(tài)是否可以發(fā)送Request請求, 如果可以發(fā)送則構(gòu)建元數(shù)據(jù)請求,調(diào)用 sendInternalMetadataRequest() 向 nodeConnectionId 發(fā)送元數(shù)據(jù)請求。
          2. isAnyNodeConnecting() 如果該 node 正在建立連接,則直接返回重新連接超時時間,等待更新成功。
          3. connectionStates.canConnect() 如果存在可用的Node,則嘗試初始化連接,返回重新連接超時時間,等待更新成功。
          4. 阻塞等待有新的節(jié)點(diǎn)可用。

          從上述源碼可以看出:更新流程一直在重試,直到元數(shù)據(jù)更新成功為止。

          1. Sender 子線程第一次調(diào)用 poll() 方法時,嘗試初始化與 node 的連接。
          2. Sender 子線程第二次調(diào)用 poll() 方法時,發(fā)送 Metadata 請求。
          3. Sender 子線程會阻塞等待一定時間,當(dāng)有響應(yīng)返回時則獲取 metadataResponse,并更新 metadata。

          如果元數(shù)據(jù)更新成功以后,KafkaProducer 主線程就不會被阻塞,當(dāng) NetworkClient 接收到服務(wù)端對 Metadata 請求的響應(yīng)后,就會更新 Metadata 信息, 即 poll() 方法后續(xù)的操作。

          03.2.4 handleCompletedReceives()

          ??private?void?handleCompletedReceives(List?responses,?long?now)?{
          ????for?(NetworkReceive?receive?:?this.selector.completedReceives())?{
          ????????....
          ????????//?如果是MetadataResponse類的響應(yīng),由metadataUpdater來處理
          ????????if?(req.isInternalRequest?&&?response?instanceof?MetadataResponse)
          ??????????//?處理成功返回的響應(yīng)信息
          ??????????metadataUpdater.handleSuccessfulResponse(req.header,?now,?(MetadataResponse)?response);
          ????????....
          ???}
          ?}

          該方法用來判斷成功接收服務(wù)端返回的響應(yīng),根據(jù)不同的響應(yīng)返回做不同的操作,這里只看下跟元數(shù)據(jù)更新有關(guān)的邏輯。

          03.2.5 handleSuccessfulResponse()

          ??public?void?handleSuccessfulResponse(RequestHeader?requestHeader,?long?now,?MetadataResponse?response)?{
          ?????....
          ????//?Check?if?any?topic's?metadata?failed?to?get?updated
          ????Map?errors?=?response.errors();
          ????//?如果返回錯誤,直接報錯
          ????if?(!errors.isEmpty())
          ????????log.warn("Error?while?fetching?metadata?with?correlation?id?{}?:?{}",?requestHeader.correlationId(),?errors);
          ????//?判斷broker信息是否為空,如果為空表示沒有獲得元數(shù)據(jù)
          ????if?(response.brokers().isEmpty())?{
          ????????//?更新失敗
          ????????this.metadata.failedUpdate(now);
          ????}?else?{
          ????????//?如果成功?則開始更新元數(shù)據(jù)
          ????????this.metadata.update(inProgress.requestVersion,?response,?inProgress.isPartialUpdate,?now);
          ????}
          ????inProgress?=?null;
          ?}
          ?
          ?//?元數(shù)據(jù)更新失敗
          ?public?synchronized?void?failedUpdate(long?now)?{
          ?????//?最后一次更新元數(shù)據(jù)的時間為當(dāng)前時間
          ?????this.lastRefreshMs?=?now;
          ?}

          該方法用來對成功返回信息進(jìn)行處理,主要是對元數(shù)據(jù)信息的更新。

          1. 查看 response 返回信息的 error。
          2. 判斷broker信息是否為空
            • 如果為空表示沒有獲得元數(shù)據(jù)即更新失敗了,然后調(diào)用 failedUpdate(now)方法記錄 lastRefreshMs 為當(dāng)前時間,不允許立即更新元數(shù)據(jù)。
            • 如果不為空表示獲取元數(shù)據(jù)成功,則調(diào)用 MetaData.update() 更新元數(shù)據(jù)。

          至此,Sender 子線程拉取并更新元數(shù)據(jù)分析完畢。

          最后通過一張圖來描述整個元數(shù)據(jù)拉取的全過程:

          05 總結(jié)

          這里,我們一起來總結(jié)一下這篇文章的重點(diǎn)。

          1、通過「場景驅(qū)動」的方式從元數(shù)據(jù)的使用場景出發(fā),拋出主線程加載元數(shù)據(jù)和子線程拉取元數(shù)據(jù)的過程是怎樣的?

          2、帶你梳理了「主線程如何加載元數(shù)據(jù)源碼全貌」,包括 ProducerMetadata 類、Metadata 元數(shù)據(jù)基類、cluster類的幾個重要方法源碼分析,最后分析主線程加載元數(shù)據(jù)過程。

          3、又帶你梳理了「Sender 線程拉取元數(shù)據(jù)」,包括 Sender 周期執(zhí)行、元數(shù)據(jù)更新觸發(fā)時機(jī)的幾個重要方法源碼分析。

          3、最后通過一張元數(shù)據(jù)流程圖來勾勒出元數(shù)據(jù)拉取和更新的全貌。

          下一篇我們來深度剖析「Kafka NIO實(shí)現(xiàn)」,大家期待,我們下期見。

          如另外歡迎加入我的技術(shù)交流群,關(guān)注本公眾號并添加我個人微信,邀您進(jìn)群。

          如果我的文章對你有所幫助,還請幫忙點(diǎn)贊、在看、轉(zhuǎn)發(fā)一下,非常感謝!

          堅持總結(jié), 持續(xù)輸出高質(zhì)量文章 ?關(guān)注我: 華仔聊技術(shù)

          ? ? ? ? ? ? ? ? ? ? ?

          ? ? ? ? ? ? ? ? ?點(diǎn)個“贊”和“在看”鼓勵一下嘛~

          瀏覽 44
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  粉嫩99精品99久久久久久特污兔 | 亚洲AV无码高清在线观看 | 影音先锋久久久久AV综合网成人 | 久久成人麻豆影视 | 人人摸天天操 |