<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Spark Core——RDD何以替代Hadoop MapReduce?

          共 4843字,需瀏覽 10分鐘

           ·

          2020-09-09 21:11


          導(dǎo)讀

          繼續(xù)前期依次推文PySpark入門SQL DataFrame簡介的基礎(chǔ)上,今日對Spark中最重要的一個(gè)概念——RDD進(jìn)行介紹。雖然在Spark中,基于RDD的其他4大組件更為常用,但作為Spark core中的核心數(shù)據(jù)抽象,RDD是必須深刻理解的基礎(chǔ)概念。




          01 何為RDD

          RDD(Resilient Distributed Dataset),彈性分布式數(shù)據(jù)集,是Spark core中的核心數(shù)據(jù)抽象,其他4大組件都或多或少依賴于RDD。簡單理解,RDD就是一種特殊的數(shù)據(jù)結(jié)構(gòu),是為了適應(yīng)大數(shù)據(jù)分布式計(jì)算的特殊場景(此時(shí)傳統(tǒng)的數(shù)據(jù)集合無法滿足分布式、容錯(cuò)性等需求)而設(shè)計(jì)的一種數(shù)據(jù)形式,其三個(gè)核心關(guān)鍵詞是:

          • 彈性:主要包含4層含義:即數(shù)據(jù)大小可變、分區(qū)數(shù)可變、計(jì)算可容錯(cuò)、內(nèi)存硬盤存儲位置可變

          • 分布式:大數(shù)據(jù)一般都是分布式的,意味著多硬件依賴、多核心并行計(jì)算

          • 數(shù)據(jù)集:說明這是一組數(shù)據(jù)的集合,或者說數(shù)據(jù)結(jié)構(gòu)


          RDD在Spark中占據(jù)"core"的地位



          02 RDD為何快于MapReduce

          看一個(gè)人,可以看看他的對手;了解一個(gè)產(chǎn)品,也可以看看他的競品。Spark是為了解決Hadoop中 MapReduce計(jì)算框架效率低下而產(chǎn)生的大數(shù)據(jù)計(jì)算引擎,所以Spark起初的競爭對手就是MapReduce。


          MapReduce之所以計(jì)算效率低,主要原因在于每次計(jì)算都涉及從硬盤的數(shù)據(jù)讀寫問題,而Spark設(shè)計(jì)之初就考慮盡可能避免硬盤讀寫,所以Spark的第一大特點(diǎn)是數(shù)據(jù)優(yōu)先存儲于內(nèi)存中(除非內(nèi)存存儲不夠才放到硬盤中)。同時(shí),為了盡可能優(yōu)化RDD在內(nèi)存中的計(jì)算流程,Spark還引入了lazy特性。lazy特性其實(shí)質(zhì)就是直至"真正碰上事了"才計(jì)算,否則一直"推托下去",頗有不見兔子不撒鷹的味道。


          這實(shí)際上又涉及到了RDD的兩類算子:transformation和action,前者只是建立邏輯轉(zhuǎn)換流程,后者才真正落地執(zhí)行。transformation的結(jié)果是從一個(gè)RDD轉(zhuǎn)換到另一個(gè)RDD,而action則是從一個(gè)RDD轉(zhuǎn)換到一個(gè)非RDD,因此從執(zhí)行結(jié)果是否仍然是RDD也可推斷出該操作是transformation抑或action。進(jìn)一步地,在transformation過程中,Spark內(nèi)部調(diào)度RDD的計(jì)算過程是一個(gè)有向無環(huán)圖(Directed Acyclic Graph,DAG ),意味著所有RDD的轉(zhuǎn)換都帶有方向性(一個(gè)產(chǎn)生另一個(gè),即血緣關(guān)系),且不存在循環(huán)依賴的,這對Spark的容錯(cuò)性帶來了有效保證:當(dāng)一個(gè)環(huán)節(jié)出現(xiàn)問題時(shí)僅需按照方向關(guān)系追溯到相應(yīng)的父RDD即可,而無需從頭開始全流程計(jì)算。


          Spark中關(guān)于寬窄依賴的經(jīng)典圖例(圖片選自網(wǎng)絡(luò))


          上圖給出了寬窄依賴的一個(gè)圖例。實(shí)際上,這里的寬窄依賴是針對RDD的每個(gè)partition而言的,分析子RDD的每個(gè)partition來源就容易理解其依賴為寬或窄:

          • 窄依賴:子RDD和父RDD中的各partition是一一對應(yīng)關(guān)系,由于僅單個(gè)依賴,所以是窄的,也無需等待其他父RDD中的partition

          • 寬依賴:子RDD和父RDD中partition存在一對多的關(guān)系,也就是說生成子RDD中的某個(gè)partition不僅需要這個(gè)父RDD中的一個(gè)partition,還需要其他partition或其他父RDD的partition,由于依賴多個(gè)partition,所以是寬的,在實(shí)際執(zhí)行過程中要等到所有partition就位后方可執(zhí)行


          也正因如此,對于整個(gè)DAG而言,依據(jù)依賴類型可將Spark執(zhí)行過程劃分為多個(gè)階段,同一階段內(nèi)部Spark還會(huì)進(jìn)行相應(yīng)的調(diào)度和優(yōu)化??梢哉f,內(nèi)存計(jì)算+DAG兩大特性共同保證了Spark執(zhí)行的高效性。



          03 RDD創(chuàng)建

          RDD的創(chuàng)建主要有3類形式:

          • 從Python中的其他數(shù)據(jù)結(jié)構(gòu)創(chuàng)建,用到的方法為parallelize(),接收一個(gè)本地Python集合對象,返回一個(gè)RDD對象,一般適用于較小的數(shù)據(jù)集

          • 從本地或HDFS文件中創(chuàng)建RDD對象,適用于大數(shù)據(jù)集,也是生產(chǎn)部署中較為常用的方式

          • 從一個(gè)已有RDD中生成另一個(gè)RDD,所有transformation類算子其實(shí)都是執(zhí)行這一過程

          from?pyspark?import?SparkContext??#?SparkContext是spark?core的入口
          sc?=?SparkContext()??#?sc是一個(gè)單例
          rdd1?=?sc.parallelize(['Tom',?'John',?'Joy'])??#?從本地已有Python集合創(chuàng)建
          rdd2?=?sc.textFile('test.txt')??#?從本地文件序列化一個(gè)RDD
          rdd3?=?rdd1.map(lambda?x:(x,?1))??#?從一個(gè)RDD轉(zhuǎn)換為另一個(gè)RDD


          需要指出的是,RDD作為分布式的數(shù)據(jù)集合,其本身是不可變對象(immutable),所以所有的transformation算子都是從一個(gè)RDD轉(zhuǎn)換生成了一個(gè)新的RDD,這也印證了DAG中無環(huán)的概念。

          至于說轉(zhuǎn)換過程中仍然可以使用相同的變量名,這是由Python的特性所決定的,類似于字符串是不可變數(shù)據(jù)類型,但也可以由一個(gè)字符串生成另一個(gè)同名字符串一樣。



          04 三類算子

          Spark中的算子,其實(shí)就是一類操作,或者更具體說是一個(gè)函數(shù)!

          前面提到,Spark在執(zhí)行過程中,依據(jù)從一個(gè)RDD是生成另一個(gè)RDD還是其他數(shù)據(jù)類型,可將操作分為兩類:transformation和action。這實(shí)際上也是最為常用的RDD操作,甚至說Spark core編程模式就是先經(jīng)歷一系列的transformation,然后在action提取相應(yīng)的結(jié)果。


          然而,在系列transformation過程中,由于其lazy特性,當(dāng)且僅當(dāng)遇到action操作時(shí)才真正從頭至尾的完整執(zhí)行,所以就不得不面對一個(gè)問題:假如有RDD6是由前面系列的RDD1-5轉(zhuǎn)換生成,而RDD6既是RDD7的父RDD,也是RDD8的父RDD,所以在獨(dú)立執(zhí)行RDD7和RDD8時(shí),實(shí)際上會(huì)將RDD1=>RDD6的轉(zhuǎn)換操作執(zhí)行兩遍,存在資源和效率上的浪費(fèi)。當(dāng)存在2遍計(jì)算重復(fù)或許尚可接受,但若存在更多重復(fù)轉(zhuǎn)換時(shí),這種模式或許不是一個(gè)明智之舉,為此Spark還為RDD設(shè)計(jì)了第三類算子:持久化操作persistence。


          至此,RDD的三類常用算子介紹如下:

          1. transformation算子

          • map,接收一個(gè)函數(shù)作為參數(shù),實(shí)現(xiàn)將RDD中的每個(gè)元素一對一映射生成另一個(gè)RDD,其實(shí)與Python中的原生map函數(shù)功能類似

          • filter,接收一個(gè)函數(shù)作為參數(shù),實(shí)現(xiàn)將RDD中每個(gè)元素判斷條件是否滿足,進(jìn)行執(zhí)行過濾,與Python中的原生filter函數(shù)類似

          • flatMap,實(shí)際上包含了兩個(gè)步驟,首先執(zhí)行map功能,將RDD中的每個(gè)元素執(zhí)行一個(gè)映射轉(zhuǎn)換,當(dāng)轉(zhuǎn)換結(jié)果是多個(gè)元素時(shí)(例如轉(zhuǎn)換為列表),再將其各個(gè)元素展平,實(shí)現(xiàn)一對多映射

          • groupByKey,適用于RDD中每個(gè)元素是一個(gè)包含兩個(gè)元素的元組格式,例如(key, value)形式,進(jìn)而將相同key對應(yīng)的value構(gòu)成一個(gè)特殊的集合對象,實(shí)質(zhì)與SQL或者pandas中g(shù)roupby操作類似,一般還需與其他聚合函數(shù)配合操作

          • reduceByKey,實(shí)際上groupByKey只執(zhí)行了一半的聚合動(dòng)作,即只有"聚"的過程,而缺少實(shí)質(zhì)性的"合"的操作。reduceByKey則是在groupby之后加入了reduce的函數(shù),實(shí)現(xiàn)真正聚合。換句話說,reduceByKey = groupByKey + aggFunction

          • sortByKey,也比較簡單,即根據(jù)key值進(jìn)行排序的過程


          另外,針對以上函數(shù)還有一些功能相近的函數(shù),不再列出。


          2. action算子

          action算子Spark中真正執(zhí)行的操作,當(dāng)一個(gè)算子的執(zhí)行結(jié)果不再是RDD時(shí),那么它就是一個(gè)action算子,此時(shí)Spark意識到不能再簡單的進(jìn)行邏輯運(yùn)算標(biāo)記,而需要實(shí)質(zhì)性的執(zhí)行計(jì)算。常用的action算子包括如下:

          • collect,可能是日常功能調(diào)試中最為常用的算子,用于將RDD實(shí)際執(zhí)行并返回所有元素的列表格式,在功能調(diào)試或者數(shù)據(jù)集較小時(shí)較為常用,若是面對大數(shù)據(jù)集或者線上部署時(shí)切忌使用,因?yàn)橛锌赡茉斐蓛?nèi)存溢出

          • take,接收整數(shù)n,返回特定記錄條數(shù)

          • first,返回第一條記錄,相當(dāng)于take(1)

          • count,返回RDD記錄條數(shù)

          • reduce,對RDD的所有元素執(zhí)行聚合操作,與Python中的原生reduce功能類似,返回一個(gè)標(biāo)量

          • foreach,對RDD中每個(gè)元素執(zhí)行特定的操作,功能上類似map,但會(huì)實(shí)際執(zhí)行并返回結(jié)果


          3. persistence算子

          持久化的目的是為了短期內(nèi)將某一RDD存儲于內(nèi)存或硬盤中,使其可復(fù)用。主要操作有兩類:

          • persist,接收參數(shù)可以指定持久化級別,例如MEMORY_ONLY和MEMORY_AND_DISK,其中前者表示僅存儲于內(nèi)存中;后者表示優(yōu)先放于內(nèi)存,內(nèi)存不足再放硬盤中

          • cache,緩存,即僅將RDD存于內(nèi)存中,相當(dāng)于持久化級別為MEMORY_ONLY的persist操作

          另外,還有checkpoint也屬于持久化操作。對于一個(gè)已經(jīng)持久化的對象,當(dāng)無需繼續(xù)使用時(shí),可使用unpersist完成取消持久化。


          需知,持久化操作是為了便于多次重復(fù)調(diào)用同一RDD時(shí),防止發(fā)生重復(fù)計(jì)算而設(shè)計(jì)的操作,但其本身仍然是偏lazy的模式,即執(zhí)行了persist或者cache操作后,僅僅是將其標(biāo)記為需要持久化,而直至第一次遇到action觸發(fā)其執(zhí)行時(shí)才會(huì)真正的完成持久化。


          最后,舉一個(gè)Spark中hello world級別的WordCount例子,實(shí)戰(zhàn)一下各類算子的應(yīng)用:

          texts?=?['this?is?spark',?'this?is?RDD']
          rdd?=?sc.parallelize(texts)??#?從已有集合創(chuàng)建RDD對象
          #?rdd?=?['this?is?spark',?'this?is?RDD']
          rdd1?=?rdd.flatMap(lambda?x:x.split('?'))??#?flatMap將原來的句子用空格分割,并展平至單個(gè)詞
          #?rdd1?=?['this',?'is',?'spark',?'this',?'is',?'RDD']
          rdd2?=?rdd1.map(lambda?x:(x,?1))??#?將每個(gè)單詞映射為(單詞,1)的(key?value)對象格式
          #?rdd2?=?[('this',?1),?('is',?1),?('spark',?1),?('this',?1),?('is',?1),?('RDD',?1)]
          rdd3?=?rdd2.reduceByKey(lambda?a,?b:a+b)??#?依據(jù)單詞相同進(jìn)行聚合
          #?rdd3?=?[('spark',?1),?('RDD',?1),?('this',?2),?('is',?2)]
          rdd3.collect()??#?遇到action算子,將上述rdd=>rdd1=>rdd2=>rdd3有向無環(huán)圖真正執(zhí)行,并返回列表








          相關(guān)閱讀:


          瀏覽 58
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  日日骚AV | 久久久久久高清毛片一级 | 九九国产夫妻自拍 | 日本成人少妇 | 五十路熟女影视在线看 |