日志從 Kafka 到 Loki 的 N 種方式?
最近群里有小伙伴有說到自己的日志存儲路徑先是從客戶端到Kafka,再通過消費kafka到ElasticSearch。現(xiàn)在要將ES換成Loki面臨需要同時支持Kafka和Loki插件的工具。小白查了下當前市面上滿足需求且足夠可靠的工具分別為Fluentd、Logstash以及Vector。
Fluentd
CNCF已畢業(yè)的云原生日志采集客戶端。與kubernetes結合比較緊密,插件豐富且有大廠背書。不足之處在于受ruby限制,在日志量大(建議使用FluentBit)時本身的資源消耗不小。
Logstash
ELK棧中老牌的日志采集和聚合工具,使用廣泛且插件豐富,不足之處在于資源消耗整體比較高,單機日志并發(fā)處理效率不高。
Vector
剛開源不久的輕量級日志客戶端,產品集成度比較高,資源消耗極低。不足之處就是當下產品似乎沒有還沒有廣泛的最佳實踐。
官方性能報告 https://vector.dev/#performance
以下是vector分別對上述產品做的一個性能測試對比,大家可以參考下:
| Test | Vector | FluentBit | FluentD | Logstash |
|---|---|---|---|---|
| TCP to Blackhole | 86mib/s | 64.4mib/s | 27.7mib/s | 40.6mib/s |
| File to TCP | 76.7mib/s | 35mib/s | 26.1mib/s | 3.1mib/s |
| Regex Parsing | 13.2mib/s | 20.5mib/s | 2.6mib/s | 4.6mib/s |
| TCP to HTTP | 26.7mib/s | 19.6mib/s | <1mib/s | 2.7mib/s |
| TCP to TCP | 69.9mib/s | 67.1mib/s | 3.9mib/s | 10mib/s |
那么接下來進入主題吧,當我們需要將Kafka里的日志存進Loki時,我們有哪些方法實現(xiàn),先來看個簡單的?
Vector
Vector內部已經集成好了kafka和loki方法,我們只需下載vector和配置就能直接用起來。

安裝vector
curl?--proto?'=https'?--tlsv1.2?-sSf?https://sh.vector.dev?|?sh
或者你可以直接使用docker鏡像
docker?pull?timberio/vector:0.10.0-alpine
vector配置
[sources.in]
??bootstrap_servers?=?""
??group_id?=?"<消費組id>"
??topics?=?["^(prefix1|prefix2)-.+",?"topic-1",?"topic-2"]??\\topic名字,支持正則
??type?=?"kafka"
[sinks.out]
??endpoint?=?"http://" ?
??inputs?=?["in"]??????\\source.in?
??type?=?"loki"
??labels.key?=?"value"??\\自定義key
??labels.key?=?"{{?event_field?}}"?\\event的動態(tài)值
關于vector-loki更多參數(shù)可以參考:https://vector.dev/docs/reference/sinks/loki/
Fluentd
Input - fluent-plugin-kafka
fluent-plugin-kafka插件是fluent的官方處理kafka的插件,可同時用于input和output兩個階段。它的安裝方式如下:
gem?install?fluent-plugin-kafka
當它用于input階段時,這時fluentd就會作為一個kafka的消費者,從指定的topic中取出消息并做相關處理,它的配置如下:
??@type?kafka
??brokers??borker地址>
??topics?
??format?<日志處理格式,默認是json,支持text|json|ltsv|msgpack>
??message_key?<日志key,僅用于text格式的日志>
??add_prefix?<添加fluentd的tag前綴>
??add_suffix?<添加fluentd的tag后綴>
如果你想指定從不同topic的偏移量開始消費消息的話,就需要如下配置:
??@type?kafka
??brokers???borker地址>
??format???<日志處理格式,默認是json,支持text|json|ltsv|msgpack>
??
????topic?????<單個topic名>
????partition?
????offset????<從offset開始>
??
??
????topic?????<單個topic名>
????partition?
????offset????<從offset開始>
??
熟悉fluentd的同學可能知道,在fluentd中是以tag名來處理pipeline的,默認情況下kafka插件會用topic名來做你tag名,如果你想做一些全局的filter可以添加tag前綴/后綴來全局匹配實現(xiàn)。
Output - fluent-plugin-grafana-loki
fluent-plugin-grafana-loki是grafana lab貢獻的一個從fluentd發(fā)送日志到loki的插件。之前小白在《Loki和Fluentd的那點事兒》里介紹過,這里不過多展開。
配置直接從以前的文章中copy過來,主要的區(qū)別在于tag的匹配,參考如下:
?$kafka.topic>??\\此處為kafka的topic
??@type?loki
??@id?loki.output
??url?"http://loki:3100"
??
????key1??????????\\如果你的日志json格式,那么你可以將需要提取
????key2??????????\\的字段作為你的loki?labels
??
???label>
????@type?file
????path?/var/log/fluentd-buffers/loki.buffer
????flush_mode?interval
????flush_thread_count?4
????flush_interval?3s
????retry_type?exponential_backoff
????retry_wait?2s
????retry_max_interval?60s
????retry_timeout?12h
????chunk_limit_size?8M
????total_limit_size?5G
????queued_chunks_limit_size?64
????overflow_action?drop_oldest_chunk
??
Logstash
Input - logstash-input-kafka
logstash-input-kafka是elastic官方提供的kafka消費端插件,對于input階段的配置也比較簡單。
input?{
????kafka?{
????????bootstrap_servers?=>?""
????????topics?=>?""
????????codec?=>?"<日志類型,默認plain>"
????????tags?=>?""
????????}
????}
更多的參數(shù)參考https://www.elastic.co/guide/en/logstash/7.10/plugins-inputs-kafka.html
Output - logstash-output-loki
logstash-output-loki也是由grafana lab貢獻的用于處理往loki輸出的logstash插件。安裝時直接執(zhí)行下列命令即可:
logstash-plugin?install?logstash-output-loki
logstash輸出到loki的參數(shù)不多,小白撿主要的幾個說明如下:
output?{
??loki?{
????url?=>?"http:///loki/api/v1/push"
????batch_size?=>?112640?#單次push的日志包大小
????retries?=>?5
????min_delay?=>?3
????max_delay?=>?500
????message_field?=>?"<日志消息行的key,改key由logstas傳遞過來,默認為message>"
??}
}
總結
以上三個工具均沒有做filter和解析,僅僅只是充當管道將日志從kafka里轉存到loki,實際環(huán)境可能比較復雜,需要對日志做進一步分析。不過從小白的體驗來看vector對于日志從kafka到loki的配置算是比較簡單直接,fluentd和logstash整體差不多,就看大家自己的順手程度了。
?點擊屏末?|?閱讀原文?|?即刻學習