滴滴基于Binlog的采集架構(gòu)與實(shí)踐
點(diǎn)擊上方“服務(wù)端思維”,選擇“設(shè)為星標(biāo)”
回復(fù)”669“獲取獨(dú)家整理的精選資料集
回復(fù)”加群“加入全國(guó)服務(wù)端高端社群「后端圈」
桔妹導(dǎo)讀:大數(shù)據(jù)是這個(gè)時(shí)代賦予我們的強(qiáng)大引擎,在數(shù)字化大潮中 ,借助數(shù)據(jù)驅(qū)動(dòng)的方法推動(dòng)業(yè)務(wù)乘風(fēng)破浪,幾乎是每家公司的核心戰(zhàn)略。數(shù)據(jù)驅(qū)動(dòng)的落腳點(diǎn)是數(shù)據(jù),能否將組織或業(yè)務(wù)運(yùn)行過(guò)程中的信息,進(jìn)行有效收集并組織成信息流,是數(shù)據(jù)驅(qū)動(dòng)的基石所在。本文分享了滴滴數(shù)據(jù)體系建設(shè)過(guò)程中,MySQL這一類(lèi)數(shù)據(jù)源的采集架構(gòu)和應(yīng)用實(shí)踐。
1.
背景

關(guān)系模型構(gòu)建起整個(gè)數(shù)據(jù)分析的基石,關(guān)系型數(shù)據(jù)庫(kù)作為具體實(shí)現(xiàn)、采集MySQL數(shù)據(jù)接入Hive是很多企業(yè)進(jìn)行數(shù)據(jù)分析的前提。如何及時(shí)、準(zhǔn)確的把MySQL數(shù)據(jù)同步到Hive呢?
一般解決方案是使用類(lèi)似Sqoop的工具,直連MySQL去Select數(shù)據(jù)存儲(chǔ)到HDFS,然后把HDFS數(shù)據(jù)Load到Hive中。這種方法簡(jiǎn)單易操作,但隨著業(yè)務(wù)規(guī)模擴(kuò)大,不足之處也逐步暴露出來(lái):
直連MySQL查詢(xún),對(duì)于數(shù)據(jù)庫(kù)壓力較大(如訂單表、支付表等),可能直接影響在線(xiàn)業(yè)務(wù)
數(shù)據(jù)整體就位時(shí)間(尤其大表)不滿(mǎn)足下游生產(chǎn)需求
擴(kuò)展性較差,對(duì)于分表、字段增減、變更等的支持較弱
拉取的數(shù)據(jù)是該時(shí)刻的鏡像,無(wú)法獲取中間變化情況
為解決上述問(wèn)題,我們引入Binlog實(shí)時(shí)采集 + 離線(xiàn)還原的解決方案,本文將從這兩個(gè)方面介紹整個(gè)數(shù)據(jù)的接入流程。


