聊聊 Kafka:Producer Metadata 讀取與更新機制
一、前言
我們上一篇說了 聊聊 Kafka:Producer 源碼解析,這一篇我們來說下 Producer Metadata 的讀取與更新機制。上一篇從宏觀上介紹了 Producer 的宏觀模型,其中通過 waitOnMetadata() 方法獲取 topic 的 metadata 信息這一塊東西很多,所以單獨拎一篇出來講。
二、Metadata
2.1 什么是 Metadata
Metadata 是指 Kafka 集群的元數(shù)據(jù),包含了 Kafka 集群的各種信息,直接看源碼便可知:
public class Metadata implements Closeable {
private final Logger log;
// retry.backoff.ms: 默認值為100ms,它用來設(shè)定兩次重試之間的時間間隔,避免無效的頻繁重試。
private final long refreshBackoffMs;
// metadata.max.age.ms: 默認值為300000,如果在這個時間內(nèi)元數(shù)據(jù)沒有更新的話會被強制更新。
private final long metadataExpireMs;
// 更新版本號,每更新成功1次,version自增1,主要是用于判斷metadata是否更新
private int updateVersion;
// 請求版本號,每發(fā)送一次請求,version自增1
private int requestVersion;
// 上一次更新的時間(包含更新失?。?/span>
private long lastRefreshMs;
// 上一次更新成功的時間
private long lastSuccessfulRefreshMs;
private KafkaException fatalException;
// 非法的topics
private Set<String> invalidTopics;
// 未認證的topics
private Set<String> unauthorizedTopics;
// 元數(shù)據(jù)信息的Cache緩存
private MetadataCache cache = MetadataCache.empty();
private boolean needFullUpdate;
private boolean needPartialUpdate;
// 會收到metadata updates的Listener列表
private final ClusterResourceListeners clusterResourceListeners;
private boolean isClosed;
// 存儲Partition最近一次的leaderEpoch
private final Map<TopicPartition, Integer> lastSeenLeaderEpochs;
}
MetadataCache:Kafka 集群中關(guān)于 node、topic 和 partition 的信息。(是只讀的)
public class MetadataCache {
private final String clusterId;
private final Map<Integer, Node> nodes;
private final Set<String> unauthorizedTopics;
private final Set<String> invalidTopics;
private final Set<String> internalTopics;
private final Node controller;
private final Map<TopicPartition, PartitionMetadata> metadataByPartition;
private Cluster clusterInstance;
}
關(guān)于 topic 的詳細信息(leader 所在節(jié)點、replica 所在節(jié)點、isr 列表)都是在 Cluster 實例中保存的。
// 保存了Kafka集群中部分nodes、topics和partitions的信息
public final class Cluster {
private final boolean isBootstrapConfigured;
// node 列表
private final List<Node> nodes;
// 未認證的topics
private final Set<String> unauthorizedTopics;
// 非法的topics
private final Set<String> invalidTopics;
// kafka內(nèi)置的topics
private final Set<String> internalTopics;
private final Node controller;
// partition對應(yīng)的信息,如:leader所在節(jié)點、所有的副本、ISR中的副本、offline的副本
private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
// topic和partition信息的對應(yīng)關(guān)系
private final Map<String, List<PartitionInfo>> partitionsByTopic;
// topic和可用partition(leader不為null)的對應(yīng)關(guān)系
private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
// node和partition信息的對應(yīng)關(guān)系
private final Map<Integer, List<PartitionInfo>> partitionsByNode;
// 節(jié)點id與節(jié)點的對應(yīng)關(guān)系
private final Map<Integer, Node> nodesById;
// 集群信息,里面只有一個clusterId
private final ClusterResource clusterResource;
}
// topic-partition: 包含 topic、partition、leader、replicas、isr
public class PartitionInfo {
private final String topic;
private final int partition;
private final Node leader;
private final Node[] replicas;
private final Node[] inSyncReplicas;
private final Node[] offlineReplicas;
}
看源碼不難理解 Metadata 的主要數(shù)據(jù)結(jié)構(gòu),我們大概總結(jié)下包含哪些信息:
集群中有哪些節(jié)點;
集群中有哪些 topic,這些 topic 有哪些 partition;
每個 partition 的 leader 副本分配在哪個節(jié)點上,follower 副本分配在哪些節(jié)點上;
每個 partition 的 AR 有哪些副本,ISR 有哪些副本;
2.2 Metadata 的應(yīng)用場景
Metadata 在 Kafka 中非常重要,很多場景中都需要從 Metadata 中獲取數(shù)據(jù)或更新數(shù)據(jù),例如:
KafkaProducer 發(fā)送一條消息到指定的 topic 中,需要知道分區(qū)的數(shù)量,要發(fā)送的目標分區(qū),目標分區(qū)的 leader,leader 所在的節(jié)點地址等,這些信息都要從 Metadata 中獲取。
當 Kafka 集群中發(fā)生了 leader 選舉,節(jié)點中 partition 或副本發(fā)生了變化等,這些場景都需要更新Metadata 中的數(shù)據(jù)。
三、Producer 的 Metadata 更新流程
Producer 在調(diào)用 doSend() 方法時,第一步就是通過 waitOnMetadata 方法獲取該 topic 的 metadata 信息。


