<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Hudi 實踐 | 在 AWS Glue 中使用 Apache Hudi

          共 16472字,需瀏覽 33分鐘

           ·

          2021-04-22 12:41

          1. Glue與Hudi簡介

          ?AWS Glue

          AWS Glue是Amazon Web Services(AWS)云平臺推出的一款無服務器(Serverless)的大數(shù)據(jù)分析服務。對于不了解該產(chǎn)品的讀者來說,可以用一句話概括其實質:Glue是一個無服務器的全托管的Spark運行環(huán)境,只需提供Spark程序代碼即可運行Spark作業(yè),無需維護集群。

          ?Apache Hudi

          Apache Hudi最早由Uber設計開發(fā),后提交給Apache孵化器,2020年5月,Hudi正式升級為Apache的頂級項目。Hudi是一個數(shù)據(jù)湖平臺,支持增量數(shù)據(jù)處理,其提供的更新插入增量查詢兩大操作原語很好地彌補了傳統(tǒng)大數(shù)據(jù)處理引擎(如Spark、Hive等)在這方面的缺失,因而受到廣泛關注并開始流行。此外,Hudi在設計理念上非常注意與現(xiàn)有大數(shù)據(jù)生態(tài)的融合,它能以相對透明和非侵入的方式融入到Spark、Flink計算框架中,并且支持了流式讀寫,有望成為未來數(shù)據(jù)湖的統(tǒng)一存儲層(同時支持批流讀寫)。

          2. 集成的可行性分析

          鑒于Hudi的日益流行,很多正在使用Glue或者為搭建無服務器數(shù)據(jù)湖進行技術選型的團隊對Glue與Hudi的集成非常關心,如果兩者可以成功地集成在一起,團隊就可以建設出支持增量數(shù)據(jù)處理的無服務器架構的新一代數(shù)據(jù)湖。

          但是,AWS Glue的產(chǎn)品團隊從未就支持Hudi給出過官方保證,雖然從“Glue內(nèi)核是Spark”這一事實進行推斷,理論上Glue是可以與Hudi集成的,但由于Glue沒有使用Hive的Metastore,而是依賴自己的元數(shù)據(jù)存儲服務Glue Catalog,這會讓Glue在同步Hudi元數(shù)據(jù)時遇到不小的麻煩。

          本文將在代碼驗證的基礎之上,詳細介紹如何在Glue里使用Hudi,對集成過程中發(fā)現(xiàn)的各種問題和錯誤給出解釋和應對方案。我們希望通過本文的介紹,給讀者在數(shù)據(jù)湖建設的技術選型上提供新的靈感和方向。無論如何,一個支持增量數(shù)據(jù)處理的無服務器架構的數(shù)據(jù)湖是非常吸引人的!

          注:本文討論和編寫的程序代碼基于的都是Glue 2.0(基于Spark 2.4.3)和Hudi 0.8.0,兩者均為當前(2021年4月)各自的最新版本。

          3. 在Glue作業(yè)中使用Hudi

          現(xiàn)在,我們來演示如何在Glue中創(chuàng)建并運行一個基于Hudi的作業(yè)。我們假定讀者具有一定的Glue使用經(jīng)驗,因此不對Glue的基本操作進行解釋。

          3.1. 資源列表

          在開始之前,我們把本文使用的各類資源匯總如下,便于讀者統(tǒng)一下載。

          3.1.1. 示例程序

          為配合本文的講解,我們專門編寫了一個示例程序并存放在Github上,詳情如下:

          項目名稱Repository地址
          glue-hudi-integration-examplehttps://github.com/bluishglc/glue-hudi-integration-example

          3.1.2. 依賴JAR包

          運行程序需要使用到Hudi和Spark的兩個Jar包,由于包文件較大,無法存放在Github的Repository里,建議大家從Maven的中心庫下載,以下是鏈接信息:

          Jar包下載鏈接
          hudi-spark-bundle_2.11-0.8.0.jarhttps://search.maven.org/remotecontent?filepath=org/apache/hudi/hudi-spark-bundle_2.11/0.8.0/hudi-spark-bundle_2.11-0.8.0.jar
          spark-avro_2.11-2.4.3.jarhttps://search.maven.org/remotecontent?filepath=org/apache/spark/spark-avro_2.11/2.4.3/spark-avro_2.11-2.4.3.jar

          3.2. 創(chuàng)建基于Hudi的Glue作業(yè)

          根據(jù)Hudi官方給出的集成原生Spark的方式(鏈接:https://hudi.apache.org/docs/quick-start-guide.html#setup-spark-shell):

          spark-shell \  --packages org.apache.hudi:hudi-spark-bundle_2.11:0.8.0,org.apache.spark:spark-avro_2.11:2.4.3 \  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

          可知,將Hudi加載到Spark運行環(huán)境中需要完成兩個關鍵動作:

          1.在Spark運行環(huán)境引入Hudi的Jar包: hudi-spark-bundle_2.11-0.8.0.jarspark-avro_2.11-2.4.3.jar2.在Spark中配置Hudi需要的Kyro序列化器:spark.serializer=org.apache.spark.serializer.KryoSerializer

          由此,不難推理出Glue集成Hudi的方法,即以Glue的方式實現(xiàn)上述兩個操作。下面我們進入實操環(huán)節(jié)。

          3.2.1. 創(chuàng)建桶并上傳程序和依賴包

          首先,在S3上創(chuàng)建一個供本示例使用的桶,取名glue-hudi-integration-example。要注意的是:為避免桶名沖突,你應該定義并使用自己的桶,并在后續(xù)操作中將所有出現(xiàn)glue-hudi-integration-example的配置替換為自己的桶名。然后,從Github檢出專門為本文編寫的Glue讀寫Hudi的示例程序(地址參考3.1.1節(jié)),將項目中的GlueHudiReadWriteExample.scala文件上傳到新建的桶里。同時,下載hudi-spark-bundle_2.11-0.8.0.jarspark-avro_2.11-2.4.3.jar兩個Jar包(地址參考3.1.2節(jié)),并同樣上傳到新建的桶里。操作完成后,S3上的glue-hudi-integration-example桶應該包含內(nèi)容:

          3.2.2. 添加作業(yè)

          接下來,進入Glue控制臺,添加一個作業(yè),在“添加作業(yè)”向導中進行如下配置:

          ?在“配置作業(yè)屬性”環(huán)節(jié),向“名稱”輸入框中填入作業(yè)名稱:glue-hudi-integration-example?在“IAM角色”下拉列表中選擇一個IAM角色,要注意的是這個角色必須要有讀寫glue-hudi-integration-example桶和訪問Glue服務的權限,如果沒有現(xiàn)成的合適角色,需要去IAM控制臺創(chuàng)建一個,本處不再贅述;?“Glue version”這一項選“Spark 2.4, Scala 2 with improved job startup times (Glue Version 2.0)”;?“此作業(yè)運行”處選“您提供的現(xiàn)成腳本”;?“Scala類名”和“存儲腳本所在的S3路徑”兩別填入com.github.GlueHudiReadWriteExamples3://glue-hudi-integration-example/GlueHudiReadWriteExample.scala

          如下圖所示:

          然后向下滾動進入到“安全配置、腳本庫和作業(yè)參數(shù)(可選)”環(huán)節(jié),在“從屬JAR路徑”的輸入框中將前面上傳到桶里的兩個依賴Jar包的S3路徑(記住,中間要使用逗號分隔):

          s3://glue-hudi-integration-example/hudi-spark-bundle_2.11-0.8.0.jar,s3://glue-hudi-integration-example/spark-avro_2.11-2.4.3.jar

          粘貼進去。如下圖所示:

          這里是前文提及的集成Hudi的兩個關鍵性操作中的第一個:將Hudi的Jar包引入到Glue的類路徑中。這與在spark-shell命令行中配置package參數(shù)效果是等價的:

          --packages org.apache.hudi:hudi-spark-bundle_2.11:0.8.0,org.apache.spark:spark-avro_2.11:2.4.3

          再接下來,在“作業(yè)參數(shù)”環(huán)節(jié),添加一個作業(yè)參數(shù):

          鍵名取值
          --bucketNameglue-hudi-integration-example

          如下圖所示:

          我們需要把S3桶的名稱以“作業(yè)參數(shù)”的形式傳給示例程序,以便其可以拼接出Hudi數(shù)據(jù)集的完整路徑,這個值會在讀寫Hudi數(shù)據(jù)集時使用,因為Hudi數(shù)據(jù)集會被寫到這個桶里。

          最后,在“目錄選項”中勾選Use Glue data catalog as the Hive metastore,啟用Glue Catalog:

          全部操作完成后,點擊“下一步”,再點擊“保存并編輯腳本”就會進入到腳本編輯頁面,頁面將會展示上傳的GlueHudiReadWriteExample.scala這個類的源代碼。

          3.3. 在Glue作業(yè)中讀寫Hudi數(shù)據(jù)集

          接下來,我們從編程角度看一下如何在Glue中使用Hudi,具體就是以GlueHudiReadWriteExample.scala這個類的實現(xiàn)為主軸,介紹幾個重要的技術細節(jié)。

          首先,需要我們得先了解一下GlueHudiReadWriteExample.scala這個類的主線邏輯,即main方法中的操作:

          def main(sysArgs: Array[String]): Unit = {  init(sysArgs)  val sparkImplicits = spark.implicits  import sparkImplicits._  // Step 1: build a dataframe with 2 user records, then write as  // hudi format, but won't create table in glue catalog  val users1 = Seq(    User(1, "Tom", 24, System.currentTimeMillis()),    User(2, "Bill", 32, System.currentTimeMillis())  )  val dataframe1 = users1.toDF  saveUserAsHudiWithoutHiveTableSync(dataframe1)  // Step 2: read just saved hudi dataset, and print each records  val dataframe2 = readUserFromHudi()  val users2 = dataframe2.as[User].collect().toSeq  println("printing user records in dataframe2...")  users2.foreach(println(_))  // Step 3: append 2 new user records, one is updating Bill's age from 32 to 33,  // the other is a new user whose name is 'Rose'. This time, we will enable  // hudi hive syncing function, and a table named `user` will be created on  // default database, this action is done by hudi automatically based on  // the metadata of hudi user dataset.  val users3 = users2 ++ Seq(    User(2, "Bill", 33, System.currentTimeMillis()),    User(3, "Rose", 45, System.currentTimeMillis())  )  val dataframe3 = users3.toDF  saveUserAsHudiWithHiveTableSync(dataframe3)  // Step 4: since a table is created automatically, now, we can query user table  // immediately, and print returned user records, printed messages should show:  // Bill's is updated, Rose's record is inserted, this demoed UPSERT feature of hudi!  val dataframe4 = spark.sql("select * from user")  val users4 = dataframe4.as[User].collect().toSeq  println("printing user records in dataframe4...")  users4.foreach(println(_))  commit()}

          作為一份示例性質的代碼,main方法的邏輯是“為了演示”而設計的,一共分成了四步操作:

          ?第一步,構建一個包含兩條User數(shù)據(jù)的Dataframe,取名dataframe1,然后將其以Hudi格式保存到S3上,但并不會同步元數(shù)據(jù)(也就是不會自動建表);?第二步,以Hudi格式讀取剛剛保存的數(shù)據(jù)集,得到本例的第二個Dataframe:dataframe2,此時它應該包含前面創(chuàng)建的兩條User數(shù)據(jù);?第三步,在dataframe2的基礎上再追加兩條User數(shù)據(jù),一條是針對現(xiàn)有數(shù)據(jù)Bill用戶的更新數(shù)據(jù),另一條Rose用戶的是新增數(shù)據(jù),進而得到第三個dataframe3,然后將其再次以Hudi格式寫回去,但是與上次不同的是,這一次程序將使用Hudi的元數(shù)據(jù)同步功能,將User數(shù)據(jù)集的元數(shù)據(jù)同步到Glue Catalog,一張名為user的表將會被自動創(chuàng)建出來;?第四步,為了驗證元數(shù)據(jù)是否同步成功,以及更新和插入的數(shù)據(jù)是否正確地處理,這次改用SQL查詢user表,得到第四個Dataframe:dataframe4,其不但應該包含數(shù)據(jù),且更新和插入數(shù)據(jù)都必須是正確的。

          main在開始時調(diào)用了一個init函數(shù),該函數(shù)會完成一些必要初始化工作,如:解析并獲取作業(yè)參數(shù),創(chuàng)建GlueContextSparkSession實例等。其中有一處代碼需要特別說明,即類文件的第90-92行,也就是下面代碼中的第10-12行:

          /** * 1. Parse job params * 2. Create SparkSession instance with given configs * 3. Init glue job * * @param sysArgs all params passing from main method */def init(sysArgs: Array[String]): Unit = {  ...  val conf = new SparkConf()  // This is required for hudi  conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")  ...}

          該處代碼正是前文提及的集成Hudi的第二個關鍵性操作:在Spark中配置Hudi需要的Kyro序列化器:spark.serializer=org.apache.spark.serializer.KryoSerializer。如果沒有配置該項,程序將會報出如下錯誤:

          org.apache.hudi.exception.HoodieException : hoodie only support org.apache.spark.serializer.KryoSerializer as spark.serializer

          下面,我們要把關注重點放在Glue是如何讀寫Hudi數(shù)據(jù)集的,也就是readUserFromHudisaveUserAsHudiWithoutHiveTableSync兩個方法的實現(xiàn)。首先看一下較為簡單的讀取操作:

          /** * Read user records from Hudi, and return a dataframe. * * @return The dataframe of user records */def readUserFromHudi(): DataFrame = {  spark    .read    .format("hudi")    .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)    .load(userTablePath)}

          因為代碼中設置了

          option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)

          所以該方法使用的是Hudi最簡單也是最常用的一種讀取方式:快照讀取,即:讀取當前數(shù)據(jù)集最新狀態(tài)的快照。關于讀取Hudi數(shù)據(jù)集的更多內(nèi)容,請參考Hudi官方文檔:https://hudi.apache.org/docs/querying_data.html 。接下來是寫操作:

          /** * Save a user dataframe as hudi dataset, but WON'T SYNC its metadata to glue catalog, * In other words, no table will be created after saving. * * @param dataframe The dataframe to be saved */def saveUserAsHudiWithoutHiveTableSync(dataframe: DataFrame) = {  val hudiOptions = Map[String, String](    HoodieWriteConfig.TABLE_NAME -> userTableName,    DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,    DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,    DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> userRecordKeyField,    DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> userPrecombineField,    DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> classOf[NonpartitionedKeyGenerator].getName  )  dataframe    .write    .format("hudi")    .options(hudiOptions)    .mode(SaveMode.Append)    .save(userTablePath)}

          寫操作中大部分的代碼都是在對Hudi進行一些必要的配置,這些配置包括:

          ?指定表名;?指定寫操作的類型:是UPSERT,INSERT還是DELETE等;?指定Hudi在比對新舊數(shù)據(jù)時要使用的兩個關鍵字段的名稱:RECORDKEY_FIELD_OPT_KEYPRECOMBINE_FIELD_OPT_KEY?指定為記錄生成key的策略(一個Class)

          這些都是Hudi的基本配置,本文不再一一解釋,請讀者參考Hudi的官方文檔:https://hudi.apache.org/docs/writing_data.html

          3.4. 將Hudi元數(shù)據(jù)同步到Glue Catalog

          上述讀寫操作并沒有同步元數(shù)據(jù),在實際應用中,大多數(shù)情況下,開發(fā)者會開啟Hudi的Hive Sync功能,讓Hudi將其元數(shù)據(jù)映射到Hive Metastore中,自動創(chuàng)建Hive表,這是一個很有用的操作。不過,對于Glue來說,這個問題就比較棘手了,基于筆者的使用經(jīng)歷,早期遇到的大部分問題都出在了同步元數(shù)據(jù)上,究其原因,主要是因為Glue使用了自己的元數(shù)據(jù)服務Glue Catalog,而Hudi的元數(shù)據(jù)同步是面向Hive Metastore的。那這是否意味著Hudi就不能把元數(shù)據(jù)同步到Glue上呢?幸運的是,在經(jīng)過各種嘗試和摸索之后,我們還是順利地完成了這項工作,這為Hudi在Glue上的應用鋪平了道路。

          在介紹具體操作之前,我們先了解一下Hudi同步元數(shù)據(jù)到Hive的基本操作。根據(jù)官方文檔: https://hudi.apache.org/docs/configurations.html#hive-sync-options給出的說明,標準的Hudi Hive Sync配置應該是這樣的:

          首先是最基本的三項:

          DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true"DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY -> "your-target-database"DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "your-target-table"

          這三項很容易理解,就是告訴Hudi要開啟Hive Sync,同時指定同步到Hive的什么庫里的什么表。如果你要同步的是一張分區(qū)表,還需要追加以下幾項:

          DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> classOf[ComplexKeyGenerator].getNameDataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getNameDataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY -> "true"DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "your-partition-path-field"DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "your-hive-partition-field"

          這些配置項主要在告訴Hudi數(shù)據(jù)集的分區(qū)信息,以便Hudi能正確地將分區(qū)相關的元數(shù)據(jù)也同步到Hive Metastore中。現(xiàn)在,我們看一下在Glue中要怎樣實現(xiàn)元數(shù)據(jù)同步,也就是示例代碼中的saveUserAsHudiWithHiveTableSync方法:

          /** * Save a user dataframe as hudi dataset, but also SYNC its metadata to glue catalog, * In other words, after saving, a table named `default.user` will be created automatically by hudi hive sync * tool on Glue Catalog! * * @param dataframe The dataframe to be saved */def saveUserAsHudiWithHiveTableSync(dataframe: DataFrame) = {  val hudiOptions = Map[String, String](    HoodieWriteConfig.TABLE_NAME -> userTableName,    DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,    DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,    DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> userRecordKeyField,    DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> userPrecombineField,    DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> classOf[NonpartitionedKeyGenerator].getName,    DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[NonPartitionedExtractor].getName,    // Register hudi dataset as hive table (sync meta data)    DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true",    DataSourceWriteOptions.HIVE_USE_JDBC_OPT_KEY -> "false", // For glue, it is required to disable sync via hive jdbc!    DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY -> "default",    DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> userTableName  )  dataframe    .write    .format("hudi")    .options(hudiOptions)    .mode(SaveMode.Append)    .save(userTablePath)}

          該方法的實現(xiàn)在saveUserAsHudiWithoutHiveTableSync的基礎之上,追加了四個與同步元數(shù)據(jù)相關的配置項,基中三項是前面提到的必填項,唯獨:

          DataSourceWriteOptions.HIVE_USE_JDBC_OPT_KEY -> "false"

          是前面沒有提到的,而這一項配置是在Glue下同步元數(shù)據(jù)至關重要的。如果不進行此項配置,我們一定會遇到這樣一個錯誤:

          Cannot create hive connection jdbc:hive2://localhost:10000/

          這是因為:Hudi的Hive Sync默認是通過JDBC連接HiveServer2執(zhí)行建表操作的,而jdbc:hive2://localhost:10000/是Hudi配置的默認Hive JDBC連接字符串(這個字符串當然是可修改的,對應配置項為hive_sync.jdbc_url)。由于在Glue里沒有Hive Metastore和HiverServer2,所以報錯是必然的。

          那為什么在禁用JDBC方式連接Hive Metastore之后,就可以同步了呢?通過查看Hudi的源代碼可知,當HIVE_USE_JDBC_OPT_KEY被置為false時,Hudi會轉而使用一個專職的IMetaStoreClient去與對應的Metastore進行交互。在Hudi同步元數(shù)據(jù)的主要實現(xiàn)類org.apache.hudi.hive.HoodieHiveClient中,維護著一個私有成員變量private IMetaStoreClient client,Hudi就是使用這個Client去和Metastore交互的,在HoodieHiveClient中有多處代碼都是先判斷是否開啟了JDBC,如果是true,則通過JDBC做交互,如果是false,就使用Client,例如org.apache.hudi.hive.HoodieHiveClient#getTableSchema方法就是依此邏輯實現(xiàn)的:

          public class HoodieHiveClient extends AbstractSyncHoodieClient {    ...    private IMetaStoreClient client;    ...    public Map<String, String> getTableSchema(String tableName) {      if (syncConfig.useJdbc) {         ...      } else {         return getTableSchemaUsingMetastoreClient(tableName);      }    }    ...}

          而在Glue這一側,由于其使用了自己的Metastore:Glue Catalog,為了和上層Hive相關的基礎設施進行兼容,Glue提供了一個自己的IMetaStoreClient實現(xiàn)用于與Glue Catalog交互,這個實現(xiàn)就是com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient(參考:https://github.com/awslabs/aws-glue-data-catalog-client-for-apache-hive-metastore/blob/master/aws-glue-datacatalog-hive2-client/src/main/java/com/amazonaws/glue/catalog/metastore/AWSCatalogMetastoreClient.java):

          public class AWSCatalogMetastoreClient implements IMetaStoreClient {  ...}

          該類實現(xiàn)了IMetaStoreClient接口。所以只要使用的是AWSCatalogMetastoreClient這個客戶端,就能用Hive Metastore的交互方式和Glue Catalog進行交互(這得感謝Hive設計了IMetaStoreClient這個接口,而不是給出一個實現(xiàn)類)。在Spark中,有spark.hadoop.hive.metastore.client.factory.class這樣一項配置,顧名思義,這一配置就是告訴Spark使用哪一個工廠類來生產(chǎn)Hive Metastore的Client了,所以你應該大概率猜到了,在Glue里,這個配置應該是被修改了,配置的應該是某個Glue自己實現(xiàn)的工廠類,用于專門生產(chǎn)AWSCatalogMetastoreClient。是的,的確如此,在Glue里這一項是這樣配置的:

          spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory

          從Github AwsLab釋出的Glue Catalog的部分源碼中,可以找到這個類的實現(xiàn)(地址:https://github.com/awslabs/aws-glue-data-catalog-client-for-apache-hive-metastore/blob/master/aws-glue-datacatalog-spark-client/src/main/java/com/amazonaws/glue/catalog/metastore/AWSGlueDataCatalogHiveClientFactory.java):

          public class AWSGlueDataCatalogHiveClientFactory implements HiveMetaStoreClientFactory {  @Override  public IMetaStoreClient createMetaStoreClient(      HiveConf conf,      HiveMetaHookLoader hookLoader  ) throws MetaException {    AWSCatalogMetastoreClient client = new AWSCatalogMetastoreClient(conf, hookLoader);    return client;  }}

          和我們的猜測完全一致。所以,梳理下來整件事情是這樣的:當禁用Hive JDBC之后,Hudi會轉而使用一個客戶端(即某個IMetaStoreClient接口的實現(xiàn)類)與Metastore進行交互,而在Glue環(huán)境里,Glue提供了一個遵循IMetaStoreClient接口規(guī)范,但卻是與Glue Catalog 進行交互的客戶端類AWSCatalogMetastoreClient。這樣,Hudi就能通過這個客戶端與Glue Catalog進行透明交互了!

          最后,讓我們來運行一下這個作業(yè),看一看輸出的日志和同步出的數(shù)據(jù)表。回到Glue控制臺,在前面停留的“腳本編輯”頁面上,點擊“運行作業(yè)”按鈕,即可執(zhí)行作業(yè)了。在作業(yè)運行結束后,可以在“日志”Tab頁看到程序打印的信息,如下圖所示:

          其中dataframe4的數(shù)據(jù)很好地體現(xiàn)了Hudi的UPSERT能力,程序按照我們期望的邏輯執(zhí)行出了結果:Bill的年齡從32更新為了33,新增的Rose用戶也出現(xiàn)在了結果集中。于此同時,在Glue控制臺的Catalog頁面上,也會看到同步出來的user表:

          以及列信息:

          它的輸入/輸出格式以及5個_hoodie開頭的列名清楚地表明這是一張基于Hudi元數(shù)據(jù)映射出來的表。

          4. 常見錯誤

          1. hoodie only support KryoSerializer as spark.serializer

          該問題在3.2節(jié)已經(jīng)提及,是由于沒有配置spark.serializer=org.apache.spark.serializer.KryoSerializer所致,請參考前文。

          2. Cannot create hive connection jdbc:hive2://localhost:10000/

          該問題在3.3節(jié)已經(jīng)提及,須在Hudi中禁用Hive JDBC,請參考前文。

          3. Got runtime exception when hive syncing ...

          這是一個非常棘手的問題,筆者曾在這個問題上耽誤了不少時間,并研究了Hudi同步元數(shù)據(jù)的大部分代碼,坦率地說,目前它的觸發(fā)機制還不是非常確定,主要原因是在Glue這種無服務器環(huán)境下不方便進行遠程DEBUG,只能通過日志進行分析。一個大概率的懷疑方向是:在整個SparkSession的上下文中,由于某一次Hudi的讀寫操作沒能正確地關閉并釋放IMetaStoreClient實例,導致后面需要再使用該Client同步元數(shù)據(jù)時,其已經(jīng)不可用。不過,相比尚不確定的起因,其解決方案是非常清晰和確定的,即:在出錯的位置前追加一行代碼:

          Hive.closeCurrent()

          這一操作非常有效,它主動銷毀了綁定在當前線程上的org.apache.hadoop.hive.ql.metadata.Hive實例,該類的實例是存放在一個ThreadLocal變量里的,而它本身又會包含一個IMetaStoreClient實例,所以Hive實例中的Metastore客戶端也是一個線程只維護一個實例。而上述代碼顯式地關閉并釋放了當前的Client(即主動關閉并釋放已經(jīng)無法再使用的Client實例),這會促使Hudi在下一次同步元數(shù)據(jù)時重建新的Client實例。

          關于這一問題更深入的分析和研究,可參考筆者的另一篇文章《AWS Glue集成Apache Hudi同步元數(shù)據(jù)深度歷險(各類錯誤的填坑方案)》

          4. Failed to check if database exists ...

          該問題與上一個問題是一樣的,只是處在異常堆棧的不同位置上,解決辦法同上。

          5. 結語

          雖然本文篇幅較長,但是從GlueHudiReadWriteExample.scala這個類的實現(xiàn)上不難看出,只要一次性做好幾處關鍵配置,在Glue中使用Hudi其實與在Spark原生環(huán)境中使用Hudi幾乎是無異的,這意味著兩者可以平滑地集成并各自持續(xù)升級。如此一來,Glue + Hudi的技術選型將非常具有競爭力,前者是一個無服務器架構的Spark計算環(huán)境,主打零運維和極致的成本控制,后者則為新一代數(shù)據(jù)湖提供更新插入、增量查詢和并發(fā)控制等功能性支持,兩者的成功結合是一件令人激動的事情,我想再次引用文章開始時使用的一句話作為結尾:無論如何,一個支持增量數(shù)據(jù)處理的無服務器架構的數(shù)據(jù)湖是非常吸引人的!


          關于作者:耿立超,架構師,15年IT系統(tǒng)開發(fā)和架構經(jīng)驗,對大數(shù)據(jù)、企業(yè)級應用架構、SaaS、分布式存儲和領域驅動設計有豐富的實踐經(jīng)驗,熱衷函數(shù)式編程。對Hadoop/Spark 生態(tài)系統(tǒng)有深入和廣泛的了解,參與過Hadoop商業(yè)發(fā)行版的開發(fā),曾帶領團隊建設過數(shù)個完備的企業(yè)數(shù)據(jù)平臺,個人技術博客:https://laurence.blog.csdn.net/ 作者著有《大數(shù)據(jù)平臺架構與原型實現(xiàn):數(shù)據(jù)中臺建設實戰(zhàn)》一書,該書已在京東和當當上線。



          推薦閱讀
          致廣大數(shù)據(jù)湖用戶的一封信
          Apache Hudi在Linkflow構建實時數(shù)據(jù)湖的生產(chǎn)實踐
          Apache Hudi C位!云計算一哥AWS EMR 2020年度回顧
          Apache Hudi 0.8.0版本重磅發(fā)布
          一文徹底掌握Apache Hudi的主鍵和分區(qū)配置



          瀏覽 151
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  色欲影视 淫香淫色 | 亚洲av无码精品在线观看 | 日韩一级无码特黄AAA片 | 国产黄色电影网 | 青娱乐成人 |