每年節(jié)約3千萬!微信實(shí)驗(yàn)平臺(tái)Iceberg湖倉一體架構(gòu)改造
# 關(guān)注并星標(biāo)騰訊云開發(fā)者


-
微信實(shí)驗(yàn)平臺(tái)簡介

-
資源量級
微信實(shí)驗(yàn)平臺(tái)承載的是全微信所有業(yè)務(wù)的實(shí)驗(yàn)場景下的指標(biāo)計(jì)算及統(tǒng)計(jì)推斷,業(yè)務(wù)有效指標(biāo)個(gè)數(shù)達(dá)到了6w+,妥妥的資源使用大戶,當(dāng)前規(guī)模:
|
基于成本及穩(wěn)定性、中心內(nèi)業(yè)務(wù)建設(shè)等角度考慮,我們計(jì)算資源大多收斂在 Gemini(微信云原生大數(shù)據(jù)平臺(tái)),天穹 Gaia(TEG 公司級大數(shù)據(jù)平臺(tái))計(jì)算資源做 Backup,當(dāng)平臺(tái)依賴計(jì)算集群有異常,可以進(jìn)行任務(wù)層面的計(jì)算集群切換,使我們支持的業(yè)務(wù)用戶影響范圍最小化,存儲(chǔ)資源則完全依賴于天穹。
-
選型分析
Iceberg、Hudi 以及 DeltaLake 是基本同時(shí)期出現(xiàn)的開源表存儲(chǔ)格式項(xiàng)目,整體的功能和定位也是基本相同,也一定會(huì)前期百花齊放相互借鑒最終走向同質(zhì)化,網(wǎng)上已經(jīng)有很多相關(guān)對比介紹的文章,這里就不詳細(xì)比較了。
我們選擇 iceberg 作為 Lakehouse Table Format 的方案的主要原因是:
|
其他例如組件抽象更友好、更通用、pluggable 設(shè)計(jì),向下支持的文件格式(Parquet/Orc/Avro)、存儲(chǔ)類型(Object Storage/File Storage)更多,向上支持的計(jì)算引擎(Spark/Flink/Hive/Trino/Impala/SR…)更廣泛,這些并不是我們業(yè)務(wù)方的主要考慮點(diǎn)。只要其與我們平臺(tái)業(yè)務(wù)依賴的引擎框架都能足夠兼容,生產(chǎn)環(huán)境穩(wěn)定性可控、性能優(yōu)異、解決方案優(yōu)雅,即:在兼容我們已有技術(shù)架構(gòu)下,優(yōu)化現(xiàn)有流程,達(dá)到更快(計(jì)算時(shí)效性)及更省(節(jié)省更多資源)的目標(biāo)。

基于 Iceberg Table Schema 建設(shè)優(yōu)化
實(shí)驗(yàn)平臺(tái)業(yè)務(wù)不是層級復(fù)雜、主題域多樣的業(yè)務(wù)數(shù)倉建設(shè)場景,而是僅具備超大規(guī)模指標(biāo)計(jì)算的單一場景。所以我們的工作重點(diǎn)也不在高效復(fù)雜的數(shù)倉建設(shè)上,而是在于大規(guī)模指標(biāo)計(jì)算優(yōu)化上。
實(shí)驗(yàn)平臺(tái)指標(biāo)計(jì)算一般分為兩種類型的表,命中表:包含某個(gè)實(shí)驗(yàn) ID 及命中 uin 信息的物理表, 業(yè)務(wù)表:業(yè)務(wù)配置的用于指標(biāo)計(jì)算及后續(xù)假設(shè)檢驗(yàn)的物理表, 一般命中表及業(yè)務(wù)表及相關(guān)的配置邏輯口徑會(huì)組成具體的某個(gè)指標(biāo)。
有經(jīng)驗(yàn)的數(shù)倉同學(xué)基本都清楚,表 Schema 的建設(shè)優(yōu)化,基本都在于分區(qū)、分桶、排序、索引等方面。
業(yè)務(wù)表在表 Schema 的優(yōu)化上會(huì)依賴于業(yè)務(wù)數(shù)倉的建設(shè),依托于業(yè)務(wù)方的能力,實(shí)驗(yàn)平臺(tái)可控性并不強(qiáng)。命中表的 Schema 恰恰是平臺(tái)建設(shè)優(yōu)化的重點(diǎn),一般某天的某個(gè)指標(biāo)計(jì)算會(huì)綁定具體的實(shí)驗(yàn) ID,很自然的會(huì)想到按天作為一級分區(qū),實(shí)驗(yàn) ID 作為二級分區(qū),可以將可用數(shù)據(jù)最小化,降低后續(xù)指標(biāo)計(jì)算的讀表 IO/shuffle IO。可是難點(diǎn)在于之前依賴于THive的建設(shè),命中表一般至少保留3個(gè)月,實(shí)驗(yàn) ID 更是達(dá)到 2w 個(gè),做笛卡爾積后分區(qū)個(gè)數(shù)可達(dá)180w,Thive的元數(shù)據(jù)體系會(huì)受限于單點(diǎn)數(shù)據(jù)庫瓶頸,存在 OMS(廠版Hive metastore) RDBS 單點(diǎn)問題,經(jīng)常會(huì)因?yàn)槟硞€(gè)表元數(shù)據(jù)太多導(dǎo)致整個(gè)元數(shù)據(jù)庫 OMS 負(fù)載高,導(dǎo)致 Thive 不可用,影響 THive 服務(wù)上所有業(yè)務(wù)方保證的 SLA,所以 Thive 一般無法做到此種分區(qū)結(jié)構(gòu)。剛好 Iceberg 的出現(xiàn),由于其基于 HDFS 的獨(dú)立三層元數(shù)據(jù)體系,可以將元數(shù)據(jù)信息的壓力從 OMS 分?jǐn)偟?HDFS 上,規(guī)避 OMS 的單點(diǎn)瓶頸問題。

