<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Kafka的運(yùn)維利器-AdminClient

          共 5742字,需瀏覽 12分鐘

           ·

          2021-11-19 14:37

          點(diǎn)擊上方藍(lán)色字體,選擇“設(shè)為星標(biāo)”

          回復(fù)”面試“獲取更多驚喜

          前言

          一般情況下,我們都習(xí)慣使用kafka-topics.sh腳本來(lái)管理主題,但有些時(shí)候我們希望將主題管理類(lèi)的功能集成到公司內(nèi)部的系統(tǒng)中,打造集管理、監(jiān)控、運(yùn)維、告警為一體的生態(tài)平臺(tái),那么就需要以程序調(diào)用API的方式去實(shí)現(xiàn)。

          Kafka社區(qū)于0.11版本正式推出了Java客戶端版的AdminClient,并不斷地在后續(xù)的版本中對(duì)它進(jìn)行完善。

          本文主要介紹KafkaAdminClient 的基本使用方式,以及采用這種調(diào)用API方式下的創(chuàng)建主題時(shí)的合法性驗(yàn)證。

          功能

          鑒于社區(qū)還在不斷地完善 AdminClient 的功能,AdminClient 提供的功能有以下幾個(gè)大類(lèi)。

          • 主題管理:包括主題的創(chuàng)建、刪除和查詢。
          • 權(quán)限管理:包括具體權(quán)限的配置與刪除。
          • 配置參數(shù)管理:包括 Kafka 各種資源的參數(shù)設(shè)置、詳情查詢。所謂的 Kafka 資源,主要有 Broker、主題、用戶、Client-id 等。
          • 副本日志管理:包括副本底層日志路徑的變更和詳情查詢。
          • 分區(qū)管理:即創(chuàng)建額外的主題分區(qū)。
          • 消息刪除:即刪除指定位移之前的分區(qū)消息。
          • Delegation Token 管理:包括 Delegation Token 的創(chuàng)建、更新、過(guò)期和詳情查詢。
          • 消費(fèi)者組管理:包括消費(fèi)者組的查詢、位移查詢和刪除。
          • Preferred 領(lǐng)導(dǎo)者選舉:推選指定主題分區(qū)的 Preferred Broker 為領(lǐng)導(dǎo)者。

          工作原理

          AdminClient 是一個(gè)雙線程的設(shè)計(jì):前端主線程和后端 I/O 線程。

          前端線程負(fù)責(zé)將用戶要執(zhí)行的操作轉(zhuǎn)換成對(duì)應(yīng)的請(qǐng)求,然后再將請(qǐng)求發(fā)送到后端 I/O 線程的隊(duì)列中。

          而后端 I/O 線程(kafka-admin-client-thread)從隊(duì)列中讀取相應(yīng)的請(qǐng)求,然后發(fā)送到對(duì)應(yīng)的 Broker 節(jié)點(diǎn)上,之后把執(zhí)行結(jié)果保存起來(lái),以便等待前端線程的獲取。

          使用

          如果你使用的是 Maven,需要增加以下依賴項(xiàng):


          ????org.apache.kafka
          ????kafka-clients
          ????2.6.5

          構(gòu)建AdminClient

          /**
          ?*?創(chuàng)建AdminClient客戶端對(duì)象
          ?*/
          public?static?AdminClient?createAdminClientByProperties()?{

          ??Properties?prop?=?new?Properties();

          ??//?配置Kafka服務(wù)的訪問(wèn)地址及端口號(hào)
          ??prop.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");

          ??//?創(chuàng)建AdminClient實(shí)例
          ??return?AdminClient.create(prop);
          }

          /**
          ?*?創(chuàng)建AdminClient的第二種方式
          ?*/
          public?static?AdminClient?createAdminClientByMap(){

          ??Map?conf?=?Maps.newHashMap();

          ??//?配置Kafka服務(wù)的訪問(wèn)地址及端口號(hào)
          ??conf.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");

          ??//?創(chuàng)建AdminClient實(shí)例
          ??return?AdminClient.create(conf);
          }

          創(chuàng)建Topic實(shí)例

          private?static?final?String?TOPIC_NAME?=?"test_topic";

          /**
          ?*?創(chuàng)建Topic實(shí)例
          ?*/
          public?static?void?createTopic(){
          ????AdminClient?adminClient?=?AdminSample.adminClient();
          ????//副本因子
          ????Short?re?=?1;
          ????NewTopic?newTopic?=?new?NewTopic(TOPIC_NAME,1,re);
          ????CreateTopicsResult?createTopicsResult?=?adminClient.createTopics(Arrays.asList(newTopic));
          ????System.out.println("CreateTopicsResult?:?"?+?createTopicsResult);
          ????adminClient.close();
          }

          查詢Topic列表

          private?static?final?String?TOPIC_NAME?=?"test_topic";

          /**
          ?*?獲取topic列表
          ?*/
          public?static?void?topicList()?throws?Exception?{
          ????AdminClient?adminClient?=?adminClient();

          ????//是否查看Internal選項(xiàng)
          ????ListTopicsOptions?options?=?new?ListTopicsOptions();
          ????options.listInternal(true);

          ????//ListTopicsResult?listTopicsResult?=?adminClient.listTopics();
          ????ListTopicsResult?listTopicsResult?=?adminClient.listTopics(options);
          ????Set?names?=?listTopicsResult.names().get();

          ????//打印names
          ????names.stream().forEach(System.out::println);

          ????Collection?topicListings?=?listTopicsResult.listings().get();
          ????//打印TopicListing
          ????topicListings.stream().forEach((topicList)?->?{
          ????????System.out.println(topicList.toString());
          ????});
          ????adminClient.close();
          }

          刪除topic

          private?static?final?String?TOPIC_NAME?=?"test_topic";

          /**
          ?*?刪除topic
          ?*/
          public?static?void?delTopic()?throws?Exception?{
          ????AdminClient?adminClient?=?adminClient();
          ????DeleteTopicsResult?deleteTopicsResult?=?adminClient.deleteTopics(Arrays.asList(TOPIC_NAME));
          ????deleteTopicsResult.all().get();
          }

          描述topic

          /**
          ?*?獲取topic的描述信息
          ?*/
          public?static?void?describeTopics(List?topics)?throws?Exception?{
          ????//?創(chuàng)建AdminClient客戶端對(duì)象
          ????AdminClient?adminClient?=?BuildAdminClient.createAdminClientByProperties();

          ????//?獲取Topic的描述信息
          ????DescribeTopicsResult?result?=?adminClient.describeTopics(topics);

          ????//?解析描述信息結(jié)果,?Map?==>?topicName:topicDescription
          ????Map?topicDescriptionMap?=?result.all().get();
          ????topicDescriptionMap.forEach((topicName,?description)?->?System.out.printf("topic?name?=?%s,?desc?=?%s?\n",?topicName,?description));

          ????//?關(guān)閉資源
          ????adminClient.close();
          }

          查看 Topic 的配置信息

          除了Kafka自身的配置項(xiàng)外,其內(nèi)部的Topic也會(huì)有非常多的配置項(xiàng),我們可以通過(guò)describeConfigs方法來(lái)獲取某個(gè)Topic中的配置項(xiàng)信息。代碼示例:

          /**
          ?*?獲取topic的配置信息
          ?*/
          public?static?void?describeConfigTopics(List?topicNames)?throws?Exception?{
          ????//?創(chuàng)建AdminClient客戶端對(duì)象
          ????AdminClient?adminClient?=?BuildAdminClient.createAdminClientByMap();

          ????List?configResources?=?Lists.newArrayListWithCapacity(64);
          ????topicNames.forEach(topicName?->?configResources.add(
          ????????????//?指定要獲取的源
          ????????????new?ConfigResource(ConfigResource.Type.TOPIC,?topicName)));

          ????//?獲取topic的配置信息
          ????DescribeConfigsResult?result?=?adminClient.describeConfigs(configResources);

          ????//?解析topic的配置信息
          ????Map?resourceConfigMap?=?result.all().get();
          ????resourceConfigMap.forEach((configResource,?config)?->?System.out.printf("topic?config?ConfigResource?=?%s,?Config?=?%s?\n",?configResource,?config));

          ????//?關(guān)閉資源
          ????adminClient.close();
          }

          修改 Topic 的分區(qū)數(shù)量

          在創(chuàng)建Topic時(shí)我們需要設(shè)定Partition的數(shù)量,但如果覺(jué)得初始設(shè)置的Partition數(shù)量太少了,那么就可以使用createPartitions方法來(lái)調(diào)整Topic的Partition數(shù)量,但是需要注意在Kafka中Partition只能增加不能減少。代碼示例:

          /**
          ?*?修改topic的分區(qū)數(shù)量
          ?*?只能增加不能減少
          ?*/
          public?static?void?updateTopicPartition(List?topicNames,?Integer?partitionNum)?throws?Exception?{
          ????//?創(chuàng)建AdminClient客戶端對(duì)象
          ????AdminClient?adminClient?=?BuildAdminClient.createAdminClientByMap();

          ????//?構(gòu)建修改分區(qū)的topic請(qǐng)求參數(shù)
          ????Map?newPartitions?=?Maps.newHashMap();
          ????topicNames.forEach(topicName?->?newPartitions.put(topicName,?NewPartitions.increaseTo(partitionNum)));

          ????//?執(zhí)行修改
          ????CreatePartitionsResult?result?=?adminClient.createPartitions(newPartitions);

          ????//?get方法是一個(gè)阻塞方法,一定要等到createPartitions完成之后才進(jìn)行下一步操作
          ????result.all().get();

          ????//?關(guān)閉資源
          ????adminClient.close();
          }

          社區(qū)于 0.11 版本正式推出了 Java 客戶端版的 AdminClient 工具,該工具提供了幾十種運(yùn)維操作,而且它還在不斷地演進(jìn)著。如果可以的話,你最好統(tǒng)一使用 AdminClient 來(lái)執(zhí)行各種 Kafka 集群管理操作,摒棄掉連接 ZooKeeper 的那些工具。另外,建議時(shí)刻關(guān)注該工具的功能完善情況,畢竟,目前社區(qū)對(duì) AdminClient 的變更頻率很高。


          八千里路云和月 | 從零到大數(shù)據(jù)專(zhuān)家學(xué)習(xí)路徑指南

          我們?cè)趯W(xué)習(xí)Flink的時(shí)候,到底在學(xué)習(xí)什么?

          193篇文章暴揍Flink,這個(gè)合集你需要關(guān)注一下

          Flink生產(chǎn)環(huán)境TOP難題與優(yōu)化,阿里巴巴藏經(jīng)閣YYDS

          Flink CDC我吃定了耶穌也留不住他!| Flink CDC線上問(wèn)題小盤(pán)點(diǎn)

          我們?cè)趯W(xué)習(xí)Spark的時(shí)候,到底在學(xué)習(xí)什么?

          在所有Spark模塊中,我愿稱SparkSQL為最強(qiáng)!

          硬剛Hive | 4萬(wàn)字基礎(chǔ)調(diào)優(yōu)面試小總結(jié)

          數(shù)據(jù)治理方法論和實(shí)踐小百科全書(shū)

          標(biāo)簽體系下的用戶畫(huà)像建設(shè)小指南

          4萬(wàn)字長(zhǎng)文 | ClickHouse基礎(chǔ)&實(shí)踐&調(diào)優(yōu)全視角解析

          【面試&個(gè)人成長(zhǎng)】2021年過(guò)半,社招和校招的經(jīng)驗(yàn)之談

          大數(shù)據(jù)方向另一個(gè)十年開(kāi)啟 |《硬剛系列》第一版完結(jié)

          我寫(xiě)過(guò)的關(guān)于成長(zhǎng)/面試/職場(chǎng)進(jìn)階的文章

          當(dāng)我們?cè)趯W(xué)習(xí)Hive的時(shí)候在學(xué)習(xí)什么?「硬剛Hive續(xù)集」

          瀏覽 39
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  五月花婷婷 | 一区二区高清无码在线 | 日本成人黄页 | 在线中文a 天堂观看 | 老熟妇乱伦视频 |