Apache Spark 完全替代傳統(tǒng)數(shù)倉的技術(shù)挑戰(zhàn)及實(shí)踐
7月30日,由 Kyligence 主辦的 Data & Cloud Summit 2021 行業(yè)峰會(huì)在上海成功舉辦,此次峰會(huì)特設(shè)「開源有道」分論壇,邀請了來自 Apache Kylin,Apache Spark,Alluxio,Linkis,Ray 以及 MLSQL 等開源社區(qū)的技術(shù)大佬,分享了目前開源社區(qū)關(guān)于大數(shù)據(jù)、機(jī)器學(xué)習(xí)等多個(gè)熱門話題的前沿技術(shù)和最佳實(shí)踐。來自 eBay 數(shù)據(jù)團(tuán)隊(duì)的馬剛分享了他們在用 Apache Spark 完全替代傳統(tǒng)數(shù)倉中遇到的技術(shù)挑戰(zhàn)及改進(jìn)等話題,引起了現(xiàn)場觀眾的熱烈討論。
以下為馬剛在大會(huì)演講實(shí)錄
大家好!我叫馬剛,來自 eBay 的大數(shù)據(jù)團(tuán)隊(duì),很高興今天有機(jī)會(huì)在這里分享我們團(tuán)隊(duì)在過去 2 年做的工作,主要是基于開源的 Spark 和 Hadoop 替換掉傳統(tǒng)數(shù)據(jù)倉庫。今天我會(huì)講到我們在用 Apache Spark 替換傳統(tǒng)的數(shù)據(jù)倉庫中遇到的技術(shù)挑戰(zhàn),以及我們怎么解決的。
今天我分享的 Agenda 如下:
系統(tǒng)介紹
技術(shù)挑戰(zhàn)
- 功能性改進(jìn)
- 性能改進(jìn)
- 穩(wěn)定性改進(jìn)
總結(jié)
系統(tǒng)介紹
我們這個(gè)系統(tǒng)的名字叫 Carmel,它是基于開源的 Hadoop 和 Spark 來替換傳統(tǒng)的數(shù)據(jù)倉庫,我們是 2019 年開始做我們這個(gè)項(xiàng)目的,當(dāng)時(shí)是基于 Spark 2.3.1,最近剛剛升到 Spark 3.0。面臨的主要技術(shù)挑戰(zhàn),第一個(gè)是功能方面的缺失,包括訪問控制,還有一些 Update 和 Delete 的支持;在性能方面跟傳統(tǒng)數(shù)倉,特別是交互式的分析查詢中性能方面存在較大差距,還有一些穩(wěn)定性的問題。

這是 Carmel 系統(tǒng)的整體架構(gòu),比較簡單,可以看到我們自己開發(fā)了 ODBC 和 JDBC driver,對接一些 BI 工具,用戶可以直接用 Python 或者 Java 來連我們的系統(tǒng)。中間是 Gateway,用來做用戶認(rèn)證以及權(quán)限檢查,之后會(huì)把用戶的 SQL 請求發(fā)到一個(gè)分析查詢的集群。這里有兩個(gè)集群,一個(gè)是我們主要使用的分析集群,另外一個(gè)是 eBay 內(nèi)部比較通用的 Hadoop 集群,是用來跑大量的 Spark 或者 MapReduce 的任務(wù),包括一些機(jī)器學(xué)習(xí)任務(wù)等,比較忙。
這個(gè)分析集群主要是服務(wù)于 eBay 內(nèi)部的分析師,我們 Spark 也是跑在 YARN 上面,不同的部門會(huì)切不同的 Queue,這些不同部門的請求之后會(huì)分到對應(yīng) Queue 的 Spark Driver,是一個(gè) long running 的服務(wù),它是一直啟動(dòng)著的,給所有部門的用戶,所有 SQL 都是它提供服務(wù),去解析、去進(jìn)行執(zhí)行的。這個(gè)分析集群是 SSD 存儲,存儲的性能比較好。上面主要是存一些 Spark Shuffle 和 Cache 的數(shù)據(jù),以及分析師個(gè)人的數(shù)據(jù)集。這個(gè)分析集群的 Spark Driver 還可以共享集群的 HDFS 數(shù)據(jù),讓用戶也可以直接分析生產(chǎn)的一些數(shù)據(jù)集,這個(gè)分析的集群跟 General 集群是共享一個(gè) Hive Metastore 的,表是可以互相訪問的。
功能改進(jìn)
接下來我會(huì)講在功能性的改進(jìn)方面做了哪些工作。
訪問控制

