<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          springboot 之集成kafka

          共 10058字,需瀏覽 21分鐘

           ·

          2021-03-08 19:20

          點(diǎn)擊上方藍(lán)色字體,選擇“標(biāo)星公眾號(hào)”

          優(yōu)質(zhì)文章,第一時(shí)間送達(dá)

          76套java從入門到精通實(shí)戰(zhàn)課程分享

          本章只介紹springboot微服務(wù)集成kafka,跟rabbitmq用法相同,作為一個(gè)消息中間件收發(fā)消息使用,本章僅介紹集成后的基礎(chǔ)用法,研究不深,請各位諒解。


          環(huán)境準(zhǔn)備

          IntelliJ IDEA

          前一章中搭建的微服務(wù)框架

          前一章之后,對目錄結(jié)構(gòu)進(jìn)行了優(yōu)化,將config相關(guān)類都放到demo.config包下

          開始集成

          pom.xml中增加依賴包

          <dependency>
           <groupId>org.springframework.kafka</groupId>
           <artifactId>spring-kafka</artifactId>
          </dependency


          加入依賴包后最好先執(zhí)行mvn clean install編一把,把所需依賴包下下來,后續(xù)寫代碼的時(shí)候直接就可以引了。


          application.yml中引入kafka相關(guān)配置

          spring:
            kafka:
              bootstrap-servers: 172.101.203.33:9092
              producer:
                # 發(fā)生錯(cuò)誤后,消息重發(fā)的次數(shù)。
                retries: 0
                #當(dāng)有多個(gè)消息需要被發(fā)送到同一個(gè)分區(qū)時(shí),生產(chǎn)者會(huì)把它們放在同一個(gè)批次里。該參數(shù)指定了一個(gè)批次可以使用的內(nèi)存大小,按照字節(jié)數(shù)計(jì)算。
                batch-size: 16384
                # 設(shè)置生產(chǎn)者內(nèi)存緩沖區(qū)的大小。
                buffer-memory: 33554432
                # 鍵的序列化方式
                key-serializer: org.apache.kafka.common.serialization.StringSerializer
                # 值的序列化方式
                value-serializer: org.apache.kafka.common.serialization.StringSerializer
                # acks=0 : 生產(chǎn)者在成功寫入消息之前不會(huì)等待任何來自服務(wù)器的響應(yīng)。
                # acks=1 : 只要集群的首領(lǐng)節(jié)點(diǎn)收到消息,生產(chǎn)者就會(huì)收到一個(gè)來自服務(wù)器成功響應(yīng)。
                # acks=all :只有當(dāng)所有參與復(fù)制的節(jié)點(diǎn)全部收到消息時(shí),生產(chǎn)者才會(huì)收到一個(gè)來自服務(wù)器的成功響應(yīng)。
                acks: 1
              consumer:
                # 自動(dòng)提交的時(shí)間間隔 在spring boot 2.X 版本中這里采用的是值的類型為Duration 需要符合特定的格式,如1S,1M,2H,5D
                auto-commit-interval: 1S
                # 該屬性指定了消費(fèi)者在讀取一個(gè)沒有偏移量的分區(qū)或者偏移量無效的情況下該作何處理:
                # latest(默認(rèn)值)在偏移量無效的情況下,消費(fèi)者將從最新的記錄開始讀取數(shù)據(jù)(在消費(fèi)者啟動(dòng)之后生成的記錄)
                # earliest :在偏移量無效的情況下,消費(fèi)者將從起始位置讀取分區(qū)的記錄
                auto-offset-reset: earliest
                # 是否自動(dòng)提交偏移量,默認(rèn)值是true,為了避免出現(xiàn)重復(fù)數(shù)據(jù)和數(shù)據(jù)丟失,可以把它設(shè)置為false,然后手動(dòng)提交偏移量
                enable-auto-commit: false
                # 鍵的反序列化方式
                key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
                # 值的反序列化方式
                value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
              listener:
                # 在偵聽器容器中運(yùn)行的線程數(shù)。
                concurrency: 5
                #listner負(fù)責(zé)ack,每調(diào)用一次,就立即commit
                ack-mode: manual_immediate
                missing-topics-fatal: false


          該配置位于spring下,其中可以配置kafka server的IP:port,producer、consumer、listener的一些配置,可以參考中文注釋了解其作用


          開始寫代碼了:demo下新增kafka包,并在其下面新增producer和consumer


          package com.example.demo.kafka;

          import lombok.extern.slf4j.Slf4j;
          import org.apache.kafka.clients.consumer.ConsumerRecord;
          import org.springframework.kafka.annotation.KafkaListener;
          import org.springframework.kafka.support.Acknowledgment;
          import org.springframework.kafka.support.KafkaHeaders;
          import org.springframework.messaging.handler.annotation.Header;
          import org.springframework.stereotype.Component;

          import java.util.Optional;

          /**
           * 類功能描述:<br>
           * <ul>
           * <li>類功能描述1<br>
           * <li>類功能描述2<br>
           * <li>類功能描述3<br>
           * </ul>
           * 修改記錄:<br>
           * <ul>
           * <li>修改記錄描述1<br>
           * <li>修改記錄描述2<br>
           * <li>修改記錄描述3<br>
           * </ul>
           *
           * @author xuefl
           * @version 5.0 since 2020-01-13
           */
          @Component
          @Slf4j
          public class KafkaConsumer {

              @KafkaListener(topics = KafkaProducer.TOPIC_TEST, groupId = KafkaProducer.TOPIC_GROUP1)
              public void topic_test(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {

                  Optional message = Optional.ofNullable(record.value());
                  if (message.isPresent()) {
                      Object msg = message.get();
                      log.info("topic_test 消費(fèi)了:Topic:" + topic + ",Message:" + msg);
                      ack.acknowledge();
                  }
              }

              @KafkaListener(topics = KafkaProducer.TOPIC_TEST, groupId = KafkaProducer.TOPIC_GROUP2)
              public void topic_test1(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {

                  Optional message = Optional.ofNullable(record.value());
                  if (message.isPresent()) {
                      Object msg = message.get();
                      log.info("topic_test1 消費(fèi)了:Topic:" + topic + ",Message:" + msg);
                      ack.acknowledge();
                  }
              }
          }



          package com.example.demo.kafka;

          import com.alibaba.fastjson.JSONObject;
          import lombok.extern.slf4j.Slf4j;
          import org.springframework.beans.factory.annotation.Autowired;
          import org.springframework.kafka.core.KafkaTemplate;
          import org.springframework.kafka.support.SendResult;
          import org.springframework.stereotype.Component;
          import org.springframework.util.concurrent.ListenableFuture;
          import org.springframework.util.concurrent.ListenableFutureCallback;

          /**
           * 類功能描述:<br>
           * <ul>
           * <li>類功能描述1<br>
           * <li>類功能描述2<br>
           * <li>類功能描述3<br>
           * </ul>
           * 修改記錄:<br>
           * <ul>
           * <li>修改記錄描述1<br>
           * <li>修改記錄描述2<br>
           * <li>修改記錄描述3<br>
           * </ul>
           *
           * @author xuefl
           * @version 5.0 since 2020-01-13
           */
          @Component
          @Slf4j
          public class KafkaProducer {

              @Autowired
              private KafkaTemplate<String, Object> kafkaTemplate;

              //自定義topic
              public static final String TOPIC_TEST = "topic.test";

              //
              public static final String TOPIC_GROUP1 = "topic.group1";

              //
              public static final String TOPIC_GROUP2 = "topic.group2";

              public void send(Object obj) {
                  String obj2String = JSONObject.toJSONString(obj);
                  log.info("準(zhǔn)備發(fā)送消息為:{}", obj2String);
                  //發(fā)送消息
                  ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TOPIC_TEST, obj);
                  future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
                      @Override
                      public void onFailure(Throwable throwable) {
                          //發(fā)送失敗的處理
                          log.info(TOPIC_TEST + " - 生產(chǎn)者 發(fā)送消息失敗:" + throwable.getMessage());
                      }

                      @Override
                      public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
                          //成功的處理
                          log.info(TOPIC_TEST + " - 生產(chǎn)者 發(fā)送消息成功:" + stringObjectSendResult.toString());
                      }
                  });


              }
          }


          增加測試controller類,在controller下新建KafkaController類



          測試結(jié)果


          ————————————————

          版權(quán)聲明:本文為CSDN博主「念念不忘,笑對人生」的原創(chuàng)文章,遵循CC 4.0 BY-SA版權(quán)協(xié)議,轉(zhuǎn)載請附上原文出處鏈接及本聲明。

          原文鏈接:

          https://blog.csdn.net/qq_42715450/article/details/114293390





          鋒哥最新SpringCloud分布式電商秒殺課程發(fā)布

          ??????

          ??長按上方微信二維碼 2 秒





          感謝點(diǎn)贊支持下哈 

          瀏覽 83
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  欧美成人视频在线观看 | 欧美极品一区 | 大香蕉伊人宗和网 | 免费成人毛片网站 | 男女高清无码 |