Hudi 實踐 | 在 AWS Glue 中使用 Apache Hudi
1. Glue與Hudi簡介
?AWS Glue
AWS Glue是Amazon Web Services(AWS)云平臺推出的一款無服務器(Serverless)的大數(shù)據(jù)分析服務。對于不了解該產(chǎn)品的讀者來說,可以用一句話概括其實質:Glue是一個無服務器的全托管的Spark運行環(huán)境,只需提供Spark程序代碼即可運行Spark作業(yè),無需維護集群。
?Apache Hudi
Apache Hudi最早由Uber設計開發(fā),后提交給Apache孵化器,2020年5月,Hudi正式升級為Apache的頂級項目。Hudi是一個數(shù)據(jù)湖平臺,支持增量數(shù)據(jù)處理,其提供的更新插入和增量查詢兩大操作原語很好地彌補了傳統(tǒng)大數(shù)據(jù)處理引擎(如Spark、Hive等)在這方面的缺失,因而受到廣泛關注并開始流行。此外,Hudi在設計理念上非常注意與現(xiàn)有大數(shù)據(jù)生態(tài)的融合,它能以相對透明和非侵入的方式融入到Spark、Flink計算框架中,并且支持了流式讀寫,有望成為未來數(shù)據(jù)湖的統(tǒng)一存儲層(同時支持批流讀寫)。
2. 集成的可行性分析
鑒于Hudi的日益流行,很多正在使用Glue或者為搭建無服務器數(shù)據(jù)湖進行技術選型的團隊對Glue與Hudi的集成非常關心,如果兩者可以成功地集成在一起,團隊就可以建設出支持增量數(shù)據(jù)處理的無服務器架構的新一代數(shù)據(jù)湖。
但是,AWS Glue的產(chǎn)品團隊從未就支持Hudi給出過官方保證,雖然從“Glue內(nèi)核是Spark”這一事實進行推斷,理論上Glue是可以與Hudi集成的,但由于Glue沒有使用Hive的Metastore,而是依賴自己的元數(shù)據(jù)存儲服務Glue Catalog,這會讓Glue在同步Hudi元數(shù)據(jù)時遇到不小的麻煩。
本文將在代碼驗證的基礎之上,詳細介紹如何在Glue里使用Hudi,對集成過程中發(fā)現(xiàn)的各種問題和錯誤給出解釋和應對方案。我們希望通過本文的介紹,給讀者在數(shù)據(jù)湖建設的技術選型上提供新的靈感和方向。無論如何,一個支持增量數(shù)據(jù)處理的無服務器架構的數(shù)據(jù)湖是非常吸引人的!
注:本文討論和編寫的程序代碼基于的都是Glue 2.0(基于Spark 2.4.3)和Hudi 0.8.0,兩者均為當前(2021年4月)各自的最新版本。
3. 在Glue作業(yè)中使用Hudi
現(xiàn)在,我們來演示如何在Glue中創(chuàng)建并運行一個基于Hudi的作業(yè)。我們假定讀者具有一定的Glue使用經(jīng)驗,因此不對Glue的基本操作進行解釋。
3.1. 資源列表
在開始之前,我們把本文使用的各類資源匯總如下,便于讀者統(tǒng)一下載。
3.1.1. 示例程序
為配合本文的講解,我們專門編寫了一個示例程序并存放在Github上,詳情如下:
| 項目名稱 | Repository地址 |
| glue-hudi-integration-example | https://github.com/bluishglc/glue-hudi-integration-example |
3.1.2. 依賴JAR包
運行程序需要使用到Hudi和Spark的兩個Jar包,由于包文件較大,無法存放在Github的Repository里,建議大家從Maven的中心庫下載,以下是鏈接信息:
| Jar包 | 下載鏈接 |
| hudi-spark-bundle_2.11-0.8.0.jar | https://search.maven.org/remotecontent?filepath=org/apache/hudi/hudi-spark-bundle_2.11/0.8.0/hudi-spark-bundle_2.11-0.8.0.jar |
| spark-avro_2.11-2.4.3.jar | https://search.maven.org/remotecontent?filepath=org/apache/spark/spark-avro_2.11/2.4.3/spark-avro_2.11-2.4.3.jar |
3.2. 創(chuàng)建基于Hudi的Glue作業(yè)
根據(jù)Hudi官方給出的集成原生Spark的方式(鏈接:https://hudi.apache.org/docs/quick-start-guide.html#setup-spark-shell):
spark-shell \--packages org.apache.hudi:hudi-spark-bundle_2.11:0.8.0,org.apache.spark:spark-avro_2.11:2.4.3 \--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
可知,將Hudi加載到Spark運行環(huán)境中需要完成兩個關鍵動作:
1.在Spark運行環(huán)境引入Hudi的Jar包: hudi-spark-bundle_2.11-0.8.0.jar和spark-avro_2.11-2.4.3.jar2.在Spark中配置Hudi需要的Kyro序列化器:spark.serializer=org.apache.spark.serializer.KryoSerializer
由此,不難推理出Glue集成Hudi的方法,即以Glue的方式實現(xiàn)上述兩個操作。下面我們進入實操環(huán)節(jié)。
3.2.1. 創(chuàng)建桶并上傳程序和依賴包
首先,在S3上創(chuàng)建一個供本示例使用的桶,取名glue-hudi-integration-example。要注意的是:為避免桶名沖突,你應該定義并使用自己的桶,并在后續(xù)操作中將所有出現(xiàn)glue-hudi-integration-example的配置替換為自己的桶名。然后,從Github檢出專門為本文編寫的Glue讀寫Hudi的示例程序(地址參考3.1.1節(jié)),將項目中的GlueHudiReadWriteExample.scala文件上傳到新建的桶里。同時,下載hudi-spark-bundle_2.11-0.8.0.jar和spark-avro_2.11-2.4.3.jar兩個Jar包(地址參考3.1.2節(jié)),并同樣上傳到新建的桶里。操作完成后,S3上的glue-hudi-integration-example桶應該包含內(nèi)容:

3.2.2. 添加作業(yè)
接下來,進入Glue控制臺,添加一個作業(yè),在“添加作業(yè)”向導中進行如下配置:
?在“配置作業(yè)屬性”環(huán)節(jié),向“名稱”輸入框中填入作業(yè)名稱:glue-hudi-integration-example;?在“IAM角色”下拉列表中選擇一個IAM角色,要注意的是這個角色必須要有讀寫glue-hudi-integration-example桶和訪問Glue服務的權限,如果沒有現(xiàn)成的合適角色,需要去IAM控制臺創(chuàng)建一個,本處不再贅述;?“Glue version”這一項選“Spark 2.4, Scala 2 with improved job startup times (Glue Version 2.0)”;?“此作業(yè)運行”處選“您提供的現(xiàn)成腳本”;?“Scala類名”和“存儲腳本所在的S3路徑”兩別填入com.github.GlueHudiReadWriteExample和s3://glue-hudi-integration-example/GlueHudiReadWriteExample.scala;
如下圖所示:

然后向下滾動進入到“安全配置、腳本庫和作業(yè)參數(shù)(可選)”環(huán)節(jié),在“從屬JAR路徑”的輸入框中將前面上傳到桶里的兩個依賴Jar包的S3路徑(記住,中間要使用逗號分隔):
s3://glue-hudi-integration-example/hudi-spark-bundle_2.11-0.8.0.jar,s3://glue-hudi-integration-example/spark-avro_2.11-2.4.3.jar粘貼進去。如下圖所示:

這里是前文提及的集成Hudi的兩個關鍵性操作中的第一個:將Hudi的Jar包引入到Glue的類路徑中。這與在spark-shell命令行中配置package參數(shù)效果是等價的:
--packages org.apache.hudi:hudi-spark-bundle_2.11:0.8.0,org.apache.spark:spark-avro_2.11:2.4.3再接下來,在“作業(yè)參數(shù)”環(huán)節(jié),添加一個作業(yè)參數(shù):
| 鍵名 | 取值 |
| --bucketName | glue-hudi-integration-example |
如下圖所示:

我們需要把S3桶的名稱以“作業(yè)參數(shù)”的形式傳給示例程序,以便其可以拼接出Hudi數(shù)據(jù)集的完整路徑,這個值會在讀寫Hudi數(shù)據(jù)集時使用,因為Hudi數(shù)據(jù)集會被寫到這個桶里。
最后,在“目錄選項”中勾選Use Glue data catalog as the Hive metastore,啟用Glue Catalog:

全部操作完成后,點擊“下一步”,再點擊“保存并編輯腳本”就會進入到腳本編輯頁面,頁面將會展示上傳的GlueHudiReadWriteExample.scala這個類的源代碼。
3.3. 在Glue作業(yè)中讀寫Hudi數(shù)據(jù)集
接下來,我們從編程角度看一下如何在Glue中使用Hudi,具體就是以GlueHudiReadWriteExample.scala這個類的實現(xiàn)為主軸,介紹幾個重要的技術細節(jié)。
首先,需要我們得先了解一下GlueHudiReadWriteExample.scala這個類的主線邏輯,即main方法中的操作:
def main(sysArgs: Array[String]): Unit = {init(sysArgs)val sparkImplicits = spark.implicitsimport sparkImplicits._// Step 1: build a dataframe with 2 user records, then write as// hudi format, but won't create table in glue catalogval users1 = Seq(User(1, "Tom", 24, System.currentTimeMillis()),User(2, "Bill", 32, System.currentTimeMillis()))val dataframe1 = users1.toDFsaveUserAsHudiWithoutHiveTableSync(dataframe1)// Step 2: read just saved hudi dataset, and print each recordsval dataframe2 = readUserFromHudi()val users2 = dataframe2.as[User].collect().toSeqprintln("printing user records in dataframe2...")users2.foreach(println(_))// Step 3: append 2 new user records, one is updating Bill's age from 32 to 33,// the other is a new user whose name is 'Rose'. This time, we will enable// hudi hive syncing function, and a table named `user` will be created on// default database, this action is done by hudi automatically based on// the metadata of hudi user dataset.val users3 = users2 ++ Seq(User(2, "Bill", 33, System.currentTimeMillis()),User(3, "Rose", 45, System.currentTimeMillis()))val dataframe3 = users3.toDFsaveUserAsHudiWithHiveTableSync(dataframe3)// Step 4: since a table is created automatically, now, we can query user table// immediately, and print returned user records, printed messages should show:// Bill's is updated, Rose's record is inserted, this demoed UPSERT feature of hudi!val dataframe4 = spark.sql("select * from user")val users4 = dataframe4.as[User].collect().toSeqprintln("printing user records in dataframe4...")users4.foreach(println(_))commit()}
作為一份示例性質的代碼,main方法的邏輯是“為了演示”而設計的,一共分成了四步操作:
?第一步,構建一個包含兩條User數(shù)據(jù)的Dataframe,取名dataframe1,然后將其以Hudi格式保存到S3上,但并不會同步元數(shù)據(jù)(也就是不會自動建表);?第二步,以Hudi格式讀取剛剛保存的數(shù)據(jù)集,得到本例的第二個Dataframe:dataframe2,此時它應該包含前面創(chuàng)建的兩條User數(shù)據(jù);?第三步,在dataframe2的基礎上再追加兩條User數(shù)據(jù),一條是針對現(xiàn)有數(shù)據(jù)Bill用戶的更新數(shù)據(jù),另一條Rose用戶的是新增數(shù)據(jù),進而得到第三個dataframe3,然后將其再次以Hudi格式寫回去,但是與上次不同的是,這一次程序將使用Hudi的元數(shù)據(jù)同步功能,將User數(shù)據(jù)集的元數(shù)據(jù)同步到Glue Catalog,一張名為user的表將會被自動創(chuàng)建出來;?第四步,為了驗證元數(shù)據(jù)是否同步成功,以及更新和插入的數(shù)據(jù)是否正確地處理,這次改用SQL查詢user表,得到第四個Dataframe:dataframe4,其不但應該包含數(shù)據(jù),且更新和插入數(shù)據(jù)都必須是正確的。
main在開始時調(diào)用了一個init函數(shù),該函數(shù)會完成一些必要初始化工作,如:解析并獲取作業(yè)參數(shù),創(chuàng)建GlueContext和SparkSession實例等。其中有一處代碼需要特別說明,即類文件的第90-92行,也就是下面代碼中的第10-12行:
/*** 1. Parse job params* 2. Create SparkSession instance with given configs* 3. Init glue job** @param sysArgs all params passing from main method*/def init(sysArgs: Array[String]): Unit = {...val conf = new SparkConf()// This is required for hudiconf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")...}
該處代碼正是前文提及的集成Hudi的第二個關鍵性操作:在Spark中配置Hudi需要的Kyro序列化器:spark.serializer=org.apache.spark.serializer.KryoSerializer。如果沒有配置該項,程序將會報出如下錯誤:
org.apache.hudi.exception.HoodieException : hoodie only support org.apache.spark.serializer.KryoSerializer as spark.serializer下面,我們要把關注重點放在Glue是如何讀寫Hudi數(shù)據(jù)集的,也就是readUserFromHudi和saveUserAsHudiWithoutHiveTableSync兩個方法的實現(xiàn)。首先看一下較為簡單的讀取操作:
/*** Read user records from Hudi, and return a dataframe.** @return The dataframe of user records*/def readUserFromHudi(): DataFrame = {spark.read.format("hudi").option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL).load(userTablePath)}
因為代碼中設置了
option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)所以該方法使用的是Hudi最簡單也是最常用的一種讀取方式:快照讀取,即:讀取當前數(shù)據(jù)集最新狀態(tài)的快照。關于讀取Hudi數(shù)據(jù)集的更多內(nèi)容,請參考Hudi官方文檔:https://hudi.apache.org/docs/querying_data.html 。接下來是寫操作:
/*** Save a user dataframe as hudi dataset, but WON'T SYNC its metadata to glue catalog,* In other words, no table will be created after saving.** @param dataframe The dataframe to be saved*/def saveUserAsHudiWithoutHiveTableSync(dataframe: DataFrame) = {val hudiOptions = Map[String, String](HoodieWriteConfig.TABLE_NAME -> userTableName,DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> userRecordKeyField,DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> userPrecombineField,DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> classOf[NonpartitionedKeyGenerator].getName)dataframe.write.format("hudi").options(hudiOptions).mode(SaveMode.Append).save(userTablePath)}
寫操作中大部分的代碼都是在對Hudi進行一些必要的配置,這些配置包括:
?指定表名;?指定寫操作的類型:是UPSERT,INSERT還是DELETE等;?指定Hudi在比對新舊數(shù)據(jù)時要使用的兩個關鍵字段的名稱:RECORDKEY_FIELD_OPT_KEY和PRECOMBINE_FIELD_OPT_KEY;?指定為記錄生成key的策略(一個Class)
這些都是Hudi的基本配置,本文不再一一解釋,請讀者參考Hudi的官方文檔:https://hudi.apache.org/docs/writing_data.html
3.4. 將Hudi元數(shù)據(jù)同步到Glue Catalog
上述讀寫操作并沒有同步元數(shù)據(jù),在實際應用中,大多數(shù)情況下,開發(fā)者會開啟Hudi的Hive Sync功能,讓Hudi將其元數(shù)據(jù)映射到Hive Metastore中,自動創(chuàng)建Hive表,這是一個很有用的操作。不過,對于Glue來說,這個問題就比較棘手了,基于筆者的使用經(jīng)歷,早期遇到的大部分問題都出在了同步元數(shù)據(jù)上,究其原因,主要是因為Glue使用了自己的元數(shù)據(jù)服務Glue Catalog,而Hudi的元數(shù)據(jù)同步是面向Hive Metastore的。那這是否意味著Hudi就不能把元數(shù)據(jù)同步到Glue上呢?幸運的是,在經(jīng)過各種嘗試和摸索之后,我們還是順利地完成了這項工作,這為Hudi在Glue上的應用鋪平了道路。
在介紹具體操作之前,我們先了解一下Hudi同步元數(shù)據(jù)到Hive的基本操作。根據(jù)官方文檔: https://hudi.apache.org/docs/configurations.html#hive-sync-options給出的說明,標準的Hudi Hive Sync配置應該是這樣的:
首先是最基本的三項:
DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true"DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY -> "your-target-database"DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "your-target-table"
這三項很容易理解,就是告訴Hudi要開啟Hive Sync,同時指定同步到Hive的什么庫里的什么表。如果你要同步的是一張分區(qū)表,還需要追加以下幾項:
DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> classOf[ComplexKeyGenerator].getNameDataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getNameDataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY -> "true"DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "your-partition-path-field"DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "your-hive-partition-field"
這些配置項主要在告訴Hudi數(shù)據(jù)集的分區(qū)信息,以便Hudi能正確地將分區(qū)相關的元數(shù)據(jù)也同步到Hive Metastore中。現(xiàn)在,我們看一下在Glue中要怎樣實現(xiàn)元數(shù)據(jù)同步,也就是示例代碼中的saveUserAsHudiWithHiveTableSync方法:
/*** Save a user dataframe as hudi dataset, but also SYNC its metadata to glue catalog,* In other words, after saving, a table named `default.user` will be created automatically by hudi hive sync* tool on Glue Catalog!** @param dataframe The dataframe to be saved*/def saveUserAsHudiWithHiveTableSync(dataframe: DataFrame) = {val hudiOptions = Map[String, String](HoodieWriteConfig.TABLE_NAME -> userTableName,DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> userRecordKeyField,DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> userPrecombineField,DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> classOf[NonpartitionedKeyGenerator].getName,DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[NonPartitionedExtractor].getName,// Register hudi dataset as hive table (sync meta data)DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true",DataSourceWriteOptions.HIVE_USE_JDBC_OPT_KEY -> "false", // For glue, it is required to disable sync via hive jdbc!DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY -> "default",DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> userTableName)dataframe.write.format("hudi").options(hudiOptions).mode(SaveMode.Append).save(userTablePath)}
該方法的實現(xiàn)在saveUserAsHudiWithoutHiveTableSync的基礎之上,追加了四個與同步元數(shù)據(jù)相關的配置項,基中三項是前面提到的必填項,唯獨:
DataSourceWriteOptions.HIVE_USE_JDBC_OPT_KEY -> "false"是前面沒有提到的,而這一項配置是在Glue下同步元數(shù)據(jù)至關重要的。如果不進行此項配置,我們一定會遇到這樣一個錯誤:
Cannot create hive connection jdbc:hive2://localhost:10000/這是因為:Hudi的Hive Sync默認是通過JDBC連接HiveServer2執(zhí)行建表操作的,而jdbc:hive2://localhost:10000/是Hudi配置的默認Hive JDBC連接字符串(這個字符串當然是可修改的,對應配置項為hive_sync.jdbc_url)。由于在Glue里沒有Hive Metastore和HiverServer2,所以報錯是必然的。
那為什么在禁用JDBC方式連接Hive Metastore之后,就可以同步了呢?通過查看Hudi的源代碼可知,當HIVE_USE_JDBC_OPT_KEY被置為false時,Hudi會轉而使用一個專職的IMetaStoreClient去與對應的Metastore進行交互。在Hudi同步元數(shù)據(jù)的主要實現(xiàn)類org.apache.hudi.hive.HoodieHiveClient中,維護著一個私有成員變量private IMetaStoreClient client,Hudi就是使用這個Client去和Metastore交互的,在HoodieHiveClient中有多處代碼都是先判斷是否開啟了JDBC,如果是true,則通過JDBC做交互,如果是false,就使用Client,例如org.apache.hudi.hive.HoodieHiveClient#getTableSchema方法就是依此邏輯實現(xiàn)的:
public class HoodieHiveClient extends AbstractSyncHoodieClient {...private IMetaStoreClient client;...public Map<String, String> getTableSchema(String tableName) {if (syncConfig.useJdbc) {...} else {return getTableSchemaUsingMetastoreClient(tableName);}}...}
而在Glue這一側,由于其使用了自己的Metastore:Glue Catalog,為了和上層Hive相關的基礎設施進行兼容,Glue提供了一個自己的IMetaStoreClient實現(xiàn)用于與Glue Catalog交互,這個實現(xiàn)就是com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient(參考:https://github.com/awslabs/aws-glue-data-catalog-client-for-apache-hive-metastore/blob/master/aws-glue-datacatalog-hive2-client/src/main/java/com/amazonaws/glue/catalog/metastore/AWSCatalogMetastoreClient.java):
public class AWSCatalogMetastoreClient implements IMetaStoreClient {...}
該類實現(xiàn)了IMetaStoreClient接口。所以只要使用的是AWSCatalogMetastoreClient這個客戶端,就能用Hive Metastore的交互方式和Glue Catalog進行交互(這得感謝Hive設計了IMetaStoreClient這個接口,而不是給出一個實現(xiàn)類)。在Spark中,有spark.hadoop.hive.metastore.client.factory.class這樣一項配置,顧名思義,這一配置就是告訴Spark使用哪一個工廠類來生產(chǎn)Hive Metastore的Client了,所以你應該大概率猜到了,在Glue里,這個配置應該是被修改了,配置的應該是某個Glue自己實現(xiàn)的工廠類,用于專門生產(chǎn)AWSCatalogMetastoreClient。是的,的確如此,在Glue里這一項是這樣配置的:
spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory從Github AwsLab釋出的Glue Catalog的部分源碼中,可以找到這個類的實現(xiàn)(地址:https://github.com/awslabs/aws-glue-data-catalog-client-for-apache-hive-metastore/blob/master/aws-glue-datacatalog-spark-client/src/main/java/com/amazonaws/glue/catalog/metastore/AWSGlueDataCatalogHiveClientFactory.java):
public class AWSGlueDataCatalogHiveClientFactory implements HiveMetaStoreClientFactory {@Overridepublic IMetaStoreClient createMetaStoreClient(HiveConf conf,HiveMetaHookLoader hookLoader) throws MetaException {AWSCatalogMetastoreClient client = new AWSCatalogMetastoreClient(conf, hookLoader);return client;}}
和我們的猜測完全一致。所以,梳理下來整件事情是這樣的:當禁用Hive JDBC之后,Hudi會轉而使用一個客戶端(即某個IMetaStoreClient接口的實現(xiàn)類)與Metastore進行交互,而在Glue環(huán)境里,Glue提供了一個遵循IMetaStoreClient接口規(guī)范,但卻是與Glue Catalog 進行交互的客戶端類AWSCatalogMetastoreClient。這樣,Hudi就能通過這個客戶端與Glue Catalog進行透明交互了!
最后,讓我們來運行一下這個作業(yè),看一看輸出的日志和同步出的數(shù)據(jù)表。回到Glue控制臺,在前面停留的“腳本編輯”頁面上,點擊“運行作業(yè)”按鈕,即可執(zhí)行作業(yè)了。在作業(yè)運行結束后,可以在“日志”Tab頁看到程序打印的信息,如下圖所示:

其中dataframe4的數(shù)據(jù)很好地體現(xiàn)了Hudi的UPSERT能力,程序按照我們期望的邏輯執(zhí)行出了結果:Bill的年齡從32更新為了33,新增的Rose用戶也出現(xiàn)在了結果集中。于此同時,在Glue控制臺的Catalog頁面上,也會看到同步出來的user表:

以及列信息:

它的輸入/輸出格式以及5個_hoodie開頭的列名清楚地表明這是一張基于Hudi元數(shù)據(jù)映射出來的表。
4. 常見錯誤
1. hoodie only support KryoSerializer as spark.serializer
該問題在3.2節(jié)已經(jīng)提及,是由于沒有配置spark.serializer=org.apache.spark.serializer.KryoSerializer所致,請參考前文。
2. Cannot create hive connection jdbc:hive2://localhost:10000/
該問題在3.3節(jié)已經(jīng)提及,須在Hudi中禁用Hive JDBC,請參考前文。
3. Got runtime exception when hive syncing ...
這是一個非常棘手的問題,筆者曾在這個問題上耽誤了不少時間,并研究了Hudi同步元數(shù)據(jù)的大部分代碼,坦率地說,目前它的觸發(fā)機制還不是非常確定,主要原因是在Glue這種無服務器環(huán)境下不方便進行遠程DEBUG,只能通過日志進行分析。一個大概率的懷疑方向是:在整個SparkSession的上下文中,由于某一次Hudi的讀寫操作沒能正確地關閉并釋放IMetaStoreClient實例,導致后面需要再使用該Client同步元數(shù)據(jù)時,其已經(jīng)不可用。不過,相比尚不確定的起因,其解決方案是非常清晰和確定的,即:在出錯的位置前追加一行代碼:
Hive.closeCurrent()這一操作非常有效,它主動銷毀了綁定在當前線程上的org.apache.hadoop.hive.ql.metadata.Hive實例,該類的實例是存放在一個ThreadLocal變量里的,而它本身又會包含一個IMetaStoreClient實例,所以Hive實例中的Metastore客戶端也是一個線程只維護一個實例。而上述代碼顯式地關閉并釋放了當前的Client(即主動關閉并釋放已經(jīng)無法再使用的Client實例),這會促使Hudi在下一次同步元數(shù)據(jù)時重建新的Client實例。
關于這一問題更深入的分析和研究,可參考筆者的另一篇文章《AWS Glue集成Apache Hudi同步元數(shù)據(jù)深度歷險(各類錯誤的填坑方案)》
4. Failed to check if database exists ...
該問題與上一個問題是一樣的,只是處在異常堆棧的不同位置上,解決辦法同上。
5. 結語
雖然本文篇幅較長,但是從GlueHudiReadWriteExample.scala這個類的實現(xiàn)上不難看出,只要一次性做好幾處關鍵配置,在Glue中使用Hudi其實與在Spark原生環(huán)境中使用Hudi幾乎是無異的,這意味著兩者可以平滑地集成并各自持續(xù)升級。如此一來,Glue + Hudi的技術選型將非常具有競爭力,前者是一個無服務器架構的Spark計算環(huán)境,主打零運維和極致的成本控制,后者則為新一代數(shù)據(jù)湖提供更新插入、增量查詢和并發(fā)控制等功能性支持,兩者的成功結合是一件令人激動的事情,我想再次引用文章開始時使用的一句話作為結尾:無論如何,一個支持增量數(shù)據(jù)處理的無服務器架構的數(shù)據(jù)湖是非常吸引人的!
關于作者:耿立超,架構師,15年IT系統(tǒng)開發(fā)和架構經(jīng)驗,對大數(shù)據(jù)、企業(yè)級應用架構、SaaS、分布式存儲和領域驅動設計有豐富的實踐經(jīng)驗,熱衷函數(shù)式編程。對Hadoop/Spark 生態(tài)系統(tǒng)有深入和廣泛的了解,參與過Hadoop商業(yè)發(fā)行版的開發(fā),曾帶領團隊建設過數(shù)個完備的企業(yè)數(shù)據(jù)平臺,個人技術博客:https://laurence.blog.csdn.net/ 作者著有《大數(shù)據(jù)平臺架構與原型實現(xiàn):數(shù)據(jù)中臺建設實戰(zhàn)》一書,該書已在京東和當當上線。
