<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          SpringBoot+Kafka+ELK 完成海量日志收集(超詳細)

          共 27460字,需瀏覽 55分鐘

           ·

          2021-08-23 16:00

          來源:jiandansuifeng.blog.csdn.net/

          article/details/107361190

          整體流程大概如下:

          服務器準備

          在這先列出各服務器節(jié)點,方便同學們在下文中對照節(jié)點查看相應內容

          SpringBoot項目準備

          引入log4j2替換SpringBoot默認log,demo項目結構如下:

          pom

          <dependencies>
              <dependency>
                  <groupId>org.springframework.boot</groupId>
                  <artifactId>spring-boot-starter-web</artifactId>
                  <!--  排除spring-boot-starter-logging -->
                  <exclusions>
                      <exclusion>
                          <groupId>org.springframework.boot</groupId>
                          <artifactId>spring-boot-starter-logging</artifactId>
                      </exclusion>
                  </exclusions>
              </dependency> 
           <!-- log4j2 -->
           <dependency>
               <groupId>org.springframework.boot</groupId>
               <artifactId>spring-boot-starter-log4j2</artifactId>
           </dependency> 
             <dependency>
               <groupId>com.lmax</groupId>
               <artifactId>disruptor</artifactId>
               <version>3.3.4</version>
             </dependency> 
          </dependencies> 

          log4j2.xml

          <?xml version="1.0" encoding="UTF-8"?>
          <Configuration status="INFO" schema="Log4J-V2.0.xsd" monitorInterval="600" >
              <Properties>
                  <Property name="LOG_HOME">logs</Property>
                  <property name="FILE_NAME">collector</property>
                  <property name="patternLayout">[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}] [%level{length=5}] [%thread-%tid] [%logger] [%X{hostName}] [%X{ip}] [%X{applicationName}] [%F,%L,%C,%M] [%m] ## '%ex'%n</property>
              </Properties>
              <Appenders>
                  <Console name="CONSOLE" target="SYSTEM_OUT">
                      <PatternLayout pattern="${patternLayout}"/>
                  </Console>  
                  <RollingRandomAccessFile name="appAppender" fileName="${LOG_HOME}/app-${FILE_NAME}.log" filePattern="${LOG_HOME}/app-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log" >
                    <PatternLayout pattern="${patternLayout}" />
                    <Policies>
                        <TimeBasedTriggeringPolicy interval="1"/>
                        <SizeBasedTriggeringPolicy size="500MB"/>
                    </Policies>
                    <DefaultRolloverStrategy max="20"/>         
                  </RollingRandomAccessFile>
                  <RollingRandomAccessFile name="errorAppender" fileName="${LOG_HOME}/error-${FILE_NAME}.log" filePattern="${LOG_HOME}/error-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log" >
                    <PatternLayout pattern="${patternLayout}" />
                    <Filters>
                        <ThresholdFilter level="warn" onMatch="ACCEPT" onMismatch="DENY"/>
                    </Filters>              
                    <Policies>
                        <TimeBasedTriggeringPolicy interval="1"/>
                        <SizeBasedTriggeringPolicy size="500MB"/>
                    </Policies>
                    <DefaultRolloverStrategy max="20"/>         
                  </RollingRandomAccessFile>            
              </Appenders>
              <Loggers>
                  <!-- 業(yè)務相關 異步logger -->
                  <AsyncLogger name="com.bfxy.*" level="info" includeLocation="true">
                    <AppenderRef ref="appAppender"/>
                  </AsyncLogger>
                  <AsyncLogger name="com.bfxy.*" level="info" includeLocation="true">
                    <AppenderRef ref="errorAppender"/>
                  </AsyncLogger>       
                  <Root level="info">
                      <Appender-Ref ref="CONSOLE"/>
                      <Appender-Ref ref="appAppender"/>
                      <AppenderRef ref="errorAppender"/>
                  </Root>         
              </Loggers>
          </Configuration>

          IndexController

          測試Controller,用以打印日志進行調試

          @Slf4j
          @RestController
          public class IndexController {

           @RequestMapping(value = "/index")
           public String index() {
            InputMDC.putMDC();
            
            log.info("我是一條info日志");
            
            log.warn("我是一條warn日志");

            log.error("我是一條error日志");
            
            return "idx";
           }


           @RequestMapping(value = "/err")
           public String err() {
            InputMDC.putMDC();
            try {
             int a = 1/0;
            } catch (Exception e) {
             log.error("算術異常", e);
            }
            return "err";
           }
           
          }

          InputMDC

          用以獲取log中的[%X{hostName}][%X{ip}][%X{applicationName}]三個字段值

          @Component
          public class InputMDC implements EnvironmentAware {

           private static Environment environment;
           
           @Override
           public void setEnvironment(Environment environment) {
            InputMDC.environment = environment;
           }
           
           public static void putMDC() {
            MDC.put("hostName", NetUtil.getLocalHostName());
            MDC.put("ip", NetUtil.getLocalIp());
            MDC.put("applicationName", environment.getProperty("spring.application.name"));
           }

          }

          NetUtil

          public class NetUtil {   
           
           public static String normalizeAddress(String address){
            String[] blocks = address.split("[:]");
            if(blocks.length > 2){
             throw new IllegalArgumentException(address + " is invalid");
            }
            String host = blocks[0];
            int port = 80;
            if(blocks.length > 1){
             port = Integer.valueOf(blocks[1]);
            } else {
             address += ":"+port; //use default 80
            } 
            String serverAddr = String.format("%s:%d", host, port);
            return serverAddr;
           }
           
           public static String getLocalAddress(String address){
            String[] blocks = address.split("[:]");
            if(blocks.length != 2){
             throw new IllegalArgumentException(address + " is invalid address");
            } 
            String host = blocks[0];
            int port = Integer.valueOf(blocks[1]);
            
            if("0.0.0.0".equals(host)){
             return String.format("%s:%d",NetUtil.getLocalIp(), port);
            }
            return address;
           }
           
           private static int matchedIndex(String ip, String[] prefix){
            for(int i=0; i<prefix.length; i++){
             String p = prefix[i];
             if("*".equals(p)){ //*, assumed to be IP
              if(ip.startsWith("127.") ||
                 ip.startsWith("10.") || 
                 ip.startsWith("172.") ||
                 ip.startsWith("192.")){
               continue;
              }
              return i;
             } else {
              if(ip.startsWith(p)){
               return i;
              }
             } 
            }
            
            return -1;
           }
           
           public static String getLocalIp(String ipPreference) {
            if(ipPreference == null){
             ipPreference = "*>10>172>192>127";
            }
            String[] prefix = ipPreference.split("[> ]+");
            try {
             Pattern pattern = Pattern.compile("[0-9]+\\.[0-9]+\\.[0-9]+\\.[0-9]+");
             Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
             String matchedIp = null;
             int matchedIdx = -1;
             while (interfaces.hasMoreElements()) {
              NetworkInterface ni = interfaces.nextElement();
              Enumeration<InetAddress> en = ni.getInetAddresses(); 
              while (en.hasMoreElements()) {
               InetAddress addr = en.nextElement();
               String ip = addr.getHostAddress();  
               Matcher matcher = pattern.matcher(ip);
               if (matcher.matches()) {  
                int idx = matchedIndex(ip, prefix);
                if(idx == -1continue;
                if(matchedIdx == -1){
                 matchedIdx = idx;
                 matchedIp = ip;
                } else {
                 if(matchedIdx>idx){
                  matchedIdx = idx;
                  matchedIp = ip;
                 }
                }
               } 
              } 
             } 
             if(matchedIp != nullreturn matchedIp;
             return "127.0.0.1";
            } catch (Exception e) { 
             return "127.0.0.1";
            }
           }
           
           public static String getLocalIp() {
            return getLocalIp("*>10>172>192>127");
           }
           
           public static String remoteAddress(SocketChannel channel){
            SocketAddress addr = channel.socket().getRemoteSocketAddress();
            String res = String.format("%s", addr);
            return res;
           }
           
           public static String localAddress(SocketChannel channel){
            SocketAddress addr = channel.socket().getLocalSocketAddress();
            String res = String.format("%s", addr);
            return addr==null? res: res.substring(1);
           }
           
           public static String getPid(){
            RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
                  String name = runtime.getName();
                  int index = name.indexOf("@");
                  if (index != -1) {
                      return name.substring(0, index);
                  }
            return null;
           }
           
           public static String getLocalHostName() {
                  try {
                      return (InetAddress.getLocalHost()).getHostName();
                  } catch (UnknownHostException uhe) {
                      String host = uhe.getMessage();
                      if (host != null) {
                          int colon = host.indexOf(':');
                          if (colon > 0) {
                              return host.substring(0, colon);
                          }
                      }
                      return "UnknownHost";
                  }
              }
          }

          啟動項目,訪問/index/ero接口,可以看到項目中生成了app-collector.logerror-collector.log兩個日志文件

          我們將Springboot服務部署在192.168.11.31這臺機器上。

          Kafka安裝和啟用

          kafka下載地址:

          http://kafka.apache.org/downloads.html

          kafka安裝步驟:首先kafka安裝需要依賴與zookeeper,所以小伙伴們先準備好zookeeper環(huán)境(三個節(jié)點即可),然后我們來一起構建kafka broker。

          ## 解壓命令:
          tar -zxvf kafka_2.12-2.1.0.tgz -C /usr/local/
          ## 改名命令:
          mv kafka_2.12-2.1.0/ kafka_2.12
          ## 進入解壓后的目錄,修改server.properties文件:
          vim /usr/local/kafka_2.12/config/server.properties
          ## 修改配置:
          broker.id=0
          port=9092
          host.name=192.168.11.51
          advertised.host.name=192.168.11.51
          log.dirs=/usr/local/kafka_2.12/kafka-logs
          num.partitions=2
          zookeeper.connect=192.168.11.111:2181,192.168.11.112:2181,192.168.11.113:2181

          ## 建立日志文件夾:
          mkdir /usr/local/kafka_2.12/kafka-logs

          ##啟動kafka:
          /usr/local/kafka_2.12/bin/kafka-server-start.sh /usr/local/kafka_2.12/config/server.properties &

          創(chuàng)建兩個topic

          ## 創(chuàng)建topic
          kafka-topics.sh --zookeeper 192.168.11.111:2181 --create --topic app-log-collector --partitions 1  --replication-factor 1
          kafka-topics.sh --zookeeper 192.168.11.111:2181 --create --topic error-log-collector --partitions 1  --replication-factor 1 

          我們可以查看一下topic情況

          kafka-topics.sh --zookeeper 192.168.11.111:2181 --topic app-log-test --describe

          可以看到已經成功啟用了app-log-collectorerror-log-collector兩個topic

          filebeat安裝和啟用

          filebeat下載

          cd /usr/local/software
          tar -zxvf filebeat-6.6.0-linux-x86_64.tar.gz -C /usr/local/
          cd /usr/local
          mv filebeat-6.6.0-linux-x86_64/ filebeat-6.6.0

          配置filebeat,可以參考下方y(tǒng)ml配置文件

          vim /usr/local/filebeat-5.6.2/filebeat.yml
          ###################### Filebeat Configuration Example #########################
          filebeat.prospectors:

          - input_type: log

            paths:
              ## app-服務名稱.log, 為什么寫死,防止發(fā)生輪轉抓取歷史數(shù)據(jù)
              - /usr/local/logs/app-collector.log
            #定義寫入 ES 時的 _type 值
            document_type: "app-log"
            multiline:
              #pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})'   # 指定匹配的表達式(匹配以 2017-11-15 08:04:23:889 時間格式開頭的字符串)
              pattern: '^\['                              # 指定匹配的表達式(匹配以 "{ 開頭的字符串)
              negate: true                                # 是否匹配到
              match: after                                # 合并到上一行的末尾
              max_lines: 2000                             # 最大的行數(shù)
              timeout: 2s                                 # 如果在規(guī)定時間沒有新的日志事件就不等待后面的日志
            fields:
              logbiz: collector
              logtopic: app-log-collector   ## 按服務劃分用作kafka topic
              evn: dev

          - input_type: log

            paths:
              - /usr/local/logs/error-collector.log
            document_type: "error-log"
            multiline:
              #pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})'   # 指定匹配的表達式(匹配以 2017-11-15 08:04:23:889 時間格式開頭的字符串)
              pattern: '^\['                              # 指定匹配的表達式(匹配以 "{ 開頭的字符串)
              negate: true                                # 是否匹配到
              match: after                                # 合并到上一行的末尾
              max_lines: 2000                             # 最大的行數(shù)
              timeout: 2s                                 # 如果在規(guī)定時間沒有新的日志事件就不等待后面的日志
            fields:
              logbiz: collector
              logtopic: error-log-collector   ## 按服務劃分用作kafka topic
              evn: dev
              
          output.kafka:
            enabled: true
            hosts: ["192.168.11.51:9092"]
            topic: '%{[fields.logtopic]}'
            partition.hash:
              reachable_only: true
            compression: gzip
            max_message_bytes: 1000000
            required_acks: 1
          logging.to_files: true

          filebeat啟動:

          檢查配置是否正確

          cd /usr/local/filebeat-6.6.0
          ./filebeat -c filebeat.yml -configtest
          ## Config OK

          啟動filebeat

          /usr/local/filebeat-6.6.0/filebeat &

          檢查是否啟動成功

          ps -ef | grep filebeat

          可以看到filebeat已經啟動成功

          然后我們訪問192.168.11.31:8001/index和192.168.11.31:8001/err,再查看kafka的logs文件,可以看到已經生成了app-log-collector-0和error-log-collector-0文件,說明filebeat已經幫我們把數(shù)據(jù)收集好放到了kafka上。

          logstash安裝

          我們在logstash的安裝目錄下新建一個文件夾

          mkdir scrpit

          然后cd進該文件,創(chuàng)建一個logstash-script.conf文件

          cd scrpit
          vim logstash-script.conf
          ## multiline 插件也可以用于其他類似的堆棧式信息,比如 linux 的內核日志。
          input {
            kafka {
              ## app-log-服務名稱
              topics_pattern => "app-log-.*"
              bootstrap_servers => "192.168.11.51:9092"
           codec => json
           consumer_threads => 1 ## 增加consumer的并行消費線程數(shù)
           decorate_events => true
              #auto_offset_rest => "latest"
           group_id => "app-log-group"
             }
             
             kafka {
              ## error-log-服務名稱
              topics_pattern => "error-log-.*"
              bootstrap_servers => "192.168.11.51:9092"
           codec => json
           consumer_threads => 1
           decorate_events => true
              #auto_offset_rest => "latest"
           group_id => "error-log-group"
             }
             
          }

          filter {
            
            ## 時區(qū)轉換
            ruby {
           code => "event.set('index_time',event.timestamp.time.localtime.strftime('%Y.%m.%d'))"
            }

            if "app-log" in [fields][logtopic]{
              grok {
                  ## 表達式,這里對應的是Springboot輸出的日志格式
                  match => ["message""\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
              }
            }

            if "error-log" in [fields][logtopic]{
              grok {
                  ## 表達式
                  match => ["message""\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
              }
            }
            
          }

          ## 測試輸出到控制臺:
          output {
            stdout { codec => rubydebug }
          }


          ## elasticsearch:
          output {

            if "app-log" in [fields][logtopic]{
           ## es插件
           elasticsearch {
                 # es服務地址
                  hosts => ["192.168.11.35:9200"]
                  # 用戶名密碼      
                  user => "elastic"
                  password => "123456"
                  ## 索引名,+ 號開頭的,就會自動認為后面是時間格式:
                  ## javalog-app-service-2019.01.23 
                  index => "app-log-%{[fields][logbiz]}-%{index_time}"
                  # 是否嗅探集群ip:一般設置true;http://192.168.11.35:9200/_nodes/http?pretty
                  # 通過嗅探機制進行es集群負載均衡發(fā)日志消息
                  sniffing => true
                  # logstash默認自帶一個mapping模板,進行模板覆蓋
                  template_overwrite => true
              } 
            }
            
            if "error-log" in [fields][logtopic]{
           elasticsearch {
                  hosts => ["192.168.11.35:9200"]    
                  user => "elastic"
                  password => "123456"
                  index => "error-log-%{[fields][logbiz]}-%{index_time}"
                  sniffing => true
                  template_overwrite => true
              } 
            }
            

          }

          啟動logstash

          /usr/local/logstash-6.6.0/bin/logstash -f /usr/local/logstash-6.6.0/script/logstash-script.conf &

          等待啟動成功,我們再次訪問192.168.11.31:8001/err

          可以看到控制臺開始打印日志

          ElasticSearch與Kibana

          ES和Kibana的搭建之前沒寫過博客,網上資料也比較多,大家可以自行搜索。

          搭建完成后,訪問Kibana的管理頁面192.168.11.35:5601,選擇Management -> Kinaba - Index Patterns

          然后Create index pattern

          • index pattern 輸入 app-log-*
          • Time Filter field name 選擇 currentDateTime

          這樣我們就成功創(chuàng)建了索引。

          我們再次訪問192.168.11.31:8001/err,這個時候就可以看到我們已經命中了一條log信息

          里面展示了日志的全量信息

          到這里,我們完整的日志收集及可視化就搭建完成了!

          瀏覽 62
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  天天爱激情 | 成人区人妻精品一 | 91丨色丨国产熟女蘑菇 | 欧美综合社区 | 逼特逼在线视频 |