Hudi 實踐 | 當(dāng) Apache Hudi DeltaStreamer 遇見 Serverless Spark
Apache Hudi的DeltaStreamer是一種以近實時方式攝取數(shù)據(jù)并寫入Hudi表的工具類,它簡化了流式數(shù)據(jù)入湖并存儲為Hudi表的操作,自 0.10.0 版開始,Hudi又在DeltaStreamer的基礎(chǔ)上增加了基于Debezium的CDC數(shù)據(jù)處理能力,這使得其可以直接將Debezium采集的CDC數(shù)據(jù)落地成Hudi表,這一功能極大地簡化了從源頭業(yè)務(wù)數(shù)據(jù)庫到Hudi數(shù)據(jù)湖的數(shù)據(jù)集成工作。
另一方面,得益于開箱即用和零運(yùn)維的極致體驗,越來越多的云上用戶開始擁抱Serverless產(chǎn)品。Amazon云平臺上的EMR是一個集成了多款主流大數(shù)據(jù)工具的計算平臺,自6.6.0版本開始,EMR推出了 Serverless版本,開始提供無服務(wù)器的Spark運(yùn)行環(huán)境,用戶無需維護(hù)Hadoop/Spark集群,即可輕松提交Spark作業(yè)。
一個是“全配置”的Hudi工具類, 一個是“開箱即用”的Spark運(yùn)行環(huán)境,兩者結(jié)合在一起,無需編寫CDC處理代碼,無需構(gòu)建Spark集群,僅通過一條命令,就可以輕松實現(xiàn)CDC數(shù)據(jù)入湖,這是一個非常吸引人的技術(shù)方案,本文我們就詳細(xì)介紹一下這一方案的整體架構(gòu)和實現(xiàn)細(xì)節(jié)。
1. 整體架構(gòu)
Apache Huid在 0.10.0版引入的DeltaStreamer CDC是一整條CDC數(shù)據(jù)處理鏈路中的末端環(huán)節(jié),為了能讓大家清楚地理解DeltaStreamer在其中所處的位置和發(fā)揮的作用,我們有必要看一下完整架構(gòu):
①:MySQL是一個業(yè)務(wù)數(shù)據(jù)庫,是CDC數(shù)據(jù)的源頭;
②:系統(tǒng)使用一個CDC攝取工具實時讀取MySQL的binlog,業(yè)界主流的CDC攝取工具有:Debezium,Maxwell,F(xiàn)linkCDC等,在該架構(gòu)中,選型的是安裝了Debezium MySQL Connector的Kafka Connect;
③:現(xiàn)在越來越多的CDC數(shù)據(jù)攝取方案開始引入Schema Registry用于更好的控制上游業(yè)務(wù)系統(tǒng)的Schema變更,實現(xiàn)更可控的Schema Evolution。在開源社區(qū),較為主流的產(chǎn)品是Confluent Schema Registry,且目前Hudi的DeltaStreamer也僅支持Confluent這一種Schema Registry,所以該架構(gòu)選型的也是它。引入Schema Registry之后,Kafka Connect在捕獲一條記錄時,會先在其本地的Schema Cache中查找是否已經(jīng)存在對應(yīng)的Schema,如果有,則直接從本地Cache中獲得Schema ID,如果沒有,則會將其提交給Schema Registry,由Schema Registry完成該Schema的注冊并將生成的Schema ID返回給Kafka Connect,Kafka Connect會基于Schema ID對原始的CDC數(shù)據(jù)進(jìn)行封裝(序列化):一是將Schema ID添加到消息中,二是如果使用Avro格式傳遞消息,Kafka Connect會去除Avro消息中的Schema部分,只保留Raw Data,因為Schema信息已緩存在Producer和Consumer本地或可通過Schema Registry一次性獲得,沒有必要伴隨Raw Data傳輸,這樣可以大大減小Avro消息的體積,提升傳輸效率。這些工作是通過Confluent提供的Avro Converter(io.confluent.connect.avro.AvroConverter)完成的;
④:Kafka Connect將封裝好的Avro消息投遞給Kafka
⑤:EMR Serverless為DeltaStreamer提供Serverless的Spark運(yùn)行環(huán)境;
⑥:Hudi的DeltaStreamer作為一個Spark作業(yè)運(yùn)行在EMR Serverless環(huán)境中,它從Kafka讀取到Avro消息后,會使用Confluent提供的Avro反序列化器(io.confluent.kafka.serializers.KafkaAvroDeserializer)解析Avro消息,得到Schema ID和Raw Data,反序列化器同樣會先在本地的Schema Cache中根據(jù)ID查找對應(yīng)的Schema,如果找到就根據(jù)這個Schema將Raw Data反序列化,如果沒有找到,就向Schema Registry請求獲取該ID對應(yīng)的Schema,然后再進(jìn)行反序列化;
⑦:DeltaStreamer將解析出來的數(shù)據(jù)寫入存放在S3上的Hudi表,如果數(shù)據(jù)表不存在,會自動創(chuàng)建表并同步到Hive MetaStore中
2. 環(huán)境準(zhǔn)備
限于篇幅,本文不會介紹①、②、③、④環(huán)節(jié)的構(gòu)建工作,讀者可以參考以下文檔自行構(gòu)建一套完整的測試環(huán)境:
①M(fèi)ySQL:如果僅以測試為目的,建議使用Debezium提供的官方Docker鏡像,構(gòu)建操作可參考其官方文檔(下文將給出的操作示例所處理的CDC數(shù)據(jù)就是自于該MySQL鏡像中的inventory數(shù)據(jù)庫);
②Kafka Connect:如果僅以測試為目的,建議使用Confluent提供的官方Docker鏡像,構(gòu)建操作可參考其官方文檔,或者使用AWS上托管的Kafka Connct:Amazon MSK Connect。需要提醒的是:Kafka Connect上必須安裝Debezium MySQL Connector和Confluent Avro Converter兩個插件,因此需要在官方鏡像的基礎(chǔ)上手動添加這兩個插件;
③Confluent Schema Registry:如果僅以測試為目的,建議使用Confluent提供的官方Docker鏡像,構(gòu)建操作可參考其官方文檔;
④Kafka:如果僅以測試為目的,建議使用Confluent提供的官方Docker鏡像,構(gòu)建操作可參考其官方文檔,或者使用AWS上托管的Kafka:Amazon MSK
完成上述工作后,我們會獲得“Confluent Schema Registry”和“Kafka Bootstrap Servers”兩項依賴服務(wù)的地址,它們是啟動DeltaStreamer CDC作業(yè)的必要條件,后續(xù)會以參數(shù)形式傳遞給DeltaStreamer作業(yè)。
3. 配置全局變量
環(huán)境準(zhǔn)備工作就緒后,就可以著手第⑤、⑥、⑦部分的工作了。本文所有操作全部通過命令完成,以shell腳本形式提供給讀者使用,腳本上會標(biāo)注實操步驟的序號,如果是二選一操作,會使用字母a/b加以標(biāo)識,部分操作還有示例供讀者參考。為了使腳本具有良好的可移植性,我們將與環(huán)境相關(guān)的依賴項和需要用戶自定義的配置項抽離出來,以全局變量的形式集中配置,如果您在自己的環(huán)境中執(zhí)行本文操作,只需修改下面的全局變量即可,不必修改具體命令:
| 變量 | 說明 | 設(shè)定時機(jī) |
| APP_NAME | 由用戶為本應(yīng)用設(shè)定的名稱 | 提前設(shè)定 |
| APP_S3_HOME | 由用戶為本應(yīng)用設(shè)定的S3專屬桶 | 提前設(shè)定 |
| APP_LOCAL_HOME | 由用戶為本應(yīng)用設(shè)定的本地工作目錄 | 提前設(shè)定 |
| SCHEMA_REGISTRY_URL | 用戶環(huán)境中的Confluent Schema Registry地址 | 提前設(shè)定 |
| KAFKA_BOOTSTRAP_SERVERS | 用戶環(huán)境中的Kafka Bootstrap Servers地址 | 提前設(shè)定 |
| EMR_SERVERLESS_APP_SUBNET_ID | 將要創(chuàng)建的EMR Serverless Application所屬子網(wǎng)ID | 提前設(shè)定 |
| EMR_SERVERLESS_APP_SECURITY_GROUP_ID | 將要創(chuàng)建的EMR Serverless Application所屬安全組ID | 提前設(shè)定 |
| EMR_SERVERLESS_APP_ID | 將要創(chuàng)建的EMR Serverless Application的ID | 過程中產(chǎn)生 |
| EMR_SERVERLESS_EXECUTION_ROLE_ARN | 將要創(chuàng)建的EMR Serverless Execution Role的ARN | 過程中產(chǎn)生 |
| EMR_SERVERLESS_JOB_RUN_ID | 提交EMR Serverless作業(yè)后返回的作業(yè)ID | 過程中產(chǎn)生 |
接下來,我們將進(jìn)入實操階段,需要您擁有一個安裝了AWS CLI并配置了用戶憑證的Linux環(huán)境(建議使用Amazon Linux2),通過SSH登錄后,先使用命令sudo yum -y install jq安裝操作json文件的命令行工具:jq(后續(xù)腳本會使用到它),然后將以上全局變量悉數(shù)導(dǎo)出(請根據(jù)您的AWS賬號和本地環(huán)境替換命令行中的相應(yīng)值):
# 實操步驟(1)
export APP_NAME='change-to-your-app-name'
export APP_S3_HOME='change-to-your-app-s3-home'
export APP_LOCAL_HOME='change-to-your-app-local-home'
export SCHEMA_REGISTRY_URL='change-to-your-schema-registry-url'
export KAFKA_BOOTSTRAP_SERVERS='change-to-your-kafka-bootstrap-servers'
export EMR_SERVERLESS_APP_SUBNET_ID='change-to-your-subnet-id'
export EMR_SERVERLESS_APP_SECURITY_GROUP_ID='change-to-your-security-group-id'
以下是一份示例:
# 示例(非實操步驟)
export APP_NAME='apache-hudi-delta-streamer'
export APP_S3_HOME='s3://apache-hudi-delta-streamer'
export APP_LOCAL_HOME='/home/ec2-user/apache-hudi-delta-streamer'
export SCHEMA_REGISTRY_URL='http://localhost:8081'
export KAFKA_BOOTSTRAP_SERVERS='localhost:9092'
export EMR_SERVERLESS_APP_SUBNET_ID='subnet-0a11afe6dbb4df759'
export EMR_SERVERLESS_APP_SECURITY_GROUP_ID='sg-071f18562f41b5804'
至于 EMR_SERVERLESS_APP_ID、EMR_SERVERLESS_EXECUTION_ROLE_ARN、EMR_SERVERLESS_JOB_RUN_ID 三個變量將在后續(xù)的操作過程中產(chǎn)生并導(dǎo)出。
4. 創(chuàng)建專屬工作目錄和存儲桶
作為一項最佳實踐,我們先為應(yīng)用程序(Job)創(chuàng)建一個專屬的本地工作目錄(即APP_LOCAL_HOME設(shè)定的路徑)和一個S3存儲桶(即APP_S3_HOME設(shè)定的桶),應(yīng)用程序的腳本、配置文件、依賴包、日志以及產(chǎn)生的數(shù)據(jù)都統(tǒng)一存放在專屬目錄和存儲桶中,這樣會便于維護(hù):
# 實操步驟(2)
mkdir -p $APP_LOCAL_HOME
aws s3 mb $APP_S3_HOME
5. 創(chuàng)建 EMR Serverless Execution Role
運(yùn)行EMR Serverless作業(yè)需要配置一個IAM Role,這個Role將賦予EMR Serverless作業(yè)訪問AWS相關(guān)資源的權(quán)限,我們的DeltaStreamer CDC作業(yè)應(yīng)至少需要分配:
? 對S3專屬桶的讀寫權(quán)限
? 對Glue Data Catalog的讀寫權(quán)限
? 對Glue Schema Registry的讀寫權(quán)限
您可以根據(jù)EMR Serverless的官方文檔手動創(chuàng)建這個Role,然后將其ARN作為變量導(dǎo)出(請根據(jù)您的AWS賬號環(huán)境替換命令行中的相應(yīng)值):
# 實操步驟(3/a)
export EMR_SERVERLESS_EXECUTION_ROLE_ARN='change-to-your-emr-serverless-execution-role-arn'
以下是一份示例:
# 示例(非實操步驟)
export EMR_SERVERLESS_EXECUTION_ROLE_ARN='arn:aws:iam::123456789000:role/EMR_SERVERLESS_ADMIN'
考慮到手動創(chuàng)建這個Role較為煩瑣,本文提供如下一段腳本,可以在您的AWS賬號中創(chuàng)建一個擁有管理員權(quán)限的Role:EMR_SERVERLESS_ADMIN,從而幫助您快速完成本節(jié)工作(注意:由于該Role具有最高權(quán)限,應(yīng)謹(jǐn)慎使用,完成快速驗證后,還是應(yīng)該在生產(chǎn)環(huán)境中配置嚴(yán)格限定權(quán)限的專有Execution Role):
# 實操步驟(3/b)
EMR_SERVERLESS_EXECUTION_ROLE_NAME='EMR_SERVERLESS_ADMIN'
cat << EOF > $APP_LOCAL_HOME/assume-role-policy.json
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "EMRServerlessTrustPolicy",
"Effect": "Allow",
"Principal": {
"Service": "emr-serverless.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
EOF
jq . $APP_LOCAL_HOME/assume-role-policy.json
export EMR_SERVERLESS_EXECUTION_ROLE_ARN=$(aws iam create-role \
--no-paginate --no-cli-pager --output text \
--role-name "$EMR_SERVERLESS_EXECUTION_ROLE_NAME" \
--assume-role-policy-document file://$APP_LOCAL_HOME/assume-role-policy.json \
--query Role.Arn)
aws iam attach-role-policy \
--policy-arn "arn:aws:iam::aws:policy/AdministratorAccess" \
--role-name "$EMR_SERVERLESS_EXECUTION_ROLE_NAME"
6. 創(chuàng)建 EMR Serverless Application
向EMR Serverless提交作業(yè)前,需要先創(chuàng)建一個EMR Serverless Application,這是EMR Serverless中的一個概念,可以理解為一個虛擬的EMR集群。在創(chuàng)建Application時,需要指定EMR的版本,網(wǎng)絡(luò)配置,集群規(guī)模,預(yù)熱節(jié)點等信息。通常,我們僅需如下一條命令就可以完成創(chuàng)建工作:
# 示例(非實操步驟)
aws emr-serverless create-application \
--name "$APP_NAME" \
--type "SPARK" \
--release-label "emr-6.11.0"
但是,這樣創(chuàng)建出的Application是沒有網(wǎng)絡(luò)配置的,由于我們的DeltaStreamer CDC作業(yè)需要訪問位于特定VPC中的Confluent Schema Registry和Kafka Bootstrap Servers,所以必須顯式地為Application設(shè)定子網(wǎng)和安全組,以確保DeltaStreamer可以連通這兩項服務(wù)。因此,我們需要使用以下命令創(chuàng)建一個帶有特定網(wǎng)絡(luò)配置的Application:
# 實操步驟(4)
cat << EOF > $APP_LOCAL_HOME/create-application.json
{
"name":"$APP_NAME",
"releaseLabel":"emr-6.11.0",
"type":"SPARK",
"networkConfiguration":{
"subnetIds":[
"$EMR_SERVERLESS_APP_SUBNET_ID"
],
"securityGroupIds":[
"$EMR_SERVERLESS_APP_SECURITY_GROUP_ID"
]
}
}
EOF
jq . $APP_LOCAL_HOME/create-application.json
export EMR_SERVERLESS_APP_ID=$(aws emr-serverless create-application \
--no-paginate --no-cli-pager --output text \
--release-label "emr-6.11.0" --type "SPARK" \
--cli-input-json file://$APP_LOCAL_HOME/create-application.json \
--query "applicationId")
7. 提交 Apache Hudi DeltaStreamer CDC 作業(yè)
創(chuàng)建好Application就可以提交作業(yè)了,Apache Hudi DeltaStreamer CDC是一個較為復(fù)雜的作業(yè),配置項非常多,這一點從Hudi官方博客給出的示例中可見一斑,我們要做的是:將使用spark-submit命令提交的作業(yè)“翻譯”成EMR Serverless的作業(yè)。
7.1 準(zhǔn)備作業(yè)描述文件
使用命令行提交EMR Serverless作業(yè)需要提供一個json格式的作業(yè)描述文件,通常在spark-submit命令行中配置的參數(shù)都會由這個文件來描述。由于DeltaStreamer作業(yè)的配置項非常多,限于篇幅,我們無法一一做出解釋,您可以將下面的作業(yè)描述文件和Hudi官方博客提供的原生Spark作業(yè)做一下對比,然后就能相對容易地理解作業(yè)描述文件的作用了。
需要注意的是,在執(zhí)行下面的腳本時,請根據(jù)您的AWS賬號和本地環(huán)境替換腳本中所有的<your-xxx>部分,這些被替換的部分取決于您本地環(huán)境中的源頭數(shù)據(jù)庫、數(shù)據(jù)表,Kakfa Topic以及Schema Registry等信息,每換一張表都需要調(diào)整相應(yīng)的值,所以沒有被抽離到全局變量中。
此外,該作業(yè)其實并不依賴任何第三方Jar包,其使用的Confluent Avro Converter已經(jīng)集成到了hudi-utilities-bundle.jar中,這里我們特意在配置中聲明--conf spark.jars=$(...)(參考示例命令)是為了演示“如何加載三方類庫”,供有需要的讀者參考。
# 實操步驟(5)
cat << EOF > $APP_LOCAL_HOME/start-job-run.json
{
"name":"apache-hudi-delta-streamer",
"applicationId":"$EMR_SERVERLESS_APP_ID",
"executionRoleArn":"$EMR_SERVERLESS_EXECUTION_ROLE_ARN",
"jobDriver":{
"sparkSubmit":{
"entryPoint":"/usr/lib/hudi/hudi-utilities-bundle.jar",
"entryPointArguments":[
"--continuous",
"--enable-sync",
"--table-type", "COPY_ON_WRITE",
"--op", "UPSERT",
"--target-base-path", "<your-table-s3-path>",
"--target-table", "orders",
"--min-sync-interval-seconds", "60",
"--source-class", "org.apache.hudi.utilities.sources.debezium.MysqlDebeziumSource",
"--source-ordering-field", "_event_origin_ts_ms",
"--payload-class", "org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload",
"--hoodie-conf", "bootstrap.servers=$KAFKA_BOOTSTRAP_SERVERS",
"--hoodie-conf", "schema.registry.url=$SCHEMA_REGISTRY_URL",
"--hoodie-conf", "hoodie.deltastreamer.schemaprovider.registry.url=${SCHEMA_REGISTRY_URL}/subjects/<your-registry-name>.<your-src-database>.<your-src-table>-value/versions/latest",
"--hoodie-conf", "hoodie.deltastreamer.source.kafka.value.deserializer.class=io.confluent.kafka.serializers.KafkaAvroDeserializer",
"--hoodie-conf", "hoodie.deltastreamer.source.kafka.topic=<your-kafka-topic-of-your-table-cdc-message>",
"--hoodie-conf", "auto.offset.reset=earliest",
"--hoodie-conf", "hoodie.datasource.write.recordkey.field=<your-table-recordkey-field>",
"--hoodie-conf", "hoodie.datasource.write.partitionpath.field=<your-table-partitionpath-field>",
"--hoodie-conf", "hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor",
"--hoodie-conf", "hoodie.datasource.write.hive_style_partitioning=true",
"--hoodie-conf", "hoodie.datasource.hive_sync.database=<your-sync-database>",
"--hoodie-conf", "hoodie.datasource.hive_sync.table==<your-sync-table>",
"--hoodie-conf", "hoodie.datasource.hive_sync.partition_fields=<your-table-partition-fields>"
],
"sparkSubmitParameters":"--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory --conf spark.jars=<your-app-dependent-jars>"
}
},
"configurationOverrides":{
"monitoringConfiguration":{
"s3MonitoringConfiguration":{
"logUri":"<your-s3-location-for-emr-logs>"
}
}
}
}
EOF
jq . $APP_LOCAL_HOME/start-job-run.json
以下是一份示例:
# 示例(非實操步驟)
cat << EOF > $APP_LOCAL_HOME/start-job-run.json
{
"name":"apache-hudi-delta-streamer",
"applicationId":"$EMR_SERVERLESS_APP_ID",
"executionRoleArn":"$EMR_SERVERLESS_EXECUTION_ROLE_ARN",
"jobDriver":{
"sparkSubmit":{
"entryPoint":"/usr/lib/hudi/hudi-utilities-bundle.jar",
"entryPointArguments":[
"--continuous",
"--enable-sync",
"--table-type", "COPY_ON_WRITE",
"--op", "UPSERT",
"--target-base-path", "$APP_S3_HOME/data/mysql-server-3/inventory/orders",
"--target-table", "orders",
"--min-sync-interval-seconds", "60",
"--source-class", "org.apache.hudi.utilities.sources.debezium.MysqlDebeziumSource",
"--source-ordering-field", "_event_origin_ts_ms",
"--payload-class", "org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload",
"--hoodie-conf", "bootstrap.servers=$KAFKA_BOOTSTRAP_SERVERS",
"--hoodie-conf", "schema.registry.url=$SCHEMA_REGISTRY_URL",
"--hoodie-conf", "hoodie.deltastreamer.schemaprovider.registry.url=${SCHEMA_REGISTRY_URL}/subjects/osci.mysql-server-3.inventory.orders-value/versions/latest",
"--hoodie-conf", "hoodie.deltastreamer.source.kafka.value.deserializer.class=io.confluent.kafka.serializers.KafkaAvroDeserializer",
"--hoodie-conf", "hoodie.deltastreamer.source.kafka.topic=osci.mysql-server-3.inventory.orders",
"--hoodie-conf", "auto.offset.reset=earliest",
"--hoodie-conf", "hoodie.datasource.write.recordkey.field=order_number",
"--hoodie-conf", "hoodie.datasource.write.partitionpath.field=order_date",
"--hoodie-conf", "hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor",
"--hoodie-conf", "hoodie.datasource.write.hive_style_partitioning=true",
"--hoodie-conf", "hoodie.datasource.hive_sync.database=inventory",
"--hoodie-conf", "hoodie.datasource.hive_sync.table=orders",
"--hoodie-conf", "hoodie.datasource.hive_sync.partition_fields=order_date"
],
"sparkSubmitParameters":"--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory --conf spark.jars=$(aws s3 ls $APP_S3_HOME/jars/ | grep -o '\S*\.jar$'| awk '{print "'"$APP_S3_HOME/jars/"'"$1","}' | tr -d '\n' | sed 's/,$//')"
}
},
"configurationOverrides":{
"monitoringConfiguration":{
"s3MonitoringConfiguration":{
"logUri":"$APP_S3_HOME/logs"
}
}
}
}
EOF
jq . $APP_LOCAL_HOME/start-job-run.json
7.2 提交作業(yè)
準(zhǔn)備好作業(yè)描述文件后,就可以正式提交作業(yè)了,命令如下:
# 實操步驟(6)
export EMR_SERVERLESS_JOB_RUN_ID=$(aws emr-serverless start-job-run \
--no-paginate --no-cli-pager --output text \
--name apache-hudi-delta-streamer \
--application-id $EMR_SERVERLESS_APP_ID \
--execution-role-arn $EMR_SERVERLESS_EXECUTION_ROLE_ARN \
--execution-timeout-minutes 0 \
--cli-input-json file://$APP_LOCAL_HOME/start-job-run.json \
--query jobRunId)
7.3 監(jiān)控作業(yè)
作業(yè)提交后,可以在控制臺查看作業(yè)運(yùn)行狀態(tài)。如果想在命令行窗口持續(xù)監(jiān)控作業(yè),可以使用如下腳本:
# 實操步驟(7)
now=$(date +%s)sec
while true; do
jobStatus=$(aws emr-serverless get-job-run \
--no-paginate --no-cli-pager --output text \
--application-id $EMR_SERVERLESS_APP_ID \
--job-run-id $EMR_SERVERLESS_JOB_RUN_ID \
--query jobRun.state)
if [ "$jobStatus" = "PENDING" ] || [ "$jobStatus" = "SCHEDULED" ] || [ "$jobStatus" = "RUNNING" ]; then
for i in {0..5}; do
echo -ne "\E[33;5m>>> The job [ $EMR_SERVERLESS_JOB_RUN_ID ] state is [ $jobStatus ], duration [ $(date -u --date now-$now +%H:%M:%S) ] ....\r\E[0m"
sleep 1
done
else
echo -ne "The job [ $EMR_SERVERLESS_JOB_RUN_ID ] is [ $jobStatus ]\n\n"
break
fi
done
7.4 錯誤檢索
作業(yè)開始運(yùn)行后,Spark Driver和Executor會持續(xù)生成日志,這些日志存放在配置的$APP_S3_HOME/logs路徑下,如果作業(yè)失敗,可以使用下面的腳本快速檢索到錯誤信息:
# 實操步驟(8)
JOB_LOG_HOME=$APP_LOCAL_HOME/log/$EMR_SERVERLESS_JOB_RUN_ID
rm -rf $JOB_LOG_HOME && mkdir -p $JOB_LOG_HOME
aws s3 cp --recursive $APP_S3_HOME/logs/applications/$EMR_SERVERLESS_APP_ID/jobs/$EMR_SERVERLESS_JOB_RUN_ID/ $JOB_LOG_HOME >& /dev/null
gzip -d -r -f $JOB_LOG_HOME >& /dev/null
grep --color=always -r -i -E 'error|failed|exception' $JOB_LOG_HOME
7.5 停止作業(yè)
DeltaStreamer是一個持續(xù)運(yùn)行的作業(yè),如果需要停止作業(yè),可以使用如下命令:
# 實操步驟(9)
aws emr-serverless cancel-job-run \
--no-paginate --no-cli-pager\
--application-id $EMR_SERVERLESS_APP_ID \
--job-run-id $EMR_SERVERLESS_JOB_RUN_ID
8. 結(jié)果驗證
作業(yè)啟動后會自動創(chuàng)建一個數(shù)據(jù)表,并在指定的S3位置上寫入數(shù)據(jù),使用如下命令可以查看自動創(chuàng)建的數(shù)據(jù)表和落地的數(shù)據(jù)文件:
# 實操步驟(10)
aws s3 ls --recursive <your-table-s3-path>
aws glue get-table --no-paginate --no-cli-pager \
--database-name <your-sync-database> --name <your-sync-table>
# 示例(非實操步驟)
aws s3 ls --recursive $APP_S3_HOME/data/mysql-server-3/inventory/orders/
aws glue get-table --no-paginate --no-cli-pager \
--database-name inventory --name orders
9. 評估與展望
本文,我們詳細(xì)介紹了如何在EMR Serverless上運(yùn)行Apapche Hudi DeltaStreamer將CDC數(shù)據(jù)接入到Hudi表中,這是一個主打“零編碼”,“零運(yùn)維”的超輕量解決方案。但是,它的局限性也很明顯,那就是:一個DeltaStreamer作業(yè)只能接入一張表,這對于動輒就需要接入數(shù)百張甚至數(shù)千張表的數(shù)據(jù)湖來說是不實用的,盡管Hudi也提供了用于多表接入的MultiTableDeltaStreamer,但是這個工具類目前的成熟度和完備性還不足以應(yīng)用于生產(chǎn)。此外,Hudi自0.10.0起針對Kafka Connect提供了Hudi Sink插件(目前也是僅支持單表),為CDC數(shù)據(jù)接入Hudi數(shù)據(jù)湖開辟了新的途徑,這是值得持續(xù)關(guān)注的新亮點。
從長遠(yuǎn)來看,CDC數(shù)據(jù)入湖并落地為Hudi表是一個非常普遍的需求,迭代并完善包括DeltaStreamer、HoodieMultiTableDeltaStreamer和Kafka Connect Hudi Sink插件在內(nèi)的多種原生組件在社區(qū)的呼聲將會越來越強(qiáng)烈,相信伴隨著Hudi的蓬勃發(fā)展,這些組件將不斷成熟起來,并逐步應(yīng)用到生產(chǎn)環(huán)境中。
推薦閱讀
圖加速數(shù)據(jù)湖分析-GeaFlow和Apache Hudi集成
加速LakeHouse ACID Upsert的新寫時復(fù)制方案
如何不加鎖地將數(shù)據(jù)并發(fā)寫入Apache Hudi?
關(guān)于作者:耿立超,架構(gòu)師,著有 《大數(shù)據(jù)平臺架構(gòu)與原型實現(xiàn):數(shù)據(jù)中臺建設(shè)實戰(zhàn)》一書,多年IT系統(tǒng)開發(fā)和架構(gòu)經(jīng)驗,對大數(shù)據(jù)、企業(yè)級應(yīng)用架構(gòu)、SaaS、分布式存儲和領(lǐng)域驅(qū)動設(shè)計有豐富的實踐經(jīng)驗,個人技術(shù)博客:https://laurence.blog.csdn.net
