<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          springboot整合kafka

          共 4044字,需瀏覽 9分鐘

           ·

          2020-11-16 23:05

          點(diǎn)擊上方藍(lán)色字體,選擇“標(biāo)星公眾號(hào)”

          優(yōu)質(zhì)文章,第一時(shí)間送達(dá)

          ? 作者?|??Jone_chen

          來源 |? urlify.cn/yEF7v2

          66套java從入門到精通實(shí)戰(zhàn)課程分享

          相關(guān)環(huán)境搭建(centos7下搭建單機(jī)kafka)

          1、官網(wǎng)下載[kafka][http://kafka.apache.org/]

          tar?-xzf?kafka_2.12-2.6.0.tgz
          cd?kafka_2.13-2.6.0

          2、修改配置文件(conf下面service.properties中advertised.listeners)

          #?允許外部端口連接????????????????????????????????????????????
          listeners=PLAINTEXT://0.0.0.0:9092??
          #?外部代理地址????????????????????????????????????????????????
          advertised.listeners=PLAINTEXT://192.168.0.175:9092

          3、通過守護(hù)進(jìn)程啟動(dòng)命令

          bin/zookeeper-server-start.sh?-daemon?config/zookeeper.properties
          bin/kafka-server-start.sh?-daemon?config/server.properties

          環(huán)境配置好之后,下面進(jìn)入測(cè)試。
          4、創(chuàng)建一個(gè)主題

          bin/kafka-topics.sh?--create?--topic?test?--bootstrap-server?localhost:9092

          5、將事件/消息寫入主題(創(chuàng)建生產(chǎn)者)

          bin/kafka-console-producer.sh?--topic?test?--bootstrap-server?localhost:9092
          >this?is?test!

          按crtl+c可退出當(dāng)前輸入模式
          6、消費(fèi)

          bin/kafka-console-consumer.sh?--topic?test?--from-beginning?--bootstrap-server?localhost:9092

          springboot集成kafka

          1、新建工程,添加pom



          ????org.springframework.kafka
          ????spring-kafka

          2、application.yml添加kafka相關(guān)配置:

          spring:
          ??application:
          ????name:?cloud-kafka
          ??kafka:
          ????bootstrap-servers:?192.168.0.175:9092
          ????producer:?#?producer?生產(chǎn)者
          ??????retries:?0?#?重試次數(shù)
          ??????acks:?1?#?應(yīng)答級(jí)別:多少個(gè)分區(qū)副本備份完成時(shí)向生產(chǎn)者發(fā)送ack確認(rèn)(可選0、1、all/-1)
          ??????batch-size:?16384?#?批量大小
          ??????buffer-memory:?33554432?#?生產(chǎn)端緩沖區(qū)大小
          ??????key-serializer:?org.apache.kafka.common.serialization.StringSerializer
          ??????value-serializer:?org.apache.kafka.common.serialization.StringSerializer
          ????consumer:?#?consumer消費(fèi)者
          ??????group-id:?test-consumer-group?#?默認(rèn)的消費(fèi)組ID
          ??????enable-auto-commit:?true?#?是否自動(dòng)提交offset
          ??????auto-commit-interval:?100?#?提交offset延時(shí)(接收到消息后多久提交offset)
          ??????auto-offset-reset:?latest
          ??????#?當(dāng)kafka中沒有初始o(jì)ffset或offset超出范圍時(shí)將自動(dòng)重置offset;
          ??????#?earliest:重置為分區(qū)中最小的offset;
          ??????#?latest:重置為分區(qū)中最新的offset(消費(fèi)分區(qū)中新產(chǎn)生的數(shù)據(jù));
          ??????#?none:只要有一個(gè)分區(qū)不存在已提交的offset,就拋出異常;
          ??????key-deserializer:?org.apache.kafka.common.serialization.StringDeserializer
          ??????value-deserializer:?org.apache.kafka.common.serialization.StringDeserializer

          3、生產(chǎn)者(發(fā)送者)

          @RestController
          public?class?KafkaProducer?{
          ????@Resource
          ????private?KafkaTemplate?kafkaTemplate;

          ????@GetMapping("/kafka/normal/{msg}")
          ????public?void?sendMessage(@PathVariable("msg")?String?msg)?{
          ????????Message?message?=?new?Message();
          ????????message.setId(UUID.randomUUID().toString());
          ????????message.setSendTime(new?Date());
          ????????message.setMessage(msg);
          ????????kafkaTemplate.send("test",?JSONUtil.toJsonStr(message));
          ????}
          }

          4、消費(fèi)者(接受者)

          @Component
          public?class?KafkaConsumer?{
          ????private?final?Logger?logger?=?LoggerFactory.getLogger(KafkaConsumer.class);

          ????@KafkaListener(topics?=?{"test"})
          ????public?void?onMessage(ConsumerRecord?consumerRecord)?{
          ????????Optional?optional?=?Optional.ofNullable(consumerRecord.value());
          ????????if?(optional.isPresent())?{
          ????????????Object?msg?=?optional.get();
          ????????????logger.info("record:{}",?consumerRecord);
          ????????????logger.info("message:{}",?msg);
          ????????}
          ????}
          }

          5、實(shí)體類

          public?class?Message?{
          ????private?String?id;
          ????private?String?message;
          ????private?Date?sendTime;
          ????//?getter?setter?略??
          }

          上面示例創(chuàng)建了一個(gè)生產(chǎn)者,發(fā)送消息到test,消費(fèi)者監(jiān)聽test消費(fèi)消息。監(jiān)聽器用@KafkaListener注解,topics表示監(jiān)聽的topic,支持同時(shí)監(jiān)聽多個(gè),用英文逗號(hào)分隔。啟動(dòng)項(xiàng)目,postman調(diào)接口觸發(fā)生產(chǎn)者發(fā)送消息。
          同時(shí)查看日志信息:

          2020-11-09?17:28:08.530??INFO?15076?---?[ntainer#0-0-C-1]?com.example.service.KafkaConsumer????????:?record:ConsumerRecord(topic?=?test,?partition?=?0,?leaderEpoch?=?0,?offset?=?3,?CreateTime?=?1604914088509,?serialized?key?size?=?-1,?serialized?value?size?=?87,?headers?=?RecordHeaders(headers?=?[],?isReadOnly?=?false),?key?=?null,?value?=?{"message":"test","sendTime":1604914088452,"id":"f4dcc246-8721-4ef8-bad4-555269328901"})
          2020-11-09?17:28:08.530??INFO?15076?---?[ntainer#0-0-C-1]?com.example.service.KafkaConsumer????????:?message:{"message":"test","sendTime":1604914088452,"id":"f4dcc246-8721-4ef8-bad4-555269328901"}

          可以看到消費(fèi)成功。

          更詳細(xì)內(nèi)容,請(qǐng)參考:https://blog.csdn.net/yuanlong122716/article/details/105160545/







          粉絲福利:實(shí)戰(zhàn)springboot+CAS單點(diǎn)登錄系統(tǒng)視頻教程免費(fèi)領(lǐng)取

          ???

          ?長按上方微信二維碼?2 秒
          即可獲取資料



          感謝點(diǎn)贊支持下哈?

          瀏覽 41
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  av免费在线观看网站 | 狠狠撸 | 人人爽人人操 | 国产免费无码一区二区三区四区 | 蜜桃av秘 无码一区二区三欧 |