Kafka的運(yùn)維利器-AdminClient
點(diǎn)擊上方藍(lán)色字體,選擇“設(shè)為星標(biāo)”
回復(fù)”面試“獲取更多驚喜

前言
一般情況下,我們都習(xí)慣使用kafka-topics.sh腳本來(lái)管理主題,但有些時(shí)候我們希望將主題管理類(lèi)的功能集成到公司內(nèi)部的系統(tǒng)中,打造集管理、監(jiān)控、運(yùn)維、告警為一體的生態(tài)平臺(tái),那么就需要以程序調(diào)用API的方式去實(shí)現(xiàn)。
Kafka社區(qū)于0.11版本正式推出了Java客戶端版的AdminClient,并不斷地在后續(xù)的版本中對(duì)它進(jìn)行完善。
本文主要介紹KafkaAdminClient 的基本使用方式,以及采用這種調(diào)用API方式下的創(chuàng)建主題時(shí)的合法性驗(yàn)證。
功能
鑒于社區(qū)還在不斷地完善 AdminClient 的功能,AdminClient 提供的功能有以下幾個(gè)大類(lèi)。
主題管理:包括主題的創(chuàng)建、刪除和查詢。 權(quán)限管理:包括具體權(quán)限的配置與刪除。 配置參數(shù)管理:包括 Kafka 各種資源的參數(shù)設(shè)置、詳情查詢。所謂的 Kafka 資源,主要有 Broker、主題、用戶、Client-id 等。 副本日志管理:包括副本底層日志路徑的變更和詳情查詢。 分區(qū)管理:即創(chuàng)建額外的主題分區(qū)。 消息刪除:即刪除指定位移之前的分區(qū)消息。 Delegation Token 管理:包括 Delegation Token 的創(chuàng)建、更新、過(guò)期和詳情查詢。 消費(fèi)者組管理:包括消費(fèi)者組的查詢、位移查詢和刪除。 Preferred 領(lǐng)導(dǎo)者選舉:推選指定主題分區(qū)的 Preferred Broker 為領(lǐng)導(dǎo)者。
工作原理
AdminClient 是一個(gè)雙線程的設(shè)計(jì):前端主線程和后端 I/O 線程。
前端線程負(fù)責(zé)將用戶要執(zhí)行的操作轉(zhuǎn)換成對(duì)應(yīng)的請(qǐng)求,然后再將請(qǐng)求發(fā)送到后端 I/O 線程的隊(duì)列中。
而后端 I/O 線程(kafka-admin-client-thread)從隊(duì)列中讀取相應(yīng)的請(qǐng)求,然后發(fā)送到對(duì)應(yīng)的 Broker 節(jié)點(diǎn)上,之后把執(zhí)行結(jié)果保存起來(lái),以便等待前端線程的獲取。
使用
如果你使用的是 Maven,需要增加以下依賴項(xiàng):
????org.apache.kafka
????kafka-clients
????2.6.5
構(gòu)建AdminClient
/**
?*?創(chuàng)建AdminClient客戶端對(duì)象
?*/
public?static?AdminClient?createAdminClientByProperties()?{
??Properties?prop?=?new?Properties();
??//?配置Kafka服務(wù)的訪問(wèn)地址及端口號(hào)
??prop.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
??//?創(chuàng)建AdminClient實(shí)例
??return?AdminClient.create(prop);
}
/**
?*?創(chuàng)建AdminClient的第二種方式
?*/
public?static?AdminClient?createAdminClientByMap(){
??Map?conf?=?Maps.newHashMap();
??//?配置Kafka服務(wù)的訪問(wèn)地址及端口號(hào)
??conf.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
??//?創(chuàng)建AdminClient實(shí)例
??return?AdminClient.create(conf);
}
創(chuàng)建Topic實(shí)例
private?static?final?String?TOPIC_NAME?=?"test_topic";
/**
?*?創(chuàng)建Topic實(shí)例
?*/
public?static?void?createTopic(){
????AdminClient?adminClient?=?AdminSample.adminClient();
????//副本因子
????Short?re?=?1;
????NewTopic?newTopic?=?new?NewTopic(TOPIC_NAME,1,re);
????CreateTopicsResult?createTopicsResult?=?adminClient.createTopics(Arrays.asList(newTopic));
????System.out.println("CreateTopicsResult?:?"?+?createTopicsResult);
????adminClient.close();
}
查詢Topic列表
private?static?final?String?TOPIC_NAME?=?"test_topic";
/**
?*?獲取topic列表
?*/
public?static?void?topicList()?throws?Exception?{
????AdminClient?adminClient?=?adminClient();
????//是否查看Internal選項(xiàng)
????ListTopicsOptions?options?=?new?ListTopicsOptions();
????options.listInternal(true);
????//ListTopicsResult?listTopicsResult?=?adminClient.listTopics();
????ListTopicsResult?listTopicsResult?=?adminClient.listTopics(options);
????Set?names?=?listTopicsResult.names().get();
????//打印names
????names.stream().forEach(System.out::println);
????Collection?topicListings?=?listTopicsResult.listings().get();
????//打印TopicListing
????topicListings.stream().forEach((topicList)?->?{
????????System.out.println(topicList.toString());
????});
????adminClient.close();
}
刪除topic
private?static?final?String?TOPIC_NAME?=?"test_topic";
/**
?*?刪除topic
?*/
public?static?void?delTopic()?throws?Exception?{
????AdminClient?adminClient?=?adminClient();
????DeleteTopicsResult?deleteTopicsResult?=?adminClient.deleteTopics(Arrays.asList(TOPIC_NAME));
????deleteTopicsResult.all().get();
}
描述topic
/**
?*?獲取topic的描述信息
?*/
public?static?void?describeTopics(List?topics)?throws?Exception?{
????//?創(chuàng)建AdminClient客戶端對(duì)象
????AdminClient?adminClient?=?BuildAdminClient.createAdminClientByProperties();
????//?獲取Topic的描述信息
????DescribeTopicsResult?result?=?adminClient.describeTopics(topics);
????//?解析描述信息結(jié)果,?Map?==>?topicName:topicDescription
????Map?topicDescriptionMap?=?result.all().get();
????topicDescriptionMap.forEach((topicName,?description)?->?System.out.printf("topic?name?=?%s,?desc?=?%s?\n",?topicName,?description));
????//?關(guān)閉資源
????adminClient.close();
}
查看 Topic 的配置信息
除了Kafka自身的配置項(xiàng)外,其內(nèi)部的Topic也會(huì)有非常多的配置項(xiàng),我們可以通過(guò)describeConfigs方法來(lái)獲取某個(gè)Topic中的配置項(xiàng)信息。代碼示例:
/**
?*?獲取topic的配置信息
?*/
public?static?void?describeConfigTopics(List?topicNames)?throws?Exception?{
????//?創(chuàng)建AdminClient客戶端對(duì)象
????AdminClient?adminClient?=?BuildAdminClient.createAdminClientByMap();
????List?configResources?=?Lists.newArrayListWithCapacity(64);
????topicNames.forEach(topicName?->?configResources.add(
????????????//?指定要獲取的源
????????????new?ConfigResource(ConfigResource.Type.TOPIC,?topicName)));
????//?獲取topic的配置信息
????DescribeConfigsResult?result?=?adminClient.describeConfigs(configResources);
????//?解析topic的配置信息
????Map?resourceConfigMap?=?result.all().get();
????resourceConfigMap.forEach((configResource,?config)?->?System.out.printf("topic?config?ConfigResource?=?%s,?Config?=?%s?\n",?configResource,?config));
????//?關(guān)閉資源
????adminClient.close();
}
修改 Topic 的分區(qū)數(shù)量
在創(chuàng)建Topic時(shí)我們需要設(shè)定Partition的數(shù)量,但如果覺(jué)得初始設(shè)置的Partition數(shù)量太少了,那么就可以使用createPartitions方法來(lái)調(diào)整Topic的Partition數(shù)量,但是需要注意在Kafka中Partition只能增加不能減少。代碼示例:
/**
?*?修改topic的分區(qū)數(shù)量
?*?只能增加不能減少
?*/
public?static?void?updateTopicPartition(List?topicNames,?Integer?partitionNum)?throws?Exception?{
????//?創(chuàng)建AdminClient客戶端對(duì)象
????AdminClient?adminClient?=?BuildAdminClient.createAdminClientByMap();
????//?構(gòu)建修改分區(qū)的topic請(qǐng)求參數(shù)
????Map?newPartitions?=?Maps.newHashMap();
????topicNames.forEach(topicName?->?newPartitions.put(topicName,?NewPartitions.increaseTo(partitionNum)));
????//?執(zhí)行修改
????CreatePartitionsResult?result?=?adminClient.createPartitions(newPartitions);
????//?get方法是一個(gè)阻塞方法,一定要等到createPartitions完成之后才進(jìn)行下一步操作
????result.all().get();
????//?關(guān)閉資源
????adminClient.close();
}
社區(qū)于 0.11 版本正式推出了 Java 客戶端版的 AdminClient 工具,該工具提供了幾十種運(yùn)維操作,而且它還在不斷地演進(jìn)著。如果可以的話,你最好統(tǒng)一使用 AdminClient 來(lái)執(zhí)行各種 Kafka 集群管理操作,摒棄掉連接 ZooKeeper 的那些工具。另外,建議時(shí)刻關(guān)注該工具的功能完善情況,畢竟,目前社區(qū)對(duì) AdminClient 的變更頻率很高。