總結(jié)起來,其實(shí)就是利用 Iceberg 的三層元數(shù)據(jù)體系帶來的靈活性,細(xì)化業(yè)務(wù)表多級分區(qū),規(guī)避 OMS 受限于單點(diǎn)數(shù)據(jù)庫 RDBS 瓶頸的問題,提升后續(xù)計(jì)算效率。但要注意的是,要考慮到 NameNode 的元數(shù)據(jù)膨脹的問題,單 HDFS 存儲(chǔ)集群一般超過8億 metadata file(目錄+文件)則處于高負(fù)載,會(huì)對 HDFS 存儲(chǔ)集群后續(xù)穩(wěn)定性帶來壓力。
Merge into+Time travel 代替?zhèn)鹘y(tǒng)數(shù)倉拉鏈表
微信實(shí)驗(yàn)平臺(tái)會(huì)有命中信息增量變更的場景,即數(shù)倉同學(xué)所熟悉的緩慢變化維問題。
|
|
一般此類問題的傳統(tǒng)解決方案都是基于 Hive 拉鏈表來實(shí)現(xiàn)的,來減少重復(fù)的冗余數(shù)據(jù),Hive 拉鏈表雖然可以解決業(yè)務(wù)問題,但是效率和靈活性都較低。我們引入了高效的數(shù)據(jù)湖表格式 Iceberg 來解決相應(yīng)問題,相比于樸素的 Hive 增加了很多變化和靈活性。
Hive 拉鏈表的方式來減少重復(fù)的冗余數(shù)據(jù),記錄加上 start_time,end_time 作為生效起止時(shí)間, 但是此種方式帶來的新問題就是每日計(jì)算時(shí)都需要拉取全部數(shù)據(jù)讀入進(jìn) MapReduce/Spark 等計(jì)算框架內(nèi),將新增數(shù)據(jù)處理后再寫入,需要消耗的計(jì)算資源很大,如果數(shù)據(jù)量特別大也很容易導(dǎo)致集群負(fù)載壓力過大使任務(wù)失敗。并且在讀取拉鏈表的時(shí)候也需要加過濾條件(where >=start_time and <end_time)會(huì)掃描過濾很多無關(guān)的數(shù)據(jù),導(dǎo)致查詢效率低下。
基于新的數(shù)據(jù)湖表格式 Iceberg 來更優(yōu)雅地處理緩慢變化維問題,對比傳統(tǒng)解決該問題基于 Hive 的拉鏈表方案的優(yōu)勢。歸納起來主要是通過如下方式實(shí)現(xiàn):
使用 Merge Into 替代 Insert Overwrite
采用 Merge Into 進(jìn)行增量數(shù)據(jù)批量變更(update/insert/delete)。
是通過重寫相關(guān)文件,即包含在提交中需要更新的行的數(shù)據(jù)文件來支持 Merge Into,相對比于 Insert Overwrite 的方式,Iceberg 只替換受影響的數(shù)據(jù)文件來提升運(yùn)行效率寫入效率。
MERGE INTO iceberg_catalog.mmexpt_lakehouse.mmexpt_cumu_finder tUSING (selectfirst_hit_ds,uin,exptid,groupid,bucketsrc_hitfromiceberg_catalog.mmexpt_lakehouse.mmexpt_daily_finder) sON t.uin = s.uin and t.groupid = s.groupidWHEN MATCHED AND s.ds < t.first_hit_ds THEN UPDATE SET t.first_hit_ds = s.dsWHEN NOT MATCHEDTHEN INSERT (first_hit_ds, uin, exptid, groupid, bucketsrc_hit, ext_int, ext_string)VALUES (s.first_hit_ds, s.uin, s.exptid, s.groupid, s.bucketsrc_hit, null, null);
使用 Time Travel Snapshot 代替拉鏈表冗余的記錄有效起止時(shí)間 start_time,end_time 屬性字段
可以使用 time travel in sql queries,比如查詢2022-12-07 01:21:00 的歷史狀態(tài),可以直接用。
-- time travel to 2022-12-07 01:21:00SELECT * FROM mmexpt_lakehouse.table TIMESTAMP AS OF '2022-12-07 01:21:00';
另外由于廠內(nèi) iceberg 老版本還不支持 timestamp as of 等語法,iceberg/issues/270 我們給廠內(nèi)數(shù)平同學(xué)單獨(dú)提了issue,在 iceberg metadata 中加入了 custom-timestamp 結(jié)合 sql hint 來代替 timestamp as of 方式。后續(xù)我們計(jì)劃應(yīng)用 Iceberg 1.2.2 帶來的 Branching and Tagging 來去做更優(yōu)雅的 Snapshot Time Travel。
特殊情況處理
例如歷史數(shù)據(jù)出錯(cuò),則可以直接回滾到具體出錯(cuò)前的 snapshot。
讓用戶在每次提交的 snapshot 列表中切換,比如 version rollback,set snapshot id。
Roll back table db.sample to snapshot ID 1:CALL catalog_name.system.rollback_to_snapshot('db.sample', 1)Sets the current snapshot ID for a table.CALL catalog_name.system.set_current_snapshot('db.sample', 1)
然后數(shù)據(jù)修正后 commit 到其后的 snapshot 中。
總結(jié)起來,其實(shí)就是利用 Iceberg 的三層元數(shù)據(jù)體系帶來的靈活性,可以解決 Hive 實(shí)現(xiàn)傳統(tǒng)拉鏈表方式下的寫入效率低,查詢效率低,靈活性低,易用性低等問題。在特定業(yè)務(wù)超大拉鏈表的場景中,任務(wù)寫入及查詢效率都帶來了指數(shù)級的提升。
針對此方案我們也申請了專利《一種基于數(shù)據(jù)湖表格式處理緩慢變化維問題的新方法》專利立項(xiàng)編號(hào):2023010065CN
流批一體

