數(shù)據(jù)湖YYDS! Flink+IceBerg實(shí)時(shí)數(shù)據(jù)湖實(shí)踐
點(diǎn)擊上方藍(lán)色字體,選擇“設(shè)為星標(biāo)”
回復(fù)”面試“獲取更多驚喜

數(shù)據(jù)湖的前世今生
互聯(lián)網(wǎng)技術(shù)發(fā)展的當(dāng)下,數(shù)據(jù)是各大公司最寶貴的資源之一已經(jīng)是不爭(zhēng)的事實(shí)。收據(jù)的收集、存儲(chǔ)和分析已經(jīng)成為科技公司最重要的技術(shù)組成部分。大數(shù)據(jù)領(lǐng)域經(jīng)過近十年的高速發(fā)展,無論是實(shí)時(shí)計(jì)算還是離線計(jì)算、無論是數(shù)據(jù)倉(cāng)庫(kù)還是數(shù)據(jù)中臺(tái),都已經(jīng)深入各大公司的各個(gè)業(yè)務(wù)。
"數(shù)據(jù)湖"這個(gè)概念從 2020 年中期開始頻繁走入大眾視野。然而對(duì)于數(shù)據(jù)湖的定義上確沒有統(tǒng)一的標(biāo)準(zhǔn)。但是我們從維基百科、AWS、阿里云的官網(wǎng)描述中可以找到一些共同點(diǎn):
多計(jì)算引擎支持
數(shù)據(jù)湖需要支持大數(shù)據(jù)領(lǐng)域的常見的計(jì)算引擎,包括Flink、Spark、Hive等,同時(shí)支持流處理和批處理;
支持多種存儲(chǔ)引擎
存儲(chǔ)引擎應(yīng)包含常見的結(jié)構(gòu)化存儲(chǔ):MySQL、Hbase、OLAP 數(shù)據(jù)庫(kù);也應(yīng)該支持常見的非結(jié)構(gòu)化存儲(chǔ):HDFS、小文件存儲(chǔ)引擎等;
支持?jǐn)?shù)據(jù)更新和事務(wù)(ACID)
需要方便的對(duì)數(shù)據(jù)進(jìn)行更新,并且需要滿足事務(wù)特性;
元數(shù)據(jù)管理和數(shù)據(jù)質(zhì)量保障
數(shù)據(jù)湖應(yīng)提供統(tǒng)一的元數(shù)據(jù)管理和企業(yè)級(jí)的權(quán)限體系。數(shù)據(jù)湖相比于傳統(tǒng)的數(shù)據(jù)倉(cāng)庫(kù)最核心的能力之一在于支持各種各樣的非結(jié)構(gòu)化數(shù)據(jù),基于這樣的背景,誕生了類似Hudi、IceBerg之類的數(shù)據(jù)湖存儲(chǔ)技術(shù)。
各大云廠商的數(shù)據(jù)湖架構(gòu)
這部分我們用國(guó)內(nèi)外主流的云服務(wù)商的產(chǎn)品來做介紹,看看各大廠商的技術(shù)架構(gòu)設(shè)計(jì)有什么區(qū)別和共同點(diǎn)。
阿里云

在阿里云官網(wǎng)上給出了云原生企業(yè)級(jí)數(shù)據(jù)湖解決方案,該方案的四個(gè)顯著的優(yōu)勢(shì)是:
海量彈性: 計(jì)算存儲(chǔ)分離,存儲(chǔ)規(guī)模彈性擴(kuò)容 生態(tài)開放:對(duì)Hadoop生態(tài)友好,且無縫對(duì)接阿里云各計(jì)算平臺(tái) 高性價(jià)比:統(tǒng)一存儲(chǔ)池,避免重復(fù)拷貝,多種類型冷熱分層 更易管理:加密、授權(quán)、生命周期、跨區(qū)復(fù)制等統(tǒng)一管理
并且,阿里云給出了利用開源生態(tài)構(gòu)建數(shù)據(jù)湖的方案:

