<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Kafka中如何動態(tài)指定多個topic

          共 8925字,需瀏覽 18分鐘

           ·

          2021-08-11 10:45



          說明:本項目為springboot+kafak的整合項目,故其用了springboot中對kafak的消費注解@KafkaListener

          首先,application.properties中配置用逗號隔開的多個topic。

          方法:利用Spring的SpEl表達式,將topics 配置為:@KafkaListener(topics = “#{’${topics}’.split(’,’)}”)

          運行程序,console打印的效果如下:

          因為只開了一條消費者線程,所以所有的topic和分區(qū)都分配給這條線程。

          如果你想開多條消費者線程去消費這些topic,添加@KafkaListener注解的參數(shù)concurrency的值為自己想要的消費者個數(shù)即可(注意,消費者數(shù)要小于等于你開的所有topic的分區(qū)數(shù)總和)

          運行程序,console打印的效果如下:

          總結(jié)一下大家問的最多的一個問題:

          如何在程序運行的過程中,改變topic,消費者能夠消費修改后的topic?

          ans: 經(jīng)過嘗試,使用@KafkaListener注解實現(xiàn)不了此需求,在程序啟動的時候,程序就會根據(jù)@KafkaListener的注解信息初始化好消費者去消費指定好的topic。如果在程序運行的過程中,修改topic,不會讓此消費者修改消費者的配置再重新訂閱topic的。

          不過我們可以有個折中的辦法,就是利用@KafkaListener的topicPattern參數(shù)來進行topic匹配。

          具體如何操作的可以看下這位老哥的blog:

          https://blog.csdn.net/songzehao/article/details/103091486

          終極方法:

          思路:

          不使用@KafkaListener,使用kafka原生客戶端依賴,手動初始化消費者,開啟消費者線程。在消費者線程中,每次循環(huán)都從配置、數(shù)據(jù)庫或者其他配置源獲取最新的topic信息,與之前的topic比較,如果發(fā)生變化,重新訂閱topic或者初始化消費者。

          實現(xiàn)

          1.加入kafka客戶端依賴(本次測試服務(wù)端kafka版本:2.12-2.4.0)

          <dependency>
           <groupId>org.apache.kafka</groupId>
           <artifactId>kafka-clients</artifactId>
           <version>2.3.0</version>
          </dependency>

          2.代碼

          @Service
          @Slf4j
          public class KafkaConsumers implements InitializingBean {

              /**
               * 消費者
               */

              private static KafkaConsumer<String, String> consumer;
              /**
               * topic
               */

              private List<String> topicList;

              public static String getNewTopic() {
                  try {
                      return org.apache.commons.io.FileUtils.readLines(new File("D:/topic.txt"), "utf-8").get(0);
                  } catch (IOException e) {
                      e.printStackTrace();
                  }
                  return null;
              }

              /**
               * 初始化消費者(配置寫死是為了快速測試,請大家使用配置文件)
               *
               * @param topicList
               * @return
               */

              public KafkaConsumer<String, String> getInitConsumer(List<String> topicList) {
                  //配置信息
                  Properties props = new Properties();
                  //kafka服務(wù)器地址
                  props.put("bootstrap.servers""192.168.9.185:9092");
                  //必須指定消費者組
                  props.put("group.id""haha");
                  //設(shè)置數(shù)據(jù)key和value的序列化處理類
                  props.put("key.deserializer", StringDeserializer.class);
                  props.put("value.deserializer", StringDeserializer.class);
                  //創(chuàng)建消息者實例
                  KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
                  //訂閱topic的消息
                  consumer.subscribe(topicList);
                  return consumer;
              }

              /**
               * 開啟消費者線程
               * 異常請自己根據(jù)需求自己處理
               */

              @Override
              public void afterPropertiesSet() {
                  // 初始化topic
                  topicList = Splitter.on(",").splitToList(Objects.requireNonNull(getNewTopic()));
                  if (org.apache.commons.collections.CollectionUtils.isNotEmpty(topicList)) {
                      consumer = getInitConsumer(topicList);
                      // 開啟一個消費者線程
                      new Thread(() -> {
                          while (true) {
                              // 模擬從配置源中獲取最新的topic(字符串,逗號隔開)
                              final List<String> newTopic = Splitter.on(",").splitToList(Objects.requireNonNull(getNewTopic()));
                              // 如果topic發(fā)生變化
                              if (!topicList.equals(newTopic)) {
                                  log.info("topic 發(fā)生變化:newTopic:{},oldTopic:{}-------------------------", newTopic, topicList);
                                  // method one:重新訂閱topic:
                                  topicList = newTopic;
                                  consumer.subscribe(newTopic);
                                  // method two:關(guān)閉原來的消費者,重新初始化一個消費者
                                  //consumer.close();
                                  //topicList = newTopic;
                                  //consumer = getInitConsumer(newTopic);
                                  continue;
                              }
                              ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                              for (ConsumerRecord<String, String> record : records) {
                                  System.out.println("key:" + record.key() + "" + ",value:" + record.value());
                              }
                          }
                      }).start();
                  }
              }
          }

          說一下第72行代碼:

          ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

          上面這行代碼表示:在100ms內(nèi)等待Kafka的broker返回數(shù)據(jù).超市參數(shù)指定poll在多久之后可以返回,不管有沒有可用的數(shù)據(jù)都要返回。

          在修改topic后,必須等到此次poll拉取的消息處理完,while(true)循環(huán)的時候檢測topic發(fā)生變化,才能重新訂閱topic.

          poll()方法一次拉取得消息數(shù)默認為:500,如下圖,kafka客戶端源碼中設(shè)置的。

          如果想自定義此配置,可在初始化消費者時加入

          3.運行結(jié)果(測試的topic中都無數(shù)據(jù))

          注意:KafkaConsumer是線程不安全的,不要用一個KafkaConsumer實例開啟多個消費者,要開啟多個消費者,需要new 多個KafkaConsumer實例。

          來源:blog.csdn.net/qq_35457078/

          article/details/88838511


          瀏覽 195
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  成人超碰在线 | 大香蕉560 | sm捆绑网址 | 青青草视频免费在线播放 | 天天色成人网站 |