Flume+Kafka雙劍合璧玩轉(zhuǎn)大數(shù)據(jù)平臺日志采集
大數(shù)據(jù)平臺每天會產(chǎn)生大量的日志,處理這些日志需要特定的日志系統(tǒng)。
一般而言,這些系統(tǒng)需要具有以下特征:
構(gòu)建應(yīng)用系統(tǒng)和分析系統(tǒng)的橋梁,并將它們之間的關(guān)聯(lián)解耦
支持近實時的在線分析系統(tǒng)和類似于Hadoop之類的離線分析系統(tǒng)
具有高可擴展性。即:當(dāng)數(shù)據(jù)量增加時,可以通過增加節(jié)點進行水平擴展
為此建議將日志采集分析系統(tǒng)分為如下幾個模塊:

數(shù)據(jù)采集模塊:負責(zé)從各節(jié)點上實時采集數(shù)據(jù),建議選用Flume-NG來實現(xiàn)。
數(shù)據(jù)接入模塊:由于采集數(shù)據(jù)的速度和數(shù)據(jù)處理的速度不一定同步,因此添加一個消息中間件來作為緩沖,建議選用Kafka來實現(xiàn)。
流式計算模塊:對采集到的數(shù)據(jù)進行實時分析,建議選用Storm來實現(xiàn)。
數(shù)據(jù)輸出模塊:對分析后的結(jié)果持久化,可以使用HDFS、MySQL等。

大數(shù)據(jù)平臺每天會產(chǎn)生大量的日志,處理這些日志需要特定的日志系統(tǒng)。目前常用的開源日志系統(tǒng)有 Flume 和Kafka兩種, 都是非常優(yōu)秀的日志系統(tǒng),且各有特點。下面我們來逐一認識一下。

Flume是一個分布式、可靠、高可用的海量日志采集、聚合和傳輸?shù)娜罩臼占到y(tǒng)。支持在日志系統(tǒng)中定制各類數(shù)據(jù)發(fā)送方,用于收集數(shù)據(jù);同時,F(xiàn)lume提供對數(shù)據(jù)進行簡單處理,并寫到各種數(shù)據(jù)接受方(可定制)的能力。

可靠性
Flume的核心是把數(shù)據(jù)從數(shù)據(jù)源收集過來,再送到目的地。為了保證輸送一定成功,在送到目的地之前,會先緩存數(shù)據(jù),待數(shù)據(jù)真正到達目的地后,刪除自己緩存的數(shù)據(jù)。Flume 使用事務(wù)性的方式保證傳送Event整個過程的可靠性。
可擴展性
Flume中只有一個角色Agent,其中包含Source、Sink、Channel三種組件。一個Agent的Sink可以輸出到另一個Agent的Source。這樣通過配置可以實現(xiàn)多個層次的流配置。
功能可擴展性
Flume自帶豐富的Source、Sink、Channel實現(xiàn)。用戶也可以根據(jù)需要添加自定義的組件實現(xiàn), 并在配置中使用起來。

Flume的基本架構(gòu)是Agent。它是一個完整的數(shù)據(jù)收集工具,含有三個核心組件,分別是 Source、Channel、Sink。數(shù)據(jù)以Event為基本單位經(jīng)過Source、Channel、Sink,從外部數(shù)據(jù)源來,向外部的目的地去。

除了單Agent的架構(gòu)外,還可以將多個Agent組合起來形成多層的數(shù)據(jù)流架構(gòu):
多個Agent順序連接:將多個Agent順序連接起來,將最初的數(shù)據(jù)源經(jīng)過收集,存儲到最終的存儲系統(tǒng)中。一般情況下,應(yīng)該控制這種順序連接的Agent的數(shù)量,因為數(shù)據(jù)流經(jīng)的路徑變長了,如果不考慮Failover的話,出現(xiàn)故障將影響整個Flow上的Agent收集服務(wù)。

多個Agent的數(shù)據(jù)匯聚到同一個Agent:這種情況應(yīng)用的場景比較多,適用于數(shù)據(jù)源分散的分布式系統(tǒng)中數(shù)據(jù)流匯總。

多路(Multiplexing)Agent:多路模式一般有兩種實現(xiàn)方式,一種是用來復(fù)制,另一種是用來分流。復(fù)制方式可以將最前端的數(shù)據(jù)源復(fù)制多份,分別傳遞到多個Channel中,每個Channel接收到的數(shù)據(jù)都是相同的。分流方式,Selector可以根據(jù)Header的值來確定數(shù)據(jù)傳遞到哪一個Channel。

實現(xiàn)Load Balance功能:Channel中Event可以均衡到對應(yīng)的多個Sink組件上,而每個Sink組件再分別連接到一個獨立的Agent上,這樣可以實現(xiàn)負載均衡。


kafka實際上是一個消息發(fā)布訂閱系統(tǒng)。Producer向某個Topic發(fā)布消息,而Consumer訂閱某個Topic的消息。一旦有新的關(guān)于某個Topic的消息,Broker會傳遞給訂閱它的所有Consumer。

