解決Spark數(shù)據(jù)傾斜全面總結(jié)(收藏版)

本文轉(zhuǎn)發(fā)自技術(shù)世界,原文鏈接:http://www.jasongj.com/spark/skew/

假期發(fā)現(xiàn)一篇好文章,分享給大家。推薦收藏轉(zhuǎn)發(fā)朋友圈備用。
導(dǎo)讀
本文結(jié)合實(shí)例詳細(xì)闡明了Spark數(shù)據(jù)傾斜的幾種場景以及對應(yīng)的解決方案,包括避免數(shù)據(jù)源傾斜,調(diào)整并行度,使用自定義Partitioner,使用Map側(cè)Join代替Reduce側(cè)Join,給傾斜Key加上隨機(jī)前綴等。
為何要處理數(shù)據(jù)傾斜(Data Skew)
什么是數(shù)據(jù)傾斜
對Spark/Hadoop這樣的大數(shù)據(jù)系統(tǒng)來講,數(shù)據(jù)量大并不可怕,可怕的是數(shù)據(jù)傾斜。
何謂數(shù)據(jù)傾斜?數(shù)據(jù)傾斜指的是,并行處理的數(shù)據(jù)集中,某一部分(如Spark或Kafka的一個(gè)Partition)的數(shù)據(jù)顯著多于其它部分,從而使得該部分的處理速度成為整個(gè)數(shù)據(jù)集處理的瓶頸。
對于分布式系統(tǒng)而言,理想情況下,隨著系統(tǒng)規(guī)模(節(jié)點(diǎn)數(shù)量)的增加,應(yīng)用整體耗時(shí)線性下降。如果一臺機(jī)器處理一批大量數(shù)據(jù)需要120分鐘,當(dāng)機(jī)器數(shù)量增加到三時(shí),理想的耗時(shí)為120 / 3 = 40分鐘,如下圖所示
但是,上述情況只是理想情況,實(shí)際上將單機(jī)任務(wù)轉(zhuǎn)換成分布式任務(wù)后,會(huì)有overhead,使得總的任務(wù)量較之單機(jī)時(shí)有所增加,所以每臺機(jī)器的執(zhí)行時(shí)間加起來比單臺機(jī)器時(shí)更大。這里暫不考慮這些overhead,假設(shè)單機(jī)任務(wù)轉(zhuǎn)換成分布式任務(wù)后,總?cè)蝿?wù)量不變。
但即使如此,想做到分布式情況下每臺機(jī)器執(zhí)行時(shí)間是單機(jī)時(shí)的1 / N,就必須保證每臺機(jī)器的任務(wù)量相等。不幸的是,很多時(shí)候,任務(wù)的分配是不均勻的,甚至不均勻到大部分任務(wù)被分配到個(gè)別機(jī)器上,其它大部分機(jī)器所分配的任務(wù)量只占總得的小部分。比如一臺機(jī)器負(fù)責(zé)處理80%的任務(wù),另外兩臺機(jī)器各處理10%的任務(wù),如下圖所示
在上圖中,機(jī)器數(shù)據(jù)增加為三倍,但執(zhí)行時(shí)間只降為原來的80%,遠(yuǎn)低于理想值。
數(shù)據(jù)傾斜的危害
從上圖可見,當(dāng)出現(xiàn)數(shù)據(jù)傾斜時(shí),小量任務(wù)耗時(shí)遠(yuǎn)高于其它任務(wù),從而使得整體耗時(shí)過大,未能充分發(fā)揮分布式系統(tǒng)的并行計(jì)算優(yōu)勢。
另外,當(dāng)發(fā)生數(shù)據(jù)傾斜時(shí),部分任務(wù)處理的數(shù)據(jù)量過大,可能造成內(nèi)存不足使得任務(wù)失敗,并進(jìn)而引進(jìn)整個(gè)應(yīng)用失敗。
數(shù)據(jù)傾斜是如何造成的
在Spark中,同一個(gè)Stage的不同Partition可以并行處理,而具有依賴關(guān)系的不同Stage之間是串行處理的。假設(shè)某個(gè)Spark Job分為Stage 0和Stage 1兩個(gè)Stage,且Stage 1依賴于Stage 0,那Stage 0完全處理結(jié)束之前不會(huì)處理Stage 1。而Stage 0可能包含N個(gè)Task,這N個(gè)Task可以并行進(jìn)行。如果其中N-1個(gè)Task都在10秒內(nèi)完成,而另外一個(gè)Task卻耗時(shí)1分鐘,那該Stage的總時(shí)間至少為1分鐘。換句話說,一個(gè)Stage所耗費(fèi)的時(shí)間,主要由最慢的那個(gè)Task決定。
由于同一個(gè)Stage內(nèi)的所有Task執(zhí)行相同的計(jì)算,在排除不同計(jì)算節(jié)點(diǎn)計(jì)算能力差異的前提下,不同Task之間耗時(shí)的差異主要由該Task所處理的數(shù)據(jù)量決定。
Stage的數(shù)據(jù)來源主要分為如下兩類
從數(shù)據(jù)源直接讀取。如讀取HDFS,Kafka
讀取上一個(gè)Stage的Shuffle數(shù)據(jù)
如何緩解/消除數(shù)據(jù)傾斜
避免數(shù)據(jù)源的數(shù)據(jù)傾斜 ———— 讀Kafka
以Spark Stream通過DirectStream方式讀取Kafka數(shù)據(jù)為例。由于Kafka的每一個(gè)Partition對應(yīng)Spark的一個(gè)Task(Partition),所以Kafka內(nèi)相關(guān)Topic的各Partition之間數(shù)據(jù)是否平衡,直接決定Spark處理該數(shù)據(jù)時(shí)是否會(huì)產(chǎn)生數(shù)據(jù)傾斜。
如《Kafka設(shè)計(jì)解析(一)- Kafka背景及架構(gòu)介紹》一文所述,Kafka某一Topic內(nèi)消息在不同Partition之間的分布,主要由Producer端所使用的Partition實(shí)現(xiàn)類決定。如果使用隨機(jī)Partitioner,則每條消息會(huì)隨機(jī)發(fā)送到一個(gè)Partition中,從而從概率上來講,各Partition間的數(shù)據(jù)會(huì)達(dá)到平衡。此時(shí)源Stage(直接讀取Kafka數(shù)據(jù)的Stage)不會(huì)產(chǎn)生數(shù)據(jù)傾斜。
但很多時(shí)候,業(yè)務(wù)場景可能會(huì)要求將具備同一特征的數(shù)據(jù)順序消費(fèi),此時(shí)就需要將具有相同特征的數(shù)據(jù)放于同一個(gè)Partition中。一個(gè)典型的場景是,需要將同一個(gè)用戶相關(guān)的PV信息置于同一個(gè)Partition中。此時(shí),如果產(chǎn)生了數(shù)據(jù)傾斜,則需要通過其它方式處理。
避免數(shù)據(jù)源的數(shù)據(jù)傾斜 ———— 讀文件
原理
Spark以通過textFile(path, minPartitions)方法讀取文件時(shí),使用TextFileFormat。
對于不可切分的文件,每個(gè)文件對應(yīng)一個(gè)Split從而對應(yīng)一個(gè)Partition。此時(shí)各文件大小是否一致,很大程度上決定了是否存在數(shù)據(jù)源側(cè)的數(shù)據(jù)傾斜。另外,對于不可切分的壓縮文件,即使壓縮后的文件大小一致,它所包含的實(shí)際數(shù)據(jù)量也可能差別很多,因?yàn)樵次募?shù)據(jù)重復(fù)度越高,壓縮比越高。反過來,即使壓縮文件大小接近,但由于壓縮比可能差距很大,所需處理的數(shù)據(jù)量差距也可能很大。
此時(shí)可通過在數(shù)據(jù)生成端將不可切分文件存儲為可切分文件,或者保證各文件包含數(shù)據(jù)量相同的方式避免數(shù)據(jù)傾斜。
對于可切分的文件,每個(gè)Split大小由如下算法決定。其中g(shù)oalSize等于所有文件總大小除以minPartitions。而blockSize,如果是HDFS文件,由文件本身的block大小決定;如果是Linux本地文件,且使用本地模式,由fs.local.block.size決定。
protected long computeSplitSize(long goalSize, long minSize, long blockSize) {return Math.max(minSize, Math.min(goalSize, blockSize));}
默認(rèn)情況下各Split的大小不會(huì)太大,一般相當(dāng)于一個(gè)Block大小(在Hadoop 2中,默認(rèn)值為128MB),所以數(shù)據(jù)傾斜問題不明顯。如果出現(xiàn)了嚴(yán)重的數(shù)據(jù)傾斜,可通過上述參數(shù)調(diào)整。
案例
現(xiàn)通過腳本生成一些文本文件,并通過如下代碼進(jìn)行簡單的單詞計(jì)數(shù)。為避免Shuffle,只計(jì)單詞總個(gè)數(shù),不須對單詞進(jìn)行分組計(jì)數(shù)。
SparkConf sparkConf = new SparkConf().setAppName("ReadFileSkewDemo");JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);long count = javaSparkContext.textFile(inputFile, minPartitions).flatMap((String line) -> Arrays.asList(line.split(" ")).iterator()).count();System.out.printf("total words : %s", count);javaSparkContext.stop();
總共生成如下11個(gè)csv文件,其中10個(gè)大小均為271.9MB,另外一個(gè)大小為8.5GB。
之后將8.5GB大小的文件使用gzip壓縮,壓縮后大小僅為25.3MB。
使用如上代碼對未壓縮文件夾進(jìn)行單詞計(jì)數(shù)操作。Split大小為 max(minSize, min(goalSize, blockSize) = max(1 B, min((271.9 10+8.5?1024) / 1 MB, 128 MB) = 128MB。無明顯數(shù)據(jù)傾斜。
使用同樣代碼對包含壓縮文件的文件夾進(jìn)行同樣的單詞計(jì)數(shù)操作。未壓縮文件的Split大小仍然為128MB,而壓縮文件(gzip壓縮)由于不可切分,且大小僅為25.3MB,因此該文件作為一個(gè)單獨(dú)的Split/Partition。雖然該文件相對較小,但是它由8.5GB文件壓縮而來,包含數(shù)據(jù)量是其它未壓縮文件的32倍,因此處理該Split/Partition/文件的Task耗時(shí)為4.4分鐘,遠(yuǎn)高于其它Task的10秒。
由于上述gzip壓縮文件大小為25.3MB,小于128MB的Split大小,不能證明gzip壓縮文件不可切分。現(xiàn)將minPartitions從默認(rèn)的1設(shè)置為229,從而目標(biāo)Split大小為max(minSize, min(goalSize, blockSize) = max(1 B, min((271.9 * 10+25.3) / 229 MB, 128 MB) = 12 MB。如果gzip壓縮文件可切分,則所有Split/Partition大小都不會(huì)遠(yuǎn)大于12。反之,如果仍然存在25.3MB的Partition,則說明gzip壓縮文件確實(shí)不可切分,在生成不可切分文件時(shí)需要如上文所述保證各文件數(shù)量大大致相同。
如下圖所示,gzip壓縮文件對應(yīng)的Split/Partition大小為25.3MB,其它Split大小均為12MB左右。而該Task耗時(shí)4.7分鐘,遠(yuǎn)大于其它Task的4秒。

總結(jié)
適用場景
數(shù)據(jù)源側(cè)存在不可切分文件,且文件內(nèi)包含的數(shù)據(jù)量相差較大。
解決方案
盡量使用可切分的格式代替不可切分的格式,或者保證各文件實(shí)際包含數(shù)據(jù)量大致相同。
優(yōu)勢
可撤底消除數(shù)據(jù)源側(cè)數(shù)據(jù)傾斜,效果顯著。
劣勢
數(shù)據(jù)源一般來源于外部系統(tǒng),需要外部系統(tǒng)的支持。
調(diào)整并行度分散同一個(gè)Task的不同Key
原理
Spark在做Shuffle時(shí),默認(rèn)使用HashPartitioner(非Hash Shuffle)對數(shù)據(jù)進(jìn)行分區(qū)。如果并行度設(shè)置的不合適,可能造成大量不相同的Key對應(yīng)的數(shù)據(jù)被分配到了同一個(gè)Task上,造成該Task所處理的數(shù)據(jù)遠(yuǎn)大于其它Task,從而造成數(shù)據(jù)傾斜。
如果調(diào)整Shuffle時(shí)的并行度,使得原本被分配到同一Task的不同Key發(fā)配到不同Task上處理,則可降低原Task所需處理的數(shù)據(jù)量,從而緩解數(shù)據(jù)傾斜問題造成的短板效應(yīng)。
案例
現(xiàn)有一張測試表,名為student_external,內(nèi)有10.5億條數(shù)據(jù),每條數(shù)據(jù)有一個(gè)唯一的id值。現(xiàn)從中取出id取值為9億到10.5億的共1.5億條數(shù)據(jù),并通過一些處理,使得id為9億到9.4億間的所有數(shù)據(jù)對12取模后余數(shù)為8(即在Shuffle并行度為12時(shí)該數(shù)據(jù)集全部被HashPartition分配到第8個(gè)Task),其它數(shù)據(jù)集對其id除以100取整,從而使得id大于9.4億的數(shù)據(jù)在Shuffle時(shí)可被均勻分配到所有Task中,而id小于9.4億的數(shù)據(jù)全部分配到同一個(gè)Task中。處理過程如下
INSERT OVERWRITE TABLE testSELECT CASE WHEN id < 940000000 THEN (9500000 + (CAST (RAND() * 8 AS INTEGER)) * 12 )ELSE CAST(id/100 AS INTEGER)END,nameFROM student_externalWHERE id BETWEEN 900000000 AND 1050000000;
通過上述處理,一份可能造成后續(xù)數(shù)據(jù)傾斜的測試數(shù)據(jù)即以準(zhǔn)備好。接下來,使用Spark讀取該測試數(shù)據(jù),并通過groupByKey(12)對id分組處理,且Shuffle并行度為12。代碼如下
public class SparkDataSkew {public static void main(String[] args) {SparkSession sparkSession = SparkSession.builder().appName("SparkDataSkewTunning").config("hive.metastore.uris", "thrift://hadoop1:9083").enableHiveSupport().getOrCreate();Datasetdataframe = sparkSession.sql( "select * from test");
dataframe.toJavaRDD().mapToPair((Row row) -> new Tuple2(row.getInt(0),row.getString(1))) .groupByKey(12).mapToPair((Tuple2> tuple) -> { int id = tuple._1();AtomicInteger atomicInteger = new AtomicInteger(0);tuple._2().forEach((String name) -> atomicInteger.incrementAndGet());return new Tuple2(id, atomicInteger.get()); }).count();sparkSession.stop();sparkSession.close();}}
本次實(shí)驗(yàn)所使用集群節(jié)點(diǎn)數(shù)為4,每個(gè)節(jié)點(diǎn)可被Yarn使用的CPU核數(shù)為16,內(nèi)存為16GB。使用如下方式提交上述應(yīng)用,將啟動(dòng)4個(gè)Executor,每個(gè)Executor可使用核數(shù)為12(該配置并非生產(chǎn)環(huán)境下的最優(yōu)配置,僅用于本文實(shí)驗(yàn)),可用內(nèi)存為12GB。
spark-submit --queue ambari --num-executors 4 --executor-cores 12 --executor-memory 12g --class com.jasongj.spark.driver.SparkDataSkew --master yarn --deploy-mode client SparkExample-with-dependencies-1.0.jarGroupBy Stage的Task狀態(tài)如下圖所示,Task 8處理的記錄數(shù)為4500萬,遠(yuǎn)大于(9倍于)其它11個(gè)Task處理的500萬記錄。而Task 8所耗費(fèi)的時(shí)間為38秒,遠(yuǎn)高于其它11個(gè)Task的平均時(shí)間(16秒)。整個(gè)Stage的時(shí)間也為38秒,該時(shí)間主要由最慢的Task 8決定。
在這種情況下,可以通過調(diào)整Shuffle并行度,使得原來被分配到同一個(gè)Task(即該例中的Task 8)的不同Key分配到不同Task,從而降低Task 8所需處理的數(shù)據(jù)量,緩解數(shù)據(jù)傾斜。
通過groupByKey(48)將Shuffle并行度調(diào)整為48,重新提交到Spark。新的Job的GroupBy Stage所有Task狀態(tài)如下圖所示。
從上圖可知,記錄數(shù)最多的Task 20處理的記錄數(shù)約為1125萬,相比于并行度為12時(shí)Task 8的4500萬,降低了75%左右,而其耗時(shí)從原來Task 8的38秒降到了24秒。
在這種場景下,調(diào)整并行度,并不意味著一定要增加并行度,也可能是減小并行度。如果通過groupByKey(11)將Shuffle并行度調(diào)整為11,重新提交到Spark。新Job的GroupBy Stage的所有Task狀態(tài)如下圖所示。
從上圖可見,處理記錄數(shù)最多的Task 6所處理的記錄數(shù)約為1045萬,耗時(shí)為23秒。處理記錄數(shù)最少的Task 1處理的記錄數(shù)約為545萬,耗時(shí)12秒。
總結(jié)
適用場景
大量不同的Key被分配到了相同的Task造成該Task數(shù)據(jù)量過大。
解決方案
調(diào)整并行度。一般是增大并行度,但有時(shí)如本例減小并行度也可達(dá)到效果。
優(yōu)勢
實(shí)現(xiàn)簡單,可在需要Shuffle的操作算子上直接設(shè)置并行度或者使用spark.default.parallelism設(shè)置。如果是Spark SQL,還可通過SET spark.sql.shuffle.partitions=[num_tasks]設(shè)置并行度。可用最小的代價(jià)解決問題。一般如果出現(xiàn)數(shù)據(jù)傾斜,都可以通過這種方法先試驗(yàn)幾次,如果問題未解決,再嘗試其它方法。
劣勢
適用場景少,只能將分配到同一Task的不同Key分散開,但對于同一Key傾斜嚴(yán)重的情況該方法并不適用。并且該方法一般只能緩解數(shù)據(jù)傾斜,沒有徹底消除問題。從實(shí)踐經(jīng)驗(yàn)來看,其效果一般。
自定義Partitioner
原理
使用自定義的Partitioner(默認(rèn)為HashPartitioner),將原本被分配到同一個(gè)Task的不同Key分配到不同Task。
案例
以上述數(shù)據(jù)集為例,繼續(xù)將并發(fā)度設(shè)置為12,但是在groupByKey算子上,使用自定義的Partitioner(實(shí)現(xiàn)如下)
.groupByKey(new Partitioner() {public int numPartitions() {return 12;}public int getPartition(Object key) {int id = Integer.parseInt(key.toString());if(id >= 9500000 && id <= 9500084 && ((id - 9500000) % 12) == 0) {return (id - 9500000) / 12;} else {return id % 12;}}})
由下圖可見,使用自定義Partition后,耗時(shí)最長的Task 6處理約1000萬條數(shù)據(jù),用時(shí)15秒。并且各Task所處理的數(shù)據(jù)集大小相當(dāng)。
總結(jié)
適用場景
大量不同的Key被分配到了相同的Task造成該Task數(shù)據(jù)量過大。
解決方案
使用自定義的Partitioner實(shí)現(xiàn)類代替默認(rèn)的HashPartitioner,盡量將所有不同的Key均勻分配到不同的Task中。
優(yōu)勢
不影響原有的并行度設(shè)計(jì)。如果改變并行度,后續(xù)Stage的并行度也會(huì)默認(rèn)改變,可能會(huì)影響后續(xù)Stage。
劣勢
適用場景有限,只能將不同Key分散開,對于同一Key對應(yīng)數(shù)據(jù)集非常大的場景不適用。效果與調(diào)整并行度類似,只能緩解數(shù)據(jù)傾斜而不能完全消除數(shù)據(jù)傾斜。而且需要根據(jù)數(shù)據(jù)特點(diǎn)自定義專用的Partitioner,不夠靈活。
將Reduce side Join轉(zhuǎn)變?yōu)镸ap side Join
原理
通過Spark的Broadcast機(jī)制,將Reduce側(cè)Join轉(zhuǎn)化為Map側(cè)Join,避免Shuffle從而完全消除Shuffle帶來的數(shù)據(jù)傾斜。
案例
通過如下SQL創(chuàng)建一張具有傾斜Key且總記錄數(shù)為1.5億的大表test。
INSERT OVERWRITE TABLE testSELECT CAST(CASE WHEN id < 980000000 THEN (95000000 + (CAST (RAND() * 4 AS INT) + 1) * 48 )ELSE CAST(id/10 AS INT) END AS STRING),nameFROM student_externalWHERE id BETWEEN 900000000 AND 1050000000;
使用如下SQL創(chuàng)建一張數(shù)據(jù)分布均勻且總記錄數(shù)為50萬的小表test_new。
INSERT OVERWRITE TABLE test_newSELECT CAST(CAST(id/10 AS INT) AS STRING),nameFROM student_delta_externalWHERE id BETWEEN 950000000 AND 950500000;
直接通過Spark Thrift Server提交如下SQL將表test與表test_new進(jìn)行Join并將Join結(jié)果存于表test_join中。
INSERT OVERWRITE TABLE test_joinSELECT test_new.id, test_new.nameFROM testJOIN test_newON test.id = test_new.id;
該SQL對應(yīng)的DAG如下圖所示。從該圖可見,該執(zhí)行過程總共分為三個(gè)Stage,前兩個(gè)用于從Hive中讀取數(shù)據(jù),同時(shí)二者進(jìn)行Shuffle,通過最后一個(gè)Stage進(jìn)行Join并將結(jié)果寫入表test_join中。
從下圖可見,Join Stage各Task處理的數(shù)據(jù)傾斜嚴(yán)重,處理數(shù)據(jù)量最大的Task耗時(shí)7.1分鐘,遠(yuǎn)高于其它無數(shù)據(jù)傾斜的Task約2秒的耗時(shí)。
接下來,嘗試通過Broadcast實(shí)現(xiàn)Map側(cè)Join。實(shí)現(xiàn)Map側(cè)Join的方法,并非直接通過CACHE TABLE test_new將小表test_new進(jìn)行cache。現(xiàn)通過如下SQL進(jìn)行Join。
CACHE TABLE test_new;INSERT OVERWRITE TABLE test_joinSELECT test_new.id, test_new.nameFROM testJOIN test_newON test.id = test_new.id;
通過如下DAG圖可見,該操作仍分為三個(gè)Stage,且仍然有Shuffle存在,唯一不同的是,小表的讀取不再直接掃描Hive表,而是掃描內(nèi)存中緩存的表。
并且數(shù)據(jù)傾斜仍然存在。如下圖所示,最慢的Task耗時(shí)為7.1分鐘,遠(yuǎn)高于其它Task的約2秒。

正確的使用Broadcast實(shí)現(xiàn)Map側(cè)Join的方式是,通過SET spark.sql.autoBroadcastJoinThreshold=104857600;將Broadcast的閾值設(shè)置得足夠大。
再次通過如下SQL進(jìn)行Join。
SET spark.sql.autoBroadcastJoinThreshold=104857600;INSERT OVERWRITE TABLE test_joinSELECT test_new.id, test_new.nameFROM testJOIN test_newON test.id = test_new.id;
通過如下DAG圖可見,該方案只包含一個(gè)Stage。
并且從下圖可見,各Task耗時(shí)相當(dāng),無明顯數(shù)據(jù)傾斜現(xiàn)象。并且總耗時(shí)為1.5分鐘,遠(yuǎn)低于Reduce側(cè)Join的7.3分鐘。
總結(jié)
適用場景
參與Join的一邊數(shù)據(jù)集足夠小,可被加載進(jìn)Driver并通過Broadcast方法廣播到各個(gè)Executor中。
解決方案
在Java/Scala代碼中將小數(shù)據(jù)集數(shù)據(jù)拉取到Driver,然后通過Broadcast方案將小數(shù)據(jù)集的數(shù)據(jù)廣播到各Executor。或者在使用SQL前,將Broadcast的閾值調(diào)整得足夠大,從而使用Broadcast生效。進(jìn)而將Reduce側(cè)Join替換為Map側(cè)Join。
優(yōu)勢
避免了Shuffle,徹底消除了數(shù)據(jù)傾斜產(chǎn)生的條件,可極大提升性能。
劣勢
要求參與Join的一側(cè)數(shù)據(jù)集足夠小,并且主要適用于Join的場景,不適合聚合的場景,適用條件有限。
為skew的key增加隨機(jī)前/后綴
原理
為數(shù)據(jù)量特別大的Key增加隨機(jī)前/后綴,使得原來Key相同的數(shù)據(jù)變?yōu)镵ey不相同的數(shù)據(jù),從而使傾斜的數(shù)據(jù)集分散到不同的Task中,徹底解決數(shù)據(jù)傾斜問題。Join另一則的數(shù)據(jù)中,與傾斜Key對應(yīng)的部分?jǐn)?shù)據(jù),與隨機(jī)前綴集作笛卡爾乘積,從而保證無論數(shù)據(jù)傾斜側(cè)傾斜Key如何加前綴,都能與之正常Join。
案例
通過如下SQL,將id為9億到9.08億共800萬條數(shù)據(jù)的id轉(zhuǎn)為9500048或者9500096,其它數(shù)據(jù)的id除以100取整。從而該數(shù)據(jù)集中,id為9500048和9500096的數(shù)據(jù)各400萬,其它id對應(yīng)的數(shù)據(jù)記錄數(shù)均為100條。這些數(shù)據(jù)存于名為test的表中。
對于另外一張小表test_new,取出50萬條數(shù)據(jù),并將id(遞增且唯一)除以100取整,使得所有id都對應(yīng)100條數(shù)據(jù)。
INSERT OVERWRITE TABLE testSELECT CAST(CASE WHEN id < 908000000 THEN (9500000 + (CAST (RAND() * 2 AS INT) + 1) * 48 )ELSE CAST(id/100 AS INT) END AS STRING),nameFROM student_externalWHERE id BETWEEN 900000000 AND 1050000000;INSERT OVERWRITE TABLE test_newSELECT CAST(CAST(id/100 AS INT) AS STRING),nameFROM student_delta_externalWHERE id BETWEEN 950000000 AND 950500000;
通過如下代碼,讀取test表對應(yīng)的文件夾內(nèi)的數(shù)據(jù)并轉(zhuǎn)換為JavaPairRDD存于leftRDD中,同樣讀取test表對應(yīng)的數(shù)據(jù)存于rightRDD中。通過RDD的join算子對leftRDD與rightRDD進(jìn)行Join,并指定并行度為48。
public class SparkDataSkew{public static void main(String[] args) {SparkConf sparkConf = new SparkConf();sparkConf.setAppName("DemoSparkDataFrameWithSkewedBigTableDirect");sparkConf.set("spark.default.parallelism", String.valueOf(parallelism));JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);JavaPairRDD<String, String> leftRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test/").mapToPair((String row) -> {String[] str = row.split(",");return new Tuple2<String, String>(str[0], str[1]);});JavaPairRDD<String, String> rightRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test_new/").mapToPair((String row) -> {String[] str = row.split(",");return new Tuple2<String, String>(str[0], str[1]);});leftRDD.join(rightRDD, parallelism).mapToPair((Tuple2<String, Tuple2<String, String>> tuple) -> new Tuple2<String, String>(tuple._1(), tuple._2()._2())).foreachPartition((IteratorString , String>> iterator) -> {AtomicInteger atomicInteger = new AtomicInteger();iterator.forEachRemaining((Tuple2<String, String> tuple) -> atomicInteger.incrementAndGet());});javaSparkContext.stop();javaSparkContext.close();}}
從下圖可看出,整個(gè)Join耗時(shí)1分54秒,其中Join Stage耗時(shí)1.7分鐘。

