Flink + Iceberg: Building a Real-Time Data Lake in Practice

Part 1: How Iceberg's Core Features Work
Apache Iceberg
From the official website:
Apache Iceberg is an open table format for huge analytic datasets.
This shows that the founders position Iceberg as an efficient table format for huge analytic datasets. In those analytics scenarios it plays a role analogous to Hive as a layer over HDFS: at its core it addresses the pain points of data-warehouse workloads.
Iceberg did initially evolve toward being a faster, easier-to-use format for warehouse workloads, adding features such as schema evolution and file-level filter pushdown. But once its ecosystem was connected to the Flink streaming engine and Delete/Update/Merge semantics appeared, its use cases became much more diverse.
Background
In the past, the industry mostly used Hive/Spark on HDFS as the substrate for offline data warehouses. As workloads become increasingly real-time and fast-iterating, the following shortcomings have surfaced:
- No row-level update: any update requires overwriting the whole Hive table, which is extremely expensive
- No read/write isolation: one user's reads are affected by another user's writes (especially painful for streaming reads)
- No version rollback or snapshots, so large amounts of historical data must be kept around
- No incremental reads: every scan reads the full table or partition
- Poor performance: pruning only reaches Hive-partition granularity
- No schema evolution
- ...
Basic Concepts

As shown in the figure above, Iceberg layers the files on HDFS into snapshots, manifest lists, manifests, and data files:
- Snapshot: each user commit (e.g., each Spark write job) produces a new snapshot
- Manifest List: tracks all manifests belonging to the current snapshot
- Manifest: tracks all data files under that manifest
- Data File: the file that stores the data; Iceberg later introduced Delete Files, which store rows to be deleted and sit at the same level of the file hierarchy as Data Files
Core Features in Detail
Time Travel and Incremental Reads
Time travel lets a user read the table as of any historical point in time. Taking Spark code as an example:
// time travel to October 26, 1986 at 01:21:00
spark.read
.option("as-of-timestamp", "499162860000")
.format("iceberg")
.load("path/to/table")
The code above reads the Iceberg table as it was at timestamp=499162860000. How does this work underneath?
As the file structure in "Basic Concepts" shows, every new write produces a snapshot, so Iceberg only needs to store the metadata of each commit, such as its timestamp, to locate the snapshot for a given point in time and resolve its Data Files.
Incremental reads work the same way: the start and end timestamps select the snapshots within that time range, and all of their Data Files are read as the raw data.
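Both capabilities are also exposed through the Iceberg Flink source as read options. A minimal batch-mode sketch, assuming the 'as-of-timestamp' and 'snapshot-id' options documented for the Iceberg Flink connector (the values are taken from the my_user table built in Part 2):
-- read the table as it was at a point in time (epoch milliseconds)
select * from hadoop_catalog.iceberg_db.my_user
/*+ options('as-of-timestamp'='1644761130111') */;
-- read the table as it was at a specific snapshot
select * from hadoop_catalog.iceberg_db.my_user
/*+ options('snapshot-id'='8012517928892530314') */;
Incremental reads with the start-snapshot-id option are demonstrated in Part 2, section 6.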
Fast Scan & Data Filtering
As mentioned above, one reason Hive queries are slow is that predicate pushdown stops at the partition level, which is too coarse. Iceberg adds a series of optimizations at a finer-grained planning level. When a query reaches Iceberg, it proceeds as follows (a query sketch follows the list):
- Locate the snapshot by timestamp (the latest snapshot by default)
- Use the query's partition predicates to filter that snapshot's manifests down to the matching set
- Collect all Data File entries (metadata only) from those manifests
- Filter the Data Files further using per-file properties such as column-level value counts, null counts, lower bounds, and upper bounds
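As a concrete illustration, a hedged sketch against the my_user table created in Part 2, which is partitioned by birthday and country:
-- The partition predicate prunes manifests and data files down to the
-- birthday=2022-02-01 partitions; the user_id predicate is then compared against
-- each remaining file's lower/upper bounds, so files whose user_id range cannot
-- match are skipped without being read.
select user_id, user_name
from hadoop_catalog.iceberg_db.my_user
where birthday = date '2022-02-01' and user_id > 100;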
How Delete Is Implemented
To bring row-level updates online, Iceberg provides a Delete implementation; Delete + Insert together achieve an Update. The Delete implementation introduces two concepts:
- Delete File: stores the deleted data (two kinds: position delete and equality delete)
- Sequence Number: a property shared by Data Files and Delete Files, used to order Inserts and Deletes relative to each other; without it, data consistency problems would arise
position & equality delete
Iceberg introduces the notion of equality_ids: when creating a table, the user can specify the table's equality_ids to identify the key that future Delete operations will match on. In a GDPR scenario, for example, where user data must be deleted ad hoc by user_id, equality_ids would be set to user_id.
The two kinds of Delete operation map to different Delete Files with different stored fields:
- position delete: three columns: file_path (the Data File containing the row to delete), pos (the row position within that file), and row (the row data)
- equality delete: the fields listed in equality_ids
Clearly, Delete Files exist so that they can be joined against the data at read time, and a position delete pinpoints the exact file and only needs row-position comparisons during that join, so it is the more efficient of the two. For this reason, Iceberg keeps information about the data files currently being written in memory, so that DELETE operations take the position-delete path whenever possible. The following diagram illustrates this:

Three INSERT and DELETE records are written in time order. Suppose the Iceberg writer closed its file after writing the INSERTs for a1 and b1 and opened a new one; the record c1 and its row position are then tracked in memory. When the writer later receives the DELETE for user_id=c1, it can find in memory that the row for user_id=c1 is the first row of fileA, and writes a Position Delete File. For the DELETE of user_id=a1, the file containing a1 has already been closed and there is no in-memory record of it, so an Equality Delete File is written instead.
Sequence Number
Once DELETE operations exist and are merged at read time, a question arises: if a user inserts, deletes, and then re-inserts data with the same equality_id, how does a read guarantee that the first insert is removed and only the second insert is returned?
The answer is to number Data Files and Delete Files together in write order; at read time, a DELETE only applies to Data Files with a smaller Sequence Number. For example, if the first insert of a key gets sequence number 1, its delete gets 2, and the re-insert gets 3, then the delete at 2 removes only the row written at 1, and the row written at 3 survives. What about concurrent writes of the same record? That is where Iceberg's own transaction mechanism comes in: before committing, the writer checks the relevant metadata and Sequence Number, and if the result is not as expected it retries in an optimistic-locking fashion.
Schema Evolution
Schema evolution is one of Iceberg's distinguishing features. The following operations are supported:
- Add a column
- Drop a column
- Rename a column
- Update a column
- Reorder columns
Schema changes also rely on the file structure described above: every write produces the snapshot -> manifest -> data file hierarchy, and every read starts from a snapshot and routes down to the corresponding data files. So Iceberg only needs to record the schema in effect in the manifests at each write, and apply the corresponding conversion at read time.
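With the Flink/Iceberg versions used in Part 2, Flink SQL can only alter Iceberg table properties (see section 4.3), so schema changes are usually issued from another engine. A hedged Spark SQL sketch, assuming Spark 3 with the Iceberg runtime and the catalog registered under the same name (statement forms follow the Iceberg Spark DDL documentation):
-- add a column
ALTER TABLE hadoop_catalog.iceberg_db.my_user ADD COLUMNS (city string);
-- rename a column
ALTER TABLE hadoop_catalog.iceberg_db.my_user RENAME COLUMN user_name TO name;
-- reorder columns: move city right after name
ALTER TABLE hadoop_catalog.iceberg_db.my_user ALTER COLUMN city AFTER name;
-- drop a column
ALTER TABLE hadoop_catalog.iceberg_db.my_user DROP COLUMN country;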
Part 2: Setting Up the Flink + Iceberg Environment
1. Configuring Iceberg for the Flink SQL Client
The Flink cluster must use the Scala 2.12 build.
Download the Iceberg runtime jar into the lib directory of every server in the Flink cluster, then restart Flink:
[root@flink1 ~]# wget -P /root/flink-1.14.3/lib https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.14/0.13.0/iceberg-flink-runtime-1.14-0.13.0.jar
[root@flink1 ~]#
[root@flink1 ~]# scp /root/flink-1.14.3/lib/iceberg-flink-runtime-1.14-0.13.0.jar root@flink2:/root/flink-1.14.3/lib
iceberg-flink-runtime-1.14-0.13.0.jar 100% 23MB 42.9MB/s 00:00
[root@flink1 ~]# scp /root/flink-1.14.3/lib/iceberg-flink-runtime-1.14-0.13.0.jar root@flink3:/root/flink-1.14.3/lib
iceberg-flink-runtime-1.14-0.13.0.jar 100% 23MB 35.4MB/s 00:00
[root@flink1 ~]#
Iceberg supports the Hadoop catalog out of the box. To use a Hive catalog, additionally place flink-sql-connector-hive-3.1.2_2.12-1.14.3.jar in the lib directory of every server in the Flink cluster and restart Flink.
Then simply start the SQL Client.
2. Java/Scala pom.xml Configuration
Add the following dependency:
<dependency>
    <groupId>org.apache.iceberg</groupId>
    <artifactId>iceberg-flink</artifactId>
    <version>0.13.0</version>
    <scope>provided</scope>
</dependency>
3. Catalog
3.1 Hive Catalog
Note: in testing, the table data could not be queried from Hive, but could be queried through Trino (likely because Hive additionally needs the iceberg-hive-runtime jar and the engine.hive.enabled table property, which were not set up here).
The Hive metastore stores the metadata, and HDFS stores the database/table data.
Flink SQL> create catalog hive_catalog with(
> 'type'='iceberg',
> 'catalog-type'='hive',
> 'property-version'='1',
> 'cache-enabled'='true',
> 'uri'='thrift://hive1:9083',
> 'clients'='5',
> 'warehouse'='hdfs://nnha/user/hive/warehouse',
> 'hive-conf-dir'='/root/flink-1.14.3/hive_conf'
> );
[INFO] Execute statement succeed.
Flink SQL>
- property-version: for backward compatibility in case the property format changes; 1 is fine for now
- cache-enabled: whether to enable catalog caching; enabled by default
- clients: the size of the Hive metastore client pool that hive_catalog uses; default 2
- warehouse: an HDFS path on the HDFS cluster Flink runs against; where databases and tables under hive_catalog store their data
- hive-conf-dir: the Hive cluster's configuration directory; it must be a local path on the Flink cluster, and the HDFS path parsed from its hive-site.xml also refers to the HDFS cluster Flink runs against
- warehouse takes precedence over hive-conf-dir
- If the database to be created already exists in Hive, the created table path will end up under Hive's warehouse directory
3.2 HDFS Catalog
HDFS stores both the metadata and the database/table data. warehouse is an HDFS path on the cluster Flink runs against.
Flink SQL> create catalog hadoop_catalog with (
> 'type'='iceberg',
> 'catalog-type'='hadoop',
> 'property-version'='1',
> 'cache-enabled'='true',
> 'warehouse'='hdfs://nnha/user/iceberg/warehouse'
> );
[INFO] Execute statement succeed.
Flink SQL>
In principle a permanent catalog can be configured via conf/sql-cli-defaults.yaml, but this did not take effect in testing (see the note after the listing):
[root@flink1 ~]# cat /root/flink-1.14.3/conf/sql-cli-defaults.yaml
catalogs:
- name: hadoop_catalog
type: iceberg
catalog-type: hadoop
property-version: 1
cache-enabled: true
warehouse: hdfs://nnha/user/iceberg/warehouse
[root@flink1 ~]#
[root@flink1 ~]# chown 501:games /root/flink-1.14.3/conf/sql-cli-defaults.yaml
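Flink 1.14 dropped the legacy YAML-based SQL Client configuration, which likely explains why the file above has no effect. A more reliable way to get a persistent catalog is an initialization SQL script passed to the SQL Client with -i. A minimal sketch (the file path is illustrative):
-- /root/flink-1.14.3/conf/init-catalog.sql
create catalog hadoop_catalog with (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'property-version'='1',
  'cache-enabled'='true',
  'warehouse'='hdfs://nnha/user/iceberg/warehouse'
);
use catalog hadoop_catalog;
Starting the client with ./bin/sql-client.sh -i /root/flink-1.14.3/conf/init-catalog.sql then re-creates the catalog for every new session.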
Below, we focus on the Hadoop catalog for the tests and walkthrough.
4. Database and Table DDL Commands
4.1 Creating a Database
Every catalog comes with a default database named default.
Flink SQL> create database hadoop_catalog.iceberg_db;
[INFO] Execute statement succeed.
Flink SQL> use hadoop_catalog.iceberg_db;
[INFO] Execute statement succeed.
Flink SQL>
- Creating the database creates an iceberg_db subdirectory under the warehouse path on HDFS.
- Dropping the database removes that iceberg_db subdirectory from HDFS.
4.2 Creating a Table (primary key, etc. not supported)
Flink SQL> create table hadoop_catalog.iceberg_db.my_user (
> user_id bigint comment 'user ID',
> user_name string,
> birthday date,
> country string
> ) comment 'user table'
> partitioned by (birthday, country) with (
> 'write.format.default'='parquet',
> 'write.parquet.compression-codec'='gzip'
> );
[INFO] Execute statement succeed.
Flink SQL>
A few notes (the upsert variant is sketched after the metadata listing below):
- Tables currently do not support computed columns, primary keys, or watermarks.
- Flink DDL cannot declare computed (transform) partitions, although Iceberg itself supports them.
- Iceberg does support primary keys: with the table properties 'format-version'='2' and 'write.upsert.enabled'='true' plus a declared primary key, the table supports upserts, which gives insert, update, and delete behavior.
The CREATE TABLE above produces the following files:
[root@flink1 ~]#
[root@flink1 ~]# hadoop fs -ls hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata
Found 2 items
-rw-r--r-- 1 root supergroup 2115 2022-02-13 22:01 hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/v1.metadata.json
-rw-r--r-- 1 root supergroup 1 2022-02-13 22:01 hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/version-hint.text
[root@flink1 ~]#
Inspecting v1.metadata.json shows "current-snapshot-id" : -1.
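A hedged sketch of the upsert-capable variant mentioned above (the table name is illustrative), assuming the PRIMARY KEY, 'format-version'='2', and 'write.upsert.enabled' support of the Iceberg Flink connector:
create table hadoop_catalog.iceberg_db.my_user_v2 (
  user_id bigint comment 'user ID',
  user_name string,
  country string,
  primary key (user_id) not enforced
) with (
  'format-version'='2',
  'write.upsert.enabled'='true'
);
-- with upsert enabled, inserting a row whose primary key already exists replaces the old row:
-- the previous row is masked by a delete file and the new row is appended
insert into hadoop_catalog.iceberg_db.my_user_v2 values (1, 'zhang_san', 'china');
insert into hadoop_catalog.iceberg_db.my_user_v2 values (1, 'zhang_san_update', 'china');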
Flink SQL> create table hadoop_catalog.iceberg_db.my_user_copy
> like hadoop_catalog.iceberg_db.my_user;
[INFO] Execute statement succeed.
Flink SQL>
復(fù)制的表?yè)碛邢嗤谋斫Y(jié)構(gòu)、分區(qū)、表屬性
4.3 修改表
修改表屬性
Flink SQL> alter table hadoop_catalog.iceberg_db.my_user_copy
> set(
> 'write.format.default'='avro',
> 'write.avro.compression-codec'='gzip'
> );
[INFO] Execute statement succeed.
Flink SQL>
Currently Flink only supports altering Iceberg table properties.
Renaming a table
Flink SQL> alter table hadoop_catalog.iceberg_db.my_user_copy
> rename to hadoop_catalog.iceberg_db.my_user_copy_new;
[ERROR] Could not execute SQL statement. Reason:
java.lang.UnsupportedOperationException: Cannot rename Hadoop tables
Flink SQL>
Tables in a Hadoop catalog cannot be renamed (tables in a Hive catalog can).
4.4 Dropping a Table
Flink SQL> drop table hadoop_catalog.iceberg_db.my_user_copy;
[INFO] Execute statement succeed.
Flink SQL>
This removes the my_user_copy subdirectory from HDFS.
5. Inserting Data into a Table
5.1 insert into
Two forms are supported:
- insert into … values …
- insert into … select …
Flink SQL> insert into hadoop_catalog.iceberg_db.my_user(
> user_id, user_name, birthday, country
> ) values(1, 'zhang_san', date '2022-02-01', 'china'),
> (2, 'li_si', date '2022-02-02', 'japan');
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: f1aa8bee0be5bda8b166cc361e113268
Flink SQL>
Flink SQL> insert into hadoop_catalog.iceberg_db.my_user select (user_id + 1), user_name, birthday, country from hadoop_catalog.iceberg_db.my_user;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: c408e324ca3861b39176c6bd15770aca
Flink SQL>
The resulting HDFS directory layout:
hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/data/birthday=2022-02-01/country=china/00000-0-4ef3835f-b18b-4c48-b47a-85af1771a10a-00001.parquet
hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/data/birthday=2022-02-01/country=china/00000-0-6e66c02b-cb09-4fd0-b669-15aa7f5194e4-00001.parquet
hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/data/birthday=2022-02-02/country=japan/00000-0-4ef3835f-b18b-4c48-b47a-85af1771a10a-00002.parquet
hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/data/birthday=2022-02-02/country=japan/00000-0-6e66c02b-cb09-4fd0-b669-15aa7f5194e4-00002.parquet
5.2 insert overwrite (Batch mode only; overwrite granularity is the partition)
Only Flink batch mode supports it; streaming mode does not.
insert overwrite replaces entire partitions, not individual rows. For a non-partitioned table it replaces the whole table, as shown below:
Flink SQL> set 'execution.runtime-mode' = 'batch';
[INFO] Session property has been set.
Flink SQL>
Flink SQL> insert overwrite hadoop_catalog.iceberg_db.my_user values (4, 'wang_wu', date '2022-02-02', 'japan');
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 63cf6c27060ec9ebdce75b785cc3fa3a
Flink SQL> set 'sql-client.execution.result-mode' = 'tableau';
[INFO] Session property has been set.
Flink SQL> select * from hadoop_catalog.iceberg_db.my_user;
+---------+-----------+------------+---------+
| user_id | user_name | birthday | country |
+---------+-----------+------------+---------+
| 1 | zhang_san | 2022-02-01 | china |
| 4 | wang_wu | 2022-02-02 | japan |
| 2 | zhang_san | 2022-02-01 | china |
+---------+-----------+------------+---------+
3 rows in set
The files under partition birthday=2022-02-02/country=japan are shown below; insert overwrite also just adds a new file:
birthday=2022-02-02/country=japan/00000-0-1d0ff907-60a7-4062-93a3-9b443626e383-00001.parquet
birthday=2022-02-02/country=japan/00000-0-4ef3835f-b18b-4c48-b47a-85af1771a10a-00002.parquet
birthday=2022-02-02/country=japan/00000-0-6e66c02b-cb09-4fd0-b669-15aa7f5194e
insert overwrite … partition replaces the specified partition:
Flink SQL> insert overwrite hadoop_catalog.iceberg_db.my_user partition (birthday = '2022-02-02', country = 'japan') select 5, 'zhao_liu';
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 97e9ba4131028c53461e739b34108ae0
Flink SQL> select * from hadoop_catalog.iceberg_db.my_user;
+---------+-----------+------------+---------+
| user_id | user_name | birthday | country |
+---------+-----------+------------+---------+
| 1 | zhang_san | 2022-02-01 | china |
| 5 | zhao_liu | 2022-02-02 | japan |
| 2 | zhang_san | 2022-02-01 | china |
+---------+-----------+------------+---------+
3 rows in set
Flink SQL>
6. Querying Data
Batch mode
Flink SQL> select * from hadoop_catalog.iceberg_db.my_user;
+---------+-----------+------------+---------+
| user_id | user_name | birthday | country |
+---------+-----------+------------+---------+
| 1 | zhang_san | 2022-02-01 | china |
| 5 | zhao_liu | 2022-02-02 | japan |
| 2 | zhang_san | 2022-02-01 | china |
+---------+-----------+------------+---------+
3 rows in set
Flink SQL>
Streaming mode
First find the current snapshot id: check the latest metadata version recorded in version-hint.text:
[root@flink1 conf]# hadoop fs -cat hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/version-hint.text
5
We created the table, ran two inserts, and ran two insert overwrites, so the latest version is 5. Now look at the metadata JSON file for that version:
[root@flink1 ~]# hadoop fs -cat hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/v5.metadata.json
{
"format-version" : 1,
"table-uuid" : "84a5e90d-7ae9-4dfd-aeab-c74f07447513",
"location" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user",
"last-updated-ms" : 1644761481488,
"last-column-id" : 4,
"schema" : {
"type" : "struct",
"schema-id" : 0,
"fields" : [ {
"id" : 1,
"name" : "user_id",
"required" : false,
"type" : "long"
}, {
"id" : 2,
"name" : "user_name",
"required" : false,
"type" : "string"
}, {
"id" : 3,
"name" : "birthday",
"required" : false,
"type" : "date"
}, {
"id" : 4,
"name" : "country",
"required" : false,
"type" : "string"
} ]
},
"current-schema-id" : 0,
"schemas" : [ {
"type" : "struct",
"schema-id" : 0,
"fields" : [ {
"id" : 1,
"name" : "user_id",
"required" : false,
"type" : "long"
}, {
"id" : 2,
"name" : "user_name",
"required" : false,
"type" : "string"
}, {
"id" : 3,
"name" : "birthday",
"required" : false,
"type" : "date"
}, {
"id" : 4,
"name" : "country",
"required" : false,
"type" : "string"
} ]
} ],
"partition-spec" : [ {
"name" : "birthday",
"transform" : "identity",
"source-id" : 3,
"field-id" : 1000
}, {
"name" : "country",
"transform" : "identity",
"source-id" : 4,
"field-id" : 1001
} ],
"default-spec-id" : 0,
"partition-specs" : [ {
"spec-id" : 0,
"fields" : [ {
"name" : "birthday",
"transform" : "identity",
"source-id" : 3,
"field-id" : 1000
}, {
"name" : "country",
"transform" : "identity",
"source-id" : 4,
"field-id" : 1001
} ]
} ],
"last-partition-id" : 1001,
"default-sort-order-id" : 0,
"sort-orders" : [ {
"order-id" : 0,
"fields" : [ ]
} ],
"properties" : {
"write.format.default" : "parquet",
"write.parquet.compression-codec" : "gzip"
},
"current-snapshot-id" : 138573494821828246,
"snapshots" : [ {
"snapshot-id" : 8012517928892530314,
"timestamp-ms" : 1644761130111,
"summary" : {
"operation" : "append",
"flink.job-id" : "8f228ae49d34aafb4b2887db3149e3f6",
"flink.max-committed-checkpoint-id" : "9223372036854775807",
"added-data-files" : "2",
"added-records" : "2",
"added-files-size" : "2487",
"changed-partition-count" : "2",
"total-records" : "2",
"total-files-size" : "2487",
"total-data-files" : "2",
"total-delete-files" : "0",
"total-position-deletes" : "0",
"total-equality-deletes" : "0"
},
"manifest-list" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/snap-8012517928892530314-1-5c33451b-48ab-4ce5-be7a-2c2d2dc9e11d.avro",
"schema-id" : 0
}, {
"snapshot-id" : 453371561664052237,
"parent-snapshot-id" : 8012517928892530314,
"timestamp-ms" : 1644761150082,
"summary" : {
"operation" : "append",
"flink.job-id" : "813b7a17c21ddd003e1a210b1366e0c5",
"flink.max-committed-checkpoint-id" : "9223372036854775807",
"added-data-files" : "2",
"added-records" : "2",
"added-files-size" : "2487",
"changed-partition-count" : "2",
"total-records" : "4",
"total-files-size" : "4974",
"total-data-files" : "4",
"total-delete-files" : "0",
"total-position-deletes" : "0",
"total-equality-deletes" : "0"
},
"manifest-list" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/snap-453371561664052237-1-bc0e56ec-9f78-4956-8412-4d8ca70ccc19.avro",
"schema-id" : 0
}, {
"snapshot-id" : 6410282459040239217,
"parent-snapshot-id" : 453371561664052237,
"timestamp-ms" : 1644761403566,
"summary" : {
"operation" : "overwrite",
"replace-partitions" : "true",
"flink.job-id" : "f7085f68e5ff73c1c8aa1f4f59996068",
"flink.max-committed-checkpoint-id" : "9223372036854775807",
"added-data-files" : "1",
"deleted-data-files" : "2",
"added-records" : "1",
"deleted-records" : "2",
"added-files-size" : "1244",
"removed-files-size" : "2459",
"changed-partition-count" : "1",
"total-records" : "3",
"total-files-size" : "3759",
"total-data-files" : "3",
"total-delete-files" : "0",
"total-position-deletes" : "0",
"total-equality-deletes" : "0"
},
"manifest-list" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/snap-6410282459040239217-1-2b20c57e-5428-4483-9f7b-928b980dd50d.avro",
"schema-id" : 0
}, {
"snapshot-id" : 138573494821828246,
"parent-snapshot-id" : 6410282459040239217,
"timestamp-ms" : 1644761481488,
"summary" : {
"operation" : "overwrite",
"replace-partitions" : "true",
"flink.job-id" : "d434d6d4f658d61732d7e9a0a85279fc",
"flink.max-committed-checkpoint-id" : "9223372036854775807",
"added-data-files" : "1",
"deleted-data-files" : "1",
"added-records" : "1",
"deleted-records" : "1",
"added-files-size" : "1251",
"removed-files-size" : "1244",
"changed-partition-count" : "1",
"total-records" : "3",
"total-files-size" : "3766",
"total-data-files" : "3",
"total-delete-files" : "0",
"total-position-deletes" : "0",
"total-equality-deletes" : "0"
},
"manifest-list" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/snap-138573494821828246-1-b243b39e-7122-4571-b6fa-c902241e36a8.avro",
"schema-id" : 0
} ],
"snapshot-log" : [ {
"timestamp-ms" : 1644761130111,
"snapshot-id" : 8012517928892530314
}, {
"timestamp-ms" : 1644761150082,
"snapshot-id" : 453371561664052237
}, {
"timestamp-ms" : 1644761403566,
"snapshot-id" : 6410282459040239217
}, {
"timestamp-ms" : 1644761481488,
"snapshot-id" : 138573494821828246
} ],
"metadata-log" : [ {
"timestamp-ms" : 1644760911017,
"metadata-file" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/v1.metadata.json"
}, {
"timestamp-ms" : 1644761130111,
"metadata-file" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/v2.metadata.json"
}, {
"timestamp-ms" : 1644761150082,
"metadata-file" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/v3.metadata.json"
}, {
"timestamp-ms" : 1644761403566,
"metadata-file" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/v4.metadata.json"
} ]
}[root@flink1 ~]#
可以看到 "current-snapshot-id" : 138573494821828246,,表示當(dāng)前的snapshot-id
Flink SQL> set 'execution.runtime-mode' = 'streaming';
[INFO] Session property has been set.
Flink SQL>
Flink SQL> select * from hadoop_catalog.iceberg_db.my_user
> /*+ options(
> 'streaming'='true',
> 'monitor-interval'='5s'
> )*/ ;
+----+----------------------+--------------------------------+------------+--------------------------------+
| op | user_id | user_name | birthday | country |
+----+----------------------+--------------------------------+------------+--------------------------------+
| +I | 5 | zhao_liu | 2022-02-02 | japan |
| +I | 2 | zhang_san | 2022-02-01 | china |
| +I | 1 | zhang_san | 2022-02-01 | china |
The output shows the data of the latest snapshot.
Flink SQL> select * from hadoop_catalog.iceberg_db.my_user
> /*+ options(
> 'streaming'='true',
> 'monitor-interval'='5s',
> 'start-snapshot-id'='138573494821828246'
> )*/ ;
+----+----------------------+--------------------------------+------------+--------------------------------+
| op | user_id | user_name | birthday | country |
+----+----------------------+--------------------------------+------------+--------------------------------+
Here start-snapshot-id may only be set to the snapshot id of the last insert overwrite operation, or a later one; otherwise the job throws the following exception in the background and stays in a restarting loop:
java.lang.UnsupportedOperationException: Found overwrite operation, cannot support incremental data in snapshots (8012517928892530314, 138573494821828246]
In this example, snapshot id 138573494821828246 is both the latest snapshot id and the snapshot id of the last insert overwrite. If we now insert two more rows, only the incremental data shows up:
Flink SQL> insert into hadoop_catalog.iceberg_db.my_user(
> user_id, user_name, birthday, country
> ) values(6, 'zhang_san', date '2022-02-01', 'china');
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 8eb279e61aed66304d78ad027eaf8d30
Flink SQL> insert into hadoop_catalog.iceberg_db.my_user(
> user_id, user_name, birthday, country
> ) values(7, 'zhang_san', date '2022-02-01', 'china');
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 70a050e455d188d0d3f3adc2ba367fb6
Flink SQL> select * from hadoop_catalog.iceberg_db.my_user
> /*+ options(
> 'streaming'='true',
> 'monitor-interval'='30s',
> 'start-snapshot-id'='138573494821828246'
> )*/ ;
+----+----------------------+--------------------------------+------------+--------------------------------+
| op | user_id | user_name | birthday | country |
+----+----------------------+--------------------------------+------------+--------------------------------+
| +I | 6 | zhang_san | 2022-02-01 | china |
| +I | 7 | zhang_san | 2022-02-01 | china |
- Streaming mode supports reading incremental snapshot data.
- Without start-snapshot-id, the query first reads the full data of the current snapshot and then continues with increments. With start-snapshot-id, it reads only the increments after that snapshot, i.e. the data of that snapshot itself is not included.
- monitor-interval: how often to check for newly committed data files; default 1s.


