<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          萬字詳解 Spark開發(fā)調(diào)優(yōu)(建議收藏)

          共 13318字,需瀏覽 27分鐘

           ·

          2021-09-12 15:14

          1一、前言

          在大數(shù)據(jù)計算領(lǐng)域,Spark 已經(jīng)成為了越來越流行、越來越受歡迎的計算平臺之一。Spark 的功能涵蓋了大數(shù)據(jù)領(lǐng)域的離線批處理、SQL類處理、流式/實時計算、機器學(xué)習(xí)、圖計算等各種不同類型的計算操作,應(yīng)用范圍與前景非常廣泛。大多數(shù)同學(xué)(包括筆者在內(nèi)),最初開始嘗試使用Spark的原因很簡單,主要就是為了讓大數(shù)據(jù)計算作業(yè)的執(zhí)行速度更快、性能更高。

          然而,通過Spark開發(fā)出高性能的大數(shù)據(jù)計算作業(yè),并不是那么簡單的。如果沒有對Spark作業(yè)進行合理的調(diào)優(yōu),Spark作業(yè)的執(zhí)行速度可能會很慢,這樣就完全體現(xiàn)不出Spark作為一種快速大數(shù)據(jù)計算引擎的優(yōu)勢來。因此,想要用好Spark,就必須對其進行合理的性能優(yōu)化。

          Spark的性能調(diào)優(yōu)實際上是由很多部分組成的,不是調(diào)節(jié)幾個參數(shù)就可以立竿見影提升作業(yè)性能的。我們需要根據(jù)不同的業(yè)務(wù)場景以及數(shù)據(jù)情況,對Spark作業(yè)進行綜合性的分析,然后進行多個方面的調(diào)節(jié)和優(yōu)化,才能獲得最佳性能。

          本文作為Spark性能優(yōu)化指南的基礎(chǔ),主要講解開發(fā)調(diào)優(yōu)以及資源調(diào)優(yōu)。

          2二、開發(fā)調(diào)優(yōu)

          3三、調(diào)優(yōu)概述

          Spark性能優(yōu)化的第一步,就是要在開發(fā)Spark作業(yè)的過程中注意和應(yīng)用一些性能優(yōu)化的基本原則。開發(fā)調(diào)優(yōu),就是要讓大家了解以下一些Spark基本開發(fā)原則,包括:RDD lineage設(shè)計、算子的合理使用、特殊操作的優(yōu)化等。在開發(fā)過程中,時時刻刻都應(yīng)該注意以上原則,并將這些原則根據(jù)具體的業(yè)務(wù)以及實際的應(yīng)用場景,靈活地運用到自己的Spark作業(yè)中。

          4原則一:避免創(chuàng)建重復(fù)的RDD

          通常來說,我們在開發(fā)一個Spark作業(yè)時,首先是基于某個數(shù)據(jù)源(比如Hive表或HDFS文件)創(chuàng)建一個初始的RDD;接著對這個RDD執(zhí)行某個算子操作,然后得到下一個RDD;以此類推,循環(huán)往復(fù),直到計算出最終我們需要的結(jié)果。在這個過程中,多個RDD會通過不同的算子操作(比如map、reduce等)串起來,這個“RDD串”,就是RDD lineage,也就是“RDD的血緣關(guān)系鏈”。

          我們在開發(fā)過程中要注意:對于同一份數(shù)據(jù),只應(yīng)該創(chuàng)建一個RDD,不能創(chuàng)建多個RDD來代表同一份數(shù)據(jù)。

          一些Spark初學(xué)者在剛開始開發(fā)Spark作業(yè)時,或者是有經(jīng)驗的工程師在開發(fā)RDD lineage極其冗長的Spark作業(yè)時,可能會忘了自己之前對于某一份數(shù)據(jù)已經(jīng)創(chuàng)建過一個RDD了,從而導(dǎo)致對于同一份數(shù)據(jù),創(chuàng)建了多個RDD。這就意味著,我們的Spark作業(yè)會進行多次重復(fù)計算來創(chuàng)建多個代表相同數(shù)據(jù)的RDD,進而增加了作業(yè)的性能開銷。

          一個簡單的例子

          // 需要對名為“hello.txt”的HDFS文件進行一次map操作,再進行一次reduce操作。也就是說,需要對一份數(shù)據(jù)執(zhí)行兩次算子操作。

          // 錯誤的做法:對于同一份數(shù)據(jù)執(zhí)行多次算子操作時,創(chuàng)建多個RDD。
          // 這里執(zhí)行了兩次textFile方法,針對同一個HDFS文件,創(chuàng)建了兩個RDD出來,然后分別對每個RDD都執(zhí)行了一個算子操作。
          // 這種情況下,Spark需要從HDFS上兩次加載hello.txt文件的內(nèi)容,并創(chuàng)建兩個單獨的RDD;第二次加載HDFS文件以及創(chuàng)建RDD的性能開銷,很明顯是白白浪費掉的。
          val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")
          rdd1.map(...)
          val rdd2 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")
          rdd2.reduce(...)

          // 正確的用法:對于一份數(shù)據(jù)執(zhí)行多次算子操作時,只使用一個RDD。
          // 這種寫法很明顯比上一種寫法要好多了,因為我們對于同一份數(shù)據(jù)只創(chuàng)建了一個RDD,然后對這一個RDD執(zhí)行了多次算子操作。
          // 但是要注意到這里為止優(yōu)化還沒有結(jié)束,由于rdd1被執(zhí)行了兩次算子操作,第二次執(zhí)行reduce操作的時候,還會再次從源頭處重新計算一次rdd1的數(shù)據(jù),因此還是會有重復(fù)計算的性能開銷。
          // 要徹底解決這個問題,必須結(jié)合“原則三:對多次使用的RDD進行持久化”,才能保證一個RDD被多次使用時只被計算一次。
          val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")
          rdd1.map(...)
          rdd1.reduce(...)

          5原則二:盡可能復(fù)用同一個RDD

          除了要避免在開發(fā)過程中對一份完全相同的數(shù)據(jù)創(chuàng)建多個RDD之外,在對不同的數(shù)據(jù)執(zhí)行算子操作時還要盡可能地復(fù)用一個RDD。比如說,有一個RDD的數(shù)據(jù)格式是key-value類型的,另一個是單value類型的,這兩個RDD的value數(shù)據(jù)是完全一樣的。那么此時我們可以只使用key-value類型的那個RDD,因為其中已經(jīng)包含了另一個的數(shù)據(jù)。對于類似這種多個RDD的數(shù)據(jù)有重疊或者包含的情況,我們應(yīng)該盡量復(fù)用一個RDD,這樣可以盡可能地減少RDD的數(shù)量,從而盡可能減少算子執(zhí)行的次數(shù)。

          一個簡單的例子

          // 錯誤的做法。

          // 有一個<Long, String>格式的RDD,即rdd1。
          // 接著由于業(yè)務(wù)需要,對rdd1執(zhí)行了一個map操作,創(chuàng)建了一個rdd2,而rdd2中的數(shù)據(jù)僅僅是rdd1中的value值而已,也就是說,rdd2是rdd1的子集。
          JavaPairRDD<Long, String> rdd1 = ...
          JavaRDD<String> rdd2 = rdd1.map(...)

          // 分別對rdd1和rdd2執(zhí)行了不同的算子操作。
          rdd1.reduceByKey(...)
          rdd2.map(...)

          // 正確的做法。

          // 上面這個case中,其實rdd1和rdd2的區(qū)別無非就是數(shù)據(jù)格式不同而已,rdd2的數(shù)據(jù)完全就是rdd1的子集而已,卻創(chuàng)建了兩個rdd,并對兩個rdd都執(zhí)行了一次算子操作。
          // 此時會因為對rdd1執(zhí)行map算子來創(chuàng)建rdd2,而多執(zhí)行一次算子操作,進而增加性能開銷。

          // 其實在這種情況下完全可以復(fù)用同一個RDD。
          // 我們可以使用rdd1,既做reduceByKey操作,也做map操作。
          // 在進行第二個map操作時,只使用每個數(shù)據(jù)的tuple._2,也就是rdd1中的value值,即可。
          JavaPairRDD<Long, String> rdd1 = ...
          rdd1.reduceByKey(...)
          rdd1.map(tuple._2...)

          // 第二種方式相較于第一種方式而言,很明顯減少了一次rdd2的計算開銷。
          // 但是到這里為止,優(yōu)化還沒有結(jié)束,對rdd1我們還是執(zhí)行了兩次算子操作,rdd1實際上還是會被計算兩次。
          // 因此還需要配合“原則三:對多次使用的RDD進行持久化”進行使用,才能保證一個RDD被多次使用時只被計算一次。

          6原則三:對多次使用的RDD進行持久化

          當(dāng)你在Spark代碼中多次對一個RDD做了算子操作后,恭喜,你已經(jīng)實現(xiàn)Spark作業(yè)第一步的優(yōu)化了,也就是盡可能復(fù)用RDD。此時就該在這個基礎(chǔ)之上,進行第二步優(yōu)化了,也就是要保證對一個RDD執(zhí)行多次算子操作時,這個RDD本身僅僅被計算一次。

          Spark中對于一個RDD執(zhí)行多次算子的默認(rèn)原理是這樣的:每次你對一個RDD執(zhí)行一個算子操作時,都會重新從源頭處計算一遍,計算出那個RDD來,然后再對這個RDD執(zhí)行你的算子操作。這種方式的性能是很差的。

          因此對于這種情況,我們的建議是:對多次使用的RDD進行持久化。此時Spark就會根據(jù)你的持久化策略,將RDD中的數(shù)據(jù)保存到內(nèi)存或者磁盤中。以后每次對這個RDD進行算子操作時,都會直接從內(nèi)存或磁盤中提取持久化的RDD數(shù)據(jù),然后執(zhí)行算子,而不會從源頭處重新計算一遍這個RDD,再執(zhí)行算子操作。

          對多次使用的RDD進行持久化的代碼示例

          // 如果要對一個RDD進行持久化,只要對這個RDD調(diào)用cache()和persist()即可。

          // 正確的做法。
          // cache()方法表示:使用非序列化的方式將RDD中的數(shù)據(jù)全部嘗試持久化到內(nèi)存中。
          // 此時再對rdd1執(zhí)行兩次算子操作時,只有在第一次執(zhí)行map算子時,才會將這個rdd1從源頭處計算一次。
          // 第二次執(zhí)行reduce算子時,就會直接從內(nèi)存中提取數(shù)據(jù)進行計算,不會重復(fù)計算一個rdd。
          val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").cache()
          rdd1.map(...)
          rdd1.reduce(...)

          // persist()方法表示:手動選擇持久化級別,并使用指定的方式進行持久化。
          // 比如說,StorageLevel.MEMORY_AND_DISK_SER表示,內(nèi)存充足時優(yōu)先持久化到內(nèi)存中,內(nèi)存不充足時持久化到磁盤文件中。
          // 而且其中的_SER后綴表示,使用序列化的方式來保存RDD數(shù)據(jù),此時RDD中的每個partition都會序列化成一個大的字節(jié)數(shù)組,然后再持久化到內(nèi)存或磁盤中。
          // 序列化的方式可以減少持久化的數(shù)據(jù)對內(nèi)存/磁盤的占用量,進而避免內(nèi)存被持久化數(shù)據(jù)占用過多,從而發(fā)生頻繁GC。
          val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").persist(StorageLevel.MEMORY_AND_DISK_SER)
          rdd1.map(...)
          rdd1.reduce(...)

          對于persist()方法而言,我們可以根據(jù)不同的業(yè)務(wù)場景選擇不同的持久化級別。

          Spark的持久化級別

          持久化級別含義解釋
          MEMORY_ONLY使用未序列化的Java對象格式,將數(shù)據(jù)保存在內(nèi)存中。如果內(nèi)存不夠存放所有的數(shù)據(jù),則數(shù)據(jù)可能就不會進行持久化。那么下次對這個RDD執(zhí)行算子操作時,那些沒有被持久化的數(shù)據(jù),需要從源頭處重新計算一遍。這是默認(rèn)的持久化策略,使用cache()方法時,實際就是使用的這種持久化策略。
          MEMORY_AND_DISK使用未序列化的Java對象格式,優(yōu)先嘗試將數(shù)據(jù)保存在內(nèi)存中。如果內(nèi)存不夠存放所有的數(shù)據(jù),會將數(shù)據(jù)寫入磁盤文件中,下次對這個RDD執(zhí)行算子時,持久化在磁盤文件中的數(shù)據(jù)會被讀取出來使用。
          MEMORY_ONLY_SER基本含義同MEMORY_ONLY。唯一的區(qū)別是,會將RDD中的數(shù)據(jù)進行序列化,RDD的每個partition會被序列化成一個字節(jié)數(shù)組。這種方式更加節(jié)省內(nèi)存,從而可以避免持久化的數(shù)據(jù)占用過多內(nèi)存導(dǎo)致頻繁GC。
          MEMORY_AND_DISK_SER基本含義同MEMORY_AND_DISK。唯一的區(qū)別是,會將RDD中的數(shù)據(jù)進行序列化,RDD的每個partition會被序列化成一個字節(jié)數(shù)組。這種方式更加節(jié)省內(nèi)存,從而可以避免持久化的數(shù)據(jù)占用過多內(nèi)存導(dǎo)致頻繁GC。
          DISK_ONLY使用未序列化的Java對象格式,將數(shù)據(jù)全部寫入磁盤文件中。
          MEMORY_ONLY_2, MEMORY_AND_DISK_2, 等等.對于上述任意一種持久化策略,如果加上后綴_2,代表的是將每個持久化的數(shù)據(jù),都復(fù)制一份副本,并將副本保存到其他節(jié)點上。這種基于副本的持久化機制主要用于進行容錯。假如某個節(jié)點掛掉,節(jié)點的內(nèi)存或磁盤中的持久化數(shù)據(jù)丟失了,那么后續(xù)對RDD計算時還可以使用該數(shù)據(jù)在其他節(jié)點上的副本。如果沒有副本的話,就只能將這些數(shù)據(jù)從源頭處重新計算一遍了。

          如何選擇一種最合適的持久化策略

          • 默認(rèn)情況下,性能最高的當(dāng)然是MEMORY_ONLY,但前提是你的內(nèi)存必須足夠足夠大,可以綽綽有余地存放下整個RDD的所有數(shù)據(jù)。因為不進行序列化與反序列化操作,就避免了這部分的性能開銷;對這個RDD的后續(xù)算子操作,都是基于純內(nèi)存中的數(shù)據(jù)的操作,不需要從磁盤文件中讀取數(shù)據(jù),性能也很高;而且不需要復(fù)制一份數(shù)據(jù)副本,并遠程傳送到其他節(jié)點上。但是這里必須要注意的是,在實際的生產(chǎn)環(huán)境中,恐怕能夠直接用這種策略的場景還是有限的,如果RDD中數(shù)據(jù)比較多時(比如幾十億),直接用這種持久化級別,會導(dǎo)致JVM的OOM內(nèi)存溢出異常。

          • 如果使用MEMORY_ONLY級別時發(fā)生了內(nèi)存溢出,那么建議嘗試使用MEMORY_ONLY_SER級別。該級別會將RDD數(shù)據(jù)序列化后再保存在內(nèi)存中,此時每個partition僅僅是一個字節(jié)數(shù)組而已,大大減少了對象數(shù)量,并降低了內(nèi)存占用。這種級別比MEMORY_ONLY多出來的性能開銷,主要就是序列化與反序列化的開銷。但是后續(xù)算子可以基于純內(nèi)存進行操作,因此性能總體還是比較高的。此外,可能發(fā)生的問題同上,如果RDD中的數(shù)據(jù)量過多的話,還是可能會導(dǎo)致OOM內(nèi)存溢出的異常。

          • 如果純內(nèi)存的級別都無法使用,那么建議使用MEMORY_AND_DISK_SER策略,而不是MEMORY_AND_DISK策略。因為既然到了這一步,就說明RDD的數(shù)據(jù)量很大,內(nèi)存無法完全放下。序列化后的數(shù)據(jù)比較少,可以節(jié)省內(nèi)存和磁盤的空間開銷。同時該策略會優(yōu)先盡量嘗試將數(shù)據(jù)緩存在內(nèi)存中,內(nèi)存緩存不下才會寫入磁盤。

          • 通常不建議使用DISK_ONLY和后綴為_2的級別:因為完全基于磁盤文件進行數(shù)據(jù)的讀寫,會導(dǎo)致性能急劇降低,有時還不如重新計算一次所有RDD。后綴為_2的級別,必須將所有數(shù)據(jù)都復(fù)制一份副本,并發(fā)送到其他節(jié)點上,數(shù)據(jù)復(fù)制以及網(wǎng)絡(luò)傳輸會導(dǎo)致較大的性能開銷,除非是要求作業(yè)的高可用性,否則不建議使用。

          7原則四:盡量避免使用shuffle類算子

          如果有可能的話,要盡量避免使用shuffle類算子。因為Spark作業(yè)運行過程中,最消耗性能的地方就是shuffle過程。shuffle過程,簡單來說,就是將分布在集群中多個節(jié)點上的同一個key,拉取到同一個節(jié)點上,進行聚合或join等操作。比如reduceByKey、join等算子,都會觸發(fā)shuffle操作。

          shuffle過程中,各個節(jié)點上的相同key都會先寫入本地磁盤文件中,然后其他節(jié)點需要通過網(wǎng)絡(luò)傳輸拉取各個節(jié)點上的磁盤文件中的相同key。而且相同key都拉取到同一個節(jié)點進行聚合操作時,還有可能會因為一個節(jié)點上處理的key過多,導(dǎo)致內(nèi)存不夠存放,進而溢寫到磁盤文件中。因此在shuffle過程中,可能會發(fā)生大量的磁盤文件讀寫的IO操作,以及數(shù)據(jù)的網(wǎng)絡(luò)傳輸操作。磁盤IO和網(wǎng)絡(luò)數(shù)據(jù)傳輸也是shuffle性能較差的主要原因。

          因此在我們的開發(fā)過程中,能避免則盡可能避免使用reduceByKey、join、distinct、repartition等會進行shuffle的算子,盡量使用map類的非shuffle算子。這樣的話,沒有shuffle操作或者僅有較少shuffle操作的Spark作業(yè),可以大大減少性能開銷。

          Broadcast與map進行join代碼示例

          // 傳統(tǒng)的join操作會導(dǎo)致shuffle操作。
          // 因為兩個RDD中,相同的key都需要通過網(wǎng)絡(luò)拉取到一個節(jié)點上,由一個task進行join操作。
          val rdd3 = rdd1.join(rdd2)

          // Broadcast+map的join操作,不會導(dǎo)致shuffle操作。
          // 使用Broadcast將一個數(shù)據(jù)量較小的RDD作為廣播變量。
          val rdd2Data = rdd2.collect()
          val rdd2DataBroadcast = sc.broadcast(rdd2Data)

          // 在rdd1.map算子中,可以從rdd2DataBroadcast中,獲取rdd2的所有數(shù)據(jù)。
          // 然后進行遍歷,如果發(fā)現(xiàn)rdd2中某條數(shù)據(jù)的key與rdd1的當(dāng)前數(shù)據(jù)的key是相同的,那么就判定可以進行join。
          // 此時就可以根據(jù)自己需要的方式,將rdd1當(dāng)前數(shù)據(jù)與rdd2中可以連接的數(shù)據(jù),拼接在一起(String或Tuple)。
          val rdd3 = rdd1.map(rdd2DataBroadcast...)

          // 注意,以上操作,建議僅僅在rdd2的數(shù)據(jù)量比較少(比如幾百M,或者一兩G)的情況下使用。
          // 因為每個Executor的內(nèi)存中,都會駐留一份rdd2的全量數(shù)據(jù)。

          8原則五:使用map-side預(yù)聚合的shuffle操作

          如果因為業(yè)務(wù)需要,一定要使用shuffle操作,無法用map類的算子來替代,那么盡量使用可以map-side預(yù)聚合的算子。

          所謂的map-side預(yù)聚合,說的是在每個節(jié)點本地對相同的key進行一次聚合操作,類似于MapReduce中的本地combiner。map-side預(yù)聚合之后,每個節(jié)點本地就只會有一條相同的key,因為多條相同的key都被聚合起來了。其他節(jié)點在拉取所有節(jié)點上的相同key時,就會大大減少需要拉取的數(shù)據(jù)數(shù)量,從而也就減少了磁盤IO以及網(wǎng)絡(luò)傳輸開銷。通常來說,在可能的情況下,建議使用reduceByKey或者aggregateByKey算子來替代掉groupByKey算子。因為reduceByKey和aggregateByKey算子都會使用用戶自定義的函數(shù)對每個節(jié)點本地的相同key進行預(yù)聚合。而groupByKey算子是不會進行預(yù)聚合的,全量的數(shù)據(jù)會在集群的各個節(jié)點之間分發(fā)和傳輸,性能相對來說比較差。

          比如如下兩幅圖,就是典型的例子,分別基于reduceByKey和groupByKey進行單詞計數(shù)。其中第一張圖是groupByKey的原理圖,可以看到,沒有進行任何本地聚合時,所有數(shù)據(jù)都會在集群節(jié)點之間傳輸;第二張圖是reduceByKey的原理圖,可以看到,每個節(jié)點本地的相同key數(shù)據(jù),都進行了預(yù)聚合,然后才傳輸?shù)狡渌?jié)點上進行全局聚合。

          9原則六:使用高性能的算子

          除了shuffle相關(guān)的算子有優(yōu)化原則之外,其他的算子也都有著相應(yīng)的優(yōu)化原則。

          使用reduceByKey/aggregateByKey替代groupByKey

          詳情見“原則五:使用map-side預(yù)聚合的shuffle操作”。

          使用mapPartitions替代普通map

          mapPartitions類的算子,一次函數(shù)調(diào)用會處理一個partition所有的數(shù)據(jù),而不是一次函數(shù)調(diào)用處理一條,性能相對來說會高一些。但是有的時候,使用mapPartitions會出現(xiàn)OOM(內(nèi)存溢出)的問題。因為單次函數(shù)調(diào)用就要處理掉一個partition所有的數(shù)據(jù),如果內(nèi)存不夠,垃圾回收時是無法回收掉太多對象的,很可能出現(xiàn)OOM異常。所以使用這類操作時要慎重!

          使用foreachPartitions替代foreach

          原理類似于“使用mapPartitions替代map”,也是一次函數(shù)調(diào)用處理一個partition的所有數(shù)據(jù),而不是一次函數(shù)調(diào)用處理一條數(shù)據(jù)。在實踐中發(fā)現(xiàn),foreachPartitions類的算子,對性能的提升還是很有幫助的。比如在foreach函數(shù)中,將RDD中所有數(shù)據(jù)寫MySQL,那么如果是普通的foreach算子,就會一條數(shù)據(jù)一條數(shù)據(jù)地寫,每次函數(shù)調(diào)用可能就會創(chuàng)建一個數(shù)據(jù)庫連接,此時就勢必會頻繁地創(chuàng)建和銷毀數(shù)據(jù)庫連接,性能是非常低下;但是如果用foreachPartitions算子一次性處理一個partition的數(shù)據(jù),那么對于每個partition,只要創(chuàng)建一個數(shù)據(jù)庫連接即可,然后執(zhí)行批量插入操作,此時性能是比較高的。實踐中發(fā)現(xiàn),對于1萬條左右的數(shù)據(jù)量寫MySQL,性能可以提升30%以上。

          使用filter之后進行coalesce操作

          通常對一個RDD執(zhí)行filter算子過濾掉RDD中較多數(shù)據(jù)后(比如30%以上的數(shù)據(jù)),建議使用coalesce算子,手動減少RDD的partition數(shù)量,將RDD中的數(shù)據(jù)壓縮到更少的partition中去。因為filter之后,RDD的每個partition中都會有很多數(shù)據(jù)被過濾掉,此時如果照常進行后續(xù)的計算,其實每個task處理的partition中的數(shù)據(jù)量并不是很多,有一點資源浪費,而且此時處理的task越多,可能速度反而越慢。因此用coalesce減少partition數(shù)量,將RDD中的數(shù)據(jù)壓縮到更少的partition之后,只要使用更少的task即可處理完所有的partition。在某些場景下,對于性能的提升會有一定的幫助。

          使用repartitionAndSortWithinPartitions替代repartition與sort類操作

          repartitionAndSortWithinPartitions是Spark官網(wǎng)推薦的一個算子,官方建議,如果需要在repartition重分區(qū)之后,還要進行排序,建議直接使用repartitionAndSortWithinPartitions算子。因為該算子可以一邊進行重分區(qū)的shuffle操作,一邊進行排序。shuffle與sort兩個操作同時進行,比先shuffle再sort來說,性能可能是要高的。

          10原則七:廣播大變量

          有時在開發(fā)過程中,會遇到需要在算子函數(shù)中使用外部變量的場景(尤其是大變量,比如100M以上的大集合),那么此時就應(yīng)該使用Spark的廣播(Broadcast)功能來提升性能。

          在算子函數(shù)中使用到外部變量時,默認(rèn)情況下,Spark會將該變量復(fù)制多個副本,通過網(wǎng)絡(luò)傳輸?shù)絫ask中,此時每個task都有一個變量副本。如果變量本身比較大的話(比如100M,甚至1G),那么大量的變量副本在網(wǎng)絡(luò)中傳輸?shù)男阅荛_銷,以及在各個節(jié)點的Executor中占用過多內(nèi)存導(dǎo)致的頻繁GC,都會極大地影響性能。

          因此對于上述情況,如果使用的外部變量比較大,建議使用Spark的廣播功能,對該變量進行廣播。廣播后的變量,會保證每個Executor的內(nèi)存中,只駐留一份變量副本,而Executor中的task執(zhí)行時共享該Executor中的那份變量副本。這樣的話,可以大大減少變量副本的數(shù)量,從而減少網(wǎng)絡(luò)傳輸?shù)男阅荛_銷,并減少對Executor內(nèi)存的占用開銷,降低GC的頻率。

          廣播大變量的代碼示例

          // 以下代碼在算子函數(shù)中,使用了外部的變量。
          // 此時沒有做任何特殊操作,每個task都會有一份list1的副本。
          val list1 = ...
          rdd1.map(list1...)

          // 以下代碼將list1封裝成了Broadcast類型的廣播變量。
          // 在算子函數(shù)中,使用廣播變量時,首先會判斷當(dāng)前task所在Executor內(nèi)存中,是否有變量副本。
          // 如果有則直接使用;如果沒有則從Driver或者其他Executor節(jié)點上遠程拉取一份放到本地Executor內(nèi)存中。
          // 每個Executor內(nèi)存中,就只會駐留一份廣播變量副本。
          val list1 = ...
          val list1Broadcast = sc.broadcast(list1)
          rdd1.map(list1Broadcast...)

          11原則八:使用Kryo優(yōu)化序列化性能

          在Spark中,主要有三個地方涉及到了序列化:

          • 在算子函數(shù)中使用到外部變量時,該變量會被序列化后進行網(wǎng)絡(luò)傳輸(見“原則七:廣播大變量”中的講解)。
          • 將自定義的類型作為RDD的泛型類型時(比如JavaRDD,Student是自定義類型),所有自定義類型對象,都會進行序列化。因此這種情況下,也要求自定義的類必須實現(xiàn)Serializable接口。
          • 使用可序列化的持久化策略時(比如MEMORY_ONLY_SER),Spark會將RDD中的每個partition都序列化成一個大的字節(jié)數(shù)組。

          對于這三種出現(xiàn)序列化的地方,我們都可以通過使用Kryo序列化類庫,來優(yōu)化序列化和反序列化的性能。Spark默認(rèn)使用的是Java的序列化機制,也就是ObjectOutputStream/ObjectInputStream API來進行序列化和反序列化。但是Spark同時支持使用Kryo序列化庫,Kryo序列化類庫的性能比Java序列化類庫的性能要高很多。官方介紹,Kryo序列化機制比Java序列化機制,性能高10倍左右。Spark之所以默認(rèn)沒有使用Kryo作為序列化類庫,是因為Kryo要求最好要注冊所有需要進行序列化的自定義類型,因此對于開發(fā)者來說,這種方式比較麻煩。

          以下是使用Kryo的代碼示例,我們只要設(shè)置序列化類,再注冊要序列化的自定義類型即可(比如算子函數(shù)中使用到的外部變量類型、作為RDD泛型類型的自定義類型等):

          // 創(chuàng)建SparkConf對象。
          val conf = new SparkConf().setMaster(...).setAppName(...)
          // 設(shè)置序列化器為KryoSerializer。
          conf.set("spark.serializer""org.apache.spark.serializer.KryoSerializer")
          // 注冊要序列化的自定義類型。
          conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

          12原則九:優(yōu)化數(shù)據(jù)結(jié)構(gòu)

          Java中,有三種類型比較耗費內(nèi)存:

          • 對象,每個Java對象都有對象頭、引用等額外的信息,因此比較占用內(nèi)存空間。
          • 字符串,每個字符串內(nèi)部都有一個字符數(shù)組以及長度等額外信息。
          • 集合類型,比如HashMap、LinkedList等,因為集合類型內(nèi)部通常會使用一些內(nèi)部類來封裝集合元素,比如Map.Entry。

          因此Spark官方建議,在Spark編碼實現(xiàn)中,特別是對于算子函數(shù)中的代碼,盡量不要使用上述三種數(shù)據(jù)結(jié)構(gòu),盡量使用字符串替代對象,使用原始類型(比如Int、Long)替代字符串,使用數(shù)組替代集合類型,這樣盡可能地減少內(nèi)存占用,從而降低GC頻率,提升性能。

          但是在筆者的編碼實踐中發(fā)現(xiàn),要做到該原則其實并不容易。因為我們同時要考慮到代碼的可維護性,如果一個代碼中,完全沒有任何對象抽象,全部是字符串拼接的方式,那么對于后續(xù)的代碼維護和修改,無疑是一場巨大的災(zāi)難。同理,如果所有操作都基于數(shù)組實現(xiàn),而不使用HashMap、LinkedList等集合類型,那么對于我們的編碼難度以及代碼可維護性,也是一個極大的挑戰(zhàn)。因此筆者建議,在可能以及合適的情況下,使用占用內(nèi)存較少的數(shù)據(jù)結(jié)構(gòu),但是前提是要保證代碼的可維護性。

          13原則十:Data Locality本地化級別

          PROCESS_LOCAL:進程本地化,代碼和數(shù)據(jù)在同一個進程中,也就是在同一個executor中;計算數(shù)據(jù)的task由executor執(zhí)行,數(shù)據(jù)在executor的BlockManager中;性能最好

          NODE_LOCAL:節(jié)點本地化,代碼和數(shù)據(jù)在同一個節(jié)點中;比如說,數(shù)據(jù)作為一個HDFS block塊,就在節(jié)點上,而task在節(jié)點上某個executor中運行;或者是,數(shù)據(jù)和task在一個節(jié)點上的不同executor中;數(shù)據(jù)需要在進程間進行傳輸NO_PREF:對于task來說,數(shù)據(jù)從哪里獲取都一樣,沒有好壞之分RACK_LOCAL:機架本地化,數(shù)據(jù)和task在一個機架的兩個節(jié)點上;數(shù)據(jù)需要通過網(wǎng)絡(luò)在節(jié)點之間進行傳輸ANY:數(shù)據(jù)和task可能在集群中的任何地方,而且不在一個機架中,性能最差

          spark.locality.wait,默認(rèn)是3s

          Spark在Driver上,對Application的每一個stage的task,進行分配之前,都會計算出每個task要計算的是哪個分片數(shù)據(jù),RDD的某個partition;Spark的task分配算法,優(yōu)先,會希望每個task正好分配到它要計算的數(shù)據(jù)所在的節(jié)點,這樣的話,就不用在網(wǎng)絡(luò)間傳輸數(shù)據(jù);

          但是可能task沒有機會分配到它的數(shù)據(jù)所在的節(jié)點,因為可能那個節(jié)點的計算資源和計算能力都滿了;所以呢,這種時候,通常來說,Spark會等待一段時間,默認(rèn)情況下是3s鐘(不是絕對的,還有很多種情況,對不同的本地化級別,都會去等待),到最后,實在是等待不了了,就會選擇一個比較差的本地化級別,比如說,將task分配到靠它要計算的數(shù)據(jù)所在節(jié)點,比較近的一個節(jié)點,然后進行計算。

          但是對于第二種情況,通常來說,肯定是要發(fā)生數(shù)據(jù)傳輸,task會通過其所在節(jié)點的BlockManager來獲取數(shù)據(jù),BlockManager發(fā)現(xiàn)自己本地沒有數(shù)據(jù),會通過一個getRemote()方法,通過TransferService(網(wǎng)絡(luò)數(shù)據(jù)傳輸組件)從數(shù)據(jù)所在節(jié)點的BlockManager中,獲取數(shù)據(jù),通過網(wǎng)絡(luò)傳輸回task所在節(jié)點。

          對于我們來說,當(dāng)然不希望是類似于第二種情況的了。最好的,當(dāng)然是task和數(shù)據(jù)在一個節(jié)點上,直接從本地executor的BlockManager中獲取數(shù)據(jù),純內(nèi)存,或者帶一點磁盤IO;如果要通過網(wǎng)絡(luò)傳輸數(shù)據(jù)的話,那么實在是,性能肯定會下降的,大量網(wǎng)絡(luò)傳輸,以及磁盤IO,都是性能的殺手。

          什么時候要調(diào)節(jié)這個參數(shù)?

          觀察日志,spark作業(yè)的運行日志,推薦大家在測試的時候,先用client模式,在本地就直接可以看到比較全的日志。日志里面會顯示,starting task,PROCESS LOCAL、NODE LOCAL,觀察大部分task的數(shù)據(jù)本地化級別。

          如果大多都是PROCESS_LOCAL,那就不用調(diào)節(jié)了,如果是發(fā)現(xiàn),好多的級別都是NODE_LOCAL、ANY,那么最好就去調(diào)節(jié)一下數(shù)據(jù)本地化的等待時長調(diào)節(jié)完,應(yīng)該是要反復(fù)調(diào)節(jié),每次調(diào)節(jié)完以后,再來運行,觀察日志 看看大部分的task的本地化級別有沒有提升;看看,整個spark作業(yè)的運行時間有沒有縮短

          但是注意別本末倒置,本地化級別倒是提升了,但是因為大量的等待時長,spark作業(yè)的運行時間反而增加了,那就還是不要調(diào)節(jié)了。

          spark.locality.wait,默認(rèn)是3s;可以改成6s,10s

          默認(rèn)情況下,下面3個的等待時長,都是跟上面那個是一樣的,都是3s。

          spark.locality.wait.process//建議60s
          spark.locality.wait.node//建議30s
          spark.locality.wait.rack//建議20s


          資源獲取 獲取Flink面試題,Spark面試題,程序員必備軟件,hive面試題,Hadoop面試題,Docker面試題,簡歷模板等資源請去 GitHub自行下載 https://github.com/lhh2002/Framework-Of-BigData Gitee 自行下載  https://gitee.com/li_hey_hey/dashboard/projects

          推薦閱讀:

          世界的真實格局分析,地球人類社會底層運行原理

          不是你需要中臺,而是一名合格的架構(gòu)師(附各大廠中臺建設(shè)PPT)

          企業(yè)IT技術(shù)架構(gòu)規(guī)劃方案

          論數(shù)字化轉(zhuǎn)型——轉(zhuǎn)什么,如何轉(zhuǎn)?

          華為干部與人才發(fā)展手冊(附PPT)

          企業(yè)10大管理流程圖,數(shù)字化轉(zhuǎn)型從業(yè)者必備!

          【中臺實踐】華為大數(shù)據(jù)中臺架構(gòu)分享.pdf

          華為的數(shù)字化轉(zhuǎn)型方法論

          華為如何實施數(shù)字化轉(zhuǎn)型(附PPT)

          超詳細280頁Docker實戰(zhàn)文檔!開放下載

          華為大數(shù)據(jù)解決方案(PPT)

          瀏覽 41
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  人人操天天射 | 香蕉视频日本免费色老板 | 欧美群妇大交群实录 | 色五月无码| 91无码内射 |