3萬字長文,PySpark入門級學(xué)習(xí)教程,框架思維

為什么要學(xué)習(xí)Spark?作為數(shù)據(jù)從業(yè)者多年,個(gè)人覺得Spark已經(jīng)越來越走進(jìn)我們的日常工作了,無論是使用哪種編程語言,Python、Scala還是Java,都會或多或少接觸到Spark,它可以讓我們能夠用到集群的力量,可以對BigData進(jìn)行高效操作,實(shí)現(xiàn)很多之前由于計(jì)算資源而無法輕易實(shí)現(xiàn)的東西。網(wǎng)上有很多關(guān)于Spark的好處,這里就不做過多的贅述,我們直接進(jìn)入這篇文章的正文!

關(guān)于PySpark,我們知道它是Python調(diào)用Spark的接口,我們可以通過調(diào)用Python API的方式來編寫Spark程序,它支持了大多數(shù)的Spark功能,比如SparkDataFrame、Spark SQL、Streaming、MLlib等等。只要我們了解Python的基本語法,那么在Python里調(diào)用Spark的力量就顯得十分easy了。下面我將會從相對宏觀的層面介紹一下PySpark,讓我們對于這個(gè)神器有一個(gè)框架性的認(rèn)識,知道它能干什么,知道去哪里尋找問題解答,爭取看完這篇文章可以讓我們更加絲滑地入門PySpark。話不多說,馬上開始!
?? 目錄

?? 安裝指引
安裝這塊本文就不展開具體的步驟了,畢竟大家的機(jī)子環(huán)境都不盡相同。不過可以簡單說幾點(diǎn)重要的步驟,然后節(jié)末放上一些安裝示例供大家參考。
1)要使用PySpark,機(jī)子上要有Java開發(fā)環(huán)境
2)環(huán)境變量記得要配置完整
3)Mac下的/usr/local/ 路徑一般是隱藏的,PyCharm配置py4j和pyspark的時(shí)候可以使用 shift+command+G 來使用路徑訪問。
4)Mac下如果修改了 ~/.bash_profile 的話,記得要重啟下PyCharm才會生效的哈
5)版本記得要搞對,保險(xiǎn)起見Java的jdk版本選擇低版本(別問我為什么知道),我選擇的是Java8.
下面是一些示例demo,可以參考下:
1)Mac下安裝spark,并配置pycharm-pyspark完整教程
https://blog.csdn.net/shiyutianming/article/details/99946797
2)virtualBox里安裝開發(fā)環(huán)境
https://www.bilibili.com/video/BV1i4411i79a?p=3
3)快速搭建spark開發(fā)環(huán)境,云哥項(xiàng)目
https://github.com/lyhue1991/eat_pyspark_in_10_days
?? 基礎(chǔ)概念
關(guān)于Spark的基礎(chǔ)概念,我在先前的文章里也有寫過,大家可以一起來回顧一下 《想學(xué)習(xí)Spark?先帶你了解一些基礎(chǔ)的知識》。作為補(bǔ)充,今天在這里也介紹一些在Spark中會經(jīng)常遇見的專有名詞。
???♀? Q1: 什么是RDD
RDD的全稱是 Resilient Distributed Datasets,這是Spark的一種數(shù)據(jù)抽象集合,它可以被執(zhí)行在分布式的集群上進(jìn)行各種操作,而且有較強(qiáng)的容錯(cuò)機(jī)制。RDD可以被分為若干個(gè)分區(qū),每一個(gè)分區(qū)就是一個(gè)數(shù)據(jù)集片段,從而可以支持分布式計(jì)算。
???♀? Q2: RDD運(yùn)行時(shí)相關(guān)的關(guān)鍵名詞
簡單來說可以有 Client、Job、Master、Worker、Driver、Stage、Task以及Executor,這幾個(gè)東西在調(diào)優(yōu)的時(shí)候也會經(jīng)常遇到的。
Client:指的是客戶端進(jìn)程,主要負(fù)責(zé)提交job到Master;
Job:Job來自于我們編寫的程序,Application包含一個(gè)或者多個(gè)job,job包含各種RDD操作;
Master:指的是Standalone模式中的主控節(jié)點(diǎn),負(fù)責(zé)接收來自Client的job,并管理著worker,可以給worker分配任務(wù)和資源(主要是driver和executor資源);
Worker:指的是Standalone模式中的slave節(jié)點(diǎn),負(fù)責(zé)管理本節(jié)點(diǎn)的資源,同時(shí)受Master管理,需要定期給Master回報(bào)heartbeat(心跳),啟動Driver和Executor;
Driver:指的是 job(作業(yè))的主進(jìn)程,一般每個(gè)Spark作業(yè)都會有一個(gè)Driver進(jìn)程,負(fù)責(zé)整個(gè)作業(yè)的運(yùn)行,包括了job的解析、Stage的生成、調(diào)度Task到Executor上去執(zhí)行;
Stage:中文名 階段,是job的基本調(diào)度單位,因?yàn)槊總€(gè)job會分成若干組Task,每組任務(wù)就被稱為 Stage;
Task:任務(wù),指的是直接運(yùn)行在executor上的東西,是executor上的一個(gè)線程;
Executor:指的是 執(zhí)行器,顧名思義就是真正執(zhí)行任務(wù)的地方了,一個(gè)集群可以被配置若干個(gè)Executor,每個(gè)Executor接收來自Driver的Task,并執(zhí)行它(可同時(shí)執(zhí)行多個(gè)Task)。
???♀? Q3: 什么是DAG
全稱是 Directed Acyclic Graph,中文名是有向無環(huán)圖。Spark就是借用了DAG對RDD之間的關(guān)系進(jìn)行了建模,用來描述RDD之間的因果依賴關(guān)系。因?yàn)樵谝粋€(gè)Spark作業(yè)調(diào)度中,多個(gè)作業(yè)任務(wù)之間也是相互依賴的,有些任務(wù)需要在一些任務(wù)執(zhí)行完成了才可以執(zhí)行的。在Spark調(diào)度中就是有DAGscheduler,它負(fù)責(zé)將job分成若干組Task組成的Stage。

???♀? Q4: Spark的部署模式有哪些
主要有l(wèi)ocal模式、Standalone模式、Mesos模式、YARN模式。
更多的解釋可以參考這位老哥的解釋。https://www.jianshu.com/p/3b8f85329664
???♀? Q5: Shuffle操作是什么
Shuffle指的是數(shù)據(jù)從Map端到Reduce端的數(shù)據(jù)傳輸過程,Shuffle性能的高低直接會影響程序的性能。因?yàn)镽educe task需要跨節(jié)點(diǎn)去拉在分布在不同節(jié)點(diǎn)上的Map task計(jì)算結(jié)果,這一個(gè)過程是需要有磁盤IO消耗以及數(shù)據(jù)網(wǎng)絡(luò)傳輸?shù)南牡?,所以需要根?jù)實(shí)際數(shù)據(jù)情況進(jìn)行適當(dāng)調(diào)整。另外,Shuffle可以分為兩部分,分別是Map階段的數(shù)據(jù)準(zhǔn)備與Reduce階段的數(shù)據(jù)拷貝處理,在Map端我們叫Shuffle Write,在Reduce端我們叫Shuffle Read。
???♀? Q6: 什么是惰性執(zhí)行
這是RDD的一個(gè)特性,在RDD中的算子可以分為Transform算子和Action算子,其中Transform算子的操作都不會真正執(zhí)行,只會記錄一下依賴關(guān)系,直到遇見了Action算子,在這之前的所有Transform操作才會被觸發(fā)計(jì)算,這就是所謂的惰性執(zhí)行。具體哪些是Transform和Action算子,可以看下一節(jié)。
?? 常用函數(shù)
從網(wǎng)友的總結(jié)來看比較常用的算子大概可以分為下面幾種,所以就演示一下這些算子,如果需要看更多的算子或者解釋,建議可以移步到官方API文檔去Search一下哈。
pyspark.RDD:http://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.html#pyspark.RDD

