<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          聊聊 Kafka:Producer Metadata 讀取與更新機制

          共 8598字,需瀏覽 18分鐘

           ·

          2021-10-01 09:51

          一、前言

          我們上一篇說了 聊聊 Kafka:Producer 源碼解析,這一篇我們來說下 Producer Metadata 的讀取與更新機制。上一篇從宏觀上介紹了 Producer 的宏觀模型,其中通過 waitOnMetadata() 方法獲取 topic 的 metadata 信息這一塊東西很多,所以單獨拎一篇出來講。

          二、Metadata

          2.1 什么是 Metadata

          Metadata 是指 Kafka 集群的元數(shù)據(jù),包含了 Kafka 集群的各種信息,直接看源碼便可知:

          public class Metadata implements Closeable {
              private final Logger log;
              // retry.backoff.ms: 默認值為100ms,它用來設(shè)定兩次重試之間的時間間隔,避免無效的頻繁重試。
              private final long refreshBackoffMs;
              // metadata.max.age.ms: 默認值為300000,如果在這個時間內(nèi)元數(shù)據(jù)沒有更新的話會被強制更新。
              private final long metadataExpireMs;
              // 更新版本號,每更新成功1次,version自增1,主要是用于判斷metadata是否更新
              private int updateVersion;
              // 請求版本號,每發(fā)送一次請求,version自增1
              private int requestVersion;
              // 上一次更新的時間(包含更新失?。?/span>
              private long lastRefreshMs;
              // 上一次更新成功的時間
              private long lastSuccessfulRefreshMs;
              private KafkaException fatalException;
              // 非法的topics
              private Set<String> invalidTopics;
              // 未認證的topics
              private Set<String> unauthorizedTopics;
              // 元數(shù)據(jù)信息的Cache緩存
              private MetadataCache cache = MetadataCache.empty();
              private boolean needFullUpdate;
              private boolean needPartialUpdate;
              // 會收到metadata updates的Listener列表
              private final ClusterResourceListeners clusterResourceListeners;
              private boolean isClosed;
              // 存儲Partition最近一次的leaderEpoch
              private final Map<TopicPartition, Integer> lastSeenLeaderEpochs;
          }

          MetadataCache:Kafka 集群中關(guān)于 node、topic 和 partition 的信息。(是只讀的)

          public class MetadataCache {
              private final String clusterId;
              private final Map<Integer, Node> nodes;
              private final Set<String> unauthorizedTopics;
              private final Set<String> invalidTopics;
              private final Set<String> internalTopics;
              private final Node controller;
              private final Map<TopicPartition, PartitionMetadata> metadataByPartition;
              private Cluster clusterInstance;
          }

          關(guān)于 topic 的詳細信息(leader 所在節(jié)點、replica 所在節(jié)點、isr 列表)都是在 Cluster 實例中保存的。

          // 保存了Kafka集群中部分nodes、topics和partitions的信息
          public final class Cluster {
              private final boolean isBootstrapConfigured;
              // node 列表
              private final List<Node> nodes;
              // 未認證的topics
              private final Set<String> unauthorizedTopics;
              // 非法的topics
              private final Set<String> invalidTopics;
              // kafka內(nèi)置的topics
              private final Set<String> internalTopics;
              private final Node controller;
              // partition對應(yīng)的信息,如:leader所在節(jié)點、所有的副本、ISR中的副本、offline的副本
              private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
              // topic和partition信息的對應(yīng)關(guān)系
              private final Map<String, List<PartitionInfo>> partitionsByTopic;
              // topic和可用partition(leader不為null)的對應(yīng)關(guān)系
              private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
              // node和partition信息的對應(yīng)關(guān)系
              private final Map<Integer, List<PartitionInfo>> partitionsByNode;
              // 節(jié)點id與節(jié)點的對應(yīng)關(guān)系
              private final Map<Integer, Node> nodesById;
              // 集群信息,里面只有一個clusterId
              private final ClusterResource clusterResource;
          }
          // topic-partition: 包含 topic、partition、leader、replicas、isr
          public class PartitionInfo {
              private final String topic;
              private final int partition;
              private final Node leader;
              private final Node[] replicas;
              private final Node[] inSyncReplicas;
              private final Node[] offlineReplicas;
          }

          看源碼不難理解 Metadata 的主要數(shù)據(jù)結(jié)構(gòu),我們大概總結(jié)下包含哪些信息:

          • 集群中有哪些節(jié)點;

          • 集群中有哪些 topic,這些 topic 有哪些 partition;

          • 每個 partition 的 leader 副本分配在哪個節(jié)點上,follower 副本分配在哪些節(jié)點上;

          • 每個 partition 的 AR 有哪些副本,ISR 有哪些副本;

          2.2 Metadata 的應(yīng)用場景

          Metadata 在 Kafka 中非常重要,很多場景中都需要從 Metadata 中獲取數(shù)據(jù)或更新數(shù)據(jù),例如:

          • KafkaProducer 發(fā)送一條消息到指定的 topic 中,需要知道分區(qū)的數(shù)量,要發(fā)送的目標分區(qū),目標分區(qū)的 leader,leader 所在的節(jié)點地址等,這些信息都要從 Metadata 中獲取。

          • 當 Kafka 集群中發(fā)生了 leader 選舉,節(jié)點中 partition 或副本發(fā)生了變化等,這些場景都需要更新Metadata 中的數(shù)據(jù)。

          三、Producer 的 Metadata 更新流程

          Producer 在調(diào)用 doSend() 方法時,第一步就是通過 waitOnMetadata 方法獲取該 topic 的 metadata 信息。



          總結(jié)一下以上代碼:

          • 首先會從緩存中獲取 cluster 信息,并從中獲取 partition 信息,如果可以取到則返回當前的 cluster 信息,如果不含有所需要的 partition 信息時就會更新 metadata;

          • 更新 metadata 的操作會在一個 do ….while 循環(huán)中進行,直到 metadata 中含有所需 partition 的信息,該循環(huán)中主要做了以下事情:

            • 調(diào)用 metadata.requestUpdateForTopic() 方法來獲取 updateVersion,即上一次更新成功時的 version,并將 needUpdate 設(shè)為 true,強制更新;

            • 調(diào)用 sender.wakeup() 方法來喚醒 Sender 線程,Sender 線程中又會喚醒 NetworkClient 線程,在 NetworkClient 中會對 UpdateMetadataRequest 請求進行操作,待會下面會詳細介紹;

            • 調(diào)用 metadata.awaitUpdate(version, remainingWaitMs) 方法來等待 metadata 的更新,通過比較當前的 updateVersion 與步驟 1 中獲取的 updateVersion 來判斷是否更新成功;

          3.1 org.apache.kafka.clients.NetworkClient#poll

          上面提到調(diào)用 sender.wakeup() 方法來喚醒 Sender 線程,Sender 線程中又會喚醒 NetworkClient 線程,在 NetworkClient 中會對 UpdateMetadataRequest 請求進行操作。在 NetworkClient 中真正處理請求的是 NetworkClient.poll() 方法,接下來讓我們通過分析源碼來看下 NetworkClient 是如何處理請求的。



          3.2 org.apache.kafka.clients.NetworkClient.DefaultMetadataUpdater#maybeUpdate(long)

          我們來看下 metadata 是如何更新的



          這里你可能會問,老周啊,最小負載節(jié)點是啥呀?

          別急,我們來看下面這張圖,你就理解了。


          LeastLoadedNode 指 Kafka 集群中所有 Node 中負載最小的那一個 Node,它是由每個 Node 在 InFlightRequests 中還未確定的請求數(shù)決定的,未確定的請求越少則負載越小。如上圖所示,Node1 即為 LeastLoadedNode。

          3.3 org.apache.kafka.clients.Metadata#updateRequested


          下次更新元數(shù)據(jù)信息的時間:當前 metadata 信息即將到期的時間即 timeToExpire 和 距離允許更新 metadata 信息的時間 即 timeToAllowUpdate 中的最大值。

          timeToExpire:needUpdate 為 true,表示強制更新,此時該值為 0;否則的話,就按照定時更新時間,即元數(shù)據(jù)信息過期時間(默認是 300000 ms 即 5 分鐘)進行周期性更新。

          timeToAllowUpdate:默認就是 refreshBackoffMs 的默認值,即 100 ms。

          3.4 org.apache.kafka.clients.NetworkClient.DefaultMetadataUpdater#maybeUpdate(long, org.apache.kafka.common.Node)

          我們繼續(xù)跟一下 maybeUpdate 方法:


          因此,每次 producer 請求更新 metadata 時,會有以下幾種情況:

          • 通道已經(jīng) ready,node 可以發(fā)送請求,那么就直接發(fā)送請求。

          • 如果該 node 正在建立連接,則直接返回。

          • 如果該 node  還沒建立連接,則向 broker 初始化連接。

          而 KafkaProducer 線程一直是阻塞在兩個 while 循環(huán)中的,直到 metadata 更新:

          • sender 線程第一次調(diào)用 poll,初始化與 node 的連接。

          • sender 線程第二次調(diào)用 poll,發(fā)送 metadata 請求。

          • sender 線程第三次調(diào)用 poll,獲取 metadataResponse,并更新 metadata。

          3.5 接收 Server 端的響應(yīng),更新 Metadata 信息

          handleCompletedReceives 是如何處理任何已完成的接收響應(yīng),如下:

          之后進一步調(diào)用 handleSuccessfulResponse。

          四、總結(jié)

          Metadata 會在下面兩種情況下進行更新:

          • 強制更新:調(diào)用 Metadata.requestUpdate() 將 needFullUpdate 置為 true 來強制更新。

          • 周期性更新:通過 Metadata 的 lastSuccessfulRefreshMs 和 metadataExpireMs 來實現(xiàn),一般情況下,默認周期時間就是 metadataExpireMs,5 分鐘時長。

          在 NetworkClient 的 poll() 方法調(diào)用時,會去檢查兩種更新機制,只要達到一種,就會觸發(fā)更新操作。

          Metadata 的強制更新會在以下幾種情況下進行:

          • initConnect 方法調(diào)用時,初始化連接;

          • poll() 方法中對 handleDisconnections() 方法調(diào)用來處理連接斷開的情況,這時會觸發(fā)強制更新;

          • poll() 方法中對 handleTimedOutRequests() 來處理請求超時時;

          • 發(fā)送消息時,如果無法找到 partition 的 leader;

          • 處理 Producer 響應(yīng)(handleProduceResponse),如果返回關(guān)于 Metadata 過期的異常,比如:沒有 topic-partition 的相關(guān) meta 或者 client 沒有權(quán)限獲取其 metadata。

          強制更新主要是用于處理各種異常情況。

          好了,Producer Metadata 讀取與更新機制就說到這,我們下一期再見。



          關(guān)構(gòu),Java術(shù)、、構(gòu)聯(lián)網(wǎng)發(fā)、、。

          瀏覽 61
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  一卡二卡国产精品 | A片免费观看视频 | www.俺来也.com | 噜噜无码高清 | 一区二区无码激情 |