3w字長(zhǎng)文!神器!比SQL/Python都好用!
大家好,我是寶器!
為什么要學(xué)習(xí)Spark?作為數(shù)據(jù)從業(yè)者多年,個(gè)人覺(jué)得Spark已經(jīng)越來(lái)越走進(jìn)我們的日常工作了,無(wú)論是使用哪種編程語(yǔ)言,Python、Scala還是Java,都會(huì)或多或少接觸到Spark,它可以讓我們能夠用到集群的力量,可以對(duì)BigData進(jìn)行高效操作,實(shí)現(xiàn)很多之前由于計(jì)算資源而無(wú)法輕易實(shí)現(xiàn)的東西。網(wǎng)上有很多關(guān)于Spark的好處,這里就不做過(guò)多的贅述,我們直接進(jìn)入這篇文章的正文!

關(guān)于PySpark,我們知道它是Python調(diào)用Spark的接口,我們可以通過(guò)調(diào)用Python API的方式來(lái)編寫(xiě)Spark程序,它支持了大多數(shù)的Spark功能,比如SparkDataFrame、Spark SQL、Streaming、MLlib等等。只要我們了解Python的基本語(yǔ)法,那么在Python里調(diào)用Spark的力量就顯得十分easy了。下面我將會(huì)從相對(duì)宏觀的層面介紹一下PySpark,讓我們對(duì)于這個(gè)神器有一個(gè)框架性的認(rèn)識(shí),知道它能干什么,知道去哪里尋找問(wèn)題解答,爭(zhēng)取看完這篇文章可以讓我們更加絲滑地入門(mén)PySpark。話不多說(shuō),馬上開(kāi)始!
?? 目錄

?? 安裝指引
安裝這塊本文就不展開(kāi)具體的步驟了,畢竟大家的機(jī)子環(huán)境都不盡相同。不過(guò)可以簡(jiǎn)單說(shuō)幾點(diǎn)重要的步驟,然后節(jié)末放上一些安裝示例供大家參考。
1)要使用PySpark,機(jī)子上要有Java開(kāi)發(fā)環(huán)境
2)環(huán)境變量記得要配置完整
3)Mac下的/usr/local/ 路徑一般是隱藏的,PyCharm配置py4j和pyspark的時(shí)候可以使用 shift+command+G 來(lái)使用路徑訪問(wèn)。
4)Mac下如果修改了 ~/.bash_profile 的話,記得要重啟下PyCharm才會(huì)生效的哈
5)版本記得要搞對(duì),保險(xiǎn)起見(jiàn)Java的jdk版本選擇低版本(別問(wèn)我為什么知道),我選擇的是Java8.
下面是一些示例demo,可以參考下:
1)Mac下安裝spark,并配置pycharm-pyspark完整教程
https://blog.csdn.net/shiyutianming/article/details/99946797
2)virtualBox里安裝開(kāi)發(fā)環(huán)境
https://www.bilibili.com/video/BV1i4411i79a?p=3
3)快速搭建spark開(kāi)發(fā)環(huán)境,云哥項(xiàng)目
https://github.com/lyhue1991/eat_pyspark_in_10_days
?? 基礎(chǔ)概念
關(guān)于Spark的基礎(chǔ)概念,我在先前的文章里也有寫(xiě)過(guò),大家可以一起來(lái)回顧一下 《想學(xué)習(xí)Spark?先帶你了解一些基礎(chǔ)的知識(shí)》。作為補(bǔ)充,今天在這里也介紹一些在Spark中會(huì)經(jīng)常遇見(jiàn)的專(zhuān)有名詞。
???♀? Q1: 什么是RDD
RDD的全稱(chēng)是 Resilient Distributed Datasets,這是Spark的一種數(shù)據(jù)抽象集合,它可以被執(zhí)行在分布式的集群上進(jìn)行各種操作,而且有較強(qiáng)的容錯(cuò)機(jī)制。RDD可以被分為若干個(gè)分區(qū),每一個(gè)分區(qū)就是一個(gè)數(shù)據(jù)集片段,從而可以支持分布式計(jì)算。
???♀? Q2: RDD運(yùn)行時(shí)相關(guān)的關(guān)鍵名詞
簡(jiǎn)單來(lái)說(shuō)可以有 Client、Job、Master、Worker、Driver、Stage、Task以及Executor,這幾個(gè)東西在調(diào)優(yōu)的時(shí)候也會(huì)經(jīng)常遇到的。
Client:指的是客戶端進(jìn)程,主要負(fù)責(zé)提交job到Master;
Job:Job來(lái)自于我們編寫(xiě)的程序,Application包含一個(gè)或者多個(gè)job,job包含各種RDD操作;
Master:指的是Standalone模式中的主控節(jié)點(diǎn),負(fù)責(zé)接收來(lái)自Client的job,并管理著worker,可以給worker分配任務(wù)和資源(主要是driver和executor資源);
Worker:指的是Standalone模式中的slave節(jié)點(diǎn),負(fù)責(zé)管理本節(jié)點(diǎn)的資源,同時(shí)受Master管理,需要定期給Master回報(bào)heartbeat(心跳),啟動(dòng)Driver和Executor;
Driver:指的是 job(作業(yè))的主進(jìn)程,一般每個(gè)Spark作業(yè)都會(huì)有一個(gè)Driver進(jìn)程,負(fù)責(zé)整個(gè)作業(yè)的運(yùn)行,包括了job的解析、Stage的生成、調(diào)度Task到Executor上去執(zhí)行;
Stage:中文名 階段,是job的基本調(diào)度單位,因?yàn)槊總€(gè)job會(huì)分成若干組Task,每組任務(wù)就被稱(chēng)為 Stage;
Task:任務(wù),指的是直接運(yùn)行在executor上的東西,是executor上的一個(gè)線程;
Executor:指的是 執(zhí)行器,顧名思義就是真正執(zhí)行任務(wù)的地方了,一個(gè)集群可以被配置若干個(gè)Executor,每個(gè)Executor接收來(lái)自Driver的Task,并執(zhí)行它(可同時(shí)執(zhí)行多個(gè)Task)。
???♀? Q3: 什么是DAG
全稱(chēng)是 Directed Acyclic Graph,中文名是有向無(wú)環(huán)圖。Spark就是借用了DAG對(duì)RDD之間的關(guān)系進(jìn)行了建模,用來(lái)描述RDD之間的因果依賴(lài)關(guān)系。因?yàn)樵谝粋€(gè)Spark作業(yè)調(diào)度中,多個(gè)作業(yè)任務(wù)之間也是相互依賴(lài)的,有些任務(wù)需要在一些任務(wù)執(zhí)行完成了才可以執(zhí)行的。在Spark調(diào)度中就是有DAGscheduler,它負(fù)責(zé)將job分成若干組Task組成的Stage。

???♀? Q4: Spark的部署模式有哪些
主要有l(wèi)ocal模式、Standalone模式、Mesos模式、YARN模式。
更多的解釋可以參考這位老哥的解釋。https://www.jianshu.com/p/3b8f85329664
???♀? Q5: Shuffle操作是什么
Shuffle指的是數(shù)據(jù)從Map端到Reduce端的數(shù)據(jù)傳輸過(guò)程,Shuffle性能的高低直接會(huì)影響程序的性能。因?yàn)镽educe task需要跨節(jié)點(diǎn)去拉在分布在不同節(jié)點(diǎn)上的Map task計(jì)算結(jié)果,這一個(gè)過(guò)程是需要有磁盤(pán)IO消耗以及數(shù)據(jù)網(wǎng)絡(luò)傳輸?shù)南牡?,所以需要根?jù)實(shí)際數(shù)據(jù)情況進(jìn)行適當(dāng)調(diào)整。另外,Shuffle可以分為兩部分,分別是Map階段的數(shù)據(jù)準(zhǔn)備與Reduce階段的數(shù)據(jù)拷貝處理,在Map端我們叫Shuffle Write,在Reduce端我們叫Shuffle Read。
???♀? Q6: 什么是惰性執(zhí)行
這是RDD的一個(gè)特性,在RDD中的算子可以分為T(mén)ransform算子和Action算子,其中Transform算子的操作都不會(huì)真正執(zhí)行,只會(huì)記錄一下依賴(lài)關(guān)系,直到遇見(jiàn)了Action算子,在這之前的所有Transform操作才會(huì)被觸發(fā)計(jì)算,這就是所謂的惰性執(zhí)行。具體哪些是Transform和Action算子,可以看下一節(jié)。
?? 常用函數(shù)
從網(wǎng)友的總結(jié)來(lái)看比較常用的算子大概可以分為下面幾種,所以就演示一下這些算子,如果需要看更多的算子或者解釋?zhuān)ㄗh可以移步到官方API文檔去Search一下哈。
pyspark.RDD:http://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.html#pyspark.RDD

