<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          SpringBoot+Kafka+ELK 完成海量日志收集(超詳細(xì))

          共 10759字,需瀏覽 22分鐘

           ·

          2021-11-11 21:06

          來(lái)源:jiandansuifeng.blog.csdn.net/

          article/details/107361190

          整體流程大概如下:

          服務(wù)器準(zhǔn)備

          在這先列出各服務(wù)器節(jié)點(diǎn),方便同學(xué)們?cè)谙挛闹袑?duì)照節(jié)點(diǎn)查看相應(yīng)內(nèi)容

          SpringBoot項(xiàng)目準(zhǔn)備

          引入log4j2替換SpringBoot默認(rèn)log,demo項(xiàng)目結(jié)構(gòu)如下:

          pom

          <dependencies>
          ????<dependency>
          ????????<groupId>org.springframework.bootgroupId>
          ????????<artifactId>spring-boot-starter-webartifactId>
          ????????
          ????????<exclusions>
          ????????????<exclusion>
          ????????????????<groupId>org.springframework.bootgroupId>
          ????????????????<artifactId>spring-boot-starter-loggingartifactId>
          ????????????exclusion>
          ????????exclusions>
          ????dependency>?
          ?
          ?<dependency>
          ?????<groupId>org.springframework.bootgroupId>
          ?????<artifactId>spring-boot-starter-log4j2artifactId>
          ?dependency>?
          ???<dependency>
          ?????<groupId>com.lmaxgroupId>
          ?????<artifactId>disruptorartifactId>
          ?????<version>3.3.4version>
          ???dependency>?
          dependencies>?

          log4j2.xml


          <Configuration?status="INFO"?schema="Log4J-V2.0.xsd"?monitorInterval="600"?>
          ????<Properties>
          ????????<Property?name="LOG_HOME">logsProperty>
          ????????<property?name="FILE_NAME">collectorproperty>
          ????????<property?name="patternLayout">[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}]?[%level{length=5}]?[%thread-%tid]?[%logger]?[%X{hostName}]?[%X{ip}]?[%X{applicationName}]?[%F,%L,%C,%M]?[%m]?##?'%ex'%nproperty>
          ????Properties>
          ????<Appenders>
          ????????<Console?name="CONSOLE"?target="SYSTEM_OUT">
          ????????????<PatternLayout?pattern="${patternLayout}"/>
          ????????Console>??
          ????????<RollingRandomAccessFile?name="appAppender"?fileName="${LOG_HOME}/app-${FILE_NAME}.log"?filePattern="${LOG_HOME}/app-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log"?>
          ??????????<PatternLayout?pattern="${patternLayout}"?/>
          ??????????<Policies>
          ??????????????<TimeBasedTriggeringPolicy?interval="1"/>
          ??????????????<SizeBasedTriggeringPolicy?size="500MB"/>
          ??????????Policies>
          ??????????<DefaultRolloverStrategy?max="20"/>?????????
          ????????RollingRandomAccessFile>
          ????????<RollingRandomAccessFile?name="errorAppender"?fileName="${LOG_HOME}/error-${FILE_NAME}.log"?filePattern="${LOG_HOME}/error-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log"?>
          ??????????<PatternLayout?pattern="${patternLayout}"?/>
          ??????????<Filters>
          ??????????????<ThresholdFilter?level="warn"?onMatch="ACCEPT"?onMismatch="DENY"/>
          ??????????Filters>??????????????
          ??????????<Policies>
          ??????????????<TimeBasedTriggeringPolicy?interval="1"/>
          ??????????????<SizeBasedTriggeringPolicy?size="500MB"/>
          ??????????Policies>
          ??????????<DefaultRolloverStrategy?max="20"/>?????????
          ????????RollingRandomAccessFile>????????????
          ????Appenders>
          ????<Loggers>
          ????????
          ????????<AsyncLogger?name="com.bfxy.*"?level="info"?includeLocation="true">
          ??????????<AppenderRef?ref="appAppender"/>
          ????????AsyncLogger>
          ????????<AsyncLogger?name="com.bfxy.*"?level="info"?includeLocation="true">
          ??????????<AppenderRef?ref="errorAppender"/>
          ????????AsyncLogger>???????
          ????????<Root?level="info">
          ????????????<Appender-Ref?ref="CONSOLE"/>
          ????????????<Appender-Ref?ref="appAppender"/>
          ????????????<AppenderRef?ref="errorAppender"/>
          ????????Root>?????????
          ????Loggers>
          Configuration>

          IndexController

          測(cè)試Controller,用以打印日志進(jìn)行調(diào)試

          @Slf4j
          @RestController
          public?class?IndexController?{

          ?@RequestMapping(value?=?"/index")
          ?public?String?index()?{
          ??InputMDC.putMDC();
          ??
          ??log.info("我是一條info日志");
          ??
          ??log.warn("我是一條warn日志");

          ??log.error("我是一條error日志");
          ??
          ??return?"idx";
          ?}


          ?@RequestMapping(value?=?"/err")
          ?public?String?err()?{
          ??InputMDC.putMDC();
          ??try?{
          ???int?a?=?1/0;
          ??}?catch?(Exception?e)?{
          ???log.error("算術(shù)異常",?e);
          ??}
          ??return?"err";
          ?}
          ?
          }

          InputMDC

          用以獲取log中的[%X{hostName}][%X{ip}][%X{applicationName}]三個(gè)字段值

          @Component
          public?class?InputMDC?implements?EnvironmentAware?{

          ?private?static?Environment?environment;
          ?
          ?@Override
          ?public?void?setEnvironment(Environment?environment)?{
          ??InputMDC.environment?=?environment;
          ?}
          ?
          ?public?static?void?putMDC()?{
          ??MDC.put("hostName",?NetUtil.getLocalHostName());
          ??MDC.put("ip",?NetUtil.getLocalIp());
          ??MDC.put("applicationName",?environment.getProperty("spring.application.name"));
          ?}

          }

          NetUtil

          public?class?NetUtil?{???
          ?
          ?public?static?String?normalizeAddress(String?address){
          ??String[]?blocks?=?address.split("[:]");
          ??if(blocks.length?>?2){
          ???throw?new?IllegalArgumentException(address?+?"?is?invalid");
          ??}
          ??String?host?=?blocks[0];
          ??int?port?=?80;
          ??if(blocks.length?>?1){
          ???port?=?Integer.valueOf(blocks[1]);
          ??}?else?{
          ???address?+=?":"+port;?//use?default?80
          ??}?
          ??String?serverAddr?=?String.format("%s:%d",?host,?port);
          ??return?serverAddr;
          ?}
          ?
          ?public?static?String?getLocalAddress(String?address){
          ??String[]?blocks?=?address.split("[:]");
          ??if(blocks.length?!=?2){
          ???throw?new?IllegalArgumentException(address?+?"?is?invalid?address");
          ??}?
          ??String?host?=?blocks[0];
          ??int?port?=?Integer.valueOf(blocks[1]);
          ??
          ??if("0.0.0.0".equals(host)){
          ???return?String.format("%s:%d",NetUtil.getLocalIp(),?port);
          ??}
          ??return?address;
          ?}
          ?
          ?private?static?int?matchedIndex(String?ip,?String[]?prefix){
          ??for(int?i=0;?i???String?p?=?prefix[i];
          ???if("*".equals(p)){?//*,?assumed?to?be?IP
          ????if(ip.startsWith("127.")?||
          ???????ip.startsWith("10.")?||?
          ???????ip.startsWith("172.")?||
          ???????ip.startsWith("192.")){
          ?????continue;
          ????}
          ????return?i;
          ???}?else?{
          ????if(ip.startsWith(p)){
          ?????return?i;
          ????}
          ???}?
          ??}
          ??
          ??return?-1;
          ?}
          ?
          ?public?static?String?getLocalIp(String?ipPreference)?{
          ??if(ipPreference?==?null){
          ???ipPreference?=?"*>10>172>192>127";
          ??}
          ??String[]?prefix?=?ipPreference.split("[>?]+");
          ??try?{
          ???Pattern?pattern?=?Pattern.compile("[0-9]+\\.[0-9]+\\.[0-9]+\\.[0-9]+");
          ???Enumeration?interfaces?=?NetworkInterface.getNetworkInterfaces();
          ???String?matchedIp?=?null;
          ???int?matchedIdx?=?-1;
          ???while?(interfaces.hasMoreElements())?{
          ????NetworkInterface?ni?=?interfaces.nextElement();
          ????Enumeration?en?=?ni.getInetAddresses();?
          ????while?(en.hasMoreElements())?{
          ?????InetAddress?addr?=?en.nextElement();
          ?????String?ip?=?addr.getHostAddress();??
          ?????Matcher?matcher?=?pattern.matcher(ip);
          ?????if?(matcher.matches())?{??
          ??????int?idx?=?matchedIndex(ip,?prefix);
          ??????if(idx?==?-1)?continue;
          ??????if(matchedIdx?==?-1){
          ???????matchedIdx?=?idx;
          ???????matchedIp?=?ip;
          ??????}?else?{
          ???????if(matchedIdx>idx){
          ????????matchedIdx?=?idx;
          ????????matchedIp?=?ip;
          ???????}
          ??????}
          ?????}?
          ????}?
          ???}?
          ???if(matchedIp?!=?null)?return?matchedIp;
          ???return?"127.0.0.1";
          ??}?catch?(Exception?e)?{?
          ???return?"127.0.0.1";
          ??}
          ?}
          ?
          ?public?static?String?getLocalIp()?{
          ??return?getLocalIp("*>10>172>192>127");
          ?}
          ?
          ?public?static?String?remoteAddress(SocketChannel?channel){
          ??SocketAddress?addr?=?channel.socket().getRemoteSocketAddress();
          ??String?res?=?String.format("%s",?addr);
          ??return?res;
          ?}
          ?
          ?public?static?String?localAddress(SocketChannel?channel){
          ??SocketAddress?addr?=?channel.socket().getLocalSocketAddress();
          ??String?res?=?String.format("%s",?addr);
          ??return?addr==null??res:?res.substring(1);
          ?}
          ?
          ?public?static?String?getPid(){
          ??RuntimeMXBean?runtime?=?ManagementFactory.getRuntimeMXBean();
          ????????String?name?=?runtime.getName();
          ????????int?index?=?name.indexOf("@");
          ????????if?(index?!=?-1)?{
          ????????????return?name.substring(0,?index);
          ????????}
          ??return?null;
          ?}
          ?
          ?public?static?String?getLocalHostName()?{
          ????????try?{
          ????????????return?(InetAddress.getLocalHost()).getHostName();
          ????????}?catch?(UnknownHostException?uhe)?{
          ????????????String?host?=?uhe.getMessage();
          ????????????if?(host?!=?null)?{
          ????????????????int?colon?=?host.indexOf(':');
          ????????????????if?(colon?>?0)?{
          ????????????????????return?host.substring(0,?colon);
          ????????????????}
          ????????????}
          ????????????return?"UnknownHost";
          ????????}
          ????}
          }

          啟動(dòng)項(xiàng)目,訪問(wèn)/index/ero接口,可以看到項(xiàng)目中生成了app-collector.logerror-collector.log兩個(gè)日志文件

          我們將Springboot服務(wù)部署在192.168.11.31這臺(tái)機(jī)器上。

          Kafka安裝和啟用

          kafka下載地址:

          http://kafka.apache.org/downloads.html

          kafka安裝步驟:首先kafka安裝需要依賴與zookeeper,所以小伙伴們先準(zhǔn)備好zookeeper環(huán)境(三個(gè)節(jié)點(diǎn)即可),然后我們來(lái)一起構(gòu)建kafka broker。

          ##?解壓命令:
          tar?-zxvf?kafka_2.12-2.1.0.tgz?-C?/usr/local/
          ##?改名命令:
          mv?kafka_2.12-2.1.0/?kafka_2.12
          ##?進(jìn)入解壓后的目錄,修改server.properties文件:
          vim?/usr/local/kafka_2.12/config/server.properties
          ##?修改配置:
          broker.id=0
          port=9092
          host.name=192.168.11.51
          advertised.host.name=192.168.11.51
          log.dirs=/usr/local/kafka_2.12/kafka-logs
          num.partitions=2
          zookeeper.connect=192.168.11.111:2181,192.168.11.112:2181,192.168.11.113:2181

          ##?建立日志文件夾:
          mkdir?/usr/local/kafka_2.12/kafka-logs

          ##啟動(dòng)kafka:
          /usr/local/kafka_2.12/bin/kafka-server-start.sh?/usr/local/kafka_2.12/config/server.properties?&

          創(chuàng)建兩個(gè)topic

          ##?創(chuàng)建topic
          kafka-topics.sh?--zookeeper?192.168.11.111:2181?--create?--topic?app-log-collector?--partitions?1??--replication-factor?1
          kafka-topics.sh?--zookeeper?192.168.11.111:2181?--create?--topic?error-log-collector?--partitions?1??--replication-factor?1?

          我們可以查看一下topic情況

          kafka-topics.sh?--zookeeper?192.168.11.111:2181?--topic?app-log-test?--describe

          可以看到已經(jīng)成功啟用了app-log-collectorerror-log-collector兩個(gè)topic

          filebeat安裝和啟用

          filebeat下載

          cd?/usr/local/software
          tar?-zxvf?filebeat-6.6.0-linux-x86_64.tar.gz?-C?/usr/local/
          cd?/usr/local
          mv?filebeat-6.6.0-linux-x86_64/?filebeat-6.6.0

          配置filebeat,可以參考下方y(tǒng)ml配置文件

          vim?/usr/local/filebeat-5.6.2/filebeat.yml
          ######################?Filebeat?Configuration?Example?#########################
          filebeat.prospectors:

          -?input_type:?log

          ??paths:
          ????##?app-服務(wù)名稱.log,?為什么寫死,防止發(fā)生輪轉(zhuǎn)抓取歷史數(shù)據(jù)
          ????-?/usr/local/logs/app-collector.log
          ??#定義寫入?ES?時(shí)的?_type?值
          ??document_type:?"app-log"
          ??multiline:
          ????#pattern:?'^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})'???#?指定匹配的表達(dá)式(匹配以?2017-11-15?08:04:23:889?時(shí)間格式開頭的字符串)
          ????pattern:?'^\['??????????????????????????????#?指定匹配的表達(dá)式(匹配以?"{?開頭的字符串)
          ????negate:?true????????????????????????????????#?是否匹配到
          ????match:?after????????????????????????????????#?合并到上一行的末尾
          ????max_lines:?2000?????????????????????????????#?最大的行數(shù)
          ????timeout:?2s?????????????????????????????????#?如果在規(guī)定時(shí)間沒(méi)有新的日志事件就不等待后面的日志
          ??fields:
          ????logbiz:?collector
          ????logtopic:?app-log-collector???##?按服務(wù)劃分用作kafka?topic
          ????evn:?dev

          -?input_type:?log

          ??paths:
          ????-?/usr/local/logs/error-collector.log
          ??document_type:?"error-log"
          ??multiline:
          ????#pattern:?'^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})'???#?指定匹配的表達(dá)式(匹配以?2017-11-15?08:04:23:889?時(shí)間格式開頭的字符串)
          ????pattern:?'^\['??????????????????????????????#?指定匹配的表達(dá)式(匹配以?"{?開頭的字符串)
          ????negate:?true????????????????????????????????#?是否匹配到
          ????match:?after????????????????????????????????#?合并到上一行的末尾
          ????max_lines:?2000?????????????????????????????#?最大的行數(shù)
          ????timeout:?2s?????????????????????????????????#?如果在規(guī)定時(shí)間沒(méi)有新的日志事件就不等待后面的日志
          ??fields:
          ????logbiz:?collector
          ????logtopic:?error-log-collector???##?按服務(wù)劃分用作kafka?topic
          ????evn:?dev
          ????
          output.kafka:
          ??enabled:?true
          ??hosts:?["192.168.11.51:9092"]
          ??topic:?'%{[fields.logtopic]}'
          ??partition.hash:
          ????reachable_only:?true
          ??compression:?gzip
          ??max_message_bytes:?1000000
          ??required_acks:?1
          logging.to_files:?true

          filebeat啟動(dòng):

          檢查配置是否正確

          cd?/usr/local/filebeat-6.6.0
          ./filebeat?-c?filebeat.yml?-configtest
          ##?Config?OK

          啟動(dòng)filebeat

          /usr/local/filebeat-6.6.0/filebeat?&

          檢查是否啟動(dòng)成功

          ps?-ef?|?grep?filebeat

          可以看到filebeat已經(jīng)啟動(dòng)成功

          然后我們?cè)L問(wèn)192.168.11.31:8001/index和192.168.11.31:8001/err,再查看kafka的logs文件,可以看到已經(jīng)生成了app-log-collector-0和error-log-collector-0文件,說(shuō)明filebeat已經(jīng)幫我們把數(shù)據(jù)收集好放到了kafka上。

          logstash安裝

          我們?cè)趌ogstash的安裝目錄下新建一個(gè)文件夾

          mkdir?scrpit

          然后cd進(jìn)該文件,創(chuàng)建一個(gè)logstash-script.conf文件

          cd?scrpit
          vim?logstash-script.conf
          ## multiline 插件也可以用于其他類似的堆棧式信息,比如 linux 的內(nèi)核日志。
          input?{
          ??kafka?{
          ????##?app-log-服務(wù)名稱
          ????topics_pattern?=>?"app-log-.*"
          ????bootstrap_servers?=>?"192.168.11.51:9092"
          ?codec?=>?json
          ?consumer_threads?=>?1?##?增加consumer的并行消費(fèi)線程數(shù)
          ?decorate_events?=>?true
          ????#auto_offset_rest?=>?"latest"
          ?group_id?=>?"app-log-group"
          ???}
          ???
          ???kafka?{
          ????##?error-log-服務(wù)名稱
          ????topics_pattern?=>?"error-log-.*"
          ????bootstrap_servers?=>?"192.168.11.51:9092"
          ?codec?=>?json
          ?consumer_threads?=>?1
          ?decorate_events?=>?true
          ????#auto_offset_rest?=>?"latest"
          ?group_id?=>?"error-log-group"
          ???}
          ???
          }

          filter?{
          ??
          ??##?時(shí)區(qū)轉(zhuǎn)換
          ??ruby?{
          ?code?=>?"event.set('index_time',event.timestamp.time.localtime.strftime('%Y.%m.%d'))"
          ??}

          ??if?"app-log"?in?[fields][logtopic]{
          ????grok?{
          ????????##?表達(dá)式,這里對(duì)應(yīng)的是Springboot輸出的日志格式
          ????????match?=>?["message",?"\[%{NOTSPACE:currentDateTime}\]?\[%{NOTSPACE:level}\]?\[%{NOTSPACE:thread-id}\]?\[%{NOTSPACE:class}\]?\[%{DATA:hostName}\]?\[%{DATA:ip}\]?\[%{DATA:applicationName}\]?\[%{DATA:location}\]?\[%{DATA:messageInfo}\]?##?(\'\'|%{QUOTEDSTRING:throwable})"]
          ????}
          ??}

          ??if?"error-log"?in?[fields][logtopic]{
          ????grok?{
          ????????##?表達(dá)式
          ????????match?=>?["message",?"\[%{NOTSPACE:currentDateTime}\]?\[%{NOTSPACE:level}\]?\[%{NOTSPACE:thread-id}\]?\[%{NOTSPACE:class}\]?\[%{DATA:hostName}\]?\[%{DATA:ip}\]?\[%{DATA:applicationName}\]?\[%{DATA:location}\]?\[%{DATA:messageInfo}\]?##?(\'\'|%{QUOTEDSTRING:throwable})"]
          ????}
          ??}
          ??
          }

          ##?測(cè)試輸出到控制臺(tái):
          output?{
          ??stdout?{?codec?=>?rubydebug?}
          }


          ## elasticsearch:
          output?{

          ??if?"app-log"?in?[fields][logtopic]{
          ?##?es插件
          ?elasticsearch?{
          ???????#?es服務(wù)地址
          ????????hosts?=>?["192.168.11.35:9200"]
          ????????#?用戶名密碼??????
          ????????user?=>?"elastic"
          ????????password?=>?"123456"
          ????????##?索引名,+?號(hào)開頭的,就會(huì)自動(dòng)認(rèn)為后面是時(shí)間格式:
          ????????##?javalog-app-service-2019.01.23?
          ????????index?=>?"app-log-%{[fields][logbiz]}-%{index_time}"
          ????????#?是否嗅探集群ip:一般設(shè)置true;http://192.168.11.35:9200/_nodes/http?pretty
          ????????#?通過(guò)嗅探機(jī)制進(jìn)行es集群負(fù)載均衡發(fā)日志消息
          ????????sniffing?=>?true
          ????????#?logstash默認(rèn)自帶一個(gè)mapping模板,進(jìn)行模板覆蓋
          ????????template_overwrite?=>?true
          ????}?
          ??}
          ??
          ??if?"error-log"?in?[fields][logtopic]{
          ?elasticsearch?{
          ????????hosts?=>?["192.168.11.35:9200"]????
          ????????user?=>?"elastic"
          ????????password?=>?"123456"
          ????????index?=>?"error-log-%{[fields][logbiz]}-%{index_time}"
          ????????sniffing?=>?true
          ????????template_overwrite?=>?true
          ????}?
          ??}
          ??

          }

          啟動(dòng)logstash

          /usr/local/logstash-6.6.0/bin/logstash?-f?/usr/local/logstash-6.6.0/script/logstash-script.conf?&

          等待啟動(dòng)成功,我們?cè)俅卧L問(wèn)192.168.11.31:8001/err

          可以看到控制臺(tái)開始打印日志

          ElasticSearch與Kibana

          ES和Kibana的搭建之前沒(méi)寫過(guò)博客,網(wǎng)上資料也比較多,大家可以自行搜索。

          搭建完成后,訪問(wèn)Kibana的管理頁(yè)面192.168.11.35:5601,選擇Management -> Kinaba - Index Patterns

          然后Create index pattern

          • index pattern 輸入?app-log-*
          • Time Filter field name 選擇 currentDateTime

          這樣我們就成功創(chuàng)建了索引。

          我們?cè)俅卧L問(wèn)192.168.11.31:8001/err,這個(gè)時(shí)候就可以看到我們已經(jīng)命中了一條log信息

          里面展示了日志的全量信息

          到這里,我們完整的日志收集及可視化就搭建完成了!


          瀏覽 46
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  成人91av视频在线看 | 永久免费无人区一区 | 黄片网站视频 | 无码秘 人妻一区红中av漫画 | 亚洲第一综合区 |