<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          SpringBoot + Kafka + ELK 完成海量日志收集(超詳細(xì))

          2021-09-12 15:36

          上一篇:深夜看了張一鳴的微博,讓我越想越后怕

          整體流程大概如下:

          服務(wù)器準(zhǔn)備

          在這先列出各服務(wù)器節(jié)點(diǎn),方便同學(xué)們?cè)谙挛闹袑?duì)照節(jié)點(diǎn)查看相應(yīng)內(nèi)容

          SpringBoot項(xiàng)目準(zhǔn)備

          引入log4j2替換SpringBoot默認(rèn)log,demo項(xiàng)目結(jié)構(gòu)如下:


          pom

          <dependencies>
              <dependency>
                  <groupId>org.springframework.boot</groupId>
                  <artifactId>spring-boot-starter-web</artifactId>
                  <!--  排除spring-boot-starter-logging -->
                  <exclusions>
                      <exclusion>
                          <groupId>org.springframework.boot</groupId>
                          <artifactId>spring-boot-starter-logging</artifactId>
                      </exclusion>
                  </exclusions>
              </dependency> 
           <!-- log4j2 -->
           <dependency>
               <groupId>org.springframework.boot</groupId>
               <artifactId>spring-boot-starter-log4j2</artifactId>
           </dependency> 
             <dependency>
               <groupId>com.lmax</groupId>
               <artifactId>disruptor</artifactId>
               <version>3.3.4</version>
             </dependency> 
          </dependencies> 

          log4j2.xml

          <?xml version="1.0" encoding="UTF-8"?>
          <Configuration status="INFO" schema="Log4J-V2.0.xsd" monitorInterval="600" >
              <Properties>
                  <Property name="LOG_HOME">logs</Property>
                  <property name="FILE_NAME">collector</property>
                  <property name="patternLayout">[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}] [%level{length=5}] [%thread-%tid] [%logger] [%X{hostName}] [%X{ip}] [%X{applicationName}] [%F,%L,%C,%M] [%m] ## '%ex'%n</property>
              </Properties>
              <Appenders>
                  <Console name="CONSOLE" target="SYSTEM_OUT">
                      <PatternLayout pattern="${patternLayout}"/>
                  </Console>  
                  <RollingRandomAccessFile name="appAppender" fileName="${LOG_HOME}/app-${FILE_NAME}.log" filePattern="${LOG_HOME}/app-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log" >
                    <PatternLayout pattern="${patternLayout}" />
                    <Policies>
                        <TimeBasedTriggeringPolicy interval="1"/>
                        <SizeBasedTriggeringPolicy size="500MB"/>
                    </Policies>
                    <DefaultRolloverStrategy max="20"/>         
                  </RollingRandomAccessFile>
                  <RollingRandomAccessFile name="errorAppender" fileName="${LOG_HOME}/error-${FILE_NAME}.log" filePattern="${LOG_HOME}/error-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log" >
                    <PatternLayout pattern="${patternLayout}" />
                    <Filters>
                        <ThresholdFilter level="warn" onMatch="ACCEPT" onMismatch="DENY"/>
                    </Filters>              
                    <Policies>
                        <TimeBasedTriggeringPolicy interval="1"/>
                        <SizeBasedTriggeringPolicy size="500MB"/>
                    </Policies>
                    <DefaultRolloverStrategy max="20"/>         
                  </RollingRandomAccessFile>            
              </Appenders>
              <Loggers>
                  <!-- 業(yè)務(wù)相關(guān) 異步logger -->
                  <AsyncLogger name="com.bfxy.*" level="info" includeLocation="true">
                    <AppenderRef ref="appAppender"/>
                  </AsyncLogger>
                  <AsyncLogger name="com.bfxy.*" level="info" includeLocation="true">
                    <AppenderRef ref="errorAppender"/>
                  </AsyncLogger>       
                  <Root level="info">
                      <Appender-Ref ref="CONSOLE"/>
                      <Appender-Ref ref="appAppender"/>
                      <AppenderRef ref="errorAppender"/>
                  </Root>         
              </Loggers>
          </Configuration>
          IndexController

          測試Controller,用以打印日志進(jìn)行調(diào)試

          @Slf4j
          @RestController
          public class IndexController {

           @RequestMapping(value = "/index")
           public String index() {
            InputMDC.putMDC();
            
            log.info("我是一條info日志");
            
            log.warn("我是一條warn日志");

            log.error("我是一條error日志");
            
            return "idx";
           }


           @RequestMapping(value = "/err")
           public String err() {
            InputMDC.putMDC();
            try {
             int a = 1/0;
            } catch (Exception e) {
             log.error("算術(shù)異常", e);
            }
            return "err";
           }
           
          }
          InputMDC

          用以獲取log中的[%X{hostName}]、[%X{ip}]、[%X{applicationName}]三個(gè)字段值

          @Component
          public class InputMDC implements EnvironmentAware {

           private static Environment environment;
           
           @Override
           public void setEnvironment(Environment environment) {
            InputMDC.environment = environment;
           }
           
           public static void putMDC() {
            MDC.put("hostName", NetUtil.getLocalHostName());
            MDC.put("ip", NetUtil.getLocalIp());
            MDC.put("applicationName", environment.getProperty("spring.application.name"));
           }

          }
          NetUtil
          public class NetUtil {   
           
           public static String normalizeAddress(String address){
            String[] blocks = address.split("[:]");
            if(blocks.length > 2){
             throw new IllegalArgumentException(address + " is invalid");
            }
            String host = blocks[0];
            int port = 80;
            if(blocks.length > 1){
             port = Integer.valueOf(blocks[1]);
            } else {
             address += ":"+port; //use default 80
            } 
            String serverAddr = String.format("%s:%d", host, port);
            return serverAddr;
           }
           
           public static String getLocalAddress(String address){
            String[] blocks = address.split("[:]");
            if(blocks.length != 2){
             throw new IllegalArgumentException(address + " is invalid address");
            } 
            String host = blocks[0];
            int port = Integer.valueOf(blocks[1]);
            
            if("0.0.0.0".equals(host)){
             return String.format("%s:%d",NetUtil.getLocalIp(), port);
            }
            return address;
           }
           
           private static int matchedIndex(String ip, String[] prefix){
            for(int i=0; i<prefix.length; i++){
             String p = prefix[i];
             if("*".equals(p)){ //*, assumed to be IP
              if(ip.startsWith("127.") ||
                 ip.startsWith("10.") || 
                 ip.startsWith("172.") ||
                 ip.startsWith("192.")){
               continue;
              }
              return i;
             } else {
              if(ip.startsWith(p)){
               return i;
              }
             } 
            }
            
            return -1;
           }
           
           public static String getLocalIp(String ipPreference) {
            if(ipPreference == null){
             ipPreference = "*>10>172>192>127";
            }
            String[] prefix = ipPreference.split("[> ]+");
            try {
             Pattern pattern = Pattern.compile("[0-9]+\\.[0-9]+\\.[0-9]+\\.[0-9]+");
             Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
             String matchedIp = null;
             int matchedIdx = -1;
             while (interfaces.hasMoreElements()) {
              NetworkInterface ni = interfaces.nextElement();
              Enumeration<InetAddress> en = ni.getInetAddresses(); 
              while (en.hasMoreElements()) {
               InetAddress addr = en.nextElement();
               String ip = addr.getHostAddress();  
               Matcher matcher = pattern.matcher(ip);
               if (matcher.matches()) {  
                int idx = matchedIndex(ip, prefix);
                if(idx == -1) continue;
                if(matchedIdx == -1){
                 matchedIdx = idx;
                 matchedIp = ip;
                } else {
                 if(matchedIdx>idx){
                  matchedIdx = idx;
                  matchedIp = ip;
                 }
                }
               } 
              } 
             } 
             if(matchedIp != null) return matchedIp;
             return "127.0.0.1";
            } catch (Exception e) { 
             return "127.0.0.1";
            }
           }
           
           public static String getLocalIp() {
            return getLocalIp("*>10>172>192>127");
           }
           
           public static String remoteAddress(SocketChannel channel){
            SocketAddress addr = channel.socket().getRemoteSocketAddress();
            String res = String.format("%s", addr);
            return res;
           }
           
           public static String localAddress(SocketChannel channel){
            SocketAddress addr = channel.socket().getLocalSocketAddress();
            String res = String.format("%s", addr);
            return addr==null? res: res.substring(1);
           }
           
           public static String getPid(){
            RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
                  String name = runtime.getName();
                  int index = name.indexOf("@");
                  if (index != -1) {
                      return name.substring(0, index);
                  }
            return null;
           }
           
           public static String getLocalHostName() {
                  try {
                      return (InetAddress.getLocalHost()).getHostName();
                  } catch (UnknownHostException uhe) {
                      String host = uhe.getMessage();
                      if (host != null) {
                          int colon = host.indexOf(':');
                          if (colon > 0) {
                              return host.substring(0, colon);
                          }
                      }
                      return "UnknownHost";
                  }
              }
          }

          啟動(dòng)項(xiàng)目,訪問/index/ero接口,可以看到項(xiàng)目中生成了app-collector.logerror-collector.log兩個(gè)日志文件:

          我們將Springboot服務(wù)部署在192.168.11.31這臺(tái)機(jī)器上。

          Kafka安裝和啟用

          kafka下載地址:

          http://kafka.apache.org/downloads.html

          kafka安裝步驟:首先kafka安裝需要依賴與zookeeper,所以小伙伴們先準(zhǔn)備好zookeeper環(huán)境(三個(gè)節(jié)點(diǎn)即可),然后我們來一起構(gòu)建kafka broker。

          另外,Kafka 系列面試題和答案全部整理好了,微信搜索互聯(lián)網(wǎng)架構(gòu)師,在后臺(tái)發(fā)送:面試,可以在線閱讀。

          ## 解壓命令:
          tar -zxvf kafka_2.12-2.1.0.tgz -C /usr/local/
          ## 改名命令:
          mv kafka_2.12-2.1.0/ kafka_2.12
          ## 進(jìn)入解壓后的目錄,修改server.properties文件:
          vim /usr/local/kafka_2.12/config/server.properties
          ## 修改配置:
          broker.id=0
          port=9092
          host.name=192.168.11.51
          advertised.host.name=192.168.11.51
          log.dirs=/usr/local/kafka_2.12/kafka-logs
          num.partitions=2
          zookeeper.connect=192.168.11.111:2181,192.168.11.112:2181,192.168.11.113:2181

          ## 建立日志文件夾:
          mkdir /usr/local/kafka_2.12/kafka-logs

          ##啟動(dòng)kafka:
          /usr/local/kafka_2.12/bin/kafka-server-start.sh /usr/local/kafka_2.12/config/server.properties &

          創(chuàng)建兩個(gè)topic

          ## 創(chuàng)建topic
          kafka-topics.sh --zookeeper 192.168.11.111:2181 --create --topic app-log-collector --partitions 1  --replication-factor 1
          kafka-topics.sh --zookeeper 192.168.11.111:2181 --create --topic error-log-collector --partitions 1  --replication-factor 1 

          我們可以查看一下topic情況

          kafka-topics.sh --zookeeper 192.168.11.111:2181 --topic app-log-test --describe

          可以看到已經(jīng)成功啟用了app-log-collectorerror-log-collector兩個(gè)topic


          filebeat安裝和啟用

          filebeat下載

          cd /usr/local/software
          tar -zxvf filebeat-6.6.0-linux-x86_64.tar.gz -C /usr/local/
          cd /usr/local
          mv filebeat-6.6.0-linux-x86_64/ filebeat-6.6.0

          配置filebeat,可以參考下方y(tǒng)ml配置文件

          vim /usr/local/filebeat-5.6.2/filebeat.yml
          ###################### Filebeat Configuration Example #########################
          filebeat.prospectors:

          - input_type: log

            paths:
              ## app-服務(wù)名稱.log, 為什么寫死,防止發(fā)生輪轉(zhuǎn)抓取歷史數(shù)據(jù)
              - /usr/local/logs/app-collector.log
            #定義寫入 ES 時(shí)的 _type 值
            document_type: "app-log"
            multiline:
              #pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})'   # 指定匹配的表達(dá)式(匹配以 2017-11-15 08:04:23:889 時(shí)間格式開頭的字符串)
              pattern: '^\['                              # 指定匹配的表達(dá)式(匹配以 "{ 開頭的字符串)
              negate: true                                # 是否匹配到
              match: after                                # 合并到上一行的末尾
              max_lines: 2000                             # 最大的行數(shù)
              timeout: 2s                                 # 如果在規(guī)定時(shí)間沒有新的日志事件就不等待后面的日志
            fields:
              logbiz: collector
              logtopic: app-log-collector   ## 按服務(wù)劃分用作kafka topic
              evn: dev

          - input_type: log

            paths:
              - /usr/local/logs/error-collector.log
            document_type: "error-log"
            multiline:
              #pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})'   # 指定匹配的表達(dá)式(匹配以 2017-11-15 08:04:23:889 時(shí)間格式開頭的字符串)
              pattern: '^\['                              # 指定匹配的表達(dá)式(匹配以 "{ 開頭的字符串)
              negate: true                                # 是否匹配到
              match: after                                # 合并到上一行的末尾
              max_lines: 2000                             # 最大的行數(shù)
              timeout: 2s                                 # 如果在規(guī)定時(shí)間沒有新的日志事件就不等待后面的日志
            fields:
              logbiz: collector
              logtopic: error-log-collector   ## 按服務(wù)劃分用作kafka topic
              evn: dev
              
          output.kafka:
            enabled: true
            hosts: ["192.168.11.51:9092"]
            topic: '%{[fields.logtopic]}'
            partition.hash:
              reachable_only: true
            compression: gzip
            max_message_bytes: 1000000
            required_acks: 1
          logging.to_files: true

          filebeat啟動(dòng):

          檢查配置是否正確

          cd /usr/local/filebeat-6.6.0
          ./filebeat -c filebeat.yml -configtest
          ## Config OK

          啟動(dòng)filebeat

          /usr/local/filebeat-6.6.0/filebeat &

          檢查是否啟動(dòng)成功

          ps -ef | grep filebeat
          可以看到filebeat已經(jīng)啟動(dòng)成功

          然后我們?cè)L問192.168.11.31:8001/index和192.168.11.31:8001/err,再查看kafka的logs文件,可以看到已經(jīng)生成了app-log-collector-0和error-log-collector-0文件,說明filebeat已經(jīng)幫我們把數(shù)據(jù)收集好放到了kafka上。

          logstash安裝

          我們?cè)趌ogstash的安裝目錄下新建一個(gè)文件夾

          mkdir scrpit
          然后cd進(jìn)該文件,創(chuàng)建一個(gè)logstash-script.conf文件
          cd scrpit
          vim logstash-script.conf
          ## multiline 插件也可以用于其他類似的堆棧式信息,比如 linux 的內(nèi)核日志。
          input {
            kafka {
              ## app-log-服務(wù)名稱
              topics_pattern => "app-log-.*"
              bootstrap_servers => "192.168.11.51:9092"
           codec => json
           consumer_threads => 1 ## 增加consumer的并行消費(fèi)線程數(shù)
           decorate_events => true
              #auto_offset_rest => "latest"
           group_id => "app-log-group"
             }
             
             kafka {
              ## error-log-服務(wù)名稱
              topics_pattern => "error-log-.*"
              bootstrap_servers => "192.168.11.51:9092"
           codec => json
           consumer_threads => 1
           decorate_events => true
              #auto_offset_rest => "latest"
           group_id => "error-log-group"
             }
             
          }

          filter {
            
            ## 時(shí)區(qū)轉(zhuǎn)換
            ruby {
           code => "event.set('index_time',event.timestamp.time.localtime.strftime('%Y.%m.%d'))"
            }

            if "app-log" in [fields][logtopic]{
              grok {
                  ## 表達(dá)式,這里對(duì)應(yīng)的是Springboot輸出的日志格式
                  match => ["message""\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
              }
            }

            if "error-log" in [fields][logtopic]{
              grok {
                  ## 表達(dá)式
                  match => ["message""\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
              }
            }
            
          }

          ## 測試輸出到控制臺(tái):
          output {
            stdout { codec => rubydebug }
          }


          ## elasticsearch:
          output {

            if "app-log" in [fields][logtopic]{
           ## es插件
           elasticsearch {
                 # es服務(wù)地址
                  hosts => ["192.168.11.35:9200"]
                  # 用戶名密碼      
                  user => "elastic"
                  password => "123456"
                  ## 索引名,+ 號(hào)開頭的,就會(huì)自動(dòng)認(rèn)為后面是時(shí)間格式:
                  ## javalog-app-service-2019.01.23 
                  index => "app-log-%{[fields][logbiz]}-%{index_time}"
                  # 是否嗅探集群ip:一般設(shè)置true;http://192.168.11.35:9200/_nodes/http?pretty
                  # 通過嗅探機(jī)制進(jìn)行es集群負(fù)載均衡發(fā)日志消息
                  sniffing => true
                  # logstash默認(rèn)自帶一個(gè)mapping模板,進(jìn)行模板覆蓋
                  template_overwrite => true
              } 
            }
            
            if "error-log" in [fields][logtopic]{
           elasticsearch {
                  hosts => ["192.168.11.35:9200"]    
                  user => "elastic"
                  password => "123456"
                  index => "error-log-%{[fields][logbiz]}-%{index_time}"
                  sniffing => true
                  template_overwrite => true
              } 
            }
            

          }

          啟動(dòng)logstash

          /usr/local/logstash-6.6.0/bin/logstash -f /usr/local/logstash-6.6.0/script/logstash-script.conf &

          等待啟動(dòng)成功,我們?cè)俅卧L問192.168.11.31:8001/err

          可以看到控制臺(tái)開始打印日志

          ElasticSearch與Kibana

          ES和Kibana的搭建之前沒寫過博客,網(wǎng)上資料也比較多,大家可以自行搜索。

          搭建完成后,訪問Kibana的管理頁面192.168.11.35:5601,選擇Management -> Kinaba - Index Patterns

          然后Create index pattern

          • index pattern 輸入 app-log-*
          • Time Filter field name 選擇 currentDateTime

          這樣我們就成功創(chuàng)建了索引。

          我們?cè)俅卧L問192.168.11.31:8001/err,這個(gè)時(shí)候就可以看到我們已經(jīng)命中了一條log信息

          里面展示了日志的全量信息

          到這里,我們完整的日志收集及可視化就搭建完成了!

          原文鏈接:https://blog.csdn.net/lt326030434/article/details/107361190

          感謝您的閱讀,也歡迎您發(fā)表關(guān)于這篇文章的任何建議,關(guān)注我,技術(shù)不迷茫!小編到你上高速。

              · END ·
          最后,關(guān)注公眾號(hào)互聯(lián)網(wǎng)架構(gòu)師,在后臺(tái)回復(fù):2T,可以獲取我整理的 Java 系列面試題和答案,非常齊全。


          正文結(jié)束


          推薦閱讀 ↓↓↓

          1.不認(rèn)命,從10年流水線工人,到谷歌上班的程序媛,一位湖南妹子的勵(lì)志故事

          2.如何才能成為優(yōu)秀的架構(gòu)師?

          3.從零開始搭建創(chuàng)業(yè)公司后臺(tái)技術(shù)棧

          4.程序員一般可以從什么平臺(tái)接私活?

          5.37歲程序員被裁,120天沒找到工作,無奈去小公司,結(jié)果懵了...

          6.IntelliJ IDEA 2019.3 首個(gè)最新訪問版本發(fā)布,新特性搶先看

          7.這封“領(lǐng)導(dǎo)痛批95后下屬”的郵件,句句扎心!

          8.15張圖看懂瞎忙和高效的區(qū)別!

          一個(gè)人學(xué)習(xí)、工作很迷茫?


          點(diǎn)擊「閱讀原文」加入我們的小圈子!

          瀏覽 40
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  www.91久久爱 | 天天干 夜夜操 | 久草手机视频在线 | 云南省医疗服务质量评估中心官网 | 亚洲视屏在线观看 |