<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Python大數(shù)據(jù)處理利器,PySpark的入門實(shí)戰(zhàn)

          共 12950字,需瀏覽 26分鐘

           ·

          2022-06-14 13:13

          PySpark極速入門

          一:Pyspark簡介與安裝

          什么是Pyspark?

          PySpark是Spark的Python語言接口,通過它,可以使用Python API編寫Spark應(yīng)用程序,目前支持絕大多數(shù)Spark功能。目前Spark官方在其支持的所有語言中,將Python置于首位。

          如何安裝?

          在終端輸入

          pip intsall pyspark

          或者使用pycharm,在GUI界面安裝

          二:編程實(shí)踐

          加載、轉(zhuǎn)換數(shù)據(jù)

          # 導(dǎo)入pyspark
          # 導(dǎo)入pandas, 稍后與pyspark中的數(shù)據(jù)結(jié)構(gòu)做對比
          import pyspark
          import pandas as pd

          在編寫spark程序前,我們要?jiǎng)?chuàng)建一個(gè)SparkSession對象

          from pyspark.sql import SparkSession
          spark = SparkSession.builder.appName("Spark極速入門").getOrCreate()

          可以看到會話的一些信息:使用的Spark版本、運(yùn)行模式、應(yīng)用程序名字

          演示環(huán)境用的是local本地模式, * 代表的是使用全部線程 如果想用集群模式的話,可以去查看集群搭建的相關(guān)教程 屆時(shí)pyspark程序作為spark的客戶端,設(shè)置連接集群,就是真正的分布式計(jì)算了 目前只是本地模式,用多線程去模擬分布式計(jì)算。

          spark

          看看我們將用到的test1數(shù)據(jù)吧

          使用read方法,用option設(shè)置是否讀取csv的頭,再指定路徑就可以讀取數(shù)據(jù)了

          df_spark = spark.read.option("header""true").csv("./data/test1.csv")

          看看是什么類型

          type(df_spark)
          pyspark.sql.dataframe.DataFrame

          再看看用pandas讀取是什么類型

          type(pd.read_csv("./data/test1.csv"))
          pandas.core.frame.DataFrame

          可以發(fā)現(xiàn)Spark讀取這種結(jié)構(gòu)化數(shù)據(jù)時(shí),用的也是和pandas類似的dataframe結(jié)構(gòu) 這也是Spark應(yīng)用最廣泛的數(shù)據(jù)結(jié)構(gòu)

          使用show方法打印數(shù)據(jù)

          df_spark.show()
          +---------+---+----------+------+
          | Name|age|Experience|Salary|
          +---------+---+----------+------+
          | Krish| 31| 10| 30000|
          |Sudhanshu| 30| 8| 25000|
          | Sunny| 29| 4| 20000|
          | Paul| 24| 3| 20000|
          | Harsha| 21| 1| 15000|
          | Shubham| 23| 2| 18000|
          +---------+---+----------+------+


          使用printSchema方法打印元數(shù)據(jù)信息,發(fā)現(xiàn)明明是數(shù)值類型的,它卻讀取為了字符串類型

          df_spark.printSchema()
          root
          |-- Name: string (nullable = true)
          |-- age: string (nullable = true)
          |-- Experience: string (nullable = true)
          |-- Salary: string (nullable = true)


          在讀取時(shí),加上類型推斷,發(fā)現(xiàn)此時(shí)已經(jīng)能正確讀取了

          df_spark = spark.read.option("header""true").csv("./data/test1.csv",inferSchema=True)
          df_spark.printSchema()
          root
          |-- Name: string (nullable = true)
          |-- age: integer (nullable = true)
          |-- Experience: integer (nullable = true)
          |-- Salary: integer (nullable = true)


          選擇某些列, 可以發(fā)現(xiàn)不管選多列還是選單列,返回的都是dataframe 返回的也同樣可以printSchema、show等dataframe使用的方法,做到了結(jié)構(gòu)的統(tǒng)一

          df_spark.select(["Name""age"])
          DataFrame[Name: string, age: int]
          df_spark.select("Name")
          DataFrame[Name: string]
          df_spark.select(["Name""age""Salary"]).printSchema()
          root
          |-- Name: string (nullable = true)
          |-- age: integer (nullable = true)
          |-- Salary: integer (nullable = true)


          不用select,而用[]直接選取,就有點(diǎn)類似與pandas的series了

          df_spark["Name"]
          Column<'Name'>

          column就不能直接show了

          df_spark["age"].show()
          ---------------------------------------------------------------------------

          TypeError Traceback (most recent call last)

          Input In [15], in <cell line: 1>()
          ----> 1 df_spark["age"].show()


          TypeError: 'Column' object is not callable

          用describe方法可以對dataframe做一些簡單的統(tǒng)計(jì)

          df_spark.describe().show()
          +-------+------+------------------+-----------------+------------------+
          |summary| Name| age| Experience| Salary|
          +-------+------+------------------+-----------------+------------------+
          | count| 6| 6| 6| 6|
          | mean| null|26.333333333333332|4.666666666666667|21333.333333333332|
          | stddev| null| 4.179314138308661|3.559026084010437| 5354.126134736337|
          | min|Harsha| 21| 1| 15000|
          | max| Sunny| 31| 10| 30000|
          +-------+------+------------------+-----------------+------------------+


          用withColumn方法給dataframe加上一列

          df_spark = df_spark.withColumn("Experience After 3 year", df_spark["Experience"] + 3)
          df_spark.show()
          +---------+---+----------+------+-----------------------+
          | Name|age|Experience|Salary|Experience After 3 year|
          +---------+---+----------+------+-----------------------+
          | Krish| 31| 10| 30000| 13|
          |Sudhanshu| 30| 8| 25000| 11|
          | Sunny| 29| 4| 20000| 7|
          | Paul| 24| 3| 20000| 6|
          | Harsha| 21| 1| 15000| 4|
          | Shubham| 23| 2| 18000| 5|
          +---------+---+----------+------+-----------------------+


          用drop方法刪除列

          df_spark = df_spark.drop("Experience After 3 year")
          df_spark.show()
          +---------+---+----------+------+
          | Name|age|Experience|Salary|
          +---------+---+----------+------+
          | Krish| 31| 10| 30000|
          |Sudhanshu| 30| 8| 25000|
          | Sunny| 29| 4| 20000|
          | Paul| 24| 3| 20000|
          | Harsha| 21| 1| 15000|
          | Shubham| 23| 2| 18000|
          +---------+---+----------+------+


          用withColumnRename方法重命名列

          df_spark.withColumnRenamed("Name""New Name").show()
          +---------+---+----------+------+
          | New Name|age|Experience|Salary|
          +---------+---+----------+------+
          | Krish| 31| 10| 30000|
          |Sudhanshu| 30| 8| 25000|
          | Sunny| 29| 4| 20000|
          | Paul| 24| 3| 20000|
          | Harsha| 21| 1| 15000|
          | Shubham| 23| 2| 18000|
          +---------+---+----------+------+


          處理缺失值

          看看接下來要帶缺失值的test2數(shù)據(jù)吧



          CSeoe.png
          df_spark = spark.read.csv("./data/test2.csv", header=True, inferSchema=True)
          df_spark.show()
          +---------+----+----------+------+
          | Name| age|Experience|Salary|
          +---------+----+----------+------+
          | Krish| 31| 10| 30000|
          |Sudhanshu| 30| 8| 25000|
          | Sunny| 29| 4| 20000|
          | Paul| 24| 3| 20000|
          | Harsha| 21| 1| 15000|
          | Shubham| 23| 2| 18000|
          | Mahesh|null| null| 40000|
          | null| 34| 10| 38000|
          | null| 36| null| null|
          +---------+----+----------+------+


          用na.drop刪除缺失值 how參數(shù)設(shè)置策略,any意思是只要一行里有缺失值,那就刪了 any也是how的默認(rèn)參數(shù)

          df_spark.na.drop(how="any").show()
          +---------+---+----------+------+
          | Name|age|Experience|Salary|
          +---------+---+----------+------+
          | Krish| 31| 10| 30000|
          |Sudhanshu| 30| 8| 25000|
          | Sunny| 29| 4| 20000|
          | Paul| 24| 3| 20000|
          | Harsha| 21| 1| 15000|
          | Shubham| 23| 2| 18000|
          +---------+---+----------+------+


          可以通過thresh參數(shù)設(shè)置閾值,代表超過一行中缺失值的數(shù)量超過這個(gè)值,才會被刪除

          df_spark.na.drop(how="any", thresh=2).show()
          +---------+----+----------+------+
          | Name| age|Experience|Salary|
          +---------+----+----------+------+
          | Krish| 31| 10| 30000|
          |Sudhanshu| 30| 8| 25000|
          | Sunny| 29| 4| 20000|
          | Paul| 24| 3| 20000|
          | Harsha| 21| 1| 15000|
          | Shubham| 23| 2| 18000|
          | Mahesh|null| null| 40000|
          | null| 34| 10| 38000|
          +---------+----+----------+------+


          也可以用subset參數(shù)設(shè)置關(guān)注的列 下面代碼意思是,在Experience列中,只要有缺失值就刪掉

          df_spark.na.drop(how="any", subset=["Experience"]).show()
          +---------+---+----------+------+
          | Name|age|Experience|Salary|
          +---------+---+----------+------+
          | Krish| 31| 10| 30000|
          |Sudhanshu| 30| 8| 25000|
          | Sunny| 29| 4| 20000|
          | Paul| 24| 3| 20000|
          | Harsha| 21| 1| 15000|
          | Shubham| 23| 2| 18000|
          | null| 34| 10| 38000|
          +---------+---+----------+------+


          用fillna填充缺失值, 可以用字典對各列的填充值進(jìn)行設(shè)置

          df_spark.fillna({'Name''unknown''age'18'Experience'0'Salary'0}).show()
          +---------+---+----------+------+
          | Name|age|Experience|Salary|
          +---------+---+----------+------+
          | Krish| 31| 10| 30000|
          |Sudhanshu| 30| 8| 25000|
          | Sunny| 29| 4| 20000|
          | Paul| 24| 3| 20000|
          | Harsha| 21| 1| 15000|
          | Shubham| 23| 2| 18000|
          | Mahesh| 18| 0| 40000|
          | unknown| 34| 10| 38000|
          | unknown| 36| 0| 0|
          +---------+---+----------+------+


          還可以調(diào)用機(jī)器學(xué)習(xí)模塊的相關(guān)方法, 通過設(shè)置策略,可以用平均數(shù)、眾數(shù)等方式填充

          from pyspark.ml.feature import Imputer

          imputer = Imputer(
              inputCols = ['age''Experience''Salary'],
              outputCols = [f"{c}_imputed" for c in ['age''Experience''Salary']]
          ).setStrategy("mean")
          imputer.fit(df_spark).transform(df_spark).show()
          +---------+----+----------+------+-----------+------------------+--------------+
          | Name| age|Experience|Salary|age_imputed|Experience_imputed|Salary_imputed|
          +---------+----+----------+------+-----------+------------------+--------------+
          | Krish| 31| 10| 30000| 31| 10| 30000|
          |Sudhanshu| 30| 8| 25000| 30| 8| 25000|
          | Sunny| 29| 4| 20000| 29| 4| 20000|
          | Paul| 24| 3| 20000| 24| 3| 20000|
          | Harsha| 21| 1| 15000| 21| 1| 15000|
          | Shubham| 23| 2| 18000| 23| 2| 18000|
          | Mahesh|null| null| 40000| 28| 5| 40000|
          | null| 34| 10| 38000| 34| 10| 38000|
          | null| 36| null| null| 36| 5| 25750|
          +---------+----+----------+------+-----------+------------------+--------------+


          過濾操作

          還是切換到test1數(shù)據(jù)

          df_spark = spark.read.csv("./data/test1.csv", header=True, inferSchema=True)
          df_spark.show()
          +---------+---+----------+------+
          | Name|age|Experience|Salary|
          +---------+---+----------+------+
          | Krish| 31| 10| 30000|
          |Sudhanshu| 30| 8| 25000|
          | Sunny| 29| 4| 20000|
          | Paul| 24| 3| 20000|
          | Harsha| 21| 1| 15000|
          | Shubham| 23| 2| 18000|
          +---------+---+----------+------+


          可以使用filter方法對數(shù)據(jù)進(jìn)行過濾操作,類似于SQL中的where 可以使用字符串的方式,也可以利用column方式去傳遞條件

          df_spark.filter("Salary <= 20000").show()
          +-------+---+----------+------+
          | Name|age|Experience|Salary|
          +-------+---+----------+------+
          | Sunny| 29| 4| 20000|
          | Paul| 24| 3| 20000|
          | Harsha| 21| 1| 15000|
          |Shubham| 23| 2| 18000|
          +-------+---+----------+------+


          df_spark.filter(df_spark["Salary"]<=20000).show()
          +-------+---+----------+------+
          | Name|age|Experience|Salary|
          +-------+---+----------+------+
          | Sunny| 29| 4| 20000|
          | Paul| 24| 3| 20000|
          | Harsha| 21| 1| 15000|
          |Shubham| 23| 2| 18000|
          +-------+---+----------+------+


          如果是字符串,用 and 表示同時(shí)滿足多個(gè)條件 如果是用column,用( & ) 連接多個(gè)條件

          df_spark.filter("Salary <= 20000 and age <= 24").show()
          +-------+---+----------+------+
          | Name|age|Experience|Salary|
          +-------+---+----------+------+
          | Paul| 24| 3| 20000|
          | Harsha| 21| 1| 15000|
          |Shubham| 23| 2| 18000|
          +-------+---+----------+------+


          df_spark.filter(
              (df_spark["Salary"]<=20000)
              & (df_spark["age"]<=24)
          ).show()
          +-------+---+----------+------+
          | Name|age|Experience|Salary|
          +-------+---+----------+------+
          | Paul| 24| 3| 20000|
          | Harsha| 21| 1| 15000|
          |Shubham| 23| 2| 18000|
          +-------+---+----------+------+


          column中,用|表示或, ~表示取反
          df_spark.filter(
              (df_spark["Salary"]<=20000)
              | (df_spark["age"]<=24)
          ).show()
          +-------+---+----------+------+
          | Name|age|Experience|Salary|
          +-------+---+----------+------+
          | Sunny| 29| 4| 20000|
          | Paul| 24| 3| 20000|
          | Harsha| 21| 1| 15000|
          |Shubham| 23| 2| 18000|
          +-------+---+----------+------+


          df_spark.filter(
              (df_spark["Salary"]<=20000)
              | ~(df_spark["age"]<=24)
          ).show()
          +---------+---+----------+------+
          | Name|age|Experience|Salary|
          +---------+---+----------+------+
          | Krish| 31| 10| 30000|
          |Sudhanshu| 30| 8| 25000|
          | Sunny| 29| 4| 20000|
          | Paul| 24| 3| 20000|
          | Harsha| 21| 1| 15000|
          | Shubham| 23| 2| 18000|
          +---------+---+----------+------+


          分組聚合

          換一個(gè)數(shù)據(jù)集test3

          df_spark = spark.read.csv("./data/test3.csv", header=True, inferSchema=True)
          df_spark.show()
          +---------+------------+------+
          | Name| Departments|salary|
          +---------+------------+------+
          | Krish|Data Science| 10000|
          | Krish| IOT| 5000|
          | Mahesh| Big Data| 4000|
          | Krish| Big Data| 4000|
          | Mahesh|Data Science| 3000|
          |Sudhanshu|Data Science| 20000|
          |Sudhanshu| IOT| 10000|
          |Sudhanshu| Big Data| 5000|
          | Sunny|Data Science| 10000|
          | Sunny| Big Data| 2000|
          +---------+------------+------+


          使用groupby方法對dataframe某些列進(jìn)行分組

          df_spark.groupBy("Name")
          <pyspark.sql.group.GroupedData at 0x227454d4be0>

          可以看到分組的結(jié)果是GroupedData對象,它不能使用show等方法打印 GroupedData對象需要進(jìn)行聚合操作,才能重新轉(zhuǎn)換為dataframe 聚合函數(shù)有sum、count、avg、max、min等

          df_spark.groupBy("Departments").sum().show()
          +------------+-----------+
          | Departments|sum(salary)|
          +------------+-----------+
          | IOT| 15000|
          | Big Data| 15000|
          |Data Science| 43000|
          +------------+-----------+

          三:總結(jié)

          Pandas的dataframe與PySpark的dataframe有許多相似之處,熟悉Pandas的同學(xué)可以很快適應(yīng)它的API。目前可以粗淺地把PySpark理解為”分布式的Pandas“,不過,PySpark還有分布式機(jī)器學(xué)習(xí)的功能——Spark MLlib(可以理解為分布式的Sklearn、TensorFlow等),后續(xù)會給大家介紹。在集群中,它的dataframe可以分布在不同的機(jī)器上,以此處理海量數(shù)據(jù)。有興趣的小伙伴可以通過虛擬機(jī)搭建一個(gè)Spark集群,進(jìn)一步學(xué)習(xí)Spark。

          Apache Spark? - 用于大規(guī)模數(shù)據(jù)分析的統(tǒng)一引擎

          每天鎖定螞蟻老師抖音直播間,給你介紹Python副業(yè)的玩法:



          瀏覽 65
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  久操激情网 | 国产 丝袜 人妻 制服 一区 | 91视频在线 | 97成人网站 | 国产高清一级a片免费看古女 |