按照上述流程采集binlog日志增量入HDFS
使用離線(xiàn)一次性拉取一份歷史全量數(shù)據(jù),按字段還原到Hive作為基點(diǎn)(即第一個(gè)接入周期的數(shù)據(jù))
使用前一個(gè)接入周期的全量數(shù)據(jù)和本周期的增量binlog做merge形成該周期內(nèi)的數(shù)據(jù)。
相比一般解決方案,其優(yōu)點(diǎn)比較明顯,主要表現(xiàn)在:
基于Binlog日志的數(shù)據(jù)還原,與在線(xiàn)業(yè)務(wù)解耦
采集通過(guò)分布式隊(duì)列實(shí)時(shí)傳遞,還原操作在集群上實(shí)現(xiàn),及時(shí)性及可擴(kuò)展性強(qiáng)
Binlog日志包括了增、刪、改等明細(xì)動(dòng)作,支持定制化的ETL
MySQL Binlog是二進(jìn)制格式的日志文件,用來(lái)記錄數(shù)據(jù)庫(kù)的數(shù)據(jù)更新或者潛在更新(比如DELETE語(yǔ)句執(zhí)行刪除而實(shí)際并沒(méi)有符合條件的數(shù)據(jù)),主要用于數(shù)據(jù)庫(kù)的主從復(fù)制以及增量恢復(fù)。Statement模式:每一條會(huì)修改數(shù)據(jù)的sql都會(huì)被記錄在binlog中,如inserts, updates, deletes。
Row模式: 每一行的具體變更事件都會(huì)被記錄在binlog中。
Canal是阿里巴巴旗下的一款開(kāi)源項(xiàng)目,純Java開(kāi)發(fā)。基于數(shù)據(jù)庫(kù)增量日志解析,提供增量數(shù)據(jù)訂閱&消費(fèi),目前主要支持了MySQL(也支持mariaDB)。滴滴內(nèi)部版本在開(kāi)源基礎(chǔ)上新增了同步到MQ、消息上報(bào)功能以及容災(zāi)機(jī)制。
Canal主要運(yùn)作方式如下:
canal模擬mysql slave的交互協(xié)議,偽裝自己為mysql slave,向master發(fā)送dump協(xié)議
mysql master收到dump請(qǐng)求,開(kāi)始推送binary log到canal
canal解析binary log對(duì)象,并將解析的結(jié)果編碼成JSON格式的文本串
把解析后的文本串發(fā)送到消息隊(duì)列并上報(bào)發(fā)送情況(如Kafka、DDMQ)
格式化后的單條記錄新增消息示例如下:
{"binlog": "[email protected]","time": 1450236307000,"canalTime": 1450236308279,"db": "TestCanal","table": "g_order_010","event": "u","columns": [{"n": "order_id", "t": "bigint(20)", "v": "126", "null": false, "updated": false},{"n": "driver_id", "t": "bigint(20)", "v": "123456", "null": false, "updated": false},{ "n": "passenger_id", "t": "bigint(20)", "v": "654321", "null": false, "updated": false},{"n": "current_lng", "t": "decimal(10,6)", "v": "39.021400", "null": false, "updated": false},{"n": "current_lat", "t": "decimal(10,6)", "v": "120.423300", "null": false, "updated": false},{ "n": "starting_lng", "t": "decimal(10,6)", "v": "38.128000", "null": false, "updated": false},{ "n": "starting_lat", "t": "decimal(10,6)", "v": "121.445000", "null": false, "updated": false},{ "n": "dest_name", "t": "varchar(100)", "v": "Renmin University", "origin_val": "知春路", "null": false, "updated": true}],"keys": ["order_id"]}{"binlog": "[email protected]","time": 1450236307000,"canalTime": 1450236308279,"db": "TestCanal","table": "g_order_010","event": "u","columns": [{"n": "order_id", "t": "bigint(20)", "v": "126", "null": false, "updated": false},{"n": "driver_id", "t": "bigint(20)", "v": "123456", "null": false, "updated": false},{ "n": "passenger_id", "t": "bigint(20)", "v": "654321", "null": false, "updated": false},{"n": "current_lng", "t": "decimal(10,6)", "v": "39.021400", "null": false, "updated": false},{"n": "current_lat", "t": "decimal(10,6)", "v": "120.423300", "null": false, "updated": false},{ "n": "starting_lng", "t": "decimal(10,6)", "v": "38.128000", "null": false, "updated": false},{ "n": "starting_lat", "t": "decimal(10,6)", "v": "121.445000", "null": false, "updated": false},{ "n": "dest_name", "t": "varchar(100)", "v": "Renmin University", "origin_val": "知春路", "null": false, "updated": true}],"keys": ["order_id"]}
為保障整個(gè)Binlog鏈路中數(shù)據(jù)完整性,我們引入了Dquality服務(wù)。Dquality是數(shù)據(jù)通道中非常重要的一個(gè)環(huán)節(jié),記錄著整個(gè)數(shù)據(jù)通道每一個(gè)流程的數(shù)據(jù)信息,如某一段時(shí)間內(nèi)的數(shù)據(jù)總和等。Dquality主要包含以下功能:
為數(shù)據(jù)回溯提供元數(shù)據(jù)支持
校驗(yàn)數(shù)據(jù)丟失與延遲情況
校驗(yàn)數(shù)據(jù)完整性
簡(jiǎn)單流程為數(shù)據(jù)鏈路上的各發(fā)送方在成功傳遞數(shù)據(jù)后,把投遞結(jié)果以及時(shí)間信息發(fā)送到Dquality,Dquality統(tǒng)一匯總,分析判定每個(gè)時(shí)間段內(nèi)數(shù)據(jù)是否完成及時(shí)準(zhǔn)確傳輸,并把分析結(jié)果存儲(chǔ)下來(lái)。下游數(shù)據(jù)使用方通過(guò)接口從Dquality查詢(xún)?cè)摻Y(jié)果。
以Binlog鏈路為例,在Binlog流程中有兩個(gè)環(huán)節(jié)Canal->MQ、MQ->HDFS,上報(bào)數(shù)據(jù)發(fā)送情況到Dquality。下游ETL環(huán)節(jié)使用Dquality接口查詢(xún)數(shù)據(jù)就位情況,比如對(duì)于小時(shí)粒度任務(wù),查詢(xún)?cè)撔r(shí)的0分0秒到59分59秒之間的數(shù)據(jù)是否已經(jīng)完成寫(xiě)入,如果已經(jīng)完成寫(xiě)入,那么ETL任務(wù)就可以啟動(dòng)執(zhí)行。
基于此,天或小時(shí)采集周期內(nèi)的數(shù)據(jù)是固定的(冪等),以該時(shí)間段內(nèi)的數(shù)據(jù)作為清洗基礎(chǔ),無(wú)論什么時(shí)候執(zhí)行其結(jié)果不會(huì)改變。但在Canal上報(bào)環(huán)節(jié),目前無(wú)法有效判定較小數(shù)據(jù)量場(chǎng)景和同步異常場(chǎng)景,一定程度上影響數(shù)據(jù)就位時(shí)間。
5.
一次性拉取&初始化
Binlog從發(fā)起采集的一刻起才會(huì)在整個(gè)鏈路上存在,即以增量的方式傳遞,那么對(duì)于歷史數(shù)據(jù)如何獲取?實(shí)際場(chǎng)景中包括全量接入或增量歷史數(shù)據(jù)回溯。
目前實(shí)現(xiàn)方式為通過(guò)DataX工具直連MySQL離線(xiàn)庫(kù),拉取一份截至到當(dāng)前時(shí)間的全量數(shù)據(jù),然后按列還原到Hive表的首個(gè)分區(qū)中。
全量采集場(chǎng)景下,下個(gè)分區(qū)的數(shù)據(jù)基于上個(gè)分區(qū)的數(shù)據(jù)和當(dāng)前周期內(nèi)的增量Binlog日志merge,即可產(chǎn)生該分區(qū)內(nèi)的數(shù)據(jù)。
上面介紹了基于Binlog數(shù)據(jù)接入的整體流程,下面列舉兩個(gè)實(shí)際解決的業(yè)務(wù)問(wèn)題。
6.
場(chǎng)景一:數(shù)據(jù)飄移的支持
在實(shí)際業(yè)務(wù)中,存在很多類(lèi)似的兩種case,其采集周期存在一定的不確定性。
case 1:訂單的Binlog日志中,當(dāng)訂單事件的更新時(shí)間在59分59秒左右時(shí),數(shù)據(jù)有可能會(huì)落在下一個(gè)小時(shí)的分區(qū),以至于當(dāng)前小時(shí)數(shù)據(jù)沒(méi)有統(tǒng)計(jì)到該條訂單,同時(shí)下一個(gè)小時(shí)分區(qū)的數(shù)據(jù)也沒(méi)有打上相應(yīng)的事件標(biāo)簽。