下面我們用自己創(chuàng)建的RDD:sc.parallelize(range(1,11),4)
import os
import pyspark
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("test_SamShare").setMaster("local[4]")
sc = SparkContext(conf=conf)
# 使用 parallelize方法直接實(shí)例化一個(gè)RDD
rdd = sc.parallelize(range(1,11),4) # 這里的 4 指的是分區(qū)數(shù)量
rdd.take(100)
# [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
"""
----------------------------------------------
Transform算子解析
----------------------------------------------
"""
# 以下的操作由于是Transform操作,因?yàn)槲覀冃枰谧詈蠹由弦粋€(gè)collect算子用來(lái)觸發(fā)計(jì)算。
# 1. map: 和python差不多,map轉(zhuǎn)換就是對(duì)每一個(gè)元素進(jìn)行一個(gè)映射
rdd = sc.parallelize(range(1, 11), 4)
rdd_map = rdd.map(lambda x: x*2)
print("原始數(shù)據(jù):", rdd.collect())
print("擴(kuò)大2倍:", rdd_map.collect())
# 原始數(shù)據(jù): [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
# 擴(kuò)大2倍: [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
# 2. flatMap: 這個(gè)相比于map多一個(gè)flat(壓平)操作,顧名思義就是要把高維的數(shù)組變成一維
rdd2 = sc.parallelize(["hello SamShare", "hello PySpark"])
print("原始數(shù)據(jù):", rdd2.collect())
print("直接split之后的map結(jié)果:", rdd2.map(lambda x: x.split(" ")).collect())
print("直接split之后的flatMap結(jié)果:", rdd2.flatMap(lambda x: x.split(" ")).collect())
# 直接split之后的map結(jié)果: [['hello', 'SamShare'], ['hello', 'PySpark']]
# 直接split之后的flatMap結(jié)果: ['hello', 'SamShare', 'hello', 'PySpark']
# 3. filter: 過(guò)濾數(shù)據(jù)
rdd = sc.parallelize(range(1, 11), 4)
print("原始數(shù)據(jù):", rdd.collect())
print("過(guò)濾奇數(shù):", rdd.filter(lambda x: x % 2 == 0).collect())
# 原始數(shù)據(jù): [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
# 過(guò)濾奇數(shù): [2, 4, 6, 8, 10]
# 4. distinct: 去重元素
rdd = sc.parallelize([2, 2, 4, 8, 8, 8, 8, 16, 32, 32])
print("原始數(shù)據(jù):", rdd.collect())
print("去重?cái)?shù)據(jù):", rdd.distinct().collect())
# 原始數(shù)據(jù): [2, 2, 4, 8, 8, 8, 8, 16, 32, 32]
# 去重?cái)?shù)據(jù): [4, 8, 16, 32, 2]
# 5. reduceByKey: 根據(jù)key來(lái)映射數(shù)據(jù)
from operator import add
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
print("原始數(shù)據(jù):", rdd.collect())
print("原始數(shù)據(jù):", rdd.reduceByKey(add).collect())
# 原始數(shù)據(jù): [('a', 1), ('b', 1), ('a', 1)]
# 原始數(shù)據(jù): [('b', 1), ('a', 2)]
# 6. mapPartitions: 根據(jù)分區(qū)內(nèi)的數(shù)據(jù)進(jìn)行映射操作
rdd = sc.parallelize([1, 2, 3, 4], 2)
def f(iterator):
yield sum(iterator)
print(rdd.collect())
print(rdd.mapPartitions(f).collect())
# [1, 2, 3, 4]
# [3, 7]
# 7. sortBy: 根據(jù)規(guī)則進(jìn)行排序
tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
print(sc.parallelize(tmp).sortBy(lambda x: x[0]).collect())
print(sc.parallelize(tmp).sortBy(lambda x: x[1]).collect())
# [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
# [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
# 8. subtract: 數(shù)據(jù)集相減, Return each value in self that is not contained in other.
x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
y = sc.parallelize([("a", 3), ("c", None)])
print(sorted(x.subtract(y).collect()))
# [('a', 1), ('b', 4), ('b', 5)]
# 9. union: 合并兩個(gè)RDD
rdd = sc.parallelize([1, 1, 2, 3])
print(rdd.union(rdd).collect())
# [1, 1, 2, 3, 1, 1, 2, 3]
# 10. intersection: 取兩個(gè)RDD的交集,同時(shí)有去重的功效
rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5, 2, 3])
rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
print(rdd1.intersection(rdd2).collect())
# [1, 2, 3]
# 11. cartesian: 生成笛卡爾積
rdd = sc.parallelize([1, 2])
print(sorted(rdd.cartesian(rdd).collect()))
# [(1, 1), (1, 2), (2, 1), (2, 2)]
# 12. zip: 拉鏈合并,需要兩個(gè)RDD具有相同的長(zhǎng)度以及分區(qū)數(shù)量
x = sc.parallelize(range(0, 5))
y = sc.parallelize(range(1000, 1005))
print(x.collect())
print(y.collect())
print(x.zip(y).collect())
# [0, 1, 2, 3, 4]
# [1000, 1001, 1002, 1003, 1004]
# [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]
# 13. zipWithIndex: 將RDD和一個(gè)從0開(kāi)始的遞增序列按照拉鏈方式連接。
rdd_name = sc.parallelize(["LiLei", "Hanmeimei", "Lily", "Lucy", "Ann", "Dachui", "RuHua"])
rdd_index = rdd_name.zipWithIndex()
print(rdd_index.collect())
# [('LiLei', 0), ('Hanmeimei', 1), ('Lily', 2), ('Lucy', 3), ('Ann', 4), ('Dachui', 5), ('RuHua', 6)]
# 14. groupByKey: 按照key來(lái)聚合數(shù)據(jù)
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
print(rdd.collect())
print(sorted(rdd.groupByKey().mapValues(len).collect()))
print(sorted(rdd.groupByKey().mapValues(list).collect()))
# [('a', 1), ('b', 1), ('a', 1)]
# [('a', 2), ('b', 1)]
# [('a', [1, 1]), ('b', [1])]
# 15. sortByKey:
tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
print(sc.parallelize(tmp).sortByKey(True, 1).collect())
# [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
# 16. join:
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("a", 3)])
print(sorted(x.join(y).collect()))
# [('a', (1, 2)), ('a', (1, 3))]
# 17. leftOuterJoin/rightOuterJoin
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2)])
print(sorted(x.leftOuterJoin(y).collect()))
# [('a', (1, 2)), ('b', (4, None))]
"""
----------------------------------------------
Action算子解析
----------------------------------------------
"""
# 1. collect: 指的是把數(shù)據(jù)都匯集到driver端,便于后續(xù)的操作
rdd = sc.parallelize(range(0, 5))
rdd_collect = rdd.collect()
print(rdd_collect)
# [0, 1, 2, 3, 4]
# 2. first: 取第一個(gè)元素
sc.parallelize([2, 3, 4]).first()
# 2
# 3. collectAsMap: 轉(zhuǎn)換為dict,使用這個(gè)要注意了,不要對(duì)大數(shù)據(jù)用,不然全部載入到driver端會(huì)爆內(nèi)存
m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
m
# {1: 2, 3: 4}
# 4. reduce: 逐步對(duì)兩個(gè)元素進(jìn)行操作
rdd = sc.parallelize(range(10),5)
print(rdd.reduce(lambda x,y:x+y))
# 45
# 5. countByKey/countByValue:
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
print(sorted(rdd.countByKey().items()))
print(sorted(rdd.countByValue().items()))
# [('a', 2), ('b', 1)]
# [(('a', 1), 2), (('b', 1), 1)]
# 6. take: 相當(dāng)于取幾個(gè)數(shù)據(jù)到driver端
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
print(rdd.take(5))
# [('a', 1), ('b', 1), ('a', 1)]
# 7. saveAsTextFile: 保存rdd成text文件到本地
text_file = "./data/rdd.txt"
rdd = sc.parallelize(range(5))
rdd.saveAsTextFile(text_file)
# 8. takeSample: 隨機(jī)取數(shù)
rdd = sc.textFile("./test/data/hello_samshare.txt", 4) # 這里的 4 指的是分區(qū)數(shù)量
rdd_sample = rdd.takeSample(True, 2, 0) # withReplacement 參數(shù)1:代表是否是有放回抽樣
rdd_sample
# 9. foreach: 對(duì)每一個(gè)元素執(zhí)行某種操作,不生成新的RDD
rdd = sc.parallelize(range(10), 5)
accum = sc.accumulator(0)
rdd.foreach(lambda x: accum.add(x))
print(accum.value)
# 45
?? Spark SQL使用
在講Spark SQL前,先解釋下這個(gè)模塊。這個(gè)模塊是Spark中用來(lái)處理結(jié)構(gòu)化數(shù)據(jù)的,提供一個(gè)叫SparkDataFrame的東西并且自動(dòng)解析為分布式SQL查詢(xún)數(shù)據(jù)。我們之前用過(guò)Python的Pandas庫(kù),也大致了解了DataFrame,這個(gè)其實(shí)和它沒(méi)有太大的區(qū)別,只是調(diào)用的API可能有些不同罷了。
我們通過(guò)使用Spark SQL來(lái)處理數(shù)據(jù),會(huì)讓我們更加地熟悉,比如可以用SQL語(yǔ)句、用SparkDataFrame的API或者Datasets API,我們可以按照需求隨心轉(zhuǎn)換,通過(guò)SparkDataFrame API 和 SQL 寫(xiě)的邏輯,會(huì)被Spark優(yōu)化器Catalyst自動(dòng)優(yōu)化成RDD,即便寫(xiě)得不好也可能運(yùn)行得很快(如果是直接寫(xiě)RDD可能就掛了哈哈)。
創(chuàng)建SparkDataFrame
開(kāi)始講SparkDataFrame,我們先學(xué)習(xí)下幾種創(chuàng)建的方法,分別是使用RDD來(lái)創(chuàng)建、使用python的DataFrame來(lái)創(chuàng)建、使用List來(lái)創(chuàng)建、讀取數(shù)據(jù)文件來(lái)創(chuàng)建、通過(guò)讀取數(shù)據(jù)庫(kù)來(lái)創(chuàng)建。
1. 使用RDD來(lái)創(chuàng)建
主要使用RDD的toDF方法。
rdd = sc.parallelize([("Sam", 28, 88), ("Flora", 28, 90), ("Run", 1, 60)])
df = rdd.toDF(["name", "age", "score"])
df.show()
df.printSchema()
# +-----+---+-----+
# | name|age|score|
# +-----+---+-----+
# | Sam| 28| 88|
# |Flora| 28| 90|
# | Run| 1| 60|
# +-----+---+-----+
# root
# |-- name: string (nullable = true)
# |-- age: long (nullable = true)
# |-- score: long (nullable = true)
2. 使用python的DataFrame來(lái)創(chuàng)建
df = pd.DataFrame([['Sam', 28, 88], ['Flora', 28, 90], ['Run', 1, 60]],
columns=['name', 'age', 'score'])
print(">> 打印DataFrame:")
print(df)
print("\n")
Spark_df = spark.createDataFrame(df)
print(">> 打印SparkDataFrame:")
Spark_df.show()
# >> 打印DataFrame:
# name age score
# 0 Sam 28 88
# 1 Flora 28 90
# 2 Run 1 60
# >> 打印SparkDataFrame:
# +-----+---+-----+
# | name|age|score|
# +-----+---+-----+
# | Sam| 28| 88|
# |Flora| 28| 90|
# | Run| 1| 60|
# +-----+---+-----+
3. 使用List來(lái)創(chuàng)建
list_values = [['Sam', 28, 88], ['Flora', 28, 90], ['Run', 1, 60]]
Spark_df = spark.createDataFrame(list_values, ['name', 'age', 'score'])
Spark_df.show()
# +-----+---+-----+
# | name|age|score|
# +-----+---+-----+
# | Sam| 28| 88|
# |Flora| 28| 90|
# | Run| 1| 60|
# +-----+---+-----+
4. 讀取數(shù)據(jù)文件來(lái)創(chuàng)建
# 4.1 CSV文件
df = spark.read.option("header", "true")\
.option("inferSchema", "true")\
.option("delimiter", ",")\
.csv("./test/data/titanic/train.csv")
df.show(5)
df.printSchema()
# 4.2 json文件
df = spark.read.json("./test/data/hello_samshare.json")
df.show(5)
df.printSchema()
5. 通過(guò)讀取數(shù)據(jù)庫(kù)來(lái)創(chuàng)建
# 5.1 讀取hive數(shù)據(jù)
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
spark.sql("LOAD DATA LOCAL INPATH 'data/kv1.txt' INTO TABLE src")
df = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
df.show(5)
# 5.2 讀取mysql數(shù)據(jù)
url = "jdbc:mysql://localhost:3306/test"
df = spark.read.format("jdbc") \
.option("url", url) \
.option("dbtable", "runoob_tbl") \
.option("user", "root") \
.option("password", "8888") \
.load()\
df.show()
常用的SparkDataFrame API
這里我大概是分成了幾部分來(lái)看這些APIs,分別是查看DataFrame的APIs、簡(jiǎn)單處理DataFrame的APIs、DataFrame的列操作APIs、DataFrame的一些思路變換操作APIs、DataFrame的一些統(tǒng)計(jì)操作APIs,這樣子也有助于我們了解這些API的功能,以后遇見(jiàn)實(shí)際問(wèn)題的時(shí)候可以解決。
首先我們這小節(jié)全局用到的數(shù)據(jù)集如下:
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
# SparkSQL的許多功能封裝在SparkSession的方法接口中, SparkContext則不行的。
spark = SparkSession.builder \
.appName("sam_SamShare") \
.config("master", "local[4]") \
.enableHiveSupport() \
.getOrCreate()
sc = spark.sparkContext
# 創(chuàng)建一個(gè)SparkDataFrame
rdd = sc.parallelize([("Sam", 28, 88, "M"),
("Flora", 28, 90, "F"),
("Run", 1, 60, None),
("Peter", 55, 100, "M"),
("Mei", 54, 95, "F")])
df = rdd.toDF(["name", "age", "score", "sex"])
df.show()
df.printSchema()
# +-----+---+-----+----+
# | name|age|score| sex|
# +-----+---+-----+----+
# | Sam| 28| 88| M|
# |Flora| 28| 90| F|
# | Run| 1| 60|null|
# |Peter| 55| 100| M|
# | Mei| 54| 95| F|
# +-----+---+-----+----+
# root
# |-- name: string (nullable = true)
# |-- age: long (nullable = true)
# |-- score: long (nullable = true)
# |-- sex: string (nullable = true)
1. 查看DataFrame的APIs
# DataFrame.collect
# 以列表形式返回行
df.collect()
# [Row(name='Sam', age=28, score=88, sex='M'),
# Row(name='Flora', age=28, score=90, sex='F'),
# Row(name='Run', age=1, score=60, sex=None),
# Row(name='Peter', age=55, score=100, sex='M'),
# Row(name='Mei', age=54, score=95, sex='F')]
# DataFrame.count
df.count()
# 5
# DataFrame.columns
df.columns
# ['name', 'age', 'score', 'sex']
# DataFrame.dtypes
df.dtypes
# [('name', 'string'), ('age', 'bigint'), ('score', 'bigint'), ('sex', 'string')]
# DataFrame.describe
# 返回列的基礎(chǔ)統(tǒng)計(jì)信息
df.describe(['age']).show()
# +-------+------------------+
# |summary| age|
# +-------+------------------+
# | count| 5|
# | mean| 33.2|
# | stddev|22.353970564532826|
# | min| 1|
# | max| 55|
# +-------+------------------+
df.describe().show()
# +-------+-----+------------------+------------------+----+
# |summary| name| age| score| sex|
# +-------+-----+------------------+------------------+----+
# | count| 5| 5| 5| 4|
# | mean| null| 33.2| 86.6|null|
# | stddev| null|22.353970564532826|15.582040944625966|null|
# | min|Flora| 1| 60| F|
# | max| Sam| 55| 100| M|
# +-------+-----+------------------+------------------+----+
# DataFrame.select
# 選定指定列并按照一定順序呈現(xiàn)
df.select("sex", "score").show()
# DataFrame.first
# DataFrame.head
# 查看第1條數(shù)據(jù)
df.first()
# Row(name='Sam', age=28, score=88, sex='M')
df.head(1)
# [Row(name='Sam', age=28, score=88, sex='M')]
# DataFrame.freqItems
# 查看指定列的枚舉值
df.freqItems(["age","sex"]).show()
# +---------------+-------------+
# | age_freqItems|sex_freqItems|
# +---------------+-------------+
# |[55, 1, 28, 54]| [M, F,]|
# +---------------+-------------+
# DataFrame.summary
df.summary().show()
# +-------+-----+------------------+------------------+----+
# |summary| name| age| score| sex|
# +-------+-----+------------------+------------------+----+
# | count| 5| 5| 5| 4|
# | mean| null| 33.2| 86.6|null|
# | stddev| null|22.353970564532826|15.582040944625966|null|
# | min|Flora| 1| 60| F|
# | 25%| null| 28| 88|null|
# | 50%| null| 28| 90|null|
# | 75%| null| 54| 95|null|
# | max| Sam| 55| 100| M|
# +-------+-----+------------------+------------------+----+
# DataFrame.sample
# 按照一定規(guī)則從df隨機(jī)抽樣數(shù)據(jù)
df.sample(0.5).show()
# +-----+---+-----+----+
# | name|age|score| sex|
# +-----+---+-----+----+
# | Sam| 28| 88| M|
# | Run| 1| 60|null|
# |Peter| 55| 100| M|
# +-----+---+-----+----+
2. 簡(jiǎn)單處理DataFrame的APIs
# DataFrame.distinct
# 對(duì)數(shù)據(jù)集進(jìn)行去重
df.distinct().show()
# DataFrame.dropDuplicates
# 對(duì)指定列去重
df.dropDuplicates(["sex"]).show()
# +-----+---+-----+----+
# | name|age|score| sex|
# +-----+---+-----+----+
# |Flora| 28| 90| F|
# | Run| 1| 60|null|
# | Sam| 28| 88| M|
# +-----+---+-----+----+
# DataFrame.exceptAll
# DataFrame.subtract
# 根據(jù)指定的df對(duì)df進(jìn)行去重
df1 = spark.createDataFrame(
[("a", 1), ("a", 1), ("b", 3), ("c", 4)], ["C1", "C2"])
df2 = spark.createDataFrame([("a", 1), ("b", 3)], ["C1", "C2"])
df3 = df1.exceptAll(df2) # 沒(méi)有去重的功效
df4 = df1.subtract(df2) # 有去重的奇效
df1.show()
df2.show()
df3.show()
df4.show()
# +---+---+
# | C1| C2|
# +---+---+
# | a| 1|
# | a| 1|
# | b| 3|
# | c| 4|
# +---+---+
# +---+---+
# | C1| C2|
# +---+---+
# | a| 1|
# | b| 3|
# +---+---+
# +---+---+
# | C1| C2|
# +---+---+
# | a| 1|
# | c| 4|
# +---+---+
# +---+---+
# | C1| C2|
# +---+---+
# | c| 4|
# +---+---+
# DataFrame.intersectAll
# 返回兩個(gè)DataFrame的交集
df1 = spark.createDataFrame(
[("a", 1), ("a", 1), ("b", 3), ("c", 4)], ["C1", "C2"])
df2 = spark.createDataFrame([("a", 1), ("b", 4)], ["C1", "C2"])
df1.intersectAll(df2).show()
# +---+---+
# | C1| C2|
# +---+---+
# | a| 1|
# +---+---+
# DataFrame.drop
# 丟棄指定列
df.drop('age').show()
# DataFrame.withColumn
# 新增列
df1 = df.withColumn("birth_year", 2021 - df.age)
df1.show()
# +-----+---+-----+----+----------+
# | name|age|score| sex|birth_year|
# +-----+---+-----+----+----------+
# | Sam| 28| 88| M| 1993|
# |Flora| 28| 90| F| 1993|
# | Run| 1| 60|null| 2020|
# |Peter| 55| 100| M| 1966|
# | Mei| 54| 95| F| 1967|
# +-----+---+-----+----+----------+
# DataFrame.withColumnRenamed
# 重命名列名
df1 = df.withColumnRenamed("sex", "gender")
df1.show()
# +-----+---+-----+------+
# | name|age|score|gender|
# +-----+---+-----+------+
# | Sam| 28| 88| M|
# |Flora| 28| 90| F|
# | Run| 1| 60| null|
# |Peter| 55| 100| M|
# | Mei| 54| 95| F|
# +-----+---+-----+------+
# DataFrame.dropna
# 丟棄空值,DataFrame.dropna(how='any', thresh=None, subset=None)
df.dropna(how='all', subset=['sex']).show()
# +-----+---+-----+---+
# | name|age|score|sex|
# +-----+---+-----+---+
# | Sam| 28| 88| M|
# |Flora| 28| 90| F|
# |Peter| 55| 100| M|
# | Mei| 54| 95| F|
# +-----+---+-----+---+
# DataFrame.fillna
# 空值填充操作
df1 = spark.createDataFrame(
[("a", None), ("a", 1), (None, 3), ("c", 4)], ["C1", "C2"])
# df2 = df1.na.fill({"C1": "d", "C2": 99})
df2 = df1.fillna({"C1": "d", "C2": 99})
df1.show()
df2.show()
# DataFrame.filter
# 根據(jù)條件過(guò)濾
df.filter(df.age>50).show()
# +-----+---+-----+---+
# | name|age|score|sex|
# +-----+---+-----+---+
# |Peter| 55| 100| M|
# | Mei| 54| 95| F|
# +-----+---+-----+---+
df.where(df.age==28).show()
# +-----+---+-----+---+
# | name|age|score|sex|
# +-----+---+-----+---+
# | Sam| 28| 88| M|
# |Flora| 28| 90| F|
# +-----+---+-----+---+
df.filter("age<18").show()
# +----+---+-----+----+
# |name|age|score| sex|
# +----+---+-----+----+
# | Run| 1| 60|null|
# +----+---+-----+----+
# DataFrame.join
# 這個(gè)不用多解釋了,直接上案例來(lái)看看具體的語(yǔ)法即可,DataFrame.join(other, on=None, how=None)
df1 = spark.createDataFrame(
[("a", 1), ("d", 1), ("b", 3), ("c", 4)], ["id", "num1"])
df2 = spark.createDataFrame([("a", 1), ("b", 3)], ["id", "num2"])
df1.join(df2, df1.id == df2.id, 'left').select(df1.id.alias("df1_id"),
df1.num1.alias("df1_num"),
df2.num2.alias("df2_num")
).sort(["df1_id"], ascending=False)\
.show()
# DataFrame.agg(*exprs)
# 聚合數(shù)據(jù),可以寫(xiě)多個(gè)聚合方法,如果不寫(xiě)groupBy的話就是對(duì)整個(gè)DF進(jìn)行聚合
# DataFrame.alias
# 設(shè)置列或者DataFrame別名
# DataFrame.groupBy
# 根據(jù)某幾列進(jìn)行聚合,如有多列用列表寫(xiě)在一起,如 df.groupBy(["sex", "age"])
df.groupBy("sex").agg(F.min(df.age).alias("最小年齡"),
F.expr("avg(age)").alias("平均年齡"),
F.expr("collect_list(name)").alias("姓名集合")
).show()
# +----+--------+--------+------------+
# | sex|最小年齡|平均年齡| 姓名集合|
# +----+--------+--------+------------+
# | F| 28| 41.0|[Flora, Mei]|
# |null| 1| 1.0| [Run]|
# | M| 28| 41.5|[Sam, Peter]|
# +----+--------+--------+------------+
# DataFrame.foreach
# 對(duì)每一行進(jìn)行函數(shù)方法的應(yīng)用
def f(person):
print(person.name)
df.foreach(f)
# Peter
# Run
# Sam
# Flora
# Mei
# DataFrame.replace
# 修改df里的某些值
df1 = df.na.replace({"M": "Male", "F": "Female"})
df1.show()
# DataFrame.union
# 相當(dāng)于SQL里的union all操作
df1 = spark.createDataFrame(
[("a", 1), ("d", 1), ("b", 3), ("c", 4)], ["id", "num"])
df2 = spark.createDataFrame([("a", 1), ("b", 3)], ["id", "num"])
df1.union(df2).show()
df1.unionAll(df2).show()
# 這里union沒(méi)有去重,不知道為啥,有知道的朋友麻煩解釋下,謝謝了。
# +---+---+
# | id|num|
# +---+---+
# | a| 1|
# | d| 1|
# | b| 3|
# | c| 4|
# | a| 1|
# | b| 3|
# +---+---+
# DataFrame.unionByName
# 根據(jù)列名來(lái)進(jìn)行合并數(shù)據(jù)集
df1 = spark.createDataFrame([[1, 2, 3]], ["col0", "col1", "col2"])
df2 = spark.createDataFrame([[4, 5, 6]], ["col1", "col2", "col0"])
df1.unionByName(df2).show()
# +----+----+----+
# |col0|col1|col2|
# +----+----+----+
# | 1| 2| 3|
# | 6| 4| 5|
# +----+----+----+
3. DataFrame的列操作APIs
這里主要針對(duì)的是列進(jìn)行操作,比如說(shuō)重命名、排序、空值判斷、類(lèi)型判斷等,這里就不展開(kāi)寫(xiě)demo了,看看語(yǔ)法應(yīng)該大家都懂了。
Column.alias(*alias, **kwargs) # 重命名列名
Column.asc() # 按照列進(jìn)行升序排序
Column.desc() # 按照列進(jìn)行降序排序
Column.astype(dataType) # 類(lèi)型轉(zhuǎn)換
Column.cast(dataType) # 強(qiáng)制轉(zhuǎn)換類(lèi)型
Column.between(lowerBound, upperBound) # 返回布爾值,是否在指定區(qū)間范圍內(nèi)
Column.contains(other) # 是否包含某個(gè)關(guān)鍵詞
Column.endswith(other) # 以什么結(jié)束的值,如 df.filter(df.name.endswith('ice')).collect()
Column.isNotNull() # 篩選非空的行
Column.isNull()
Column.isin(*cols) # 返回包含某些值的行 df[df.name.isin("Bob", "Mike")].collect()
Column.like(other) # 返回含有關(guān)鍵詞的行
Column.when(condition, value) # 給True的賦值
Column.otherwise(value) # 與when搭配使用,df.select(df.name, F.when(df.age > 3, 1).otherwise(0)).show()
Column.rlike(other) # 可以使用正則的匹配 df.filter(df.name.rlike('ice$')).collect()
Column.startswith(other) # df.filter(df.name.startswith('Al')).collect()
Column.substr(startPos, length) # df.select(df.name.substr(1, 3).alias("col")).collect()
4. DataFrame的一些思路變換操作APIs
# DataFrame.createOrReplaceGlobalTempView
# DataFrame.dropGlobalTempView
# 創(chuàng)建全局的試圖,注冊(cè)后可以使用sql語(yǔ)句來(lái)進(jìn)行操作,生命周期取決于Spark application本身
df.createOrReplaceGlobalTempView("people")
spark.sql("select * from global_temp.people where sex = 'M' ").show()
# +-----+---+-----+---+
# | name|age|score|sex|
# +-----+---+-----+---+
# | Sam| 28| 88| M|
# |Peter| 55| 100| M|
# +-----+---+-----+---+
# DataFrame.createOrReplaceTempView
# DataFrame.dropTempView
# 創(chuàng)建本地臨時(shí)試圖,生命周期取決于用來(lái)創(chuàng)建此數(shù)據(jù)集的SparkSession
df.createOrReplaceTempView("tmp_people")
spark.sql("select * from tmp_people where sex = 'F' ").show()
# +-----+---+-----+---+
# | name|age|score|sex|
# +-----+---+-----+---+
# |Flora| 28| 90| F|
# | Mei| 54| 95| F|
# +-----+---+-----+---+
# DataFrame.cache\DataFrame.persist
# 可以把一些數(shù)據(jù)放入緩存中,default storage level (MEMORY_AND_DISK).
df.cache()
df.persist()
df.unpersist()
# DataFrame.crossJoin
# 返回兩個(gè)DataFrame的笛卡爾積關(guān)聯(lián)的DataFrame
df1 = df.select("name", "sex")
df2 = df.select("name", "sex")
df3 = df1.crossJoin(df2)
print("表1的記錄數(shù)", df1.count())
print("表2的記錄數(shù)", df2.count())
print("笛卡爾積后的記錄數(shù)", df3.count())
# 表1的記錄數(shù) 5
# 表2的記錄數(shù) 5
# 笛卡爾積后的記錄數(shù) 25
# DataFrame.toPandas
# 把SparkDataFrame轉(zhuǎn)為 Pandas的DataFrame
df.toPandas()
# DataFrame.rdd
# 把SparkDataFrame轉(zhuǎn)為rdd,這樣子可以用rdd的語(yǔ)法來(lái)操作數(shù)據(jù)
df.rdd
5. DataFrame的一些統(tǒng)計(jì)操作APIs
# DataFrame.cov
# 計(jì)算指定兩列的樣本協(xié)方差
df.cov("age", "score")
# 324.59999999999997
# DataFrame.corr
# 計(jì)算指定兩列的相關(guān)系數(shù),DataFrame.corr(col1, col2, method=None),目前method只支持Pearson相關(guān)系數(shù)
df.corr("age", "score", method="pearson")
# 0.9319004030498815
# DataFrame.cube
# 創(chuàng)建多維度聚合的結(jié)果,通常用于分析數(shù)據(jù),比如我們指定兩個(gè)列進(jìn)行聚合,比如name和age,那么這個(gè)函數(shù)返回的聚合結(jié)果會(huì)
# groupby("name", "age")
# groupby("name")
# groupby("age")
# groupby(all)
# 四個(gè)聚合結(jié)果的union all 的結(jié)果
df1 = df.filter(df.name != "Run")
print(df1.show())
df1.cube("name", "sex").count().show()
# +-----+---+-----+---+
# | name|age|score|sex|
# +-----+---+-----+---+
# | Sam| 28| 88| M|
# |Flora| 28| 90| F|
# |Peter| 55| 100| M|
# | Mei| 54| 95| F|
# +-----+---+-----+---+
# cube 聚合之后的結(jié)果
# +-----+----+-----+
# | name| sex|count|
# +-----+----+-----+
# | null| F| 2|
# | null|null| 4|
# |Flora|null| 1|
# |Peter|null| 1|
# | null| M| 2|
# |Peter| M| 1|
# | Sam| M| 1|
# | Sam|null| 1|
# | Mei| F| 1|
# | Mei|null| 1|
# |Flora| F| 1|
# +-----+----+-----+
保存數(shù)據(jù)/寫(xiě)入數(shù)據(jù)庫(kù)
這里的保存數(shù)據(jù)主要是保存到Hive中的栗子,主要包括了overwrite、append等方式。
1. 當(dāng)結(jié)果集為SparkDataFrame的時(shí)候
import pandas as pd
from datetime import datetime
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import HiveContext
conf = SparkConf()\
.setAppName("test")\
.set("hive.exec.dynamic.partition.mode", "nonstrict") # 動(dòng)態(tài)寫(xiě)入hive分區(qū)表
sc = SparkContext(conf=conf)
hc = HiveContext(sc)
sc.setLogLevel("ERROR")
list_values = [['Sam', 28, 88], ['Flora', 28, 90], ['Run', 1, 60]]
Spark_df = spark.createDataFrame(list_values, ['name', 'age', 'score'])
print(Spark_df.show())
save_table = "tmp.samshare_pyspark_savedata"
# 方式1:直接寫(xiě)入到Hive
Spark_df.write.format("hive").mode("overwrite").saveAsTable(save_table) # 或者改成append模式
print(datetime.now().strftime("%y/%m/%d %H:%M:%S"), "測(cè)試數(shù)據(jù)寫(xiě)入到表" + save_table)
# 方式2:注冊(cè)為臨時(shí)表,使用SparkSQL來(lái)寫(xiě)入分區(qū)表
Spark_df.createOrReplaceTempView("tmp_table")
write_sql = """
insert overwrite table {0} partitions (pt_date='{1}')
select * from tmp_table
""".format(save_table, "20210520")
hc.sql(write_sql)
print(datetime.now().strftime("%y/%m/%d %H:%M:%S"), "測(cè)試數(shù)據(jù)寫(xiě)入到表" + save_table)
2. 當(dāng)結(jié)果集為Python的DataFrame的時(shí)候
如果是Python的DataFrame,我們就需要多做一步把它轉(zhuǎn)換為SparkDataFrame,其余操作就一樣了。
import pandas as pd
from datetime import datetime
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import HiveContext
conf = SparkConf()\
.setAppName("test")\
.set("hive.exec.dynamic.partition.mode", "nonstrict") # 動(dòng)態(tài)寫(xiě)入hive分區(qū)表
sc = SparkContext(conf=conf)
hc = HiveContext(sc)
sc.setLogLevel("ERROR")
result_df = pd.DataFrame([1,2,3], columns=['a'])
save_table = "tmp.samshare_pyspark_savedata"
# 獲取DataFrame的schema
c1 = list(result_df.columns)
# 轉(zhuǎn)為SparkDataFrame
result = hc.createDataFrame(result_df.astype(str), c1)
result.write.format("hive").mode("overwrite").saveAsTable(save_table) # 或者改成append模式
print(datetime.now().strftime("%y/%m/%d %H:%M:%S"), "測(cè)試數(shù)據(jù)寫(xiě)入到表" + save_table)
?? Spark調(diào)優(yōu)思路
這一小節(jié)的內(nèi)容算是對(duì)pyspark入門(mén)的一個(gè)ending了,全文主要是參考學(xué)習(xí)了美團(tuán)Spark性能優(yōu)化指南的基礎(chǔ)篇和高級(jí)篇內(nèi)容,主體脈絡(luò)和這兩篇文章是一樣的,只不過(guò)是基于自己學(xué)習(xí)后的理解進(jìn)行了一次總結(jié)復(fù)盤(pán),而原文中主要是用Java來(lái)舉例的,我這邊主要用pyspark來(lái)舉例。文章主要會(huì)從4個(gè)方面(或者說(shuō)4個(gè)思路)來(lái)優(yōu)化我們的Spark任務(wù),主要就是下面的圖片所示:

