<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          PySpark入門級學(xué)習(xí)教程,框架思維(中)

          共 27164字,需瀏覽 55分鐘

           ·

          2021-04-21 05:30

          這周工作好忙,晚上陸陸續(xù)續(xù)寫了好幾波,周末來一次集合輸出,不過這個PySpark原定是分上下兩篇的,但是越學(xué)感覺越多,所以就分成了3 Parts,今天這一part主要就是講一下Spark SQL,這個實在好用!建議收藏學(xué)習(xí)哈哈。上一節(jié)的可點擊回顧下哈。《PySpark入門級學(xué)習(xí)教程,框架思維(上)》

          ?? Spark SQL使用

          在講Spark SQL前,先解釋下這個模塊。這個模塊是Spark中用來處理結(jié)構(gòu)化數(shù)據(jù)的,提供一個叫SparkDataFrame的東西并且自動解析為分布式SQL查詢數(shù)據(jù)。我們之前用過Python的Pandas庫,也大致了解了DataFrame,這個其實和它沒有太大的區(qū)別,只是調(diào)用的API可能有些不同罷了。

          我們通過使用Spark SQL來處理數(shù)據(jù),會讓我們更加地熟悉,比如可以用SQL語句、用SparkDataFrame的API或者Datasets API,我們可以按照需求隨心轉(zhuǎn)換,通過SparkDataFrame API 和 SQL 寫的邏輯,會被Spark優(yōu)化器Catalyst自動優(yōu)化成RDD,即便寫得不好也可能運行得很快(如果是直接寫RDD可能就掛了哈哈)。

          創(chuàng)建SparkDataFrame

          開始講SparkDataFrame,我們先學(xué)習(xí)下幾種創(chuàng)建的方法,分別是使用RDD來創(chuàng)建使用python的DataFrame來創(chuàng)建、使用List來創(chuàng)建、讀取數(shù)據(jù)文件來創(chuàng)建通過讀取數(shù)據(jù)庫來創(chuàng)建。

          1. 使用RDD來創(chuàng)建

          主要使用RDD的toDF方法。

          rdd = sc.parallelize([("Sam"2888), ("Flora"2890), ("Run"160)])
          df = rdd.toDF(["name""age""score"])
          df.show()
          df.printSchema()

          # +-----+---+-----+
          # | name|age|score|
          # +-----+---+-----+
          # |  Sam| 28|   88|
          # |Flora| 28|   90|
          # |  Run|  1|   60|
          # +-----+---+-----+
          # root
          #  |-- name: string (nullable = true)
          #  |-- age: long (nullable = true)
          #  |-- score: long (nullable = true)
          2. 使用python的DataFrame來創(chuàng)建
          df = pd.DataFrame([['Sam'2888], ['Flora'2890], ['Run'160]],
                            columns=['name''age''score'])
          print(">> 打印DataFrame:")
          print(df)
          print("\n")
          Spark_df = spark.createDataFrame(df)
          print(">> 打印SparkDataFrame:")
          Spark_df.show()
          # >> 打印DataFrame:
          #     name  age  score
          # 0    Sam   28     88
          # 1  Flora   28     90
          # 2    Run    1     60
          # >> 打印SparkDataFrame:
          # +-----+---+-----+
          # | name|age|score|
          # +-----+---+-----+
          # |  Sam| 28|   88|
          # |Flora| 28|   90|
          # |  Run|  1|   60|
          # +-----+---+-----+
          3. 使用List來創(chuàng)建
          list_values = [['Sam'2888], ['Flora'2890], ['Run'160]]
          Spark_df = spark.createDataFrame(list_values, ['name''age''score'])
          Spark_df.show()
          # +-----+---+-----+
          # | name|age|score|
          # +-----+---+-----+
          # |  Sam| 28|   88|
          # |Flora| 28|   90|
          # |  Run|  1|   60|
          # +-----+---+-----+
          4. 讀取數(shù)據(jù)文件來創(chuàng)建
          # 4.1 CSV文件
          df = spark.read.option("header""true")\
              .option("inferSchema""true")\
              .option("delimiter"",")\
              .csv("./test/data/titanic/train.csv")
          df.show(5)
          df.printSchema()

          # 4.2 json文件
          df = spark.read.json("./test/data/hello_samshare.json")
          df.show(5)
          df.printSchema()
          5. 通過讀取數(shù)據(jù)庫來創(chuàng)建
          # 5.1 讀取hive數(shù)據(jù)
          spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
          spark.sql("LOAD DATA LOCAL INPATH 'data/kv1.txt' INTO TABLE src")
          df = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
          df.show(5)

          # 5.2 讀取mysql數(shù)據(jù)
          url = "jdbc:mysql://localhost:3306/test"
          df = spark.read.format("jdbc") \
           .option("url", url) \
           .option("dbtable""runoob_tbl") \
           .option("user""root") \
           .option("password""8888") \
           .load()\
          df.show()

          常用的SparkDataFrame API

          這里我大概是分成了幾部分來看這些APIs,分別是查看DataFrame的APIs、簡單處理DataFrame的APIs、DataFrame的列操作APIs、DataFrame的一些思路變換操作APIsDataFrame的一些統(tǒng)計操作APIs,這樣子也有助于我們了解這些API的功能,以后遇見實際問題的時候可以解決。

          首先我們這小節(jié)全局用到的數(shù)據(jù)集如下:

          from pyspark.sql import functions as F
          from pyspark.sql import SparkSession
          # SparkSQL的許多功能封裝在SparkSession的方法接口中, SparkContext則不行的。
          spark = SparkSession.builder \
              .appName("sam_SamShare") \
              .config("master""local[4]") \
              .enableHiveSupport() \
              .getOrCreate()
          sc = spark.sparkContext

          # 創(chuàng)建一個SparkDataFrame
          rdd = sc.parallelize([("Sam"2888"M"),
                                ("Flora"2890"F"),
                                ("Run"160None),
                                ("Peter"55100"M"),
                                ("Mei"5495"F")])
          df = rdd.toDF(["name""age""score""sex"])
          df.show()
          df.printSchema()

          # +-----+---+-----+----+
          # | name|age|score| sex|
          # +-----+---+-----+----+
          # |  Sam| 28|   88|   M|
          # |Flora| 28|   90|   F|
          # |  Run|  1|   60|null|
          # |Peter| 55|  100|   M|
          # |  Mei| 54|   95|   F|
          # +-----+---+-----+----+
          # root
          #  |-- name: string (nullable = true)
          #  |-- age: long (nullable = true)
          #  |-- score: long (nullable = true)
          #  |-- sex: string (nullable = true)
          1、查看DataFrame的APIs
          # DataFrame.collect
          # 以列表形式返回行
          df.collect()
          # [Row(name='Sam', age=28, score=88, sex='M'),
          # Row(name='Flora', age=28, score=90, sex='F'),
          # Row(name='Run', age=1, score=60, sex=None),
          # Row(name='Peter', age=55, score=100, sex='M'),
          # Row(name='Mei', age=54, score=95, sex='F')]

          # DataFrame.count
          df.count()
          # 5

          # DataFrame.columns
          df.columns
          # ['name', 'age', 'score', 'sex']

          # DataFrame.dtypes
          df.dtypes
          # [('name', 'string'), ('age', 'bigint'), ('score', 'bigint'), ('sex', 'string')]

          # DataFrame.describe
          # 返回列的基礎(chǔ)統(tǒng)計信息
          df.describe(['age']).show()
          # +-------+------------------+
          # |summary|               age|
          # +-------+------------------+
          # |  count|                 5|
          # |   mean|              33.2|
          # | stddev|22.353970564532826|
          # |    min|                 1|
          # |    max|                55|
          # +-------+------------------+
          df.describe().show()
          # +-------+-----+------------------+------------------+----+
          # |summary| name|               age|             score| sex|
          # +-------+-----+------------------+------------------+----+
          # |  count|    5|                 5|                 5|   4|
          # |   mean| null|              33.2|              86.6|null|
          # | stddev| null|22.353970564532826|15.582040944625966|null|
          # |    min|Flora|                 1|                60|   F|
          # |    max|  Sam|                55|               100|   M|
          # +-------+-----+------------------+------------------+----+

          # DataFrame.select
          # 選定指定列并按照一定順序呈現(xiàn)
          df.select("sex""score").show()

          # DataFrame.first
          # DataFrame.head
          # 查看第1條數(shù)據(jù)
          df.first()
          # Row(name='Sam', age=28, score=88, sex='M')
          df.head(1)
          # [Row(name='Sam', age=28, score=88, sex='M')]


          # DataFrame.freqItems
          # 查看指定列的枚舉值
          df.freqItems(["age","sex"]).show()
          # +---------------+-------------+
          # |  age_freqItems|sex_freqItems|
          # +---------------+-------------+
          # |[55, 1, 28, 54]|      [M, F,]|
          # +---------------+-------------+

          # DataFrame.summary
          df.summary().show()
          # +-------+-----+------------------+------------------+----+
          # |summary| name|               age|             score| sex|
          # +-------+-----+------------------+------------------+----+
          # |  count|    5|                 5|                 5|   4|
          # |   mean| null|              33.2|              86.6|null|
          # | stddev| null|22.353970564532826|15.582040944625966|null|
          # |    min|Flora|                 1|                60|   F|
          # |    25%| null|                28|                88|null|
          # |    50%| null|                28|                90|null|
          # |    75%| null|                54|                95|null|
          # |    max|  Sam|                55|               100|   M|
          # +-------+-----+------------------+------------------+----+

          # DataFrame.sample
          # 按照一定規(guī)則從df隨機抽樣數(shù)據(jù)
          df.sample(0.5).show()
          # +-----+---+-----+----+
          # | name|age|score| sex|
          # +-----+---+-----+----+
          # |  Sam| 28|   88|   M|
          # |  Run|  1|   60|null|
          # |Peter| 55|  100|   M|
          # +-----+---+-----+----+
          2、簡單處理DataFrame的APIs
          # DataFrame.distinct
          # 對數(shù)據(jù)集進行去重
          df.distinct().show()

          # DataFrame.dropDuplicates
          # 對指定列去重
          df.dropDuplicates(["sex"]).show()
          # +-----+---+-----+----+
          # | name|age|score| sex|
          # +-----+---+-----+----+
          # |Flora| 28|   90|   F|
          # |  Run|  1|   60|null|
          # |  Sam| 28|   88|   M|
          # +-----+---+-----+----+

          # DataFrame.exceptAll
          # DataFrame.subtract
          # 根據(jù)指定的df對df進行去重
          df1 = spark.createDataFrame(
                  [("a"1), ("a"1), ("b",  3), ("c"4)], ["C1""C2"])
          df2 = spark.createDataFrame([("a"1), ("b"3)], ["C1""C2"])
          df3 = df1.exceptAll(df2)  # 沒有去重的功效
          df4 = df1.subtract(df2)  # 有去重的奇效
          df1.show()
          df2.show()
          df3.show()
          df4.show()
          # +---+---+
          # | C1| C2|
          # +---+---+
          # |  a|  1|
          # |  a|  1|
          # |  b|  3|
          # |  c|  4|
          # +---+---+
          # +---+---+
          # | C1| C2|
          # +---+---+
          # |  a|  1|
          # |  b|  3|
          # +---+---+
          # +---+---+
          # | C1| C2|
          # +---+---+
          # |  a|  1|
          # |  c|  4|
          # +---+---+
          # +---+---+
          # | C1| C2|
          # +---+---+
          # |  c|  4|
          # +---+---+

          # DataFrame.intersectAll
          # 返回兩個DataFrame的交集
          df1 = spark.createDataFrame(
                  [("a"1), ("a"1), ("b",  3), ("c"4)], ["C1""C2"])
          df2 = spark.createDataFrame([("a"1), ("b"4)], ["C1""C2"])
          df1.intersectAll(df2).show()
          # +---+---+
          # | C1| C2|
          # +---+---+
          # |  a|  1|
          # +---+---+

          # DataFrame.drop
          # 丟棄指定列
          df.drop('age').show()

          # DataFrame.withColumn
          # 新增列
          df1 = df.withColumn("birth_year"2021 - df.age)
          df1.show()
          # +-----+---+-----+----+----------+
          # | name|age|score| sex|birth_year|
          # +-----+---+-----+----+----------+
          # |  Sam| 28|   88|   M|      1993|
          # |Flora| 28|   90|   F|      1993|
          # |  Run|  1|   60|null|      2020|
          # |Peter| 55|  100|   M|      1966|
          # |  Mei| 54|   95|   F|      1967|
          # +-----+---+-----+----+----------+

          # DataFrame.withColumnRenamed
          # 重命名列名
          df1 = df.withColumnRenamed("sex""gender")
          df1.show()
          # +-----+---+-----+------+
          # | name|age|score|gender|
          # +-----+---+-----+------+
          # |  Sam| 28|   88|     M|
          # |Flora| 28|   90|     F|
          # |  Run|  1|   60|  null|
          # |Peter| 55|  100|     M|
          # |  Mei| 54|   95|     F|
          # +-----+---+-----+------+


          # DataFrame.dropna
          # 丟棄空值,DataFrame.dropna(how='any', thresh=None, subset=None)
          df.dropna(how='all', subset=['sex']).show()
          # +-----+---+-----+---+
          # | name|age|score|sex|
          # +-----+---+-----+---+
          # |  Sam| 28|   88|  M|
          # |Flora| 28|   90|  F|
          # |Peter| 55|  100|  M|
          # |  Mei| 54|   95|  F|
          # +-----+---+-----+---+

          # DataFrame.fillna
          # 空值填充操作
          df1 = spark.createDataFrame(
                  [("a"None), ("a"1), (None,  3), ("c"4)], ["C1""C2"])
          # df2 = df1.na.fill({"C1": "d", "C2": 99})
          df2 = df1.fillna({"C1""d""C2"99})
          df1.show()
          df2.show()

          # DataFrame.filter
          # 根據(jù)條件過濾
          df.filter(df.age>50).show()
          # +-----+---+-----+---+
          # | name|age|score|sex|
          # +-----+---+-----+---+
          # |Peter| 55|  100|  M|
          # |  Mei| 54|   95|  F|
          # +-----+---+-----+---+
          df.where(df.age==28).show()
          # +-----+---+-----+---+
          # | name|age|score|sex|
          # +-----+---+-----+---+
          # |  Sam| 28|   88|  M|
          # |Flora| 28|   90|  F|
          # +-----+---+-----+---+
          df.filter("age<18").show()
          # +----+---+-----+----+
          # |name|age|score| sex|
          # +----+---+-----+----+
          # | Run|  1|   60|null|
          # +----+---+-----+----+


          # DataFrame.join
          # 這個不用多解釋了,直接上案例來看看具體的語法即可,DataFrame.join(other, on=None, how=None)
          df1 = spark.createDataFrame(
                  [("a"1), ("d"1), ("b",  3), ("c"4)], ["id""num1"])
          df2 = spark.createDataFrame([("a"1), ("b"3)], ["id""num2"])
          df1.join(df2, df1.id == df2.id, 'left').select(df1.id.alias("df1_id"),
                                                         df1.num1.alias("df1_num"),
                                                         df2.num2.alias("df2_num")
                                                         ).sort(["df1_id"], ascending=False)\
              .show()


          # DataFrame.agg(*exprs)
          # 聚合數(shù)據(jù),可以寫多個聚合方法,如果不寫groupBy的話就是對整個DF進行聚合
          # DataFrame.alias
          # 設(shè)置列或者DataFrame別名
          # DataFrame.groupBy
          # 根據(jù)某幾列進行聚合,如有多列用列表寫在一起,如 df.groupBy(["sex", "age"])
          df.groupBy("sex").agg(F.min(df.age).alias("最小年齡"),
                                F.expr("avg(age)").alias("平均年齡"),
                                F.expr("collect_list(name)").alias("姓名集合")
                                ).show()
          # +----+--------+--------+------------+
          # | sex|最小年齡|平均年齡|    姓名集合|
          # +----+--------+--------+------------+
          # |   F|      28|    41.0|[Flora, Mei]|
          # |null|       1|     1.0|       [Run]|
          # |   M|      28|    41.5|[Sam, Peter]|
          # +----+--------+--------+------------+


          # DataFrame.foreach
          # 對每一行進行函數(shù)方法的應(yīng)用
          def f(person):
              print(person.name)
          df.foreach(f)
          # Peter
          # Run
          # Sam
          # Flora
          # Mei

          # DataFrame.replace
          # 修改df里的某些值
          df1 = df.na.replace({"M""Male""F""Female"})
          df1.show()

          # DataFrame.union
          # 相當(dāng)于SQL里的union all操作
          df1 = spark.createDataFrame(
                  [("a"1), ("d"1), ("b",  3), ("c"4)], ["id""num"])
          df2 = spark.createDataFrame([("a"1), ("b"3)], ["id""num"])
          df1.union(df2).show()
          df1.unionAll(df2).show()
          # 這里union沒有去重,不知道為啥,有知道的朋友麻煩解釋下,謝謝了。
          # +---+---+
          # | id|num|
          # +---+---+
          # |  a|  1|
          # |  d|  1|
          # |  b|  3|
          # |  c|  4|
          # |  a|  1|
          # |  b|  3|
          # +---+---+

          # DataFrame.unionByName
          # 根據(jù)列名來進行合并數(shù)據(jù)集
          df1 = spark.createDataFrame([[123]], ["col0""col1""col2"])
          df2 = spark.createDataFrame([[456]], ["col1""col2""col0"])
          df1.unionByName(df2).show()
          # +----+----+----+
          # |col0|col1|col2|
          # +----+----+----+
          # |   1|   2|   3|
          # |   6|   4|   5|
          # +----+----+----+
          3、DataFrame的列操作APIs

          這里主要針對的是列進行操作,比如說重命名、排序、空值判斷、類型判斷等,這里就不展開寫demo了,看看語法應(yīng)該大家都懂了。

          Column.alias(*alias, **kwargs)  # 重命名列名
          Column.asc()  # 按照列進行升序排序
          Column.desc()  # 按照列進行降序排序
          Column.astype(dataType)  # 類型轉(zhuǎn)換
          Column.cast(dataType)  # 強制轉(zhuǎn)換類型
          Column.between(lowerBound, upperBound)  # 返回布爾值,是否在指定區(qū)間范圍內(nèi)
          Column.contains(other)  # 是否包含某個關(guān)鍵詞
          Column.endswith(other)  # 以什么結(jié)束的值,如 df.filter(df.name.endswith('ice')).collect()
          Column.isNotNull()  # 篩選非空的行
          Column.isNull()
          Column.isin(*cols)  # 返回包含某些值的行 df[df.name.isin("Bob", "Mike")].collect()
          Column.like(other)  # 返回含有關(guān)鍵詞的行
          Column.when(condition, value)  # 給True的賦值
          Column.otherwise(value)  # 與when搭配使用,df.select(df.name, F.when(df.age > 3, 1).otherwise(0)).show()
          Column.rlike(other)  # 可以使用正則的匹配 df.filter(df.name.rlike('ice$')).collect()
          Column.startswith(other)  # df.filter(df.name.startswith('Al')).collect()
          Column.substr(startPos, length)  # df.select(df.name.substr(1, 3).alias("col")).collect()
          4、DataFrame的一些思路變換操作APIs
          # DataFrame.createOrReplaceGlobalTempView
          # DataFrame.dropGlobalTempView
          # 創(chuàng)建全局的試圖,注冊后可以使用sql語句來進行操作,生命周期取決于Spark application本身
          df.createOrReplaceGlobalTempView("people")
          spark.sql("select * from global_temp.people where sex = 'M' ").show()
          # +-----+---+-----+---+
          # | name|age|score|sex|
          # +-----+---+-----+---+
          # |  Sam| 28|   88|  M|
          # |Peter| 55|  100|  M|
          # +-----+---+-----+---+

          # DataFrame.createOrReplaceTempView
          # DataFrame.dropTempView
          # 創(chuàng)建本地臨時試圖,生命周期取決于用來創(chuàng)建此數(shù)據(jù)集的SparkSession
          df.createOrReplaceTempView("tmp_people")
          spark.sql("select * from tmp_people where sex = 'F' ").show()
          # +-----+---+-----+---+
          # | name|age|score|sex|
          # +-----+---+-----+---+
          # |Flora| 28|   90|  F|
          # |  Mei| 54|   95|  F|
          # +-----+---+-----+---+

          # DataFrame.cache\DataFrame.persist
          # 可以把一些數(shù)據(jù)放入緩存中,default storage level (MEMORY_AND_DISK).
          df.cache()
          df.persist()
          df.unpersist()

          # DataFrame.crossJoin
          # 返回兩個DataFrame的笛卡爾積關(guān)聯(lián)的DataFrame
          df1 = df.select("name""sex")
          df2 = df.select("name""sex")
          df3 = df1.crossJoin(df2)
          print("表1的記錄數(shù)", df1.count())
          print("表2的記錄數(shù)", df2.count())
          print("笛卡爾積后的記錄數(shù)", df3.count())
          # 表1的記錄數(shù) 5
          # 表2的記錄數(shù) 5
          # 笛卡爾積后的記錄數(shù) 25

          # DataFrame.toPandas
          # 把SparkDataFrame轉(zhuǎn)為 Pandas的DataFrame
          df.toPandas()

          # DataFrame.rdd
          # 把SparkDataFrame轉(zhuǎn)為rdd,這樣子可以用rdd的語法來操作數(shù)據(jù)
          df.rdd
          5、DataFrame的一些統(tǒng)計操作APIs
          # DataFrame.cov
          # 計算指定兩列的樣本協(xié)方差
          df.cov("age""score")
          # 324.59999999999997

          # DataFrame.corr
          # 計算指定兩列的相關(guān)系數(shù),DataFrame.corr(col1, col2, method=None),目前method只支持Pearson相關(guān)系數(shù)
          df.corr("age""score", method="pearson")
          # 0.9319004030498815

          # DataFrame.cube
          # 創(chuàng)建多維度聚合的結(jié)果,通常用于分析數(shù)據(jù),比如我們指定兩個列進行聚合,比如name和age,那么這個函數(shù)返回的聚合結(jié)果會
          # groupby("name", "age")
          # groupby("name")
          # groupby("age")
          # groupby(all)
          # 四個聚合結(jié)果的union all 的結(jié)果

          df1 = df.filter(df.name != "Run")
          print(df1.show())
          df1.cube("name""sex").count().show()
          # +-----+---+-----+---+
          # | name|age|score|sex|
          # +-----+---+-----+---+
          # |  Sam| 28|   88|  M|
          # |Flora| 28|   90|  F|
          # |Peter| 55|  100|  M|
          # |  Mei| 54|   95|  F|
          # +-----+---+-----+---+
          # cube 聚合之后的結(jié)果
          # +-----+----+-----+
          # | name| sex|count|
          # +-----+----+-----+
          # | null|   F|    2|
          # | null|null|    4|
          # |Flora|null|    1|
          # |Peter|null|    1|
          # | null|   M|    2|
          # |Peter|   M|    1|
          # |  Sam|   M|    1|
          # |  Sam|null|    1|
          # |  Mei|   F|    1|
          # |  Mei|null|    1|
          # |Flora|   F|    1|
          # +-----+----+-----+


          嘻嘻,恭喜你讀完啦,獎勵你一首歌,一起加油。


          瀏覽 45
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  精品久久一区二区三区四区 | 在线观看日韩三级片 | 午夜免费AV| 97国产精品视频人人做人人爱 | 操比视频在线观看 |