Spark 實(shí)踐 | Spark SQL 在雪球的實(shí)踐
背景
因?yàn)闃I(yè)務(wù)需要,雪球數(shù)據(jù)團(tuán)隊(duì)基于HDP 3.1.5(Hadoop 3.1.1+Hive 3.1.0+Tez 0.9.1)搭建了一個(gè)新的集群,HDP 3.1.5默認(rèn)使用Hive3 on Tez作為ETL計(jì)算引擎,但是在使用Hive3 on Tez中,我們遇到很多問題:
- 部分SQL執(zhí)行失敗,需要關(guān)閉掉容器復(fù)用或者向量化執(zhí)行。
- 部分SQL開啟CBO優(yōu)化之后的執(zhí)行計(jì)劃錯(cuò)誤,導(dǎo)致結(jié)果出錯(cuò),需要關(guān)閉CBO優(yōu)化。
- 還有一些時(shí)區(qū)不準(zhǔn)、GroupBy with Limit不準(zhǔn)確等已經(jīng)在新版本fix的bug。
- 極其個(gè)別復(fù)雜多級關(guān)聯(lián)的SQL,計(jì)算結(jié)果不準(zhǔn)確,很難發(fā)現(xiàn),需要通過修改SQL來解決。
這些問題對數(shù)倉開發(fā)來說非常致命。從業(yè)界來看,各公司生產(chǎn)上大部分還是使用Hive2,而Hive和Tez的社區(qū)活躍程度低,更新迭代慢(Hive3.x最新一次release已經(jīng)將近3年了),修復(fù)相關(guān)問題的代價(jià)比較大。
在分別比較了Hive3 on Tez、Hive3 on MR、Hive3 on Spark2 、Spark SQL等各種引擎之后,從準(zhǔn)確性和穩(wěn)定性以及計(jì)算效率各方面綜合考慮,數(shù)據(jù)團(tuán)隊(duì)決定采用Spark SQL在作為數(shù)倉的ETL引擎。經(jīng)過一段時(shí)間推廣和使用,目前在交互查詢和離線ETL很多場景和計(jì)算都已經(jīng)支持了Spark SQL:

本文主要分享了從Hive3 SQL切換成Spark3 SQL的實(shí)踐。
切換過程
Facebook在從Hive切換到Spark SQL的時(shí)候,重寫了Spark SQL的執(zhí)行計(jì)劃,增加了一個(gè)Shadow過程:基于Hive SQL的執(zhí)行日志,執(zhí)行一個(gè)Spark SQL,將數(shù)據(jù)雙寫到Shadow表中,然后再通過工具對比實(shí)際表和Shadow表的執(zhí)行效率和正確性。
雪球數(shù)據(jù)團(tuán)隊(duì)也開發(fā)了類似的工具分別做了測試和對比。公司自研的調(diào)度系統(tǒng)本身自帶執(zhí)行時(shí)長和資源消耗工具(基于yarn的application資源使用統(tǒng)計(jì)),可以用來對比執(zhí)行效率。同時(shí)特意開發(fā)了一個(gè)基于Trino的正確率對比工具來對比正確率。
測試分兩個(gè)階段:
- 對于復(fù)雜場景SQL,主要做了正確率的對比:Hive3 on Tez的正確率約為50%,Hive3 on MR的正確率約為70%,Hive3 on Spark2的正確率為100%(需要關(guān)閉CBO),Spark SQL的正確率為100%。
數(shù)據(jù)準(zhǔn)確性 | 平均執(zhí)行時(shí)間(秒) | |||||
Hive3 on MR | Hive3 on Tez | Hive3 on Spark2(關(guān)閉優(yōu)化) | Spark SQL | Hive on Spark2 | Spark SQL | 降低 |
70% | 50% | 100% | 100% | 1957 | 423 | 88% |
- 對線上實(shí)際運(yùn)行的SQL,通過收集和重放了大量的線上實(shí)際SQL,用不同的引擎寫入不同的目標(biāo)表,然后用工具對比執(zhí)行結(jié)果和執(zhí)行效率。從執(zhí)行時(shí)長來看,Spark SQL執(zhí)行時(shí)長和Hive3 on Tez在一個(gè)數(shù)據(jù)量級,但Spark SQL資源消耗大概在Hive3 on Tez(限制了并行度)的1/3。而Hive3 on Spark2經(jīng)常會(huì)出現(xiàn)數(shù)據(jù)傾斜。Spark SQL的表現(xiàn)最佳。

