<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Hudi 實踐 | 當(dāng) Apache Hudi DeltaStreamer 遇見 Serverless Spark

          共 25507字,需瀏覽 52分鐘

           ·

          2023-08-08 20:50



          Apache Hudi的DeltaStreamer是一種以近實時方式攝取數(shù)據(jù)并寫入Hudi表的工具類,它簡化了流式數(shù)據(jù)入湖并存儲為Hudi表的操作,自 0.10.0 版開始,Hudi又在DeltaStreamer的基礎(chǔ)上增加了基于Debezium的CDC數(shù)據(jù)處理能力,這使得其可以直接將Debezium采集的CDC數(shù)據(jù)落地成Hudi表,這一功能極大地簡化了從源頭業(yè)務(wù)數(shù)據(jù)庫到Hudi數(shù)據(jù)湖的數(shù)據(jù)集成工作。

          另一方面,得益于開箱即用和零運(yùn)維的極致體驗,越來越多的云上用戶開始擁抱Serverless產(chǎn)品。Amazon云平臺上的EMR是一個集成了多款主流大數(shù)據(jù)工具的計算平臺,自6.6.0版本開始,EMR推出了 Serverless版本,開始提供無服務(wù)器的Spark運(yùn)行環(huán)境,用戶無需維護(hù)Hadoop/Spark集群,即可輕松提交Spark作業(yè)。

          一個是“全配置”的Hudi工具類, 一個是“開箱即用”的Spark運(yùn)行環(huán)境,兩者結(jié)合在一起,無需編寫CDC處理代碼,無需構(gòu)建Spark集群,僅通過一條命令,就可以輕松實現(xiàn)CDC數(shù)據(jù)入湖,這是一個非常吸引人的技術(shù)方案,本文我們就詳細(xì)介紹一下這一方案的整體架構(gòu)和實現(xiàn)細(xì)節(jié)。

          1. 整體架構(gòu)

          Apache Huid在 0.10.0版引入的DeltaStreamer CDC是一整條CDC數(shù)據(jù)處理鏈路中的末端環(huán)節(jié),為了能讓大家清楚地理解DeltaStreamer在其中所處的位置和發(fā)揮的作用,我們有必要看一下完整架構(gòu):

          ①:MySQL是一個業(yè)務(wù)數(shù)據(jù)庫,是CDC數(shù)據(jù)的源頭;

          ②:系統(tǒng)使用一個CDC攝取工具實時讀取MySQL的binlog,業(yè)界主流的CDC攝取工具有:Debezium,Maxwell,F(xiàn)linkCDC等,在該架構(gòu)中,選型的是安裝了Debezium MySQL Connector的Kafka Connect;

          ③:現(xiàn)在越來越多的CDC數(shù)據(jù)攝取方案開始引入Schema Registry用于更好的控制上游業(yè)務(wù)系統(tǒng)的Schema變更,實現(xiàn)更可控的Schema Evolution。在開源社區(qū),較為主流的產(chǎn)品是Confluent Schema Registry,且目前Hudi的DeltaStreamer也僅支持Confluent這一種Schema Registry,所以該架構(gòu)選型的也是它。引入Schema Registry之后,Kafka Connect在捕獲一條記錄時,會先在其本地的Schema Cache中查找是否已經(jīng)存在對應(yīng)的Schema,如果有,則直接從本地Cache中獲得Schema ID,如果沒有,則會將其提交給Schema Registry,由Schema Registry完成該Schema的注冊并將生成的Schema ID返回給Kafka Connect,Kafka Connect會基于Schema ID對原始的CDC數(shù)據(jù)進(jìn)行封裝(序列化):一是將Schema ID添加到消息中,二是如果使用Avro格式傳遞消息,Kafka Connect會去除Avro消息中的Schema部分,只保留Raw Data,因為Schema信息已緩存在Producer和Consumer本地或可通過Schema Registry一次性獲得,沒有必要伴隨Raw Data傳輸,這樣可以大大減小Avro消息的體積,提升傳輸效率。這些工作是通過Confluent提供的Avro Converter(io.confluent.connect.avro.AvroConverter)完成的;

          ④:Kafka Connect將封裝好的Avro消息投遞給Kafka

          ⑤:EMR Serverless為DeltaStreamer提供Serverless的Spark運(yùn)行環(huán)境;

          ⑥:Hudi的DeltaStreamer作為一個Spark作業(yè)運(yùn)行在EMR Serverless環(huán)境中,它從Kafka讀取到Avro消息后,會使用Confluent提供的Avro反序列化器(io.confluent.kafka.serializers.KafkaAvroDeserializer)解析Avro消息,得到Schema ID和Raw Data,反序列化器同樣會先在本地的Schema Cache中根據(jù)ID查找對應(yīng)的Schema,如果找到就根據(jù)這個Schema將Raw Data反序列化,如果沒有找到,就向Schema Registry請求獲取該ID對應(yīng)的Schema,然后再進(jìn)行反序列化;

          ⑦:DeltaStreamer將解析出來的數(shù)據(jù)寫入存放在S3上的Hudi表,如果數(shù)據(jù)表不存在,會自動創(chuàng)建表并同步到Hive MetaStore中

          2. 環(huán)境準(zhǔn)備

          限于篇幅,本文不會介紹①、②、③、④環(huán)節(jié)的構(gòu)建工作,讀者可以參考以下文檔自行構(gòu)建一套完整的測試環(huán)境:

          ①M(fèi)ySQL:如果僅以測試為目的,建議使用Debezium提供的官方Docker鏡像,構(gòu)建操作可參考其官方文檔(下文將給出的操作示例所處理的CDC數(shù)據(jù)就是自于該MySQL鏡像中的inventory數(shù)據(jù)庫);

          ②Kafka Connect:如果僅以測試為目的,建議使用Confluent提供的官方Docker鏡像,構(gòu)建操作可參考其官方文檔,或者使用AWS上托管的Kafka Connct:Amazon MSK Connect。需要提醒的是:Kafka Connect上必須安裝Debezium MySQL Connector和Confluent Avro Converter兩個插件,因此需要在官方鏡像的基礎(chǔ)上手動添加這兩個插件;

          ③Confluent Schema Registry:如果僅以測試為目的,建議使用Confluent提供的官方Docker鏡像,構(gòu)建操作可參考其官方文檔

          ④Kafka:如果僅以測試為目的,建議使用Confluent提供的官方Docker鏡像,構(gòu)建操作可參考其官方文檔,或者使用AWS上托管的Kafka:Amazon MSK

          完成上述工作后,我們會獲得“Confluent Schema Registry”和“Kafka Bootstrap Servers”兩項依賴服務(wù)的地址,它們是啟動DeltaStreamer CDC作業(yè)的必要條件,后續(xù)會以參數(shù)形式傳遞給DeltaStreamer作業(yè)。

          3. 配置全局變量

          環(huán)境準(zhǔn)備工作就緒后,就可以著手第⑤、⑥、⑦部分的工作了。本文所有操作全部通過命令完成,以shell腳本形式提供給讀者使用,腳本上會標(biāo)注實操步驟的序號,如果是二選一操作,會使用字母a/b加以標(biāo)識,部分操作還有示例供讀者參考。為了使腳本具有良好的可移植性,我們將與環(huán)境相關(guān)的依賴項和需要用戶自定義的配置項抽離出來,以全局變量的形式集中配置,如果您在自己的環(huán)境中執(zhí)行本文操作,只需修改下面的全局變量即可,不必修改具體命令:

          變量 說明 設(shè)定時機(jī)
          APP_NAME 由用戶為本應(yīng)用設(shè)定的名稱 提前設(shè)定
          APP_S3_HOME 由用戶為本應(yīng)用設(shè)定的S3專屬桶 提前設(shè)定
          APP_LOCAL_HOME 由用戶為本應(yīng)用設(shè)定的本地工作目錄 提前設(shè)定
          SCHEMA_REGISTRY_URL 用戶環(huán)境中的Confluent Schema Registry地址 提前設(shè)定
          KAFKA_BOOTSTRAP_SERVERS 用戶環(huán)境中的Kafka Bootstrap Servers地址 提前設(shè)定
          EMR_SERVERLESS_APP_SUBNET_ID 將要創(chuàng)建的EMR Serverless Application所屬子網(wǎng)ID 提前設(shè)定
          EMR_SERVERLESS_APP_SECURITY_GROUP_ID 將要創(chuàng)建的EMR Serverless Application所屬安全組ID 提前設(shè)定
          EMR_SERVERLESS_APP_ID 將要創(chuàng)建的EMR Serverless Application的ID 過程中產(chǎn)生
          EMR_SERVERLESS_EXECUTION_ROLE_ARN 將要創(chuàng)建的EMR Serverless Execution Role的ARN 過程中產(chǎn)生
          EMR_SERVERLESS_JOB_RUN_ID 提交EMR Serverless作業(yè)后返回的作業(yè)ID 過程中產(chǎn)生

          接下來,我們將進(jìn)入實操階段,需要您擁有一個安裝了AWS CLI并配置了用戶憑證的Linux環(huán)境(建議使用Amazon Linux2),通過SSH登錄后,先使用命令sudo yum -y install jq安裝操作json文件的命令行工具:jq(后續(xù)腳本會使用到它),然后將以上全局變量悉數(shù)導(dǎo)出(請根據(jù)您的AWS賬號和本地環(huán)境替換命令行中的相應(yīng)值):

          # 實操步驟(1)
          export APP_NAME='change-to-your-app-name'
          export APP_S3_HOME='change-to-your-app-s3-home'
          export APP_LOCAL_HOME='change-to-your-app-local-home'
          export SCHEMA_REGISTRY_URL='change-to-your-schema-registry-url'
          export KAFKA_BOOTSTRAP_SERVERS='change-to-your-kafka-bootstrap-servers'
          export EMR_SERVERLESS_APP_SUBNET_ID='change-to-your-subnet-id'
          export EMR_SERVERLESS_APP_SECURITY_GROUP_ID='change-to-your-security-group-id'

          以下是一份示例:

          # 示例(非實操步驟)
          export APP_NAME='apache-hudi-delta-streamer'
          export APP_S3_HOME='s3://apache-hudi-delta-streamer'
          export APP_LOCAL_HOME='/home/ec2-user/apache-hudi-delta-streamer'
          export SCHEMA_REGISTRY_URL='http://localhost:8081'
          export KAFKA_BOOTSTRAP_SERVERS='localhost:9092'
          export EMR_SERVERLESS_APP_SUBNET_ID='subnet-0a11afe6dbb4df759'
          export EMR_SERVERLESS_APP_SECURITY_GROUP_ID='sg-071f18562f41b5804'

          至于 EMR_SERVERLESS_APP_IDEMR_SERVERLESS_EXECUTION_ROLE_ARNEMR_SERVERLESS_JOB_RUN_ID 三個變量將在后續(xù)的操作過程中產(chǎn)生并導(dǎo)出。

          4. 創(chuàng)建專屬工作目錄和存儲桶

          作為一項最佳實踐,我們先為應(yīng)用程序(Job)創(chuàng)建一個專屬的本地工作目錄(即APP_LOCAL_HOME設(shè)定的路徑)和一個S3存儲桶(即APP_S3_HOME設(shè)定的桶),應(yīng)用程序的腳本、配置文件、依賴包、日志以及產(chǎn)生的數(shù)據(jù)都統(tǒng)一存放在專屬目錄和存儲桶中,這樣會便于維護(hù):

          # 實操步驟(2)
          mkdir -p $APP_LOCAL_HOME
          aws s3 mb $APP_S3_HOME

          5. 創(chuàng)建 EMR Serverless Execution Role

          運(yùn)行EMR Serverless作業(yè)需要配置一個IAM Role,這個Role將賦予EMR Serverless作業(yè)訪問AWS相關(guān)資源的權(quán)限,我們的DeltaStreamer CDC作業(yè)應(yīng)至少需要分配:

          • ? 對S3專屬桶的讀寫權(quán)限

          • ? 對Glue Data Catalog的讀寫權(quán)限

          • ? 對Glue Schema Registry的讀寫權(quán)限

          您可以根據(jù)EMR Serverless的官方文檔手動創(chuàng)建這個Role,然后將其ARN作為變量導(dǎo)出(請根據(jù)您的AWS賬號環(huán)境替換命令行中的相應(yīng)值):

          # 實操步驟(3/a)
          export EMR_SERVERLESS_EXECUTION_ROLE_ARN='change-to-your-emr-serverless-execution-role-arn'

          以下是一份示例:

          # 示例(非實操步驟)
          export EMR_SERVERLESS_EXECUTION_ROLE_ARN='arn:aws:iam::123456789000:role/EMR_SERVERLESS_ADMIN'

          考慮到手動創(chuàng)建這個Role較為煩瑣,本文提供如下一段腳本,可以在您的AWS賬號中創(chuàng)建一個擁有管理員權(quán)限的Role:EMR_SERVERLESS_ADMIN,從而幫助您快速完成本節(jié)工作(注意:由于該Role具有最高權(quán)限,應(yīng)謹(jǐn)慎使用,完成快速驗證后,還是應(yīng)該在生產(chǎn)環(huán)境中配置嚴(yán)格限定權(quán)限的專有Execution Role):

          # 實操步驟(3/b)
          EMR_SERVERLESS_EXECUTION_ROLE_NAME='EMR_SERVERLESS_ADMIN'
          cat << EOF > $APP_LOCAL_HOME/assume-role-policy.json
          {
              "Version": "2012-10-17",
              "Statement": [
                  {
                      "Sid": "EMRServerlessTrustPolicy",
                      "Effect": "Allow",
                      "Principal": {
                          "Service": "emr-serverless.amazonaws.com"
                      },
                      "Action": "sts:AssumeRole"
                  }
              ]
          }
          EOF

          jq . $APP_LOCAL_HOME/assume-role-policy.json
          export EMR_SERVERLESS_EXECUTION_ROLE_ARN=$(aws iam create-role \
              --no-paginate --no-cli-pager --output text \
              --role-name "$EMR_SERVERLESS_EXECUTION_ROLE_NAME" \
              --assume-role-policy-document file://$APP_LOCAL_HOME/assume-role-policy.json \
              --query Role.Arn)
          aws iam attach-role-policy \
              --policy-arn "arn:aws:iam::aws:policy/AdministratorAccess" \
              --role-name "$EMR_SERVERLESS_EXECUTION_ROLE_NAME"

          6. 創(chuàng)建 EMR Serverless Application

          向EMR Serverless提交作業(yè)前,需要先創(chuàng)建一個EMR Serverless Application,這是EMR Serverless中的一個概念,可以理解為一個虛擬的EMR集群。在創(chuàng)建Application時,需要指定EMR的版本,網(wǎng)絡(luò)配置,集群規(guī)模,預(yù)熱節(jié)點等信息。通常,我們僅需如下一條命令就可以完成創(chuàng)建工作:

          # 示例(非實操步驟)
          aws emr-serverless create-application \
              --name "$APP_NAME" \
              --type "SPARK" \
              --release-label "emr-6.11.0"

          但是,這樣創(chuàng)建出的Application是沒有網(wǎng)絡(luò)配置的,由于我們的DeltaStreamer CDC作業(yè)需要訪問位于特定VPC中的Confluent Schema Registry和Kafka Bootstrap Servers,所以必須顯式地為Application設(shè)定子網(wǎng)和安全組,以確保DeltaStreamer可以連通這兩項服務(wù)。因此,我們需要使用以下命令創(chuàng)建一個帶有特定網(wǎng)絡(luò)配置的Application:

          # 實操步驟(4)
          cat << EOF > $APP_LOCAL_HOME/create-application.json
          {
              "name":"$APP_NAME",
              "releaseLabel":"emr-6.11.0",
              "type":"SPARK",
              "networkConfiguration":{
                  "subnetIds":[
                      "$EMR_SERVERLESS_APP_SUBNET_ID"
                  ],
                  "securityGroupIds":[
                      "$EMR_SERVERLESS_APP_SECURITY_GROUP_ID"
                  ]
              }
          }
          EOF

          jq . $APP_LOCAL_HOME/create-application.json
          export EMR_SERVERLESS_APP_ID=$(aws emr-serverless create-application \
              --no-paginate --no-cli-pager --output text \
              --release-label "emr-6.11.0" --type "SPARK" \
              --cli-input-json file://$APP_LOCAL_HOME/create-application.json \
              --query "applicationId")

          7. 提交 Apache Hudi DeltaStreamer CDC 作業(yè)

          創(chuàng)建好Application就可以提交作業(yè)了,Apache Hudi DeltaStreamer CDC是一個較為復(fù)雜的作業(yè),配置項非常多,這一點從Hudi官方博客給出的示例中可見一斑,我們要做的是:將使用spark-submit命令提交的作業(yè)“翻譯”成EMR Serverless的作業(yè)。

          7.1 準(zhǔn)備作業(yè)描述文件

          使用命令行提交EMR Serverless作業(yè)需要提供一個json格式的作業(yè)描述文件,通常在spark-submit命令行中配置的參數(shù)都會由這個文件來描述。由于DeltaStreamer作業(yè)的配置項非常多,限于篇幅,我們無法一一做出解釋,您可以將下面的作業(yè)描述文件和Hudi官方博客提供的原生Spark作業(yè)做一下對比,然后就能相對容易地理解作業(yè)描述文件的作用了。

          需要注意的是,在執(zhí)行下面的腳本時,請根據(jù)您的AWS賬號和本地環(huán)境替換腳本中所有的<your-xxx>部分,這些被替換的部分取決于您本地環(huán)境中的源頭數(shù)據(jù)庫、數(shù)據(jù)表,Kakfa Topic以及Schema Registry等信息,每換一張表都需要調(diào)整相應(yīng)的值,所以沒有被抽離到全局變量中。

          此外,該作業(yè)其實并不依賴任何第三方Jar包,其使用的Confluent Avro Converter已經(jīng)集成到了hudi-utilities-bundle.jar中,這里我們特意在配置中聲明--conf spark.jars=$(...)(參考示例命令)是為了演示“如何加載三方類庫”,供有需要的讀者參考。

          # 實操步驟(5)
          cat << EOF > $APP_LOCAL_HOME/start-job-run.json
          {
              "name":"apache-hudi-delta-streamer",
              "applicationId":"$EMR_SERVERLESS_APP_ID",
              "executionRoleArn":"$EMR_SERVERLESS_EXECUTION_ROLE_ARN",
              "jobDriver":{
                  "sparkSubmit":{
                  "entryPoint":"/usr/lib/hudi/hudi-utilities-bundle.jar",
                  "entryPointArguments":[
                      "--continuous",
                      "--enable-sync",
                      "--table-type", "COPY_ON_WRITE",
                      "--op", "UPSERT",
                      "--target-base-path", "<your-table-s3-path>",
                      "--target-table", "orders",
                      "--min-sync-interval-seconds", "60",
                      "--source-class", "org.apache.hudi.utilities.sources.debezium.MysqlDebeziumSource",
                      "--source-ordering-field", "_event_origin_ts_ms",
                      "--payload-class", "org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload",
                      "--hoodie-conf", "bootstrap.servers=$KAFKA_BOOTSTRAP_SERVERS",
                      "--hoodie-conf", "schema.registry.url=$SCHEMA_REGISTRY_URL",
                      "--hoodie-conf", "hoodie.deltastreamer.schemaprovider.registry.url=${SCHEMA_REGISTRY_URL}/subjects/<your-registry-name>.<your-src-database>.<your-src-table>-value/versions/latest",
                      "--hoodie-conf", "hoodie.deltastreamer.source.kafka.value.deserializer.class=io.confluent.kafka.serializers.KafkaAvroDeserializer",
                      "--hoodie-conf", "hoodie.deltastreamer.source.kafka.topic=<your-kafka-topic-of-your-table-cdc-message>",
                      "--hoodie-conf", "auto.offset.reset=earliest",
                      "--hoodie-conf", "hoodie.datasource.write.recordkey.field=<your-table-recordkey-field>",
                      "--hoodie-conf", "hoodie.datasource.write.partitionpath.field=<your-table-partitionpath-field>",
                      "--hoodie-conf", "hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor",
                      "--hoodie-conf", "hoodie.datasource.write.hive_style_partitioning=true",
                      "--hoodie-conf", "hoodie.datasource.hive_sync.database=<your-sync-database>",
                      "--hoodie-conf", "hoodie.datasource.hive_sync.table==<your-sync-table>",
                      "--hoodie-conf", "hoodie.datasource.hive_sync.partition_fields=<your-table-partition-fields>"
                  ],
                   "sparkSubmitParameters":"--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory --conf spark.jars=<your-app-dependent-jars>"
                  }
             },
             "configurationOverrides":{
                  "monitoringConfiguration":{
                      "s3MonitoringConfiguration":{
                          "logUri":"<your-s3-location-for-emr-logs>"
                      }
                  }
             }
          }
          EOF

          jq . $APP_LOCAL_HOME/start-job-run.json

          以下是一份示例:

          # 示例(非實操步驟)
          cat << EOF > $APP_LOCAL_HOME/start-job-run.json
          {
              "name":"apache-hudi-delta-streamer",
              "applicationId":"$EMR_SERVERLESS_APP_ID",
              "executionRoleArn":"$EMR_SERVERLESS_EXECUTION_ROLE_ARN",
              "jobDriver":{
                  "sparkSubmit":{
                  "entryPoint":"/usr/lib/hudi/hudi-utilities-bundle.jar",
                  "entryPointArguments":[
                      "--continuous",
                      "--enable-sync",
                      "--table-type", "COPY_ON_WRITE",
                      "--op", "UPSERT",
                      "--target-base-path", "$APP_S3_HOME/data/mysql-server-3/inventory/orders",
                      "--target-table", "orders",
                      "--min-sync-interval-seconds", "60",
                      "--source-class", "org.apache.hudi.utilities.sources.debezium.MysqlDebeziumSource",
                      "--source-ordering-field", "_event_origin_ts_ms",
                      "--payload-class", "org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload",
                      "--hoodie-conf", "bootstrap.servers=$KAFKA_BOOTSTRAP_SERVERS",
                      "--hoodie-conf", "schema.registry.url=$SCHEMA_REGISTRY_URL",
                      "--hoodie-conf", "hoodie.deltastreamer.schemaprovider.registry.url=${SCHEMA_REGISTRY_URL}/subjects/osci.mysql-server-3.inventory.orders-value/versions/latest",
                      "--hoodie-conf", "hoodie.deltastreamer.source.kafka.value.deserializer.class=io.confluent.kafka.serializers.KafkaAvroDeserializer",
                      "--hoodie-conf", "hoodie.deltastreamer.source.kafka.topic=osci.mysql-server-3.inventory.orders",
                      "--hoodie-conf", "auto.offset.reset=earliest",
                      "--hoodie-conf", "hoodie.datasource.write.recordkey.field=order_number",
                      "--hoodie-conf", "hoodie.datasource.write.partitionpath.field=order_date",
                      "--hoodie-conf", "hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor",
                      "--hoodie-conf", "hoodie.datasource.write.hive_style_partitioning=true",
                      "--hoodie-conf", "hoodie.datasource.hive_sync.database=inventory",
                      "--hoodie-conf", "hoodie.datasource.hive_sync.table=orders",
                      "--hoodie-conf", "hoodie.datasource.hive_sync.partition_fields=order_date"
                  ],
                   "sparkSubmitParameters":"--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory --conf spark.jars=$(aws s3 ls $APP_S3_HOME/jars/ | grep -o '\S*\.jar$'| awk '{print "'"$APP_S3_HOME/jars/"'"$1","}' | tr -d '\n' | sed 's/,$//')"
                  }
             },
             "configurationOverrides":{
                  "monitoringConfiguration":{
                      "s3MonitoringConfiguration":{
                          "logUri":"$APP_S3_HOME/logs"
                      }
                  }
             }
          }
          EOF
          jq . $APP_LOCAL_HOME/start-job-run.json

          7.2 提交作業(yè)

          準(zhǔn)備好作業(yè)描述文件后,就可以正式提交作業(yè)了,命令如下:

          # 實操步驟(6)
          export EMR_SERVERLESS_JOB_RUN_ID=$(aws emr-serverless start-job-run \
              --no-paginate --no-cli-pager --output text \
              --name apache-hudi-delta-streamer \
              --application-id $EMR_SERVERLESS_APP_ID \
              --execution-role-arn $EMR_SERVERLESS_EXECUTION_ROLE_ARN \
              --execution-timeout-minutes 0 \
              --cli-input-json file://$APP_LOCAL_HOME/start-job-run.json \
              --query jobRunId)

          7.3 監(jiān)控作業(yè)

          作業(yè)提交后,可以在控制臺查看作業(yè)運(yùn)行狀態(tài)。如果想在命令行窗口持續(xù)監(jiān)控作業(yè),可以使用如下腳本:

          # 實操步驟(7)
          now=$(date +%s)sec
          while truedo
              jobStatus=$(aws emr-serverless get-job-run \
                              --no-paginate --no-cli-pager --output text \
                              --application-id $EMR_SERVERLESS_APP_ID \
                              --job-run-id $EMR_SERVERLESS_JOB_RUN_ID \
                              --query jobRun.state)
              if [ "$jobStatus" = "PENDING" ] || [ "$jobStatus" = "SCHEDULED" ] || [ "$jobStatus" = "RUNNING" ]; then
                  for i in {0..5}; do
                      echo -ne "\E[33;5m>>> The job [ $EMR_SERVERLESS_JOB_RUN_ID ] state is [ $jobStatus ], duration [ $(date -u --date now-$now +%H:%M:%S) ] ....\r\E[0m"
                      sleep 1
                  done
              else
                  echo -ne "The job [ $EMR_SERVERLESS_JOB_RUN_ID ] is [ $jobStatus ]\n\n"
                  break
              fi
          done

          7.4 錯誤檢索

          作業(yè)開始運(yùn)行后,Spark Driver和Executor會持續(xù)生成日志,這些日志存放在配置的$APP_S3_HOME/logs路徑下,如果作業(yè)失敗,可以使用下面的腳本快速檢索到錯誤信息:

          # 實操步驟(8)
          JOB_LOG_HOME=$APP_LOCAL_HOME/log/$EMR_SERVERLESS_JOB_RUN_ID
          rm -rf $JOB_LOG_HOME && mkdir -p $JOB_LOG_HOME
          aws s3 cp --recursive $APP_S3_HOME/logs/applications/$EMR_SERVERLESS_APP_ID/jobs/$EMR_SERVERLESS_JOB_RUN_ID$JOB_LOG_HOME >& /dev/null
          gzip -d -r -f $JOB_LOG_HOME >& /dev/null
          grep --color=always -r -i -E 'error|failed|exception' $JOB_LOG_HOME

          7.5 停止作業(yè)

          DeltaStreamer是一個持續(xù)運(yùn)行的作業(yè),如果需要停止作業(yè),可以使用如下命令:

          # 實操步驟(9)
          aws emr-serverless cancel-job-run \
              --no-paginate --no-cli-pager\
              --application-id $EMR_SERVERLESS_APP_ID \
              --job-run-id $EMR_SERVERLESS_JOB_RUN_ID

          8. 結(jié)果驗證

          作業(yè)啟動后會自動創(chuàng)建一個數(shù)據(jù)表,并在指定的S3位置上寫入數(shù)據(jù),使用如下命令可以查看自動創(chuàng)建的數(shù)據(jù)表和落地的數(shù)據(jù)文件:

          # 實操步驟(10)
          aws s3 ls --recursive <your-table-s3-path>
          aws glue get-table --no-paginate --no-cli-pager \
              --database-name <your-sync-database> --name <your-sync-table>
          # 示例(非實操步驟)
          aws s3 ls --recursive $APP_S3_HOME/data/mysql-server-3/inventory/orders/
          aws glue get-table --no-paginate --no-cli-pager \
              --database-name inventory --name orders

          9. 評估與展望

          本文,我們詳細(xì)介紹了如何在EMR Serverless上運(yùn)行Apapche Hudi DeltaStreamer將CDC數(shù)據(jù)接入到Hudi表中,這是一個主打“零編碼”,“零運(yùn)維”的超輕量解決方案。但是,它的局限性也很明顯,那就是:一個DeltaStreamer作業(yè)只能接入一張表,這對于動輒就需要接入數(shù)百張甚至數(shù)千張表的數(shù)據(jù)湖來說是不實用的,盡管Hudi也提供了用于多表接入的MultiTableDeltaStreamer,但是這個工具類目前的成熟度和完備性還不足以應(yīng)用于生產(chǎn)。此外,Hudi自0.10.0起針對Kafka Connect提供了Hudi Sink插件(目前也是僅支持單表),為CDC數(shù)據(jù)接入Hudi數(shù)據(jù)湖開辟了新的途徑,這是值得持續(xù)關(guān)注的新亮點。

          從長遠(yuǎn)來看,CDC數(shù)據(jù)入湖并落地為Hudi表是一個非常普遍的需求,迭代并完善包括DeltaStreamer、HoodieMultiTableDeltaStreamer和Kafka Connect Hudi Sink插件在內(nèi)的多種原生組件在社區(qū)的呼聲將會越來越強(qiáng)烈,相信伴隨著Hudi的蓬勃發(fā)展,這些組件將不斷成熟起來,并逐步應(yīng)用到生產(chǎn)環(huán)境中。

          推薦閱讀

          數(shù)據(jù)湖在快手的生產(chǎn)實踐

          圖加速數(shù)據(jù)湖分析-GeaFlow和Apache Hudi集成

          加速LakeHouse ACID Upsert的新寫時復(fù)制方案

          Apache Hudi Timeline Server介紹

          如何不加鎖地將數(shù)據(jù)并發(fā)寫入Apache Hudi?


          關(guān)于作者:耿立超,架構(gòu)師,著有 《大數(shù)據(jù)平臺架構(gòu)與原型實現(xiàn):數(shù)據(jù)中臺建設(shè)實戰(zhàn)》一書,多年IT系統(tǒng)開發(fā)和架構(gòu)經(jīng)驗,對大數(shù)據(jù)、企業(yè)級應(yīng)用架構(gòu)、SaaS、分布式存儲和領(lǐng)域驅(qū)動設(shè)計有豐富的實踐經(jīng)驗,個人技術(shù)博客:https://laurence.blog.csdn.net


          瀏覽 224
          點贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  亚洲AV无码久久精品色无码蜜桃 | 日逼91| 蜜桃AV无码乱码精品 | 熟女人妻一区二区三区免费看 | AV天堂观看 |