Iceberg 使 CDC 場景做分鐘級寫入成為可能,可以將 Iceberg 統(tǒng)一流批 Pipeline,作為公共上游,使代碼復(fù)用,減少數(shù)據(jù)冗余,并從根源上規(guī)避數(shù)據(jù)不一致等問題。同時(shí)我們也希望精簡全鏈路,過多的 step 會(huì)增加數(shù)據(jù)開發(fā)的成本,也會(huì)降低全鏈路的穩(wěn)定性和可靠性。如上圖所示,架構(gòu)也會(huì)更加優(yōu)雅。

在我們的使用實(shí)踐過程中,發(fā)現(xiàn) THive 兼容性不足,其中默認(rèn)的 ORC 為廠內(nèi)魔改版本,帶來一定的對接使用隱患,比如 ClickHouse ORC 外表無法識(shí)別。ORC 魔改版本在 Spark 上的優(yōu)化,也距離原生組件有些差距。
總結(jié)起來 Iceberg 方案的優(yōu)勢,對比太過樸素的 Hive,兼容性不足的 THive,Iceberg 帶來的高級 Feature: 包括 ACID 粗粒度事務(wù)語義,可以避免臟讀及下游失敗等問題,借助于三層 Metadata 實(shí)現(xiàn)的 snapshot、time travel、schema evolution/partition evolution, row-level upsert/delete 等 feature 都帶來了極致的靈活性。在業(yè)務(wù)升級、問題回滾相較于樸素的 Hive 帶來了新的優(yōu)雅的解決思路。配合異步 Auto-optimizing 服務(wù)優(yōu)化數(shù)據(jù)存儲(chǔ)組織方式(定期 compact 或進(jìn)行合理排序和分組),提高查詢效率,給我們帶來很大收益。
我們已經(jīng)將 20PB 的歷史數(shù)據(jù)遷移到 Iceberg 庫上,并且后續(xù)增量數(shù)據(jù)默認(rèn)采用 Iceberg 作為數(shù)據(jù)基座。
結(jié)合社區(qū)開源版本優(yōu)化紅利,Spark 3.3全面接入(Gemini on Spark Oteam),帶來的增強(qiáng)了 AQE(adaptive query execution) 能力,增加 row-level runtime filter 來補(bǔ)充 Dynamic Partition pruning 等 Feature,及 Iceberg 1.2.2的全面接入,我們從計(jì)算性能、存儲(chǔ)占用兩方面進(jìn)行了優(yōu)化的實(shí)踐,最終效果為,計(jì)算上總核時(shí)優(yōu)化69.4%,節(jié)省約20w 核時(shí)/天,存儲(chǔ)空間上優(yōu)化約100PB,總計(jì)折合降本預(yù)計(jì)約3kw/年。在降本的同時(shí)提升了離線計(jì)算的效率。計(jì)算任務(wù) p99耗時(shí)減少70%, 平均任務(wù)耗時(shí)減少60%。

