Kafka在SpringBoot中的實(shí)踐
Kafka作為一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),目前已經(jīng)越來(lái)越被廣泛的應(yīng)用。這里介紹下如何在Spring Boot下集成、應(yīng)用

環(huán)境搭建
我們使用Docker來(lái)進(jìn)行實(shí)踐,其中本機(jī)IP為192.168.2.101。其實(shí)這里還需要一個(gè)ZooKeeper實(shí)例用于保存元數(shù)據(jù),這里就不贅述ZooKeeper相關(guān)配置了
# 拉取鏡像
docker pull wurstmeister/kafka
# 創(chuàng)建容器
docker run \
-e KAFKA_BROKER_ID=1 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \ # KafKa監(jiān)聽端口
-e KAFKA_ZOOKEEPER_CONNECT=192.168.2.101:2181/kafka \ # ZooKeeper地址、端口
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.2.101:9092 \ # 暴露給客戶端的地址、端口信息
-d -p 9092:9092 \
--name myKafka \
wurstmeister/kafka
由于wurstmeister/kafka鏡像使用Alpine Linux作為基礎(chǔ)鏡像環(huán)境,其使用的UTC時(shí)間與我們本地時(shí)間(北京時(shí)間)有8個(gè)小時(shí)的時(shí)差。故這里我們介紹下如何在Alpine Linux設(shè)置正確的時(shí)區(qū),利用docker exec命令進(jìn)入容器,然后進(jìn)行如下設(shè)置
# 安裝 時(shí)區(qū)數(shù)據(jù)
apk add tzdata
# 查看 亞洲可用的時(shí)區(qū)
ls /usr/share/zoneinfo/Asia
# 復(fù)制 亞洲/上海 時(shí)區(qū) 到 /etc/localtime 下
cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
# 設(shè)置時(shí)區(qū)為 亞洲/上海
echo "Asia/Shanghai" > /etc/timezone
# 查看當(dāng)前時(shí)間,驗(yàn)證是否生效
date -R
一切配置好了,現(xiàn)在我們通過(guò)命令腳本來(lái)驗(yàn)證下看看Kafka是否可以正常工作
生產(chǎn)者操作如下
# 進(jìn)入容器
docker exec -it myKafka /bin/bash
# 切換目錄
cd /opt/kafka/bin
# 使用kafka-console-producer.sh腳本 生產(chǎn)消息
./kafka-console-producer.sh --broker-list localhost:9092 --topic test
# 生產(chǎn)消息1
one
# 生產(chǎn)消息2
hell world
消費(fèi)者操作如下
# 進(jìn)入容器
docker exec -it myKafka /bin/bash
# 切換目錄
cd /opt/kafka/bin
# 使用kafka-console-consumer.sh腳本 消費(fèi)消息
./kafka-console-consumer.sh --broker-list localhost:9092 --topic test
效果如下所示,符合預(yù)期

