<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Hudi 原理 | 詳解 Apache Hudi Schema Evolution 模式演進

          共 20033字,需瀏覽 41分鐘

           ·

          2022-07-01 01:03



          Schema Evolution(模式演進)允許用戶輕松更改 Hudi 表的當前模式,以適應隨時間變化的數(shù)據(jù)。從 0.11.0 版本開始,支持 Spark SQL(spark3.1.x 和 spark3.2.1)對 Schema 演進的 DDL 支持并且標志為實驗性的。

          場景

          • ? 可以添加、刪除、修改和移動列(包括嵌套列)

          • ? 分區(qū)列不能演進

          • ? 不能對 Array 類型的嵌套列進行添加、刪除或操作

          SparkSQL模式演進以及語法描述

          使用模式演進之前,請先設(shè)置spark.sql.extensions,對于spark 3.2.x,需要設(shè)置spark.sql.catalog.spark_catalog

          Spark SQL for spark 3.1.x
          spark-sql --packages org.apache.hudi:hudi-spark3.1.2-bundle_2.12:0.11.1,org.apache.spark:spark-avro_2.12:3.1.2 \
          --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
          --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

          Spark SQL for spark 3.2.1
          spark-sql --packages org.apache.hudi:hudi-spark3-bundle_2.12:0.11.1,org.apache.spark:spark-avro_2.12:3.2.1 \
          --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
          --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
          --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'

          啟動spark app后,請執(zhí)行set schema.on.read.enable=true開啟模式演進

          當前模式演進開啟后不能關(guān)閉

          添加列

          語法

          -- add columns
          ALTER TABLE Table name ADD COLUMNS(col_spec[, col_spec ...])

          參數(shù)描述

          參數(shù)描述
          tableName表名
          col_spec列定義,由五個字段組成,col_name, col_type, nullable, comment, col_position

          col_name : 新列名,強制必須存在,如果在嵌套類型中添加子列,請指定子列的全路徑

          示例

          • ? 在嵌套類型users struct<name: string, age int>中添加子列col1,設(shè)置字段為users.col1

          • ? 在嵌套map類型member map<string, struct<n: string, a: int>>中添加子列col1, 設(shè)置字段為member.value.col1

          col_type : 新列的類型 

          nullable : 新列是否可為null,可為空,當前Hudi中并未使用 

          comment : 新列的注釋,可為空 

          col_position : 列添加的位置,值可為FIRST或者AFTER 某字段

          • ? 如果設(shè)置為FIRST,那么新加的列在表的第一列

          • ? 如果設(shè)置為AFTER 某字段,將在某字段后添加新列

          • ? 如果設(shè)置為空,只有當新的子列被添加到嵌套列時,才能使用 FIRST。不要在頂級列中使用 FIRST。AFTER 的使用沒有限制。

          示例

          alter table h0 add columns(ext0 string);
          alter table h0 add columns(new_col int not null comment 'add new column' after col1);
          alter table complex_table add columns(col_struct.col_name string comment 'add new column to a struct col' after col_from_col_struct);

          修改列

          語法

          -- alter table ... alter column
          ALTER TABLE Table name ALTER [COLUMN] col_old_name TYPE column_type [COMMENT] col_comment[FIRST|AFTER] column_name

          參數(shù)描述

          參數(shù)描述
          tableName表名
          col_old_name待修改的列名
          column_type新的列類型
          col_comment列comment
          column_name列名,放置目標列的新位置。例如,AFTER column_name 表示目標列放在 column_name 之后

          示例

          --- Changing the column type
          ALTER TABLE table1 ALTER COLUMN a.b.c TYPE bigint

          --- Altering other attributes
          ALTER TABLE table1 ALTER COLUMN a.b.c COMMENT 'new comment'
          ALTER TABLE table1 ALTER COLUMN a.b.c FIRST
          ALTER TABLE table1 ALTER COLUMN a.b.c AFTER x
          ALTER TABLE table1 ALTER COLUMN a.b.c DROP NOT NULL

          列類型變更兼容表

          源列類型\目標列類型longfloatdoublestringdecimaldateint
          intYYYYYNY
          longYNYYYNN
          floatNYYYYNN
          doubleNNYYYNN
          decimalNNNYYNN
          stringNNNYYYN
          dateNNNYNYN

          刪除列

          語法

          -- alter table ... drop columns
          ALTER TABLE tableName DROP COLUMN|COLUMNS cols

          示例

          ALTER TABLE table1 DROP COLUMN a.b.c
          ALTER TABLE table1 DROP COLUMNS a.b.c, x, y

          修改列名

          語法

          -- alter table ... rename column
          ALTER TABLE tableName RENAME COLUMN old_columnName TO new_columnName

          示例

          ALTER TABLE table1 RENAME COLUMN a.b.c TO x

          修改表屬性

          語法

          -- alter table ... set|unset
          ALTER TABLE Table name SET|UNSET tblproperties

          示例

          ALTER TABLE table SET TBLPROPERTIES ('table_property' = 'property_value')
          ALTER TABLE table UNSET TBLPROPERTIES [IF EXISTS] ('comment', 'key')

          修改表名

          語法

          -- alter table ... rename
          ALTER TABLE tableName RENAME TO newTableName

          示例

          ALTER TABLE table1 RENAME TO table2

          0.11.0之前的模式演進

          模式演進是數(shù)據(jù)管理的一個非常重要的方面。Hudi 支持開箱即用的常見模式演進場景,例如添加可為空的字段或提升字段的數(shù)據(jù)類型。此外,演進后的模式可以跨引擎查詢,例如 Presto、Hive 和 Spark SQL。下表總結(jié)了與不同 Hudi 表類型兼容的Schema變更類型。

          Schema變更COWMOR說明
          在最后的根級別添加一個新的可為空列YesYesYes意味著具有演進模式的寫入成功并且寫入之后的讀取成功讀取整個數(shù)據(jù)集
          向內(nèi)部結(jié)構(gòu)添加一個新的可為空列(最后)YesYes
          添加具有默認值的新復雜類型字段(map和array)YesYes
          添加新的可為空列并更改字段的順序NoNo如果使用演進模式的寫入僅更新了一些基本文件而不是全部,則寫入成功但讀取失敗。目前Hudi 不維護模式注冊表,其中包含跨基礎(chǔ)文件的更改歷史記錄。然而如果 upsert 觸及所有基本文件,則讀取將成功
          添加自定義可為空的 Hudi 元列,例如 _hoodie_meta_colYesYes
          將根級別字段的數(shù)據(jù)類型從 int 提升為 longYesYes對于其他類型,Hudi 支持與Avro相同 Avro schema resolution[1]
          .


          將嵌套字段的數(shù)據(jù)類型從 int 提升為 longYesYes
          對于復雜類型(map或array的值),將數(shù)據(jù)類型從 int 提升為 longYesYes
          在最后的根級別添加一個新的不可為空的列NoNo對于Spark數(shù)據(jù)源的MOR表,寫入成功但讀取失敗。作為一種解決方法,您可以使該字段為空
          向內(nèi)部結(jié)構(gòu)添加一個新的不可為空的列(最后)NoNo
          將嵌套字段的數(shù)據(jù)類型從 long 更改為 intNoNo
          將復雜類型的數(shù)據(jù)類型從 long 更改為 int(映射或數(shù)組的值)NoNo

          讓我們通過一個示例來演示 Hudi 中的模式演進支持。在下面的示例中,我們將添加一個新的字符串字段并將字段的數(shù)據(jù)類型從 int 更改為 long。

          Welcome to
              ____              __
              / __/__  ___ _____/ /__
              _\ \/ _ \/ _ `/ __/  '_/
              /___/ .__/\_,_/_/ /_/\_\   version 3.1.2
              /_/

              Using Scala version 2.12.10 (OpenJDK 64-Bit Server VMJava 1.8.0_292)
              Type in expressions to have them evaluated.
              Type :help for more information.

          scala> import org.apache.hudi.QuickstartUtils._
          import org.apache.hudi.QuickstartUtils._

          scala> import scala.collection.JavaConversions._
          import scala.collection.JavaConversions._

          scala> import org.apache.spark.sql.SaveMode._
          import org.apache.spark.sql.SaveMode._

          scala> import org.apache.hudi.DataSourceReadOptions._
          import org.apache.hudi.DataSourceReadOptions._

          scala> import org.apache.hudi.DataSourceWriteOptions._
          import org.apache.hudi.DataSourceWriteOptions._

          scala> import org.apache.hudi.config.HoodieWriteConfig._
          import org.apache.hudi.config.HoodieWriteConfig._

          scala> import org.apache.spark.sql.types._
          import org.apache.spark.sql.types._

          scala> import org.apache.spark.sql.Row
          import org.apache.spark.sql.Row

          scala> val tableName = "hudi_trips_cow"
              tableName: String = hudi_trips_cow
          scala> val basePath = "file:///tmp/hudi_trips_cow"
              basePath: String = file:///tmp/hudi_trips_cow
          scala> val schema = StructTypeArray(
              | StructField("rowId"StringType,true),
              | StructField("partitionId"StringType,true),
              | StructField("preComb"LongType,true),
              | StructField("name"StringType,true),
              | StructField("versionId"StringType,true),
              | StructField("intToLong"IntegerType,true)
              | ))
              schema: org.apache.spark.sql.types.StructType = StructType(StructField(rowId,StringType,true), StructField(partitionId,StringType,true), StructField(preComb,LongType,true), StructField(name,StringType,true), StructField(versionId,StringType,true), StructField(intToLong,IntegerType,true))
              
          scala> val data1 = Seq(Row("row_1""part_0"0L, "bob""v_0"0),
              |                Row("row_2""part_0"0L, "john""v_0"0),
              |                Row("row_3""part_0"0L, "tom""v_0"0))
              data1: Seq[org.apache.spark.sql.Row] = List([row_1,part_0,0,bob,v_0,0], [row_2,part_0,0,john,v_0,0], [row_3,part_0,0,tom,v_0,0])

          scala> var dfFromData1 = spark.createDataFrame(data1, schema)
          scala> dfFromData1.write.format("hudi").
              |   options(getQuickstartWriteConfigs).
              |   option(PRECOMBINE_FIELD_OPT_KEY.key, "preComb").
              |   option(RECORDKEY_FIELD_OPT_KEY.key, "rowId").
              |   option(PARTITIONPATH_FIELD_OPT_KEY.key, "partitionId").
              |   option("hoodie.index.type","SIMPLE").
              |   option(TABLE_NAME.key, tableName).
              |   mode(Overwrite).
              |   save(basePath)

          scala> var tripsSnapshotDF1 = spark.read.format("hudi").load(basePath + "/*/*")
              tripsSnapshotDF1: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 9 more fields]

          scala> tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot")

          scala> spark.sql("desc hudi_trips_snapshot").show()
              +--------------------+---------+-------+
              |            col_name|data_type|comment|
              +--------------------+---------+-------+
              | _hoodie_commit_time|   string|   null|
              |_hoodie_commit_seqno|   string|   null|
              |  _hoodie_record_key|   string|   null|
              |_hoodie_partition...|   string|   null|
              |   _hoodie_file_name|   string|   null|
              |               rowId|   string|   null|
              |         partitionId|   string|   null|
              |             preComb|   bigint|   null|
              |                name|   string|   null|
              |           versionId|   string|   null|
              |           intToLong|      int|   null|
              +--------------------+---------+-------+
              
          scala> spark.sql("select rowId, partitionId, preComb, name, versionId, intToLong from hudi_trips_snapshot").show()
              +-----+-----------+-------+----+---------+---------+
              |rowId|partitionId|preComb|name|versionId|intToLong|
              +-----+-----------+-------+----+---------+---------+
              |row_3|     part_0|      0| tom|      v_0|        0|
              |row_2|     part_0|      0|john|      v_0|        0|
              |row_1|     part_0|      0| bob|      v_0|        0|
              +-----+-----------+-------+----+---------+---------+

          // In the new schema, we are going to add a String field and 
          // change the datatype `intToLong` field from  int to long.
          scala> val newSchema = StructTypeArray(
              | StructField("rowId"StringType,true),
              | StructField("partitionId"StringType,true),
              | StructField("preComb"LongType,true),
              | StructField("name"StringType,true),
              | StructField("versionId"StringType,true),
              | StructField("intToLong"LongType,true),
              | StructField("newField"StringType,true)
              | ))
              newSchema: org.apache.spark.sql.types.StructType = StructType(StructField(rowId,StringType,true), StructField(partitionId,StringType,true), StructField(preComb,LongType,true), StructField(name,StringType,true), StructField(versionId,StringType,true), StructField(intToLong,LongType,true), StructField(newField,StringType,true))

          scala> val data2 = Seq(Row("row_2""part_0"5L, "john""v_3"3L, "newField_1"),
              |                Row("row_5""part_0"5L, "maroon""v_2"2L, "newField_1"),
              |                Row("row_9""part_0"5L, "michael""v_2"2L, "newField_1"))
              data2: Seq[org.apache.spark.sql.Row] = List([row_2,part_0,5,john,v_3,3,newField_1], [row_5,part_0,5,maroon,v_2,2,newField_1], [row_9,part_0,5,michael,v_2,2,newField_1])

          scala> var dfFromData2 = spark.createDataFrame(data2, newSchema)
          scala> dfFromData2.write.format("hudi").
              |   options(getQuickstartWriteConfigs).
              |   option(PRECOMBINE_FIELD_OPT_KEY.key, "preComb").
              |   option(RECORDKEY_FIELD_OPT_KEY.key, "rowId").
              |   option(PARTITIONPATH_FIELD_OPT_KEY.key, "partitionId").
              |   option("hoodie.index.type","SIMPLE").
              |   option(TABLE_NAME.key, tableName).
              |   mode(Append).
              |   save(basePath)

          scala> var tripsSnapshotDF2 = spark.read.format("hudi").load(basePath + "/*/*")
              tripsSnapshotDF2: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 10 more fields]

          scala> tripsSnapshotDF2.createOrReplaceTempView("hudi_trips_snapshot")

          scala> spark.sql("desc hudi_trips_snapshot").show()
              +--------------------+---------+-------+
              |            col_name|data_type|comment|
              +--------------------+---------+-------+
              | _hoodie_commit_time|   string|   null|
              |_hoodie_commit_seqno|   string|   null|
              |  _hoodie_record_key|   string|   null|
              |_hoodie_partition...|   string|   null|
              |   _hoodie_file_name|   string|   null|
              |               rowId|   string|   null|
              |         partitionId|   string|   null|
              |             preComb|   bigint|   null|
              |                name|   string|   null|
              |           versionId|   string|   null|
              |           intToLong|   bigint|   null|
              |            newField|   string|   null|
              +--------------------+---------+-------+


          scala> spark.sql("select rowId, partitionId, preComb, name, versionId, intToLong, newField from hudi_trips_snapshot").show()
              +-----+-----------+-------+-------+---------+---------+----------+
              |rowId|partitionId|preComb|   name|versionId|intToLong|  newField|
              +-----+-----------+-------+-------+---------+---------+----------+
              |row_3|     part_0|      0|    tom|      v_0|        0|      null|
              |row_2|     part_0|      5|   john|      v_3|        3|newField_1|
              |row_1|     part_0|      0|    bob|      v_0|        0|      null|
              |row_5|     part_0|      5| maroon|      v_2|        2|newField_1|
              |row_9|     part_0|      5|michael|      v_2|        2|newField_1|
              +-----+-----------+-------+-------+---------+---------+----------+

          引用鏈接

          [1] Avro schema resolution: http://avro.apache.org/docs/current/spec#Schema+Resolution

          推薦閱讀

          Apache Hudi數(shù)據(jù)跳過技術(shù)加速查詢高達50倍

          深入理解Apache Hudi異步索引機制

          基于Apache Hudi拉鏈表的全量表極限存儲優(yōu)化方案

          騰訊廣告業(yè)務(wù)基于Apache Flink + Hudi的批流一體實踐

          基于TIS構(gòu)建Apache Hudi千表入湖方案




          瀏覽 114
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  无码啪啪啪 | 一级黄色片。 | 逼逼网址 | 激情 小说 图片 亚洲 伦 | 亚洲欧美在线观看久99一区 |