PySpark入門級學(xué)習(xí)教程,框架思維(上)

為什么要學(xué)習(xí)Spark?作為數(shù)據(jù)從業(yè)者多年,個人覺得Spark已經(jīng)越來越走進我們的日常工作了,無論是使用哪種編程語言,Python、Scala還是Java,都會或多或少接觸到Spark,它可以讓我們能夠用到集群的力量,可以對BigData進行高效操作,實現(xiàn)很多之前由于計算資源而無法輕易實現(xiàn)的東西。網(wǎng)上有很多關(guān)于Spark的好處,這里就不做過多的贅述,我們直接進入這篇文章的正文!

關(guān)于PySpark,我們知道它是Python調(diào)用Spark的接口,我們可以通過調(diào)用Python API的方式來編寫Spark程序,它支持了大多數(shù)的Spark功能,比如SparkDataFrame、Spark SQL、Streaming、MLlib等等。只要我們了解Python的基本語法,那么在Python里調(diào)用Spark的力量就顯得十分easy了。下面我將會從相對宏觀的層面介紹一下PySpark,讓我們對于這個神器有一個框架性的認識,知道它能干什么,知道去哪里尋找問題解答,爭取看完這篇文章可以讓我們更加絲滑地入門PySpark。話不多說,馬上開始!
?? 目錄:
安裝指引 基礎(chǔ)概念 常用函數(shù) Sparksql使用 調(diào)優(yōu)思路 學(xué)習(xí)資源推薦
?? 安裝指引:
安裝這塊本文就不展開具體的步驟了,畢竟大家的機子環(huán)境都不盡相同。不過可以簡單說幾點重要的步驟,然后節(jié)末放上一些安裝示例供大家參考。
下面是一些示例,可以參考下:
1)Mac下安裝spark,并配置pycharm-pyspark完整教程
https://blog.csdn.net/shiyutianming/article/details/99946797
2)virtualBox里安裝開發(fā)環(huán)境
https://www.bilibili.com/video/BV1i4411i79a?p=3
3)快速搭建spark開發(fā)環(huán)境,云哥項目
https://github.com/lyhue1991/eat_pyspark_in_10_days
?? 基礎(chǔ)概念
關(guān)于Spark的基礎(chǔ)概念,我在先前的文章里也有寫過,大家可以一起來回顧一下 《想學(xué)習(xí)Spark?先帶你了解一些基礎(chǔ)的知識》。作為補充,今天在這里也介紹一些在Spark中會經(jīng)常遇見的專有名詞。
???♀? Q1: 什么是RDD
RDD的全稱是 Resilient Distributed Datasets,這是Spark的一種數(shù)據(jù)抽象集合,它可以被執(zhí)行在分布式的集群上進行各種操作,而且有較強的容錯機制。RDD可以被分為若干個分區(qū),每一個分區(qū)就是一個數(shù)據(jù)集片段,從而可以支持分布式計算。
???♀? Q2: RDD運行時相關(guān)的關(guān)鍵名詞
簡單來說可以有 Client、Job、Master、Worker、Driver、Stage、Task以及Executor,這幾個東西在調(diào)優(yōu)的時候也會經(jīng)常遇到的。
Client:指的是客戶端進程,主要負責(zé)提交job到Master;
Job:Job來自于我們編寫的程序,Application包含一個或者多個job,job包含各種RDD操作;
Master:指的是Standalone模式中的主控節(jié)點,負責(zé)接收來自Client的job,并管理著worker,可以給worker分配任務(wù)和資源(主要是driver和executor資源);
Worker:指的是Standalone模式中的slave節(jié)點,負責(zé)管理本節(jié)點的資源,同時受Master管理,需要定期給Master回報heartbeat(心跳),啟動Driver和Executor;
Driver:指的是 job(作業(yè))的主進程,一般每個Spark作業(yè)都會有一個Driver進程,負責(zé)整個作業(yè)的運行,包括了job的解析、Stage的生成、調(diào)度Task到Executor上去執(zhí)行;
Stage:中文名 階段,是job的基本調(diào)度單位,因為每個job會分成若干組Task,每組任務(wù)就被稱為 Stage;
Task:任務(wù),指的是直接運行在executor上的東西,是executor上的一個線程;
Executor:指的是 執(zhí)行器,顧名思義就是真正執(zhí)行任務(wù)的地方了,一個集群可以被配置若干個Executor,每個Executor接收來自Driver的Task,并執(zhí)行它(可同時執(zhí)行多個Task)。
???♀? Q3: 什么是DAG
全稱是 Directed Acyclic Graph,中文名是有向無環(huán)圖。Spark就是借用了DAG對RDD之間的關(guān)系進行了建模,用來描述RDD之間的因果依賴關(guān)系。因為在一個Spark作業(yè)調(diào)度中,多個作業(yè)任務(wù)之間也是相互依賴的,有些任務(wù)需要在一些任務(wù)執(zhí)行完成了才可以執(zhí)行的。在Spark調(diào)度中就是有DAGscheduler,它負責(zé)將job分成若干組Task組成的Stage。

