Datax3.0+DataX-Web打造分布式可視化ETL系統(tǒng)
全網(wǎng)最全大數(shù)據(jù)面試提升手冊(cè)!
一、DataX 簡(jiǎn)介
DataX 是阿里云 DataWorks 數(shù)據(jù)集成的開(kāi)源版本,主要就是用于實(shí)現(xiàn)數(shù)據(jù)間的離線同步。DataX 致力于實(shí)現(xiàn)包括關(guān)系型數(shù)據(jù)庫(kù)(MySQL、Oracle 等)、HDFS、Hive、ODPS、HBase、FTP 等各種異構(gòu)數(shù)據(jù)源(即不同的數(shù)據(jù)庫(kù)) 間穩(wěn)定高效的數(shù)據(jù)同步功能。

為了解決異構(gòu)數(shù)據(jù)源同步問(wèn)題,DataX將復(fù)雜的網(wǎng)狀同步鏈路變成了星型數(shù)據(jù)鏈路,DataX 作為中間傳輸載體負(fù)責(zé)連接各種數(shù)據(jù)源;當(dāng)需要接入一個(gè)新的數(shù)據(jù)源時(shí),只需要將此數(shù)據(jù)源對(duì)接到 DataX,便能跟已有的數(shù)據(jù)源作為無(wú)縫數(shù)據(jù)同步。
1.DataX3.0框架設(shè)計(jì)
DataX 采用 Framework + Plugin 架構(gòu),將數(shù)據(jù)源讀取和寫(xiě)入抽象稱(chēng)為 Reader/Writer 插件,納入到整個(gè)同步框架中。

2.DataX3.0核心架構(gòu)
DataX 完成單個(gè)數(shù)據(jù)同步的作業(yè),我們稱(chēng)為 Job,DataX 接收到一個(gè) Job 后,將啟動(dòng)一個(gè)進(jìn)程來(lái)完成整個(gè)作業(yè)同步過(guò)程。DataX Job 模塊是單個(gè)作業(yè)的中樞管理節(jié)點(diǎn),承擔(dān)了數(shù)據(jù)清理、子任務(wù)切分、TaskGroup 管理等功能。

