Spark 數(shù)據(jù)讀取冷啟動(dòng)優(yōu)化分析
有時(shí)候會(huì)發(fā)現(xiàn)即使是讀取少量的數(shù)據(jù),啟動(dòng)延時(shí)可能也非常大,針對(duì)該現(xiàn)象進(jìn)行分析,并提供一些解決思路。
背景
Spark 一次查詢(xún)過(guò)程可以簡(jiǎn)單抽象為 planning 階段和 execution 階段,在一個(gè)新的 Spark Session 中第一次查詢(xún)某數(shù)據(jù)的過(guò)程稱(chēng)為冷啟動(dòng),在這種情況下 planning 的耗時(shí)可能會(huì)比 execution 更長(zhǎng)。
Spark 讀取數(shù)據(jù)冷啟動(dòng)時(shí),會(huì)從文件系統(tǒng)中獲取文件的一些元數(shù)據(jù)信息(location,size,etc.)用于優(yōu)化,如果一個(gè)目錄下的文件過(guò)多,就會(huì)比較耗時(shí)(可能達(dá)到數(shù)十分鐘),該邏輯在 InMemoryFieIndex 中實(shí)現(xiàn)。
后續(xù)再次多次查詢(xún)則會(huì)在 FileStatusCache 中進(jìn)行查詢(xún),planning 階段性能也就大幅提升了,下文將探討 planning 階段如何加載元數(shù)據(jù)以及可能的一些優(yōu)化點(diǎn)。
InMemoryFileIndex
before spark 2.1
spark 2.1 版本前,spark 直接從文件系統(tǒng)中查詢(xún)數(shù)據(jù)的元數(shù)據(jù)并將其緩存到內(nèi)存中,元數(shù)據(jù)包括一個(gè) partition 的列表和文件的一些統(tǒng)計(jì)信息(路徑,文件大小,是否為目錄,備份數(shù),塊大小,定義時(shí)間,訪(fǎng)問(wèn)時(shí)間,數(shù)據(jù)塊位置信息)。一旦數(shù)據(jù)緩存后,在后續(xù)的查詢(xún)中,表的 partition 就可以在內(nèi)存中進(jìn)行下推,得以快速的查詢(xún)。
將元數(shù)據(jù)緩存在內(nèi)存中雖然提供了很好的性能,但也存在2個(gè)缺點(diǎn):在 spark 加載所有表分區(qū)的元數(shù)據(jù)之前,會(huì)阻塞查詢(xún)。對(duì)于大型分區(qū)表,遞歸的掃描文件系統(tǒng)以發(fā)現(xiàn)初始查詢(xún)文件的元數(shù)據(jù)可能會(huì)花費(fèi)數(shù)分鐘,特別是當(dāng)數(shù)據(jù)存儲(chǔ)在云端。其次,表的所有元數(shù)據(jù)都需要放入內(nèi)存中,增加了內(nèi)存壓力。
after spark 2.1
spark 2.1 針對(duì)上述缺點(diǎn)進(jìn)行了優(yōu)化,可參考 SPARK-17861
將表分區(qū)元數(shù)據(jù)信息緩存到 catalog 中,例如 (hive metastore),因此可以在 PruneFileSourcePartitions 規(guī)則中提前進(jìn)行分區(qū)發(fā)現(xiàn),catalyse optimeizer 會(huì)在邏輯計(jì)劃中對(duì)分區(qū)進(jìn)行修剪,避免讀取到不需要的分區(qū)文件信息。
文件統(tǒng)計(jì)現(xiàn)在可以在計(jì)劃期間內(nèi)增量的,部分的緩存,而不是全部預(yù)先加載。Spark需要知道文件的大小以便在執(zhí)行物理計(jì)劃時(shí)將它們劃分為讀取任務(wù)。通過(guò)共享一個(gè)固定大小的250MB緩存(可配置),而不是將所有表文件統(tǒng)計(jì)信息緩存到內(nèi)存中,在減少內(nèi)存錯(cuò)誤風(fēng)險(xiǎn)的情況下顯著加快重復(fù)查詢(xún)的速度。
MSCK REPAIR TABLE 命令進(jìn)行轉(zhuǎn)化,查看是否生效,如果 Partition Provider 為 Catalog 則表示會(huì)從 catalog 中獲取分區(qū)信息sql("describe formatted test_table").filter("col_name like '%Partition Provider%'").show+-------------------+---------+-------+| col_name|data_type|comment|+-------------------+---------+-------+|Partition Provider:| Catalog| |+-------------------+---------+-------+
性能對(duì)比
出自官方blog,通過(guò)讀取一張表不同的分區(qū)數(shù),觀察任務(wù) execution time 和 planning time,在spark2.1之前 planning 階段的耗時(shí)是相同的,意味著讀取一個(gè)分區(qū)也需要掃描全表的 file status。

