RDD編程,熟悉算子,讀寫(xiě)文件
小編推薦
來(lái)源:子雨大數(shù)據(jù)
http://dblab.xmu.edu.cn/blog
內(nèi)容:RDD編程,熟悉算子,讀寫(xiě)文件
0 準(zhǔn)備工作
1.啟動(dòng) pyspark
from pyspark import SparkContextsc = SparkContext( 'local', 'test')
2.目錄下創(chuàng)建 word.txt 和 person.json,內(nèi)容分別為
面 第 一 行 首 先 從 外 部 文 件 d a t a . t x t 中 構(gòu) 建 得 到 一 個(gè) R D D ,名 稱 為 l i n e s
{"name":"Michael"}{"name":"Andy", "age":30}{"name":"Justin", "age":19}
第3章 Spark編程基礎(chǔ)
3.1 ?Spark入門:RDD編程 http://dblab.xmu.edu.cn/blog/1700-2/
(1)?從文件系統(tǒng)中加載數(shù)據(jù)創(chuàng)建RDD
textFile = sc.textFile("E:/pythonWp/sparkWP/wordCount/word.txt")wordCount = textFile.flatMap(lambda line: line.split(" ")).map(lambda word: (word,1)).reduceByKey(lambda a, b : a + b)wordCount.collect()
運(yùn)行效果:
[('This', 1),('is', 1),....('and', 1),('python', 1)]
(2)?通過(guò)并行集合(數(shù)組)創(chuàng)建RDD
nums = [1,2,3,4,5]rdd = sc.parallelize(nums)# 上面使用列表來(lái)創(chuàng)建。在Python中并沒(méi)有數(shù)組這個(gè)基本數(shù)據(jù)類型,# 為了便于理解,你可以把列表當(dāng)成其他語(yǔ)言的數(shù)組。
(3)?RDD操作
RDD被創(chuàng)建好以后,在后續(xù)使用過(guò)程中一般會(huì)發(fā)生兩種操作:
轉(zhuǎn)換(Transformation): 基于現(xiàn)有的數(shù)據(jù)集創(chuàng)建一個(gè)新的數(shù)據(jù)集。
行動(dòng)(Action):在數(shù)據(jù)集上進(jìn)行運(yùn)算,返回計(jì)算值。
1)??轉(zhuǎn)換操作
對(duì)于RDD而言,每一次轉(zhuǎn)換操作都會(huì)產(chǎn)生不同的RDD,供給下一個(gè)“轉(zhuǎn)換”使用。轉(zhuǎn)換得到的RDD是惰性求值的,也就是說(shuō),整個(gè)轉(zhuǎn)換過(guò)程只是記錄了轉(zhuǎn)換的軌跡,并不會(huì)發(fā)生真正的計(jì)算,只有遇到行動(dòng)操作時(shí),才會(huì)發(fā)生真正的計(jì)算,開(kāi)始從血緣關(guān)系源頭開(kāi)始,進(jìn)行物理的轉(zhuǎn)換操作。
下面列出一些常見(jiàn)的轉(zhuǎn)換操作(Transformation API):
filter(func):篩選出滿足函數(shù)func的元素,并返回一個(gè)新的數(shù)據(jù)集
map(func):將每個(gè)元素傳遞到函數(shù)func中,并將結(jié)果返回為一個(gè)新的數(shù)據(jù)集
flatMap(func):與map()相似,但每個(gè)輸入元素都可以映射到0或多個(gè)輸出結(jié)果
groupByKey():應(yīng)用于(K,V)鍵值對(duì)的數(shù)據(jù)集時(shí),返回一個(gè)新的(K, Iterable)形式的數(shù)據(jù)集
reduceByKey(func):應(yīng)用于(K,V)鍵值對(duì)的數(shù)據(jù)集時(shí),返回一個(gè)新的(K, V)形式的數(shù)據(jù)集,其中的每個(gè)值是將每個(gè)key傳遞到函數(shù)func中進(jìn)行聚合
2)??行動(dòng)操作
行動(dòng)操作是真正觸發(fā)計(jì)算的地方。Spark程序執(zhí)行到行動(dòng)操作時(shí),才會(huì)執(zhí)行真正的計(jì)算,從文件中加載數(shù)據(jù),完成一次又一次轉(zhuǎn)換操作,最終,完成行動(dòng)操作得到結(jié)果。 下面列出一些常見(jiàn)的行動(dòng)操作(Action API):
count() 返回?cái)?shù)據(jù)集中的元素個(gè)數(shù)
collect() 以數(shù)組的形式返回?cái)?shù)據(jù)集中的所有元素
first() 返回?cái)?shù)據(jù)集中的第一個(gè)元素
take(n) 以數(shù)組的形式返回?cái)?shù)據(jù)集中的前n個(gè)元素
reduce(func) 通過(guò)函數(shù)func(輸入兩個(gè)參數(shù)并返回一個(gè)值)聚合數(shù)據(jù)集中的元素
foreach(func) 將數(shù)據(jù)集中的每個(gè)元素傳遞到函數(shù)func中運(yùn)行*
(3)?惰性機(jī)制
這里給出一段簡(jiǎn)單的代碼來(lái)解釋Spark的惰性機(jī)制。
lines = sc.textFile("word.txt")print("lines.first():{0}".format(lines.first()))lineLengths = lines.map(lambda s : len(s))totalLength = lineLengths.reduce( lambda a, b : a + b)print("lineLengths:{0}".format(lineLengths))print("totalLength:{0}".format(totalLength))
上面第一行首先從外部文件data.txt中構(gòu)建得到一個(gè)RDD,名稱為lines,但是,由于textFile()方法只是一個(gè)轉(zhuǎn)換操作,因此,這行代碼執(zhí)行后,不會(huì)立即把data.txt文件加載到內(nèi)存中,這時(shí)的lines只是一個(gè)指向這個(gè)文件的指針。
第二行代碼用來(lái)計(jì)算每行的長(zhǎng)度(即每行包含多少個(gè)單詞),同樣,由于map()方法只是一個(gè)轉(zhuǎn)換操作,這行代碼執(zhí)行后,不會(huì)立即計(jì)算每行的長(zhǎng)度。
第三行代碼的reduce()方法是一個(gè)“動(dòng)作”類型的操作,這時(shí),就會(huì)觸發(fā)真正的計(jì)算。這時(shí),Spark會(huì)把計(jì)算分解成多個(gè)任務(wù)在不同的機(jī)器上執(zhí)行,每臺(tái)機(jī)器運(yùn)行位于屬于它自己的map和reduce,最后把結(jié)果返回給Driver Program。
(4)?持久化
前面我們已經(jīng)說(shuō)過(guò),在Spark中,RDD采用惰性求值的機(jī)制,每次遇到行動(dòng)操作,都會(huì)從頭開(kāi)始執(zhí)行計(jì)算。如果整個(gè)Spark程序中只有一次行動(dòng)操作,這當(dāng)然不會(huì)有什么問(wèn)題。但是,在一些情形下,我們需要多次調(diào)用不同的行動(dòng)操作,這就意味著,每次調(diào)用行動(dòng)操作,都會(huì)觸發(fā)一次從頭開(kāi)始的計(jì)算。這對(duì)于迭代計(jì)算而言,代價(jià)是很大的,迭代計(jì)算經(jīng)常需要多次重復(fù)使用同一組數(shù)據(jù)。
比如,下面就是多次計(jì)算同一個(gè)DD的例子:
list = ["Hadoop","Spark","Hive"]rdd = sc.parallelize(list)print(rdd.count()) #行動(dòng)操作,觸發(fā)一次真正從頭到尾的計(jì)算print(','.join(rdd.collect())) #行動(dòng)操作,觸發(fā)一次真正從頭到尾的計(jì)算
結(jié)果
3Hadoop,Spark,Hive
(5)?分區(qū)
RDD是彈性分布式數(shù)據(jù)集,通常RDD很大,會(huì)被分成很多個(gè)分區(qū),分別保存在不同的節(jié)點(diǎn)上。RDD分區(qū)的一個(gè)分區(qū)原則是使得分區(qū)的個(gè)數(shù)盡量等于集群中的CPU核心(core)數(shù)目。
對(duì)于不同的Spark部署模式而言(本地模式、Standalone模式、YARN模式、Mesos模式),都可以通過(guò)設(shè)置spark.default.parallelism這個(gè)參數(shù)的值,來(lái)配置默認(rèn)的分區(qū)數(shù)目,一般而言: *本地模式:默認(rèn)為本地機(jī)器的CPU數(shù)目,若設(shè)置了local[N],則默認(rèn)為N; *Apache Mesos:默認(rèn)的分區(qū)數(shù)為8; *Standalone或YARN:在“集群中所有CPU核心數(shù)目總和”和“2”二者中取較大值作為默認(rèn)值;
因此,對(duì)于parallelize而言,如果沒(méi)有在方法中指定分區(qū)數(shù),則默認(rèn)為spark.default.parallelism,比如:
array = [1,2,3,4,5]rdd = sc.parallelize(array,2) #設(shè)置兩個(gè)分區(qū)
對(duì)于textFile而言,如果沒(méi)有在方法中指定分區(qū)數(shù),則默認(rèn)為min(defaultParallelism,2),其中,defaultParallelism對(duì)應(yīng)的就是spark.default.parallelism。
如果是從HDFS中讀取文件,則分區(qū)數(shù)為文件分片數(shù)(比如,128MB/片)。
3.2 Spark入門:鍵值對(duì)RDD http://dblab.xmu.edu.cn/blog/1706-2/
(1)鍵值對(duì)RDD的創(chuàng)建
第一種創(chuàng)建方式:從文件中加載
lines = sc.textFile("word.txt")pairRDD = lines.flatMap(lambda line : line.split()).map(lambda word : (word,1))pairRDD.foreach(print)pairRDD.first()print (pairRDD.collect())
運(yùn)行結(jié)果
[('面', 1), ('第', 1), ('一', 1), ('行', 1), ('首', 1), ('先', 1), ('從', 1), ('外', 1), ('部', 1), ('文', 1), ('件', 1), ('d', 1), ('a', 1), ('t', 1), ('a', 1), ('.', 1), ('t', 1), ('x', 1), ('t', 1), ('中', 1), ('構(gòu)', 1), ('建', 1), ('得', 1), ('到', 1), ('一', 1), ('個(gè)', 1), ('R', 1), ('D', 1), ('D', 1), (',', 1), ('名', 1), ('稱', 1), ('為', 1), ('l', 1), ('i', 1), ('n', 1), ('e', 1), ('s', 1)]第二種創(chuàng)建方式:通過(guò)并行集合(列表)創(chuàng)建RDD
list = ["Hadoop","Spark","Hive","Spark"]rdd = sc.parallelize(list)airRDD = rdd.map(lambda word : (word,1))print (airRDD.collect())
運(yùn)行結(jié)果
[('Hadoop',?1),?('Spark',?1),?('Hive',?1),?('Spark',?1)](2)?常用的鍵值對(duì)轉(zhuǎn)換操作
常用的鍵值對(duì)轉(zhuǎn)換操作包括reduceByKey()、groupByKey()、sortByKey()、join()、cogroup()等,下面我們通過(guò)實(shí)例來(lái)介紹。
1.reduceByKey(func)
reduceByKey(func)的功能是,使用func函數(shù)合并具有相同鍵的值。比如,reduceByKey((a,b) => a+b),有四個(gè)鍵值對(duì)(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5),對(duì)具有相同key的鍵值對(duì)進(jìn)行合并后的結(jié)果就是:(“spark”,3)、(“hadoop”,8)??梢钥闯?,(a,b) => a+b這個(gè)Lamda表達(dá)式中,a和b都是指value,比如,對(duì)于兩個(gè)具有相同key的鍵值對(duì)(“spark”,1)、(“spark”,2),a就是1,b就是2。
我們對(duì)上面第二種方式創(chuàng)建得到的pairRDD進(jìn)行reduceByKey()操作,代碼如下:
airRDD = airRDD.reduceByKey(lambda a,b : a+b)print (airRDD.collect())
[('Hadoop',?1),?('Spark',?2),?('Hive',?1)]2.groupByKey()
groupByKey()的功能是,對(duì)具有相同鍵的值進(jìn)行分組。比如,對(duì)四個(gè)鍵值對(duì)(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5),采用groupByKey()后得到的結(jié)果是:(“spark”,(1,2))和(“hadoop”,(3,5))。 我們對(duì)上面第二種方式創(chuàng)建得到的pairRDD進(jìn)行g(shù)roupByKey()操作,代碼如下:
groupRDD = pairRDD.groupByKey()print (groupRDD.collect())
3.keys()
keys()只會(huì)把鍵值對(duì)RDD中的key返回形成一個(gè)新的RDD。比如,對(duì)四個(gè)鍵值對(duì)(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5)構(gòu)成的RDD,采用keys()后得到的結(jié)果是一個(gè)RDD[Int],內(nèi)容是{“spark”,”spark”,”hadoop”,”hadoop”}。
我們對(duì)上面第二種方式創(chuàng)建得到的pairRDD進(jìn)行keys操作,代碼如下:
keyRDD = pairRDD.keys()print (keyRDD.collect())
['面', '第', '一', '行', '首', '先', '從', '外', '部', '文', '件', 'd', 'a', 't', 'a', '.', 't', 'x', 't', '中', '構(gòu)', '建', '得', '到', '一', '個(gè)', 'R', 'D', 'D', ',', '名', '稱', '為', 'l', 'i', 'n', 'e', 's']4.values()
values()只會(huì)把鍵值對(duì)RDD中的value返回形成一個(gè)新的RDD。比如,對(duì)四個(gè)鍵值對(duì)(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5)構(gòu)成的RDD,采用values()后得到的結(jié)果是一個(gè)RDD[Int],內(nèi)容是{1,2,3,5}。
我們對(duì)上面第二種方式創(chuàng)建得到的pairRDD進(jìn)行values()操作,代碼如下:
valuesRDD = pairRDD.values()print (valuesRDD.collect())
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]5.sortByKey()
sortByKey()的功能是返回一個(gè)根據(jù)鍵排序的RDD。
我們對(duì)上面第二種方式創(chuàng)建得到的pairRDD進(jìn)行keys操作,代碼如下:
sortByKeyRDD = pairRDD.sortByKey()print (sortByKeyRDD.collect())
[('.', 1), ('D', 1), ('D', 1), ('R', 1), ('a', 1), ('a', 1), ('d', 1), ('e', 1), ('i', 1), ('l', 1), ('n', 1), ('s', 1), ('t', 1), ('t', 1), ('t', 1), ('x', 1), ('一', 1), ('一', 1), ('個(gè)', 1), ('中', 1), ('為', 1), ('從', 1), ('件', 1), ('先', 1), ('到', 1), ('名', 1), ('外', 1), ('建', 1), ('得', 1), ('文', 1), ('構(gòu)', 1), ('稱', 1), ('第', 1), ('行', 1), ('部', 1), ('面', 1), ('首', 1), (',', 1)]6.mapValues(func)
我們經(jīng)常會(huì)遇到一種情形,我們只想對(duì)鍵值對(duì)RDD的value部分進(jìn)行處理,而不是同時(shí)對(duì)key和value進(jìn)行處理。對(duì)于這種情形,Spark提供了mapValues(func),它的功能是,對(duì)鍵值對(duì)RDD中的每個(gè)value都應(yīng)用一個(gè)函數(shù),但是,key不會(huì)發(fā)生變化。比如,對(duì)四個(gè)鍵值對(duì)(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5)構(gòu)成的pairRDD,如果執(zhí)行pairRDD.mapValues(lambda x : x+1),就會(huì)得到一個(gè)新的鍵值對(duì)RDD,它包含下面四個(gè)鍵值對(duì)(“spark”,2)、(“spark”,3)、(“hadoop”,4)和(“hadoop”,6)。
我們對(duì)上面第二種方式創(chuàng)建得到的pairRDD進(jìn)行keys操作,代碼如下:
mapValuesRDD = pairRDD.mapValues(lambda x : x+1)print (mapValuesRDD.collect())
[('面', 2), ('第', 2), ('一', 2), ('行', 2), ('首', 2), ('先', 2), ('從', 2), ('外', 2), ('部', 2), ('文', 2), ('件', 2), ('d', 2), ('a', 2), ('t', 2), ('a', 2), ('.', 2), ('t', 2), ('x', 2), ('t', 2), ('中', 2), ('構(gòu)', 2), ('建', 2), ('得', 2), ('到', 2), ('一', 2), ('個(gè)', 2), ('R', 2), ('D', 2), ('D', 2), (',', 2), ('名', 2), ('稱', 2), ('為', 2), ('l', 2), ('i', 2), ('n', 2), ('e', 2), ('s', 2)]7.join
join(連接)操作是鍵值對(duì)常用的操作?!斑B接”(join)這個(gè)概念來(lái)自于關(guān)系數(shù)據(jù)庫(kù)領(lǐng)域,因此,join的類型也和關(guān)系數(shù)據(jù)庫(kù)中的join一樣,包括內(nèi)連接(join)、左外連接(leftOuterJoin)、右外連接(rightOuterJoin)等。最常用的情形是內(nèi)連接,所以,join就表示內(nèi)連接。
對(duì)于內(nèi)連接,對(duì)于給定的兩個(gè)輸入數(shù)據(jù)集(K,V1)和(K,V2),只有在兩個(gè)數(shù)據(jù)集中都存在的key才會(huì)被輸出,最終得到一個(gè)(K,(V1,V2))類型的數(shù)據(jù)集。
比如,pairRDD1是一個(gè)鍵值對(duì)集合{(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5)},pairRDD2是一個(gè)鍵值對(duì)集合{(“spark”,”fast”)},那么,pairRDD1.join(pairRDD2)的結(jié)果就是一個(gè)新的RDD,這個(gè)新的RDD是鍵值對(duì)集合{(“spark”,1,”fast”),(“spark”,2,”fast”)}。對(duì)于這個(gè)實(shí)例,我們下面在pyspark中運(yùn)行一下:
pairRDD1 = sc.parallelize([('spark',1),('spark',2),('hadoop',3),('hadoop',5)])pairRDD2 = sc.parallelize([('spark','fast')])pairRDD3 = pairRDD1.join(pairRDD2)print (pairRDD3.collect())
[('spark', (1, 'fast')), ('spark', (2, 'fast'))]3.3 Spark入門:共享變量 http://dblab.xmu.edu.cn/blog/1707-2/
Spark中的兩個(gè)重要抽象是RDD和共享變量。上一章我們已經(jīng)介紹了RDD,這里介紹共享變量。
在默認(rèn)情況下,當(dāng)Spark在集群的多個(gè)不同節(jié)點(diǎn)的多個(gè)任務(wù)上并行運(yùn)行一個(gè)函數(shù)時(shí),它會(huì)把函數(shù)中涉及到的每個(gè)變量,在每個(gè)任務(wù)上都生成一個(gè)副本。但是,有時(shí)候,需要在多個(gè)任務(wù)之間共享變量,或者在任務(wù)(Task)和任務(wù)控制節(jié)點(diǎn)(Driver Program)之間共享變量。為了滿足這種需求,Spark提供了兩種類型的變量:廣播變量(broadcast variables)和累加器(accumulators)。廣播變量用來(lái)把變量在所有節(jié)點(diǎn)的內(nèi)存之間進(jìn)行共享。累加器則支持在所有不同節(jié)點(diǎn)之間進(jìn)行累加計(jì)算(比如計(jì)數(shù)或者求和)。
廣播變量
廣播變量(broadcast variables)允許程序開(kāi)發(fā)人員在每個(gè)機(jī)器上緩存一個(gè)只讀的變量,而不是為機(jī)器上的每個(gè)任務(wù)都生成一個(gè)副本。通過(guò)這種方式,就可以非常高效地給每個(gè)節(jié)點(diǎn)(機(jī)器)提供一個(gè)大的輸入數(shù)據(jù)集的副本。Spark的“動(dòng)作”操作會(huì)跨越多個(gè)階段(stage),對(duì)于每個(gè)階段內(nèi)的所有任務(wù)所需要的公共數(shù)據(jù),Spark都會(huì)自動(dòng)進(jìn)行廣播。通過(guò)廣播方式進(jìn)行傳播的變量,會(huì)經(jīng)過(guò)序列化,然后在被任務(wù)使用時(shí)再進(jìn)行反序列化。這就意味著,顯式地創(chuàng)建廣播變量只有在下面的情形中是有用的:當(dāng)跨越多個(gè)階段的那些任務(wù)需要相同的數(shù)據(jù),或者當(dāng)以反序列化方式對(duì)數(shù)據(jù)進(jìn)行緩存是非常重要的。
可以通過(guò)調(diào)用SparkContext.broadcast(v)來(lái)從一個(gè)普通變量v中創(chuàng)建一個(gè)廣播變量。這個(gè)廣播變量就是對(duì)普通變量v的一個(gè)包裝器,通過(guò)調(diào)用value方法就可以獲得這個(gè)廣播變量的值,具體代碼如下:
broadcastVar = sc.broadcast([1, 2, 3])broadcastVar.value
[1, 2, 3]這個(gè)廣播變量被創(chuàng)建以后,那么在集群中的任何函數(shù)中,都應(yīng)該使用廣播變量broadcastVar的值,而不是使用v的值,這樣就不會(huì)把v重復(fù)分發(fā)到這些節(jié)點(diǎn)上。此外,一旦廣播變量創(chuàng)建后,普通變量v的值就不能再發(fā)生修改,從而確保所有節(jié)點(diǎn)都獲得這個(gè)廣播變量的相同的值。
累加器
累加器是僅僅被相關(guān)操作累加的變量,通??梢员挥脕?lái)實(shí)現(xiàn)計(jì)數(shù)器(counter)和求和(sum)。Spark原生地支持?jǐn)?shù)值型(numeric)的累加器,程序開(kāi)發(fā)人員可以編寫(xiě)對(duì)新類型的支持。如果創(chuàng)建累加器時(shí)指定了名字,則可以在Spark UI界面看到,這有利于理解每個(gè)執(zhí)行階段的進(jìn)程。
一個(gè)數(shù)值型的累加器,可以通過(guò)調(diào)用SparkContext.accumulator()來(lái)創(chuàng)建。運(yùn)行在集群中的任務(wù),就可以使用add方法來(lái)把數(shù)值累加到累加器上,但是,這些任務(wù)只能做累加操作,不能讀取累加器的值,只有任務(wù)控制節(jié)點(diǎn)(Driver Program)可以使用value方法來(lái)讀取累加器的值。
下面是一個(gè)代碼實(shí)例,演示了使用累加器來(lái)對(duì)一個(gè)數(shù)組中的元素進(jìn)行求和:
accum = sc.accumulator(0)sc.parallelize([1, 2, 3, 4]).foreach(lambda x : accum.add(x))accum.value
103.4 數(shù)據(jù)讀寫(xiě)
3.4.1 Spark入門:文件數(shù)據(jù)讀寫(xiě)http://dblab.xmu.edu.cn/blog/1708-2/
不同文件格式的文件系統(tǒng)的數(shù)據(jù)讀寫(xiě)
下面分別介紹本地文件系統(tǒng)的數(shù)據(jù)讀寫(xiě)和分布式文件系統(tǒng)HDFS的數(shù)據(jù)讀寫(xiě)。
本地文件文件的數(shù)據(jù)讀寫(xiě)
textFile = sc.textFile("word.txt")textFile.first()
'面 第 一 行 首 先 從 外 部 文 件 d a t a . t x t 中 構(gòu) 建 得 到 一 個(gè) R D D ,'本地 JSON 的數(shù)據(jù)讀寫(xiě)
jsonStr?=?sc.textFile("people.json")jsonStr.collect()
['{"name":"Michael"}','{"name":"Andy", "age":30}','{"name":"Justin", "age":19}']
從上面執(zhí)行結(jié)果可以看出,people.json文件加載到RDD中以后,在RDD中存在三個(gè)字符串。我們下面要做的事情,就是把這三個(gè)JSON格式的字符串解析出來(lái),比如說(shuō),第一個(gè)字符串{“name”:”Michael”},經(jīng)過(guò)解析后,解析得到key是”name”,value是”Michael”。
現(xiàn)在我們編寫(xiě)程序完成對(duì)上面字符串的解析工作。
Scala中有一個(gè)自帶的JSON庫(kù)——scala.util.parsing.json.JSON,可以實(shí)現(xiàn)對(duì)JSON數(shù)據(jù)的解析。JSON.parseFull(jsonString:String)函數(shù),以一個(gè)JSON字符串作為輸入并進(jìn)行解析,如果解析成功則返回一個(gè)Some(map: Map[String, Any]),如果解析失敗則返回None。
因此,我們可以使用模式匹配來(lái)處理解析結(jié)果
請(qǐng)執(zhí)行以下命令:
import jsoninputFile = "people.json"jsonStrs = sc.textFile(inputFile)result = jsonStrs.map(lambda s : json.loads(s))result.collect()
[{'name': 'Michael'},{'name': 'Andy', 'age': 30},{'name': 'Justin', 'age': 19}]
