<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          伴魚(yú)基于 Flink 構(gòu)建數(shù)據(jù)集成平臺(tái)的設(shè)計(jì)與實(shí)現(xiàn)

          共 7020字,需瀏覽 15分鐘

           ·

          2021-12-23 00:34

          摘要:數(shù)據(jù)倉(cāng)庫(kù)有四個(gè)基本的特征:面向主題的、集成的、相對(duì)穩(wěn)定的、反映歷史變化的。其中數(shù)據(jù)集成是數(shù)據(jù)倉(cāng)庫(kù)構(gòu)建的首要前提,指將多個(gè)分散的、異構(gòu)的數(shù)據(jù)源整合在一起以便于后續(xù)的數(shù)據(jù)分析。將數(shù)據(jù)集成過(guò)程平臺(tái)化,將極大提升數(shù)據(jù)開(kāi)發(fā)人員的效率,本文主要內(nèi)容為:


          1. 數(shù)據(jù)集成 VS 數(shù)據(jù)同步
          2. 集成需求
          3. 數(shù)據(jù)集成 V1
          4. 數(shù)據(jù)集成 V2
          5. 線上效果
          6. 總結(jié)


          A data warehouse is a subject-oriented, integrated, nonvolatile, and time-variant collection of data in support of management’s decisions.

          —— Bill Inmon


          一、數(shù)據(jù)集成 VS 數(shù)據(jù)同步


          「數(shù)據(jù)集成」往往和「數(shù)據(jù)同步」在概念上存在一定的混淆,為此我們對(duì)這二者進(jìn)行了區(qū)分:


          • 「數(shù)據(jù)集成」特指面向數(shù)據(jù)倉(cāng)庫(kù) ODS 層的數(shù)據(jù)同步過(guò)程;
          • 「數(shù)據(jù)同步」面向的是一般化的 Source 到 Sink 的數(shù)據(jù)傳輸過(guò)程。



          二者的關(guān)系如下圖所示:






          • 「數(shù)據(jù)同步平臺(tái)」提供基礎(chǔ)能力,不摻雜具體的業(yè)務(wù)邏輯。

          • 「數(shù)據(jù)集成平臺(tái)」是構(gòu)建在「數(shù)據(jù)同步平臺(tái)」之上的,除了將原始數(shù)據(jù)同步之外還包含了一些聚合的邏輯 (如通過(guò)數(shù)據(jù)庫(kù)的日志數(shù)據(jù)對(duì)快照數(shù)據(jù)進(jìn)行恢復(fù),下文將會(huì)詳細(xì)展開(kāi)) 以及數(shù)倉(cāng)規(guī)范相關(guān)的內(nèi)容 (如數(shù)倉(cāng) ODS 層庫(kù)表命名規(guī)范) 等。



          目前「數(shù)據(jù)同步平臺(tái)」的建設(shè)正在我們的規(guī)劃之中,但這并不影響「數(shù)據(jù)集成平臺(tái)」的搭建,一些同步的需求可提前在「實(shí)時(shí)計(jì)算平臺(tái)」創(chuàng)建,以「約定」的方式解耦。

          值得一提的是「數(shù)據(jù)集成」也應(yīng)當(dāng)涵蓋「數(shù)據(jù)采集」(由特定的工具支持) 和「數(shù)據(jù)清洗」(由采集粒度、日志規(guī)范等因素決定) 兩部分內(nèi)容,這兩部分內(nèi)容各個(gè)公司都有自己的實(shí)現(xiàn),本文將不做詳細(xì)介紹。

          二、集成需求



          目前伴魚(yú)內(nèi)部數(shù)據(jù)的集成需求主要體現(xiàn)在三塊:Stat Log (業(yè)務(wù)標(biāo)準(zhǔn)化日志或稱(chēng)統(tǒng)計(jì)日志)、TiDB 及 MongoDB。除此之外還有一些 Service Log、Nginx Log 等,此類(lèi)不具備代表性不在本文介紹。另外,由于實(shí)時(shí)數(shù)倉(cāng)正處于建設(shè)過(guò)程中,目前「數(shù)據(jù)集成平臺(tái)」只涵蓋離線數(shù)倉(cāng) (Hive)。


          • Stat Log:業(yè)務(wù)落盤(pán)的日志將由 FileBeat 組件收集至 Kafka。由于日志為 Append Only 類(lèi)型, 因此 Stat Log 集成相對(duì)簡(jiǎn)單,只需將 Kafka 數(shù)據(jù)同步至 Hive 即可。

          • DB (TiDB、MongoDB):DB 數(shù)據(jù)相對(duì)麻煩,核心訴求是數(shù)倉(cāng)中能夠存在業(yè)務(wù)數(shù)據(jù)庫(kù)的鏡像,即存在業(yè)務(wù)數(shù)據(jù)庫(kù)中某一時(shí)刻(天級(jí) or 小時(shí)級(jí))的數(shù)據(jù)快照,當(dāng)然有時(shí)也有對(duì)數(shù)據(jù)變更過(guò)程的分析需求。因此 DB 數(shù)據(jù)集成需要將這兩個(gè)方面都考慮進(jìn)去。



          由于以上兩種類(lèi)型的數(shù)據(jù)集成方式差異較大,下文將分別予以討論。

          三、數(shù)據(jù)集成 V1




          伴魚(yú)早期「數(shù)據(jù)集成平臺(tái)」已具備雛形,這個(gè)階段主要是借助一系列開(kāi)源的工具實(shí)現(xiàn)。隨著時(shí)間推進(jìn),這個(gè)版本暴露的問(wèn)題也逐漸增多,接下來(lái)將主要從數(shù)據(jù)流的角度對(duì) V1 進(jìn)行闡述,更多的細(xì)節(jié)問(wèn)題將在 V2 版本的設(shè)計(jì)中體現(xiàn)。


          3.1 Stat Log



          日志的集成并未接入平臺(tái),而是煙囪式的開(kāi)發(fā)方式,數(shù)據(jù)集成的鏈路如下圖所示:




          Kafka 中的數(shù)據(jù)先經(jīng)過(guò) Flume 同步至 HDFS,再由 Spark 任務(wù)將數(shù)據(jù)從 HDFS 導(dǎo)入至 Hive 并創(chuàng)建分區(qū)。整體鏈路較長(zhǎng)且引入了第三方組件(Flume)增加了運(yùn)維的成本,另外 Kafka 的原始數(shù)據(jù)在 HDFS 冗余存儲(chǔ)也增加了存儲(chǔ)的開(kāi)銷(xiāo)。


          3.2 DB



          DB 數(shù)據(jù)的集成主要是基于查詢(xún)的方式(批的方式,通過(guò) Select 查詢(xún)進(jìn)行全表掃描得到快照數(shù)據(jù))實(shí)現(xiàn),其鏈路如下圖所示:




          用戶(hù)通過(guò)平臺(tái)提交集成任務(wù),由 Airflow 定時(shí)任務(wù)掃描集成平臺(tái)元數(shù)據(jù)庫(kù),生成對(duì)應(yīng)的取數(shù)任務(wù) (TiDB 的數(shù)據(jù)通過(guò) Sqoop 工具,MongoDB 的數(shù)據(jù)則通過(guò) Mongoexport 工具)??梢钥吹?V1 版本并沒(méi)有獲取數(shù)據(jù)庫(kù)的變更的日志數(shù)據(jù),不能滿足對(duì)數(shù)據(jù)變更過(guò)程的分析訴求。

          由于 Sqoop 任務(wù)最終要從 TiDB 生產(chǎn)環(huán)境的業(yè)務(wù)數(shù)據(jù)庫(kù)獲取數(shù)據(jù),數(shù)據(jù)量大的情況下勢(shì)必對(duì)業(yè)務(wù)數(shù)據(jù)庫(kù)造成一定的影響。Mongoexport 任務(wù)直接作用在 MongoDB 的隱藏節(jié)點(diǎn) (無(wú)業(yè)務(wù)數(shù)據(jù)請(qǐng)求),對(duì)于線上業(yè)務(wù)的影響可以忽略不計(jì)。基于此,DBA 單獨(dú)搭建了一套 TiDB 大數(shù)據(jù)集群,用于將體量較大的業(yè)務(wù)數(shù)據(jù)庫(kù)同步至此 (基于 TiDB Pump 和 Drainer 組件),因此部分 Sqoop 任務(wù)可以從此集群拉群數(shù)據(jù)以消除對(duì)業(yè)務(wù)數(shù)據(jù)庫(kù)的影響。從數(shù)據(jù)流的角度,整個(gè)過(guò)程如下圖所示:




          是否將生產(chǎn)環(huán)境 TiDB 業(yè)務(wù)數(shù)據(jù)庫(kù)同步至 TiDB 大數(shù)據(jù)集群由數(shù)倉(cāng)的需求以及 DBA 對(duì)于數(shù)據(jù)量評(píng)估決定。可以看出,這種形式也存在著大量數(shù)據(jù)的冗余,集群的資源隨著同步任務(wù)的增加時(shí)長(zhǎng)達(dá)到瓶頸。并且隨著后續(xù)的演進(jìn),TiDB 大數(shù)據(jù)集群也涵蓋一部分?jǐn)?shù)據(jù)應(yīng)用生產(chǎn)環(huán)境的業(yè)務(wù)數(shù)據(jù)庫(kù),集群作用域逐漸模糊。

          四、數(shù)據(jù)集成 V2




          V2 版本我們引入了 Flink,將同步的鏈路進(jìn)行了簡(jiǎn)化,DB 數(shù)據(jù)集成從之前的基于查詢(xún)的方式改成了基于日志的方式 (流的方式),大大降低了冗余的存儲(chǔ)。


          4.1 Stat Log



          借助 Flink 1.11 版本后對(duì)于 Hive Integration 的支持,我們可以輕松的將 Kafka 的數(shù)據(jù)寫(xiě)入 Hive,因此 Stat Log 的集成也就變得異常簡(jiǎn)單 (相比 V1 版本,去除了對(duì) Flume 組件的依賴(lài),數(shù)據(jù)冗余也消除了),同時(shí) Flink Exactly-Once 的語(yǔ)義也確保了數(shù)據(jù)的準(zhǔn)確性。從數(shù)據(jù)流的角度,整個(gè)過(guò)程如下圖所示:




          目前按照小時(shí)粒度生成日志分區(qū),幾項(xiàng) Flink 任務(wù)配置參數(shù)如下:


          checkpoint: 10 min
          watermark: 1 min
          partition.time-extractor.kind: ‘custom’
          sink.partition-commit.delay: ‘3600s’
          sink.partition-commit.policy.kind: ‘metastore,success-file’
          sink.partition-commit.trigger: ‘partition-time’




          4.2 DB



          基于日志的方式對(duì) DB 數(shù)據(jù)進(jìn)行集成,意味著需要采集 DB 的日志數(shù)據(jù),在我們目前的實(shí)現(xiàn)中 TiDB 基于 Pump 和 Drainer 組件(目前生產(chǎn)環(huán)境數(shù)據(jù)庫(kù)集群版本暫不支持開(kāi)啟 TICDC),MongoDB 基于 MongoShake 組件,采集的數(shù)據(jù)將輸送至 Kafka。

          采用這種方式,一方面降低了業(yè)務(wù)數(shù)據(jù)庫(kù)的查詢(xún)壓力,另一方面可以捕捉數(shù)據(jù)的變更過(guò)程,同時(shí)冗余的數(shù)據(jù)存儲(chǔ)也消除了。不過(guò)由于原始數(shù)據(jù)是日志數(shù)據(jù),需要通過(guò)一定的手段還原出快照數(shù)據(jù)。新的鏈路如下圖所示:





          用戶(hù)提交集成任務(wù)后將同步創(chuàng)建三個(gè)任務(wù):


          • 增量任務(wù) (流):「增量任務(wù)」將 DB 日志數(shù)據(jù)由 Kafka 同步至 Hive。由于采集組件都是按照集群粒度進(jìn)行采集,且集群數(shù)量有限,目前都是手動(dòng)的方式將同步的任務(wù)在「實(shí)時(shí)計(jì)算平臺(tái)」創(chuàng)建,集成任務(wù)創(chuàng)建時(shí)默認(rèn)假定同步任務(wù)已經(jīng) ready,待「數(shù)據(jù)同步平臺(tái)」落地后可以同步做更多的自動(dòng)化操作和校驗(yàn)。

          • 存量任務(wù) (批):要想還原出快照數(shù)據(jù)則至少需要一份初始的快照數(shù)據(jù),因此「存量任務(wù)」的目的是從業(yè)務(wù)數(shù)據(jù)庫(kù)拉取集成時(shí)數(shù)據(jù)的初始快照數(shù)據(jù)。

          • Merge 任務(wù) (批):「Merge 任務(wù)」將存量數(shù)據(jù)和增量數(shù)據(jù)進(jìn)行聚合以還原快照數(shù)據(jù)。還原后的快照數(shù)據(jù)可作為下一日的存量,因此「存量任務(wù)」只需調(diào)度執(zhí)行一次,獲取初始快照數(shù)據(jù)即可。



          「存量任務(wù)」和「Merge 任務(wù)」由離線調(diào)度平臺(tái) Dolphinscheduler (簡(jiǎn)稱(chēng) DS) 調(diào)度執(zhí)行,任務(wù)執(zhí)行過(guò)程中將從集成任務(wù)的元數(shù)據(jù)庫(kù)中獲取所需的信息。目前「Merge 任務(wù)」按小時(shí)粒度調(diào)度,即每小時(shí)還原快照數(shù)據(jù)。

          從數(shù)據(jù)流的角度,整個(gè)過(guò)程如下圖所示:





          DB 的數(shù)據(jù)集成相較于 Stat Log 復(fù)雜性高,接下來(lái)以 TiDB 的數(shù)據(jù)集成為例講述設(shè)計(jì)過(guò)程中的一些要點(diǎn) (MongoDB 流程類(lèi)似,區(qū)別在于存量同步工具及數(shù)據(jù)解析)。


          ■?4.2.1 需求表達(dá)



          對(duì)于用戶(hù)而言,集成任務(wù)需要提供以下兩類(lèi)信息:


          • TiDB 源信息:包括集群、庫(kù)、表。

          • 成方式:集成方式表示的是快照數(shù)據(jù)的聚合粒度,包括全量和增量。全量表示需要將存量的快照數(shù)據(jù)與今日的增量日志數(shù)據(jù)聚合,而增量表示只需要將今日的增量日志數(shù)據(jù)聚合 (即便增量方式無(wú)需和存量的快照數(shù)據(jù)聚合,但初始存量的獲取依舊是有必要的,具體的使用形式由數(shù)倉(cāng)人員自行決定)。




          ■?4.2.2 存量任務(wù)



          存量任務(wù)雖然有且僅執(zhí)行一次,但為了完全消除數(shù)據(jù)集成對(duì)業(yè)務(wù)數(shù)據(jù)庫(kù)的影響,我們選擇數(shù)據(jù)庫(kù)的備份-恢復(fù)機(jī)制來(lái)實(shí)現(xiàn)。公司內(nèi)部數(shù)據(jù)庫(kù)的備份和恢復(fù)操作已經(jīng)平臺(tái)化,集群將定期進(jìn)行備份 (天粒度),通過(guò)平臺(tái)可以查詢(xún)到集群的最新備份,并且可由接口觸發(fā)備份恢復(fù)操作,故存量的獲取可直接作用于恢復(fù)的數(shù)據(jù)庫(kù)。

          由于數(shù)據(jù)庫(kù)備份的時(shí)間點(diǎn)與集成任務(wù)提交的時(shí)間點(diǎn)并不一定是同一天,這之間存在著一定的時(shí)間差將導(dǎo)致存量快照數(shù)據(jù)不符合我們的預(yù)期,各時(shí)間點(diǎn)的關(guān)系如下圖所示:




          按照我們的設(shè)定,存量快照數(shù)據(jù)應(yīng)當(dāng)是包含 T4 之前的全部數(shù)據(jù),而實(shí)際備份的快照數(shù)據(jù)僅包含 T1 之前的全部數(shù)據(jù),這之間存在這 N 天的數(shù)據(jù)差。

          注:這里之所以不說(shuō)數(shù)據(jù)差集為 T1 至 T4 區(qū)間的數(shù)據(jù),是因?yàn)樵隽康?Binlog 數(shù)據(jù)是以整點(diǎn)為分區(qū)的,在 Merge 的時(shí)候也是將整點(diǎn)的分區(qū)數(shù)據(jù)與存量數(shù)據(jù)進(jìn)行聚合,并支持了數(shù)據(jù)去重。因此 T1 時(shí)刻的存量數(shù)據(jù)與 T0-T3 之間的增量數(shù)據(jù)的 Merge 結(jié)果等效于 T0 時(shí)刻的存量數(shù)據(jù)與 T0-T3 之間的增量數(shù)據(jù)的 Merge 結(jié)果。所以 T1 至 T4 的數(shù)據(jù)差集等效 T0 至 T3 的數(shù)據(jù)差集,即圖示中的 N 天數(shù)據(jù)。

          對(duì)于缺失的這部分?jǐn)?shù)據(jù)實(shí)則是可以在「存量任務(wù)」中進(jìn)行補(bǔ)全,仔細(xì)分析這其實(shí)是可以通過(guò)執(zhí)行的 「Merge 任務(wù)」的補(bǔ)數(shù)操作實(shí)現(xiàn)。

          整個(gè)「存量任務(wù)」的工作流如下圖所示:



          • 同步觸發(fā)數(shù)據(jù)庫(kù)平臺(tái)進(jìn)行備份恢復(fù),產(chǎn)生回執(zhí) ID;

          • 通過(guò)回執(zhí) ID 輪訓(xùn)備份恢復(fù)狀態(tài),恢復(fù)失敗需要 DBA 定位異常,故將下線整個(gè)工作流,待恢復(fù)成功可在平臺(tái)重新恢復(fù)執(zhí)行「存量任務(wù)」。恢復(fù)進(jìn)行中,工作流直接退出,借助 DS 定時(shí)調(diào)度等待下次喚醒?;謴?fù)成功,進(jìn)入后續(xù)邏輯;

          • 從恢復(fù)庫(kù)中拉取存量,判定存量是否存在數(shù)據(jù)差,若存在則執(zhí)行 Merge 任務(wù)的補(bǔ)數(shù)操作,整個(gè)操作可冪等執(zhí)行,如若失敗退出此次工作流,等待下次調(diào)度;

          • 成功,下線整個(gè)工作流,任務(wù)完成。




          ■?4.2.3 Merge 任務(wù)



          Merge 任務(wù)的前提是存量數(shù)據(jù)與增量數(shù)據(jù)都已經(jīng) ready,我們通過(guò) _SUCCESS 文件進(jìn)行標(biāo)記。整個(gè)「Merge 任務(wù)」的工作流如下圖所示:



          • 校驗(yàn)文件標(biāo)記是否存在,若不存在說(shuō)明數(shù)據(jù)未 ready ,進(jìn)行報(bào)警并退出工作流等待下次調(diào)度;

          • 執(zhí)行 Merge 操作,失敗報(bào)警并退出工作流等待下次調(diào)度;

          • 成功,退出工作流等待下次調(diào)度。



          Merge 操作通過(guò) Flink DataSet API 實(shí)現(xiàn)。核心邏輯如下:


          • 加載存量、增量數(shù)據(jù),統(tǒng)一數(shù)據(jù)格式(核心字段:主鍵 Key 作為同一條數(shù)據(jù)的聚合字段;CommitTs 標(biāo)識(shí) binlog 的提交時(shí)間,存量數(shù)據(jù)默認(rèn)為 0 早于增量數(shù)據(jù);OpType 標(biāo)識(shí)數(shù)據(jù)操作類(lèi)型,包括:Insert、Update、Delete,存量數(shù)據(jù)默認(rèn)為 Insert 類(lèi)型),將兩份數(shù)據(jù)進(jìn)行 union;

          • 按照主鍵聚合;

          • 保留聚合后 CommitTs 最大的數(shù)據(jù)條目,其余丟棄;

          • 過(guò)濾 OpType 為 Delete 類(lèi)型的數(shù)據(jù)條目;

          • 輸出聚合結(jié)果。



          核心代碼:


          allMergedData.groupBy(x -> x.getKeyCols())             .reduce(new ReduceFunction() {
          public MergeTransform reduce(MergeTransform value1, MergeTransform value2) throws Exception { if (value1.getCommitTS() > value2.getCommitTS()){ return value1; } return value2; } }) .filter(new FilterFunction() { //增量:過(guò)濾掉 op=delete
          public boolean filter(MergeTransform merge) throws Exception { if (merge.getOpType().equals(OPType.DELETE)){ return false; } return true; } }) .map(x -> x.getHiveColsText()) .writeAsText(outPath);


          主要思想為「后來(lái)者居上」,針對(duì)于 Insert、Update 操作,最新值直接覆蓋舊值,針對(duì) Delete 操作,直接丟棄。這種方式也天然的實(shí)現(xiàn)了數(shù)據(jù)去重操作。


          ■?4.2.4 容錯(cuò)性與數(shù)據(jù)一致性保證


          我們大體可以從三個(gè)任務(wù)故障場(chǎng)景下的處理方式來(lái)驗(yàn)證方案的容錯(cuò)性。


          • 「存量任務(wù)」異常失敗:通常是備份恢復(fù)失敗導(dǎo)致,DS 任務(wù)將發(fā)送失敗報(bào)警,因「數(shù)據(jù)庫(kù)平臺(tái)」暫不支持恢復(fù)重試,需人工介入處理。同時(shí)「Merge 任務(wù)」檢測(cè)不到存量的 _SUCCESS 標(biāo)記,工作流不會(huì)向后推進(jìn)。

          • 「增量任務(wù)」異常失?。?/span>Flink 自身的容錯(cuò)機(jī)制以及「實(shí)時(shí)計(jì)算平臺(tái)」的外部檢測(cè)機(jī)制保障「增量任務(wù)」的容錯(cuò)性。若在「Merge 任務(wù)」調(diào)度執(zhí)行期間「增量任務(wù)」尚未恢復(fù),將誤以為該小時(shí)無(wú)增量數(shù)據(jù)跳過(guò)執(zhí)行,此時(shí)相當(dāng)于快照更新延遲(Merge 是將全天的增量數(shù)據(jù)與存量聚合,在之后的調(diào)度時(shí)間點(diǎn)如果「增量任務(wù)」恢復(fù)又可以聚合得到最新的快照),或者在「增量任務(wù)」恢復(fù)后可人為觸發(fā)「Merge 任務(wù)」補(bǔ)數(shù)。

          • 「Merge 任務(wù)」異常失?。?/span>任務(wù)具有冪等性,通過(guò)設(shè)置 DS 任務(wù)失敗后的重試機(jī)制保障容錯(cuò)性,同時(shí)發(fā)送失敗報(bào)警。



          以上,通過(guò)自動(dòng)恢復(fù)機(jī)制和報(bào)警機(jī)制確保了整個(gè)工作流的正確執(zhí)行。接下來(lái)我們可以從數(shù)據(jù)的角度看一下方案對(duì)于一致性的保障。


          數(shù)據(jù)的一致性體現(xiàn)在 Merge 操作。兩份數(shù)據(jù)聚合,從代碼層面一定可以確保算法的正確性 (這是可驗(yàn)證的、可測(cè)試的),那么唯一可能導(dǎo)致數(shù)據(jù)不一致的情況出現(xiàn)在兩份輸入的數(shù)據(jù)上,即存量和增量,存在兩種情況:


          • 存量和增量數(shù)據(jù)有交疊:體現(xiàn)在初始存量與整點(diǎn)的增量數(shù)據(jù)聚合場(chǎng)景,由于算法天然的去重性可以保證數(shù)據(jù)的一致。

          • 存量和增量數(shù)據(jù)有缺失:體現(xiàn)在增量數(shù)據(jù)的缺失上,而增量數(shù)據(jù)是由 Flink 將 Kafka 數(shù)據(jù)寫(xiě)入 Hive 的,這個(gè)過(guò)程中是有一定的可能性造成數(shù)據(jù)的不一致,即分區(qū)提交后的亂序數(shù)據(jù)。雖然說(shuō)亂序數(shù)據(jù)到來(lái)后的下一次 checkpoint 時(shí)間點(diǎn)分區(qū)將再次提交,但下游任務(wù)一般是檢測(cè)到首次分區(qū)提交就會(huì)觸發(fā)執(zhí)行,造成下游任務(wù)的數(shù)據(jù)不一致。




          針對(duì) Flink 流式寫(xiě) Hive 過(guò)程中的亂序數(shù)據(jù)處理可以采取兩種手段:


          • 一是 Kafka 設(shè)置單分區(qū),多分區(qū)是產(chǎn)生導(dǎo)致亂序的根因,通過(guò)避免多分區(qū)消除數(shù)據(jù)亂序。


          • 二是報(bào)警補(bǔ)償,亂序一旦產(chǎn)生流式任務(wù)是無(wú)法完全避免的 (可通過(guò) watermark 設(shè)置亂序容忍時(shí)間,但終有一個(gè)界限),那么只能通過(guò)報(bào)警做事后補(bǔ)償。



          問(wèn)題轉(zhuǎn)換成了如何感知到亂序,我們可以進(jìn)一步分析,既然亂序數(shù)據(jù)會(huì)觸發(fā)前一個(gè)分區(qū)的二次提交,那么只需要在提交分區(qū)的時(shí)候檢測(cè)前一個(gè)分區(qū)是否存在 _SUCCESS 標(biāo)記便可以知曉是否是亂序數(shù)據(jù)以及觸發(fā)報(bào)警。

          五、線上效果




          總覽




          存量任務(wù)




          Merge 任務(wù)




          六、總結(jié)




          本文闡述了伴魚(yú)「數(shù)據(jù)集成平臺(tái)」核心設(shè)計(jì)思路,整個(gè)方案還有一些細(xì)節(jié)未在文章中體現(xiàn),如數(shù)據(jù) Schema 的變更、DB 日志數(shù)據(jù)的解析等,這些細(xì)節(jié)對(duì)于平臺(tái)構(gòu)建也至關(guān)重要。目前伴魚(yú)絕大部分的集成任務(wù)已切換至新的方式并穩(wěn)定運(yùn)行。我們也正在推進(jìn)實(shí)時(shí)數(shù)倉(cāng)集成任務(wù)的接入,以提供更統(tǒng)一的體驗(yàn)。

          瀏覽 27
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  一级特黄60分钟免费看 | 亚洲国产欧美日韩在线观看 | 黄色视频在线观看国产 | 全部免费毛片在线播放网站 | 一级爱爱 |