<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          springboot + kafka 入門實例 入門demo

          共 4031字,需瀏覽 9分鐘

           ·

          2020-08-25 19:43

          點擊上方藍(lán)色字體,選擇“標(biāo)星公眾號”

          優(yōu)質(zhì)文章,第一時間送達(dá)

          ? 作者?|???jelly_oy?

          來源 |? urlify.cn/NVJFva? ? ? ??

          66套java從入門到精通實戰(zhàn)課程分享?

          版本說明

          • springboot版本:2.3.3.RELEASE

          • kakfa服務(wù)端版本:kafka_2.12-2.6.0.tgz

          • zookeeper服務(wù)端版本:apache-zookeeper-3.6.1-bin.tar.gz

          實例搭建前提條件

          1,搭建好zookeeper服務(wù),本實例zookeeper使用單機(jī)偽集群模式,

          192.168.1.126:2181, 192.168.1.126:2182, 192.168.1.126:2183

          2,搭建好kafka服務(wù),本實例kafka使用單機(jī)偽集群模式,

          192.168.1.126:9092, 192.168.1.126:9093, 192.168.1.126:9094

          1. 導(dǎo)入相關(guān)依賴

          xml version="1.0"?encoding="UTF-8"?>
          <project?xmlns="http://maven.apache.org/POM/4.0.0"?xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          ?????????xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

          ????<modelVersion>4.0.0modelVersion>

          ????<parent>
          ????????<groupId>org.springframework.bootgroupId>
          ????????<artifactId>spring-boot-starter-parentartifactId>
          ????????<version>2.3.3.RELEASEversion>
          ????????<relativePath/>?
          ????parent>

          ????<groupId>com.examplegroupId>
          ????<artifactId>springboot-kafka-demoartifactId>
          ????<version>1.0-SNAPSHOTversion>
          ????<name>springboot-kafka-demoname>
          ????<description>springboot-kafka-demodescription>

          ????<properties>
          ????????<java.version>1.8java.version>
          ????properties>

          ????<dependencies>
          ????????<dependency>
          ????????????<groupId>org.springframework.bootgroupId>
          ????????????<artifactId>spring-boot-starter-webartifactId>
          ????????dependency>

          ????????<dependency>
          ????????????<groupId>org.springframework.bootgroupId>
          ????????????<artifactId>spring-boot-starter-testartifactId>
          ????????????<scope>testscope>
          ????????dependency>

          ????????<dependency>
          ????????????<groupId>org.springframework.kafkagroupId>
          ????????????<artifactId>spring-kafkaartifactId>
          ????????dependency>

          ????????<dependency>
          ????????????<groupId>org.projectlombokgroupId>
          ????????????<artifactId>lombokartifactId>
          ????????????<optional>trueoptional>
          ????????dependency>

          ????????<dependency>
          ????????????<groupId>com.alibabagroupId>
          ????????????<artifactId>fastjsonartifactId>
          ????????????<version>1.2.54version>
          ????????dependency>
          ????dependencies>

          ????<build>
          ????????<plugins>
          ????????????<plugin>
          ????????????????<groupId>org.springframework.bootgroupId>
          ????????????????<artifactId>spring-boot-maven-pluginartifactId>
          ????????????plugin>
          ????????plugins>
          ????build>

          project>

          2. yml配置

          server:
          ??port: 8080
          ??servlet:
          ????context-path: /
          ??tomcat:
          ????uri-encoding: UTF-8

          spring:
          ??kafka:
          ????#本地虛擬機(jī)kafka偽集群
          ????bootstrap-servers: 192.168.1.126:9092,192.168.1.126:9093,192.168.1.126:9094
          ????producer:
          ??????key-serializer: org.apache.kafka.common.serialization.StringSerializer
          ??????value-serializer: org.apache.kafka.common.serialization.StringSerializer
          ??????batch-size: 65536
          ??????buffer-memory: 524288
          ??????#自定義的topic
          ??????myTopic1: testTopic1
          ??????myTopic2: testTopic2
          ????consumer:
          ??????group-id: default-group #默認(rèn)組id 后面會配置多個消費者組
          ??????key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          ??????value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          ??????auto-offset-reset: latest
          ??????enable-auto-commit: false #關(guān)閉自動提交 改由spring-kafka提交
          ??????auto-commit-interval: 100
          ??????max-poll-records: 20??????#批量消費 一次接收的最大數(shù)量

          3. 部分代碼

          消息實體類

          package com.example.demo.entity;

          import?java.util.Date;
          import?lombok.Data;
          import?lombok.ToString;

          @Data
          @ToString
          public?class?Message {
          ????private?Long id;
          ????private?String?msg;
          ????private?Date?sendTime;

          }

          kafka配置類

          package?com.example.demo.config;

          import?lombok.Data;
          import?org.springframework.beans.factory.annotation.Value;
          import?org.springframework.context.annotation.Configuration;

          /**
          ?* kafka配置類
          ?*/

          @Data
          @Configuration
          public class KafkaConfiguration {
          ????/**
          ?????* kafaka集群列表
          ?????*/

          ????@Value("${spring.kafka.bootstrap-servers}")
          ????private?String?bootstrapServers;
          ?
          ????/**
          ?????* kafaka消費group列表
          ?????*/

          ????@Value("${spring.kafka.consumer.group-id}")
          ????private?String?defaultGroupId;
          ????
          ????/**
          ?????* 消費開始位置
          ?????*/

          ????@Value("${spring.kafka.consumer.auto-offset-reset}")
          ????private?String?autoOffsetReset;
          ?
          ????/**
          ?????* 是否自動提交
          ?????*/

          ????@Value("${spring.kafka.consumer.enable-auto-commit}")
          ????private?String?enableAutoCommit;
          ?
          ????/**
          ?????* #如果'enable.auto.commit'為true,則消費者偏移自動提交給Kafka的頻率(以毫秒為單位),默認(rèn)值為5000。
          ?????*/

          ????@Value("${spring.kafka.consumer.auto-commit-interval}")
          ????private?String?autoCommitInterval;
          ?
          ????/**
          ?????* 一次調(diào)用poll()操作時返回的最大記錄數(shù),默認(rèn)值為500
          ?????*/

          ????@Value("${spring.kafka.consumer.max-poll-records}")
          ????private?String?maxPollRecords;

          ????/**
          ?????* 自定義的topic1
          ?????*/

          ????@Value("${spring.kafka.producer.myTopic1}")
          ????private?String?myTopic1;

          ????/**
          ?????* 自定義的topic2
          ?????*/

          ????@Value("${spring.kafka.producer.myTopic2}")
          ????private?String?myTopic2;
          ?
          }

          消費者監(jiān)聽類

          package com.example.demo.consumer;

          import org.apache.kafka.clients.consumer.ConsumerRecord;
          import org.springframework.kafka.annotation.KafkaListener;
          import org.springframework.stereotype.Component;

          /**
          ?* 消費者1(監(jiān)聽topic1隊列)
          ?*/

          @Component
          public?class?ConsumerListener1?{

          ????@KafkaListener(topics = "${spring.kafka.producer.myTopic1}")
          ????public?void?listen(ConsumerRecord record) {
          ????????System.out.println(record);
          ????????String value?= record.value();
          ????????System.out.println("消費者1接收到消息:"?+ value);
          ????}
          }

          測試類

          package com.example.demo.controller;

          import?com.alibaba.fastjson.JSON;
          import?com.example.demo.config.KafkaConfiguration;
          import?com.example.demo.entity.Message;
          import?com.example.demo.service.KafkaService;
          import?com.example.demo.util.UUID;
          import?lombok.extern.slf4j.Slf4j;
          import?org.springframework.beans.factory.annotation.Autowired;
          import?org.springframework.web.bind.annotation.GetMapping;
          import?org.springframework.web.bind.annotation.PathVariable;
          import?org.springframework.web.bind.annotation.RequestMapping;
          import?org.springframework.web.bind.annotation.RestController;

          import?java.util.Date;

          @Slf4j
          @RestController
          @RequestMapping("/kafka")
          public?class?KafkaController {
          ????@Autowired
          ????private?KafkaService kafkaService;

          ????@Autowired
          ????private?KafkaConfiguration kafkaConfiguration;

          ????/**
          ?????* 發(fā)送文本消息
          ?????* @param msg
          ?????* @return
          ?????*/

          ????@GetMapping("/send/{msg}")
          ????public?String?send(@PathVariable?String?msg) {
          ????????kafkaService.send(kafkaConfiguration.getMyTopic1(), msg);
          ????????return?"生產(chǎn)者發(fā)送消息給topic1:"+msg;
          ????}

          ????/**
          ?????* 發(fā)送JSON數(shù)據(jù)
          ?????* @return
          ?????*/

          ????@GetMapping("/send2")
          ????public?String?send2() {
          ????????Message message = new?Message();
          ????????message.setId(System.currentTimeMillis());
          ????????message.setMsg("生產(chǎn)者發(fā)送消息到topic1: "?+ UUID.getUUID32());
          ????????message.setSendTime(new?Date());

          ????????String?value = JSON.toJSONString(message);
          ????????log.info("生產(chǎn)者發(fā)送消息到topic1 message = {}", value);

          ????????kafkaService.send(kafkaConfiguration.getMyTopic1(),value);
          ????????return?value;
          ????}

          ????/**
          ?????* 發(fā)送JSON數(shù)據(jù)
          ?????* @return
          ?????*/

          ????@GetMapping("/send3")
          ????public?String?send3() {
          ????????Message message = new?Message();
          ????????message.setId(System.currentTimeMillis());
          ????????message.setMsg("生產(chǎn)者發(fā)送消息到topic2: "?+ UUID.getUUID32());
          ????????message.setSendTime(new?Date());

          ????????String?value = JSON.toJSONString(message);
          ????????log.info("生產(chǎn)者發(fā)送消息到topic2 message = {}", value);

          ????????kafkaService.send(kafkaConfiguration.getMyTopic2(),value);
          ????????return?value;
          ????}

          }

          4. 實例運行結(jié)果



          5. 寫在最后

          本實例源代碼:https://gitee.com/jelly_oy/springboot-kafka-demo

          本實例采用springboot2.3.3 + zookeeper3.6.1 + kafka2.6.0 進(jìn)行搭建

          如果本項目對你有幫助,歡迎留言評論,歡迎git clone源代碼。



          粉絲福利:108本java從入門到大神精選電子書領(lǐng)取

          ???

          ?長按上方鋒哥微信二維碼?2 秒
          備注「1234」即可獲取資料以及
          可以進(jìn)入java1234官方微信群



          感謝點贊支持下哈?

          瀏覽 62
          點贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  精品九九九 | 日本色情视频在线播放 | 在线视频观看三区 | 操逼网站黄色 | 黑逼逼无码区 |