開(kāi)發(fā)習(xí)慣調(diào)優(yōu)
1. 盡可能復(fù)用同一個(gè)RDD,避免重復(fù)創(chuàng)建,并且適當(dāng)持久化數(shù)據(jù)
這種開(kāi)發(fā)習(xí)慣是需要我們對(duì)于即將要開(kāi)發(fā)的應(yīng)用邏輯有比較深刻的思考,并且可以通過(guò)code review來(lái)發(fā)現(xiàn)的,講白了就是要記得我們創(chuàng)建過(guò)啥數(shù)據(jù)集,可以復(fù)用的盡量廣播(broadcast)下,能很好提升性能。
# 最低級(jí)寫(xiě)法,相同數(shù)據(jù)集重復(fù)創(chuàng)建。
rdd1 = sc.textFile("./test/data/hello_samshare.txt", 4) # 這里的 4 指的是分區(qū)數(shù)量
rdd2 = sc.textFile("./test/data/hello_samshare.txt", 4) # 這里的 4 指的是分區(qū)數(shù)量
print(rdd1.take(10))
print(rdd2.map(lambda x:x[0:1]).take(10))
# 稍微進(jìn)階一些,復(fù)用相同數(shù)據(jù)集,但因中間結(jié)果沒(méi)有緩存,數(shù)據(jù)會(huì)重復(fù)計(jì)算
rdd1 = sc.textFile("./test/data/hello_samshare.txt", 4) # 這里的 4 指的是分區(qū)數(shù)量
print(rdd1.take(10))
print(rdd1.map(lambda x:x[0:1]).take(10))
# 相對(duì)比較高效,使用緩存來(lái)持久化數(shù)據(jù)
rdd = sc.parallelize(range(1, 11), 4).cache() # 或者persist()
rdd_map = rdd.map(lambda x: x*2)
rdd_reduce = rdd.reduce(lambda x, y: x+y)
print(rdd_map.take(10))
print(rdd_reduce)
下面我們就來(lái)對(duì)比一下使用緩存能給我們的Spark程序帶來(lái)多大的效率提升吧,我們先構(gòu)造一個(gè)程序運(yùn)行時(shí)長(zhǎng)測(cè)量器。
import time
# 統(tǒng)計(jì)程序運(yùn)行時(shí)間
def time_me(info="used"):
def _time_me(fn):
@functools.wraps(fn)
def _wrapper(*args, **kwargs):
start = time.time()
fn(*args, **kwargs)
print("%s %s %s" % (fn.__name__, info, time.time() - start), "second")
return _wrapper
return _time_me
下面我們運(yùn)行下面的代碼,看下使用了cache帶來(lái)的效率提升:
@time_me()
def test(types=0):
if types == 1:
print("使用持久化緩存")
rdd = sc.parallelize(range(1, 10000000), 4)
rdd1 = rdd.map(lambda x: x*x + 2*x + 1).cache() # 或者 persist(StorageLevel.MEMORY_AND_DISK_SER)
print(rdd1.take(10))
rdd2 = rdd1.reduce(lambda x, y: x+y)
rdd3 = rdd1.reduce(lambda x, y: x + y)
rdd4 = rdd1.reduce(lambda x, y: x + y)
rdd5 = rdd1.reduce(lambda x, y: x + y)
print(rdd5)
else:
print("不使用持久化緩存")
rdd = sc.parallelize(range(1, 10000000), 4)
rdd1 = rdd.map(lambda x: x * x + 2 * x + 1)
print(rdd1.take(10))
rdd2 = rdd1.reduce(lambda x, y: x + y)
rdd3 = rdd1.reduce(lambda x, y: x + y)
rdd4 = rdd1.reduce(lambda x, y: x + y)
rdd5 = rdd1.reduce(lambda x, y: x + y)
print(rdd5)
test() # 不使用持久化緩存
time.sleep(10)
test(1) # 使用持久化緩存
# output:
# 使用持久化緩存
# [4, 9, 16, 25, 36, 49, 64, 81, 100, 121]
# 333333383333334999999
# test used 26.36529278755188 second
# 使用持久化緩存
# [4, 9, 16, 25, 36, 49, 64, 81, 100, 121]
# 333333383333334999999
# test used 17.49532413482666 second
同時(shí)我們打開(kāi)YARN日志來(lái)看看:http://localhost:4040/jobs/