在這個(gè)開源場(chǎng)景的架構(gòu)下,幾大關(guān)鍵的技術(shù)點(diǎn):
支撐 EB 規(guī)模的數(shù)據(jù)湖,支持多種數(shù)據(jù)通道,全面覆蓋日志、消息、數(shù)據(jù)庫(kù)、HDFS 各種數(shù)據(jù)源 無縫對(duì)接 Hive、Spark、Presto、Impala 等大數(shù)據(jù)處理引擎,消除數(shù)據(jù)孤島 Data Lake Formation 提供數(shù)據(jù)湖元數(shù)據(jù)管理、數(shù)據(jù)湖加速等服務(wù)
AWS

AWS 在 2018 年推出了 AWS Lake Formation,它的上游是 S3 存儲(chǔ)以及 NoSQL 存儲(chǔ),AWS Lake Formation 承擔(dān)了元數(shù)據(jù)定義的功能,寫入 S3 中的數(shù)據(jù)包括爬蟲數(shù)據(jù)、ETL 數(shù)據(jù)、日志數(shù)據(jù)等等,并且 AWS 提供了完整的權(quán)限體系。
華為云

華為數(shù)據(jù)湖治理中心完全兼容了Spark、Flink的生態(tài),提供一站式的流處理、批處理、交互式分析的Serverless融合處理分析服務(wù)。用戶不需要管理任何服務(wù)器,即開即用。支持標(biāo)準(zhǔn)SQL/Spark SQL/Flink SQL,支持多種接入方式,并兼容主流數(shù)據(jù)格式。數(shù)據(jù)無需復(fù)雜的抽取、轉(zhuǎn)換、加載,使用SQL或程序就可以對(duì)云上數(shù)據(jù)庫(kù)以及線下數(shù)據(jù)庫(kù)的異構(gòu)數(shù)據(jù)進(jìn)行探索。
看了國(guó)內(nèi)外主流云廠商的數(shù)據(jù)湖解決方案,我個(gè)人認(rèn)為數(shù)據(jù)湖的出現(xiàn)并不是一項(xiàng)創(chuàng)新的技術(shù),更像是一種數(shù)據(jù)理念的發(fā)展。數(shù)據(jù)湖不是一個(gè)簡(jiǎn)單的技術(shù),實(shí)現(xiàn)數(shù)據(jù)湖的方式多種多樣,我們?cè)u(píng)價(jià)一個(gè)數(shù)據(jù)湖解決方案的成熟與否,關(guān)鍵在于其提供的數(shù)據(jù)治理、元數(shù)據(jù)管理、數(shù)據(jù)計(jì)算、權(quán)限管理的成熟程度。
湖倉(cāng)一體才是未來?
在數(shù)據(jù)湖的發(fā)展過程中,Data Lakehouse(湖倉(cāng)一體)數(shù)據(jù)架構(gòu)被推上了風(fēng)口浪尖。湖倉(cāng)一體架構(gòu)的出現(xiàn)結(jié)合了傳統(tǒng)數(shù)據(jù)倉(cāng)庫(kù)和數(shù)據(jù)湖的優(yōu)勢(shì)。Lakehouse的概念最早是由 Databricks 所提出的:《What is a Lakehouse?》,Databricks的出現(xiàn)使得數(shù)據(jù)的存儲(chǔ)變得更加廉價(jià)和具有彈性。并且在提升數(shù)據(jù)質(zhì)量上游長(zhǎng)足的進(jìn)步。Data Lakehouse有一些關(guān)鍵特性:
事務(wù)支持 Schema支持 端到端的流式支持 計(jì)算存儲(chǔ)分離
Lakehouse通常會(huì)使用Iceberg,Hudi,Delta Lake等開源組件,將最底層的數(shù)據(jù)存儲(chǔ)格式進(jìn)行統(tǒng)一。并且Lakehouse支持不同的語言進(jìn)行直接查詢。湖倉(cāng)一體的架構(gòu)將數(shù)據(jù)倉(cāng)庫(kù)和數(shù)據(jù)湖進(jìn)行了打通,兼具靈活存儲(chǔ)的同時(shí)極大地降低了數(shù)據(jù)管理、計(jì)算和存儲(chǔ)成本。
Flink+Iceberg構(gòu)建數(shù)據(jù)湖實(shí)戰(zhàn)
2.1 數(shù)據(jù)湖三劍客
在數(shù)據(jù)湖解決方案中有非常重要的一環(huán),那就是數(shù)據(jù)存儲(chǔ)和數(shù)據(jù)計(jì)算之間的格式適配。大數(shù)據(jù)領(lǐng)域發(fā)展至今,各個(gè)領(lǐng)域已經(jīng)非常成熟,無論是實(shí)時(shí)計(jì)算引擎 Flink 和 Spark,海量消息中間件 Kafka,各式各樣的數(shù)據(jù)存儲(chǔ)OLAP等已經(jīng)形成了足夠完善的數(shù)據(jù)解決方案體系。但是不同數(shù)據(jù)計(jì)算引擎在計(jì)算時(shí)需要讀取數(shù)據(jù),數(shù)據(jù)格式需要根據(jù)不同的計(jì)算引擎進(jìn)行適配。
這是一個(gè)非常棘手的問題,這個(gè)中間層不單單是數(shù)據(jù)存儲(chǔ)的格式問題,更是一種元數(shù)據(jù)的組織方式。正是這樣一種解決方案:介于上層計(jì)算引擎和底層存儲(chǔ)格式。成為數(shù)據(jù)湖解決方案中的關(guān)鍵一環(huán)。
目前的開源領(lǐng)域出現(xiàn)了 Delta、Apache Iceberg 和 Apache Hudi 三種比較成熟的解決方案。網(wǎng)上已經(jīng)有很多的文章來介紹三者的區(qū)別,因?yàn)槠脑蛭疫@里不再展開了。
但是我想說的是,這三種方案并沒有明顯的優(yōu)劣之分,需要用戶結(jié)合自己的業(yè)務(wù)情況進(jìn)行選擇。
2.2 Flink+IceBerg開發(fā)實(shí)戰(zhàn)案例
Apache Iceberg 的官網(wǎng)對(duì) IceBerg 的能力和定位做了以下闡述:
Apache Iceberg is an open table format for huge analytic datasets. Iceberg adds tables to Trino and Spark that use a high-performance format that works just like a SQL table. Iceberg是一個(gè)為大規(guī)模數(shù)據(jù)集設(shè)計(jì)的通用的表格形式。并且適配Trino(原PrestoSQL)和Spark,提供SQL化解決方案。
根據(jù)官方文檔的提示,IceBerg有一系列的特性如下:
模式演化,支持添加,刪除,更新或重命名,并且沒有副作用 隱藏分區(qū),可以防止導(dǎo)致錯(cuò)誤提示或非常慢查詢的用戶錯(cuò)誤 分區(qū)布局演變,可以隨著數(shù)據(jù)量或查詢模式的變化而更新表的布局 快照控制,可實(shí)現(xiàn)使用完全相同的表快照的可重復(fù)查詢,或者使用戶輕松檢查更改 版本回滾,使用戶可以通過將表重置為良好狀態(tài)來快速糾正問題 快速掃描數(shù)據(jù),無需使用分布式SQL引擎即可讀取表或查找文件 數(shù)據(jù)修剪優(yōu)化,使用表元數(shù)據(jù)使用分區(qū)和列級(jí)統(tǒng)計(jì)信息修剪數(shù)據(jù)文件 兼容性好,可以存儲(chǔ)在任意的云存儲(chǔ)系統(tǒng)和HDFS中 支持事務(wù),序列化隔離 表更改是原子性的,讀者永遠(yuǎn)不會(huì)看到部分更改或未提交的更改 高并發(fā),高并發(fā)寫入器使用樂觀并發(fā),即使寫入沖突,也會(huì)重試以確保兼容更新成功
其中的幾個(gè)特性精準(zhǔn)的命中了用戶的痛點(diǎn),包括:
ACID和多版本支持 支持批/流讀寫 多種分析引擎的支持
Apache Iceberg的社區(qū)非?;钴S,積極擁抱 Flink 社區(qū)的實(shí)時(shí)計(jì)算體系,提供了非常友好和連接器,大大降低了開發(fā)門檻。截止小編發(fā)稿前為止,Apache IceBerg 已經(jīng)更新到了 0.12.0 版本,并且仍在高速迭代中。
https://uploader.shimowendang.com/f/aMECh654lRQPVZxd.png!thumbnail?accessToken=eyJhbGciOiJIUzI1NiIsImtpZCI6ImRlZmF1bHQiLCJ0eXAiOiJKV1QifQ.eyJhdWQiOiJhY2Nlc3NfcmVzb3VyY2UiLCJleHAiOjE2MzM5NzI0MjcsImciOiJrV0tXNmdRcnI4Z0pSUXRKIiwiaWF0IjoxNjMzOTcyMTI3LCJ1c2VySWQiOjE4NTAyMDc1fQ.-s9-0vlGqhwfssky9IAAJ2ByuzEkBlPUyZZrdiUuRqw
2.3 Flink 整合 IceBerg
我們這里采用經(jīng)典的 Iamda 架構(gòu),實(shí)時(shí)鏈路通過 Flink 消費(fèi) Kafka 并且通過操作 IceBerg 將數(shù)據(jù)同步到數(shù)據(jù)湖內(nèi)。

