Hudi 原理 | 詳解 Apache Hudi Schema Evolution 模式演進

Schema Evolution(模式演進)允許用戶輕松更改 Hudi 表的當前模式,以適應隨時間變化的數(shù)據(jù)。從 0.11.0 版本開始,支持 Spark SQL(spark3.1.x 和 spark3.2.1)對 Schema 演進的 DDL 支持并且標志為實驗性的。
場景
? 可以添加、刪除、修改和移動列(包括嵌套列)
? 分區(qū)列不能演進
? 不能對 Array 類型的嵌套列進行添加、刪除或操作
SparkSQL模式演進以及語法描述
使用模式演進之前,請先設(shè)置spark.sql.extensions,對于spark 3.2.x,需要設(shè)置spark.sql.catalog.spark_catalog
# Spark SQL for spark 3.1.x
spark-sql --packages org.apache.hudi:hudi-spark3.1.2-bundle_2.12:0.11.1,org.apache.spark:spark-avro_2.12:3.1.2 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
# Spark SQL for spark 3.2.1
spark-sql --packages org.apache.hudi:hudi-spark3-bundle_2.12:0.11.1,org.apache.spark:spark-avro_2.12:3.2.1 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'啟動spark app后,請執(zhí)行set schema.on.read.enable=true開啟模式演進
當前模式演進開啟后不能關(guān)閉
添加列
語法
-- add columns
ALTER TABLE Table name ADD COLUMNS(col_spec[, col_spec ...])參數(shù)描述
| 參數(shù) | 描述 |
| tableName | 表名 |
| col_spec | 列定義,由五個字段組成,col_name, col_type, nullable, comment, col_position |
col_name : 新列名,強制必須存在,如果在嵌套類型中添加子列,請指定子列的全路徑
示例
? 在嵌套類型users struct<name: string, age int>中添加子列col1,設(shè)置字段為users.col1
? 在嵌套map類型member map<string, struct<n: string, a: int>>中添加子列col1, 設(shè)置字段為member.value.col1
col_type : 新列的類型
nullable : 新列是否可為null,可為空,當前Hudi中并未使用
comment : 新列的注釋,可為空
col_position : 列添加的位置,值可為FIRST或者AFTER 某字段
? 如果設(shè)置為FIRST,那么新加的列在表的第一列
? 如果設(shè)置為AFTER 某字段,將在某字段后添加新列
? 如果設(shè)置為空,只有當新的子列被添加到嵌套列時,才能使用 FIRST。不要在頂級列中使用 FIRST。AFTER 的使用沒有限制。
示例
alter table h0 add columns(ext0 string);
alter table h0 add columns(new_col int not null comment 'add new column' after col1);
alter table complex_table add columns(col_struct.col_name string comment 'add new column to a struct col' after col_from_col_struct);修改列
語法
-- alter table ... alter column
ALTER TABLE Table name ALTER [COLUMN] col_old_name TYPE column_type [COMMENT] col_comment[FIRST|AFTER] column_name參數(shù)描述
| 參數(shù) | 描述 |
| tableName | 表名 |
| col_old_name | 待修改的列名 |
| column_type | 新的列類型 |
| col_comment | 列comment |
| column_name | 列名,放置目標列的新位置。例如,AFTER column_name 表示目標列放在 column_name 之后 |
示例
--- Changing the column type
ALTER TABLE table1 ALTER COLUMN a.b.c TYPE bigint
--- Altering other attributes
ALTER TABLE table1 ALTER COLUMN a.b.c COMMENT 'new comment'
ALTER TABLE table1 ALTER COLUMN a.b.c FIRST
ALTER TABLE table1 ALTER COLUMN a.b.c AFTER x
ALTER TABLE table1 ALTER COLUMN a.b.c DROP NOT NULL列類型變更兼容表
| 源列類型\目標列類型 | long | float | double | string | decimal | date | int |
| int | Y | Y | Y | Y | Y | N | Y |
| long | Y | N | Y | Y | Y | N | N |
| float | N | Y | Y | Y | Y | N | N |
| double | N | N | Y | Y | Y | N | N |
| decimal | N | N | N | Y | Y | N | N |
| string | N | N | N | Y | Y | Y | N |
| date | N | N | N | Y | N | Y | N |
刪除列
語法
-- alter table ... drop columns
ALTER TABLE tableName DROP COLUMN|COLUMNS cols示例
ALTER TABLE table1 DROP COLUMN a.b.c
ALTER TABLE table1 DROP COLUMNS a.b.c, x, y修改列名
語法
-- alter table ... rename column
ALTER TABLE tableName RENAME COLUMN old_columnName TO new_columnName示例
ALTER TABLE table1 RENAME COLUMN a.b.c TO x修改表屬性
語法
-- alter table ... set|unset
ALTER TABLE Table name SET|UNSET tblproperties示例
ALTER TABLE table SET TBLPROPERTIES ('table_property' = 'property_value')
ALTER TABLE table UNSET TBLPROPERTIES [IF EXISTS] ('comment', 'key')修改表名
語法
-- alter table ... rename
ALTER TABLE tableName RENAME TO newTableName示例
ALTER TABLE table1 RENAME TO table20.11.0之前的模式演進
模式演進是數(shù)據(jù)管理的一個非常重要的方面。Hudi 支持開箱即用的常見模式演進場景,例如添加可為空的字段或提升字段的數(shù)據(jù)類型。此外,演進后的模式可以跨引擎查詢,例如 Presto、Hive 和 Spark SQL。下表總結(jié)了與不同 Hudi 表類型兼容的Schema變更類型。
| Schema變更 | COW | MOR | 說明 |
| 在最后的根級別添加一個新的可為空列 | Yes | Yes | Yes意味著具有演進模式的寫入成功并且寫入之后的讀取成功讀取整個數(shù)據(jù)集 |
| 向內(nèi)部結(jié)構(gòu)添加一個新的可為空列(最后) | Yes | Yes | |
| 添加具有默認值的新復雜類型字段(map和array) | Yes | Yes | |
| 添加新的可為空列并更改字段的順序 | No | No | 如果使用演進模式的寫入僅更新了一些基本文件而不是全部,則寫入成功但讀取失敗。目前Hudi 不維護模式注冊表,其中包含跨基礎(chǔ)文件的更改歷史記錄。然而如果 upsert 觸及所有基本文件,則讀取將成功 |
| 添加自定義可為空的 Hudi 元列,例如 _hoodie_meta_col | Yes | Yes | |
| 將根級別字段的數(shù)據(jù)類型從 int 提升為 long | Yes | Yes | 對于其他類型,Hudi 支持與Avro相同 Avro schema resolution[1] |
| . | |||
| 將嵌套字段的數(shù)據(jù)類型從 int 提升為 long | Yes | Yes | |
| 對于復雜類型(map或array的值),將數(shù)據(jù)類型從 int 提升為 long | Yes | Yes | |
| 在最后的根級別添加一個新的不可為空的列 | No | No | 對于Spark數(shù)據(jù)源的MOR表,寫入成功但讀取失敗。作為一種解決方法,您可以使該字段為空 |
| 向內(nèi)部結(jié)構(gòu)添加一個新的不可為空的列(最后) | No | No | |
| 將嵌套字段的數(shù)據(jù)類型從 long 更改為 int | No | No | |
| 將復雜類型的數(shù)據(jù)類型從 long 更改為 int(映射或數(shù)組的值) | No | No |
讓我們通過一個示例來演示 Hudi 中的模式演進支持。在下面的示例中,我們將添加一個新的字符串字段并將字段的數(shù)據(jù)類型從 int 更改為 long。
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.1.2
/_/
Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 1.8.0_292)
Type in expressions to have them evaluated.
Type :help for more information.
scala> import org.apache.hudi.QuickstartUtils._
import org.apache.hudi.QuickstartUtils._
scala> import scala.collection.JavaConversions._
import scala.collection.JavaConversions._
scala> import org.apache.spark.sql.SaveMode._
import org.apache.spark.sql.SaveMode._
scala> import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceReadOptions._
scala> import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.DataSourceWriteOptions._
scala> import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.config.HoodieWriteConfig._
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._
scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row
scala> val tableName = "hudi_trips_cow"
tableName: String = hudi_trips_cow
scala> val basePath = "file:///tmp/hudi_trips_cow"
basePath: String = file:///tmp/hudi_trips_cow
scala> val schema = StructType( Array(
| StructField("rowId", StringType,true),
| StructField("partitionId", StringType,true),
| StructField("preComb", LongType,true),
| StructField("name", StringType,true),
| StructField("versionId", StringType,true),
| StructField("intToLong", IntegerType,true)
| ))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(rowId,StringType,true), StructField(partitionId,StringType,true), StructField(preComb,LongType,true), StructField(name,StringType,true), StructField(versionId,StringType,true), StructField(intToLong,IntegerType,true))
scala> val data1 = Seq(Row("row_1", "part_0", 0L, "bob", "v_0", 0),
| Row("row_2", "part_0", 0L, "john", "v_0", 0),
| Row("row_3", "part_0", 0L, "tom", "v_0", 0))
data1: Seq[org.apache.spark.sql.Row] = List([row_1,part_0,0,bob,v_0,0], [row_2,part_0,0,john,v_0,0], [row_3,part_0,0,tom,v_0,0])
scala> var dfFromData1 = spark.createDataFrame(data1, schema)
scala> dfFromData1.write.format("hudi").
| options(getQuickstartWriteConfigs).
| option(PRECOMBINE_FIELD_OPT_KEY.key, "preComb").
| option(RECORDKEY_FIELD_OPT_KEY.key, "rowId").
| option(PARTITIONPATH_FIELD_OPT_KEY.key, "partitionId").
| option("hoodie.index.type","SIMPLE").
| option(TABLE_NAME.key, tableName).
| mode(Overwrite).
| save(basePath)
scala> var tripsSnapshotDF1 = spark.read.format("hudi").load(basePath + "/*/*")
tripsSnapshotDF1: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 9 more fields]
scala> tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot")
scala> spark.sql("desc hudi_trips_snapshot").show()
+--------------------+---------+-------+
| col_name|data_type|comment|
+--------------------+---------+-------+
| _hoodie_commit_time| string| null|
|_hoodie_commit_seqno| string| null|
| _hoodie_record_key| string| null|
|_hoodie_partition...| string| null|
| _hoodie_file_name| string| null|
| rowId| string| null|
| partitionId| string| null|
| preComb| bigint| null|
| name| string| null|
| versionId| string| null|
| intToLong| int| null|
+--------------------+---------+-------+
scala> spark.sql("select rowId, partitionId, preComb, name, versionId, intToLong from hudi_trips_snapshot").show()
+-----+-----------+-------+----+---------+---------+
|rowId|partitionId|preComb|name|versionId|intToLong|
+-----+-----------+-------+----+---------+---------+
|row_3| part_0| 0| tom| v_0| 0|
|row_2| part_0| 0|john| v_0| 0|
|row_1| part_0| 0| bob| v_0| 0|
+-----+-----------+-------+----+---------+---------+
// In the new schema, we are going to add a String field and
// change the datatype `intToLong` field from int to long.
scala> val newSchema = StructType( Array(
| StructField("rowId", StringType,true),
| StructField("partitionId", StringType,true),
| StructField("preComb", LongType,true),
| StructField("name", StringType,true),
| StructField("versionId", StringType,true),
| StructField("intToLong", LongType,true),
| StructField("newField", StringType,true)
| ))
newSchema: org.apache.spark.sql.types.StructType = StructType(StructField(rowId,StringType,true), StructField(partitionId,StringType,true), StructField(preComb,LongType,true), StructField(name,StringType,true), StructField(versionId,StringType,true), StructField(intToLong,LongType,true), StructField(newField,StringType,true))
scala> val data2 = Seq(Row("row_2", "part_0", 5L, "john", "v_3", 3L, "newField_1"),
| Row("row_5", "part_0", 5L, "maroon", "v_2", 2L, "newField_1"),
| Row("row_9", "part_0", 5L, "michael", "v_2", 2L, "newField_1"))
data2: Seq[org.apache.spark.sql.Row] = List([row_2,part_0,5,john,v_3,3,newField_1], [row_5,part_0,5,maroon,v_2,2,newField_1], [row_9,part_0,5,michael,v_2,2,newField_1])
scala> var dfFromData2 = spark.createDataFrame(data2, newSchema)
scala> dfFromData2.write.format("hudi").
| options(getQuickstartWriteConfigs).
| option(PRECOMBINE_FIELD_OPT_KEY.key, "preComb").
| option(RECORDKEY_FIELD_OPT_KEY.key, "rowId").
| option(PARTITIONPATH_FIELD_OPT_KEY.key, "partitionId").
| option("hoodie.index.type","SIMPLE").
| option(TABLE_NAME.key, tableName).
| mode(Append).
| save(basePath)
scala> var tripsSnapshotDF2 = spark.read.format("hudi").load(basePath + "/*/*")
tripsSnapshotDF2: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 10 more fields]
scala> tripsSnapshotDF2.createOrReplaceTempView("hudi_trips_snapshot")
scala> spark.sql("desc hudi_trips_snapshot").show()
+--------------------+---------+-------+
| col_name|data_type|comment|
+--------------------+---------+-------+
| _hoodie_commit_time| string| null|
|_hoodie_commit_seqno| string| null|
| _hoodie_record_key| string| null|
|_hoodie_partition...| string| null|
| _hoodie_file_name| string| null|
| rowId| string| null|
| partitionId| string| null|
| preComb| bigint| null|
| name| string| null|
| versionId| string| null|
| intToLong| bigint| null|
| newField| string| null|
+--------------------+---------+-------+
scala> spark.sql("select rowId, partitionId, preComb, name, versionId, intToLong, newField from hudi_trips_snapshot").show()
+-----+-----------+-------+-------+---------+---------+----------+
|rowId|partitionId|preComb| name|versionId|intToLong| newField|
+-----+-----------+-------+-------+---------+---------+----------+
|row_3| part_0| 0| tom| v_0| 0| null|
|row_2| part_0| 5| john| v_3| 3|newField_1|
|row_1| part_0| 0| bob| v_0| 0| null|
|row_5| part_0| 5| maroon| v_2| 2|newField_1|
|row_9| part_0| 5|michael| v_2| 2|newField_1|
+-----+-----------+-------+-------+---------+---------+----------+
引用鏈接
[1] Avro schema resolution: http://avro.apache.org/docs/current/spec#Schema+Resolution
推薦閱讀
Apache Hudi數(shù)據(jù)跳過技術(shù)加速查詢高達50倍
基于Apache Hudi拉鏈表的全量表極限存儲優(yōu)化方案
騰訊廣告業(yè)務(wù)基于Apache Flink + Hudi的批流一體實踐
