<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          數(shù)據(jù)湖Iceberg | 如何正確使用Iceberg

          共 24876字,需瀏覽 50分鐘

           ·

          2021-04-10 11:22


          在介紹如何使用Iceberg之前,先簡單地介紹一下Iceberg catalog的概念。catalog是Iceberg對表進(jìn)行管理(create、drop、rename等)的一個組件。目前Iceberg主要支持HiveCatalog和HadoopCatalog兩種Catalog。其中HiveCatalog將當(dāng)前表metadata文件路徑存儲在Metastore,這個表metadata文件是所有讀寫Iceberg表的入口,所以每次讀寫Iceberg表都需要先從Metastore中取出對應(yīng)的表metadata文件路徑,然后再解析這個Metadata文件進(jìn)行接下來的操作。而HadoopCatalog將當(dāng)前表metadata文件路徑記錄在一個文件目錄下,因此不需要連接Metastore。

            

          在下述案例中,筆者會分別使用HiveCatalog和HadoopCatalog創(chuàng)建Iceberg表并進(jìn)行讀寫操作,再結(jié)合案例在下一篇文章中介紹兩者的不同之處。


          1

          下載Iceberg


          在Iceberg官方下載頁面(http://iceberg.apache.org/releases/)下載Iceberg Spark runtime Jar。目前社區(qū)已經(jīng)發(fā)布了兩個版本:apache-iceberg-0.7.0-incubating和0.8.0-incubating,建議下載最新的0.8.0版本。這里需要注意的是,Iceberg在使用上就是一個Jar包,這個Jar包可以放在Spark的Jars目錄下,然后就可以使用Spark創(chuàng)建Iceberg表、將數(shù)據(jù)導(dǎo)入Iceberg表中以及從Iceberg表中查詢。另外需要注意的一點(diǎn)是,目前Iceberg僅支持Spark 2.4版本。

            

          我們先將下載的iceberg-spark-runtime-0.8.0-incubating.jar放到spark的classpath下,然后啟動spark-shell:

          ./spark-shell --jars ../../iceberg-spark-runtime-0.8.0-incubating.jar


          2

          基于HiveCatalog示例


          1.使用Spark創(chuàng)建Iceberg表

          使用HiveCatalog創(chuàng)建Iceberg表之前需要先在Hive中創(chuàng)建對應(yīng)的數(shù)據(jù)庫,否則執(zhí)行建表語句的話會報錯。下面是完整的建表語句:

          import org.apache.iceberg.hive.HiveTableCatalogimport org.apache.iceberg.catalog.TableIdentifierimport org.apache.spark.sql.types._import org.apache.iceberg.PartitionSpecimport org.apache.iceberg.spark.SparkSchemaUtilimport org.apache.spark.sql._import org.apache.spark.sql.functions.{to_date, to_timestamp}import java.sql.Timestampval conf = spark.sessionState.newHadoopConf()conf.set("hive.metastore.uris","xxxxxx")conf.set("hive.metastore.warehouse.dir","xxxxxx")val catalog = new HiveTableCatalog(conf)//need to create hiveiceberg at Hive env firstlyval name = TableIdentifier.of("hive_iceberg""action_logs_36")val sparkSchema = StructType(List(StructField("id"IntegerType,true),StructField("user"StringType,false),StructField("action"StringType,false),StructField("music_id"LongType,false), StructField("event_time"TimestampType,false)))val icebergSchema = SparkSchemaUtil.convert(sparkSchema)val spec = PartitionSpec.builderFor(icebergSchema).hour("event_time").identity("action").buildval table = catalog.createTable(name, icebergSchema, spec)


          建好表后可以在hive查詢到創(chuàng)建的表:

          hive (default)> use hive_iceberg;OKTime taken: 0.148 secondshive (hive_iceberg)> show tables;OKdatabase_nametab_namehive_icebergaction_logsTime taken: 0.035 seconds, Fetched: 1 row(s)hive (hive_iceberg)> show create table action_logs;OKcreatetab_stmtCREATE EXTERNAL TABLE `action_logs`(  `id` int COMMENT '',  `user` string COMMENT '',  `action` string COMMENT '',  `music_id` bigint COMMENT '',  `event_time` timestamp COMMENT '')ROW FORMAT SERDE  'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'STORED AS INPUTFORMAT  'org.apache.hadoop.mapred.FileInputFormat'OUTPUTFORMAT  'org.apache.hadoop.mapred.FileOutputFormat'LOCATION  'hdfs://ntsdb0.jd.163.org:9000/libis/hive-2.3.6/hive_iceberg.db/action_logs'TBLPROPERTIES (  'metadata_location'='/libis/hive-2.3.6/hive_iceberg.db/action_logs/metadata/00000-e7c1e6ce-8eb9-4faf-a176-bd94dec3c0e4.metadata.json',  'numFiles'='1',  'table_type'='ICEBERG',  'totalSize'='1448',  'transient_lastDdlTime'='1591587097')Time taken: 0.501 seconds, Fetched: 20 row(s)


          2.使用Spark導(dǎo)入數(shù)據(jù)到Iceberg表

           iceberg表建好之后,就可以使用spark將數(shù)據(jù)導(dǎo)入Iceberg表中。下面是完整的示例:

          val action_data = Seq(Row(5,"lly","view",13643L,Timestamp.valueOf("2020-06-05 03:03:00")),Row(7,"mint_1989","view",34769L,Timestamp.valueOf("2020-06-05 04:17:00")),Row(6,"lly","click",13643L,Timestamp.valueOf("2020-06-05 03:04:00")),Row(8,"mint_1989","click",34769L,Timestamp.valueOf("2020-06-05 04:17:05")))val df = spark.createDataFrame(sc.makeRDD(action_data), sparkSchema)df.write.format("iceberg").mode("append").save("hive_iceberg.action_logs_35")


          3.使用Spark查詢Iceberg表

          scala>spark.read.format("iceberg").load("hive_iceberg.action_logs_35").show+---+---------+------+--------+-------------------+| id|     user|action|music_id|         event_time|+---+---------+------+--------+-------------------+|  1|      lly|  view|   13643|2020-06-05 03:03:00||  2|      lly| click|   13643|2020-06-05 03:04:00||  3|mint_1989|  view|   34769|2020-06-05 04:17:00||  3|mint_1989| click|   34769|2020-06-05 04:17:05|+---+---------+------+--------+-------------------+


          3

          基于HadoopCatalog示例


          基于HadoopCatalog的用法與HiveCatalog基本相同,主要的區(qū)別在于建表的時候稍有不同,如下所示:

          import org.apache.iceberg.hadoop.HadoopCatalogimport org.apache.hadoop.conf.Configurationval conf = new Configuration()val catalog = new HadoopCatalog(conf,"hdfs://ntsdb0.jd.163.org:9000/libis/hive-2.3.6/")val name = TableIdentifier.of("hadoop_iceberg""action_logs")...val table = catalog.createTable(name,icebergSchema, spec)


          導(dǎo)入和查詢的時候也有一點(diǎn)區(qū)別:

          df.write.format("iceberg").mode("append").save("hdfs://ntsdb0.jd.163.org:9000/libis/hive-2.3.6/hadoop_iceberg/action_logs")spark.read.format("iceberg").load("hdfs://ntsdb0.jd.163.org:9000/libis/hive-2.3.6/hadoop_iceberg/action_logs").show


          4

          Iceberg表的目錄組織形式


          1.HiveCatalog

          hadoop@xxx:~$ hdfs dfs -ls /libis/hive-2.3.6/hive_iceberg.db/action_logsFound 2 itemsdrwxrwxrwx   - hadoop supergroup          0 2020-06-08 12:20 /libis/hive-2.3.6/hive_iceberg.db/action_logs/datadrwxrwxrwx   - hadoop supergroup          0 2020-06-08 12:20 /libis/hive-2.3.6/hive_iceberg.db/action_logs/metadata

          中data目錄下存儲數(shù)據(jù)文件,metadata目錄下存儲元數(shù)據(jù)文件。

            

          2.metadata目錄

          hadoop@xxx:~$ hdfs dfs -ls /libis/hive-2.3.6/hive_iceberg.db/action_logs/metadataFound 4 items-rw-r--r--   1 hadoop supergroup       1448 2020-06-08 11:31 /libis/hive-2.3.6/hive_iceberg.db/action_logs/metadata/00000-e7c1e6ce-8eb9-4faf-a176-bd94dec3c0e4.metadata.json-rw-r--r--   1 hadoop supergroup       2217 2020-06-08 12:20 /libis/hive-2.3.6/hive_iceberg.db/action_logs/metadata/00001-62ade8ab-c1cf-40d3-bc21-fd5027bc3a55.metadata.json-rw-r--r--   1 hadoop supergroup       5040 2020-06-08 12:20 /libis/hive-2.3.6/hive_iceberg.db/action_logs/metadata/bb641961-162a-49a8-b567-885430d4e799-m0.avro-rw-r--r--   1 hadoop supergroup       2567 2020-06-08 12:20 /libis/hive-2.3.6/hive_iceberg.db/action_logs/metadata/snap-6771375506965563160-1-bb641961-162a-49a8-b567-885430d4e799.avro

          其中00001-62ade8ab-c1cf-40d3-bc21-fd5027bc3a55.metadata.json中存儲表的schema、partition spec以及當(dāng)前snapshot manifests文件路徑。snap-6771375506965563160-1-bb641961-162a-49a8-b567-885430d4e799.avro存儲manifest文件路徑。bb641961-162a-49a8-b567-885430d4e799-m0.avro記錄本次提交的文件以及文件級別元數(shù)據(jù)。


          3.data目錄

          hadoop@xxx:~$ hdfs dfs -ls /libis/hive-2.3.6/hive_iceberg.db/action_logs/data/event_time_hour=2020-06-04-19/action=clickFound 1 items-rw-r--r--   1 hadoop supergroup       1425 2020-06-08 12:20 /libis/hive-2.3.6/hive_iceberg.db/action_logs/data/event_time_hour=2020-06-04-19/action=click/00015-47-a9f5ce8f-ee6f-4748-9f49-0f94761859bc-00000.parquet


          4.HadoopCatalog

          Hadoopcatalog與Hivecatalog的的data目錄完全相同,metadata目錄下文件稍有不同,HadoopCatalog管理的metadata目錄如下所示:

          hadoop@xxx:~$ hdfs dfs -ls /libis/hive-2.3.6/hadoop_iceberg/action_logs/metadataFound 5 items-rw-r--r--   1 hadoop supergroup       5064 2020-06-08 17:24 /libis/hive-2.3.6/hadoop_iceberg/action_logs/metadata/b222d277-2692-4e35-9327-3716dec9f070-m0.avro-rw-r--r--   1 hadoop supergroup       2591 2020-06-08 17:24 /libis/hive-2.3.6/hadoop_iceberg/action_logs/metadata/snap-3124052841098464551-1-b222d277-2692-4e35-9327-3716dec9f070.avro-rw-r--r--   1 hadoop supergroup       1476 2020-06-08 17:23 /libis/hive-2.3.6/hadoop_iceberg/action_logs/metadata/v1.metadata.json-rw-r--r--   1 hadoop supergroup       2261 2020-06-08 17:24 /libis/hive-2.3.6/hadoop_iceberg/action_logs/metadata/v2.metadata.json-rw-r--r--   1 hadoop supergroup          1 2020-06-08 17:24 /libis/hive-2.3.6/hadoop_iceberg/action_logs/metadata/version-hint.text 其中文件version-hint.text中存儲當(dāng)前iceberg表的最新snapshot_id,如下所示:hadoop@xxx:~$ hdfs dfs -cat /libis/hive-2.3.6/hadoop_iceberg/action_logs/metadata/version-hint.text 2

          說明該表的最新snapshot_id是2,即對應(yīng)的snapshot元數(shù)據(jù)文件是v2.metadata.json,解析v2.metadata.json可以獲取到該表當(dāng)前最新snapshot對應(yīng)的schema、partition spec、父snapshot以及該snapshot對應(yīng)的manifestList文件路徑等,因此version-hint.text是HadoopCatalog獲取當(dāng)前snapshot版本的入口。


          閱讀到這里,讀者可能就會問了,HiveCatalog的metadata目錄下并沒有version-hint.text文件,那它獲取當(dāng)前snapshot版本的入口在哪里呢?它的入口在Metastore中的schema里面,讀者可以在HiveCatalog建表schema中的TBPROPERTIES中有個key是"metadata_location",對應(yīng)的value就是當(dāng)前最新的snapshot文件。因此,有兩點(diǎn)需要說明:

          1. HiveCatalog創(chuàng)建的表,每次提交寫入文件生成新的snapshot后都需要更新Metastore中的metadata_location字段。

          2. HiveCatalog和HadoopCatalog不能混用。即使用HiveCatalog創(chuàng)建的表,再使用HadoopCatalog是不能正常加載的,反之亦然。


          5

          為什么選擇HadoopCatalog


          上面說到Iceberg目前支持兩種Catalog,而且兩種Catalog相互不兼容。那這里有兩個問題:

          1. 社區(qū)是出于什么考慮實(shí)現(xiàn)兩種不兼容的Catalog?

          2. 因?yàn)閮烧卟患嫒荩仨氝x擇其一作為系統(tǒng)唯一的Catalog,那是選擇HiveCatalog還是HadoopCatalog,為什么?

            

          先回答第一個問題。社區(qū)是出于什么考慮實(shí)現(xiàn)兩種不兼容的Catalog?

          在回答這個問題之前,首先回顧一下上一篇文章中介紹到的基于HadoopCatalog,Iceberg實(shí)現(xiàn)數(shù)據(jù)寫入提交的ACID機(jī)制,最終的結(jié)論是使用了樂觀鎖機(jī)制和HDFS rename的原子性一起保障寫入提交的ACID。如果某些文件系統(tǒng)比如S3不支持rename的原子性呢?那就需要另外一種機(jī)制保障寫入提交的ACID,HiveCatalog就是另一種不依賴文件系統(tǒng)支持,但是可以提供ACID支持的方案,它在每次提交的時候都更新MySQL中同一行記錄,這樣的更新MySQL本身是可以保證ACID的。這就是社區(qū)為什么會支持兩種不兼容Catalog的本質(zhì)原因。

            

          再來回答第二個問題。HadoopCatalog依賴于HDFS提供的rename原子性語義,而HiveCatalog不依賴于任何文件系統(tǒng)的rename原子性語義支持,因此基于HiveCatalog的表不僅可以支持HDFS,而且可以支持s3、oss等其他文件系統(tǒng)。但是HadoopCatalog可以認(rèn)為只支持HDFS表,比較難以遷移到其他文件系統(tǒng)。但是HadoopCatalog寫入提交的過程只依賴HDFS,不和Metastore/MySQL交互,而HiveCatalog每次提交都需要和Metastore/MySQL交互,可以認(rèn)為是強(qiáng)依賴于Metastore,如果Metastore有異常,基于HiveCatalog的Iceberg表的寫入和查詢會有問題。相反,HadoopCatalog并不依賴于Metastore,即使Metastore有異常,也不影響Iceberg表的寫入和查詢。

            

          考慮到我們目前主要還是依賴HDFS,同時不想強(qiáng)依賴于Metastore,所以我們選擇HadoopCatalog作為我們系統(tǒng)唯一的Catalog。即使有一天,想要把HDFS上的表遷移到S3上去,也是可以辦到的,大家想想,無論是HadoopCatalog還是HiveCatalog,數(shù)據(jù)文件和元數(shù)據(jù)文件本身都是相同的,只是標(biāo)記當(dāng)前最新的snapshot的入口不一樣,那只需要簡單的手動變換一下入口就可以實(shí)現(xiàn)Catalog的切換,切換到HiveCatalog上之后,就可以擺脫HDFS的依賴,問題并不大。

            

          6

          Iceberg表相關(guān)元數(shù)據(jù)內(nèi)容


          上一篇介紹Iceberg的文章中講到了Iceberg寫入數(shù)據(jù)文件提交snapshot的時候會依次生成manifest文件、manifestList文件以及snapshot文件,那這些元數(shù)據(jù)文件中都存放什么記錄呢?如果使用HiveCatalog創(chuàng)建的表,可以直接使用spark讀出相關(guān)元數(shù)據(jù)記錄。如果是HadoopCatalog創(chuàng)建的表,暫時不能直接使用spark讀出相關(guān)信息,但是也可以使用文件解析器解析出相關(guān)元數(shù)據(jù)文件內(nèi)容。


          下文基于HiveCatalog建表模式介紹Iceberg相關(guān)元數(shù)據(jù):

          scala>spark.read.format("iceberg").load("hive_iceberg.action_logs.history").show+--------------------+-------------------+---------+-------------------+|     made_current_at|        snapshot_id|parent_id|is_current_ancestor|+--------------------+-------------------+---------+-------------------+|2020-06-08 12:20:...|6771375506965563160|     null|               true|+--------------------+-------------------+---------+-------------------+scala> spark.read.format("iceberg").load("hive_iceberg.action_logs.snapshots").show+--------------------+-------------------+---------+---------+--------------------+--------------------+|        committed_at|        snapshot_id|parent_id|operation|       manifest_list|             summary|+--------------------+-------------------+---------+---------+--------------------+--------------------+|2020-06-08 12:20:...|6771375506965563160|     null|   append|/libis/hive-2.3.6...|[spark.app.id -> ...|+--------------------+-------------------+---------+---------+--------------------+--------------------+scala> spark.read.format("iceberg").load("hive_iceberg.action_logs.manifests").show(false)+---------------------------------------------------------------------------------------------------+------+-----------------+-------------------+----------------------+-------------------------+------------------------+-------------------------------------------------------------+|path                                                                                               |length|partition_spec_id|added_snapshot_id  |added_data_files_count|existing_data_files_count|deleted_data_files_count|partition_summaries                                          |+---------------------------------------------------------------------------------------------------+------+-----------------+-------------------+----------------------+-------------------------+------------------------+-------------------------------------------------------------+|/libis/hive-2.3.6/hive_iceberg.db/action_logs/metadata/bb641961-162a-49a8-b567-885430d4e799-m0.avro|5040  |0                |6771375506965563160|4                     |0                        |0                       |[[false, 2020-06-04-19, 2020-06-04-20], [false, click, view]]|+---------------------------------------------------------------------------------------------------+------+-----------------+-------------------+----------------------+-------------------------+------------------------+-------------------------------------------------------------+scala> spark.read.format("iceberg").load("hive_iceberg.action_logs.files").show(false)+---------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+---------------+------------+------------------+-------------------+---------------------------------------------+----------------------------------------+----------------------------------------+--------------------------------------------------------------------+--------------------------------------------------------------------+------------+-------------+|file_path                                                                                                                                                |file_format|partition      |record_count|file_size_in_bytes|block_size_in_bytes|column_sizes                                 |value_counts                            |null_value_counts                       |lower_bounds                                                        |upper_bounds                                                        |key_metadata|split_offsets|+---------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+---------------+------------+------------------+-------------------+---------------------------------------------+----------------------------------------+----------------------------------------+--------------------------------------------------------------------+--------------------------------------------------------------------+------------+-------------+|/libis/hive-2.3.6/hive_iceberg.db/action_logs/data/event_time_hour=2020-06-04-19/action=view/00007-39-4e7af786-9668-4e3d-b8aa-07b7b30fa60a-00000.parquet |PARQUET    |[442027, view] |1           |1418              |67108864           |[1 -> 51, 2 -> 50, 3 -> 51, 4 -> 47, 5 -> 51]|[1 -> 12 -> 13 -> 14 -> 15 -> 1]|[1 -> 0, 2 -> 0, 3 -> 0, 4 -> 0, 5 -> 0]|[1 -> , 2 -> lly, 3 -> view, 4 -> K5, 5 -> !?F?]      |[1 -> , 2 -> lly, 3 -> view, 4 -> K5, 5 -> !?F?]      |null        |[4]          ||/libis/hive-2.3.6/hive_iceberg.db/action_logs/data/event_time_hour=2020-06-04-19/action=click/00015-47-a9f5ce8f-ee6f-4748-9f49-0f94761859bc-00000.parquet|PARQUET    |[442027, click]|1           |1425              |67108864           |[1 -> 51, 2 -> 50, 3 -> 52, 4 -> 47, 5 -> 51]|[1 -> 12 -> 13 -> 14 -> 15 -> 1]|[1 -> 0, 2 -> 0, 3 -> 0, 4 -> 0, 5 -> 0]|[1 -> , 2 -> lly, 3 -> click, 4 -> K5, 5 -> ???F?]     |[1 -> , 2 -> lly, 3 -> click, 4 -> K5, 5 -> ???F?]     |null        |[4]          ||/libis/hive-2.3.6/hive_iceberg.db/action_logs/data/event_time_hour=2020-06-04-20/action=view/00023-55-f0494272-6166-4386-88c7-059e3081aa11-00000.parquet |PARQUET    |[442028, view] |1           |1460              |67108864           |[1 -> 51, 2 -> 56, 3 -> 51, 4 -> 47, 5 -> 51]|[1 -> 12 -> 13 -> 14 -> 15 -> 1]|[1 -> 0, 2 -> 0, 3 -> 0, 4 -> 0, 5 -> 0]|[1 -> , 2 -> mint_1989, 3 -> view, 4 -> ч, 5 -> '??G?] |[1 -> , 2 -> mint_1989, 3 -> view, 4 -> ч, 5 -> '??G?] |null        |[4]          ||/libis/hive-2.3.6/hive_iceberg.db/action_logs/data/event_time_hour=2020-06-04-20/action=click/00031-63-a04ce10d-ae98-4004-bda8-2f18d842b66b-00000.parquet|PARQUET    |[442028, click]|1           |1467              |67108864           |[1 -> 512 -> 563 -> 524 -> 475 -> 51]|[1 -> 1, 2 -> 1, 3 -> 1, 4 -> 1, 5 -> 1]|[1 -> 02 -> 03 -> 04 -> 05 -> 0]|[1 -> , 2 -> mint_1989, 3 -> click, 4 -> ч, 5 -> @r?G?]|[1 -> , 2 -> mint_1989, 3 -> click, 4 -> ч, 5 -> @r?G?]|null        |[4]          |+---------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+---------------+------------+------------------+-------------------+---------------------------------------------+----------------------------------------+----------------------------------------+--------------------------------------------------------------------+--------------------------------------------------------------------+------------+-------------+


          7

          總結(jié)


          本文基于上一篇介紹Iceberg入門的文章,介紹了如何使用Iceberg進(jìn)行簡單的創(chuàng)建表、往表中寫數(shù)據(jù)、查詢表數(shù)據(jù)以及表中元數(shù)據(jù)。同時也介紹了HadoopCatalog和HiveCatalog的不同以及我們選擇HadoopCatalog的一些想法。



          作者簡介

          子和,網(wǎng)易大數(shù)據(jù)開發(fā)工程師,長期從事分布式KV數(shù)據(jù)庫、分布式時序數(shù)據(jù)庫以及大數(shù)據(jù)底層組件等相關(guān)工作。



          分享,點(diǎn)贊,在看,安排一下?
          瀏覽 104
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产成人免费做爰视频 | 午夜三级电影 | 麻豆亚洲AV成人无码一区精品 | 免费一区二三区 | 日本黄色电影网站wwww |