因?yàn)槲覀兊拇a是需要重復(fù)調(diào)用RDD1的,當(dāng)沒(méi)有對(duì)RDD1進(jìn)行持久化的時(shí)候,每次當(dāng)它被action算子消費(fèi)了之后,就釋放了,等下一個(gè)算子計(jì)算的時(shí)候要用,就從頭開(kāi)始計(jì)算一下RDD1。代碼中需要重復(fù)調(diào)用RDD1 五次,所以沒(méi)有緩存的話,差不多每次都要6秒,總共需要耗時(shí)26秒左右,但是,做了緩存,每次就只需要3s不到,總共需要耗時(shí)17秒左右。
另外,這里需要提及一下一個(gè)知識(shí)點(diǎn),那就是持久化的級(jí)別,一般cache的話就是放入內(nèi)存中,就沒(méi)有什么好說(shuō)的,需要講一下的就是另外一個(gè) persist(),它的持久化級(jí)別是可以被我們所配置的:
| 持久化級(jí)別 | 含義解釋 |
|---|---|
| MEMORY_ONLY | 將數(shù)據(jù)保存在內(nèi)存中。如果內(nèi)存不夠存放所有的數(shù)據(jù),則數(shù)據(jù)可能就不會(huì)進(jìn)行持久化。使用cache()方法時(shí),實(shí)際就是使用的這種持久化策略,性能也是最高的。 |
| MEMORY_AND_DISK | 優(yōu)先嘗試將數(shù)據(jù)保存在內(nèi)存中,如果內(nèi)存不夠存放所有的數(shù)據(jù),會(huì)將數(shù)據(jù)寫(xiě)入磁盤(pán)文件中。 |
| MEMORY_ONLY_SER | 基本含義同MEMORY_ONLY。唯一的區(qū)別是,會(huì)將RDD中的數(shù)據(jù)進(jìn)行序列化,RDD的每個(gè)partition會(huì)被序列化成一個(gè)字節(jié)數(shù)組。這種方式更加節(jié)省內(nèi)存,從而可以避免持久化的數(shù)據(jù)占用過(guò)多內(nèi)存導(dǎo)致頻繁GC。 |
| MEMORY_AND_DISK_SER | 基本含義同MEMORY_AND_DISK。唯一的區(qū)別是會(huì)先序列化,節(jié)約內(nèi)存。 |
| DISK_ONLY | 使用未序列化的Java對(duì)象格式,將數(shù)據(jù)全部寫(xiě)入磁盤(pán)文件中。一般不推薦使用。 |
| MEMORY_ONLY_2, MEMORY_AND_DISK_2, 等等. | 對(duì)于上述任意一種持久化策略,如果加上后綴_2,代表的是將每個(gè)持久化的數(shù)據(jù),都復(fù)制一份副本,并將副本保存到其他節(jié)點(diǎn)上。這種基于副本的持久化機(jī)制主要用于進(jìn)行容錯(cuò)。假如某個(gè)節(jié)點(diǎn)掛掉,節(jié)點(diǎn)的內(nèi)存或磁盤(pán)中的持久化數(shù)據(jù)丟失了,那么后續(xù)對(duì)RDD計(jì)算時(shí)還可以使用該數(shù)據(jù)在其他節(jié)點(diǎn)上的副本。如果沒(méi)有副本的話,就只能將這些數(shù)據(jù)從源頭處重新計(jì)算一遍了。一般也不推薦使用。 |
2. 盡量避免使用低性能算子
shuffle類(lèi)算子算是低性能算子的一種代表,所謂的shuffle類(lèi)算子,指的是會(huì)產(chǎn)生shuffle過(guò)程的操作,就是需要把各個(gè)節(jié)點(diǎn)上的相同key寫(xiě)入到本地磁盤(pán)文件中,然后其他的節(jié)點(diǎn)通過(guò)網(wǎng)絡(luò)傳輸拉取自己需要的key,把相同key拉到同一個(gè)節(jié)點(diǎn)上進(jìn)行聚合計(jì)算,這種操作必然就是有大量的數(shù)據(jù)網(wǎng)絡(luò)傳輸與磁盤(pán)讀寫(xiě)操作,性能往往不是很好的。
那么,Spark中有哪些算子會(huì)產(chǎn)生shuffle過(guò)程呢?
| 操作類(lèi)別 | shuffle類(lèi)算子 | 備注 |
|---|---|---|
| 分區(qū)操作 | repartition()、repartitionAndSortWithinPartitions()、coalesce(shuffle=true) | 重分區(qū)操作一般都會(huì)shuffle,因?yàn)樾枰獙?duì)所有的分區(qū)數(shù)據(jù)進(jìn)行打亂。 |
| 聚合操作 | reduceByKey、groupByKey、sortByKey | 需要對(duì)相同key進(jìn)行操作,所以需要拉到同一個(gè)節(jié)點(diǎn)上。 |
| 關(guān)聯(lián)操作 | join類(lèi)操作 | 需要把相同key的數(shù)據(jù)shuffle到同一個(gè)節(jié)點(diǎn)然后進(jìn)行笛卡爾積 |
| 去重操作 | distinct等 | 需要對(duì)相同key進(jìn)行操作,所以需要shuffle到同一個(gè)節(jié)點(diǎn)上。 |
| 排序操作 | sortByKey等 | 需要對(duì)相同key進(jìn)行操作,所以需要shuffle到同一個(gè)節(jié)點(diǎn)上。 |
這里進(jìn)一步介紹一個(gè)替代join的方案,因?yàn)閖oin其實(shí)在業(yè)務(wù)中還是蠻常見(jiàn)的。
# 原則2:盡量避免使用低性能算子
rdd1 = sc.parallelize([('A1', 211), ('A1', 212), ('A2', 22), ('A4', 24), ('A5', 25)])
rdd2 = sc.parallelize([('A1', 11), ('A2', 12), ('A3', 13), ('A4', 14)])
# 低效的寫(xiě)法,也是傳統(tǒng)的寫(xiě)法,直接join
rdd_join = rdd1.join(rdd2)
print(rdd_join.collect())
# [('A4', (24, 14)), ('A2', (22, 12)), ('A1', (211, 11)), ('A1', (212, 11))]
rdd_left_join = rdd1.leftOuterJoin(rdd2)
print(rdd_left_join.collect())
# [('A4', (24, 14)), ('A2', (22, 12)), ('A5', (25, None)), ('A1', (211, 11)), ('A1', (212, 11))]
rdd_full_join = rdd1.fullOuterJoin(rdd2)
print(rdd_full_join.collect())
# [('A4', (24, 14)), ('A3', (None, 13)), ('A2', (22, 12)), ('A5', (25, None)), ('A1', (211, 11)), ('A1', (212, 11))]
# 高效的寫(xiě)法,使用廣播+map來(lái)實(shí)現(xiàn)相同效果
# tips1: 這里需要注意的是,用來(lái)broadcast的RDD不可以太大,最好不要超過(guò)1G
# tips2: 這里需要注意的是,用來(lái)broadcast的RDD不可以有重復(fù)的key的
rdd1 = sc.parallelize([('A1', 11), ('A2', 12), ('A3', 13), ('A4', 14)])
rdd2 = sc.parallelize([('A1', 211), ('A1', 212), ('A2', 22), ('A4', 24), ('A5', 25)])
# step1: 先將小表進(jìn)行廣播,也就是collect到driver端,然后廣播到每個(gè)Executor中去。
rdd_small_bc = sc.broadcast(rdd1.collect())
# step2:從Executor中獲取存入字典便于后續(xù)map操作
rdd_small_dict = dict(rdd_small_bc.value)
# step3:定義join方法
def broadcast_join(line, rdd_small_dict, join_type):
k = line[0]
v = line[1]
small_table_v = rdd_small_dict[k] if k in rdd_small_dict else None
if join_type == 'join':
return (k, (v, small_table_v)) if k in rdd_small_dict else None
elif join_type == 'left_join':
return (k, (v, small_table_v if small_table_v is not None else None))
else:
print("not support join type!")
# step4:使用 map 實(shí)現(xiàn) 兩個(gè)表join的功能
rdd_join = rdd2.map(lambda line: broadcast_join(line, rdd_small_dict, "join")).filter(lambda line: line is not None)
rdd_left_join = rdd2.map(lambda line: broadcast_join(line, rdd_small_dict, "left_join")).filter(lambda line: line is not None)
print(rdd_join.collect())
print(rdd_left_join.collect())
# [('A1', (211, 11)), ('A1', (212, 11)), ('A2', (22, 12)), ('A4', (24, 14))]
# [('A1', (211, 11)), ('A1', (212, 11)), ('A2', (22, 12)), ('A4', (24, 14)), ('A5', (25, None))]
上面的RDD join被改寫(xiě)為 broadcast+map的PySpark版本實(shí)現(xiàn),不過(guò)里面有兩個(gè)點(diǎn)需要注意:
tips1: 用來(lái)broadcast的RDD不可以太大,最好不要超過(guò)1G tips2: 用來(lái)broadcast的RDD不可以有重復(fù)的key的
3. 盡量使用高性能算子
上一節(jié)講到了低效算法,自然地就會(huì)有一些高效的算子。
| 原算子 | 高效算子(替換算子) | 說(shuō)明 |
|---|---|---|
| map | mapPartitions | 直接map的話,每次只會(huì)處理一條數(shù)據(jù),而mapPartitions則是每次處理一個(gè)分區(qū)的數(shù)據(jù),在某些場(chǎng)景下相對(duì)比較高效。(分區(qū)數(shù)據(jù)量不大的情況下使用,如果有數(shù)據(jù)傾斜的話容易發(fā)生OOM) |
| groupByKey | reduceByKey/aggregateByKey | 這類(lèi)算子會(huì)在原節(jié)點(diǎn)先map-side預(yù)聚合,相對(duì)高效些。 |
| foreach | foreachPartitions | 同第一條記錄一樣。 |
| filter | filter+coalesce | 當(dāng)我們對(duì)數(shù)據(jù)進(jìn)行filter之后,有很多partition的數(shù)據(jù)會(huì)劇減,然后直接進(jìn)行下一步操作的話,可能就partition數(shù)量很多但處理的數(shù)據(jù)又很少,task數(shù)量沒(méi)有減少,反而整體速度很慢;但如果執(zhí)行了coalesce算子,就會(huì)減少一些partition數(shù)量,把數(shù)據(jù)都相對(duì)壓縮到一起,用更少的task處理完全部數(shù)據(jù),一定場(chǎng)景下還是可以達(dá)到整體性能的提升。 |
| repartition+sort | repartitionAndSortWithinPartitions | 直接用就是了。 |
4. 廣播大變量
如果我們有一個(gè)數(shù)據(jù)集很大,并且在后續(xù)的算子執(zhí)行中會(huì)被反復(fù)調(diào)用,那么就建議直接把它廣播(broadcast)一下。當(dāng)變量被廣播后,會(huì)保證每個(gè)executor的內(nèi)存中只會(huì)保留一份副本,同個(gè)executor內(nèi)的task都可以共享這個(gè)副本數(shù)據(jù)。如果沒(méi)有廣播,常規(guī)過(guò)程就是把大變量進(jìn)行網(wǎng)絡(luò)傳輸?shù)矫恳粋€(gè)相關(guān)task中去,這樣子做,一來(lái)頻繁的網(wǎng)絡(luò)數(shù)據(jù)傳輸,效率極其低下;二來(lái)executor下的task不斷存儲(chǔ)同一份大數(shù)據(jù),很有可能就造成了內(nèi)存溢出或者頻繁GC,效率也是極其低下的。
# 原則4:廣播大變量
rdd1 = sc.parallelize([('A1', 11), ('A2', 12), ('A3', 13), ('A4', 14)])
rdd1_broadcast = sc.broadcast(rdd1.collect())
print(rdd1.collect())
print(rdd1_broadcast.value)
# [('A1', 11), ('A2', 12), ('A3', 13), ('A4', 14)]
# [('A1', 11), ('A2', 12), ('A3', 13), ('A4', 14)]
資源參數(shù)調(diào)優(yōu)
如果要進(jìn)行資源調(diào)優(yōu),我們就必須先知道Spark運(yùn)行的機(jī)制與流程。