八千里路云和月 | 從零到大數(shù)據(jù)專(zhuān)家學(xué)習(xí)路徑指南
我們?cè)趯W(xué)習(xí)Flink的時(shí)候,到底在學(xué)習(xí)什么?
193篇文章暴揍Flink,這個(gè)合集你需要關(guān)注一下
Flink生產(chǎn)環(huán)境TOP難題與優(yōu)化,阿里巴巴藏經(jīng)閣YYDS
Flink CDC我吃定了耶穌也留不住他!| Flink CDC線上問(wèn)題小盤(pán)點(diǎn)
我們?cè)趯W(xué)習(xí)Spark的時(shí)候,到底在學(xué)習(xí)什么?
在所有Spark模塊中,我愿稱SparkSQL為最強(qiáng)!
硬剛Hive | 4萬(wàn)字基礎(chǔ)調(diào)優(yōu)面試小總結(jié)
數(shù)據(jù)治理方法論和實(shí)踐小百科全書(shū)
標(biāo)簽體系下的用戶畫(huà)像建設(shè)小指南
4萬(wàn)字長(zhǎng)文 | ClickHouse基礎(chǔ)&實(shí)踐&調(diào)優(yōu)全視角解析
【面試&個(gè)人成長(zhǎng)】2021年過(guò)半,社招和校招的經(jīng)驗(yàn)之談
大數(shù)據(jù)方向另一個(gè)十年開(kāi)啟 |《硬剛系列》第一版完結(jié)
我寫(xiě)過(guò)的關(guān)于成長(zhǎng)/面試/職場(chǎng)進(jìn)階的文章
當(dāng)我們?cè)趯W(xué)習(xí)Hive的時(shí)候在學(xué)習(xí)什么?「硬剛Hive續(xù)集」
