<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          【技術難點】Spark性能調優(yōu)-RDD算子調優(yōu)篇

          共 8809字,需瀏覽 18分鐘

           ·

          2021-03-05 12:00


          Spark調優(yōu)之RDD算子調優(yōu)

          不廢話,直接進入正題!

          1. RDD復用

          在對RDD進行算子時,要避免相同的算子和計算邏輯之下對RDD進行重復的計算,如下圖所示:

          RDD的重復計算

          對上圖中的RDD計算架構進行修改,得到如下圖所示的優(yōu)化結果:

          RDD架構優(yōu)化

          2. 盡早filter

          獲取到初始RDD后,應該考慮盡早地過濾掉不需要的數據,進而減少對內存的占用,從而提升Spark作業(yè)的運行效率。

          3. 讀取大量小文件-用wholeTextFiles

          當我們將一個文本文件讀取為 RDD 時,輸入的每一行都會成為RDD的一個元素。

          也可以將多個完整的文本文件一次性讀取為一個pairRDD,其中鍵是文件名,值是文件內容。

          val input:RDD[String] = sc.textFile("dir/*.log"

          如果傳遞目錄,則將目錄下的所有文件讀取作為RDD。文件路徑支持通配符。

          但是這樣對于大量的小文件讀取效率并不高,應該使用 wholeTextFiles
          返回值為RDD[(String, String)],其中Key是文件的名稱,Value是文件的內容。

          def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)])

          wholeTextFiles讀取小文件:

          val filesRDD: RDD[(String, String)] =
          sc.wholeTextFiles("D:\\data\\files", minPartitions = 3)
          val linesRDD: RDD[String] = filesRDD.flatMap(_._2.split("\\r\\n"))
          val wordsRDD: RDD[String] = linesRDD.flatMap(_.split(" "))
          wordsRDD.map((_, 1)).reduceByKey(_ + _).collect().foreach(println)

          4. mapPartition和foreachPartition

          • mapPartitions

          map(_….)  表示每一個元素

          mapPartitions(_….)  表示每個分區(qū)的數據組成的迭代器

          普通的map算子對RDD中的每一個元素進行操作,而mapPartitions算子對RDD中每一個分區(qū)進行操作。

          如果是普通的map算子,假設一個partition有1萬條數據,那么map算子中的function要執(zhí)行1萬次,也就是對每個元素進行操作。

          map 算子

          如果是mapPartition算子,由于一個task處理一個RDD的partition,那么一個task只會執(zhí)行一次function,function一次接收所有的partition數據,效率比較高。

          mapPartition 算子

          比如,當要把RDD中的所有數據通過JDBC寫入數據,如果使用map算子,那么需要對RDD中的每一個元素都創(chuàng)建一個數據庫連接,這樣對資源的消耗很大,如果使用mapPartitions算子,那么針對一個分區(qū)的數據,只需要建立一個數據庫連接。

          mapPartitions算子也存在一些缺點:對于普通的map操作,一次處理一條數據,如果在處理了2000條數據后內存不足,那么可以將已經處理完的2000條數據從內存中垃圾回收掉;但是如果使用mapPartitions算子,但數據量非常大時,function一次處理一個分區(qū)的數據,如果一旦內存不足,此時無法回收內存,就可能會OOM,即內存溢出。

          因此,mapPartitions算子適用于數據量不是特別大的時候,此時使用mapPartitions算子對性能的提升效果還是不錯的。(當數據量很大的時候,一旦使用mapPartitions算子,就會直接OOM)

          在項目中,應該首先估算一下RDD的數據量、每個partition的數據量,以及分配給每個Executor的內存資源,如果資源允許,可以考慮使用mapPartitions算子代替map。

          • foreachPartition

          rrd.foreache(_….) 表示每一個元素

          rrd.forPartitions(_….)  表示每個分區(qū)的數據組成的迭代器

          在生產環(huán)境中,通常使用foreachPartition算子來完成數據庫的寫入,通過foreachPartition算子的特性,可以優(yōu)化寫數據庫的性能。

          如果使用foreach算子完成數據庫的操作,由于foreach算子是遍歷RDD的每條數據,因此,每條數據都會建立一個數據庫連接,這是對資源的極大浪費,因此,對于寫數據庫操作,我們應當使用foreachPartition算子。

          與mapPartitions算子非常相似,foreachPartition是將RDD的每個分區(qū)作為遍歷對象,一次處理一個分區(qū)的數據,也就是說,如果涉及數據庫的相關操作,一個分區(qū)的數據只需要創(chuàng)建一次數據庫連接,如下圖所示:

          foreachPartition 算子

          使用了foreachPartition 算子后,可以獲得以下的性能提升:

          1. 對于我們寫的function函數,一次處理一整個分區(qū)的數據;

          2. 對于一個分區(qū)內的數據,創(chuàng)建唯一的數據庫連接;

          3. 只需要向數據庫發(fā)送一次SQL語句和多組參數;

          在生產環(huán)境中,全部都會使用foreachPartition算子完成數據庫操作。foreachPartition算子存在一個問題,與mapPartitions算子類似,如果一個分區(qū)的數據量特別大,可能會造成OOM,即內存溢出。

          5. filter+coalesce/repartition(減少分區(qū))

          在Spark任務中我們經常會使用filter算子完成RDD中數據的過濾,在任務初始階段,從各個分區(qū)中加載到的數據量是相近的,但是一旦進過filter過濾后,每個分區(qū)的數據量有可能會存在較大差異,如下圖所示:

          分區(qū)數據過濾結果

          根據上圖我們可以發(fā)現兩個問題:

          1. 每個partition的數據量變小了,如果還按照之前與partition相等的task個數去處理當前數據,有點浪費task的計算資源;

          2. 每個partition的數據量不一樣,會導致后面的每個task處理每個partition數據的時候,每個task要處理的數據量不同,這很有可能導致數據傾斜問題。

          如上圖所示,第二個分區(qū)的數據過濾后只剩100條,而第三個分區(qū)的數據過濾后剩下800條,在相同的處理邏輯下,第二個分區(qū)對應的task處理的數據量與第三個分區(qū)對應的task處理的數據量差距達到了8倍,這也會導致運行速度可能存在數倍的差距,這也就是數據傾斜問題

          針對上述的兩個問題,我們分別進行分析:

          1. 針對第一個問題,既然分區(qū)的數據量變小了,我們希望可以對分區(qū)數據進行重新分配,比如將原來4個分區(qū)的數據轉化到2個分區(qū)中,這樣只需要用后面的兩個task進行處理即可,避免了資源的浪費。

          2. 針對第二個問題,解決方法和第一個問題的解決方法非常相似,對分區(qū)數據重新分配,讓每個partition中的數據量差不多,這就避免了數據傾斜問題。

          那么具體應該如何實現上面的解決思路?我們需要coalesce算子。

          repartition與coalesce都可以用來進行重分區(qū),其中repartition只是coalesce接口中shuffle為true的簡易實現,coalesce默認情況下不進行shuffle,但是可以通過參數進行設置。

          假設我們希望將原本的分區(qū)個數A通過重新分區(qū)變?yōu)锽,那么有以下幾種情況:

          1. A > B(多數分區(qū)合并為少數分區(qū))

            • A與B相差值不大

              此時使用coalesce即可,無需shuffle過程。

            • A與B相差值很大

              此時可以使用coalesce并且不啟用shuffle過程,但是會導致合并過程性能低下,所以推薦設置coalesce的第二個參數為true,即啟動shuffle過程。

          2. A < B(少數分區(qū)分解為多數分區(qū))

          此時使用repartition即可,如果使用coalesce需要將shuffle設置為true,否則coalesce無效。

          我們可以在filter操作之后,使用coalesce算子針對每個partition的數據量各不相同的情況,壓縮partition的數量,而且讓每個partition的數據量盡量均勻緊湊,以便于后面的task進行計算操作,在某種程度上能夠在一定程度上提升性能

          注意:local模式是進程內模擬集群運行,已經對并行度和分區(qū)數量有了一定的內部優(yōu)化,因此不用去設置并行度和分區(qū)數量。

          6. 并行度設置

          Spark作業(yè)中的并行度指各個stage的task的數量。

          如果并行度設置不合理而導致并行度過低,會導致資源的極大浪費,例如,20個Executor,每個Executor分配3個CPU core,而Spark作業(yè)有40個task,這樣每個Executor分配到的task個數是2個,這就使得每個Executor有一個CPU core空閑,導致資源的浪費。

          理想的并行度設置,應該是讓并行度與資源相匹配,簡單來說就是在資源允許的前提下,并行度要設置的盡可能大,達到可以充分利用集群資源。合理的設置并行度,可以提升整個Spark作業(yè)的性能和運行速度。

          Spark官方推薦,task數量應該設置為Spark作業(yè)總CPU core數量的2~3倍。之所以沒有推薦task數量與CPU core總數相等,是因為task的執(zhí)行時間不同,有的task執(zhí)行速度快而有的task執(zhí)行速度慢,如果task數量與CPU core總數相等,那么執(zhí)行快的task執(zhí)行完成后,會出現CPU core空閑的情況。如果task數量設置為CPU core總數的2~3倍,那么一個task執(zhí)行完畢后,CPU core會立刻執(zhí)行下一個task,降低了資源的浪費,同時提升了Spark作業(yè)運行的效率。

          Spark作業(yè)并行度的設置如下:

          val conf = new SparkConf().set("spark.default.parallelism""500")

          原則:讓 cpu 的 core(cpu 核心數) 充分利用起來, 如有100個 core,那么并行度可以設置為200~300

          7. repartition/coalesce調節(jié)并行度

          Spark 中雖然可以設置并行度的調節(jié)策略,但是,并行度的設置對于Spark SQL是不生效的,用戶設置的并行度只對于Spark SQL以外的所有Spark的stage生效

          Spark SQL的并行度不允許用戶自己指定,Spark SQL自己會默認根據hive表對應的HDFS文件的split個數自動設置Spark SQL所在的那個stage的并行度,用戶自己通 spark.default.parallelism 參數指定的并行度,只會在沒Spark SQL的stage中生效。

          由于Spark SQL所在stage的并行度無法手動設置,如果數據量較大,并且此stage中后續(xù)的transformation操作有著復雜的業(yè)務邏輯,而Spark SQL自動設置的task數量很少,這就意味著每個task要處理為數不少的數據量,然后還要執(zhí)行非常復雜的處理邏輯,這就可能表現為第一個有Spark SQL的stage速度很慢,而后續(xù)的沒有Spark SQL的stage運行速度非???。

          為了解決Spark SQL無法設置并行度和task數量的問題,我們可以使用repartition算子。

          repartition 算子使用前后對比圖如下:

          repartition 算子使用前后對比圖

          Spark SQL這一步的并行度和task數量肯定是沒有辦法去改變了,但是,對于Spark SQL查詢出來的RDD,立即使用repartition算子,去重新進行分區(qū),這樣可以重新分區(qū)為多個partition,從repartition之后的RDD操作,由于不再涉及Spark SQL,因此stage的并行度就會等于你手動設置的值,這樣就避免了Spark SQL所在的stage只能用少量的task去處理大量數據并執(zhí)行復雜的算法邏輯。使用repartition算子的前后對比如上圖所示

          8. reduceByKey本地預聚合

          reduceByKey相較于普通的shuffle操作一個顯著的特點就是會進行map端的本地聚合,map端會先對本地的數據進行combine操作,然后將數據寫入給下個stage的每個task創(chuàng)建的文件中,也就是在map端,對每一個key對應的value,執(zhí)行reduceByKey算子函數。

          reduceByKey算子的執(zhí)行過程如下圖所示:

          reduceByKey 算子執(zhí)行過程

          使用reduceByKey對性能的提升如下:

          1. 本地聚合后,在map端的數據量變少,減少了磁盤IO,也減少了對磁盤空間的占用;

          2. 本地聚合后,下一個stage拉取的數據量變少,減少了網絡傳輸的數據量;

          3. 本地聚合后,在reduce端進行數據緩存的內存占用減少;

          4. 本地聚合后,在reduce端進行聚合的數據量減少。

          基于reduceByKey的本地聚合特征,我們應該考慮使用reduceByKey代替其他的shuffle算子,例如groupByKey。

          groupByKey與reduceByKey的運行原理如下圖1和圖2所示:

          圖1:groupByKey原理
          圖2:reduceByKey原理

          根據上圖可知,groupByKey不會進行map端的聚合,而是將所有map端的數據shuffle到reduce端,然后在reduce端進行數據的聚合操作。由于reduceByKey有map端聚合的特性,使得網絡傳輸的數據量減小,因此效率要明顯高于groupByKey。

          9. 使用持久化+checkpoint

          Spark持久化在大部分情況下是沒有問題的,但是有時數據可能會丟失,如果數據一旦丟失,就需要對丟失的數據重新進行計算,計算完后再緩存和使用,為了避免數據的丟失,可以選擇對這個RDD進行checkpoint,也就是將數據持久化一份到容錯的文件系統(tǒng)上(比如HDFS)。

          一個RDD緩存并checkpoint后,如果一旦發(fā)現緩存丟失,就會優(yōu)先查看checkpoint數據存不存在,如果有,就會使用checkpoint數據,而不用重新計算。也即是說,checkpoint可以視為cache的保障機制,如果cache失敗,就使用checkpoint的數據。

          使用checkpoint的優(yōu)點在于提高了Spark作業(yè)的可靠性,一旦緩存出現問題,不需要重新計算數據,缺點在于,checkpoint時需要將數據寫入HDFS等文件系統(tǒng),對性能的消耗較大。

          持久化設置如下:

          sc.setCheckpointDir(‘HDFS’)
          rdd.cache/persist(memory_and_disk)
          rdd.checkpoint

          10. 使用廣播變量

          默認情況下,task中的算子中如果使用了外部的變量,每個task都會獲取一份變量的復本,這就造成了內存的極大消耗。一方面,如果后續(xù)對RDD進行持久化,可能就無法將RDD數據存入內存,只能寫入磁盤,磁盤IO將會嚴重消耗性能;另一方面,task在創(chuàng)建對象的時候,也許會發(fā)現堆內存無法存放新創(chuàng)建的對象,這就會導致頻繁的GC,GC會導致工作線程停止,進而導致Spark暫停工作一段時間,嚴重影響Spark性能。

          假設當前任務配置了20個Executor,指定500個task,有一個20M的變量被所有task共用,此時會在500個task中產生500個副本,耗費集群10G的內存,如果使用了廣播變量, 那么每個Executor保存一個副本,一共消耗400M內存,內存消耗減少了5倍。

          廣播變量在每個Executor保存一個副本,此Executor的所有task共用此廣播變量,這讓變量產生的副本數量大大減少。

          在初始階段,廣播變量只在Driver中有一份副本。task在運行的時候,想要使用廣播變量中的數據,此時首先會在自己本地的Executor對應的BlockManager中嘗試獲取變量,如果本地沒有,BlockManager就會從Driver或者其他節(jié)點的BlockManager上遠程拉取變量的復本,并由本地的BlockManager進行管理;之后此Executor的所有task都會直接從本地的BlockManager中獲取變量。

          對于多個Task可能會共用的數據可以廣播到每個Executor上:

          val 廣播變量名= sc.broadcast(會被各個Task用到的變量,即需要廣播的變量)

          廣播變量名.value//獲取廣播變量

          11. 使用Kryo序列化

          默認情況下,Spark使用Java的序列化機制。Java的序列化機制使用方便,不需要額外的配置,在算子中使用的變量實現Serializable接口即可,但是,Java序列化機制的效率不高,序列化速度慢并且序列化后的數據所占用的空間依然較大。

          Spark官方宣稱Kryo序列化機制比Java序列化機制性能提高10倍左右,Spark之所以沒有默認使用Kryo作為序列化類庫,是因為它不支持所有對象的序列化,同時Kryo需要用戶在使用前注冊需要序列化的類型,不夠方便,但從Spark 2.0.0版本開始,簡單類型、簡單類型數組、字符串類型的Shuffling RDDs 已經默認使用Kryo序列化方式了

          Kryo序列化注冊方式的代碼如下:

          public class MyKryoRegistrator implements KryoRegistrator{
            @Override
            public void registerClasses(Kryo kryo){
              kryo.register(StartupReportLogs.class);
            }
          }

          配置Kryo序列化方式的代碼如下:

          //創(chuàng)建SparkConf對象
          val conf = new SparkConf().setMaster(…).setAppName(…)
          //使用Kryo序列化庫
          conf.set("spark.serializer""org.apache.spark.serializer.KryoSerializer");  
          //在Kryo序列化庫中注冊自定義的類集合
          conf.set("spark.kryo.registrator""bigdata.com.MyKryoRegistrator"); 


          --end--


          掃描下方二維碼
          添加好友,備注【交流
          可私聊交流,也可進資源豐富學習群


          更文不易,點個“在看”支持一下??

          瀏覽 36
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  熟妇xxxxxx | 天干天天干在线视频 | 韩国一区在线 | 久久免费黄色 | 鸡巴操骚逼 |