下面我們就來(lái)講解一些常用的Spark資源配置的參數(shù)吧,了解其參數(shù)原理便于我們依據(jù)實(shí)際的數(shù)據(jù)情況進(jìn)行配置。
1)num-executors
指的是執(zhí)行器的數(shù)量,數(shù)量的多少代表了并行的stage數(shù)量(假如executor是單核的話),但也并不是越多越快,受你集群資源的限制,所以一般設(shè)置50-100左右吧。
2)executor-memory
這里指的是每一個(gè)執(zhí)行器的內(nèi)存大小,內(nèi)存越大當(dāng)然對(duì)于程序運(yùn)行是很好的了,但是也不是無(wú)節(jié)制地大下去,同樣受我們集群資源的限制。假設(shè)我們集群資源為500core,一般1core配置4G內(nèi)存,所以集群最大的內(nèi)存資源只有2000G左右。num-executors x executor-memory 是不能超過(guò)2000G的,但是也不要太接近這個(gè)值,不然的話集群其他同事就沒(méi)法正常跑數(shù)據(jù)了,一般我們?cè)O(shè)置4G-8G。
3)executor-cores
這里設(shè)置的是executor的CPU core數(shù)量,決定了executor進(jìn)程并行處理task的能力。
4)driver-memory
設(shè)置driver的內(nèi)存,一般設(shè)置2G就好了。但如果想要做一些Python的DataFrame操作可以適當(dāng)?shù)匕堰@個(gè)值設(shè)大一些。
5)driver-cores
與executor-cores類(lèi)似的功能。
6)spark.default.parallelism
設(shè)置每個(gè)stage的task數(shù)量。一般Spark任務(wù)我們?cè)O(shè)置task數(shù)量在500-1000左右比較合適,如果不去設(shè)置的話,Spark會(huì)根據(jù)底層HDFS的block數(shù)量來(lái)自行設(shè)置task數(shù)量。有的時(shí)候會(huì)設(shè)置得偏少,這樣子程序就會(huì)跑得很慢,即便你設(shè)置了很多的executor,但也沒(méi)有用。
下面說(shuō)一個(gè)基本的參數(shù)設(shè)置的shell腳本,一般我們都是通過(guò)一個(gè)shell腳本來(lái)設(shè)置資源參數(shù)配置,接著就去調(diào)用我們的主函數(shù)。
#!/bin/bash
basePath=$(cd "$(dirname )"$(cd "$(dirname "$0"): pwd)")": pwd)
spark-submit \
--master yarn \
--queue samshare \
--deploy-mode client \
--num-executors 100 \
--executor-memory 4G \
--executor-cores 4 \
--driver-memory 2G \
--driver-cores 2 \
--conf spark.default.parallelism=1000 \
--conf spark.yarn.executor.memoryOverhead=8G \
--conf spark.sql.shuffle.partitions=1000 \
--conf spark.network.timeout=1200 \
--conf spark.python.worker.memory=64m \
--conf spark.sql.catalogImplementation=hive \
--conf spark.sql.crossJoin.enabled=True \
--conf spark.dynamicAllocation.enabled=True \
--conf spark.shuffle.service.enabled=True \
--conf spark.scheduler.listenerbus.eventqueue.size=100000 \
--conf spark.pyspark.driver.python=python3 \
--conf spark.pyspark.python=python3 \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=python3 \
--conf spark.sql.pivotMaxValues=500000 \
--conf spark.hadoop.hive.exec.dynamic.partition=True \
--conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict \
--conf spark.hadoop.hive.exec.max.dynamic.partitions.pernode=100000 \
--conf spark.hadoop.hive.exec.max.dynamic.partitions=100000 \
--conf spark.hadoop.hive.exec.max.created.files=100000 \
${bashPath}/project_name/main.py $v_var1 $v_var2
數(shù)據(jù)傾斜調(diào)優(yōu)
相信我們對(duì)于數(shù)據(jù)傾斜并不陌生了,很多時(shí)間數(shù)據(jù)跑不出來(lái)有很大的概率就是出現(xiàn)了數(shù)據(jù)傾斜,在Spark開(kāi)發(fā)中無(wú)法避免的也會(huì)遇到這類(lèi)問(wèn)題,而這不是一個(gè)嶄新的問(wèn)題,成熟的解決方案也是有蠻多的,今天來(lái)簡(jiǎn)單介紹一些比較常用并且有效的方案。
首先我們要知道,在Spark中比較容易出現(xiàn)傾斜的操作,主要集中在distinct、groupByKey、reduceByKey、aggregateByKey、join、repartition等,可以?xún)?yōu)先看這些操作的前后代碼。而為什么使用了這些操作就容易導(dǎo)致數(shù)據(jù)傾斜呢?大多數(shù)情況就是進(jìn)行操作的key分布不均,然后使得大量的數(shù)據(jù)集中在同一個(gè)處理節(jié)點(diǎn)上,從而發(fā)生了數(shù)據(jù)傾斜。
查看Key 分布
# 針對(duì)Spark SQL
hc.sql("select key, count(0) nums from table_name group by key")
# 針對(duì)RDD
RDD.countByKey()
Plan A: 過(guò)濾掉導(dǎo)致傾斜的key
這個(gè)方案并不是所有場(chǎng)景都可以使用的,需要結(jié)合業(yè)務(wù)邏輯來(lái)分析這個(gè)key到底還需要不需要,大多數(shù)情況可能就是一些異常值或者空串,這種就直接進(jìn)行過(guò)濾就好了。
Plan B: 提前處理聚合
如果有些Spark應(yīng)用場(chǎng)景需要頻繁聚合數(shù)據(jù),而數(shù)據(jù)key又少的,那么我們可以把這些存量數(shù)據(jù)先用hive算好(每天算一次),然后落到中間表,后續(xù)Spark應(yīng)用直接用聚合好的表+新的數(shù)據(jù)進(jìn)行二度聚合,效率會(huì)有很高的提升。
Plan C: 調(diào)高shuffle并行度
# 針對(duì)Spark SQL
--conf spark.sql.shuffle.partitions=1000 # 在配置信息中設(shè)置參數(shù)
# 針對(duì)RDD
rdd.reduceByKey(1000) # 默認(rèn)是200
Plan D: 分配隨機(jī)數(shù)再聚合
大概的思路就是對(duì)一些大量出現(xiàn)的key,人工打散,從而可以利用多個(gè)task來(lái)增加任務(wù)并行度,以達(dá)到效率提升的目的,下面是代碼demo,分別從RDD 和 SparkSQL來(lái)實(shí)現(xiàn)。
# Way1: PySpark RDD實(shí)現(xiàn)
import pyspark
from pyspark import SparkContext, SparkConf, HiveContext
from random import randint
import pandas as pd
# SparkSQL的許多功能封裝在SparkSession的方法接口中, SparkContext則不行的。
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("sam_SamShare") \
.config("master", "local[4]") \
.enableHiveSupport() \
.getOrCreate()
conf = SparkConf().setAppName("test_SamShare").setMaster("local[4]")
sc = SparkContext(conf=conf)
hc = HiveContext(sc)
# 分配隨機(jī)數(shù)再聚合
rdd1 = sc.parallelize([('sam', 1), ('sam', 1), ('sam', 1), ('sam', 1), ('sam', 1), ('sam', 1)])
# 給key分配隨機(jī)數(shù)后綴
rdd2 = rdd1.map(lambda x: (x[0] + "_" + str(randint(1,5)), x[1]))
print(rdd.take(10))
# [('sam_5', 1), ('sam_5', 1), ('sam_3', 1), ('sam_5', 1), ('sam_5', 1), ('sam_3', 1)]
# 局部聚合
rdd3 = rdd2.reduceByKey(lambda x,y : (x+y))
print(rdd3.take(10))
# [('sam_5', 4), ('sam_3', 2)]
# 去除后綴
rdd4 = rdd3.map(lambda x: (x[0][:-2], x[1]))
print(rdd4.take(10))
# [('sam', 4), ('sam', 2)]
# 全局聚合
rdd5 = rdd4.reduceByKey(lambda x,y : (x+y))
print(rdd5.take(10))
# [('sam', 6)]
# Way2: PySpark SparkSQL實(shí)現(xiàn)
df = pd.DataFrame(5*[['Sam', 1],['Flora', 1]],
columns=['name', 'nums'])
Spark_df = spark.createDataFrame(df)
print(Spark_df.show(10))
Spark_df.createOrReplaceTempView("tmp_table") # 注冊(cè)為視圖供SparkSQl使用
sql = """
with t1 as (
select concat(name,"_",int(10*rand())) as new_name, name, nums
from tmp_table
),
t2 as (
select new_name, sum(nums) as n
from t1
group by new_name
),
t3 as (
select substr(new_name,0,length(new_name) -2) as name, sum(n) as nums_sum
from t2
group by substr(new_name,0,length(new_name) -2)
)
select *
from t3
"""
tt = hc.sql(sql).toPandas()
tt
下面是原理圖。


推薦閱讀
歡迎長(zhǎng)按掃碼關(guān)注「數(shù)據(jù)管道」
??學(xué)習(xí)資源推薦
1)edureka about PySpark Tutorial
印度老哥的課程,B站可直接看,不過(guò)口音略難聽(tīng)懂不過(guò)還好有字幕。
https://www.bilibili.com/video/BV1i4411i79a?p=1
2)eat_pyspark_in_10_days
梁云大哥的課程,講得超級(jí)清晰,建議精讀。
https://github.com/lyhue1991/eat_pyspark_in_10_days
3)官方文檔
http://spark.apache.org/docs/latest/api/python/reference/index.html
4)《Spark性能優(yōu)化指南——基礎(chǔ)篇》
https://tech.meituan.com/2016/04/29/spark-tuning-basic.html
5)《Spark性能優(yōu)化指南——高級(jí)篇》
https://tech.meituan.com/2016/05/12/spark-tuning-pro.html
