Spark Shuffle過程詳解

對比 Hadoop MapReduce 和 Spark 的 Shuffle 過程
Shuffle write

spark.shuffle.file.buffer.kb ,默認是 32KB(Spark 1.1 版本以前是 100KB)。其實 bucket 是一個廣義的概念,代表 ShuffleMapTask 輸出結(jié)果經(jīng)過 partition 后要存放的地方,這里為了細化數(shù)據(jù)存放位置和數(shù)據(jù)名稱,僅僅用 bucket 表示緩沖區(qū)。
partitioner.partition(record.getKey()))決定。每個 bucket 里面的數(shù)據(jù)會不斷被寫到本地磁盤上,形成一個 ShuffleBlockFile,或者簡稱 FileSegment。之后的 reducer 會去 fetch 屬于自己的 FileSegment,進入 shuffle read 階段。產(chǎn)生的 FileSegment 過多。每個 ShuffleMapTask 產(chǎn)生 R(reducer 個數(shù))個 FileSegment,M 個 ShuffleMapTask 就會產(chǎn)生 M * R 個文件。一般 Spark job 的 M 和 R 都很大,因此磁盤上會存在大量的數(shù)據(jù)文件。
緩沖區(qū)占用內(nèi)存空間大。每個 ShuffleMapTask 需要開 R 個 bucket,M 個 ShuffleMapTask 就會產(chǎn)生 M R 個 bucket。雖然一個 ShuffleMapTask 結(jié)束后,對應的緩沖區(qū)可以被回收,但一個 worker node 上同時存在的 bucket 個數(shù)可以達到 cores R 個(一般 worker 同時可以運行 cores 個 ShuffleMapTask),占用的內(nèi)存空間也就達到了
cores * R * 32 KB。對于 8 核 1000 個 reducer 來說,占用內(nèi)存就是 256MB。

spark.shuffle.consolidateFiles=true來開啟。Shuffle read

