<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          SpringBoot + Kafka + ELK 完成海量日志收集(超詳細)

          共 304字,需瀏覽 1分鐘

           ·

          2021-10-13 07:20

          程序員的成長之路
          互聯(lián)網(wǎng)/程序員/技術/資料共享?
          關注


          閱讀本文大概需要 8 分鐘。

          來自:https://blog.csdn.net/lt326030434/article/details/107361190

          整體流程大概如下:



          服務器準備

          在這先列出各服務器節(jié)點,方便同學們在下文中對照節(jié)點查看相應內(nèi)容

          SpringBoot項目準備

          引入log4j2替換SpringBoot默認log,demo項目結構如下:
          pom

          ????
          ????????org.springframework.boot
          ????????spring-boot-starter-web
          ????????
          ????????
          ????????????
          ????????????????org.springframework.boot
          ????????????????spring-boot-starter-logging
          ????????????

          ????????

          ????
          ?
          ?
          ?
          ?????org.springframework.boot
          ?????spring-boot-starter-log4j2
          ?
          ?
          ???
          ?????com.lmax
          ?????disruptor
          ?????3.3.4
          ???
          ?
          ?
          log4j2.xml
          "1.0"?encoding="UTF-8"?>
          "INFO"?schema="Log4J-V2.0.xsd"?monitorInterval="600"?>
          ????
          ????????"LOG_HOME">logs
          ????????"FILE_NAME">collector
          ????????"patternLayout">[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}]?[%level{length=5}]?[%thread-%tid]?[%logger]?[%X{hostName}]?[%X{ip}]?[%X{applicationName}]?[%F,%L,%C,%M]?[%m]?##?'%ex'%n
          ????

          ????
          ????????"CONSOLE"?target="SYSTEM_OUT">
          ????????????"${patternLayout}"/>
          ??????????
          ????????"appAppender"?fileName="${LOG_HOME}/app-${FILE_NAME}.log"?filePattern="${LOG_HOME}/app-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log"?>
          ??????????"${patternLayout}"?/>
          ??????????
          ??????????????"1"/>
          ??????????????"500MB"/>
          ??????????

          ??????????"20"/>?????????
          ????????
          ????????"errorAppender"?fileName="${LOG_HOME}/error-${FILE_NAME}.log"?filePattern="${LOG_HOME}/error-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log"?>
          ??????????"${patternLayout}"?/>
          ??????????
          ??????????????"warn"?onMatch="ACCEPT"?onMismatch="DENY"/>
          ??????????
          ??????????????
          ??????????
          ??????????????"1"/>
          ??????????????"500MB"/>
          ??????????

          ??????????"20"/>?????????
          ????????????????????
          ????

          ????
          ????????
          ????????"com.bfxy.*"?level="info"?includeLocation="true">
          ??????????"appAppender"/>
          ????????
          ????????"com.bfxy.*"?level="info"?includeLocation="true">
          ??????????"errorAppender"/>
          ???????????????
          ????????"info">
          ????????????"CONSOLE"/>
          ????????????"appAppender"/>
          ????????????"errorAppender"/>
          ?????????????????
          ????


          IndexController
          測試Controller,用以打印日志進行調(diào)試
          @Slf4j
          @RestController
          public?class?IndexController?{

          ?@RequestMapping(value?=?"/index")
          ?public?String?index()?{
          ??InputMDC.putMDC();
          ??
          ??log.info("我是一條info日志");
          ??
          ??log.warn("我是一條warn日志");

          ??log.error("我是一條error日志");
          ??
          ??return?"idx";
          ?}


          ?@RequestMapping(value?=?"/err")
          ?public?String?err()?{
          ??InputMDC.putMDC();
          ??try?{
          ???int?a?=?1/0;
          ??}?catch?(Exception?e)?{
          ???log.error("算術異常",?e);
          ??}
          ??return?"err";
          ?}
          ?
          }
          InputMDC
          用以獲取log中的[%X{hostName}][%X{ip}][%X{applicationName}]三個字段值
          @Component
          public?class?InputMDC?implements?EnvironmentAware?{

          ?private?static?Environment?environment;
          ?
          ?@Override
          ?public?void?setEnvironment(Environment?environment)?{
          ??InputMDC.environment?=?environment;
          ?}
          ?
          ?public?static?void?putMDC()?{
          ??MDC.put("hostName",?NetUtil.getLocalHostName());
          ??MDC.put("ip",?NetUtil.getLocalIp());
          ??MDC.put("applicationName",?environment.getProperty("spring.application.name"));
          ?}

          }
          NetUtil
          public?class?NetUtil?{???
          ?
          ?public?static?String?normalizeAddress(String?address){
          ??String[]?blocks?=?address.split("[:]");
          ??if(blocks.length?>?2){
          ???throw?new?IllegalArgumentException(address?+?"?is?invalid");
          ??}
          ??String?host?=?blocks[0];
          ??int?port?=?80;
          ??if(blocks.length?>?1){
          ???port?=?Integer.valueOf(blocks[1]);
          ??}?else?{
          ???address?+=?":"+port;?//use?default?80
          ??}?
          ??String?serverAddr?=?String.format("%s:%d",?host,?port);
          ??return?serverAddr;
          ?}
          ?
          ?public?static?String?getLocalAddress(String?address){
          ??String[]?blocks?=?address.split("[:]");
          ??if(blocks.length?!=?2){
          ???throw?new?IllegalArgumentException(address?+?"?is?invalid?address");
          ??}?
          ??String?host?=?blocks[0];
          ??int?port?=?Integer.valueOf(blocks[1]);
          ??
          ??if("0.0.0.0".equals(host)){
          ???return?String.format("%s:%d",NetUtil.getLocalIp(),?port);
          ??}
          ??return?address;
          ?}
          ?
          ?private?static?int?matchedIndex(String?ip,?String[]?prefix){
          ??for(int?i=0;?i???String?p?=?prefix[i];
          ???if("*".equals(p)){?//*,?assumed?to?be?IP
          ????if(ip.startsWith("127.")?||
          ???????ip.startsWith("10.")?||?
          ???????ip.startsWith("172.")?||
          ???????ip.startsWith("192.")){
          ?????continue;
          ????}
          ????return?i;
          ???}?else?{
          ????if(ip.startsWith(p)){
          ?????return?i;
          ????}
          ???}?
          ??}
          ??
          ??return?-1;
          ?}
          ?
          ?public?static?String?getLocalIp(String?ipPreference)?{
          ??if(ipPreference?==?null){
          ???ipPreference?=?"*>10>172>192>127";
          ??}
          ??String[]?prefix?=?ipPreference.split("[>?]+");
          ??try?{
          ???Pattern?pattern?=?Pattern.compile("[0-9]+\\.[0-9]+\\.[0-9]+\\.[0-9]+");
          ???Enumeration?interfaces?=?NetworkInterface.getNetworkInterfaces();
          ???String?matchedIp?=?null;
          ???int?matchedIdx?=?-1;
          ???while?(interfaces.hasMoreElements())?{
          ????NetworkInterface?ni?=?interfaces.nextElement();
          ????Enumeration?en?=?ni.getInetAddresses();?
          ????while?(en.hasMoreElements())?{
          ?????InetAddress?addr?=?en.nextElement();
          ?????String?ip?=?addr.getHostAddress();??
          ?????Matcher?matcher?=?pattern.matcher(ip);
          ?????if?(matcher.matches())?{??
          ??????int?idx?=?matchedIndex(ip,?prefix);
          ??????if(idx?==?-1)?continue;
          ??????if(matchedIdx?==?-1){
          ???????matchedIdx?=?idx;
          ???????matchedIp?=?ip;
          ??????}?else?{
          ???????if(matchedIdx>idx){
          ????????matchedIdx?=?idx;
          ????????matchedIp?=?ip;
          ???????}
          ??????}
          ?????}?
          ????}?
          ???}?
          ???if(matchedIp?!=?null)?return?matchedIp;
          ???return?"127.0.0.1";
          ??}?catch?(Exception?e)?{?
          ???return?"127.0.0.1";
          ??}
          ?}
          ?
          ?public?static?String?getLocalIp()?{
          ??return?getLocalIp("*>10>172>192>127");
          ?}
          ?
          ?public?static?String?remoteAddress(SocketChannel?channel){
          ??SocketAddress?addr?=?channel.socket().getRemoteSocketAddress();
          ??String?res?=?String.format("%s",?addr);
          ??return?res;
          ?}
          ?
          ?public?static?String?localAddress(SocketChannel?channel){
          ??SocketAddress?addr?=?channel.socket().getLocalSocketAddress();
          ??String?res?=?String.format("%s",?addr);
          ??return?addr==null??res:?res.substring(1);
          ?}
          ?
          ?public?static?String?getPid(){
          ??RuntimeMXBean?runtime?=?ManagementFactory.getRuntimeMXBean();
          ????????String?name?=?runtime.getName();
          ????????int?index?=?name.indexOf("@");
          ????????if?(index?!=?-1)?{
          ????????????return?name.substring(0,?index);
          ????????}
          ??return?null;
          ?}
          ?
          ?public?static?String?getLocalHostName()?{
          ????????try?{
          ????????????return?(InetAddress.getLocalHost()).getHostName();
          ????????}?catch?(UnknownHostException?uhe)?{
          ????????????String?host?=?uhe.getMessage();
          ????????????if?(host?!=?null)?{
          ????????????????int?colon?=?host.indexOf(':');
          ????????????????if?(colon?>?0)?{
          ????????????????????return?host.substring(0,?colon);
          ????????????????}
          ????????????}
          ????????????return?"UnknownHost";
          ????????}
          ????}
          }
          動項目,訪問/index/ero接口,可以看到項目中生成了app-collector.logerror-collector.log兩個日志文件:
          我們將Springboot服務部署在192.168.11.31這臺機器上。推薦一個 Spring Boot 基礎教程及實戰(zhàn)示例:https://github.com/javastacks/spring-boot-best-practice

          Kafka安裝和啟用

          kafka下載地址:
          http://kafka.apache.org/downloads.html
          kafka安裝步驟:首先kafka安裝需要依賴與zookeeper,所以小伙伴們先準備好zookeeper環(huán)境(三個節(jié)點即可),然后我們來一起構建kafka broker。

          ##?解壓命令:
          tar?-zxvf?kafka_2.12-2.1.0.tgz?-C?/usr/local/
          ##?改名命令:
          mv?kafka_2.12-2.1.0/?kafka_2.12
          ##?進入解壓后的目錄,修改server.properties文件:
          vim?/usr/local/kafka_2.12/config/server.properties
          ##?修改配置:
          broker.id=0
          port=9092
          host.name=192.168.11.51
          advertised.host.name=192.168.11.51
          log.dirs=/usr/local/kafka_2.12/kafka-logs
          num.partitions=2
          zookeeper.connect=192.168.11.111:2181,192.168.11.112:2181,192.168.11.113:2181

          ##?建立日志文件夾:
          mkdir?/usr/local/kafka_2.12/kafka-logs

          ##啟動kafka:
          /usr/local/kafka_2.12/bin/kafka-server-start.sh?/usr/local/kafka_2.12/config/server.properties?&

          創(chuàng)建兩個topic

          ##?創(chuàng)建topic
          kafka-topics.sh?--zookeeper?192.168.11.111:2181?--create?--topic?app-log-collector?--partitions?1??--replication-factor?1
          kafka-topics.sh?--zookeeper?192.168.11.111:2181?--create?--topic?error-log-collector?--partitions?1??--replication-factor?1?

          我們可以查看一下topic情況

          kafka-topics.sh?--zookeeper?192.168.11.111:2181?--topic?app-log-test?--describe

          可以看到已經(jīng)成功啟用了app-log-collectorerror-log-collector兩個topic

          filebeat安裝和啟用

          filebeat下載

          cd?/usr/local/software
          tar?-zxvf?filebeat-6.6.0-linux-x86_64.tar.gz?-C?/usr/local/
          cd?/usr/local
          mv?filebeat-6.6.0-linux-x86_64/?filebeat-6.6.0

          配置filebeat,可以參考下方y(tǒng)ml配置文件

          vim?/usr/local/filebeat-5.6.2/filebeat.yml
          ######################?Filebeat?Configuration?Example?#########################
          filebeat.prospectors:

          -?input_type:?log

          ??paths:
          ????##?app-服務名稱.log,?為什么寫死,防止發(fā)生輪轉抓取歷史數(shù)據(jù)
          ????-?/usr/local/logs/app-collector.log
          ??#定義寫入?ES?時的?_type?值
          ??document_type:?"app-log"
          ??multiline:
          ????#pattern:?'^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})'???#?指定匹配的表達式(匹配以?2017-11-15?08:04:23:889?時間格式開頭的字符串)
          ????pattern:?'^\['??????????????????????????????#?指定匹配的表達式(匹配以?"{?開頭的字符串)
          ????negate:?true????????????????????????????????#?是否匹配到
          ????match:?after????????????????????????????????#?合并到上一行的末尾
          ????max_lines:?2000?????????????????????????????#?最大的行數(shù)
          ????timeout:?2s?????????????????????????????????#?如果在規(guī)定時間沒有新的日志事件就不等待后面的日志
          ??fields:
          ????logbiz:?collector
          ????logtopic:?app-log-collector???##?按服務劃分用作kafka?topic
          ????evn:?dev

          -?input_type:?log

          ??paths:
          ????-?/usr/local/logs/error-collector.log
          ??document_type:?"error-log"
          ??multiline:
          ????#pattern:?'^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})'???#?指定匹配的表達式(匹配以?2017-11-15?08:04:23:889?時間格式開頭的字符串)
          ????pattern:?'^\['??????????????????????????????#?指定匹配的表達式(匹配以?"{?開頭的字符串)
          ????negate:?true????????????????????????????????#?是否匹配到
          ????match:?after????????????????????????????????#?合并到上一行的末尾
          ????max_lines:?2000?????????????????????????????#?最大的行數(shù)
          ????timeout:?2s?????????????????????????????????#?如果在規(guī)定時間沒有新的日志事件就不等待后面的日志
          ??fields:
          ????logbiz:?collector
          ????logtopic:?error-log-collector???##?按服務劃分用作kafka?topic
          ????evn:?dev
          ????
          output.kafka:
          ??enabled:?true
          ??hosts:?["192.168.11.51:9092"]
          ??topic:?'%{[fields.logtopic]}'
          ??partition.hash:
          ????reachable_only:?true
          ??compression:?gzip
          ??max_message_bytes:?1000000
          ??required_acks:?1
          logging.to_files:?true

          filebeat啟動:
          檢查配置是否正確

          cd?/usr/local/filebeat-6.6.0
          ./filebeat?-c?filebeat.yml?-configtest
          ##?Config?OK

          啟動filebeat

          /usr/local/filebeat-6.6.0/filebeat?&

          檢查是否啟動成功

          ps?-ef?|?grep?filebeat

          可以看到filebeat已經(jīng)啟動成功
          然后我們訪問192.168.11.31:8001/index和192.168.11.31:8001/err,再查看kafka的logs文件,可以看到已經(jīng)生成了app-log-collector-0和error-log-collector-0文件,說明filebeat已經(jīng)幫我們把數(shù)據(jù)收集好放到了kafka上。

          logstash安裝

          我們在logstash的安裝目錄下新建一個文件夾

          mkdir?scrpit

          然后cd進該文件,創(chuàng)建一個logstash-script.conf文件

          cd?scrpit
          vim?logstash-script.conf
          ## multiline 插件也可以用于其他類似的堆棧式信息,比如 linux 的內(nèi)核日志。
          input?{
          ??kafka?{
          ????##?app-log-服務名稱
          ????topics_pattern?=>?"app-log-.*"
          ????bootstrap_servers?=>?"192.168.11.51:9092"
          ?codec?=>?json
          ?consumer_threads?=>?1?##?增加consumer的并行消費線程數(shù)
          ?decorate_events?=>?true
          ????#auto_offset_rest?=>?"latest"
          ?group_id?=>?"app-log-group"
          ???}
          ???
          ???kafka?{
          ????##?error-log-服務名稱
          ????topics_pattern?=>?"error-log-.*"
          ????bootstrap_servers?=>?"192.168.11.51:9092"
          ?codec?=>?json
          ?consumer_threads?=>?1
          ?decorate_events?=>?true
          ????#auto_offset_rest?=>?"latest"
          ?group_id?=>?"error-log-group"
          ???}
          ???
          }

          filter?{
          ??
          ??##?時區(qū)轉換
          ??ruby?{
          ?code?=>?"event.set('index_time',event.timestamp.time.localtime.strftime('%Y.%m.%d'))"
          ??}

          ??if?"app-log"?in?[fields][logtopic]{
          ????grok?{
          ????????##?表達式,這里對應的是Springboot輸出的日志格式
          ????????match?=>?["message",?"\[%{NOTSPACE:currentDateTime}\]?\[%{NOTSPACE:level}\]?\[%{NOTSPACE:thread-id}\]?\[%{NOTSPACE:class}\]?\[%{DATA:hostName}\]?\[%{DATA:ip}\]?\[%{DATA:applicationName}\]?\[%{DATA:location}\]?\[%{DATA:messageInfo}\]?##?(\'\'|%{QUOTEDSTRING:throwable})"]
          ????}
          ??}

          ??if?"error-log"?in?[fields][logtopic]{
          ????grok?{
          ????????##?表達式
          ????????match?=>?["message",?"\[%{NOTSPACE:currentDateTime}\]?\[%{NOTSPACE:level}\]?\[%{NOTSPACE:thread-id}\]?\[%{NOTSPACE:class}\]?\[%{DATA:hostName}\]?\[%{DATA:ip}\]?\[%{DATA:applicationName}\]?\[%{DATA:location}\]?\[%{DATA:messageInfo}\]?##?(\'\'|%{QUOTEDSTRING:throwable})"]
          ????}
          ??}
          ??
          }

          ##?測試輸出到控制臺:
          output?{
          ??stdout?{?codec?=>?rubydebug?}
          }


          ## elasticsearch:
          output?{

          ??if?"app-log"?in?[fields][logtopic]{
          ?##?es插件
          ?elasticsearch?{
          ???????#?es服務地址
          ????????hosts?=>?["192.168.11.35:9200"]
          ????????#?用戶名密碼??????
          ????????user?=>?"elastic"
          ????????password?=>?"123456"
          ????????##?索引名,+?號開頭的,就會自動認為后面是時間格式:
          ????????##?javalog-app-service-2019.01.23?
          ????????index?=>?"app-log-%{[fields][logbiz]}-%{index_time}"
          ????????#?是否嗅探集群ip:一般設置true;http://192.168.11.35:9200/_nodes/http?pretty
          ????????#?通過嗅探機制進行es集群負載均衡發(fā)日志消息
          ????????sniffing?=>?true
          ????????#?logstash默認自帶一個mapping模板,進行模板覆蓋
          ????????template_overwrite?=>?true
          ????}?
          ??}
          ??
          ??if?"error-log"?in?[fields][logtopic]{
          ?elasticsearch?{
          ????????hosts?=>?["192.168.11.35:9200"]????
          ????????user?=>?"elastic"
          ????????password?=>?"123456"
          ????????index?=>?"error-log-%{[fields][logbiz]}-%{index_time}"
          ????????sniffing?=>?true
          ????????template_overwrite?=>?true
          ????}?
          ??}
          ??

          }

          啟動logstash

          /usr/local/logstash-6.6.0/bin/logstash?-f?/usr/local/logstash-6.6.0/script/logstash-script.conf?&

          等待啟動成功,我們再次訪問192.168.11.31:8001/err
          可以看到控制臺開始打印日志

          ElasticSearch與Kibana

          ES和Kibana的搭建之前沒寫過博客,網(wǎng)上資料也比較多,大家可以自行搜索。
          搭建完成后,訪問Kibana的管理頁面192.168.11.35:5601,選擇Management -> Kinaba - Index Patterns
          然后Create index pattern
          • index pattern 輸入?app-log-*
          • Time Filter field name 選擇 currentDateTime
          這樣我們就成功創(chuàng)建了索引。
          我們再次訪問192.168.11.31:8001/err,這個時候就可以看到我們已經(jīng)命中了一條log信息
          里面展示了日志的全量信息
          到這里,我們完整的日志收集及可視化就搭建完成了!

          推薦閱讀:

          稚暉君自制機械臂,能給葡萄縫針的那種,成本1萬塊,網(wǎng)友:能把腦子開源一下?

          解決支付訂單,重復提交問題!

          最近面試BAT,整理一份面試資料《Java面試BATJ通關手冊》,覆蓋了Java核心技術、JVM、Java并發(fā)、SSM、微服務、數(shù)據(jù)庫、數(shù)據(jù)結構等等。

          獲取方式:點個「在看」,點擊上方小卡片,進入公眾號后回復「面試題」領取,更多內(nèi)容陸續(xù)奉上。

          朕已閱?

          瀏覽 60
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  免费无码做爱视频 | 国产变态视频a片一二三 | 91啦国产 | 国产免费淫秽视频 | 色天堂视频在线观看 |