在謹(jǐn)慎評估正確率和執(zhí)行效率后,大數(shù)據(jù)團(tuán)隊(duì)決定首先使用Hive3 on Spark2作為緊急替換Tez的計(jì)算引擎,隨后選用 Spark 3.2.1 作為長期支持的計(jì)算引擎,逐步將Hive SQL切換成 Spark SQL。
遇到問題
得益于Spark3性能的提升和AQE機(jī)制,性能上很少遇到問題。不過,雪球數(shù)據(jù)團(tuán)隊(duì)在測試和切換過程中,遇到一些問題,其中大部分都是兼容性問題,下面進(jìn)行逐一介紹:
1. Spark SQL無法遞歸子目錄以及無法讀寫自己的問題
當(dāng)Hive表數(shù)據(jù)存放在多級子目錄時(shí),Tez、MR、Spark默認(rèn)均不能識(shí)別和讀取到數(shù)據(jù)。針對這種情況,Apache Hive提供了兩項(xiàng)項(xiàng)參數(shù):
set?hive.mapred.supports.subdirectories=true;
set?mapreduce.input.fileinputformat.input.dir.recursive=true;
但Spark SQL并不支持類似參數(shù)。Spark SQL在執(zhí)行ORC和Parquet格式的文件解析時(shí),默認(rèn)使用Spark內(nèi)置的解析器(Spark內(nèi)置解析器效率更高),這些內(nèi)置解析器不支持遞歸子目錄的兩項(xiàng)參數(shù),并且也沒有其它參數(shù)支持這一效果。可以通過設(shè)置 spark.sql.hive.convertMetastoreOrc=false 來指定Spark使用Hive的解析器,使遞歸子目錄參數(shù)正確生效。Spark的內(nèi)置解析器也將于未來版本中支持遞歸子目錄。
此外,當(dāng)用戶在使用Spark讀寫同一張Hive表時(shí),經(jīng)常會(huì)遇到 "Cannot overwrite a path that is also being read from "的報(bào)錯(cuò),而同樣的語句在Hive中可以進(jìn)行。這是由于Spark對數(shù)倉常用的數(shù)據(jù)類型做了自己的實(shí)現(xiàn)方式,在他自己的實(shí)現(xiàn)方式下,目標(biāo)路徑會(huì)先被清空,隨后才執(zhí)行寫入,而Hive是先寫入到臨時(shí)目錄,任務(wù)完成后再講結(jié)果數(shù)據(jù)替換目標(biāo)路徑。使用Hive解析器也可以解決這個(gè)問題。
2. Hive ORC解析的一些問題
在1 問題的解決方案中,我們選擇統(tǒng)一使用Hive的ORC解析器,這將帶來以下問題:
Hive的ORC在讀取某些Hive表時(shí),會(huì)出現(xiàn)數(shù)組越界異常或空指針異常。
其原因是某些目錄下存在空的ORC文件,可通過設(shè)置hive.exec.orc.split.strategy=BI 規(guī)避空指針問題,
設(shè)置hive.vectorized.execution.enabled=false 規(guī)避數(shù)組越界問題。此外使用Spark 3.x時(shí),還需要設(shè)置 hive.metastore.dml.events=false 避免寫入數(shù)據(jù)時(shí)報(bào)錯(cuò)。
3. Spark.sql.sources.schema問題
在Spark和Hive同時(shí)使用的情況下,某些操作可能會(huì)導(dǎo)致Hive表元數(shù)據(jù)里面有spark.sql.sources.schema.part屬性的存在,后續(xù)如果修改表結(jié)構(gòu)會(huì)導(dǎo)致表元數(shù)據(jù)和數(shù)據(jù)不一致的情況。例如:新增字段A后并執(zhí)行新的寫入語句后,查詢A字段值為NULL。
這是因?yàn)镾park在讀寫存在該屬性的Hive表時(shí),會(huì)優(yōu)先使用該屬性提供的映射值來生成表結(jié)構(gòu)。而Hive原生修改表結(jié)構(gòu)的語句不會(huì)更新該值,最終導(dǎo)致新字段在讀寫時(shí)不被Spark識(shí)別。
解決方案是重新建表,或者刪除該表屬性。在兩個(gè)引擎同時(shí)存在時(shí)期,可以約定只使用Hive來執(zhí)行DDL數(shù)據(jù)。
4. Spark權(quán)限和審計(jì)
在Hive里面,我們繼承了PasswdAuthenticationProvider實(shí)現(xiàn)了自定義的用戶認(rèn)證,通過集成Ranger實(shí)現(xiàn)了權(quán)限管控,而Spark開源版并沒有完整的解決方案。官方的Spark Thrift Server在資源隔離和權(quán)限管控上有很大的不足,我們引入了Apache Kyuubi。Kyuubi也有類似PasswdAuthenticationProvider的接口,可以來實(shí)現(xiàn)用戶認(rèn)證。對于權(quán)限管控,一般的方案是使用Submarine。但是Submarine最新版本已經(jīng)將這一模塊去掉,而最近一個(gè)支持Ranger的0.6.0版本只支持Spark 3.0。Spark集成Ranger的要先解析SQL取得相關(guān)的表和字段,以判斷當(dāng)前用戶是否有權(quán)限讀寫,而Spark 3.0到Spark 3.2.1的解析SQL做了很多修改,所以我們修改了相關(guān)的代碼來適配Spark 3.2.1。同時(shí)基于Apache Kyuubi的Event體系,完成了Spark的審計(jì)功能。

