Flume+Kafka雙劍合璧玩轉(zhuǎn)大數(shù)據(jù)平臺(tái)日志采集
點(diǎn)擊上方藍(lán)色字體,選擇“設(shè)為星標(biāo)”

大數(shù)據(jù)平臺(tái)每天會(huì)產(chǎn)生大量的日志,處理這些日志需要特定的日志系統(tǒng)。
一般而言,這些系統(tǒng)需要具有以下特征:
構(gòu)建應(yīng)用系統(tǒng)和分析系統(tǒng)的橋梁,并將它們之間的關(guān)聯(lián)解耦
支持近實(shí)時(shí)的在線分析系統(tǒng)和類似于Hadoop之類的離線分析系統(tǒng)
具有高可擴(kuò)展性。即:當(dāng)數(shù)據(jù)量增加時(shí),可以通過(guò)增加節(jié)點(diǎn)進(jìn)行水平擴(kuò)展
為此建議將日志采集分析系統(tǒng)分為如下幾個(gè)模塊:

數(shù)據(jù)采集模塊:負(fù)責(zé)從各節(jié)點(diǎn)上實(shí)時(shí)采集數(shù)據(jù),建議選用Flume-NG來(lái)實(shí)現(xiàn)。
數(shù)據(jù)接入模塊:由于采集數(shù)據(jù)的速度和數(shù)據(jù)處理的速度不一定同步,因此添加一個(gè)消息中間件來(lái)作為緩沖,建議選用Kafka來(lái)實(shí)現(xiàn)。
流式計(jì)算模塊:對(duì)采集到的數(shù)據(jù)進(jìn)行實(shí)時(shí)分析,建議選用Storm來(lái)實(shí)現(xiàn)。
數(shù)據(jù)輸出模塊:對(duì)分析后的結(jié)果持久化,可以使用HDFS、MySQL等。
日志采集選型
大數(shù)據(jù)平臺(tái)每天會(huì)產(chǎn)生大量的日志,處理這些日志需要特定的日志系統(tǒng)。目前常用的開源日志系統(tǒng)有 Flume 和Kafka兩種, 都是非常優(yōu)秀的日志系統(tǒng),且各有特點(diǎn)。下面我們來(lái)逐一認(rèn)識(shí)一下。
Flume組件特點(diǎn)
Flume是一個(gè)分布式、可靠、高可用的海量日志采集、聚合和傳輸?shù)娜罩臼占到y(tǒng)。支持在日志系統(tǒng)中定制各類數(shù)據(jù)發(fā)送方,用于收集數(shù)據(jù);同時(shí),F(xiàn)lume提供對(duì)數(shù)據(jù)進(jìn)行簡(jiǎn)單處理,并寫到各種數(shù)據(jù)接受方(可定制)的能力。
Flume的設(shè)計(jì)目標(biāo)
可靠性
Flume的核心是把數(shù)據(jù)從數(shù)據(jù)源收集過(guò)來(lái),再送到目的地。為了保證輸送一定成功,在送到目的地之前,會(huì)先緩存數(shù)據(jù),待數(shù)據(jù)真正到達(dá)目的地后,刪除自己緩存的數(shù)據(jù)。Flume 使用事務(wù)性的方式保證傳送Event整個(gè)過(guò)程的可靠性。
可擴(kuò)展性
Flume中只有一個(gè)角色Agent,其中包含Source、Sink、Channel三種組件。一個(gè)Agent的Sink可以輸出到另一個(gè)Agent的Source。這樣通過(guò)配置可以實(shí)現(xiàn)多個(gè)層次的流配置。
功能可擴(kuò)展性
Flume自帶豐富的Source、Sink、Channel實(shí)現(xiàn)。用戶也可以根據(jù)需要添加自定義的組件實(shí)現(xiàn), 并在配置中使用起來(lái)。
Flume的架構(gòu)
Flume的基本架構(gòu)是Agent。它是一個(gè)完整的數(shù)據(jù)收集工具,含有三個(gè)核心組件,分別是 Source、Channel、Sink。數(shù)據(jù)以Event為基本單位經(jīng)過(guò)Source、Channel、Sink,從外部數(shù)據(jù)源來(lái),向外部的目的地去。

