<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Kafka實戰(zhàn):如何跨機房傳輸數(shù)據(jù)

          共 4989字,需瀏覽 10分鐘

           ·

          2020-08-26 00:56

          工作中遇到Kafka跨機房傳輸?shù)竭h程機房的場景,之前的方案是使用Flume消費后轉(zhuǎn)發(fā)到目標kafka,當topic增多并且數(shù)據(jù)量變大后,維護性較差且Flume較耗費資源。

          一、原理

          1. 參考官網(wǎng):http://kafka.apache.org/10/documentation.html#basic_ops_mirror_maker

          2. 參考:https://www.sohu.com/a/217316110_411876

          MirrorMaker 為Kafka 內(nèi)置的跨集群/機房數(shù)據(jù)復制工具,二進制包解壓后bin目錄下有kafka-mirror-maker.sh,Mirror Maker啟動后,包含了一組消費者,這些消費者屬于同一個group,并從多個topic上讀取數(shù)據(jù),所有的topic均使用該group.id,每個MirrorMaker 進程僅有一個生產(chǎn)者,該生產(chǎn)者將數(shù)據(jù)發(fā)送給目標集群的多個topic;

          Kafka MirrorMaker的官方文檔一直沒有更新,因此新版Kafka為MirrorMaker增加的一些參數(shù)、特性等在文檔上往往找不到,需要看Kafka MirrorMaker的源碼,Kafka MirrorMaker啟動腳步如下,發(fā)現(xiàn)其主類位于kafka.tools.MirrorMaker,尤其是一些參數(shù)的解析邏輯和主要的執(zhí)行流程,會比較有助于我們理解和運維Kafka MirrorMaker;

          代碼示例

          1. exec $(dirname $0)/kafka-run-class.sh kafka.tools.MirrorMaker"$@"

          MirrorMaker 為每個消費者分配一個線程,消費者從源集群的topic和分區(qū)上讀取數(shù)據(jù),然后通過公共生產(chǎn)者將數(shù)據(jù)發(fā)送到目標集群上,官方建議盡量讓 MirrorMaker 運行在目標數(shù)據(jù)中心里,因為長距離的跨機房網(wǎng)絡相對而言更加不可靠,如果發(fā)生了網(wǎng)絡分區(qū),數(shù)據(jù)中心之間斷開了連接,無法連接到集群的消費者要比一個無法連接到集群的生產(chǎn)者要安全得多。

          如果消費者無法連接到集群,最多也就是無法消費數(shù)據(jù),數(shù)據(jù)仍然會在 Kafka 集群里保留很長的一段時間,不會有丟失的風險。相反,在發(fā)生網(wǎng)絡分區(qū)時如果 MirrorMaker 已經(jīng)讀取了數(shù)據(jù),但無法將數(shù)據(jù)生產(chǎn)到目標集群上,就會造成數(shù)據(jù)丟失。所以說遠程讀取比遠程生成更加安全。

          建議:

          1. 建議啟動多個kafak-mirror-maker.sh 進程來完成數(shù)據(jù)同步,這樣就算有進程掛掉,topic的同組消費者可以進行reblance;

          2. 建議將kafka-mirror-maker.sh進程啟動在目標集群,原因上文有提及;

          3. kafak-mirror-maker.sh啟動默認不會后臺運行,調(diào)用kafka-run-class.sh的啟動內(nèi)存256M,需要修改一下啟動參數(shù)(內(nèi)存大小、日志);

          4. 建議對source 集群的whitelist中的topic的消費情況,加實時的積壓量監(jiān)控;

          5. 建議producer.properties配置中開啟auto.create.topics.enable=true;

          二、使用和配置

          • 消費端配置(consumer.properties)

            生產(chǎn)環(huán)境的source kafka版本是0.10,使用zk指定集群地址,配置方式如下:

          zo

          1. ?zookeeper.connect=zk1:2181,zk2:2181,zk3:2181

          2. group.id=groupyzg-02

          3. # 選取鏡像數(shù)據(jù)的起始?即鏡像MirrorMaker啟動后的數(shù)據(jù),參數(shù)latest,還是鏡像之前的數(shù)據(jù),參數(shù)earliest

          4. auto.offset.reset=largest


          5. # 更改分區(qū)策略,默認是range,雖然有一定優(yōu)勢但會導致不公平現(xiàn)象,特別是鏡像大量的主題和分區(qū)的時候,0.10版本設置

          6. partition.assignment.strategy=roundrobin

          source kafka版本是1.0,配置bootstrap-server指定kafka集群地址,配置方式如下:

          1. bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092

          2. group.id=groupyzg-02


          3. # 選取鏡像數(shù)據(jù)的起始?即鏡像MirrorMaker啟動后的數(shù)據(jù),參數(shù)latest,還是鏡像之前的數(shù)據(jù),參數(shù)earliest

          4. auto.offset.reset=latest


          5. # 消費者提交心跳周期,默認3000,由于是遠程鏡像,此處設為30秒

          6. heartbeat.interval.ms=30000


          7. # 消費連接超時值,默認10000,由于遠程鏡像,此處設為100秒

          8. session.timeout.ms=100000


          9. # 更改分區(qū)策略,默認是range,雖然有一定優(yōu)勢但會導致不公平現(xiàn)象,特別是鏡像大量的主題和分區(qū)的時候

          10. partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor


          11. # 單個poll()執(zhí)行的最大record數(shù),默認是500

          12. max.poll.records=20000


          13. # 讀數(shù)據(jù)時tcp接收緩沖區(qū)大小,默認是65536(64KiB)

          14. receive.buffer.bytes=4194304


          15. # 設置每個分區(qū)總的大小,默認是1048576

          16. max.partition.fetch.bytes=10485760

          • 生產(chǎn)者配置(producer.properties)

            配置mirror-maker的source集群和target集群的版本多不一致,當前生產(chǎn)使用的kafka版本是1.0.0版本,producer的配置如下:

          1. bootstrap.servers = 192.168.xxx:9092,192.168.xxx:9092

          2. buffer.memory = 268435456

          3. batch.size = 104857

          4. acks=0

          5. linger.ms=10

          6. max.request.size = 10485760

          7. send.buffer.bytes = 10485760

          8. compression.type=snappy

          • 啟動、優(yōu)化、日志監(jiān)控

          啟動命令kafka-mirror-maker.sh中添加端口約束和啟動內(nèi)存配置:
          1. export KAFKA_HEAP_OPTS="-Xmx4G -Xms4G"

          2. export JMX_PORT="8888"

          3. exec $(dirname $0)/kafka-run-class.sh kafka.tools.MirrorMaker"$@"

          日志監(jiān)控:若想輸出日志數(shù)據(jù),則使用一下命令啟動,日志數(shù)據(jù)會保存在kafka/logs/mirrormaker.out 中;
          1. ./kafka-run-class.sh -daemon -name mirror_maker -loggc kafka.tools.MirrorMaker--consumer.config consumer.properties --num.streams 2--producer.config producer.properties --whitelist='testnet'

          • 積壓監(jiān)控:

          0.10版本的積壓量監(jiān)控:
          1. ./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker--zookeeper xxxx:21810,xxx:21810,xxx:21810--topic testnet -group testnet-group

          1.0版本的積壓量監(jiān)控:
          1. ./kafka-consumer-groups.sh --bootstrap-server xxx:9092--describe --group testnet-group

          進程數(shù)監(jiān)控:建議增加mirror-maker的進程數(shù)監(jiān)控,及時發(fā)現(xiàn)并啟動掛點進程;
          1. #!/bin/bash

          2. ###################

          3. #

          4. # info :5 mins to check last 5mins logs

          5. # add by deploy

          6. # date:20190917

          7. #

          8. ###################


          9. #當前時間

          10. sj=`date "+%F %T"`

          11. #當前時間5分鐘前

          12. last_sj=`date "+%F %T" -d '-5 min'`

          13. #定義目錄

          14. runlog=~/kafka_2.11-1.0.0/alarm/run.log

          15. #通知手機號

          16. noticetel="138XXXXXXXX"

          17. province=~/kafka_2.11-1.0.0/alarm/province.cfg

          18. tmplog=~/kafka_2.11-1.0.0/alarm/tmp.log


          19. ###短信通知,也可以使用郵箱通知服務

          20. smsnotice(){

          21. info=$@

          22. IFS=","

          23. for i in $noticetel;do

          24. curl -kd xx

          25. #curl -D - -kd xx

          26. done

          27. }



          28. ###判斷mirror-maker的進程個數(shù);

          29. province_all=`cat ${province}|wc -l`

          30. mount=`ps -ef|grep -i mirror_maker-gc |wc -l`


          31. ps -ef|grep -i mirror_maker-gc >${tmplog}


          32. echo "the mount of mirror-maker is `expr $mount - 1`!"> $runlog

          33. echo "the mount of province config is $province_all ! ">> $runlog

          34. if[ `expr $mount - 1`-ge $province_all ] ;then

          35. echo "`hostname -i` ----${sj} ---- the mirrormaker is ok!">> $runlog

          36. else

          37. message="`hostname -i` ----${sj} ----the mount mirror-maker processor `expr $mount - 1` is less than the mount of province_config $province_all, "

          38. echo ${message} >> $runlog

          39. while read line

          40. do

          41. province_name=`echo ${line}|awk -F '|' '{print $1}'`

          42. province_code=`echo ${line}|awk -F '|' '{print $2}'`

          43. mount_two=`cat ${tmplog}|grep -i ${province_code} |wc -l`


          44. if[ $mount_two -ge 1] ;then

          45. echo "`hostname -i` ----${sj} ---- the province of ${province_name} is ok!">> $runlog

          46. else

          47. message_two="${message} the province of [ ${province_name} ] mirror-maker processor is down, please check for it!"

          48. echo ${message_two} >> $runlog

          49. smsnotice ${message_two}

          50. fi

          51. done<${province}

          52. fi



          長按掃碼添加“Python小助手”?

          進入?P Y 交 流 群

          ▼點擊成為社區(qū)會員? ?喜歡就點個在看吧

          瀏覽 50
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  亚洲狠狠操 | 高中学生妹毛片 | 老鸭窝在线视频狂综合 | 国产一级 片内射视频播放蘑菇 | 91久久久久无码精品国产麻豆 |