<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Spark Shuffle過程詳解

          共 13963字,需瀏覽 28分鐘

           ·

          2021-04-15 08:15

          點擊上方藍色字體,選擇“設(shè)為星標
          回復”資源“獲取更多資源

          ?

          ?你需要預習:
          《Spark的Cache和Checkpoint區(qū)別和聯(lián)系拾遺

          《Spark Job 邏輯執(zhí)行圖和數(shù)據(jù)依賴解析

          《Spark Job 物理執(zhí)行圖詳解

          上一章里討論了 job 的物理執(zhí)行圖,也討論了流入 RDD 中的 records 是怎么被 compute() 后流到后續(xù) RDD 的,同時也分析了 task 是怎么產(chǎn)生 result,以及 result 怎么被收集后計算出最終結(jié)果的。然而,我們還沒有討論數(shù)據(jù)是怎么通過 ShuffleDependency 流向下一個 stage 的?

          對比 Hadoop MapReduce 和 Spark 的 Shuffle 過程

          如果熟悉 Hadoop MapReduce 中的 shuffle 過程,可能會按照 MapReduce 的思路去想象 Spark 的 shuffle 過程。然而,它們之間有一些區(qū)別和聯(lián)系。
          從 high-level 的角度來看,兩者并沒有大的差別。 都是將 mapper(Spark 里是 ShuffleMapTask)的輸出進行 partition,不同的 partition 送到不同的 reducer(Spark 里 reducer 可能是下一個 stage 里的 ShuffleMapTask,也可能是 ResultTask)。Reducer 以內(nèi)存作緩沖區(qū),邊 shuffle 邊 aggregate 數(shù)據(jù),等到數(shù)據(jù) aggregate 好以后進行 reduce() (Spark 里可能是后續(xù)的一系列操作)。
          從 low-level 的角度來看,兩者差別不小。 Hadoop MapReduce 是 sort-based,進入 combine() 和 reduce() 的 records 必須先 sort。這樣的好處在于 combine/reduce() 可以處理大規(guī)模的數(shù)據(jù),因為其輸入數(shù)據(jù)可以通過外排得到(mapper 對每段數(shù)據(jù)先做排序,reducer 的 shuffle 對排好序的每段數(shù)據(jù)做歸并)。目前的 Spark 默認選擇的是 hash-based,通常使用 HashMap 來對 shuffle 來的數(shù)據(jù)進行 aggregate,不會對數(shù)據(jù)進行提前排序。如果用戶需要經(jīng)過排序的數(shù)據(jù),那么需要自己調(diào)用類似 sortByKey() 的操作;如果你是Spark 1.1的用戶,可以將spark.shuffle.manager設(shè)置為sort,則會對數(shù)據(jù)進行排序。在Spark 1.2中,sort將作為默認的Shuffle實現(xiàn)。
          從實現(xiàn)角度來看,兩者也有不少差別。 Hadoop MapReduce 將處理流程劃分出明顯的幾個階段:map(), spill, merge, shuffle, sort, reduce() 等。每個階段各司其職,可以按照過程式的編程思想來逐一實現(xiàn)每個階段的功能。在 Spark 中,沒有這樣功能明確的階段,只有不同的 stage 和一系列的 transformation(),所以 spill, merge, aggregate 等操作需要蘊含在 transformation() 中。
          如果我們將 map 端劃分數(shù)據(jù)、持久化數(shù)據(jù)的過程稱為 shuffle write,而將 reducer 讀入數(shù)據(jù)、aggregate 數(shù)據(jù)的過程稱為 shuffle read。那么在 Spark 中,問題就變?yōu)樵趺丛?job 的邏輯或者物理執(zhí)行圖中加入 shuffle write 和 shuffle read 的處理邏輯?以及兩個處理邏輯應該怎么高效實現(xiàn)?

          Shuffle write

          由于不要求數(shù)據(jù)有序,shuffle write 的任務很簡單:將數(shù)據(jù) partition 好,并持久化。之所以要持久化,一方面是要減少內(nèi)存存儲空間壓力,另一方面也是為了 fault-tolerance。
          shuffle write 的任務很簡單,那么實現(xiàn)也很簡單:將 shuffle write 的處理邏輯加入到 ShuffleMapStage(ShuffleMapTask 所在的 stage) 的最后,該 stage 的 final RDD 每輸出一個 record 就將其 partition 并持久化。圖示如下:

          上圖有 4 個 ShuffleMapTask 要在同一個 worker node 上運行,CPU core 數(shù)為 2,可以同時運行兩個 task。每個 task 的執(zhí)行結(jié)果(該 stage 的 finalRDD 中某個 partition 包含的 records)被逐一寫到本地磁盤上。每個 task 包含 R 個緩沖區(qū),R = reducer 個數(shù)(也就是下一個 stage 中 task 的個數(shù)),緩沖區(qū)被稱為 bucket,其大小為spark.shuffle.file.buffer.kb ,默認是 32KB(Spark 1.1 版本以前是 100KB)。
          其實 bucket 是一個廣義的概念,代表 ShuffleMapTask 輸出結(jié)果經(jīng)過 partition 后要存放的地方,這里為了細化數(shù)據(jù)存放位置和數(shù)據(jù)名稱,僅僅用 bucket 表示緩沖區(qū)。
          ShuffleMapTask 的執(zhí)行過程很簡單:先利用 pipeline 計算得到 finalRDD 中對應 partition 的 records。每得到一個 record 就將其送到對應的 bucket 里,具體是哪個 bucket 由partitioner.partition(record.getKey()))決定。每個 bucket 里面的數(shù)據(jù)會不斷被寫到本地磁盤上,形成一個 ShuffleBlockFile,或者簡稱 FileSegment。之后的 reducer 會去 fetch 屬于自己的 FileSegment,進入 shuffle read 階段。
          這樣的實現(xiàn)很簡單,但有幾個問題:
          1. 產(chǎn)生的 FileSegment 過多。每個 ShuffleMapTask 產(chǎn)生 R(reducer 個數(shù))個 FileSegment,M 個 ShuffleMapTask 就會產(chǎn)生 M * R 個文件。一般 Spark job 的 M 和 R 都很大,因此磁盤上會存在大量的數(shù)據(jù)文件。

          2. 緩沖區(qū)占用內(nèi)存空間大。每個 ShuffleMapTask 需要開 R 個 bucket,M 個 ShuffleMapTask 就會產(chǎn)生 M R 個 bucket。雖然一個 ShuffleMapTask 結(jié)束后,對應的緩沖區(qū)可以被回收,但一個 worker node 上同時存在的 bucket 個數(shù)可以達到 cores R 個(一般 worker 同時可以運行 cores 個 ShuffleMapTask),占用的內(nèi)存空間也就達到了cores * R * 32 KB。對于 8 核 1000 個 reducer 來說,占用內(nèi)存就是 256MB。

          目前來看,第二個問題還沒有好的方法解決,因為寫磁盤終究是要開緩沖區(qū)的,緩沖區(qū)太小會影響 IO 速度。但第一個問題有一些方法去解決,下面介紹已經(jīng)在 Spark 里面實現(xiàn)的 FileConsolidation 方法。先上圖:

          可以明顯看出,在一個 core 上連續(xù)執(zhí)行的 ShuffleMapTasks 可以共用一個輸出文件 ShuffleFile。先執(zhí)行完的 ShuffleMapTask 形成 ShuffleBlock i,后執(zhí)行的 ShuffleMapTask 可以將輸出數(shù)據(jù)直接追加到 ShuffleBlock i 后面,形成 ShuffleBlock i',每個 ShuffleBlock 被稱為 FileSegment。下一個 stage 的 reducer 只需要 fetch 整個 ShuffleFile 就行了。這樣,每個 worker 持有的文件數(shù)降為 cores * R。FileConsolidation 功能可以通過spark.shuffle.consolidateFiles=true來開啟。

          Shuffle read

          先看一張包含 ShuffleDependency 的物理執(zhí)行圖,來自 reduceByKey:

          很自然地,要計算 ShuffleRDD 中的數(shù)據(jù),必須先把 MapPartitionsRDD 中的數(shù)據(jù) fetch 過來。那么問題就來了:
          • 在什么時候 fetch,parent stage 中的一個 ShuffleMapTask 執(zhí)行完還是等全部 ShuffleMapTasks 執(zhí)行完?

          • 邊 fetch 邊處理還是一次性 fetch 完再處理?

          • fetch 來的數(shù)據(jù)存放到哪里?

          • 怎么獲得要 fetch 的數(shù)據(jù)的存放位置?

          解決問題:
          • 在什么時候 fetch?當 parent stage 的所有 ShuffleMapTasks 結(jié)束后再 fetch。理論上講,一個 ShuffleMapTask 結(jié)束后就可以 fetch,但是為了迎合 stage 的概念(即一個 stage 如果其 parent stages 沒有執(zhí)行完,自己是不能被提交執(zhí)行的),還是選擇全部 ShuffleMapTasks 執(zhí)行完再去 fetch。因為 fetch 來的 FileSegments 要先在內(nèi)存做緩沖,所以一次 fetch 的 FileSegments 總大小不能太大。Spark 規(guī)定這個緩沖界限不能超過 spark.reducer.maxMbInFlight,這里用 softBuffer 表示,默認大小為 48MB。一個 softBuffer 里面一般包含多個 FileSegment,但如果某個 FileSegment 特別大的話,這一個就可以填滿甚至超過 softBuffer 的界限。

          • 邊 fetch 邊處理還是一次性 fetch 完再處理?邊 fetch 邊處理。本質(zhì)上,MapReduce shuffle 階段就是邊 fetch 邊使用 combine() 進行處理,只是 combine() 處理的是部分數(shù)據(jù)。MapReduce 為了讓進入 reduce() 的 records 有序,必須等到全部數(shù)據(jù)都 shuffle-sort 后再開始 reduce()。因為 Spark 不要求 shuffle 后的數(shù)據(jù)全局有序,因此沒必要等到全部數(shù)據(jù) shuffle 完成后再處理。那么如何實現(xiàn)邊 shuffle 邊處理,而且流入的 records 是無序的?答案是使用可以 aggregate 的數(shù)據(jù)結(jié)構(gòu),比如 HashMap。每 shuffle 得到(從緩沖的 FileSegment 中 deserialize 出來)一個 \ record,直接將其放進 HashMap 里面。如果該 HashMap 已經(jīng)存在相應的 Key,那么直接進行 aggregate 也就是 func(hashMap.get(Key), Value),比如上面 WordCount 例子中的 func 就是 hashMap.get(Key) + Value,并將 func 的結(jié)果重新 put(key) 到 HashMap 中去。這個 func 功能上相當于 reduce(),但實際處理數(shù)據(jù)的方式與 MapReduce reduce() 有差別,差別相當于下面兩段程序的差別。

              // MapReduce
            reduce(K key, Iterable<V> values) {
            result = process(key, values)
            return result
            }

            // Spark
            reduce(K key, Iterable<V> values) {
            result = null
            for (V value : values)
            result
            = func(result, value)
            return result
            }

            MapReduce 可以在 process 函數(shù)里面可以定義任何數(shù)據(jù)結(jié)構(gòu),也可以將部分或全部的 values 都 cache 后再進行處理,非常靈活。而 Spark 中的 func 的輸入?yún)?shù)是固定的,一個是上一個 record 的處理結(jié)果,另一個是當前讀入的 record,它們經(jīng)過 func 處理后的結(jié)果被下一個 record 處理時使用。因此一些算法比如求平均數(shù),在 process 里面很好實現(xiàn),直接sum(values)/values.length,而在 Spark 中 func 可以實現(xiàn)sum(values),但不好實現(xiàn)/values.length。更多的 func 將會在下面的章節(jié)細致分析。

          • fetch 來的數(shù)據(jù)存放到哪里?剛 fetch 來的 FileSegment 存放在 softBuffer 緩沖區(qū),經(jīng)過處理后的數(shù)據(jù)放在內(nèi)存 + 磁盤上。這里我們主要討論處理后的數(shù)據(jù),可以靈活設(shè)置這些數(shù)據(jù)是“只用內(nèi)存”還是“內(nèi)存+磁盤”。如果spark.shuffle.spill = false就只用內(nèi)存。內(nèi)存使用的是AppendOnlyMap ,類似 Java 的HashMap,內(nèi)存+磁盤使用的是ExternalAppendOnlyMap,如果內(nèi)存空間不足時,ExternalAppendOnlyMap可以將 \

             records 進行 sort 后 spill 到磁盤上,等到需要它們的時候再進行歸并,后面會詳解。使用“內(nèi)存+磁盤”的一個主要問題就是如何在兩者之間取得平衡?在 Hadoop MapReduce 中,默認將 reducer 的 70% 的內(nèi)存空間用于存放 shuffle 來的數(shù)據(jù),等到這個空間利用率達到 66% 的時候就開始 merge-combine()-spill。在 Spark 中,也適用同樣的策略,一旦 ExternalAppendOnlyMap 達到一個閾值就開始 spill,具體細節(jié)下面會討論。
          • 怎么獲得要 fetch 的數(shù)據(jù)的存放位置?在上一章討論物理執(zhí)行圖中的 stage 劃分的時候,我們強調(diào) “一個 ShuffleMapStage 形成后,會將該 stage 最后一個 final RDD 注冊到 MapOutputTrackerMaster.registerShuffle(shuffleId, rdd.partitions.size),這一步很重要,因為 shuffle 過程需要 MapOutputTrackerMaster 來指示 ShuffleMapTask 輸出數(shù)據(jù)的位置”。因此,reducer 在 shuffle 的時候是要去 driver 里面的 MapOutputTrackerMaster 詢問 ShuffleMapTask 輸出的數(shù)據(jù)位置的。每個 ShuffleMapTask 完成時會將 FileSegment 的存儲位置信息匯報給 MapOutputTrackerMaster。

          至此,我們已經(jīng)討論了 shuffle write 和 shuffle read 設(shè)計的核心思想、算法及某些實現(xiàn)。接下來,我們深入一些細節(jié)來討論。

          典型 transformation() 的 shuffle read

          1. reduceByKey(func)

          上面初步介紹了 reduceByKey() 是如何實現(xiàn)邊 fetch 邊 reduce() 的。需要注意的是雖然 Example(WordCount) 中給出了各個 RDD 的內(nèi)容,但一個 partition 里面的 records 并不是同時存在的。比如在 ShuffledRDD 中,每 fetch 來一個 record 就立即進入了 func 進行處理。MapPartitionsRDD 中的數(shù)據(jù)是 func 在全部 records 上的處理結(jié)果。從 record 粒度上來看,reduce() 可以表示如下:

          可以看到,fetch 來的 records 被逐個 aggreagte 到 HashMap 中,等到所有 records 都進入 HashMap,就得到最后的處理結(jié)果。唯一要求是 func 必須是 commulative 的(參見上面的 Spark 的 reduce() 的代碼)。
          ShuffledRDD 到 MapPartitionsRDD 使用的是 mapPartitionsWithContext 操作。
          為了減少數(shù)據(jù)傳輸量,MapReduce 可以在 map 端先進行 combine(),其實在 Spark 也可以實現(xiàn),只需要將上圖 ShuffledRDD => MapPartitionsRDD 的 mapPartitionsWithContext 在 ShuffleMapStage 中也進行一次即可,比如 reduceByKey 例子中 ParallelCollectionRDD => MapPartitionsRDD 完成的就是 map 端的 combine()。
          對比 MapReduce 的 map()-reduce() 和 Spark 中的 reduceByKey():
          • map 端的區(qū)別:map() 沒有區(qū)別。對于 combine(),MapReduce 先 sort 再 combine(),Spark 直接在 HashMap 上進行 combine()。

          • reduce 端區(qū)別:MapReduce 的 shuffle 階段先 fetch 數(shù)據(jù),數(shù)據(jù)量到達一定規(guī)模后 combine(),再將剩余數(shù)據(jù) merge-sort 后 reduce(),reduce() 非常靈活。Spark 邊 fetch 邊 reduce()(在 HashMap 上執(zhí)行 func),因此要求 func 符合 commulative 的特性。

          從內(nèi)存利用上來對比:
          • map 端區(qū)別:MapReduce 需要開一個大型環(huán)形緩沖區(qū)來暫存和排序 map() 的部分輸出結(jié)果,但 combine() 不需要額外空間(除非用戶自己定義)。Spark 需要 HashMap 內(nèi)存數(shù)據(jù)結(jié)構(gòu)來進行 combine(),同時輸出 records 到磁盤上時也需要一個小的 buffer(bucket)。

          • reduce 端區(qū)別:MapReduce 需要一部分內(nèi)存空間來存儲 shuffle 過來的數(shù)據(jù),combine() 和 reduce() 不需要額外空間,因為它們的輸入數(shù)據(jù)分段有序,只需歸并一下就可以得到。在 Spark 中,fetch 時需要 softBuffer,處理數(shù)據(jù)時如果只使用內(nèi)存,那么需要 HashMap 來持有處理后的結(jié)果。如果使用內(nèi)存+磁盤,那么在 HashMap 存放一部分處理后的數(shù)據(jù)。

          2. groupByKey(numPartitions)

          與 reduceByKey() 流程一樣,只是 func 變成 result = result ++ record.value,功能是將每個 key 對應的所有 values 鏈接在一起。result 來自 hashMap.get(record.key),計算后的 result 會再次被 put 到 hashMap 中。與 reduceByKey() 的區(qū)別就是 groupByKey() 沒有 map 端的 combine()。對于 groupByKey() 來說 map 端的 combine() 只是減少了重復 Key 占用的空間,如果 key 重復率不高,沒必要 combine(),否則,最好能夠 combine()。

          3. distinct(numPartitions)

          與 reduceByKey() 流程一樣,只是 func 變成 result = result == null? record.value : result,如果 HashMap 中沒有該 record 就將其放入,否則舍棄。與 reduceByKey() 相同,在map 端存在 combine()。

          4. cogroup(otherRDD, numPartitions)

          CoGroupedRDD 可能有 0 個、1 個或者多個 ShuffleDependency。但并不是要為每一個 ShuffleDependency 建立一個 HashMap,而是所有的 Dependency 共用一個 HashMap。與 reduceByKey() 不同的是,HashMap 在 CoGroupedRDD 的 compute() 中建立,而不是在 mapPartitionsWithContext() 中建立。
          粗線表示的 task 首先 new 出一個 Array[ArrayBuffer(), ArrayBuffer()],ArrayBuffer() 的個數(shù)與參與 cogroup 的 RDD 個數(shù)相同。func 的邏輯是這樣的:每當從 RDD a 中 shuffle 過來一個 \ record 就將其添加到 hashmap.get(Key) 對應的 Array 中的第一個 ArrayBuffer() 中,每當從 RDD b 中 shuffle 過來一個 record,就將其添加到對應的 Array 中的第二個 ArrayBuffer()。
          CoGroupedRDD => MappedValuesRDD 對應 mapValues() 操作,就是將 [ArrayBuffer(), ArrayBuffer()] 變成 [Iterable[V], Iterable[W]]。

          5. intersection(otherRDD) 和 join(otherRDD, numPartitions)

           這兩個操作中均使用了 cogroup,所以 shuffle 的處理方式與 cogroup 一樣。

          6. sortByKey(ascending, numPartitions)

          sortByKey() 中 ShuffledRDD => MapPartitionsRDD 的處理邏輯與 reduceByKey() 不太一樣,沒有使用 HashMap 和 func 來處理 fetch 過來的 records。
          sortByKey() 中 ShuffledRDD => MapPartitionsRDD 的處理邏輯是:將 shuffle 過來的一個個 record 存放到一個 Array 里,然后按照 Key 來對 Array 中的 records 進行 sort。

          7. coalesce(numPartitions, shuffle = true)

          coalesce() 雖然有 ShuffleDependency,但不需要對 shuffle 過來的 records 進行 aggregate,所以沒有建立 HashMap。每 shuffle 一個 record,就直接流向 CoalescedRDD,進而流向 MappedRDD 中。

          Shuffle read 中的 HashMap

          HashMap 是 Spark shuffle read 過程中頻繁使用的、用于 aggregate 的數(shù)據(jù)結(jié)構(gòu)。Spark 設(shè)計了兩種:一種是全內(nèi)存的 AppendOnlyMap,另一種是內(nèi)存+磁盤的 ExternalAppendOnlyMap。下面我們來分析一下兩者特性及內(nèi)存使用情況

          1. AppendOnlyMap

          AppendOnlyMap 的官方介紹是 A simple open hash table optimized for the append-only use case, where keys are never removed, but the value for each key may be changed。意思是類似 HashMap,但沒有remove(key)方法。其實現(xiàn)原理很簡單,開一個大 Object 數(shù)組,藍色部分存儲 Key,白色部分存儲 Value。如下圖:

          當要 put(K, V) 時,先 hash(K) 找存放位置,如果存放位置已經(jīng)被占用,就使用 Quadratic probing 探測方法來找下一個空閑位置。對于圖中的 K6 來說,第三次查找找到 K4 后面的空閑位置,放進去即可。get(K6) 的時候類似,找三次找到 K6,取出緊挨著的 V6,與先來的 value 做 func,結(jié)果重新放到 V6 的位置。
          迭代 AppendOnlyMap 中的元素的時候,從前到后掃描輸出。
          如果 Array 的利用率達到 70%,那么就擴張一倍,并對所有 key 進行 rehash 后,重新排列每個 key 的位置。
          AppendOnlyMap 還有一個 destructiveSortedIterator(): Iterator[(K, V)] 方法,可以返回 Array 中排序后的 (K, V) pairs。實現(xiàn)方法很簡單:先將所有 (K, V) pairs compact 到 Array 的前端,并使得每個 (K, V) 占一個位置(原來占兩個),之后直接調(diào)用 Array.sort() 排序,不過這樣做會破壞數(shù)組(key 的位置變化了)。

          2. ExternalAppendOnlyMap

          相比 AppendOnlyMap,ExternalAppendOnlyMap 的實現(xiàn)略復雜,但邏輯其實很簡單,類似 Hadoop MapReduce 中的 shuffle-merge-combine-sort 過程:
          ExternalAppendOnlyMap 持有一個 AppendOnlyMap,shuffle 來的一個個 (K, V) record 先 insert 到 AppendOnlyMap 中,insert 過程與原始的 AppendOnlyMap 一模一樣。如果 AppendOnlyMap 快被裝滿時檢查一下內(nèi)存剩余空間是否可以夠擴展,夠就直接在內(nèi)存中擴展,不夠就 sort 一下 AppendOnlyMap,將其內(nèi)部所有 records 都 spill 到磁盤上。圖中 spill 了 4 次,每次 spill 完在磁盤上生成一個 spilledMap 文件,然后重新 new 出來一個 AppendOnlyMap。最后一個 (K, V) record insert 到 AppendOnlyMap 后,表示所有 shuffle 來的 records 都被放到了 ExternalAppendOnlyMap 中,但不表示 records 已經(jīng)被處理完,因為每次 insert 的時候,新來的 record 只與 AppendOnlyMap 中的 records 進行 aggregate,并不是與所有的 records 進行 aggregate(一些 records 已經(jīng)被 spill 到磁盤上了)。因此當需要 aggregate 的最終結(jié)果時,需要對 AppendOnlyMap 和所有的 spilledMaps 進行全局 merge-aggregate。
          全局 merge-aggregate 的流程也很簡單:先將 AppendOnlyMap 中的 records 進行 sort,形成 sortedMap。然后利用 DestructiveSortedIterator 和 DiskMapIterator 分別從 sortedMap 和各個 spilledMap 讀出一部分數(shù)據(jù)(StreamBuffer)放到 mergeHeap 里面。StreamBuffer 里面包含的 records 需要具有相同的 hash(key),所以圖中第一個 spilledMap 只讀出前三個 records 進入 StreamBuffer。mergeHeap 顧名思義就是使用堆排序不斷提取出 hash(firstRecord.Key) 相同的 StreamBuffer,并將其一個個放入 mergeBuffers 中,放入的時候與已經(jīng)存在于 mergeBuffers 中的 StreamBuffer 進行 merge-combine,第一個被放入 mergeBuffers 的 StreamBuffer 被稱為 minBuffer,那么 minKey 就是 minBuffer 中第一個 record 的 key。當 merge-combine 的時候,與 minKey 相同的 records 被 aggregate 一起,然后輸出。整個 merge-combine 在 mergeBuffers 中結(jié)束后,StreamBuffer 剩余的 records 隨著 StreamBuffer 重新進入 mergeHeap。一旦某個 StreamBuffer 在 merge-combine 后變?yōu)榭眨ɡ锩娴?records 都被輸出了),那么會使用 DestructiveSortedIterator 或 DiskMapIterator 重新裝填 hash(key) 相同的 records,然后再重新進入 mergeHeap。
          整個 insert-merge-aggregate 的過程有三點需要進一步探討一下:
          • 內(nèi)存剩余空間檢測

            與 Hadoop MapReduce 規(guī)定 reducer 中 70% 的空間可用于 shuffle-sort 類似,Spark 也規(guī)定 executor 中 spark.shuffle.memoryFraction * spark.shuffle.safetyFraction 的空間(默認是0.3 * 0.8)可用于 ExternalOnlyAppendMap。Spark 略保守是不是?更保守的是這 24% 的空間不是完全用于一個 ExternalOnlyAppendMap 的,而是由在 executor 上同時運行的所有 reducer 共享的。為此,exectuor 專門持有一個 ShuffleMemroyMap: HashMap[threadId, occupiedMemory] 來監(jiān)控每個 reducer 中 ExternalOnlyAppendMap 占用的內(nèi)存量。每當 AppendOnlyMap 要擴展時,都會計算 ShuffleMemroyMap 持有的所有 reducer 中的 AppendOnlyMap 已占用的內(nèi)存 + 擴展后的內(nèi)存 是會否會大于內(nèi)存限制,大于就會將 AppendOnlyMap spill 到磁盤。有一點需要注意的是前 1000 個 records 進入 AppendOnlyMap 的時候不會啟動是否要 spill 的檢查,需要擴展時就直接在內(nèi)存中擴展。

          • AppendOnlyMap 大小估計

            為了獲知 AppendOnlyMap 占用的內(nèi)存空間,可以在每次擴展時都將 AppendOnlyMap reference 的所有 objects 大小都算一遍,然后加和,但這樣做非常耗時。所以 Spark 設(shè)計了粗略的估算算法,算法時間復雜度是 O(1),核心思想是利用 AppendOnlyMap 中每次 insert-aggregate record 后 result 的大小變化及一共 insert 的 records 的個數(shù)來估算大小,具體見 SizeTrackingAppendOnlyMap 和 SizeEstimator。

          • Spill 過程

            與 shuffle write 一樣,在 spill records 到磁盤上的時候,會建立一個 buffer 緩沖區(qū),大小仍為 spark.shuffle.file.buffer.kb ,默認是 32KB。另外,由于 serializer 也會分配緩沖區(qū)用于序列化和反序列化,所以如果一次 serialize 的 records 過多的話緩沖區(qū)會變得很大。Spark 限制每次 serialize 的 records 個數(shù)為 spark.shuffle.spill.batchSize,默認是 10000。

          Discussion

          通過本章的介紹可以發(fā)現(xiàn),相比 MapReduce 固定的 shuffle-combine-merge-reduce 策略,Spark 更加靈活,會根據(jù)不同的 transformation() 的語義去設(shè)計不同的 shuffle-aggregate 策略,再加上不同的內(nèi)存數(shù)據(jù)結(jié)構(gòu)來混搭出合理的執(zhí)行流程。
          這章主要討論了 Spark 是怎么在不排序 records 的情況下完成 shuffle write 和 shuffle read,以及怎么將 shuffle 過程融入 RDD computing chain 中的。附帶討論了內(nèi)存與磁盤的平衡以及與 Hadoop MapReduce shuffle 的異同。下一章將從部署圖以及進程通信角度來描述 job 執(zhí)行的整個流程,也會涉及 shuffle write 和 shuffle read 中的數(shù)據(jù)位置獲取問題。

          Spark的Cache和Checkpoint區(qū)別和聯(lián)系拾遺

          Spark Job 邏輯執(zhí)行圖和數(shù)據(jù)依賴解析

          Spark Job 物理執(zhí)行圖詳解



          歡迎點贊+收藏+轉(zhuǎn)發(fā)朋友圈素質(zhì)三連


          文章不錯?點個【在看】吧! 
          瀏覽 52
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  成人在线黄色片 | a在线级电影网站 | 天天日天天干天天插 | 啊啊啊啊啊靠逼 | 日本在线中文不卡 |