<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          SpringBoot+Kafka+ELK 完成海量日志收集(超詳細)

          共 27692字,需瀏覽 56分鐘

           ·

          2021-08-23 15:43


          來源:jiandansuifeng.blog.csdn.net/

          article/details/107361190

          整體流程大概如下:

          服務器準備

          在這先列出各服務器節(jié)點,方便同學們在下文中對照節(jié)點查看相應內容

          SpringBoot項目準備

          引入log4j2替換SpringBoot默認log,demo項目結構如下:

          pom

          <dependencies>
              <dependency>
                  <groupId>org.springframework.boot</groupId>
                  <artifactId>spring-boot-starter-web</artifactId>
                  <!--  排除spring-boot-starter-logging -->
                  <exclusions>
                      <exclusion>
                          <groupId>org.springframework.boot</groupId>
                          <artifactId>spring-boot-starter-logging</artifactId>
                      </exclusion>
                  </exclusions>
              </dependency> 
           <!-- log4j2 -->
           <dependency>
               <groupId>org.springframework.boot</groupId>
               <artifactId>spring-boot-starter-log4j2</artifactId>
           </dependency> 
             <dependency>
               <groupId>com.lmax</groupId>
               <artifactId>disruptor</artifactId>
               <version>3.3.4</version>
             </dependency> 
          </dependencies> 

          log4j2.xml

          <?xml version="1.0" encoding="UTF-8"?>
          <Configuration status="INFO" schema="Log4J-V2.0.xsd" monitorInterval="600" >
              <Properties>
                  <Property name="LOG_HOME">logs</Property>
                  <property name="FILE_NAME">collector</property>
                  <property name="patternLayout">[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}] [%level{length=5}] [%thread-%tid] [%logger] [%X{hostName}] [%X{ip}] [%X{applicationName}] [%F,%L,%C,%M] [%m] ## '%ex'%n</property>
              </Properties>
              <Appenders>
                  <Console name="CONSOLE" target="SYSTEM_OUT">
                      <PatternLayout pattern="${patternLayout}"/>
                  </Console>  
                  <RollingRandomAccessFile name="appAppender" fileName="${LOG_HOME}/app-${FILE_NAME}.log" filePattern="${LOG_HOME}/app-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log" >
                    <PatternLayout pattern="${patternLayout}" />
                    <Policies>
                        <TimeBasedTriggeringPolicy interval="1"/>
                        <SizeBasedTriggeringPolicy size="500MB"/>
                    </Policies>
                    <DefaultRolloverStrategy max="20"/>         
                  </RollingRandomAccessFile>
                  <RollingRandomAccessFile name="errorAppender" fileName="${LOG_HOME}/error-${FILE_NAME}.log" filePattern="${LOG_HOME}/error-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log" >
                    <PatternLayout pattern="${patternLayout}" />
                    <Filters>
                        <ThresholdFilter level="warn" onMatch="ACCEPT" onMismatch="DENY"/>
                    </Filters>              
                    <Policies>
                        <TimeBasedTriggeringPolicy interval="1"/>
                        <SizeBasedTriggeringPolicy size="500MB"/>
                    </Policies>
                    <DefaultRolloverStrategy max="20"/>         
                  </RollingRandomAccessFile>            
              </Appenders>
              <Loggers>
                  <!-- 業(yè)務相關 異步logger -->
                  <AsyncLogger name="com.bfxy.*" level="info" includeLocation="true">
                    <AppenderRef ref="appAppender"/>
                  </AsyncLogger>
                  <AsyncLogger name="com.bfxy.*" level="info" includeLocation="true">
                    <AppenderRef ref="errorAppender"/>
                  </AsyncLogger>       
                  <Root level="info">
                      <Appender-Ref ref="CONSOLE"/>
                      <Appender-Ref ref="appAppender"/>
                      <AppenderRef ref="errorAppender"/>
                  </Root>         
              </Loggers>
          </Configuration>

          IndexController

          測試Controller,用以打印日志進行調試

          @Slf4j
          @RestController
          public class IndexController {

           @RequestMapping(value = "/index")
           public String index() {
            InputMDC.putMDC();
            
            log.info("我是一條info日志");
            
            log.warn("我是一條warn日志");

            log.error("我是一條error日志");
            
            return "idx";
           }


           @RequestMapping(value = "/err")
           public String err() {
            InputMDC.putMDC();
            try {
             int a = 1/0;
            } catch (Exception e) {
             log.error("算術異常", e);
            }
            return "err";
           }
           
          }

          InputMDC

          用以獲取log中的[%X{hostName}][%X{ip}][%X{applicationName}]三個字段值

          @Component
          public class InputMDC implements EnvironmentAware {

           private static Environment environment;
           
           @Override
           public void setEnvironment(Environment environment) {
            InputMDC.environment = environment;
           }
           
           public static void putMDC() {
            MDC.put("hostName", NetUtil.getLocalHostName());
            MDC.put("ip", NetUtil.getLocalIp());
            MDC.put("applicationName", environment.getProperty("spring.application.name"));
           }

          }

          NetUtil

          public class NetUtil {   
           
           public static String normalizeAddress(String address){
            String[] blocks = address.split("[:]");
            if(blocks.length > 2){
             throw new IllegalArgumentException(address + " is invalid");
            }
            String host = blocks[0];
            int port = 80;
            if(blocks.length > 1){
             port = Integer.valueOf(blocks[1]);
            } else {
             address += ":"+port; //use default 80
            } 
            String serverAddr = String.format("%s:%d", host, port);
            return serverAddr;
           }
           
           public static String getLocalAddress(String address){
            String[] blocks = address.split("[:]");
            if(blocks.length != 2){
             throw new IllegalArgumentException(address + " is invalid address");
            } 
            String host = blocks[0];
            int port = Integer.valueOf(blocks[1]);
            
            if("0.0.0.0".equals(host)){
             return String.format("%s:%d",NetUtil.getLocalIp(), port);
            }
            return address;
           }
           
           private static int matchedIndex(String ip, String[] prefix){
            for(int i=0; i<prefix.length; i++){
             String p = prefix[i];
             if("*".equals(p)){ //*, assumed to be IP
              if(ip.startsWith("127.") ||
                 ip.startsWith("10.") || 
                 ip.startsWith("172.") ||
                 ip.startsWith("192.")){
               continue;
              }
              return i;
             } else {
              if(ip.startsWith(p)){
               return i;
              }
             } 
            }
            
            return -1;
           }
           
           public static String getLocalIp(String ipPreference) {
            if(ipPreference == null){
             ipPreference = "*>10>172>192>127";
            }
            String[] prefix = ipPreference.split("[> ]+");
            try {
             Pattern pattern = Pattern.compile("[0-9]+\\.[0-9]+\\.[0-9]+\\.[0-9]+");
             Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
             String matchedIp = null;
             int matchedIdx = -1;
             while (interfaces.hasMoreElements()) {
              NetworkInterface ni = interfaces.nextElement();
              Enumeration<InetAddress> en = ni.getInetAddresses(); 
              while (en.hasMoreElements()) {
               InetAddress addr = en.nextElement();
               String ip = addr.getHostAddress();  
               Matcher matcher = pattern.matcher(ip);
               if (matcher.matches()) {  
                int idx = matchedIndex(ip, prefix);
                if(idx == -1continue;
                if(matchedIdx == -1){
                 matchedIdx = idx;
                 matchedIp = ip;
                } else {
                 if(matchedIdx>idx){
                  matchedIdx = idx;
                  matchedIp = ip;
                 }
                }
               } 
              } 
             } 
             if(matchedIp != nullreturn matchedIp;
             return "127.0.0.1";
            } catch (Exception e) { 
             return "127.0.0.1";
            }
           }
           
           public static String getLocalIp() {
            return getLocalIp("*>10>172>192>127");
           }
           
           public static String remoteAddress(SocketChannel channel){
            SocketAddress addr = channel.socket().getRemoteSocketAddress();
            String res = String.format("%s", addr);
            return res;
           }
           
           public static String localAddress(SocketChannel channel){
            SocketAddress addr = channel.socket().getLocalSocketAddress();
            String res = String.format("%s", addr);
            return addr==null? res: res.substring(1);
           }
           
           public static String getPid(){
            RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
                  String name = runtime.getName();
                  int index = name.indexOf("@");
                  if (index != -1) {
                      return name.substring(0, index);
                  }
            return null;
           }
           
           public static String getLocalHostName() {
                  try {
                      return (InetAddress.getLocalHost()).getHostName();
                  } catch (UnknownHostException uhe) {
                      String host = uhe.getMessage();
                      if (host != null) {
                          int colon = host.indexOf(':');
                          if (colon > 0) {
                              return host.substring(0, colon);
                          }
                      }
                      return "UnknownHost";
                  }
              }
          }

          啟動項目,訪問/index/ero接口,可以看到項目中生成了app-collector.logerror-collector.log兩個日志文件

          我們將Springboot服務部署在192.168.11.31這臺機器上。

          Kafka安裝和啟用

          kafka下載地址:

          http://kafka.apache.org/downloads.html

          kafka安裝步驟:首先kafka安裝需要依賴與zookeeper,所以小伙伴們先準備好zookeeper環(huán)境(三個節(jié)點即可),然后我們來一起構建kafka broker。

          ## 解壓命令:
          tar -zxvf kafka_2.12-2.1.0.tgz -C /usr/local/
          ## 改名命令:
          mv kafka_2.12-2.1.0/ kafka_2.12
          ## 進入解壓后的目錄,修改server.properties文件:
          vim /usr/local/kafka_2.12/config/server.properties
          ## 修改配置:
          broker.id=0
          port=9092
          host.name=192.168.11.51
          advertised.host.name=192.168.11.51
          log.dirs=/usr/local/kafka_2.12/kafka-logs
          num.partitions=2
          zookeeper.connect=192.168.11.111:2181,192.168.11.112:2181,192.168.11.113:2181

          ## 建立日志文件夾:
          mkdir /usr/local/kafka_2.12/kafka-logs

          ##啟動kafka:
          /usr/local/kafka_2.12/bin/kafka-server-start.sh /usr/local/kafka_2.12/config/server.properties &

          創(chuàng)建兩個topic

          ## 創(chuàng)建topic
          kafka-topics.sh --zookeeper 192.168.11.111:2181 --create --topic app-log-collector --partitions 1  --replication-factor 1
          kafka-topics.sh --zookeeper 192.168.11.111:2181 --create --topic error-log-collector --partitions 1  --replication-factor 1 

          我們可以查看一下topic情況

          kafka-topics.sh --zookeeper 192.168.11.111:2181 --topic app-log-test --describe

          可以看到已經成功啟用了app-log-collectorerror-log-collector兩個topic

          filebeat安裝和啟用

          filebeat下載

          cd /usr/local/software
          tar -zxvf filebeat-6.6.0-linux-x86_64.tar.gz -C /usr/local/
          cd /usr/local
          mv filebeat-6.6.0-linux-x86_64/ filebeat-6.6.0

          配置filebeat,可以參考下方y(tǒng)ml配置文件

          vim /usr/local/filebeat-5.6.2/filebeat.yml
          ###################### Filebeat Configuration Example #########################
          filebeat.prospectors:

          - input_type: log

            paths:
              ## app-服務名稱.log, 為什么寫死,防止發(fā)生輪轉抓取歷史數(shù)據(jù)
              - /usr/local/logs/app-collector.log
            #定義寫入 ES 時的 _type 值
            document_type: "app-log"
            multiline:
              #pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})'   # 指定匹配的表達式(匹配以 2017-11-15 08:04:23:889 時間格式開頭的字符串)
              pattern: '^\['                              # 指定匹配的表達式(匹配以 "{ 開頭的字符串)
              negate: true                                # 是否匹配到
              match: after                                # 合并到上一行的末尾
              max_lines: 2000                             # 最大的行數(shù)
              timeout: 2s                                 # 如果在規(guī)定時間沒有新的日志事件就不等待后面的日志
            fields:
              logbiz: collector
              logtopic: app-log-collector   ## 按服務劃分用作kafka topic
              evn: dev

          - input_type: log

            paths:
              - /usr/local/logs/error-collector.log
            document_type: "error-log"
            multiline:
              #pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})'   # 指定匹配的表達式(匹配以 2017-11-15 08:04:23:889 時間格式開頭的字符串)
              pattern: '^\['                              # 指定匹配的表達式(匹配以 "{ 開頭的字符串)
              negate: true                                # 是否匹配到
              match: after                                # 合并到上一行的末尾
              max_lines: 2000                             # 最大的行數(shù)
              timeout: 2s                                 # 如果在規(guī)定時間沒有新的日志事件就不等待后面的日志
            fields:
              logbiz: collector
              logtopic: error-log-collector   ## 按服務劃分用作kafka topic
              evn: dev
              
          output.kafka:
            enabled: true
            hosts: ["192.168.11.51:9092"]
            topic: '%{[fields.logtopic]}'
            partition.hash:
              reachable_only: true
            compression: gzip
            max_message_bytes: 1000000
            required_acks: 1
          logging.to_files: true

          filebeat啟動:

          檢查配置是否正確

          cd /usr/local/filebeat-6.6.0
          ./filebeat -c filebeat.yml -configtest
          ## Config OK

          啟動filebeat

          /usr/local/filebeat-6.6.0/filebeat &

          檢查是否啟動成功

          ps -ef | grep filebeat

          可以看到filebeat已經啟動成功

          然后我們訪問192.168.11.31:8001/index和192.168.11.31:8001/err,再查看kafka的logs文件,可以看到已經生成了app-log-collector-0和error-log-collector-0文件,說明filebeat已經幫我們把數(shù)據(jù)收集好放到了kafka上。

          logstash安裝

          我們在logstash的安裝目錄下新建一個文件夾

          mkdir scrpit

          然后cd進該文件,創(chuàng)建一個logstash-script.conf文件

          cd scrpit
          vim logstash-script.conf
          ## multiline 插件也可以用于其他類似的堆棧式信息,比如 linux 的內核日志。
          input {
            kafka {
              ## app-log-服務名稱
              topics_pattern => "app-log-.*"
              bootstrap_servers => "192.168.11.51:9092"
           codec => json
           consumer_threads => 1 ## 增加consumer的并行消費線程數(shù)
           decorate_events => true
              #auto_offset_rest => "latest"
           group_id => "app-log-group"
             }
             
             kafka {
              ## error-log-服務名稱
              topics_pattern => "error-log-.*"
              bootstrap_servers => "192.168.11.51:9092"
           codec => json
           consumer_threads => 1
           decorate_events => true
              #auto_offset_rest => "latest"
           group_id => "error-log-group"
             }
             
          }

          filter {
            
            ## 時區(qū)轉換
            ruby {
           code => "event.set('index_time',event.timestamp.time.localtime.strftime('%Y.%m.%d'))"
            }

            if "app-log" in [fields][logtopic]{
              grok {
                  ## 表達式,這里對應的是Springboot輸出的日志格式
                  match => ["message""\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
              }
            }

            if "error-log" in [fields][logtopic]{
              grok {
                  ## 表達式
                  match => ["message""\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
              }
            }
            
          }

          ## 測試輸出到控制臺:
          output {
            stdout { codec => rubydebug }
          }


          ## elasticsearch:
          output {

            if "app-log" in [fields][logtopic]{
           ## es插件
           elasticsearch {
                 # es服務地址
                  hosts => ["192.168.11.35:9200"]
                  # 用戶名密碼      
                  user => "elastic"
                  password => "123456"
                  ## 索引名,+ 號開頭的,就會自動認為后面是時間格式:
                  ## javalog-app-service-2019.01.23 
                  index => "app-log-%{[fields][logbiz]}-%{index_time}"
                  # 是否嗅探集群ip:一般設置true;http://192.168.11.35:9200/_nodes/http?pretty
                  # 通過嗅探機制進行es集群負載均衡發(fā)日志消息
                  sniffing => true
                  # logstash默認自帶一個mapping模板,進行模板覆蓋
                  template_overwrite => true
              } 
            }
            
            if "error-log" in [fields][logtopic]{
           elasticsearch {
                  hosts => ["192.168.11.35:9200"]    
                  user => "elastic"
                  password => "123456"
                  index => "error-log-%{[fields][logbiz]}-%{index_time}"
                  sniffing => true
                  template_overwrite => true
              } 
            }
            

          }

          啟動logstash

          /usr/local/logstash-6.6.0/bin/logstash -f /usr/local/logstash-6.6.0/script/logstash-script.conf &

          等待啟動成功,我們再次訪問192.168.11.31:8001/err

          可以看到控制臺開始打印日志

          ElasticSearch與Kibana

          ES和Kibana的搭建之前沒寫過博客,網(wǎng)上資料也比較多,大家可以自行搜索。

          搭建完成后,訪問Kibana的管理頁面192.168.11.35:5601,選擇Management -> Kinaba - Index Patterns

          然后Create index pattern

          • index pattern 輸入 app-log-*
          • Time Filter field name 選擇 currentDateTime

          這樣我們就成功創(chuàng)建了索引。

          我們再次訪問192.168.11.31:8001/err,這個時候就可以看到我們已經命中了一條log信息

          里面展示了日志的全量信息

          到這里,我們完整的日志收集及可視化就搭建完成了,grafana loki也不錯,用起來賊爽!

          推薦閱讀:

          世界的真實格局分析,地球人類社會底層運行原理

          不是你需要中臺,而是一名合格的架構師(附各大廠中臺建設PPT)

          企業(yè)IT技術架構規(guī)劃方案

          論數(shù)字化轉型——轉什么,如何轉?

          華為干部與人才發(fā)展手冊(附PPT)

          企業(yè)10大管理流程圖,數(shù)字化轉型從業(yè)者必備!

          【中臺實踐】華為大數(shù)據(jù)中臺架構分享.pdf

          華為的數(shù)字化轉型方法論

          華為如何實施數(shù)字化轉型(附PPT)

          超詳細280頁Docker實戰(zhàn)文檔!開放下載

          華為大數(shù)據(jù)解決方案(PPT)

          瀏覽 65
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  人人添人人澡久久婷亚洲AV | 学生妹毛片视频 | 黑人操比视频 | 先锋男人资源 | 久久久久久久中文字幕 |