<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          【機器學(xué)習(xí)】3萬字長文,PySpark入門級學(xué)習(xí)教程,框架思維

          共 58270字,需瀏覽 117分鐘

           ·

          2021-09-13 23:01

          為什么要學(xué)習(xí)Spark?作為數(shù)據(jù)從業(yè)者多年,個人覺得Spark已經(jīng)越來越走進我們的日常工作了,無論是使用哪種編程語言,Python、Scala還是Java,都會或多或少接觸到Spark,它可以讓我們能夠用到集群的力量,可以對BigData進行高效操作,實現(xiàn)很多之前由于計算資源而無法輕易實現(xiàn)的東西。網(wǎng)上有很多關(guān)于Spark的好處,這里就不做過多的贅述,我們直接進入這篇文章的正文!


          關(guān)于PySpark,我們知道它是Python調(diào)用Spark的接口,我們可以通過調(diào)用Python API的方式來編寫Spark程序,它支持了大多數(shù)的Spark功能,比如SparkDataFrame、Spark SQL、Streaming、MLlib等等。只要我們了解Python的基本語法,那么在Python里調(diào)用Spark的力量就顯得十分easy了。下面我將會從相對宏觀的層面介紹一下PySpark,讓我們對于這個神器有一個框架性的認識,知道它能干什么,知道去哪里尋找問題解答,爭取看完這篇文章可以讓我們更加絲滑地入門PySpark。話不多說,馬上開始!

          ?? 目錄

          ?? 安裝指引

          安裝這塊本文就不展開具體的步驟了,畢竟大家的機子環(huán)境都不盡相同。不過可以簡單說幾點重要的步驟,然后節(jié)末放上一些安裝示例供大家參考。

          1)要使用PySpark,機子上要有Java開發(fā)環(huán)境

          2)環(huán)境變量記得要配置完整

          3)Mac下的/usr/local/ 路徑一般是隱藏的,PyCharm配置py4j和pyspark的時候可以使用 shift+command+G 來使用路徑訪問。

          4)Mac下如果修改了 ~/.bash_profile 的話,記得要重啟下PyCharm才會生效的哈

          5)版本記得要搞對,保險起見Java的jdk版本選擇低版本(別問我為什么知道),我選擇的是Java8.


          下面是一些示例demo,可以參考下:

          1)Mac下安裝spark,并配置pycharm-pyspark完整教程

          https://blog.csdn.net/shiyutianming/article/details/99946797

          2)virtualBox里安裝開發(fā)環(huán)境

          https://www.bilibili.com/video/BV1i4411i79a?p=3

          3)快速搭建spark開發(fā)環(huán)境,云哥項目

          https://github.com/lyhue1991/eat_pyspark_in_10_days

          ?? 基礎(chǔ)概念

          關(guān)于Spark的基礎(chǔ)概念,我在先前的文章里也有寫過,大家可以一起來回顧一下 《想學(xué)習(xí)Spark?先帶你了解一些基礎(chǔ)的知識》。作為補充,今天在這里也介紹一些在Spark中會經(jīng)常遇見的專有名詞。

          ???♀? Q1: 什么是RDD

          RDD的全稱是 Resilient Distributed Datasets,這是Spark的一種數(shù)據(jù)抽象集合,它可以被執(zhí)行在分布式的集群上進行各種操作,而且有較強的容錯機制。RDD可以被分為若干個分區(qū),每一個分區(qū)就是一個數(shù)據(jù)集片段,從而可以支持分布式計算。

          ???♀? Q2: RDD運行時相關(guān)的關(guān)鍵名詞

          簡單來說可以有 Client、Job、Master、Worker、Driver、Stage、Task以及Executor,這幾個東西在調(diào)優(yōu)的時候也會經(jīng)常遇到的。

          Client:指的是客戶端進程,主要負責(zé)提交job到Master;

          Job:Job來自于我們編寫的程序,Application包含一個或者多個job,job包含各種RDD操作;

          Master:指的是Standalone模式中的主控節(jié)點,負責(zé)接收來自Client的job,并管理著worker,可以給worker分配任務(wù)和資源(主要是driver和executor資源);

          Worker:指的是Standalone模式中的slave節(jié)點,負責(zé)管理本節(jié)點的資源,同時受Master管理,需要定期給Master回報heartbeat(心跳),啟動Driver和Executor;

          Driver:指的是 job(作業(yè))的主進程,一般每個Spark作業(yè)都會有一個Driver進程,負責(zé)整個作業(yè)的運行,包括了job的解析、Stage的生成、調(diào)度Task到Executor上去執(zhí)行;

          Stage:中文名 階段,是job的基本調(diào)度單位,因為每個job會分成若干組Task,每組任務(wù)就被稱為 Stage;

          Task:任務(wù),指的是直接運行在executor上的東西,是executor上的一個線程;

          Executor:指的是 執(zhí)行器,顧名思義就是真正執(zhí)行任務(wù)的地方了,一個集群可以被配置若干個Executor,每個Executor接收來自Driver的Task,并執(zhí)行它(可同時執(zhí)行多個Task)。

          ???♀? Q3: 什么是DAG

          全稱是 Directed Acyclic Graph,中文名是有向無環(huán)圖。Spark就是借用了DAG對RDD之間的關(guān)系進行了建模,用來描述RDD之間的因果依賴關(guān)系。因為在一個Spark作業(yè)調(diào)度中,多個作業(yè)任務(wù)之間也是相互依賴的,有些任務(wù)需要在一些任務(wù)執(zhí)行完成了才可以執(zhí)行的。在Spark調(diào)度中就是有DAGscheduler,它負責(zé)將job分成若干組Task組成的Stage。


          ???♀? Q4: Spark的部署模式有哪些

          主要有l(wèi)ocal模式、Standalone模式、Mesos模式、YARN模式。

          更多的解釋可以參考這位老哥的解釋。https://www.jianshu.com/p/3b8f85329664

          ???♀? Q5: Shuffle操作是什么

          Shuffle指的是數(shù)據(jù)從Map端到Reduce端的數(shù)據(jù)傳輸過程,Shuffle性能的高低直接會影響程序的性能。因為Reduce task需要跨節(jié)點去拉在分布在不同節(jié)點上的Map task計算結(jié)果,這一個過程是需要有磁盤IO消耗以及數(shù)據(jù)網(wǎng)絡(luò)傳輸?shù)南牡模孕枰鶕?jù)實際數(shù)據(jù)情況進行適當(dāng)調(diào)整。另外,Shuffle可以分為兩部分,分別是Map階段的數(shù)據(jù)準(zhǔn)備與Reduce階段的數(shù)據(jù)拷貝處理,在Map端我們叫Shuffle Write,在Reduce端我們叫Shuffle Read。

          ???♀? Q6: 什么是惰性執(zhí)行

          這是RDD的一個特性,在RDD中的算子可以分為Transform算子和Action算子,其中Transform算子的操作都不會真正執(zhí)行,只會記錄一下依賴關(guān)系,直到遇見了Action算子,在這之前的所有Transform操作才會被觸發(fā)計算,這就是所謂的惰性執(zhí)行。具體哪些是Transform和Action算子,可以看下一節(jié)。

          ?? 常用函數(shù)

          從網(wǎng)友的總結(jié)來看比較常用的算子大概可以分為下面幾種,所以就演示一下這些算子,如果需要看更多的算子或者解釋,建議可以移步到官方API文檔去Search一下哈。

          pyspark.RDD:http://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.html#pyspark.RDD

          圖來自 edureka 的pyspark入門教程

          下面我們用自己創(chuàng)建的RDD:sc.parallelize(range(1,11),4)

          import os
          import pyspark
          from pyspark import SparkContext, SparkConf

          conf = SparkConf().setAppName("test_SamShare").setMaster("local[4]")
          sc = SparkContext(conf=conf)

          # 使用 parallelize方法直接實例化一個RDD
          rdd = sc.parallelize(range(1,11),4# 這里的 4 指的是分區(qū)數(shù)量
          rdd.take(100)
          # [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]


          """
          ----------------------------------------------
                          Transform算子解析
          ----------------------------------------------
          """

          # 以下的操作由于是Transform操作,因為我們需要在最后加上一個collect算子用來觸發(fā)計算。
          # 1. map: 和python差不多,map轉(zhuǎn)換就是對每一個元素進行一個映射
          rdd = sc.parallelize(range(111), 4)
          rdd_map = rdd.map(lambda x: x*2)
          print("原始數(shù)據(jù):", rdd.collect())
          print("擴大2倍:", rdd_map.collect())
          # 原始數(shù)據(jù): [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
          # 擴大2倍: [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]

          # 2. flatMap: 這個相比于map多一個flat(壓平)操作,顧名思義就是要把高維的數(shù)組變成一維
          rdd2 = sc.parallelize(["hello SamShare""hello PySpark"])
          print("原始數(shù)據(jù):", rdd2.collect())
          print("直接split之后的map結(jié)果:", rdd2.map(lambda x: x.split(" ")).collect())
          print("直接split之后的flatMap結(jié)果:", rdd2.flatMap(lambda x: x.split(" ")).collect())
          # 直接split之后的map結(jié)果: [['hello', 'SamShare'], ['hello', 'PySpark']]
          # 直接split之后的flatMap結(jié)果: ['hello', 'SamShare', 'hello', 'PySpark']

          # 3. filter: 過濾數(shù)據(jù)
          rdd = sc.parallelize(range(111), 4)
          print("原始數(shù)據(jù):", rdd.collect())
          print("過濾奇數(shù):", rdd.filter(lambda x: x % 2 == 0).collect())
          # 原始數(shù)據(jù): [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
          # 過濾奇數(shù): [2, 4, 6, 8, 10]

          # 4. distinct: 去重元素
          rdd = sc.parallelize([2248888163232])
          print("原始數(shù)據(jù):", rdd.collect())
          print("去重數(shù)據(jù):", rdd.distinct().collect())
          # 原始數(shù)據(jù): [2, 2, 4, 8, 8, 8, 8, 16, 32, 32]
          # 去重數(shù)據(jù): [4, 8, 16, 32, 2]

          # 5. reduceByKey: 根據(jù)key來映射數(shù)據(jù)
          from operator import add
          rdd = sc.parallelize([("a"1), ("b"1), ("a"1)])
          print("原始數(shù)據(jù):", rdd.collect())
          print("原始數(shù)據(jù):", rdd.reduceByKey(add).collect())
          # 原始數(shù)據(jù): [('a', 1), ('b', 1), ('a', 1)]
          # 原始數(shù)據(jù): [('b', 1), ('a', 2)]

          # 6. mapPartitions: 根據(jù)分區(qū)內(nèi)的數(shù)據(jù)進行映射操作
          rdd = sc.parallelize([1234], 2)
          def f(iterator):
              yield sum(iterator)
          print(rdd.collect())
          print(rdd.mapPartitions(f).collect())
          # [1, 2, 3, 4]
          # [3, 7]

          # 7. sortBy: 根據(jù)規(guī)則進行排序
          tmp = [('a'1), ('b'2), ('1'3), ('d'4), ('2'5)]
          print(sc.parallelize(tmp).sortBy(lambda x: x[0]).collect())
          print(sc.parallelize(tmp).sortBy(lambda x: x[1]).collect())
          # [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
          # [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]

          # 8. subtract: 數(shù)據(jù)集相減, Return each value in self that is not contained in other.
          x = sc.parallelize([("a"1), ("b"4), ("b"5), ("a"3)])
          y = sc.parallelize([("a"3), ("c"None)])
          print(sorted(x.subtract(y).collect()))
          # [('a', 1), ('b', 4), ('b', 5)]

          # 9. union: 合并兩個RDD
          rdd = sc.parallelize([1123])
          print(rdd.union(rdd).collect())
          # [1, 1, 2, 3, 1, 1, 2, 3]

          # 10. intersection: 取兩個RDD的交集,同時有去重的功效
          rdd1 = sc.parallelize([110234523])
          rdd2 = sc.parallelize([162378])
          print(rdd1.intersection(rdd2).collect())
          # [1, 2, 3]

          # 11. cartesian: 生成笛卡爾積
          rdd = sc.parallelize([12])
          print(sorted(rdd.cartesian(rdd).collect()))
          # [(1, 1), (1, 2), (2, 1), (2, 2)]

          # 12. zip: 拉鏈合并,需要兩個RDD具有相同的長度以及分區(qū)數(shù)量
          x = sc.parallelize(range(05))
          y = sc.parallelize(range(10001005))
          print(x.collect())
          print(y.collect())
          print(x.zip(y).collect())
          # [0, 1, 2, 3, 4]
          # [1000, 1001, 1002, 1003, 1004]
          # [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]

          # 13. zipWithIndex: 將RDD和一個從0開始的遞增序列按照拉鏈方式連接。
          rdd_name = sc.parallelize(["LiLei""Hanmeimei""Lily""Lucy""Ann""Dachui""RuHua"])
          rdd_index = rdd_name.zipWithIndex()
          print(rdd_index.collect())
          # [('LiLei', 0), ('Hanmeimei', 1), ('Lily', 2), ('Lucy', 3), ('Ann', 4), ('Dachui', 5), ('RuHua', 6)]

          # 14. groupByKey: 按照key來聚合數(shù)據(jù)
          rdd = sc.parallelize([("a"1), ("b"1), ("a"1)])
          print(rdd.collect())
          print(sorted(rdd.groupByKey().mapValues(len).collect()))
          print(sorted(rdd.groupByKey().mapValues(list).collect()))
          # [('a', 1), ('b', 1), ('a', 1)]
          # [('a', 2), ('b', 1)]
          # [('a', [1, 1]), ('b', [1])]

          # 15. sortByKey:
          tmp = [('a'1), ('b'2), ('1'3), ('d'4), ('2'5)]
          print(sc.parallelize(tmp).sortByKey(True1).collect())
          # [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]

          # 16. join:
          x = sc.parallelize([("a"1), ("b"4)])
          y = sc.parallelize([("a"2), ("a"3)])
          print(sorted(x.join(y).collect()))
          # [('a', (1, 2)), ('a', (1, 3))]

          # 17. leftOuterJoin/rightOuterJoin
          x = sc.parallelize([("a"1), ("b"4)])
          y = sc.parallelize([("a"2)])
          print(sorted(x.leftOuterJoin(y).collect()))
          # [('a', (1, 2)), ('b', (4, None))]

          """
          ----------------------------------------------
                          Action算子解析
          ----------------------------------------------
          """

          # 1. collect: 指的是把數(shù)據(jù)都匯集到driver端,便于后續(xù)的操作
          rdd = sc.parallelize(range(05))
          rdd_collect = rdd.collect()
          print(rdd_collect)
          # [0, 1, 2, 3, 4]

          # 2. first: 取第一個元素
          sc.parallelize([234]).first()
          # 2

          # 3. collectAsMap: 轉(zhuǎn)換為dict,使用這個要注意了,不要對大數(shù)據(jù)用,不然全部載入到driver端會爆內(nèi)存
          m = sc.parallelize([(12), (34)]).collectAsMap()
          m
          # {1: 2, 3: 4}

          # 4. reduce: 逐步對兩個元素進行操作
          rdd = sc.parallelize(range(10),5)
          print(rdd.reduce(lambda x,y:x+y))
          # 45

          # 5. countByKey/countByValue:
          rdd = sc.parallelize([("a"1), ("b"1), ("a"1)])
          print(sorted(rdd.countByKey().items()))
          print(sorted(rdd.countByValue().items()))
          # [('a', 2), ('b', 1)]
          # [(('a', 1), 2), (('b', 1), 1)]

          # 6. take: 相當(dāng)于取幾個數(shù)據(jù)到driver端
          rdd = sc.parallelize([("a"1), ("b"1), ("a"1)])
          print(rdd.take(5))
          # [('a', 1), ('b', 1), ('a', 1)]

          # 7. saveAsTextFile: 保存rdd成text文件到本地
          text_file = "./data/rdd.txt"
          rdd = sc.parallelize(range(5))
          rdd.saveAsTextFile(text_file)

          # 8. takeSample: 隨機取數(shù)
          rdd = sc.textFile("./test/data/hello_samshare.txt"4)  # 這里的 4 指的是分區(qū)數(shù)量
          rdd_sample = rdd.takeSample(True20)  # withReplacement 參數(shù)1:代表是否是有放回抽樣
          rdd_sample

          # 9. foreach: 對每一個元素執(zhí)行某種操作,不生成新的RDD
          rdd = sc.parallelize(range(10), 5)
          accum = sc.accumulator(0)
          rdd.foreach(lambda x: accum.add(x))
          print(accum.value)
          # 45

          ?? Spark SQL使用

          在講Spark SQL前,先解釋下這個模塊。這個模塊是Spark中用來處理結(jié)構(gòu)化數(shù)據(jù)的,提供一個叫SparkDataFrame的東西并且自動解析為分布式SQL查詢數(shù)據(jù)。我們之前用過Python的Pandas庫,也大致了解了DataFrame,這個其實和它沒有太大的區(qū)別,只是調(diào)用的API可能有些不同罷了。

          我們通過使用Spark SQL來處理數(shù)據(jù),會讓我們更加地熟悉,比如可以用SQL語句、用SparkDataFrame的API或者Datasets API,我們可以按照需求隨心轉(zhuǎn)換,通過SparkDataFrame API 和 SQL 寫的邏輯,會被Spark優(yōu)化器Catalyst自動優(yōu)化成RDD,即便寫得不好也可能運行得很快(如果是直接寫RDD可能就掛了哈哈)。

          創(chuàng)建SparkDataFrame

          開始講SparkDataFrame,我們先學(xué)習(xí)下幾種創(chuàng)建的方法,分別是使用RDD來創(chuàng)建、使用python的DataFrame來創(chuàng)建使用List來創(chuàng)建、讀取數(shù)據(jù)文件來創(chuàng)建通過讀取數(shù)據(jù)庫來創(chuàng)建。

          1. 使用RDD來創(chuàng)建

          主要使用RDD的toDF方法。

          rdd = sc.parallelize([("Sam"2888), ("Flora"2890), ("Run"160)])
          df = rdd.toDF(["name""age""score"])
          df.show()
          df.printSchema()

          # +-----+---+-----+
          # | name|age|score|
          # +-----+---+-----+
          # |  Sam| 28|   88|
          # |Flora| 28|   90|
          # |  Run|  1|   60|
          # +-----+---+-----+
          # root
          #  |-- name: string (nullable = true)
          #  |-- age: long (nullable = true)
          #  |-- score: long (nullable = true)
          2. 使用python的DataFrame來創(chuàng)建
          df = pd.DataFrame([['Sam'2888], ['Flora'2890], ['Run'160]],
                            columns=['name''age''score'])
          print(">> 打印DataFrame:")
          print(df)
          print("\n")
          Spark_df = spark.createDataFrame(df)
          print(">> 打印SparkDataFrame:")
          Spark_df.show()
          # >> 打印DataFrame:
          #     name  age  score
          # 0    Sam   28     88
          # 1  Flora   28     90
          # 2    Run    1     60
          # >> 打印SparkDataFrame:
          # +-----+---+-----+
          # | name|age|score|
          # +-----+---+-----+
          # |  Sam| 28|   88|
          # |Flora| 28|   90|
          # |  Run|  1|   60|
          # +-----+---+-----+
          3. 使用List來創(chuàng)建
          list_values = [['Sam'2888], ['Flora'2890], ['Run'160]]
          Spark_df = spark.createDataFrame(list_values, ['name''age''score'])
          Spark_df.show()
          # +-----+---+-----+
          # | name|age|score|
          # +-----+---+-----+
          # |  Sam| 28|   88|
          # |Flora| 28|   90|
          # |  Run|  1|   60|
          # +-----+---+-----+
          4. 讀取數(shù)據(jù)文件來創(chuàng)建
          # 4.1 CSV文件
          df = spark.read.option("header""true")\
              .option("inferSchema""true")\
              .option("delimiter"",")\
              .csv("./test/data/titanic/train.csv")
          df.show(5)
          df.printSchema()

          # 4.2 json文件
          df = spark.read.json("./test/data/hello_samshare.json")
          df.show(5)
          df.printSchema()
          5. 通過讀取數(shù)據(jù)庫來創(chuàng)建
          # 5.1 讀取hive數(shù)據(jù)
          spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
          spark.sql("LOAD DATA LOCAL INPATH 'data/kv1.txt' INTO TABLE src")
          df = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
          df.show(5)

          # 5.2 讀取mysql數(shù)據(jù)
          url = "jdbc:mysql://localhost:3306/test"
          df = spark.read.format("jdbc") \
           .option("url", url) \
           .option("dbtable""runoob_tbl") \
           .option("user""root") \
           .option("password""8888") \
           .load()\
          df.show()

          常用的SparkDataFrame API

          這里我大概是分成了幾部分來看這些APIs,分別是查看DataFrame的APIs、簡單處理DataFrame的APIs、DataFrame的列操作APIs、DataFrame的一些思路變換操作APIs、DataFrame的一些統(tǒng)計操作APIs,這樣子也有助于我們了解這些API的功能,以后遇見實際問題的時候可以解決。

          首先我們這小節(jié)全局用到的數(shù)據(jù)集如下:

          from pyspark.sql import functions as F
          from pyspark.sql import SparkSession
          # SparkSQL的許多功能封裝在SparkSession的方法接口中, SparkContext則不行的。
          spark = SparkSession.builder \
              .appName("sam_SamShare") \
              .config("master""local[4]") \
              .enableHiveSupport() \
              .getOrCreate()
          sc = spark.sparkContext

          # 創(chuàng)建一個SparkDataFrame
          rdd = sc.parallelize([("Sam"2888"M"),
                                ("Flora"2890"F"),
                                ("Run"160None),
                                ("Peter"55100"M"),
                                ("Mei"5495"F")])
          df = rdd.toDF(["name""age""score""sex"])
          df.show()
          df.printSchema()

          # +-----+---+-----+----+
          # | name|age|score| sex|
          # +-----+---+-----+----+
          # |  Sam| 28|   88|   M|
          # |Flora| 28|   90|   F|
          # |  Run|  1|   60|null|
          # |Peter| 55|  100|   M|
          # |  Mei| 54|   95|   F|
          # +-----+---+-----+----+
          # root
          #  |-- name: string (nullable = true)
          #  |-- age: long (nullable = true)
          #  |-- score: long (nullable = true)
          #  |-- sex: string (nullable = true)
          1. 查看DataFrame的APIs
          # DataFrame.collect
          # 以列表形式返回行
          df.collect()
          # [Row(name='Sam', age=28, score=88, sex='M'),
          # Row(name='Flora', age=28, score=90, sex='F'),
          # Row(name='Run', age=1, score=60, sex=None),
          # Row(name='Peter', age=55, score=100, sex='M'),
          # Row(name='Mei', age=54, score=95, sex='F')]

          # DataFrame.count
          df.count()
          # 5

          # DataFrame.columns
          df.columns
          # ['name', 'age', 'score', 'sex']

          # DataFrame.dtypes
          df.dtypes
          # [('name', 'string'), ('age', 'bigint'), ('score', 'bigint'), ('sex', 'string')]

          # DataFrame.describe
          # 返回列的基礎(chǔ)統(tǒng)計信息
          df.describe(['age']).show()
          # +-------+------------------+
          # |summary|               age|
          # +-------+------------------+
          # |  count|                 5|
          # |   mean|              33.2|
          # | stddev|22.353970564532826|
          # |    min|                 1|
          # |    max|                55|
          # +-------+------------------+
          df.describe().show()
          # +-------+-----+------------------+------------------+----+
          # |summary| name|               age|             score| sex|
          # +-------+-----+------------------+------------------+----+
          # |  count|    5|                 5|                 5|   4|
          # |   mean| null|              33.2|              86.6|null|
          # | stddev| null|22.353970564532826|15.582040944625966|null|
          # |    min|Flora|                 1|                60|   F|
          # |    max|  Sam|                55|               100|   M|
          # +-------+-----+------------------+------------------+----+

          # DataFrame.select
          # 選定指定列并按照一定順序呈現(xiàn)
          df.select("sex""score").show()

          # DataFrame.first
          # DataFrame.head
          # 查看第1條數(shù)據(jù)
          df.first()
          # Row(name='Sam', age=28, score=88, sex='M')
          df.head(1)
          # [Row(name='Sam', age=28, score=88, sex='M')]


          # DataFrame.freqItems
          # 查看指定列的枚舉值
          df.freqItems(["age","sex"]).show()
          # +---------------+-------------+
          # |  age_freqItems|sex_freqItems|
          # +---------------+-------------+
          # |[55, 1, 28, 54]|      [M, F,]|
          # +---------------+-------------+

          # DataFrame.summary
          df.summary().show()
          # +-------+-----+------------------+------------------+----+
          # |summary| name|               age|             score| sex|
          # +-------+-----+------------------+------------------+----+
          # |  count|    5|                 5|                 5|   4|
          # |   mean| null|              33.2|              86.6|null|
          # | stddev| null|22.353970564532826|15.582040944625966|null|
          # |    min|Flora|                 1|                60|   F|
          # |    25%| null|                28|                88|null|
          # |    50%| null|                28|                90|null|
          # |    75%| null|                54|                95|null|
          # |    max|  Sam|                55|               100|   M|
          # +-------+-----+------------------+------------------+----+

          # DataFrame.sample
          # 按照一定規(guī)則從df隨機抽樣數(shù)據(jù)
          df.sample(0.5).show()
          # +-----+---+-----+----+
          # | name|age|score| sex|
          # +-----+---+-----+----+
          # |  Sam| 28|   88|   M|
          # |  Run|  1|   60|null|
          # |Peter| 55|  100|   M|
          # +-----+---+-----+----+
          2. 簡單處理DataFrame的APIs
          # DataFrame.distinct
          # 對數(shù)據(jù)集進行去重
          df.distinct().show()

          # DataFrame.dropDuplicates
          # 對指定列去重
          df.dropDuplicates(["sex"]).show()
          # +-----+---+-----+----+
          # | name|age|score| sex|
          # +-----+---+-----+----+
          # |Flora| 28|   90|   F|
          # |  Run|  1|   60|null|
          # |  Sam| 28|   88|   M|
          # +-----+---+-----+----+

          # DataFrame.exceptAll
          # DataFrame.subtract
          # 根據(jù)指定的df對df進行去重
          df1 = spark.createDataFrame(
                  [("a"1), ("a"1), ("b",  3), ("c"4)], ["C1""C2"])
          df2 = spark.createDataFrame([("a"1), ("b"3)], ["C1""C2"])
          df3 = df1.exceptAll(df2)  # 沒有去重的功效
          df4 = df1.subtract(df2)  # 有去重的奇效
          df1.show()
          df2.show()
          df3.show()
          df4.show()
          # +---+---+
          # | C1| C2|
          # +---+---+
          # |  a|  1|
          # |  a|  1|
          # |  b|  3|
          # |  c|  4|
          # +---+---+
          # +---+---+
          # | C1| C2|
          # +---+---+
          # |  a|  1|
          # |  b|  3|
          # +---+---+
          # +---+---+
          # | C1| C2|
          # +---+---+
          # |  a|  1|
          # |  c|  4|
          # +---+---+
          # +---+---+
          # | C1| C2|
          # +---+---+
          # |  c|  4|
          # +---+---+

          # DataFrame.intersectAll
          # 返回兩個DataFrame的交集
          df1 = spark.createDataFrame(
                  [("a"1), ("a"1), ("b",  3), ("c"4)], ["C1""C2"])
          df2 = spark.createDataFrame([("a"1), ("b"4)], ["C1""C2"])
          df1.intersectAll(df2).show()
          # +---+---+
          # | C1| C2|
          # +---+---+
          # |  a|  1|
          # +---+---+

          # DataFrame.drop
          # 丟棄指定列
          df.drop('age').show()

          # DataFrame.withColumn
          # 新增列
          df1 = df.withColumn("birth_year"2021 - df.age)
          df1.show()
          # +-----+---+-----+----+----------+
          # | name|age|score| sex|birth_year|
          # +-----+---+-----+----+----------+
          # |  Sam| 28|   88|   M|      1993|
          # |Flora| 28|   90|   F|      1993|
          # |  Run|  1|   60|null|      2020|
          # |Peter| 55|  100|   M|      1966|
          # |  Mei| 54|   95|   F|      1967|
          # +-----+---+-----+----+----------+

          # DataFrame.withColumnRenamed
          # 重命名列名
          df1 = df.withColumnRenamed("sex""gender")
          df1.show()
          # +-----+---+-----+------+
          # | name|age|score|gender|
          # +-----+---+-----+------+
          # |  Sam| 28|   88|     M|
          # |Flora| 28|   90|     F|
          # |  Run|  1|   60|  null|
          # |Peter| 55|  100|     M|
          # |  Mei| 54|   95|     F|
          # +-----+---+-----+------+


          # DataFrame.dropna
          # 丟棄空值,DataFrame.dropna(how='any', thresh=None, subset=None)
          df.dropna(how='all', subset=['sex']).show()
          # +-----+---+-----+---+
          # | name|age|score|sex|
          # +-----+---+-----+---+
          # |  Sam| 28|   88|  M|
          # |Flora| 28|   90|  F|
          # |Peter| 55|  100|  M|
          # |  Mei| 54|   95|  F|
          # +-----+---+-----+---+

          # DataFrame.fillna
          # 空值填充操作
          df1 = spark.createDataFrame(
                  [("a"None), ("a"1), (None,  3), ("c"4)], ["C1""C2"])
          # df2 = df1.na.fill({"C1": "d", "C2": 99})
          df2 = df1.fillna({"C1""d""C2"99})
          df1.show()
          df2.show()

          # DataFrame.filter
          # 根據(jù)條件過濾
          df.filter(df.age>50).show()
          # +-----+---+-----+---+
          # | name|age|score|sex|
          # +-----+---+-----+---+
          # |Peter| 55|  100|  M|
          # |  Mei| 54|   95|  F|
          # +-----+---+-----+---+
          df.where(df.age==28).show()
          # +-----+---+-----+---+
          # | name|age|score|sex|
          # +-----+---+-----+---+
          # |  Sam| 28|   88|  M|
          # |Flora| 28|   90|  F|
          # +-----+---+-----+---+
          df.filter("age<18").show()
          # +----+---+-----+----+
          # |name|age|score| sex|
          # +----+---+-----+----+
          # | Run|  1|   60|null|
          # +----+---+-----+----+


          # DataFrame.join
          # 這個不用多解釋了,直接上案例來看看具體的語法即可,DataFrame.join(other, on=None, how=None)
          df1 = spark.createDataFrame(
                  [("a"1), ("d"1), ("b",  3), ("c"4)], ["id""num1"])
          df2 = spark.createDataFrame([("a"1), ("b"3)], ["id""num2"])
          df1.join(df2, df1.id == df2.id, 'left').select(df1.id.alias("df1_id"),
                                                         df1.num1.alias("df1_num"),
                                                         df2.num2.alias("df2_num")
                                                         ).sort(["df1_id"], ascending=False)\
              .show()


          # DataFrame.agg(*exprs)
          # 聚合數(shù)據(jù),可以寫多個聚合方法,如果不寫groupBy的話就是對整個DF進行聚合
          # DataFrame.alias
          # 設(shè)置列或者DataFrame別名
          # DataFrame.groupBy
          # 根據(jù)某幾列進行聚合,如有多列用列表寫在一起,如 df.groupBy(["sex", "age"])
          df.groupBy("sex").agg(F.min(df.age).alias("最小年齡"),
                                F.expr("avg(age)").alias("平均年齡"),
                                F.expr("collect_list(name)").alias("姓名集合")
                                ).show()
          # +----+--------+--------+------------+
          # | sex|最小年齡|平均年齡|    姓名集合|
          # +----+--------+--------+------------+
          # |   F|      28|    41.0|[Flora, Mei]|
          # |null|       1|     1.0|       [Run]|
          # |   M|      28|    41.5|[Sam, Peter]|
          # +----+--------+--------+------------+


          # DataFrame.foreach
          # 對每一行進行函數(shù)方法的應(yīng)用
          def f(person):
              print(person.name)
          df.foreach(f)
          # Peter
          # Run
          # Sam
          # Flora
          # Mei

          # DataFrame.replace
          # 修改df里的某些值
          df1 = df.na.replace({"M""Male""F""Female"})
          df1.show()

          # DataFrame.union
          # 相當(dāng)于SQL里的union all操作
          df1 = spark.createDataFrame(
                  [("a"1), ("d"1), ("b",  3), ("c"4)], ["id""num"])
          df2 = spark.createDataFrame([("a"1), ("b"3)], ["id""num"])
          df1.union(df2).show()
          df1.unionAll(df2).show()
          # 這里union沒有去重,不知道為啥,有知道的朋友麻煩解釋下,謝謝了。
          # +---+---+
          # | id|num|
          # +---+---+
          # |  a|  1|
          # |  d|  1|
          # |  b|  3|
          # |  c|  4|
          # |  a|  1|
          # |  b|  3|
          # +---+---+

          # DataFrame.unionByName
          # 根據(jù)列名來進行合并數(shù)據(jù)集
          df1 = spark.createDataFrame([[123]], ["col0""col1""col2"])
          df2 = spark.createDataFrame([[456]], ["col1""col2""col0"])
          df1.unionByName(df2).show()
          # +----+----+----+
          # |col0|col1|col2|
          # +----+----+----+
          # |   1|   2|   3|
          # |   6|   4|   5|
          # +----+----+----+
          3. DataFrame的列操作APIs

          這里主要針對的是列進行操作,比如說重命名、排序、空值判斷、類型判斷等,這里就不展開寫demo了,看看語法應(yīng)該大家都懂了。

          Column.alias(*alias, **kwargs)  # 重命名列名
          Column.asc()  # 按照列進行升序排序
          Column.desc()  # 按照列進行降序排序
          Column.astype(dataType)  # 類型轉(zhuǎn)換
          Column.cast(dataType)  # 強制轉(zhuǎn)換類型
          Column.between(lowerBound, upperBound)  # 返回布爾值,是否在指定區(qū)間范圍內(nèi)
          Column.contains(other)  # 是否包含某個關(guān)鍵詞
          Column.endswith(other)  # 以什么結(jié)束的值,如 df.filter(df.name.endswith('ice')).collect()
          Column.isNotNull()  # 篩選非空的行
          Column.isNull()
          Column.isin(*cols)  # 返回包含某些值的行 df[df.name.isin("Bob", "Mike")].collect()
          Column.like(other)  # 返回含有關(guān)鍵詞的行
          Column.when(condition, value)  # 給True的賦值
          Column.otherwise(value)  # 與when搭配使用,df.select(df.name, F.when(df.age > 3, 1).otherwise(0)).show()
          Column.rlike(other)  # 可以使用正則的匹配 df.filter(df.name.rlike('ice$')).collect()
          Column.startswith(other)  # df.filter(df.name.startswith('Al')).collect()
          Column.substr(startPos, length)  # df.select(df.name.substr(1, 3).alias("col")).collect()
          4. DataFrame的一些思路變換操作APIs
          # DataFrame.createOrReplaceGlobalTempView
          # DataFrame.dropGlobalTempView
          # 創(chuàng)建全局的試圖,注冊后可以使用sql語句來進行操作,生命周期取決于Spark application本身
          df.createOrReplaceGlobalTempView("people")
          spark.sql("select * from global_temp.people where sex = 'M' ").show()
          # +-----+---+-----+---+
          # | name|age|score|sex|
          # +-----+---+-----+---+
          # |  Sam| 28|   88|  M|
          # |Peter| 55|  100|  M|
          # +-----+---+-----+---+

          # DataFrame.createOrReplaceTempView
          # DataFrame.dropTempView
          # 創(chuàng)建本地臨時試圖,生命周期取決于用來創(chuàng)建此數(shù)據(jù)集的SparkSession
          df.createOrReplaceTempView("tmp_people")
          spark.sql("select * from tmp_people where sex = 'F' ").show()
          # +-----+---+-----+---+
          # | name|age|score|sex|
          # +-----+---+-----+---+
          # |Flora| 28|   90|  F|
          # |  Mei| 54|   95|  F|
          # +-----+---+-----+---+

          # DataFrame.cache\DataFrame.persist
          # 可以把一些數(shù)據(jù)放入緩存中,default storage level (MEMORY_AND_DISK).
          df.cache()
          df.persist()
          df.unpersist()

          # DataFrame.crossJoin
          # 返回兩個DataFrame的笛卡爾積關(guān)聯(lián)的DataFrame
          df1 = df.select("name""sex")
          df2 = df.select("name""sex")
          df3 = df1.crossJoin(df2)
          print("表1的記錄數(shù)", df1.count())
          print("表2的記錄數(shù)", df2.count())
          print("笛卡爾積后的記錄數(shù)", df3.count())
          # 表1的記錄數(shù) 5
          # 表2的記錄數(shù) 5
          # 笛卡爾積后的記錄數(shù) 25

          # DataFrame.toPandas
          # 把SparkDataFrame轉(zhuǎn)為 Pandas的DataFrame
          df.toPandas()

          # DataFrame.rdd
          # 把SparkDataFrame轉(zhuǎn)為rdd,這樣子可以用rdd的語法來操作數(shù)據(jù)
          df.rdd
          5. DataFrame的一些統(tǒng)計操作APIs
          # DataFrame.cov
          # 計算指定兩列的樣本協(xié)方差
          df.cov("age""score")
          # 324.59999999999997

          # DataFrame.corr
          # 計算指定兩列的相關(guān)系數(shù),DataFrame.corr(col1, col2, method=None),目前method只支持Pearson相關(guān)系數(shù)
          df.corr("age""score", method="pearson")
          # 0.9319004030498815

          # DataFrame.cube
          # 創(chuàng)建多維度聚合的結(jié)果,通常用于分析數(shù)據(jù),比如我們指定兩個列進行聚合,比如name和age,那么這個函數(shù)返回的聚合結(jié)果會
          # groupby("name", "age")
          # groupby("name")
          # groupby("age")
          # groupby(all)
          # 四個聚合結(jié)果的union all 的結(jié)果

          df1 = df.filter(df.name != "Run")
          print(df1.show())
          df1.cube("name""sex").count().show()
          # +-----+---+-----+---+
          # | name|age|score|sex|
          # +-----+---+-----+---+
          # |  Sam| 28|   88|  M|
          # |Flora| 28|   90|  F|
          # |Peter| 55|  100|  M|
          # |  Mei| 54|   95|  F|
          # +-----+---+-----+---+
          # cube 聚合之后的結(jié)果
          # +-----+----+-----+
          # | name| sex|count|
          # +-----+----+-----+
          # | null|   F|    2|
          # | null|null|    4|
          # |Flora|null|    1|
          # |Peter|null|    1|
          # | null|   M|    2|
          # |Peter|   M|    1|
          # |  Sam|   M|    1|
          # |  Sam|null|    1|
          # |  Mei|   F|    1|
          # |  Mei|null|    1|
          # |Flora|   F|    1|
          # +-----+----+-----+

          保存數(shù)據(jù)/寫入數(shù)據(jù)庫

          這里的保存數(shù)據(jù)主要是保存到Hive中的栗子,主要包括了overwrite、append等方式。

          1. 當(dāng)結(jié)果集為SparkDataFrame的時候
          import pandas as pd
          from datetime import datetime
          from pyspark import SparkConf
          from pyspark import SparkContext
          from pyspark.sql import HiveContext

          conf = SparkConf()\
                .setAppName("test")\
                .set("hive.exec.dynamic.partition.mode""nonstrict"# 動態(tài)寫入hive分區(qū)表
          sc = SparkContext(conf=conf)
          hc = HiveContext(sc)
          sc.setLogLevel("ERROR")
              
          list_values = [['Sam'2888], ['Flora'2890], ['Run'160]]
          Spark_df = spark.createDataFrame(list_values, ['name''age''score'])
          print(Spark_df.show())
          save_table = "tmp.samshare_pyspark_savedata"

          # 方式1:直接寫入到Hive
          Spark_df.write.format("hive").mode("overwrite").saveAsTable(save_table) # 或者改成append模式
          print(datetime.now().strftime("%y/%m/%d %H:%M:%S"), "測試數(shù)據(jù)寫入到表" + save_table)

          # 方式2:注冊為臨時表,使用SparkSQL來寫入分區(qū)表
          Spark_df.createOrReplaceTempView("tmp_table")
          write_sql = """
          insert overwrite table {0} partitions (pt_date='{1}')
          select * from tmp_table
          """
          .format(save_table, "20210520")
          hc.sql(write_sql)
          print(datetime.now().strftime("%y/%m/%d %H:%M:%S"), "測試數(shù)據(jù)寫入到表" + save_table)
          2. 當(dāng)結(jié)果集為Python的DataFrame的時候

          如果是Python的DataFrame,我們就需要多做一步把它轉(zhuǎn)換為SparkDataFrame,其余操作就一樣了。

          import pandas as pd
          from datetime import datetime
          from pyspark import SparkConf
          from pyspark import SparkContext
          from pyspark.sql import HiveContext

          conf = SparkConf()\
                .setAppName("test")\
                .set("hive.exec.dynamic.partition.mode""nonstrict"# 動態(tài)寫入hive分區(qū)表
          sc = SparkContext(conf=conf)
          hc = HiveContext(sc)
          sc.setLogLevel("ERROR")
              
          result_df = pd.DataFrame([1,2,3], columns=['a'])
          save_table = "tmp.samshare_pyspark_savedata"

          # 獲取DataFrame的schema
          c1 = list(result_df.columns)
          # 轉(zhuǎn)為SparkDataFrame
          result = hc.createDataFrame(result_df.astype(str), c1)
          result.write.format("hive").mode("overwrite").saveAsTable(save_table) # 或者改成append模式
          print(datetime.now().strftime("%y/%m/%d %H:%M:%S"), "測試數(shù)據(jù)寫入到表" + save_table)

          ?? Spark調(diào)優(yōu)思路

          這一小節(jié)的內(nèi)容算是對pyspark入門的一個ending了,全文主要是參考學(xué)習(xí)了美團Spark性能優(yōu)化指南的基礎(chǔ)篇和高級篇內(nèi)容,主體脈絡(luò)和這兩篇文章是一樣的,只不過是基于自己學(xué)習(xí)后的理解進行了一次總結(jié)復(fù)盤,而原文中主要是用Java來舉例的,我這邊主要用pyspark來舉例。文章主要會從4個方面(或者說4個思路)來優(yōu)化我們的Spark任務(wù),主要就是下面的圖片所示:

          開發(fā)習(xí)慣調(diào)優(yōu)

          1. 盡可能復(fù)用同一個RDD,避免重復(fù)創(chuàng)建,并且適當(dāng)持久化數(shù)據(jù)

          這種開發(fā)習(xí)慣是需要我們對于即將要開發(fā)的應(yīng)用邏輯有比較深刻的思考,并且可以通過code review來發(fā)現(xiàn)的,講白了就是要記得我們創(chuàng)建過啥數(shù)據(jù)集,可以復(fù)用的盡量廣播(broadcast)下,能很好提升性能。

          # 最低級寫法,相同數(shù)據(jù)集重復(fù)創(chuàng)建。
          rdd1 = sc.textFile("./test/data/hello_samshare.txt"4# 這里的 4 指的是分區(qū)數(shù)量
          rdd2 = sc.textFile("./test/data/hello_samshare.txt"4# 這里的 4 指的是分區(qū)數(shù)量
          print(rdd1.take(10))
          print(rdd2.map(lambda x:x[0:1]).take(10))

          # 稍微進階一些,復(fù)用相同數(shù)據(jù)集,但因中間結(jié)果沒有緩存,數(shù)據(jù)會重復(fù)計算
          rdd1 = sc.textFile("./test/data/hello_samshare.txt"4# 這里的 4 指的是分區(qū)數(shù)量
          print(rdd1.take(10))
          print(rdd1.map(lambda x:x[0:1]).take(10))

          # 相對比較高效,使用緩存來持久化數(shù)據(jù)
          rdd = sc.parallelize(range(111), 4).cache()  # 或者persist()
          rdd_map = rdd.map(lambda x: x*2)
          rdd_reduce = rdd.reduce(lambda x, y: x+y)
          print(rdd_map.take(10))
          print(rdd_reduce)

          下面我們就來對比一下使用緩存能給我們的Spark程序帶來多大的效率提升吧,我們先構(gòu)造一個程序運行時長測量器。

          import time
          # 統(tǒng)計程序運行時間
          def time_me(info="used"):
              def _time_me(fn):
                  @functools.wraps(fn)
                  def _wrapper(*args, **kwargs):
                      start = time.time()
                      fn(*args, **kwargs)
                      print("%s %s %s" % (fn.__name__, info, time.time() - start), "second")
                  return _wrapper
              return _time_me

          下面我們運行下面的代碼,看下使用了cache帶來的效率提升:

          @time_me()
          def test(types=0):
              if types == 1:
                  print("使用持久化緩存")
                  rdd = sc.parallelize(range(110000000), 4)
                  rdd1 = rdd.map(lambda x: x*x + 2*x + 1).cache()  # 或者 persist(StorageLevel.MEMORY_AND_DISK_SER)
                  print(rdd1.take(10))
                  rdd2 = rdd1.reduce(lambda x, y: x+y)
                  rdd3 = rdd1.reduce(lambda x, y: x + y)
                  rdd4 = rdd1.reduce(lambda x, y: x + y)
                  rdd5 = rdd1.reduce(lambda x, y: x + y)
                  print(rdd5)
              else:
                  print("不使用持久化緩存")
                  rdd = sc.parallelize(range(110000000), 4)
                  rdd1 = rdd.map(lambda x: x * x + 2 * x + 1)
                  print(rdd1.take(10))
                  rdd2 = rdd1.reduce(lambda x, y: x + y)
                  rdd3 = rdd1.reduce(lambda x, y: x + y)
                  rdd4 = rdd1.reduce(lambda x, y: x + y)
                  rdd5 = rdd1.reduce(lambda x, y: x + y)
                  print(rdd5)

                  
          test()   # 不使用持久化緩存
          time.sleep(10)
          test(1)  # 使用持久化緩存
          # output:
          # 使用持久化緩存
          # [4, 9, 16, 25, 36, 49, 64, 81, 100, 121]
          # 333333383333334999999
          # test used 26.36529278755188 second
          # 使用持久化緩存
          # [4, 9, 16, 25, 36, 49, 64, 81, 100, 121]
          # 333333383333334999999
          # test used 17.49532413482666 second

          同時我們打開YARN日志來看看:http://localhost:4040/jobs/

          因為我們的代碼是需要重復(fù)調(diào)用RDD1的,當(dāng)沒有對RDD1進行持久化的時候,每次當(dāng)它被action算子消費了之后,就釋放了,等下一個算子計算的時候要用,就從頭開始計算一下RDD1。代碼中需要重復(fù)調(diào)用RDD1 五次,所以沒有緩存的話,差不多每次都要6秒,總共需要耗時26秒左右,但是,做了緩存,每次就只需要3s不到,總共需要耗時17秒左右。

          另外,這里需要提及一下一個知識點,那就是持久化的級別,一般cache的話就是放入內(nèi)存中,就沒有什么好說的,需要講一下的就是另外一個 persist(),它的持久化級別是可以被我們所配置的:

          持久化級別含義解釋
          MEMORY_ONLY將數(shù)據(jù)保存在內(nèi)存中。如果內(nèi)存不夠存放所有的數(shù)據(jù),則數(shù)據(jù)可能就不會進行持久化。使用cache()方法時,實際就是使用的這種持久化策略,性能也是最高的。
          MEMORY_AND_DISK優(yōu)先嘗試將數(shù)據(jù)保存在內(nèi)存中,如果內(nèi)存不夠存放所有的數(shù)據(jù),會將數(shù)據(jù)寫入磁盤文件中。
          MEMORY_ONLY_SER基本含義同MEMORY_ONLY。唯一的區(qū)別是,會將RDD中的數(shù)據(jù)進行序列化,RDD的每個partition會被序列化成一個字節(jié)數(shù)組。這種方式更加節(jié)省內(nèi)存,從而可以避免持久化的數(shù)據(jù)占用過多內(nèi)存導(dǎo)致頻繁GC。
          MEMORY_AND_DISK_SER基本含義同MEMORY_AND_DISK。唯一的區(qū)別是會先序列化,節(jié)約內(nèi)存。
          DISK_ONLY使用未序列化的Java對象格式,將數(shù)據(jù)全部寫入磁盤文件中。一般不推薦使用。
          MEMORY_ONLY_2, MEMORY_AND_DISK_2, 等等.對于上述任意一種持久化策略,如果加上后綴_2,代表的是將每個持久化的數(shù)據(jù),都復(fù)制一份副本,并將副本保存到其他節(jié)點上。這種基于副本的持久化機制主要用于進行容錯。假如某個節(jié)點掛掉,節(jié)點的內(nèi)存或磁盤中的持久化數(shù)據(jù)丟失了,那么后續(xù)對RDD計算時還可以使用該數(shù)據(jù)在其他節(jié)點上的副本。如果沒有副本的話,就只能將這些數(shù)據(jù)從源頭處重新計算一遍了。一般也不推薦使用。
          2. 盡量避免使用低性能算子

          shuffle類算子算是低性能算子的一種代表,所謂的shuffle類算子,指的是會產(chǎn)生shuffle過程的操作,就是需要把各個節(jié)點上的相同key寫入到本地磁盤文件中,然后其他的節(jié)點通過網(wǎng)絡(luò)傳輸拉取自己需要的key,把相同key拉到同一個節(jié)點上進行聚合計算,這種操作必然就是有大量的數(shù)據(jù)網(wǎng)絡(luò)傳輸與磁盤讀寫操作,性能往往不是很好的。

          那么,Spark中有哪些算子會產(chǎn)生shuffle過程呢?

          操作類別shuffle類算子備注
          分區(qū)操作repartition()、repartitionAndSortWithinPartitions()、coalesce(shuffle=true)重分區(qū)操作一般都會shuffle,因為需要對所有的分區(qū)數(shù)據(jù)進行打亂。
          聚合操作reduceByKey、groupByKey、sortByKey需要對相同key進行操作,所以需要拉到同一個節(jié)點上。
          關(guān)聯(lián)操作join類操作需要把相同key的數(shù)據(jù)shuffle到同一個節(jié)點然后進行笛卡爾積
          去重操作distinct等需要對相同key進行操作,所以需要shuffle到同一個節(jié)點上。
          排序操作sortByKey等需要對相同key進行操作,所以需要shuffle到同一個節(jié)點上。

          這里進一步介紹一個替代join的方案,因為join其實在業(yè)務(wù)中還是蠻常見的。

          # 原則2:盡量避免使用低性能算子
          rdd1 = sc.parallelize([('A1'211), ('A1'212), ('A2'22), ('A4'24), ('A5'25)])
          rdd2 = sc.parallelize([('A1'11), ('A2'12), ('A3'13), ('A4'14)])
          # 低效的寫法,也是傳統(tǒng)的寫法,直接join
          rdd_join = rdd1.join(rdd2)
          print(rdd_join.collect())
          # [('A4', (24, 14)), ('A2', (22, 12)), ('A1', (211, 11)), ('A1', (212, 11))]
          rdd_left_join = rdd1.leftOuterJoin(rdd2)
          print(rdd_left_join.collect())
          # [('A4', (24, 14)), ('A2', (22, 12)), ('A5', (25, None)), ('A1', (211, 11)), ('A1', (212, 11))]
          rdd_full_join = rdd1.fullOuterJoin(rdd2)
          print(rdd_full_join.collect())
          # [('A4', (24, 14)), ('A3', (None, 13)), ('A2', (22, 12)), ('A5', (25, None)), ('A1', (211, 11)), ('A1', (212, 11))]

          # 高效的寫法,使用廣播+map來實現(xiàn)相同效果
          # tips1: 這里需要注意的是,用來broadcast的RDD不可以太大,最好不要超過1G
          # tips2: 這里需要注意的是,用來broadcast的RDD不可以有重復(fù)的key的
          rdd1 = sc.parallelize([('A1'11), ('A2'12), ('A3'13), ('A4'14)])
          rdd2 = sc.parallelize([('A1'211), ('A1'212), ('A2'22), ('A4'24), ('A5'25)])

          # step1: 先將小表進行廣播,也就是collect到driver端,然后廣播到每個Executor中去。
          rdd_small_bc = sc.broadcast(rdd1.collect())

          # step2:從Executor中獲取存入字典便于后續(xù)map操作
          rdd_small_dict = dict(rdd_small_bc.value)

          # step3:定義join方法
          def broadcast_join(line, rdd_small_dict, join_type):
              k = line[0]
              v = line[1]
              small_table_v = rdd_small_dict[k] if k in rdd_small_dict else None
              if join_type == 'join':
                  return (k, (v, small_table_v)) if k in rdd_small_dict else None
              elif join_type == 'left_join':
                  return (k, (v, small_table_v if small_table_v is not None else None))
              else:
                  print("not support join type!")

          # step4:使用 map 實現(xiàn) 兩個表join的功能
          rdd_join = rdd2.map(lambda line: broadcast_join(line, rdd_small_dict, "join")).filter(lambda line: line is not None)
          rdd_left_join = rdd2.map(lambda line: broadcast_join(line, rdd_small_dict, "left_join")).filter(lambda line: line is not None)
          print(rdd_join.collect())
          print(rdd_left_join.collect())
          # [('A1', (211, 11)), ('A1', (212, 11)), ('A2', (22, 12)), ('A4', (24, 14))]
          # [('A1', (211, 11)), ('A1', (212, 11)), ('A2', (22, 12)), ('A4', (24, 14)), ('A5', (25, None))]

          上面的RDD join被改寫為 broadcast+map的PySpark版本實現(xiàn),不過里面有兩個點需要注意:

          • tips1: 用來broadcast的RDD不可以太大,最好不要超過1G
          • tips2: 用來broadcast的RDD不可以有重復(fù)的key
          3. 盡量使用高性能算子

          上一節(jié)講到了低效算法,自然地就會有一些高效的算子。

          原算子高效算子(替換算子)說明
          mapmapPartitions直接map的話,每次只會處理一條數(shù)據(jù),而mapPartitions則是每次處理一個分區(qū)的數(shù)據(jù),在某些場景下相對比較高效。(分區(qū)數(shù)據(jù)量不大的情況下使用,如果有數(shù)據(jù)傾斜的話容易發(fā)生OOM)
          groupByKeyreduceByKey/aggregateByKey這類算子會在原節(jié)點先map-side預(yù)聚合,相對高效些。
          foreachforeachPartitions同第一條記錄一樣。
          filterfilter+coalesce當(dāng)我們對數(shù)據(jù)進行filter之后,有很多partition的數(shù)據(jù)會劇減,然后直接進行下一步操作的話,可能就partition數(shù)量很多但處理的數(shù)據(jù)又很少,task數(shù)量沒有減少,反而整體速度很慢;但如果執(zhí)行了coalesce算子,就會減少一些partition數(shù)量,把數(shù)據(jù)都相對壓縮到一起,用更少的task處理完全部數(shù)據(jù),一定場景下還是可以達到整體性能的提升。
          repartition+sortrepartitionAndSortWithinPartitions直接用就是了。
          4. 廣播大變量

          如果我們有一個數(shù)據(jù)集很大,并且在后續(xù)的算子執(zhí)行中會被反復(fù)調(diào)用,那么就建議直接把它廣播(broadcast)一下。當(dāng)變量被廣播后,會保證每個executor的內(nèi)存中只會保留一份副本,同個executor內(nèi)的task都可以共享這個副本數(shù)據(jù)。如果沒有廣播,常規(guī)過程就是把大變量進行網(wǎng)絡(luò)傳輸?shù)矫恳粋€相關(guān)task中去,這樣子做,一來頻繁的網(wǎng)絡(luò)數(shù)據(jù)傳輸,效率極其低下;二來executor下的task不斷存儲同一份大數(shù)據(jù),很有可能就造成了內(nèi)存溢出或者頻繁GC,效率也是極其低下的。

          # 原則4:廣播大變量
          rdd1 = sc.parallelize([('A1'11), ('A2'12), ('A3'13), ('A4'14)])
          rdd1_broadcast = sc.broadcast(rdd1.collect())
          print(rdd1.collect())
          print(rdd1_broadcast.value)
          # [('A1', 11), ('A2', 12), ('A3', 13), ('A4', 14)]
          # [('A1', 11), ('A2', 12), ('A3', 13), ('A4', 14)]

          資源參數(shù)調(diào)優(yōu)

          如果要進行資源調(diào)優(yōu),我們就必須先知道Spark運行的機制與流程。

          下面我們就來講解一些常用的Spark資源配置的參數(shù)吧,了解其參數(shù)原理便于我們依據(jù)實際的數(shù)據(jù)情況進行配置。

          1)num-executors

          指的是執(zhí)行器的數(shù)量,數(shù)量的多少代表了并行的stage數(shù)量(假如executor是單核的話),但也并不是越多越快,受你集群資源的限制,所以一般設(shè)置50-100左右吧。

          2)executor-memory

          這里指的是每一個執(zhí)行器的內(nèi)存大小,內(nèi)存越大當(dāng)然對于程序運行是很好的了,但是也不是無節(jié)制地大下去,同樣受我們集群資源的限制。假設(shè)我們集群資源為500core,一般1core配置4G內(nèi)存,所以集群最大的內(nèi)存資源只有2000G左右。num-executors x executor-memory 是不能超過2000G的,但是也不要太接近這個值,不然的話集群其他同事就沒法正常跑數(shù)據(jù)了,一般我們設(shè)置4G-8G。

          3)executor-cores

          這里設(shè)置的是executor的CPU core數(shù)量,決定了executor進程并行處理task的能力。

          4)driver-memory

          設(shè)置driver的內(nèi)存,一般設(shè)置2G就好了。但如果想要做一些Python的DataFrame操作可以適當(dāng)?shù)匕堰@個值設(shè)大一些。

          5)driver-cores

          與executor-cores類似的功能。

          6)spark.default.parallelism

          設(shè)置每個stage的task數(shù)量。一般Spark任務(wù)我們設(shè)置task數(shù)量在500-1000左右比較合適,如果不去設(shè)置的話,Spark會根據(jù)底層HDFS的block數(shù)量來自行設(shè)置task數(shù)量。有的時候會設(shè)置得偏少,這樣子程序就會跑得很慢,即便你設(shè)置了很多的executor,但也沒有用。

          下面說一個基本的參數(shù)設(shè)置的shell腳本,一般我們都是通過一個shell腳本來設(shè)置資源參數(shù)配置,接著就去調(diào)用我們的主函數(shù)。

          #!/bin/bash
          basePath=$(cd "$(dirname )"$(cd "$(dirname "$0"): pwd)")": pwd)

          spark-submit \
              --master yarn \
              --queue samshare \
              --deploy-mode client \
              --num-executors 100 \
              --executor-memory 4G \
              --executor-cores 4 \
              --driver-memory 2G \
              --driver-cores 2 \
              --conf spark.default.parallelism=1000 \
              --conf spark.yarn.executor.memoryOverhead=8G \
              --conf spark.sql.shuffle.partitions=1000 \
              --conf spark.network.timeout=1200 \
              --conf spark.python.worker.memory=64m \
              --conf spark.sql.catalogImplementation=hive \
              --conf spark.sql.crossJoin.enabled=True \
              --conf spark.dynamicAllocation.enabled=True \
              --conf spark.shuffle.service.enabled=True \
              --conf spark.scheduler.listenerbus.eventqueue.size=100000 \
              --conf spark.pyspark.driver.python=python3 \
              --conf spark.pyspark.python=python3 \
              --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=python3 \
              --conf spark.sql.pivotMaxValues=500000 \
              --conf spark.hadoop.hive.exec.dynamic.partition=True \
              --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict \
              --conf spark.hadoop.hive.exec.max.dynamic.partitions.pernode=100000 \
              --conf spark.hadoop.hive.exec.max.dynamic.partitions=100000 \
              --conf spark.hadoop.hive.exec.max.created.files=100000 \
              ${bashPath}/project_name/main.py $v_var1 $v_var2

          數(shù)據(jù)傾斜調(diào)優(yōu)

          相信我們對于數(shù)據(jù)傾斜并不陌生了,很多時間數(shù)據(jù)跑不出來有很大的概率就是出現(xiàn)了數(shù)據(jù)傾斜,在Spark開發(fā)中無法避免的也會遇到這類問題,而這不是一個嶄新的問題,成熟的解決方案也是有蠻多的,今天來簡單介紹一些比較常用并且有效的方案。

          首先我們要知道,在Spark中比較容易出現(xiàn)傾斜的操作,主要集中在distinct、groupByKey、reduceByKey、aggregateByKey、join、repartition等,可以優(yōu)先看這些操作的前后代碼。而為什么使用了這些操作就容易導(dǎo)致數(shù)據(jù)傾斜呢?大多數(shù)情況就是進行操作的key分布不均,然后使得大量的數(shù)據(jù)集中在同一個處理節(jié)點上,從而發(fā)生了數(shù)據(jù)傾斜。

          查看Key 分布
          # 針對Spark SQL
          hc.sql("select key, count(0) nums from table_name group by key")

          # 針對RDD
          RDD.countByKey()
          Plan A: 過濾掉導(dǎo)致傾斜的key

          這個方案并不是所有場景都可以使用的,需要結(jié)合業(yè)務(wù)邏輯來分析這個key到底還需要不需要,大多數(shù)情況可能就是一些異常值或者空串,這種就直接進行過濾就好了。

          Plan B: 提前處理聚合

          如果有些Spark應(yīng)用場景需要頻繁聚合數(shù)據(jù),而數(shù)據(jù)key又少的,那么我們可以把這些存量數(shù)據(jù)先用hive算好(每天算一次),然后落到中間表,后續(xù)Spark應(yīng)用直接用聚合好的表+新的數(shù)據(jù)進行二度聚合,效率會有很高的提升。

          Plan C: 調(diào)高shuffle并行度
          # 針對Spark SQL 
          --conf spark.sql.shuffle.partitions=1000  # 在配置信息中設(shè)置參數(shù)
          # 針對RDD
          rdd.reduceByKey(1000# 默認是200
          Plan D: 分配隨機數(shù)再聚合

          大概的思路就是對一些大量出現(xiàn)的key,人工打散,從而可以利用多個task來增加任務(wù)并行度,以達到效率提升的目的,下面是代碼demo,分別從RDD 和 SparkSQL來實現(xiàn)。

          # Way1: PySpark RDD實現(xiàn)
          import pyspark
          from pyspark import SparkContext, SparkConf, HiveContext
          from random import randint
          import pandas as pd

          # SparkSQL的許多功能封裝在SparkSession的方法接口中, SparkContext則不行的。
          from pyspark.sql import SparkSession
          spark = SparkSession.builder \
              .appName("sam_SamShare") \
              .config("master""local[4]") \
              .enableHiveSupport() \
              .getOrCreate()

          conf = SparkConf().setAppName("test_SamShare").setMaster("local[4]")
          sc = SparkContext(conf=conf)
          hc = HiveContext(sc)

          # 分配隨機數(shù)再聚合
          rdd1 = sc.parallelize([('sam'1), ('sam'1), ('sam'1), ('sam'1), ('sam'1), ('sam'1)])

          # 給key分配隨機數(shù)后綴
          rdd2 = rdd1.map(lambda x: (x[0] + "_" + str(randint(1,5)), x[1]))
          print(rdd.take(10))
          # [('sam_5', 1), ('sam_5', 1), ('sam_3', 1), ('sam_5', 1), ('sam_5', 1), ('sam_3', 1)]

          # 局部聚合
          rdd3 = rdd2.reduceByKey(lambda x,y : (x+y))
          print(rdd3.take(10))
          # [('sam_5', 4), ('sam_3', 2)]

          # 去除后綴
          rdd4 = rdd3.map(lambda x: (x[0][:-2], x[1]))
          print(rdd4.take(10))
          # [('sam', 4), ('sam', 2)]

          # 全局聚合
          rdd5 = rdd4.reduceByKey(lambda x,y : (x+y))
          print(rdd5.take(10))
          # [('sam', 6)]


          # Way2: PySpark SparkSQL實現(xiàn)
          df = pd.DataFrame(5*[['Sam'1],['Flora'1]],
                            columns=['name''nums'])
          Spark_df = spark.createDataFrame(df)
          print(Spark_df.show(10))

          Spark_df.createOrReplaceTempView("tmp_table"# 注冊為視圖供SparkSQl使用

          sql = """
          with t1 as (
              select concat(name,"_",int(10*rand())) as new_name, name, nums
              from tmp_table
          ),
          t2 as (
              select new_name, sum(nums) as n
              from t1
              group by new_name
          ),
          t3 as (
              select substr(new_name,0,length(new_name) -2) as name, sum(n) as nums_sum 
              from t2
              group by substr(new_name,0,length(new_name) -2)
          )
          select *
          from t3
          """

          tt = hc.sql(sql).toPandas()
          tt

          下面是原理圖。


          全文終!

          往期精彩回顧




          本站qq群851320808,加入微信群請掃碼:
          瀏覽 57
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  亚洲无码不卡 | 六月婷婷综合 | 淫色视频网站 | 欧美怡春院 | 西西西444www无码视 |