除了單Agent的架構(gòu)外,還可以將多個(gè)Agent組合起來(lái)形成多層的數(shù)據(jù)流架構(gòu):
多個(gè)Agent順序連接:將多個(gè)Agent順序連接起來(lái),將最初的數(shù)據(jù)源經(jīng)過(guò)收集,存儲(chǔ)到最終的存儲(chǔ)系統(tǒng)中。一般情況下,應(yīng)該控制這種順序連接的Agent的數(shù)量,因?yàn)閿?shù)據(jù)流經(jīng)的路徑變長(zhǎng)了,如果不考慮Failover的話,出現(xiàn)故障將影響整個(gè)Flow上的Agent收集服務(wù)。

多個(gè)Agent的數(shù)據(jù)匯聚到同一個(gè)Agent:這種情況應(yīng)用的場(chǎng)景比較多,適用于數(shù)據(jù)源分散的分布式系統(tǒng)中數(shù)據(jù)流匯總。

多路(Multiplexing)Agent:多路模式一般有兩種實(shí)現(xiàn)方式,一種是用來(lái)復(fù)制,另一種是用來(lái)分流。復(fù)制方式可以將最前端的數(shù)據(jù)源復(fù)制多份,分別傳遞到多個(gè)Channel中,每個(gè)Channel接收到的數(shù)據(jù)都是相同的。分流方式,Selector可以根據(jù)Header的值來(lái)確定數(shù)據(jù)傳遞到哪一個(gè)Channel。

實(shí)現(xiàn)Load Balance功能:Channel中Event可以均衡到對(duì)應(yīng)的多個(gè)Sink組件上,而每個(gè)Sink組件再分別連接到一個(gè)獨(dú)立的Agent上,這樣可以實(shí)現(xiàn)負(fù)載均衡。

Kafka組件特點(diǎn)
kafka實(shí)際上是一個(gè)消息發(fā)布訂閱系統(tǒng)。Producer向某個(gè)Topic發(fā)布消息,而Consumer訂閱某個(gè)Topic的消息。一旦有新的關(guān)于某個(gè)Topic的消息,Broker會(huì)傳遞給訂閱它的所有Consumer。
Kafka的設(shè)計(jì)目標(biāo)
數(shù)據(jù)在磁盤上的存取代價(jià)為O(1)
Kafka以Topic來(lái)進(jìn)行消息管理,每個(gè)Topic包含多個(gè)Partition,每個(gè)Partition對(duì)應(yīng)一個(gè)邏輯log,由多個(gè)Segment組成。每個(gè)Segment中存儲(chǔ)多條消息。消息id由其邏輯位置決定,即從消息id可直接定位到消息的存儲(chǔ)位置,避免id到位置的額外映射。
為發(fā)布和訂閱提供高吞吐量
Kafka每秒可以生產(chǎn)約25萬(wàn)消息(50 MB),每秒處理55萬(wàn)消息(110 MB)。
分布式系統(tǒng),易于向外擴(kuò)展
所有的Producer、Broker和Consumer都會(huì)有多個(gè),均為分布式的。無(wú)需停機(jī)即可擴(kuò)展機(jī)器。
Kafka的架構(gòu)
Kafka是一個(gè)分布式的、可分區(qū)的、可復(fù)制的消息系統(tǒng),維護(hù)消息隊(duì)列。
Kafka的整體架構(gòu)非常簡(jiǎn)單,是顯式分布式架構(gòu),Producer、Broker和Consumer都可以有多個(gè)。Producer,consumer實(shí)現(xiàn)Kafka注冊(cè)的接口,數(shù)據(jù)從Producer發(fā)送到Broker,Broker承擔(dān)一個(gè)中間緩存和分發(fā)的作用。Broker分發(fā)注冊(cè)到系統(tǒng)中的Consumer。Broker的作用類似于緩存,即活躍的數(shù)據(jù)和離線處理系統(tǒng)之間的緩存??蛻舳撕头?wù)器端的通信,是基于簡(jiǎn)單、高性能、且與編程語(yǔ)言無(wú)關(guān)的TCP協(xié)議。
Flume與Kafka的比較
Flume和Kafka都是優(yōu)秀的日志系統(tǒng),其都能實(shí)現(xiàn)數(shù)據(jù)采集、數(shù)據(jù)傳輸、負(fù)載均衡、容錯(cuò)等一系列的需求, 但是兩者之間還是有著一定的差別。
由此可見Flume和Kafka還是各有特點(diǎn)的:
Flume 適用于沒有編程的配置解決方案,由于提供了豐富的source、channel、sink實(shí)現(xiàn),各種數(shù)據(jù)源的引入只是配置變更就可實(shí)現(xiàn)。
Kafka 適用于對(duì)數(shù)據(jù)管道的吞吐量、可用性要求都很高的解決方案,基本需要編程實(shí)現(xiàn)數(shù)據(jù)的生產(chǎn)和消費(fèi)。
日志采集選型小結(jié)
建議采用Flume作為數(shù)據(jù)的生產(chǎn)者,這樣可以不用編程就實(shí)現(xiàn)數(shù)據(jù)源的引入,并采用Kafka Sink作為數(shù)據(jù)的消費(fèi)者,這樣可以得到較高的吞吐量和可靠性。如果對(duì)數(shù)據(jù)的可靠性要求高的話,可以采用Kafka Channel來(lái)作為Flume的Channel使用。
Flume對(duì)接Kafka
Flume作為消息的生產(chǎn)者,將生產(chǎn)的消息數(shù)據(jù)(日志數(shù)據(jù)、業(yè)務(wù)請(qǐng)求數(shù)據(jù)等)通過(guò)Kafka Sink發(fā)布到Kafka中。
對(duì)接配置

