SpringBoot + Kafka + ELK: Large-Scale Log Collection (Step by Step)
Source: jiandansuifeng.blog.csdn.net/article/details/107361190
The overall flow is roughly: SpringBoot (log4j2 writing to log files) -> filebeat -> Kafka -> logstash -> Elasticsearch -> Kibana.

Server Preparation
First, here is the list of server nodes, so you can map each step below to the right machine.
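Pieced together from the addresses used later in this article (the original post showed these in a screenshot), the layout is roughly:

192.168.11.31: SpringBoot service (port 8001), with filebeat running alongside it
192.168.11.51: Kafka broker (port 9092)
192.168.11.111 / 192.168.11.112 / 192.168.11.113: ZooKeeper cluster (port 2181)
192.168.11.35: Elasticsearch (port 9200) and Kibana (port 5601)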

SpringBoot Project Setup
Bring in log4j2 to replace SpringBoot's default logging. The relevant pieces of the demo project are listed below.

pom.xml
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <!-- exclude SpringBoot's default logging -->
        <exclusions>
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-logging</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- pull in log4j2 instead -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-log4j2</artifactId>
    </dependency>
    <!-- disruptor is required by log4j2's async loggers -->
    <dependency>
        <groupId>com.lmax</groupId>
        <artifactId>disruptor</artifactId>
        <version>3.3.4</version>
    </dependency>
</dependencies>
log4j2.xml
<Configuration status="INFO" schema="Log4J-V2.0.xsd" monitorInterval="600">
    <Properties>
        <Property name="LOG_HOME">logs</Property>
        <Property name="FILE_NAME">collector</Property>
        <Property name="patternLayout">[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}] [%level{length=5}] [%thread-%tid] [%logger] [%X{hostName}] [%X{ip}] [%X{applicationName}] [%F,%L,%C,%M] [%m] ## '%ex'%n</Property>
    </Properties>
    <Appenders>
        <Console name="CONSOLE" target="SYSTEM_OUT">
            <PatternLayout pattern="${patternLayout}"/>
        </Console>
        <RollingRandomAccessFile name="appAppender" fileName="${LOG_HOME}/app-${FILE_NAME}.log" filePattern="${LOG_HOME}/app-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log">
            <PatternLayout pattern="${patternLayout}"/>
            <Policies>
                <TimeBasedTriggeringPolicy interval="1"/>
                <SizeBasedTriggeringPolicy size="500MB"/>
            </Policies>
            <DefaultRolloverStrategy max="20"/>
        </RollingRandomAccessFile>
        <RollingRandomAccessFile name="errorAppender" fileName="${LOG_HOME}/error-${FILE_NAME}.log" filePattern="${LOG_HOME}/error-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log">
            <PatternLayout pattern="${patternLayout}"/>
            <Filters>
                <!-- only WARN and above go into the error log -->
                <ThresholdFilter level="warn" onMatch="ACCEPT" onMismatch="DENY"/>
            </Filters>
            <Policies>
                <TimeBasedTriggeringPolicy interval="1"/>
                <SizeBasedTriggeringPolicy size="500MB"/>
            </Policies>
            <DefaultRolloverStrategy max="20"/>
        </RollingRandomAccessFile>
    </Appenders>
    <Loggers>
        <!-- async logger for the business packages; a single logger carries both
             appender refs, since log4j2 ignores a second logger with the same name -->
        <AsyncLogger name="com.bfxy.*" level="info" includeLocation="true">
            <AppenderRef ref="appAppender"/>
            <AppenderRef ref="errorAppender"/>
        </AsyncLogger>
        <Root level="info">
            <AppenderRef ref="CONSOLE"/>
            <AppenderRef ref="appAppender"/>
            <AppenderRef ref="errorAppender"/>
        </Root>
    </Loggers>
</Configuration>
IndexController
A test controller used to emit log lines for debugging:
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
public class IndexController {

    @RequestMapping(value = "/index")
    public String index() {
        // push hostName/ip/applicationName into the MDC before logging
        InputMDC.putMDC();

        log.info("this is an info log");
        log.warn("this is a warn log");
        log.error("this is an error log");

        return "idx";
    }

    @RequestMapping(value = "/err")
    public String err() {
        InputMDC.putMDC();
        try {
            int a = 1 / 0;
        } catch (Exception e) {
            log.error("arithmetic exception", e);
        }
        return "err";
    }
}
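Once the service is running (it is deployed on 192.168.11.31:8001 later in this article), the two endpoints can be exercised with curl:

curl http://192.168.11.31:8001/index
curl http://192.168.11.31:8001/err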
InputMDC
Used to populate the [%X{hostName}], [%X{ip}] and [%X{applicationName}] fields in the log pattern:
import org.slf4j.MDC;
import org.springframework.context.EnvironmentAware;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;

@Component
public class InputMDC implements EnvironmentAware {

    private static Environment environment;

    @Override
    public void setEnvironment(Environment environment) {
        InputMDC.environment = environment;
    }

    public static void putMDC() {
        MDC.put("hostName", NetUtil.getLocalHostName());
        MDC.put("ip", NetUtil.getLocalIp());
        MDC.put("applicationName", environment.getProperty("spring.application.name"));
    }
}
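InputMDC reads spring.application.name from the Environment, so that property must be set for [%X{applicationName}] to be populated. A minimal application.properties consistent with this demo (the name collector and port 8001 are assumptions based on the file names and URLs used elsewhere in this article):

# assumed values; adjust to your own service
spring.application.name=collector
server.port=8001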
NetUtil
A small utility class for resolving the local host name, IP and PID:

import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.nio.channels.SocketChannel;
import java.util.Enumeration;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class NetUtil {

    public static String normalizeAddress(String address) {
        String[] blocks = address.split("[:]");
        if (blocks.length > 2) {
            throw new IllegalArgumentException(address + " is invalid");
        }
        String host = blocks[0];
        int port = 80;
        if (blocks.length > 1) {
            port = Integer.valueOf(blocks[1]);
        } else {
            address += ":" + port; // use default port 80
        }
        String serverAddr = String.format("%s:%d", host, port);
        return serverAddr;
    }

    public static String getLocalAddress(String address) {
        String[] blocks = address.split("[:]");
        if (blocks.length != 2) {
            throw new IllegalArgumentException(address + " is invalid address");
        }
        String host = blocks[0];
        int port = Integer.valueOf(blocks[1]);

        if ("0.0.0.0".equals(host)) {
            return String.format("%s:%d", NetUtil.getLocalIp(), port);
        }
        return address;
    }

    private static int matchedIndex(String ip, String[] prefix) {
        for (int i = 0; i < prefix.length; i++) {
            String p = prefix[i];
            if ("*".equals(p)) { // "*" matches any public IP; skip loopback/private ranges
                if (ip.startsWith("127.") ||
                    ip.startsWith("10.") ||
                    ip.startsWith("172.") ||
                    ip.startsWith("192.")) {
                    continue;
                }
                return i;
            } else {
                if (ip.startsWith(p)) {
                    return i;
                }
            }
        }
        return -1;
    }

    public static String getLocalIp(String ipPreference) {
        if (ipPreference == null) {
            ipPreference = "*>10>172>192>127";
        }
        String[] prefix = ipPreference.split("[> ]+");
        try {
            Pattern pattern = Pattern.compile("[0-9]+\\.[0-9]+\\.[0-9]+\\.[0-9]+");
            Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
            String matchedIp = null;
            int matchedIdx = -1;
            while (interfaces.hasMoreElements()) {
                NetworkInterface ni = interfaces.nextElement();
                Enumeration<InetAddress> en = ni.getInetAddresses();
                while (en.hasMoreElements()) {
                    InetAddress addr = en.nextElement();
                    String ip = addr.getHostAddress();
                    Matcher matcher = pattern.matcher(ip);
                    if (matcher.matches()) {
                        // keep the IP whose prefix ranks highest in ipPreference
                        int idx = matchedIndex(ip, prefix);
                        if (idx == -1) continue;
                        if (matchedIdx == -1 || matchedIdx > idx) {
                            matchedIdx = idx;
                            matchedIp = ip;
                        }
                    }
                }
            }
            if (matchedIp != null) return matchedIp;
            return "127.0.0.1";
        } catch (Exception e) {
            return "127.0.0.1";
        }
    }

    public static String getLocalIp() {
        return getLocalIp("*>10>172>192>127");
    }

    public static String remoteAddress(SocketChannel channel) {
        SocketAddress addr = channel.socket().getRemoteSocketAddress();
        return String.format("%s", addr);
    }

    public static String localAddress(SocketChannel channel) {
        SocketAddress addr = channel.socket().getLocalSocketAddress();
        String res = String.format("%s", addr);
        // strip the leading "/" that SocketAddress.toString() prepends
        return addr == null ? res : res.substring(1);
    }

    public static String getPid() {
        // the runtime name looks like "<pid>@<hostname>"
        RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
        String name = runtime.getName();
        int index = name.indexOf("@");
        if (index != -1) {
            return name.substring(0, index);
        }
        return null;
    }

    public static String getLocalHostName() {
        try {
            return (InetAddress.getLocalHost()).getHostName();
        } catch (UnknownHostException uhe) {
            // fall back to parsing the host out of the exception message
            String host = uhe.getMessage();
            if (host != null) {
                int colon = host.indexOf(':');
                if (colon > 0) {
                    return host.substring(0, colon);
                }
            }
            return "UnknownHost";
        }
    }
}
Start the project and hit the /index and /err endpoints; you will see two log files, app-collector.log and error-collector.log, generated in the project directory.
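For reference, a line produced by the patternLayout above looks roughly like this (all values, including the package name, are illustrative):

[2020-07-15T14:30:00.123+08:00] [INFO] [http-nio-8001-exec-1-18] [com.bfxy.controller.IndexController] [localhost] [192.168.11.31] [collector] [IndexController.java,15,com.bfxy.controller.IndexController,index] [this is an info log] ## ''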

We deploy the SpringBoot service on the 192.168.11.31 machine.
Kafka Installation and Startup
Kafka download:
http://kafka.apache.org/downloads.html
Installation steps: Kafka depends on ZooKeeper, so first get a ZooKeeper environment ready (three nodes are enough), then build the Kafka broker.
## unpack:
tar -zxvf kafka_2.12-2.1.0.tgz -C /usr/local/
## rename:
mv kafka_2.12-2.1.0/ kafka_2.12
## go into the unpacked directory and edit server.properties:
vim /usr/local/kafka_2.12/config/server.properties
## settings to change:
broker.id=0
port=9092
host.name=192.168.11.51
advertised.host.name=192.168.11.51
log.dirs=/usr/local/kafka_2.12/kafka-logs
num.partitions=2
zookeeper.connect=192.168.11.111:2181,192.168.11.112:2181,192.168.11.113:2181
## create the log directory:
mkdir /usr/local/kafka_2.12/kafka-logs
## start kafka:
/usr/local/kafka_2.12/bin/kafka-server-start.sh /usr/local/kafka_2.12/config/server.properties &
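A quick sanity check that the broker came up (jps ships with the JDK; the broker shows up as a Java process named Kafka):

jps | grep Kafka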
Create two topics
## create topics
kafka-topics.sh --zookeeper 192.168.11.111:2181 --create --topic app-log-collector --partitions 1 --replication-factor 1
kafka-topics.sh --zookeeper 192.168.11.111:2181 --create --topic error-log-collector --partitions 1 --replication-factor 1
We can check the topic status
kafka-topics.sh --zookeeper 192.168.11.111:2181 --topic app-log-collector --describe
You can see that the app-log-collector and error-log-collector topics have been created successfully.
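For this single-broker setup, the describe output should look roughly like the following (leader/replica ids depend on your broker.id):

Topic:app-log-collector	PartitionCount:1	ReplicationFactor:1	Configs:
	Topic: app-log-collector	Partition: 0	Leader: 0	Replicas: 0	Isr: 0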

Filebeat Installation and Startup
Download and unpack filebeat
cd /usr/local/software
tar -zxvf filebeat-6.6.0-linux-x86_64.tar.gz -C /usr/local/
cd /usr/local
mv filebeat-6.6.0-linux-x86_64/ filebeat-6.6.0
Configure filebeat; you can use the yml below as a reference
vim /usr/local/filebeat-6.6.0/filebeat.yml
###################### Filebeat Configuration Example #########################
filebeat.prospectors:
- input_type: log
  paths:
    ## app-<service-name>.log; the name is fixed so rotated history files are not re-read
    - /usr/local/logs/app-collector.log
  # the _type value used when writing to ES
  document_type: "app-log"
  multiline:
    #pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})'   # alternative: match lines starting with a timestamp like 2017-11-15 08:04:23:889
    pattern: '^\['                              # a new log event starts with "["
    negate: true                                # lines NOT matching the pattern are continuations
    match: after                                # append them to the end of the previous line
    max_lines: 2000                             # maximum number of lines to merge
    timeout: 2s                                 # flush the event if no new line arrives in time
  fields:
    logbiz: collector
    logtopic: app-log-collector   ## kafka topic, split per service
    env: dev
- input_type: log
  paths:
    - /usr/local/logs/error-collector.log
  document_type: "error-log"
  multiline:
    #pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})'   # alternative: match lines starting with a timestamp like 2017-11-15 08:04:23:889
    pattern: '^\['                              # a new log event starts with "["
    negate: true                                # lines NOT matching the pattern are continuations
    match: after                                # append them to the end of the previous line
    max_lines: 2000                             # maximum number of lines to merge
    timeout: 2s                                 # flush the event if no new line arrives in time
  fields:
    logbiz: collector
    logtopic: error-log-collector   ## kafka topic, split per service
    env: dev

output.kafka:
  enabled: true
  hosts: ["192.168.11.51:9092"]
  topic: '%{[fields.logtopic]}'
  partition.hash:
    reachable_only: true
  compression: gzip
  max_message_bytes: 1000000
  required_acks: 1
logging.to_files: true
Starting filebeat:
Check that the configuration is valid
cd /usr/local/filebeat-6.6.0
./filebeat test config -c filebeat.yml
## Config OK
Start filebeat
/usr/local/filebeat-6.6.0/filebeat &
Check that it started
ps -ef | grep filebeat
You can see that filebeat has started successfully.

Then hit 192.168.11.31:8001/index and 192.168.11.31:8001/err, and look under Kafka's log directory: the app-log-collector-0 and error-log-collector-0 partition directories have been created, which means filebeat has collected the data and shipped it to Kafka.
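You can also confirm the messages directly with the console consumer that ships with Kafka (shown for the app topic; use error-log-collector for the other):

kafka-console-consumer.sh --bootstrap-server 192.168.11.51:9092 --topic app-log-collector --from-beginning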
Logstash Installation
Create a new folder under the logstash installation directory
mkdir script
Then cd into it and create a logstash-script.conf file
cd script
vim logstash-script.conf
## the multiline plugin can also be used for other stack-style output, such as linux kernel logs.
input {
  kafka {
    ## app-log-<service-name>
    topics_pattern => "app-log-.*"
    bootstrap_servers => "192.168.11.51:9092"
    codec => json
    consumer_threads => 1  ## number of parallel consumer threads
    decorate_events => true
    #auto_offset_reset => "latest"
    group_id => "app-log-group"
  }

  kafka {
    ## error-log-<service-name>
    topics_pattern => "error-log-.*"
    bootstrap_servers => "192.168.11.51:9092"
    codec => json
    consumer_threads => 1
    decorate_events => true
    #auto_offset_reset => "latest"
    group_id => "error-log-group"
  }

}
filter {

  ## timezone conversion: derive an index date in local time
  ruby {
    code => "event.set('index_time',event.timestamp.time.localtime.strftime('%Y.%m.%d'))"
  }
  if "app-log" in [fields][logtopic]{
    grok {
      ## this expression matches the SpringBoot log4j2 pattern defined above
      match => ["message", "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
    }
  }
  if "error-log" in [fields][logtopic]{
    grok {
      ## same expression
      match => ["message", "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
    }
  }

}
## test output to the console:
output {
  stdout { codec => rubydebug }
}
## elasticsearch:
output {
  if "app-log" in [fields][logtopic]{
    ## es plugin
    elasticsearch {
      # es address
      hosts => ["192.168.11.35:9200"]
      # username and password
      user => "elastic"
      password => "123456"
      ## index name; a leading + tells logstash to treat what follows as a date format,
      ## e.g. javalog-app-service-2019.01.23
      index => "app-log-%{[fields][logbiz]}-%{index_time}"
      # whether to sniff the cluster nodes: usually true; see http://192.168.11.35:9200/_nodes/http?pretty
      # sniffing load-balances log traffic across the es cluster
      sniffing => true
      # logstash ships with a default mapping template; overwrite it
      template_overwrite => true
    }
  }

  if "error-log" in [fields][logtopic]{
    elasticsearch {
      hosts => ["192.168.11.35:9200"]
      user => "elastic"
      password => "123456"
      index => "error-log-%{[fields][logbiz]}-%{index_time}"
      sniffing => true
      template_overwrite => true
    }
  }

}
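Before starting logstash for real, the file can be validated with logstash's test-and-exit flag:

/usr/local/logstash-6.6.0/bin/logstash -f /usr/local/logstash-6.6.0/script/logstash-script.conf --config.test_and_exit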
Start logstash
/usr/local/logstash-6.6.0/bin/logstash -f /usr/local/logstash-6.6.0/script/logstash-script.conf &
Once it is up, hit 192.168.11.31:8001/err again
and you can see the parsed events being printed to the console.

ElasticSearch and Kibana

I haven't blogged about setting up ES and Kibana before, and there is plenty of material online, so please search for it yourself.
Once they are set up, open the Kibana UI at 192.168.11.35:5601 and go to Management -> Kibana -> Index Patterns

Then click Create index pattern
Enter app-log-* as the index pattern and choose currentDateTime as the Time Filter field name (repeat with error-log-* for the error index).
With that, the index pattern has been created.
Hit 192.168.11.31:8001/err once more, and you can see that a log entry has been matched

with the full detail of the log event shown inside.

At this point, our complete log collection and visualization pipeline is in place!
