Kyuubi實(shí)踐 | 編譯 Spark3.1 以適配 CDH5 并集成 Kyuubi
1. Spark ThriftServer 在我們業(yè)務(wù)場(chǎng)景中的不足
2. 關(guān)于 Kyuubi
3. 編譯 Spark3.1.2 以適配 CDH5
4. 測(cè)試 Spark 是否正??捎?/p>
5. 安裝配置 kyuubi
5.1 創(chuàng)建用戶 kyuubi
5.2 配置 kyuubi 代理用戶的權(quán)限
5.3 創(chuàng)建用戶 kyuubi 的 principal 并導(dǎo)出 keytab
5.4 在 Sentry 中給 kyuubi 用戶賦予角色以及權(quán)限
5.5 配置 Kyuubi
6. Kyuubi 測(cè)試
6.1 Kyuubi 多租戶測(cè)試
6.2 Kyuubi CONNECTION 級(jí)別的引擎共享
7. 總結(jié)
1. Spark ThriftServer 在我們業(yè)務(wù)場(chǎng)景中的不足
STS 是 Spark 中自帶的一個(gè)組件,它可以在 yarn 上啟動(dòng)一個(gè)常駐的 Spark 應(yīng)用,然后對(duì)外提供 JDBC 服務(wù)。
STS 在我們內(nèi)部最典型的應(yīng)用場(chǎng)景是,每天大量的 Tableau Spark Thrift 數(shù)據(jù)源刷新任務(wù),需要定時(shí)執(zhí)行,并拉取數(shù)據(jù)到 Tableau Server 端。在較高的并發(fā)負(fù)載下或一次性拉取的數(shù)據(jù)量過多時(shí),就大概率會(huì)出現(xiàn) driver 端進(jìn)程卡死的情況(具體表現(xiàn)是,driver 端進(jìn)程老年代內(nèi)存占用 100%,GC 線程無法工作,分析 dump 文件,發(fā)現(xiàn)有內(nèi)存泄漏的現(xiàn)象發(fā)生)。
每當(dāng)遇到這樣的情況,只能選擇重啟 STS,這期間 Application 中運(yùn)行的 job 也一并會(huì)被殺死,導(dǎo)致數(shù)據(jù)源刷新任務(wù)頻繁重試,浪費(fèi)集群資源的同時(shí),也會(huì)造成報(bào)表數(shù)據(jù)刷新的 SLA 無法滿足業(yè)務(wù)方的要求。
基于此,我們開始了對(duì) kyuubi 的調(diào)研,并重點(diǎn)測(cè)試其高可用,以及 CONNECTION 級(jí)別的引擎共享能力。
2. 關(guān)于 Kyuubi
Kyuubi 是一個(gè)分布式多租戶 Thrift JDBC/ODBC 服務(wù),同樣構(gòu)建在 Apache Spark 之上,用于大規(guī)模數(shù)據(jù)管理、處理和分析,類似 STS,但功能比 STS 更加豐富和優(yōu)良。
其更詳細(xì)的文檔以及與 STS 的深入對(duì)比,可以參考其官方文檔:
https://blog.csdn.net/NetEaseResearch/article/details/115013920
https://kyuubi.apache.org/docs/stable/integrations/index.html
3. 編譯 Spark3.1.2 以適配 CDH5
我們團(tuán)隊(duì)內(nèi)部 Hadoop 集群的版本是:
hive 1.1.0-cdh5.13.1 hadoop 2.6.0-cdh-5.13.1
同時(shí),集群開啟了 Sentry 和 Kerberos。
從 Spark3.0.x 開始,官方要求編譯 Spark 時(shí)所依賴的 Hadoop 的最低版本是 2.7,默認(rèn)所支持的 hive 的版本也高于 1.1.0。基于此,本小節(jié)記錄了針對(duì)在 CDH5 的環(huán)境中,編譯以及測(cè)試 Spark3.1.2 的完整流程。
當(dāng)然,如果你沒有強(qiáng)迫癥,也可以選擇 spark3-hadoop-2.7 的安裝包來進(jìn)行測(cè)試,kyuubi 社區(qū)也有大佬做過這樣的測(cè)試。
下載 Spark3.1.2 的代碼并導(dǎo)入到 IDEA 中,修改 Spark 的父 pom 文件,加入aliyun、cloudera、maven center的倉庫地址,防止在國內(nèi)的網(wǎng)絡(luò)環(huán)境下,有些依賴的 jar 包下載不下來(原來的 repository 地址可以注釋掉)。
<repository>
<id>aliyun</id>
<name>Nexus Release Repository</name>
<url>http://maven.aliyun.com/nexus/content/groups/public</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
<repository>
<id>cloudera</id>
<name>cloudera Repository</name>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
<repository>
<id>central</id>
<name>Maven Repository</name>
<url>https://repo.maven.apache.org/maven2</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
在 pom 中加入 cdh-hadoop 的 profile,然后在 IDEA 中先嘗試 build 下代碼,看是否有報(bào)錯(cuò)發(fā)生。
<profile>
<id>hadoop2.6.0-cdh5.13.1</id>
<properties>
<hadoop.version>hadoop2.6.0-cdh5.13.1</hadoop.version>
<curator.version>2.7.1</curator.version>
<commons-io.version>2.4</commons-io.version>
<javax.servlet-api.name>servlet-api</javax.servlet-api.name>
</properties>
</profile>
點(diǎn)擊綠色的錘子按鈕,嘗試 build 下代碼。