5. Hive SQL 遷移 Spark SQL 的一些較隱蔽的坑
- 日期類型比較,處理方式不同
低版本Hive會(huì)將Date類型轉(zhuǎn)換為string,2.3.5以后的版本會(huì)將String轉(zhuǎn)換為Date比較。
如: '2022-03-14 11:11:11' > date_sub('2022-03-15',1)
在低版本時(shí),該不等式結(jié)果為true,高版本則為false。在 Spark SQL 3.2.1 中,結(jié)果同樣為false。
- 類型嚴(yán)格程度不同
Hive 默認(rèn)支持隱式轉(zhuǎn)換,Spark需要設(shè)置 spark.sql.storeAssignmentPolicy=LEGACY 才支持有限度的隱式轉(zhuǎn)換,否則執(zhí)行會(huì)報(bào)錯(cuò)。
對語義的精準(zhǔn)度要求更高,例如
關(guān)聯(lián)語法不同:
select a from t1 join t2 group by t1.a
在Spark SQL中需要寫成 select t1.a from t1 join t2 group by t1.a
????grouping語法不同:Select a,b from t1 group by a,b grouping sets (a,b)
在Hive中除了聚合匯總a和b維度外,還會(huì)匯總整體維度,但是在SparkSQL中要求寫成
Select a,b from t1 group by a,b grouping sets ((),(a),(b))
6. 動(dòng)態(tài)資源,多版本兼容
Spark動(dòng)態(tài)資源可以節(jié)省很多資源,但是要依賴shuffle service。因?yàn)榧涸谇袚Q過程中需要同時(shí)支持Spark2(Hive on Spark2)和Spark3,所以需要保證集群能夠同時(shí)支持兩個(gè)版本的shuffle service。YARN在2.9.0之后支持了Classloader隔離的aux service。而Spark 3.1引入了可配置的方式去啟動(dòng)不同端口不同classpath包的shuffle service。但是在實(shí)踐中發(fā)現(xiàn),Yarn的這種機(jī)制并不能加載xml配置文件,需要將xml打成jar包才能識(shí)別。

7. 小文件問題
為了提升計(jì)算速度,大數(shù)據(jù)計(jì)算引擎在計(jì)算時(shí)候都會(huì)采取并行處理,而Spark SQL在寫入數(shù)據(jù)的時(shí)候是并行寫入,并沒有一個(gè)合并的過程。小文件過多,會(huì)增大Namenode的壓力,同時(shí)對查詢性能也有很大影響。通常在Hive中可以引入 hive.spark.mergefiles=true 來為hive的執(zhí)行計(jì)劃增加一個(gè)合并Job,但Spark SQL不支持這個(gè)做法。
目前,我們開啟AQE,通過設(shè)置目標(biāo)大小和最大shuffle上限在一定程度上減少最后生成的文件數(shù)。例如:
--conf?spark.sql.adaptive.enabled=true?\
--conf?spark.sql.adaptive.advisoryPartitionSizeInBytes=262144000?\
--conf?spark.sql.adaptive.maxNumPostShufflePartitions=200?\
--conf?spark.sql.adaptive.forceApply=true?\
--conf?spark.sql.adaptive.coalescePartitions.parallelismFirst=false?\
--conf?spark.sql.adaptive.coalescePartitions.minPartitionSize?=52428800?\
注意:advisoryPartitionSizeInBytes這個(gè)參數(shù)指定的不是最終生成的文件大小,而是在最終輸出文件階段,每個(gè)partition read的字節(jié)大小,此處的256M對應(yīng)到ORC Snappy的輸出文件大小約為55M。
經(jīng)實(shí)驗(yàn),生成的文件數(shù)最大為200個(gè),大小平均55M。總大小小于50M時(shí),只會(huì)有一個(gè)文件。
未來規(guī)劃
目前每天300+任務(wù)是基于Spark SQL,已經(jīng)穩(wěn)定運(yùn)行較長時(shí)間,之前遇到的問題都已經(jīng)基本解決,后續(xù)會(huì)將所有的ETL引擎統(tǒng)一到Spark SQL,用來提高計(jì)算效率。使用Spark SQL的主要場景還是在數(shù)倉離線的ETL,后續(xù)會(huì)在更多的場景嘗試引入Spark SQL,比如交互式分析,會(huì)結(jié)合公司目前的Trino引擎做一些互補(bǔ)。另外,目前業(yè)務(wù)上有很多實(shí)時(shí)的數(shù)據(jù)需求,后續(xù)會(huì)基于Spark技術(shù)棧引入Hudi等數(shù)據(jù)湖技術(shù)來滿足業(yè)務(wù)的需求。
