Spark面試八股文(上萬字面試必備寶典)
1. Spark 的運行流程?

具體運行流程如下:
SparkContext 向資源管理器注冊并向資源管理器申請運行 Executor
資源管理器分配 Executor,然后資源管理器啟動 Executor
Executor 發(fā)送心跳至資源管理器
SparkContext 構建 DAG 有向無環(huán)圖
將 DAG 分解成 Stage(TaskSet)
把 Stage 發(fā)送給 TaskScheduler
Executor 向 SparkContext 申請 Task
TaskScheduler 將 Task 發(fā)送給 Executor 運行
同時 SparkContext 將應用程序代碼發(fā)放給 Executor
Task 在 Executor 上運行,運行完畢釋放所有資源
2. Spark 有哪些組件?
master:管理集群和節(jié)點,不參與計算。
worker:計算節(jié)點,進程本身不參與計算,和 master 匯報。
Driver:運行程序的 main 方法,創(chuàng)建 spark context 對象。
spark context:控制整個 application 的生命周期,包括 dagsheduler 和 task scheduler 等組件。
client:用戶提交程序的入口。
3. Spark 中的 RDD 機制理解嗎?
rdd 分布式彈性數據集,簡單的理解成一種數據結構,是 spark 框架上的通用貨幣。所有算子都是基于 rdd 來執(zhí)行的,不同的場景會有不同的 rdd 實現類,但是都可以進行互相轉換。rdd 執(zhí)行過程中會形成 dag 圖,然后形成 lineage 保證容錯性等。從物理的角度來看 rdd 存儲的是 block 和 node 之間的映射。
RDD 是 spark 提供的核心抽象,全稱為彈性分布式數據集。
RDD 在邏輯上是一個 hdfs 文件,在抽象上是一種元素集合,包含了數據。它是被分區(qū)的,分為多個分區(qū),每個分區(qū)分布在集群中的不同結點上,從而讓 RDD 中的數據可以被并行操作(分布式數據集)
比如有個 RDD 有 90W 數據,3 個 partition,則每個分區(qū)上有 30W 數據。RDD 通常通過 Hadoop 上的文件,即 HDFS 或者 HIVE 表來創(chuàng)建,還可以通過應用程序中的集合來創(chuàng)建;RDD 最重要的特性就是容錯性,可以自動從節(jié)點失敗中恢復過來。即如果某個結點上的 RDD partition 因為節(jié)點故障,導致數據丟失,那么 RDD 可以通過自己的數據來源重新計算該 partition。這一切對使用者都是透明的。
RDD 的數據默認存放在內存中,但是當內存資源不足時,spark 會自動將 RDD 數據寫入磁盤。比如某結點內存只能處理 20W 數據,那么這 20W 數據就會放入內存中計算,剩下 10W 放到磁盤中。RDD 的彈性體現在于 RDD 上自動進行內存和磁盤之間權衡和切換的機制。
4. RDD 中 reduceBykey 與 groupByKey 哪個性能好,為什么?
reduceByKey:reduceByKey 會在結果發(fā)送至 reducer 之前會對每個 mapper 在本地進行 merge,有點類似于在 MapReduce 中的 combiner。這樣做的好處在于,在 map 端進行一次 reduce 之后,數據量會大幅度減小,從而減小傳輸,保證 reduce 端能夠更快的進行結果計算。
groupByKey:groupByKey 會對每一個 RDD 中的 value 值進行聚合形成一個序列(Iterator),此操作發(fā)生在 reduce 端,所以勢必會將所有的數據通過網絡進行傳輸,造成不必要的浪費。同時如果數據量十分大,可能還會造成 OutOfMemoryError。
所以在進行大量數據的 reduce 操作時候建議使用 reduceByKey。不僅可以提高速度,還可以防止使用 groupByKey 造成的內存溢出問題。
5. 介紹一下 cogroup rdd 實現原理,你在什么場景下用過這個 rdd?
cogroup:對多個(2~4)RDD 中的 KV 元素,每個 RDD 中相同 key 中的元素分別聚合成一個集合。
與 reduceByKey 不同的是:reduceByKey 針對一個 RDD中相同的 key 進行合并。而 cogroup 針對多個 RDD中相同的 key 的元素進行合并。
cogroup 的函數實現:這個實現根據要進行合并的兩個 RDD 操作,生成一個 CoGroupedRDD 的實例,這個 RDD 的返回結果是把相同的 key 中兩個 RDD 分別進行合并操作,最后返回的 RDD 的 value 是一個 Pair 的實例,這個實例包含兩個 Iterable 的值,第一個值表示的是 RDD1 中相同 KEY 的值,第二個值表示的是 RDD2 中相同 key 的值。
由于做 cogroup 的操作,需要通過 partitioner 進行重新分區(qū)的操作,因此,執(zhí)行這個流程時,需要執(zhí)行一次 shuffle 的操作(如果要進行合并的兩個 RDD 的都已經是 shuffle 后的 rdd,同時他們對應的 partitioner 相同時,就不需要執(zhí)行 shuffle)。
場景:表關聯(lián)查詢或者處理重復的 key。
6. 如何區(qū)分 RDD 的寬窄依賴?
窄依賴:父 RDD 的一個分區(qū)只會被子 RDD 的一個分區(qū)依賴;
寬依賴:父 RDD 的一個分區(qū)會被子 RDD 的多個分區(qū)依賴(涉及到 shuffle)。
7. 為什么要設計寬窄依賴?
對于窄依賴:
窄依賴的多個分區(qū)可以并行計算;
窄依賴的一個分區(qū)的數據如果丟失只需要重新計算對應的分區(qū)的數據就可以了。對于寬依賴:
劃分 Stage(階段)的依據:對于寬依賴,必須等到上一階段計算完成才能計算下一階段。
8. DAG 是什么?
DAG(Directed Acyclic Graph 有向無環(huán)圖)指的是數據轉換執(zhí)行的過程,有方向,無閉環(huán)(其實就是 RDD 執(zhí)行的流程);
原始的 RDD 通過一系列的轉換操作就形成了 DAG 有向無環(huán)圖,任務執(zhí)行時,可以按照 DAG 的描述,執(zhí)行真正的計算(數據被操作的一個過程)。
9. DAG 中為什么要劃分 Stage?
并行計算。
一個復雜的業(yè)務邏輯如果有 shuffle,那么就意味著前面階段產生結果后,才能執(zhí)行下一個階段,即下一個階段的計算要依賴上一個階段的數據。那么我們按照 shuffle 進行劃分(也就是按照寬依賴就行劃分),就可以將一個 DAG 劃分成多個 Stage/階段,在同一個 Stage 中,會有多個算子操作,可以形成一個 pipeline 流水線,流水線內的多個平行的分區(qū)可以并行執(zhí)行。
10. 如何劃分 DAG 的 stage?
對于窄依賴,partition 的轉換處理在 stage 中完成計算,不劃分(將窄依賴盡量放在在同一個 stage 中,可以實現流水線計算)。
對于寬依賴,由于有 shuffle 的存在,只能在父 RDD 處理完成后,才能開始接下來的計算,也就是說需要要劃分 stage。
11. DAG 劃分為 Stage 的算法了解嗎?
核心算法:回溯算法
從后往前回溯/反向解析,遇到窄依賴加入本 Stage,遇見寬依賴進行 Stage 切分。
Spark 內核會從觸發(fā) Action 操作的那個 RDD 開始從后往前推,首先會為最后一個 RDD 創(chuàng)建一個 Stage,然后繼續(xù)倒推,如果發(fā)現對某個 RDD 是寬依賴,那么就會將寬依賴的那個 RDD 創(chuàng)建一個新的 Stage,那個 RDD 就是新的 Stage 的最后一個 RDD。然后依次類推,繼續(xù)倒推,根據窄依賴或者寬依賴進行 Stage 的劃分,直到所有的 RDD 全部遍歷完成為止。
具體劃分算法請參考:AMP 實驗室發(fā)表的論文
《Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing》http://xueshu.baidu.com/usercenter/paper/show?paperid=b33564e60f0a7e7a1889a9da10963461&site=xueshu_se
12. 對于 Spark 中的數據傾斜問題你有什么好的方案?
前提是定位數據傾斜,是 OOM 了,還是任務執(zhí)行緩慢,看日志,看 WebUI
解決方法,有多個方面:
避免不必要的 shuffle,如使用廣播小表的方式,將 reduce-side-join 提升為 map-side-join 分拆發(fā)生數據傾斜的記錄,分成幾個部分進行,然后合并 join 后的結果 改變并行度,可能并行度太少了,導致個別 task 數據壓力大 兩階段聚合,先局部聚合,再全局聚合 自定義 paritioner,分散 key 的分布,使其更加均勻
13. Spark 中的 OOM 問題?
map 類型的算子執(zhí)行中內存溢出如 flatMap,mapPatitions
原因:map 端過程產生大量對象導致內存溢出:這種溢出的原因是在單個 map 中產生了大量的對象導致的針對這種問題。
解決方案:
增加堆內內存。 在不增加內存的情況下,可以減少每個 Task 處理數據量,使每個 Task 產生大量的對象時,Executor 的內存也能夠裝得下。具體做法可以在會產生大量對象的 map 操作之前調用 repartition 方法,分區(qū)成更小的塊傳入 map。
shuffle 后內存溢出如 join,reduceByKey,repartition。
shuffle 內存溢出的情況可以說都是 shuffle 后,單個文件過大導致的。在 shuffle 的使用,需要傳入一個 partitioner,大部分 Spark 中的 shuffle 操作,默認的 partitioner 都是 HashPatitioner,默認值是父 RDD 中最大的分區(qū)數.這個參數 spark.default.parallelism 只對 HashPartitioner 有效.如果是別的 partitioner 導致的 shuffle 內存溢出就需要重寫 partitioner 代碼了.
driver 內存溢出
用戶在 Dirver 端口生成大對象,比如創(chuàng)建了一個大的集合數據結構。解決方案:將大對象轉換成 Executor 端加載,比如調用 sc.textfile 或者評估大對象占用的內存,增加 dirver 端的內存
從 Executor 端收集數據(collect)回 Dirver 端,建議將 driver 端對 collect 回來的數據所作的操作,轉換成 executor 端 rdd 操作。
14. Spark 中數據的位置是被誰管理的?
每個數據分片都對應具體物理位置,數據的位置是被blockManager管理,無論數據是在磁盤,內存還是 tacyan,都是由 blockManager 管理。
15. Spaek 程序執(zhí)行,有時候默認為什么會產生很多 task,怎么修改默認 task 執(zhí)行個數?
輸入數據有很多 task,尤其是有很多小文件的時候,有多少個輸入 block 就會有多少個 task 啟動;
spark 中有 partition 的概念,每個 partition 都會對應一個 task,task 越多,在處理大規(guī)模數據的時候,就會越有效率。不過 task 并不是越多越好,如果平時測試,或者數據量沒有那么大,則沒有必要 task 數量太多。
參數可以通過 spark_home/conf/spark-default.conf 配置文件設置:
針對 spark sql 的 task 數量:spark.sql.shuffle.partitions=50
非 spark sql 程序設置生效:spark.default.parallelism=10
16. 介紹一下 join 操作優(yōu)化經驗?
這道題???,這里只是給大家一個思路,簡單說下!面試之前還需做更多準備。
join 其實常見的就分為兩類:map-side join 和 reduce-side join。
當大表和小表 join 時,用 map-side join 能顯著提高效率。
將多份數據進行關聯(lián)是數據處理過程中非常普遍的用法,不過在分布式計算系統(tǒng)中,這個問題往往會變的非常麻煩,因為框架提供的 join 操作一般會將所有數據根據 key 發(fā)送到所有的 reduce 分區(qū)中去,也就是 shuffle 的過程。造成大量的網絡以及磁盤 IO 消耗,運行效率極其低下,這個過程一般被稱為 reduce-side-join。
如果其中有張表較小的話,我們則可以自己實現在 map 端實現數據關聯(lián),跳過大量數據進行 shuffle 的過程,運行時間得到大量縮短,根據不同數據可能會有幾倍到數十倍的性能提升。
在大數據量的情況下,join 是一中非常昂貴的操作,需要在 join 之前應盡可能的先縮小數據量。
對于縮小數據量,有以下幾條建議:
若兩個 RDD 都有重復的 key,join 操作會使得數據量會急劇的擴大。所有,最好先使用 distinct 或者 combineByKey 操作來減少 key 空間或者用 cogroup 來處理重復的 key,而不是產生所有的交叉結果。在 combine 時,進行機智的分區(qū),可以避免第二次 shuffle。
如果只在一個 RDD 出現,那你將在無意中丟失你的數據。所以使用外連接會更加安全,這樣你就能確保左邊的 RDD 或者右邊的 RDD 的數據完整性,在 join 之后再過濾數據。
如果我們容易得到 RDD 的可以的有用的子集合,那么我們可以先用 filter 或者 reduce,如何在再用 join。
17. Spark 與 MapReduce 的 Shuffle 的區(qū)別?
相同點:都是將 mapper(Spark 里是 ShuffleMapTask)的輸出進行 partition,不同的 partition 送到不同的 reducer(Spark 里 reducer 可能是下一個 stage 里的 ShuffleMapTask,也可能是 ResultTask)
不同點:
MapReduce 默認是排序的,spark 默認不排序,除非使用 sortByKey 算子。
MapReduce 可以劃分成 split,map()、spill、merge、shuffle、sort、reduce()等階段,spark 沒有明顯的階段劃分,只有不同的 stage 和算子操作。
MR 落盤,Spark 不落盤,spark 可以解決 mr 落盤導致效率低下的問題。
18. Spark SQL 執(zhí)行的流程?
這個問題如果深挖還挺復雜的,這里簡單介紹下總體流程:
parser:基于 antlr 框架對 sql 解析,生成抽象語法樹。
變量替換:通過正則表達式找出符合規(guī)則的字符串,替換成系統(tǒng)緩存環(huán)境的變量
SQLConf 中的
spark.sql.variable.substitute,默認是可用的;參考SparkSqlParser
parser:將 antlr 的 tree 轉成 spark catalyst 的 LogicPlan,也就是 未解析的邏輯計劃;詳細參考
AstBuild,ParseDriveranalyzer:通過分析器,結合 catalog,把 logical plan 和實際的數據綁定起來,將 未解析的邏輯計劃 生成 邏輯計劃;詳細參考
QureyExecution緩存替換:通過 CacheManager,替換有相同結果的 logical plan(邏輯計劃)
logical plan 優(yōu)化,基于規(guī)則的優(yōu)化;優(yōu)化規(guī)則參考 Optimizer,優(yōu)化執(zhí)行器 RuleExecutor
生成 spark plan,也就是物理計劃;參考
QueryPlanner和SparkStrategiesspark plan 準備階段
構造 RDD 執(zhí)行,涉及 spark 的 wholeStageCodegenExec 機制,基于 janino 框架生成 java 代碼并編譯
19. Spark SQL 是如何將數據寫到 Hive 表的?
方式一:是利用 Spark RDD 的 API 將數據寫入 hdfs 形成 hdfs 文件,之后再將 hdfs 文件和 hive 表做加載映射。
方式二:利用 Spark SQL 將獲取的數據 RDD 轉換成 DataFrame,再將 DataFrame 寫成緩存表,最后利用 Spark SQL 直接插入 hive 表中。而對于利用 Spark SQL 寫 hive 表官方有兩種常見的 API,第一種是利用 JavaBean 做映射,第二種是利用 StructType 創(chuàng)建 Schema 做映射。
20. 通常來說,Spark 與 MapReduce 相比,Spark 運行效率更高。請說明效率更高來源于 Spark 內置的哪些機制?
基于內存計算,減少低效的磁盤交互; 高效的調度算法,基于 DAG; 容錯機制 Linage。
重點部分就是 DAG 和 Lingae
21. Hadoop 和 Spark 的相同點和不同點?
Hadoop 底層使用 MapReduce 計算架構,只有 map 和 reduce 兩種操作,表達能力比較欠缺,而且在 MR 過程中會重復的讀寫 hdfs,造成大量的磁盤 io 讀寫操作,所以適合高時延環(huán)境下批處理計算的應用;
Spark 是基于內存的分布式計算架構,提供更加豐富的數據集操作類型,主要分成轉化操作和行動操作,包括 map、reduce、filter、flatmap、groupbykey、reducebykey、union 和 join 等,數據分析更加快速,所以適合低時延環(huán)境下計算的應用;
spark 與 hadoop 最大的區(qū)別在于迭代式計算模型?;?mapreduce 框架的 Hadoop 主要分為 map 和 reduce 兩個階段,兩個階段完了就結束了,所以在一個 job 里面能做的處理很有限;spark 計算模型是基于內存的迭代式計算模型,可以分為 n 個階段,根據用戶編寫的 RDD 算子和程序,在處理完一個階段后可以繼續(xù)往下處理很多個階段,而不只是兩個階段。所以 spark 相較于 mapreduce,計算模型更加靈活,可以提供更強大的功能。
但是 spark 也有劣勢,由于 spark 基于內存進行計算,雖然開發(fā)容易,但是真正面對大數據的時候,在沒有進行調優(yōu)的情況下,可能會出現各種各樣的問題,比如 OOM 內存溢出等情況,導致 spark 程序可能無法運行起來,而 mapreduce 雖然運行緩慢,但是至少可以慢慢運行完。
22. Hadoop 和 Spark 使用場景?
Hadoop/MapReduce 和 Spark 最適合的都是做離線型的數據分析,但 Hadoop 特別適合是單次分析的數據量“很大”的情景,而 Spark 則適用于數據量不是很大的情景。
一般情況下,對于中小互聯(lián)網和企業(yè)級的大數據應用而言,單次分析的數量都不會“很大”,因此可以優(yōu)先考慮使用 Spark。
業(yè)務通常認為 Spark 更適用于機器學習之類的“迭代式”應用,80GB 的壓縮數據(解壓后超過 200GB),10 個節(jié)點的集群規(guī)模,跑類似“sum+group-by”的應用,MapReduce 花了 5 分鐘,而 spark 只需要 2 分鐘。
23. Spark 如何保證宕機迅速恢復?
適當增加 spark standby master
編寫 shell 腳本,定期檢測 master 狀態(tài),出現宕機后對 master 進行重啟操作
24. RDD 持久化原理?
spark 非常重要的一個功能特性就是可以將 RDD 持久化在內存中。
調用 cache()和 persist()方法即可。cache()和 persist()的區(qū)別在于,cache()是 persist()的一種簡化方式,cache()的底層就是調用 persist()的無參版本 persist(MEMORY_ONLY),將數據持久化到內存中。
如果需要從內存中清除緩存,可以使用 unpersist()方法。RDD 持久化是可以手動選擇不同的策略的。在調用 persist()時傳入對應的 StorageLevel 即可。
25. Checkpoint 檢查點機制?
應用場景:當 spark 應用程序特別復雜,從初始的 RDD 開始到最后整個應用程序完成有很多的步驟,而且整個應用運行時間特別長,這種情況下就比較適合使用 checkpoint 功能。
原因:對于特別復雜的 Spark 應用,會出現某個反復使用的 RDD,即使之前持久化過但由于節(jié)點的故障導致數據丟失了,沒有容錯機制,所以需要重新計算一次數據。
Checkpoint 首先會調用 SparkContext 的 setCheckPointDIR()方法,設置一個容錯的文件系統(tǒng)的目錄,比如說 HDFS;然后對 RDD 調用 checkpoint()方法。之后在 RDD 所處的 job 運行結束之后,會啟動一個單獨的 job,來將 checkpoint 過的 RDD 數據寫入之前設置的文件系統(tǒng),進行高可用、容錯的類持久化操作。
檢查點機制是我們在 spark streaming 中用來保障容錯性的主要機制,它可以使 spark streaming 階段性的把應用數據存儲到諸如 HDFS 等可靠存儲系統(tǒng)中,以供恢復時使用。具體來說基于以下兩個目的服務:
控制發(fā)生失敗時需要重算的狀態(tài)數。Spark streaming 可以通過轉化圖的譜系圖來重算狀態(tài),檢查點機制則可以控制需要在轉化圖中回溯多遠。
提供驅動器程序容錯。如果流計算應用中的驅動器程序崩潰了,你可以重啟驅動器程序并讓驅動器程序從檢查點恢復,這樣 spark streaming 就可以讀取之前運行的程序處理數據的進度,并從那里繼續(xù)。
26. Checkpoint 和持久化機制的區(qū)別?
最主要的區(qū)別在于持久化只是將數據保存在 BlockManager 中,但是 RDD 的 lineage(血緣關系,依賴關系)是不變的。但是 checkpoint 執(zhí)行完之后,rdd 已經沒有之前所謂的依賴 rdd 了,而只有一個強行為其設置的 checkpointRDD,checkpoint 之后 rdd 的 lineage 就改變了。
持久化的數據丟失的可能性更大,因為節(jié)點的故障會導致磁盤、內存的數據丟失。但是 checkpoint 的數據通常是保存在高可用的文件系統(tǒng)中,比如 HDFS 中,所以數據丟失可能性比較低
27. Spark Streaming 以及基本工作原理?
Spark streaming 是 spark core API 的一種擴展,可以用于進行大規(guī)模、高吞吐量、容錯的實時數據流的處理。
它支持從多種數據源讀取數據,比如 Kafka、Flume、Twitter 和 TCP Socket,并且能夠使用算子比如 map、reduce、join 和 window 等來處理數據,處理后的數據可以保存到文件系統(tǒng)、數據庫等存儲中。
Spark streaming 內部的基本工作原理是:接受實時輸入數據流,然后將數據拆分成 batch,比如每收集一秒的數據封裝成一個 batch,然后將每個 batch 交給 spark 的計算引擎進行處理,最后會生產處一個結果數據流,其中的數據也是一個一個的 batch 組成的。
28. DStream 以及基本工作原理?
DStream 是 spark streaming 提供的一種高級抽象,代表了一個持續(xù)不斷的數據流。
DStream 可以通過輸入數據源來創(chuàng)建,比如 Kafka、flume 等,也可以通過其他 DStream 的高階函數來創(chuàng)建,比如 map、reduce、join 和 window 等。
DStream 內部其實不斷產生 RDD,每個 RDD 包含了一個時間段的數據。
Spark streaming 一定是有一個輸入的 DStream 接收數據,按照時間劃分成一個一個的 batch,并轉化為一個 RDD,RDD 的數據是分散在各個子節(jié)點的 partition 中。
29. Spark Streaming 整合 Kafka 的兩種模式?
receiver 方式:將數據拉取到 executor 中做操作,若數據量大,內存存儲不下,可以通過 WAL,設置了本地存儲,保證數據不丟失,然后使用 Kafka 高級 API 通過 zk 來維護偏移量,保證消費數據。receiver 消費的數據偏移量是在 zk 獲取的,此方式效率低,容易出現數據丟失。
receiver 方式的容錯性:在默認的配置下,這種方式可能會因為底層的失敗而丟失數據。如果要啟用高可靠機制,讓數據零丟失,就必須啟用 Spark Streaming 的預寫日志機制(Write Ahead Log,WAL)。該機制會同步地將接收到的 Kafka 數據寫入分布式文件系統(tǒng)(比如 HDFS)上的預寫日志中。所以,即使底層節(jié)點出現了失敗,也可以使用預寫日志中的數據進行恢復。
Kafka 中的 topic 的 partition,與 Spark 中的 RDD 的 partition 是沒有關系的。在 1、KafkaUtils.createStream()中,提高 partition 的數量,只會增加 Receiver 方式中讀取 partition 的線程的數量。不會增加 Spark 處理數據的并行度。可以創(chuàng)建多個 Kafka 輸入 DStream,使用不同的 consumer group 和 topic,來通過多個 receiver 并行接收數據。
基于 Direct 方式:使用 Kafka 底層 Api,其消費者直接連接 kafka 的分區(qū)上,因為 createDirectStream 創(chuàng)建的 DirectKafkaInputDStream 每個 batch 所對應的 RDD 的分區(qū)與 kafka 分區(qū)一一對應,但是需要自己維護偏移量,即用即取,不會給內存造成太大的壓力,效率高。
優(yōu)點:簡化并行讀?。喝绻x取多個 partition,不需要創(chuàng)建多個輸入 DStream 然后對它們進行 union 操作。Spark 會創(chuàng)建跟 Kafka partition 一樣多的 RDD partition,并且會并行從 Kafka 中讀取數據。所以在 Kafka partition 和 RDD partition 之間,有一個一對一的映射關系。
高性能:如果要保證零數據丟失,在基于 receiver 的方式中,需要開啟 WAL 機制。這種方式其實效率低下,因為數據實際上被復制了兩份,Kafka 自己本身就有高可靠的機制,會對數據復制一份,而這里又會復制一份到 WAL 中。而基于 direct 的方式,不依賴 Receiver,不需要開啟 WAL 機制,只要 Kafka 中作了數據的復制,那么就可以通過 Kafka 的副本進行恢復。
receiver 與和 direct 的比較:
基于 receiver 的方式,是使用 Kafka 的高階 API 來在 ZooKeeper 中保存消費過的 offset 的。這是消費 Kafka 數據的傳統(tǒng)方式。這種方式配合著 WAL 機制可以保證數據零丟失的高可靠性,但是卻無法保證數據被處理一次且僅一次,可能會處理兩次。因為 Spark 和 ZooKeeper 之間可能是不同步的。
基于 direct 的方式,使用 Kafka 的低階 API,Spark Streaming 自己就負責追蹤消費的 offset,并保存在 checkpoint 中。Spark 自己一定是同步的,因此可以保證數據是消費一次且僅消費一次。
Receiver 方式是通過 zookeeper 來連接 kafka 隊列,Direct 方式是直接連接到 kafka 的節(jié)點上獲取數據。
30. Spark 主備切換機制原理知道嗎?
Master 實際上可以配置兩個,Spark 原生的 standalone 模式是支持 Master 主備切換的。當 Active Master 節(jié)點掛掉以后,我們可以將 Standby Master 切換為 Active Master。
Spark Master 主備切換可以基于兩種機制,一種是基于文件系統(tǒng)的,一種是基于 ZooKeeper 的。
基于文件系統(tǒng)的主備切換機制,需要在 Active Master 掛掉之后手動切換到 Standby Master 上;
而基于 Zookeeper 的主備切換機制,可以實現自動切換 Master。
31. Spark 解決了 Hadoop 的哪些問題?
MR:抽象層次低,需要使用手工代碼來完成程序編寫,使用上難以上手;
Spark:Spark 采用 RDD 計算模型,簡單容易上手。
MR:只提供 map 和 reduce 兩個操作,表達能力欠缺;
Spark:Spark 采用更加豐富的算子模型,包括 map、flatmap、groupbykey、reducebykey 等;
MR:一個 job 只能包含 map 和 reduce 兩個階段,復雜的任務需要包含很多個 job,這些 job 之間的管理以來需要開發(fā)者自己進行管理;
Spark:Spark 中一個 job 可以包含多個轉換操作,在調度時可以生成多個 stage,而且如果多個 map 操作的分區(qū)不變,是可以放在同一個 task 里面去執(zhí)行;
MR:中間結果存放在 hdfs 中;
Spark:Spark 的中間結果一般存在內存中,只有當內存不夠了,才會存入本地磁盤,而不是 hdfs;
MR:只有等到所有的 map task 執(zhí)行完畢后才能執(zhí)行 reduce task;
Spark:Spark 中分區(qū)相同的轉換構成流水線在一個 task 中執(zhí)行,分區(qū)不同的需要進行 shuffle 操作,被劃分成不同的 stage 需要等待前面的 stage 執(zhí)行完才能執(zhí)行。
MR:只適合 batch 批處理,時延高,對于交互式處理和實時處理支持不夠;
Spark:Spark streaming 可以將流拆成時間間隔的 batch 進行處理,實時計算。
32. 數據傾斜的產生和解決辦法?
數據傾斜以為著某一個或者某幾個 partition 的數據特別大,導致這幾個 partition 上的計算需要耗費相當長的時間。
在 spark 中同一個應用程序劃分成多個 stage,這些 stage 之間是串行執(zhí)行的,而一個 stage 里面的多個 task 是可以并行執(zhí)行,task 數目由 partition 數目決定,如果一個 partition 的數目特別大,那么導致這個 task 執(zhí)行時間很長,導致接下來的 stage 無法執(zhí)行,從而導致整個 job 執(zhí)行變慢。
避免數據傾斜,一般是要選用合適的 key,或者自己定義相關的 partitioner,通過加鹽或者哈希值來拆分這些 key,從而將這些數據分散到不同的 partition 去執(zhí)行。
如下算子會導致 shuffle 操作,是導致數據傾斜可能發(fā)生的關鍵點所在:groupByKey;reduceByKey;aggregaByKey;join;cogroup;
33. 你用 Spark Sql 處理的時候, 處理過程中用的 DataFrame 還是直接寫的 Sql?為什么?
這個問題的宗旨是問你 spark sql 中 dataframe 和 sql 的區(qū)別,從執(zhí)行原理、操作方便程度和自定義程度來分析 這個問題。
34. Spark Master HA 主從切換過程不會影響到集群已有作業(yè)的運行,為什么?
不會的。
因為程序在運行之前,已經申請過資源了,driver 和 Executors 通訊,不需要和 master 進行通訊的。
35. Spark Master 使用 Zookeeper 進行 HA,有哪些源數據保存到 Zookeeper 里面?
spark 通過這個參數 spark.deploy.zookeeper.dir 指定 master 元數據在 zookeeper 中保存的位置,包括 Worker,Driver 和 Application 以及 Executors。standby 節(jié)點要從 zk 中,獲得元數據信息,恢復集群運行狀態(tài),才能對外繼續(xù)提供服務,作業(yè)提交資源申請等,在恢復前是不能接受請求的。
注:Master 切換需要注意 2 點:
1、在 Master 切換的過程中,所有的已經在運行的程序皆正常運行!因為 Spark Application 在運行前就已經通過 Cluster Manager 獲得了計算資源,所以在運行時 Job 本身的 調度和處理和 Master 是沒有任何關系。
2、在 Master 的切換過程中唯一的影響是不能提交新的 Job:一方面不能夠提交新的應用程序給集群, 因為只有 Active Master 才能接受新的程序的提交請求;另外一方面,已經運行的程序中也不能夠因 Action 操作觸發(fā)新的 Job 的提交請求。