???♀? Q4: Spark的部署模式有哪些
主要有l(wèi)ocal模式、Standalone模式、Mesos模式、YARN模式。
更多的解釋可以參考這位老哥的解釋。https://www.jianshu.com/p/3b8f85329664
???♀? Q5: Shuffle操作是什么
Shuffle指的是數(shù)據(jù)從Map端到Reduce端的數(shù)據(jù)傳輸過程,Shuffle性能的高低直接會影響程序的性能。因為Reduce task需要跨節(jié)點去拉在分布在不同節(jié)點上的Map task計算結(jié)果,這一個過程是需要有磁盤IO消耗以及數(shù)據(jù)網(wǎng)絡(luò)傳輸?shù)南牡?,所以需要根?jù)實際數(shù)據(jù)情況進行適當(dāng)調(diào)整。另外,Shuffle可以分為兩部分,分別是Map階段的數(shù)據(jù)準(zhǔn)備與Reduce階段的數(shù)據(jù)拷貝處理,在Map端我們叫Shuffle Write,在Reduce端我們叫Shuffle Read。
???♀? Q6: 什么是惰性執(zhí)行
這是RDD的一個特性,在RDD中的算子可以分為Transform算子和Action算子,其中Transform算子的操作都不會真正執(zhí)行,只會記錄一下依賴關(guān)系,直到遇見了Action算子,在這之前的所有Transform操作才會被觸發(fā)計算,這就是所謂的惰性執(zhí)行。具體哪些是Transform和Action算子,可以看下一節(jié)。
?? 常用函數(shù)
從網(wǎng)友的總結(jié)來看比較常用的算子大概可以分為下面幾種,所以就演示一下這些算子,如果需要看更多的算子或者解釋,建議可以移步到官方API文檔去Search一下哈。
pyspark.RDD:http://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.html#pyspark.RDD