通過分析Join Stage的所有Task可知,在其它Task所處理記錄數(shù)為192.71萬的同時(shí)Task 32的處理的記錄數(shù)為992.72萬,故它耗時(shí)為1.7分鐘,遠(yuǎn)高于其它Task的約10秒。這與上文準(zhǔn)備數(shù)據(jù)集時(shí),將id為9500048為9500096對應(yīng)的數(shù)據(jù)量設(shè)置非常大,其它id對應(yīng)的數(shù)據(jù)集非常均勻相符合。
現(xiàn)通過如下操作,實(shí)現(xiàn)傾斜Key的分散處理
將leftRDD中傾斜的key(即9500048與9500096)對應(yīng)的數(shù)據(jù)單獨(dú)過濾出來,且加上1到24的隨機(jī)前綴,并將前綴與原數(shù)據(jù)用逗號分隔(以方便之后去掉前綴)形成單獨(dú)的leftSkewRDD
將rightRDD中傾斜key對應(yīng)的數(shù)據(jù)抽取出來,并通過flatMap操作將該數(shù)據(jù)集中每條數(shù)據(jù)均轉(zhuǎn)換為24條數(shù)據(jù)(每條分別加上1到24的隨機(jī)前綴),形成單獨(dú)的rightSkewRDD
將leftSkewRDD與rightSkewRDD進(jìn)行Join,并將并行度設(shè)置為48,且在Join過程中將隨機(jī)前綴去掉,得到傾斜數(shù)據(jù)集的Join結(jié)果skewedJoinRDD
將leftRDD中不包含傾斜Key的數(shù)據(jù)抽取出來作為單獨(dú)的leftUnSkewRDD
對leftUnSkewRDD與原始的rightRDD進(jìn)行Join,并行度也設(shè)置為48,得到Join結(jié)果unskewedJoinRDD
通過union算子將skewedJoinRDD與unskewedJoinRDD進(jìn)行合并,從而得到完整的Join結(jié)果集
具體實(shí)現(xiàn)代碼如下
public class SparkDataSkew{public static void main(String[] args) {int parallelism = 48;SparkConf sparkConf = new SparkConf();sparkConf.setAppName("SolveDataSkewWithRandomPrefix");sparkConf.set("spark.default.parallelism", parallelism + "");JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);JavaPairRDD<String, String> leftRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test/").mapToPair((String row) -> {String[] str = row.split(",");return new Tuple2<String, String>(str[0], str[1]);});JavaPairRDD<String, String> rightRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test_new/").mapToPair((String row) -> {String[] str = row.split(",");return new Tuple2<String, String>(str[0], str[1]);});String[] skewedKeyArray = new String[]{"9500048", "9500096"};Set<String> skewedKeySet = new HashSet<String>();List<String> addList = new ArrayList<String>();for(int i = 1; i <=24; i++) {addList.add(i + "");}for(String key : skewedKeyArray) {skewedKeySet.add(key);}Broadcast<Set<String>> skewedKeys = javaSparkContext.broadcast(skewedKeySet);BroadcastString
>> addListKeys = javaSparkContext.broadcast(addList);JavaPairRDD<String, String> leftSkewRDD = leftRDD.filter((Tuple2<String, String> tuple) -> skewedKeys.value().contains(tuple._1())).mapToPair((Tuple2<String, String> tuple) -> new Tuple2<String, String>((new Random().nextInt(24) + 1) + "," + tuple._1(), tuple._2()));JavaPairRDD<String, String> rightSkewRDD = rightRDD.filter((Tuple2<String, String> tuple) -> skewedKeys.value().contains(tuple._1())).flatMapToPair((Tuple2<String, String> tuple) -> addListKeys.value().stream().map((String i) -> new Tuple2<String, String>( i + "," + tuple._1(), tuple._2())).collect(Collectors.toList()).iterator());JavaPairRDD<String, String> skewedJoinRDD = leftSkewRDD.join(rightSkewRDD, parallelism).mapToPair((Tuple2<String, Tuple2<String, String>> tuple) -> new Tuple2<String, String>(tuple._1().split(",")[1], tuple._2()._2()));JavaPairRDD<String, String> leftUnSkewRDD = leftRDD.filter((Tuple2<String, String> tuple) -> !skewedKeys.value().contains(tuple._1()));JavaPairRDD<String, String> unskewedJoinRDD = leftUnSkewRDD.join(rightRDD, parallelism).mapToPair((Tuple2<String, Tuple2<String, String>> tuple) -> new Tuple2<String, String>(tuple._1(), tuple._2()._2()));skewedJoinRDD.union(unskewedJoinRDD).foreachPartition((IteratorString , String>> iterator) -> {AtomicInteger atomicInteger = new AtomicInteger();iterator.forEachRemaining((Tuple2<String, String> tuple) -> atomicInteger.incrementAndGet());});javaSparkContext.stop();javaSparkContext.close();}}
從下圖可看出,整個(gè)Join耗時(shí)58秒,其中Join Stage耗時(shí)33秒。
通過分析Join Stage的所有Task可知
由于Join分傾斜數(shù)據(jù)集Join和非傾斜數(shù)據(jù)集Join,而各Join的并行度均為48,故總的并行度為96
由于提交任務(wù)時(shí),設(shè)置的Executor個(gè)數(shù)為4,每個(gè)Executor的core數(shù)為12,故可用Core數(shù)為48,所以前48個(gè)Task同時(shí)啟動(dòng)(其Launch時(shí)間相同),后48個(gè)Task的啟動(dòng)時(shí)間各不相同(等待前面的Task結(jié)束才開始)
由于傾斜Key被加上隨機(jī)前綴,原本相同的Key變?yōu)椴煌腒ey,被分散到不同的Task處理,故在所有Task中,未發(fā)現(xiàn)所處理數(shù)據(jù)集明顯高于其它Task的情況