針對數(shù)據(jù)開發(fā)過程中的業(yè)務(wù)常見問題- 數(shù)據(jù)傾斜問題,小文件問題,隨機(jī)性問題,我們都有遇到,并有一套解決方式供大家參考。
數(shù)據(jù)傾斜問題
-
分區(qū)數(shù)據(jù)傾斜
如上方案一描述的,我們采用實(shí)驗(yàn) ID 作為二級分區(qū),每個(gè)實(shí)驗(yàn)的命中流量都是不均勻的,尤其針對一些全流量的 holdout 實(shí)驗(yàn),就針對寫入 Iceberg 的 Stage 做了單獨(dú)的大實(shí)驗(yàn)傾斜處理,在寫入前的重分布過程中,加入了打散化處理。
val bucketIdHashUdf = udf((exptid: Long, uin: Long) => {val maxExptIds: ListBuffer[Long] = maxExptIdsBroadCast.valueif (maxExptIds.contains(exptid)) {exptid.toString + "_" + ((uin.hashCode() & Integer.MAX_VALUE) % 50)} else {exptid.toString}})val icerbergDf = df.withColumn("bucket_id", bucketIdHashUdf(col("exptid"), col("uin"))).repartition(partitionNum, col("ds"), col("bucket_id")).sortWithinPartitions("ds", "exptid")
-
Merge Into 寫入傾斜
在 Iceberg TBLPROPERTIES 中加入了 Write Properties。
'write.distribution-mode' = 'range' -- Defines distribution of write data: none: don’t shuffle rows; hash: hash distribute by partition key ; range: range distribute by partition key or sort key if table has an SortOrder
批寫小文件問題
相對于實(shí)時(shí)場景下分鐘級 commit 造成 snapshot 及 datafile 膨脹的問題,我們面對的場景是 batch 場景,基本為日度例行任務(wù),需要合理配置 targetSizeInBytes,及合理控制 spark stage 的 partition number,來規(guī)避 batch 場景寫入 iceberg 的小文件太多問題,即每個(gè) spark 的 partition 都會(huì)寫入 iceberg datafile 如果寫入的 iceberg datafile < write.target-file-size-bytes 則直接寫對應(yīng)的文件,如果寫入的 iceberg datafile > write.target-file-size-bytes 則會(huì)拆分多個(gè)文件 split 寫入。
同時(shí),因?yàn)槲覀兊拇鎯?chǔ)資源量級太大, 也跟數(shù)平運(yùn)維同學(xué),申請了專屬獨(dú)占的 HDFS 存儲(chǔ)集群,來保證業(yè)務(wù)穩(wěn)定性,避免 NameNode 過載導(dǎo)致文件讀寫延遲變大或者 Connect Fail Exception 等問題,并開通了存儲(chǔ)集群 grafana 監(jiān)控等權(quán)限,提前預(yù)知集群健康度對任務(wù)的影響。