正常情況下,在 build 的執(zhí)行過程中,會(huì)有編譯型異常發(fā)生。
Spark3.1.x 適配hadoop2.6.0-cdh5.13.1需要修改代碼的地方有兩處:
一處是修改resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala,
參考:https://github.com/apache/spark/pull/16884/files
修改后的代碼示例:
sparkConf.get(ROLLED_LOG_INCLUDE_PATTERN).foreach { includePattern =>
// 原有代碼
/* try {
val logAggregationContext = Records.newRecord(classOf[LogAggregationContext])
logAggregationContext.setRolledLogsIncludePattern(includePattern)
sparkConf.get(ROLLED_LOG_EXCLUDE_PATTERN).foreach { excludePattern =>
logAggregationContext.setRolledLogsExcludePattern(excludePattern)
}
appContext.setLogAggregationContext(logAggregationContext)
} catch {
case NonFatal(e) =>
logWarning(s"Ignoring ${ROLLED_LOG_INCLUDE_PATTERN.key} because the version of YARN " +
"does not support it", e)
}*/
try {
val logAggregationContext = Records.newRecord(classOf[LogAggregationContext])
// These two methods were added in Hadoop 2.6.4, so we still need to use reflection to
// avoid compile error when building against Hadoop 2.6.0 ~ 2.6.3.
val setRolledLogsIncludePatternMethod =
logAggregationContext.getClass.getMethod("setRolledLogsIncludePattern", classOf[String])
setRolledLogsIncludePatternMethod.invoke(logAggregationContext, includePattern)
sparkConf.get(ROLLED_LOG_EXCLUDE_PATTERN).foreach { excludePattern =>
val setRolledLogsExcludePatternMethod =
logAggregationContext.getClass.getMethod("setRolledLogsExcludePattern", classOf[String])
setRolledLogsExcludePatternMethod.invoke(logAggregationContext, excludePattern)
}
appContext.setLogAggregationContext(logAggregationContext)
} catch {
case NonFatal(e) =>
logWarning(s"Ignoring ${ROLLED_LOG_INCLUDE_PATTERN.key} because the version of YARN " +
"does not support it", e)
}
}
第二處修改/spark-3.1.2/core/src/main/scala/org/apache/spark/util/Utils.scala:
def unpack(source: File, dest: File): Unit = {
// StringUtils 在hadoop2.6.0中引用不到,所以取消此import,然后修改為相似的功能
// val lowerSrc = StringUtils.toLowerCase(source.getName)
if (source.getName == null) {
throw new NullPointerException
}
val lowerSrc = source.getName.toLowerCase()
if (lowerSrc.endsWith(".jar")) {
RunJar.unJar(source, dest, RunJar.MATCH_ANY)
} else if (lowerSrc.endsWith(".zip")) {
FileUtil.unZip(source, dest)
} else if (
lowerSrc.endsWith(".tar.gz") || lowerSrc.endsWith(".tgz") || lowerSrc.endsWith(".tar")) {
FileUtil.unTar(source, dest)
} else {
logWarning(s"Cannot unpack $source, just copying it to $dest.")
copyRecursive(source, dest)
}
}
然后重新 build,修改完代碼之后,build 的過程中應(yīng)該不會(huì)再有報(bào)錯(cuò)發(fā)生。
hive 的版本就不要在 pom 中替換了,替換之后的編譯報(bào)錯(cuò)修改起來會(huì)很麻煩。所以 hive 就用默認(rèn)版本(2.3.7),因?yàn)閺?Spark1.4.0 開始,Spark SQL 的單個(gè)二進(jìn)制版本可用于查詢不同版本的 Hive 元存儲(chǔ)。具體內(nèi)容可以參考:
https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html#interacting-with-different-versions-of-hive-metastore
運(yùn)行打包命令前,請(qǐng)先修改下dev/make-distribution.sh,具體修改內(nèi)容如下:
# spark版本
VERSION=3.1.2
# scala版本
SCALA_VERSION=2.12
# hadoop版本
SPARK_HADOOP_VERSION=2.6.0-cdh5.13.1
# 開啟hive
SPARK_HIVE=1
# 原來的內(nèi)容注釋掉
#VERSION=$("$MVN" help:evaluate -Dexpression=project.version $@ \
# | grep -v "INFO"\
# | grep -v "WARNING"\
# | tail -n 1)
#SCALA_VERSION=$("$MVN" help:evaluate -Dexpression=scala.binary.version $@ \
# | grep -v "INFO"\
# | grep -v "WARNING"\
# | tail -n 1)
#SPARK_HADOOP_VERSION=$("$MVN" help:evaluate -Dexpression=hadoop.version $@ \
# | grep -v "INFO"\
# | grep -v "WARNING"\
# | tail -n 1)
#SPARK_HIVE=$("$MVN" help:evaluate -Dexpression=project.activeProfiles -pl sql/hive $@ \
# | grep -v "INFO"\
# | grep -v "WARNING"\
# | fgrep --count "<id>hive</id>";\
# # Reset exit status to 0, otherwise the script stops here if the last grep finds nothing\
# # because we use "set -o pipefail"
# echo -n)
在 Spark 項(xiàng)目的根路徑下運(yùn)行編譯打包的命令
./dev/make-distribution.sh --name 2.6.0-cdh5.13.1 --pip --tgz -Phive -Phive-thriftserver -Pmesos -Pyarn -Pkubernetes -Phadoop2.6.0-cdh5.13.1 -Dhadoop.version=2.6.0-cdh5.13.1 -Dscala.version=2.12.10
首次編譯時(shí)會(huì)拉取大量依賴,遇到網(wǎng)絡(luò)錯(cuò)誤可以多試幾次。編譯過程中會(huì)下載如 maven 安裝包、Scala 之類的軟件包到項(xiàng)目根路徑的 build 目錄下,在網(wǎng)絡(luò)慢的情況下,可以手動(dòng)下載這些軟件包,然后移動(dòng)到相應(yīng)的目錄之下也行。
Mac 下 R 環(huán)境有些問題,所以這里編譯時(shí)就沒帶上 sparkR 的支持,所以 sparkR 估計(jì)用不了,編譯完成之后最終的安裝包會(huì)輸出在 Spark 項(xiàng)目的根路徑下。
4. 測(cè)試 Spark 是否正??捎?/span>
編譯的工作還算順利,網(wǎng)上也有很多類似的文章可供參考,如:
https://www.codeleading.com/article/42014747657/
之后的工作便是測(cè)試 Spark 的可用性,這里我主要測(cè)試的是,跑通 spark-shell、spark-sql,正常提交作業(yè)到 yarn 集群,正常讀取到 hive 表中的數(shù)據(jù),至于其他更細(xì)致的測(cè)試,后續(xù)會(huì)有文章接著介紹。
Spark 無需分布式部署安裝,只需要準(zhǔn)備好一個(gè) Spark 的客戶端環(huán)境即可。
涉及到修改 Spark 配置的命令參考如下:
cd /opt/spark-3.1.2/conf
# 把hive hdfs 相關(guān)配置文件的軟連接構(gòu)建起來
ln -s /etc/hive/conf/hive-site.xml hive-site.xml
ln -s /etc/hive/conf/hdfs-site.xml hdfs-site.xml
ln -s /etc/hive/conf/core-site.xml core-site.xml
mv log4j.properties.template log4j.properties
mv spark-defaults.conf.template spark-defaults.conf
mv spark-env.sh.template spark-env.sh
vim spark-defaults.conf
# 自定義spark要加載的hive metastore的版本以及jar包的路徑,注意在spark3.1.x 與 spark3.0.x中此處配置的差異
spark.sql.hive.metastore.version=1.1.0
spark.sql.hive.metastore.jars=path
spark.sql.hive.metastore.jars.path=file:///opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/lib/hive/lib/*
spark.sql.hive.metastore.version 不要寫1.1.0-cdh5.13.1,spark 中 metastore 的自定義配置請(qǐng)參考:
http://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html#specifying-storage-format-for-hive-tables
測(cè)試 spark-shell 以及 spark-sql 的命令參考:
export SPARK_HOPME=/opt/spark-3.1.2
# 本地跑spark-shell
sh /opt/spark-3.1.2/bin/spark-shell
# 以yarn-client的方式跑spark-shell
sh /opt/spark-3.1.2/bin/spark-shell --master yarn --deploy-mode client --executor-memory 1G --num-executors 2
# 以yarn-cluster的方式跑spark-sql
sh /opt/spark-3.1.2/bin/spark-sql --master yarn -deploy-mode cluster --executor-memory 1G --num-executors 2
# 跑一些spark代碼 操作hive表中的一些數(shù)據(jù),測(cè)試正常就OK
5. 安裝配置 kyuubi
kyuubi 的安裝包可以選擇官方制作好的,如:

