Kafka實戰(zhàn):如何跨機房傳輸數(shù)據(jù)
工作中遇到Kafka跨機房傳輸?shù)竭h程機房的場景,之前的方案是使用Flume消費后轉(zhuǎn)發(fā)到目標kafka,當topic增多并且數(shù)據(jù)量變大后,維護性較差且Flume較耗費資源。
一、原理
參考官網(wǎng):http://kafka.apache.org/10/documentation.html#basic_ops_mirror_maker
參考:https://www.sohu.com/a/217316110_411876
MirrorMaker 為Kafka 內(nèi)置的跨集群/機房數(shù)據(jù)復制工具,二進制包解壓后bin目錄下有kafka-mirror-maker.sh,Mirror Maker啟動后,包含了一組消費者,這些消費者屬于同一個group,并從多個topic上讀取數(shù)據(jù),所有的topic均使用該group.id,每個MirrorMaker 進程僅有一個生產(chǎn)者,該生產(chǎn)者將數(shù)據(jù)發(fā)送給目標集群的多個topic;
Kafka MirrorMaker的官方文檔一直沒有更新,因此新版Kafka為MirrorMaker增加的一些參數(shù)、特性等在文檔上往往找不到,需要看Kafka MirrorMaker的源碼,Kafka MirrorMaker啟動腳步如下,發(fā)現(xiàn)其主類位于kafka.tools.MirrorMaker,尤其是一些參數(shù)的解析邏輯和主要的執(zhí)行流程,會比較有助于我們理解和運維Kafka MirrorMaker;
代碼示例
exec $(dirname $0)/kafka-run-class.sh kafka.tools.MirrorMaker"$@"
MirrorMaker 為每個消費者分配一個線程,消費者從源集群的topic和分區(qū)上讀取數(shù)據(jù),然后通過公共生產(chǎn)者將數(shù)據(jù)發(fā)送到目標集群上,官方建議盡量讓 MirrorMaker 運行在目標數(shù)據(jù)中心里,因為長距離的跨機房網(wǎng)絡相對而言更加不可靠,如果發(fā)生了網(wǎng)絡分區(qū),數(shù)據(jù)中心之間斷開了連接,無法連接到集群的消費者要比一個無法連接到集群的生產(chǎn)者要安全得多。
如果消費者無法連接到集群,最多也就是無法消費數(shù)據(jù),數(shù)據(jù)仍然會在 Kafka 集群里保留很長的一段時間,不會有丟失的風險。相反,在發(fā)生網(wǎng)絡分區(qū)時如果 MirrorMaker 已經(jīng)讀取了數(shù)據(jù),但無法將數(shù)據(jù)生產(chǎn)到目標集群上,就會造成數(shù)據(jù)丟失。所以說遠程讀取比遠程生成更加安全。