隨機(jī)性問題
預(yù)期中的 Spark 任務(wù)應(yīng)該具有冪等性,即任務(wù)多次運(yùn)行的結(jié)果應(yīng)該完全相同,當(dāng)出現(xiàn)結(jié)果不同的隨機(jī)性問題時(shí),就很難去回放數(shù)據(jù)。
-
Accumulator 帶來的隨機(jī)性問題
因?yàn)槲覀兊某笕蝿?wù)規(guī)模比較大(單任務(wù)讀寫>20T),運(yùn)行過程中因?yàn)闄C(jī)器的負(fù)載等問題,導(dǎo)致 task fail 甚至 stage fail 很正常,恰恰 fail 造成了 Accumulator 的數(shù)據(jù)運(yùn)行不一致,spark document 上有注明。
For accumulator updates performed inside actions only, Spark guarantees that each task’s update to the accumulator will only be applied once, i.e. restarted tasks will not update the value. In transformations, users should be aware of that each task’s update may be applied more than once if tasks or job stages are re-executed.
Accumulator 的更新應(yīng)該在 action 算子中,而不應(yīng)該在 transformation 算子中,來保證 Accumulator 的更新只會(huì)應(yīng)用一次。
-
Random 處理數(shù)據(jù)傾斜帶來的隨機(jī)性問題
處理數(shù)據(jù)傾斜時(shí),常用的方案為在傾斜 key 上加入隨機(jī)數(shù)來進(jìn)行打散,但是這種處理方式在 Shuffle Fail 進(jìn)行 retry 時(shí),數(shù)據(jù)會(huì)被不同的 task 重復(fù) fetch,導(dǎo)致引入隨機(jī)性問題。小任務(wù)不太容易出現(xiàn) Shuffle Fail 的問題,超大任務(wù)或者集群負(fù)載水位較高時(shí),則更容易觸發(fā)此類問題,應(yīng)該用取模或者哈希之類的冪等函數(shù)來打散傾斜的 Key,規(guī)避此類隨機(jī)性問題。
其他思考
另外關(guān)于其他 Iceberg Data Skipping 層面的排序、索引等數(shù)據(jù)組織上優(yōu)化的考慮,我們也做了一些思考。
如前面提及的,實(shí)驗(yàn)平臺(tái)業(yè)務(wù)不是層級復(fù)雜、主題域多樣的業(yè)務(wù)數(shù)倉建設(shè)場景,而是僅具備超大規(guī)模指標(biāo)計(jì)算的單一場景。所以我們的工作重點(diǎn)也不在高效復(fù)雜的數(shù)倉建設(shè)上,而是在于大規(guī)模指標(biāo)計(jì)算優(yōu)化上。業(yè)務(wù)上決定了我們沒有字段點(diǎn)查的場景,所以并沒有使用 bloom filter、bitmap filter 等字段索引 feature,僅使用 Iceberg 默認(rèn)存儲(chǔ)文件級別每列的 Min、Max 信息,并用于 TableScan 階段的文件過濾。Z-Order 對我們業(yè)務(wù)場景收益不大,沒有太多的基于某個(gè)表的多個(gè)常用字段進(jìn)行 filter 的 data-skipping 需求。

我們的實(shí)時(shí)指標(biāo)計(jì)算場景,我們沒有復(fù)雜的 ETL pipeline,主要利用 OLAP(StarRocks/ClickHouse)等 SQL 表達(dá)能力強(qiáng)的引擎,作為實(shí)時(shí)指標(biāo)計(jì)算的依賴引擎,而非 Flink/Structured Streaming 等可編程能力強(qiáng)的計(jì)算框架。
ClickHouse 是性能優(yōu)秀的 OLAP 引擎,但是 Clickhouse sql 表達(dá)及優(yōu)化能力,普適性不足。為了解決 clickhouse shuffle 問題及冷數(shù)據(jù)復(fù)用避免轉(zhuǎn)移到 cos 等對象存儲(chǔ), 實(shí)現(xiàn)極速的 SQL on Iceberg,更好的 Ad Hoc Analysis 體驗(yàn),我們后續(xù)的實(shí)時(shí)湖倉方案會(huì)采用 StarRocks 3.x + Iceberg,便于湖倉融合。
StarRocks 既能兼容 TPC-DS Benchmark 的語法,在 ClickBench Benchmark(https://benchmark.clickhouse.com/) 打榜上和 Top 1的 Clickhouse 性能極其接近,更注定了 StarRocks 發(fā)展上限很高。

硬件層面計(jì)算框架加速
Steaming Lakehouse
??湖倉一體有哪些優(yōu)勢,你對這個(gè)技術(shù)有什么看法?歡迎留言。我們將挑選一則最有意義的評論,為其留言者送出騰訊定制-便捷通勤袋1個(gè)(見下圖)。8月30日中午12點(diǎn)開獎(jiǎng)。


第一時(shí)間看鵝廠架構(gòu)設(shè)計(jì)經(jīng)驗(yàn)