第一是訪問控制,前面提到我們有一個(gè) Gateway 的服務(wù)器負(fù)責(zé)做一些用戶權(quán)限認(rèn)證,還有對一些集群和 Queue 的訪問控制。我們通過一個(gè)系統(tǒng)賬戶來讀寫 HDFS,個(gè)人賬戶不能直接訪問 HDFS,來避免一些安全方面的問題。對于一些數(shù)據(jù)庫或者表級別的訪問權(quán)限控制,我們是基于 SQL 來做訪問控制,類似于傳統(tǒng)數(shù)據(jù)庫的訪問控制 SQL 語句,例如 Grant、創(chuàng)建 Role 等。用戶也是通過 view 來進(jìn)行列級別的訪問控制,可以針對每一個(gè)物理表建立不同的 view,讓某些用戶只能訪問一些不敏感的列。
對 Update/Delete 支持

我們是基于 Delta Lake 來做 Update 和 Delete 的支持。剛開始做的時(shí)候是基于 0.4 的版本,現(xiàn)在升級到了最新版本。Delta Lake 當(dāng)時(shí)只支持 Dataframe 的接口,我們提供了 Update/Delete 的 SQL 語法的支持,還支持了比較先進(jìn)的傳統(tǒng)數(shù)倉中會(huì)支持 Update/Delete with Join 語法。還有就是 Delta 表的管理,我們這個(gè)系統(tǒng)是基于 Hadoop 的,大家知道 Hadoop 最怕小文件,但是 Delta Lake 的特點(diǎn)就是每次都會(huì)創(chuàng)建比較多的小文件出來,會(huì)使 Hadoop 系統(tǒng)不是很穩(wěn)定,我們會(huì)定期地去清除老版本的 Delta 文件,做一些 Delta 表的管理工作。
上傳下載 API

我們還對一些缺失的功能進(jìn)行了改進(jìn),比如上傳下載的 API。用戶經(jīng)常會(huì)把一些外部數(shù)據(jù)傳到數(shù)倉里面去做分析,比如對一些社交媒體的數(shù)據(jù)進(jìn)行分析,我們需要支持這種上傳 CSV 文件到某個(gè)表或者某個(gè)表的分區(qū)。其次就是下載,用戶經(jīng)常會(huì)用一些 BI 工具,比如 Tableau 或者 MicroStrategy 去做分析,這種分析工具經(jīng)常會(huì)需要下載比較大規(guī)模的結(jié)果集到工具本身去進(jìn)行操作,比如構(gòu)建一些本地的 Cube 等,而且我們線上發(fā)現(xiàn)用戶經(jīng)常會(huì)下載 100G 到 200G 的數(shù)據(jù)到本地。Spark 原生的一些 API、thrift API 的性能是比較差的。我們也自己實(shí)現(xiàn)了一些下載的 API,來提升性能,把那些 Parquet 文件直接下載到文地,通過 ODBC driver 去迭代本地的 Parquet 文件,來提高它的性能。經(jīng)過我們測試,通過這種方式去訪問超大的數(shù)據(jù)集會(huì)比傳統(tǒng)的 Thrift API 快 3 - 4 倍,因?yàn)闇p少了大量的 RPC 調(diào)用,還有每條記錄的序列化、反序列化的開銷。
其他新功能
我們還增加了很多其他新功能,比如支持 Volatile table,因?yàn)?Spark 社區(qū)版只支持一個(gè) Temporary view,只定義一個(gè) SQL,不會(huì) materialized 到存儲中去。傳統(tǒng)數(shù)倉其實(shí)有這種功能,用戶寫的時(shí)候經(jīng)常會(huì)建了很多 tmp table,把 tmp table 遷到 Spark 中去,如果直接用 Temporary view 來代替的話,最后生成的 SQL 執(zhí)行計(jì)劃會(huì)非常復(fù)雜,性能會(huì)非常差。
我們也實(shí)現(xiàn)了對 like any/like all 的支持,還有對壓縮表的支持,主要也是解決 Hadoop 中一些小文件的問題,把小文件壓縮成一些比較大的文件。
性能改進(jìn)
接下來我會(huì)介紹我們在性能改進(jìn)方面所做的一些工作。
性能改進(jìn) - 透明數(shù)據(jù)緩存

第一個(gè),透明的數(shù)據(jù)緩存,前面介紹過我們系統(tǒng)的架構(gòu)里面有 2 個(gè) HFDS,一個(gè)是分析集群的 HDFS、一個(gè)是共享集群的 HDFS,共享集群的 HFDS Load 比較高,經(jīng)常會(huì)跑一些機(jī)器學(xué)習(xí)的任務(wù)。如果用戶要訪問共享集群的 HDFS,經(jīng)常會(huì)不穩(wěn)定。比如說共享的 Namenode 不穩(wěn)定,或者是后面的 Datanode 不穩(wěn)定,會(huì)時(shí)快時(shí)慢。我們對常用的生產(chǎn)數(shù)據(jù)集做了數(shù)據(jù)緩存,會(huì)定期把這些常用數(shù)據(jù)集從共享集群復(fù)制到分析集群。用戶分析的時(shí)候,生成物理執(zhí)行計(jì)劃的時(shí)候會(huì)把這些 HDFS 文件透明替換掉,用戶是感知不到你在后面做了緩存的。
性能改進(jìn) - 索引