建議:
建議啟動多個kafak-mirror-maker.sh 進程來完成數(shù)據(jù)同步,這樣就算有進程掛掉,topic的同組消費者可以進行reblance;
建議將kafka-mirror-maker.sh進程啟動在目標集群,原因上文有提及;
kafak-mirror-maker.sh啟動默認不會后臺運行,調(diào)用kafka-run-class.sh的啟動內(nèi)存256M,需要修改一下啟動參數(shù)(內(nèi)存大小、日志);
建議對source 集群的whitelist中的topic的消費情況,加實時的積壓量監(jiān)控;
建議producer.properties配置中開啟auto.create.topics.enable=true;
二、使用和配置
消費端配置(consumer.properties)
生產(chǎn)環(huán)境的source kafka版本是0.10,使用zk指定集群地址,配置方式如下:
zo
?zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
group.id=groupyzg-02
# 選取鏡像數(shù)據(jù)的起始?即鏡像MirrorMaker啟動后的數(shù)據(jù),參數(shù)latest,還是鏡像之前的數(shù)據(jù),參數(shù)earliest
auto.offset.reset=largest
# 更改分區(qū)策略,默認是range,雖然有一定優(yōu)勢但會導致不公平現(xiàn)象,特別是鏡像大量的主題和分區(qū)的時候,0.10版本設置
partition.assignment.strategy=roundrobin
source kafka版本是1.0,配置bootstrap-server指定kafka集群地址,配置方式如下:
bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092
group.id=groupyzg-02
# 選取鏡像數(shù)據(jù)的起始?即鏡像MirrorMaker啟動后的數(shù)據(jù),參數(shù)latest,還是鏡像之前的數(shù)據(jù),參數(shù)earliest
auto.offset.reset=latest
# 消費者提交心跳周期,默認3000,由于是遠程鏡像,此處設為30秒
heartbeat.interval.ms=30000
# 消費連接超時值,默認10000,由于遠程鏡像,此處設為100秒
session.timeout.ms=100000
# 更改分區(qū)策略,默認是range,雖然有一定優(yōu)勢但會導致不公平現(xiàn)象,特別是鏡像大量的主題和分區(qū)的時候
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
# 單個poll()執(zhí)行的最大record數(shù),默認是500
max.poll.records=20000
# 讀數(shù)據(jù)時tcp接收緩沖區(qū)大小,默認是65536(64KiB)
receive.buffer.bytes=4194304
# 設置每個分區(qū)總的大小,默認是1048576
max.partition.fetch.bytes=10485760
生產(chǎn)者配置(producer.properties)
配置mirror-maker的source集群和target集群的版本多不一致,當前生產(chǎn)使用的kafka版本是1.0.0版本,producer的配置如下:
bootstrap.servers = 192.168.xxx:9092,192.168.xxx:9092
buffer.memory = 268435456
batch.size = 104857
acks=0
linger.ms=10
max.request.size = 10485760
send.buffer.bytes = 10485760
compression.type=snappy
啟動、優(yōu)化、日志監(jiān)控
啟動命令kafka-mirror-maker.sh中添加端口約束和啟動內(nèi)存配置:
export KAFKA_HEAP_OPTS="-Xmx4G -Xms4G"
export JMX_PORT="8888"
exec $(dirname $0)/kafka-run-class.sh kafka.tools.MirrorMaker"$@"
日志監(jiān)控:若想輸出日志數(shù)據(jù),則使用一下命令啟動,日志數(shù)據(jù)會保存在kafka/logs/mirrormaker.out 中;
./kafka-run-class.sh -daemon -name mirror_maker -loggc kafka.tools.MirrorMaker--consumer.config consumer.properties --num.streams 2--producer.config producer.properties --whitelist='testnet'
積壓監(jiān)控:
0.10版本的積壓量監(jiān)控:
./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker--zookeeper xxxx:21810,xxx:21810,xxx:21810--topic testnet -group testnet-group
1.0版本的積壓量監(jiān)控:
./kafka-consumer-groups.sh --bootstrap-server xxx:9092--describe --group testnet-group
進程數(shù)監(jiān)控:建議增加mirror-maker的進程數(shù)監(jiān)控,及時發(fā)現(xiàn)并啟動掛點進程;
#!/bin/bash
###################
#
# info :5 mins to check last 5mins logs
# add by deploy
# date:20190917
#
###################
#當前時間
sj=`date "+%F %T"`
#當前時間5分鐘前
last_sj=`date "+%F %T" -d '-5 min'`
#定義目錄
runlog=~/kafka_2.11-1.0.0/alarm/run.log
#通知手機號
noticetel="138XXXXXXXX"
province=~/kafka_2.11-1.0.0/alarm/province.cfg
tmplog=~/kafka_2.11-1.0.0/alarm/tmp.log
###短信通知,也可以使用郵箱通知服務
smsnotice(){
info=$@
IFS=","
for i in $noticetel;do
curl -kd xx
#curl -D - -kd xx
done
}
###判斷mirror-maker的進程個數(shù);
province_all=`cat ${province}|wc -l`
mount=`ps -ef|grep -i mirror_maker-gc |wc -l`
ps -ef|grep -i mirror_maker-gc >${tmplog}
echo "the mount of mirror-maker is `expr $mount - 1`!"> $runlog
echo "the mount of province config is $province_all ! ">> $runlog
if[ `expr $mount - 1`-ge $province_all ] ;then
echo "`hostname -i` ----${sj} ---- the mirrormaker is ok!">> $runlog
else
message="`hostname -i` ----${sj} ----the mount mirror-maker processor `expr $mount - 1` is less than the mount of province_config $province_all, "
echo ${message} >> $runlog
while read line
do
province_name=`echo ${line}|awk -F '|' '{print $1}'`
province_code=`echo ${line}|awk -F '|' '{print $2}'`
mount_two=`cat ${tmplog}|grep -i ${province_code} |wc -l`
if[ $mount_two -ge 1] ;then
echo "`hostname -i` ----${sj} ---- the province of ${province_name} is ok!">> $runlog
else
message_two="${message} the province of [ ${province_name} ] mirror-maker processor is down, please check for it!"
echo ${message_two} >> $runlog
smsnotice ${message_two}
fi
done<${province}
fi
長按掃碼添加“Python小助手”?
進入?P Y 交 流 群
▼點擊成為社區(qū)會員? ?喜歡就點個在看吧