優(yōu)化 HDFS 獲取 File 元數(shù)據(jù)性能
DistributedFileSystem.listLocatedStatus 代替了 fs.listStatus + getFileBlockLocations的方式listLocatedStatus向 namenode 發(fā)起一次請(qǐng)求獲得file status和file block location信息listStatus獲取一系列的 file status 后,還要根據(jù) file status 循環(huán)向 namenode 發(fā)起請(qǐng)求獲得file block location信息
listLocatedStatus
// 對(duì) namenode 只發(fā)起一次 listLocatedStatus 請(qǐng)求,在方法內(nèi)部獲得每個(gè)文件 block location 信息val statuses = fs.listLocatedStatus(path)new Iterator[LocatedFileStatus]() {def next(): LocatedFileStatus = remoteIter.nextdef hasNext(): Boolean = remoteIter.hasNext}.toArraystatuses.flatMap{Some(f)}
fs.listStatus + getFileBlockLocations (只展示核心代碼)
val statuses = fs.listStatus(path)statuses.flatMap{val locations = fs.getFileBlockLocations(f, 0, f.getLen).map { loc =>if (loc.getClass == classOf[BlockLocation]) {loc} else {new BlockLocation(loc.getNames, loc.getHosts, loc.getOffset, loc.getLength)}}val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,f.getModificationTime, 0, null, null, null, null, f.getPath, locations)if (f.isSymlink) {lfs.setSymlink(f.getSymlink)}Some(lfs)}
性能對(duì)比
實(shí)測(cè)一個(gè)57個(gè)分區(qū),每個(gè)分區(qū)1445個(gè)文件的任務(wù),性能提升6倍左右

打入 SPARK-27801 前

打入 SPARK-27801 后
文件元數(shù)據(jù)讀取方式及元數(shù)據(jù)緩存管理
讀取數(shù)據(jù)時(shí)會(huì)先判斷分區(qū)的數(shù)量,如果分區(qū)數(shù)量小于等于
spark.sql.sources.parallelPartitionDiscovery.threshold (默認(rèn)32),則使用 driver 循環(huán)讀取文件元數(shù)據(jù),如果分區(qū)數(shù)量大于該值,則會(huì)啟動(dòng)一個(gè) spark job,分布式的處理元數(shù)據(jù)信息(每個(gè)分區(qū)下的文件使用一個(gè)task進(jìn)行處理)分區(qū)數(shù)量很多意味著 Listing leaf files task 的任務(wù)會(huì)很多,分區(qū)里的文件數(shù)量多意味著每個(gè) task 的負(fù)載高,使用 FileStatusCache 緩存文件狀態(tài),默認(rèn)的緩存
spark.sql.hive.filesourcePartitionFileCacheSize為 250MB
Tip
Listing leaf files task 的數(shù)量計(jì)算公式為
val numParallelism = Math.min(paths.size, parallelPartitionDiscoveryParallelism)其中,paths.size 為需要讀取的分區(qū)數(shù)量,parallelPartitionDiscoveryParallelism 由參數(shù) spark.sql.sources.parallelPartitionDiscovery.parallelism 控制,默認(rèn)為10000,目的是防止 task 過(guò)多,但從生產(chǎn)任務(wù)上觀察發(fā)現(xiàn)大多數(shù) get status task 完成的時(shí)間都是毫秒級(jí),可以考慮把這個(gè)值調(diào)低,減少任務(wù)啟動(dòng)關(guān)閉的開(kāi)銷(xiāo),或者直接修改源碼將 paths.size 按一定比例調(diào)低,例如 paths.size/2

控制 task 數(shù)量之前

控制 task 數(shù)量之后
結(jié)語(yǔ)
spark 查詢(xún)冷啟動(dòng)(獲取文件元數(shù)據(jù)性能)對(duì)比前幾個(gè)版本已經(jīng)有非常大提升,降低了查詢(xún)的延時(shí)
SPARK-17861 在物理計(jì)劃中進(jìn)行了優(yōu)化,通過(guò)將分區(qū)信息存入 catalog ,避免了讀取時(shí)加載全量表的文件信息
SPARK-27801 優(yōu)化讀取 hdfs 文件元數(shù)據(jù)的方式,之前 getFileBlockLocations 的方式是串行的,在文件數(shù)量很多的情況下速度會(huì)很慢,同時(shí)用 listLocatedStatus 的方式減少了客戶(hù)端對(duì) namenode 的直接調(diào)用,例如需要讀取的數(shù)據(jù)為3個(gè)分區(qū),每個(gè)分區(qū) 10k 個(gè)文件,之前客戶(hù)端需要訪(fǎng)問(wèn) namenode 的次數(shù)為30k,現(xiàn)在為3次
打入最新的 patch 和 優(yōu)化 task 數(shù)量后,隨機(jī)找的一個(gè)生產(chǎn)任務(wù) Listing Leaf files job 時(shí)間從數(shù)十秒減少到1S以?xún)?nèi),不過(guò)有時(shí)候依舊存在毛刺,這與 namenode 和 機(jī)器的負(fù)載程度有關(guān)
一些思考,是否可以考慮用 Redis 替換 FileStatusCache,在數(shù)據(jù)寫(xiě)入的時(shí)候更新 Redis 中的 file status 信息,這樣就相當(dāng)于所有的 spark 應(yīng)用共享了 FileStatusCache ,減少了內(nèi)存使用的同時(shí)也不再有讀數(shù)據(jù)冷啟動(dòng)的問(wèn)題了。
參考
scalable-partition-handling-for-cloud-native-architecture-in-apache-spark-2-1
