Flink 實(shí)踐 | 基于 Flink Kylin Hudi 湖倉一體的大數(shù)據(jù)生態(tài)體系
湖倉一體的架構(gòu)
Flink/Hudi/Kylin 介紹與融合
T3 出行結(jié)合湖倉一體的實(shí)踐

首先第一點(diǎn),是沒有走現(xiàn)在傳統(tǒng)數(shù)倉所廣泛應(yīng)用的 Shared-Nothing 這個(gè)架構(gòu),而是轉(zhuǎn)向 Shared-Data 這個(gè)架構(gòu)。
其次,論文中重點(diǎn)提及的存儲和計(jì)算分離,是文中我覺得最有價(jià)值的一個(gè)觀點(diǎn)。他提出了統(tǒng)一存儲然后彈性計(jì)算的這樣一個(gè)觀念。
第三,數(shù)倉及服務(wù)是我認(rèn)為他們商業(yè)化最成功的點(diǎn)。它將數(shù)倉提供了一個(gè) SaaS 化的體驗(yàn),并且摒棄傳統(tǒng)上大家認(rèn)為的數(shù)倉是大而重的偏見。
第四,高可用這一塊是提高用戶體驗(yàn)和容錯的很關(guān)鍵的一個(gè)點(diǎn)。
最后,結(jié)構(gòu)化延伸到半結(jié)構(gòu)化這一塊已經(jīng)體現(xiàn)當(dāng)時(shí)他們能夠探索湖上通用數(shù)據(jù)的能力。

第一點(diǎn),Table 上的數(shù)據(jù)可以進(jìn)行跨節(jié)點(diǎn)的水平分區(qū),并且每個(gè)節(jié)點(diǎn)有自己的本地存儲。每個(gè)節(jié)點(diǎn)的計(jì)算資源,只關(guān)注處理每個(gè)節(jié)點(diǎn)自己存儲的數(shù)據(jù)。
所以它的另一個(gè)優(yōu)點(diǎn)就是它的處理機(jī)制相對簡單,是數(shù)倉領(lǐng)域很典型的一個(gè)架構(gòu)。

最大的一點(diǎn)就是他耦合了計(jì)算與存儲資源,
同時(shí)也帶來第二個(gè)問題,就是彈性不足。具體可以體現(xiàn)在 2 個(gè)方面。
集群在擴(kuò)縮容的時(shí)候,數(shù)據(jù)需要被大量重分布
沒有辦法簡單地卸載不用的計(jì)算資源。
第三個(gè)問題是,耦合計(jì)算和存儲資源同時(shí)也就造成了它的可用性是相當(dāng)有限的。由于這些稱之為有狀態(tài)的計(jì)算,所以在失敗或者是升級的時(shí)候會顯著影響性能,并會導(dǎo)致服務(wù)整體不可用的狀態(tài)。
最后是同構(gòu)的資源與異構(gòu)的負(fù)載的問題。因?yàn)樵跀?shù)倉的場景中,我們有很多異構(gòu)的負(fù)載,比如說批量的加載,查詢,報(bào)表的大規(guī)模計(jì)算分析等等。但 Shared-Nothing 架構(gòu)的資源是同構(gòu)的,所以這帶來兩者之間的碰撞。

這個(gè)架構(gòu)的第一個(gè)優(yōu)勢是它沒有數(shù)據(jù)孤島,是一個(gè)統(tǒng)一的存儲。這也就能夠?qū)⒋鎯挠?jì)算中進(jìn)行解耦。
第二個(gè)優(yōu)勢是基于現(xiàn)在的對象存儲去容納結(jié)構(gòu)化和非結(jié)構(gòu)化數(shù)據(jù)。
第三,它的集群規(guī)模是可以彈性作用的。
第四,上述特征同時(shí)也帶來了按需計(jì)算這個(gè)低成本優(yōu)點(diǎn)。

最底層是云廠商提供的對象存儲,也就是用戶的存儲。
中間層是多用途多份的計(jì)算集群。
再往上是數(shù)據(jù)湖的管理服務(wù),它存載的是一個(gè)大的 SaaS 化的平臺,是對整個(gè)底層存儲以及計(jì)算集群的管理的角色。