DataX Job 啟動(dòng)后,會(huì)根據(jù)不同源端的切分策略,將 Job 切分成多個(gè)小的 Task (子任務(wù)),以便于并發(fā)執(zhí)行。
接著 DataX Job 會(huì)調(diào)用 Scheduler 模塊,根據(jù)配置的并發(fā)數(shù)量,將拆分成的 Task 重新組合,組裝成 TaskGroup(任務(wù)組)。
每一個(gè) Task 都由 TaskGroup 負(fù)責(zé)啟動(dòng),Task 啟動(dòng)后,會(huì)固定啟動(dòng) Reader --> Channel --> Writer 線程來(lái)完成任務(wù)同步工作。
DataX 作業(yè)運(yùn)行啟動(dòng)后,Job 會(huì)對(duì) TaskGroup 進(jìn)行監(jiān)控操作,等待所有 TaskGroup 完成后,Job 便會(huì)成功退出(異常退出時(shí)值非0)
3.DataX調(diào)度過(guò)程
首先 DataX Job 模塊會(huì)根據(jù)分庫(kù)分表切分成若干個(gè) Task,然后根據(jù)用戶(hù)配置并發(fā)數(shù),來(lái)計(jì)算需要分配多少個(gè) TaskGroup;計(jì)算過(guò)程:Task / Channel = TaskGroup,最后由 TaskGroup 根據(jù)分配好的并發(fā)數(shù)來(lái)運(yùn)行 Task(任務(wù))。
二、使用 DataX 實(shí)現(xiàn)數(shù)據(jù)同步
例如生成MySQL到MySQL同步的模板:
#輸出mysql配置模版
[root@192 bin]# python /usr/local/datax/bin/datax.py -r mysqlreader -w mysqlwriter > /usr/local/datax/job/mysql2mysql.json
#根據(jù)模板編寫(xiě) mysql2mysql.json文件
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": ["id","name"], #"*"表示所有字段
"connection": [
{
"jdbcUrl": ["jdbc:mysql://x.x.x.210:3306/mytest"],
"table": ["user"]
}
],
"password": "root",
"username": "root"
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"column": ["id","name"],
"connection": [
{
"jdbcUrl": "jdbc:mysql://192.168.88.192:3306/mytest",
"table": ["user"]
}
],
"password": "root",
"username": "root",
"writeMode": "insert"
}
}
}
],
"setting": {
"speed": {
"channel": "6"
}
}
}
}
驗(yàn)證:
[root@192 job]# python /usr/local/datax/bin/datax.py mysql2mysql.json
2022-04-24 17:39:03.445 [job-0] INFO JobContainer -
任務(wù)啟動(dòng)時(shí)刻 : 2022-04-24 17:38:49
任務(wù)結(jié)束時(shí)刻 : 2022-04-24 17:39:03
任務(wù)總計(jì)耗時(shí) : 14s
任務(wù)平均流量 : 0B/s
記錄寫(xiě)入速度 : 0rec/s
讀出記錄總數(shù) : 3
讀寫(xiě)失敗總數(shù) : 0
三、DataX-WEB 安裝部署
1.DataX-WEB
https://github.com/WeiYe-Jing/datax-web
2.解壓安裝包
在選定的安裝目錄,解壓安裝包
[root@192 ~]# tar -zxvf datax-web-2.1.2.tar.gz -C /usr/local/dataxweb
3.登錄msyql建庫(kù)
為接下來(lái)一鍵安裝部署準(zhǔn)備,這里我建的庫(kù)是dataxweb(自己定義就好,前后保持一致)
mysql> create database dataxweb;
4.執(zhí)行一鍵安裝腳本
進(jìn)入解壓后的目錄,找到bin目錄下面的install.sh文件,如果選擇交互式的安裝,則直接執(zhí)行
[root@192 dataxweb]# cd bin/
[root@192 bin]# pwd
/usr/local/dataxweb/bin
[root@192 bin]# ./install.sh
然后按照提示操作即可。包含了數(shù)據(jù)庫(kù)初始化,如果你的服務(wù)上安裝有mysql命令,在執(zhí)行安裝腳本的過(guò)程中則會(huì)出現(xiàn)以下提醒:
Scan out mysql command, so begin to initalize the database
Do you want to initalize database with sql: [{INSTALL_PATH}/bin/db/datax-web.sql]? (Y/N)y
Please input the db host(default: 127.0.0.1):
Please input the db port(default: 3306):
Please input the db username(default: root):
Please input the db password(default: ): root
Please input the db name(default: dataxweb)
按照提示輸入數(shù)據(jù)庫(kù)地址,端口號(hào),用戶(hù)名,密碼以及數(shù)據(jù)庫(kù)名稱(chēng),大部分情況下即可快速完成初始化。
如果服務(wù)上并沒(méi)有安裝mysql命令,則可以取用目錄下/bin/db/datax-web.sql腳本去手動(dòng)執(zhí)行,完成后修改相關(guān)配置文件
vi modules/datax-admin/conf/bootstrap.properties
#Database
DB_HOST=127.0.0.1
DB_PORT=3306
DB_USERNAME=root
DB_PASSWORD=root
DB_DATABASE=dataxweb
按照具體情況配置對(duì)應(yīng)的值即可。
在交互模式下,對(duì)各個(gè)模塊的package壓縮包的解壓以及configure配置腳本的調(diào)用,都會(huì)請(qǐng)求用戶(hù)確認(rèn),可根據(jù)提示查看是否安裝成功,如果沒(méi)有安裝成功,可以重復(fù)嘗試;如果不想使用交互模式,跳過(guò)確認(rèn)過(guò)程,則執(zhí)行以下命令安裝
./bin/install.sh --force
5.其他配置
郵件服務(wù)
在項(xiàng)目目錄:modules/datax-admin/bin/env.properties 配置郵件服務(wù)(可跳過(guò))MAIL_USERNAME="" MAIL_PASSWORD=""
此文件中包括一些默認(rèn)配置參數(shù),例如:server.port,具體請(qǐng)查看文件。
指定PYTHON_PATH的路徑
vim modules/datax-executor/bin/env.properties
### 執(zhí)行datax的python腳本地址
PYTHON_PATH=/usr/local/datax/bin/datax.py
### 保持和datax-admin服務(wù)的端口一致;默認(rèn)是9527,如果沒(méi)改datax-admin的端口,可以忽略
DATAX_ADMIN_PORT=
此文件中包括一些默認(rèn)配置參數(shù),例如:executor.port,json.path,data.path等,具體請(qǐng)查看文件。
6.啟動(dòng)服務(wù)
6.1一鍵啟動(dòng)所有服務(wù)
[root@192 dataxweb]# cd /usr/local/dataxweb/
[root@192 dataxweb]# ./bin/start-all.sh
中途可能發(fā)生部分模塊啟動(dòng)失敗或者卡住,可以退出重復(fù)執(zhí)行,如果需要改變某一模塊服務(wù)端口號(hào),則:
vi ./modules/{module_name}/bin/env.properties
找到SERVER_PORT配置項(xiàng),改變它的值即可。當(dāng)然也可以單一地啟動(dòng)某一模塊服務(wù):
./bin/start.sh -m {module_name}
module_name可以為datax-admin或datax-executor
6.2一鍵停止所有服務(wù)
[root@192 dataxweb]# cd /usr/local/dataxweb/
[root@192 dataxweb]# ./bin/stop-all.sh
當(dāng)然也可以單一地停止某一模塊服務(wù):
./bin/stop.sh -m {module_name}
6.3查看服務(wù)
在Linux環(huán)境下使用JPS命令,查看是否出現(xiàn)DataXAdminApplication和DataXExecutorApplication進(jìn)程,如果存在這表示項(xiàng)目運(yùn)行成功
如果項(xiàng)目啟動(dòng)失敗,請(qǐng)檢查啟動(dòng)日志:
modules/datax-admin/bin/console.out
或者
modules/datax-executor/bin/console.out
四、DataX-WEB 運(yùn)行
1.前端界面
部署完成后,在瀏覽器中輸入 http://ip:port/index.html 就可以訪問(wèn)對(duì)應(yīng)的主界面(ip為datax-admin部署所在服務(wù)器ip,port為為datax-admin 指定的運(yùn)行端口9527)

