滴滴基于Binlog的采集架構(gòu)與實(shí)踐


桔妹導(dǎo)讀:大數(shù)據(jù)是這個(gè)時(shí)代賦予我們的強(qiáng)大引擎,在數(shù)字化大潮中 ,借助數(shù)據(jù)驅(qū)動(dòng)的方法推動(dòng)業(yè)務(wù)乘風(fēng)破浪,幾乎是每家公司的核心戰(zhàn)略。數(shù)據(jù)驅(qū)動(dòng)的落腳點(diǎn)是數(shù)據(jù),能否將組織或業(yè)務(wù)運(yùn)行過程中的信息,進(jìn)行有效收集并組織成信息流,是數(shù)據(jù)驅(qū)動(dòng)的基石所在。本文分享了滴滴數(shù)據(jù)體系建設(shè)過程中,MySQL這一類數(shù)據(jù)源的采集架構(gòu)和應(yīng)用實(shí)踐。
1.
背景

關(guān)系模型構(gòu)建起整個(gè)數(shù)據(jù)分析的基石,關(guān)系型數(shù)據(jù)庫作為具體實(shí)現(xiàn)、采集MySQL數(shù)據(jù)接入Hive是很多企業(yè)進(jìn)行數(shù)據(jù)分析的前提。如何及時(shí)、準(zhǔn)確的把MySQL數(shù)據(jù)同步到Hive呢?
一般解決方案是使用類似Sqoop的工具,直連MySQL去Select數(shù)據(jù)存儲(chǔ)到HDFS,然后把HDFS數(shù)據(jù)Load到Hive中。這種方法簡單易操作,但隨著業(yè)務(wù)規(guī)模擴(kuò)大,不足之處也逐步暴露出來:
直連MySQL查詢,對于數(shù)據(jù)庫壓力較大(如訂單表、支付表等),可能直接影響在線業(yè)務(wù)
數(shù)據(jù)整體就位時(shí)間(尤其大表)不滿足下游生產(chǎn)需求
擴(kuò)展性較差,對于分表、字段增減、變更等的支持較弱
拉取的數(shù)據(jù)是該時(shí)刻的鏡像,無法獲取中間變化情況
為解決上述問題,我們引入Binlog實(shí)時(shí)采集 + 離線還原的解決方案,本文將從這兩個(gè)方面介紹整個(gè)數(shù)據(jù)的接入流程。