首先,作為一個(gè) SaaS 化的應(yīng)用,它的容錯性是需要體現(xiàn)在整體架構(gòu)上。這里我們同樣分層來回顧一下。
最底層的存儲層利用了云廠商的對象存儲能力,他本身是一個(gè)跨中心復(fù)制以及接近無限擴(kuò)容的一個(gè)機(jī)制,所以用戶基本無需關(guān)心。
再往上是多元的計(jì)算集群。每個(gè)計(jì)算集群是在同一個(gè)數(shù)據(jù)中心內(nèi),來保證它網(wǎng)絡(luò)傳輸?shù)男阅?。這里就提到一個(gè)問題,有可能某一個(gè)計(jì)算集群會有節(jié)點(diǎn)失敗的問題。假如在一次查詢中有一個(gè)節(jié)點(diǎn)失敗,這些計(jì)算節(jié)點(diǎn)會將這個(gè)狀態(tài)返回上面的服務(wù)層。服務(wù)層在接受這個(gè)失敗后,會將這個(gè)計(jì)算再次傳遞到可用的節(jié)點(diǎn)中進(jìn)行二次查詢。所以 Shared-Data 存儲和計(jì)算分離的這種架構(gòu)上節(jié)點(diǎn)近乎是無狀態(tài)的計(jì)算。這種架構(gòu)的一個(gè)節(jié)點(diǎn)失敗就不是一個(gè)非常大的問題。
再往上服務(wù)層對于元數(shù)據(jù)的存儲也是利用了對象存儲的這個(gè)能力。所以這個(gè)服務(wù)層基本上可以看做是無狀態(tài)的服務(wù)。
最上層是一個(gè)負(fù)載均衡器,可以進(jìn)行服務(wù)的冗余和負(fù)載的均攤。

第二點(diǎn)在線升級這一塊主要利用兩個(gè)設(shè)計(jì),其實(shí)這也并不是很新穎的做法。一個(gè)是在計(jì)算層和服務(wù)層的多方面的映射,然后灰度的切換。這里可以看到在計(jì)算層是分多版本的,并且這些版本之間會共享本地的 Cache。服務(wù)層的元數(shù)據(jù)管理也是在多方面共享。這其實(shí)也是架構(gòu)內(nèi)的子 Shared-Data,對于多版本之間的數(shù)據(jù)共享能做到再升級和平滑灰度的能力。


在存儲方面,Hudi 可以支持 HDFS,OSS 和 S3。
在數(shù)據(jù)處理引擎方面,Hudi 支持 Flink 和 Spark。Java 和 Python 客戶端已經(jīng)在社區(qū)支持中。Hudi 支持兩種表,COW 和 MOR,這兩種表分別對應(yīng)低延遲的查詢和快速攝入兩種場景。
在索引方面,Hudi 支持 Bloom 和 HBase 等 4 種索引類型。底層用了 Parquet 和 Avro 存儲數(shù)據(jù),社區(qū)還正在做 ORC 格式的支持以及 SQL支持,相信不久的將來會跟大家見面。

第一個(gè)是 Copy On Write 模式,對應(yīng)到 Hudi 的 COW 表。它是一種側(cè)重低延時(shí)的數(shù)據(jù)查詢場景的表,底層使用 Parquet 數(shù)據(jù)文件存儲數(shù)據(jù),能夠支持快照查詢和增量查詢兩種查詢方式。在查詢引擎方面,大家可以看到上面有 5 個(gè)引擎,他們對快照查詢、增量查詢和讀優(yōu)化 3 種視圖都有不同程度的支持。
Merge On Read 表對 Copy On Write 有不同層面的互補(bǔ),可以看到它側(cè)重于快速的數(shù)據(jù)攝入場景。使用 Parquet 文件來存儲具體的數(shù)據(jù),使用行式 Avro 增量文件來存儲操作日志,類似于 HBase WAL。它支持 Hudi 所有 3 種視圖,可以看到 Hive,Spark SQL,Spark Datasource, Presto 和 Impala 對于讀優(yōu)化查詢都是支持的。而 Hive, Spark SQL 只支持到了快照查詢。這種組件支持的信息大家以后可以到官網(wǎng)上查詢。




第一個(gè) Hudi client 支持多引擎,將 Hudi 與 Spark 解耦,讓 Hudi 支持多引擎成為可能。
第二個(gè)是 Flink 客戶端基本實(shí)現(xiàn)貢獻(xiàn)到社區(qū),讓 Hudi 可以真正意義上寫入 Flink 數(shù)據(jù)表。這 2 個(gè)改動非常大,加在一起已經(jīng)超過了 1 萬行的代碼,也可以說是今年 Hudi 社區(qū)比較亮眼的一個(gè)特性。