SpringBoot集成Kafka
依賴
SpringBoot下使用Kafka很方便,直接添加依賴即可。
<!-- Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.6.4</version>
</dependency>
這里我們需要注意SpringBoot與spring-kafka之間的版本兼容性,具體地可以參考官網(wǎng)(https://spring.io/projects/spring-kafka ),下圖紅框、藍(lán)框分別為spring-kafka、SpringBoot的版本要求。這里,我們的SpringBoot版本為2.4.1

其實(shí),關(guān)于spring-kafka版本的選擇問(wèn)題還有一個(gè)小技巧,我們可以在POM依賴中不指定spring-kafka的版本信息,這樣其會(huì)自動(dòng)選擇合適的版本
配置
在 application.properties 中添加相關(guān)的必要配置
# Kafka
# Kafka 地址、端口
spring.kafka.bootstrap-servers=127.0.0.1:9092
# 自定義Kafka分區(qū)器
spring.kafka.producer.properties.partitioner.class=com.aaron.SpringBoot1.Kafka.KafkaPartitioner
# 生產(chǎn)者 key、value的序列化器
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 消費(fèi)者 key、value的反序列化器
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
實(shí)踐
手動(dòng)聲明一個(gè)Topic,并設(shè)置分區(qū)數(shù)為4
@Configuration
public class KafkaConfig {
public static final String TOPIC_ALARM_IN = "topic_alarm_in";
/**
* 聲明Topic,設(shè)置其分區(qū)數(shù)為4
* @return
*/
@Bean
public NewTopic topic1() {
return TopicBuilder.name(TOPIC_ALARM_IN)
.partitions(4)
.build();
}
}
在上文的配置中,我們定義了一個(gè)自定義的Kafka分區(qū)器。我們需要實(shí)現(xiàn)Partitioner接口,在partition方法中實(shí)現(xiàn)我們的分區(qū)邏輯。如下所示
/**
* 自定義Kafka分區(qū)器
*/
public class KafkaPartitioner implements Partitioner {
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 獲取該主題的分區(qū)數(shù)量
int size = cluster.partitionsForTopic(topic).size();
// 分區(qū)號(hào)
int index = -1;
switch ( (String) key) {
case "996":
index = 1;
break;
case "247":
index = 2;
break;
case "965":
index = 3;
break;
default:
index = 0;
}
return index;
}
@Override
public void close() {
}
}
消息的生產(chǎn)者就比較簡(jiǎn)單了,我們直接使用kafkaTemplate發(fā)送即可,這里我們簡(jiǎn)單地對(duì)其進(jìn)行了封裝
@Component
public class Producer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
private KafkaSendResultHandler kafkaSendResultHandler;
public void sendMsg(String topic, String key, String value) {
try{
// 設(shè)置消息發(fā)送結(jié)果的回調(diào)
kafkaTemplate.setProducerListener(kafkaSendResultHandler);
kafkaTemplate.send(topic, key, value);
}catch (Exception e) {
e.printStackTrace();
}
}
}
其中,send方法會(huì)返回一個(gè)Future,可進(jìn)一步地通過(guò)get方法獲取發(fā)送結(jié)果。但我們既希望能夠了解消息發(fā)送是否成功,又不希望被阻塞。幸好Kafka提供了一個(gè)回調(diào)接口用于處理發(fā)送結(jié)果,KafkaSendResultHandler實(shí)現(xiàn)如下所示
@Component
public class KafkaSendResultHandler implements ProducerListener {
@Override
public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
String info = "[發(fā)送成功]: ";
String resultStr = buildResult(producerRecord);
System.out.println( info + resultStr );
}
@Override
public void onError(ProducerRecord producerRecord, RecordMetadata recordMetadata, Exception exception) {
String info = "[發(fā)送失敗]: ";
String resultStr = buildResult(producerRecord);
System.out.println( info + resultStr );
exception.printStackTrace();
System.out.println();
}
private String buildResult(ProducerRecord<String, String> producerRecord) {
String topic = producerRecord.topic();
String key = producerRecord.key();
String value = producerRecord.value();
String str = " <topic>: " + topic + ", <key>: " + key + ", <value> :" + value;
return str;
}
}
這里為了簡(jiǎn)便,使用一個(gè)Controller用于控制消息的發(fā)送,并將id作為key
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
@Controller
@ResponseBody
@RequestMapping("Kafka")
public class KafkaController {
@Autowired
private Producer producer;
@RequestMapping("/saveAlarmIn")
public String test1(@RequestBody AlarmIn alarmIn) {
System.out.println("\n------------------------------------");
String topic = TOPIC_ALARM_IN;
String key = alarmIn.getId().toString();
try {
String jsonStr = new ObjectMapper().writeValueAsString(alarmIn);
producer.sendMsg(topic, key, jsonStr);
} catch (Exception e) {
e.printStackTrace();
}
return "OK";
}
}
...
/**
* 進(jìn)入告警
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class AlarmIn {
/**
* ID
*/
private Integer id;
/**
* 進(jìn)入告警的人員姓名
*/
private String personName;
/**
* 進(jìn)入告警的區(qū)域名稱
*/
private String areaName;
/**
* 告警級(jí)別
*/
private Integer level;
}
通過(guò)@KafkaListener注解即可實(shí)現(xiàn)消息的監(jiān)聽消費(fèi),具體地可通過(guò)topics、groupId等屬性設(shè)置主題、消費(fèi)者群組名等信息
@Component
public class Consumer {
@KafkaListener(topics = TOPIC_ALARM_IN, groupId = "myGroup1" )
public void g1c1(ConsumerRecord<String, String> record) {
AlarmIn alarmIn = parseAlarmIn(record);
int index = record.partition();
System.out.println("[myGroup1] <c1>: alarmIn: " + alarmIn + ", partition: " + index);
}
@KafkaListener(topics = TOPIC_ALARM_IN, groupId = "myGroup1" )
public void g1c2(ConsumerRecord<String, String> record) {
AlarmIn alarmIn = parseAlarmIn(record);
int index = record.partition();
System.out.println("[myGroup1] <c2>: alarmIn: " + alarmIn + ", partition: " + index);
}
@KafkaListener(topics = TOPIC_ALARM_IN, groupId = "myGroup2" )
public void g2c3(ConsumerRecord<String, String> record) {
AlarmIn alarmIn = parseAlarmIn(record);
int index = record.partition();
System.out.println("[myGroup2] <c3>: alarmIn: " + alarmIn + ", partition: " + index);
}
/**
* 解析進(jìn)入告警信息
* @param record
* @return
*/
private AlarmIn parseAlarmIn(ConsumerRecord<String, String> record) {
String key = record.key();
String value = record.value();
AlarmIn alarmIn = null;
try {
alarmIn = new ObjectMapper().readValue(value, AlarmIn.class);
} catch (Exception e) {
e.printStackTrace();
}
return alarmIn;
}
}
測(cè)試結(jié)果如下,符合預(yù)期

