<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          4w字Spark調(diào)優(yōu)寶典(推薦收藏)

          共 39933字,需瀏覽 80分鐘

           ·

          2021-06-27 13:57


          1
             性能調(diào)優(yōu)


          1
          分配更多資源


          分配哪些資源?

          Executor的數(shù)量

          每個Executor所能分配的CPU數(shù)量

          每個Executor所能分配的內(nèi)存量

          Driver端分配的內(nèi)存數(shù)量


          在哪里分配這些資源?

          在生產(chǎn)環(huán)境中,提交spark作業(yè)時,用的spark-submit shell腳本,里面調(diào)整對應(yīng)的參數(shù):

          /usr/local/spark/bin/spark-submit \--class cn.spark.sparktest.core.WordCountCluster \--num-executors 3 \  配置executor的數(shù)量--driver-memory 100m \  配置driver的內(nèi)存(影響不大)--executor-memory 100m \  配置每個executor的內(nèi)存大小--total-executor-cores 3 \  配置所有executor的cpu core數(shù)量/usr/local/SparkTest-0.0.1-SNAPSHOT-jar-with-dependencies.jar \


          調(diào)節(jié)到多大,算是最大呢?

          常用的資源調(diào)度模式有Spark Standalone和Spark On Yarn。比如說你的每臺機(jī)器能夠給你使用60G內(nèi)存,10個cpu core,20臺機(jī)器。那么executor的數(shù)量是20。平均每個executor所能分配60G內(nèi)存和10個cpu core。

           

          為什么多分配了這些資源以后,性能會得到提升?

          ? 增加executor:

          如果executor數(shù)量比較少,那么,能夠并行執(zhí)行的task數(shù)量就比較少,就意味著,我們的Application的并行執(zhí)行的能力就很弱。

          比如有3個executor,每個executor有2個cpu core,那么同時能夠并行執(zhí)行的task,就是6個。6個執(zhí)行完以后,再換下一批6個task。

          增加了executor數(shù)量以后,那么,就意味著,能夠并行執(zhí)行的task數(shù)量,也就變多了。比如原先是6個,現(xiàn)在可能可以并行執(zhí)行10個,甚至20個,100個。那么并行能力就比之前提升了數(shù)倍,數(shù)十倍。相應(yīng)的,性能(執(zhí)行的速度),也能提升數(shù)倍~數(shù)十倍。

           

          ? 增加每個executor的cpu core,也是增加了執(zhí)行的并行能力。原本20個executor,每個才2個cpu core。能夠并行執(zhí)行的task數(shù)量,就是40個task。

          現(xiàn)在每個executor的cpu core,增加到了4個。能夠并行執(zhí)行的task數(shù)量,就是80個task。就物理性能來看,執(zhí)行的速度,提升了2倍。

          ? 增加每個executor的內(nèi)存量。增加了內(nèi)存量以后,對性能的提升,有三點:

          1、如果需要對RDD進(jìn)行cache,那么更多的內(nèi)存,就可以緩存更多的數(shù)據(jù),將更少的數(shù)據(jù)寫入磁盤,甚至不寫入磁盤。減少了磁盤IO。

          2、對于shuffle操作,reduce端,會需要內(nèi)存來存放拉取的數(shù)據(jù)并進(jìn)行聚合。如果內(nèi)存不夠,也會寫入磁盤。如果給executor分配更多內(nèi)存以后,就有更少的數(shù)據(jù),需要寫入磁盤,甚至不需要寫入磁盤。減少了磁盤IO,提升了性能。

          3、對于task的執(zhí)行,可能會創(chuàng)建很多對象。如果內(nèi)存比較小,可能會頻繁導(dǎo)致JVM堆內(nèi)存滿了,然后頻繁GC,垃圾回收,minor GC和full GC。(速度很慢)。內(nèi)存加大以后,帶來更少的GC,垃圾回收,避免了速度變慢,速度變快了。


          2
          調(diào)節(jié)并行度

          并行度的概念

          就是指的是Spark作業(yè)中,各個stage的task數(shù)量,代表了Spark作業(yè)的在各個階段(stage)的并行度。


          如果不調(diào)節(jié)并行度,導(dǎo)致并行度過低,會怎么樣?

          比如現(xiàn)在spark-submit腳本里面,給我們的spark作業(yè)分配了足夠多的資源,比如50個executor,每個executor有10G內(nèi)存,每個executor有3個cpu core。基本已經(jīng)達(dá)到了集群或者yarn隊列的資源上限。task沒有設(shè)置,或者設(shè)置的很少,比如就設(shè)置了100個task,你的Application任何一個stage運(yùn)行的時候,都有總數(shù)在150個cpu core,可以并行運(yùn)行。但是你現(xiàn)在,只有100個task,平均分配一下,每個executor分配到2個task,ok,那么同時在運(yùn)行的task,只有100個,每個executor只會并行運(yùn)行2個task。每個executor剩下的一個cpu core, 就浪費掉了。

          你的資源雖然分配足夠了,但是問題是,并行度沒有與資源相匹配,導(dǎo)致你分配下去的資源都浪費掉了。

          合理的并行度的設(shè)置,應(yīng)該是要設(shè)置的足夠大,大到可以完全合理的利用你的集群資源。比如上面的例子,總共集群有150個cpu core,可以并行運(yùn)行150個task。那么就應(yīng)該將你的Application的并行度,至少設(shè)置成150,才能完全有效的利用你的集群資源,讓150個task,并行執(zhí)行。而且task增加到150個以后,即可以同時并行運(yùn)行,還可以讓每個task要處理的數(shù)據(jù)量變少。比如總共150G的數(shù)據(jù)要處理,如果是100個task,每個task計算1.5G的數(shù)據(jù),現(xiàn)在增加到150個task,可以并行運(yùn)行,而且每個task主要處理1G的數(shù)據(jù)就可以。

          很簡單的道理,只要合理設(shè)置并行度,就可以完全充分利用你的集群計算資源,并且減少每個task要處理的數(shù)據(jù)量,最終,就是提升你的整個Spark作業(yè)的性能和運(yùn)行速度。


          設(shè)置并行度

          1)、task數(shù)量,至少設(shè)置成與Spark application的總cpu core數(shù)量相同(最理想情況,比如總共150個cpu core,分配了150個task,一起運(yùn)行,差不多同一時間運(yùn)行完畢)。

          2)、官方是推薦,task數(shù)量,設(shè)置成spark application總cpu core數(shù)量的2~3倍,比如150個cpu core,基本要設(shè)置task數(shù)量為300~500。

          實際情況,與理想情況不同的,有些task會運(yùn)行的快一點,比如50s就完了,有些task,可能會慢一點,要1分半才運(yùn)行完,所以如果你的task數(shù)量,剛好設(shè)置的跟cpu core數(shù)量相同,可能還是會導(dǎo)致資源的浪費。比如150個task,10個先運(yùn)行完了,剩余140個還在運(yùn)行,但是這個時候,有10個cpu core就空閑出來了,就導(dǎo)致了浪費。那如果task數(shù)量設(shè)置成cpu core總數(shù)的2~3倍,那么一個task運(yùn)行完了以后,另一個task馬上可以補(bǔ)上來,就盡量讓cpu core不要空閑,同時也是盡量提升spark作業(yè)運(yùn)行的效率和速度,提升性能。

          3)、如何設(shè)置一個Spark Application的并行度?

          spark.default.parallelismSparkConf conf = new SparkConf()  .set("spark.default.parallelism", "500")


          3
          重構(gòu)RDD架構(gòu)以及RDD持久化


          RDD架構(gòu)重構(gòu)與優(yōu)化

          盡量去復(fù)用RDD,差不多的RDD,可以抽取成為一個共同的RDD,供后面的RDD計算時,反復(fù)使用。


          公共RDD一定要實現(xiàn)持久化

          對于要多次計算和使用的公共RDD,一定要進(jìn)行持久化。

          持久化,就是將RDD的數(shù)據(jù)緩存到內(nèi)存中/磁盤中(BlockManager)以后無論對這個RDD做多少次計算,那么都是直接取這個RDD的持久化的數(shù)據(jù),比如從內(nèi)存中或者磁盤中,直接提取一份數(shù)據(jù)。


          持久化,是可以進(jìn)行序列化的

          如果正常將數(shù)據(jù)持久化在內(nèi)存中,那么可能會導(dǎo)致內(nèi)存的占用過大,這樣的話,也許,會導(dǎo)致OOM內(nèi)存溢出。

          當(dāng)純內(nèi)存無法支撐公共RDD數(shù)據(jù)完全存放的時候,就優(yōu)先考慮使用序列化的方式在純內(nèi)存中存儲。將RDD的每個partition的數(shù)據(jù),序列化成一個大的字節(jié)數(shù)組,就一個對象。序列化后,大大減少內(nèi)存的空間占用。

          序列化的方式,唯一的缺點就是,在獲取數(shù)據(jù)的時候,需要反序列化。

          如果序列化純內(nèi)存方式,還是導(dǎo)致OOM內(nèi)存溢出,就只能考慮磁盤的方式、內(nèi)存+磁盤的普通方式(無序列化)、內(nèi)存+磁盤(序列化)。


          為了數(shù)據(jù)的高可靠性,而且內(nèi)存充足,可以使用雙副本機(jī)制,進(jìn)行持久化。

          持久化的雙副本機(jī)制,持久化后的一個副本,因為機(jī)器宕機(jī)了,副本丟了,就還是得重新計算一次。持久化的每個數(shù)據(jù)單元,存儲一份副本,放在其他節(jié)點上面。從而進(jìn)行容錯。一個副本丟了,不用重新計算,還可以使用另外一份副本。這種方式,僅僅針對你的內(nèi)存資源極度充足的情況。


          4
          廣播變量


          概念及需求

          Spark Application(我們自己寫的Spark作業(yè))最開始在Driver端,在我們提交任務(wù)的時候,需要傳遞到各個Executor的Task上運(yùn)行。對于一些只讀、固定的數(shù)據(jù)(比如從DB中讀出的數(shù)據(jù)),每次都需要Driver廣播到各個Task上,這樣效率低下。廣播變量允許將變量只廣播(提前廣播)給各個Executor。該Executor上的各個Task再從所在節(jié)點的BlockManager獲取變量,如果本地沒有,那么就從Driver遠(yuǎn)程拉取變量副本,并保存在本地的BlockManager中。此后這個executor上的task,都會直接使用本地的BlockManager中的副本。而不是從Driver獲取變量,從而提升了效率。

          一個Executor只需要在第一個Task啟動時,獲得一份Broadcast數(shù)據(jù),之后的Task都從本節(jié)點的BlockManager中獲取相關(guān)數(shù)據(jù)。


          使用方法

          1)調(diào)用SparkContext.broadcast方法創(chuàng)建一個Broadcast[T]對象。任何序列化的類型都可以這么實現(xiàn)。

          2)通過value方法訪問該對象的值。

          3)變量只會被發(fā)送到各個節(jié)點一次,應(yīng)作為只讀值處理(修改這個值不會影響到別的節(jié)點)


          5
          使用Kryo序列化


          概念及需求

          默認(rèn)情況下,Spark內(nèi)部是使用Java的序列化機(jī)制,ObjectOutputStream / ObjectInputStream,對象輸入輸出流機(jī)制,來進(jìn)行序列化。

          這種默認(rèn)序列化機(jī)制的好處在于,處理起來比較方便,也不需要我們手動去做什么事情,只是,你在算子里面使用的變量,必須是實現(xiàn)Serializable接口的,可序列化即可。

          但是缺點在于,默認(rèn)的序列化機(jī)制的效率不高,序列化的速度比較慢,序列化以后的數(shù)據(jù),占用的內(nèi)存空間相對還是比較大。

          Spark支持使用Kryo序列化機(jī)制。這種序列化機(jī)制,比默認(rèn)的Java序列化機(jī)制速度要快,序列化后的數(shù)據(jù)更小,大概是Java序列化機(jī)制的1/10。

          所以Kryo序列化優(yōu)化以后,可以讓網(wǎng)絡(luò)傳輸?shù)臄?shù)據(jù)變少,在集群中耗費的內(nèi)存資源大大減少。


          Kryo序列化機(jī)制啟用以后生效的幾個地方

          1)、算子函數(shù)中使用到的外部變量,使用Kryo以后:優(yōu)化網(wǎng)絡(luò)傳輸?shù)男阅埽梢詢?yōu)化集群中內(nèi)存的占用和消耗。

          2)、持久化RDD,優(yōu)化內(nèi)存的占用和消耗。持久化RDD占用的內(nèi)存越少,task執(zhí)行的時候,創(chuàng)建的對象,就不至于頻繁的占滿內(nèi)存,頻繁發(fā)生GC。

          3)、shuffle:可以優(yōu)化網(wǎng)絡(luò)傳輸?shù)男阅堋?/span>


          使用方法

          第一步,在SparkConf中設(shè)置一個屬性,spark.serializer,org.apache.spark.serializer.KryoSerializer類。

          第二步,注冊你使用的需要通過Kryo序列化的一些自定義類,SparkConf.registerKryoClasses()。

          項目中的使用:

          .set("spark.serializer""org.apache.spark.serializer.KryoSerializer").registerKryoClasses(new Class[]{CategorySortKey.class})


          6
          使用fastutil優(yōu)化數(shù)據(jù)格式


          fastutil介紹

          fastutil是擴(kuò)展了Java標(biāo)準(zhǔn)集合框架(Map、List、Set。HashMap、ArrayList、HashSet)的類庫,提供了特殊類型的map、set、list和queue。

          fastutil能夠提供更小的內(nèi)存占用,更快的存取速度。我們使用fastutil提供的集合類,來替代自己平時使用的JDK的原生的Map、List、Set,好處在于fastutil集合類可以減小內(nèi)存的占用,并且在進(jìn)行集合的遍歷、根據(jù)索引(或者key)獲取元素的值和設(shè)置元素的值的時候,提供更快的存取速度。

          fastutil也提供了64位的array、set和list,以及高性能快速的,以及實用的IO類,來處理二進(jìn)制和文本類型的文件。

          fastutil最新版本要求Java 7以及以上版本。

          fastutil的每一種集合類型,都實現(xiàn)了對應(yīng)的Java中的標(biāo)準(zhǔn)接口(比如fastutil的map,實現(xiàn)了Java的Map接口),因此可以直接放入已有系統(tǒng)的任何代碼中。

          fastutil還提供了一些JDK標(biāo)準(zhǔn)類庫中沒有的額外功能(比如雙向迭代器)。

          fastutil除了對象和原始類型為元素的集合,fastutil也提供引用類型的支持,但是對引用類型是使用等于號(=)進(jìn)行比較的,而不是equals()方法。

          fastutil盡量提供了在任何場景下都是速度最快的集合類庫。


          Spark中應(yīng)用fastutil的場景

          1)、如果算子函數(shù)使用了外部變量。第一,你可以使用Broadcast廣播變量優(yōu)化。第二,可以使用Kryo序列化類庫,提升序列化性能和效率。第三,如果外部變量是某種比較大的集合,那么可以考慮使用fastutil改寫外部變量,首先從源頭上就減少內(nèi)存的占用,通過廣播變量進(jìn)一步減少內(nèi)存占用,再通過Kryo序列化類庫進(jìn)一步減少內(nèi)存占用。

          2)、在你的算子函數(shù)里,也就是task要執(zhí)行的計算邏輯里面,如果有邏輯中,出現(xiàn),要創(chuàng)建比較大的Map、List等集合,可能會占用較大的內(nèi)存空間,而且可能涉及到消耗性能的遍歷、存取等集合操作,此時,可以考慮將這些集合類型使用fastutil類庫重寫,使用了fastutil集合類以后,就可以在一定程度上,減少task創(chuàng)建出來的集合類型的內(nèi)存占用。避免executor內(nèi)存頻繁占滿,頻繁喚起GC,導(dǎo)致性能下降。


          關(guān)于fastutil調(diào)優(yōu)的說明

          fastutil其實沒有你想象中的那么強(qiáng)大,也不會跟官網(wǎng)上說的效果那么一鳴驚人。廣播變量、Kryo序列化類庫、fastutil,都是之前所說的,對于性能來說,類似于一種調(diào)味品,烤雞,本來就很好吃了,然后加了一點特質(zhì)的孜然麻辣粉調(diào)料,就更加好吃了一點。分配資源、并行度、RDD架構(gòu)與持久化,這三個就是烤雞。broadcast、kryo、fastutil,類似于調(diào)料。

          比如說,你的spark作業(yè),經(jīng)過之前一些調(diào)優(yōu)以后,大概30分鐘運(yùn)行完,現(xiàn)在加上broadcast、kryo、fastutil,也許就是優(yōu)化到29分鐘運(yùn)行完、或者更好一點,也許就是28分鐘、25分鐘。

          shuffle調(diào)優(yōu),15分鐘。groupByKey用reduceByKey改寫,執(zhí)行本地聚合,也許10分鐘。跟公司申請更多的資源,比如資源更大的YARN隊列,1分鐘。


          fastutil的使用

          在pom.xml中引用fastutil的包

          <dependency>    <groupId>fastutil</groupId>    <artifactId>fastutil</artifactId>    <version>5.0.9</version></dependency>

          速度比較慢,可能是從國外的網(wǎng)去拉取jar包,可能要等待5分鐘,甚至幾十分鐘,不等

          List<Integer> 相當(dāng)于 IntList

          基本都是類似于IntList的格式,前綴就是集合的元素類型。特殊的就是Map,Int2IntMap,代表了key-value映射的元素類型。除此之外,還支持object、reference。


          7
          調(diào)節(jié)數(shù)據(jù)本地化等待時長


          tasklocality有五種

          1)、PROCESS_LOCAL:進(jìn)程本地化,代碼和數(shù)據(jù)在同一個進(jìn)程中,也就是在同一個executor中。計算數(shù)據(jù)的task由executor執(zhí)行,數(shù)據(jù)在executor的BlockManager中,性能最好。

          2)、NODE_LOCAL:節(jié)點本地化,代碼和數(shù)據(jù)在同一個節(jié)點中。比如說,數(shù)據(jù)作為一個HDFS block塊,就在節(jié)點上,而task在節(jié)點上某個executor中運(yùn)行,或者是,數(shù)據(jù)和task在一個節(jié)點上的不同executor中,數(shù)據(jù)需要在進(jìn)程間進(jìn)行傳輸。

          3)、NO_PREF:對于task來說,數(shù)據(jù)從哪里獲取都一樣,沒有好壞之分。

          4)、RACK_LOCAL:機(jī)架本地化,數(shù)據(jù)和task在一個機(jī)架的兩個節(jié)點上,數(shù)據(jù)需要通過網(wǎng)絡(luò)在節(jié)點之間進(jìn)行傳輸。

          5)、ANY:數(shù)據(jù)和task可能在集群中的任何地方,而且不在一個機(jī)架中,性能最差。


          Spark的任務(wù)調(diào)度

          Spark在Driver上,對Application的每一個stage的task進(jìn)行分配之前都會計算出每個task要計算的是哪個分片數(shù)據(jù)。Spark的task分配算法優(yōu)先會希望每個task正好分配到它要計算的數(shù)據(jù)所在的節(jié)點,這樣的話,就不用在網(wǎng)絡(luò)間傳輸數(shù)據(jù)。

          但是,有時可能task沒有機(jī)會分配到它的數(shù)據(jù)所在的節(jié)點。為什么呢,可能那個節(jié)點的計算資源和計算能力都滿了。所以這種時候, Spark會等待一段時間,默認(rèn)情況下是3s(不是絕對的,還有很多種情況,對不同的本地化級別,都會去等待),到最后,實在是等待不了了,就會選擇一個比較差的本地化級別。比如說,將task分配到靠它要計算的數(shù)據(jù)所在節(jié)點比較近的一個節(jié)點,然后進(jìn)行計算。

          但是對于第二種情況,通常來說,肯定是要發(fā)生數(shù)據(jù)傳輸,task會通過其所在節(jié)點的BlockManager來獲取數(shù)據(jù),BlockManager發(fā)現(xiàn)自己本地沒有數(shù)據(jù),會通過一個getRemote()方法,通過TransferService(網(wǎng)絡(luò)數(shù)據(jù)傳輸組件)從數(shù)據(jù)所在節(jié)點的BlockManager中,獲取數(shù)據(jù),通過網(wǎng)絡(luò)傳輸回task所在節(jié)點。

          對于我們來說,當(dāng)然不希望是類似于第二種情況的了。最好的,當(dāng)然是task和數(shù)據(jù)在一個節(jié)點上,直接從本地executor的BlockManager中獲取數(shù)據(jù),純內(nèi)存,或者帶一點磁盤IO。如果要通過網(wǎng)絡(luò)傳輸數(shù)據(jù)的話,性能肯定會下降的。大量網(wǎng)絡(luò)傳輸,以及磁盤IO,都是性能的殺手。


          我們什么時候要調(diào)節(jié)這個參數(shù)

          觀察spark作業(yè)的運(yùn)行日志。推薦大家在測試的時候,先用client模式在本地就直接可以看到比較全的日志。日志里面會顯示:starting task…,PROCESS LOCAL、NODE LOCAL

          觀察大部分task的數(shù)據(jù)本地化級別,如果大多都是PROCESS_LOCAL,那就不用調(diào)節(jié)了。

          如果是發(fā)現(xiàn),好多的級別都是NODE_LOCAL、ANY,那么最好就去調(diào)節(jié)一下數(shù)據(jù)本地化的等待時長。要反復(fù)調(diào)節(jié),每次調(diào)節(jié)完以后再運(yùn)行并觀察日志,看看大部分的task的本地化級別有沒有提升,看看整個spark作業(yè)的運(yùn)行時間有沒有縮短。注意,不要本末倒置,不要本地化級別是提升了,但是因為大量的等待時長,spark作業(yè)的運(yùn)行時間反而增加了,那還是不要調(diào)節(jié)了。


          怎么調(diào)節(jié)

          spark.locality.wait,默認(rèn)是3s。6s,10s

          默認(rèn)情況下,下面3個的等待時長,都是跟上面那個是一樣的,都是3s

          spark.locality.wait.processspark.locality.wait.nodespark.locality.wait.racknew SparkConf().set("spark.locality.wait", "10")

          2

          jvm調(diào)優(yōu)

          堆內(nèi)存存放我們創(chuàng)建的一些對象,有老年代和年輕代。理想情況下,老年代都是放一些生命周期很長的對象,數(shù)量應(yīng)該是很少的,比如數(shù)據(jù)庫連接池。我們在spark task執(zhí)行算子函數(shù)(我們自己寫的),可能會創(chuàng)建很多對象,這些對象都是要放入JVM年輕代中的。

          每一次放對象的時候,都是放入eden區(qū)域,和其中一個survivor區(qū)域。另外一個survivor區(qū)域是空閑的。

          當(dāng)eden區(qū)域和一個survivor區(qū)域放滿了以后(spark運(yùn)行過程中,產(chǎn)生的對象實在太多了),就會觸發(fā)minor gc,小型垃圾回收。把不再使用的對象,從內(nèi)存中清空,給后面新創(chuàng)建的對象騰出來點兒地方。

          清理掉了不再使用的對象之后,那么也會將存活下來的對象(還要繼續(xù)使用的),放入之前空閑的那一個survivor區(qū)域中。這里可能會出現(xiàn)一個問題。默認(rèn)eden、survior1和survivor2的內(nèi)存占比是8:1:1。問題是,如果存活下來的對象是1.5,一個survivor區(qū)域放不下。此時就可能通過JVM的擔(dān)保機(jī)制(不同JVM版本可能對應(yīng)的行為),將多余的對象,直接放入老年代了。

          如果你的JVM內(nèi)存不夠大的話,可能導(dǎo)致頻繁的年輕代內(nèi)存滿溢,頻繁的進(jìn)行minor gc。頻繁的minor gc會導(dǎo)致短時間內(nèi),有些存活的對象,多次垃圾回收都沒有回收掉。會導(dǎo)致這種短生命周期(其實不一定是要長期使用的)對象,年齡過大,垃圾回收次數(shù)太多還沒有回收到,跑到老年代。

          老年代中,可能會因為內(nèi)存不足,囤積一大堆,短生命周期的,本來應(yīng)該在年輕代中的,可能馬上就要被回收掉的對象。此時,可能導(dǎo)致老年代頻繁滿溢。頻繁進(jìn)行full gc(全局/全面垃圾回收)。full gc就會去回收老年代中的對象。full gc由于這個算法的設(shè)計,是針對的是,老年代中的對象數(shù)量很少,滿溢進(jìn)行full gc的頻率應(yīng)該很少,因此采取了不太復(fù)雜,但是耗費性能和時間的垃圾回收算法。full gc很慢。

          full gc / minor gc,無論是快,還是慢,都會導(dǎo)致jvm的工作線程停止工作,stop the world。簡而言之,就是說,gc的時候,spark停止工作了。等著垃圾回收結(jié)束。

          內(nèi)存不充足的時候,出現(xiàn)的問題:

          1、頻繁minor gc,也會導(dǎo)致頻繁spark停止工作

          2、老年代囤積大量活躍對象(短生命周期的對象),導(dǎo)致頻繁full gc,full gc時間很長,短則數(shù)十秒,長則數(shù)分鐘,甚至數(shù)小時。可能導(dǎo)致spark長時間停止工作。

          3、嚴(yán)重影響咱們的spark的性能和運(yùn)行的速度。


          降低cache操作的內(nèi)存占比

          spark中,堆內(nèi)存又被劃分成了兩塊,一塊是專門用來給RDD的cache、persist操作進(jìn)行RDD數(shù)據(jù)緩存用的。另外一塊用來給spark算子函數(shù)的運(yùn)行使用的,存放函數(shù)中自己創(chuàng)建的對象。

          默認(rèn)情況下,給RDD cache操作的內(nèi)存占比,是0.6,60%的內(nèi)存都給了cache操作了。但是問題是,如果某些情況下cache不是那么的緊張,問題在于task算子函數(shù)中創(chuàng)建的對象過多,然后內(nèi)存又不太大,導(dǎo)致了頻繁的minor gc,甚至頻繁full gc,導(dǎo)致spark頻繁的停止工作。性能影響會很大。

          針對上述這種情況,可以在任務(wù)運(yùn)行界面,去查看你的spark作業(yè)的運(yùn)行統(tǒng)計,可以看到每個stage的運(yùn)行情況,包括每個task的運(yùn)行時間、gc時間等等。如果發(fā)現(xiàn)gc太頻繁,時間太長。此時就可以適當(dāng)調(diào)價這個比例。

          降低cache操作的內(nèi)存占比,大不了用persist操作,選擇將一部分緩存的RDD數(shù)據(jù)寫入磁盤,或者序列化方式,配合Kryo序列化類,減少RDD緩存的內(nèi)存占用。降低cache操作內(nèi)存占比,對應(yīng)的,算子函數(shù)的內(nèi)存占比就提升了。這個時候,可能就可以減少minor gc的頻率,同時減少full gc的頻率。對性能的提升是有一定的幫助的。

          一句話,讓task執(zhí)行算子函數(shù)時,有更多的內(nèi)存可以使用。

          spark.storage.memoryFraction,0.6 -> 0.5 -> 0.4 -> 0.2

           

          調(diào)節(jié)executor堆外內(nèi)存與連接等待時長

          調(diào)節(jié)executor堆外內(nèi)存

          有時候,如果你的spark作業(yè)處理的數(shù)據(jù)量特別大,幾億數(shù)據(jù)量。然后spark作業(yè)一運(yùn)行,時不時的報錯,shuffle file cannot find,executor、task lost,out of memory(內(nèi)存溢出)。

          可能是executor的堆外內(nèi)存不太夠用,導(dǎo)致executor在運(yùn)行的過程中,可能會內(nèi)存溢出,可能導(dǎo)致后續(xù)的stage的task在運(yùn)行的時候,要從一些executor中去拉取shuffle map output文件,但是executor可能已經(jīng)掛掉了,關(guān)聯(lián)的block manager也沒有了。所以會報shuffle output file not found,resubmitting task,executor lost。spark作業(yè)徹底崩潰。

          上述情況下,就可以去考慮調(diào)節(jié)一下executor的堆外內(nèi)存。也許就可以避免報錯。此外,有時堆外內(nèi)存調(diào)節(jié)的比較大的時候,對于性能來說,也會帶來一定的提升。

          可以調(diào)節(jié)堆外內(nèi)存的上限:

          --conf spark.yarn.executor.memoryOverhead=2048

          spark-submit腳本里面,去用--conf的方式,去添加配置。用new SparkConf().set()這種方式去設(shè)置是沒有用的!一定要在spark-submit腳本中去設(shè)置。

          spark.yarn.executor.memoryOverhead(看名字,顧名思義,針對的是基于yarn的提交模式)

          默認(rèn)情況下,這個堆外內(nèi)存上限大概是300M。通常在項目中,真正處理大數(shù)據(jù)的時候,這里都會出現(xiàn)問題,導(dǎo)致spark作業(yè)反復(fù)崩潰,無法運(yùn)行。此時就會去調(diào)節(jié)這個參數(shù),到至少1G(1024M),甚至說2G、4G。

          通常這個參數(shù)調(diào)節(jié)上去以后,就會避免掉某些JVM OOM的異常問題,同時呢,會讓整體spark作業(yè)的性能,得到較大的提升。

           

          調(diào)節(jié)連接等待時長

          我們知道,executor會優(yōu)先從自己本地關(guān)聯(lián)的BlockManager中獲取某份數(shù)據(jù)。如果本地block manager沒有的話,那么會通過TransferService,去遠(yuǎn)程連接其他節(jié)點上executor的block manager去獲取。

          而此時上面executor去遠(yuǎn)程連接的那個executor,因為task創(chuàng)建的對象特別大,特別多,

          頻繁的讓JVM堆內(nèi)存滿溢,正在進(jìn)行垃圾回收。而處于垃圾回收過程中,所有的工作線程全部停止,相當(dāng)于只要一旦進(jìn)行垃圾回收,spark / executor停止工作,無法提供響應(yīng)。

          此時呢,就會沒有響應(yīng),無法建立網(wǎng)絡(luò)連接,會卡住。spark默認(rèn)的網(wǎng)絡(luò)連接的超時時長,是60s,如果卡住60s都無法建立連接的話,那么就宣告失敗了。

          報錯幾次,幾次都拉取不到數(shù)據(jù)的話,可能會導(dǎo)致spark作業(yè)的崩潰。也可能會導(dǎo)致DAGScheduler,反復(fù)提交幾次stage。TaskScheduler反復(fù)提交幾次task。大大延長我們的spark作業(yè)的運(yùn)行時間。

          可以考慮調(diào)節(jié)連接的超時時長:

          --conf spark.core.connection.ack.wait.timeout=300

          spark-submit腳本,切記,不是在new SparkConf().set()這種方式來設(shè)置的。

          spark.core.connection.ack.wait.timeout(spark core,connection,連接,ack,wait timeout,建立不上連接的時候,超時等待時長)

          調(diào)節(jié)這個值比較大以后,通常來說,可以避免部分的偶爾出現(xiàn)的某某文件拉取失敗,某某文件lost掉了。

          3

          shuffle調(diào)優(yōu)

          原理概述:

          什么樣的情況下,會發(fā)生shuffle?

          在spark中,主要是以下幾個算子:groupByKey、reduceByKey、countByKey、join(分情況,先groupByKey后再join是不會發(fā)生shuffle的),等等。

          什么是shuffle?

          groupByKey,要把分布在集群各個節(jié)點上的數(shù)據(jù)中的同一個key,對應(yīng)的values,都要集中到一塊兒,集中到集群中同一個節(jié)點上,更嚴(yán)密一點說,就是集中到一個節(jié)點的一個executor的一個task中。

          然后呢,集中一個key對應(yīng)的values之后,才能交給我們來進(jìn)行處理,<key, Iterable<value>>。reduceByKey,算子函數(shù)去對values集合進(jìn)行reduce操作,最后變成一個value。countByKey需要在一個task中,獲取到一個key對應(yīng)的所有的value,然后進(jìn)行計數(shù),統(tǒng)計一共有多少個value。join,RDD<key, value>,RDD<key, value>,只要是兩個RDD中,key相同對應(yīng)的2個value,都能到一個節(jié)點的executor的task中,給我們進(jìn)行處理。

          shuffle,一定是分為兩個stage來完成的。因為這其實是個逆向的過程,不是stage決定shuffle,是shuffle決定stage。

          reduceByKey(_+_),在某個action觸發(fā)job的時候,DAGScheduler,會負(fù)責(zé)劃分job為多個stage。劃分的依據(jù),就是,如果發(fā)現(xiàn)有會觸發(fā)shuffle操作的算子,比如reduceByKey,就將這個操作的前半部分,以及之前所有的RDD和transformation操作,劃分為一個stage。shuffle操作的后半部分,以及后面的,直到action為止的RDD和transformation操作,劃分為另外一個stage。

           

          1

          合并map端輸出文件


          如果不合并map端輸出文件的話,會怎么樣?

          舉例實際生產(chǎn)環(huán)境的條件:

          100個節(jié)點(每個節(jié)點一個executor):100個executor

          每個executor:2個cpu core

          總共1000個task:每個executor平均10個task

          每個節(jié)點,10個task,每個節(jié)點會輸出多少份map端文件?10 * 1000=1萬個文件

          總共有多少份map端輸出文件?100 * 10000 = 100萬。

          第一個stage,每個task,都會給第二個stage的每個task創(chuàng)建一份map端的輸出文件

          第二個stage,每個task,會到各個節(jié)點上面去,拉取第一個stage每個task輸出的,屬于自己的那一份文件。

          shuffle中的寫磁盤的操作,基本上就是shuffle中性能消耗最為嚴(yán)重的部分。

          通過上面的分析,一個普通的生產(chǎn)環(huán)境的spark job的一個shuffle環(huán)節(jié),會寫入磁盤100萬個文件。

          磁盤IO對性能和spark作業(yè)執(zhí)行速度的影響,是極其驚人和嚇人的。

          基本上,spark作業(yè)的性能,都消耗在shuffle中了,雖然不只是shuffle的map端輸出文件這一個部分,但是這里也是非常大的一個性能消耗點。


          開啟shuffle map端輸出文件合并的機(jī)制

          new SparkConf().set("spark.shuffle.consolidateFiles", "true")

          默認(rèn)情況下,是不開啟的,就是會發(fā)生如上所述的大量map端輸出文件的操作,嚴(yán)重影響性能。


          合并map端輸出文件,對咱們的spark的性能有哪些方面的影響呢?

          1、map task寫入磁盤文件的IO,減少:100萬文件 -> 20萬文件

          2、第二個stage,原本要拉取第一個stage的task數(shù)量份文件,1000個task,第二個stage的每個task,都要拉取1000份文件,走網(wǎng)絡(luò)傳輸。合并以后,100個節(jié)點,每個節(jié)點2個cpu core,第二個stage的每個task,主要拉取100 * 2 = 200個文件即可。此時網(wǎng)絡(luò)傳輸?shù)男阅芟囊泊蟠鬁p少。

          分享一下,實際在生產(chǎn)環(huán)境中,使用了spark.shuffle.consolidateFiles機(jī)制以后,實際的性能調(diào)優(yōu)的效果:對于上述的這種生產(chǎn)環(huán)境的配置,性能的提升,還是相當(dāng)?shù)目捎^的。spark作業(yè),5個小時 -> 2~3個小時。

          大家不要小看這個map端輸出文件合并機(jī)制。實際上,在數(shù)據(jù)量比較大,你自己本身做了前面的性能調(diào)優(yōu),executor上去->cpu core上去->并行度(task數(shù)量)上去,shuffle沒調(diào)優(yōu),shuffle就很糟糕了。大量的map端輸出文件的產(chǎn)生,對性能有比較惡劣的影響。

          這個時候,去開啟這個機(jī)制,可以很有效的提升性能。


          2

          調(diào)節(jié)map端內(nèi)存緩沖與reduce端內(nèi)存占比

          默認(rèn)情況下可能出現(xiàn)的問題

          默認(rèn)情況下,shuffle的map task,輸出到磁盤文件的時候,統(tǒng)一都會先寫入每個task自己關(guān)聯(lián)的一個內(nèi)存緩沖區(qū)。

          這個緩沖區(qū)大小,默認(rèn)是32kb。

          每一次,當(dāng)內(nèi)存緩沖區(qū)滿溢之后,才會進(jìn)行spill溢寫操作,溢寫到磁盤文件中去。

          reduce端task,在拉取到數(shù)據(jù)之后,會用hashmap的數(shù)據(jù)格式,來對各個key對應(yīng)的values進(jìn)行匯聚。

          針對每個key對應(yīng)的values,執(zhí)行我們自定義的聚合函數(shù)的代碼,比如_ + _(把所有values累加起來)。

          reduce task,在進(jìn)行匯聚、聚合等操作的時候,實際上,使用的就是自己對應(yīng)的executor的內(nèi)存,executor(jvm進(jìn)程,堆),默認(rèn)executor內(nèi)存中劃分給reduce task進(jìn)行聚合的比例是0.2。

          問題來了,因為比例是0.2,所以,理論上,很有可能會出現(xiàn),拉取過來的數(shù)據(jù)很多,那么在內(nèi)存中,放不下。這個時候,默認(rèn)的行為就是將在內(nèi)存放不下的數(shù)據(jù)都spill(溢寫)到磁盤文件中去。

          在數(shù)據(jù)量比較大的情況下,可能頻繁地發(fā)生reduce端的磁盤文件的讀寫。


          調(diào)優(yōu)方式

          調(diào)節(jié)map task內(nèi)存緩沖:spark.shuffle.file.buffer,默認(rèn)32k(spark 1.3.x不是這個參數(shù),后面還有一個后綴,kb。spark 1.5.x以后,變了,就是現(xiàn)在這個參數(shù))

          調(diào)節(jié)reduce端聚合內(nèi)存占比:spark.shuffle.memoryFraction,0.2


          在實際生產(chǎn)環(huán)境中,我們在什么時候來調(diào)節(jié)兩個參數(shù)?

          看Spark UI,如果你的公司是決定采用standalone模式,那么狠簡單,你的spark跑起來,會顯示一個Spark UI的地址,4040的端口。進(jìn)去觀察每個stage的詳情,有哪些executor,有哪些task,每個task的shuffle write和shuffle read的量,shuffle的磁盤和內(nèi)存讀寫的數(shù)據(jù)量。如果是用的yarn模式來提交,從yarn的界面進(jìn)去,點擊對應(yīng)的application,進(jìn)入Spark UI,查看詳情。

          如果發(fā)現(xiàn)shuffle 磁盤的write和read,很大。這個時候,就意味著最好調(diào)節(jié)一些shuffle的參數(shù)。首先當(dāng)然是考慮開啟map端輸出文件合并機(jī)制。其次調(diào)節(jié)上面說的那兩個參數(shù)。調(diào)節(jié)的時候的原則:spark.shuffle.file.buffer每次擴(kuò)大一倍,然后看看效果,64,128。spark.shuffle.memoryFraction,每次提高0.1,看看效果。

          不能調(diào)節(jié)的太大,太大了以后過猶不及,因為內(nèi)存資源是有限的,你這里調(diào)節(jié)的太大了,其他環(huán)節(jié)的內(nèi)存使用就會有問題了。

          調(diào)節(jié)以后的效果

          map task內(nèi)存緩沖變大了,減少spill到磁盤文件的次數(shù)。reduce端聚合內(nèi)存變大了,減少spill到磁盤的次數(shù),而且減少了后面聚合讀取磁盤文件的數(shù)量。

           

          3

          HashShuffleManager與SortShuffleManager


          shuffle調(diào)優(yōu)概述

          大多數(shù)Spark作業(yè)的性能主要就是消耗在了shuffle環(huán) 節(jié),因為該環(huán)節(jié)包含了大量的磁盤IO、序列化、網(wǎng)絡(luò)數(shù)據(jù)傳輸?shù)炔僮鳌R虼耍绻屪鳂I(yè)的性能更上一層樓,就有必要對shuffle過程進(jìn)行調(diào)優(yōu)。但是也 必須提醒大家的是,影響一個Spark作業(yè)性能的因素,主要還是代碼開發(fā)、資源參數(shù)以及數(shù)據(jù)傾斜,shuffle調(diào)優(yōu)只能在整個Spark的性能調(diào)優(yōu)中占 到一小部分而已。因此大家務(wù)必把握住調(diào)優(yōu)的基本原則,千萬不要舍本逐末。下面我們就給大家詳細(xì)講解shuffle的原理,以及相關(guān)參數(shù)的說明,同時給出各個參數(shù)的調(diào)優(yōu)建議。


          ShuffleManager發(fā)展概述

          在Spark的源碼中,負(fù)責(zé)shuffle過程的執(zhí)行、計算和處理的組件主要就是ShuffleManager,也即shuffle管理器。

          在Spark 1.2以前,默認(rèn)的shuffle計算引擎是HashShuffleManager。該ShuffleManager而HashShuffleManager有著一個非常嚴(yán)重的弊端,就是會產(chǎn)生大量的中間磁盤文件,進(jìn)而由大量的磁盤IO操作影響了性能。

          因此在Spark 1.2以后的版本中,默認(rèn)的ShuffleManager改成了SortShuffleManager。SortShuffleManager相較于 HashShuffleManager來說,有了一定的改進(jìn)。主要就在于,每個Task在進(jìn)行shuffle操作時,雖然也會產(chǎn)生較多的臨時磁盤文件,但是最后會將所有的臨時文件合并(merge)成一個磁盤文件,因此每個Task就只有一個磁盤文件。在下一個stage的shuffle read task拉取自己的數(shù)據(jù)時,只要根據(jù)索引讀取每個磁盤文件中的部分?jǐn)?shù)據(jù)即可。

          在spark 1.5.x以后,對于shuffle manager又出來了一種新的manager,tungsten-sort(鎢絲),鎢絲sort shuffle manager。官網(wǎng)上一般說,鎢絲sort shuffle manager,效果跟sort shuffle manager是差不多的。

          但是,唯一的不同之處在于,鎢絲manager,是使用了自己實現(xiàn)的一套內(nèi)存管理機(jī)制,性能上有很大的提升, 而且可以避免shuffle過程中產(chǎn)生的大量的OOM,GC,等等內(nèi)存相關(guān)的異常。


          hashsorttungsten-sort。如何來選擇?

          1、需不需要數(shù)據(jù)默認(rèn)就讓spark給你進(jìn)行排序?就好像mapreduce,默認(rèn)就是有按照key的排序。如果不需要的話,其實還是建議搭建就使用最基本的HashShuffleManager,因為最開始就是考慮的是不排序,換取高性能。

          2、什么時候需要用sort shuffle manager?如果你需要你的那些數(shù)據(jù)按key排序了,那么就選擇這種吧,而且要注意,reduce task的數(shù)量應(yīng)該是超過200的,這樣sort、merge(多個文件合并成一個)的機(jī)制,才能生效把。但是這里要注意,你一定要自己考量一下,有沒有必要在shuffle的過程中,就做這個事情,畢竟對性能是有影響的。

          3、如果你不需要排序,而且你希望你的每個task輸出的文件最終是會合并成一份的,你自己認(rèn)為可以減少性能開銷。可以去調(diào)節(jié)bypassMergeThreshold這個閾值,比如你的reduce task數(shù)量是500,默認(rèn)閾值是200,所以默認(rèn)還是會進(jìn)行sort和直接merge的。可以將閾值調(diào)節(jié)成550,不會進(jìn)行sort,按照hash的做法,每個reduce task創(chuàng)建一份輸出文件,最后合并成一份文件。(一定要提醒大家,這個參數(shù),其實我們通常不會在生產(chǎn)環(huán)境里去使用,也沒有經(jīng)過驗證說,這樣的方式,到底有多少性能的提升)

          4、如果你想選用sort based shuffle manager,而且你們公司的spark版本比較高,是1.5.x版本的,那么可以考慮去嘗試使用tungsten-sort shuffle manager。看看性能的提升與穩(wěn)定性怎么樣。

          總結(jié):

          1、在生產(chǎn)環(huán)境中,不建議大家貿(mào)然使用第三點和第四點:

          2、如果你不想要你的數(shù)據(jù)在shuffle時排序,那么就自己設(shè)置一下,用hash shuffle manager。

          3、如果你的確是需要你的數(shù)據(jù)在shuffle時進(jìn)行排序的,那么就默認(rèn)不用動,默認(rèn)就是sort shuffle manager。或者是什么?如果你壓根兒不care是否排序這個事兒,那么就默認(rèn)讓他就是sort的。調(diào)節(jié)一些其他的參數(shù)(consolidation機(jī)制)。(80%,都是用這種)

          spark.shuffle.manager:hash、sort、tungsten-sort

          spark.shuffle.sort.bypassMergeThreshold:200。自己可以設(shè)定一個閾值,默認(rèn)是200,當(dāng)reduce task數(shù)量少于等于200,map task創(chuàng)建的輸出文件小于等于200的,最后會將所有的輸出文件合并為一份文件。這樣做的好處,就是避免了sort排序,節(jié)省了性能開銷,而且還能將多個reduce task的文件合并成一份文件,節(jié)省了reduce task拉取數(shù)據(jù)的時候的磁盤IO的開銷。

          4

          算子調(diào)優(yōu)

          1

          MapPartitions提升Map類操作性能


          spark中,最基本的原則,就是每個task處理一個RDD的partition。


          MapPartitions的優(yōu)缺點

          MapPartitions操作的優(yōu)點:

          如果是普通的map,比如一個partition中有1萬條數(shù)據(jù)。ok,那么你的function要執(zhí)行和計算1萬次。

          但是,使用MapPartitions操作之后,一個task僅僅會執(zhí)行一次function,function一次接收所有的partition數(shù)據(jù)。只要執(zhí)行一次就可以了,性能比較高。

          MapPartitions的缺點:

          如果是普通的map操作,一次function的執(zhí)行就處理一條數(shù)據(jù)。那么如果內(nèi)存不夠用的情況下,比如處理了1千條數(shù)據(jù)了,那么這個時候內(nèi)存不夠了,那么就可以將已經(jīng)處理完的1千條數(shù)據(jù)從內(nèi)存里面垃圾回收掉,或者用其他方法,騰出空間來吧。

          所以說普通的map操作通常不會導(dǎo)致內(nèi)存的OOM異常。

          但是MapPartitions操作,對于大量數(shù)據(jù)來說,比如甚至一個partition,100萬數(shù)據(jù),一次傳入一個function以后,那么可能一下子內(nèi)存不夠,但是又沒有辦法去騰出內(nèi)存空間來,可能就OOM,內(nèi)存溢出。


          MapPartitions使用場景

          當(dāng)分析的數(shù)據(jù)量不是特別大的時候,都可以用這種MapPartitions系列操作,性能還是非常不錯的,是有提升的。比如原來是15分鐘,(曾經(jīng)有一次性能調(diào)優(yōu)),12分鐘。10分鐘->9分鐘。

          但是也有過出問題的經(jīng)驗,MapPartitions只要一用,直接OOM,內(nèi)存溢出,崩潰。

          在項目中,自己先去估算一下RDD的數(shù)據(jù)量,以及每個partition的量,還有自己分配給每個executor的內(nèi)存資源。看看一下子內(nèi)存容納所有的partition數(shù)據(jù)行不行。如果行,可以試一下,能跑通就好。性能肯定是有提升的。但是試了以后,發(fā)現(xiàn)OOM了,那就放棄吧。


          2

          filter過后使用coalesce減少分區(qū)數(shù)量

          出現(xiàn)問題

          默認(rèn)情況下,經(jīng)過了filter之后,RDD中的每個partition的數(shù)據(jù)量,可能都不太一樣了。(原本每個partition的數(shù)據(jù)量可能是差不多的)

          可能出現(xiàn)的問題:

          1、每個partition數(shù)據(jù)量變少了,但是在后面進(jìn)行處理的時候,還是要跟partition數(shù)量一樣數(shù)量的task,來進(jìn)行處理,有點浪費task計算資源。

          2、每個partition的數(shù)據(jù)量不一樣,會導(dǎo)致后面的每個task處理每個partition的時候,每個task要處理的數(shù)據(jù)量就不同,這樣就會導(dǎo)致有些task運(yùn)行的速度很快,有些task運(yùn)行的速度很慢。這就是數(shù)據(jù)傾斜。

          針對上述的兩個問題,我們希望應(yīng)該能夠怎么樣?

          1、針對第一個問題,我們希望可以進(jìn)行partition的壓縮吧,因為數(shù)據(jù)量變少了,那么partition其實也完全可以對應(yīng)的變少。比如原來是4個partition,現(xiàn)在完全可以變成2個partition。那么就只要用后面的2個task來處理即可。就不會造成task計算資源的浪費。(不必要,針對只有一點點數(shù)據(jù)的partition,還去啟動一個task來計算)

          2、針對第二個問題,其實解決方案跟第一個問題是一樣的,也是去壓縮partition,盡量讓每個partition的數(shù)據(jù)量差不多。那么后面的task分配到的partition的數(shù)據(jù)量也就差不多。不會造成有的task運(yùn)行速度特別慢,有的task運(yùn)行速度特別快。避免了數(shù)據(jù)傾斜的問題。


          解決問題方法

          調(diào)用coalesce算子

          主要就是用于在filter操作之后,針對每個partition的數(shù)據(jù)量各不相同的情況,來壓縮partition的數(shù)量,而且讓每個partition的數(shù)據(jù)量都盡量均勻緊湊。從而便于后面的task進(jìn)行計算操作,在某種程度上,能夠一定程度的提升性能。


          3

          使用foreachPartition優(yōu)化寫數(shù)據(jù)庫性能


          默認(rèn)的foreach的性能缺陷在哪里?

          首先,對于每條數(shù)據(jù),都要單獨去調(diào)用一次function,task為每個數(shù)據(jù),都要去執(zhí)行一次function函數(shù)。

          如果100萬條數(shù)據(jù),(一個partition),調(diào)用100萬次。性能比較差。

          另外一個非常非常重要的一點

          如果每個數(shù)據(jù),你都去創(chuàng)建一個數(shù)據(jù)庫連接的話,那么你就得創(chuàng)建100萬次數(shù)據(jù)庫連接。

          但是要注意的是,數(shù)據(jù)庫連接的創(chuàng)建和銷毀,都是非常非常消耗性能的。雖然我們之前已經(jīng)用了數(shù)據(jù)庫連接池,只是創(chuàng)建了固定數(shù)量的數(shù)據(jù)庫連接。

          你還是得多次通過數(shù)據(jù)庫連接,往數(shù)據(jù)庫(MySQL)發(fā)送一條SQL語句,然后MySQL需要去執(zhí)行這條SQL語句。如果有100萬條數(shù)據(jù),那么就是100萬次發(fā)送SQL語句。

          以上兩點(數(shù)據(jù)庫連接,多次發(fā)送SQL語句),都是非常消耗性能的。


          用了foreachPartition算子之后,好處在哪里?

          1、對于我們寫的function函數(shù),就調(diào)用一次,一次傳入一個partition所有的數(shù)據(jù)。

          2、主要創(chuàng)建或者獲取一個數(shù)據(jù)庫連接就可以。

          3、只要向數(shù)據(jù)庫發(fā)送一次SQL語句和多組參數(shù)即可。

          注意,與mapPartitions操作一樣,如果一個partition的數(shù)量真的特別特別大,比如是100萬,那基本上就不太靠譜了。很有可能會發(fā)生OOM,內(nèi)存溢出的問題。

           

          4

          使用repartition解決Spark SQL低并行度的性能問題


          設(shè)置并行度

          并行度:之前說過,并行度是設(shè)置的:

          1、spark.default.parallelism

          2、textFile(),傳入第二個參數(shù),指定partition數(shù)量(比較少用)

           

          在生產(chǎn)環(huán)境中,是最好設(shè)置一下并行度。官網(wǎng)有推薦的設(shè)置方式,根據(jù)你的application的總cpu core數(shù)量(在spark-submit中可以指定),自己手動設(shè)置spark.default.parallelism參數(shù),指定為cpu core總數(shù)的2~3倍。

           

          你設(shè)置的這個并行度,在哪些情況下會生效?什么情況下不會生效? 

          如果你壓根兒沒有使用Spark SQL(DataFrame),那么你整個spark application默認(rèn)所有stage的并行度都是你設(shè)置的那個參數(shù)。(除非你使用coalesce算子縮減過partition數(shù)量)。

          問題來了,用Spark SQL的情況下,stage的并行度沒法自己指定。Spark SQL自己會默認(rèn)根據(jù)hive表對應(yīng)的hdfs文件的block,自動設(shè)置Spark SQL查詢所在的那個stage的并行度。你自己通過spark.default.parallelism參數(shù)指定的并行度,只會在沒有Spark SQL的stage中生效。

          比如你第一個stage,用了Spark SQL從hive表中查詢出了一些數(shù)據(jù),然后做了一些transformation操作,接著做了一個shuffle操作(groupByKey)。下一個stage,在shuffle操作之后,做了一些transformation操作。hive表,對應(yīng)了一個hdfs文件,有20個block。你自己設(shè)置了spark.default.parallelism參數(shù)為100。

          你的第一個stage的并行度,是不受你的控制的,就只有20個task。第二個stage,才會變成你自己設(shè)置的那個并行度,100。


          可能出現(xiàn)的問題?

          Spark SQL默認(rèn)情況下,它的那個并行度,咱們沒法設(shè)置。可能導(dǎo)致的問題,也許沒什么問題,也許很有問題。Spark SQL所在的那個stage中,后面的那些transformation操作,可能會有非常復(fù)雜的業(yè)務(wù)邏輯,甚至說復(fù)雜的算法。如果你的Spark SQL默認(rèn)把task數(shù)量設(shè)置的很少,20個,然后每個task要處理為數(shù)不少的數(shù)據(jù)量,然后還要執(zhí)行特別復(fù)雜的算法。

          這個時候,就會導(dǎo)致第一個stage的速度,特別慢。第二個stage1000個task非常快。


          解決Spark SQL無法設(shè)置并行度和task數(shù)量的辦法

          repartition算子,你用Spark SQL這一步的并行度和task數(shù)量,肯定是沒有辦法去改變了。但是呢,可以將你用Spark SQL查詢出來的RDD,使用repartition算子去重新進(jìn)行分區(qū),此時可以分成多個partition。然后呢,從repartition以后的RDD,再往后,并行度和task數(shù)量,就會按照你預(yù)期的來了。就可以避免跟Spark SQL綁定在一個stage中的算子,只能使用少量的task去處理大量數(shù)據(jù)以及復(fù)雜的算法邏輯。

           

          5

          reduceByKey本地聚合介紹


          reduceByKey,相較于普通的shuffle操作(比如groupByKey),它的一個特點,就是說,會進(jìn)行map端的本地聚合。對map端給下個stage每個task創(chuàng)建的輸出文件中,寫數(shù)據(jù)之前,就會進(jìn)行本地的combiner操作,也就是說對每一個key,對應(yīng)的values,都會執(zhí)行你的算子函數(shù)(_ + _)


          reduceByKey對性能的提升

          1、在本地進(jìn)行聚合以后,在map端的數(shù)據(jù)量就變少了,減少磁盤IO。而且可以減少磁盤空間的占用。

          2、下一個stage,拉取數(shù)據(jù)的量,也就變少了。減少網(wǎng)絡(luò)的數(shù)據(jù)傳輸?shù)男阅芟摹?/span>

          3、在reduce端進(jìn)行數(shù)據(jù)緩存的內(nèi)存占用變少了。

          4、reduce端,要進(jìn)行聚合的數(shù)據(jù)量也變少了。


          reduceByKey在什么情況下使用呢?

          1、非常普通的,比如說,就是要實現(xiàn)類似于wordcount程序一樣的,對每個key對應(yīng)的值,進(jìn)行某種數(shù)據(jù)公式或者算法的計算(累加、類乘)。

          2、對于一些類似于要對每個key進(jìn)行一些字符串拼接的這種較為復(fù)雜的操作,可以自己衡量一下,其實有時,也是可以使用reduceByKey來實現(xiàn)的。但是不太好實現(xiàn)。如果真能夠?qū)崿F(xiàn)出來,對性能絕對是有幫助的。(shuffle基本上就占了整個spark作業(yè)的90%以上的性能消耗,主要能對shuffle進(jìn)行一定的調(diào)優(yōu),都是有價值的)

           

          5

          troubleshooting


          1

          控制shuffle reduce端緩沖大小以避免OOM


          map端的task是不斷的輸出數(shù)據(jù)的,數(shù)據(jù)量可能是很大的。

          但是,其實reduce端的task,并不是等到map端task將屬于自己的那份數(shù)據(jù)全部寫入磁盤文件之后,再去拉取的。map端寫一點數(shù)據(jù),reduce端task就會拉取一小部分?jǐn)?shù)據(jù),立即進(jìn)行后面的聚合、算子函數(shù)的應(yīng)用。

          每次reduece能夠拉取多少數(shù)據(jù),就由buffer來決定。因為拉取過來的數(shù)據(jù),都是先放在buffer中的。然后才用后面的executor分配的堆內(nèi)存占比(0.2),hashmap,去進(jìn)行后續(xù)的聚合、函數(shù)的執(zhí)行。

          reduce端緩沖大小的另外一面,關(guān)于性能調(diào)優(yōu)的一面

          假如Map端輸出的數(shù)據(jù)量也不是特別大,然后你的整個application的資源也特別充足。200個executor、5個cpu core、10G內(nèi)存。

          其實可以嘗試去增加這個reduce端緩沖大小的,比如從48M,變成96M。那么這樣的話,每次reduce task能夠拉取的數(shù)據(jù)量就很大。需要拉取的次數(shù)也就變少了。比如原先需要拉取100次,現(xiàn)在只要拉取50次就可以執(zhí)行完了。

          對網(wǎng)絡(luò)傳輸性能開銷的減少,以及reduce端聚合操作執(zhí)行的次數(shù)的減少,都是有幫助的。

          最終達(dá)到的效果,就應(yīng)該是性能上的一定程度上的提升。

          注意,一定要在資源充足的前提下做此操作。

          reduce端緩沖(buffer),可能會出現(xiàn)的問題及解決方式

          可能會出現(xiàn),默認(rèn)是48MB,也許大多數(shù)時候,reduce端task一邊拉取一邊計算,不一定一直都會拉滿48M的數(shù)據(jù)。大多數(shù)時候,拉取個10M數(shù)據(jù),就計算掉了。

          大多數(shù)時候,也許不會出現(xiàn)什么問題。但是有的時候,map端的數(shù)據(jù)量特別大,然后寫出的速度特別快。reduce端所有task,拉取的時候,全部達(dá)到自己的緩沖的最大極限值,緩沖區(qū)48M,全部填滿。

          這個時候,再加上你的reduce端執(zhí)行的聚合函數(shù)的代碼,可能會創(chuàng)建大量的對象。也許,一下子內(nèi)存就撐不住了,就會OOM。reduce端的內(nèi)存中,就會發(fā)生內(nèi)存溢出的問題。

           

          針對上述的可能出現(xiàn)的問題,我們該怎么來解決呢?

          這個時候,就應(yīng)該減少reduce端task緩沖的大小。我寧愿多拉取幾次,但是每次同時能夠拉取到reduce端每個task的數(shù)量比較少,就不容易發(fā)生OOM內(nèi)存溢出的問題。(比如,可以調(diào)節(jié)成12M)

          在實際生產(chǎn)環(huán)境中,我們都是碰到過這種問題的。這是典型的以性能換執(zhí)行的原理。reduce端緩沖小了,不容易OOM了,但是,性能一定是有所下降的,你要拉取的次數(shù)就多了。就走更多的網(wǎng)絡(luò)傳輸開銷。

          這種時候,只能采取犧牲性能的方式了,spark作業(yè),首先,第一要義,就是一定要讓它可以跑起來。

          操作方法

          new SparkConf().set(spark.reducer.maxSizeInFlight,”48”)


          2

          解決JVM GC導(dǎo)致的shuffle文件拉取失敗

          問題描述

          有時會出現(xiàn)一種情況,在spark的作業(yè)中,log日志會提示shuffle file not found。(spark作業(yè)中,非常常見的)而且有的時候,它是偶爾才會出現(xiàn)的一種情況。有的時候,出現(xiàn)這種情況以后,重新去提交task。重新執(zhí)行一遍,發(fā)現(xiàn)就好了。沒有這種錯誤了。

          log怎么看?用client模式去提交你的spark作業(yè)。比如standalone client或yarn client。一提交作業(yè),直接可以在本地看到更新的log。

          問題原因:比如,executor的JVM進(jìn)程可能內(nèi)存不夠用了。那么此時就會執(zhí)行GC。minor GC or full GC。此時就會導(dǎo)致executor內(nèi),所有的工作線程全部停止。比如BlockManager,基于netty的網(wǎng)絡(luò)通信。

          下一個stage的executor,可能還沒有停止掉的task想要去上一個stage的task所在的exeuctor去拉取屬于自己的數(shù)據(jù),結(jié)果由于對方正在gc,就導(dǎo)致拉取了半天沒有拉取到。

          就很可能會報出shuffle file not found。但是,可能下一個stage又重新提交了task以后,再執(zhí)行就沒有問題了,因為可能第二次就沒有碰到JVM在gc了。

          解決方案

          spark.shuffle.io.maxRetries 3

          第一個參數(shù),意思就是說,shuffle文件拉取的時候,如果沒有拉取到(拉取失敗),最多或重試幾次(會重新拉取幾次文件),默認(rèn)是3次。

          spark.shuffle.io.retryWait 5s

          第二個參數(shù),意思就是說,每一次重試?yán)∥募臅r間間隔,默認(rèn)是5s鐘。

          默認(rèn)情況下,假如說第一個stage的executor正在進(jìn)行漫長的full gc。第二個stage的executor嘗試去拉取文件,結(jié)果沒有拉取到,默認(rèn)情況下,會反復(fù)重試?yán)?次,每次間隔是五秒鐘。最多只會等待3 * 5s = 15s。如果15s內(nèi),沒有拉取到shuffle file。就會報出shuffle file not found。

          針對這種情況,我們完全可以進(jìn)行預(yù)備性的參數(shù)調(diào)節(jié)。增大上述兩個參數(shù)的值,達(dá)到比較大的一個值,盡量保證第二個stage的task,一定能夠拉取到上一個stage的輸出文件。避免報shuffle file not found。然后可能會重新提交stage和task去執(zhí)行。那樣反而對性能也不好。

          spark.shuffle.io.maxRetries 60

          spark.shuffle.io.retryWait 60s

          最多可以忍受1個小時沒有拉取到shuffle file。只是去設(shè)置一個最大的可能的值。full gc不可能1個小時都沒結(jié)束吧。

          這樣呢,就可以盡量避免因為gc導(dǎo)致的shuffle file not found,無法拉取到的問題。

           

          3

          YARN隊列資源不足導(dǎo)致的application直接失敗


          問題描述

          如果說,你是基于yarn來提交spark。比如yarn-cluster或者yarn-client。你可以指定提交到某個hadoop隊列上的。每個隊列都是可以有自己的資源的。

          跟大家說一個生產(chǎn)環(huán)境中的,給spark用的yarn資源隊列的情況:500G內(nèi)存,200個cpu core。

          比如說,某個spark application,在spark-submit里面你自己配了,executor,80個。每個executor,4G內(nèi)存。每個executor,2個cpu core。你的spark作業(yè)每次運(yùn)行,大概要消耗掉320G內(nèi)存,以及160個cpu core。

          乍看起來,咱們的隊列資源,是足夠的,500G內(nèi)存,280個cpu core。

          首先,第一點,你的spark作業(yè)實際運(yùn)行起來以后,耗費掉的資源量,可能是比你在spark-submit里面配置的,以及你預(yù)期的,是要大一些的。400G內(nèi)存,190個cpu core。

          那么這個時候,的確,咱們的隊列資源還是有一些剩余的。但問題是如果你同時又提交了一個spark作業(yè)上去,一模一樣的。那就可能會出問題。

          第二個spark作業(yè),又要申請320G內(nèi)存+160個cpu core。結(jié)果,發(fā)現(xiàn)隊列資源不足。

          此時,可能會出現(xiàn)兩種情況:(備注,具體出現(xiàn)哪種情況,跟你的YARN、Hadoop的版本,你們公司的一些運(yùn)維參數(shù),以及配置、硬件、資源肯能都有關(guān)系)

          1、YARN,發(fā)現(xiàn)資源不足時,你的spark作業(yè),并沒有hang在那里,等待資源的分配,而是直接打印一行fail的log,直接就fail掉了。

          2、YARN,發(fā)現(xiàn)資源不足,你的spark作業(yè),就hang在那里。一直等待之前的spark作業(yè)執(zhí)行完,等待有資源分配給自己來執(zhí)行。


          解決方案

          1、在你的J2EE(我們這個項目里面,spark作業(yè)的運(yùn)行, J2EE平臺觸發(fā)的,執(zhí)行spark-submit腳本的平臺)進(jìn)行限制,同時只能提交一個spark作業(yè)到y(tǒng)arn上去執(zhí)行,確保一個spark作業(yè)的資源肯定是有的。

          2、你應(yīng)該采用一些簡單的調(diào)度區(qū)分的方式,比如說,有的spark作業(yè)可能是要長時間運(yùn)行的,比如運(yùn)行30分鐘。有的spark作業(yè),可能是短時間運(yùn)行的,可能就運(yùn)行2分鐘。此時,都提交到一個隊列上去,肯定不合適。很可能出現(xiàn)30分鐘的作業(yè)卡住后面一大堆2分鐘的作業(yè)。分隊列,可以申請(跟你們的YARN、Hadoop運(yùn)維的同事申請)。你自己給自己搞兩個調(diào)度隊列。每個隊列的根據(jù)你要執(zhí)行的作業(yè)的情況來設(shè)置。在你的J2EE程序里面,要判斷,如果是長時間運(yùn)行的作業(yè),就干脆都提交到某一個固定的隊列里面去把。如果是短時間運(yùn)行的作業(yè),就統(tǒng)一提交到另外一個隊列里面去。這樣,避免了長時間運(yùn)行的作業(yè),阻塞了短時間運(yùn)行的作業(yè)。

          3、你的隊列里面,無論何時,只會有一個作業(yè)在里面運(yùn)行。那么此時,就應(yīng)該用我們之前講過的性能調(diào)優(yōu)的手段,去將每個隊列能承載的最大的資源,分配給你的每一個spark作業(yè),比如80個executor,6G的內(nèi)存,3個cpu core。盡量讓你的spark作業(yè)每一次運(yùn)行,都達(dá)到最滿的資源使用率,最快的速度,最好的性能。并行度,240個cpu core,720個task。

          4、在J2EE中,通過線程池的方式(一個線程池對應(yīng)一個資源隊列),來實現(xiàn)上述我們說的方案。


          4

          解決各種序列化導(dǎo)致的報錯


          問題描述

          用client模式去提交spark作業(yè),觀察本地打印出來的log。如果出現(xiàn)了類似于Serializable、Serialize等等字眼報錯的log,那么恭喜大家,就碰到了序列化問題導(dǎo)致的報錯。


          序列化報錯及解決方法

          1、你的算子函數(shù)里面,如果使用到了外部的自定義類型的變量,那么此時,就要求你的自定義類型,必須是可序列化的。

          final Teacher teacher = new Teacher("leo");studentsRDD.foreach(new VoidFunction() {public void call(Row row) throws Exception {   String teacherName = teacher.getName();   ....  }});
          public class Teacher implements Serializable {}

          2、如果要將自定義的類型,作為RDD的元素類型,那么自定義的類型也必須是可以序列化的。

          JavaPairRDD<Integer, Teacher> teacherRDDJavaPairRDD<Integer, Student> studentRDDstudentRDD.join(teacherRDD)public class Teacher implements Serializable {}
          public class Student implements Serializable {}

          3、不能在上述兩種情況下,去使用一些第三方的,不支持序列化的類型。

          Connection conn =studentsRDD.foreach(new VoidFunction() {public void call(Row row) throws Exception {  conn.....}});

          Connection是不支持序列化的


          5

          解決算子函數(shù)返回NULL導(dǎo)致的問題


          問題描述

          在有些算子函數(shù)里面,是需要我們有一個返回值的。但是,有時候不需要返回值。我們?nèi)绻苯臃祷豊ULL的話,是會報錯的。

          Scala.Math(NULL),異常

          解決方案

          如果碰到你的確是對于某些值不想要有返回值的話,有一個解決的辦法:

          1、在返回的時候,返回一些特殊的值,不要返回null,比如“-999”

          2、在通過算子獲取到了一個RDD之后,可以對這個RDD執(zhí)行filter操作,進(jìn)行數(shù)據(jù)過濾。filter內(nèi),可以對數(shù)據(jù)進(jìn)行判定,如果是-999,那么就返回false,給過濾掉就可以了。

          3、大家不要忘了,之前咱們講過的那個算子調(diào)優(yōu)里面的coalesce算子,在filter之后,可以使用coalesce算子壓縮一下RDD的partition的數(shù)量,讓各個partition的數(shù)據(jù)比較緊湊一些。也能提升一些性能。

           

          6

          解決yarn-client模式導(dǎo)致的網(wǎng)卡流量激增問題


          Spark-On-Yarn任務(wù)執(zhí)行流程

          Driver到底是什么?

          我們寫的spark程序,打成jar包,用spark-submit來提交。jar包中的一個main類,通過jvm的命令啟動起來。

          JVM進(jìn)程,其實就是Driver進(jìn)程。

          Driver進(jìn)程啟動起來以后,執(zhí)行我們自己寫的main函數(shù),從new SparkContext()開始。

          driver接收到屬于自己的executor進(jìn)程的注冊之后,就可以去進(jìn)行我們寫的spark作業(yè)代碼的執(zhí)行了。此時會一行一行的去執(zhí)行咱們寫的那些spark代碼。執(zhí)行到某個action操作的時候,就會觸發(fā)一個job,然后DAGScheduler會將job劃分為一個一個的stage,為每個stage都創(chuàng)建指定數(shù)量的task。TaskScheduler將每個stage的task分配到各個executor上面去執(zhí)行。

          task就會執(zhí)行咱們寫的算子函數(shù)。

          spark在yarn-client模式下,application的注冊(executor的申請)和計算task的調(diào)度,是分離開來的。

          standalone模式下,這兩個操作都是driver負(fù)責(zé)的。

          ApplicationMaster(ExecutorLauncher)負(fù)責(zé)executor的申請,driver負(fù)責(zé)job和stage的劃分,以及task的創(chuàng)建、分配和調(diào)度。

          每種計算框架(mr、spark),如果想要在yarn上執(zhí)行自己的計算應(yīng)用,那么就必須自己實現(xiàn)和提供一個ApplicationMaster。相當(dāng)于是實現(xiàn)了yarn提供的接口,spark自己開發(fā)的一個類。


          yarn-client模式下,會產(chǎn)生什么樣的問題呢?

          由于driver是啟動在本地機(jī)器的,而且driver是全權(quán)負(fù)責(zé)所有的任務(wù)的調(diào)度的,也就是說要跟yarn集群上運(yùn)行的多個executor進(jìn)行頻繁的通信(中間有task的啟動消息、task的執(zhí)行統(tǒng)計消息、task的運(yùn)行狀態(tài)、shuffle的輸出結(jié)果)。

          想象一下,比如你的executor有100個,stage有10個,task有1000個。每個stage運(yùn)行的時候,都有1000個task提交到executor上面去運(yùn)行,平均每個executor有10個task。接下來問題來了,driver要頻繁地跟executor上運(yùn)行的1000個task進(jìn)行通信。通信消息特別多,通信的頻率特別高。運(yùn)行完一個stage,接著運(yùn)行下一個stage,又是頻繁的通信。

          在整個spark運(yùn)行的生命周期內(nèi),都會頻繁的去進(jìn)行通信和調(diào)度。所有這一切通信和調(diào)度都是從你的本地機(jī)器上發(fā)出去的,和接收到的。這是最要命的地方。你的本地機(jī)器,很可能在30分鐘內(nèi)(spark作業(yè)運(yùn)行的周期內(nèi)),進(jìn)行頻繁大量的網(wǎng)絡(luò)通信。那么此時,你的本地機(jī)器的網(wǎng)絡(luò)通信負(fù)載是非常非常高的。會導(dǎo)致你的本地機(jī)器的網(wǎng)卡流量會激增!

          你的本地機(jī)器的網(wǎng)卡流量激增,當(dāng)然不是一件好事了。因為在一些大的公司里面,對每臺機(jī)器的使用情況,都是有監(jiān)控的。不會允許單個機(jī)器出現(xiàn)耗費大量網(wǎng)絡(luò)帶寬等等這種資源的情況。


          解決方案

          實際上解決的方法很簡單,就是心里要清楚,yarn-client模式是什么情況下,可以使用的?yarn-client模式,通常咱們就只會使用在測試環(huán)境中,你寫好了某個spark作業(yè),打了一個jar包,在某臺測試機(jī)器上,用yarn-client模式去提交一下。因為測試的行為是偶爾為之的,不會長時間連續(xù)提交大量的spark作業(yè)去測試。還有一點好處,yarn-client模式提交,可以在本地機(jī)器觀察到詳細(xì)全面的log。通過查看log,可以去解決線上報錯的故障(troubleshooting)、對性能進(jìn)行觀察并進(jìn)行性能調(diào)優(yōu)。

          實際上線了以后,在生產(chǎn)環(huán)境中,都得用yarn-cluster模式,去提交你的spark作業(yè)。

          yarn-cluster模式,就跟你的本地機(jī)器引起的網(wǎng)卡流量激增的問題,就沒有關(guān)系了。也就是說,就算有問題,也應(yīng)該是yarn運(yùn)維團(tuán)隊和基礎(chǔ)運(yùn)維團(tuán)隊之間的事情了。使用了yarn-cluster模式以后,就不是你的本地機(jī)器運(yùn)行Driver,進(jìn)行task調(diào)度了。是yarn集群中,某個節(jié)點會運(yùn)行driver進(jìn)程,負(fù)責(zé)task調(diào)度。

           

          7

          解決yarn-cluster模式的JVM棧內(nèi)存溢出問題


          問題描述

          有的時候,運(yùn)行一些包含了spark sql的spark作業(yè),可能會碰到y(tǒng)arn-client模式下,可以正常提交運(yùn)行。yarn-cluster模式下,可能無法提交運(yùn)行的,會報出JVM的PermGen(永久代)的內(nèi)存溢出,OOM。

          yarn-client模式下,driver是運(yùn)行在本地機(jī)器上的,spark使用的JVM的PermGen的配置,是本地的spark-class文件(spark客戶端是默認(rèn)有配置的),JVM的永久代的大小是128M,這個是沒有問題的。但是在yarn-cluster模式下,driver是運(yùn)行在yarn集群的某個節(jié)點上的,使用的是沒有經(jīng)過配置的默認(rèn)設(shè)置(PermGen永久代大小),82M。

          spark-sql,它的內(nèi)部是要進(jìn)行很復(fù)雜的SQL的語義解析、語法樹的轉(zhuǎn)換等等,特別復(fù)雜。在這種復(fù)雜的情況下,如果說你的sql本身特別復(fù)雜的話,很可能會比較導(dǎo)致性能的消耗,內(nèi)存的消耗。可能對PermGen永久代的占用會比較大。

          所以,此時,如果對永久代的占用需求,超過了82M的話,但是呢又在128M以內(nèi),就會出現(xiàn)如上所述的問題,yarn-client模式下,默認(rèn)是128M,這個還能運(yùn)行,如果在yarn-cluster模式下,默認(rèn)是82M,就有問題了。會報出PermGen Out of Memory error log。


          解決方案

          既然是JVM的PermGen永久代內(nèi)存溢出,那么就是內(nèi)存不夠用。我們就給yarn-cluster模式下的driver的PermGen多設(shè)置一些。

          spark-submit腳本中,加入以下配置即可:

          --conf spark.driver.extraJavaOptions="-XX:PermSize=128M -XX:MaxPermSize=256M"


          這個就設(shè)置了driver永久代的大小,默認(rèn)是128M,最大是256M。這樣的話,就可以基本保證你的spark作業(yè)不會出現(xiàn)上述的yarn-cluster模式導(dǎo)致的永久代內(nèi)存溢出的問題。

          spark sql中,寫sql,要注意一個問題:

          如果sql有大量的or語句。比如where keywords='' or keywords='' or keywords=''

          當(dāng)達(dá)到or語句,有成百上千的時候,此時可能就會出現(xiàn)一個driver端的jvm stack overflow,JVM棧內(nèi)存溢出的問題。

          JVM棧內(nèi)存溢出,基本上就是由于調(diào)用的方法層級過多,因為產(chǎn)生了大量的,非常深的,超出了JVM棧深度限制的遞歸方法。我們的猜測,spark sql有大量or語句的時候,spark sql內(nèi)部源碼中,在解析sql,比如轉(zhuǎn)換成語法樹,或者進(jìn)行執(zhí)行計劃的生成的時候,對or的處理是遞歸。or特別多的話,就會發(fā)生大量的遞歸。

          JVM Stack Memory Overflow,棧內(nèi)存溢出。

          這種時候,建議不要搞那么復(fù)雜的spark sql語句。采用替代方案:將一條sql語句,拆解成多條sql語句來執(zhí)行。每條sql語句,就只有100個or子句以內(nèi)。一條一條SQL語句來執(zhí)行。根據(jù)生產(chǎn)環(huán)境經(jīng)驗的測試,一條sql語句,100個or子句以內(nèi),是還可以的。通常情況下,不會報那個棧內(nèi)存溢出。

           

          8

          錯誤的持久化方式以及checkpoint的使用

          使用持久化方式

          錯誤的持久化使用方式:

          usersRDD,想要對這個RDD做一個cache,希望能夠在后面多次使用這個RDD的時候,不用反復(fù)重新計算RDD。可以直接使用通過各個節(jié)點上的executor的BlockManager管理的內(nèi)存 / 磁盤上的數(shù)據(jù),避免重新反復(fù)計算RDD。

          usersRDD.cache()

          usersRDD.count()

          usersRDD.take()

          上面這種方式,不要說會不會生效了,實際上是會報錯的。會報什么錯誤呢?會報一大堆file not found的錯誤。

           

          正確的持久化使用方式:

          usersRDD

          usersRDD = usersRDD.cache() // Java

          val cachedUsersRDD = usersRDD.cache() // Scala

          之后再去使用usersRDD,或者cachedUsersRDD就可以了。


          checkpoint的使用

          對于持久化,大多數(shù)時候都是會正常工作的。但有些時候會出現(xiàn)意外。

          比如說,緩存在內(nèi)存中的數(shù)據(jù),可能莫名其妙就丟失掉了。

          或者說,存儲在磁盤文件中的數(shù)據(jù),莫名其妙就沒了,文件被誤刪了。

          出現(xiàn)上述情況的時候,如果要對這個RDD執(zhí)行某些操作,可能會發(fā)現(xiàn)RDD的某個partition找不到了。

          下來task就會對消失的partition重新計算,計算完以后再緩存和使用。

          有些時候,計算某個RDD,可能是極其耗時的。可能RDD之前有大量的父RDD。那么如果你要重新計算一個partition,可能要重新計算之前所有的父RDD對應(yīng)的partition。

          這種情況下,就可以選擇對這個RDD進(jìn)行checkpoint,以防萬一。進(jìn)行checkpoint,就是說,會將RDD的數(shù)據(jù),持久化一份到容錯的文件系統(tǒng)上(比如hdfs)。

          在對這個RDD進(jìn)行計算的時候,如果發(fā)現(xiàn)它的緩存數(shù)據(jù)不見了。優(yōu)先就是先找一下有沒有checkpoint數(shù)據(jù)(到hdfs上面去找)。如果有的話,就使用checkpoint數(shù)據(jù)了。不至于去重新計算。

          checkpoint,其實就是可以作為是cache的一個備胎。如果cache失效了,checkpoint就可以上來使用了。

          checkpoint有利有弊,利在于,提高了spark作業(yè)的可靠性,一旦發(fā)生問題,還是很可靠的,不用重新計算大量的rdd。但是弊在于,進(jìn)行checkpoint操作的時候,也就是將rdd數(shù)據(jù)寫入hdfs中的時候,還是會消耗性能的。

          checkpoint,用性能換可靠性。

          checkpoint原理:

          1、在代碼中,用SparkContext,設(shè)置一個checkpoint目錄,可以是一個容錯文件系統(tǒng)的目錄,比如hdfs。

          2、在代碼中,對需要進(jìn)行checkpoint的rdd,執(zhí)行RDD.checkpoint()。

          3、RDDCheckpointData(spark內(nèi)部的API),接管你的RDD,會標(biāo)記為marked for checkpoint,準(zhǔn)備進(jìn)行checkpoint。

          4、你的job運(yùn)行完之后,會調(diào)用一個finalRDD.doCheckpoint()方法,會順著rdd lineage,回溯掃描,發(fā)現(xiàn)有標(biāo)記為待checkpoint的rdd,就會進(jìn)行二次標(biāo)記,inProgressCheckpoint,正在接受checkpoint操作。

          5、job執(zhí)行完之后,就會啟動一個內(nèi)部的新job,去將標(biāo)記為inProgressCheckpoint的rdd的數(shù)據(jù),都寫入hdfs文件中。(備注,如果rdd之前cache過,會直接從緩存中獲取數(shù)據(jù),寫入hdfs中。如果沒有cache過,那么就會重新計算一遍這個rdd,再checkpoint)。

          6、將checkpoint過的rdd之前的依賴rdd,改成一個CheckpointRDD*,強(qiáng)制改變你的rdd的lineage。后面如果rdd的cache數(shù)據(jù)獲取失敗,直接會通過它的上游CheckpointRDD,去容錯的文件系統(tǒng),比如hdfs,中,獲取checkpoint的數(shù)據(jù)。

          checkpoint的使用:

          1、sc.checkpointFile("hdfs://"),設(shè)置checkpoint目錄

          2、對RDD執(zhí)行checkpoint操作


          6

          數(shù)據(jù)傾斜解決方案


          數(shù)據(jù)傾斜的解決,跟之前講解的性能調(diào)優(yōu),有一點異曲同工之妙。

          性能調(diào)優(yōu)中最有效最直接最簡單的方式就是加資源加并行度,并注意RDD架構(gòu)(復(fù)用同一個RDD,加上cache緩存)。相對于前面,shuffle、jvm等是次要的。

          1

          原理以及現(xiàn)象分析

          數(shù)據(jù)傾斜怎么出現(xiàn)的

          在執(zhí)行shuffle操作的時候,是按照key,來進(jìn)行values的數(shù)據(jù)的輸出、拉取和聚合的。

          同一個key的values,一定是分配到一個reduce task進(jìn)行處理的。

          多個key對應(yīng)的values,比如一共是90萬。可能某個key對應(yīng)了88萬數(shù)據(jù),被分配到一個task上去面去執(zhí)行。

          另外兩個task,可能各分配到了1萬數(shù)據(jù),可能是數(shù)百個key,對應(yīng)的1萬條數(shù)據(jù)。

          這樣就會出現(xiàn)數(shù)據(jù)傾斜問題。

          想象一下,出現(xiàn)數(shù)據(jù)傾斜以后的運(yùn)行的情況。很糟糕!

          其中兩個task,各分配到了1萬數(shù)據(jù),可能同時在10分鐘內(nèi)都運(yùn)行完了。另外一個task有88萬條,88 * 10 =  880分鐘 = 14.5個小時。

          大家看,本來另外兩個task很快就運(yùn)行完畢了(10分鐘),但是由于一個拖后腿的家伙,第三個task,要14.5個小時才能運(yùn)行完,就導(dǎo)致整個spark作業(yè),也得14.5個小時才能運(yùn)行完。

          數(shù)據(jù)傾斜,一旦出現(xiàn),是不是性能殺手?!


          發(fā)生數(shù)據(jù)傾斜以后的現(xiàn)象

          Spark數(shù)據(jù)傾斜,有兩種表現(xiàn):

          1、你的大部分的task,都執(zhí)行的特別特別快,(你要用client模式,standalone client,yarn client,本地機(jī)器一執(zhí)行spark-submit腳本,就會開始打印log),task175 finished,剩下幾個task,執(zhí)行的特別特別慢,前面的task,一般1s可以執(zhí)行完5個,最后發(fā)現(xiàn)1000個task,998,999 task,要執(zhí)行1個小時,2個小時才能執(zhí)行完一個task。

          出現(xiàn)以上loginfo,就表明出現(xiàn)數(shù)據(jù)傾斜了。

          這樣還算好的,因為雖然老牛拉破車一樣非常慢,但是至少還能跑。

          2、另一種情況是,運(yùn)行的時候,其他task都執(zhí)行完了,也沒什么特別的問題,但是有的task,就是會突然間報了一個OOM,JVM Out Of Memory,內(nèi)存溢出了,task failed,task lost,resubmitting task。反復(fù)執(zhí)行幾次都到了某個task就是跑不通,最后就掛掉。

          某個task就直接OOM,那么基本上也是因為數(shù)據(jù)傾斜了,task分配的數(shù)量實在是太大了!所以內(nèi)存放不下,然后你的task每處理一條數(shù)據(jù),還要創(chuàng)建大量的對象,內(nèi)存爆掉了。

          這樣也表明出現(xiàn)數(shù)據(jù)傾斜了。

          這種就不太好了,因為你的程序如果不去解決數(shù)據(jù)傾斜的問題,壓根兒就跑不出來。

          作業(yè)都跑不完,還談什么性能調(diào)優(yōu)這些東西?!


          定位數(shù)據(jù)傾斜出現(xiàn)的原因與出現(xiàn)問題的位置

          根據(jù)log去定位

          出現(xiàn)數(shù)據(jù)傾斜的原因,基本只可能是因為發(fā)生了shuffle操作,在shuffle的過程中,出現(xiàn)了數(shù)據(jù)傾斜的問題。因為某個或者某些key對應(yīng)的數(shù)據(jù),遠(yuǎn)遠(yuǎn)的高于其他的key。

          1、你在自己的程序里面找找,哪些地方用了會產(chǎn)生shuffle的算子,groupByKey、countByKey、reduceByKey、join

          2、看log

          log一般會報是在你的哪一行代碼,導(dǎo)致了OOM異常。或者看log,看看是執(zhí)行到了第幾個stage。spark代碼,是怎么劃分成一個一個的stage的。哪一個stage生成的task特別慢,就能夠自己用肉眼去對你的spark代碼進(jìn)行stage的劃分,就能夠通過stage定位到你的代碼,到底哪里發(fā)生了數(shù)據(jù)傾斜。


          2

          聚合源數(shù)據(jù)以及過濾導(dǎo)致傾斜的key

          數(shù)據(jù)傾斜解決方案,第一個方案和第二個方案,一起來講。這兩個方案是最直接、最有效、最簡單的解決數(shù)據(jù)傾斜問題的方案。

          第一個方案:聚合源數(shù)據(jù)。

          第二個方案:過濾導(dǎo)致傾斜的key。

          后面的五個方案,尤其是最后4個方案,都是那種特別狂拽炫酷吊炸天的方案。但沒有第一二個方案簡單直接。如果碰到了數(shù)據(jù)傾斜的問題。上來就先考慮第一個和第二個方案看能不能做,如果能做的話,后面的5個方案,都不用去搞了。

          有效、簡單、直接才是最好的,徹底根除了數(shù)據(jù)傾斜的問題。


          方案一:聚合源數(shù)據(jù)

          一些聚合的操作,比如groupByKey、reduceByKey,groupByKey說白了就是拿到每個key對應(yīng)的values。reduceByKey說白了就是對每個key對應(yīng)的values執(zhí)行一定的計算。

          這些操作,比如groupByKey和reduceByKey,包括之前說的join。都是在spark作業(yè)中執(zhí)行的。

          spark作業(yè)的數(shù)據(jù)來源,通常是哪里呢?90%的情況下,數(shù)據(jù)來源都是hive表(hdfs,大數(shù)據(jù)分布式存儲系統(tǒng))。hdfs上存儲的大數(shù)據(jù)。hive表中的數(shù)據(jù)通常是怎么出來的呢?有了spark以后,hive比較適合做什么事情?hive就是適合做離線的,晚上凌晨跑的,ETL(extract transform load,數(shù)據(jù)的采集、清洗、導(dǎo)入),hive sql,去做這些事情,從而去形成一個完整的hive中的數(shù)據(jù)倉庫。說白了,數(shù)據(jù)倉庫,就是一堆表。

          spark作業(yè)的源表,hive表,通常情況下來說,也是通過某些hive etl生成的。hive etl可能是晚上凌晨在那兒跑。今天跑昨天的數(shù)據(jù)。

          數(shù)據(jù)傾斜,某個key對應(yīng)的80萬數(shù)據(jù),某些key對應(yīng)幾百條,某些key對應(yīng)幾十條。現(xiàn)在咱們直接在生成hive表的hive etl中對數(shù)據(jù)進(jìn)行聚合。比如按key來分組,將key對應(yīng)的所有的values全部用一種特殊的格式拼接到一個字符串里面去,比如“key=sessionid, value: action_seq=1|user_id=1|search_keyword=火鍋|category_id=001;action_seq=2|user_id=1|search_keyword=涮肉|category_id=001”。

          對key進(jìn)行g(shù)roup,在spark中,拿到key=sessionid,values<Iterable>。hive etl中,直接對key進(jìn)行了聚合。那么也就意味著,每個key就只對應(yīng)一條數(shù)據(jù)。在spark中,就不需要再去執(zhí)行g(shù)roupByKey+map這種操作了。直接對每個key對應(yīng)的values字符串進(jìn)行map操作,進(jìn)行你需要的操作即可。

          spark中,可能對這個操作,就不需要執(zhí)行shffule操作了,也就根本不可能導(dǎo)致數(shù)據(jù)傾斜。

          或者是對每個key在hive etl中進(jìn)行聚合,對所有values聚合一下,不一定是拼接起來,可能是直接進(jìn)行計算。reduceByKey計算函數(shù)應(yīng)用在hive etl中,從而得到每個key的values。

           

          聚合源數(shù)據(jù)方案第二種做法是,你可能沒有辦法對每個key聚合出來一條數(shù)據(jù)。那么也可以做一個妥協(xié),對每個key對應(yīng)的數(shù)據(jù),10萬條。有好幾個粒度,比如10萬條里面包含了幾個城市、幾天、幾個地區(qū)的數(shù)據(jù),現(xiàn)在放粗粒度。直接就按照城市粒度,做一下聚合,幾個城市,幾天、幾個地區(qū)粒度的數(shù)據(jù),都給聚合起來。比如說

          city_id date area_id

          select ... from ... group by city_id

          盡量去聚合,減少每個key對應(yīng)的數(shù)量,也許聚合到比較粗的粒度之后,原先有10萬數(shù)據(jù)量的key,現(xiàn)在只有1萬數(shù)據(jù)量。減輕數(shù)據(jù)傾斜的現(xiàn)象和問題。


          方案二:過濾導(dǎo)致傾斜的key

          如果你能夠接受某些數(shù)據(jù)在spark作業(yè)中直接就摒棄掉不使用。比如說,總共有100萬個key。只有2個key是數(shù)據(jù)量達(dá)到10萬的。其他所有的key,對應(yīng)的數(shù)量都是幾十萬。

          這個時候,你自己可以去取舍,如果業(yè)務(wù)和需求可以理解和接受的話,在你從hive表查詢源數(shù)據(jù)的時候,直接在sql中用where條件,過濾掉某幾個key。

          那么這幾個原先有大量數(shù)據(jù),會導(dǎo)致數(shù)據(jù)傾斜的key,被過濾掉之后,那么在你的spark作業(yè)中,自然就不會發(fā)生數(shù)據(jù)傾斜了。


          3

          提高shuffle操作reduce并行度


          問題描述

          第一個和第二個方案,都不適合做,然后再考慮這個方案。

          將reduce task的數(shù)量變多,就可以讓每個reduce task分配到更少的數(shù)據(jù)量。這樣的話也許就可以緩解甚至是基本解決掉數(shù)據(jù)傾斜的問題。


          提升shuffle reduce端并行度的操作方法

          很簡單,主要給我們所有的shuffle算子,比如groupByKey、countByKey、reduceByKey。在調(diào)用的時候,傳入進(jìn)去一個參數(shù)。那個數(shù)字,就代表了那個shuffle操作的reduce端的并行度。那么在進(jìn)行shuffle操作的時候,就會對應(yīng)著創(chuàng)建指定數(shù)量的reduce task。

          這樣的話,就可以讓每個reduce task分配到更少的數(shù)據(jù)。基本可以緩解數(shù)據(jù)傾斜的問題。

          比如說,原本某個task分配數(shù)據(jù)特別多,直接OOM,內(nèi)存溢出了,程序沒法運(yùn)行,直接掛掉。按照log,找到發(fā)生數(shù)據(jù)傾斜的shuffle操作,給它傳入一個并行度數(shù)字,這樣的話,原先那個task分配到的數(shù)據(jù),肯定會變少。就至少可以避免OOM的情況,程序至少是可以跑的。


          提升shuffle reduce并行度的缺陷

          治標(biāo)不治本的意思,因為它沒有從根本上改變數(shù)據(jù)傾斜的本質(zhì)和問題。不像第一個和第二個方案(直接避免了數(shù)據(jù)傾斜的發(fā)生)。原理沒有改變,只是說,盡可能地去緩解和減輕shuffle reduce task的數(shù)據(jù)壓力,以及數(shù)據(jù)傾斜的問題。

          實際生產(chǎn)環(huán)境中的經(jīng)驗:

          1、如果最理想的情況下,提升并行度以后,減輕了數(shù)據(jù)傾斜的問題,或者甚至可以讓數(shù)據(jù)傾斜的現(xiàn)象忽略不計,那么就最好。就不用做其他的數(shù)據(jù)傾斜解決方案了。

          2、不太理想的情況下,比如之前某個task運(yùn)行特別慢,要5個小時,現(xiàn)在稍微快了一點,變成了4個小時。或者是原先運(yùn)行到某個task,直接OOM,現(xiàn)在至少不會OOM了,但是那個task運(yùn)行特別慢,要5個小時才能跑完。

          那么,如果出現(xiàn)第二種情況的話,各位,就立即放棄第三種方案,開始去嘗試和選擇后面的四種方案。

          4

          使用隨機(jī)key實現(xiàn)雙重聚合

          使用場景

          groupByKey、reduceByKey比較適合使用這種方式。join咱們通常不會這樣來做,后面會講三種針對不同的join造成的數(shù)據(jù)傾斜的問題的解決方案。


          解決方案

          第一輪聚合的時候,對key進(jìn)行打散,將原先一樣的key,變成不一樣的key,相當(dāng)于是將每個key分為多組。

          先針對多個組,進(jìn)行key的局部聚合。接著,再去除掉每個key的前綴,然后對所有的key進(jìn)行全局的聚合。

          對groupByKey、reduceByKey造成的數(shù)據(jù)傾斜,有比較好的效果。

          如果說,之前的第一、第二、第三種方案,都沒法解決數(shù)據(jù)傾斜的問題,那么就只能依靠這一種方式了。


          5

          將reduce join轉(zhuǎn)換為map join


          使用方式

          普通的join,那么肯定是要走shuffle。既然是走shuffle,那么普通的join就肯定是走的是reduce join。那怎么將reduce join 轉(zhuǎn)換為mapjoin呢?先將所有相同的key,對應(yīng)的value匯聚到一個task中,然后再進(jìn)行join。


          使用場景

          這種方式適合在什么樣的情況下來使用?

          如果兩個RDD要進(jìn)行join,其中一個RDD是比較小的。比如一個RDD是100萬數(shù)據(jù),一個RDD是1萬數(shù)據(jù)。(一個RDD是1億數(shù)據(jù),一個RDD是100萬數(shù)據(jù))。

          其中一個RDD必須是比較小的,broadcast出去那個小RDD的數(shù)據(jù)以后,就會在每個executor的block manager中都保存一份。要確保你的內(nèi)存足夠存放那個小RDD中的數(shù)據(jù)。

          這種方式下,根本不會發(fā)生shuffle操作,肯定也不會發(fā)生數(shù)據(jù)傾斜。從根本上杜絕了join操作可能導(dǎo)致的數(shù)據(jù)傾斜的問題。

          對于join中有數(shù)據(jù)傾斜的情況,大家盡量第一時間先考慮這種方式,效果非常好。

          不適合的情況

          兩個RDD都比較大,那么這個時候,你去將其中一個RDD做成broadcast,就很笨拙了。很可能導(dǎo)致內(nèi)存不足。最終導(dǎo)致內(nèi)存溢出,程序掛掉。

          而且其中某些key(或者是某個key),還發(fā)生了數(shù)據(jù)傾斜。此時可以采用最后兩種方式。

          對于join這種操作,不光是考慮數(shù)據(jù)傾斜的問題。即使是沒有數(shù)據(jù)傾斜問題,也完全可以優(yōu)先考慮,用我們講的這種高級的reduce join轉(zhuǎn)map join的技術(shù),不要用普通的join,去通過shuffle,進(jìn)行數(shù)據(jù)的join。完全可以通過簡單的map,使用map join的方式,犧牲一點內(nèi)存資源。在可行的情況下,優(yōu)先這么使用。

          不走shuffle,直接走map,是不是性能也會高很多?這是肯定的。

           

          6

          sample采樣傾斜key單獨進(jìn)行join


          方案實現(xiàn)思路

          將發(fā)生數(shù)據(jù)傾斜的key,單獨拉出來,放到一個RDD中去。就用這個原本會傾斜的key RDD跟其他RDD單獨去join一下,這個時候key對應(yīng)的數(shù)據(jù)可能就會分散到多個task中去進(jìn)行join操作。

          就不至于說是,這個key跟之前其他的key混合在一個RDD中時,肯定是會導(dǎo)致一個key對應(yīng)的所有數(shù)據(jù)都到一個task中去,就會導(dǎo)致數(shù)據(jù)傾斜。


          使用場景

          這種方案什么時候適合使用?

          優(yōu)先對于join,肯定是希望能夠采用上一個方案,即reduce join轉(zhuǎn)換map join。兩個RDD數(shù)據(jù)都比較大,那么就不要那么搞了。

          針對你的RDD的數(shù)據(jù),你可以自己把它轉(zhuǎn)換成一個中間表,或者是直接用countByKey()的方式,你可以看一下這個RDD各個key對應(yīng)的數(shù)據(jù)量。此時如果你發(fā)現(xiàn)整個RDD就一個,或者少數(shù)幾個key對應(yīng)的數(shù)據(jù)量特別多。盡量建議,比如就是一個key對應(yīng)的數(shù)據(jù)量特別多。

          此時可以采用這種方案,單拉出來那個最多的key,單獨進(jìn)行join,盡可能地將key分散到各個task上去進(jìn)行join操作。

          什么時候不適用呢?

          如果一個RDD中,導(dǎo)致數(shù)據(jù)傾斜的key特別多。那么此時,最好還是不要這樣了。還是使用我們最后一個方案,終極的join數(shù)據(jù)傾斜的解決方案。

          就是說,咱們單拉出來了一個或者少數(shù)幾個可能會產(chǎn)生數(shù)據(jù)傾斜的key,然后還可以進(jìn)行更加優(yōu)化的一個操作。

          對于那個key,從另外一個要join的表中,也過濾出來一份數(shù)據(jù),比如可能就只有一條數(shù)據(jù)。userid2infoRDD,一個userid key,就對應(yīng)一條數(shù)據(jù)。

          然后呢,采取對那個只有一條數(shù)據(jù)的RDD,進(jìn)行flatMap操作,打上100個隨機(jī)數(shù),作為前綴,返回100條數(shù)據(jù)。

          單獨拉出來的可能產(chǎn)生數(shù)據(jù)傾斜的RDD,給每一條數(shù)據(jù),都打上一個100以內(nèi)的隨機(jī)數(shù),作為前綴。

          再去進(jìn)行join,是不是性能就更好了。肯定可以將數(shù)據(jù)進(jìn)行打散,去進(jìn)行join。join完以后,可以執(zhí)行map操作,去將之前打上的隨機(jī)數(shù)給去掉,然后再和另外一個普通RDD join以后的結(jié)果進(jìn)行union操作。

           

          7

          使用隨機(jī)數(shù)以及擴(kuò)容表進(jìn)行join

          使用場景及步驟

          當(dāng)采用隨機(jī)數(shù)和擴(kuò)容表進(jìn)行join解決數(shù)據(jù)傾斜的時候,就代表著,你的之前的數(shù)據(jù)傾斜的解決方案,都沒法使用。

          這個方案是沒辦法徹底解決數(shù)據(jù)傾斜的,更多的,是一種對數(shù)據(jù)傾斜的緩解。

          步驟:

          1、選擇一個RDD,要用flatMap,進(jìn)行擴(kuò)容,將每條數(shù)據(jù),映射為多條數(shù)據(jù),每個映射出來的數(shù)據(jù),都帶了一個n以內(nèi)的隨機(jī)數(shù),通常來說會選擇10。

          2、將另外一個RDD,做普通的map映射操作,每條數(shù)據(jù)都打上一個10以內(nèi)的隨機(jī)數(shù)。

          3、最后將兩個處理后的RDD進(jìn)行join操作。

           

          局限性

          1、因為你的兩個RDD都很大,所以你沒有辦法去將某一個RDD擴(kuò)的特別大,一般咱們就是10倍。

          2、如果就是10倍的話,那么數(shù)據(jù)傾斜問題的確是只能說是緩解和減輕,不能說徹底解決。

          sample采樣傾斜key并單獨進(jìn)行join

          將key,從另外一個RDD中過濾出的數(shù)據(jù),可能只有一條或者幾條,此時,咱們可以任意進(jìn)行擴(kuò)容,擴(kuò)成1000倍。

          將從第一個RDD中拆分出來的那個傾斜key RDD,打上1000以內(nèi)的一個隨機(jī)數(shù)。

          這種情況下,還可以配合上,提升shuffle reduce并行度,join(rdd, 1000)。通常情況下,效果還是非常不錯的。

          打散成100份,甚至1000份,2000份,去進(jìn)行join,那么就肯定沒有數(shù)據(jù)傾斜的問題了吧。


          7

          實時計算程序性能調(diào)優(yōu)


          1、并行化數(shù)據(jù)接收:處理多個topic的數(shù)據(jù)時比較有效

          int numStreams = 5;List<JavaPairDStream<StringString>> kafkaStreams = new ArrayList<JavaPairDStream<StringString>>(numStreams);for (int i = 0; i < numStreams; i++) {  kafkaStreams.add(KafkaUtils.createStream(...));}JavaPairDStream<StringString> unifiedStream = streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));unifiedStream.print();


          2、spark.streaming.blockInterval:增加block數(shù)量,增加每個batch rdd的partition數(shù)量,增加處理并行度

          receiver從數(shù)據(jù)源源源不斷地獲取到數(shù)據(jù);首先是會按照block interval,將指定時間間隔的數(shù)據(jù),收集為一個block;默認(rèn)時間是200ms,官方推薦不要小于50ms;接著呢,會將指定batch interval時間間隔內(nèi)的block,合并為一個batch;創(chuàng)建為一個rdd,然后啟動一個job,去處理這個batch rdd中的數(shù)據(jù)

          batch rdd,它的partition數(shù)量是多少呢?一個batch有多少個block,就有多少個partition;就意味著并行度是多少;就意味著每個batch rdd有多少個task會并行計算和處理。

          當(dāng)然是希望可以比默認(rèn)的task數(shù)量和并行度再多一些了;可以手動調(diào)節(jié)block interval;減少block interval;每個batch可以包含更多的block;有更多的partition;也就有更多的task并行處理每個batch rdd。

          定死了,初始的rdd過來,直接就是固定的partition數(shù)量了


          3、inputStream.repartition(<number of partitions>):重分區(qū),增加每個batch rdd的partition數(shù)量

          有些時候,希望對某些dstream中的rdd進(jìn)行定制化的分區(qū)

          對dstream中的rdd進(jìn)行重分區(qū),去重分區(qū)成指定數(shù)量的分區(qū),這樣也可以提高指定dstream的rdd的計算并行度


          4、調(diào)節(jié)并行度

          spark.default.parallelismreduceByKey(numPartitions)


          5、使用Kryo序列化機(jī)制:

          spark streaming,也是有不少序列化的場景的

          提高序列化task發(fā)送到executor上執(zhí)行的性能,如果task很多的時候,task序列化和反序列化的性能開銷也比較可觀

          默認(rèn)輸入數(shù)據(jù)的存儲級別是StorageLevel.MEMORY_AND_DISK_SER_2,receiver接收到數(shù)據(jù),默認(rèn)就會進(jìn)行持久化操作;首先序列化數(shù)據(jù),存儲到內(nèi)存中;如果內(nèi)存資源不夠大,那么就寫入磁盤;而且,還會寫一份冗余副本到其他executor的block manager中,進(jìn)行數(shù)據(jù)冗余。


          6、batch interval:每個的處理時間必須小于batch interval

          實際上你的spark streaming跑起來以后,其實都是可以在spark ui上觀察它的運(yùn)行情況的;可以看到batch的處理時間;

          如果發(fā)現(xiàn)batch的處理時間大于batch interval,就必須調(diào)節(jié)batch interval

          盡量不要讓batch處理時間大于batch interval

          比如你的batch每隔5秒生成一次;你的batch處理時間要達(dá)到6秒;就會出現(xiàn),batch在你的內(nèi)存中日積月累,一直囤積著,沒法及時計算掉,釋放內(nèi)存空間;而且對內(nèi)存空間的占用越來越大,那么此時會導(dǎo)致內(nèi)存空間快速消耗

          如果發(fā)現(xiàn)batch處理時間比batch interval要大,就盡量將batch interval調(diào)節(jié)大一些

          瀏覽 57
          點贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  豆花视频精品一区 | 91精品人妻少妇无码毛片91麻豆 | 成人做爰黄 片免费观看18 | 大香蕉伊人在线久久 | 最近中文字幕免费mv第一季歌词強上 |