數(shù)據(jù)在磁盤上的存取代價為O(1)
Kafka以Topic來進行消息管理,每個Topic包含多個Partition,每個Partition對應(yīng)一個邏輯log,由多個Segment組成。每個Segment中存儲多條消息。消息id由其邏輯位置決定,即從消息id可直接定位到消息的存儲位置,避免id到位置的額外映射。
為發(fā)布和訂閱提供高吞吐量
Kafka每秒可以生產(chǎn)約25萬消息(50 MB),每秒處理55萬消息(110 MB)。
分布式系統(tǒng),易于向外擴展
所有的Producer、Broker和Consumer都會有多個,均為分布式的。無需停機即可擴展機器。

Kafka是一個分布式的、可分區(qū)的、可復(fù)制的消息系統(tǒng),維護消息隊列。
Kafka的整體架構(gòu)非常簡單,是顯式分布式架構(gòu),Producer、Broker和Consumer都可以有多個。Producer,consumer實現(xiàn)Kafka注冊的接口,數(shù)據(jù)從Producer發(fā)送到Broker,Broker承擔(dān)一個中間緩存和分發(fā)的作用。Broker分發(fā)注冊到系統(tǒng)中的Consumer。Broker的作用類似于緩存,即活躍的數(shù)據(jù)和離線處理系統(tǒng)之間的緩存??蛻舳撕头?wù)器端的通信,是基于簡單、高性能、且與編程語言無關(guān)的TCP協(xié)議。

Flume和Kafka都是優(yōu)秀的日志系統(tǒng),其都能實現(xiàn)數(shù)據(jù)采集、數(shù)據(jù)傳輸、負載均衡、容錯等一系列的需求, 但是兩者之間還是有著一定的差別。
由此可見Flume和Kafka還是各有特點的:
Flume 適用于沒有編程的配置解決方案,由于提供了豐富的source、channel、sink實現(xiàn),各種數(shù)據(jù)源的引入只是配置變更就可實現(xiàn)。
Kafka 適用于對數(shù)據(jù)管道的吞吐量、可用性要求都很高的解決方案,基本需要編程實現(xiàn)數(shù)據(jù)的生產(chǎn)和消費。

建議采用Flume作為數(shù)據(jù)的生產(chǎn)者,這樣可以不用編程就實現(xiàn)數(shù)據(jù)源的引入,并采用Kafka Sink作為數(shù)據(jù)的消費者,這樣可以得到較高的吞吐量和可靠性。如果對數(shù)據(jù)的可靠性要求高的話,可以采用Kafka Channel來作為Flume的Channel使用。

Flume作為消息的生產(chǎn)者,將生產(chǎn)的消息數(shù)據(jù)(日志數(shù)據(jù)、業(yè)務(wù)請求數(shù)據(jù)等)通過Kafka Sink發(fā)布到Kafka中。



假設(shè)現(xiàn)有Flume實時讀取/data1/logs/component_role.log的數(shù)據(jù)并導(dǎo)入到Kafka的mytopic主題中。環(huán)境預(yù)設(shè)為:
Zookeeper 的地址為 zdh100:2181 zdh101:2181 zdh102:2181 Kafka broker的地址為 zdh100:9092 zdh101:9092 zdh102:9093
配置Flume agent,如下修改Flume配置:
gent1.sources = logsrc
agent1.channels = memcnl
agent1.sinks = kafkasink
#source section
agent1.sources.logsrc.type = exec
agent1.sources.logsrc.command = tail -F /data1/logs/component_role.log
agent1.sources.logsrc.shell = /bin/sh -c
agent1.sources.logsrc.batchSize = 50
agent1.sources.logsrc.channels = memcnl
# Each sink's type must be defined
agent1.sinks.kafkasink.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafkasink.brokerList=zdh100:9092, zdh101:9092,zdh102:9092
agent1.sinks.kafkasink.topic=mytopic
agent1.sinks.kafkasink.requiredAcks = 1
agent1.sinks.kafkasink.batchSize = 20
agent1.sinks.kafkasink.channel = memcnl
# Each channel's type is defined.
agent1.channels.memcnl.type = memory
agent1.channels.memcnl.capacity = 1000啟動該Flume節(jié)點:
/home/mr/flume/bin/flume-ng agent -c
/home/mr/flume/conf -f /home/mr/flume/conf/flume-conf.properties -n agent1 -Dflume.monitoring.type=http -Dflume.monitoring.port=10100動態(tài)追加日志數(shù)據(jù),執(zhí)行命令向 /data1/logs/component_role.log 添加數(shù)據(jù):
echo "測試代碼" >> /data1/logs/component_role.log
echo "檢測Flume+Kafka數(shù)據(jù)管道通暢" >> /data1/logs/component_role.log驗證Kafka數(shù)據(jù)接收結(jié)果,執(zhí)行命令檢查Kafka收到的數(shù)據(jù)是否正確,應(yīng)該可以呈現(xiàn)剛才追加的數(shù)據(jù):
/home/mr/kafka/bin/kafka-console-consumer.sh --zookeeper zdh100:2181 --topic mytopic --from-beginning輸出結(jié)果如下:

此文章來源網(wǎng)絡(luò),因未找到來源,如有侵權(quán),請聯(lián)系浪尖微信(langjianliaodashuju)刪除。