Note
Kafka將一個(gè)消費(fèi)群組內(nèi)的所有消費(fèi)者視為同一個(gè)整體,他們均訂閱同一個(gè)主題。一條消息只會(huì)被群組內(nèi)的一個(gè)消費(fèi)者進(jìn)行消費(fèi)。但消費(fèi)者組之間不會(huì)相互影響,換言之,如果另外一個(gè)消費(fèi)者組也訂閱了該主題,其同樣也會(huì)收到該消息并進(jìn)行處理。上文的測(cè)試結(jié)果也佐證了這一點(diǎn)
Kafka中一個(gè)主題Topic雖然可以擁有多個(gè)分區(qū),但一個(gè)分區(qū)不能被同一個(gè)消費(fèi)者群組下的多個(gè)消費(fèi)者進(jìn)行消費(fèi)。所以當(dāng) 某個(gè)消費(fèi)者群組中消費(fèi)者的數(shù)量 多于 其訂閱的主題Topic的分區(qū)數(shù) 時(shí),該群組多出來(lái)的消費(fèi)者只會(huì)被閑置、浪費(fèi)

上文我們自定義了一個(gè)Kafka的分區(qū)器,事實(shí)上這并不是必須的。一方面我們?cè)诎l(fā)送時(shí)可以顯式地將消息發(fā)送到指定的分區(qū);另一方面,如果發(fā)送時(shí)未直接指定分區(qū),Kafka也會(huì)使用默認(rèn)的分區(qū)器進(jìn)行分區(qū)。具體地,如果key不為null, 默認(rèn)分區(qū)器則通過(guò)哈希計(jì)算來(lái)保證相同key鍵的消息可以被映射到同一個(gè)分區(qū)下;如果key為null,默認(rèn)分區(qū)器則使用輪詢算法將消息均衡地分布到各個(gè)分區(qū)
參考文獻(xiàn)
Kafka權(quán)威指南 Neha Narkhede/Gwen Shapira/Todd Palino著
