Kyuubi 實踐 | Apache Kyuubi on Spark 在CDH上的深度實踐

Kyuubi 是網(wǎng)易有數(shù)的大數(shù)據(jù)開源項目,于2021年6月全票通過進入世界頂級開源基金會 Apache Software Foundation 孵化器。
Kyuubi 的命名源自中國神話《山海經(jīng)》,意為“九尾狐”。狐會噴火,象征Spark;狐有九尾,類比多租戶,在Spark上實現(xiàn)多租戶是系統(tǒng)設計之初的主要目的。然后取了動漫《火影忍者》中角色九尾的羅馬音['kju:bi:],作為項目名稱。
Kyuubi 的目標是讓“大數(shù)據(jù)平民化”。為實現(xiàn)這個目標,我們遵循“專業(yè)人做專業(yè)事”的準則,通過 Kyuubi的 C/S 架構,服務端大數(shù)據(jù)專家可以將 Spark 等大數(shù)據(jù)算力極致優(yōu)化并高度封裝后提供出來,業(yè)務端可通過該算力直接在自己擅長的業(yè)務領域處理數(shù)據(jù)產(chǎn)生價值,兩者之間通過簡單的接口進行必要且有效的直接交互。
Kyuubi 使用場景:
替換 HiveServer2,輕松獲得 10~100 倍性能提升
構建 Serverless Spark 平臺
構建統(tǒng)一數(shù)據(jù)湖探索分析管理平臺
CDH 最后一個免費版 6.3.2 發(fā)布一年有余,離線計算核心組件版本停在了 Hadoop 3.0.0,Hive 2.1.1,Spark 2.4.0。
隨著 Spark 3.0 的重磅發(fā)布,在性能方面又迎來了一次飛躍,本文將描述把 Spark 3 集成到 CDH 6.3.1(未開啟 Kerberos) 的過程,并使用 Kyuubi 替換 HiveServer2,實現(xiàn) OLAP、ETL 等場景下從 HiveQL 到 SparkSQL 的無縫遷移,享受 10x-100x 的性能紅利。
CDH 缺陷修復
[ORC-125] 修復 Hive 不能讀取高版本 ORC 寫入的數(shù)據(jù)
當使用 Hive 讀取由 Presto 或者 Spark 等寫入的 ORC 文件時,會出現(xiàn)以下錯誤。
ORC split generation failed with exception: java.lang.ArrayIndexOutOfBoundsException: 6該問題在 ORC 上游被修復 [ORC-125] Correct OrcFile.WriterVersion to correctly use FUTURE。
ORC 最早是 Hive 的一個子項目,在 CDH 6 集成的 Hive 2.1 這個版本里,ORC 還沒有分離出去,所以這個問題要在 Hive 源碼里修復。
我做了一個打包好的修復版本,GitHub 傳送門 ①,下載更換 /opt/cloudera/parcels/CDH/lib/hive/lib 路徑下的 hive-exec-2.1.1-cdh6.3.1.jar, hive-orc-2.1.1-cdh6.3.1.jar即可。(至少需要更換 Hadoop Client、HiveServer2 節(jié)點,如果你不知道我在說什么,就把所有節(jié)點都換掉)
Spark 3.1
[SPARK-33212] Spark 使用 Hadoop Shaded Client
Hadoop 3.0 提供了 Shaded Client,用于下游項目規(guī)避依賴沖突 [HADOOP-11656] Classpath isolation for downstream clients。
Spark 3.2 在 hadoop-3.2 profile 中切換到了 Hadoop Shaded Client [SPARK-33212] Upgrade to Hadoop 3.2.2 and move to shaded clients for Hadoop 3.x profile。
該更改不是必須的,但個人建議從 Spark 主線將該補丁移植到 branch-3.1 使用,以規(guī)避潛在的依賴沖突。
[CDH-71907] Spark HiveShim 適配 CDH Hive
Spark 通過反射和隔離的類加載器來實現(xiàn)對多版本 Hive Metastore 的支持,詳情參考 Interacting with Different Versions of Hive Metastore - Spark Documentation。
CDH 6 使用修改過的 Hive 2.1.1 版本,其方法簽名與 Apache 版本有所不同,故 Spark HiveShim 反射調(diào)用會出現(xiàn)找不到方法簽名,需要手動將 CDH-71907 補丁打到 branch-3.1。
Spark External Shuffle Service 協(xié)議兼容
Spark shuffle 時,mapper 會將數(shù)據(jù)寫入到本地磁盤而非 HDFS,引入 ESS 后,會將文件信息注冊到 ESS 中,將 mapper 與 reducer 解耦。CDH 中,默認會啟用 Spark Yarn External Shuffle Service,作為 YARN AUX Service 在所有 Yarn Node 上啟動。
Spark 3 修改了 shuffle 通信協(xié)議,在與 CDH 2.4 版本的 ESS 交互時,需要設置 spark.shuffle.useOldFetchProtocol=true,否則可能報如下錯誤。[SPARK-29435] Spark 3 doesn't work with older shuffle service
IllegalArgumentException: Unexpected message type: <number>.Spark 版本遷移指南
如果你有現(xiàn)存的基于 CDH Spark 2.4 的 Spark SQL/Job,在將其遷移至 Spark 3 版本前,請參閱完整的官方遷移指南。
Migration Guide: Spark Core - Spark Documentation ②
Migration Guide: SQL, Datasets and DataFrame - Spark Documentation ③
Spark 部署
官方文檔 Running Spark on YARN - Documentation 中提到
To make Spark runtime jars accessible from YARN side, you can specify spark.yarn.archive or spark.yarn.jars. For details please refer to Spark Properties. If neither spark.yarn.archive nor spark.yarn.jars is specified, Spark will create a zip file with all jars under $SPARK_HOME
https://spark.apache.org/docs/latest/running-on-yarn.html#preparations
因此,無需在 CDH 所有節(jié)點上部署 Spark 3,只需在 Hadoop Client 節(jié)點上部署 Spark 3 即可。
如果你對集群權限管理沒有十分嚴格的要求,請使用 hive 用戶以避免權限問題。
我基于 Spark 3.1.2 制作了一個適配 CDH 6 的版本, GitHub 傳送門 ,下載解壓至 /opt,并軟鏈至 /opt/spark3。
[hive@cdh-kyuubi]$ ls -l /opt | grep sparklrwxrwxrwx 1 root root 39 Aug 10 18:46 spark3 -> /opt/spark-3.1.2-cdh6-bin-3.2.2drwxr-xr-x 13 hive hive 4096 Aug 10 18:46 spark-3.1.2-cdh6-bin-3.2.2
配置 Hadoop、Hive
CDH 會將配置文件自動分發(fā)到所有節(jié)點 /etc 目錄下,建立軟鏈即可。
ln -s /etc/hadoop/conf/core-site.xml /opt/spark3/conf/ln -s /etc/hadoop/conf/hdfs-site.xml /opt/spark3/conf/ln -s /etc/hadoop/conf/yarn-site.xml /opt/spark3/conf/ln -s /etc/hive/conf/hive-site.xml /opt/spark3/conf/
配置 Spark 環(huán)境變量 /opt/spark3/conf/spark-env.sh
#!/usr/bin/env bashexport HADOOP_CONF_DIR=/etc/hadoop/conf:/etc/hive/confexport YARN_CONF_DIR=/etc/hadoop/conf.cloudera.yarn:/etc/hive/conf
配置 Spark 默認參數(shù) /opt/spark3/conf/spark-defaults.conf
請參考 Configuration - Spark Documentation 根據(jù)集群環(huán)境實際情況進行微調(diào)
spark.authenticate=falsespark.io.encryption.enabled=falsespark.network.crypto.enabled=falsespark.eventLog.enabled=truespark.eventLog.dir=hdfs://nameservice1/user/spark/applicationHistoryspark.driver.log.dfsDir=/user/spark/driverLogsspark.driver.log.persistToDfs.enabled=truespark.files.overwrite=truespark.files.useFetchCache=falsespark.serializer=org.apache.spark.serializer.KryoSerializerspark.shuffle.service.enabled=truespark.shuffle.service.port=7337spark.shuffle.useOldFetchProtocol=truespark.ui.enabled=truespark.ui.killEnabled=truespark.yarn.historyServer.address=http://cdh-master2:18088spark.yarn.historyServer.allowTracking=truespark.master=yarnspark.submit.deployMode=clusterspark.driver.memory=2Gspark.executor.cores=6spark.executor.memory=8Gspark.executor.memoryOverhead=2Gspark.memory.offHeap.enabled=truespark.memory.offHeap.size=2Gspark.dynamicAllocation.enabled=truespark.dynamicAllocation.executorIdleTimeout=60spark.dynamicAllocation.minExecutors=0spark.dynamicAllocation.schedulerBacklogTimeout=1spark.sql.cbo.enabled=truespark.sql.cbo.starSchemaDetection=truespark.sql.datetime.java8API.enabled=falsespark.sql.sources.partitionOverwriteMode=dynamicspark.sql.hive.convertMetastoreParquet=falsespark.sql.hive.convertMetastoreParquet.mergeSchema=falsespark.sql.hive.metastore.version=2.1.1spark.sql.hive.metastore.jars=/opt/cloudera/parcels/CDH/lib/hive/lib/*spark.sql.orc.mergeSchema=truespark.sql.parquet.mergeSchema=truespark.sql.parquet.writeLegacyFormat=truespark.sql.adaptive.enabled=truespark.sql.adaptive.forceApply=falsespark.sql.adaptive.logLevel=infospark.sql.adaptive.advisoryPartitionSizeInBytes=256mspark.sql.adaptive.coalescePartitions.enabled=truespark.sql.adaptive.coalescePartitions.minPartitionNum=1spark.sql.adaptive.coalescePartitions.initialPartitionNum=1024spark.sql.adaptive.fetchShuffleBlocksInBatch=truespark.sql.adaptive.localShuffleReader.enabled=truespark.sql.adaptive.skewJoin.enabled=truespark.sql.adaptive.skewJoin.skewedPartitionFactor=5spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=128mspark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin=0.2spark.sql.autoBroadcastJoinThreshold=-1
驗證 spark-shell 工作正常
[hive@cdh-kyuubi]$ /opt/spark3/bin/spark-shellSetting default log level to "WARN".To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).21/08/30 19:53:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable21/08/30 19:53:46 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.Spark context Web UI available at http://cdh-master1:4040Spark context available as 'sc' (master = yarn, app id = application_1615462037335_40099).Spark session available as 'spark'.Welcome to____ __/ __/__ ___ _____/ /___\ \/ _ \/ _ `/ __/ '_//___/ .__/\_,_/_/ /_/\_\ version 3.1.2-cdh6/_/Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_221)Type in expressions to have them evaluated.Type :help for more information.scala> spark.sql("show databases").show+----------------+| namespace|+----------------+| test|+----------------+scala>
至此,Spark 3 on CDH 6 已經(jīng)部署完成,可以像 CDH 自帶的 Spark 2.4 一樣,正常的使用 spark-submit、spark-shell,但依舊不支持 spark-sql,spark-thriftserver。
相比略顯雞肋的 spark-sql,spark-thriftserver,Kyuubi 是更好的選擇,因此我故意移除了這兩個功能。
注意:我提供的構建版,沒有啟用 spark-thriftserver 模塊,為正常使用 Kyuubi,請下載hive-service-rpc-3.1.2.jar 添加到 /opt/spark3/jars 路徑。
Kyuubi —— 解鎖 Spark SQL 更多場景
簡言之,Apache Kyuubi (Incubating) 之于 Spark,類似 HiveServer2 之于 Hive。
Kyuubi 通過將 Spark 暴露一個與 HiveServer2 完全兼容的 Thrift API,可以兼容現(xiàn)有的 Hive 生態(tài),如 beeline,hive-jdbc-driver,HUE,Superset 等。
可以通過以下文檔來了解 Kyuubi 的架構,Kyuubi 與 HiveServer2 和 Spark Thrift Server 的異同。
Welcome to Kyuubi’s documentation ④
Kyuubi Architecture — Kyuubi documentation ⑤
Kyuubi v.s. HiveServer2 — Kyuubi documentation ⑥
Kyuubi v.s. Spark Thrift JDBC/ODBC Server (STS) — Kyuubi documentation ⑦
Kyuubi 部署
Kyuubi 無需任何修改即可適配 CDH 6,下面給出關鍵步驟,詳情可以參考 Deploy Kyuubi engines on Yarn — Kyuubi documentation ⑧
同樣的,如果你對集群權限管理沒有十分嚴格的要求,請使用 hive 用戶以避免權限問題。
解壓部署
下載 kyuubi-1.3.0-incubating-bin.tgz 解壓至 /opt,并創(chuàng)建軟鏈到 /opt/kyuubi。
[hive@cdh-external opt]$ ls -l /opt | grep kyuubilrwxrwxrwx 1 root root 32 Aug 18 17:36 kyuubi -> /opt/kyuubi-1.3.0-incubating-bindrwxrwxr-x 13 hive hive 4096 Aug 18 17:52 kyuubi-1.3.0-incubating-bin
修改配置
按需修改/opt/kyuubi/conf/kyuubi-env.sh
export JAVA_HOME=/usr/java/defaultexport SPARK_HOME=/opt/spark3export SPARK_CONF_DIR=${SPARK_HOME}/confexport HADOOP_CONF_DIR=/etc/hadoop/conf:/etc/hive/confexport KYUUBI_PID_DIR=/data/log/service/kyuubi/pidexport KYUUBI_LOG_DIR=/data/log/service/kyuubi/logsexport KYUUBI_WORK_DIR_ROOT=/data/log/service/kyuubi/workexport KYUUBI_MAX_LOG_FILES=10
參考Kyuubi Configurations System — Kyuubi documentation ⑨
按需修改/opt/kyuubi/conf/kyuubi-defaults.conf
kyuubi.authentication=NONEkyuubi.engine.share.level=USERkyuubi.frontend.bind.host=0.0.0.0kyuubi.frontend.bind.port=10009kyuubi.ha.zookeeper.quorum=cdh-master1:2181,cdh-master2:2181,cdh-master3:2181kyuubi.ha.zookeeper.namespace=kyuubikyuubi.session.engine.idle.timeout=PT10Hspark.master=yarnspark.submit.deployMode=clusterspark.dynamicAllocation.enabled=truespark.dynamicAllocation.minExecutors=0spark.dynamicAllocation.maxExecutors=20spark.dynamicAllocation.executorIdleTimeout=60
注意:kyuubi-defaults.conf 中的 spark 配置優(yōu)先級高于 spark-defaults.conf
性能調(diào)優(yōu)
How To Use Spark Dynamic Resource Allocation (DRA) in Kyuubi — Kyuubi documentation ⑩
How To Use Spark Adaptive Query Execution (AQE) in Kyuubi — Kyuubi documentation ?
啟動
使用/opt/kyuubi/bin/kyuubi 在前臺或后臺啟動 Kyuubi Server。
[hive@cdh-kyuubi]$ /opt/kyuubi/bin/kyuubi --helpUsage: bin/kyuubi commandcommands:start - Run a Kyuubi server as a daemonrun - Run a Kyuubi server in the foregroundstop - Stop the Kyuubi daemonstatus - Show status of the Kyuubi daemon-h | --help - Show this help message
Beeline 連接
使用默認參數(shù)連接 Kyuubi
beeline -u jdbc:hive2://cdh-kyuubi:10009 -n bigdata使用自定義參數(shù)連接 Kyuubi
beeline -u "jdbc:hive2://cdh-master2:10009/;?spark.driver.memory=8G#spark.app.name=batch_001;kyuubi.engine.share.level=CONNECTION" -n batch詳細配置參考 Access Kyuubi with Hive JDBC and ODBC Drivers — Kyuubi documentation ?
HUE 連接
簡單說,在 Cloudera Manager 中修改 HUE 配置項 Hue Service Advanced Configuration Snippet 如下,即可在 HUE 中開啟 Spark SQL 引擎。
[desktop]app_blacklist=zookeeper,hbase,impala,search,sqoop,securityuse_new_editor=true[[interpreters]][[[sparksql]]]name=Spark SQLinterface=hiveserver2[[[hive]]]name=Hiveinterface=hiveserver2# other interpreters...[spark]sql_server_host=kyuubisql_server_port=10009
詳細配置參考 Getting Started with Kyuubi and Cloudera Hue — Kyuubi documentation ?
Kyuubi engine 共享級別與應用場景
Kyuubi Spark engine 即一個 Spark driver,通過控制 engine 的共享策略,可以在隔離性和資源利用率上取得平衡。Kyuubi 提供 3 種 engine 共享級別,分別為 CONNECTION,USER(默認),SERVER。
下面的討論均假設使用 YARN Cluster 模式啟動 Kyuubi Spark engine。
我們首先補充一些時間開銷和 Spark 操作執(zhí)行的信息。
Kyuubi Spark engine 在冷啟動時,會由 Kyuubi Server 通過spark-submit 命令向 YARN 提交一個 Spark App,即 engine,該過程從提交到 Spark driver 啟動約需要 5-6s,然后 engine 將自己注冊到 Zookeeper,Kyuubi Server 監(jiān)聽 Zookeeper 發(fā)現(xiàn) engine 并建立連接,該過程約 1-2s,如此算來,在 YARN 資源空閑時,整個 engine 冷啟動時間約 6-8s;Client 連接到存在的 engine,通常耗時 1s 內(nèi)。
如果啟動了 Spark 的 executor 動態(tài)伸縮特性,真正執(zhí)行 SQL 任務時,如果資源有富余,會動態(tài)創(chuàng)建 executor,每個 executor 創(chuàng)建耗時約為 2-3s。
元數(shù)據(jù)、DDL 等操作,如獲取 database 列表,CREATE TABLE語句等,會在 driver 上執(zhí)行;計算任務如 JOIN、COUNT() 等,會由 driver 生成執(zhí)行計劃,在 executor 上執(zhí)行。
在我們的生產(chǎn)環(huán)境中,大概有如下三種使用場景:
(1)使用 HUE 進行 ad-hoc 查詢
該場景中,會有多個用戶進行查詢,一般會運行相對較大的查詢?nèi)蝿?,用戶對連接創(chuàng)建時間以及元數(shù)據(jù)加載時間較為敏感,但對查詢結果響應時間有一定的容忍性。在這種場景中,使用默認的 USER 共享級別,每個用戶只使用一個 Spark engine,配合使用 Spark 的動態(tài)伸縮特性,動態(tài)的創(chuàng)建和銷毀 executor,在保證用戶之間隔離的基礎上,降低啟動時間和資源占用。建議的關鍵配置如下:
kyuubi.engine.share.level=USERkyuubi.session.engine.idle.timeout=PT1Hspark.dynamicAllocation.enabled=truespark.dynamicAllocation.minExecutors=0spark.dynamicAllocation.maxExecutors=30spark.dynamicAllocation.executorIdleTimeout=120
(2)使用 Beeline 運行批任務
SQL 該場景會使用統(tǒng)一的 batch 賬號提交 SQL 任務,對響應時間敏感度低,但對穩(wěn)定性要求非常要,并且應盡可能的最大化利用集群資源。推薦在該場景下將共享級別調(diào)整為 CONNECTION,這樣每個 SQL 執(zhí)行將會使用獨立的 Spark driver,并且 SQL 執(zhí)行完畢后 Spark driver 立即退出,保障離線任務互不干擾,并且資源及時釋放。建議的關鍵配置如下:
kyuubi.engine.share.level=CONNECTIONspark.dynamicAllocation.enabled=true# 根據(jù)任務具體資源消耗估算,從 workflow 整體上提升集群資源利用率spark.dynamicAllocation.minExecutors=5spark.dynamicAllocation.maxExecutors=30
(3)使用 Superset 進行多數(shù)據(jù)源聯(lián)邦查詢
該場景中,只有一個 service 賬號,會定時同時刷新大量的圖表,對連接創(chuàng)建時間、查詢結果響應時間、并發(fā)度都有較高的要求。該場景中,driver 和 executor 的啟動時間都是不容忽視的,因此在 ad-hoc 查詢配置的基礎上,應延長 driver、executor 的閑置等待時間,并且設定最小的 executor 保活數(shù)量,保證時刻有 executor 常駐,能快速響應較小的查詢。建議的關鍵配置如下:
kyuubi.engine.share.level=USERkyuubi.session.engine.idle.timeout=PT10Hspark.dynamicAllocation.enabled=truespark.dynamicAllocation.minExecutors=6spark.dynamicAllocation.maxExecutors=10spark.dynamicAllocation.executorIdleTimeout=120
我們可以啟動三個 Kyuubi Server(單機或集群)來分別應對如上的三種場景,但也可以僅啟動一個 Kyuubi Server(單機或集群)來滿足不同的場景。
一種建議的實踐方式是 Kyuubi Server 配置使用默認的 USER 共享級別,這樣客戶端連接時,會使用默認配置;當 ETL 跑批場景時,可以通過 beeline 參數(shù)將本次連接共享級別調(diào)整為 CONNECTION;類似的,在 Superset 連接中配置獨立參數(shù)。
結語
現(xiàn)在去跑一些 SQL,體驗 Spark SQL 帶來的性能飛躍吧!
參考鏈接:
①https://github.com/pan3793/cdh-hive/releases/tag/v2.1.1-cdh6.3.1-fix
②https://spark.apache.org/docs/latest/core-migration-guide.html
③https://spark.apache.org/docs/latest/sql-migration-guide.html
④https://kyuubi.readthedocs.io/en/latest/index.html
⑤https://kyuubi.apache.org/docs/latest/overview/architecture.html
⑥https://kyuubi.apache.org/docs/latest/overview/kyuubi_vs_hive.html
⑦https://kyuubi.apache.org/docs/latest/overview/kyuubi_vs_thriftserver.html
⑧https://kyuubi.apache.org/docs/latest/deployment/on_yarn.html
⑨https://kyuubi.apache.org/docs/latest/deployment/settings.html
⑩https://kyuubi.apache.org/docs/latest/deployment/spark/dynamic_allocation.html
?https://kyuubi.apache.org/docs/latest/deployment/spark/aqe.html
?https://kyuubi.apache.org/docs/latest/client/hive_jdbc.html
?https://kyuubi.apache.org/docs/latest/quick_start/quick_start_with_hue.html#for-cdh-6-x-users
潘成,來自 Apache Kyuubi 社區(qū)
贈書福利

