Kyuubi 實(shí)踐 | 放棄 Spark Thrift Server 吧,你需要的是 Apache Kyuubi!
飽受詬病的Spark Thrift Server
關(guān)于Kyuubi
編譯Kyuubi For Spark 3.1 & Hadoop 3.2
下載Kyuubi源碼包
安裝scala編譯環(huán)境
編譯
構(gòu)建發(fā)行版
在YARN上部署Kyuubi引擎
上傳解壓Kyuubi安裝包
創(chuàng)建快捷方式
測試Hadoop/Spark環(huán)境
配置Kyuubi環(huán)境
配置環(huán)境變量
配置Spark參數(shù)
啟動Kyuubi
解決端口沖突問題
配置Kyuubi HA
重新啟動
測試Kyuubi
配置spark用戶代理權(quán)限
重新使用beeline測試
飽受詬病的Spark Thrift Server
Spark用戶大都知道有個(gè)組件叫Spark Thrift Server,它可以讓Spark應(yīng)用啟動在YARN上,并對外提供JDBC服務(wù)。
如果有一些數(shù)據(jù)服務(wù)、或者BI查詢,
使用Thrift Server是比較快的。
但實(shí)際我們在生產(chǎn)上幾乎沒法用Thrift Server做一些重要的應(yīng)用。
因?yàn)樗⒉豢煽?,在較高的并發(fā)負(fù)載下,容易會出現(xiàn)莫名的卡死、泄漏,
而且也沒法實(shí)現(xiàn)用戶資源的隔離,
支持的數(shù)據(jù)源也有限,
之前我們也測試過,踩了不少坑。
所以迫于無奈,
我們選擇了基于Livy來做一些即席查詢的工作。
但Livy的限制非常明顯,它是以HTTP REST方式來提交要執(zhí)行的代碼。
通過Livy,我們可以實(shí)現(xiàn)用戶之間的資源隔離。
但每次在查詢時(shí),使用Livy體驗(yàn)并不是很好。
每次申請資源,都需要等待一段時(shí)間來啟動Spark Driver,并申請資源。
而且,沒法對外提供Thrift或者JDBC服務(wù)。
針對同一用戶也無法實(shí)現(xiàn)資源的共用,
一個(gè)用戶可能會創(chuàng)建很多的應(yīng)用。
而今天給大家?guī)淼氖茿pache Kyuubi。

關(guān)于Kyuubi
Kyuubi 是一個(gè)分布式多租戶 Thrift JDBC/ODBC 服務(wù)器,用于大規(guī)模數(shù)據(jù)管理、處理和分析,構(gòu)建在 Apache Spark 之上。
這一句話就把Kyuubi介紹清楚了。
注意關(guān)鍵字:基于Spark、多租戶、Thrift JDBC/ODBC服務(wù)器。