也可以自己拉取代碼進(jìn)行編譯,這里我選擇自己編譯 kyuubi,使用的版本是最新發(fā)布的
v1.3.0-incubating
https://github.com/apache/incubator-kyuubi/releases/tag/v1.3.0-incubating
kyuubi 的編譯請(qǐng)參考:
https://kyuubi.apache.org/docs/r1.3.0-incubating/develop_tools/distribution.html
下載代碼到本地,導(dǎo)入 IDEA 中(或不導(dǎo)入),在項(xiàng)目根路徑下輸入:
./build/dist --tgz --spark-provided
# --spark-provided 我的spark環(huán)境已有,這里選擇不把spark一并打包
打包完成之后,上傳 tar 包至服務(wù)器,解壓縮,去掉配置文件的后綴.template

配置和運(yùn)行 kyuubi 之前,需要先做一些額外的操作。
5.1 創(chuàng)建用戶 kyuubi
創(chuàng)建一個(gè)獨(dú)立用戶 kyuubi 來起 kyuubi 服務(wù)
useradd kyuubi
5.2 配置 kyuubi 代理用戶的權(quán)限
kyuubi 需要代理其他用戶來啟動(dòng)引擎,所以需要為其設(shè)置代理用戶的權(quán)限。

5.3 創(chuàng)建用戶 kyuubi 的 principal 并導(dǎo)出 keytab
命令參考:
kadmin.local
addprinc -pw 123456 kyuubi/[email protected]
ktadd -k kyuubi.keytab -norandkey kyuubi/[email protected]
其中 principal 的組成有 kyuubi node2.bigdata.leo.com @LEO.COM 這三部分組成,如果設(shè)置為[email protected],運(yùn)行 kyuubi 時(shí)會(huì)報(bào)錯(cuò)。
5.4 在 Sentry 中給 kyuubi 用戶賦予角色以及權(quán)限
無 Sentry 可跳過授權(quán),但還是需要注意 kyuubi 用戶在 hdfs 以及 hive 中是否存在權(quán)限問題。
5.5 配置 Kyuubi
上述準(zhǔn)備工作完成之后,可以配置 kyuubi。參考命令以及參考的配置文件。
vim kyuubi-defaults.conf
# 啟動(dòng)的spark引擎以yarn-cluster模式跑
spark.master=yarn
spark.submit.deployMode=cluster
# spark引擎共享級(jí)別,user,即同一用戶共享一個(gè)引擎,
kyuubi.engine.share.level=USER
kyuubi.session.engine.idle.timeout=PT10H
# 啟用HA,指定ZK地址
kyuubi.ha.enabled true
kyuubi.ha.zookeeper.quorum node1:2181,node2:2181,node3:2181
kyuubi.ha.zookeeper.client.port 2181
# kerbero授權(quán)開啟
kyuubi.authentication=KERBEROS
kyuubi.kinit.principal=kyuubi/[email protected]
kyuubi.kinit.keytab=/home/kyuubi/kyuubi.keytab
# kyuubi環(huán)境變量配置,按需配置
vim kyuubi-env.sh
export JAVA_HOME=/usr/java/jdk1.8.0_181-cloudera
export SPARK_HOME=/opt/spark-3.1.2
關(guān)于 kyuubi 更詳細(xì)的配置參數(shù),請(qǐng)參考。
https://kyuubi.apache.org/docs/r1.3.0-incubating/deployment/settings.html#kyuubi-configurations
運(yùn)行 kyuubi
bin/kyuubi start
運(yùn)行 beeline 測(cè)試連接 kyuubi
beeline -u "jdbc:hive2://node2.bigdata.leo.com:10009/;principal=kyuubi/[email protected]"