按照上述流程采集binlog日志增量入HDFS
使用離線一次性拉取一份歷史全量數(shù)據(jù),按字段還原到Hive作為基點(diǎn)(即第一個(gè)接入周期的數(shù)據(jù))
使用前一個(gè)接入周期的全量數(shù)據(jù)和本周期的增量binlog做merge形成該周期內(nèi)的數(shù)據(jù)。
相比一般解決方案,其優(yōu)點(diǎn)比較明顯,主要表現(xiàn)在:
基于Binlog日志的數(shù)據(jù)還原,與在線業(yè)務(wù)解耦
采集通過分布式隊(duì)列實(shí)時(shí)傳遞,還原操作在集群上實(shí)現(xiàn),及時(shí)性及可擴(kuò)展性強(qiáng)
Binlog日志包括了增、刪、改等明細(xì)動(dòng)作,支持定制化的ETL
MySQL Binlog是二進(jìn)制格式的日志文件,用來記錄數(shù)據(jù)庫的數(shù)據(jù)更新或者潛在更新(比如DELETE語句執(zhí)行刪除而實(shí)際并沒有符合條件的數(shù)據(jù)),主要用于數(shù)據(jù)庫的主從復(fù)制以及增量恢復(fù)。Statement模式:每一條會(huì)修改數(shù)據(jù)的sql都會(huì)被記錄在binlog中,如inserts, updates, deletes。
Row模式: 每一行的具體變更事件都會(huì)被記錄在binlog中。
Canal是阿里巴巴旗下的一款開源項(xiàng)目,純Java開發(fā)?;跀?shù)據(jù)庫增量日志解析,提供增量數(shù)據(jù)訂閱&消費(fèi),目前主要支持了MySQL(也支持mariaDB)。滴滴內(nèi)部版本在開源基礎(chǔ)上新增了同步到MQ、消息上報(bào)功能以及容災(zāi)機(jī)制。
Canal主要運(yùn)作方式如下:
canal模擬mysql slave的交互協(xié)議,偽裝自己為mysql slave,向master發(fā)送dump協(xié)議
mysql master收到dump請求,開始推送binary log到canal
canal解析binary log對象,并將解析的結(jié)果編碼成JSON格式的文本串
把解析后的文本串發(fā)送到消息隊(duì)列并上報(bào)發(fā)送情況(如Kafka、DDMQ)
格式化后的單條記錄新增消息示例如下:
{"binlog": "[email protected]","time": 1450236307000,"canalTime": 1450236308279,"db": "TestCanal","table": "g_order_010","event": "u","columns": [{"n": "order_id", "t": "bigint(20)", "v": "126", "null": false, "updated": false},{"n": "driver_id", "t": "bigint(20)", "v": "123456", "null": false, "updated": false},{ "n": "passenger_id", "t": "bigint(20)", "v": "654321", "null": false, "updated": false},{"n": "current_lng", "t": "decimal(10,6)", "v": "39.021400", "null": false, "updated": false},{"n": "current_lat", "t": "decimal(10,6)", "v": "120.423300", "null": false, "updated": false},{ "n": "starting_lng", "t": "decimal(10,6)", "v": "38.128000", "null": false, "updated": false},{ "n": "starting_lat", "t": "decimal(10,6)", "v": "121.445000", "null": false, "updated": false},{ "n": "dest_name", "t": "varchar(100)", "v": "Renmin University", "origin_val": "知春路", "null": false, "updated": true}],"keys": ["order_id"]}{"binlog": "[email protected]","time": 1450236307000,"canalTime": 1450236308279,"db": "TestCanal","table": "g_order_010","event": "u","columns": [{"n": "order_id", "t": "bigint(20)", "v": "126", "null": false, "updated": false},{"n": "driver_id", "t": "bigint(20)", "v": "123456", "null": false, "updated": false},{ "n": "passenger_id", "t": "bigint(20)", "v": "654321", "null": false, "updated": false},{"n": "current_lng", "t": "decimal(10,6)", "v": "39.021400", "null": false, "updated": false},{"n": "current_lat", "t": "decimal(10,6)", "v": "120.423300", "null": false, "updated": false},{ "n": "starting_lng", "t": "decimal(10,6)", "v": "38.128000", "null": false, "updated": false},{ "n": "starting_lat", "t": "decimal(10,6)", "v": "121.445000", "null": false, "updated": false},{ "n": "dest_name", "t": "varchar(100)", "v": "Renmin University", "origin_val": "知春路", "null": false, "updated": true}],"keys": ["order_id"]}
為保障整個(gè)Binlog鏈路中數(shù)據(jù)完整性,我們引入了Dquality服務(wù)。Dquality是數(shù)據(jù)通道中非常重要的一個(gè)環(huán)節(jié),記錄著整個(gè)數(shù)據(jù)通道每一個(gè)流程的數(shù)據(jù)信息,如某一段時(shí)間內(nèi)的數(shù)據(jù)總和等。Dquality主要包含以下功能:
為數(shù)據(jù)回溯提供元數(shù)據(jù)支持
校驗(yàn)數(shù)據(jù)丟失與延遲情況
校驗(yàn)數(shù)據(jù)完整性
簡單流程為數(shù)據(jù)鏈路上的各發(fā)送方在成功傳遞數(shù)據(jù)后,把投遞結(jié)果以及時(shí)間信息發(fā)送到Dquality,Dquality統(tǒng)一匯總,分析判定每個(gè)時(shí)間段內(nèi)數(shù)據(jù)是否完成及時(shí)準(zhǔn)確傳輸,并把分析結(jié)果存儲(chǔ)下來。下游數(shù)據(jù)使用方通過接口從Dquality查詢該結(jié)果。
以Binlog鏈路為例,在Binlog流程中有兩個(gè)環(huán)節(jié)Canal->MQ、MQ->HDFS,上報(bào)數(shù)據(jù)發(fā)送情況到Dquality。下游ETL環(huán)節(jié)使用Dquality接口查詢數(shù)據(jù)就位情況,比如對于小時(shí)粒度任務(wù),查詢該小時(shí)的0分0秒到59分59秒之間的數(shù)據(jù)是否已經(jīng)完成寫入,如果已經(jīng)完成寫入,那么ETL任務(wù)就可以啟動(dòng)執(zhí)行。
基于此,天或小時(shí)采集周期內(nèi)的數(shù)據(jù)是固定的(冪等),以該時(shí)間段內(nèi)的數(shù)據(jù)作為清洗基礎(chǔ),無論什么時(shí)候執(zhí)行其結(jié)果不會(huì)改變。但在Canal上報(bào)環(huán)節(jié),目前無法有效判定較小數(shù)據(jù)量場景和同步異常場景,一定程度上影響數(shù)據(jù)就位時(shí)間。
5.
一次性拉取&初始化
Binlog從發(fā)起采集的一刻起才會(huì)在整個(gè)鏈路上存在,即以增量的方式傳遞,那么對于歷史數(shù)據(jù)如何獲?。?/span>實(shí)際場景中包括全量接入或增量歷史數(shù)據(jù)回溯。
目前實(shí)現(xiàn)方式為通過DataX工具直連MySQL離線庫,拉取一份截至到當(dāng)前時(shí)間的全量數(shù)據(jù),然后按列還原到Hive表的首個(gè)分區(qū)中。
全量采集場景下,下個(gè)分區(qū)的數(shù)據(jù)基于上個(gè)分區(qū)的數(shù)據(jù)和當(dāng)前周期內(nèi)的增量Binlog日志merge,即可產(chǎn)生該分區(qū)內(nèi)的數(shù)據(jù)。
上面介紹了基于Binlog數(shù)據(jù)接入的整體流程,下面列舉兩個(gè)實(shí)際解決的業(yè)務(wù)問題。
6.
場景一:數(shù)據(jù)飄移的支持
在實(shí)際業(yè)務(wù)中,存在很多類似的兩種case,其采集周期存在一定的不確定性。
case 1:訂單的Binlog日志中,當(dāng)訂單事件的更新時(shí)間在59分59秒左右時(shí),數(shù)據(jù)有可能會(huì)落在下一個(gè)小時(shí)的分區(qū),以至于當(dāng)前小時(shí)數(shù)據(jù)沒有統(tǒng)計(jì)到該條訂單,同時(shí)下一個(gè)小時(shí)分區(qū)的數(shù)據(jù)也沒有打上相應(yīng)的事件標(biāo)簽。