輸入用戶(hù)名 admin 密碼 123456 就可以直接訪問(wèn)系統(tǒng)
2.datax-web API
datax-web部署成功后,可以了解datax-web API相關(guān)內(nèi)容,網(wǎng)址: http://ip:port/doc.html

DataX-WEB 運(yùn)行日志
部署完成之后,在modules/對(duì)應(yīng)的項(xiàng)目/data/applogs下(用戶(hù)也可以自己指定日志,修改application.yml 中的logpath地址即可),用戶(hù)可以根據(jù)此日志跟蹤項(xiàng)目實(shí)際啟動(dòng)情況
如果執(zhí)行器啟動(dòng)比admin快,執(zhí)行器會(huì)連接失敗,日志報(bào)"拒絕連接"的錯(cuò)誤,一般是先啟動(dòng)admin,再啟動(dòng)executor,30秒之后會(huì)重連,如果成功請(qǐng)忽略這個(gè)異常。
六、DataX-WEB 實(shí)操
1.查看執(zhí)行器
查看web界面是否有注冊(cè)成功的執(zhí)行器,另外執(zhí)行器可以根據(jù)需要改名稱(chēng)。

2.創(chuàng)建項(xiàng)目

3.路由策略
當(dāng)執(zhí)行器集群部署時(shí),提供豐富的路由策略,包括:
FIRST(第一個(gè)):固定選擇第一個(gè)機(jī)器;
LAST(最后一個(gè)):固定選擇最后一個(gè)機(jī)器;
ROUND(輪詢(xún)):依次分配任務(wù);
RANDOM(隨機(jī)):隨機(jī)選擇在線的機(jī)器;
CONSISTENT_HASH(一致性HASH):每個(gè)任務(wù)按照Hash算法固定選擇某一臺(tái)機(jī)器,且所有任務(wù)均勻散列在不同機(jī)器上。
LEAST_FREQUENTLY_USED(最不經(jīng)常使用):使用頻率最低的機(jī)器優(yōu)先被選舉;
LEAST_RECENTLY_USED(最近最久未使用):最久為使用的機(jī)器優(yōu)先被選舉;
FAILOVER(故障轉(zhuǎn)移):按照順序依次進(jìn)行心跳檢測(cè),第一個(gè)心跳檢測(cè)成功的機(jī)器選定為目標(biāo)執(zhí)行器并發(fā)起調(diào)度;
BUSYOVER(忙碌轉(zhuǎn)移):按照順序依次進(jìn)行空閑檢測(cè),第一個(gè)空閑檢測(cè)成功的機(jī)器選定為目標(biāo)執(zhí)行器并發(fā)起調(diào)度;
阻塞處理策略:調(diào)度過(guò)于密集執(zhí)行器來(lái)不及處理時(shí)的處理策略
單機(jī)串行:調(diào)度請(qǐng)求進(jìn)入單機(jī)執(zhí)行器后,調(diào)度請(qǐng)求進(jìn)入FIFO隊(duì)列并以串行方式運(yùn)行; 丟棄后續(xù)調(diào)度:調(diào)度請(qǐng)求進(jìn)入單機(jī)執(zhí)行器后,發(fā)現(xiàn)執(zhí)行器存在運(yùn)行的調(diào)度任務(wù),本次請(qǐng)求將會(huì)被丟棄并標(biāo)記為失敗; 覆蓋之前調(diào)度:調(diào)度請(qǐng)求進(jìn)入單機(jī)執(zhí)行器后,發(fā)現(xiàn)執(zhí)行器存在運(yùn)行的調(diào)度任務(wù),將會(huì)終止運(yùn)行中的調(diào)度任務(wù)并清空隊(duì)列,然后運(yùn)行本地調(diào)度任務(wù);
增量增新建議將阻塞策略設(shè)置為丟棄后續(xù)調(diào)度或者單機(jī)串行
設(shè)置單機(jī)串行時(shí)應(yīng)該注意合理設(shè)置重試次數(shù)(失敗重試的次數(shù)* 每次執(zhí)行時(shí)間< 任務(wù)的調(diào)度周期),重試的次數(shù)如果設(shè)置的過(guò)多會(huì)導(dǎo)致數(shù)據(jù)重復(fù),例如任務(wù)30秒執(zhí)行一次,每次執(zhí)行時(shí)間需要20秒,設(shè)置重試三次,如果任務(wù)失敗了,第一個(gè)重試的時(shí)間段為1577755680-1577756680,重試任務(wù)沒(méi)結(jié)束,新任務(wù)又開(kāi)啟,那新任務(wù)的時(shí)間段會(huì)是1577755680-1577758680
4.任務(wù)類(lèi)型
先選擇DataX任務(wù),后續(xù)配置完詳細(xì)任務(wù)后可以按照下圖修改,其它可以根據(jù)需求填寫(xiě)。

5. 數(shù)據(jù)源配置
根據(jù)不同數(shù)據(jù)源,配置參數(shù)。


6.任務(wù)構(gòu)建
構(gòu)建reader

這里沒(méi)按上面操作生成映射,直接使用任務(wù)管理->添加手動(dòng)配置

{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "root",
"column": [
"*"
],
"where": " save_time >= FROM_UNIXTIME(${lastTime}) and save_time < FROM_UNIXTIME(${currentTime})",
"splitPk": "id",
"connection": [
{
"table": [
"uc_op_amazon_api_store_download"
],
"jdbcUrl": [
"jdbc:mysql://x.x.x.210:3306/test_system"
]
}
]
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "insert",
"username": "root",
"password": "root",
"column": [
"*"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://192.168.88.192:3306/mytest?useUnicode=true&characterEncoding=utf8",
"table": [
"uc_op_amazon_api_store_download"
]
}
]
}
}
}
],
"setting": {
"speed": {
"channel": 6
}
}
}
}
-DstartId='%s' -DendId='%s'
# 表名
uc_op_business_reports
#主鍵
id
#id自增配置條件



