<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Kafka在SpringBoot中的實(shí)踐

          共 15342字,需瀏覽 31分鐘

           ·

          2021-05-23 22:14

          Kafka作為一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),目前已經(jīng)越來(lái)越被廣泛的應(yīng)用。這里介紹下如何在Spring Boot下集成、應(yīng)用

          abstract.png

          環(huán)境搭建

          我們使用Docker來(lái)進(jìn)行實(shí)踐,其中本機(jī)IP為192.168.2.101。其實(shí)這里還需要一個(gè)ZooKeeper實(shí)例用于保存元數(shù)據(jù),這里就不贅述ZooKeeper相關(guān)配置了

          # 拉取鏡像
          docker pull wurstmeister/kafka

          # 創(chuàng)建容器  
          docker run \
           -e KAFKA_BROKER_ID=1 \
           -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \ # KafKa監(jiān)聽端口
           -e KAFKA_ZOOKEEPER_CONNECT=192.168.2.101:2181/kafka \ # ZooKeeper地址、端口
           -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.2.101:9092 \ # 暴露給客戶端的地址、端口信息
           -d -p 9092:9092 \
           --name myKafka \
           wurstmeister/kafka

          由于wurstmeister/kafka鏡像使用Alpine Linux作為基礎(chǔ)鏡像環(huán)境,其使用的UTC時(shí)間與我們本地時(shí)間(北京時(shí)間)有8個(gè)小時(shí)的時(shí)差。故這里我們介紹下如何在Alpine Linux設(shè)置正確的時(shí)區(qū),利用docker exec命令進(jìn)入容器,然后進(jìn)行如下設(shè)置

          # 安裝 時(shí)區(qū)數(shù)據(jù)
          apk add tzdata
          # 查看 亞洲可用的時(shí)區(qū)
          ls /usr/share/zoneinfo/Asia
          # 復(fù)制 亞洲/上海 時(shí)區(qū) 到 /etc/localtime 下
          cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
          # 設(shè)置時(shí)區(qū)為 亞洲/上海
          echo "Asia/Shanghai" > /etc/timezone
          # 查看當(dāng)前時(shí)間,驗(yàn)證是否生效
          date -R

          一切配置好了,現(xiàn)在我們通過(guò)命令腳本來(lái)驗(yàn)證下看看Kafka是否可以正常工作

          生產(chǎn)者操作如下

          # 進(jìn)入容器
          docker exec -it myKafka /bin/bash
          # 切換目錄
          cd /opt/kafka/bin
          # 使用kafka-console-producer.sh腳本 生產(chǎn)消息
          ./kafka-console-producer.sh --broker-list localhost:9092 --topic test
          # 生產(chǎn)消息1
          one
          # 生產(chǎn)消息2
          hell world

          消費(fèi)者操作如下

          # 進(jìn)入容器
          docker exec -it myKafka /bin/bash
          # 切換目錄
          cd /opt/kafka/bin
          # 使用kafka-console-consumer.sh腳本 消費(fèi)消息
          ./kafka-console-consumer.sh --broker-list localhost:9092 --topic test

          效果如下所示,符合預(yù)期

          figure 1.jpeg

          SpringBoot集成Kafka

          依賴

          SpringBoot下使用Kafka很方便,直接添加依賴即可。

          <!-- Kafka -->
          <dependency>
              <groupId>org.springframework.kafka</groupId>
              <artifactId>spring-kafka</artifactId>
              <version>2.6.4</version>
          </dependency>

          這里我們需要注意SpringBoot與spring-kafka之間的版本兼容性,具體地可以參考官網(wǎng)(https://spring.io/projects/spring-kafka ),下圖紅框、藍(lán)框分別為spring-kafka、SpringBoot的版本要求。這里,我們的SpringBoot版本為2.4.1

          figure 2.jpeg

          其實(shí),關(guān)于spring-kafka版本的選擇問(wèn)題還有一個(gè)小技巧,我們可以在POM依賴中不指定spring-kafka的版本信息,這樣其會(huì)自動(dòng)選擇合適的版本

          配置

          在 application.properties 中添加相關(guān)的必要配置

          # Kafka
          # Kafka 地址、端口
          spring.kafka.bootstrap-servers=127.0.0.1:9092
          # 自定義Kafka分區(qū)器
          spring.kafka.producer.properties.partitioner.class=com.aaron.SpringBoot1.Kafka.KafkaPartitioner
          # 生產(chǎn)者 key、value的序列化器
          spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
          spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
          # 消費(fèi)者 key、value的反序列化器
          spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
          spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

          實(shí)踐

          手動(dòng)聲明一個(gè)Topic,并設(shè)置分區(qū)數(shù)為4

          @Configuration
          public class KafkaConfig {

              public static final String TOPIC_ALARM_IN = "topic_alarm_in";

              /**
               * 聲明Topic,設(shè)置其分區(qū)數(shù)為4
               * @return
               */

              @Bean
              public NewTopic topic1() {
                  return TopicBuilder.name(TOPIC_ALARM_IN)
                          .partitions(4)
                          .build();
              }
          }

          在上文的配置中,我們定義了一個(gè)自定義的Kafka分區(qū)器。我們需要實(shí)現(xiàn)Partitioner接口,在partition方法中實(shí)現(xiàn)我們的分區(qū)邏輯。如下所示

          /**
           * 自定義Kafka分區(qū)器
           */

          public class KafkaPartitioner implements Partitioner {

              @Override
              public void configure(Map<String, ?> configs) {
              }

              @Override
              public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

                  // 獲取該主題的分區(qū)數(shù)量
                  int size = cluster.partitionsForTopic(topic).size();
                  // 分區(qū)號(hào)
                  int index = -1;
                  switch ( (String) key) {
                      case "996":
                          index = 1;
                          break;
                      case "247":
                          index = 2;
                          break;
                      case "965":
                          index = 3;
                          break;
                      default:
                          index = 0;
                  }

                  return index;
              }

              @Override
              public void close() {
              }
          }

          消息的生產(chǎn)者就比較簡(jiǎn)單了,我們直接使用kafkaTemplate發(fā)送即可,這里我們簡(jiǎn)單地對(duì)其進(jìn)行了封裝

          @Component
          public class Producer {

              @Autowired
              private KafkaTemplate<String, String> kafkaTemplate;

              @Autowired
              private KafkaSendResultHandler kafkaSendResultHandler;

              public void sendMsg(String topic, String key, String value) {
                  try{
                      // 設(shè)置消息發(fā)送結(jié)果的回調(diào)
                      kafkaTemplate.setProducerListener(kafkaSendResultHandler);
                      kafkaTemplate.send(topic, key, value);
                  }catch (Exception e) {
                      e.printStackTrace();
                  }
              }
          }

          其中,send方法會(huì)返回一個(gè)Future,可進(jìn)一步地通過(guò)get方法獲取發(fā)送結(jié)果。但我們既希望能夠了解消息發(fā)送是否成功,又不希望被阻塞。幸好Kafka提供了一個(gè)回調(diào)接口用于處理發(fā)送結(jié)果,KafkaSendResultHandler實(shí)現(xiàn)如下所示

          @Component
          public class KafkaSendResultHandler implements ProducerListener {

              @Override
              public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
                  String info = "[發(fā)送成功]: ";
                  String resultStr = buildResult(producerRecord);
                  System.out.println( info + resultStr );
              }

              @Override
              public void onError(ProducerRecord producerRecord, RecordMetadata recordMetadata, Exception exception) {
                  String info = "[發(fā)送失敗]: ";
                  String resultStr = buildResult(producerRecord);
                  System.out.println( info + resultStr );
                  exception.printStackTrace();
                  System.out.println();
              }

              private String buildResult(ProducerRecord<String, String> producerRecord) {
                  String topic = producerRecord.topic();
                  String key = producerRecord.key();
                  String value = producerRecord.value();
                  String str = " <topic>: " + topic + ", <key>: " + key + ", <value> :" + value;
                  return str;
              }
          }

          這里為了簡(jiǎn)便,使用一個(gè)Controller用于控制消息的發(fā)送,并將id作為key

          @RequiredArgsConstructor(onConstructor = @__(@Autowired))
          @Controller
          @ResponseBody
          @RequestMapping("Kafka")
          public class KafkaController {

              @Autowired
              private Producer producer;

              @RequestMapping("/saveAlarmIn")
              public String test1(@RequestBody AlarmIn alarmIn) {
                  System.out.println("\n------------------------------------");
                  String topic = TOPIC_ALARM_IN;
                  String key = alarmIn.getId().toString();
                  try {
                      String jsonStr = new ObjectMapper().writeValueAsString(alarmIn);
                      producer.sendMsg(topic, key, jsonStr);
                  } catch (Exception e) {
                      e.printStackTrace();
                  }
                  return "OK";
              }

          }

          ...

          /**
           * 進(jìn)入告警
           */

          @Data
          @AllArgsConstructor
          @NoArgsConstructor
          @Builder
          public class AlarmIn {
              /**
               * ID
               */

              private Integer id;

              /**
               * 進(jìn)入告警的人員姓名
               */

              private String personName;

              /**
               * 進(jìn)入告警的區(qū)域名稱
               */

              private String areaName;

              /**
               * 告警級(jí)別
               */

              private Integer level;

          }

          通過(guò)@KafkaListener注解即可實(shí)現(xiàn)消息的監(jiān)聽消費(fèi),具體地可通過(guò)topics、groupId等屬性設(shè)置主題、消費(fèi)者群組名等信息

          @Component
          public class Consumer {

              @KafkaListener(topics = TOPIC_ALARM_IN, groupId = "myGroup1" )
              public void g1c1(ConsumerRecord<String, String> record) {
                  AlarmIn alarmIn = parseAlarmIn(record);
                  int index = record.partition();
                  System.out.println("[myGroup1] <c1>: alarmIn: " + alarmIn + ", partition: " + index);
              }

              @KafkaListener(topics = TOPIC_ALARM_IN, groupId = "myGroup1" )
              public void g1c2(ConsumerRecord<String, String> record) {
                  AlarmIn alarmIn = parseAlarmIn(record);
                  int index = record.partition();
                  System.out.println("[myGroup1] <c2>: alarmIn: " + alarmIn + ", partition: " + index);
              }

              @KafkaListener(topics = TOPIC_ALARM_IN, groupId = "myGroup2" )
              public void g2c3(ConsumerRecord<String, String> record) {
                  AlarmIn alarmIn = parseAlarmIn(record);
                  int index = record.partition();
                  System.out.println("[myGroup2] <c3>: alarmIn: " + alarmIn + ", partition: " + index);
              }

              /**
               * 解析進(jìn)入告警信息
               * @param record
               * @return
               */

              private AlarmIn parseAlarmIn(ConsumerRecord<String, String> record) {
                  String key = record.key();
                  String value = record.value();

                  AlarmIn alarmIn = null;
                  try {
                      alarmIn = new ObjectMapper().readValue(value, AlarmIn.class);
                  } catch (Exception e) {
                      e.printStackTrace();
                  }
                  return alarmIn;
              }

          }

          測(cè)試結(jié)果如下,符合預(yù)期

          figure 3.jpeg

          Note

          1. Kafka將一個(gè)消費(fèi)群組內(nèi)的所有消費(fèi)者視為同一個(gè)整體,他們均訂閱同一個(gè)主題。一條消息只會(huì)被群組內(nèi)的一個(gè)消費(fèi)者進(jìn)行消費(fèi)。但消費(fèi)者組之間不會(huì)相互影響,換言之,如果另外一個(gè)消費(fèi)者組也訂閱了該主題,其同樣也會(huì)收到該消息并進(jìn)行處理。上文的測(cè)試結(jié)果也佐證了這一點(diǎn)

          2. Kafka中一個(gè)主題Topic雖然可以擁有多個(gè)分區(qū),但一個(gè)分區(qū)不能被同一個(gè)消費(fèi)者群組下的多個(gè)消費(fèi)者進(jìn)行消費(fèi)。所以當(dāng) 某個(gè)消費(fèi)者群組中消費(fèi)者的數(shù)量 多于 其訂閱的主題Topic的分區(qū)數(shù) 時(shí),該群組多出來(lái)的消費(fèi)者只會(huì)被閑置、浪費(fèi)

          figure 4.jpeg
          1. 上文我們自定義了一個(gè)Kafka的分區(qū)器,事實(shí)上這并不是必須的。一方面我們?cè)诎l(fā)送時(shí)可以顯式地將消息發(fā)送到指定的分區(qū);另一方面,如果發(fā)送時(shí)未直接指定分區(qū),Kafka也會(huì)使用默認(rèn)的分區(qū)器進(jìn)行分區(qū)。具體地,如果key不為null, 默認(rèn)分區(qū)器則通過(guò)哈希計(jì)算來(lái)保證相同key鍵的消息可以被映射到同一個(gè)分區(qū)下;如果key為null,默認(rèn)分區(qū)器則使用輪詢算法將消息均衡地分布到各個(gè)分區(qū)

          參考文獻(xiàn)

          1. Kafka權(quán)威指南 Neha Narkhede/Gwen Shapira/Todd Palino著
          瀏覽 61
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  人操人人 | 91色噜噜狠狠色婷婷 | 多国五级毛片 | 欧美成人在线观看 | 乱伦小说A片 |