Quickly Set Up a Kafka Cluster with Docker

Versions
- JDK 14
- Zookeeper (the official zookeeper image)
- Kafka (the wurstmeister/kafka image)
Installing Zookeeper and Kafka
Kafka depends on Zookeeper for coordination, so Zookeeper has to be in place before we install Kafka. Prepare the following docker-compose.yaml file, replacing the host address 192.168.1.100 with the address of your own machine.
version: "3"
services:
  zookeeper:
    image: zookeeper
    container_name: zookeeper
    ports:
      - 2181:2181
    volumes:
      - ./data/zookeeper/data:/data
      - ./data/zookeeper/datalog:/datalog
      - ./data/zookeeper/logs:/logs
    restart: always
  kafka_node_0:
    depends_on:
      - zookeeper
    container_name: kafka-node-0
    image: wurstmeister/kafka
    environment:
      KAFKA_BROKER_ID: 0
      KAFKA_ZOOKEEPER_CONNECT: 192.168.1.100:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.100:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_DEFAULT_REPLICATION_FACTOR: 2
    ports:
      - 9092:9092
    volumes:
      - ./data/kafka/node_0:/kafka
    restart: unless-stopped
  kafka_node_1:
    depends_on:
      - kafka_node_0
    container_name: kafka-node-1
    image: wurstmeister/kafka
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 192.168.1.100:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.100:9093
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_DEFAULT_REPLICATION_FACTOR: 2
    ports:
      - 9093:9093
    volumes:
      - ./data/kafka/node_1:/kafka
    restart: unless-stopped
  kafka_node_2:
    depends_on:
      - kafka_node_1
    container_name: kafka-node-2
    image: wurstmeister/kafka
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: 192.168.1.100:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.100:9094
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9094
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_DEFAULT_REPLICATION_FACTOR: 2
    ports:
      - 9094:9094
    volumes:
      - ./data/kafka/node_2:/kafka
    restart: unless-stopped
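Two details in this file are worth calling out. KAFKA_LISTENERS is the address each broker binds inside its own container, while KAFKA_ADVERTISED_LISTENERS is the address the broker hands back to clients, which is why it must carry the reachable host address rather than localhost. Each broker also needs a unique KAFKA_BROKER_ID, and each one is exposed on its own host port (9092, 9093, 9094).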
Run docker-compose up -d to bring the cluster up. After a short wait, docker ps should show the zookeeper container and all three Kafka brokers in the Up state.
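If you prefer to verify the brokers from code as well, here is a minimal sketch using Kafka's AdminClient, which ships in the kafka-clients library that spring-kafka already depends on; the class name ClusterCheck is just for illustration. A healthy cluster reports all three brokers:

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class ClusterCheck {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        // Same broker list as in the compose file above
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.1.100:9092,192.168.1.100:9093,192.168.1.100:9094");
        try (AdminClient admin = AdminClient.create(props)) {
            // A healthy three-broker cluster prints three nodes here
            admin.describeCluster().nodes().get()
                 .forEach(node -> System.out.println("Broker: " + node));
        }
    }
}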

Integrating Spring Boot with the Kafka Cluster
Create a brand-new Spring Boot project and add the following dependencies to its build.gradle file.
dependencies {
    ......
    implementation 'org.springframework.kafka:spring-kafka:2.5.2.RELEASE'
    implementation 'com.alibaba:fastjson:1.2.71'
}
1. Configure the Kafka-related parameters in application.properties.
spring.kafka.bootstrap-servers=192.168.1.100:9092,192.168.1.100:9093,192.168.1.100:9094
spring.kafka.producer.retries=0
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
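The examples below publish to a topic named test. Kafka brokers auto-create topics by default (auto.create.topics.enable=true), but if you would rather declare the topic explicitly, a minimal sketch with Spring Kafka's TopicBuilder (available since spring-kafka 2.3) could look like this; KafkaTopicConfig is an illustrative name:

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {

    // Spring Boot's auto-configured KafkaAdmin creates this topic on
    // startup if it does not exist yet.
    @Bean
    public NewTopic testTopic() {
        return TopicBuilder.name("test")
                .partitions(3) // matches KAFKA_NUM_PARTITIONS above
                .replicas(2)   // matches KAFKA_DEFAULT_REPLICATION_FACTOR
                .build();
    }
}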
2. Create the message body class. Lombok's @Data supplies the getters and setters that fastjson needs for serialization.
import java.util.Date;

import lombok.Data;

@Data // Lombok generates the getters/setters fastjson relies on
public class Message {
    private Long id;
    private String message;
    private Date sendAt;
}
3. Create the message sender.
import java.util.Date;
import java.util.UUID;

import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class Sender {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send() {
        Message message = new Message();
        message.setId(System.currentTimeMillis());
        message.setMessage(UUID.randomUUID().toString());
        message.setSendAt(new Date());
        log.info("message = {}", JSON.toJSONString(message));
        // Serialize to JSON and publish to the "test" topic
        kafkaTemplate.send("test", JSON.toJSONString(message));
    }
}
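Note that kafkaTemplate.send() is asynchronous and returns immediately. If you want to log whether each message was actually delivered, a hedged variant of the last line can attach a callback to the returned ListenableFuture:

// Replace the last line of send() with this to log the delivery result.
// Extra imports: org.springframework.kafka.support.SendResult,
// org.springframework.util.concurrent.ListenableFuture
ListenableFuture<SendResult<String, String>> future =
        kafkaTemplate.send("test", JSON.toJSONString(message));
future.addCallback(
        result -> log.info("sent to partition {}", result.getRecordMetadata().partition()),
        ex -> log.error("send failed", ex));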
4. Create the message receiver.
import java.util.Optional;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class Receiver {

    // Listens on the "test" topic as part of consumer group "test"
    @KafkaListener(topics = {"test"}, groupId = "test")
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            log.info("receiver record = " + record);
            log.info("receiver message = " + message.get());
        }
    }
}
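Because the producer sends Message as a JSON string, the listener can map the payload back to the Message class with fastjson. A small optional sketch of that extra step inside listen(), assuming the @Data getters from step 2:

// Inside the isPresent() branch; JSON is com.alibaba.fastjson.JSON
Message payload = JSON.parseObject(message.get().toString(), Message.class);
log.info("payload id = {}, sendAt = {}", payload.getId(), payload.getSendAt());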
5. Test the message queue.
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class QueueController {

    @Autowired
    private Sender sender;

    @PostMapping("/test")
    public void testQueue() {
        sender.send();
        sender.send();
        sender.send();
    }
}
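With the application running, trigger the endpoint with an HTTP POST, for example curl -X POST http://localhost:8080/test (assuming the default server port 8080); each call publishes three messages for the listener to consume.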
If each "message = ..." line logged by Sender is matched by a "receiver message = ..." line from Receiver, the integration succeeded.

At this point we have successfully built a Kafka pseudo-cluster (three brokers sharing a single host) with Docker and integrated it with Spring Boot.