我們也做了一個(gè) Bloom Filter 索引,因?yàn)槲覀兙€上還是有一些查詢是點(diǎn)查的場景,從非常大的數(shù)據(jù)集中,最后結(jié)果只需要一點(diǎn)點(diǎn)的數(shù)據(jù),這時(shí)候我們可以建一些索引。我們這個(gè)索引是單獨(dú)的一個(gè)索引文件,跟數(shù)據(jù)文件是分開的,所以可以比較靈活地根據(jù)用戶的需求,在表的不同列上建索引、刪除索引。同時(shí),我們提供了建索引的 SQL 語法,就跟普通的 OLTP 建索引的語法類似。我們也測試了一些比較常見的點(diǎn)查場景,在 80%-90% 的場景下,IO 和 CPU 使用降低比較多。
性能改進(jìn) - AQE
我們也對 AQE 進(jìn)行了一些性能改進(jìn)。之前提到我們系統(tǒng)最開始是基于 Spark 2.3.1 的,AQE 是 Spark 3.0 的時(shí)候引進(jìn)來的,所以我們把 AQE Backport 進(jìn)我們版本里面去了。AQE 對我們所在的場景的性能提升是非常重要的。我們是一個(gè)共享的 Spark Driver,用戶是沒辦法設(shè)定 Spark 參數(shù)的,比如 shuffle partition 的數(shù)量都是固定的。在 AQE,你可以做 partition 數(shù)量的 coalesce,把一些小的 partition 壓成一個(gè)大的 partition。還有就是把 SortMergeJoin 轉(zhuǎn)成 BroadcastJoin 去處理一些 Skew Join 的 Case。在社區(qū)版本的 Spark 中的 Skew Join 的場景是比較簡單的,兩邊是 shuffle stage,中間是一個(gè) SortMergejoin,為了處理這種場景,我們做了更多改進(jìn)。

比如說第一個(gè)式子中,單個(gè) Querystage,右邊是一個(gè) Parquet 表,他是沒有 shuffle 在的。第二種不是這種比較經(jīng)典的兩個(gè) shuffle stage,是后面加 sort 的,可能中間還有一些別的算子比如 Aggregation 算子,SortMergeJoin 等。
性能改進(jìn) - Bucket Join
我們做的另外一個(gè)性能改進(jìn)是 Bucket Join,一些倍數(shù)關(guān)系的 Bucket 表的 join 是不用 shuffle 的。


我們現(xiàn)在支持兩種不用 shuffle 的方式,一種是 Merge Sort,把多的并到小的,把 Table A 和 Table B 進(jìn)行 Join,左邊表的 Bucket 數(shù)量是 4 個(gè),右邊 Bucket 數(shù)量是 8 個(gè),可以按左邊 4 個(gè)去做一個(gè) Join,右邊是不需要 shuffle 的。還有 Rebucket,相反的就把少變多,task 數(shù)量會(huì)變多。缺點(diǎn)可能在于會(huì)重復(fù)讀數(shù)據(jù),IO 會(huì)多一些,現(xiàn)在我們生產(chǎn)上是 enable merge sort 這種方式。
另外一個(gè)性能改進(jìn)是 DPP,我們也把 Spark 3.0 中的 DPP 移植到 Spark 2.3 中,做了一些改進(jìn),使 DPP 和 AQE 可以協(xié)同工作。目前社區(qū)版 AQE enable 的時(shí)候,DPP 是沒辦法同時(shí)運(yùn)行的。但是這兩個(gè)功能對我們線上版本都還是很有用的,所以我們進(jìn)行了一些改動(dòng),使它們可以同時(shí)工作。

還有 Runtime Filter,它的原理和 DPP 類似,因?yàn)?DPP 要求你的 Join 條件中包含了 partition column 才會(huì) enable 成 DPP,Runtime Filter 可以把一些非 partition 條件做一些 filter 放到左邊,這對某些 case 比較有用,比如在右表很小、左表很大的情況下,如果 filter 效率比較高的話,可以使它的 shuffle 數(shù)據(jù)量減少非常多。
性能改進(jìn) - Range Join

