A Hands-On Introduction to PySpark, Python's Big Data Powerhouse
A PySpark Crash Course
Part 1: Introduction to PySpark and Installation
What is PySpark?
PySpark is Spark's Python interface: through it you can write Spark applications with the Python API, and it currently covers the vast majority of Spark's functionality. Spark's official documentation now lists Python first among all of its supported languages.

How do you install it?
In a terminal, run:
pip install pyspark

Alternatively, install it through PyCharm's GUI package manager.
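To quickly confirm the install worked, you can import the package and print its version (a minimal check, not part of the original walkthrough):
import pyspark
print(pyspark.__version__)  # e.g. 3.x.y, depending on what pip resolved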

Part 2: Hands-On Programming
Loading and transforming data
# Import pyspark
# Import pandas; we'll compare its data structures with PySpark's shortly
import pyspark
import pandas as pd
Before writing a Spark program, we need to create a SparkSession object:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Spark極速入門").getOrCreate()
You can see some information about the session: the Spark version, the run mode, and the application name.
The demo environment uses local mode, and the * means all available threads are used. If you want cluster mode, look up a tutorial on setting up a cluster; the PySpark program then acts as a Spark client that connects to the cluster, and you get genuinely distributed computing. For now this is just local mode, which simulates distributed computing with multiple threads.
spark
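If you would rather be explicit about the run mode instead of relying on the default, you can set the master URL yourself when building the session. A minimal sketch; the cluster address below is only a placeholder:
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .appName("Spark極速入門")
    .master("local[*]")  # local mode, all available cores
    # .master("spark://<master-host>:7077")  # placeholder: a standalone cluster instead
    .getOrCreate()
)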

Let's look at the test1 data we will be using.

Use the read method: set the option for whether to read the CSV header, then point it at the path to load the data:
df_spark = spark.read.option("header", "true").csv("./data/test1.csv")
Check the type:
type(df_spark)
pyspark.sql.dataframe.DataFrame
Now check the type you get when reading the same file with pandas:
type(pd.read_csv("./data/test1.csv"))
pandas.core.frame.DataFrame
So when Spark reads this kind of structured data, it also uses a DataFrame structure similar to pandas'. This is also the most widely used data structure in Spark.
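The two kinds of DataFrame can also be converted into each other, which is convenient for small datasets (a sketch; toPandas collects everything to the driver, so it assumes the data fits in memory):
pdf = pd.read_csv("./data/test1.csv")  # a plain pandas DataFrame
sdf = spark.createDataFrame(pdf)       # pandas -> Spark
pdf_back = sdf.toPandas()              # Spark -> pandas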
Use the show method to print the data:
df_spark.show()
+---------+---+----------+------+
| Name|age|Experience|Salary|
+---------+---+----------+------+
| Krish| 31| 10| 30000|
|Sudhanshu| 30| 8| 25000|
| Sunny| 29| 4| 20000|
| Paul| 24| 3| 20000|
| Harsha| 21| 1| 15000|
| Shubham| 23| 2| 18000|
+---------+---+----------+------+
Use the printSchema method to print the schema. Notice that columns that are clearly numeric have been read as strings:
df_spark.printSchema()
root
|-- Name: string (nullable = true)
|-- age: string (nullable = true)
|-- Experience: string (nullable = true)
|-- Salary: string (nullable = true)
Re-read the data with type inference turned on, and the types are now picked up correctly:
df_spark = spark.read.option("header", "true").csv("./data/test1.csv", inferSchema=True)
df_spark.printSchema()
root
|-- Name: string (nullable = true)
|-- age: integer (nullable = true)
|-- Experience: integer (nullable = true)
|-- Salary: integer (nullable = true)
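Instead of inferSchema, which costs an extra pass over the file, you can also declare the schema by hand. A sketch (df_explicit is just an illustrative name; the result matches the inferred schema above):
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
    StructField("Name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("Experience", IntegerType(), True),
    StructField("Salary", IntegerType(), True),
])
df_explicit = spark.read.csv("./data/test1.csv", header=True, schema=schema)
df_explicit.printSchema()  # same schema, no inference pass needed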
Select some columns. Whether you select several columns or just one, the result is always a DataFrame, and it supports the same DataFrame methods such as printSchema and show, so the structure stays uniform:
df_spark.select(["Name", "age"])
DataFrame[Name: string, age: int]
df_spark.select("Name")
DataFrame[Name: string]
df_spark.select(["Name", "age", "Salary"]).printSchema()
root
|-- Name: string (nullable = true)
|-- age: integer (nullable = true)
|-- Salary: integer (nullable = true)
If you index with [] directly instead of using select, the result is a bit like a pandas Series:
df_spark["Name"]
Column<'Name'>
A Column, however, cannot be shown directly:
df_spark["age"].show()
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
Input In [15], in <cell line: 1>()
----> 1 df_spark["age"].show()
TypeError: 'Column' object is not callable
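To actually display a single column, select it first so that a DataFrame comes back:
df_spark.select("age").show()  # select returns a DataFrame, which does have show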
The describe method gives some simple summary statistics on the DataFrame:
df_spark.describe().show()
+-------+------+------------------+-----------------+------------------+
|summary| Name| age| Experience| Salary|
+-------+------+------------------+-----------------+------------------+
| count| 6| 6| 6| 6|
| mean| null|26.333333333333332|4.666666666666667|21333.333333333332|
| stddev| null| 4.179314138308661|3.559026084010437| 5354.126134736337|
| min|Harsha| 21| 1| 15000|
| max| Sunny| 31| 10| 30000|
+-------+------+------------------+-----------------+------------------+
Use the withColumn method to add a column to the DataFrame:
df_spark = df_spark.withColumn("Experience After 3 year", df_spark["Experience"] + 3)
df_spark.show()
+---------+---+----------+------+-----------------------+
| Name|age|Experience|Salary|Experience After 3 year|
+---------+---+----------+------+-----------------------+
| Krish| 31| 10| 30000| 13|
|Sudhanshu| 30| 8| 25000| 11|
| Sunny| 29| 4| 20000| 7|
| Paul| 24| 3| 20000| 6|
| Harsha| 21| 1| 15000| 4|
| Shubham| 23| 2| 18000| 5|
+---------+---+----------+------+-----------------------+
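withColumn can also build conditional columns with when/otherwise from pyspark.sql.functions. A quick sketch (the "Senior" column name is made up for illustration):
from pyspark.sql import functions as F
df_spark.withColumn(
    "Senior",
    F.when(F.col("Experience") >= 5, "yes").otherwise("no")  # flag rows with 5+ years
).show()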
Use the drop method to remove a column:
df_spark = df_spark.drop("Experience After 3 year")
df_spark.show()
+---------+---+----------+------+
| Name|age|Experience|Salary|
+---------+---+----------+------+
| Krish| 31| 10| 30000|
|Sudhanshu| 30| 8| 25000|
| Sunny| 29| 4| 20000|
| Paul| 24| 3| 20000|
| Harsha| 21| 1| 15000|
| Shubham| 23| 2| 18000|
+---------+---+----------+------+
Use the withColumnRenamed method to rename a column:
df_spark.withColumnRenamed("Name", "New Name").show()
+---------+---+----------+------+
| New Name|age|Experience|Salary|
+---------+---+----------+------+
| Krish| 31| 10| 30000|
|Sudhanshu| 30| 8| 25000|
| Sunny| 29| 4| 20000|
| Paul| 24| 3| 20000|
| Harsha| 21| 1| 15000|
| Shubham| 23| 2| 18000|
+---------+---+----------+------+
Handling missing values
Let's look at test2, the dataset with missing values that we'll use next.

df_spark = spark.read.csv("./data/test2.csv", header=True, inferSchema=True)
df_spark.show()
+---------+----+----------+------+
| Name| age|Experience|Salary|
+---------+----+----------+------+
| Krish| 31| 10| 30000|
|Sudhanshu| 30| 8| 25000|
| Sunny| 29| 4| 20000|
| Paul| 24| 3| 20000|
| Harsha| 21| 1| 15000|
| Shubham| 23| 2| 18000|
| Mahesh|null| null| 40000|
| null| 34| 10| 38000|
| null| 36| null| null|
+---------+----+----------+------+
Use na.drop to remove rows with missing values. The how parameter sets the policy: any means a row is dropped as soon as it contains any missing value, and any is also the default:
df_spark.na.drop(how="any").show()
+---------+---+----------+------+
| Name|age|Experience|Salary|
+---------+---+----------+------+
| Krish| 31| 10| 30000|
|Sudhanshu| 30| 8| 25000|
| Sunny| 29| 4| 20000|
| Paul| 24| 3| 20000|
| Harsha| 21| 1| 15000|
| Shubham| 23| 2| 18000|
+---------+---+----------+------+
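For comparison, the other policy is how="all", which drops a row only when every value in it is missing, so on this data nothing is removed:
df_spark.na.drop(how="all").show()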
df_spark.na.drop(how="any", thresh=2).show()
+---------+----+----------+------+
| Name| age|Experience|Salary|
+---------+----+----------+------+
| Krish| 31| 10| 30000|
|Sudhanshu| 30| 8| 25000|
| Sunny| 29| 4| 20000|
| Paul| 24| 3| 20000|
| Harsha| 21| 1| 15000|
| Shubham| 23| 2| 18000|
| Mahesh|null| null| 40000|
| null| 34| 10| 38000|
+---------+----+----------+------+
You can also use the subset parameter to restrict the check to certain columns. The code below drops a row whenever its Experience value is missing:
df_spark.na.drop(how="any", subset=["Experience"]).show()
+---------+---+----------+------+
| Name|age|Experience|Salary|
+---------+---+----------+------+
| Krish| 31| 10| 30000|
|Sudhanshu| 30| 8| 25000|
| Sunny| 29| 4| 20000|
| Paul| 24| 3| 20000|
| Harsha| 21| 1| 15000|
| Shubham| 23| 2| 18000|
| null| 34| 10| 38000|
+---------+---+----------+------+
Use fillna to fill in missing values; a dict lets you choose the fill value for each column:
df_spark.fillna({'Name': 'unknown', 'age': 18, 'Experience': 0, 'Salary': 0}).show()
+---------+---+----------+------+
| Name|age|Experience|Salary|
+---------+---+----------+------+
| Krish| 31| 10| 30000|
|Sudhanshu| 30| 8| 25000|
| Sunny| 29| 4| 20000|
| Paul| 24| 3| 20000|
| Harsha| 21| 1| 15000|
| Shubham| 23| 2| 18000|
| Mahesh| 18| 0| 40000|
| unknown| 34| 10| 38000|
| unknown| 36| 0| 0|
+---------+---+----------+------+
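If a single fill value is enough, na.fill also accepts a subset, so you can touch only the numeric columns, for example:
df_spark.na.fill(0, subset=["Experience", "Salary"]).show()  # Name and age stay as they are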
You can also call on the machine-learning module: by choosing a strategy, the Imputer can fill missing values with the mean, the mode, and so on:
from pyspark.ml.feature import Imputer
imputer = Imputer(
    inputCols=['age', 'Experience', 'Salary'],
    outputCols=[f"{c}_imputed" for c in ['age', 'Experience', 'Salary']]
).setStrategy("mean")
imputer.fit(df_spark).transform(df_spark).show()
+---------+----+----------+------+-----------+------------------+--------------+
| Name| age|Experience|Salary|age_imputed|Experience_imputed|Salary_imputed|
+---------+----+----------+------+-----------+------------------+--------------+
| Krish| 31| 10| 30000| 31| 10| 30000|
|Sudhanshu| 30| 8| 25000| 30| 8| 25000|
| Sunny| 29| 4| 20000| 29| 4| 20000|
| Paul| 24| 3| 20000| 24| 3| 20000|
| Harsha| 21| 1| 15000| 21| 1| 15000|
| Shubham| 23| 2| 18000| 23| 2| 18000|
| Mahesh|null| null| 40000| 28| 5| 40000|
| null| 34| 10| 38000| 34| 10| 38000|
| null| 36| null| null| 36| 5| 25750|
+---------+----+----------+------+-----------+------------------+--------------+
Filtering
Switch back to the test1 data:
df_spark = spark.read.csv("./data/test1.csv", header=True, inferSchema=True)
df_spark.show()
+---------+---+----------+------+
| Name|age|Experience|Salary|
+---------+---+----------+------+
| Krish| 31| 10| 30000|
|Sudhanshu| 30| 8| 25000|
| Sunny| 29| 4| 20000|
| Paul| 24| 3| 20000|
| Harsha| 21| 1| 15000|
| Shubham| 23| 2| 18000|
+---------+---+----------+------+
The filter method filters rows, much like WHERE in SQL. The condition can be passed as a string or built from Column objects:
df_spark.filter("Salary <= 20000").show()
+-------+---+----------+------+
| Name|age|Experience|Salary|
+-------+---+----------+------+
| Sunny| 29| 4| 20000|
| Paul| 24| 3| 20000|
| Harsha| 21| 1| 15000|
|Shubham| 23| 2| 18000|
+-------+---+----------+------+
df_spark.filter(df_spark["Salary"] <= 20000).show()
+-------+---+----------+------+
| Name|age|Experience|Salary|
+-------+---+----------+------+
| Sunny| 29| 4| 20000|
| Paul| 24| 3| 20000|
| Harsha| 21| 1| 15000|
|Shubham| 23| 2| 18000|
+-------+---+----------+------+
In the string form, use and to combine conditions; in the column form, join the conditions with & and wrap each one in parentheses:
df_spark.filter("Salary <= 20000 and age <= 24").show()
+-------+---+----------+------+
| Name|age|Experience|Salary|
+-------+---+----------+------+
| Paul| 24| 3| 20000|
| Harsha| 21| 1| 15000|
|Shubham| 23| 2| 18000|
+-------+---+----------+------+
df_spark.filter(
    (df_spark["Salary"] <= 20000)
    & (df_spark["age"] <= 24)
).show()
+-------+---+----------+------+
| Name|age|Experience|Salary|
+-------+---+----------+------+
| Paul| 24| 3| 20000|
| Harsha| 21| 1| 15000|
|Shubham| 23| 2| 18000|
+-------+---+----------+------+
In the column form, | means or and ~ means negation:
df_spark.filter(
    (df_spark["Salary"] <= 20000)
    | (df_spark["age"] <= 24)
).show()
+-------+---+----------+------+
| Name|age|Experience|Salary|
+-------+---+----------+------+
| Sunny| 29| 4| 20000|
| Paul| 24| 3| 20000|
| Harsha| 21| 1| 15000|
|Shubham| 23| 2| 18000|
+-------+---+----------+------+
df_spark.filter(
    (df_spark["Salary"] <= 20000)
    | ~(df_spark["age"] <= 24)
).show()
+---------+---+----------+------+
| Name|age|Experience|Salary|
+---------+---+----------+------+
| Krish| 31| 10| 30000|
|Sudhanshu| 30| 8| 25000|
| Sunny| 29| 4| 20000|
| Paul| 24| 3| 20000|
| Harsha| 21| 1| 15000|
| Shubham| 23| 2| 18000|
+---------+---+----------+------+
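Column objects also provide helpers such as isin and between, which often read more cleanly than chaining comparisons. A quick sketch:
df_spark.filter(df_spark["Name"].isin("Paul", "Sunny")).show()  # Name in a given list
df_spark.filter(df_spark["age"].between(22, 30)).show()         # age within a closed range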
Grouping and aggregation
Switch to another dataset, test3:

df_spark = spark.read.csv("./data/test3.csv", header=True, inferSchema=True)
df_spark.show()
+---------+------------+------+
| Name| Departments|salary|
+---------+------------+------+
| Krish|Data Science| 10000|
| Krish| IOT| 5000|
| Mahesh| Big Data| 4000|
| Krish| Big Data| 4000|
| Mahesh|Data Science| 3000|
|Sudhanshu|Data Science| 20000|
|Sudhanshu| IOT| 10000|
|Sudhanshu| Big Data| 5000|
| Sunny|Data Science| 10000|
| Sunny| Big Data| 2000|
+---------+------------+------+
Use the groupBy method to group the DataFrame by one or more columns:
df_spark.groupBy("Name")
<pyspark.sql.group.GroupedData at 0x227454d4be0>
The result of grouping is a GroupedData object, and it cannot be printed with show or similar methods. A GroupedData object has to go through an aggregation to become a DataFrame again; the aggregation functions include sum, count, avg, max, min, and so on:
df_spark.groupBy("Departments").sum().show()
+------------+-----------+
| Departments|sum(salary)|
+------------+-----------+
| IOT| 15000|
| Big Data| 15000|
|Data Science| 43000|
+------------+-----------+
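When you need several aggregations at once, or want to name the result columns, agg together with pyspark.sql.functions is handy. A quick sketch:
from pyspark.sql import functions as F
df_spark.groupBy("Name").agg(
    F.sum("salary").alias("total_salary"),
    F.avg("salary").alias("avg_salary"),
    F.count("*").alias("rows"),
).show()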
Part 3: Summary
Pandas DataFrames and PySpark DataFrames have a lot in common, so anyone familiar with Pandas will adapt to the API quickly. For now you can loosely think of PySpark as "distributed Pandas". Beyond that, PySpark also offers distributed machine learning through Spark MLlib (roughly a distributed counterpart to scikit-learn, TensorFlow, and so on), which will be covered later. On a cluster, its DataFrames can be spread across different machines, which is how it handles massive amounts of data. If you are interested, try building a Spark cluster with virtual machines to take your Spark learning further.
Apache Spark™ - Unified engine for large-scale data analytics