在什么時候 fetch,parent stage 中的一個 ShuffleMapTask 執(zhí)行完還是等全部 ShuffleMapTasks 執(zhí)行完?
邊 fetch 邊處理還是一次性 fetch 完再處理?
fetch 來的數(shù)據(jù)存放到哪里?
怎么獲得要 fetch 的數(shù)據(jù)的存放位置?
在什么時候 fetch?當 parent stage 的所有 ShuffleMapTasks 結(jié)束后再 fetch。理論上講,一個 ShuffleMapTask 結(jié)束后就可以 fetch,但是為了迎合 stage 的概念(即一個 stage 如果其 parent stages 沒有執(zhí)行完,自己是不能被提交執(zhí)行的),還是選擇全部 ShuffleMapTasks 執(zhí)行完再去 fetch。因為 fetch 來的 FileSegments 要先在內(nèi)存做緩沖,所以一次 fetch 的 FileSegments 總大小不能太大。Spark 規(guī)定這個緩沖界限不能超過
spark.reducer.maxMbInFlight,這里用 softBuffer 表示,默認大小為 48MB。一個 softBuffer 里面一般包含多個 FileSegment,但如果某個 FileSegment 特別大的話,這一個就可以填滿甚至超過 softBuffer 的界限。邊 fetch 邊處理還是一次性 fetch 完再處理?邊 fetch 邊處理。本質(zhì)上,MapReduce shuffle 階段就是邊 fetch 邊使用 combine() 進行處理,只是 combine() 處理的是部分數(shù)據(jù)。MapReduce 為了讓進入 reduce() 的 records 有序,必須等到全部數(shù)據(jù)都 shuffle-sort 后再開始 reduce()。因為 Spark 不要求 shuffle 后的數(shù)據(jù)全局有序,因此沒必要等到全部數(shù)據(jù) shuffle 完成后再處理。那么如何實現(xiàn)邊 shuffle 邊處理,而且流入的 records 是無序的?答案是使用可以 aggregate 的數(shù)據(jù)結(jié)構(gòu),比如 HashMap。每 shuffle 得到(從緩沖的 FileSegment 中 deserialize 出來)一個 \
record,直接將其放進 HashMap 里面。如果該 HashMap 已經(jīng)存在相應的 Key,那么直接進行 aggregate 也就是 func(hashMap.get(Key), Value),比如上面 WordCount 例子中的 func 就是hashMap.get(Key) + Value,并將 func 的結(jié)果重新 put(key) 到 HashMap 中去。這個 func 功能上相當于 reduce(),但實際處理數(shù)據(jù)的方式與 MapReduce reduce() 有差別,差別相當于下面兩段程序的差別。// MapReduce
reduce(K key, Iterable<V> values) {
result = process(key, values)
return result
}
// Spark
reduce(K key, Iterable<V> values) {
result = null
for (V value : values)
result = func(result, value)
return result
}MapReduce 可以在 process 函數(shù)里面可以定義任何數(shù)據(jù)結(jié)構(gòu),也可以將部分或全部的 values 都 cache 后再進行處理,非常靈活。而 Spark 中的 func 的輸入?yún)?shù)是固定的,一個是上一個 record 的處理結(jié)果,另一個是當前讀入的 record,它們經(jīng)過 func 處理后的結(jié)果被下一個 record 處理時使用。因此一些算法比如求平均數(shù),在 process 里面很好實現(xiàn),直接
sum(values)/values.length,而在 Spark 中 func 可以實現(xiàn)sum(values),但不好實現(xiàn)/values.length。更多的 func 將會在下面的章節(jié)細致分析。fetch 來的數(shù)據(jù)存放到哪里?剛 fetch 來的 FileSegment 存放在 softBuffer 緩沖區(qū),經(jīng)過處理后的數(shù)據(jù)放在內(nèi)存 + 磁盤上。這里我們主要討論處理后的數(shù)據(jù),可以靈活設(shè)置這些數(shù)據(jù)是“只用內(nèi)存”還是“內(nèi)存+磁盤”。如果
spark.shuffle.spill = false就只用內(nèi)存。內(nèi)存使用的是AppendOnlyMap,類似 Java 的HashMap,內(nèi)存+磁盤使用的是ExternalAppendOnlyMap,如果內(nèi)存空間不足時,ExternalAppendOnlyMap可以將 \records 進行 sort 后 spill 到磁盤上,等到需要它們的時候再進行歸并,后面會詳解。使用“內(nèi)存+磁盤”的一個主要問題就是如何在兩者之間取得平衡?在 Hadoop MapReduce 中,默認將 reducer 的 70% 的內(nèi)存空間用于存放 shuffle 來的數(shù)據(jù),等到這個空間利用率達到 66% 的時候就開始 merge-combine()-spill。在 Spark 中,也適用同樣的策略,一旦 ExternalAppendOnlyMap 達到一個閾值就開始 spill,具體細節(jié)下面會討論。 怎么獲得要 fetch 的數(shù)據(jù)的存放位置?在上一章討論物理執(zhí)行圖中的 stage 劃分的時候,我們強調(diào) “一個 ShuffleMapStage 形成后,會將該 stage 最后一個 final RDD 注冊到
MapOutputTrackerMaster.registerShuffle(shuffleId, rdd.partitions.size),這一步很重要,因為 shuffle 過程需要 MapOutputTrackerMaster 來指示 ShuffleMapTask 輸出數(shù)據(jù)的位置”。因此,reducer 在 shuffle 的時候是要去 driver 里面的 MapOutputTrackerMaster 詢問 ShuffleMapTask 輸出的數(shù)據(jù)位置的。每個 ShuffleMapTask 完成時會將 FileSegment 的存儲位置信息匯報給 MapOutputTrackerMaster。
典型 transformation() 的 shuffle read
1. reduceByKey(func)

map 端的區(qū)別:map() 沒有區(qū)別。對于 combine(),MapReduce 先 sort 再 combine(),Spark 直接在 HashMap 上進行 combine()。
reduce 端區(qū)別:MapReduce 的 shuffle 階段先 fetch 數(shù)據(jù),數(shù)據(jù)量到達一定規(guī)模后 combine(),再將剩余數(shù)據(jù) merge-sort 后 reduce(),reduce() 非常靈活。Spark 邊 fetch 邊 reduce()(在 HashMap 上執(zhí)行 func),因此要求 func 符合 commulative 的特性。
map 端區(qū)別:MapReduce 需要開一個大型環(huán)形緩沖區(qū)來暫存和排序 map() 的部分輸出結(jié)果,但 combine() 不需要額外空間(除非用戶自己定義)。Spark 需要 HashMap 內(nèi)存數(shù)據(jù)結(jié)構(gòu)來進行 combine(),同時輸出 records 到磁盤上時也需要一個小的 buffer(bucket)。
reduce 端區(qū)別:MapReduce 需要一部分內(nèi)存空間來存儲 shuffle 過來的數(shù)據(jù),combine() 和 reduce() 不需要額外空間,因為它們的輸入數(shù)據(jù)分段有序,只需歸并一下就可以得到。在 Spark 中,fetch 時需要 softBuffer,處理數(shù)據(jù)時如果只使用內(nèi)存,那么需要 HashMap 來持有處理后的結(jié)果。如果使用內(nèi)存+磁盤,那么在 HashMap 存放一部分處理后的數(shù)據(jù)。
2. groupByKey(numPartitions)

