<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          RDD編程,熟悉算子,讀寫(xiě)文件

          共 10086字,需瀏覽 21分鐘

           ·

          2021-02-22 07:40

          小編推薦

          來(lái)源:子雨大數(shù)據(jù)

          http://dblab.xmu.edu.cn/blog





          內(nèi)容:RDD編程,熟悉算子,讀寫(xiě)文件


          0 準(zhǔn)備工作

          1.啟動(dòng) pyspark

          from pyspark import SparkContextsc = SparkContext( 'local', 'test')

          2.目錄下創(chuàng)建 word.txt 和 person.json,內(nèi)容分別為

          面 第 一 行 首 先 從 外 部 文 件 d a t a . t x t 中 構(gòu) 建 得 到 一 個(gè) R D D ,名 稱 為 l i n e s 


          {"name":"Michael"}{"name":"Andy", "age":30}{"name":"Justin", "age":19}


          第3章 Spark編程基礎(chǔ)

          3.1 ?Spark入門:RDD編程
          http://dblab.xmu.edu.cn/blog/1700-2/

          (1)?從文件系統(tǒng)中加載數(shù)據(jù)創(chuàng)建RDD


          textFile = sc.textFile("E:/pythonWp/sparkWP/wordCount/word.txt")wordCount = textFile.flatMap(lambda line: line.split(" ")).map(lambda word: (word,1)).reduceByKey(lambda a, b : a + b)wordCount.collect()

          運(yùn)行效果:

          [('This', 1), ('is', 1), .... ('and', 1), ('python', 1)]


          (2)?通過(guò)并行集合(數(shù)組)創(chuàng)建RDD

          nums = [1,2,3,4,5]rdd = sc.parallelize(nums)# 上面使用列表來(lái)創(chuàng)建。在Python中并沒(méi)有數(shù)組這個(gè)基本數(shù)據(jù)類型,# 為了便于理解,你可以把列表當(dāng)成其他語(yǔ)言的數(shù)組。

          (3)?RDD操作

          RDD被創(chuàng)建好以后,在后續(xù)使用過(guò)程中一般會(huì)發(fā)生兩種操作:

          • 轉(zhuǎn)換(Transformation): 基于現(xiàn)有的數(shù)據(jù)集創(chuàng)建一個(gè)新的數(shù)據(jù)集。

          • 行動(dòng)(Action):在數(shù)據(jù)集上進(jìn)行運(yùn)算,返回計(jì)算值。


          1)??轉(zhuǎn)換操作

          對(duì)于RDD而言,每一次轉(zhuǎn)換操作都會(huì)產(chǎn)生不同的RDD,供給下一個(gè)“轉(zhuǎn)換”使用。轉(zhuǎn)換得到的RDD是惰性求值的,也就是說(shuō),整個(gè)轉(zhuǎn)換過(guò)程只是記錄了轉(zhuǎn)換的軌跡,并不會(huì)發(fā)生真正的計(jì)算,只有遇到行動(dòng)操作時(shí),才會(huì)發(fā)生真正的計(jì)算,開(kāi)始從血緣關(guān)系源頭開(kāi)始,進(jìn)行物理的轉(zhuǎn)換操作。

          下面列出一些常見(jiàn)的轉(zhuǎn)換操作(Transformation API):

          • filter(func):篩選出滿足函數(shù)func的元素,并返回一個(gè)新的數(shù)據(jù)集

          • map(func):將每個(gè)元素傳遞到函數(shù)func中,并將結(jié)果返回為一個(gè)新的數(shù)據(jù)集

          • flatMap(func):與map()相似,但每個(gè)輸入元素都可以映射到0或多個(gè)輸出結(jié)果

          • groupByKey():應(yīng)用于(K,V)鍵值對(duì)的數(shù)據(jù)集時(shí),返回一個(gè)新的(K, Iterable)形式的數(shù)據(jù)集

          • reduceByKey(func):應(yīng)用于(K,V)鍵值對(duì)的數(shù)據(jù)集時(shí),返回一個(gè)新的(K, V)形式的數(shù)據(jù)集,其中的每個(gè)值是將每個(gè)key傳遞到函數(shù)func中進(jìn)行聚合


          2)??行動(dòng)操作

          行動(dòng)操作是真正觸發(fā)計(jì)算的地方。Spark程序執(zhí)行到行動(dòng)操作時(shí),才會(huì)執(zhí)行真正的計(jì)算,從文件中加載數(shù)據(jù),完成一次又一次轉(zhuǎn)換操作,最終,完成行動(dòng)操作得到結(jié)果。 下面列出一些常見(jiàn)的行動(dòng)操作(Action API):

          • count() 返回?cái)?shù)據(jù)集中的元素個(gè)數(shù)

          • collect() 以數(shù)組的形式返回?cái)?shù)據(jù)集中的所有元素

          • first() 返回?cái)?shù)據(jù)集中的第一個(gè)元素

          • take(n) 以數(shù)組的形式返回?cái)?shù)據(jù)集中的前n個(gè)元素

          • reduce(func) 通過(guò)函數(shù)func(輸入兩個(gè)參數(shù)并返回一個(gè)值)聚合數(shù)據(jù)集中的元素

          • foreach(func) 將數(shù)據(jù)集中的每個(gè)元素傳遞到函數(shù)func中運(yùn)行*


          (3)?惰性機(jī)制

          這里給出一段簡(jiǎn)單的代碼來(lái)解釋Spark的惰性機(jī)制。

          lines = sc.textFile("word.txt")print("lines.first():{0}".format(lines.first()))lineLengths = lines.map(lambda s : len(s))totalLength = lineLengths.reduce( lambda a, b : a + b)print("lineLengths:{0}".format(lineLengths))print("totalLength:{0}".format(totalLength))


          上面第一行首先從外部文件data.txt中構(gòu)建得到一個(gè)RDD,名稱為lines,但是,由于textFile()方法只是一個(gè)轉(zhuǎn)換操作,因此,這行代碼執(zhí)行后,不會(huì)立即把data.txt文件加載到內(nèi)存中,這時(shí)的lines只是一個(gè)指向這個(gè)文件的指針。

          第二行代碼用來(lái)計(jì)算每行的長(zhǎng)度(即每行包含多少個(gè)單詞),同樣,由于map()方法只是一個(gè)轉(zhuǎn)換操作,這行代碼執(zhí)行后,不會(huì)立即計(jì)算每行的長(zhǎng)度。

          第三行代碼的reduce()方法是一個(gè)“動(dòng)作”類型的操作,這時(shí),就會(huì)觸發(fā)真正的計(jì)算。這時(shí),Spark會(huì)把計(jì)算分解成多個(gè)任務(wù)在不同的機(jī)器上執(zhí)行,每臺(tái)機(jī)器運(yùn)行位于屬于它自己的map和reduce,最后把結(jié)果返回給Driver Program。


          (4)?持久化

          前面我們已經(jīng)說(shuō)過(guò),在Spark中,RDD采用惰性求值的機(jī)制,每次遇到行動(dòng)操作,都會(huì)從頭開(kāi)始執(zhí)行計(jì)算。如果整個(gè)Spark程序中只有一次行動(dòng)操作,這當(dāng)然不會(huì)有什么問(wèn)題。但是,在一些情形下,我們需要多次調(diào)用不同的行動(dòng)操作,這就意味著,每次調(diào)用行動(dòng)操作,都會(huì)觸發(fā)一次從頭開(kāi)始的計(jì)算。這對(duì)于迭代計(jì)算而言,代價(jià)是很大的,迭代計(jì)算經(jīng)常需要多次重復(fù)使用同一組數(shù)據(jù)。

          比如,下面就是多次計(jì)算同一個(gè)DD的例子:

          list = ["Hadoop","Spark","Hive"]rdd = sc.parallelize(list)print(rdd.count()) #行動(dòng)操作,觸發(fā)一次真正從頭到尾的計(jì)算print(','.join(rdd.collect())) #行動(dòng)操作,觸發(fā)一次真正從頭到尾的計(jì)算

          結(jié)果

          3Hadoop,Spark,Hive

          (5)?分區(qū)

          RDD是彈性分布式數(shù)據(jù)集,通常RDD很大,會(huì)被分成很多個(gè)分區(qū),分別保存在不同的節(jié)點(diǎn)上。RDD分區(qū)的一個(gè)分區(qū)原則是使得分區(qū)的個(gè)數(shù)盡量等于集群中的CPU核心(core)數(shù)目。

          對(duì)于不同的Spark部署模式而言(本地模式、Standalone模式、YARN模式、Mesos模式),都可以通過(guò)設(shè)置spark.default.parallelism這個(gè)參數(shù)的值,來(lái)配置默認(rèn)的分區(qū)數(shù)目,一般而言: *本地模式:默認(rèn)為本地機(jī)器的CPU數(shù)目,若設(shè)置了local[N],則默認(rèn)為N; *Apache Mesos:默認(rèn)的分區(qū)數(shù)為8; *Standalone或YARN:在“集群中所有CPU核心數(shù)目總和”和“2”二者中取較大值作為默認(rèn)值;

          因此,對(duì)于parallelize而言,如果沒(méi)有在方法中指定分區(qū)數(shù),則默認(rèn)為spark.default.parallelism,比如:


          array = [1,2,3,4,5]rdd = sc.parallelize(array,2) #設(shè)置兩個(gè)分區(qū)

          對(duì)于textFile而言,如果沒(méi)有在方法中指定分區(qū)數(shù),則默認(rèn)為min(defaultParallelism,2),其中,defaultParallelism對(duì)應(yīng)的就是spark.default.parallelism。

          如果是從HDFS中讀取文件,則分區(qū)數(shù)為文件分片數(shù)(比如,128MB/片)。


          3.2 Spark入門:鍵值對(duì)RDD
          http://dblab.xmu.edu.cn/blog/1706-2/

          (1)鍵值對(duì)RDD的創(chuàng)建

          第一種創(chuàng)建方式:從文件中加載

          lines = sc.textFile("word.txt")pairRDD = lines.flatMap(lambda line : line.split()).map(lambda word : (word,1))pairRDD.foreach(print)pairRDD.first()print (pairRDD.collect())

          運(yùn)行結(jié)果

          [('面', 1), ('第', 1), ('一', 1), ('行', 1), ('首', 1), ('先', 1), ('從', 1), ('外', 1), ('部', 1), ('文', 1), ('件', 1), ('d', 1), ('a', 1), ('t', 1), ('a', 1), ('.', 1), ('t', 1), ('x', 1), ('t', 1), ('中', 1), ('構(gòu)', 1), ('建', 1), ('得', 1), ('到', 1), ('一', 1), ('個(gè)', 1), ('R', 1), ('D', 1), ('D', 1), (',', 1), ('名', 1), ('稱', 1), ('為', 1), ('l', 1), ('i', 1), ('n', 1), ('e', 1), ('s', 1)]


          第二種創(chuàng)建方式:通過(guò)并行集合(列表)創(chuàng)建RDD

          list = ["Hadoop","Spark","Hive","Spark"]rdd = sc.parallelize(list)airRDD = rdd.map(lambda word : (word,1))print (airRDD.collect())

          運(yùn)行結(jié)果

          [('Hadoop',?1),?('Spark',?1),?('Hive',?1),?('Spark',?1)]


          (2)?常用的鍵值對(duì)轉(zhuǎn)換操作

          常用的鍵值對(duì)轉(zhuǎn)換操作包括reduceByKey()、groupByKey()、sortByKey()、join()、cogroup()等,下面我們通過(guò)實(shí)例來(lái)介紹。

          1.reduceByKey(func)

          reduceByKey(func)的功能是,使用func函數(shù)合并具有相同鍵的值。比如,reduceByKey((a,b) => a+b),有四個(gè)鍵值對(duì)(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5),對(duì)具有相同key的鍵值對(duì)進(jìn)行合并后的結(jié)果就是:(“spark”,3)、(“hadoop”,8)??梢钥闯?,(a,b) => a+b這個(gè)Lamda表達(dá)式中,a和b都是指value,比如,對(duì)于兩個(gè)具有相同key的鍵值對(duì)(“spark”,1)、(“spark”,2),a就是1,b就是2。

          我們對(duì)上面第二種方式創(chuàng)建得到的pairRDD進(jìn)行reduceByKey()操作,代碼如下:

          airRDD = airRDD.reduceByKey(lambda a,b : a+b)print (airRDD.collect())

          [('Hadoop',?1),?('Spark',?2),?('Hive',?1)]

          2.groupByKey()

          groupByKey()的功能是,對(duì)具有相同鍵的值進(jìn)行分組。比如,對(duì)四個(gè)鍵值對(duì)(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5),采用groupByKey()后得到的結(jié)果是:(“spark”,(1,2))和(“hadoop”,(3,5))。 我們對(duì)上面第二種方式創(chuàng)建得到的pairRDD進(jìn)行g(shù)roupByKey()操作,代碼如下:

          groupRDD = pairRDD.groupByKey()print (groupRDD.collect())

          3.keys()

          keys()只會(huì)把鍵值對(duì)RDD中的key返回形成一個(gè)新的RDD。比如,對(duì)四個(gè)鍵值對(duì)(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5)構(gòu)成的RDD,采用keys()后得到的結(jié)果是一個(gè)RDD[Int],內(nèi)容是{“spark”,”spark”,”hadoop”,”hadoop”}。

          我們對(duì)上面第二種方式創(chuàng)建得到的pairRDD進(jìn)行keys操作,代碼如下:

          keyRDD = pairRDD.keys()print (keyRDD.collect())

          ['面', '第', '一', '行', '首', '先', '從', '外', '部', '文', '件', 'd', 'a', 't', 'a', '.', 't', 'x', 't', '中', '構(gòu)', '建', '得', '到', '一', '個(gè)', 'R', 'D', 'D', ',', '名', '稱', '為', 'l', 'i', 'n', 'e', 's']

          4.values()

          values()只會(huì)把鍵值對(duì)RDD中的value返回形成一個(gè)新的RDD。比如,對(duì)四個(gè)鍵值對(duì)(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5)構(gòu)成的RDD,采用values()后得到的結(jié)果是一個(gè)RDD[Int],內(nèi)容是{1,2,3,5}。

          我們對(duì)上面第二種方式創(chuàng)建得到的pairRDD進(jìn)行values()操作,代碼如下:

          valuesRDD = pairRDD.values()print (valuesRDD.collect())
          [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]

          5.sortByKey()

          sortByKey()的功能是返回一個(gè)根據(jù)鍵排序的RDD。

          我們對(duì)上面第二種方式創(chuàng)建得到的pairRDD進(jìn)行keys操作,代碼如下:

          sortByKeyRDD = pairRDD.sortByKey()print (sortByKeyRDD.collect())

          [('.', 1), ('D', 1), ('D', 1), ('R', 1), ('a', 1), ('a', 1), ('d', 1), ('e', 1), ('i', 1), ('l', 1), ('n', 1), ('s', 1), ('t', 1), ('t', 1), ('t', 1), ('x', 1), ('一', 1), ('一', 1), ('個(gè)', 1), ('中', 1), ('為', 1), ('從', 1), ('件', 1), ('先', 1), ('到', 1), ('名', 1), ('外', 1), ('建', 1), ('得', 1), ('文', 1), ('構(gòu)', 1), ('稱', 1), ('第', 1), ('行', 1), ('部', 1), ('面', 1), ('首', 1), (',', 1)]

          6.mapValues(func)

          我們經(jīng)常會(huì)遇到一種情形,我們只想對(duì)鍵值對(duì)RDD的value部分進(jìn)行處理,而不是同時(shí)對(duì)key和value進(jìn)行處理。對(duì)于這種情形,Spark提供了mapValues(func),它的功能是,對(duì)鍵值對(duì)RDD中的每個(gè)value都應(yīng)用一個(gè)函數(shù),但是,key不會(huì)發(fā)生變化。比如,對(duì)四個(gè)鍵值對(duì)(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5)構(gòu)成的pairRDD,如果執(zhí)行pairRDD.mapValues(lambda x : x+1),就會(huì)得到一個(gè)新的鍵值對(duì)RDD,它包含下面四個(gè)鍵值對(duì)(“spark”,2)、(“spark”,3)、(“hadoop”,4)和(“hadoop”,6)。

          我們對(duì)上面第二種方式創(chuàng)建得到的pairRDD進(jìn)行keys操作,代碼如下:

          mapValuesRDD = pairRDD.mapValues(lambda x : x+1)print (mapValuesRDD.collect())

          [('面', 2), ('第', 2), ('一', 2), ('行', 2), ('首', 2), ('先', 2), ('從', 2), ('外', 2), ('部', 2), ('文', 2), ('件', 2), ('d', 2), ('a', 2), ('t', 2), ('a', 2), ('.', 2), ('t', 2), ('x', 2), ('t', 2), ('中', 2), ('構(gòu)', 2), ('建', 2), ('得', 2), ('到', 2), ('一', 2), ('個(gè)', 2), ('R', 2), ('D', 2), ('D', 2), (',', 2), ('名', 2), ('稱', 2), ('為', 2), ('l', 2), ('i', 2), ('n', 2), ('e', 2), ('s', 2)]

          7.join

          join(連接)操作是鍵值對(duì)常用的操作?!斑B接”(join)這個(gè)概念來(lái)自于關(guān)系數(shù)據(jù)庫(kù)領(lǐng)域,因此,join的類型也和關(guān)系數(shù)據(jù)庫(kù)中的join一樣,包括內(nèi)連接(join)、左外連接(leftOuterJoin)、右外連接(rightOuterJoin)等。最常用的情形是內(nèi)連接,所以,join就表示內(nèi)連接。

          對(duì)于內(nèi)連接,對(duì)于給定的兩個(gè)輸入數(shù)據(jù)集(K,V1)和(K,V2),只有在兩個(gè)數(shù)據(jù)集中都存在的key才會(huì)被輸出,最終得到一個(gè)(K,(V1,V2))類型的數(shù)據(jù)集。

          比如,pairRDD1是一個(gè)鍵值對(duì)集合{(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5)},pairRDD2是一個(gè)鍵值對(duì)集合{(“spark”,”fast”)},那么,pairRDD1.join(pairRDD2)的結(jié)果就是一個(gè)新的RDD,這個(gè)新的RDD是鍵值對(duì)集合{(“spark”,1,”fast”),(“spark”,2,”fast”)}。對(duì)于這個(gè)實(shí)例,我們下面在pyspark中運(yùn)行一下:

          pairRDD1 = sc.parallelize([('spark',1),('spark',2),('hadoop',3),('hadoop',5)])pairRDD2 = sc.parallelize([('spark','fast')])pairRDD3 = pairRDD1.join(pairRDD2)print (pairRDD3.collect())

          [('spark', (1, 'fast')), ('spark', (2, 'fast'))]


          3.3 Spark入門:共享變量
          http://dblab.xmu.edu.cn/blog/1707-2/

          Spark中的兩個(gè)重要抽象是RDD和共享變量。上一章我們已經(jīng)介紹了RDD,這里介紹共享變量。

          在默認(rèn)情況下,當(dāng)Spark在集群的多個(gè)不同節(jié)點(diǎn)的多個(gè)任務(wù)上并行運(yùn)行一個(gè)函數(shù)時(shí),它會(huì)把函數(shù)中涉及到的每個(gè)變量,在每個(gè)任務(wù)上都生成一個(gè)副本。但是,有時(shí)候,需要在多個(gè)任務(wù)之間共享變量,或者在任務(wù)(Task)和任務(wù)控制節(jié)點(diǎn)(Driver Program)之間共享變量。為了滿足這種需求,Spark提供了兩種類型的變量:廣播變量(broadcast variables)和累加器(accumulators)。廣播變量用來(lái)把變量在所有節(jié)點(diǎn)的內(nèi)存之間進(jìn)行共享。累加器則支持在所有不同節(jié)點(diǎn)之間進(jìn)行累加計(jì)算(比如計(jì)數(shù)或者求和)。


          廣播變量

          廣播變量(broadcast variables)允許程序開(kāi)發(fā)人員在每個(gè)機(jī)器上緩存一個(gè)只讀的變量,而不是為機(jī)器上的每個(gè)任務(wù)都生成一個(gè)副本。通過(guò)這種方式,就可以非常高效地給每個(gè)節(jié)點(diǎn)(機(jī)器)提供一個(gè)大的輸入數(shù)據(jù)集的副本。Spark的“動(dòng)作”操作會(huì)跨越多個(gè)階段(stage),對(duì)于每個(gè)階段內(nèi)的所有任務(wù)所需要的公共數(shù)據(jù),Spark都會(huì)自動(dòng)進(jìn)行廣播。通過(guò)廣播方式進(jìn)行傳播的變量,會(huì)經(jīng)過(guò)序列化,然后在被任務(wù)使用時(shí)再進(jìn)行反序列化。這就意味著,顯式地創(chuàng)建廣播變量只有在下面的情形中是有用的:當(dāng)跨越多個(gè)階段的那些任務(wù)需要相同的數(shù)據(jù),或者當(dāng)以反序列化方式對(duì)數(shù)據(jù)進(jìn)行緩存是非常重要的。

          可以通過(guò)調(diào)用SparkContext.broadcast(v)來(lái)從一個(gè)普通變量v中創(chuàng)建一個(gè)廣播變量。這個(gè)廣播變量就是對(duì)普通變量v的一個(gè)包裝器,通過(guò)調(diào)用value方法就可以獲得這個(gè)廣播變量的值,具體代碼如下:

          broadcastVar = sc.broadcast([1, 2, 3])broadcastVar.value
          [1, 2, 3]

          這個(gè)廣播變量被創(chuàng)建以后,那么在集群中的任何函數(shù)中,都應(yīng)該使用廣播變量broadcastVar的值,而不是使用v的值,這樣就不會(huì)把v重復(fù)分發(fā)到這些節(jié)點(diǎn)上。此外,一旦廣播變量創(chuàng)建后,普通變量v的值就不能再發(fā)生修改,從而確保所有節(jié)點(diǎn)都獲得這個(gè)廣播變量的相同的值。

          累加器

          累加器是僅僅被相關(guān)操作累加的變量,通??梢员挥脕?lái)實(shí)現(xiàn)計(jì)數(shù)器(counter)和求和(sum)。Spark原生地支持?jǐn)?shù)值型(numeric)的累加器,程序開(kāi)發(fā)人員可以編寫(xiě)對(duì)新類型的支持。如果創(chuàng)建累加器時(shí)指定了名字,則可以在Spark UI界面看到,這有利于理解每個(gè)執(zhí)行階段的進(jìn)程。

          一個(gè)數(shù)值型的累加器,可以通過(guò)調(diào)用SparkContext.accumulator()來(lái)創(chuàng)建。運(yùn)行在集群中的任務(wù),就可以使用add方法來(lái)把數(shù)值累加到累加器上,但是,這些任務(wù)只能做累加操作,不能讀取累加器的值,只有任務(wù)控制節(jié)點(diǎn)(Driver Program)可以使用value方法來(lái)讀取累加器的值。

          下面是一個(gè)代碼實(shí)例,演示了使用累加器來(lái)對(duì)一個(gè)數(shù)組中的元素進(jìn)行求和:

          accum = sc.accumulator(0)sc.parallelize([1, 2, 3, 4]).foreach(lambda x : accum.add(x))accum.value
          10


          3.4 數(shù)據(jù)讀寫(xiě)
          3.4.1 Spark入門:文件數(shù)據(jù)讀寫(xiě)
          http://dblab.xmu.edu.cn/blog/1708-2/

          不同文件格式的文件系統(tǒng)的數(shù)據(jù)讀寫(xiě)

          下面分別介紹本地文件系統(tǒng)的數(shù)據(jù)讀寫(xiě)和分布式文件系統(tǒng)HDFS的數(shù)據(jù)讀寫(xiě)。


          本地文件文件的數(shù)據(jù)讀寫(xiě)

          textFile = sc.textFile("word.txt")textFile.first()


          '面 第 一 行 首 先 從 外 部 文 件 d a t a . t x t 中 構(gòu) 建 得 到 一 個(gè) R D D ,'


          本地 JSON 的數(shù)據(jù)讀寫(xiě)


          jsonStr?=?sc.textFile("people.json")jsonStr.collect()

          ['{"name":"Michael"}', '{"name":"Andy", "age":30}', '{"name":"Justin", "age":19}']


          從上面執(zhí)行結(jié)果可以看出,people.json文件加載到RDD中以后,在RDD中存在三個(gè)字符串。我們下面要做的事情,就是把這三個(gè)JSON格式的字符串解析出來(lái),比如說(shuō),第一個(gè)字符串{“name”:”Michael”},經(jīng)過(guò)解析后,解析得到key是”name”,value是”Michael”。

          現(xiàn)在我們編寫(xiě)程序完成對(duì)上面字符串的解析工作。

          Scala中有一個(gè)自帶的JSON庫(kù)——scala.util.parsing.json.JSON,可以實(shí)現(xiàn)對(duì)JSON數(shù)據(jù)的解析。JSON.parseFull(jsonString:String)函數(shù),以一個(gè)JSON字符串作為輸入并進(jìn)行解析,如果解析成功則返回一個(gè)Some(map: Map[String, Any]),如果解析失敗則返回None。

          因此,我們可以使用模式匹配來(lái)處理解析結(jié)果

          請(qǐng)執(zhí)行以下命令:


          import json
          inputFile = "people.json"jsonStrs = sc.textFile(inputFile)result = jsonStrs.map(lambda s : json.loads(s))result.collect()
          [{'name': 'Michael'}, {'name': 'Andy', 'age': 30}, {'name': 'Justin', 'age': 19}]



          瀏覽 70
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  日韩无码网站 | 日韩靠逼| 日韩无码做爱视频 | 亚洲黄色性爱 | 日韩视频在线观看 |