下面我們用自己創(chuàng)建的RDD:sc.parallelize(range(1,11),4)
import?os
import?pyspark
from?pyspark?import?SparkContext,?SparkConf
conf?=?SparkConf().setAppName("test_SamShare").setMaster("local[4]")
sc?=?SparkContext(conf=conf)
#?使用?parallelize方法直接實(shí)例化一個(gè)RDD
rdd?=?sc.parallelize(range(1,11),4)?#?這里的?4?指的是分區(qū)數(shù)量
rdd.take(100)
#?[1,?2,?3,?4,?5,?6,?7,?8,?9,?10]
"""
----------------------------------------------
????????????????Transform算子解析
----------------------------------------------
"""
#?以下的操作由于是Transform操作,因?yàn)槲覀冃枰谧詈蠹由弦粋€(gè)collect算子用來觸發(fā)計(jì)算。
#?1.?map:?和python差不多,map轉(zhuǎn)換就是對每一個(gè)元素進(jìn)行一個(gè)映射
rdd?=?sc.parallelize(range(1,?11),?4)
rdd_map?=?rdd.map(lambda?x:?x*2)
print("原始數(shù)據(jù):",?rdd.collect())
print("擴(kuò)大2倍:",?rdd_map.collect())
#?原始數(shù)據(jù):?[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
#?擴(kuò)大2倍:?[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
#?2.?flatMap:?這個(gè)相比于map多一個(gè)flat(壓平)操作,顧名思義就是要把高維的數(shù)組變成一維
rdd2?=?sc.parallelize(["hello?SamShare",?"hello?PySpark"])
print("原始數(shù)據(jù):",?rdd2.collect())
print("直接split之后的map結(jié)果:",?rdd2.map(lambda?x:?x.split("?")).collect())
print("直接split之后的flatMap結(jié)果:",?rdd2.flatMap(lambda?x:?x.split("?")).collect())
#?直接split之后的map結(jié)果:?[['hello', 'SamShare'], ['hello', 'PySpark']]
#?直接split之后的flatMap結(jié)果:?['hello', 'SamShare', 'hello', 'PySpark']
#?3.?filter:?過濾數(shù)據(jù)
rdd?=?sc.parallelize(range(1,?11),?4)
print("原始數(shù)據(jù):",?rdd.collect())
print("過濾奇數(shù):",?rdd.filter(lambda?x:?x?%?2?==?0).collect())
#?原始數(shù)據(jù):?[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
#?過濾奇數(shù):?[2, 4, 6, 8, 10]
#?4.?distinct:?去重元素
rdd?=?sc.parallelize([2,?2,?4,?8,?8,?8,?8,?16,?32,?32])
print("原始數(shù)據(jù):",?rdd.collect())
print("去重?cái)?shù)據(jù):",?rdd.distinct().collect())
#?原始數(shù)據(jù):?[2, 2, 4, 8, 8, 8, 8, 16, 32, 32]
#?去重?cái)?shù)據(jù):?[4, 8, 16, 32, 2]
#?5.?reduceByKey:?根據(jù)key來映射數(shù)據(jù)
from?operator?import?add
rdd?=?sc.parallelize([("a",?1),?("b",?1),?("a",?1)])
print("原始數(shù)據(jù):",?rdd.collect())
print("原始數(shù)據(jù):",?rdd.reduceByKey(add).collect())
#?原始數(shù)據(jù):?[('a', 1), ('b', 1), ('a', 1)]
#?原始數(shù)據(jù):?[('b', 1), ('a', 2)]
#?6.?mapPartitions:?根據(jù)分區(qū)內(nèi)的數(shù)據(jù)進(jìn)行映射操作
rdd?=?sc.parallelize([1,?2,?3,?4],?2)
def?f(iterator):
????yield?sum(iterator)
print(rdd.collect())
print(rdd.mapPartitions(f).collect())
#?[1,?2,?3,?4]
#?[3,?7]
#?7.?sortBy:?根據(jù)規(guī)則進(jìn)行排序
tmp?=?[('a',?1),?('b',?2),?('1',?3),?('d',?4),?('2',?5)]
print(sc.parallelize(tmp).sortBy(lambda?x:?x[0]).collect())
print(sc.parallelize(tmp).sortBy(lambda?x:?x[1]).collect())
#?[('1',?3),?('2',?5),?('a',?1),?('b',?2),?('d',?4)]
#?[('a',?1),?('b',?2),?('1',?3),?('d',?4),?('2',?5)]
#?8.?subtract:?數(shù)據(jù)集相減,?Return?each?value?in?self?that?is?not?contained?in?other.
x?=?sc.parallelize([("a",?1),?("b",?4),?("b",?5),?("a",?3)])
y?=?sc.parallelize([("a",?3),?("c",?None)])
print(sorted(x.subtract(y).collect()))
#?[('a',?1),?('b',?4),?('b',?5)]
#?9.?union:?合并兩個(gè)RDD
rdd?=?sc.parallelize([1,?1,?2,?3])
print(rdd.union(rdd).collect())
#?[1,?1,?2,?3,?1,?1,?2,?3]
#?10.?intersection:?取兩個(gè)RDD的交集,同時(shí)有去重的功效
rdd1?=?sc.parallelize([1,?10,?2,?3,?4,?5,?2,?3])
rdd2?=?sc.parallelize([1,?6,?2,?3,?7,?8])
print(rdd1.intersection(rdd2).collect())
#?[1,?2,?3]
#?11.?cartesian:?生成笛卡爾積
rdd?=?sc.parallelize([1,?2])
print(sorted(rdd.cartesian(rdd).collect()))
#?[(1,?1),?(1,?2),?(2,?1),?(2,?2)]
#?12.?zip:?拉鏈合并,需要兩個(gè)RDD具有相同的長度以及分區(qū)數(shù)量
x?=?sc.parallelize(range(0,?5))
y?=?sc.parallelize(range(1000,?1005))
print(x.collect())
print(y.collect())
print(x.zip(y).collect())
#?[0,?1,?2,?3,?4]
#?[1000,?1001,?1002,?1003,?1004]
#?[(0,?1000),?(1,?1001),?(2,?1002),?(3,?1003),?(4,?1004)]
# 13. zipWithIndex:?將RDD和一個(gè)從0開始的遞增序列按照拉鏈方式連接。
rdd_name?=?sc.parallelize(["LiLei",?"Hanmeimei",?"Lily",?"Lucy",?"Ann",?"Dachui",?"RuHua"])
rdd_index?=?rdd_name.zipWithIndex()
print(rdd_index.collect())
#?[('LiLei',?0),?('Hanmeimei',?1),?('Lily',?2),?('Lucy',?3),?('Ann',?4),?('Dachui',?5),?('RuHua',?6)]
#?14.?groupByKey:?按照key來聚合數(shù)據(jù)
rdd?=?sc.parallelize([("a",?1),?("b",?1),?("a",?1)])
print(rdd.collect())
print(sorted(rdd.groupByKey().mapValues(len).collect()))
print(sorted(rdd.groupByKey().mapValues(list).collect()))
#?[('a',?1),?('b',?1),?('a',?1)]
#?[('a',?2),?('b',?1)]
#?[('a',?[1,?1]),?('b',?[1])]
#?15.?sortByKey:
tmp?=?[('a',?1),?('b',?2),?('1',?3),?('d',?4),?('2',?5)]
print(sc.parallelize(tmp).sortByKey(True,?1).collect())
#?[('1',?3),?('2',?5),?('a',?1),?('b',?2),?('d',?4)]
#?16.?join:
x?=?sc.parallelize([("a",?1),?("b",?4)])
y?=?sc.parallelize([("a",?2),?("a",?3)])
print(sorted(x.join(y).collect()))
#?[('a',?(1,?2)),?('a',?(1,?3))]
#?17.?leftOuterJoin/rightOuterJoin
x?=?sc.parallelize([("a",?1),?("b",?4)])
y?=?sc.parallelize([("a",?2)])
print(sorted(x.leftOuterJoin(y).collect()))
#?[('a',?(1,?2)),?('b',?(4,?None))]
"""
----------------------------------------------
????????????????Action算子解析
----------------------------------------------
"""
#?1.?collect:?指的是把數(shù)據(jù)都匯集到driver端,便于后續(xù)的操作
rdd?=?sc.parallelize(range(0,?5))
rdd_collect?=?rdd.collect()
print(rdd_collect)
#?[0,?1,?2,?3,?4]
#?2.?first:?取第一個(gè)元素
sc.parallelize([2,?3,?4]).first()
#?2
#?3.?collectAsMap:?轉(zhuǎn)換為dict,使用這個(gè)要注意了,不要對大數(shù)據(jù)用,不然全部載入到driver端會爆內(nèi)存
m?=?sc.parallelize([(1,?2),?(3,?4)]).collectAsMap()
m
#?{1:?2,?3:?4}
#?4.?reduce:?逐步對兩個(gè)元素進(jìn)行操作
rdd?=?sc.parallelize(range(10),5)
print(rdd.reduce(lambda?x,y:x+y))
#?45
#?5.?countByKey/countByValue:
rdd?=?sc.parallelize([("a",?1),?("b",?1),?("a",?1)])
print(sorted(rdd.countByKey().items()))
print(sorted(rdd.countByValue().items()))
#?[('a',?2),?('b',?1)]
#?[(('a',?1),?2),?(('b',?1),?1)]
#?6.?take:?相當(dāng)于取幾個(gè)數(shù)據(jù)到driver端
rdd?=?sc.parallelize([("a",?1),?("b",?1),?("a",?1)])
print(rdd.take(5))
#?[('a',?1),?('b',?1),?('a',?1)]
#?7.?saveAsTextFile:?保存rdd成text文件到本地
text_file?=?"./data/rdd.txt"
rdd?=?sc.parallelize(range(5))
rdd.saveAsTextFile(text_file)
#?8.?takeSample:?隨機(jī)取數(shù)
rdd?=?sc.textFile("./test/data/hello_samshare.txt",?4)??#?這里的?4?指的是分區(qū)數(shù)量
rdd_sample?=?rdd.takeSample(True,?2,?0)??# withReplacement 參數(shù)1:代表是否是有放回抽樣
rdd_sample
#?9.?foreach:?對每一個(gè)元素執(zhí)行某種操作,不生成新的RDD
rdd?=?sc.parallelize(range(10),?5)
accum?=?sc.accumulator(0)
rdd.foreach(lambda?x:?accum.add(x))
print(accum.value)
#?45
?? Spark SQL使用
在講Spark SQL前,先解釋下這個(gè)模塊。這個(gè)模塊是Spark中用來處理結(jié)構(gòu)化數(shù)據(jù)的,提供一個(gè)叫SparkDataFrame的東西并且自動解析為分布式SQL查詢數(shù)據(jù)。我們之前用過Python的Pandas庫,也大致了解了DataFrame,這個(gè)其實(shí)和它沒有太大的區(qū)別,只是調(diào)用的API可能有些不同罷了。
我們通過使用Spark SQL來處理數(shù)據(jù),會讓我們更加地熟悉,比如可以用SQL語句、用SparkDataFrame的API或者Datasets API,我們可以按照需求隨心轉(zhuǎn)換,通過SparkDataFrame API 和 SQL 寫的邏輯,會被Spark優(yōu)化器Catalyst自動優(yōu)化成RDD,即便寫得不好也可能運(yùn)行得很快(如果是直接寫RDD可能就掛了哈哈)。
創(chuàng)建SparkDataFrame
開始講SparkDataFrame,我們先學(xué)習(xí)下幾種創(chuàng)建的方法,分別是使用RDD來創(chuàng)建、使用python的DataFrame來創(chuàng)建、使用List來創(chuàng)建、讀取數(shù)據(jù)文件來創(chuàng)建、通過讀取數(shù)據(jù)庫來創(chuàng)建。
1. 使用RDD來創(chuàng)建
主要使用RDD的toDF方法。
rdd?=?sc.parallelize([("Sam",?28,?88),?("Flora",?28,?90),?("Run",?1,?60)])
df?=?rdd.toDF(["name",?"age",?"score"])
df.show()
df.printSchema()
#?+-----+---+-----+
#?|?name|age|score|
#?+-----+---+-----+
#?|??Sam|?28|???88|
#?|Flora|?28|???90|
#?|??Run|??1|???60|
#?+-----+---+-----+
#?root
#??|--?name:?string?(nullable?=?true)
#??|--?age:?long?(nullable?=?true)
#??|--?score:?long?(nullable?=?true)
2. 使用python的DataFrame來創(chuàng)建
df?=?pd.DataFrame([['Sam',?28,?88],?['Flora',?28,?90],?['Run',?1,?60]],
??????????????????columns=['name',?'age',?'score'])
print(">>?打印DataFrame:")
print(df)
print("\n")
Spark_df?=?spark.createDataFrame(df)
print(">>?打印SparkDataFrame:")
Spark_df.show()
#?>>?打印DataFrame:
#?????name??age??score
#?0????Sam???28?????88
#?1??Flora???28?????90
#?2????Run????1?????60
#?>>?打印SparkDataFrame:
#?+-----+---+-----+
#?|?name|age|score|
#?+-----+---+-----+
#?|??Sam|?28|???88|
#?|Flora|?28|???90|
#?|??Run|??1|???60|
#?+-----+---+-----+
3. 使用List來創(chuàng)建
list_values?=?[['Sam',?28,?88],?['Flora',?28,?90],?['Run',?1,?60]]
Spark_df?=?spark.createDataFrame(list_values,?['name',?'age',?'score'])
Spark_df.show()
#?+-----+---+-----+
#?|?name|age|score|
#?+-----+---+-----+
#?|??Sam|?28|???88|
#?|Flora|?28|???90|
#?|??Run|??1|???60|
#?+-----+---+-----+
4. 讀取數(shù)據(jù)文件來創(chuàng)建
#?4.1?CSV文件
df?=?spark.read.option("header",?"true")\
????.option("inferSchema",?"true")\
????.option("delimiter",?",")\
????.csv("./test/data/titanic/train.csv")
df.show(5)
df.printSchema()
#?4.2?json文件
df?=?spark.read.json("./test/data/hello_samshare.json")
df.show(5)
df.printSchema()
5. 通過讀取數(shù)據(jù)庫來創(chuàng)建
#?5.1?讀取hive數(shù)據(jù)
spark.sql("CREATE?TABLE?IF?NOT?EXISTS?src?(key?INT,?value?STRING)?USING?hive")
spark.sql("LOAD?DATA?LOCAL?INPATH?'data/kv1.txt'?INTO?TABLE?src")
df?=?spark.sql("SELECT?key,?value?FROM?src?WHERE?key?<?10?ORDER?BY?key")
df.show(5)
#?5.2?讀取mysql數(shù)據(jù)
url?=?"jdbc:mysql://localhost:3306/test"
df?=?spark.read.format("jdbc")?\
?.option("url",?url)?\
?.option("dbtable",?"runoob_tbl")?\
?.option("user",?"root")?\
?.option("password",?"8888")?\
?.load()\
df.show()
常用的SparkDataFrame API
這里我大概是分成了幾部分來看這些APIs,分別是查看DataFrame的APIs、簡單處理DataFrame的APIs、DataFrame的列操作APIs、DataFrame的一些思路變換操作APIs、DataFrame的一些統(tǒng)計(jì)操作APIs,這樣子也有助于我們了解這些API的功能,以后遇見實(shí)際問題的時(shí)候可以解決。
首先我們這小節(jié)全局用到的數(shù)據(jù)集如下:
from?pyspark.sql?import?functions?as?F
from?pyspark.sql?import?SparkSession
# SparkSQL的許多功能封裝在SparkSession的方法接口中, SparkContext則不行的。
spark?=?SparkSession.builder?\
????.appName("sam_SamShare")?\
????.config("master",?"local[4]")?\
????.enableHiveSupport()?\
????.getOrCreate()
sc?=?spark.sparkContext
#?創(chuàng)建一個(gè)SparkDataFrame
rdd?=?sc.parallelize([("Sam",?28,?88,?"M"),
??????????????????????("Flora",?28,?90,?"F"),
??????????????????????("Run",?1,?60,?None),
??????????????????????("Peter",?55,?100,?"M"),
??????????????????????("Mei",?54,?95,?"F")])
df?=?rdd.toDF(["name",?"age",?"score",?"sex"])
df.show()
df.printSchema()
#?+-----+---+-----+----+
#?|?name|age|score|?sex|
#?+-----+---+-----+----+
#?|??Sam|?28|???88|???M|
#?|Flora|?28|???90|???F|
#?|??Run|??1|???60|null|
#?|Peter|?55|??100|???M|
#?|??Mei|?54|???95|???F|
#?+-----+---+-----+----+
#?root
#??|--?name:?string?(nullable?=?true)
#??|--?age:?long?(nullable?=?true)
#??|--?score:?long?(nullable?=?true)
#??|--?sex:?string?(nullable?=?true)
1. 查看DataFrame的APIs
#?DataFrame.collect
#?以列表形式返回行
df.collect()
#?[Row(name='Sam',?age=28,?score=88,?sex='M'),
#?Row(name='Flora',?age=28,?score=90,?sex='F'),
#?Row(name='Run',?age=1,?score=60,?sex=None),
#?Row(name='Peter',?age=55,?score=100,?sex='M'),
#?Row(name='Mei',?age=54,?score=95,?sex='F')]
#?DataFrame.count
df.count()
#?5
#?DataFrame.columns
df.columns
#?['name',?'age',?'score',?'sex']
#?DataFrame.dtypes
df.dtypes
#?[('name',?'string'),?('age',?'bigint'),?('score',?'bigint'),?('sex',?'string')]
#?DataFrame.describe
#?返回列的基礎(chǔ)統(tǒng)計(jì)信息
df.describe(['age']).show()
#?+-------+------------------+
#?|summary|???????????????age|
#?+-------+------------------+
#?|??count|?????????????????5|
#?|???mean|??????????????33.2|
#?|?stddev|22.353970564532826|
#?|????min|?????????????????1|
#?|????max|????????????????55|
#?+-------+------------------+
df.describe().show()
#?+-------+-----+------------------+------------------+----+
#?|summary|?name|???????????????age|?????????????score|?sex|
#?+-------+-----+------------------+------------------+----+
#?|??count|????5|?????????????????5|?????????????????5|???4|
#?|???mean|?null|??????????????33.2|??????????????86.6|null|
#?|?stddev|?null|22.353970564532826|15.582040944625966|null|
#?|????min|Flora|?????????????????1|????????????????60|???F|
#?|????max|??Sam|????????????????55|???????????????100|???M|
#?+-------+-----+------------------+------------------+----+
#?DataFrame.select
#?選定指定列并按照一定順序呈現(xiàn)
df.select("sex",?"score").show()
#?DataFrame.first
#?DataFrame.head
#?查看第1條數(shù)據(jù)
df.first()
#?Row(name='Sam',?age=28,?score=88,?sex='M')
df.head(1)
#?[Row(name='Sam',?age=28,?score=88,?sex='M')]
#?DataFrame.freqItems
#?查看指定列的枚舉值
df.freqItems(["age","sex"]).show()
#?+---------------+-------------+
#?|??age_freqItems|sex_freqItems|
#?+---------------+-------------+
#?|[55,?1,?28,?54]|??????[M,?F,]|
#?+---------------+-------------+
#?DataFrame.summary
df.summary().show()
#?+-------+-----+------------------+------------------+----+
#?|summary|?name|???????????????age|?????????????score|?sex|
#?+-------+-----+------------------+------------------+----+
#?|??count|????5|?????????????????5|?????????????????5|???4|
#?|???mean|?null|??????????????33.2|??????????????86.6|null|
#?|?stddev|?null|22.353970564532826|15.582040944625966|null|
#?|????min|Flora|?????????????????1|????????????????60|???F|
#?|????25%|?null|????????????????28|????????????????88|null|
#?|????50%|?null|????????????????28|????????????????90|null|
#?|????75%|?null|????????????????54|????????????????95|null|
#?|????max|??Sam|????????????????55|???????????????100|???M|
#?+-------+-----+------------------+------------------+----+
#?DataFrame.sample
#?按照一定規(guī)則從df隨機(jī)抽樣數(shù)據(jù)
df.sample(0.5).show()
#?+-----+---+-----+----+
#?|?name|age|score|?sex|
#?+-----+---+-----+----+
#?|??Sam|?28|???88|???M|
#?|??Run|??1|???60|null|
#?|Peter|?55|??100|???M|
#?+-----+---+-----+----+
2. 簡單處理DataFrame的APIs
#?DataFrame.distinct
#?對數(shù)據(jù)集進(jìn)行去重
df.distinct().show()
#?DataFrame.dropDuplicates
#?對指定列去重
df.dropDuplicates(["sex"]).show()
#?+-----+---+-----+----+
#?|?name|age|score|?sex|
#?+-----+---+-----+----+
#?|Flora|?28|???90|???F|
#?|??Run|??1|???60|null|
#?|??Sam|?28|???88|???M|
#?+-----+---+-----+----+
#?DataFrame.exceptAll
#?DataFrame.subtract
#?根據(jù)指定的df對df進(jìn)行去重
df1?=?spark.createDataFrame(
????????[("a",?1),?("a",?1),?("b",??3),?("c",?4)],?["C1",?"C2"])
df2?=?spark.createDataFrame([("a",?1),?("b",?3)],?["C1",?"C2"])
df3?=?df1.exceptAll(df2)??#?沒有去重的功效
df4?=?df1.subtract(df2)??#?有去重的奇效
df1.show()
df2.show()
df3.show()
df4.show()
#?+---+---+
#?|?C1|?C2|
#?+---+---+
#?|??a|??1|
#?|??a|??1|
#?|??b|??3|
#?|??c|??4|
#?+---+---+
#?+---+---+
#?|?C1|?C2|
#?+---+---+
#?|??a|??1|
#?|??b|??3|
#?+---+---+
#?+---+---+
#?|?C1|?C2|
#?+---+---+
#?|??a|??1|
#?|??c|??4|
#?+---+---+
#?+---+---+
#?|?C1|?C2|
#?+---+---+
#?|??c|??4|
#?+---+---+
#?DataFrame.intersectAll
#?返回兩個(gè)DataFrame的交集
df1?=?spark.createDataFrame(
????????[("a",?1),?("a",?1),?("b",??3),?("c",?4)],?["C1",?"C2"])
df2?=?spark.createDataFrame([("a",?1),?("b",?4)],?["C1",?"C2"])
df1.intersectAll(df2).show()
#?+---+---+
#?|?C1|?C2|
#?+---+---+
#?|??a|??1|
#?+---+---+
#?DataFrame.drop
#?丟棄指定列
df.drop('age').show()
#?DataFrame.withColumn
#?新增列
df1?=?df.withColumn("birth_year",?2021?-?df.age)
df1.show()
#?+-----+---+-----+----+----------+
#?|?name|age|score|?sex|birth_year|
#?+-----+---+-----+----+----------+
#?|??Sam|?28|???88|???M|??????1993|
#?|Flora|?28|???90|???F|??????1993|
#?|??Run|??1|???60|null|??????2020|
#?|Peter|?55|??100|???M|??????1966|
#?|??Mei|?54|???95|???F|??????1967|
#?+-----+---+-----+----+----------+
#?DataFrame.withColumnRenamed
#?重命名列名
df1?=?df.withColumnRenamed("sex",?"gender")
df1.show()
#?+-----+---+-----+------+
#?|?name|age|score|gender|
#?+-----+---+-----+------+
#?|??Sam|?28|???88|?????M|
#?|Flora|?28|???90|?????F|
#?|??Run|??1|???60|??null|
#?|Peter|?55|??100|?????M|
#?|??Mei|?54|???95|?????F|
#?+-----+---+-----+------+
#?DataFrame.dropna
#?丟棄空值,DataFrame.dropna(how='any',?thresh=None,?subset=None)
df.dropna(how='all',?subset=['sex']).show()
#?+-----+---+-----+---+
#?|?name|age|score|sex|
#?+-----+---+-----+---+
#?|??Sam|?28|???88|??M|
#?|Flora|?28|???90|??F|
#?|Peter|?55|??100|??M|
#?|??Mei|?54|???95|??F|
#?+-----+---+-----+---+
#?DataFrame.fillna
#?空值填充操作
df1?=?spark.createDataFrame(
????????[("a",?None),?("a",?1),?(None,??3),?("c",?4)],?["C1",?"C2"])
#?df2?=?df1.na.fill({"C1":?"d",?"C2":?99})
df2?=?df1.fillna({"C1":?"d",?"C2":?99})
df1.show()
df2.show()
#?DataFrame.filter
#?根據(jù)條件過濾
df.filter(df.age>50).show()
#?+-----+---+-----+---+
#?|?name|age|score|sex|
#?+-----+---+-----+---+
#?|Peter|?55|??100|??M|
#?|??Mei|?54|???95|??F|
#?+-----+---+-----+---+
df.where(df.age==28).show()
#?+-----+---+-----+---+
#?|?name|age|score|sex|
#?+-----+---+-----+---+
#?|??Sam|?28|???88|??M|
#?|Flora|?28|???90|??F|
#?+-----+---+-----+---+
df.filter("age<18").show()
#?+----+---+-----+----+
#?|name|age|score|?sex|
#?+----+---+-----+----+
#?|?Run|??1|???60|null|
#?+----+---+-----+----+
#?DataFrame.join
#?這個(gè)不用多解釋了,直接上案例來看看具體的語法即可,DataFrame.join(other,?on=None,?how=None)
df1?=?spark.createDataFrame(
????????[("a",?1),?("d",?1),?("b",??3),?("c",?4)],?["id",?"num1"])
df2?=?spark.createDataFrame([("a",?1),?("b",?3)],?["id",?"num2"])
df1.join(df2,?df1.id?==?df2.id,?'left').select(df1.id.alias("df1_id"),
???????????????????????????????????????????????df1.num1.alias("df1_num"),
???????????????????????????????????????????????df2.num2.alias("df2_num")
???????????????????????????????????????????????).sort(["df1_id"],?ascending=False)\
????.show()
#?DataFrame.agg(*exprs)
#?聚合數(shù)據(jù),可以寫多個(gè)聚合方法,如果不寫groupBy的話就是對整個(gè)DF進(jìn)行聚合
#?DataFrame.alias
#?設(shè)置列或者DataFrame別名
#?DataFrame.groupBy
#?根據(jù)某幾列進(jìn)行聚合,如有多列用列表寫在一起,如?df.groupBy(["sex",?"age"])
df.groupBy("sex").agg(F.min(df.age).alias("最小年齡"),
??????????????????????F.expr("avg(age)").alias("平均年齡"),
??????????????????????F.expr("collect_list(name)").alias("姓名集合")
??????????????????????).show()
#?+----+--------+--------+------------+
#?|?sex|最小年齡|平均年齡|????姓名集合|
#?+----+--------+--------+------------+
#?|???F|??????28|????41.0|[Flora,?Mei]|
#?|null|???????1|?????1.0|???????[Run]|
#?|???M|??????28|????41.5|[Sam,?Peter]|
#?+----+--------+--------+------------+
#?DataFrame.foreach
#?對每一行進(jìn)行函數(shù)方法的應(yīng)用
def?f(person):
????print(person.name)
df.foreach(f)
#?Peter
#?Run
#?Sam
#?Flora
#?Mei
#?DataFrame.replace
#?修改df里的某些值
df1?=?df.na.replace({"M":?"Male",?"F":?"Female"})
df1.show()
#?DataFrame.union
#?相當(dāng)于SQL里的union?all操作
df1?=?spark.createDataFrame(
????????[("a",?1),?("d",?1),?("b",??3),?("c",?4)],?["id",?"num"])
df2?=?spark.createDataFrame([("a",?1),?("b",?3)],?["id",?"num"])
df1.union(df2).show()
df1.unionAll(df2).show()
#?這里union沒有去重,不知道為啥,有知道的朋友麻煩解釋下,謝謝了。
#?+---+---+
#?|?id|num|
#?+---+---+
#?|??a|??1|
#?|??d|??1|
#?|??b|??3|
#?|??c|??4|
#?|??a|??1|
#?|??b|??3|
#?+---+---+
#?DataFrame.unionByName
#?根據(jù)列名來進(jìn)行合并數(shù)據(jù)集
df1?=?spark.createDataFrame([[1,?2,?3]],?["col0",?"col1",?"col2"])
df2?=?spark.createDataFrame([[4,?5,?6]],?["col1",?"col2",?"col0"])
df1.unionByName(df2).show()
#?+----+----+----+
#?|col0|col1|col2|
#?+----+----+----+
#?|???1|???2|???3|
#?|???6|???4|???5|
#?+----+----+----+
3. DataFrame的列操作APIs
這里主要針對的是列進(jìn)行操作,比如說重命名、排序、空值判斷、類型判斷等,這里就不展開寫demo了,看看語法應(yīng)該大家都懂了。
Column.alias(*alias,?**kwargs)??#?重命名列名
Column.asc()??#?按照列進(jìn)行升序排序
Column.desc()??#?按照列進(jìn)行降序排序
Column.astype(dataType)??#?類型轉(zhuǎn)換
Column.cast(dataType)??#?強(qiáng)制轉(zhuǎn)換類型
Column.between(lowerBound,?upperBound)??#?返回布爾值,是否在指定區(qū)間范圍內(nèi)
Column.contains(other)??#?是否包含某個(gè)關(guān)鍵詞
Column.endswith(other)??#?以什么結(jié)束的值,如?df.filter(df.name.endswith('ice')).collect()
Column.isNotNull()??#?篩選非空的行
Column.isNull()
Column.isin(*cols)??#?返回包含某些值的行?df[df.name.isin("Bob",?"Mike")].collect()
Column.like(other)??#?返回含有關(guān)鍵詞的行
Column.when(condition,?value)??#?給True的賦值
Column.otherwise(value)??#?與when搭配使用,df.select(df.name,?F.when(df.age?>?3,?1).otherwise(0)).show()
Column.rlike(other)??#?可以使用正則的匹配?df.filter(df.name.rlike('ice$')).collect()
Column.startswith(other)??#?df.filter(df.name.startswith('Al')).collect()
Column.substr(startPos,?length)??#?df.select(df.name.substr(1,?3).alias("col")).collect()
4. DataFrame的一些思路變換操作APIs
#?DataFrame.createOrReplaceGlobalTempView
#?DataFrame.dropGlobalTempView
#?創(chuàng)建全局的試圖,注冊后可以使用sql語句來進(jìn)行操作,生命周期取決于Spark?application本身
df.createOrReplaceGlobalTempView("people")
spark.sql("select?*?from?global_temp.people?where?sex?=?'M'?").show()
#?+-----+---+-----+---+
#?|?name|age|score|sex|
#?+-----+---+-----+---+
#?|??Sam|?28|???88|??M|
#?|Peter|?55|??100|??M|
#?+-----+---+-----+---+
#?DataFrame.createOrReplaceTempView
#?DataFrame.dropTempView
#?創(chuàng)建本地臨時(shí)試圖,生命周期取決于用來創(chuàng)建此數(shù)據(jù)集的SparkSession
df.createOrReplaceTempView("tmp_people")
spark.sql("select?*?from?tmp_people?where?sex?=?'F'?").show()
#?+-----+---+-----+---+
#?|?name|age|score|sex|
#?+-----+---+-----+---+
#?|Flora|?28|???90|??F|
#?|??Mei|?54|???95|??F|
#?+-----+---+-----+---+
#?DataFrame.cache\DataFrame.persist
#?可以把一些數(shù)據(jù)放入緩存中,default?storage?level?(MEMORY_AND_DISK).
df.cache()
df.persist()
df.unpersist()
#?DataFrame.crossJoin
#?返回兩個(gè)DataFrame的笛卡爾積關(guān)聯(lián)的DataFrame
df1?=?df.select("name",?"sex")
df2?=?df.select("name",?"sex")
df3?=?df1.crossJoin(df2)
print("表1的記錄數(shù)",?df1.count())
print("表2的記錄數(shù)",?df2.count())
print("笛卡爾積后的記錄數(shù)",?df3.count())
#?表1的記錄數(shù)?5
#?表2的記錄數(shù)?5
#?笛卡爾積后的記錄數(shù)?25
#?DataFrame.toPandas
#?把SparkDataFrame轉(zhuǎn)為?Pandas的DataFrame
df.toPandas()
#?DataFrame.rdd
#?把SparkDataFrame轉(zhuǎn)為rdd,這樣子可以用rdd的語法來操作數(shù)據(jù)
df.rdd
5. DataFrame的一些統(tǒng)計(jì)操作APIs
#?DataFrame.cov
#?計(jì)算指定兩列的樣本協(xié)方差
df.cov("age",?"score")
#?324.59999999999997
#?DataFrame.corr
#?計(jì)算指定兩列的相關(guān)系數(shù),DataFrame.corr(col1,?col2,?method=None),目前method只支持Pearson相關(guān)系數(shù)
df.corr("age",?"score",?method="pearson")
#?0.9319004030498815
#?DataFrame.cube
#?創(chuàng)建多維度聚合的結(jié)果,通常用于分析數(shù)據(jù),比如我們指定兩個(gè)列進(jìn)行聚合,比如name和age,那么這個(gè)函數(shù)返回的聚合結(jié)果會
#?groupby("name",?"age")
#?groupby("name")
#?groupby("age")
#?groupby(all)
#?四個(gè)聚合結(jié)果的union?all?的結(jié)果
df1?=?df.filter(df.name?!=?"Run")
print(df1.show())
df1.cube("name",?"sex").count().show()
#?+-----+---+-----+---+
#?|?name|age|score|sex|
#?+-----+---+-----+---+
#?|??Sam|?28|???88|??M|
#?|Flora|?28|???90|??F|
#?|Peter|?55|??100|??M|
#?|??Mei|?54|???95|??F|
#?+-----+---+-----+---+
#?cube?聚合之后的結(jié)果
#?+-----+----+-----+
#?|?name|?sex|count|
#?+-----+----+-----+
#?|?null|???F|????2|
#?|?null|null|????4|
#?|Flora|null|????1|
#?|Peter|null|????1|
#?|?null|???M|????2|
#?|Peter|???M|????1|
#?|??Sam|???M|????1|
#?|??Sam|null|????1|
#?|??Mei|???F|????1|
#?|??Mei|null|????1|
#?|Flora|???F|????1|
#?+-----+----+-----+
保存數(shù)據(jù)/寫入數(shù)據(jù)庫
這里的保存數(shù)據(jù)主要是保存到Hive中的栗子,主要包括了overwrite、append等方式。
1. 當(dāng)結(jié)果集為SparkDataFrame的時(shí)候
import?pandas?as?pd
from?datetime?import?datetime
from?pyspark?import?SparkConf
from?pyspark?import?SparkContext
from?pyspark.sql?import?HiveContext
conf?=?SparkConf()\
??????.setAppName("test")\
??????.set("hive.exec.dynamic.partition.mode",?"nonstrict")?#?動態(tài)寫入hive分區(qū)表
sc?=?SparkContext(conf=conf)
hc?=?HiveContext(sc)
sc.setLogLevel("ERROR")
????
list_values?=?[['Sam',?28,?88],?['Flora',?28,?90],?['Run',?1,?60]]
Spark_df?=?spark.createDataFrame(list_values,?['name',?'age',?'score'])
print(Spark_df.show())
save_table?=?"tmp.samshare_pyspark_savedata"
#?方式1:直接寫入到Hive
Spark_df.write.format("hive").mode("overwrite").saveAsTable(save_table)?#?或者改成append模式
print(datetime.now().strftime("%y/%m/%d?%H:%M:%S"),?"測試數(shù)據(jù)寫入到表"?+?save_table)
#?方式2:注冊為臨時(shí)表,使用SparkSQL來寫入分區(qū)表
Spark_df.createOrReplaceTempView("tmp_table")
write_sql?=?"""
insert?overwrite?table?{0}?partitions?(pt_date='{1}')
select?*?from?tmp_table
""".format(save_table,?"20210520")
hc.sql(write_sql)
print(datetime.now().strftime("%y/%m/%d?%H:%M:%S"),?"測試數(shù)據(jù)寫入到表"?+?save_table)
2. 當(dāng)結(jié)果集為Python的DataFrame的時(shí)候
如果是Python的DataFrame,我們就需要多做一步把它轉(zhuǎn)換為SparkDataFrame,其余操作就一樣了。
import?pandas?as?pd
from?datetime?import?datetime
from?pyspark?import?SparkConf
from?pyspark?import?SparkContext
from?pyspark.sql?import?HiveContext
conf?=?SparkConf()\
??????.setAppName("test")\
??????.set("hive.exec.dynamic.partition.mode",?"nonstrict")?#?動態(tài)寫入hive分區(qū)表
sc?=?SparkContext(conf=conf)
hc?=?HiveContext(sc)
sc.setLogLevel("ERROR")
????
result_df?=?pd.DataFrame([1,2,3],?columns=['a'])
save_table?=?"tmp.samshare_pyspark_savedata"
#?獲取DataFrame的schema
c1?=?list(result_df.columns)
#?轉(zhuǎn)為SparkDataFrame
result?=?hc.createDataFrame(result_df.astype(str),?c1)
result.write.format("hive").mode("overwrite").saveAsTable(save_table)?#?或者改成append模式
print(datetime.now().strftime("%y/%m/%d?%H:%M:%S"),?"測試數(shù)據(jù)寫入到表"?+?save_table)
?? Spark調(diào)優(yōu)思路
這一小節(jié)的內(nèi)容算是對pyspark入門的一個(gè)ending了,全文主要是參考學(xué)習(xí)了美團(tuán)Spark性能優(yōu)化指南的基礎(chǔ)篇和高級篇內(nèi)容,主體脈絡(luò)和這兩篇文章是一樣的,只不過是基于自己學(xué)習(xí)后的理解進(jìn)行了一次總結(jié)復(fù)盤,而原文中主要是用Java來舉例的,我這邊主要用pyspark來舉例。文章主要會從4個(gè)方面(或者說4個(gè)思路)來優(yōu)化我們的Spark任務(wù),主要就是下面的圖片所示:

開發(fā)習(xí)慣調(diào)優(yōu)
1. 盡可能復(fù)用同一個(gè)RDD,避免重復(fù)創(chuàng)建,并且適當(dāng)持久化數(shù)據(jù)
這種開發(fā)習(xí)慣是需要我們對于即將要開發(fā)的應(yīng)用邏輯有比較深刻的思考,并且可以通過code review來發(fā)現(xiàn)的,講白了就是要記得我們創(chuàng)建過啥數(shù)據(jù)集,可以復(fù)用的盡量廣播(broadcast)下,能很好提升性能。
#?最低級寫法,相同數(shù)據(jù)集重復(fù)創(chuàng)建。
rdd1?=?sc.textFile("./test/data/hello_samshare.txt",?4)?#?這里的?4?指的是分區(qū)數(shù)量
rdd2?=?sc.textFile("./test/data/hello_samshare.txt",?4)?#?這里的?4?指的是分區(qū)數(shù)量
print(rdd1.take(10))
print(rdd2.map(lambda?x:x[0:1]).take(10))
#?稍微進(jìn)階一些,復(fù)用相同數(shù)據(jù)集,但因中間結(jié)果沒有緩存,數(shù)據(jù)會重復(fù)計(jì)算
rdd1?=?sc.textFile("./test/data/hello_samshare.txt",?4)?#?這里的?4?指的是分區(qū)數(shù)量
print(rdd1.take(10))
print(rdd1.map(lambda?x:x[0:1]).take(10))
#?相對比較高效,使用緩存來持久化數(shù)據(jù)
rdd?=?sc.parallelize(range(1,?11),?4).cache()??#?或者persist()
rdd_map?=?rdd.map(lambda?x:?x*2)
rdd_reduce?=?rdd.reduce(lambda?x,?y:?x+y)
print(rdd_map.take(10))
print(rdd_reduce)
下面我們就來對比一下使用緩存能給我們的Spark程序帶來多大的效率提升吧,我們先構(gòu)造一個(gè)程序運(yùn)行時(shí)長測量器。
import?time
#?統(tǒng)計(jì)程序運(yùn)行時(shí)間
def?time_me(info="used"):
????def?_time_me(fn):
[email protected](fn)
????????def?_wrapper(*args,?**kwargs):
????????????start?=?time.time()
????????????fn(*args,?**kwargs)
????????????print("%s?%s?%s"?%?(fn.__name__,?info,?time.time()?-?start),?"second")
????????return?_wrapper
????return?_time_me
下面我們運(yùn)行下面的代碼,看下使用了cache帶來的效率提升:
@time_me()
def?test(types=0):
????if?types?==?1:
????????print("使用持久化緩存")
????????rdd?=?sc.parallelize(range(1,?10000000),?4)
????????rdd1?=?rdd.map(lambda?x:?x*x?+?2*x?+?1).cache()??#?或者?persist(StorageLevel.MEMORY_AND_DISK_SER)
????????print(rdd1.take(10))
????????rdd2?=?rdd1.reduce(lambda?x,?y:?x+y)
????????rdd3?=?rdd1.reduce(lambda?x,?y:?x?+?y)
????????rdd4?=?rdd1.reduce(lambda?x,?y:?x?+?y)
????????rdd5?=?rdd1.reduce(lambda?x,?y:?x?+?y)
????????print(rdd5)
????else:
????????print("不使用持久化緩存")
????????rdd?=?sc.parallelize(range(1,?10000000),?4)
????????rdd1?=?rdd.map(lambda?x:?x?*?x?+?2?*?x?+?1)
????????print(rdd1.take(10))
????????rdd2?=?rdd1.reduce(lambda?x,?y:?x?+?y)
????????rdd3?=?rdd1.reduce(lambda?x,?y:?x?+?y)
????????rdd4?=?rdd1.reduce(lambda?x,?y:?x?+?y)
????????rdd5?=?rdd1.reduce(lambda?x,?y:?x?+?y)
????????print(rdd5)
????????
test()???#?不使用持久化緩存
time.sleep(10)
test(1)??#?使用持久化緩存
#?output:
#?使用持久化緩存
#?[4,?9,?16,?25,?36,?49,?64,?81,?100,?121]
#?333333383333334999999
#?test?used?26.36529278755188?second
#?使用持久化緩存
#?[4,?9,?16,?25,?36,?49,?64,?81,?100,?121]
#?333333383333334999999
#?test?used?17.49532413482666?second
同時(shí)我們打開YARN日志來看看:http://localhost:4040/jobs/

