<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          實(shí)戰(zhàn)!Spring Boot 整合 阿里開源中間件 Canal 實(shí)現(xiàn)數(shù)據(jù)增量同步!

          共 6688字,需瀏覽 14分鐘

           ·

          2022-01-09 21:16

          數(shù)據(jù)同步一直是一個令人頭疼的問題。在業(yè)務(wù)量小,場景不多,數(shù)據(jù)量不大的情況下我們可能會選擇在項(xiàng)目中直接寫一些定時任務(wù)手動處理數(shù)據(jù),例如從多個表將數(shù)據(jù)查出來,再匯總處理,再插入到相應(yīng)的地方。

          但是隨著業(yè)務(wù)量增大,數(shù)據(jù)量變多以及各種復(fù)雜場景下的分庫分表的實(shí)現(xiàn),使數(shù)據(jù)同步變得越來越困難。

          今天這篇文章使用阿里開源的中間件Canal解決數(shù)據(jù)增量同步的痛點(diǎn)。

          文章目錄如下:


          Canal是什么?

          canal譯意為水道/管道/溝渠,主要用途是基于 MySQL 數(shù)據(jù)庫增量日志解析,提供增量數(shù)據(jù)訂閱和消費(fèi)。

          從這句話理解到了什么?

          基于MySQL,并且通過MySQL日志進(jìn)行的增量解析,這也就意味著對原有的業(yè)務(wù)代碼完全是無侵入性的。

          工作原理:解析MySQL的binlog日志,提供增量數(shù)據(jù)。

          基于日志增量訂閱和消費(fèi)的業(yè)務(wù)包括

          • 數(shù)據(jù)庫鏡像

          • 數(shù)據(jù)庫實(shí)時備份

          • 索引構(gòu)建和實(shí)時維護(hù)(拆分異構(gòu)索引、倒排索引等)

          • 業(yè)務(wù) cache 刷新

          • 帶業(yè)務(wù)邏輯的增量數(shù)據(jù)處理

          當(dāng)前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。

          官方文檔:https://github.com/alibaba/canal

          ?

          Canal數(shù)據(jù)如何傳輸?

          先來一張官方圖:

          Canal分為服務(wù)端和客戶端,這也是阿里常用的套路,比如前面講到的注冊中心Nacos

          • 服務(wù)端:負(fù)責(zé)解析MySQL的binlog日志,傳遞增量數(shù)據(jù)給客戶端或者消息中間件

          • 客戶端:負(fù)責(zé)解析服務(wù)端傳過來的數(shù)據(jù),然后定制自己的業(yè)務(wù)處理。

          目前為止支持的消息中間件很全面了,比如KafkaRocketMQRabbitMQ

          ?

          數(shù)據(jù)同步還有其他中間件嗎?

          有,當(dāng)然有,還有一些開源的中間件也是相當(dāng)不錯的,比如Bifrost

          常見的幾款中間件的區(qū)別如下:

          當(dāng)然要我選擇的話,首選阿里的中間件Canal。

          ?

          Canal服務(wù)端安裝

          服務(wù)端需要下載壓縮包,下載地址:https://github.com/alibaba/canal/releases

          目前最新的是v1.1.5,點(diǎn)擊下載:

          下載完成解壓,目錄如下:

          本文使用Canal+RabbitMQ進(jìn)行數(shù)據(jù)的同步,因此下面步驟完全按照這個base進(jìn)行。

          1、打開MySQL的binlog日志

          修改MySQL的日志文件,my.cnf 配置如下:

          [mysqld]
          log-bin=mysql-bin # 開啟 binlog
          binlog-format=ROW # 選擇 ROW 模式
          server_id=1 # 配置 MySQL replaction 需要定義,不要和 canal 的 slaveId 重復(fù)

          2、設(shè)置MySQL的配置

          需要設(shè)置服務(wù)端配置文件中的MySQL配置,這樣Canal才能知道需要監(jiān)聽哪個庫、哪個表的日志文件。

          一個 Server 可以配置多個實(shí)例監(jiān)聽 ,Canal 功能默認(rèn)自帶的有個 example 實(shí)例,本篇就用 example 實(shí)例 。如果增加實(shí)例,復(fù)制 example 文件夾內(nèi)容到同級目錄下,然后在 canal.properties 指定添加實(shí)例的名稱。

          修改canal.deployer-1.1.5\conf\example\instance.properties配置文件

          # url
          canal.instance.master.address=127.0.0.1:3306
          # username/password
          canal.instance.dbUsername=root
          canal.instance.dbPassword=root
          # 監(jiān)聽的數(shù)據(jù)庫
          canal.instance.defaultDatabaseName=test

          # 監(jiān)聽的表,可以指定,多個用逗號分割,這里正則是監(jiān)聽所有
          canal.instance.filter.regex=.*\\..*

          3、設(shè)置RabbitMQ的配置

          服務(wù)端默認(rèn)的傳輸方式是tcp,需要在配置文件中設(shè)置MQ的相關(guān)信息。

          這里需要修改兩處配置文件,如下;

          1、canal.deployer-1.1.5\conf\canal.properties

          這個配置文件主要是設(shè)置MQ相關(guān)的配置,比如URL,用戶名、密碼...

          # 傳輸方式:tcp, kafka, rocketMQ, rabbitMQ
          canal.serverMode = rabbitMQ
          ##################################################
          ######### RabbitMQ #############
          ##################################################
          rabbitmq.host = 127.0.0.1
          rabbitmq.virtual.host =/
          # exchange
          rabbitmq.exchange =canal.exchange
          # 用戶名、密碼
          rabbitmq.username =guest
          rabbitmq.password =guest
          ## 是否持久化
          rabbitmq.deliveryMode = 2

          2、canal.deployer-1.1.5\conf\example\instance.properties

          這個文件設(shè)置MQ的路由KEY,這樣才能路由到指定的隊(duì)列中,如下:

          canal.mq.topic=canal.routing.key

          4、RabbitMQ新建exchange和Queue

          在RabbitMQ中需要新建一個canal.exchange(必須和配置中的相同)的exchange和一個名稱為 canal.queue(名稱隨意)的隊(duì)列。

          其中綁定的路由KEY為:canal.routing.key(必須和配置中的相同),如下圖:

          5、啟動服務(wù)端

          點(diǎn)擊bin目錄下的腳本,windows直接雙擊startup.bat,啟動成功如下:

          6、測試

          在本地?cái)?shù)據(jù)庫test中的oauth_client_details插入一條數(shù)據(jù),如下:

          INSERT INTO `oauth_client_details` VALUES ('myjszl', 'res1', '$2a$10$F1tQdeb0SEMdtjlO8X/0wO6Gqybu6vPC/Xg8OmP9/TL1i4beXdK9W', 'all', 'password,refresh_token,authorization_code,client_credentials,implicit', 'http://www.baidu.com', NULL, 1000, 1000, NULL, 'false');

          此時查看MQ中的canal.queue已經(jīng)有了數(shù)據(jù),如下:

          其實(shí)就是一串JSON數(shù)據(jù),這個JSON如下:

          {
          ?"data":?[{
          ??"client_id":?"myjszl",
          ??"resource_ids":?"res1",
          ??"client_secret":?"$2a$10$F1tQdeb0SEMdtjlO8X/0wO6Gqybu6vPC/Xg8OmP9/TL1i4beXdK9W",
          ??"scope":?"all",
          ??"authorized_grant_types":?"password,refresh_token,authorization_code,client_credentials,implicit",
          ??"web_server_redirect_uri":?"http://www.baidu.com",
          ??"authorities":?null,
          ??"access_token_validity":?"1000",
          ??"refresh_token_validity":?"1000",
          ??"additional_information":?null,
          ??"autoapprove":?"false"
          ?}],
          ?"database":?"test",
          ?"es":?1640337532000,
          ?"id":?7,
          ?"isDdl":?false,
          ?"mysqlType":?{
          ??"client_id":?"varchar(48)",
          ??"resource_ids":?"varchar(256)",
          ??"client_secret":?"varchar(256)",
          ??"scope":?"varchar(256)",
          ??"authorized_grant_types":?"varchar(256)",
          ??"web_server_redirect_uri":?"varchar(256)",
          ??"authorities":?"varchar(256)",
          ??"access_token_validity":?"int(11)",
          ??"refresh_token_validity":?"int(11)",
          ??"additional_information":?"varchar(4096)",
          ??"autoapprove":?"varchar(256)"
          ?},
          ?"old":?null,
          ?"pkNames":?["client_id"],
          ?"sql":?"",
          ?"sqlType":?{
          ??"client_id":?12,
          ??"resource_ids":?12,
          ??"client_secret":?12,
          ??"scope":?12,
          ??"authorized_grant_types":?12,
          ??"web_server_redirect_uri":?12,
          ??"authorities":?12,
          ??"access_token_validity":?4,
          ??"refresh_token_validity":?4,
          ??"additional_information":?12,
          ??"autoapprove":?12
          ?},
          ?"table":?"oauth_client_details",
          ?"ts":?1640337532520,
          ?"type":?"INSERT"
          }

          每個字段的意思已經(jīng)很清楚了,有表名稱、方法、參數(shù)、參數(shù)類型、參數(shù)值.....

          客戶端要做的就是監(jiān)聽MQ獲取JSON數(shù)據(jù),然后將其解析出來,處理自己的業(yè)務(wù)邏輯。

          ?

          Canal客戶端搭建

          客戶端很簡單實(shí)現(xiàn),要做的就是消費(fèi)Canal服務(wù)端傳遞過來的消息,監(jiān)聽canal.queue這個隊(duì)列。

          1、創(chuàng)建消息實(shí)體類

          MQ傳遞過來的是JSON數(shù)據(jù),當(dāng)然要創(chuàng)建個實(shí)體類接收數(shù)據(jù),如下:

          /**
          ?*?Canal消息接收實(shí)體類
          ?*/

          @NoArgsConstructor
          @Data
          public?class?CanalMessage<T>?{
          ????@JsonProperty("type")
          ????private?String?type;

          ????@JsonProperty("table")
          ????private?String?table;

          ????@JsonProperty("data")
          ????private?List?data;

          ????@JsonProperty("database")
          ????private?String?database;

          ????@JsonProperty("es")
          ????private?Long?es;

          ????@JsonProperty("id")
          ????private?Integer?id;

          ????@JsonProperty("isDdl")
          ????private?Boolean?isDdl;

          ????@JsonProperty("old")
          ????private?List?old;

          ????@JsonProperty("pkNames")
          ????private?List?pkNames;

          ????@JsonProperty("sql")
          ????private?String?sql;

          ????@JsonProperty("ts")
          ????private?Long?ts;
          }

          2、MQ消息監(jiān)聽業(yè)務(wù)

          接下來就是監(jiān)聽隊(duì)列,一旦有Canal服務(wù)端有數(shù)據(jù)推送能夠及時的消費(fèi)。

          代碼很簡單,只是給出個接收的案例,具體的業(yè)務(wù)邏輯可以根據(jù)業(yè)務(wù)實(shí)現(xiàn),如下:

          import?cn.hutool.json.JSONUtil;
          import?cn.myjszl.middle.ware.canal.mq.rabbit.model.CanalMessage;
          import?lombok.RequiredArgsConstructor;
          import?lombok.extern.slf4j.Slf4j;
          import?org.springframework.amqp.rabbit.annotation.Exchange;
          import?org.springframework.amqp.rabbit.annotation.Queue;
          import?org.springframework.amqp.rabbit.annotation.QueueBinding;
          import?org.springframework.amqp.rabbit.annotation.RabbitListener;
          import?org.springframework.stereotype.Component;

          /**
          ?*?監(jiān)聽MQ獲取Canal增量的數(shù)據(jù)消息
          ?*/

          @Component
          @Slf4j
          @RequiredArgsConstructor
          public?class?CanalRabbitMQListener?{

          ????@RabbitListener(bindings?=?{
          ????????????@QueueBinding(
          ????????????????????value?=?@Queue(value?=?"canal.queue",?durable?=?"true"),
          ????????????????????exchange?=?@Exchange(value?=?"canal.exchange"),
          ????????????????????key?=?"canal.routing.key"
          ????????????)
          ????})
          ????public?void?handleDataChange(String?message)?{
          ????????//將message轉(zhuǎn)換為CanalMessage
          ????????CanalMessage?canalMessage?=?JSONUtil.toBean(message,?CanalMessage.class);
          ????????String?tableName?=?canalMessage.getTable();
          ????????log.info("Canal 監(jiān)聽?{}?發(fā)生變化;明細(xì):{}",?tableName,?message);
          ????????//TODO?業(yè)務(wù)邏輯自己完善...............
          ????}
          }

          3、測試

          下面向表中插入數(shù)據(jù),看下接收的消息是什么樣的,SQL如下:

          INSERT?INTO?`oauth_client_details`
          VALUES
          ?(?'myjszl',?'res1',?'$2a$10$F1tQdeb0SEMdtjlO8X/0wO6Gqybu6vPC/Xg8OmP9/TL1i4beXdK9W',?'all',?'password,refresh_token,authorization_code,client_credentials,implicit',?'http://www.baidu.com',?NULL,?1000,?1000,?NULL,?'false'?);

          客戶端轉(zhuǎn)換后的消息如下圖:

          上圖可以看出所有的數(shù)據(jù)都已經(jīng)成功接收到,只需要根據(jù)數(shù)據(jù)完善自己的業(yè)務(wù)邏輯即可。

          ?

          總結(jié)

          數(shù)據(jù)增量同步的開源工具并不只有Canal一種,根據(jù)自己的業(yè)務(wù)需要選擇合適的組件。

          有道無術(shù),術(shù)可成;有術(shù)無道,止于術(shù)

          歡迎大家關(guān)注Java之道公眾號


          好文章,我在看??

          瀏覽 38
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产一级理论片 | 苍井空一二区 | 青青草肏逼视频 | 7777精品视频 | 精品无码一区二区三区在线 |