<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          SpringBoot+Nacos+Kafka簡單實現微服務流編排

          共 10581字,需瀏覽 22分鐘

           ·

          2022-06-28 23:30

          點擊關注上方“Stephen”,

          設為“置頂或星標”,第一時間送達干貨

          文章來源:https://c1n.cn/RWt2e

          目錄

          • 前言
          • 準備工作
          • 總結

          前言


          最近一直在做微服務開發(fā),涉及了一些數據處理模塊的開發(fā),每個處理業(yè)務都會開發(fā)獨立的微服務,便于后面拓展和流編排。

          學習了 SpringCloud Data Flow 等框架,感覺這個框架對于我們來說太重了,維護起來也比較麻煩,于是根據流編排的思想,基于我們目前的技術棧實現簡單的流編排功能。

          簡單的說,我們希望自己的流編排就是微服務可插拔,微服務數據入口及輸出可不停機修改。

          準備工作


          | Nacos 安裝及使用入門

          自己學習的話推薦使用 docker 安裝,命令如下:


          拉取鏡像:

          docker pull nacos/nacos-server


          創(chuàng)建服務:

          docker run --env MODE=standalone --name nacos -d -p 8848:8848 nacos/nacos-server

          然后在瀏覽器輸入 ip:8848/nacos,賬號 nacos;密碼 nacos。
          docker 能夠幫助我們快速安裝服務,減少再環(huán)境準備花的時間。

          | 準備三個 SpringBoot 服務,引入 Nacos 及 Kafka

          <parent>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-parent</artifactId>
             <version>2.1.0.RELEASE</version>
          </parent>

          <dependency>
             <groupId>org.springframework.kafka</groupId>
             <artifactId>spring-kafka</artifactId>
          </dependency>

          <dependency>
             <groupId>com.alibaba.boot</groupId>
             <artifactId>nacos-config-spring-boot-starter</artifactId>
             <version>0.2.1</version>
          </dependency>

          配置文件:
          spring:
            kafka:
              bootstrap-servers: kafka-server:9092
              producer:
                acks: all
              consumer:
                group-id: node1-group #三個服務分別為node1 node2 node3
                enable-auto-commitfalse
          # 部署的nacos服務
          nacos:
            config:
              server-addr: nacos-server:8848


          建議配置本機 host 就可以填寫 xxx-server 不用填寫服務 ip。

          | 業(yè)務解讀

          我們現在需要對三個服務進行編排,保障每個服務可以插拔,也可以調整服務的位置。

          示意圖如上:

          • node1 服務監(jiān)聽前置服務發(fā)送的數據流,輸入的 topic 為前置數據服務輸出 topic
          • node2 監(jiān)聽 node1 處理后的數據,所以 node2 監(jiān)聽的 topic 為 node1 輸出的 topic,node3 同理,最終 node3 處理完成后將數據發(fā)送到數據流終點
          • 我們現在要調整流程移除 node2-server,我們只需要把 node1-sink 改變成 node2-sink 即可,這樣我們這幾個服務就可以靈活的嵌入的不同項目的數據流處理業(yè)務中,做到即插即用(當然,數據格式這些業(yè)務層面的都是需要約定好的)
          • 動態(tài)可調還可以保證服務某一節(jié)點出現問題時候,即時改變數據流向,比如發(fā)送到數暫存服務,避免 Kafka 中積累太多數據,吞吐不平衡


          | Nacos 配置

          ①創(chuàng)建配置


          通常流編排里面每個服務都有一個輸入及輸出,分別為 input 及 sink,所以每個服務我們需要配置兩個 topic,分別是 input-topic output-topic,我們就在 nacos 里面添加輸入輸出配置。


          nacos 配置項需要配置 groupId,dataId,通常我們用服務名稱作為 groupId,配置項的名稱作為 dataId。


          如 node1-server 服務有一個 input 配置項,配置如下:

          完成其中一個服務的配置,其它服務參考下圖配置即可:

          ②讀取配置


          代碼如下:
          @Configuration
          @NacosPropertySource(dataId = "input", groupId = "node1-server", autoRefreshed = true)
          // autoRefreshed=true指的是nacos中配置發(fā)生改變后會刷新,false代表只會使用服務啟動時候讀取到的值
          @NacosPropertySource(dataId = "sink", groupId = "node1-server", autoRefreshed = true)
          public class NacosConfig {

              @NacosValue(value = "${input:}", autoRefreshed = true)
              private String input;

              @NacosValue(value = "${sink:}", autoRefreshed = true)
              private String sink;

              public String getInput() {
                  return input;
              }

              public String getSink() {
                  return sink;
              }
          }

          ③監(jiān)聽配置改變


          服務的輸入需要在服務啟動時候創(chuàng)建消費者,在 topic 發(fā)生改變時候重新創(chuàng)建消費者,移除舊 topic 的消費者,輸出是業(yè)務驅動的,無需監(jiān)聽改變,在每次發(fā)送時候讀取到的都是最新配置的 topic。


          因為在上面的配置類中 autoRefreshed = true,這個只會刷新 nacosConfig 中的配置值,服務需要知道配置改變去驅動消費的創(chuàng)建業(yè)務,需要創(chuàng)建 nacos 配置監(jiān)聽。

          /**
           * 監(jiān)聽Nacos配置改變,創(chuàng)建消費者,更新消費
           */

          @Component
          public class ConsumerManager {

              @Value("${spring.kafka.bootstrap-servers}")
              private String servers;

              @Value("${spring.kafka.consumer.enable-auto-commit}")
              private boolean enableAutoCommit;

              @Value("${spring.kafka.consumer.group-id}")
              private boolean groupId;

              @Autowired
              private NacosConfig nacosConfig;

              @Autowired
              private KafkaTemplate kafkaTemplate;

              // 用于存放當前消費者使用的topic
              private String topic;

              // 用于執(zhí)行消費者線程
              private ExecutorService executorService;

              /**
               * 監(jiān)聽input
               */

              @NacosConfigListener(dataId = "node1-server", groupId = "input")
              public void inputListener(String input) {
                  // 這個監(jiān)聽觸發(fā)的時候 實際NacosConfig中input的值已經是最新的值了 我們只是需要這個監(jiān)聽觸發(fā)我們更新消費者的業(yè)務
                  String inputTopic = nacosConfig.getInput();
                  // 我使用nacosConfig中讀取的原因是因為監(jiān)聽到內容是input=xxxx而不是xxxx,如果使用需要自己截取一下,nacosConfig中的內容框架會處理好,大家看一下第一張圖的配置內容就明白了
                  // 先檢查當前局部變量topic是否有值,有值代表是更新消費者,沒有值只需要創(chuàng)建即可
                  if(topic != null) {
                      // 停止舊的消費者線程
                      executorService.shutdownNow();
                      executorService == null;
                  }
                  // 根據為新的topic創(chuàng)建消費者
                  topic = inputTopic;
                  ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(topic + "-pool-%d").build();
                  executorService = new ThreadPoolExecutor(110L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(2), threadFactory);
                  // 執(zhí)行消費業(yè)務
                  executorService.execute(() -> consumer(topic));
              }

              /**
               * 創(chuàng)建消費者
               */

              public void consumer(String topic) {
                  Properties properties = new Properties();
                  properties.put("bootstrap.servers", servers);
                  properties.put("enable.auto.commit", enableAutoCommit);
                  properties.put("key.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
                  properties.put("value.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
                  properties.put("group.id", groupId);
                  KafkaConsumer<StringString> consumer = new KafkaConsumer<>(properties);
                  consumer.subscribe(Arrays.asList(topic));
                  try {
                      while (!Thread.currentThread().isInterrupted()) {
                          Duration duration = Duration.ofSeconds(1L);
                          ConsumerRecords<StringString> records = consumer.poll(duration);
                          for (ConsumerRecord<StringString> record : records) {
                              String message = record.value();
                              // 執(zhí)行數據處理業(yè)務 省略業(yè)務實現
                              String handleMessage =  handle(message);
                              // 處理完成后發(fā)送到下一個節(jié)點
                              kafkaTemplate.send(nacosConfig.getSink(), handleMessage);
                          }
                      }
                      consumer.commitAsync();
                  }
                  } catch (Exception e) {
                      LOGGER.error(e.getMessage(), e);
                  } finally {
                      try {
                          consumer.commitSync();
                      } finally {
                          consumer.close();
                      }
                  }
              }
          }


          總結


          流編排的思路整體來說就是數據流方向可調,我們以此為需求,根據一些主流框架提供的 api 實現自己的動態(tài)調整方案,可以幫助自己更好的理解流編碼思想及原理。


          在實際業(yè)務中,還有許多業(yè)務問題需要去突破,我們這樣處理更多是因為服務可插拔,便于流處理微服務在項目靈活搭配。


          因為我現在工作是在傳統(tǒng)公司,由于一些原因很難去推動新框架的使用,經常會用一些現有技術棧組合搞一些 sao 操作,供大家參考,希望大家多多指教。

          END


          關注 Stephen,一起學習,一起成長。


          “在看”支持下吧


          點 閱讀原文 可優(yōu)惠充值話費,流量,視頻會員等。

          瀏覽 70
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  亚洲入口 | 在线观看免费视频黄色 | www.青春草 | 人妻蜜桃 | 天天操天天摸天天爽 |