因?yàn)槲覀兊拇a是需要重復(fù)調(diào)用RDD1的,當(dāng)沒有對RDD1進(jìn)行持久化的時(shí)候,每次當(dāng)它被action算子消費(fèi)了之后,就釋放了,等下一個(gè)算子計(jì)算的時(shí)候要用,就從頭開始計(jì)算一下RDD1。代碼中需要重復(fù)調(diào)用RDD1 五次,所以沒有緩存的話,差不多每次都要6秒,總共需要耗時(shí)26秒左右,但是,做了緩存,每次就只需要3s不到,總共需要耗時(shí)17秒左右。
另外,這里需要提及一下一個(gè)知識點(diǎn),那就是持久化的級別,一般cache的話就是放入內(nèi)存中,就沒有什么好說的,需要講一下的就是另外一個(gè) persist(),它的持久化級別是可以被我們所配置的:
| 持久化級別 | 含義解釋 |
|---|---|
| MEMORY_ONLY | 將數(shù)據(jù)保存在內(nèi)存中。如果內(nèi)存不夠存放所有的數(shù)據(jù),則數(shù)據(jù)可能就不會進(jìn)行持久化。使用cache()方法時(shí),實(shí)際就是使用的這種持久化策略,性能也是最高的。 |
| MEMORY_AND_DISK | 優(yōu)先嘗試將數(shù)據(jù)保存在內(nèi)存中,如果內(nèi)存不夠存放所有的數(shù)據(jù),會將數(shù)據(jù)寫入磁盤文件中。 |
| MEMORY_ONLY_SER | 基本含義同MEMORY_ONLY。唯一的區(qū)別是,會將RDD中的數(shù)據(jù)進(jìn)行序列化,RDD的每個(gè)partition會被序列化成一個(gè)字節(jié)數(shù)組。這種方式更加節(jié)省內(nèi)存,從而可以避免持久化的數(shù)據(jù)占用過多內(nèi)存導(dǎo)致頻繁GC。 |
| MEMORY_AND_DISK_SER | 基本含義同MEMORY_AND_DISK。唯一的區(qū)別是會先序列化,節(jié)約內(nèi)存。 |
| DISK_ONLY | 使用未序列化的Java對象格式,將數(shù)據(jù)全部寫入磁盤文件中。一般不推薦使用。 |
| MEMORY_ONLY_2, MEMORY_AND_DISK_2, 等等. | 對于上述任意一種持久化策略,如果加上后綴_2,代表的是將每個(gè)持久化的數(shù)據(jù),都復(fù)制一份副本,并將副本保存到其他節(jié)點(diǎn)上。這種基于副本的持久化機(jī)制主要用于進(jìn)行容錯(cuò)。假如某個(gè)節(jié)點(diǎn)掛掉,節(jié)點(diǎn)的內(nèi)存或磁盤中的持久化數(shù)據(jù)丟失了,那么后續(xù)對RDD計(jì)算時(shí)還可以使用該數(shù)據(jù)在其他節(jié)點(diǎn)上的副本。如果沒有副本的話,就只能將這些數(shù)據(jù)從源頭處重新計(jì)算一遍了。一般也不推薦使用。 |
2. 盡量避免使用低性能算子
shuffle類算子算是低性能算子的一種代表,所謂的shuffle類算子,指的是會產(chǎn)生shuffle過程的操作,就是需要把各個(gè)節(jié)點(diǎn)上的相同key寫入到本地磁盤文件中,然后其他的節(jié)點(diǎn)通過網(wǎng)絡(luò)傳輸拉取自己需要的key,把相同key拉到同一個(gè)節(jié)點(diǎn)上進(jìn)行聚合計(jì)算,這種操作必然就是有大量的數(shù)據(jù)網(wǎng)絡(luò)傳輸與磁盤讀寫操作,性能往往不是很好的。
那么,Spark中有哪些算子會產(chǎn)生shuffle過程呢?
| 操作類別 | shuffle類算子 | 備注 |
|---|---|---|
| 分區(qū)操作 | repartition()、repartitionAndSortWithinPartitions()、coalesce(shuffle=true) | 重分區(qū)操作一般都會shuffle,因?yàn)樾枰獙λ械姆謪^(qū)數(shù)據(jù)進(jìn)行打亂。 |
| 聚合操作 | reduceByKey、groupByKey、sortByKey | 需要對相同key進(jìn)行操作,所以需要拉到同一個(gè)節(jié)點(diǎn)上。 |
| 關(guān)聯(lián)操作 | join類操作 | 需要把相同key的數(shù)據(jù)shuffle到同一個(gè)節(jié)點(diǎn)然后進(jìn)行笛卡爾積 |
| 去重操作 | distinct等 | 需要對相同key進(jìn)行操作,所以需要shuffle到同一個(gè)節(jié)點(diǎn)上。 |
| 排序操作 | sortByKey等 | 需要對相同key進(jìn)行操作,所以需要shuffle到同一個(gè)節(jié)點(diǎn)上。 |
這里進(jìn)一步介紹一個(gè)替代join的方案,因?yàn)閖oin其實(shí)在業(yè)務(wù)中還是蠻常見的。
#?原則2:盡量避免使用低性能算子
rdd1?=?sc.parallelize([('A1',?211),?('A1',?212),?('A2',?22),?('A4',?24),?('A5',?25)])
rdd2?=?sc.parallelize([('A1',?11),?('A2',?12),?('A3',?13),?('A4',?14)])
#?低效的寫法,也是傳統(tǒng)的寫法,直接join
rdd_join?=?rdd1.join(rdd2)
print(rdd_join.collect())
#?[('A4',?(24,?14)),?('A2',?(22,?12)),?('A1',?(211,?11)),?('A1',?(212,?11))]
rdd_left_join?=?rdd1.leftOuterJoin(rdd2)
print(rdd_left_join.collect())
#?[('A4',?(24,?14)),?('A2',?(22,?12)),?('A5',?(25,?None)),?('A1',?(211,?11)),?('A1',?(212,?11))]
rdd_full_join?=?rdd1.fullOuterJoin(rdd2)
print(rdd_full_join.collect())
#?[('A4',?(24,?14)),?('A3',?(None,?13)),?('A2',?(22,?12)),?('A5',?(25,?None)),?('A1',?(211,?11)),?('A1',?(212,?11))]
#?高效的寫法,使用廣播+map來實(shí)現(xiàn)相同效果
#?tips1:?這里需要注意的是,用來broadcast的RDD不可以太大,最好不要超過1G
#?tips2:?這里需要注意的是,用來broadcast的RDD不可以有重復(fù)的key的
rdd1?=?sc.parallelize([('A1',?11),?('A2',?12),?('A3',?13),?('A4',?14)])
rdd2?=?sc.parallelize([('A1',?211),?('A1',?212),?('A2',?22),?('A4',?24),?('A5',?25)])
# step1:?先將小表進(jìn)行廣播,也就是collect到driver端,然后廣播到每個(gè)Executor中去。
rdd_small_bc?=?sc.broadcast(rdd1.collect())
# step2:從Executor中獲取存入字典便于后續(xù)map操作
rdd_small_dict?=?dict(rdd_small_bc.value)
# step3:定義join方法
def?broadcast_join(line,?rdd_small_dict,?join_type):
????k?=?line[0]
????v?=?line[1]
????small_table_v?=?rdd_small_dict[k]?if?k?in?rdd_small_dict?else?None
????if?join_type?==?'join':
????????return?(k,?(v,?small_table_v))?if?k?in?rdd_small_dict?else?None
????elif?join_type?==?'left_join':
????????return?(k,?(v,?small_table_v?if?small_table_v?is?not?None?else?None))
????else:
????????print("not?support?join?type!")
# step4:使用 map 實(shí)現(xiàn)?兩個(gè)表join的功能
rdd_join?=?rdd2.map(lambda?line:?broadcast_join(line,?rdd_small_dict,?"join")).filter(lambda?line:?line?is?not?None)
rdd_left_join?=?rdd2.map(lambda?line:?broadcast_join(line,?rdd_small_dict,?"left_join")).filter(lambda?line:?line?is?not?None)
print(rdd_join.collect())
print(rdd_left_join.collect())
#?[('A1',?(211,?11)),?('A1',?(212,?11)),?('A2',?(22,?12)),?('A4',?(24,?14))]
#?[('A1',?(211,?11)),?('A1',?(212,?11)),?('A2',?(22,?12)),?('A4',?(24,?14)),?('A5',?(25,?None))]
上面的RDD join被改寫為 broadcast+map的PySpark版本實(shí)現(xiàn),不過里面有兩個(gè)點(diǎn)需要注意:
- tips1: 用來broadcast的RDD不可以太大,最好不要超過1G
- tips2: 用來broadcast的RDD不可以有重復(fù)的key的
3. 盡量使用高性能算子
上一節(jié)講到了低效算法,自然地就會有一些高效的算子。
| 原算子 | 高效算子(替換算子) | 說明 |
|---|---|---|
| map | mapPartitions | 直接map的話,每次只會處理一條數(shù)據(jù),而mapPartitions則是每次處理一個(gè)分區(qū)的數(shù)據(jù),在某些場景下相對比較高效。(分區(qū)數(shù)據(jù)量不大的情況下使用,如果有數(shù)據(jù)傾斜的話容易發(fā)生OOM) |
| groupByKey | reduceByKey/aggregateByKey | 這類算子會在原節(jié)點(diǎn)先map-side預(yù)聚合,相對高效些。 |
| foreach | foreachPartitions | 同第一條記錄一樣。 |
| filter | filter+coalesce | 當(dāng)我們對數(shù)據(jù)進(jìn)行filter之后,有很多partition的數(shù)據(jù)會劇減,然后直接進(jìn)行下一步操作的話,可能就partition數(shù)量很多但處理的數(shù)據(jù)又很少,task數(shù)量沒有減少,反而整體速度很慢;但如果執(zhí)行了coalesce算子,就會減少一些partition數(shù)量,把數(shù)據(jù)都相對壓縮到一起,用更少的task處理完全部數(shù)據(jù),一定場景下還是可以達(dá)到整體性能的提升。 |
| repartition+sort | repartitionAndSortWithinPartitions | 直接用就是了。 |
4. 廣播大變量
如果我們有一個(gè)數(shù)據(jù)集很大,并且在后續(xù)的算子執(zhí)行中會被反復(fù)調(diào)用,那么就建議直接把它廣播(broadcast)一下。當(dāng)變量被廣播后,會保證每個(gè)executor的內(nèi)存中只會保留一份副本,同個(gè)executor內(nèi)的task都可以共享這個(gè)副本數(shù)據(jù)。如果沒有廣播,常規(guī)過程就是把大變量進(jìn)行網(wǎng)絡(luò)傳輸?shù)矫恳粋€(gè)相關(guān)task中去,這樣子做,一來頻繁的網(wǎng)絡(luò)數(shù)據(jù)傳輸,效率極其低下;二來executor下的task不斷存儲同一份大數(shù)據(jù),很有可能就造成了內(nèi)存溢出或者頻繁GC,效率也是極其低下的。
#?原則4:廣播大變量
rdd1?=?sc.parallelize([('A1',?11),?('A2',?12),?('A3',?13),?('A4',?14)])
rdd1_broadcast?=?sc.broadcast(rdd1.collect())
print(rdd1.collect())
print(rdd1_broadcast.value)
#?[('A1',?11),?('A2',?12),?('A3',?13),?('A4',?14)]
#?[('A1',?11),?('A2',?12),?('A3',?13),?('A4',?14)]
資源參數(shù)調(diào)優(yōu)
如果要進(jìn)行資源調(diào)優(yōu),我們就必須先知道Spark運(yùn)行的機(jī)制與流程。

