<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Flink+Iceberg搭建實(shí)時(shí)數(shù)據(jù)湖實(shí)戰(zhàn)

          共 38043字,需瀏覽 77分鐘

           ·

          2022-07-09 10:47

          點(diǎn)擊上方藍(lán)色字體,選擇“設(shè)為星標(biāo)”
          回復(fù)"面試"獲取更多驚喜

          全網(wǎng)最全大數(shù)據(jù)面試提升手冊(cè)!

          第一部分:Iceberg 核心功能原理剖析 :

          Apache Iceberg

          摘自官網(wǎng):

          Apache Iceberg is an open table format for huge analytic datasets.

          可以看到 Founders 對(duì) Iceberg 的定位是面向海量數(shù)據(jù)分析場(chǎng)景的高效存儲(chǔ)格式。海量數(shù)據(jù)分析的場(chǎng)景,類(lèi)比于 Hive 是 Hdfs 的封裝一樣,本質(zhì)上解決的還是數(shù)倉(cāng)場(chǎng)景的痛點(diǎn)問(wèn)題。

          Iceberg 在最開(kāi)始,也確實(shí)是在數(shù)倉(cāng)場(chǎng)景朝著更快更好用的 format 目標(biāo)不斷演進(jìn),比如支持 schema 變更,文件粒度的 Filter 優(yōu)化等,但隨著和流式計(jì)算 Flink 引擎的生態(tài)打通,Delete/Update/Merge 語(yǔ)義的出現(xiàn),場(chǎng)景就會(huì)變得多樣化起來(lái)。

          背景

          過(guò)去業(yè)界更多是使用 Hive/Spark on HDFS 作為離線數(shù)據(jù)倉(cāng)庫(kù)的載體,在越來(lái)越趨于實(shí)時(shí)化和快速迭代的場(chǎng)景中,逐漸暴露出以下缺點(diǎn):

          • 不支持 Row-Level-Update,對(duì)于更新的操作需要 overwrite 整張 Hive 表,成本極高
          • 不支持讀寫(xiě)分離,用戶的讀取操作會(huì)被另一個(gè)用戶的寫(xiě)入操作所影響(尤其是流式讀取的場(chǎng)景)
          • 不支持版本回滾和快照,需要保存大量歷史數(shù)據(jù)
          • 不支持增量讀取,每次掃描全表或分區(qū)所有數(shù)據(jù)
          • 性能低,只能裁剪到 Hive Partition 粒度
          • 不支持 Schema 變更
          • .....
          基本概念

          如上圖所示,iceberg 將 hdfs 上的文件進(jìn)行了 snapshot、manifest list、manifest、data files 的分層。

          1. Snapshot:用戶的每次 commit(每次寫(xiě)入的 spark job) 會(huì)產(chǎn)生一個(gè)新的 snapshot
          2. Manifest List:維護(hù)當(dāng)前 snapshot 中所有的 manifest
          3. Manifest:維護(hù)當(dāng)前 Manifest 下所有的 data files
          4. Data File:存儲(chǔ)數(shù)據(jù)的文件,后續(xù) Iceberg 引入了 Delete File,用于存儲(chǔ)要?jiǎng)h除的數(shù)據(jù),文件結(jié)構(gòu)上也是與 Data File 處在同一層
          核心功能剖析
          Time Travel 和增量讀取

          Time Travel 指的是用戶可以任意讀取歷史時(shí)刻的相關(guān)數(shù)據(jù),以 Spark 的代碼為例:

          // time travel to October 26, 1986 at 01:21:00
          spark.read
              .option("as-of-timestamp""499162860000")
              .format("iceberg")
              .load("path/to/table")

          上述代碼即是在讀取 timestamp=499162860000 時(shí),該 Iceberg 表的數(shù)據(jù),那么底層原理是什么樣子的呢?

          從「基本概念」中的文件結(jié)構(gòu)可以看到,用戶每次新的寫(xiě)入都會(huì)產(chǎn)生一個(gè) snapshot,那么 Iceberg 只需要存儲(chǔ)用戶每次 commit 產(chǎn)生的 metadata,比如時(shí)間戳等信息,就能找到對(duì)應(yīng)時(shí)刻的 snapshot,并且解析出 Data Files。

          增量讀取也同理,通過(guò) start 和 end 的時(shí)間戳取到時(shí)間范圍內(nèi)的 snapshot,并讀取所有的 Data Files 作為原始數(shù)據(jù)。

          Fast Scan & Data Filtering

          上面提到 Hive 的查詢性能低下,其中一個(gè)原因是數(shù)據(jù)計(jì)算時(shí),只能下推到 Partition 層面,粒度太粗。而 Iceberg 在細(xì)粒度的 Plan 上做了一系列的優(yōu)化,當(dāng)一個(gè) Query 進(jìn)入 Iceberg 后:

          1. 根據(jù) timestamp 找到對(duì)應(yīng)的 snapshot(默認(rèn)最新)
          2. 根據(jù) Query 的 Partition 信息從指定 snapshot 中過(guò)濾出符合條件的 manifest 文件集合
          3. 從 manifest 文件集合中取出所有的 Data Files 對(duì)象(只包含元信息)
          4. 根據(jù) Data File 的若干個(gè)屬性,進(jìn)行更細(xì)粒度的數(shù)據(jù)過(guò)濾,包括 column-level value counts, null counts, lower bounds, and upper bounds 等
          Delete 實(shí)現(xiàn)

          為了上線 Row-Level Update 的功能,Iceberg 提供了 Delete 的實(shí)現(xiàn),通過(guò) Delete + Insert 我們可以達(dá)到 Update 的目的。在引入 Delete 實(shí)現(xiàn)時(shí),引入了兩個(gè)概念:

          • Delete File:用于存儲(chǔ)刪除的數(shù)據(jù)(分為 position delete 和 equality delete)
          • Sequence Number:是 Data File 和 Delete File 的共有屬性之一,主要用于區(qū)分 Insert 和 Delete 的先后順序,否則會(huì)出現(xiàn)數(shù)據(jù)一致性的問(wèn)題
          position & equality delete

          Iceberg 引入了 equality_ids 概念,用戶建表時(shí)可以指定 Table 的 equality_ids 來(lái)標(biāo)識(shí)未來(lái) Delete 操作對(duì)應(yīng)的 Key,比如 GDPR 場(chǎng)景,我們需要根據(jù) user_id 來(lái)隨機(jī)刪除用戶的相關(guān)數(shù)據(jù),就可以把 equality_ids 設(shè)置為 user_id。

          兩種 Delete 操作對(duì)應(yīng)不同的 Delete File,其存儲(chǔ)字段也不同:

          • position delete:包括三列,file_path(要?jiǎng)h除的數(shù)據(jù)所在的 Data File)、pos(行數(shù))、row(數(shù)據(jù))
          • equality delete:包括 equality_ids 中的字段

          顯而易見(jiàn),存儲(chǔ) Delete File 的目的是將來(lái)讀取數(shù)據(jù)時(shí),進(jìn)行實(shí)時(shí)的 Join,而 position delete 在 Join 時(shí)能精準(zhǔn)定位到文件,并且只需要行號(hào)的比較,肯定是更加高效的。所以在 Delete 操作寫(xiě)入時(shí),Iceberg 會(huì)將正在寫(xiě)入的數(shù)據(jù)文件信息存儲(chǔ)到內(nèi)存中,來(lái)保證將 DELETE 操作盡量走 position delete 的鏈路。示意圖如下所示:

          按照時(shí)間順序,依次寫(xiě)入三條 INSERT 和 DELETE 數(shù)據(jù),假設(shè) Iceberg Writer 在寫(xiě)入 a1 和 b1 的 INSERT 數(shù)據(jù)后,就關(guān)閉并新開(kāi)啟了一個(gè)文件,那么此時(shí)寫(xiě)入的記錄 c1 和對(duì)應(yīng)的行號(hào)會(huì)被記錄在內(nèi)存中。此時(shí) Writer 接收到 user_id=c1 的數(shù)據(jù)后,便能直接從內(nèi)存中找到 user_id=c1 的數(shù)據(jù)是在 fileA 中的第一行,此時(shí)寫(xiě)下一個(gè) Position Delete File;而 user_id=a1 的 DELETE 數(shù)據(jù),由于文件已經(jīng)關(guān)閉,內(nèi)存中沒(méi)有記錄其信息,所以寫(xiě)下一個(gè) Equality Delete File。

          Sequence Number

          引入 DELETE 操作后,如果在讀取時(shí)進(jìn)行合并,則涉及到一個(gè)問(wèn)題,如果用戶對(duì)同一個(gè) equality_id 的數(shù)據(jù)進(jìn)行插入、刪除、再插入,那么讀取時(shí)該如何保證把第一次插入的數(shù)據(jù)給刪掉,讀取第二次插入的數(shù)據(jù)?

          這里的處理方式是將 Data File 和 Delete File 放在一起按寫(xiě)入順序編號(hào),在讀取時(shí),DELETE 只對(duì)小于當(dāng)前 Sequence Number 的 Data File 生效。如果遇到相同記錄的并發(fā)寫(xiě)入的時(shí)候怎么辦?這里就要利用 Iceberg 自身的事務(wù)機(jī)制了,Iceberg Writer 在寫(xiě)入前會(huì)檢查相關(guān) meta 以及 Sequence Number,如果寫(xiě)入后不符合預(yù)期則會(huì)采取樂(lè)觀鎖的形式進(jìn)行重試。

          Schema Evolution

          Iceberg 的 schema evolution 是其特色之一,支持以下操作:

          • 增加字段
          • 刪除字段
          • 重命名字段
          • 修改字段
          • 改變字段順序

          關(guān)于 schema 的變更也依賴(lài)上面文件結(jié)構(gòu),由于每次寫(xiě)入時(shí),都會(huì)產(chǎn)生 snapshot -> manifest -> data file 的層級(jí),同樣,讀取時(shí)也會(huì)從 snapshot 開(kāi)始讀取并路由到對(duì)應(yīng)的底層 data file。所以 Iceberg 只需要每次寫(xiě)入時(shí)在 manifest 中記錄下 schema 的情況,并在讀取時(shí)進(jìn)行對(duì)應(yīng)的轉(zhuǎn)換即可。

          第二部分:Flink+Iceberg環(huán)境搭建:

          1. Flink SQL Client配置Iceberg

          Flink集群需要使用Scala 2.12版本的

          1. 將Iceberg的依賴(lài)包下載放到Flink集群所有服務(wù)器的lib目錄下,然后重啟Flink
          [root@flink1 ~]# wget -P /root/flink-1.14.3/lib https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.14/0.13.0/iceberg-flink-runtime-1.14-0.13.0.jar
          [root@flink1 ~]#
          [root@flink1 ~]# scp /root/flink-1.14.3/lib/iceberg-flink-runtime-1.14-0.13.0.jar root@flink2:/root/flink-1.14.3/lib
          iceberg-flink-runtime-1.14-0.13.0.jar                                                                                                    100%   23MB  42.9MB/s   00:00    
          [root@flink1 ~]# scp /root/flink-1.14.3/lib/iceberg-flink-runtime-1.14-0.13.0.jar root@flink3:/root/flink-1.14.3/lib
          iceberg-flink-runtime-1.14-0.13.0.jar                                                                                                    100%   23MB  35.4MB/s   00:00    
          [root@flink1 ~]

          Iceberg默認(rèn)支持Hadoop Catalog。如果需要使用Hive Catalog,需要將flink-sql-connector-hive-3.1.2_2.12-1.14.3.jar放到Flink集群所有服務(wù)器的lib目錄下,然后重啟Flink

          1. 然后啟動(dòng)SQL Client就可以了
          2.Java/Scala pom.xml配置

          添加如下依賴(lài)

          <dependency>
                      <groupId>org.apache.iceberg</groupId>
                      <artifactId>iceberg-flink</artifactId>
                      <version>0.13.0</version>
                      <scope>provided</scope>
                  </dependency>
          3.Catalog
          3.1 Hive Catalog

          注意:測(cè)試的時(shí)候,從Hive中查詢表數(shù)據(jù),查詢不到。但是從Trino查詢可以查詢到數(shù)據(jù)

          使用Hive的metastore保存元數(shù)據(jù),HDFS保存數(shù)據(jù)庫(kù)表的數(shù)據(jù)

          Flink SQL> create catalog hive_catalog with(
          'type'='iceberg',
          'catalog-type'='hive',
          'property-version'='1',
          'cache-enabled'='true',
          'uri'='thrift://hive1:9083',
          'client'='5',
          'warehouse'='hdfs://nnha/user/hive/warehouse',
          'hive-conf-dir'='/root/flink-1.14.3/hive_conf'
          > );
          [INFO] Execute statement succeed.

          Flink SQL>
          • property-version: 為了向后兼容,以防property格式改變。當(dāng)前設(shè)置為1即可
          • cache-enabled: 是否開(kāi)啟catalog緩存,默認(rèn)開(kāi)啟
          • clients: 在hive metastore中,hive_catalog供客戶端訪問(wèn)的連接池大小,默認(rèn)是2
          • warehouse: 是Flink集群所在的HDFS路徑, hive_catalog下的數(shù)據(jù)庫(kù)表存放數(shù)據(jù)的位置
          • hive-conf-dir: hive集群的配置目錄。只能是Flink集群的本地路徑,從hive-site.xml解析出來(lái)的HDFS路徑,是Flink集群所在HDFS路徑
          • warehouse的優(yōu)先級(jí)比hive-conf-dir的優(yōu)先級(jí)高
          • 如果Hive中已經(jīng)存在要?jiǎng)?chuàng)建的數(shù)據(jù)庫(kù),則創(chuàng)建的表path會(huì)位于Hive的warehouse下
          3.2 HDFS Catalog

          用HDFS保存元數(shù)據(jù)和數(shù)據(jù)庫(kù)表的數(shù)據(jù)。warehouse是Flink集群所在的HDFS路徑

          Flink SQL> create catalog hadoop_catalog with (
          'type'='iceberg',
          'catalog-type'='hadoop',
          'property-version'='1',
          'cache-enabled'='true',
          'warehouse'='hdfs://nnha/user/iceberg/warehouse'
          > );
          [INFO] Execute statement succeed.

          Flink SQL>

          通過(guò)配置conf/sql-cli-defaults.yaml實(shí)現(xiàn)永久catalog。但測(cè)試的時(shí)候并未生效

          [root@flink1 ~]# cat /root/flink-1.14.3/conf/sql-cli-defaults.yaml 
          catalogs:
            - name: hadoop_catalog
              type: iceberg
              catalog-type: hadoop
              property-version: 1
              cache-enabled: true
              warehouse: hdfs://nnha/user/iceberg/warehouse

          [root@flink1 ~]#
          [root@flink1 ~]# chown 501:games /root/flink-1.14.3/conf/sql-cli-defaults.yaml

          下面我們重點(diǎn)以Hadoop Catalog為例,進(jìn)行測(cè)試講解

          4.數(shù)據(jù)庫(kù)和表相關(guān)DDL命令
          4.1 創(chuàng)建數(shù)據(jù)庫(kù)

          Catalog下面默認(rèn)都有一個(gè)default數(shù)據(jù)庫(kù)

          Flink SQL> create database hadoop_catalog.iceberg_db;
          [INFO] Execute statement succeed.

          Flink SQL> use hadoop_catalog.iceberg_db;
          [INFO] Execute statement succeed.

          Flink SQL>
          • 會(huì)在HDFS目錄上創(chuàng)建iceberg_db子目錄
          • 如果刪除數(shù)據(jù)庫(kù),會(huì)刪除HDFS上的iceberg_db子目錄
          4.2 創(chuàng)建表(不支持primary key等)
          Flink SQL> create table hadoop_catalog.iceberg_db.my_user (
          > user_id bigint comment '用戶ID',
          > user_name string,
          > birthday date,
          > country string
          > ) comment '用戶表' 
          > partitioned by (birthday, country) with (
          'write.format.default'='parquet',
          'write.parquet.compression-codec'='gzip'
          > );
          [INFO] Execute statement succeed.

          Flink SQL>
          • 目前表不支持計(jì)算列、primay key, Watermark
          • 不支持計(jì)算分區(qū)。但是iceberg支持計(jì)算分區(qū)
          • 因?yàn)镮ceberg支持primary key。設(shè)置屬性'format-version' = '2'和'write.upsert.enabled' = 'true',同時(shí)表添加primary key,也是可以支持upsert的??梢詫?shí)現(xiàn)insert、update、delete的功能
          • 創(chuàng)建表生成的文件信息如下:
          [root@flink1 ~]
          [root@flink1 ~]# hadoop fs -ls hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata
          Found 2 items
          -rw-r--r--   1 root supergroup       2115 2022-02-13 22:01 hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/v1.metadata.json
          -rw-r--r--   1 root supergroup          1 2022-02-13 22:01 hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/version-hint.text
          [root@flink1 ~]#

          查看v1.metadata.json,可以看到"current-snapshot-id" : -1

          Flink SQL> create table hadoop_catalog.iceberg_db.my_user_copy 
          > like hadoop_catalog.iceberg_db.my_user;
          [INFO] Execute statement succeed.

          Flink SQL> 
          • 復(fù)制的表?yè)碛邢嗤谋斫Y(jié)構(gòu)、分區(qū)、表屬性
          4.3 修改表

          修改表屬性

          Flink SQL> alter table hadoop_catalog.iceberg_db.my_user_copy 
          set(
          'write.format.default'='avro',
          'write.avro.compression-codec'='gzip'
          > );
          [INFO] Execute statement succeed.

          Flink SQL>
          • 目前Flink只支持修改iceberg的表屬性

          重命名表

          Flink SQL> alter table hadoop_catalog.iceberg_db.my_user_copy 
          > rename to hadoop_catalog.iceberg_db.my_user_copy_new;
          [ERROR] Could not execute SQL statement. Reason:
          java.lang.UnsupportedOperationException: Cannot rename Hadoop tables

          Flink SQL>
          • Hadoop Catalog中的表不支持重命名表
          4.4 刪除表
          Flink SQL> drop table hadoop_catalog.iceberg_db.my_user_copy;
          [INFO] Execute statement succeed.

          Flink SQL>

          會(huì)刪除HDFS上的my_user_copy子目錄

          5.插入數(shù)據(jù)到表
          5.1 insert into
          1. insert into … values …
          2. insert into … select …
          Flink SQL> insert into hadoop_catalog.iceberg_db.my_user(
          > user_id, user_name, birthday, country
          > ) values(1, 'zhang_san', date '2022-02-01''china'), 
          > (2, 'li_si', date '2022-02-02''japan');
          [INFO] Submitting SQL update statement to the cluster...
          [INFO] SQL update statement has been successfully submitted to the cluster:
          Job ID: f1aa8bee0be5bda8b166cc361e113268


          Flink SQL>
          Flink SQL> insert into hadoop_catalog.iceberg_db.my_user select (user_id + 1), user_name, birthday, country from hadoop_catalog.iceberg_db.my_user;
          [INFO] Submitting SQL update statement to the cluster...
          [INFO] SQL update statement has been successfully submitted to the cluster:
          Job ID: c408e324ca3861b39176c6bd15770aca


          Flink SQL>

          HDFS目錄結(jié)果如下

          hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/data/birthday=2022-02-01/country=china/00000-0-4ef3835f-b18b-4c48-b47a-85af1771a10a-00001.parquet
          hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/data/birthday=2022-02-01/country=china/00000-0-6e66c02b-cb09-4fd0-b669-15aa7f5194e4-00001.parquet
          hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/data/birthday=2022-02-02/country=japan/00000-0-4ef3835f-b18b-4c48-b47a-85af1771a10a-00002.parquet
          hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/data/birthday=2022-02-02/country=japan/00000-0-6e66c02b-cb09-4fd0-b669-15aa7f5194e4-00002.parquet
          5.2 insert overwrite(只有Batch模式支持,且overwrite粒度為partition)

          只支持Flink Batch模式,不支持Streaming模式

          insert overwrite替換多個(gè)整個(gè)分區(qū),而不是一行數(shù)據(jù)。如果不是分區(qū)表,則替換的是整個(gè)表,如下所示:

          Flink SQL> set 'execution.runtime-mode' = 'batch';
          [INFO] Session property has been set.

          Flink SQL>
          Flink SQL> insert overwrite hadoop_catalog.iceberg_db.my_user values (4, 'wang_wu', date '2022-02-02''japan');
          [INFO] Submitting SQL update statement to the cluster...
          [INFO] SQL update statement has been successfully submitted to the cluster:
          Job ID: 63cf6c27060ec9ebdce75b785cc3fa3a

          Flink SQL> set 'sql-client.execution.result-mode' = 'tableau';
          [INFO] Session property has been set.

          Flink SQL> select * from hadoop_catalog.iceberg_db.my_user;
          +---------+-----------+------------+---------+
          | user_id | user_name |   birthday | country |
          +---------+-----------+------------+---------+
          |       1 | zhang_san | 2022-02-01 |   china |
          |       4 |   wang_wu | 2022-02-02 |   japan |
          |       2 | zhang_san | 2022-02-01 |   china |
          +---------+-----------+------------+---------+
          3 rows in set

          birthday=2022-02-02/country=japan分區(qū)下的數(shù)據(jù)如下,insert overwrite也是新增一個(gè)文件

          birthday=2022-02-02/country=japan/00000-0-1d0ff907-60a7-4062-93a3-9b443626e383-00001.parquet
          birthday=2022-02-02/country=japan/00000-0-4ef3835f-b18b-4c48-b47a-85af1771a10a-00002.parquet
          birthday=2022-02-02/country=japan/00000-0-6e66c02b-cb09-4fd0-b669-15aa7f5194e

          insert ovewrite … partition替換指定分區(qū)

          Flink SQL> insert overwrite hadoop_catalog.iceberg_db.my_user partition (birthday = '2022-02-02', country = 'japan') select 5, 'zhao_liu';
          [INFO] Submitting SQL update statement to the cluster...
          [INFO] SQL update statement has been successfully submitted to the cluster:
          Job ID: 97e9ba4131028c53461e739b34108ae0


          Flink SQL> select * from hadoop_catalog.iceberg_db.my_user;
          +---------+-----------+------------+---------+
          | user_id | user_name |   birthday | country |
          +---------+-----------+------------+---------+
          |       1 | zhang_san | 2022-02-01 |   china |
          |       5 |  zhao_liu | 2022-02-02 |   japan |
          |       2 | zhang_san | 2022-02-01 |   china |
          +---------+-----------+------------+---------+
          3 rows in set

          Flink SQL>
          6.查詢數(shù)據(jù)

          Batch模式

          Flink SQL> select * from hadoop_catalog.iceberg_db.my_user;
          +---------+-----------+------------+---------+
          | user_id | user_name |   birthday | country |
          +---------+-----------+------------+---------+
          |       1 | zhang_san | 2022-02-01 |   china |
          |       5 |  zhao_liu | 2022-02-02 |   japan |
          |       2 | zhang_san | 2022-02-01 |   china |
          +---------+-----------+------------+---------+
          3 rows in set

          Flink SQL>

          streaming模式

          查看最新的snapshot-id

          [root@flink1 conf]# hadoop fs -cat hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/version-hint.text
          5

          我們前面創(chuàng)建表 + 兩次insert + 兩次insert overwrite,所以最新的版本號(hào)為5。然后我們查看該版本號(hào)對(duì)于的metadata json文件

          [root@flink1 ~]# hadoop fs -cat hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/v5.metadata.json
          {
            "format-version" : 1,
            "table-uuid" : "84a5e90d-7ae9-4dfd-aeab-c74f07447513",
            "location" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user",
            "last-updated-ms" : 1644761481488,
            "last-column-id" : 4,
            "schema" : {
              "type" : "struct",
              "schema-id" : 0,
              "fields" : [ {
                "id" : 1,
                "name" : "user_id",
                "required" : false,
                "type" : "long"
              }, {
                "id" : 2,
                "name" : "user_name",
                "required" : false,
                "type" : "string"
              }, {
                "id" : 3,
                "name" : "birthday",
                "required" : false,
                "type" : "date"
              }, {
                "id" : 4,
                "name" : "country",
                "required" : false,
                "type" : "string"
              } ]
            },
            "current-schema-id" : 0,
            "schemas" : [ {
              "type" : "struct",
              "schema-id" : 0,
              "fields" : [ {
                "id" : 1,
                "name" : "user_id",
                "required" : false,
                "type" : "long"
              }, {
                "id" : 2,
                "name" : "user_name",
                "required" : false,
                "type" : "string"
              }, {
                "id" : 3,
                "name" : "birthday",
                "required" : false,
                "type" : "date"
              }, {
                "id" : 4,
                "name" : "country",
                "required" : false,
                "type" : "string"
              } ]
            } ],
            "partition-spec" : [ {
              "name" : "birthday",
              "transform" : "identity",
              "source-id" : 3,
              "field-id" : 1000
            }, {
              "name" : "country",
              "transform" : "identity",
              "source-id" : 4,
              "field-id" : 1001
            } ],
            "default-spec-id" : 0,
            "partition-specs" : [ {
              "spec-id" : 0,
              "fields" : [ {
                "name" : "birthday",
                "transform" : "identity",
                "source-id" : 3,
                "field-id" : 1000
              }, {
                "name" : "country",
                "transform" : "identity",
                "source-id" : 4,
                "field-id" : 1001
              } ]
            } ],
            "last-partition-id" : 1001,
            "default-sort-order-id" : 0,
            "sort-orders" : [ {
              "order-id" : 0,
              "fields" : [ ]
            } ],
            "properties" : {
              "write.format.default" : "parquet",
              "write.parquet.compression-codec" : "gzip"
            },
            "current-snapshot-id" : 138573494821828246,
            "snapshots" : [ {
              "snapshot-id" : 8012517928892530314,
              "timestamp-ms" : 1644761130111,
              "summary" : {
                "operation" : "append",
                "flink.job-id" : "8f228ae49d34aafb4b2887db3149e3f6",
                "flink.max-committed-checkpoint-id" : "9223372036854775807",
                "added-data-files" : "2",
                "added-records" : "2",
                "added-files-size" : "2487",
                "changed-partition-count" : "2",
                "total-records" : "2",
                "total-files-size" : "2487",
                "total-data-files" : "2",
                "total-delete-files" : "0",
                "total-position-deletes" : "0",
                "total-equality-deletes" : "0"
              },
              "manifest-list" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/snap-8012517928892530314-1-5c33451b-48ab-4ce5-be7a-2c2d2dc9e11d.avro",
              "schema-id" : 0
            }, {
              "snapshot-id" : 453371561664052237,
              "parent-snapshot-id" : 8012517928892530314,
              "timestamp-ms" : 1644761150082,
              "summary" : {
                "operation" : "append",
                "flink.job-id" : "813b7a17c21ddd003e1a210b1366e0c5",
                "flink.max-committed-checkpoint-id" : "9223372036854775807",
                "added-data-files" : "2",
                "added-records" : "2",
                "added-files-size" : "2487",
                "changed-partition-count" : "2",
                "total-records" : "4",
                "total-files-size" : "4974",
                "total-data-files" : "4",
                "total-delete-files" : "0",
                "total-position-deletes" : "0",
                "total-equality-deletes" : "0"
              },
              "manifest-list" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/snap-453371561664052237-1-bc0e56ec-9f78-4956-8412-4d8ca70ccc19.avro",
              "schema-id" : 0
            }, {
              "snapshot-id" : 6410282459040239217,
              "parent-snapshot-id" : 453371561664052237,
              "timestamp-ms" : 1644761403566,
              "summary" : {
                "operation" : "overwrite",
                "replace-partitions" : "true",
                "flink.job-id" : "f7085f68e5ff73c1c8aa1f4f59996068",
                "flink.max-committed-checkpoint-id" : "9223372036854775807",
                "added-data-files" : "1",
                "deleted-data-files" : "2",
                "added-records" : "1",
                "deleted-records" : "2",
                "added-files-size" : "1244",
                "removed-files-size" : "2459",
                "changed-partition-count" : "1",
                "total-records" : "3",
                "total-files-size" : "3759",
                "total-data-files" : "3",
                "total-delete-files" : "0",
                "total-position-deletes" : "0",
                "total-equality-deletes" : "0"
              },
              "manifest-list" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/snap-6410282459040239217-1-2b20c57e-5428-4483-9f7b-928b980dd50d.avro",
              "schema-id" : 0
            }, {
              "snapshot-id" : 138573494821828246,
              "parent-snapshot-id" : 6410282459040239217,
              "timestamp-ms" : 1644761481488,
              "summary" : {
                "operation" : "overwrite",
                "replace-partitions" : "true",
                "flink.job-id" : "d434d6d4f658d61732d7e9a0a85279fc",
                "flink.max-committed-checkpoint-id" : "9223372036854775807",
                "added-data-files" : "1",
                "deleted-data-files" : "1",
                "added-records" : "1",
                "deleted-records" : "1",
                "added-files-size" : "1251",
                "removed-files-size" : "1244",
                "changed-partition-count" : "1",
                "total-records" : "3",
                "total-files-size" : "3766",
                "total-data-files" : "3",
                "total-delete-files" : "0",
                "total-position-deletes" : "0",
                "total-equality-deletes" : "0"
              },
              "manifest-list" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/snap-138573494821828246-1-b243b39e-7122-4571-b6fa-c902241e36a8.avro",
              "schema-id" : 0
            } ],
            "snapshot-log" : [ {
              "timestamp-ms" : 1644761130111,
              "snapshot-id" : 8012517928892530314
            }, {
              "timestamp-ms" : 1644761150082,
              "snapshot-id" : 453371561664052237
            }, {
              "timestamp-ms" : 1644761403566,
              "snapshot-id" : 6410282459040239217
            }, {
              "timestamp-ms" : 1644761481488,
              "snapshot-id" : 138573494821828246
            } ],
            "metadata-log" : [ {
              "timestamp-ms" : 1644760911017,
              "metadata-file" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/v1.metadata.json"
            }, {
              "timestamp-ms" : 1644761130111,
              "metadata-file" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/v2.metadata.json"
            }, {
              "timestamp-ms" : 1644761150082,
              "metadata-file" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/v3.metadata.json"
            }, {
              "timestamp-ms" : 1644761403566,
              "metadata-file" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/v4.metadata.json"
            } ]
          }[root@flink1 ~]#

          可以看到 "current-snapshot-id" : 138573494821828246,,表示當(dāng)前的snapshot-id

          Flink SQL> set 'execution.runtime-mode' = 'streaming';
          [INFO] Session property has been set.

          Flink SQL>
          Flink SQL> select * from hadoop_catalog.iceberg_db.my_user 
          > /*+ options(
          'streaming'='true'
          'monitor-interval'='5s'
          > )*/ ;
          +----+----------------------+--------------------------------+------------+--------------------------------+
          | op |              user_id |                      user_name |   birthday |                        country |
          +----+----------------------+--------------------------------+------------+--------------------------------+
          | +I |                    5 |                       zhao_liu | 2022-02-02 |                          japan |
          | +I |                    2 |                      zhang_san | 2022-02-01 |                          china |
          | +I |                    1 |                      zhang_san | 2022-02-01 |                          china |

          可以看到最新snapshot對(duì)應(yīng)的數(shù)據(jù)

          Flink SQL> select * from hadoop_catalog.iceberg_db.my_user 
          > /*+ options(
          'streaming'='true'
          'monitor-interval'='5s',
          'start-snapshot-id'='138573494821828246'
          > )*/ ;
          +----+----------------------+--------------------------------+------------+--------------------------------+
          | op |              user_id |                      user_name |   birthday |                        country |
          +----+----------------------+--------------------------------+------------+--------------------------------+

          這里只能指定最后一個(gè)insert overwrite操作的snapshot id,及其后面的snapshot id,否則后臺(tái)會(huì)報(bào)異常,且程序一直處于restarting的狀態(tài):

          java.lang.UnsupportedOperationException: Found overwrite operation, cannot support incremental data in snapshots (8012517928892530314, 138573494821828246]

          在本示例中snapshot id: 138573494821828246,是最后一個(gè)snapshot id,同時(shí)也是最后一個(gè)insert overwrite操作的snapshot id。如果再insert兩條數(shù)據(jù),則只能看到增量的數(shù)據(jù)

          Flink SQL> insert into hadoop_catalog.iceberg_db.my_user(
          > user_id, user_name, birthday, country
          > ) values(6, 'zhang_san', date '2022-02-01''china');
          [INFO] Submitting SQL update statement to the cluster...
          [INFO] SQL update statement has been successfully submitted to the cluster:
          Job ID: 8eb279e61aed66304d78ad027eaf8d30


          Flink SQL> insert into hadoop_catalog.iceberg_db.my_user(
          > user_id, user_name, birthday, country
          > ) values(7, 'zhang_san', date '2022-02-01''china');
          [INFO] Submitting SQL update statement to the cluster...
          [INFO] SQL update statement has been successfully submitted to the cluster:
          Job ID: 70a050e455d188d0d3f3adc2ba367fb6


          Flink SQL> select * from hadoop_catalog.iceberg_db.my_user 
          > /*+ options(
          'streaming'='true'
          'monitor-interval'='30s',
          'start-snapshot-id'='138573494821828246'
          > )*/ ;
          +----+----------------------+--------------------------------+------------+--------------------------------+
          | op |              user_id |                      user_name |   birthday |                        country |
          +----+----------------------+--------------------------------+------------+--------------------------------+
          | +I |                    6 |                      zhang_san | 2022-02-01 |                          china |
          | +I |                    7 |                      zhang_san | 2022-02-01 |                          china |
          • streaming模式支持讀取增量snapshot數(shù)據(jù)
          • 如果不指定start-snapshot-id,則先讀取當(dāng)前snapshot全量數(shù)據(jù),再讀取增量數(shù)據(jù)。如果指定start-snapshot-id,讀取該snapshot-id之后的增量數(shù)據(jù),即不讀取該snapshot-id的數(shù)據(jù)
          • monitor-interval:表示監(jiān)控新提交的數(shù)據(jù)文件的時(shí)間間隔,默認(rèn)1s
          如果這個(gè)文章對(duì)你有幫助,不要忘記 「在看」 「點(diǎn)贊」 「收藏」 三連啊喂!

          2022年全網(wǎng)首發(fā)|大數(shù)據(jù)專(zhuān)家級(jí)技能模型與學(xué)習(xí)指南(勝天半子篇)
          互聯(lián)網(wǎng)最壞的時(shí)代可能真的來(lái)了
          我在B站讀大學(xué),大數(shù)據(jù)專(zhuān)業(yè)
          我們?cè)趯W(xué)習(xí)Flink的時(shí)候,到底在學(xué)習(xí)什么?
          193篇文章暴揍Flink,這個(gè)合集你需要關(guān)注一下
          Flink生產(chǎn)環(huán)境TOP難題與優(yōu)化,阿里巴巴藏經(jīng)閣YYDS
          Flink CDC我吃定了耶穌也留不住他!| Flink CDC線上問(wèn)題小盤(pán)點(diǎn)
          我們?cè)趯W(xué)習(xí)Spark的時(shí)候,到底在學(xué)習(xí)什么?
          在所有Spark模塊中,我愿稱(chēng)SparkSQL為最強(qiáng)!
          硬剛Hive | 4萬(wàn)字基礎(chǔ)調(diào)優(yōu)面試小總結(jié)
          數(shù)據(jù)治理方法論和實(shí)踐小百科全書(shū)
          標(biāo)簽體系下的用戶畫(huà)像建設(shè)小指南
          4萬(wàn)字長(zhǎng)文 | ClickHouse基礎(chǔ)&實(shí)踐&調(diào)優(yōu)全視角解析
          【面試&個(gè)人成長(zhǎng)】2021年過(guò)半,社招和校招的經(jīng)驗(yàn)之談
          大數(shù)據(jù)方向另一個(gè)十年開(kāi)啟 |《硬剛系列》第一版完結(jié)
          我寫(xiě)過(guò)的關(guān)于成長(zhǎng)/面試/職場(chǎng)進(jìn)階的文章
          當(dāng)我們?cè)趯W(xué)習(xí)Hive的時(shí)候在學(xué)習(xí)什么?「硬剛Hive續(xù)集」
          瀏覽 77
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  视频在线一区在线观看 | 成人网站三级在线视频网站 | 色婷婷狠| 夜夜高潮少妇 | 精品一区二区三区东京热 |