result = result ++ record.value,功能是將每個 key 對應的所有 values 鏈接在一起。result 來自 hashMap.get(record.key),計算后的 result 會再次被 put 到 hashMap 中。與 reduceByKey() 的區(qū)別就是 groupByKey() 沒有 map 端的 combine()。對于 groupByKey() 來說 map 端的 combine() 只是減少了重復 Key 占用的空間,如果 key 重復率不高,沒必要 combine(),否則,最好能夠 combine()。3. distinct(numPartitions)

result = result == null? record.value : result,如果 HashMap 中沒有該 record 就將其放入,否則舍棄。與 reduceByKey() 相同,在map 端存在 combine()。4. cogroup(otherRDD, numPartitions)

5. intersection(otherRDD) 和 join(otherRDD, numPartitions)


6. sortByKey(ascending, numPartitions)

7. coalesce(numPartitions, shuffle = true)

Shuffle read 中的 HashMap
1. AppendOnlyMap
remove(key)方法。其實現(xiàn)原理很簡單,開一個大 Object 數(shù)組,藍色部分存儲 Key,白色部分存儲 Value。如下圖:
destructiveSortedIterator(): Iterator[(K, V)] 方法,可以返回 Array 中排序后的 (K, V) pairs。實現(xiàn)方法很簡單:先將所有 (K, V) pairs compact 到 Array 的前端,并使得每個 (K, V) 占一個位置(原來占兩個),之后直接調(diào)用 Array.sort() 排序,不過這樣做會破壞數(shù)組(key 的位置變化了)。2. ExternalAppendOnlyMap

內(nèi)存剩余空間檢測
與 Hadoop MapReduce 規(guī)定 reducer 中 70% 的空間可用于 shuffle-sort 類似,Spark 也規(guī)定 executor 中
spark.shuffle.memoryFraction * spark.shuffle.safetyFraction的空間(默認是0.3 * 0.8)可用于 ExternalOnlyAppendMap。Spark 略保守是不是?更保守的是這 24% 的空間不是完全用于一個 ExternalOnlyAppendMap 的,而是由在 executor 上同時運行的所有 reducer 共享的。為此,exectuor 專門持有一個ShuffleMemroyMap: HashMap[threadId, occupiedMemory]來監(jiān)控每個 reducer 中 ExternalOnlyAppendMap 占用的內(nèi)存量。每當 AppendOnlyMap 要擴展時,都會計算 ShuffleMemroyMap 持有的所有 reducer 中的 AppendOnlyMap 已占用的內(nèi)存 + 擴展后的內(nèi)存 是會否會大于內(nèi)存限制,大于就會將 AppendOnlyMap spill 到磁盤。有一點需要注意的是前 1000 個 records 進入 AppendOnlyMap 的時候不會啟動是否要 spill 的檢查,需要擴展時就直接在內(nèi)存中擴展。AppendOnlyMap 大小估計
為了獲知 AppendOnlyMap 占用的內(nèi)存空間,可以在每次擴展時都將 AppendOnlyMap reference 的所有 objects 大小都算一遍,然后加和,但這樣做非常耗時。所以 Spark 設(shè)計了粗略的估算算法,算法時間復雜度是 O(1),核心思想是利用 AppendOnlyMap 中每次 insert-aggregate record 后 result 的大小變化及一共 insert 的 records 的個數(shù)來估算大小,具體見
SizeTrackingAppendOnlyMap和SizeEstimator。Spill 過程
與 shuffle write 一樣,在 spill records 到磁盤上的時候,會建立一個 buffer 緩沖區(qū),大小仍為
spark.shuffle.file.buffer.kb,默認是 32KB。另外,由于 serializer 也會分配緩沖區(qū)用于序列化和反序列化,所以如果一次 serialize 的 records 過多的話緩沖區(qū)會變得很大。Spark 限制每次 serialize 的 records 個數(shù)為spark.shuffle.spill.batchSize,默認是 10000。
Discussion