大家可以看到,Kyuubi支持的存儲、客戶端工具是比較豐富的。尤其是對數(shù)據(jù)湖組件支持比較好。贊贊贊!
其他介紹大家可以去看下官網(wǎng):https://kyuubi.apache.org/docs/latest/index.html。
這里,不多說了。直接開始作業(yè)吧。
編譯Kyuubi For Spark 3.1 & Hadoop 3.2
下載Kyuubi源碼包
https://github.com/apache/incubator-kyuubi/archive/refs/tags/v1.3.0-incubating.zip
安裝scala編譯環(huán)境
因?yàn)镵yuubi是用scala。如果沒有scala 2.12.14就會自動下載。這個(gè)過程會比較慢,建議還是提前下載scala-2.12.14,然后放入到kyuubi的build目錄。scala 2.12.14下載地址為:https://downloads.lightbend.com/scala/2.12.14/scala-2.12.14.tgz
編譯
./build/mvn clean package -DskipTests -P mirror-cn -P spark-3.1 -P spark-hadoop-3.2
大家可以查看下pom.xml中的Profile來編譯指定組件版本對應(yīng)的Kyuubi。
期間,Maven會下載一些依賴的包。成功編譯會提示如下:
[INFO] Kyuubi Project Parent .............................. SUCCESS [ 41.108 s]
[INFO] Kyuubi Project Common .............................. SUCCESS [06:22 min]
[INFO] Kyuubi Project Embedded Zookeeper .................. SUCCESS [ 7.426 s]
[INFO] Kyuubi Project High Availability ................... SUCCESS [ 10.869 s]
[INFO] Kyuubi Project Control ............................. SUCCESS [ 13.458 s]
[INFO] Kyuubi Project Metrics ............................. SUCCESS [ 16.678 s]
[INFO] Kyuubi Project Download Externals .................. SUCCESS [ 26.663 s]
[INFO] Kyuubi Project Spark Monitor ....................... SUCCESS [ 11.792 s]
[INFO] Kyuubi Project Engine Spark SQL .................... SUCCESS [ 30.331 s]
[INFO] Kyuubi Project Server .............................. SUCCESS [ 23.617 s]
[INFO] Kyuubi Project Dev Code Coverage ................... SUCCESS [ 0.228 s]
[INFO] Kyuubi Project Assembly ............................ SUCCESS [ 0.334 s]
[INFO] Kyuubi Project Hive JDBC Client .................... SUCCESS [ 7.173 s]
構(gòu)建發(fā)行版
./build/dist --tgz --spark-provided
構(gòu)建完后,會在目錄下生成一個(gè)tgz包。
[root@compile incubator-kyuubi-1.3.0-incubating]# ll -l | grep kyuubi
-rw-r--r--. 1 root root 60369276 9月 24 11:57 apache-kyuubi-1.3.0-incubating-bin.tgz
在YARN上部署Kyuubi引擎
部署的前提是我們提前整合好 spark on yarn。
此處省略,因?yàn)槲覀兊腟park已經(jīng)整合好了Hive、以及Hudi,并做好了基本的配置。
大家如果不熟悉這一塊,可以自行參考Spark官網(wǎng)來配置整合。
上傳解壓Kyuubi安裝包
tar -xvzf apache-kyuubi-1.3.0-incubating-bin.tgz -C /opt/
創(chuàng)建快捷方式
n -s /opt/apache-kyuubi-1.3.0-incubating-bin/ /opt/kyuubi
測試Hadoop/Spark環(huán)境
spark-submit \
--master yarn \
--queue root.p1 \
--class org.apache.spark.examples.SparkPi \
/opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar \
10
如果程序能夠成功輸出以下:
Pi is roughly 3.1420391420391423
表示環(huán)境測試成功,可以開始正常部署Kyuubi。
配置Kyuubi環(huán)境
kyuubi-env.sh
cd /opt/kyuubi/conf
cp kyuubi-env.sh.template kyuubi-env.sh
vim kyuubi-env.sh
export JAVA_HOME=/opt/jdk1.8.0_181/
export HADOOP_CONF_DIR=/opt/hadoop/etc/hadoop
export KYUUBI_JAVA_OPTS="-Xmx10g -XX:+UnlockDiagnosticVMOptions -XX:ParGCCardsPerStrideChunk=4096 -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSConcurrentMTEnabled -XX:CMSInitiatingOccupancyFraction=70 -XX:+UseCMSInitiatingOccupancyOnly -XX:+CMSClassUnloadingEnabled -XX:+CMSParallelRemarkEnabled -XX:+UseCondCardMark -XX:MaxDirectMemorySize=1024m -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=./logs -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution -Xloggc:./logs/kyuubi-server-gc-%t.log -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=5M -XX:NewRatio=3 -XX:MetaspaceSize=512m"
下面的這個(gè)是JVM的配置。我注意到了它配置了堆內(nèi)存最大為10G、Metaspace是512M、使用的是CMS GC、直接內(nèi)存是1GB。這個(gè)配置比較符合我們的測試集群。大家可以按照自己的測試環(huán)境來調(diào)整。
kyuubi-defaults.conf
cp kyuubi-defaults.conf.template kyuubi-defaults.conf
vim kyuubi-defaults.conf
kyuubi.frontend.bind.host hadoop1
kyuubi.frontend.bind.port 10009
配置環(huán)境變量
vim ~/.bashrc
export KYUUBI_HOME=/opt/kyuubi
export PATH=$KYUUBI_HOME/bin:$PATH
source ~/.bashrc
配置Spark參數(shù)
Kyuubi應(yīng)用其實(shí)就是啟動在YARN上的Spark應(yīng)用,它也需要使用spark-submit提交應(yīng)用到Y(jié)ARN。
而提交Spark應(yīng)用的參數(shù),我們可以配置對應(yīng)的默認(rèn)參數(shù)。
有以下幾種配置方式:
使用JDBC URL配置
jdbc:hive2://localhost:10009/;#spark.master=yarn;spark.yarn.queue=thequeue在kyuubi-default.conf中配置
在spark-defaults.conf中配置
Kyuubi推薦我們使用Spark的動態(tài)資源分配方式,避免資源一直占用。
大家可以看下我之前寫的Spark動態(tài)資源分配文章。
優(yōu)化點(diǎn):可以配置spark.yarn.archive或者spark.yarn.jars指向HDFS上的地址,這樣不需要在啟動應(yīng)用時(shí)每次都分發(fā)JAR包。
啟動Kyuubi
# 啟動
bin/kyuubi start
# 關(guān)閉
bin/kyuubi stop
解決端口沖突問題
執(zhí)行start啟動后,報(bào)錯(cuò)如下:
Exception in thread "main" java.net.BindException: Address already in use
at sun.nio.ch.Net.bind0(Native Method)
at sun.nio.ch.Net.bind(Net.java:433)
at sun.nio.ch.Net.bind(Net.java:425)
at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:67)
at org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:90)
at org.apache.kyuubi.zookeeper.EmbeddedZookeeper.initialize(EmbeddedZookeeper.scala:53)
at org.apache.kyuubi.server.KyuubiServer$.startServer(KyuubiServer.scala:38)
at org.apache.kyuubi.server.KyuubiServer$.main(KyuubiServer.scala:76)
at org.apache.kyuubi.server.KyuubiServer.main(KyuubiServer.scala)
發(fā)現(xiàn)KyuubiServer報(bào)錯(cuò)端口占用,但不知道占用的是哪個(gè)端口??聪仍创a,
private val zkServer = new EmbeddedZookeeper()
def startServer(conf: KyuubiConf): KyuubiServer = {
if (!ServiceDiscovery.supportServiceDiscovery(conf)) {
zkServer.initialize(conf)
zkServer.start()
conf.set(HA_ZK_QUORUM, zkServer.getConnectString)
conf.set(HA_ZK_ACL_ENABLED, false)
}
val server = new KyuubiServer()
server.initialize(conf)
server.start()
Utils.addShutdownHook(new Runnable {
override def run(): Unit = server.stop()
}, Utils.SERVER_SHUTDOWN_PRIORITY)
server
}
看下源碼,如果檢測到?jīng)]有配置服務(wù)發(fā)現(xiàn),就會默認(rèn)使用的是內(nèi)嵌的ZooKeeper。當(dāng)前,我們并沒有開啟HA模式。所以,會啟動一個(gè)本地的ZK,而我們當(dāng)前測試環(huán)境已經(jīng)部署了ZK。所以,基于此,我們還是配置好HA。這樣,也可以讓我們Kyuubi服務(wù)更加可靠。
配置Kyuubi HA
vim /opt/kyuubi/conf/kyuubi-defaults.conf
kyuubi.ha.enabled true
kyuubi.ha.zookeeper.quorum hadoop1,hadoop2,hadoop3,hadoop4,hadoop5
kyuubi.ha.zookeeper.client.port 2181
kyuubi.ha.zookeeper.session.timeout 600000
重新啟動
配置好HA后,重新啟動即可。這里,我們先啟動一個(gè)Kyuubi Server。如果啟動多個(gè),Kyuubi是實(shí)現(xiàn)了負(fù)載均衡的。
Welcome to
__ __ __
/\ \/\ \ /\ \ __
\ \ \/'/' __ __ __ __ __ __\ \ \____/\_\
\ \ , < /\ \/\ \/\ \/\ \/\ \/\ \\ \ '__`\/\ \
\ \ \\`\\ \ \_\ \ \ \_\ \ \ \_\ \\ \ \L\ \ \ \
\ \_\ \_\/`____ \ \____/\ \____/ \ \_,__/\ \_\
\/_/\/_/`/___/> \/___/ \/___/ \/___/ \/_/
/\___/
\/__/
檢查Kyuubi啟動情況:
(Not all processes could be identified, non-owned process info
will not be shown, you would have to be root to see it all.)
tcp 0 0 10.94.158.51:10009 0.0.0.0:* LISTEN 25803/java
我們看了下yarn集群,當(dāng)前沒有啟動任何的應(yīng)用。
接下來,我們使用Hive的beeline來連接下Kyuubi。
測試Kyuubi
注意:使用Spark中的beeline,Hive的beeline有可能客戶端比較老,連接Kyuubi會報(bào)錯(cuò)。
/opt/spark/bin/beeline
!connect jdbc:hive2://hadoop1:10009/;#spark.yarn.queue=root.p1
我們發(fā)現(xiàn)連接報(bào)錯(cuò):
Caused by: org.apache.kyuubi.KyuubiSQLException: 21/09/24 15:38:12 INFO org.apache.hadoop.io.retry.RetryInvocationHandler: org.apache.hadoop.security.authorize.AuthorizationException: User: spark is not allowed to impersonate hive, while invoking ApplicationClientProtocolPBClientImpl.getClusterMetrics over rm1 after 2 failover attempts. Trying to failover after sleeping for 24076ms.
這個(gè)問題是hadoop拋出來的,意思是Spark用戶不允許扮演hive用戶的。我們需要給Spark用戶配置開啟Hadoop代理。
配置spark用戶代理權(quán)限
vim core-site.xml
<property>
<name>hadoop.proxyuser.spark.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.spark.groups</name>
<value>*</value>
</property>
分發(fā)配置文件到所有Hadoop節(jié)點(diǎn),重啟HADOOP集群。
重新使用beeline測試
/opt/spark/bin/beeline
!connect jdbc:hive2://hadoop1:10009/;#spark.yarn.queue=root.p1
第一次啟動比較慢,當(dāng)我們在YARN中可以看到以下應(yīng)用展示表示配置成功:

在Spark的Web UI我們能看到:

在beeline中,顯示如下:
beeline> !connect jdbc:hive2://hadoop1:10009/;#spark.yarn.queue=root.p1
Connecting to jdbc:hive2://hadoop1:10009/;#spark.yarn.queue=root.p1
Enter username for jdbc:hive2://hadoop1:10009/: spark
Enter password for jdbc:hive2://hadoop1:10009/:
Connected to: Spark SQL (version 1.3.0-incubating)
Driver: Hive JDBC (version 3.1.2)
Transaction isolation: TRANSACTION_REPEATABLE_READ
0: jdbc:hive2://hadoop1:10009/>
執(zhí)行幾條SQL測試下。

再試下hive用戶:
/opt/spark/bin/beeline
!connect jdbc:hive2://hadoop1:10009/;#spark.yarn.queue=root.p1

多個(gè)用戶是相互隔離的。通過用戶來隔離不同的Thrift Server。厲害厲害!
Enjoy!
參考文獻(xiàn):
[1] https://kyuubi.apache.org/
作者|斜杠代碼日記
關(guān)注我
分享Java、大數(shù)據(jù)技術(shù)干貨
