<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          PySpark入門級學(xué)習(xí)教程,框架思維(上)

          共 11811字,需瀏覽 24分鐘

           ·

          2021-04-12 23:01


          為什么要學(xué)習(xí)Spark?作為數(shù)據(jù)從業(yè)者多年,個人覺得Spark已經(jīng)越來越走進我們的日常工作了,無論是使用哪種編程語言,Python、Scala還是Java,都會或多或少接觸到Spark,它可以讓我們能夠用到集群的力量,可以對BigData進行高效操作,實現(xiàn)很多之前由于計算資源而無法輕易實現(xiàn)的東西。網(wǎng)上有很多關(guān)于Spark的好處,這里就不做過多的贅述,我們直接進入這篇文章的正文!

          關(guān)于PySpark,我們知道它是Python調(diào)用Spark的接口,我們可以通過調(diào)用Python API的方式來編寫Spark程序,它支持了大多數(shù)的Spark功能,比如SparkDataFrame、Spark SQL、Streaming、MLlib等等。只要我們了解Python的基本語法,那么在Python里調(diào)用Spark的力量就顯得十分easy了。下面我將會從相對宏觀的層面介紹一下PySpark,讓我們對于這個神器有一個框架性的認識,知道它能干什么,知道去哪里尋找問題解答,爭取看完這篇文章可以讓我們更加絲滑地入門PySpark。話不多說,馬上開始!

          ?? 目錄:

          • 安裝指引
          • 基礎(chǔ)概念
          • 常用函數(shù)
          • Sparksql使用
          • 調(diào)優(yōu)思路
          • 學(xué)習(xí)資源推薦

          ?? 安裝指引:

          安裝這塊本文就不展開具體的步驟了,畢竟大家的機子環(huán)境都不盡相同。不過可以簡單說幾點重要的步驟,然后節(jié)末放上一些安裝示例供大家參考。

          1)要使用PySpark,機子上要有Java開發(fā)環(huán)境
          2)環(huán)境變量記得要配置完整
          3)Mac下的/usr/local/ 路徑一般是隱藏的,PyCharm配置py4j和pyspark的時候可以使用 shift+command+G 來使用路徑訪問。
          4)Mac下如果修改了 ~/.bash_profile 的話,記得要重啟下PyCharm才會生效的哈
          5)版本記得要搞對,保險起見Java的jdk版本選擇低版本(別問我為什么知道),我選擇的是Java8.

          下面是一些示例,可以參考下:

          1)Mac下安裝spark,并配置pycharm-pyspark完整教程

          https://blog.csdn.net/shiyutianming/article/details/99946797

          2)virtualBox里安裝開發(fā)環(huán)境

          https://www.bilibili.com/video/BV1i4411i79a?p=3

          3)快速搭建spark開發(fā)環(huán)境,云哥項目

          https://github.com/lyhue1991/eat_pyspark_in_10_days

          ?? 基礎(chǔ)概念

          關(guān)于Spark的基礎(chǔ)概念,我在先前的文章里也有寫過,大家可以一起來回顧一下 《想學(xué)習(xí)Spark?先帶你了解一些基礎(chǔ)的知識》。作為補充,今天在這里也介紹一些在Spark中會經(jīng)常遇見的專有名詞。

          ???♀? Q1: 什么是RDD

          RDD的全稱是 Resilient Distributed Datasets,這是Spark的一種數(shù)據(jù)抽象集合,它可以被執(zhí)行在分布式的集群上進行各種操作,而且有較強的容錯機制。RDD可以被分為若干個分區(qū),每一個分區(qū)就是一個數(shù)據(jù)集片段,從而可以支持分布式計算。

          ???♀? Q2: RDD運行時相關(guān)的關(guān)鍵名詞

          簡單來說可以有 Client、Job、Master、Worker、Driver、Stage、Task以及Executor,這幾個東西在調(diào)優(yōu)的時候也會經(jīng)常遇到的。

          Client:指的是客戶端進程,主要負責(zé)提交job到Master;

          Job:Job來自于我們編寫的程序,Application包含一個或者多個job,job包含各種RDD操作;

          Master:指的是Standalone模式中的主控節(jié)點,負責(zé)接收來自Client的job,并管理著worker,可以給worker分配任務(wù)和資源(主要是driver和executor資源);

          Worker:指的是Standalone模式中的slave節(jié)點,負責(zé)管理本節(jié)點的資源,同時受Master管理,需要定期給Master回報heartbeat(心跳),啟動Driver和Executor;

          Driver:指的是 job(作業(yè))的主進程,一般每個Spark作業(yè)都會有一個Driver進程,負責(zé)整個作業(yè)的運行,包括了job的解析、Stage的生成、調(diào)度Task到Executor上去執(zhí)行;

          Stage:中文名 階段,是job的基本調(diào)度單位,因為每個job會分成若干組Task,每組任務(wù)就被稱為 Stage;

          Task:任務(wù),指的是直接運行在executor上的東西,是executor上的一個線程;

          Executor:指的是 執(zhí)行器,顧名思義就是真正執(zhí)行任務(wù)的地方了,一個集群可以被配置若干個Executor,每個Executor接收來自Driver的Task,并執(zhí)行它(可同時執(zhí)行多個Task)。

          ???♀? Q3: 什么是DAG

          全稱是 Directed Acyclic Graph,中文名是有向無環(huán)圖。Spark就是借用了DAG對RDD之間的關(guān)系進行了建模,用來描述RDD之間的因果依賴關(guān)系。因為在一個Spark作業(yè)調(diào)度中,多個作業(yè)任務(wù)之間也是相互依賴的,有些任務(wù)需要在一些任務(wù)執(zhí)行完成了才可以執(zhí)行的。在Spark調(diào)度中就是有DAGscheduler,它負責(zé)將job分成若干組Task組成的Stage。

          ???♀? Q4: Spark的部署模式有哪些

          主要有l(wèi)ocal模式、Standalone模式、Mesos模式、YARN模式。

          更多的解釋可以參考這位老哥的解釋。https://www.jianshu.com/p/3b8f85329664

          ???♀? Q5: Shuffle操作是什么

          Shuffle指的是數(shù)據(jù)從Map端到Reduce端的數(shù)據(jù)傳輸過程,Shuffle性能的高低直接會影響程序的性能。因為Reduce task需要跨節(jié)點去拉在分布在不同節(jié)點上的Map task計算結(jié)果,這一個過程是需要有磁盤IO消耗以及數(shù)據(jù)網(wǎng)絡(luò)傳輸?shù)南牡?,所以需要根?jù)實際數(shù)據(jù)情況進行適當(dāng)調(diào)整。另外,Shuffle可以分為兩部分,分別是Map階段的數(shù)據(jù)準(zhǔn)備與Reduce階段的數(shù)據(jù)拷貝處理,在Map端我們叫Shuffle Write,在Reduce端我們叫Shuffle Read。

          ???♀? Q6: 什么是惰性執(zhí)行

          這是RDD的一個特性,在RDD中的算子可以分為Transform算子和Action算子,其中Transform算子的操作都不會真正執(zhí)行,只會記錄一下依賴關(guān)系,直到遇見了Action算子,在這之前的所有Transform操作才會被觸發(fā)計算,這就是所謂的惰性執(zhí)行。具體哪些是Transform和Action算子,可以看下一節(jié)。

          ?? 常用函數(shù)

          從網(wǎng)友的總結(jié)來看比較常用的算子大概可以分為下面幾種,所以就演示一下這些算子,如果需要看更多的算子或者解釋,建議可以移步到官方API文檔去Search一下哈。

          pyspark.RDD:http://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.html#pyspark.RDD

          圖來自 edureka 的pyspark入門教程

          下面我們用自己創(chuàng)建的RDD:sc.parallelize(range(1,11),4)

          import os
          import pyspark
          from pyspark import SparkContext, SparkConf

          conf = SparkConf().setAppName("test_SamShare").setMaster("local[4]")
          sc = SparkContext(conf=conf)

          # 使用 parallelize方法直接實例化一個RDD
          rdd = sc.parallelize(range(1,11),4# 這里的 4 指的是分區(qū)數(shù)量
          rdd.take(100)
          # [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]


          """
          ----------------------------------------------
                          Transform算子解析
          ----------------------------------------------
          """

          # 以下的操作由于是Transform操作,因為我們需要在最后加上一個collect算子用來觸發(fā)計算。
          # 1. map: 和python差不多,map轉(zhuǎn)換就是對每一個元素進行一個映射
          rdd = sc.parallelize(range(111), 4)
          rdd_map = rdd.map(lambda x: x*2)
          print("原始數(shù)據(jù):", rdd.collect())
          print("擴大2倍:", rdd_map.collect())
          # 原始數(shù)據(jù): [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
          # 擴大2倍: [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]

          # 2. flatMap: 這個相比于map多一個flat(壓平)操作,顧名思義就是要把高維的數(shù)組變成一維
          rdd2 = sc.parallelize(["hello SamShare""hello PySpark"])
          print("原始數(shù)據(jù):", rdd2.collect())
          print("直接split之后的map結(jié)果:", rdd2.map(lambda x: x.split(" ")).collect())
          print("直接split之后的flatMap結(jié)果:", rdd2.flatMap(lambda x: x.split(" ")).collect())
          # 直接split之后的map結(jié)果: [['hello', 'SamShare'], ['hello', 'PySpark']]
          # 直接split之后的flatMap結(jié)果: ['hello', 'SamShare', 'hello', 'PySpark']

          # 3. filter: 過濾數(shù)據(jù)
          rdd = sc.parallelize(range(111), 4)
          print("原始數(shù)據(jù):", rdd.collect())
          print("過濾奇數(shù):", rdd.filter(lambda x: x % 2 == 0).collect())
          # 原始數(shù)據(jù): [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
          # 過濾奇數(shù): [2, 4, 6, 8, 10]

          # 4. distinct: 去重元素
          rdd = sc.parallelize([2248888163232])
          print("原始數(shù)據(jù):", rdd.collect())
          print("去重數(shù)據(jù):", rdd.distinct().collect())
          # 原始數(shù)據(jù): [2, 2, 4, 8, 8, 8, 8, 16, 32, 32]
          # 去重數(shù)據(jù): [4, 8, 16, 32, 2]

          # 5. reduceByKey: 根據(jù)key來映射數(shù)據(jù)
          from operator import add
          rdd = sc.parallelize([("a"1), ("b"1), ("a"1)])
          print("原始數(shù)據(jù):", rdd.collect())
          print("原始數(shù)據(jù):", rdd.reduceByKey(add).collect())
          # 原始數(shù)據(jù): [('a', 1), ('b', 1), ('a', 1)]
          # 原始數(shù)據(jù): [('b', 1), ('a', 2)]

          # 6. mapPartitions: 根據(jù)分區(qū)內(nèi)的數(shù)據(jù)進行映射操作
          rdd = sc.parallelize([1234], 2)
          def f(iterator):
              yield sum(iterator)
          print(rdd.collect())
          print(rdd.mapPartitions(f).collect())
          # [1, 2, 3, 4]
          # [3, 7]

          # 7. sortBy: 根據(jù)規(guī)則進行排序
          tmp = [('a'1), ('b'2), ('1'3), ('d'4), ('2'5)]
          print(sc.parallelize(tmp).sortBy(lambda x: x[0]).collect())
          print(sc.parallelize(tmp).sortBy(lambda x: x[1]).collect())
          # [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
          # [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]

          # 8. subtract: 數(shù)據(jù)集相減, Return each value in self that is not contained in other.
          x = sc.parallelize([("a"1), ("b"4), ("b"5), ("a"3)])
          y = sc.parallelize([("a"3), ("c"None)])
          print(sorted(x.subtract(y).collect()))
          # [('a', 1), ('b', 4), ('b', 5)]

          # 9. union: 合并兩個RDD
          rdd = sc.parallelize([1123])
          print(rdd.union(rdd).collect())
          # [1, 1, 2, 3, 1, 1, 2, 3]

          # 10. intersection: 取兩個RDD的交集,同時有去重的功效
          rdd1 = sc.parallelize([110234523])
          rdd2 = sc.parallelize([162378])
          print(rdd1.intersection(rdd2).collect())
          # [1, 2, 3]

          # 11. cartesian: 生成笛卡爾積
          rdd = sc.parallelize([12])
          print(sorted(rdd.cartesian(rdd).collect()))
          # [(1, 1), (1, 2), (2, 1), (2, 2)]

          # 12. zip: 拉鏈合并,需要兩個RDD具有相同的長度以及分區(qū)數(shù)量
          x = sc.parallelize(range(05))
          y = sc.parallelize(range(10001005))
          print(x.collect())
          print(y.collect())
          print(x.zip(y).collect())
          # [0, 1, 2, 3, 4]
          # [1000, 1001, 1002, 1003, 1004]
          # [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]

          # 13. zipWithIndex: 將RDD和一個從0開始的遞增序列按照拉鏈方式連接。
          rdd_name = sc.parallelize(["LiLei""Hanmeimei""Lily""Lucy""Ann""Dachui""RuHua"])
          rdd_index = rdd_name.zipWithIndex()
          print(rdd_index.collect())
          # [('LiLei', 0), ('Hanmeimei', 1), ('Lily', 2), ('Lucy', 3), ('Ann', 4), ('Dachui', 5), ('RuHua', 6)]

          # 14. groupByKey: 按照key來聚合數(shù)據(jù)
          rdd = sc.parallelize([("a"1), ("b"1), ("a"1)])
          print(rdd.collect())
          print(sorted(rdd.groupByKey().mapValues(len).collect()))
          print(sorted(rdd.groupByKey().mapValues(list).collect()))
          # [('a', 1), ('b', 1), ('a', 1)]
          # [('a', 2), ('b', 1)]
          # [('a', [1, 1]), ('b', [1])]

          # 15. sortByKey:
          tmp = [('a'1), ('b'2), ('1'3), ('d'4), ('2'5)]
          print(sc.parallelize(tmp).sortByKey(True1).collect())
          # [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]

          # 16. join:
          x = sc.parallelize([("a"1), ("b"4)])
          y = sc.parallelize([("a"2), ("a"3)])
          print(sorted(x.join(y).collect()))
          # [('a', (1, 2)), ('a', (1, 3))]

          # 17. leftOuterJoin/rightOuterJoin
          x = sc.parallelize([("a"1), ("b"4)])
          y = sc.parallelize([("a"2)])
          print(sorted(x.leftOuterJoin(y).collect()))
          # [('a', (1, 2)), ('b', (4, None))]

          """
          ----------------------------------------------
                          Action算子解析
          ----------------------------------------------
          """

          # 1. collect: 指的是把數(shù)據(jù)都匯集到driver端,便于后續(xù)的操作
          rdd = sc.parallelize(range(05))
          rdd_collect = rdd.collect()
          print(rdd_collect)
          # [0, 1, 2, 3, 4]

          # 2. first: 取第一個元素
          sc.parallelize([234]).first()
          # 2

          # 3. collectAsMap: 轉(zhuǎn)換為dict,使用這個要注意了,不要對大數(shù)據(jù)用,不然全部載入到driver端會爆內(nèi)存
          m = sc.parallelize([(12), (34)]).collectAsMap()
          m
          # {1: 2, 3: 4}

          # 4. reduce: 逐步對兩個元素進行操作
          rdd = sc.parallelize(range(10),5)
          print(rdd.reduce(lambda x,y:x+y))
          # 45

          # 5. countByKey/countByValue:
          rdd = sc.parallelize([("a"1), ("b"1), ("a"1)])
          print(sorted(rdd.countByKey().items()))
          print(sorted(rdd.countByValue().items()))
          # [('a', 2), ('b', 1)]
          # [(('a', 1), 2), (('b', 1), 1)]

          # 6. take: 相當(dāng)于取幾個數(shù)據(jù)到driver端
          rdd = sc.parallelize([("a"1), ("b"1), ("a"1)])
          print(rdd.take(5))
          # [('a', 1), ('b', 1), ('a', 1)]

          # 7. saveAsTextFile: 保存rdd成text文件到本地
          text_file = "./data/rdd.txt"
          rdd = sc.parallelize(range(5))
          rdd.saveAsTextFile(text_file)

          # 8. takeSample: 隨機取數(shù)
          rdd = sc.textFile("./test/data/hello_samshare.txt"4)  # 這里的 4 指的是分區(qū)數(shù)量
          rdd_sample = rdd.takeSample(True20)  # withReplacement 參數(shù)1:代表是否是有放回抽樣
          rdd_sample

          # 9. foreach: 對每一個元素執(zhí)行某種操作,不生成新的RDD
          rdd = sc.parallelize(range(10), 5)
          accum = sc.accumulator(0)
          rdd.foreach(lambda x: accum.add(x))
          print(accum.value)
          # 45


          Sam:未完待續(xù)... 文章較長,分上下兩篇文章來寫哈。


          ??學(xué)習(xí)資源推薦:

          1)edureka about PySpark Tutorial
          印度老哥的課程,B站可直接看,不過口音略難聽懂不過還好有字幕。
          https://www.bilibili.com/video/BV1i4411i79a?p=1
          2)eat_pyspark_in_10_days
          梁云大哥的課程,講得超級清晰,建議精讀。
          https://github.com/lyhue1991/eat_pyspark_in_10_days
          3)官方文檔
          http://spark.apache.org/docs/latest/api/python/reference/index.html

          瀏覽 81
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  亚洲日日日 | 欧美色图15 | 韩国免费毛片 | 色天天男人天堂 | 欧美一级欧美一级在线播放 |