總結(jié)一下以上代碼:
首先會從緩存中獲取 cluster 信息,并從中獲取 partition 信息,如果可以取到則返回當前的 cluster 信息,如果不含有所需要的 partition 信息時就會更新 metadata;
更新 metadata 的操作會在一個 do ….while 循環(huán)中進行,直到 metadata 中含有所需 partition 的信息,該循環(huán)中主要做了以下事情:
調(diào)用 metadata.requestUpdateForTopic() 方法來獲取 updateVersion,即上一次更新成功時的 version,并將 needUpdate 設(shè)為 true,強制更新;
調(diào)用 sender.wakeup() 方法來喚醒 Sender 線程,Sender 線程中又會喚醒 NetworkClient 線程,在 NetworkClient 中會對 UpdateMetadataRequest 請求進行操作,待會下面會詳細介紹;
調(diào)用 metadata.awaitUpdate(version, remainingWaitMs) 方法來等待 metadata 的更新,通過比較當前的 updateVersion 與步驟 1 中獲取的 updateVersion 來判斷是否更新成功;
3.1 org.apache.kafka.clients.NetworkClient#poll
上面提到調(diào)用 sender.wakeup() 方法來喚醒 Sender 線程,Sender 線程中又會喚醒 NetworkClient 線程,在 NetworkClient 中會對 UpdateMetadataRequest 請求進行操作。在 NetworkClient 中真正處理請求的是 NetworkClient.poll() 方法,接下來讓我們通過分析源碼來看下 NetworkClient 是如何處理請求的。

3.2 org.apache.kafka.clients.NetworkClient.DefaultMetadataUpdater#maybeUpdate(long)
我們來看下 metadata 是如何更新的

這里你可能會問,老周啊,最小負載節(jié)點是啥呀?
別急,我們來看下面這張圖,你就理解了。

LeastLoadedNode 指 Kafka 集群中所有 Node 中負載最小的那一個 Node,它是由每個 Node 在 InFlightRequests 中還未確定的請求數(shù)決定的,未確定的請求越少則負載越小。如上圖所示,Node1 即為 LeastLoadedNode。
3.3 org.apache.kafka.clients.Metadata#updateRequested

下次更新元數(shù)據(jù)信息的時間:
當前 metadata 信息即將到期的時間即 timeToExpire 和 距離允許更新 metadata 信息的時間 即 timeToAllowUpdate 中的最大值。timeToExpire:needUpdate 為 true,表示強制更新,此時該值為 0;否則的話,就按照定時更新時間,即元數(shù)據(jù)信息過期時間(默認是 300000 ms 即 5 分鐘)進行周期性更新。
timeToAllowUpdate:默認就是 refreshBackoffMs 的默認值,即 100 ms。
3.4 org.apache.kafka.clients.NetworkClient.DefaultMetadataUpdater#maybeUpdate(long, org.apache.kafka.common.Node)
我們繼續(xù)跟一下 maybeUpdate 方法:

因此,每次 producer 請求更新 metadata 時,會有以下幾種情況:
通道已經(jīng) ready,node 可以發(fā)送請求,那么就直接發(fā)送請求。
如果該 node 正在建立連接,則直接返回。
如果該 node 還沒建立連接,則向 broker 初始化連接。
而 KafkaProducer 線程一直是阻塞在兩個 while 循環(huán)中的,直到 metadata 更新:
sender 線程第一次調(diào)用 poll,初始化與 node 的連接。
sender 線程第二次調(diào)用 poll,發(fā)送 metadata 請求。
sender 線程第三次調(diào)用 poll,獲取 metadataResponse,并更新 metadata。
3.5 接收 Server 端的響應(yīng),更新 Metadata 信息
handleCompletedReceives 是如何處理任何已完成的接收響應(yīng),如下:

之后進一步調(diào)用 handleSuccessfulResponse。
四、總結(jié)
Metadata 會在下面兩種情況下進行更新:
強制更新:調(diào)用 Metadata.requestUpdate() 將 needFullUpdate 置為 true 來強制更新。
周期性更新:通過 Metadata 的 lastSuccessfulRefreshMs 和 metadataExpireMs 來實現(xiàn),一般情況下,默認周期時間就是 metadataExpireMs,5 分鐘時長。
在 NetworkClient 的 poll() 方法調(diào)用時,會去檢查兩種更新機制,只要達到一種,就會觸發(fā)更新操作。
Metadata 的強制更新會在以下幾種情況下進行:
initConnect 方法調(diào)用時,初始化連接;
poll() 方法中對 handleDisconnections() 方法調(diào)用來處理連接斷開的情況,這時會觸發(fā)強制更新;
poll() 方法中對 handleTimedOutRequests() 來處理請求超時時;
發(fā)送消息時,如果無法找到 partition 的 leader;
處理 Producer 響應(yīng)(handleProduceResponse),如果返回關(guān)于 Metadata 過期的異常,比如:沒有 topic-partition 的相關(guān) meta 或者 client 沒有權(quán)限獲取其 metadata。
強制更新主要是用于處理各種異常情況。
好了,Producer Metadata 讀取與更新機制就說到這,我們下一期再見。
歡迎大家關(guān)注我的公眾號【老周聊架構(gòu)】,Java后端主流技術(shù)棧的原理、源碼分析、架構(gòu)以及各種互聯(lián)網(wǎng)高并發(fā)、高性能、高可用的解決方案。