對(duì)接示例
假設(shè)現(xiàn)有Flume實(shí)時(shí)讀取/data1/logs/component_role.log的數(shù)據(jù)并導(dǎo)入到Kafka的mytopic主題中。環(huán)境預(yù)設(shè)為:
Zookeeper 的地址為 zdh100:2181 zdh101:2181 zdh102:2181 Kafka broker的地址為 zdh100:9092 zdh101:9092 zdh102:9093
配置Flume agent,如下修改Flume配置:
gent1.sources = logsrc
agent1.channels = memcnl
agent1.sinks = kafkasink
#source section
agent1.sources.logsrc.type = exec
agent1.sources.logsrc.command = tail -F /data1/logs/component_role.log
agent1.sources.logsrc.shell = /bin/sh -c
agent1.sources.logsrc.batchSize = 50
agent1.sources.logsrc.channels = memcnl
# Each sink's type must be defined
agent1.sinks.kafkasink.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafkasink.brokerList=zdh100:9092, zdh101:9092,zdh102:9092
agent1.sinks.kafkasink.topic=mytopic
agent1.sinks.kafkasink.requiredAcks = 1
agent1.sinks.kafkasink.batchSize = 20
agent1.sinks.kafkasink.channel = memcnl
# Each channel's type is defined.
agent1.channels.memcnl.type = memory
agent1.channels.memcnl.capacity = 1000啟動(dòng)該Flume節(jié)點(diǎn):
/home/mr/flume/bin/flume-ng agent -c
/home/mr/flume/conf -f /home/mr/flume/conf/flume-conf.properties -n agent1 -Dflume.monitoring.type=http -Dflume.monitoring.port=10100動(dòng)態(tài)追加日志數(shù)據(jù),執(zhí)行命令向 /data1/logs/component_role.log 添加數(shù)據(jù):
echo "測(cè)試代碼" >> /data1/logs/component_role.log
echo "檢測(cè)Flume+Kafka數(shù)據(jù)管道通暢" >> /data1/logs/component_role.log驗(yàn)證Kafka數(shù)據(jù)接收結(jié)果,執(zhí)行命令檢查Kafka收到的數(shù)據(jù)是否正確,應(yīng)該可以呈現(xiàn)剛才追加的數(shù)據(jù):
/home/mr/kafka/bin/kafka-console-consumer.sh --zookeeper zdh100:2181 --topic mytopic --from-beginning輸出結(jié)果如下:


版權(quán)聲明:
文章不錯(cuò)?點(diǎn)個(gè)【在看】吧!??