查看日志,先查看 kyuubi 服務(wù)運(yùn)行過程中的輸出日志
/opt/kyuubi-1.3.0/logs/kyuubi-kyuubi-org.apache.kyuubi.server.KyuubiServer-node2.bigdata.leo.com.out

此日志中沒有上述異常發(fā)生的具體原因,繼續(xù)看日志,此日志為 kyuubi 啟動(dòng)的 Spark 的 engine 日志
/opt/kyuubi-1.3.0/work/kyuubi/kyuubi-spark-sql-engine.log.0

此日志中依舊沒有有用的信息,繼續(xù)看 spark application 的日志,由于沒有 Spark History 服務(wù),所以我這里用 yarn 命令來看日志:
yarn logs -applicationId application_1632672215527_0014

由于開啟了 HA,kyuubi 會(huì)在指定 ZK 上創(chuàng)建一些 znode path,由于我的 zookeeper 服務(wù)在 kerberos 環(huán)境中,因此可能會(huì)遇到 zk kerberos 認(rèn)證,或 zk node path 的 ACL 問題,在 zk shell 查看/kyuubi_USER 的 ACL,并嘗試是否可以創(chuàng)建 znode path。

經(jīng)過上述測(cè)試可知,kyuubi 用戶 kinit 拿到 kerberos 憑證之后,再去操作/kyuubi_USER 這個(gè) znode path 時(shí),并不會(huì)出現(xiàn) ACL 問題。
根據(jù)官方大佬的解釋以及大致看了下源碼之后得知,這里出現(xiàn) ZK ACL 的問題是因?yàn)?,kyuubi server 啟動(dòng),創(chuàng)建 zkClient 時(shí),會(huì)獲取 kerberos 認(rèn)證。但在 spark engine 的 driver 端創(chuàng)建的 zkClient,并沒有去獲取 Kerberos 認(rèn)證,所以才產(chǎn)生這個(gè)報(bào)錯(cuò)(分析不到位的地方還望與我溝通)。
臨時(shí)的解決辦法是取消 HA 配置或使用其他未開啟 kerberos 的 Zookeeper(待確認(rèn))。禁用 HA 使用 kyuubi 自帶的 ZK 服務(wù),需要修改 kyuubi-defaults.conf 為:
spark.master=yarn
spark.submit.deployMode=cluster
kyuubi.engine.share.level=USER
kyuubi.session.engine.idle.timeout=PT10H
kyuubi.ha.enabled false
kyuubi.zookeeper.embedded.client.port 2182
kyuubi.authentication=KERBEROS
kyuubi.kinit.principal=kyuubi/[email protected]
kyuubi.kinit.keytab=/home/kyuubi/kyuubi.keytab
重啟 kyuubi 之后,再用 beeline 連接,測(cè)試正常。