7.
場(chǎng)景二:分庫(kù)分表的支持
業(yè)務(wù)發(fā)展,不可避免會(huì)有分庫(kù)分表的訴求,其規(guī)則也可能多種多樣,如table_{城市區(qū)號(hào)},table_{連續(xù)數(shù)字},table_{日期},如果逐個(gè)抽取并聚合,上下游的成本巨大。因此我們需要在數(shù)據(jù)規(guī)范層面,數(shù)據(jù)鏈路上保障能自動(dòng)化收集這類(lèi)數(shù)據(jù)。

1. 統(tǒng)一MySQL使用規(guī)范,明確分庫(kù)分表的命名規(guī)則,做到規(guī)則內(nèi)自動(dòng)化識(shí)別,同時(shí)完成全量元數(shù)據(jù)信息的收集,非規(guī)范化的命名規(guī)則無(wú)法自動(dòng)化支持。
2. 默認(rèn)情況下一個(gè)庫(kù)的數(shù)據(jù)會(huì)收集到一個(gè)topic內(nèi),如果有分庫(kù)存在也可以一并收集到一個(gè)topic內(nèi),保證邏輯上分庫(kù)分表的數(shù)據(jù)物理上收集到一起。
3. 按照/{db}/{table}/{year}/{month}/{day}/{hour}的路徑結(jié)構(gòu)(其中日期由Binlog時(shí)間格式化生成)落地到HDFS上,一個(gè)邏輯表的數(shù)據(jù)存儲(chǔ)在一起。
4. ETL處理階段,取出上述路徑下的Binlog日志,還原到Hive中。
為用戶(hù)更好使用分庫(kù)分表數(shù)據(jù)以及獲取中間變化過(guò)程,ETL階段額外再Hive表中寫(xiě)入三個(gè)字段:
system_rule_etl_update_field | 記錄更新時(shí)間,更新晚的對(duì)應(yīng)該字段的值更大,前十位是時(shí)間戳信息 |
system_rule_etl_delete_flag | 標(biāo)識(shí)本條記錄是否在上游數(shù)據(jù)庫(kù)中被刪除,0-正常記錄,1-刪除記錄 |
system_rule_etl_uniq_key | 全局主鍵,由mysql庫(kù)名+表名+主鍵拼接而成 |
8.
總結(jié)
作為數(shù)據(jù)建設(shè)的基礎(chǔ),數(shù)據(jù)平臺(tái)提供的基于Binlog的MySQL入Hive服務(wù),覆蓋公司內(nèi)部各個(gè)業(yè)務(wù)線(xiàn),日1.9w+同步任務(wù),近50T數(shù)據(jù)同步量,實(shí)時(shí)層面毫秒級(jí)別延遲,實(shí)現(xiàn)了及時(shí)、準(zhǔn)確、定制化的同步需求。但在個(gè)性化ETL、性能優(yōu)化、內(nèi)容建設(shè)等方面還存在未解決的問(wèn)題,后續(xù)我們會(huì)在這些方面重點(diǎn)發(fā)力,更好的助力業(yè)務(wù)發(fā)展。

— 本文結(jié)束 —

● 漫談設(shè)計(jì)模式在 Spring 框架中的良好實(shí)踐
關(guān)注我,回復(fù) 「加群」 加入各種主題討論群。
對(duì)「服務(wù)端思維」有期待,請(qǐng)?jiān)谖哪c(diǎn)個(gè)在看
喜歡這篇文章,歡迎轉(zhuǎn)發(fā)、分享朋友圈


