【Python】對(duì)比Pandas,學(xué)習(xí)PySpark大數(shù)據(jù)處理
在這篇文章中,我們將對(duì)比用于基本數(shù)據(jù)操作任務(wù)的 pandas 代碼片段和它們?cè)?PySpark 中的對(duì)應(yīng)功能的代碼片段。利用 pandas 數(shù)據(jù)操作技能來學(xué)習(xí) PySpark 。
本文的前提是,假設(shè)讀者在 Python 中熟練使用 pandas 操作數(shù)據(jù)。
數(shù)據(jù)集
從導(dǎo)包開始。在 PySpark 中,需要?jiǎng)?chuàng)建一個(gè) Spark 會(huì)話 SparkSession。創(chuàng)建 Spark 會(huì)話后,可以從以下位置訪問 Spark Web 用戶界面 (Web UI):http://localhost:4040/。下面定義的應(yīng)用程序名稱appName為“PyDataStudio”,將顯示為 Web UI 右上角的應(yīng)用程序名稱。本文將不會(huì)使用 Web UI,但是,如果您有興趣了解更多信息,請(qǐng)查看官方文檔[1]。
import pandas as pd
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('PyDataStudio').getOrCreate()

我們將在這篇文章中使用企鵝數(shù)據(jù)集[2]。使用下面的腳本,我們將penguins.csv數(shù)據(jù)的修改版本保存在工作目錄中。
from seaborn import load_dataset
(load_dataset('penguins')
.drop(columns=['bill_length_mm', 'bill_depth_mm'])
.rename(columns={'flipper_length_mm': 'flipper',
'body_mass_g': 'mass'})
.to_csv('penguins.csv', index=False))

看一下兩個(gè)庫(kù)之間的語(yǔ)法比較。為了簡(jiǎn)潔,我們僅保留顯示 PySpark 輸出。
基本使用
兩個(gè)庫(kù)的數(shù)據(jù)對(duì)象都稱為 DataFrame:pandas DataFrame vs PySpark DataFrame。
導(dǎo)入數(shù)據(jù)并檢查其形狀
# pandas
df = pd.read_csv('penguins.csv')
df.shape
# PySpark
df = spark.read.csv('penguins.csv', header=True, inferSchema=True)
df.count(), len(df.columns)
(344, 5)使用 PySpark 導(dǎo)入數(shù)據(jù)時(shí),指定header=True數(shù)據(jù)類型用第一行作標(biāo)題,并設(shè)置inferSchema=True。可以嘗試不使用這些選項(xiàng)導(dǎo)入并檢查 DataFrame 及其數(shù)據(jù)類型(類似于 pandas 使用df.dtype 檢查 PySpark DataFrames 的數(shù)據(jù)類型)。
與 pandas DataFrame 不同,PySpark DataFrame 沒有像.shape可以直接查看數(shù)據(jù)的形狀。所以要得到數(shù)據(jù)形狀,我們分別求行數(shù)和列數(shù)。
檢查有關(guān)數(shù)據(jù)的高級(jí)信息
# pandas
df.info()
# PySpark
df.printSchema()
root
|-- species: string (nullable = true)
|-- island: string (nullable = true)
|-- flipper: double (nullable = true)
|-- mass: double (nullable = true)
|-- sex: string (nullable = true)雖然此方法不會(huì)提供與df.info()相同的輸出,但它是最接近的內(nèi)置方法之一。
查看數(shù)據(jù)的前幾行
# pandas
df.head()
# PySpark
df.show(5)
+-------+---------+-------+------+------+
|species| island|flipper| mass| sex|
+-------+---------+-------+------+------+
| Adelie|Torgersen| 181.0|3750.0| Male|
| Adelie|Torgersen| 186.0|3800.0|Female|
| Adelie|Torgersen| 195.0|3250.0|Female|
| Adelie|Torgersen| null| null| null|
| Adelie|Torgersen| 193.0|3450.0|Female|
+-------+---------+-------+------+------+
only showing top 5 rows默認(rèn)情況下,df.show()默認(rèn)顯示前 20 行。PySpark DataFrame 實(shí)際上有一個(gè)名為.head()的方法,可以查看前幾行的數(shù)據(jù),并以row對(duì)象形式打印出。運(yùn)行df.head(5)提供如下輸出:
df.head(5)