6. Kyuubi 測(cè)試
6.1 Kyuubi 多租戶測(cè)試
保證 kyuubi server 正常啟動(dòng),創(chuàng)建一個(gè)測(cè)試用戶 leo_jie,并為其創(chuàng)建 principal 以及導(dǎo)出對(duì)應(yīng)用戶的 keytab,分別在兩個(gè)客戶端上用 kyuubi 以及 leo_jie 來測(cè)試連接 kyuubi server。示例命令如下:
# 用戶leo_jie的操作
kinit -kt leo_jie.keytab [email protected]
beeline -u "jdbc:hive2://node2.bigdata.leo.com:10009/;principal=kyuubi/[email protected]"
# 用戶kyuubi的操作
kinit -kt kyuubi.keytab kyuubi/[email protected]
beeline -u "jdbc:hive2://node2.bigdata.leo.com:10009/;principal=kyuubi/[email protected]"
在 yarn ui 上可以看到起了兩個(gè) application,一個(gè)所屬用戶是 leo_jie,一個(gè)是 kyuubi。如果是 STS,則需要連續(xù)手動(dòng)啟動(dòng)兩個(gè) STS 進(jìn)程。

Spark Engine 空閑一段時(shí)間之后就會(huì)被釋放,這個(gè)空閑時(shí)間可由參數(shù)kyuubi.session.engine.idle.timeout來控制。
6.2 Kyuubi CONNECTION 級(jí)別的引擎共享
從資源共享的角度,Engine 支持用戶端配置不同的共享級(jí)別,如果設(shè)置為 CONNECTION 級(jí)別共享,用戶的一次 JDBC 連接就會(huì)初始化一個(gè) Engine(例如:起多個(gè) beeline 客戶端,就會(huì)起多個(gè) Spark Engine,對(duì)應(yīng)多個(gè) Spark Application),在這個(gè)連接內(nèi),用戶可以執(zhí)行多個(gè) Statement 直至連接關(guān)閉。
如果設(shè)置為 USER 級(jí)別共享,用戶的多次 JDBC 都會(huì)復(fù)用這個(gè) Engine(例如:起多個(gè) beeline 客戶端,只會(huì)起一個(gè) Spark Engine,對(duì)應(yīng)一個(gè) Spark Application),在高可用的模式中,無論用戶的連接打到哪個(gè) Kyuubi Server 實(shí)例上,這個(gè) Engine 都能實(shí)現(xiàn)共享。
配置 conf/kyuubi-defaults.conf 以支持 CONNECTION 級(jí)別的引擎共享
kyuubi.engine.share.level=CONNECTION
# kyuubi.engine.share.level=USER
# kyuubi.session.engine.idle.timeout=PT10H
在多個(gè)客戶端中起多個(gè) beeline 客戶端,yarn-ui 上可以看到每一個(gè)客戶端對(duì)應(yīng)著一個(gè) Application,客戶端連接斷開之后,對(duì)應(yīng)的 Spark Application 也會(huì)被釋放掉。

7. 總結(jié)
上述文章梳理并記錄了 Spark3.1.2 適配 CDH5 Hadoop 的編譯和功能測(cè)試過程,并初步體驗(yàn)了 kyuubi,簡(jiǎn)單測(cè)試了 kyuubi 的多租戶隔離特性,以及不同級(jí)別的 Engine 共享能力。
尤其是 Kyuubi 的 CONNECTION 級(jí)別的引擎共享機(jī)制,可以有效地解決 STS 被某些 SQL 執(zhí)行時(shí)卡死的情況,每一個(gè) tableau 數(shù)據(jù)源刷新任務(wù)觸發(fā)時(shí)所建立的對(duì) Kyuubi Server 的 ODBC 連接,都能對(duì)應(yīng)一個(gè)獨(dú)立的 Driver,就算此 Driver 被打死,也不會(huì)影響其他 SQL 的正常執(zhí)行。
之后會(huì)繼續(xù)為大家介紹我們?cè)谏a(chǎn)環(huán)境中對(duì) Kyuubi 的使用經(jīng)驗(yàn)。
