<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Spark的Cache和Checkpoint區(qū)別和聯(lián)系拾遺

          共 8658字,需瀏覽 18分鐘

           ·

          2021-04-11 23:33

          點(diǎn)擊上方藍(lán)色字體,選擇“設(shè)為星標(biāo)
          回復(fù)”資源“獲取更多資源


          作為區(qū)別于 Hadoop 的一個(gè)重要 feature,cache 機(jī)制保證了需要訪問(wèn)重復(fù)數(shù)據(jù)的應(yīng)用(如迭代型算法和交互式應(yīng)用)可以運(yùn)行的更快。與 Hadoop MapReduce job 不同的是 Spark 的邏輯/物理執(zhí)行圖可能很龐大,task 中 computing chain 可能會(huì)很長(zhǎng),計(jì)算某些 RDD 也可能會(huì)很耗時(shí)。這時(shí),如果 task 中途運(yùn)行出錯(cuò),那么 task 的整個(gè) computing chain 需要重算,代價(jià)太高。因此,有必要將計(jì)算代價(jià)較大的 RDD checkpoint 一下,這樣,當(dāng)下游 RDD 計(jì)算出錯(cuò)時(shí),可以直接從 checkpoint 過(guò)的 RDD 那里讀取數(shù)據(jù)繼續(xù)算。

          Cache 機(jī)制

          回到 Overview 提到的 GroupByTest 的例子,里面對(duì) FlatMappedRDD 進(jìn)行了 cache,這樣 Job 1 在執(zhí)行時(shí)就直接從 FlatMappedRDD 開(kāi)始算了??梢?jiàn) cache 能夠讓重復(fù)數(shù)據(jù)在同一個(gè) application 中的 jobs 間共享。

          邏輯執(zhí)行圖:

          物理執(zhí)行圖:

          問(wèn)題:哪些 RDD 需要 cache?

          會(huì)被重復(fù)使用的(但不能太大)。

          問(wèn)題:用戶怎么設(shè)定哪些 RDD 要 cache?

          因?yàn)橛脩糁慌c driver program 打交道,因此只能用 rdd.cache() 去 cache 用戶能看到的 RDD。所謂能看到指的是調(diào)用 transformation() 后生成的 RDD,而某些在 transformation() 中 Spark 自己生成的 RDD 是不能被用戶直接 cache 的,比如 reduceByKey() 中會(huì)生成的 ShuffledRDD、MapPartitionsRDD 是不能被用戶直接 cache 的。

          問(wèn)題:driver program 設(shè)定 rdd.cache() 后,系統(tǒng)怎么對(duì) RDD 進(jìn)行 cache?

          先不看實(shí)現(xiàn),自己來(lái)想象一下如何完成 cache:當(dāng) task 計(jì)算得到 RDD 的某個(gè) partition 的第一個(gè) record 后,就去判斷該 RDD 是否要被 cache,如果要被 cache 的話,將這個(gè) record 及后續(xù)計(jì)算的到的 records 直接丟給本地 blockManager 的 memoryStore,如果 memoryStore 存不下就交給 diskStore 存放到磁盤(pán)。

          實(shí)際實(shí)現(xiàn)與設(shè)想的基本類(lèi)似,區(qū)別在于:將要計(jì)算 RDD partition 的時(shí)候(而不是已經(jīng)計(jì)算得到第一個(gè) record 的時(shí)候)就去判斷 partition 要不要被 cache。如果要被 cache 的話,先將 partition 計(jì)算出來(lái),然后 cache 到內(nèi)存。cache 只使用 memory,寫(xiě)磁盤(pán)的話那就叫 checkpoint 了。

          調(diào)用 rdd.cache() 后, rdd 就變成 persistRDD 了,其 StorageLevel 為 MEMORY_ONLY。persistRDD 會(huì)告知 driver 說(shuō)自己是需要被 persist 的。

          如果用代碼表示:

          rdd.iterator()
          => SparkEnv.get.cacheManager.getOrCompute(thisRDD, split, context, storageLevel)
          => key = RDDBlockId(rdd.id, split.index)
          => blockManager.get(key)
          => computedValues = rdd.computeOrReadCheckpoint(split, context)
          if (isCheckpointed) firstParent[T].iterator(split, context)
          else compute(split, context)
          => elements = new ArrayBuffer[Any]
          => elements ++= computedValues
          => updatedBlocks = blockManager.put(key, elements, tellMaster = true)

          當(dāng) rdd.iterator() 被調(diào)用的時(shí)候,也就是要計(jì)算該 rdd 中某個(gè) partition 的時(shí)候,會(huì)先去 cacheManager 那里領(lǐng)取一個(gè) blockId,表明是要存哪個(gè) RDD 的哪個(gè) partition,這個(gè) blockId 類(lèi)型是 RDDBlockId(memoryStore 里面可能還存放有 task 的 result 等數(shù)據(jù),因此 blockId 的類(lèi)型是用來(lái)區(qū)分不同的數(shù)據(jù))。然后去 blockManager 里面查看該 partition 是不是已經(jīng)被 checkpoint 了,如果是,表明以前運(yùn)行過(guò)該 task,那就不用計(jì)算該 partition 了,直接從 checkpoint 中讀取該 partition 的所有 records 放到叫做 elements 的 ArrayBuffer 里面。如果沒(méi)有被 checkpoint 過(guò),先將 partition 計(jì)算出來(lái),然后將其所有 records 放到 elements 里面。最后將 elements 交給 blockManager 進(jìn)行 cache。

          blockManager 將 elements(也就是 partition) 存放到 memoryStore 管理的 LinkedHashMap[BlockId, Entry] 里面。如果 partition 大于 memoryStore 的存儲(chǔ)極限(默認(rèn)是 60% 的 heap),那么直接返回說(shuō)存不下。如果剩余空間也許能放下,會(huì)先 drop 掉一些早先被 cached 的 RDD 的 partition,為新來(lái)的 partition 騰地方,如果騰出的地方夠,就把新來(lái)的 partition 放到 LinkedHashMap 里面,騰不出就返回說(shuō)存不下。注意 drop 的時(shí)候不會(huì)去 drop 與新來(lái)的 partition 同屬于一個(gè) RDD 的 partition。drop 的時(shí)候先 drop 最早被 cache 的 partition。(說(shuō)好的 LRU 替換算法呢?)

          問(wèn)題:cached RDD 怎么被讀???

          下次計(jì)算(一般是同一 application 的下一個(gè) job 計(jì)算)時(shí)如果用到 cached RDD,task 會(huì)直接去 blockManager 的 memoryStore 中讀取。具體地講,當(dāng)要計(jì)算某個(gè) rdd 中的 partition 時(shí)候(通過(guò)調(diào)用 rdd.iterator())會(huì)先去 blockManager 里面查找是否已經(jīng)被 cache 了,如果 partition 被 cache 在本地,就直接使用 blockManager.getLocal() 去本地 memoryStore 里讀取。如果該 partition 被其他節(jié)點(diǎn)上 blockManager cache 了,會(huì)通過(guò) blockManager.getRemote() 去其他節(jié)點(diǎn)上讀取,讀取過(guò)程如下圖。

          獲取 cached partitions 的存儲(chǔ)位置:partition 被 cache 后所在節(jié)點(diǎn)上的 blockManager 會(huì)通知 driver 上的 blockMangerMasterActor 說(shuō)某 rdd 的 partition 已經(jīng)被我 cache 了,這個(gè)信息會(huì)存儲(chǔ)在 blockMangerMasterActor 的 blockLocations: HashMap中。等到 task 執(zhí)行需要 cached rdd 的時(shí)候,會(huì)調(diào)用 blockManagerMaster 的 getLocations(blockId) 去詢問(wèn)某 partition 的存儲(chǔ)位置,這個(gè)詢問(wèn)信息會(huì)發(fā)到 driver 那里,driver 查詢 blockLocations 獲得位置信息并將信息送回。

          讀取其他節(jié)點(diǎn)上的 cached partition:task 得到 cached partition 的位置信息后,將 GetBlock(blockId) 的請(qǐng)求通過(guò) connectionManager 發(fā)送到目標(biāo)節(jié)點(diǎn)。目標(biāo)節(jié)點(diǎn)收到請(qǐng)求后從本地 blockManager 那里的 memoryStore 讀取 cached partition,最后發(fā)送回來(lái)。

          Checkpoint

          問(wèn)題:哪些 RDD 需要 checkpoint?

          運(yùn)算時(shí)間很長(zhǎng)或運(yùn)算量太大才能得到的 RDD,computing chain 過(guò)長(zhǎng)或依賴其他 RDD 很多的 RDD。實(shí)際上,將 ShuffleMapTask 的輸出結(jié)果存放到本地磁盤(pán)也算是 checkpoint,只不過(guò)這個(gè) checkpoint 的主要目的是去 partition 輸出數(shù)據(jù)。

          問(wèn)題:什么時(shí)候 checkpoint?

          cache 機(jī)制是每計(jì)算出一個(gè)要 cache 的 partition 就直接將其 cache 到內(nèi)存了。但 checkpoint 沒(méi)有使用這種第一次計(jì)算得到就存儲(chǔ)的方法,而是等到 job 結(jié)束后另外啟動(dòng)專(zhuān)門(mén)的 job 去完成 checkpoint 。也就是說(shuō)需要 checkpoint 的 RDD 會(huì)被計(jì)算兩次。因此,在使用 rdd.checkpoint() 的時(shí)候,建議加上 rdd.cache(),這樣第二次運(yùn)行的 job 就不用再去計(jì)算該 rdd 了,直接讀取 cache 寫(xiě)磁盤(pán)。其實(shí) Spark 提供了 rdd.persist(StorageLevel.DISK_ONLY) 這樣的方法,相當(dāng)于 cache 到磁盤(pán)上,這樣可以做到 rdd 第一次被計(jì)算得到時(shí)就存儲(chǔ)到磁盤(pán)上,但這個(gè) persist 和 checkpoint 有很多不同,之后會(huì)討論。

          問(wèn)題:checkpoint 怎么實(shí)現(xiàn)?

          RDD 需要經(jīng)過(guò) [ Initialized --> marked for checkpointing --> checkpointing in progress --> checkpointed ] 這幾個(gè)階段才能被 checkpoint。

          Initialized:首先 driver program 需要使用 rdd.checkpoint() 去設(shè)定哪些 rdd 需要 checkpoint,設(shè)定后,該 rdd 就接受 RDDCheckpointData 管理。用戶還要設(shè)定 checkpoint 的存儲(chǔ)路徑,一般在 HDFS 上。

          marked for checkpointing:初始化后,RDDCheckpointData 會(huì)將 rdd 標(biāo)記為 MarkedForCheckpoint。

          checkpointing in progress:每個(gè) job 運(yùn)行結(jié)束后會(huì)調(diào)用 finalRdd.doCheckpoint(),finalRdd 會(huì)順著 computing chain 回溯掃描,碰到要 checkpoint 的 RDD 就將其標(biāo)記為 CheckpointingInProgress,然后將寫(xiě)磁盤(pán)(比如寫(xiě) HDFS)需要的配置文件(如 core-site.xml 等)broadcast 到其他 worker 節(jié)點(diǎn)上的 blockManager。完成以后,啟動(dòng)一個(gè) job 來(lái)完成 checkpoint(使用 rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path.toString, broadcastedConf)))。

          checkpointed:job 完成 checkpoint 后,將該 rdd 的 dependency 全部清掉,并設(shè)定該 rdd 狀態(tài)為 checkpointed。然后,為該 rdd 強(qiáng)加一個(gè)依賴,設(shè)置該 rdd 的 parent rdd 為 CheckpointRDD,該 CheckpointRDD 負(fù)責(zé)以后讀取在文件系統(tǒng)上的 checkpoint 文件,生成該 rdd 的 partition。

          有意思的是我在 driver program 里 checkpoint 了兩個(gè) rdd,結(jié)果只有一個(gè)(下面的 result)被 checkpoint 成功,pairs2 沒(méi)有被 checkpoint,也不知道是 bug 還是故意只 checkpoint 下游的 RDD:

          val data1 = Array[(Int, Char)]((1, 'a'), (2, 'b'), (3, 'c'), 
          (4, 'd'), (5, 'e'), (3, 'f'), (2, 'g'), (1, 'h'))
          val pairs1 = sc.parallelize(data1, 3)

          val data2 = Array[(Int, Char)]((1, 'A'), (2, 'B'), (3, 'C'), (4, 'D'))
          val pairs2 = sc.parallelize(data2, 2)

          pairs2.checkpoint

          val result = pairs1.join(pairs2)
          result.checkpoint

          問(wèn)題:怎么讀取 checkpoint 過(guò)的 RDD?

          在 runJob() 的時(shí)候會(huì)先調(diào)用 finalRDD 的 partitions() 來(lái)確定最后會(huì)有多個(gè) task。rdd.partitions() 會(huì)去檢查(通過(guò) RDDCheckpointData 去檢查,因?yàn)樗?fù)責(zé)管理被 checkpoint 過(guò)的 rdd)該 rdd 是會(huì)否被 checkpoint 過(guò)了,如果該 rdd 已經(jīng)被 checkpoint 過(guò)了,直接返回該 rdd 的 partitions 也就是 Array[Partition]。

          當(dāng)調(diào)用 rdd.iterator() 去計(jì)算該 rdd 的 partition 的時(shí)候,會(huì)調(diào)用 computeOrReadCheckpoint(split: Partition) 去查看該 rdd 是否被 checkpoint 過(guò)了,如果是,就調(diào)用該 rdd 的 parent rdd 的 iterator() 也就是 CheckpointRDD.iterator(),CheckpointRDD 負(fù)責(zé)讀取文件系統(tǒng)上的文件,生成該 rdd 的 partition。這就解釋了為什么那么 trickly 地為 checkpointed rdd 添加一個(gè) parent CheckpointRDD。

          問(wèn)題:cache 與 checkpoint 的區(qū)別?

          關(guān)于這個(gè)問(wèn)題,Tathagata Das 有一段回答: There is a significant difference between cache and checkpoint. Cache materializes the RDD and keeps it in memory and/or disk(其實(shí)只有 memory). But the lineage(也就是 computing chain) of RDD (that is, seq of operations that generated the RDD) will be remembered, so that if there are node failures and parts of the cached RDDs are lost, they can be regenerated. However, checkpoint saves the RDD to an HDFS file and actually forgets the lineage completely. This is allows long lineages to be truncated and the data to be saved reliably in HDFS (which is naturally fault tolerant by replication).

          深入一點(diǎn)討論,rdd.persist(StorageLevel.DISK_ONLY) 與 checkpoint 也有區(qū)別。前者雖然可以將 RDD 的 partition 持久化到磁盤(pán),但該 partition 由 blockManager 管理。一旦 driver program 執(zhí)行結(jié)束,也就是 executor 所在進(jìn)程 CoarseGrainedExecutorBackend stop,blockManager 也會(huì) stop,被 cache 到磁盤(pán)上的 RDD 也會(huì)被清空(整個(gè) blockManager 使用的 local 文件夾被刪除)。而 checkpoint 將 RDD 持久化到 HDFS 或本地文件夾,如果不被手動(dòng) remove 掉(話說(shuō)怎么 remove checkpoint 過(guò)的 RDD?),是一直存在的,也就是說(shuō)可以被下一個(gè) driver program 使用,而 cached RDD 不能被其他 dirver program 使用。

          Discussion

          Hadoop MapReduce 在執(zhí)行 job 的時(shí)候,不停地做持久化,每個(gè) task 運(yùn)行結(jié)束做一次,每個(gè) job 運(yùn)行結(jié)束做一次(寫(xiě)到 HDFS)。在 task 運(yùn)行過(guò)程中也不停地在內(nèi)存和磁盤(pán)間 swap 來(lái) swap 去。可是諷刺的是,Hadoop 中的 task 太傻,中途出錯(cuò)需要完全重新運(yùn)行,比如 shuffle 了一半的數(shù)據(jù)存放到了磁盤(pán),下次重新運(yùn)行時(shí)仍然要重新 shuffle。Spark 好的一點(diǎn)在于盡量不去持久化,所以使用 pipeline,cache 等機(jī)制。用戶如果感覺(jué) job 可能會(huì)出錯(cuò)可以手動(dòng)去 checkpoint 一些 critical 的 RDD,job 如果出錯(cuò),下次運(yùn)行時(shí)直接從 checkpoint 中讀取數(shù)據(jù)。唯一不足的是,checkpoint 需要兩次運(yùn)行 job。

          貌似還沒(méi)有發(fā)現(xiàn)官方給出的 checkpoint 的例子,這里我寫(xiě)了一個(gè):

          package internals

          import org.apache.spark.SparkContext
          import org.apache.spark.SparkContext._
          import org.apache.spark.SparkConf

          object groupByKeyTest {

          def main(args: Array[String]) {
          val conf = new SparkConf().setAppName("GroupByKey").setMaster("local")
          val sc = new SparkContext(conf)
          sc.setCheckpointDir("/Users/xulijie/Documents/data/checkpoint")

          val data = Array[(Int, Char)]((1, 'a'), (2, 'b'),
          (3, 'c'), (4, 'd'),
          (5, 'e'), (3, 'f'),
          (2, 'g'), (1, 'h')
          )
          val pairs = sc.parallelize(data, 3)

          pairs.checkpoint
          pairs.count

          val result = pairs.groupByKey(2)

          result.foreachWith(i => i)((x, i) => println("[PartitionIndex " + i + "] " + x))

          println(result.toDebugString)
          }
          }
          不再需要ZooKeeper,Kafka 2.8獨(dú)立運(yùn)行!

          Flink在實(shí)時(shí)計(jì)算平臺(tái)和實(shí)時(shí)數(shù)倉(cāng)中的企業(yè)級(jí)應(yīng)用小結(jié)

          Flink性能調(diào)優(yōu)小小總結(jié)

          歡迎點(diǎn)贊+收藏+轉(zhuǎn)發(fā)朋友圈素質(zhì)三連

          文章不錯(cuò)?點(diǎn)個(gè)【在看】吧! 
          瀏覽 64
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  中文字第一页幕精品导航网站国产乱伦 | 亚洲无码日韩高清AV | 在线免费看片黄 | 欧美精品性视频 | 日逼天堂 |