<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Flink實戰(zhàn) - Binlog日志并對接Kafka實戰(zhàn)

          共 6747字,需瀏覽 14分鐘

           ·

          2020-12-31 09:51

          點擊上方藍(lán)色字體,選擇“設(shè)為星標(biāo)

          回復(fù)”資源“獲取更多資源

          大數(shù)據(jù)技術(shù)與架構(gòu)
          點擊右側(cè)關(guān)注,大數(shù)據(jù)開發(fā)領(lǐng)域最強公眾號!

          大數(shù)據(jù)真好玩
          點擊右側(cè)關(guān)注,大數(shù)據(jù)真好玩!


          對于 Flink 數(shù)據(jù)流的處理,一般都是去直接監(jiān)控 xxx.log 日志的數(shù)據(jù),至于如何實現(xiàn)關(guān)系型數(shù)據(jù)庫數(shù)據(jù)的同步的話網(wǎng)上基本沒啥多少可用性的文章,基于項目的需求,經(jīng)過一段時間的研究終于還是弄出來了,寫這篇文章主要是以中介的方式記錄下來,也希望能幫助到在做關(guān)系型數(shù)據(jù)庫的實時計算處理流的初學(xué)者。

          一、設(shè)計流程圖

          二、MySQL 的 Binlog 日志的設(shè)置

          找到 MySQL 的配置文件并編輯:

          [root@localhost etc]# vim /etc/my.cnf
          [mysqld]
          # 其它配置省略。。。。。。

          lower_case_table_names=1
          ## Replication
          server_id =2020041006 # 唯一
          log_bin =mysql-bin-1 # 唯一
          relay_log_recovery =1
          binlog_format =row # 格式必須是 row,否則 ogg 監(jiān)控有問題
          master_info_repository =TABLE
          relay_log_info_repository =TABLE
          #rpl_semi_sync_master_enabled =1
          rpl_semi_sync_master_timeout =1000
          rpl_semi_sync_slave_enabled =1
          binlog-do-db =dsout # 要生成binlog 的數(shù)據(jù)庫
          sql_mode=NO_ENGINE_SUBSTITUTION,STRICT_TRANS_TABLES

          這里注意的是配置完 my.cnf 文件之后要重啟 MySQL 服務(wù)器才能生效。查看配置的 狀態(tài) 和 serverid 命令請參見這篇文章:

          三、下載 OGG 并安裝部署

          下載地址:https://www.oracle.com/middleware/technologies/goldengate-downloads.html

          1.下載下來的壓縮包解壓并放入指定的文件夾中去

          mkdir -p /opt/module/ogg/oggservice
          tar -xvf ggs_Linux_x64_MySQL_64bit.tar -C /opt/module/ogg/oggservice/
          chown -R root:root oggservice/ # 授權(quán)成指定的用戶及用戶組

          2.進(jìn)入ogg并啟動

          cd oggservice/
          ./ggsci

          3.源系統(tǒng)的操作步驟及配置信息如下:

          GGSCI (localhost.localdomain) 1> create subdirs   # 創(chuàng)建目錄
          GGSCI (localhost.localdomain) 3> dblogin sourcedb [email protected]:3306,userid 用戶,password 密碼; # 監(jiān)控日志
          GGSCI (localhost.localdomain) 3> edit params mgr
          port 7015
          AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
          PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3

          GGSCI (localhost.localdomain) 4> edit params ext1 # 抽取進(jìn)程
          EXTRACT ext1
          setenv (MYSQL_HOME="/usr/local/mysql")
          dboptions host 192.168.x.xx:3306, connectionport 3306
          tranlogoptions altlogdest /usr/local/mysql/data/mysql-bin-1.index
          SOURCEDB [email protected]:3306,userid 用戶,password 密碼
          EXTTRAIL ./dirdat/et
          dynamicresolution
          GETUPDATEBEFORES
          NOCOMPRESSDELETES
          NOCOMPRESSUPDATES
          table dsout.employees;
          table dsout.departments;

          GGSCI (localhost.localdomain) 5> edit params pump1 # 推送進(jìn)程
          EXTRACT pump1
          SOURCEDB [email protected]:3306,userid 用戶,password 密碼
          RMTHOST 目標(biāo)服務(wù)器的IP地址, MGRPORT 2021
          RMTTRAIL ./dirdat/xd
          table dsout.*; # 要推送的表

          #為數(shù)據(jù)庫的binlog添加監(jiān)控和推送進(jìn)程
          GGSCI (localhost.localdomain DBLOGIN as dsout) 8> add extract ext1, tranlog,begin now
          GGSCI (localhost.localdomain DBLOGIN as dsout) 9> add exttrail ./dirdat/et, extract ext1
          GGSCI (localhost.localdomain DBLOGIN as dsout) 10> add extract pump1, exttrailsource ./dirdat/et
          GGSCI (localhost.localdomain DBLOGIN as dsout) 11> add rmttrail ./dirdat/rt,extract pump1

          # 配置 defgen 進(jìn)程
          GGSCI (localhost.localdomain) 6> edit param defgen
          defsfile ./dirdef/defgen.def
          sourcedb [email protected]:3306,userid 用戶,password 密碼
          table dsout.*;

          # 生成 defgen.prm 文件
          [mysql@localhost oggformysql]$ ./defgen paramfile ./dirprm/defgen.prm

          4.進(jìn)入 ogg 查看各個配置的服務(wù)進(jìn)程:

          GGSCI (localhost.localdomain) 5> info all

          效果圖如下:

          到此為止源系統(tǒng)的ogg已經(jīng)配置完成,接下來我們要在目標(biāo)端配置接收到的數(shù)據(jù)將其以 json 的形式發(fā)送到 kafka。

          5.解壓并授權(quán)

          mkdir -p /opt/module/ogg/oggservice
          tar -xvf OGG_BigData_Linux_x64_19.1.0.0.1.tar -C /opt/module/ogg/oggservice/
          chown -R root:root oggservice/

          6.配置依賴包

          find / -name libjvm.so

          vim ~/.bash_profile
          export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.222.b03-1.el7.x86_64/jre/lib/amd64/server/

          source ~/.bash_profile

          7.啟動并配置相關(guān)進(jìn)程

          cd oggservice/
          ./ggsci

          GGSCI (cdh102) 1> create subdirs

          GGSCI (cdh102) 1> edit param mgr # 配置主進(jìn)程
          PORT 2021
          ACCESSRULE, PROG *, IPADDR *, ALLOW

          GGSCI (cdh102) 2> edit param rep2 # 配置復(fù)制進(jìn)程
          replicat rep2
          sourcedefs ./dirdef/defgen.def
          TARGETDB LIBFILE libggjava.so SET property=./dirprm/kafkaxd.props
          MAP dsout.*, TARGET dsout.*;

          # (注意,這里的exttrail必須和源端的dump配置一致)
          GGSCI (cdh102) 5> add replicat rep2, exttrail ./dirdat/rt

          8.創(chuàng)建對接 kafka的配置文件

          cd ./dirprm

          [root@cdh102 dirprm]# vim kafkaxd.props # -> 配置文件內(nèi)容如下

          gg.handlerlist = kafkahandler
          gg.handler.kafkahandler.type=kafka
          gg.handler.kafkahandler.KafkaProducerConfigFile=xindai_kafka_producer.properties # kafka 生產(chǎn)者屬性文件
          #######The following resolves the topic name using the short table name
          gg.handler.kafkahandler.topicMappingTemplate=xindai-topic # 主題
          ############The following selects the message key using the concatenated primary keys
          ############gg.handler.kafkahandler.keyMappingTemplate=${primaryKeys}
          ###########gg.handler.kafkahandler.format=avro_op
          gg.handler.kafkahandler.SchemaTopicName=xindai-topic # 主題
          gg.handler.kafkahandler.BlockingSend =false
          gg.handler.kafkahandler.includeTokens=false
          gg.handler.kafkahandler.mode=op
          gg.handler.kafkahandler.format=json
          #########gg.handler.kafkahandler.format.insertOpKey=I
          #######gg.handler.kafkahandler.format.updateOpKey=U
          #########gg.handler.kafkahandler.format.deleteOpKey=D
          #######gg.handler.kafkahandler.format.truncateOpKey=T
          goldengate.userexit.writers=javawriter
          javawriter.stats.display=TRUE
          javawriter.stats.full=TRUE
          gg.log=log4j
          gg.log.level=INFO
          gg.report.time=30sec
          ##########Sample gg.classpath for Apache Kafka 這里一定要指定kafka依賴包的路徑
          gg.classpath=dirprm/:/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/kafka/libs/*
          ##########Sample gg.classpath for HDP
          #########gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/*
          javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar

          9.配置 KafkaProducerConfigFile屬性文件

          [root@cdh102 dirprm]# vim xindai_kafka_producer.properties

          bootstrap.servers=cdh101:9092,cdh102:9092,cdh103:9092
          acks=1
          reconnect.backoff.ms=1000
          value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
          key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
          ######## 100KB per partition
          batch.size=16384
          linger.ms=0
          key.converter=org.apache.kafka.connect.json.JsonConverter
          value.converter=org.apache.kafka.connect.json.JsonConverter
          key.converter.schemas.enable=false
          value.converter.schemas.enable=false

          10.啟動進(jìn)程

          # 目標(biāo)端
          GGSCI (gpdata) 6> start mgr
          GGSCI (gpdata) 7> start rep2

          # 源端
          GGSCI (localhost.localdomain) 1> start mgr
          GGSCI (localhost.localdomain) 2> start ext1
          GGSCI (localhost.localdomain) 3> start pump1 # (先起目標(biāo)的 mgr 才不會報錯)

          效果圖如下:

          四、驗證

          1.啟動kafka消費者

          [root@cdh102 kafka]# bin/kafka-console-consumer.sh --bootstrap-server cdh101:9092,cdh102:9092,cdh103:9092 --topic xindai-topic --from-beginning

          2.向庫的監(jiān)控表中對數(shù)據(jù)進(jìn)行增、刪、改 操作

          INSERT INTO employees VALUES('101','changyin',6666.66,'2020-05-05 16:12:20','syy01');
          INSERT INTO employees VALUES('102','siling',1234.12,'2020-05-05 16:12:20','syy01');

          3.查看Kafka消費者的數(shù)據(jù)

          到此,我們已經(jīng)成功的配置好了 使用 Ogg 監(jiān)控 MySQL - Binlog 日志,然后將數(shù)據(jù)以 Json 的形式傳給 Kafka 的消費者的整個流程;這是項目實踐中總結(jié)出來的,為了方便以后查詢,在此做了下記錄,希望也能幫到志同道合的同學(xué)們。

          版權(quán)聲明:

          本文為大數(shù)據(jù)技術(shù)與架構(gòu)整理,原作者獨家授權(quán)。未經(jīng)原作者允許轉(zhuǎn)載追究侵權(quán)責(zé)任。
          微信公眾號|import_bigdata

          編輯?《大數(shù)據(jù)技術(shù)與架構(gòu)》

          插畫?《大數(shù)據(jù)技術(shù)與架構(gòu)》

          文章鏈接?https://www.jianshu.com/u/b14730fd40bd



          歡迎點贊+收藏+轉(zhuǎn)發(fā)朋友圈素質(zhì)三連


          文章不錯?點個【在看】吧!??

          瀏覽 116
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  在线免费看柬埔寨大屌 | 做受 视频毛片丰满 | 国产精品久久久久久久久久久久久久久 | 人人干人人操人人干 | 俺来也俺去也www色官网 |