Flink 整合 IceBerg
目前 IceBerg 同時(shí)支持 Flink DataStream 和 SQL 兩種方式,目前 IceBerg 對(duì) Flink 的支持為 Flink 1.11.x 版本(Flink 1.12.x 版本沒有經(jīng)過詳細(xì)測(cè)試),詳細(xì)的兼容情況如下:

首先我們需要添加依賴,根據(jù)官方文檔我們需要添加的依賴如下:
????org.apache.iceberg
????iceberg-flink-runtime
????0.11.1
首先我們需要?jiǎng)?chuàng)建 CataLog,CataLog 是保存了 IceBerg 和 HDFS 的目錄的映射關(guān)系:
CREATE?CATALOG?iceberg_catalog?WITH?(
??'type'='iceberg',
??'catalog-type'='hive',"?+
???'warehouse'='hdfs://localhost/user/hive/warehouse',
???'uri'='thrift://localhost:9083'
)
然后我們需要在 iceberg_catalog 中創(chuàng)建表:
CREATE?TABLE?iceberg_catalog.iceberg_hadoop_db.iceberg_table?(
????user_id?STRING,
????amount?DOUBLE,
????time_stamp?STRING)?
PARTITIONED?BY?(time_stamp)
WITH?('connector'='iceberg','write.format.default'='orc')
此時(shí)我們就可以通過 Flink 將 Kafka 中的數(shù)據(jù)通過 IceBerg 寫入 HDFS 中了,這里一定要注意,在 Flink 中一定要開啟 CheckPoint:
StreamExecutionEnvironment?env?=?StreamExecutionEnvironment.createLocalEnvironment();
env.enableCheckpointing(300?*?1000);
env.getCheckpointConfig().setCheckpointTimeout(60000);
然后我們需要使用 Hive CataLog 創(chuàng)建一張 Kafka Source 表,讀取 Kafka 中的數(shù)據(jù):
String?HIVE_CATALOG?=?"hive_catalog";
String?DEFAULT_DATABASE?=?"tmp";
String?HIVE_CONF_DIR?=?"/xx/resources";
Catalog?catalog?=?new?HiveCatalog("hive_catalog",?"hive_catalog_database",?"/user/hive/resources");
tenv.registerCatalog("hive_catalog",?catalog);
tenv.useCatalog("hive_catalog");
//?create?kafka?stream?table
tenv.executeSql("DROP?TABLE?IF?EXISTS?kafka_source_iceberg");
tenv.executeSql(
????????"CREATE?TABLE?kafka_source_iceberg?(\n"?+
????????????????"?user_id?STRING,\n"?+
????????????????"?amount?DOUBLE,\n"?+
????????????????"?time_stamp?STRING\n"?+
????????????????")?WITH?(\n"?+
????????????????"??'connector'='kafka',\n"?+
????????????????"??'topic'='kafka_topic',\n"?+
????????????????"??'scan.startup.mode'='latest-offset',\n"?+
????????????????"??'properties.bootstrap.servers'='localhost:9092',\n"?+
????????????????"??'properties.group.id'?=?'iceberg_group',\n"?+
????????????????"??'format'='json'\n"?+
????????????????")");
然后我們就可以消費(fèi) Kafka Source 表中的數(shù)據(jù),并且寫入 IceBerg 中了:
tenv.executeSql(
????????"INSERT?INTO?iceberg_catalog.iceberg_hadoop_db.iceberg_table"?+
????????????????"?SELECT?user_id,?amount,?time_stamp?FROM?hive_catalog.hive_catalog_database.kafka_source_iceberg");
到此,我們就完成了整個(gè)實(shí)時(shí)數(shù)據(jù)的入湖過程。
總結(jié)
數(shù)據(jù)湖的發(fā)展方興未艾,開源社區(qū)仍然在高速迭代中,但是可以預(yù)見的是,數(shù)據(jù)湖或者湖倉(cāng)一體的數(shù)據(jù)架構(gòu)未來一定會(huì)成為主流,是每個(gè)數(shù)據(jù)開發(fā)人員都需要掌握的知識(shí)。