下面我們就來講解一些常用的Spark資源配置的參數(shù)吧,了解其參數(shù)原理便于我們依據(jù)實(shí)際的數(shù)據(jù)情況進(jìn)行配置。
1)num-executors
指的是執(zhí)行器的數(shù)量,數(shù)量的多少代表了并行的stage數(shù)量(假如executor是單核的話),但也并不是越多越快,受你集群資源的限制,所以一般設(shè)置50-100左右吧。
2)executor-memory
這里指的是每一個(gè)執(zhí)行器的內(nèi)存大小,內(nèi)存越大當(dāng)然對于程序運(yùn)行是很好的了,但是也不是無節(jié)制地大下去,同樣受我們集群資源的限制。假設(shè)我們集群資源為500core,一般1core配置4G內(nèi)存,所以集群最大的內(nèi)存資源只有2000G左右。num-executors x executor-memory 是不能超過2000G的,但是也不要太接近這個(gè)值,不然的話集群其他同事就沒法正常跑數(shù)據(jù)了,一般我們設(shè)置4G-8G。
3)executor-cores
這里設(shè)置的是executor的CPU core數(shù)量,決定了executor進(jìn)程并行處理task的能力。
4)driver-memory
設(shè)置driver的內(nèi)存,一般設(shè)置2G就好了。但如果想要做一些Python的DataFrame操作可以適當(dāng)?shù)匕堰@個(gè)值設(shè)大一些。
5)driver-cores
與executor-cores類似的功能。
6)spark.default.parallelism
設(shè)置每個(gè)stage的task數(shù)量。一般Spark任務(wù)我們設(shè)置task數(shù)量在500-1000左右比較合適,如果不去設(shè)置的話,Spark會根據(jù)底層HDFS的block數(shù)量來自行設(shè)置task數(shù)量。有的時(shí)候會設(shè)置得偏少,這樣子程序就會跑得很慢,即便你設(shè)置了很多的executor,但也沒有用。
下面說一個(gè)基本的參數(shù)設(shè)置的shell腳本,一般我們都是通過一個(gè)shell腳本來設(shè)置資源參數(shù)配置,接著就去調(diào)用我們的主函數(shù)。
#!/bin/bash
basePath=$(cd?"$(dirname?)"$(cd?"$(dirname?"$0"):?pwd)")":?pwd)
spark-submit?\
????--master?yarn?\
????--queue?samshare?\
????--deploy-mode?client?\
????--num-executors?100?\
????--executor-memory?4G?\
????--executor-cores?4?\
????--driver-memory?2G?\
????--driver-cores?2?\
????--conf?spark.default.parallelism=1000?\
????--conf?spark.yarn.executor.memoryOverhead=8G?\
????--conf?spark.sql.shuffle.partitions=1000?\
????--conf?spark.network.timeout=1200?\
????--conf?spark.python.worker.memory=64m?\
????--conf?spark.sql.catalogImplementation=hive?\
????--conf?spark.sql.crossJoin.enabled=True?\
????--conf?spark.dynamicAllocation.enabled=True?\
????--conf?spark.shuffle.service.enabled=True?\
????--conf?spark.scheduler.listenerbus.eventqueue.size=100000?\
????--conf?spark.pyspark.driver.python=python3?\
????--conf?spark.pyspark.python=python3?\
????--conf?spark.yarn.appMasterEnv.PYSPARK_PYTHON=python3?\
????--conf?spark.sql.pivotMaxValues=500000?\
????--conf?spark.hadoop.hive.exec.dynamic.partition=True?\
????--conf?spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict?\
????--conf?spark.hadoop.hive.exec.max.dynamic.partitions.pernode=100000?\
????--conf?spark.hadoop.hive.exec.max.dynamic.partitions=100000?\
????--conf?spark.hadoop.hive.exec.max.created.files=100000?\
????${bashPath}/project_name/main.py?$v_var1?$v_var2
數(shù)據(jù)傾斜調(diào)優(yōu)
相信我們對于數(shù)據(jù)傾斜并不陌生了,很多時(shí)間數(shù)據(jù)跑不出來有很大的概率就是出現(xiàn)了數(shù)據(jù)傾斜,在Spark開發(fā)中無法避免的也會遇到這類問題,而這不是一個(gè)嶄新的問題,成熟的解決方案也是有蠻多的,今天來簡單介紹一些比較常用并且有效的方案。
首先我們要知道,在Spark中比較容易出現(xiàn)傾斜的操作,主要集中在distinct、groupByKey、reduceByKey、aggregateByKey、join、repartition等,可以優(yōu)先看這些操作的前后代碼。而為什么使用了這些操作就容易導(dǎo)致數(shù)據(jù)傾斜呢?大多數(shù)情況就是進(jìn)行操作的key分布不均,然后使得大量的數(shù)據(jù)集中在同一個(gè)處理節(jié)點(diǎn)上,從而發(fā)生了數(shù)據(jù)傾斜。
查看Key 分布
#?針對Spark?SQL
hc.sql("select?key,?count(0)?nums?from?table_name?group?by?key")
#?針對RDD
RDD.countByKey()
Plan A: 過濾掉導(dǎo)致傾斜的key
這個(gè)方案并不是所有場景都可以使用的,需要結(jié)合業(yè)務(wù)邏輯來分析這個(gè)key到底還需要不需要,大多數(shù)情況可能就是一些異常值或者空串,這種就直接進(jìn)行過濾就好了。
Plan B: 提前處理聚合
如果有些Spark應(yīng)用場景需要頻繁聚合數(shù)據(jù),而數(shù)據(jù)key又少的,那么我們可以把這些存量數(shù)據(jù)先用hive算好(每天算一次),然后落到中間表,后續(xù)Spark應(yīng)用直接用聚合好的表+新的數(shù)據(jù)進(jìn)行二度聚合,效率會有很高的提升。
Plan C: 調(diào)高shuffle并行度
#?針對Spark?SQL?
--conf?spark.sql.shuffle.partitions=1000??#?在配置信息中設(shè)置參數(shù)
#?針對RDD
rdd.reduceByKey(1000)?#?默認(rèn)是200
Plan D: 分配隨機(jī)數(shù)再聚合
大概的思路就是對一些大量出現(xiàn)的key,人工打散,從而可以利用多個(gè)task來增加任務(wù)并行度,以達(dá)到效率提升的目的,下面是代碼demo,分別從RDD 和 SparkSQL來實(shí)現(xiàn)。
#?Way1:?PySpark?RDD實(shí)現(xiàn)
import?pyspark
from?pyspark?import?SparkContext,?SparkConf,?HiveContext
from?random?import?randint
import?pandas?as?pd
# SparkSQL的許多功能封裝在SparkSession的方法接口中, SparkContext則不行的。
from?pyspark.sql?import?SparkSession
spark?=?SparkSession.builder?\
????.appName("sam_SamShare")?\
????.config("master",?"local[4]")?\
????.enableHiveSupport()?\
????.getOrCreate()
conf?=?SparkConf().setAppName("test_SamShare").setMaster("local[4]")
sc?=?SparkContext(conf=conf)
hc?=?HiveContext(sc)
#?分配隨機(jī)數(shù)再聚合
rdd1?=?sc.parallelize([('sam',?1),?('sam',?1),?('sam',?1),?('sam',?1),?('sam',?1),?('sam',?1)])
#?給key分配隨機(jī)數(shù)后綴
rdd2?=?rdd1.map(lambda?x:?(x[0]?+?"_"?+?str(randint(1,5)),?x[1]))
print(rdd.take(10))
#?[('sam_5',?1),?('sam_5',?1),?('sam_3',?1),?('sam_5',?1),?('sam_5',?1),?('sam_3',?1)]
#?局部聚合
rdd3?=?rdd2.reduceByKey(lambda?x,y?:?(x+y))
print(rdd3.take(10))
#?[('sam_5',?4),?('sam_3',?2)]
#?去除后綴
rdd4?=?rdd3.map(lambda?x:?(x[0][:-2],?x[1]))
print(rdd4.take(10))
#?[('sam',?4),?('sam',?2)]
#?全局聚合
rdd5?=?rdd4.reduceByKey(lambda?x,y?:?(x+y))
print(rdd5.take(10))
#?[('sam',?6)]
#?Way2:?PySpark?SparkSQL實(shí)現(xiàn)
df?=?pd.DataFrame(5*[['Sam',?1],['Flora',?1]],
??????????????????columns=['name',?'nums'])
Spark_df?=?spark.createDataFrame(df)
print(Spark_df.show(10))
Spark_df.createOrReplaceTempView("tmp_table")?#?注冊為視圖供SparkSQl使用
sql?=?"""
with?t1?as?(
????select?concat(name,"_",int(10*rand()))?as?new_name,?name,?nums
????from?tmp_table
),
t2?as?(
????select?new_name,?sum(nums)?as?n
????from?t1
????group?by?new_name
),
t3?as?(
????select?substr(new_name,0,length(new_name)?-2)?as?name,?sum(n)?as?nums_sum?
????from?t2
????group?by?substr(new_name,0,length(new_name)?-2)
)
select?*
from?t3
"""
tt?=?hc.sql(sql).toPandas()
tt
下面是原理圖。

全文終!
如果想下載PDF,可以在后臺輸入 “pyspark” 獲取
??學(xué)習(xí)資源推薦
1)edureka about PySpark Tutorial
印度老哥的課程,B站可直接看,不過口音略難聽懂不過還好有字幕。
https://www.bilibili.com/video/BV1i4411i79a?p=1
2)eat_pyspark_in_10_days
梁云大哥的課程,講得超級清晰,建議精讀。
https://github.com/lyhue1991/eat_pyspark_in_10_days
3)官方文檔
http://spark.apache.org/docs/latest/api/python/reference/index.html
4)《Spark性能優(yōu)化指南——基礎(chǔ)篇》
https://tech.meituan.com/2016/04/29/spark-tuning-basic.html
5)《Spark性能優(yōu)化指南——高級篇》
https://tech.meituan.com/2016/05/12/spark-tuning-pro.html