我們做的另外一個(gè)性能改進(jìn)是 Range Join。目前我們線上大概每天有 2000 多個(gè)非等值的 Join,join 的條件里都是大于等于、小于等于或者是不等于這種條件,對這種Join,Spark 里面默認(rèn)是用Broadcast NestedloopJoin,它的性能是比較差的,特別是對于Join和 Broadcast 的表都比較大,它的時(shí)間復(fù)雜度是 O(N*M)。我們對一些 Range join,比如 join 條件是 A Between B and C 這樣的,我們會(huì)用 BroadcastRangeJoin,因?yàn)?Broadcast NestedloopJoin 會(huì) Broadcast 那個(gè)小表,BroadcastRangeJoin 相當(dāng)于是給那個(gè)小表做了索引,給它排個(gè)序,Join 時(shí)候就不需要每一條都掃描,只掃描一部分就可以了,他的復(fù)雜度就降為 O(N*2*LOG(M)),所以在某些場景下,性能會(huì)有上百倍的提升,比如左邊有1000萬,右邊有100萬 Join 的這種 Case。
Parquet 讀優(yōu)化

我們還做了一些對 Parquet 的讀優(yōu)化,主要是做了一些降低讀 Parquet 文件時(shí)的 NameNode rpc 的調(diào)用,以及多線程讀 Parquet 文件。為什么我們要用多線程去讀呢?舉個(gè)例子,在我們線上,有一個(gè)用戶行為的表非常大,而且是一個(gè) bucket 表,bucket 數(shù)量不能太多,數(shù)量太多的話會(huì)讓 task 也非常多,系統(tǒng)非常忙。我們對這種 bucket 表進(jìn)行掃描的時(shí)候,可能一個(gè)task要讀的Parquet文件就非常多,這種 Hadoop 的“讀”就成為查詢的瓶頸了。對于這種場景,我們可以使用多線程去讀 Parquet 文件。我們前面說了系統(tǒng)中有共享的集群,HDFS 本身不是很穩(wěn)定,我們會(huì)使用多線程去讀的話,會(huì)降低 HDFS 讀等待的開銷。還有下推更多的 filter 到 Parquet 文件。
其它性能改進(jìn)
我們還做了其他方面的性能改進(jìn),比如調(diào)度性能改進(jìn),對 Spark 里 DAGScheduler 的改進(jìn)等。因?yàn)槲覀兪且粋€(gè)共享的 Spark Driver,很多用戶對調(diào)度的要求比較高,希望 task 能夠很快地調(diào)動(dòng)起來,但是現(xiàn)有 Spark 社區(qū)版本里大部分都不是這種場景的,所以對調(diào)度的性能要求沒那么高,我們做了很多異步化和多線程的改造。同時(shí)我們還做了 Spark Driver 里一些鎖的優(yōu)化,從比較大的鎖粒度降到比較小的鎖粒度。第三點(diǎn)是物化視圖,目前這個(gè)功能已經(jīng)完成,但是沒有在線上用起來,因?yàn)檫€存在一些數(shù)據(jù)質(zhì)量方面的問題,在物化視圖更新這方面還沒完善,所以說我們還在做。
穩(wěn)定性改進(jìn)
Driver 穩(wěn)定性改進(jìn)
接下來我會(huì)講一下在穩(wěn)定性方面的一些改進(jìn),主要講Executor和Driver兩個(gè)方面,很多用戶需要Driver,Driver 是長期運(yùn)行的一個(gè)服務(wù),對穩(wěn)定性要求非常高,我們做了以下改進(jìn):
大結(jié)果集溢寫到磁盤;
限制最終結(jié)果集和中間結(jié)果集的大??;
Broadcast 的管理和限制。經(jīng)過線上的跟蹤,都是這個(gè)表太多引起內(nèi)存的問題,我們對此進(jìn)行了管理和限制;
單個(gè) SQL 的 task 數(shù)量以及總 task 時(shí)間限制;
單次 table scan 的文件數(shù)和大小的限制;
SQL 優(yōu)化階段的限制;
Join 膨脹率的限制;
Spark UI 的分離;
DAGSchedule 線程模型改進(jìn)。
Executor 穩(wěn)定性改進(jìn)
我們還做了一些 Executor 的穩(wěn)定性改進(jìn),包括:
shuffle 內(nèi)存控制;
UDF review/release 流程改進(jìn);
加各種限制保護(hù) Executor 內(nèi)存。
總結(jié)

最后,我簡單做一個(gè)總結(jié),上圖是我們現(xiàn)在的一個(gè)系統(tǒng)狀況。大概每天跑 30 多萬個(gè)查詢,80 分位的查詢在 20 秒左右,95 分位是 100 秒左右,目前大概是這樣的情況。
