springboot + kafka 入門實例 入門demo
點擊上方藍(lán)色字體,選擇“標(biāo)星公眾號”
優(yōu)質(zhì)文章,第一時間送達(dá)
? 作者?|???jelly_oy?
來源 |? urlify.cn/NVJFva? ? ? ??
版本說明
springboot版本:2.3.3.RELEASE
kakfa服務(wù)端版本:kafka_2.12-2.6.0.tgz
zookeeper服務(wù)端版本:apache-zookeeper-3.6.1-bin.tar.gz
實例搭建前提條件
1,搭建好zookeeper服務(wù),本實例zookeeper使用單機(jī)偽集群模式,
192.168.1.126:2181, 192.168.1.126:2182, 192.168.1.126:21832,搭建好kafka服務(wù),本實例kafka使用單機(jī)偽集群模式,
192.168.1.126:9092, 192.168.1.126:9093, 192.168.1.126:90941. 導(dǎo)入相關(guān)依賴
xml version="1.0"?encoding="UTF-8"?>
<project?xmlns="http://maven.apache.org/POM/4.0.0"?xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
?????????xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
????<modelVersion>4.0.0modelVersion>
????<parent>
????????<groupId>org.springframework.bootgroupId>
????????<artifactId>spring-boot-starter-parentartifactId>
????????<version>2.3.3.RELEASEversion>
????????<relativePath/>?
????parent>
????<groupId>com.examplegroupId>
????<artifactId>springboot-kafka-demoartifactId>
????<version>1.0-SNAPSHOTversion>
????<name>springboot-kafka-demoname>
????<description>springboot-kafka-demodescription>
????<properties>
????????<java.version>1.8java.version>
????properties>
????<dependencies>
????????<dependency>
????????????<groupId>org.springframework.bootgroupId>
????????????<artifactId>spring-boot-starter-webartifactId>
????????dependency>
????????<dependency>
????????????<groupId>org.springframework.bootgroupId>
????????????<artifactId>spring-boot-starter-testartifactId>
????????????<scope>testscope>
????????dependency>
????????<dependency>
????????????<groupId>org.springframework.kafkagroupId>
????????????<artifactId>spring-kafkaartifactId>
????????dependency>
????????<dependency>
????????????<groupId>org.projectlombokgroupId>
????????????<artifactId>lombokartifactId>
????????????<optional>trueoptional>
????????dependency>
????????<dependency>
????????????<groupId>com.alibabagroupId>
????????????<artifactId>fastjsonartifactId>
????????????<version>1.2.54version>
????????dependency>
????dependencies>
????<build>
????????<plugins>
????????????<plugin>
????????????????<groupId>org.springframework.bootgroupId>
????????????????<artifactId>spring-boot-maven-pluginartifactId>
????????????plugin>
????????plugins>
????build>
project>2. yml配置
server:
??port: 8080
??servlet:
????context-path: /
??tomcat:
????uri-encoding: UTF-8
spring:
??kafka:
????#本地虛擬機(jī)kafka偽集群
????bootstrap-servers: 192.168.1.126:9092,192.168.1.126:9093,192.168.1.126:9094
????producer:
??????key-serializer: org.apache.kafka.common.serialization.StringSerializer
??????value-serializer: org.apache.kafka.common.serialization.StringSerializer
??????batch-size: 65536
??????buffer-memory: 524288
??????#自定義的topic
??????myTopic1: testTopic1
??????myTopic2: testTopic2
????consumer:
??????group-id: default-group #默認(rèn)組id 后面會配置多個消費者組
??????key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
??????value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
??????auto-offset-reset: latest
??????enable-auto-commit: false #關(guān)閉自動提交 改由spring-kafka提交
??????auto-commit-interval: 100
??????max-poll-records: 20??????#批量消費 一次接收的最大數(shù)量3. 部分代碼
消息實體類
package com.example.demo.entity;
import?java.util.Date;
import?lombok.Data;
import?lombok.ToString;
@Data
@ToString
public?class?Message {
????private?Long id;
????private?String?msg;
????private?Date?sendTime;
}kafka配置類
package?com.example.demo.config;
import?lombok.Data;
import?org.springframework.beans.factory.annotation.Value;
import?org.springframework.context.annotation.Configuration;
/**
?* kafka配置類
?*/
@Data
@Configuration
public class KafkaConfiguration {
????/**
?????* kafaka集群列表
?????*/
????@Value("${spring.kafka.bootstrap-servers}")
????private?String?bootstrapServers;
?
????/**
?????* kafaka消費group列表
?????*/
????@Value("${spring.kafka.consumer.group-id}")
????private?String?defaultGroupId;
????
????/**
?????* 消費開始位置
?????*/
????@Value("${spring.kafka.consumer.auto-offset-reset}")
????private?String?autoOffsetReset;
?
????/**
?????* 是否自動提交
?????*/
????@Value("${spring.kafka.consumer.enable-auto-commit}")
????private?String?enableAutoCommit;
?
????/**
?????* #如果'enable.auto.commit'為true,則消費者偏移自動提交給Kafka的頻率(以毫秒為單位),默認(rèn)值為5000。
?????*/
????@Value("${spring.kafka.consumer.auto-commit-interval}")
????private?String?autoCommitInterval;
?
????/**
?????* 一次調(diào)用poll()操作時返回的最大記錄數(shù),默認(rèn)值為500
?????*/
????@Value("${spring.kafka.consumer.max-poll-records}")
????private?String?maxPollRecords;
????/**
?????* 自定義的topic1
?????*/
????@Value("${spring.kafka.producer.myTopic1}")
????private?String?myTopic1;
????/**
?????* 自定義的topic2
?????*/
????@Value("${spring.kafka.producer.myTopic2}")
????private?String?myTopic2;
?
}消費者監(jiān)聽類
package com.example.demo.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
?* 消費者1(監(jiān)聽topic1隊列)
?*/
@Component
public?class?ConsumerListener1?{
????@KafkaListener(topics = "${spring.kafka.producer.myTopic1}")
????public?void?listen(ConsumerRecord,String> record) {
????????System.out.println(record);
????????String value?= record.value();
????????System.out.println("消費者1接收到消息:"?+ value);
????}
}測試類
package com.example.demo.controller;
import?com.alibaba.fastjson.JSON;
import?com.example.demo.config.KafkaConfiguration;
import?com.example.demo.entity.Message;
import?com.example.demo.service.KafkaService;
import?com.example.demo.util.UUID;
import?lombok.extern.slf4j.Slf4j;
import?org.springframework.beans.factory.annotation.Autowired;
import?org.springframework.web.bind.annotation.GetMapping;
import?org.springframework.web.bind.annotation.PathVariable;
import?org.springframework.web.bind.annotation.RequestMapping;
import?org.springframework.web.bind.annotation.RestController;
import?java.util.Date;
@Slf4j
@RestController
@RequestMapping("/kafka")
public?class?KafkaController {
????@Autowired
????private?KafkaService kafkaService;
????@Autowired
????private?KafkaConfiguration kafkaConfiguration;
????/**
?????* 發(fā)送文本消息
?????* @param msg
?????* @return
?????*/
????@GetMapping("/send/{msg}")
????public?String?send(@PathVariable?String?msg) {
????????kafkaService.send(kafkaConfiguration.getMyTopic1(), msg);
????????return?"生產(chǎn)者發(fā)送消息給topic1:"+msg;
????}
????/**
?????* 發(fā)送JSON數(shù)據(jù)
?????* @return
?????*/
????@GetMapping("/send2")
????public?String?send2() {
????????Message message = new?Message();
????????message.setId(System.currentTimeMillis());
????????message.setMsg("生產(chǎn)者發(fā)送消息到topic1: "?+ UUID.getUUID32());
????????message.setSendTime(new?Date());
????????String?value = JSON.toJSONString(message);
????????log.info("生產(chǎn)者發(fā)送消息到topic1 message = {}", value);
????????kafkaService.send(kafkaConfiguration.getMyTopic1(),value);
????????return?value;
????}
????/**
?????* 發(fā)送JSON數(shù)據(jù)
?????* @return
?????*/
????@GetMapping("/send3")
????public?String?send3() {
????????Message message = new?Message();
????????message.setId(System.currentTimeMillis());
????????message.setMsg("生產(chǎn)者發(fā)送消息到topic2: "?+ UUID.getUUID32());
????????message.setSendTime(new?Date());
????????String?value = JSON.toJSONString(message);
????????log.info("生產(chǎn)者發(fā)送消息到topic2 message = {}", value);
????????kafkaService.send(kafkaConfiguration.getMyTopic2(),value);
????????return?value;
????}
}4. 實例運行結(jié)果



5. 寫在最后
本實例源代碼:https://gitee.com/jelly_oy/springboot-kafka-demo
本實例采用springboot2.3.3 + zookeeper3.6.1 + kafka2.6.0 進(jìn)行搭建
如果本項目對你有幫助,歡迎留言評論,歡迎git clone源代碼。
粉絲福利:108本java從入門到大神精選電子書領(lǐng)取
???
?長按上方鋒哥微信二維碼?2 秒 備注「1234」即可獲取資料以及 可以進(jìn)入java1234官方微信群
感謝點贊支持下哈?
評論
圖片
表情
