<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          講一些關(guān)于Spark的Broadcast你不知道的細節(jié)

          共 7078字,需瀏覽 15分鐘

           ·

          2021-09-17 08:51

          顧名思義,broadcast 就是將數(shù)據(jù)從一個節(jié)點發(fā)送到其他各個節(jié)點上去。這樣的場景很多,比如 driver 上有一張表,其他節(jié)點上運行的 task 需要 lookup 這張表,那么 driver 可以先把這張表 copy 到這些節(jié)點,這樣 task 就可以在本地查表了。如何實現(xiàn)一個可靠高效的 broadcast 機制是一個有挑戰(zhàn)性的問題。先看看 Spark 官網(wǎng)上的一段話:
          Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.

          問題:為什么只能 broadcast 只讀的變量?

          這就涉及一致性的問題,如果變量可以被更新,那么一旦變量被某個節(jié)點更新,其他節(jié)點要不要一塊更新?如果多個節(jié)點同時在更新,更新順序是什么?怎么做同步?還會涉及 fault-tolerance 的問題。為了避免維護數(shù)據(jù)一致性問題,Spark 目前只支持 broadcast 只讀變量。

          問題:broadcast 到節(jié)點而不是 broadcast 到每個 task?

          因為每個 task 是一個線程,而且同在一個進程運行 tasks 都屬于同一個 application。因此每個節(jié)點(executor)上放一份就可以被所有 task 共享。

          問題:具體怎么用 broadcast?

          driver program 例子:
          val data = List(1, 2, 3, 4, 5, 6)
          val bdata = sc.broadcast(data)

          val rdd = sc.parallelize(1 to 6, 2)
          val observedSizes = rdd.map(_ => bdata.value.size)
          driver 使用 sc.broadcast() 聲明要 broadcast 的 data,bdata 的類型是 Broadcast。
          當 rdd.transformation(func) 需要用 bdata 時,直接在 func 中調(diào)用,比如上面的例子中的 map() 就使用了 bdata.value.size。

          問題:怎么實現(xiàn) broadcast?

          broadcast 的實現(xiàn)機制很有意思:

          -  分發(fā)task的時候先分發(fā)bdata的元信息-

          Driver 先建一個本地文件夾用以存放需要 broadcast 的 data,并啟動一個可以訪問該文件夾的 HttpServer。當調(diào)用val bdata = sc.broadcast(data)時就把 data 寫入文件夾,同時寫入 driver 自己的 blockManger 中(StorageLevel 為內(nèi)存+磁盤),獲得一個 blockId,類型為 BroadcastBlockId。當調(diào)用rdd.transformation(func)時,如果 func 用到了 bdata,那么 driver submitTask() 的時候會將 bdata 一同 func 進行序列化得到 serialized task,注意序列化的時候不會序列化 bdata 中包含的 data。上一章講到 serialized task 從 driverActor 傳遞到 executor 時使用 Akka 的傳消息機制,消息不能太大,而實際的 data 可能很大,所以這時候還不能 broadcast data。
          driver 為什么會同時將 data 放到磁盤和 blockManager 里面?放到磁盤是為了讓 HttpServer 訪問到,放到 blockManager 是為了讓 driver program 自身使用 bdata 時方便(其實我覺得不放到 blockManger 里面也行)。
          那么什么時候傳送真正的 data?在 executor 反序列化 task 的時候,會同時反序列化 task 中的 bdata 對象,這時候會調(diào)用 bdata 的 readObject() 方法。該方法先去本地 blockManager 那里詢問 bdata 的 data 在不在 blockManager 里面,如果不在就使用下面的兩種 fetch 方式之一去將 data fetch 過來。得到 data 后,將其存放到 blockManager 里面,這樣后面運行的 task 如果需要 bdata 就不需要再去 fetch data 了。如果在,就直接拿來用了。
          下面探討 broadcast data 時候的兩種實現(xiàn)方式:

          -  HttpBroadcast -

          顧名思義,HttpBroadcast 就是每個 executor 通過的 http 協(xié)議連接 driver 并從 driver 那里 fetch data。
          Driver 先準備好要 broadcast 的 data,調(diào)用sc.broadcast(data)后會調(diào)用工廠方法建立一個 HttpBroadcast 對象。該對象做的第一件事就是將 data 存到 driver 的 blockManager 里面,StorageLevel 為內(nèi)存+磁盤,blockId 類型為 BroadcastBlockId。
          同時 driver 也會將 broadcast 的 data 寫到本地磁盤,例如寫入后得到 /var/folders/87/grpn1_fn4xq5wdqmxk31v0l00000gp/T/spark-6233b09c-3c72-4a4d-832b-6c0791d0eb9c/broadcast_0, 這個文件夾作為 HttpServer 的文件目錄。
          Driver 和 executor 啟動的時候,都會生成 broadcastManager 對象,調(diào)用 HttpBroadcast.initialize(),driver 會在本地建立一個臨時目錄用來存放 broadcast 的 data,并啟動可以訪問該目錄的 httpServer。
          Fetch data:在 executor 反序列化 task 的時候,會同時反序列化 task 中的 bdata 對象,這時候會調(diào)用 bdata 的 readObject() 方法。該方法先去本地 blockManager 那里詢問 bdata 的 data 在不在 blockManager 里面,如果不在就使用 http 協(xié)議連接 driver 上的 httpServer,將 data fetch 過來。得到 data 后,將其存放到 blockManager 里面,這樣后面運行的 task 如果需要 bdata 就不需要再去 fetch data 了。如果在,就直接拿來用了。
          HttpBroadcast 最大的問題就是 driver 所在的節(jié)點可能會出現(xiàn)網(wǎng)絡(luò)擁堵,因為 worker 上的 executor 都會去 driver 那里 fetch 數(shù)據(jù)。

          -  TorrentBoadcast -

          為了解決 HttpBroadast 中 driver 單點網(wǎng)絡(luò)瓶頸的問題,Spark 又設(shè)計了一種 broadcast 的方法稱為 TorrentBroadcast,這個類似于大家常用的 BitTorrent 技術(shù)。基本思想就是將 data 分塊成 data blocks,然后假設(shè)有 executor fetch 到了一些 data blocks,那么這個 executor 就可以被當作 data server 了,隨著 fetch 的 executor 越來越多,有更多的 data server 加入,data 就很快能傳播到全部的 executor 那里去了。
          HttpBroadcast 是通過傳統(tǒng)的 http 協(xié)議和 httpServer 去傳 data,在 TorrentBroadcast 里面使用在上一章介紹的 blockManager.getRemote() => NIO ConnectionManager 傳數(shù)據(jù)的方法來傳遞,讀取數(shù)據(jù)的過程與讀取 cached rdd 的方式類似。

          下面討論 TorrentBroadcast 的一些細節(jié):

          driver 端:

          Driver 先把 data 序列化到 byteArray,然后切割成 BLOCK_SIZE(由 spark.broadcast.blockSize = 4MB 設(shè)置)大小的 data block,每個 data block 被 TorrentBlock 對象持有。切割完 byteArray 后,會將其回收,因此內(nèi)存消耗雖然可以達到 2 * Size(data),但這是暫時的。
          完成分塊切割后,就將分塊信息(稱為 meta 信息)存放到 driver 自己的 blockManager 里面,StorageLevel 為內(nèi)存+磁盤,同時會通知 driver 自己的 blockManagerMaster 說 meta 信息已經(jīng)存放好。通知 blockManagerMaster 這一步很重要,因為 blockManagerMaster 可以被 driver 和所有 executor 訪問到,信息被存放到 blockManagerMaster 就變成了全局信息。
          之后將每個分塊 data block 存放到 driver 的 blockManager 里面,StorageLevel 為內(nèi)存+磁盤。存放后仍然通知 blockManagerMaster 說 blocks 已經(jīng)存放好。到這一步,driver 的任務(wù)已經(jīng)完成。

          Executor 端:

          executor 收到 serialized task 后,先反序列化 task,這時候會反序列化 serialized task 中包含的 bdata 類型是 TorrentBroadcast,也就是去調(diào)用 TorrentBroadcast.readObject()。這個方法首先得到 bdata 對象,然后發(fā)現(xiàn) bdata 里面沒有包含實際的 data。怎么辦?先詢問所在的 executor 里的 blockManager 是會否包含 data(通過查詢 data 的 broadcastId),包含就直接從本地 blockManager 讀取 data。否則,就通過本地 blockManager 去連接 driver 的 blockManagerMaster 獲取 data 分塊的 meta 信息,獲取信息后,就開始了 BT 過程。
          BT 過程:task 先在本地開一個數(shù)組用于存放將要 fetch 過來的 data blocks arrayOfBlocks = new Array[TorrentBlock](totalBlocks),TorrentBlock 是對 data block 的包裝。然后打亂要 fetch 的 data blocks 的順序,比如如果 data block 共有 5 個,那么打亂后的 fetch 順序可能是 3-1-2-4-5。然后按照打亂后的順序去 fetch 一個個 data block。fetch 的過程就是通過 “本地 blockManager -本地 connectionManager-driver/executor 的 connectionManager-driver/executor 的 blockManager-data” 得到 data,這個過程與 fetch cached rdd 類似。每 fetch 到一個 block 就將其存放到 executor 的 blockManager 里面,同時通知 driver 上的 blockManagerMaster 說該 data block 多了一個存儲地址。這一步通知非常重要,意味著 blockManagerMaster 知道 data block 現(xiàn)在在 cluster 中有多份,下一個不同節(jié)點上的 task 再去 fetch 這個 data block 的時候,可以有兩個選擇了,而且會隨機選擇一個去 fetch。這個過程持續(xù)下去就是 BT 協(xié)議,隨著下載的客戶端越來越多,data block 服務(wù)器也越來越多,就變成 p2p下載了。關(guān)于 BT 協(xié)議)。
          整個 fetch 過程結(jié)束后,task 會開一個大 Array[Byte],大小為 data 的總大小,然后將 data block 都 copy 到這個 Array,然后對 Array 中 bytes 進行反序列化得到原始的 data,這個過程就是 driver 序列化 data 的反過程。
          最后將 data 存放到 task 所在 executor 的 blockManager 里面,StorageLevel 為內(nèi)存+磁盤。顯然,這時候 data 在 blockManager 里存了兩份,不過等全部 executor 都 fetch 結(jié)束,存儲 data blocks 那份可以刪掉了。

          問題:broadcast RDD 會怎樣?

          不會怎樣,就是這個rdd在每個executor中實例化一份。

          -  Discussion -

          公共數(shù)據(jù)的 broadcast 是很實用的功能,在 Hadoop 中使用DistributedCache,比如常用的-libjars就是使用 DistributedCache 來將 task 依賴的 jars 分發(fā)到每個 task 的工作目錄。不過分發(fā)前 DistributedCache 要先將文件上傳到 HDFS。這種方式的主要問題是資源浪費,如果某個節(jié)點上要運行來自同一 job 的 4 個 mapper,那么公共數(shù)據(jù)會在該節(jié)點上存在 4 份(每個 task 的工作目錄會有一份)。但是通過 HDFS 進行 broadcast 的好處在于單點瓶頸不明顯,因為公共 data 首先被分成多個 block,然后不同的 block 存放在不同的節(jié)點。這樣,只要所有的 task 不是同時去同一個節(jié)點 fetch 同一個 block,網(wǎng)絡(luò)擁塞不會很嚴重。
          對于 Spark 來講,broadcast 時考慮的不僅是如何將公共 data 分發(fā)下去的問題,還要考慮如何讓同一節(jié)點上的 task 共享 data。
          對于第一個問題,Spark 設(shè)計了兩種 broadcast 的方式,傳統(tǒng)存在單點瓶頸問題的 HttpBroadcast,和類似 BT 方式的 TorrentBroadcast。HttpBroadcast 使用傳統(tǒng)的 client-server 形式的 HttpServer 來傳遞真正的 data,而 TorrentBroadcast 使用 blockManager 自帶的 NIO 通信方式來傳遞 data。TorrentBroadcast 存在的問題是慢啟動占內(nèi)存,慢啟動指的是剛開始 data 只在 driver 上有,要等 executors fetch 很多輪 data block 后,data server 才會變得可觀,后面的 fetch 速度才會變快。executor 所占內(nèi)存的在 fetch 完 data blocks 后進行反序列化時需要將近兩倍 data size 的內(nèi)存消耗。不管哪一種方式,driver 在分塊時會有兩倍 data size 的內(nèi)存消耗。
          對于第二個問題,每個 executor 都包含一個 blockManager 用來管理存放在 executor 里的數(shù)據(jù),將公共數(shù)據(jù)存放在 blockManager 中(StorageLevel 為內(nèi)存+磁盤),可以保證在 executor 執(zhí)行的 tasks 能夠共享 data。
          更深入點,broadcast 可以用多播協(xié)議來做,不過多播使用 UDP,不是可靠的,仍然需要應(yīng)用層的設(shè)計一些可靠性保障機制。

          文章來源:
          https://mp.weixin.qq.com/s/26DymNZAzbTxr9x5gRZP5w
          瀏覽 21
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  91干色| 日韩视频-熊猫成人网 | wwwjiujiu | 国产91女技师一区二区三区 | 日本级婬乱片A片AAA毛片A |