<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Hudi 實(shí)踐 | Linkflow 構(gòu)建實(shí)時(shí)數(shù)據(jù)湖的生產(chǎn)實(shí)踐

          共 12252字,需瀏覽 25分鐘

           ·

          2021-04-17 18:04

          可變數(shù)據(jù)的處理一直以來都是大數(shù)據(jù)系統(tǒng),尤其是實(shí)時(shí)系統(tǒng)的一大難點(diǎn)。在調(diào)研多種方案后,我們選擇了 CDC to Hudi 的數(shù)據(jù)攝入方案,目前在生產(chǎn)環(huán)境可實(shí)現(xiàn)分鐘級(jí)的數(shù)據(jù)實(shí)時(shí)性,希望本文所述對(duì)大家的生產(chǎn)實(shí)踐有所啟發(fā)。

          1. 背景

          Linkflow 作為客戶數(shù)據(jù)平臺(tái)(CDP),為企業(yè)提供從客戶數(shù)據(jù)采集、分析到執(zhí)行的運(yùn)營(yíng)閉環(huán)。每天都會(huì)通過一方數(shù)據(jù)采集端點(diǎn)(SDK)和三方數(shù)據(jù)源,如微信,微博等,收集大量的數(shù)據(jù)。這些數(shù)據(jù)都會(huì)經(jīng)過清洗,計(jì)算,整合后寫入存儲(chǔ)。使用者可以通過靈活的報(bào)表或標(biāo)簽對(duì)持久化的數(shù)據(jù)進(jìn)行分析和計(jì)算,結(jié)果又會(huì)作為MA (Marketing Automation) 系統(tǒng)的數(shù)據(jù)源,從而實(shí)現(xiàn)對(duì)特定人群的精準(zhǔn)營(yíng)銷。

          在 Linkflow 中,數(shù)據(jù)分為不可變數(shù)據(jù)(Immutable Data)和可變數(shù)據(jù)(Mutable Data),這些數(shù)據(jù)都會(huì)參與分析,涉及到的表大概有十幾張,其中不可變數(shù)據(jù)的數(shù)據(jù)量較大,可以達(dá)到數(shù)十億級(jí)。如果放到傳統(tǒng)大數(shù)據(jù)系統(tǒng),不可變數(shù)據(jù)即為事實(shí)數(shù)據(jù),可變數(shù)據(jù)為維度數(shù)據(jù)。但在真正的業(yè)務(wù)實(shí)踐里,用戶的自然屬性,訂單的金額和狀態(tài)等都是可更新的,這些數(shù)據(jù)的數(shù)據(jù)量往往也非??捎^,在我們的系統(tǒng)里此類數(shù)據(jù)也會(huì)達(dá)到億級(jí)。對(duì)于可變數(shù)據(jù)之前一直都是通過關(guān)系型數(shù)據(jù)庫(kù)MySQL進(jìn)行管理,一來數(shù)據(jù)維護(hù)方便,二來業(yè)務(wù)對(duì)接容易。

          但問題也顯而易見

          ?數(shù)據(jù)碎片化,由于 MySQL 大表 online DDL 風(fēng)險(xiǎn)較大,隨著業(yè)務(wù)復(fù)雜度的提升,往往需要增加新的子表來擴(kuò)展業(yè)務(wù)屬性,也就是說一個(gè)完整的用戶數(shù)據(jù)會(huì)散落在多張表中,這對(duì)查詢十分不友好。?多維度查詢無(wú)法實(shí)現(xiàn),由于關(guān)系型數(shù)據(jù)庫(kù)的優(yōu)勢(shì)不是多維度查詢,并且給所有字段都加索引也并不現(xiàn)實(shí),所以需要一款可支持OLAP查詢引擎的數(shù)據(jù)組件來支撐多維分析的業(yè)務(wù)場(chǎng)景。并且考慮到未來可分別獨(dú)立擴(kuò)展的可能,我們也優(yōu)先考慮計(jì)算和存儲(chǔ)分離的架構(gòu)。

          2. CDC 和數(shù)據(jù)湖

          CDC(CHANGE DATA CAPTURE)是一種軟件設(shè)計(jì)模式,用于確定和跟蹤已變更的數(shù)據(jù),以便可以對(duì)更改后的數(shù)據(jù)采取措施。其實(shí)早在兩年前我們就有使用 canal 冗余 MySQL 數(shù)據(jù)到異構(gòu)存儲(chǔ)的經(jīng)驗(yàn),只是當(dāng)時(shí)沒有意識(shí)到可以通過這種方式與大數(shù)據(jù)存儲(chǔ)進(jìn)行集成。在使用canal 的過程中我們發(fā)現(xiàn)了一些性能的問題,并且開源社區(qū)基本無(wú)人維護(hù),所以在新架構(gòu)啟動(dòng)前又調(diào)研了 Maxwell 和 Debezium,恰好關(guān)注到 Flink 母公司 Ververica 開源的項(xiàng)目 flink-cdc-connectors[1] ,該項(xiàng)目將 Debezium 作為 binlog 的同步引擎嵌入到 Flink 任務(wù)中,可以方便地在流任務(wù)中對(duì) binlog 的消息進(jìn)行篩選、校驗(yàn)、數(shù)據(jù)整合和格式轉(zhuǎn)換,并且性能優(yōu)異??紤]到未來又可以直接與行為數(shù)據(jù)進(jìn)行雙流 join,甚至通過 CEP 進(jìn)行簡(jiǎn)單的風(fēng)控,我們最終選擇了 Debezium in Flink 的 CDC 方案。

          由于MySQL中的數(shù)據(jù)主題很多,在流任務(wù)中我們同時(shí)也做了數(shù)據(jù)路由,即不同主題的變化數(shù)據(jù)會(huì)路由到不同的 Kafka Topic 中,即將 Kafka 作為 ODS。這樣做的好處很多,首先對(duì)于可變數(shù)據(jù)我們可以清晰的觀察到每次變化的過程,其次可以對(duì)數(shù)據(jù)進(jìn)行回放,逐次變化的疊加結(jié)果便是最終的狀態(tài)。

          接下來要考慮的就是數(shù)據(jù)存在哪里,結(jié)合上文提到的“計(jì)算存儲(chǔ)分離”原則, 這也是數(shù)據(jù)湖提供的一個(gè)優(yōu)勢(shì),數(shù)據(jù)湖一般使用類似文件系統(tǒng)存儲(chǔ)(對(duì)象存儲(chǔ)或傳統(tǒng)的HDFS)來構(gòu)建,恰好符合我們的預(yù)期。在對(duì)比了幾種數(shù)據(jù)湖方案后,我們選擇了Apache Hudi,理由如下

          ?Hudi 提供了一個(gè)在 HDFS 中 upsert 的解決方案,即類似關(guān)系型數(shù)據(jù)庫(kù)的使用體驗(yàn),對(duì)于可更新數(shù)據(jù)非常友好,并且也符合 MySQL binlog 的語(yǔ)義。?增量查詢,可以很方便的獲取最近30分鐘,或者1天內(nèi)發(fā)生變化的數(shù)據(jù),這對(duì)于一些可疊加的離線計(jì)算任務(wù)非常友好,不再需要針對(duì)全量數(shù)據(jù)進(jìn)行計(jì)算,只需要針對(duì)變化數(shù)據(jù)進(jìn)行計(jì)算,大大節(jié)省了機(jī)器資源和時(shí)間。?可以實(shí)時(shí)同步元數(shù)據(jù)到 Hive,為“入湖即可查”創(chuàng)造了條件。?對(duì) COW 和 MOR 兩種不同使用場(chǎng)景分別進(jìn)行了優(yōu)化。?Hudi社區(qū)開放且迭代速度快,在其孵化階段就被AWS EMR集成,然后被阿里云DLA數(shù)據(jù)湖分析[2]、阿里云EMR[3]以及騰訊云EMR[4]集成,前景不錯(cuò),同時(shí)ApacheHudi國(guó)內(nèi)技術(shù)交流群討論非常熱烈,國(guó)內(nèi)基于Hudi構(gòu)建數(shù)據(jù)湖的公司越來越多。

          在集成了 Hudi 后,我們的架構(gòu)演化成這樣

          數(shù)據(jù)表都選擇了 COW(寫時(shí)復(fù)制)模式,主要是考慮到讀多寫少的特點(diǎn),并且我們需要查詢過程盡可能地快,MOR(讀時(shí)合并)的策略在查詢端的性能還是要稍微弱一些,再加上對(duì)于數(shù)據(jù)時(shí)延并沒有到亞秒級(jí)的要求,所以最終選擇了 COW。

          最上層我們使用了 Presto 作為分析引擎,提供數(shù)據(jù)即席查詢的能力。由于我們使用的 Hudi 版本是0.6.0,與 Flink 的集成還沒有發(fā)布,所以我們不得不采用 Flink + Spark 雙擎的策略,使用 Spark Streaming 將 Kafka 中的數(shù)據(jù)寫入 Hudi。

          3. 技術(shù)挑戰(zhàn)

          在進(jìn)行了 PoC 后我們確定了上圖所示的架構(gòu)設(shè)計(jì),但在真正的實(shí)現(xiàn)過程中,也遇到了不小的挑戰(zhàn)。

          3.1 CDC 運(yùn)行模式定制

          3.1.1 全量模式

          Debezium 的一大優(yōu)勢(shì)就是“批流一體”,snapshot 階段就是通過掃描全表將數(shù)據(jù)回放成與 binlog 增量日志內(nèi)容一致的消息,這樣使用者就可以使用相同的代碼同時(shí)處理全量和增量數(shù)據(jù)。但是在我們的業(yè)務(wù)實(shí)踐中,如果歷史表的個(gè)數(shù)和表內(nèi)的數(shù)據(jù)都很多,就會(huì)造成 snapshot 階段持續(xù)的時(shí)間非常長(zhǎng),一旦這個(gè)過程出現(xiàn)意外中斷,那么下次需要從第一張表開始重新掃描。假設(shè)完整的 snapshot 過程需要數(shù)天,那么這種規(guī)模的“重試”我們是無(wú)法接受的,所以需要有類似斷點(diǎn)續(xù)傳的機(jī)制,在查詢了 Debezuim 官方文檔后我們發(fā)現(xiàn)了 snapshot.include.collection.list  參數(shù)

          An optional, comma-separated list of regular expressions that match names of schemas specified in table.include.list for which you want to take the snapshot.

          所以可以在 snapshot 中斷后,通過該參數(shù)傳入剩余待掃描的表,從而實(shí)現(xiàn)“接力”的能力。但這里需要注意的一點(diǎn)是,無(wú)論 snapshot 階段重試幾次,增量的 binlog 位點(diǎn)都必須是首次 snapshot 時(shí)的位點(diǎn),否則就會(huì)丟數(shù)據(jù)。這也帶來了另一個(gè)問題,假如中斷后再接力直到 snapshot 完成,Debezuim 是會(huì)自動(dòng)開始從本次(而不是首次)snapshot 時(shí)的 binlog 位點(diǎn)直接開始增量同步數(shù)據(jù),這不是我們需要的結(jié)果,我們需要 snapshot 結(jié)束后任務(wù)直接終止。

          翻了很多 Debezuim 的文檔并沒有發(fā)現(xiàn)這樣的功能,但是在翻閱源碼的過程中看到其實(shí)是有辦法的

          /*** Perform a snapshot and then stop before attempting to read the binlog.*/INITIAL_ONLY("initial_only", true);// MySqlConnectorTask.javaif (taskContext.isInitialSnapshotOnly()) {    logger.warn("This connector will only perform a snapshot, and will stop after that completes.");    chainedReaderBuilder.addReader(new BlockingReader("blocker",            "Connector has completed all of its work but will continue in the running state. It can be shut down at any time."));    chainedReaderBuilder            .completionMessage("Connector configured to only perform snapshot, and snapshot completed successfully. Connector will terminate.");}

          即在 initial_only 的模式下 Debezuim 會(huì)使用 BlockingReader 替代 BinlogReader 將線程阻塞,不再進(jìn)行增量消費(fèi)。

          3.1.2 增量模式

          如果 snapshot 結(jié)束后任務(wù)自動(dòng)停止,那么就需要手動(dòng)重啟任務(wù)繼續(xù)增量同步,同時(shí)增量模式需要支持指定 MySQL 的 binlog 文件和具體的位點(diǎn)(position)。Debezuim 自帶 schema_only_recovery 模式,可以手動(dòng)設(shè)置參數(shù)。

          DebeziumOffset specificOffset = new DebeziumOffset();Map<String, Object> sourceOffset = new HashMap<>();sourceOffset.put("file", startupOptions.specificOffsetFile);sourceOffset.put("pos", startupOptions.specificOffsetPos);specificOffset.setSourceOffset(sourceOffset);

          由于我們之前使用的 ververica/flink-cdc-connectors 版本是1.2.0,沒有開放 Debezuim 的 schema_only_recovery 模式,所以修改了相關(guān)源碼。目前1.3.0版本已支持,在 MySQLSourceBuilder 中作為啟動(dòng)參數(shù)傳入即可。

          3.2 部分更新(Patch Update)

          這里有必要解釋一下什么是覆蓋更新什么是部分更新,這其實(shí)也對(duì)應(yīng)于 RESTful 的語(yǔ)義,put 就是覆蓋更新,要求調(diào)用方提供的一定是一個(gè)完整的資源對(duì)象,理論上說,如果用了 put,但卻沒有提供完整的資源對(duì)象,那么缺了的那些字段應(yīng)該被清空。patch 對(duì)應(yīng)部分更新,或局部更新,調(diào)用方只提供需要更新的字段,而不提供完整的資源對(duì)象,好處是可以節(jié)省帶寬。

          在 Hudi 中默認(rèn)只支持覆蓋更新,但對(duì)于我們業(yè)務(wù)而言,采集端點(diǎn)上報(bào)的數(shù)據(jù)不可能包含完整的業(yè)務(wù)對(duì)象,如用戶年齡的增長(zhǎng),在上報(bào)時(shí)只會(huì)包含一個(gè)字段的信息

          {  "id": 123,  "ts": 1435290195610,  "data": {    "age": 25  }}

          這就需要先找出 rowkey=123 的數(shù)據(jù)內(nèi)容,并與待更新內(nèi)容進(jìn)行合并后再寫入。合并時(shí)如果待寫入數(shù)據(jù)的字段不為空,那么進(jìn)行歸并。Hudi默認(rèn)采用 OverwriteWithLatestAvroPayload 的 combineAndGetUpdateValue 方法

          Simply overwrites storage with latest delta record

          為了向前兼容,數(shù)據(jù)開發(fā)同事 Karl 新增了 OverwriteNonDefaultsWithLatestAvroPayload 類,覆寫了 combineAndGetUpdateValue 來處理上述問題,并已反饋給社區(qū) [HUDI-1255] Add new Payload(OverwriteNonDefaultsWithLatestAvroPayload) for updating specified fields in storage[5] , 其實(shí)社區(qū)內(nèi)類似需求還有很多,如 [HUDI-1160] Support update partial fields for CoW table[6], 我們也期待有更多的開發(fā)者可以將這個(gè)功能做的愈加完善。

          當(dāng)然這里也存在限制,如果真的希望將某個(gè)字段更新為空值,那么使用 OverwriteNonDefaultsWithLatestAvroPayload 是無(wú)法實(shí)現(xiàn)的。

          同時(shí)我們也對(duì)社區(qū)的Compaction策略了補(bǔ)充,添加了基于時(shí)間的Compaction調(diào)度策略,即不僅僅可以基于增量提交數(shù)進(jìn)行Compaction,還可以基于時(shí)間做Compaction,該工作也已經(jīng)反饋給社區(qū),參見[HUDI-1381] Schedule compaction based on time elapsed[7],這對(duì)于想要在指定時(shí)間內(nèi)進(jìn)行Compaction提供了更高的靈活性。

          3.3 一批次內(nèi)相同 rowkey 數(shù)據(jù)的歸并

          由于 CDC 的一個(gè)特征就是實(shí)時(shí)監(jiān)聽數(shù)據(jù)的變化,例如一個(gè)訂單的狀態(tài)在幾分鐘內(nèi)可能就會(huì)發(fā)生若干次改變,再加上 Spark Streaming 微批處理的特點(diǎn),有較大的概率會(huì)在一個(gè)時(shí)間窗口獲取大量相同 rowkey 的數(shù)據(jù),不同rowkey對(duì)應(yīng)部分?jǐn)?shù)據(jù),因此我們?cè)?Streaming 任務(wù)中對(duì)一批次相同 rowkey 的數(shù)據(jù)進(jìn)行了歸并,整體類似 Hudi 使用 Bloom 判斷 rowkey 是否存在的邏輯。特別需要注意的是時(shí)序問題,數(shù)據(jù)的疊加必須嚴(yán)格按照 ts 時(shí)間,否則就會(huì)出現(xiàn)舊版本的數(shù)據(jù)覆蓋新版本的情況。

          3.4 Schema evolution

          由于業(yè)務(wù)的發(fā)展以及靈活性的要求,表字段擴(kuò)展(Schema evolution)一定是剛需。Hudi恰好也考慮到了這一點(diǎn),我們從Hudi的 wiki[8] 上了解到

          What's Hudi's schema evolution story

          Hudi uses Avro as the internal canonical representation for records, primarily due to its nice schema compatibility & evolution[9] properties. This is a key aspect of having reliability in your ingestion or ETL pipelines. As long as the schema passed to Hudi (either explicitly in DeltaStreamer schema provider configs or implicitly by Spark Datasource's Dataset schemas) is backwards compatible (e.g no field deletes, only appending new fields to schema), Hudi will seamlessly handle read/write of old and new data and also keep the Hive schema up-to date.

          既然Avro格式本身就支持 Schema evolution,自然地Hudi也支持。

          Schema evolution 大致可以分為4種:

          1.Backwards compatible: 向后兼容,用新的schema可以讀取舊數(shù)據(jù),如果字段沒值,就用default值,這也是Hudi提供的兼容方式。2.Forwards compatible: 向前兼容,用舊schema可以讀取新數(shù)據(jù),Avro將忽略新加的字段,如果要向前兼容,刪掉的字段必須要有默認(rèn)值。3.Full compatible: 支持向前兼容,向后兼容,如果要全兼容,那么就需要只添加有默認(rèn)值的字段,并且只移除有默認(rèn)值的字段。4.No Compatibility Checking:這種情況一般來說就是需要強(qiáng)制改變某個(gè)字段的類型,此時(shí)就需要做全量的數(shù)據(jù)遷移,不推薦。

          在生產(chǎn)實(shí)踐中,我們通過修改 schema 就可以實(shí)現(xiàn)字段擴(kuò)展的需求。但隨之而來也會(huì)發(fā)現(xiàn)一些問題,比如字段過多會(huì)造成單個(gè)文件很大(突破128mb),寫入很慢,極端情況下1000多列的文件寫入會(huì)達(dá)到小時(shí)級(jí)別。后續(xù)我們也在尋找一些優(yōu)化方案,例如字段回收或者垂直分表,將單文件內(nèi)的字段數(shù)量降低。

          3.5 同時(shí)查詢和寫入導(dǎo)致異常

          這是出現(xiàn)在查詢端的問題,我們使用 Presto 查詢 Hive 表時(shí)會(huì)出現(xiàn) Hudi 元數(shù)據(jù)文件找不到的異常,進(jìn)而導(dǎo)致 Hudi 內(nèi)部的 NPE

          Error checking path :hdfs://hudipath/.hoodie_partition_metadata, under folder: hdfs://hudipath/event/202102; nested exception is java.sql.SQLException: Query failed (#20210309_031334_04606_fipir)

          基于上述信息,懷疑是在查詢的同時(shí),元數(shù)據(jù)信息被修改而導(dǎo)致的問題。在求助社區(qū)后,我們將 HoodieROTablePathFilter 中的 hoodiePathCache 改為線程安全的 ConcurrentHashMap, 重新打包得到 hudi-hadoop-mr.jar 和 hudi-common.jar ,替換到 presto/plugin/hive-hadoop2 的目錄下,重啟 Presto。后續(xù)沒有發(fā)現(xiàn)NPE的情況。

          4. 效果

          再來回顧一下我們?cè)诩軜?gòu)之初對(duì)于數(shù)據(jù)湖的設(shè)想:

          ?支持可變數(shù)據(jù)?支持 schema evolution?計(jì)算存儲(chǔ)分離,支持多種查詢引擎?支持增量視圖和時(shí)間旅行

          這些特性 Hudi 基本都實(shí)現(xiàn)了,新架構(gòu)完成后對(duì)比之前的系統(tǒng),數(shù)據(jù)時(shí)延和離線處理性能都有了顯著提升,具體表現(xiàn)在

          1.實(shí)時(shí)數(shù)據(jù)寫入過程簡(jiǎn)化,之前的更新操作實(shí)現(xiàn)繁瑣,現(xiàn)在開發(fā)過程中基本不用關(guān)心是新增還是更新操作,大大降低了開發(fā)人員的心智負(fù)擔(dān)。2.實(shí)時(shí)數(shù)據(jù)入湖到可查詢的時(shí)間縮短,雖然我們的采用的是 COW 的表模式,但實(shí)際測(cè)試發(fā)現(xiàn)入湖到可查詢的時(shí)效性并不低,基本都在分鐘級(jí)。3.離線處理性能提升,基于 Hudi 的增量視圖特性,每天的離線任務(wù)可以很容易的獲取過去24h變化的數(shù)據(jù),處理的數(shù)據(jù)量級(jí)變小,進(jìn)而帶來更短的處理時(shí)間。

          5. 未來計(jì)劃

          5.1 Flink 集成

          之前提到“迫不得已”的雙擎策略,事實(shí)上是非??鄲赖?,運(yùn)維和開發(fā)方式都無(wú)法統(tǒng)一,所以我們對(duì) Hudi 官方集成 Flink 的進(jìn)展非常關(guān)注,并且近期也有了新的 RFC - 24: Hoodie Flink Writer Proposal[10] ,同時(shí)也已經(jīng)在Hudi 0.8.0版本深度集成了Flink能力,期待未來的 Flink 集成版本在性能上可以有很大的提升,同時(shí)也可以將處理引擎統(tǒng)一成Flink,不再采用雙引擎模式。

          5.2 并發(fā)寫

          由于 Hudi 文件為了保證元數(shù)據(jù)的一致性,在0.8.0版本之前不支持并發(fā)寫。但在實(shí)際應(yīng)用中,數(shù)據(jù)湖中的很多數(shù)據(jù)不光是實(shí)時(shí)的數(shù)據(jù),還有很多是需要通過離線計(jì)算獲得的,假如某張表的一部分字段是 CDC 的直接反映,另一部分字段是離線任務(wù)的計(jì)算結(jié)果,這就會(huì)帶來并發(fā)寫的需求。

          我們目前采用兩種方式來規(guī)避

          1.垂直分表,即將兩部分文件分開,CDC 數(shù)據(jù)通過 Spark Streaming 寫入,離線計(jì)算結(jié)果寫入另一個(gè)文件,避免并發(fā)寫。2.模擬成 CDC 消息回寫 Kafka,為了查詢性能不能分表的情況下,離線計(jì)算結(jié)果會(huì)模擬成 CDC 消息寫入 Kafka,再通過 Spark Streaming 寫入 Hudi。但缺點(diǎn)也是很明顯的,就是離線任務(wù)的結(jié)果反映到最終存儲(chǔ)的時(shí)延較長(zhǎng)。

          最近 Hudi 發(fā)布的0.8.0版本已經(jīng)支持了并發(fā)寫模式,其基于樂觀鎖,文件級(jí)別的沖突檢測(cè)可很好的滿足并發(fā)寫需求,后續(xù)會(huì)測(cè)試看下效果。

          5.3 性能優(yōu)化

          上文也提到了諸如大文件,GC頻繁的一些問題,綜合來看我們發(fā)現(xiàn)寫入的瓶頸主要發(fā)生在兩個(gè)地方

          5.3.1 索引

          由于目前我們采用 HoodieGlobalBloomIndex,導(dǎo)致建立索引和查詢索引的時(shí)間都較長(zhǎng),官方提供了3種索引實(shí)現(xiàn)

          How does the Hudi indexing work & what are its benefits?

          The indexing component is a key part of the Hudi writing and it maps a given recordKey to a fileGroup inside Hudi consistently. This enables faster identification of the file groups that are affected/dirtied by a given write operation.

          Hudi supports a few options for indexing as below

          ?HoodieBloomIndex (default) : Uses a bloom filter and ranges information placed in the footer of parquet/base files (and soon log files as well)?HoodieGlobalBloomIndex : The default indexing only enforces uniqueness of a key inside a single partition i.e the user is expected to know the partition under which a given record key is stored. This helps the indexing scale very well for even very large datasets[11]. However, in some cases, it might be necessary instead to do the de-duping/enforce uniqueness across all partitions and the global bloom index does exactly that. If this is used, incoming records are compared to files across the entire dataset and ensure a recordKey is only present in one partition.?HBaseIndex : Apache HBase is a key value store, typically found in close proximity to HDFS. You can also store the index inside HBase, which could be handy if you are already operating HBase.

          You can implement your own index if you'd like, by subclassing the HoodieIndex class and configuring the index class name in configs.

          在與社區(qū)的討論后,我們更傾向于使用HBaseIndex或類似的k-v store來管理索引。

          5.3.2 更新

          upsert 慢除了某些文件較大的問題,另一方面也與 CDC 的特點(diǎn)有關(guān)??勺償?shù)據(jù)的更新范圍其實(shí)是不可預(yù)測(cè)的,極端情況下待更新的1000條數(shù)據(jù)屬于1000個(gè)不同的文件時(shí),更新的性能很難通過代碼優(yōu)化的方式提升,只能增加 cpu 資源提高處理并行度。我們會(huì)從幾個(gè)方面著手:

          1.參數(shù)調(diào)整,要是否有辦法平衡文件的數(shù)量和大小2.嘗試部分業(yè)務(wù)表使用 MOR 模式,MOR 在更新時(shí)會(huì)先將數(shù)據(jù)寫入日志文件,之后再合并到 Parquet,理論上可以降低覆寫 Parquet 文件的頻率3.討論業(yè)務(wù)上的 trade-off 來?yè)Q取更好的寫入速度

          6. 總結(jié)

          未來有待優(yōu)化的工作還有很多,我們也會(huì)積極參與社區(qū)建設(shè),嘗試新的特性,為用戶帶來更好的數(shù)據(jù)服務(wù)體驗(yàn),最后感謝 Flink CDC Connectors 和 Apache Hudi 的開發(fā)者和社區(qū)維護(hù)者。

          作者Dean,Linkflow首席架構(gòu)師

          筆者數(shù)據(jù)團(tuán)隊(duì)招聘:如果有兩年以上實(shí)時(shí)系統(tǒng)開發(fā)設(shè)計(jì)經(jīng)驗(yàn),一年以上Flink使用經(jīng)驗(yàn),熱衷于技術(shù),愛讀源碼,計(jì)算機(jī)基礎(chǔ)扎實(shí),Scala寫的跟Java一樣溜的,那很有可能你會(huì)是我們正在找的大數(shù)據(jù)工程師。簡(jiǎn)歷投遞到 [email protected][12],其他崗位詳見 https://hr.lagou.com/company/gongsi/j188444.html

          推薦閱讀

          Apache Hudi C位!云計(jì)算一哥AWS EMR 2020年度回顧

          Apache Hudi 0.8.0版本重磅發(fā)布

          一文徹底掌握Apache Hudi的主鍵和分區(qū)配置

          Apache Flink 1.12.2集成Hudi 0.9.0運(yùn)行指南

          干貨!Apache Hudi如何智能處理小文件問題

          引用鏈接

          [1] flink-cdc-connectors: https://github.com/ververica/flink-cdc-connectors
          [2] 阿里云DLA數(shù)據(jù)湖分析: https://help.aliyun.com/document_detail/173653.html?spm=a2c4g.11186623.6.576.1562672dKa8RYR
          [3] 阿里云EMR: https://help.aliyun.com/document_detail/193310.html
          [4] 騰訊云EMR: https://cloud.tencent.com/document/product/589/42955
          [5] [HUDI-1255] Add new Payload(OverwriteNonDefaultsWithLatestAvroPayload) for updating specified fields in storage: https://github.com/apache/hudi/pull/2056
          [6] [HUDI-1160] Support update partial fields for CoW table: https://github.com/apache/hudi/pull/2666
          [7] [HUDI-1381] Schedule compaction based on time elapsed: https://github.com/apache/hudi/pull/2260
          [8] wiki: https://cwiki.apache.org/confluence/display/HUDI/FAQ
          [9] schema compatibility & evolution: https://docs.confluent.io/current/schema-registry/avro.html
          [10] RFC - 24: Hoodie Flink Writer Proposal: https://cwiki.apache.org/confluence/display/HUDI/RFC+-+24%3A+Hoodie+Flink+Writer+Proposal
          [11] very large datasets: https://eng.uber.com/uber-big-data-platform/
          [12] [email protected]mailto:[email protected]


          瀏覽 39
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  又黄又嫩的视频网站 | 日大黑逼视频 | 自拍偷拍成人在线 | 日韩不卡天堂 | 豆花视频在线视频 |