7.
場景二:分庫分表的支持
業(yè)務(wù)發(fā)展,不可避免會(huì)有分庫分表的訴求,其規(guī)則也可能多種多樣,如table_{城市區(qū)號},table_{連續(xù)數(shù)字},table_{日期},如果逐個(gè)抽取并聚合,上下游的成本巨大。因此我們需要在數(shù)據(jù)規(guī)范層面,數(shù)據(jù)鏈路上保障能自動(dòng)化收集這類數(shù)據(jù)。

1. 統(tǒng)一MySQL使用規(guī)范,明確分庫分表的命名規(guī)則,做到規(guī)則內(nèi)自動(dòng)化識別,同時(shí)完成全量元數(shù)據(jù)信息的收集,非規(guī)范化的命名規(guī)則無法自動(dòng)化支持。
2. 默認(rèn)情況下一個(gè)庫的數(shù)據(jù)會(huì)收集到一個(gè)topic內(nèi),如果有分庫存在也可以一并收集到一個(gè)topic內(nèi),保證邏輯上分庫分表的數(shù)據(jù)物理上收集到一起。
3. 按照/{db}/{table}/{year}/{month}/{day}/{hour}的路徑結(jié)構(gòu)(其中日期由Binlog時(shí)間格式化生成)落地到HDFS上,一個(gè)邏輯表的數(shù)據(jù)存儲(chǔ)在一起。
4. ETL處理階段,取出上述路徑下的Binlog日志,還原到Hive中。
為用戶更好使用分庫分表數(shù)據(jù)以及獲取中間變化過程,ETL階段額外再Hive表中寫入三個(gè)字段:
system_rule_etl_update_field | 記錄更新時(shí)間,更新晚的對應(yīng)該字段的值更大,前十位是時(shí)間戳信息 |
system_rule_etl_delete_flag | 標(biāo)識本條記錄是否在上游數(shù)據(jù)庫中被刪除,0-正常記錄,1-刪除記錄 |
system_rule_etl_uniq_key | 全局主鍵,由mysql庫名+表名+主鍵拼接而成 |
8.
總結(jié)
作為數(shù)據(jù)建設(shè)的基礎(chǔ),數(shù)據(jù)平臺提供的基于Binlog的MySQL入Hive服務(wù),覆蓋公司內(nèi)部各個(gè)業(yè)務(wù)線,日1.9w+同步任務(wù),近50T數(shù)據(jù)同步量,實(shí)時(shí)層面毫秒級別延遲,實(shí)現(xiàn)了及時(shí)、準(zhǔn)確、定制化的同步需求。但在個(gè)性化ETL、性能優(yōu)化、內(nèi)容建設(shè)等方面還存在未解決的問題,后續(xù)我們會(huì)在這些方面重點(diǎn)發(fā)力,更好的助力業(yè)務(wù)發(fā)展。
團(tuán)隊(duì)內(nèi)推
?
滴滴數(shù)據(jù)平臺與應(yīng)用部(DT),致力于打造準(zhǔn)確、穩(wěn)定、高效、易用的數(shù)據(jù)中臺體系,從而賦能業(yè)務(wù)發(fā)展。我們不僅擁有穩(wěn)定優(yōu)質(zhì)的大數(shù)據(jù)算力,完善的數(shù)據(jù)產(chǎn)品矩陣和業(yè)界領(lǐng)先的算法策略。也同時(shí)深入業(yè)務(wù),為業(yè)務(wù)的快速發(fā)展提供準(zhǔn)確、敏捷的數(shù)據(jù)服務(wù)支撐。除了提供數(shù)據(jù)資產(chǎn)、數(shù)據(jù)產(chǎn)品、數(shù)據(jù)應(yīng)用相結(jié)合的體系化解決方案之外,我們也勇往直前,持續(xù)探索數(shù)據(jù)智能應(yīng)用場景!歡迎大家加入我們,一起創(chuàng)造數(shù)據(jù)價(jià)值!
團(tuán)隊(duì)正在熱招高級/資深數(shù)據(jù)研發(fā)工程師崗位。歡迎有興趣的小伙伴加入,可以投遞簡歷至[email protected],請將郵件主題命名為 姓名-投遞崗位-投遞團(tuán)隊(duì)。