統(tǒng)一使用泛型 I、K、O 代替。
去 Spark 化,抽象層 API 都是引擎無關(guān)的,難以在抽象層實(shí)現(xiàn)的,我們會把它改為抽象方法下推到 Spark 子類實(shí)現(xiàn)。
不影響原版,抽象層盡量的減少改動,以保證固定的功能性。
引入 HoodieEngineContext 代替 JavaSparkContext, 提供運(yùn)行時(shí)的上下文。

下面說 Flink Client DAG,這里主要分了 5 部分,
第一部分是 Kafka Streaming Source,主要用來接收Kafka數(shù)據(jù)并轉(zhuǎn)換成 List<HoodieRecord>。
第二個(gè)是 InstantGeneratorOperator,一個(gè) Flink 算子, 用來生成全局唯一的 instant。
第三是 KeyBy 分區(qū)操作,根據(jù) partitionPath 分區(qū)避免多個(gè)子任務(wù)將數(shù)據(jù)寫入同一個(gè)分區(qū)造成沖突。
第四個(gè)是 WriteProcessOperator,這也是我們自定義的一個(gè)算子。這個(gè)算子是寫操作實(shí)際發(fā)生的地方。
第五個(gè)是 CommitSink,他會接受上游 WriteProcessOperator 發(fā)來的數(shù)據(jù),根據(jù)上游數(shù)據(jù)判斷是否提交事務(wù)。

可以看到 insert 函數(shù)的入?yún)⑹?RDD,返回值也是 RDD。右側(cè)抽象之后的 abstract 可以看到它的入?yún)⒆兂闪朔盒虸,返回值變成了 O,有興趣的話大家可以去了解一下。



計(jì)算層我們用到了 Flink、Spark、Kylin 和 Presto 并且搭配 ES 做任務(wù)調(diào)度。數(shù)據(jù)分析和展示方面用到了達(dá)芬奇和 Zeppelin。
在存儲層,我們使用了阿里云 OSS 并搭配 HDFS 做數(shù)據(jù)存儲。數(shù)據(jù)格式方面使用 Hudi 作為主要的存儲格式,并配合 Parquet、ORC 和 Json 文件。在計(jì)算和存儲之前,我們加了一個(gè) Alluxio 來加速提升數(shù)據(jù)處理性能。資源管理方面我用到了 Yarn,在后期時(shí)機(jī)成熟的時(shí)候也會轉(zhuǎn)向 K8s。


一個(gè)是近實(shí)時(shí)的流數(shù)據(jù)管道。我們可以從左側(cè)通過 Log、MySQL 或者直接讀取業(yè)務(wù)數(shù)據(jù)的 Kafka,把數(shù)據(jù)導(dǎo)入到數(shù)據(jù)管道中,再使用 Flink 或者原版的 DeltaStreamer 將流式數(shù)據(jù)輸入到列表中。


另一個(gè)是近實(shí)時(shí)的數(shù)據(jù)分析場景。我們使用 Hive、Spark 或 Presto 查詢數(shù)據(jù),并最終用達(dá)芬奇或者 Zeppelin 做最終的數(shù)據(jù)報(bào)表。

這是我們用 Hudi 構(gòu)建的增量數(shù)據(jù)管道。最左側(cè) CDC 數(shù)據(jù)捕獲之后要更新到后面的一系列的表。有了 Hudi 之后,因?yàn)?Hudi 支持索引和增量數(shù)據(jù)處理,我們只需要去更新需要更新的數(shù)據(jù)就可以了,不需要再像以前那樣去更新整個(gè)分區(qū)或者更新整個(gè)表。

最后的一個(gè)場景是將前面介紹的用 Flink 將線上或者業(yè)務(wù)數(shù)據(jù)訂閱 ETL 到 Hudi 表中供機(jī)器學(xué)習(xí)使用。但是機(jī)器學(xué)習(xí)是需要有數(shù)據(jù)基礎(chǔ)的,所以我們利用 Hudi 將線上的數(shù)據(jù)增量發(fā)布到線下環(huán)境,進(jìn)行模型訓(xùn)練或者調(diào)參。之后再將模型發(fā)布到線上為我們的業(yè)務(wù)提供服務(wù)。

▼ 關(guān)注「Flink 中文社區(qū)」,獲取更多技術(shù)干貨 ▼
戳我,回顧作者分享視頻!