實(shí)際上,由于傾斜Key與非傾斜Key的操作完全獨(dú)立,可并行進(jìn)行。而本實(shí)驗(yàn)受限于可用總核數(shù)為48,可同時(shí)運(yùn)行的總Task數(shù)為48,故而該方案只是將總耗時(shí)減少一半(效率提升一倍)。如果資源充足,可并發(fā)執(zhí)行Task數(shù)增多,該方案的優(yōu)勢將更為明顯。在實(shí)際項(xiàng)目中,該方案往往可提升數(shù)倍至10倍的效率。
總結(jié)
適用場景
兩張表都比較大,無法使用Map則Join。其中一個(gè)RDD有少數(shù)幾個(gè)Key的數(shù)據(jù)量過大,另外一個(gè)RDD的Key分布較為均勻。
解決方案
將有數(shù)據(jù)傾斜的RDD中傾斜Key對應(yīng)的數(shù)據(jù)集單獨(dú)抽取出來加上隨機(jī)前綴,另外一個(gè)RDD每條數(shù)據(jù)分別與隨機(jī)前綴結(jié)合形成新的RDD(相當(dāng)于將其數(shù)據(jù)增到到原來的N倍,N即為隨機(jī)前綴的總個(gè)數(shù)),然后將二者Join并去掉前綴。然后將不包含傾斜Key的剩余數(shù)據(jù)進(jìn)行Join。最后將兩次Join的結(jié)果集通過union合并,即可得到全部Join結(jié)果。
優(yōu)勢
相對于Map則Join,更能適應(yīng)大數(shù)據(jù)集的Join。如果資源充足,傾斜部分?jǐn)?shù)據(jù)集與非傾斜部分?jǐn)?shù)據(jù)集可并行進(jìn)行,效率提升明顯。且只針對傾斜部分的數(shù)據(jù)做數(shù)據(jù)擴(kuò)展,增加的資源消耗有限。
劣勢
如果傾斜Key非常多,則另一側(cè)數(shù)據(jù)膨脹非常大,此方案不適用。而且此時(shí)對傾斜Key與非傾斜Key分開處理,需要掃描數(shù)據(jù)集兩遍,增加了開銷。
大表隨機(jī)添加N種隨機(jī)前綴,小表擴(kuò)大N倍
原理
如果出現(xiàn)數(shù)據(jù)傾斜的Key比較多,上一種方法將這些大量的傾斜Key分拆出來,意義不大。此時(shí)更適合直接對存在數(shù)據(jù)傾斜的數(shù)據(jù)集全部加上隨機(jī)前綴,然后對另外一個(gè)不存在嚴(yán)重?cái)?shù)據(jù)傾斜的數(shù)據(jù)集整體與隨機(jī)前綴集作笛卡爾乘積(即將數(shù)據(jù)量擴(kuò)大N倍)。
案例
這里給出示例代碼,讀者可參考上文中分拆出少數(shù)傾斜Key添加隨機(jī)前綴的方法,自行測試。
public class SparkDataSkew {public static void main(String[] args) {SparkConf sparkConf = new SparkConf();sparkConf.setAppName("ResolveDataSkewWithNAndRandom");sparkConf.set("spark.default.parallelism", parallelism + "");JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);JavaPairRDD<String, String> leftRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test/").mapToPair((String row) -> {String[] str = row.split(",");return new Tuple2<String, String>(str[0], str[1]);});JavaPairRDD<String, String> rightRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test_new/").mapToPair((String row) -> {String[] str = row.split(",");return new Tuple2<String, String>(str[0], str[1]);});List<String> addList = new ArrayList<String>();for(int i = 1; i <=48; i++) {addList.add(i + "");}BroadcastString
>> addListKeys = javaSparkContext.broadcast(addList);JavaPairRDD<String, String> leftRandomRDD = leftRDD.mapToPair((Tuple2<String, String> tuple) -> new Tuple2<String, String>(new Random().nextInt(48) + "," + tuple._1(), tuple._2()));JavaPairRDD<String, String> rightNewRDD = rightRDD.flatMapToPair((Tuple2<String, String> tuple) -> addListKeys.value().stream().map((String i) -> new Tuple2<String, String>( i + "," + tuple._1(), tuple._2())).collect(Collectors.toList()).iterator());JavaPairRDD<String, String> joinRDD = leftRandomRDD.join(rightNewRDD, parallelism).mapToPair((Tuple2<String, Tuple2<String, String>> tuple) -> new Tuple2<String, String>(tuple._1().split(",")[1], tuple._2()._2()));joinRDD.foreachPartition((IteratorString , String>> iterator) -> {AtomicInteger atomicInteger = new AtomicInteger();iterator.forEachRemaining((Tuple2<String, String> tuple) -> atomicInteger.incrementAndGet());});javaSparkContext.stop();javaSparkContext.close();}}
總結(jié)
適用場景
一個(gè)數(shù)據(jù)集存在的傾斜Key比較多,另外一個(gè)數(shù)據(jù)集數(shù)據(jù)分布比較均勻。
優(yōu)勢
對大部分場景都適用,效果不錯(cuò)。
劣勢
需要將一個(gè)數(shù)據(jù)集整體擴(kuò)大N倍,會(huì)增加資源消耗。
總結(jié)
對于數(shù)據(jù)傾斜,并無一個(gè)統(tǒng)一的一勞永逸的方法。更多的時(shí)候,是結(jié)合數(shù)據(jù)特點(diǎn)(數(shù)據(jù)集大小,傾斜Key的多少等)綜合使用上文所述的多種方法。
??著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者!
