<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Spark 數(shù)據(jù)讀取冷啟動(dòng)優(yōu)化分析

          共 4360字,需瀏覽 9分鐘

           ·

          2021-03-25 11:20

          有時(shí)候會(huì)發(fā)現(xiàn)即使是讀取少量的數(shù)據(jù),啟動(dòng)延時(shí)可能也非常大,針對(duì)該現(xiàn)象進(jìn)行分析,并提供一些解決思路。

          背景

          Spark 一次查詢(xún)過(guò)程可以簡(jiǎn)單抽象為 planning 階段和 execution 階段,在一個(gè)新的 Spark Session 中第一次查詢(xún)某數(shù)據(jù)的過(guò)程稱(chēng)為冷啟動(dòng),在這種情況下 planning 的耗時(shí)可能會(huì)比 execution 更長(zhǎng)。

          Spark 讀取數(shù)據(jù)冷啟動(dòng)時(shí),會(huì)從文件系統(tǒng)中獲取文件的一些元數(shù)據(jù)信息(location,size,etc.)用于優(yōu)化,如果一個(gè)目錄下的文件過(guò)多,就會(huì)比較耗時(shí)(可能達(dá)到數(shù)十分鐘),該邏輯在 InMemoryFieIndex 中實(shí)現(xiàn)。

          后續(xù)再次多次查詢(xún)則會(huì)在 FileStatusCache 中進(jìn)行查詢(xún),planning 階段性能也就大幅提升了,下文將探討 planning 階段如何加載元數(shù)據(jù)以及可能的一些優(yōu)化點(diǎn)。

          InMemoryFileIndex

          before spark 2.1


          spark 2.1 版本前,spark 直接從文件系統(tǒng)中查詢(xún)數(shù)據(jù)的元數(shù)據(jù)并將其緩存到內(nèi)存中,元數(shù)據(jù)包括一個(gè) partition 的列表和文件的一些統(tǒng)計(jì)信息(路徑,文件大小,是否為目錄,備份數(shù),塊大小,定義時(shí)間,訪(fǎng)問(wèn)時(shí)間,數(shù)據(jù)塊位置信息)。一旦數(shù)據(jù)緩存后,在后續(xù)的查詢(xún)中,表的 partition 就可以在內(nèi)存中進(jìn)行下推,得以快速的查詢(xún)。

          將元數(shù)據(jù)緩存在內(nèi)存中雖然提供了很好的性能,但也存在2個(gè)缺點(diǎn):在 spark 加載所有表分區(qū)的元數(shù)據(jù)之前,會(huì)阻塞查詢(xún)。對(duì)于大型分區(qū)表,遞歸的掃描文件系統(tǒng)以發(fā)現(xiàn)初始查詢(xún)文件的元數(shù)據(jù)可能會(huì)花費(fèi)數(shù)分鐘,特別是當(dāng)數(shù)據(jù)存儲(chǔ)在云端。其次,表的所有元數(shù)據(jù)都需要放入內(nèi)存中,增加了內(nèi)存壓力。

          after spark 2.1

          spark 2.1 針對(duì)上述缺點(diǎn)進(jìn)行了優(yōu)化,可參考 SPARK-17861

          • 將表分區(qū)元數(shù)據(jù)信息緩存到 catalog 中,例如 (hive metastore),因此可以在 PruneFileSourcePartitions  規(guī)則中提前進(jìn)行分區(qū)發(fā)現(xiàn),catalyse optimeizer 會(huì)在邏輯計(jì)劃中對(duì)分區(qū)進(jìn)行修剪,避免讀取到不需要的分區(qū)文件信息。

          • 文件統(tǒng)計(jì)現(xiàn)在可以在計(jì)劃期間內(nèi)增量的,部分的緩存,而不是全部預(yù)先加載。Spark需要知道文件的大小以便在執(zhí)行物理計(jì)劃時(shí)將它們劃分為讀取任務(wù)。通過(guò)共享一個(gè)固定大小的250MB緩存(可配置),而不是將所有表文件統(tǒng)計(jì)信息緩存到內(nèi)存中,在減少內(nèi)存錯(cuò)誤風(fēng)險(xiǎn)的情況下顯著加快重復(fù)查詢(xún)的速度。
          舊表可以使用 MSCK REPAIR TABLE 命令進(jìn)行轉(zhuǎn)化,查看是否生效,如果 Partition ProviderCatalog 則表示會(huì)從 catalog 中獲取分區(qū)信息
          sql("describe formatted test_table").filter("col_name like '%Partition Provider%'").show+-------------------+---------+-------+|           col_name|data_type|comment|+-------------------+---------+-------+|Partition Provider:|  Catalog|       |+-------------------+---------+-------+

          性能對(duì)比

          出自官方blog,通過(guò)讀取一張表不同的分區(qū)數(shù),觀察任務(wù) execution time 和 planning time,在spark2.1之前 planning 階段的耗時(shí)是相同的,意味著讀取一個(gè)分區(qū)也需要掃描全表的 file status。


          優(yōu)化 HDFS 獲取 File 元數(shù)據(jù)性能

          雖然優(yōu)化了避免加載過(guò)多元數(shù)據(jù)的問(wèn)題,但是單個(gè)分區(qū)下文件過(guò)多導(dǎo)致讀取文件元數(shù)據(jù)緩慢的問(wèn)題并沒(méi)有解決。
          在 SPARK-27801 中(將在 spark3.0 release),對(duì)一個(gè)目錄下多文件的場(chǎng)景進(jìn)行了優(yōu)化,性能有大幅度的提升。
          使用 DistributedFileSystem.listLocatedStatus 代替了 fs.listStatus + getFileBlockLocations的方式
          • listLocatedStatus  向 namenode 發(fā)起一次請(qǐng)求獲得 file statusfile block location 信息
          • listStatus 獲取一系列的 file status 后,還要根據(jù) file status 循環(huán)向 namenode 發(fā)起請(qǐng)求獲得 file block location信息

          listLocatedStatus

          // 對(duì) namenode 只發(fā)起一次 listLocatedStatus 請(qǐng)求,在方法內(nèi)部獲得每個(gè)文件 block location 信息val statuses = fs.listLocatedStatus(path)new Iterator[LocatedFileStatus]() {  def next(): LocatedFileStatus = remoteIter.next  def hasNext(): Boolean = remoteIter.hasNext}.toArraystatuses.flatMap{  Some(f)}


          fs.listStatus + getFileBlockLocations (只展示核心代碼)


          val statuses = fs.listStatus(path)statuses.flatMap{  val locations = fs.getFileBlockLocations(f, 0, f.getLen).map { loc =>    if (loc.getClass == classOf[BlockLocation]) {        loc    } else {        new BlockLocation(loc.getNames, loc.getHosts, loc.getOffset, loc.getLength)    }    }  val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,            f.getModificationTime, 0, null, null, null, null, f.getPath, locations)  if (f.isSymlink) {    lfs.setSymlink(f.getSymlink)  }  Some(lfs)}


          性能對(duì)比

          實(shí)測(cè)一個(gè)57個(gè)分區(qū),每個(gè)分區(qū)1445個(gè)文件的任務(wù),性能提升6倍左右

          打入 SPARK-27801 前

          打入 SPARK-27801 后

          文件元數(shù)據(jù)讀取方式及元數(shù)據(jù)緩存管理

          1. 讀取數(shù)據(jù)時(shí)會(huì)先判斷分區(qū)的數(shù)量,如果分區(qū)數(shù)量小于等于spark.sql.sources.parallelPartitionDiscovery.threshold (默認(rèn)32),則使用 driver 循環(huán)讀取文件元數(shù)據(jù),如果分區(qū)數(shù)量大于該值,則會(huì)啟動(dòng)一個(gè) spark job,分布式的處理元數(shù)據(jù)信息(每個(gè)分區(qū)下的文件使用一個(gè)task進(jìn)行處理)

          2. 分區(qū)數(shù)量很多意味著 Listing leaf files task 的任務(wù)會(huì)很多,分區(qū)里的文件數(shù)量多意味著每個(gè) task 的負(fù)載高,使用 FileStatusCache 緩存文件狀態(tài),默認(rèn)的緩存 spark.sql.hive.filesourcePartitionFileCacheSize 為 250MB

          Tip
          Listing leaf files task 的數(shù)量計(jì)算公式為

          val numParallelism = Math.min(paths.size, parallelPartitionDiscoveryParallelism)

          其中,paths.size 為需要讀取的分區(qū)數(shù)量,parallelPartitionDiscoveryParallelism 由參數(shù) spark.sql.sources.parallelPartitionDiscovery.parallelism 控制,默認(rèn)為10000,目的是防止 task 過(guò)多,但從生產(chǎn)任務(wù)上觀察發(fā)現(xiàn)大多數(shù) get status task 完成的時(shí)間都是毫秒級(jí),可以考慮把這個(gè)值調(diào)低,減少任務(wù)啟動(dòng)關(guān)閉的開(kāi)銷(xiāo),或者直接修改源碼將 paths.size 按一定比例調(diào)低,例如 paths.size/2

          控制 task 數(shù)量之前

          控制 task 數(shù)量之后

          結(jié)語(yǔ)

          spark 查詢(xún)冷啟動(dòng)(獲取文件元數(shù)據(jù)性能)對(duì)比前幾個(gè)版本已經(jīng)有非常大提升,降低了查詢(xún)的延時(shí)

          • SPARK-17861 在物理計(jì)劃中進(jìn)行了優(yōu)化,通過(guò)將分區(qū)信息存入 catalog ,避免了讀取時(shí)加載全量表的文件信息

          • SPARK-27801 優(yōu)化讀取 hdfs 文件元數(shù)據(jù)的方式,之前 getFileBlockLocations 的方式是串行的,在文件數(shù)量很多的情況下速度會(huì)很慢,同時(shí)用 listLocatedStatus 的方式減少了客戶(hù)端對(duì) namenode 的直接調(diào)用,例如需要讀取的數(shù)據(jù)為3個(gè)分區(qū),每個(gè)分區(qū) 10k 個(gè)文件,之前客戶(hù)端需要訪(fǎng)問(wèn) namenode 的次數(shù)為30k,現(xiàn)在為3次

          • 打入最新的 patch 和 優(yōu)化 task 數(shù)量后,隨機(jī)找的一個(gè)生產(chǎn)任務(wù) Listing Leaf files job 時(shí)間從數(shù)十秒減少到1S以?xún)?nèi),不過(guò)有時(shí)候依舊存在毛刺,這與 namenode 和 機(jī)器的負(fù)載程度有關(guān)

          一些思考,是否可以考慮用 Redis 替換 FileStatusCache,在數(shù)據(jù)寫(xiě)入的時(shí)候更新 Redis 中的 file status 信息,這樣就相當(dāng)于所有的 spark 應(yīng)用共享了 FileStatusCache ,減少了內(nèi)存使用的同時(shí)也不再有讀數(shù)據(jù)冷啟動(dòng)的問(wèn)題了。

          參考

          scalable-partition-handling-for-cloud-native-architecture-in-apache-spark-2-1

          瀏覽 93
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产美女自拍视频 | 三级国产网站 | 日本A视频在线观看 | 综合激情五月天 | 荫蒂添出高潮A片视频 |