講一些關(guān)于Spark的Broadcast你不知道的細節(jié)
問題:為什么只能 broadcast 只讀的變量?
問題:broadcast 到節(jié)點而不是 broadcast 到每個 task?
問題:具體怎么用 broadcast?
val data = List(1, 2, 3, 4, 5, 6)
val bdata = sc.broadcast(data)
val rdd = sc.parallelize(1 to 6, 2)
val observedSizes = rdd.map(_ => bdata.value.size)
sc.broadcast() 聲明要 broadcast 的 data,bdata 的類型是 Broadcast。rdd.transformation(func) 需要用 bdata 時,直接在 func 中調(diào)用,比如上面的例子中的 map() 就使用了 bdata.value.size。問題:怎么實現(xiàn) broadcast?
val bdata = sc.broadcast(data)時就把 data 寫入文件夾,同時寫入 driver 自己的 blockManger 中(StorageLevel 為內(nèi)存+磁盤),獲得一個 blockId,類型為 BroadcastBlockId。當調(diào)用rdd.transformation(func)時,如果 func 用到了 bdata,那么 driver submitTask() 的時候會將 bdata 一同 func 進行序列化得到 serialized task,注意序列化的時候不會序列化 bdata 中包含的 data。上一章講到 serialized task 從 driverActor 傳遞到 executor 時使用 Akka 的傳消息機制,消息不能太大,而實際的 data 可能很大,所以這時候還不能 broadcast data。driver 為什么會同時將 data 放到磁盤和 blockManager 里面?放到磁盤是為了讓 HttpServer 訪問到,放到 blockManager 是為了讓 driver program 自身使用 bdata 時方便(其實我覺得不放到 blockManger 里面也行)。
sc.broadcast(data)后會調(diào)用工廠方法建立一個 HttpBroadcast 對象。該對象做的第一件事就是將 data 存到 driver 的 blockManager 里面,StorageLevel 為內(nèi)存+磁盤,blockId 類型為 BroadcastBlockId。/var/folders/87/grpn1_fn4xq5wdqmxk31v0l00000gp/T/spark-6233b09c-3c72-4a4d-832b-6c0791d0eb9c/broadcast_0, 這個文件夾作為 HttpServer 的文件目錄。Driver 和 executor 啟動的時候,都會生成 broadcastManager 對象,調(diào)用 HttpBroadcast.initialize(),driver 會在本地建立一個臨時目錄用來存放 broadcast 的 data,并啟動可以訪問該目錄的 httpServer。


driver 端:
spark.broadcast.blockSize = 4MB 設(shè)置)大小的 data block,每個 data block 被 TorrentBlock 對象持有。切割完 byteArray 后,會將其回收,因此內(nèi)存消耗雖然可以達到 2 * Size(data),但這是暫時的。Executor 端:
arrayOfBlocks = new Array[TorrentBlock](totalBlocks),TorrentBlock 是對 data block 的包裝。然后打亂要 fetch 的 data blocks 的順序,比如如果 data block 共有 5 個,那么打亂后的 fetch 順序可能是 3-1-2-4-5。然后按照打亂后的順序去 fetch 一個個 data block。fetch 的過程就是通過 “本地 blockManager -本地 connectionManager-driver/executor 的 connectionManager-driver/executor 的 blockManager-data” 得到 data,這個過程與 fetch cached rdd 類似。每 fetch 到一個 block 就將其存放到 executor 的 blockManager 里面,同時通知 driver 上的 blockManagerMaster 說該 data block 多了一個存儲地址。這一步通知非常重要,意味著 blockManagerMaster 知道 data block 現(xiàn)在在 cluster 中有多份,下一個不同節(jié)點上的 task 再去 fetch 這個 data block 的時候,可以有兩個選擇了,而且會隨機選擇一個去 fetch。這個過程持續(xù)下去就是 BT 協(xié)議,隨著下載的客戶端越來越多,data block 服務(wù)器也越來越多,就變成 p2p下載了。關(guān)于 BT 協(xié)議)。問題:broadcast RDD 會怎樣?
-libjars就是使用 DistributedCache 來將 task 依賴的 jars 分發(fā)到每個 task 的工作目錄。不過分發(fā)前 DistributedCache 要先將文件上傳到 HDFS。這種方式的主要問題是資源浪費,如果某個節(jié)點上要運行來自同一 job 的 4 個 mapper,那么公共數(shù)據(jù)會在該節(jié)點上存在 4 份(每個 task 的工作目錄會有一份)。但是通過 HDFS 進行 broadcast 的好處在于單點瓶頸不明顯,因為公共 data 首先被分成多個 block,然后不同的 block 存放在不同的節(jié)點。這樣,只要所有的 task 不是同時去同一個節(jié)點 fetch 同一個 block,網(wǎng)絡(luò)擁塞不會很嚴重。評論
圖片
表情
