A Beginner's Guide to PySpark: Thinking in Frameworks (Part 2)

Work was hectic this week, so I wrote this up in bits over several evenings and pulled it all together over the weekend. This PySpark series was originally planned as two parts, but the more I learned, the more there was to cover, so it has grown into three parts. Today's installment is all about Spark SQL, which is genuinely useful, so do bookmark it! You can click back to the previous installment here: 《PySpark入門級學(xué)習(xí)教程,框架思維(上)》 (Part 1).
Using Spark SQL
Before getting into Spark SQL, a quick word on the module itself. It is Spark's module for processing structured data: it provides an abstraction called the SparkDataFrame and runs distributed SQL queries over it. If you have used the pandas DataFrame in Python, this is much the same idea, just with a somewhat different API.
Working through Spark SQL also makes data processing feel more familiar: you can write plain SQL statements, use the SparkDataFrame API, or use the Dataset API, and switch between them as the task demands. Logic written with the SparkDataFrame API or SQL is automatically optimized by Spark's Catalyst optimizer before being compiled down to RDD operations, so even clumsily written queries can still run fast (hand-write the same thing directly on RDDs and a clumsy version may well fall over, haha).
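To make that concrete, here is a tiny sketch (the session setup and the view name t are just illustrative): the same filter written once as SQL and once with the DataFrame API, where explain() lets you check that Catalyst produces the same physical plan for both.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("catalyst_demo").getOrCreate()
df = spark.createDataFrame([("Sam", 28), ("Run", 1)], ["name", "age"])
df.createOrReplaceTempView("t")

# Both of these compile to the same optimized physical plan under Catalyst:
spark.sql("SELECT name FROM t WHERE age > 18").explain()
df.filter(df.age > 18).select("name").explain()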
Creating a SparkDataFrame
Before working with SparkDataFrames, let's look at the ways to create one: from an RDD, from a pandas DataFrame, from a Python list, by reading data files, and by reading from a database.
1. From an RDD
This mainly uses the RDD's toDF method.
rdd = sc.parallelize([("Sam", 28, 88), ("Flora", 28, 90), ("Run", 1, 60)])
df = rdd.toDF(["name", "age", "score"])
df.show()
df.printSchema()
# +-----+---+-----+
# | name|age|score|
# +-----+---+-----+
# | Sam| 28| 88|
# |Flora| 28| 90|
# | Run| 1| 60|
# +-----+---+-----+
# root
# |-- name: string (nullable = true)
# |-- age: long (nullable = true)
# |-- score: long (nullable = true)
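By the way, if you'd rather pin the column types down yourself instead of relying on inference, you can pass an explicit schema to createDataFrame. A minimal sketch, with types chosen here just as an example:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("score", IntegerType(), True),
])
df = spark.createDataFrame(rdd, schema)
df.printSchema()  # age and score now come out as integer instead of long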
2. From a pandas DataFrame
import pandas as pd

df = pd.DataFrame([['Sam', 28, 88], ['Flora', 28, 90], ['Run', 1, 60]],
                  columns=['name', 'age', 'score'])
print(">> pandas DataFrame:")
print(df)
print("\n")
Spark_df = spark.createDataFrame(df)
print(">> SparkDataFrame:")
Spark_df.show()
# >> pandas DataFrame:
#     name  age  score
# 0    Sam   28     88
# 1  Flora   28     90
# 2    Run    1     60
# >> SparkDataFrame:
# +-----+---+-----+
# | name|age|score|
# +-----+---+-----+
# | Sam| 28| 88|
# |Flora| 28| 90|
# | Run| 1| 60|
# +-----+---+-----+
3. From a Python list
list_values = [['Sam', 28, 88], ['Flora', 28, 90], ['Run', 1, 60]]
Spark_df = spark.createDataFrame(list_values, ['name', 'age', 'score'])
Spark_df.show()
# +-----+---+-----+
# | name|age|score|
# +-----+---+-----+
# | Sam| 28| 88|
# |Flora| 28| 90|
# | Run| 1| 60|
# +-----+---+-----+
4. By reading data files
# 4.1 CSV files
df = spark.read.option("header", "true")\
    .option("inferSchema", "true")\
    .option("delimiter", ",")\
    .csv("./test/data/titanic/train.csv")
df.show(5)
df.printSchema()
# 4.2 JSON files
df = spark.read.json("./test/data/hello_samshare.json")
df.show(5)
df.printSchema()
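Parquet is worth knowing too, since it is the de facto columnar format in the Spark world. A small sketch with a made-up path; Parquet files carry their own schema, so no inferSchema option is needed:
# 4.3 Parquet files
df = spark.read.parquet("./test/data/hello_samshare.parquet")
df.show(5)
df.printSchema()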
5. By reading from a database
# 5.1 Reading Hive data
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
spark.sql("LOAD DATA LOCAL INPATH 'data/kv1.txt' INTO TABLE src")
df = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
df.show(5)
# 5.2 Reading MySQL data (requires the MySQL JDBC driver jar on the classpath)
url = "jdbc:mysql://localhost:3306/test"
df = spark.read.format("jdbc") \
    .option("url", url) \
    .option("dbtable", "runoob_tbl") \
    .option("user", "root") \
    .option("password", "8888") \
    .load()
df.show()
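Writing back is symmetrical. A hedged sketch that assumes the same connection details as above and a made-up target table name:
df.write.format("jdbc") \
    .option("url", url) \
    .option("dbtable", "runoob_tbl_copy") \
    .option("user", "root") \
    .option("password", "8888") \
    .mode("append") \
    .save()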
Commonly used SparkDataFrame APIs
I've roughly grouped these APIs into several buckets: APIs for inspecting a DataFrame, APIs for simple DataFrame processing, Column APIs, APIs for changing how you work with a DataFrame, and statistics APIs. Grouping them like this makes their purpose easier to grasp, so you'll know where to look when a real problem shows up.
First, the dataset we'll use throughout this section:
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
# Much of Spark SQL's functionality is exposed through SparkSession method
# interfaces; SparkContext alone doesn't give you access to it.
spark = SparkSession.builder \
    .appName("sam_SamShare") \
    .master("local[4]") \
    .enableHiveSupport() \
    .getOrCreate()
sc = spark.sparkContext
# Create a SparkDataFrame
rdd = sc.parallelize([("Sam", 28, 88, "M"),
                      ("Flora", 28, 90, "F"),
                      ("Run", 1, 60, None),
                      ("Peter", 55, 100, "M"),
                      ("Mei", 54, 95, "F")])
df = rdd.toDF(["name", "age", "score", "sex"])
df.show()
df.printSchema()
# +-----+---+-----+----+
# | name|age|score| sex|
# +-----+---+-----+----+
# | Sam| 28| 88| M|
# |Flora| 28| 90| F|
# | Run| 1| 60|null|
# |Peter| 55| 100| M|
# | Mei| 54| 95| F|
# +-----+---+-----+----+
# root
# |-- name: string (nullable = true)
# |-- age: long (nullable = true)
# |-- score: long (nullable = true)
# |-- sex: string (nullable = true)
1. APIs for inspecting a DataFrame
# DataFrame.collect
# Return all the rows as a list of Row objects (this pulls everything to the driver, so mind the size)
df.collect()
# [Row(name='Sam', age=28, score=88, sex='M'),
# Row(name='Flora', age=28, score=90, sex='F'),
# Row(name='Run', age=1, score=60, sex=None),
# Row(name='Peter', age=55, score=100, sex='M'),
# Row(name='Mei', age=54, score=95, sex='F')]
# DataFrame.count
df.count()
# 5
# DataFrame.columns
df.columns
# ['name', 'age', 'score', 'sex']
# DataFrame.dtypes
df.dtypes
# [('name', 'string'), ('age', 'bigint'), ('score', 'bigint'), ('sex', 'string')]
# DataFrame.describe
# Return basic statistics for the given columns (count, mean, stddev, min, max)
df.describe(['age']).show()
# +-------+------------------+
# |summary| age|
# +-------+------------------+
# | count| 5|
# | mean| 33.2|
# | stddev|22.353970564532826|
# | min| 1|
# | max| 55|
# +-------+------------------+
df.describe().show()
# +-------+-----+------------------+------------------+----+
# |summary| name| age| score| sex|
# +-------+-----+------------------+------------------+----+
# | count| 5| 5| 5| 4|
# | mean| null| 33.2| 86.6|null|
# | stddev| null|22.353970564532826|15.582040944625966|null|
# | min|Flora| 1| 60| F|
# | max| Sam| 55| 100| M|
# +-------+-----+------------------+------------------+----+
# DataFrame.select
# Select the given columns, presented in the given order
df.select("sex", "score").show()
# DataFrame.first
# DataFrame.head
# Look at the first row / first n rows
df.first()
# Row(name='Sam', age=28, score=88, sex='M')
df.head(1)
# [Row(name='Sam', age=28, score=88, sex='M')]
# DataFrame.freqItems
# Find the frequent items of the given columns (an approximate algorithm)
df.freqItems(["age","sex"]).show()
# +---------------+-------------+
# | age_freqItems|sex_freqItems|
# +---------------+-------------+
# |[55, 1, 28, 54]| [M, F,]|
# +---------------+-------------+
# DataFrame.summary
# Like describe, but also includes the 25%/50%/75% percentiles
df.summary().show()
# +-------+-----+------------------+------------------+----+
# |summary| name| age| score| sex|
# +-------+-----+------------------+------------------+----+
# | count| 5| 5| 5| 4|
# | mean| null| 33.2| 86.6|null|
# | stddev| null|22.353970564532826|15.582040944625966|null|
# | min|Flora| 1| 60| F|
# | 25%| null| 28| 88|null|
# | 50%| null| 28| 90|null|
# | 75%| null| 54| 95|null|
# | max| Sam| 55| 100| M|
# +-------+-----+------------------+------------------+----+
# DataFrame.sample
# Randomly sample rows from the DataFrame (the fraction is approximate, not an exact row count)
df.sample(0.5).show()
# +-----+---+-----+----+
# | name|age|score| sex|
# +-----+---+-----+----+
# | Sam| 28| 88| M|
# | Run| 1| 60|null|
# |Peter| 55| 100| M|
# +-----+---+-----+----+
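Note that sample draws a different subset on every run. If you need reproducible samples, pass a seed:
df.sample(fraction=0.5, seed=42).show()                        # same rows on every run
df.sample(withReplacement=True, fraction=0.5, seed=42).show()  # sampling with replacement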
2. APIs for simple DataFrame processing
# DataFrame.distinct
# Deduplicate the whole dataset
df.distinct().show()
# DataFrame.dropDuplicates
# Deduplicate on the given columns
df.dropDuplicates(["sex"]).show()
# +-----+---+-----+----+
# | name|age|score| sex|
# +-----+---+-----+----+
# |Flora| 28| 90| F|
# | Run| 1| 60|null|
# | Sam| 28| 88| M|
# +-----+---+-----+----+
# DataFrame.exceptAll
# DataFrame.subtract
# Remove rows that appear in another DataFrame
df1 = spark.createDataFrame(
    [("a", 1), ("a", 1), ("b", 3), ("c", 4)], ["C1", "C2"])
df2 = spark.createDataFrame([("a", 1), ("b", 3)], ["C1", "C2"])
df3 = df1.exceptAll(df2)  # keeps duplicates: matching rows are removed one-for-one
df4 = df1.subtract(df2)   # set semantics: the result is also deduplicated
df1.show()
df2.show()
df3.show()
df4.show()
# +---+---+
# | C1| C2|
# +---+---+
# | a| 1|
# | a| 1|
# | b| 3|
# | c| 4|
# +---+---+
# +---+---+
# | C1| C2|
# +---+---+
# | a| 1|
# | b| 3|
# +---+---+
# +---+---+
# | C1| C2|
# +---+---+
# | a| 1|
# | c| 4|
# +---+---+
# +---+---+
# | C1| C2|
# +---+---+
# | c| 4|
# +---+---+
# DataFrame.intersectAll
# Return the intersection of two DataFrames, keeping duplicate rows
df1 = spark.createDataFrame(
    [("a", 1), ("a", 1), ("b", 3), ("c", 4)], ["C1", "C2"])
df2 = spark.createDataFrame([("a", 1), ("b", 4)], ["C1", "C2"])
df1.intersectAll(df2).show()
# +---+---+
# | C1| C2|
# +---+---+
# | a| 1|
# +---+---+
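The relationship here mirrors exceptAll vs subtract: intersectAll keeps duplicate rows, while the plain intersect deduplicates. A quick sketch reusing df1 from above:
df2 = spark.createDataFrame([("a", 1), ("a", 1)], ["C1", "C2"])
df1.intersectAll(df2).show()  # keeps both 'a' rows
df1.intersect(df2).show()     # deduplicated: a single 'a' row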
# DataFrame.drop
# Drop the given column
df.drop('age').show()
# DataFrame.withColumn
# Add a new column
df1 = df.withColumn("birth_year", 2021 - df.age)
df1.show()
# +-----+---+-----+----+----------+
# | name|age|score| sex|birth_year|
# +-----+---+-----+----+----------+
# | Sam| 28| 88| M| 1993|
# |Flora| 28| 90| F| 1993|
# | Run| 1| 60|null| 2020|
# |Peter| 55| 100| M| 1966|
# | Mei| 54| 95| F| 1967|
# +-----+---+-----+----+----------+
# DataFrame.withColumnRenamed
# Rename a column
df1 = df.withColumnRenamed("sex", "gender")
df1.show()
# +-----+---+-----+------+
# | name|age|score|gender|
# +-----+---+-----+------+
# | Sam| 28| 88| M|
# |Flora| 28| 90| F|
# | Run| 1| 60| null|
# |Peter| 55| 100| M|
# | Mei| 54| 95| F|
# +-----+---+-----+------+
# DataFrame.dropna
# Drop rows containing nulls; signature: DataFrame.dropna(how='any', thresh=None, subset=None)
df.dropna(how='all', subset=['sex']).show()
# +-----+---+-----+---+
# | name|age|score|sex|
# +-----+---+-----+---+
# | Sam| 28| 88| M|
# |Flora| 28| 90| F|
# |Peter| 55| 100| M|
# | Mei| 54| 95| F|
# +-----+---+-----+---+
# DataFrame.fillna
# Fill null values
df1 = spark.createDataFrame(
    [("a", None), ("a", 1), (None, 3), ("c", 4)], ["C1", "C2"])
# df2 = df1.na.fill({"C1": "d", "C2": 99})  # df.na.fill(...) is equivalent
df2 = df1.fillna({"C1": "d", "C2": 99})
df1.show()
df2.show()
# DataFrame.filter
# Filter rows by a condition (where is an alias for filter)
df.filter(df.age>50).show()
# +-----+---+-----+---+
# | name|age|score|sex|
# +-----+---+-----+---+
# |Peter| 55| 100| M|
# | Mei| 54| 95| F|
# +-----+---+-----+---+
df.where(df.age==28).show()
# +-----+---+-----+---+
# | name|age|score|sex|
# +-----+---+-----+---+
# | Sam| 28| 88| M|
# |Flora| 28| 90| F|
# +-----+---+-----+---+
df.filter("age<18").show()
# +----+---+-----+----+
# |name|age|score| sex|
# +----+---+-----+----+
# | Run| 1| 60|null|
# +----+---+-----+----+
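One gotcha worth showing: to combine filter conditions you have to use the bitwise operators & | ~ and wrap each condition in parentheses, because Python's and/or don't work on Column objects:
df.filter((df.age > 20) & (df.sex == "M")).show()   # AND: each condition in parens
df.filter((df.age < 18) | df.sex.isNull()).show()   # OR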
# DataFrame.join
# No long explanation needed; the example shows the syntax. Signature:
# DataFrame.join(other, on=None, how=None)
df1 = spark.createDataFrame(
    [("a", 1), ("d", 1), ("b", 3), ("c", 4)], ["id", "num1"])
df2 = spark.createDataFrame([("a", 1), ("b", 3)], ["id", "num2"])
df1.join(df2, df1.id == df2.id, 'left').select(df1.id.alias("df1_id"),
                                               df1.num1.alias("df1_num"),
                                               df2.num2.alias("df2_num")
                                               ).sort(["df1_id"], ascending=False)\
    .show()
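When the join key has the same name on both sides, you can also just pass the column name (or a list of names); this avoids the duplicated id column that the expression form above produces:
df1.join(df2, on="id", how="left").show()  # a single 'id' column in the result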
# DataFrame.agg(*exprs)
# Aggregate the data; you can pass several aggregate expressions at once.
# Without a groupBy, the aggregation runs over the whole DataFrame.
# DataFrame.alias
# Set an alias for a column or a DataFrame
# DataFrame.groupBy
# Group by one or more columns; pass a list for several, e.g. df.groupBy(["sex", "age"])
df.groupBy("sex").agg(F.min(df.age).alias("min_age"),
                      F.expr("avg(age)").alias("avg_age"),
                      F.expr("collect_list(name)").alias("names")
                      ).show()
# +----+-------+-------+------------+
# | sex|min_age|avg_age|       names|
# +----+-------+-------+------------+
# |   F|     28|   41.0|[Flora, Mei]|
# |null|      1|    1.0|       [Run]|
# |   M|     28|   41.5|[Sam, Peter]|
# +----+-------+-------+------------+
# DataFrame.foreach
# Apply a function to every row (it runs on the executors, so on a real
# cluster the prints end up in the executor logs, not the driver console)
def f(person):
    print(person.name)
df.foreach(f)
# Peter
# Run
# Sam
# Flora
# Mei
# DataFrame.replace
# Replace certain values in the DataFrame
df1 = df.na.replace({"M": "Male", "F": "Female"})
df1.show()
# DataFrame.union
# Equivalent to SQL's UNION ALL (unionAll is just a deprecated alias for union)
df1 = spark.createDataFrame(
    [("a", 1), ("d", 1), ("b", 3), ("c", 4)], ["id", "num"])
df2 = spark.createDataFrame([("a", 1), ("b", 3)], ["id", "num"])
df1.union(df2).show()
df1.unionAll(df2).show()
# Note that union does NOT deduplicate: UNION ALL semantics are by design here.
# Chain .distinct() after the union if you want SQL's UNION behaviour.
# +---+---+
# | id|num|
# +---+---+
# | a| 1|
# | d| 1|
# | b| 3|
# | c| 4|
# | a| 1|
# | b| 3|
# +---+---+
# DataFrame.unionByName
# Union datasets by matching column names rather than by position
df1 = spark.createDataFrame([[1, 2, 3]], ["col0", "col1", "col2"])
df2 = spark.createDataFrame([[4, 5, 6]], ["col1", "col2", "col0"])
df1.unionByName(df2).show()
# +----+----+----+
# |col0|col1|col2|
# +----+----+----+
# | 1| 2| 3|
# | 6| 4| 5|
# +----+----+----+
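On Spark 3.1+, unionByName also accepts an allowMissingColumns flag that fills in nulls for columns missing on one side; a small sketch:
df1 = spark.createDataFrame([[1, 2, 3]], ["col0", "col1", "col2"])
df2 = spark.createDataFrame([[4, 5]], ["col1", "col2"])
df1.unionByName(df2, allowMissingColumns=True).show()  # col0 is null for df2's row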
3. Column APIs
These operate on individual columns: renaming, sorting, null checks, type casts and so on. I won't write a demo for every one, since the one-line notes mostly speak for themselves; a small combined sketch follows the list.
Column.alias(*alias, **kwargs) # Rename the column
Column.asc() # Sort ascending by this column
Column.desc() # Sort descending by this column
Column.astype(dataType) # Type conversion (alias for cast)
Column.cast(dataType) # Cast to the given type
Column.between(lowerBound, upperBound) # Boolean: is the value within the given range
Column.contains(other) # Does the value contain the given substring
Column.endswith(other) # Does the value end with..., e.g. df.filter(df.name.endswith('ice')).collect()
Column.isNotNull() # Keep rows where the column is not null
Column.isNull()
Column.isin(*cols) # Rows whose value is in the given set, e.g. df[df.name.isin("Bob", "Mike")].collect()
Column.like(other) # SQL LIKE matching
Column.when(condition, value) # Value to assign when the condition is True
Column.otherwise(value) # Used together with when, e.g. df.select(df.name, F.when(df.age > 3, 1).otherwise(0)).show()
Column.rlike(other) # Regular-expression matching, e.g. df.filter(df.name.rlike('ice$')).collect()
Column.startswith(other) # e.g. df.filter(df.name.startswith('Al')).collect()
Column.substr(startPos, length) # e.g. df.select(df.name.substr(1, 3).alias("col")).collect()
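As promised, a small combined sketch of a few of these, using this section's df:
(df.select(df.name,
           df.age.cast("string").alias("age_str"),                  # cast + alias
           F.when(df.age >= 18, 1).otherwise(0).alias("is_adult"))  # when / otherwise
   .filter(df.name.rlike("a$"))        # regex: names ending in 'a'
   .orderBy(F.col("age_str").desc())   # sort descending
   .show())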
4. APIs for changing how you work with a DataFrame
# DataFrame.createOrReplaceGlobalTempView
# DataFrame.dropGlobalTempView
# Create a global temporary view; once registered it can be queried with SQL.
# Its lifetime is tied to the Spark application itself.
df.createOrReplaceGlobalTempView("people")
spark.sql("select * from global_temp.people where sex = 'M' ").show()
# +-----+---+-----+---+
# | name|age|score|sex|
# +-----+---+-----+---+
# | Sam| 28| 88| M|
# |Peter| 55| 100| M|
# +-----+---+-----+---+
# DataFrame.createOrReplaceTempView
# DataFrame.dropTempView
# Create a local temporary view; its lifetime is tied to the SparkSession that created this DataFrame
df.createOrReplaceTempView("tmp_people")
spark.sql("select * from tmp_people where sex = 'F' ").show()
# +-----+---+-----+---+
# | name|age|score|sex|
# +-----+---+-----+---+
# |Flora| 28| 90| F|
# | Mei| 54| 95| F|
# +-----+---+-----+---+
# DataFrame.cache / DataFrame.persist
# Keep data in cache; the default storage level is MEMORY_AND_DISK.
df.cache()
df.persist()
df.unpersist()
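persist also takes an explicit storage level if the MEMORY_AND_DISK default isn't what you want. A minimal sketch:
from pyspark import StorageLevel

df.persist(StorageLevel.MEMORY_ONLY)  # memory only; recompute partitions on eviction
df.unpersist()                        # always release the cache once you're done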
# DataFrame.crossJoin
# Return the Cartesian product of two DataFrames
df1 = df.select("name", "sex")
df2 = df.select("name", "sex")
df3 = df1.crossJoin(df2)
print("表1的記錄數(shù)", df1.count())
print("表2的記錄數(shù)", df2.count())
print("笛卡爾積后的記錄數(shù)", df3.count())
# 表1的記錄數(shù) 5
# 表2的記錄數(shù) 5
# 笛卡爾積后的記錄數(shù) 25
# DataFrame.toPandas
# Convert the SparkDataFrame to a pandas DataFrame (collects everything to the driver, so watch the data size)
df.toPandas()
# DataFrame.rdd
# Expose the SparkDataFrame as an RDD, so you can fall back to RDD-style operations
df.rdd
5. Statistics APIs
# DataFrame.cov
# Compute the sample covariance of the two given columns
df.cov("age", "score")
# 324.59999999999997
# DataFrame.corr
# Compute the correlation of two columns; DataFrame.corr(col1, col2, method=None); currently only the Pearson coefficient is supported
df.corr("age", "score", method="pearson")
# 0.9319004030498815
# DataFrame.cube
# Build multi-dimensional aggregates, typically for analytics. If we cube two
# columns, say name and sex, the result is the UNION ALL of four groupings:
#   groupBy("name", "sex")
#   groupBy("name")
#   groupBy("sex")
#   groupBy()  (the grand total)
df1 = df.filter(df.name != "Run")
df1.show()
df1.cube("name", "sex").count().show()
# +-----+---+-----+---+
# | name|age|score|sex|
# +-----+---+-----+---+
# | Sam| 28| 88| M|
# |Flora| 28| 90| F|
# |Peter| 55| 100| M|
# | Mei| 54| 95| F|
# +-----+---+-----+---+
# The result after the cube aggregation
# +-----+----+-----+
# | name| sex|count|
# +-----+----+-----+
# | null| F| 2|
# | null|null| 4|
# |Flora|null| 1|
# |Peter|null| 1|
# | null| M| 2|
# |Peter| M| 1|
# | Sam| M| 1|
# | Sam|null| 1|
# | Mei| F| 1|
# | Mei|null| 1|
# |Flora| F| 1|
# +-----+----+-----+
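A close cousin worth knowing is rollup, which only produces the hierarchical subtotals (name+sex, name, grand total) rather than cube's full cross of groupings:
df1.rollup("name", "sex").count().show()  # no sex-only grouping rows, unlike cube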
Hehe, congrats on making it to the end! Treat yourself to a song as a reward, and let's keep at it together.