.show()方法的輸出更簡(jiǎn)潔,因此在查看數(shù)據(jù)集的top行時(shí)用.show()。
選擇列
# pandas
df[['island', 'mass']].head(3)
# PySpark
df[['island', 'mass']].show(3)
+---------+------+
| island| mass|
+---------+------+
|Torgersen|3750.0|
|Torgersen|3800.0|
|Torgersen|3250.0|
+---------+------+
only showing top 3 rows雖然可以在這里使用的是類似于 pandas 的語(yǔ)法,而在 PySpark 中默認(rèn)使用如下代碼片段所示的方法選擇列:
df.select('island', 'mass').show(3)
df.select(['island', 'mass']).show(3)
過濾
根據(jù)條件過濾數(shù)據(jù)
# pandas
df[df['species']=='Gentoo'].head()
# PySpark
df[df['species']=='Gentoo'].show(5)
+-------+------+-------+------+------+
|species|island|flipper| mass| sex|
+-------+------+-------+------+------+
| Gentoo|Biscoe| 211.0|4500.0|Female|
| Gentoo|Biscoe| 230.0|5700.0| Male|
| Gentoo|Biscoe| 210.0|4450.0|Female|
| Gentoo|Biscoe| 218.0|5700.0| Male|
| Gentoo|Biscoe| 215.0|5400.0| Male|
+-------+------+-------+------+------+
only showing top 5 rows兩個(gè)庫(kù)之間的語(yǔ)法幾乎相同。要獲得相同的輸出,還可以使用:
df.filter(df['species']=='Gentoo').show(5) df.filter("species=='Gentoo'").show(5)
下面顯示了一些常見的過濾器比較:
# pandas
df[df['species'].isin(['Chinstrap', 'Gentoo'])].head()
df[df['species'].str.match('G.')] .head()
df[df['flipper'].between(225,229)].head()
df[df['mass'].isnull()].head()1b df.loc[df['species']!='Gentoo'].head()
df[~df['species'].isin(['Chinstrap', 'Gentoo'])].head()
df[-df['species'].str.match('G.')].head()
df[~df['flipper'].between(225,229)].head()
df[df['mass'].notnull()].head()6 df[(df['mass']<3400) & (df['sex']=='Male')].head()
df[(df['mass']<3400) | (df['sex']=='Male')].head()
# PySpark
df[df['species'].isin(['Chinstrap', 'Gentoo'])].show(5)
df[df['species'].rlike('G.')].show(5)
df[df['flipper'].between(225,229)].show(5)
df[df['mass'].isNull()].show(5)1b df[df['species']!='Gentoo'].show(5)
df[~df['species'].isin(['Chinstrap', 'Gentoo'])].show(5)
df[~df['species'].rlike('G.')].show(5)
df[~df['flipper'].between(225,229)].show(5)
df[df['mass'].isNotNull()].show(5)
df[(df['mass']<3400) & (df['sex']=='Male')].show(5)
df[(df['mass']<3400) |(df[ 'sex']=='Male')].show(5)
雖然~和-在 pandas 中都可以作為否定,但在 PySpark 中僅有~能作為有效的否定。
排序
對(duì)數(shù)據(jù)進(jìn)行排序并檢查mass最小的 5 行:
# pandas
df.nsmallest(5, 'mass')
# PySpark
df[df['mass'].isNotNull()].orderBy('mass').show(5)
+---------+------+-------+------+------+
| species|island|flipper| mass| sex|
+---------+------+-------+------+------+
|Chinstrap| Dream| 192.0|2700.0|Female|
| Adelie|Biscoe| 184.0|2850.0|Female|
| Adelie|Biscoe| 181.0|2850.0|Female|
| Adelie|Biscoe| 187.0|2900.0|Female|
| Adelie| Dream| 178.0|2900.0|Female|
+---------+------+-------+------+------+
only showing top 5 rowsPandas的.nsmallest()和.nlargest()方法會(huì)自動(dòng)排除缺失值。而 PySpark 沒有等效的方法。為了獲得相同的輸出,首先過濾掉缺失mass的行,然后對(duì)數(shù)據(jù)進(jìn)行排序并查看前 5 行。如果沒有刪除數(shù)據(jù),可以簡(jiǎn)寫為:
df.orderBy(‘mass’).show(5).sort()
代替的另一種排序方式.orderBy():
# pandas
df.nlargest(5, 'mass')
# PySpark
df.sort('mass', ascending=False).show(5)
+-------+------+-------+------+----+
|species|island|flipper| mass| sex|
+-------+------+-------+------+----+
| Gentoo|Biscoe| 221.0|6300.0|Male|
| Gentoo|Biscoe| 230.0|6050.0|Male|
| Gentoo|Biscoe| 220.0|6000.0|Male|
| Gentoo|Biscoe| 222.0|6000.0|Male|
| Gentoo|Biscoe| 229.0|5950.0|Male|
+-------+------+-------+------+----+
only showing top 5 rows這些語(yǔ)法的變體也是等效的:
df.sort(df['mass'].desc()).show(5)
df.orderBy('mass', ascending=False).show(5)
df.orderBy(df['mass'].desc( )).show(5)
按多列排序,如下所示:
# pandas
df.sort_values(['mass', 'flipper'], ascending=False).head()
# PySpark
df.orderBy(['mass', 'flipper'], ascending=False).show(5)
+-------+------+-------+------+----+
|species|island|flipper| mass| sex|
+-------+------+-------+------+----+
| Gentoo|Biscoe| 221.0|6300.0|Male|
| Gentoo|Biscoe| 230.0|6050.0|Male|
| Gentoo|Biscoe| 222.0|6000.0|Male|
| Gentoo|Biscoe| 220.0|6000.0|Male|
| Gentoo|Biscoe| 229.0|5950.0|Male|
+-------+------+-------+------+----+
only showing top 5 rows在 PySpark 中,可以在將所有列分別傳參數(shù),而不需要寫成列表的形式
df.orderBy('mass', 'flipper', ascending=False).show(5)
要按多列但按不同方向排序:
# pandas
df.sort_values(['mass', 'flipper'], ascending=[True, False]).head()
# PySpark
df[df['mass'].isNotNull()]\
.sort('mass', 'flipper', ascending=[True, False]).show(5)
+---------+---------+-------+------+------+
| species| island|flipper| mass| sex|
+---------+---------+-------+------+------+
|Chinstrap| Dream| 192.0|2700.0|Female|
| Adelie| Biscoe| 184.0|2850.0|Female|
| Adelie| Biscoe| 181.0|2850.0|Female|
| Adelie|Torgersen| 188.0|2900.0|Female|
| Adelie| Biscoe| 187.0|2900.0|Female|
+---------+---------+-------+------+------+
only showing top 5 rowspyspark的另一種寫法
df[df['mass'].isNotNull()]\
.orderBy(df['mass'].asc(), df['flipper'].desc()).show(5)
聚合
現(xiàn)在,看幾個(gè)聚合數(shù)據(jù)的示例。
簡(jiǎn)單的聚合
二者方法類似:
# pandas
df.agg({'flipper': 'mean'})
# PySpark
df.agg({'flipper': 'mean'}).show()
+------------------+
| avg(flipper)|
+------------------+
|200.91520467836258|
+------------------+多個(gè)聚合
需要采用不同的方法:
# pandas
df.agg({'flipper': ['min', 'max']})
# PySpark
from pyspark.sql import functions as F
df.agg(F.min('flipper'), F.max('flipper')).show()
+------------+------------+
|min(flipper)|max(flipper)|
+------------+------------+
| 172.0| 231.0|
+------------+------------+獲取唯一值
# pandas
df['species'].unique()
# PySpark
df.select('species').distinct().show()
+---------+
| species|
+---------+
| Gentoo|
| Adelie|
|Chinstrap|
+---------+要在列中獲取多個(gè)不同的值:
# pandas
df['species'].nunique()
# PySpark
df.select('species').distinct().count()
3按組聚合
到目前為止,PySpark 使用 camelCase 駝峰命名法來表示方法和函數(shù)。.groupBy()這也是如此。這是一個(gè)簡(jiǎn)單的按聚合分組的示例:
# pandas
df.groupby('species')['mass'].mean()
# PySpark
df.groupBy('species').agg({'mass': 'mean'}).show()
+---------+------------------+
| species| avg(mass)|
+---------+------------------+
| Gentoo| 5076.016260162602|
| Adelie| 3700.662251655629|
|Chinstrap|3733.0882352941176|
+---------+------------------+這是一個(gè)聚合多個(gè)選定列的示例:
# pandas
df.groupby('species').agg({'flipper': 'sum',
'mass': 'mean'})
# PySpark
df.groupBy('species').agg({'flipper': 'sum',
'mass': 'mean'}).show()
+---------+------------+--------------+
| species|sum(flipper)| avg(mass)|
+---------+------------+--------------+
| Gentoo| 26714.0| 5076.01626016|
| Adelie| 28683.0| 3700.66225165|
|Chinstrap| 13316.0|3733.088235294|
+---------+------------+--------------+如果我們不指定列,它將顯示所有數(shù)字列的統(tǒng)計(jì)信息:
# pandas
df.groupby('species').mean()
# PySpark
df.groupBy('species').mean().show()
+---------+--------------+--------------+
| species| avg(flipper)| avg(mass)|
+---------+--------------+--------------+
| Gentoo| 217.186991869| 5076.01626016|
| Adelie|189.9536423841| 3700.66225165|
|Chinstrap| 195.823529411|3733.088235294|
+---------+--------------+--------------+也可以將.mean()替換為.avg(),即可以使用df.groupBy(‘species’).avg().show()。
以上就是本文的所有內(nèi)容,希望能夠幫到你對(duì) PySpark 語(yǔ)法有所了解。我們注意到,在基本任務(wù)方面,這兩個(gè)庫(kù)之間有很多相似之處。這使得在熟悉 pandas 工作知識(shí)的人更容易開始使用 PySpark,在處理小數(shù)據(jù)分析與挖掘后,遇到大數(shù)據(jù)分析與挖掘時(shí),也能夠輕松面對(duì)。
參考資料
官方文檔: https://spark.apache.org/docs/latest/web-ui.html
[2]企鵝數(shù)據(jù)集: https://github.com/mwaskom/seaborn-data/blob/master/penguins.csv

往期精彩回顧
適合初學(xué)者入門人工智能的路線及資料下載 (圖文+視頻)機(jī)器學(xué)習(xí)入門系列下載 機(jī)器學(xué)習(xí)及深度學(xué)習(xí)筆記等資料打印 《統(tǒng)計(jì)學(xué)習(xí)方法》的代碼復(fù)現(xiàn)專輯 機(jī)器學(xué)習(xí)交流qq群955171419,加入微信群請(qǐng)掃碼