八千里路云和月 | 從零到大數(shù)據(jù)專家學(xué)習(xí)路徑指南
我們?cè)趯W(xué)習(xí)Flink的時(shí)候,到底在學(xué)習(xí)什么?
193篇文章暴揍Flink,這個(gè)合集你需要關(guān)注一下
Flink生產(chǎn)環(huán)境TOP難題與優(yōu)化,阿里巴巴藏經(jīng)閣YYDS
Flink CDC我吃定了耶穌也留不住他!| Flink CDC線上問題小盤點(diǎn)
我們?cè)趯W(xué)習(xí)Spark的時(shí)候,到底在學(xué)習(xí)什么?
在所有Spark模塊中,我愿稱SparkSQL為最強(qiáng)!
硬剛Hive | 4萬字基礎(chǔ)調(diào)優(yōu)面試小總結(jié)
4萬字長(zhǎng)文 | ClickHouse基礎(chǔ)&實(shí)踐&調(diào)優(yōu)全視角解析
【面試&個(gè)人成長(zhǎng)】2021年過半,社招和校招的經(jīng)驗(yàn)之談
大數(shù)據(jù)方向另一個(gè)十年開啟 |《硬剛系列》第一版完結(jié)
我寫過的關(guān)于成長(zhǎng)/面試/職場(chǎng)進(jìn)階的文章
當(dāng)我們?cè)趯W(xué)習(xí)Hive的時(shí)候在學(xué)習(xí)什么?「硬剛Hive續(xù)集」
你好,我是王知無,一個(gè)大數(shù)據(jù)領(lǐng)域的硬核原創(chuàng)作者。
做過后端架構(gòu)、數(shù)據(jù)中間件、數(shù)據(jù)平臺(tái)&架構(gòu)、算法工程化。
專注大數(shù)據(jù)領(lǐng)域?qū)崟r(shí)動(dòng)態(tài)&技術(shù)提升&個(gè)人成長(zhǎng)&職場(chǎng)進(jìn)階,歡迎關(guān)注。
