
TiDB in Practice | Flink Best Practices: Using Canal to Sync MySQL Data to TiDB


          2021-05-14 11:14


Abstract: This article shows how to move data from MySQL into Kafka via Binlog + Canal, where it is then consumed by Flink. It covers:

1. Background
2. Environment
3. Deploying the TiDB Cluster
4. Deploying Zookeeper
5. Deploying Kafka
6. Deploying Flink
7. Deploying MySQL
8. Deploying Canal
9. Configuring the Data Flow




GitHub: https://github.com/apache/flink
If you like Flink, feel free to give the project a star~


1. Background


To verify the functionality of the whole pipeline quickly, every component is deployed as a single-node instance. If physical resources are tight, all of the components in this article can be set up in a single virtual machine with 4 GB of RAM and 1 CPU.


For a production deployment, replace each component with a highly available cluster deployment.

We create one dedicated single-node Zookeeper environment, which is shared by Flink, Kafka, Canal, and the other components.


For all components that require a JRE (Flink, Kafka, Canal, Zookeeper), each one gets its own private JRE, so that upgrading one JRE cannot affect the other applications.


This article has two parts: the first seven sections set up the base environment, and the final section shows how data flows through the components.




The data moves through the following components (summarized in the diagram after this list):


• MySQL, the data source, generates Binlog.


• Canal reads the Binlog, produces canal-json messages, and pushes them to the designated Kafka Topic.


• Flink consumes the data in the Kafka Topic using the flink-sql-connector-kafka API.


• Flink then writes the data into TiDB via flink-connector-jdbc.
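Putting the four hops together, the end-to-end pipeline is:

MySQL (Binlog) -> Canal (canal-json) -> Kafka (Topic: canal-kafka) -> Flink -> TiDB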


The TiDB + Flink architecture supports developing and running many different kinds of applications.


Its main features currently include:


• Unified batch and stream processing.
• Sophisticated state management.
• Event-time support.
• Exactly-once state consistency guarantees.


Flink can run on a variety of resource managers, including YARN, Mesos, and Kubernetes, and also supports standalone deployment on bare-metal clusters. TiDB can be deployed on AWS, Kubernetes, and GCP GKE, and also supports standalone deployment on bare-metal clusters with TiUP.


Common classes of TiDB + Flink applications include:


• Event-driven applications:
  • Fraud detection.
  • Anomaly detection.
  • Rule-based alerting.
  • Business process monitoring.
• Data analytics applications:
  • Network quality monitoring.
  • Product update and experiment evaluation analysis.
  • Ad-hoc analysis of live data.
  • Large-scale graph analysis.
• Data pipeline applications:
  • Real-time search index building for e-commerce.
  • Continuous ETL for e-commerce.


2. Environment

2.1 Operating System Environment


[root@r20 topology]# cat /etc/redhat-release
CentOS Stream release 8


2.2 Software Environment


Item                        Version      Download link
TiDB                        v4.0.9       https://download.pingcap.org/tidb-community-server-v4.0.9-linux-amd64.tar.gz
Kafka                       v2.7.0       https://mirrors.bfsu.edu.cn/apache/kafka/2.7.0/kafka_2.13-2.7.0.tgz
Flink                       v1.12.1      https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.12.1/flink-1.12.1-bin-scala_2.11.tgz
JRE                         v1.8.0_281   https://javadl.oracle.com/webapps/download/AutoDL?BundleId=244058_89d678f2be164786b292527658ca1605
Zookeeper                   v3.6.2       https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.6.2/apache-zookeeper-3.6.2-bin.tar.gz
flink-sql-connector-kafka   v1.12.0      https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.12/1.12.0/flink-sql-connector-kafka_2.12-1.12.0.jar
flink-connector-jdbc        v1.12.0      https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc_2.12/1.12.0/flink-connector-jdbc_2.12-1.12.0.jar
MySQL                       v8.0.23      https://dev.mysql.com/get/Downloads/MySQL-8.0/mysql-8.0.23-linux-glibc2.12-x86_64.tar.xz
Canal                       v1.1.4       https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz


2.3 Machine Allocation


Hostname   IP              Component
r21        192.168.12.21   TiDB Cluster
r22        192.168.12.22   Kafka
r23        192.168.12.23   Flink
r24        192.168.12.24   Zookeeper
r25        192.168.12.25   MySQL
r26        192.168.12.26   Canal


3. Deploying the TiDB Cluster


Compared with traditional single-node databases, TiDB has the following advantages:


• A fully distributed architecture with good scalability, supporting elastic scale-out and scale-in.
• SQL support: TiDB exposes the MySQL wire protocol and is compatible with most MySQL syntax, so in most scenarios it can directly replace MySQL.
• High availability by default: when a minority of replicas fail, the database repairs the data and fails over automatically, transparently to the application.
• ACID transactions, friendly to scenarios that need strong consistency, for example bank transfers.
• A rich tool-chain ecosystem covering data migration, replication, backup, and more.


In its kernel design, TiDB splits the overall architecture into multiple modules that communicate with each other to form a complete TiDB system. The corresponding architecture diagram is shown below:



In this article we only do the simplest functional testing, so we deploy a single-node, single-replica TiDB involving the following three modules:


• TiDB Server: the SQL layer. It exposes the MySQL-protocol connection endpoint, accepts client connections, performs SQL parsing and optimization, and finally generates a distributed execution plan.


• PD (Placement Driver) Server: the metadata manager of the whole TiDB cluster. It stores the real-time data distribution of every TiKV node and the overall topology of the cluster, serves the TiDB Dashboard UI, and allocates transaction IDs for distributed transactions.


• TiKV Server: responsible for storing the data. Seen from the outside, TiKV is a distributed, transactional key-value storage engine.


3.1 TiUP Deployment Topology File


# # Global variables are applied to all deployments and used as the default value of
# # the deployments if a specific deployment value is missing.
global:
  user: "tidb"
  ssh_port: 22
  deploy_dir: "/opt/tidb-c1/"
  data_dir: "/opt/tidb-c1/data/"
# # Monitored variables are applied to all the machines.
#monitored:
#  node_exporter_port: 19100
#  blackbox_exporter_port: 39115
#  deploy_dir: "/opt/tidb-c3/monitored"
#  data_dir: "/opt/tidb-c3/data/monitored"
#  log_dir: "/opt/tidb-c3/log/monitored"
# # Server configs are used to specify the runtime configuration of TiDB components.
# # All configuration items can be found in TiDB docs:
# # - TiDB: https://pingcap.com/docs/stable/reference/configuration/tidb-server/configuration-file/
# # - TiKV: https://pingcap.com/docs/stable/reference/configuration/tikv-server/configuration-file/
# # - PD: https://pingcap.com/docs/stable/reference/configuration/pd-server/configuration-file/
# # All configuration items use points to represent the hierarchy, e.g:
# #   readpool.storage.use-unified-pool
# #
# # You can overwrite this configuration via the instance-level `config` field.
server_configs:
  tidb:
    log.slow-threshold: 300
    binlog.enable: false
    binlog.ignore-error: false
    tikv-client.copr-cache.enable: true
  tikv:
    server.grpc-concurrency: 4
    raftstore.apply-pool-size: 2
    raftstore.store-pool-size: 2
    rocksdb.max-sub-compactions: 1
    storage.block-cache.capacity: "16GB"
    readpool.unified.max-thread-count: 12
    readpool.storage.use-unified-pool: false
    readpool.coprocessor.use-unified-pool: true
    raftdb.rate-bytes-per-sec: 0
  pd:
    schedule.leader-schedule-limit: 4
    schedule.region-schedule-limit: 2048
    schedule.replica-schedule-limit: 64
pd_servers:
  - host: 192.168.12.21
    ssh_port: 22
    name: "pd-2"
    client_port: 12379
    peer_port: 12380
    deploy_dir: "/opt/tidb-c1/pd-12379"
    data_dir: "/opt/tidb-c1/data/pd-12379"
    log_dir: "/opt/tidb-c1/log/pd-12379"
    numa_node: "0"
    # # The following configs are used to overwrite the `server_configs.pd` values.
    config:
      schedule.max-merge-region-size: 20
      schedule.max-merge-region-keys: 200000
tidb_servers:
  - host: 192.168.12.21
    ssh_port: 22
    port: 14000
    status_port: 12080
    deploy_dir: "/opt/tidb-c1/tidb-14000"
    log_dir: "/opt/tidb-c1/log/tidb-14000"
    numa_node: "0"
    # # The following configs are used to overwrite the `server_configs.tidb` values.
    config:
      log.slow-query-file: tidb-slow-overwrited.log
      tikv-client.copr-cache.enable: true
tikv_servers:
  - host: 192.168.12.21
    ssh_port: 22
    port: 12160
    status_port: 12180
    deploy_dir: "/opt/tidb-c1/tikv-12160"
    data_dir: "/opt/tidb-c1/data/tikv-12160"
    log_dir: "/opt/tidb-c1/log/tikv-12160"
    numa_node: "0"
    # # The following configs are used to overwrite the `server_configs.tikv` values.
    config:
      server.grpc-concurrency: 4
      #server.labels: { zone: "zone1", dc: "dc1", host: "host1" }
#monitoring_servers:
#  - host: 192.168.12.21
#    ssh_port: 22
#    port: 19090
#    deploy_dir: "/opt/tidb-c1/prometheus-19090"
#    data_dir: "/opt/tidb-c1/data/prometheus-19090"
#    log_dir: "/opt/tidb-c1/log/prometheus-19090"
#grafana_servers:
#  - host: 192.168.12.21
#    port: 13000
#    deploy_dir: "/opt/tidb-c1/grafana-13000"
#alertmanager_servers:
#  - host: 192.168.12.21
#    ssh_port: 22
#    web_port: 19093
#    cluster_port: 19094
#    deploy_dir: "/opt/tidb-c1/alertmanager-19093"
#    data_dir: "/opt/tidb-c1/data/alertmanager-19093"
#    log_dir: "/opt/tidb-c1/log/alertmanager-19093"
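The deployment commands themselves are not shown in the original text. As a hedged sketch, with TiUP installed and the topology above saved as topology.yaml (the file name is an assumption), the cluster is brought up along these lines:

## deploy the topology above and start the cluster
tiup cluster deploy tidb-c1-v409 v4.0.9 ./topology.yaml --user root -p
tiup cluster start tidb-c1-v409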


3.2 TiDB Cluster Environment


The focus of this article is not deploying the TiDB Cluster. As a quick experimental environment, a single-replica TiDB Cluster is deployed on one machine only, and no monitoring stack is needed.


[root@r20 topology]# tiup cluster display tidb-c1-v409
Starting component `cluster`: /root/.tiup/components/cluster/v1.3.2/tiup-cluster display tidb-c1-v409
Cluster type:       tidb
Cluster name:       tidb-c1-v409
Cluster version:    v4.0.9
SSH type:           builtin
Dashboard URL:      http://192.168.12.21:12379/dashboard
ID                   Role  Host           Ports        OS/Arch       Status   Data Dir                      Deploy Dir
--                   ----  ----           -----        -------       ------   --------                      ----------
192.168.12.21:12379  pd    192.168.12.21  12379/12380  linux/x86_64  Up|L|UI  /opt/tidb-c1/data/pd-12379    /opt/tidb-c1/pd-12379
192.168.12.21:14000  tidb  192.168.12.21  14000/12080  linux/x86_64  Up       -                             /opt/tidb-c1/tidb-14000
192.168.12.21:12160  tikv  192.168.12.21  12160/12180  linux/x86_64  Up       /opt/tidb-c1/data/tikv-12160  /opt/tidb-c1/tikv-12160
Total nodes: 4

Create a table for testing:

mysql> show create table t1;
+-------+-------------------------------------------------------------------------------------------------------------------------------+
| Table | Create Table                                                                                                                  |
+-------+-------------------------------------------------------------------------------------------------------------------------------+
| t1    | CREATE TABLE `t1` (
  `id` int(11) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin |
+-------+-------------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.00 sec)


4. Deploying Zookeeper


In this experiment, a dedicated Zookeeper environment is configured to serve the Kafka and Flink environments.

As a demonstration setup, only a single node is deployed.


4.1 Unpack the Zookeeper Package


[root@r24 soft]# tar vxzf apache-zookeeper-3.6.2-bin.tar.gz
[root@r24 soft]# mv apache-zookeeper-3.6.2-bin /opt/zookeeper


4.2 Deploy a JRE for Zookeeper


[root@r24 soft]# tar vxzf jre1.8.0_281.tar.gz
[root@r24 soft]# mv jre1.8.0_281 /opt/zookeeper/jre


Modify the /opt/zookeeper/bin/zkEnv.sh file to add the JAVA_HOME environment variable:


## add the following env var at the head of zkEnv.sh
JAVA_HOME=/opt/zookeeper/jre


4.3 Create the Zookeeper Configuration File


[root@r24 conf]# cat zoo.cfg | grep -v "#"
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/zookeeper/data
clientPort=2181


4.4 Start Zookeeper


          [root@r24 bin]# /opt/zookeeper/bin/zkServer.sh start


4.5 Check the Zookeeper Status


## check zk status
[root@r24 bin]# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: standalone

## check OS port status
[root@r24 bin]# netstat -ntlp
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name
tcp        0      0 0.0.0.0:22              0.0.0.0:*               LISTEN      942/sshd
tcp6       0      0 :::2181                 :::*                    LISTEN      15062/java
tcp6       0      0 :::8080                 :::*                    LISTEN      15062/java
tcp6       0      0 :::22                   :::*                    LISTEN      942/sshd
tcp6       0      0 :::44505                :::*                    LISTEN      15062/java

## use the zkCli tool to check the zk connection
[root@r24 bin]# ./zkCli.sh -server 192.168.12.24:2181


4.6 A Suggestion About Zookeeper


I have one small, tentative suggestion about Zookeeper:


Always enable network monitoring on a clustered Zookeeper deployment, and pay particular attention to the network bandwidth among the system metrics.
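As one hedged way to sample such metrics, Zookeeper's mntr four-letter-word command reports connection and packet counters over the client port; in recent releases it must first be allowlisted in zoo.cfg:

## allowlist the mntr (and ruok) 4lw commands in zoo.cfg
4lw.commands.whitelist=mntr,ruok
## then poll the counters over the client port
echo mntr | nc 192.168.12.24 2181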


5. Deploying Kafka


Kafka is a distributed streaming platform, used mainly for two broad classes of applications:


• Building real-time streaming data pipelines that reliably move data between systems or applications (i.e. acting as a message queue).
• Building real-time streaming applications that transform or react to streams of data (i.e. stream processing, transforming data between Kafka topics via Kafka Streams).



Kafka has four core APIs:


• The Producer API allows an application to publish a stream of records to one or more Kafka topics.
• The Consumer API allows an application to subscribe to one or more topics and process the stream of records published to them.
• The Streams API allows an application to act as a stream processor, consuming input streams from one or more topics and producing an output stream to one or more output topics, effectively transforming the input streams into output streams.
• The Connector API allows building and running reusable producers or consumers that connect Kafka topics to existing applications or data systems. For example, a connector to a relational database might capture every change to a table.


In this experiment only functional verification is done, so only a single-node Kafka environment is set up.


5.1 Download and Unpack Kafka


[root@r22 soft]# tar vxzf kafka_2.13-2.7.0.tgz
[root@r22 soft]# mv kafka_2.13-2.7.0 /opt/kafka


5.2 Deploy a JRE for Kafka


[root@r22 soft]# tar vxzf jre1.8.0_281.tar.gz
[root@r22 soft]# mv jre1.8.0_281 /opt/kafka/jre


Point Kafka at its own JRE:


[root@r22 bin]# vim /opt/kafka/bin/kafka-run-class.sh
## add the following line at the head of kafka-run-class.sh
JAVA_HOME=/opt/kafka/jre


5.3 Modify the Kafka Configuration File


Modify the Kafka configuration file /opt/kafka/config/server.properties:


## change the following variables in /opt/kafka/config/server.properties
broker.id=0
listeners=PLAINTEXT://192.168.12.22:9092
log.dirs=/opt/kafka/logs
zookeeper.connect=192.168.12.24:2181


5.4 Start Kafka


          [root@r22 bin]# /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
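Started this way, the broker stays attached to the terminal. kafka-server-start.sh also accepts a -daemon flag for running it in the background:

[root@r22 bin]# /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties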


5.5 Check the Kafka Version


Kafka does not provide a --version option for printing its version; it can be inferred from the jar file names instead:


[root@r22 ~]# ll /opt/kafka/libs/ | grep kafka
-rw-r--r-- 1 root root  4929521 Dec 16 09:02 kafka_2.13-2.7.0.jar
-rw-r--r-- 1 root root      821 Dec 16 09:03 kafka_2.13-2.7.0.jar.asc
-rw-r--r-- 1 root root    41793 Dec 16 09:02 kafka_2.13-2.7.0-javadoc.jar
-rw-r--r-- 1 root root      821 Dec 16 09:03 kafka_2.13-2.7.0-javadoc.jar.asc
-rw-r--r-- 1 root root   892036 Dec 16 09:02 kafka_2.13-2.7.0-sources.jar
-rw-r--r-- 1 root root      821 Dec 16 09:03 kafka_2.13-2.7.0-sources.jar.asc
... ...


Here 2.13 is the Scala version and 2.7.0 is the Kafka version.
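As a quick, hedged smoke test of the broker (the topic name smoke-test is an assumption; it will be auto-created with default settings), the bundled console producer and consumer can round-trip a message:

## publish one line to a throwaway topic
[root@r22 ~]# echo "hello" | /opt/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server 192.168.12.22:9092 --topic smoke-test
## read it back
[root@r22 ~]# /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server 192.168.12.22:9092 --topic smoke-test --from-beginning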


6. Deploying Flink


Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink is designed to run in all common cluster environments and to perform computations at in-memory speed and at any scale, with high throughput, low latency, and high performance.



This experiment is a functional test only, so only a single-node Flink environment is deployed.


6.1 Download and Unpack Flink


[root@r23 soft]# tar vxzf flink-1.12.1-bin-scala_2.11.tgz
[root@r23 soft]# mv flink-1.12.1 /opt/flink


6.2 Deploy a JRE for Flink


[root@r23 soft]# tar vxzf jre1.8.0_281.tar.gz
[root@r23 soft]# mv jre1.8.0_281 /opt/flink/jre


6.3 Add the Libraries Flink Needs


To consume data from Kafka, Flink needs the flink-sql-connector-kafka package.


To connect to MySQL/TiDB, Flink needs the flink-connector-jdbc package.


[root@r23 soft]# mv flink-sql-connector-kafka_2.12-1.12.0.jar /opt/flink/lib/
[root@r23 soft]# mv flink-connector-jdbc_2.12-1.12.0.jar /opt/flink/lib/


6.4 Modify the Flink Configuration File


## add or modify the following lines in /opt/flink/conf/flink-conf.yaml
jobmanager.rpc.address: 192.168.12.23
env.java.home: /opt/flink/jre


6.5 Start Flink


[root@r23 ~]# /opt/flink/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host r23.
Starting taskexecutor daemon on host r23.
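A hedged way to confirm that the JobManager came up is to check its REST port, which is 8081 by default for a standalone session cluster; a java process should be listening:

## the standalone JobManager serves its REST API / web UI on 8081 by default
[root@r23 ~]# netstat -ntlp | grep 8081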


6.6 View the Flink GUI

Open http://192.168.12.23:8081 in a browser to reach the JobManager web UI (8081 is Flink's default REST port).



7. Deploying MySQL


7.1 Unpack the MySQL Package


[root@r25 soft]# tar vxf mysql-8.0.23-linux-glibc2.12-x86_64.tar.xz
[root@r25 soft]# mv mysql-8.0.23-linux-glibc2.12-x86_64 /opt/mysql/


7.2 Create the MySQL Service File


[root@r25 ~]# touch /opt/mysql/support-files/mysqld.service
[root@r25 support-files]# cat mysqld.service
[Unit]
Description=MySQL 8.0 database server
After=syslog.target
After=network.target

[Service]
Type=simple
User=mysql
Group=mysql
#ExecStartPre=/usr/libexec/mysql-check-socket
#ExecStartPre=/usr/libexec/mysql-prepare-db-dir %n
# Note: we set --basedir to prevent probes that might trigger SELinux alarms,
# per bug #547485
ExecStart=/opt/mysql/bin/mysqld_safe
#ExecStartPost=/opt/mysql/bin/mysql-check-upgrade
#ExecStopPost=/opt/mysql/bin/mysql-wait-stop
# Give a reasonable amount of time for the server to start up/shut down
TimeoutSec=300
# Place temp files in a secure directory, not /tmp
PrivateTmp=true
Restart=on-failure
RestartPreventExitStatus=1
# Sets open_files_limit
LimitNOFILE = 10000
# Set environment variable MYSQLD_PARENT_PID. This is required for SQL restart command.
Environment=MYSQLD_PARENT_PID=1

[Install]
WantedBy=multi-user.target

## copy mysqld.service to /usr/lib/systemd/system/
[root@r25 support-files]# cp mysqld.service /usr/lib/systemd/system/


7.3 Create the my.cnf File


[root@r34 opt]# cat /etc/my.cnf
[mysqld]
port=3306
basedir=/opt/mysql
datadir=/opt/mysql/data
socket=/opt/mysql/data/mysql.socket
max_connections = 100
default-storage-engine = InnoDB
character-set-server=utf8
log-error = /opt/mysql/log/error.log
slow_query_log = 1
long-query-time = 30
slow_query_log_file = /opt/mysql/log/slow.log
min_examined_row_limit = 1000
log-slow-slave-statements
log-queries-not-using-indexes
#skip-grant-tables


7.4 Initialize and Start MySQL


[root@r25 ~]# /opt/mysql/bin/mysqld --initialize --user=mysql --console
[root@r25 ~]# chown -R mysql:mysql /opt/mysql
[root@r25 ~]# systemctl start mysqld
## check the mysql temp password in /opt/mysql/log/error.log
2021-02-24T02:45:47.316406Z 6 [Note] [MY-010454] [Server] A temporary password is generated for root@localhost: I?nDjijxa3>-


7.5 Create a New MySQL User for Canal to Connect With


## change the mysql temp password first
mysql> alter user 'root'@'localhost' identified by 'mysql';
Query OK, 0 rows affected (0.00 sec)

## create a management user 'root'@'%'
mysql> create user 'root'@'%' identified by 'mysql';
Query OK, 0 rows affected (0.01 sec)
mysql> grant all privileges on *.* to 'root'@'%';
Query OK, 0 rows affected (0.00 sec)

## create a canal replication user 'canal'@'%'
mysql> create user 'canal'@'%' identified by 'canal';
Query OK, 0 rows affected (0.01 sec)
mysql> grant select, replication slave, replication client on *.* to 'canal'@'%';
Query OK, 0 rows affected (0.00 sec)
mysql> flush privileges;
Query OK, 0 rows affected (0.00 sec)


7.6 Create a Table in MySQL for Testing


mysql> show create table test.t2;
+-------+----------------------------------------------------------------------------------+
| Table | Create Table                                                                     |
+-------+----------------------------------------------------------------------------------+
| t2    | CREATE TABLE `t2` (
  `id` int DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8 |
+-------+----------------------------------------------------------------------------------+
1 row in set (0.00 sec)


8. Deploying Canal


Canal's main use is to parse MySQL's incremental Binlog and provide incremental data subscription and consumption.

In the early days, Alibaba had a cross-datacenter synchronization requirement because of its dual deployment in Hangzhou and the United States; the implementation was based mainly on business triggers that captured incremental changes.


From 2010 on, the business gradually switched to parsing database logs to capture incremental changes for synchronization, which spawned a large number of incremental database subscription and consumption services.



Services built on incremental log subscription and consumption include:


• Database mirroring.
• Real-time database backup.
• Index building and real-time maintenance (split heterogeneous indexes, inverted indexes, etc.).
• Business cache refreshing.
• Incremental data processing with custom business logic.


Canal currently supports MySQL 5.1.x, 5.5.x, 5.6.x, 5.7.x, and 8.0.x as the source.


8.1 Unpack the Canal Package


          [root@r26 soft]# mkdir /opt/canal && tar vxzf canal.deployer-1.1.4.tar.gz -C /opt/canal


8.2 Deploy a JRE for Canal


[root@r26 soft]# tar vxzf jre1.8.0_281.tar.gz
[root@r26 soft]# mv jre1.8.0_281 /opt/canal/jre
## configure the jre: add the following line at the head of /opt/canal/bin/startup.sh
JAVA=/opt/canal/jre/bin/java


8.3 Modify the Canal Configuration Files


Modify the /opt/canal/conf/canal.properties configuration file:


## modify the following configuration
canal.zkServers = 192.168.12.24:2181
canal.serverMode = kafka
canal.destinations = example        ## a folder named example must be created under /opt/canal/conf to hold this destination's configuration
canal.mq.servers = 192.168.12.22:9092

Then modify the /opt/canal/conf/example/instance.properties configuration file:

## modify the following configuration
canal.instance.master.address=192.168.12.25:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.filter.regex=.*\\..*                    ## filter for which database tables to capture
canal.mq.topic=canal-kafka


9. Configuring the Data Flow


9.1 The MySQL Binlog -> Canal -> Kafka Path


9.1.1 Check the MySQL Binlog Information


Check the MySQL Binlog information to make sure Binlog is working properly:


mysql> show master status;
+---------------+----------+--------------+------------------+-------------------+
| File          | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+---------------+----------+--------------+------------------+-------------------+
| binlog.000001 |     2888 |              |                  |                   |
+---------------+----------+--------------+------------------+-------------------+
1 row in set (0.00 sec)
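Canal parses ROW-format binlog. MySQL 8.0 enables binlog with ROW format by default, but it is worth a double check; on a default install the result should be:

mysql> show variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+
1 row in set (0.00 sec)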


9.1.2 Create a Topic in Kafka


Create a Topic named canal-kafka in Kafka. Its name must match canal.mq.topic=canal-kafka in the Canal configuration file /opt/canal/conf/example/instance.properties:


[root@r22 kafka]# /opt/kafka/bin/kafka-topics.sh --create \
> --zookeeper 192.168.12.24:2181 \
> --config max.message.bytes=12800000 \
> --config flush.messages=1 \
> --replication-factor 1 \
> --partitions 1 \
> --topic canal-kafka
Created topic canal-kafka.
[2021-02-24 01:51:55,050] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions Set(canal-kafka-0) (kafka.server.ReplicaFetcherManager)
[2021-02-24 01:51:55,052] INFO [Log partition=canal-kafka-0, dir=/opt/kafka/logs] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2021-02-24 01:51:55,053] INFO Created log for partition canal-kafka-0 in /opt/kafka/logs/canal-kafka-0 with properties {compression.type -> producer, message.downconversion.enable -> true, min.insync.replicas -> 1, segment.jitter.ms -> 0, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, segment.bytes -> 1073741824, retention.ms -> 604800000, flush.messages -> 1, message.format.version -> 2.7-IV2, file.delete.delay.ms -> 60000, max.compaction.lag.ms -> 9223372036854775807, max.message.bytes -> 12800000, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, segment.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760}. (kafka.log.LogManager)
[2021-02-24 01:51:55,053] INFO [Partition canal-kafka-0 broker=0] No checkpointed highwatermark is found for partition canal-kafka-0 (kafka.cluster.Partition)
[2021-02-24 01:51:55,053] INFO [Partition canal-kafka-0 broker=0] Log loaded for partition canal-kafka-0 with initial high watermark 0 (kafka.cluster.Partition)


List all Topics in Kafka:


[root@r22 kafka]# /opt/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.12.24:2181
__consumer_offsets
canal-kafka
ticdc-test


View the details of the Topic ticdc-test in Kafka:


[root@r22 ~]# /opt/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.12.24:2181 --topic ticdc-test
Topic: ticdc-test       PartitionCount: 1       ReplicationFactor: 1    Configs: max.message.bytes=12800000,flush.messages=1
        Topic: ticdc-test       Partition: 0    Leader: 0       Replicas: 0     Isr: 0


9.1.3 Start Canal


Before starting Canal, check the relevant ports from the Canal node:


## check MySQL port 3306
## canal.instance.master.address=192.168.12.25:3306
[root@r26 bin]# telnet 192.168.12.25 3306

## check Kafka port 9092
## canal.mq.servers = 192.168.12.22:9092
[root@r26 bin]# telnet 192.168.12.22 9092

## check Zookeeper port 2181
## canal.zkServers = 192.168.12.24:2181
[root@r26 bin]# telnet 192.168.12.24 2181


Start Canal:


[root@r26 bin]# /opt/canal/bin/startup.sh
cd to /opt/canal/bin for workaround relative path
LOG CONFIGURATION : /opt/canal/bin/../conf/logback.xml
canal conf : /opt/canal/bin/../conf/canal.properties
CLASSPATH :/opt/canal/bin/../conf:/opt/canal/bin/../lib/zookeeper-3.4.5.jar:/opt/canal/bin/../lib/zkclient-0.10.jar:/opt/canal/bin/../lib/spring-tx-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-orm-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-jdbc-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-expression-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-core-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-context-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-beans-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-aop-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/snappy-java-1.1.7.1.jar:/opt/canal/bin/../lib/snakeyaml-1.19.jar:/opt/canal/bin/../lib/slf4j-api-1.7.12.jar:/opt/canal/bin/../lib/simpleclient_pushgateway-0.4.0.jar:/opt/canal/bin/../lib/simpleclient_httpserver-0.4.0.jar:/opt/canal/bin/../lib/simpleclient_hotspot-0.4.0.jar:/opt/canal/bin/../lib/simpleclient_common-0.4.0.jar:/opt/canal/bin/../lib/simpleclient-0.4.0.jar:/opt/canal/bin/../lib/scala-reflect-2.11.12.jar:/opt/canal/bin/../lib/scala-logging_2.11-3.8.0.jar:/opt/canal/bin/../lib/scala-library-2.11.12.jar:/opt/canal/bin/../lib/rocketmq-srvutil-4.5.2.jar:/opt/canal/bin/../lib/rocketmq-remoting-4.5.2.jar:/opt/canal/bin/../lib/rocketmq-logging-4.5.2.jar:/opt/canal/bin/../lib/rocketmq-common-4.5.2.jar:/opt/canal/bin/../lib/rocketmq-client-4.5.2.jar:/opt/canal/bin/../lib/rocketmq-acl-4.5.2.jar:/opt/canal/bin/../lib/protobuf-java-3.6.1.jar:/opt/canal/bin/../lib/oro-2.0.8.jar:/opt/canal/bin/../lib/netty-tcnative-boringssl-static-1.1.33.Fork26.jar:/opt/canal/bin/../lib/netty-all-4.1.6.Final.jar:/opt/canal/bin/../lib/netty-3.2.2.Final.jar:/opt/canal/bin/../lib/mysql-connector-java-5.1.47.jar:/opt/canal/bin/../lib/metrics-core-2.2.0.jar:/opt/canal/bin/../lib/lz4-java-1.4.1.jar:/opt/canal/bin/../lib/logback-core-1.1.3.jar:/opt/canal/bin/../lib/logback-classic-1.1.3.jar:/opt/canal/bin/../lib/kafka-clients-1.1.1.jar:/opt/canal/bin/../lib/kafka_2.11-1.1.1.jar:/opt/canal/bin/../lib/jsr305-3.0.2.jar:/opt/canal/bin/../lib/jopt-simple-5.0.4.jar:/opt/canal/bin/../lib/jctools-core-2.1.2.jar:/opt/canal/bin/../lib/jcl-over-slf4j-1.7.12.jar:/opt/canal/bin/../lib/javax.annotation-api-1.3.2.jar:/opt/canal/bin/../lib/jackson-databind-2.9.6.jar:/opt/canal/bin/../lib/jackson-core-2.9.6.jar:/opt/canal/bin/../lib/jackson-annotations-2.9.0.jar:/opt/canal/bin/../lib/ibatis-sqlmap-2.3.4.726.jar:/opt/canal/bin/../lib/httpcore-4.4.3.jar:/opt/canal/bin/../lib/httpclient-4.5.1.jar:/opt/canal/bin/../lib/h2-1.4.196.jar:/opt/canal/bin/../lib/guava-18.0.jar:/opt/canal/bin/../lib/fastsql-2.0.0_preview_973.jar:/opt/canal/bin/../lib/fastjson-1.2.58.jar:/opt/canal/bin/../lib/druid-1.1.9.jar:/opt/canal/bin/../lib/disruptor-3.4.2.jar:/opt/canal/bin/../lib/commons-logging-1.1.3.jar:/opt/canal/bin/../lib/commons-lang3-3.4.jar:/opt/canal/bin/../lib/commons-lang-2.6.jar:/opt/canal/bin/../lib/commons-io-2.4.jar:/opt/canal/bin/../lib/commons-compress-1.9.jar:/opt/canal/bin/../lib/commons-codec-1.9.jar:/opt/canal/bin/../lib/commons-cli-1.2.jar:/opt/canal/bin/../lib/commons-beanutils-1.8.2.jar:/opt/canal/bin/../lib/canal.store-1.1.4.jar:/opt/canal/bin/../lib/canal.sink-1.1.4.jar:/opt/canal/bin/../lib/canal.server-1.1.4.jar:/opt/canal/bin/../lib/canal.protocol-1.1.4.jar:/opt/canal/bin/../lib/canal.prometheus-1.1.4.jar:/opt/canal/bin/../lib/canal.parse.driver-1.1.4.jar:/opt/canal/bin/../lib/canal.parse.dbsync-1.1.4.jar:/opt/canal/bin/../lib/canal.parse-1.1.4.jar:/opt/canal/bin/../lib/canal.meta-1.1.4.jar:/opt/canal/bin/../lib/canal.instance.spring-1.1.4.jar:/opt/canal/bin/../lib/canal.instance.manager-1.1.4.jar:/opt/canal/bin/../lib/canal.instance.core-1.1.4.jar:/opt/canal/bin/../lib/canal.filter-1.1.4.jar:/opt/canal/bin/../lib/canal.deployer-1.1.4.jar:/opt/canal/bin/../lib/canal.common-1.1.4.jar:/opt/canal/bin/../lib/aviator-2.2.1.jar:/opt/canal/bin/../lib/aopalliance-1.0.jar:
cd to /opt/canal/bin for continue


9.1.4 Check the Canal Log


Check /opt/canal/logs/example/example.log:


2021-02-24 01:41:40.293 [destination = example , address = /192.168.12.25:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2021-02-24 01:41:40.293 [destination = example , address = /192.168.12.25:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
2021-02-24 01:41:40.542 [destination = example , address = /192.168.12.25:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=binlog.000001,position=4,serverId=1,gtid=<null>,timestamp=1614134832000] cost : 244ms , the next step is binlog dump


9.1.5 Check the Consumer Messages in Kafka


mysql> insert into t2 values(1);
Query OK, 1 row affected (0.00 sec)

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.12.22:9092 --topic canal-kafka --from-beginning
{"data":null,"database":"test","es":1614151725000,"id":2,"isDdl":false,"mysqlType":null,"old":null,"pkNames":null,"sql":"create database test","sqlType":null,"table":"","ts":1614151725890,"type":"QUERY"}
{"data":null,"database":"test","es":1614151746000,"id":3,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"create table t2(id int)","sqlType":null,"table":"t2","ts":1614151746141,"type":"CREATE"}
{"data":[{"id":"1"}],"database":"test","es":1614151941000,"id":4,"isDdl":false,"mysqlType":{"id":"int"},"old":null,"pkNames":null,"sql":"","sqlType":{"id":4},"table":"t2","ts":1614151941235,"type":"INSERT"}
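For readability, here is the INSERT event above pretty-printed (content identical to the last message):

{
  "data": [{"id": "1"}],
  "database": "test",
  "es": 1614151941000,
  "id": 4,
  "isDdl": false,
  "mysqlType": {"id": "int"},
  "old": null,
  "pkNames": null,
  "sql": "",
  "sqlType": {"id": 4},
  "table": "t2",
  "ts": 1614151941235,
  "type": "INSERT"
}

Here type is the DML kind, data holds the new row values, old holds the previous values for UPDATE events, es is the event's execution time on the MySQL side, and ts is the time Canal processed it (both in milliseconds).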


9.2 The Kafka -> Flink Path
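The Flink SQL statements below are entered in the Flink SQL client, which in Flink 1.12 is started in embedded mode:

[root@r23 ~]# /opt/flink/bin/sql-client.sh embedded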


## create a test table t2 in Flink
Flink SQL> create table t2(id int)
> WITH (
>  'connector' = 'kafka',
>  'topic' = 'canal-kafka',
>  'properties.bootstrap.servers' = '192.168.12.22:9092',
>  'properties.group.id' = 'canal-kafka-consumer-group',
>  'format' = 'canal-json',
>  'scan.startup.mode' = 'latest-offset'
> );

Flink SQL> select * from t2;

mysql> insert into test.t2 values(2);
Query OK, 1 row affected (0.00 sec)

Flink SQL> select * from t2;
 Refresh: 1 s    Page: Last of 1    Updated: 02:49:27.366

                        id
                         2


9.3 The Flink -> TiDB Path


9.3.1 Create a Table for Testing in the Downstream TiDB


[root@r20 soft]# mysql -uroot -P14000 -hr21
mysql> create table t3 (id int);
Query OK, 0 rows affected (0.31 sec)


9.3.2 Create the Test Table in Flink


Flink SQL> CREATE TABLE t3 (
>     id int
> ) with (
>     'connector' = 'jdbc',
>     'url' = 'jdbc:mysql://192.168.12.21:14000/test',
>     'table-name' = 't3',
>     'username' = 'root',
>     'password' = 'mysql'
> );

Flink SQL> insert into t3 values(3);
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: a0827487030db177ee7e5c8575ef714e


9.3.3 Check the Inserted Data in the Downstream TiDB


mysql> select * from test.t3;
+------+
| id   |
+------+
|    3 |
+------+
1 row in set (0.00 sec)
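The insert above writes a single literal row. To turn the setup into a continuous MySQL -> TiDB sync, the Kafka-backed table t2 and the JDBC table t3 can be wired together with a streaming INSERT ... SELECT (a sketch, assuming both tables are registered in the same SQL client session):

-- submits a long-running job that keeps copying rows arriving from Kafka into TiDB
Flink SQL> insert into t3 select id from t2;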