下面我們用自己創(chuàng)建的RDD:sc.parallelize(range(1,11),4)
import os
import pyspark
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("test_SamShare").setMaster("local[4]")
sc = SparkContext(conf=conf)
# 使用 parallelize方法直接實例化一個RDD
rdd = sc.parallelize(range(1,11),4) # 這里的 4 指的是分區(qū)數(shù)量
rdd.take(100)
# [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
"""
----------------------------------------------
Transform算子解析
----------------------------------------------
"""
# 以下的操作由于是Transform操作,因為我們需要在最后加上一個collect算子用來觸發(fā)計算。
# 1. map: 和python差不多,map轉(zhuǎn)換就是對每一個元素進行一個映射
rdd = sc.parallelize(range(1, 11), 4)
rdd_map = rdd.map(lambda x: x*2)
print("原始數(shù)據(jù):", rdd.collect())
print("擴大2倍:", rdd_map.collect())
# 原始數(shù)據(jù): [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
# 擴大2倍: [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
# 2. flatMap: 這個相比于map多一個flat(壓平)操作,顧名思義就是要把高維的數(shù)組變成一維
rdd2 = sc.parallelize(["hello SamShare", "hello PySpark"])
print("原始數(shù)據(jù):", rdd2.collect())
print("直接split之后的map結(jié)果:", rdd2.map(lambda x: x.split(" ")).collect())
print("直接split之后的flatMap結(jié)果:", rdd2.flatMap(lambda x: x.split(" ")).collect())
# 直接split之后的map結(jié)果: [['hello', 'SamShare'], ['hello', 'PySpark']]
# 直接split之后的flatMap結(jié)果: ['hello', 'SamShare', 'hello', 'PySpark']
# 3. filter: 過濾數(shù)據(jù)
rdd = sc.parallelize(range(1, 11), 4)
print("原始數(shù)據(jù):", rdd.collect())
print("過濾奇數(shù):", rdd.filter(lambda x: x % 2 == 0).collect())
# 原始數(shù)據(jù): [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
# 過濾奇數(shù): [2, 4, 6, 8, 10]
# 4. distinct: 去重元素
rdd = sc.parallelize([2, 2, 4, 8, 8, 8, 8, 16, 32, 32])
print("原始數(shù)據(jù):", rdd.collect())
print("去重數(shù)據(jù):", rdd.distinct().collect())
# 原始數(shù)據(jù): [2, 2, 4, 8, 8, 8, 8, 16, 32, 32]
# 去重數(shù)據(jù): [4, 8, 16, 32, 2]
# 5. reduceByKey: 根據(jù)key來映射數(shù)據(jù)
from operator import add
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
print("原始數(shù)據(jù):", rdd.collect())
print("原始數(shù)據(jù):", rdd.reduceByKey(add).collect())
# 原始數(shù)據(jù): [('a', 1), ('b', 1), ('a', 1)]
# 原始數(shù)據(jù): [('b', 1), ('a', 2)]
# 6. mapPartitions: 根據(jù)分區(qū)內(nèi)的數(shù)據(jù)進行映射操作
rdd = sc.parallelize([1, 2, 3, 4], 2)
def f(iterator):
yield sum(iterator)
print(rdd.collect())
print(rdd.mapPartitions(f).collect())
# [1, 2, 3, 4]
# [3, 7]
# 7. sortBy: 根據(jù)規(guī)則進行排序
tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
print(sc.parallelize(tmp).sortBy(lambda x: x[0]).collect())
print(sc.parallelize(tmp).sortBy(lambda x: x[1]).collect())
# [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
# [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
# 8. subtract: 數(shù)據(jù)集相減, Return each value in self that is not contained in other.
x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
y = sc.parallelize([("a", 3), ("c", None)])
print(sorted(x.subtract(y).collect()))
# [('a', 1), ('b', 4), ('b', 5)]
# 9. union: 合并兩個RDD
rdd = sc.parallelize([1, 1, 2, 3])
print(rdd.union(rdd).collect())
# [1, 1, 2, 3, 1, 1, 2, 3]
# 10. intersection: 取兩個RDD的交集,同時有去重的功效
rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5, 2, 3])
rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
print(rdd1.intersection(rdd2).collect())
# [1, 2, 3]
# 11. cartesian: 生成笛卡爾積
rdd = sc.parallelize([1, 2])
print(sorted(rdd.cartesian(rdd).collect()))
# [(1, 1), (1, 2), (2, 1), (2, 2)]
# 12. zip: 拉鏈合并,需要兩個RDD具有相同的長度以及分區(qū)數(shù)量
x = sc.parallelize(range(0, 5))
y = sc.parallelize(range(1000, 1005))
print(x.collect())
print(y.collect())
print(x.zip(y).collect())
# [0, 1, 2, 3, 4]
# [1000, 1001, 1002, 1003, 1004]
# [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]
# 13. zipWithIndex: 將RDD和一個從0開始的遞增序列按照拉鏈方式連接。
rdd_name = sc.parallelize(["LiLei", "Hanmeimei", "Lily", "Lucy", "Ann", "Dachui", "RuHua"])
rdd_index = rdd_name.zipWithIndex()
print(rdd_index.collect())
# [('LiLei', 0), ('Hanmeimei', 1), ('Lily', 2), ('Lucy', 3), ('Ann', 4), ('Dachui', 5), ('RuHua', 6)]
# 14. groupByKey: 按照key來聚合數(shù)據(jù)
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
print(rdd.collect())
print(sorted(rdd.groupByKey().mapValues(len).collect()))
print(sorted(rdd.groupByKey().mapValues(list).collect()))
# [('a', 1), ('b', 1), ('a', 1)]
# [('a', 2), ('b', 1)]
# [('a', [1, 1]), ('b', [1])]
# 15. sortByKey:
tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
print(sc.parallelize(tmp).sortByKey(True, 1).collect())
# [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
# 16. join:
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("a", 3)])
print(sorted(x.join(y).collect()))
# [('a', (1, 2)), ('a', (1, 3))]
# 17. leftOuterJoin/rightOuterJoin
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2)])
print(sorted(x.leftOuterJoin(y).collect()))
# [('a', (1, 2)), ('b', (4, None))]
"""
----------------------------------------------
Action算子解析
----------------------------------------------
"""
# 1. collect: 指的是把數(shù)據(jù)都匯集到driver端,便于后續(xù)的操作
rdd = sc.parallelize(range(0, 5))
rdd_collect = rdd.collect()
print(rdd_collect)
# [0, 1, 2, 3, 4]
# 2. first: 取第一個元素
sc.parallelize([2, 3, 4]).first()
# 2
# 3. collectAsMap: 轉(zhuǎn)換為dict,使用這個要注意了,不要對大數(shù)據(jù)用,不然全部載入到driver端會爆內(nèi)存
m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
m
# {1: 2, 3: 4}
# 4. reduce: 逐步對兩個元素進行操作
rdd = sc.parallelize(range(10),5)
print(rdd.reduce(lambda x,y:x+y))
# 45
# 5. countByKey/countByValue:
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
print(sorted(rdd.countByKey().items()))
print(sorted(rdd.countByValue().items()))
# [('a', 2), ('b', 1)]
# [(('a', 1), 2), (('b', 1), 1)]
# 6. take: 相當(dāng)于取幾個數(shù)據(jù)到driver端
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
print(rdd.take(5))
# [('a', 1), ('b', 1), ('a', 1)]
# 7. saveAsTextFile: 保存rdd成text文件到本地
text_file = "./data/rdd.txt"
rdd = sc.parallelize(range(5))
rdd.saveAsTextFile(text_file)
# 8. takeSample: 隨機取數(shù)
rdd = sc.textFile("./test/data/hello_samshare.txt", 4) # 這里的 4 指的是分區(qū)數(shù)量
rdd_sample = rdd.takeSample(True, 2, 0) # withReplacement 參數(shù)1:代表是否是有放回抽樣
rdd_sample
# 9. foreach: 對每一個元素執(zhí)行某種操作,不生成新的RDD
rdd = sc.parallelize(range(10), 5)
accum = sc.accumulator(0)
rdd.foreach(lambda x: accum.add(x))
print(accum.value)
# 45
Sam:未完待續(xù)... 文章較長,分上下兩篇文章來寫哈。
