<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          3萬字長文,PySpark入門級學(xué)習(xí)教程,框架思維

          共 37427字,需瀏覽 75分鐘

           ·

          2021-08-04 17:24

          93e4a0151a5da3be7ba90b251c8bff83.webp

          為什么要學(xué)習(xí)Spark?作為數(shù)據(jù)從業(yè)者多年,個(gè)人覺得Spark已經(jīng)越來越走進(jìn)我們的日常工作了,無論是使用哪種編程語言,Python、Scala還是Java,都會或多或少接觸到Spark,它可以讓我們能夠用到集群的力量,可以對BigData進(jìn)行高效操作,實(shí)現(xiàn)很多之前由于計(jì)算資源而無法輕易實(shí)現(xiàn)的東西。網(wǎng)上有很多關(guān)于Spark的好處,這里就不做過多的贅述,我們直接進(jìn)入這篇文章的正文!

          d2e2258fa41d2856b999cd8606efceb0.webp


          關(guān)于PySpark,我們知道它是Python調(diào)用Spark的接口,我們可以通過調(diào)用Python API的方式來編寫Spark程序,它支持了大多數(shù)的Spark功能,比如SparkDataFrame、Spark SQL、Streaming、MLlib等等。只要我們了解Python的基本語法,那么在Python里調(diào)用Spark的力量就顯得十分easy了。下面我將會從相對宏觀的層面介紹一下PySpark,讓我們對于這個(gè)神器有一個(gè)框架性的認(rèn)識,知道它能干什么,知道去哪里尋找問題解答,爭取看完這篇文章可以讓我們更加絲滑地入門PySpark。話不多說,馬上開始!

          ?? 目錄

          ab2805fce923c5d8a93982b0682ea4de.webp

          ?? 安裝指引

          安裝這塊本文就不展開具體的步驟了,畢竟大家的機(jī)子環(huán)境都不盡相同。不過可以簡單說幾點(diǎn)重要的步驟,然后節(jié)末放上一些安裝示例供大家參考。

          1)要使用PySpark,機(jī)子上要有Java開發(fā)環(huán)境

          2)環(huán)境變量記得要配置完整

          3)Mac下的/usr/local/ 路徑一般是隱藏的,PyCharm配置py4j和pyspark的時(shí)候可以使用 shift+command+G 來使用路徑訪問。

          4)Mac下如果修改了 ~/.bash_profile 的話,記得要重啟下PyCharm才會生效的哈

          5)版本記得要搞對,保險(xiǎn)起見Java的jdk版本選擇低版本(別問我為什么知道),我選擇的是Java8.


          下面是一些示例demo,可以參考下:

          1)Mac下安裝spark,并配置pycharm-pyspark完整教程

          https://blog.csdn.net/shiyutianming/article/details/99946797

          2)virtualBox里安裝開發(fā)環(huán)境

          https://www.bilibili.com/video/BV1i4411i79a?p=3

          3)快速搭建spark開發(fā)環(huán)境,云哥項(xiàng)目

          https://github.com/lyhue1991/eat_pyspark_in_10_days

          ?? 基礎(chǔ)概念

          關(guān)于Spark的基礎(chǔ)概念,我在先前的文章里也有寫過,大家可以一起來回顧一下 《想學(xué)習(xí)Spark?先帶你了解一些基礎(chǔ)的知識》。作為補(bǔ)充,今天在這里也介紹一些在Spark中會經(jīng)常遇見的專有名詞。

          ???♀? Q1: 什么是RDD

          RDD的全稱是 Resilient Distributed Datasets,這是Spark的一種數(shù)據(jù)抽象集合,它可以被執(zhí)行在分布式的集群上進(jìn)行各種操作,而且有較強(qiáng)的容錯(cuò)機(jī)制。RDD可以被分為若干個(gè)分區(qū),每一個(gè)分區(qū)就是一個(gè)數(shù)據(jù)集片段,從而可以支持分布式計(jì)算。

          ???♀? Q2: RDD運(yùn)行時(shí)相關(guān)的關(guān)鍵名詞

          簡單來說可以有 Client、Job、Master、Worker、Driver、Stage、Task以及Executor,這幾個(gè)東西在調(diào)優(yōu)的時(shí)候也會經(jīng)常遇到的。

          Client:指的是客戶端進(jìn)程,主要負(fù)責(zé)提交job到Master;

          Job:Job來自于我們編寫的程序,Application包含一個(gè)或者多個(gè)job,job包含各種RDD操作;

          Master:指的是Standalone模式中的主控節(jié)點(diǎn),負(fù)責(zé)接收來自Client的job,并管理著worker,可以給worker分配任務(wù)和資源(主要是driver和executor資源);

          Worker:指的是Standalone模式中的slave節(jié)點(diǎn),負(fù)責(zé)管理本節(jié)點(diǎn)的資源,同時(shí)受Master管理,需要定期給Master回報(bào)heartbeat(心跳),啟動Driver和Executor;

          Driver:指的是 job(作業(yè))的主進(jìn)程,一般每個(gè)Spark作業(yè)都會有一個(gè)Driver進(jìn)程,負(fù)責(zé)整個(gè)作業(yè)的運(yùn)行,包括了job的解析、Stage的生成、調(diào)度Task到Executor上去執(zhí)行;

          Stage:中文名 階段,是job的基本調(diào)度單位,因?yàn)槊總€(gè)job會分成若干組Task,每組任務(wù)就被稱為 Stage;

          Task:任務(wù),指的是直接運(yùn)行在executor上的東西,是executor上的一個(gè)線程;

          Executor:指的是 執(zhí)行器,顧名思義就是真正執(zhí)行任務(wù)的地方了,一個(gè)集群可以被配置若干個(gè)Executor,每個(gè)Executor接收來自Driver的Task,并執(zhí)行它(可同時(shí)執(zhí)行多個(gè)Task)。

          ???♀? Q3: 什么是DAG

          全稱是 Directed Acyclic Graph,中文名是有向無環(huán)圖。Spark就是借用了DAG對RDD之間的關(guān)系進(jìn)行了建模,用來描述RDD之間的因果依賴關(guān)系。因?yàn)樵谝粋€(gè)Spark作業(yè)調(diào)度中,多個(gè)作業(yè)任務(wù)之間也是相互依賴的,有些任務(wù)需要在一些任務(wù)執(zhí)行完成了才可以執(zhí)行的。在Spark調(diào)度中就是有DAGscheduler,它負(fù)責(zé)將job分成若干組Task組成的Stage。

          96d9e17ee26a58df670644f459b1cec6.webp


          ???♀? Q4: Spark的部署模式有哪些

          主要有l(wèi)ocal模式、Standalone模式、Mesos模式、YARN模式。

          更多的解釋可以參考這位老哥的解釋。https://www.jianshu.com/p/3b8f85329664

          ???♀? Q5: Shuffle操作是什么

          Shuffle指的是數(shù)據(jù)從Map端到Reduce端的數(shù)據(jù)傳輸過程,Shuffle性能的高低直接會影響程序的性能。因?yàn)镽educe task需要跨節(jié)點(diǎn)去拉在分布在不同節(jié)點(diǎn)上的Map task計(jì)算結(jié)果,這一個(gè)過程是需要有磁盤IO消耗以及數(shù)據(jù)網(wǎng)絡(luò)傳輸?shù)南牡?,所以需要根?jù)實(shí)際數(shù)據(jù)情況進(jìn)行適當(dāng)調(diào)整。另外,Shuffle可以分為兩部分,分別是Map階段的數(shù)據(jù)準(zhǔn)備與Reduce階段的數(shù)據(jù)拷貝處理,在Map端我們叫Shuffle Write,在Reduce端我們叫Shuffle Read。

          ???♀? Q6: 什么是惰性執(zhí)行

          這是RDD的一個(gè)特性,在RDD中的算子可以分為Transform算子和Action算子,其中Transform算子的操作都不會真正執(zhí)行,只會記錄一下依賴關(guān)系,直到遇見了Action算子,在這之前的所有Transform操作才會被觸發(fā)計(jì)算,這就是所謂的惰性執(zhí)行。具體哪些是Transform和Action算子,可以看下一節(jié)。

          ?? 常用函數(shù)

          從網(wǎng)友的總結(jié)來看比較常用的算子大概可以分為下面幾種,所以就演示一下這些算子,如果需要看更多的算子或者解釋,建議可以移步到官方API文檔去Search一下哈。

          pyspark.RDD:http://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.html#pyspark.RDD

          897dfca5cc380d9508898ff79837c52f.webp

          圖來自 edureka 的pyspark入門教程

          下面我們用自己創(chuàng)建的RDD:sc.parallelize(range(1,11),4)

          import?os
          import?pyspark
          from?pyspark?import?SparkContext,?SparkConf

          conf?=?SparkConf().setAppName("test_SamShare").setMaster("local[4]")
          sc?=?SparkContext(conf=conf)

          #?使用?parallelize方法直接實(shí)例化一個(gè)RDD
          rdd?=?sc.parallelize(range(1,11),4)?#?這里的?4?指的是分區(qū)數(shù)量
          rdd.take(100)
          #?[1,?2,?3,?4,?5,?6,?7,?8,?9,?10]


          """
          ----------------------------------------------
          ????????????????Transform算子解析
          ----------------------------------------------
          """

          #?以下的操作由于是Transform操作,因?yàn)槲覀冃枰谧詈蠹由弦粋€(gè)collect算子用來觸發(fā)計(jì)算。
          #?1.?map:?和python差不多,map轉(zhuǎn)換就是對每一個(gè)元素進(jìn)行一個(gè)映射
          rdd?=?sc.parallelize(range(1,?11),?4)
          rdd_map?=?rdd.map(lambda?x:?x*2)
          print("原始數(shù)據(jù):",?rdd.collect())
          print("擴(kuò)大2倍:",?rdd_map.collect())
          #?原始數(shù)據(jù):?[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
          #?擴(kuò)大2倍:?[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]

          #?2.?flatMap:?這個(gè)相比于map多一個(gè)flat(壓平)操作,顧名思義就是要把高維的數(shù)組變成一維
          rdd2?=?sc.parallelize(["hello?SamShare",?"hello?PySpark"])
          print("原始數(shù)據(jù):",?rdd2.collect())
          print("直接split之后的map結(jié)果:",?rdd2.map(lambda?x:?x.split("?")).collect())
          print("直接split之后的flatMap結(jié)果:",?rdd2.flatMap(lambda?x:?x.split("?")).collect())
          #?直接split之后的map結(jié)果:?[['hello', 'SamShare'], ['hello', 'PySpark']]
          #?直接split之后的flatMap結(jié)果:?['hello', 'SamShare', 'hello', 'PySpark']

          #?3.?filter:?過濾數(shù)據(jù)
          rdd?=?sc.parallelize(range(1,?11),?4)
          print("原始數(shù)據(jù):",?rdd.collect())
          print("過濾奇數(shù):",?rdd.filter(lambda?x:?x?%?2?==?0).collect())
          #?原始數(shù)據(jù):?[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
          #?過濾奇數(shù):?[2, 4, 6, 8, 10]

          #?4.?distinct:?去重元素
          rdd?=?sc.parallelize([2,?2,?4,?8,?8,?8,?8,?16,?32,?32])
          print("原始數(shù)據(jù):",?rdd.collect())
          print("去重?cái)?shù)據(jù):",?rdd.distinct().collect())
          #?原始數(shù)據(jù):?[2, 2, 4, 8, 8, 8, 8, 16, 32, 32]
          #?去重?cái)?shù)據(jù):?[4, 8, 16, 32, 2]

          #?5.?reduceByKey:?根據(jù)key來映射數(shù)據(jù)
          from?operator?import?add
          rdd?=?sc.parallelize([("a",?1),?("b",?1),?("a",?1)])
          print("原始數(shù)據(jù):",?rdd.collect())
          print("原始數(shù)據(jù):",?rdd.reduceByKey(add).collect())
          #?原始數(shù)據(jù):?[('a', 1), ('b', 1), ('a', 1)]
          #?原始數(shù)據(jù):?[('b', 1), ('a', 2)]

          #?6.?mapPartitions:?根據(jù)分區(qū)內(nèi)的數(shù)據(jù)進(jìn)行映射操作
          rdd?=?sc.parallelize([1,?2,?3,?4],?2)
          def?f(iterator):
          ????yield?sum(iterator)
          print(rdd.collect())
          print(rdd.mapPartitions(f).collect())
          #?[1,?2,?3,?4]
          #?[3,?7]

          #?7.?sortBy:?根據(jù)規(guī)則進(jìn)行排序
          tmp?=?[('a',?1),?('b',?2),?('1',?3),?('d',?4),?('2',?5)]
          print(sc.parallelize(tmp).sortBy(lambda?x:?x[0]).collect())
          print(sc.parallelize(tmp).sortBy(lambda?x:?x[1]).collect())
          #?[('1',?3),?('2',?5),?('a',?1),?('b',?2),?('d',?4)]
          #?[('a',?1),?('b',?2),?('1',?3),?('d',?4),?('2',?5)]

          #?8.?subtract:?數(shù)據(jù)集相減,?Return?each?value?in?self?that?is?not?contained?in?other.
          x?=?sc.parallelize([("a",?1),?("b",?4),?("b",?5),?("a",?3)])
          y?=?sc.parallelize([("a",?3),?("c",?None)])
          print(sorted(x.subtract(y).collect()))
          #?[('a',?1),?('b',?4),?('b',?5)]

          #?9.?union:?合并兩個(gè)RDD
          rdd?=?sc.parallelize([1,?1,?2,?3])
          print(rdd.union(rdd).collect())
          #?[1,?1,?2,?3,?1,?1,?2,?3]

          #?10.?intersection:?取兩個(gè)RDD的交集,同時(shí)有去重的功效
          rdd1?=?sc.parallelize([1,?10,?2,?3,?4,?5,?2,?3])
          rdd2?=?sc.parallelize([1,?6,?2,?3,?7,?8])
          print(rdd1.intersection(rdd2).collect())
          #?[1,?2,?3]

          #?11.?cartesian:?生成笛卡爾積
          rdd?=?sc.parallelize([1,?2])
          print(sorted(rdd.cartesian(rdd).collect()))
          #?[(1,?1),?(1,?2),?(2,?1),?(2,?2)]

          #?12.?zip:?拉鏈合并,需要兩個(gè)RDD具有相同的長度以及分區(qū)數(shù)量
          x?=?sc.parallelize(range(0,?5))
          y?=?sc.parallelize(range(1000,?1005))
          print(x.collect())
          print(y.collect())
          print(x.zip(y).collect())
          #?[0,?1,?2,?3,?4]
          #?[1000,?1001,?1002,?1003,?1004]
          #?[(0,?1000),?(1,?1001),?(2,?1002),?(3,?1003),?(4,?1004)]

          # 13. zipWithIndex:?將RDD和一個(gè)從0開始的遞增序列按照拉鏈方式連接。
          rdd_name?=?sc.parallelize(["LiLei",?"Hanmeimei",?"Lily",?"Lucy",?"Ann",?"Dachui",?"RuHua"])
          rdd_index?=?rdd_name.zipWithIndex()
          print(rdd_index.collect())
          #?[('LiLei',?0),?('Hanmeimei',?1),?('Lily',?2),?('Lucy',?3),?('Ann',?4),?('Dachui',?5),?('RuHua',?6)]

          #?14.?groupByKey:?按照key來聚合數(shù)據(jù)
          rdd?=?sc.parallelize([("a",?1),?("b",?1),?("a",?1)])
          print(rdd.collect())
          print(sorted(rdd.groupByKey().mapValues(len).collect()))
          print(sorted(rdd.groupByKey().mapValues(list).collect()))
          #?[('a',?1),?('b',?1),?('a',?1)]
          #?[('a',?2),?('b',?1)]
          #?[('a',?[1,?1]),?('b',?[1])]

          #?15.?sortByKey:
          tmp?=?[('a',?1),?('b',?2),?('1',?3),?('d',?4),?('2',?5)]
          print(sc.parallelize(tmp).sortByKey(True,?1).collect())
          #?[('1',?3),?('2',?5),?('a',?1),?('b',?2),?('d',?4)]

          #?16.?join:
          x?=?sc.parallelize([("a",?1),?("b",?4)])
          y?=?sc.parallelize([("a",?2),?("a",?3)])
          print(sorted(x.join(y).collect()))
          #?[('a',?(1,?2)),?('a',?(1,?3))]

          #?17.?leftOuterJoin/rightOuterJoin
          x?=?sc.parallelize([("a",?1),?("b",?4)])
          y?=?sc.parallelize([("a",?2)])
          print(sorted(x.leftOuterJoin(y).collect()))
          #?[('a',?(1,?2)),?('b',?(4,?None))]

          """
          ----------------------------------------------
          ????????????????Action算子解析
          ----------------------------------------------
          """

          #?1.?collect:?指的是把數(shù)據(jù)都匯集到driver端,便于后續(xù)的操作
          rdd?=?sc.parallelize(range(0,?5))
          rdd_collect?=?rdd.collect()
          print(rdd_collect)
          #?[0,?1,?2,?3,?4]

          #?2.?first:?取第一個(gè)元素
          sc.parallelize([2,?3,?4]).first()
          #?2

          #?3.?collectAsMap:?轉(zhuǎn)換為dict,使用這個(gè)要注意了,不要對大數(shù)據(jù)用,不然全部載入到driver端會爆內(nèi)存
          m?=?sc.parallelize([(1,?2),?(3,?4)]).collectAsMap()
          m
          #?{1:?2,?3:?4}

          #?4.?reduce:?逐步對兩個(gè)元素進(jìn)行操作
          rdd?=?sc.parallelize(range(10),5)
          print(rdd.reduce(lambda?x,y:x+y))
          #?45

          #?5.?countByKey/countByValue:
          rdd?=?sc.parallelize([("a",?1),?("b",?1),?("a",?1)])
          print(sorted(rdd.countByKey().items()))
          print(sorted(rdd.countByValue().items()))
          #?[('a',?2),?('b',?1)]
          #?[(('a',?1),?2),?(('b',?1),?1)]

          #?6.?take:?相當(dāng)于取幾個(gè)數(shù)據(jù)到driver端
          rdd?=?sc.parallelize([("a",?1),?("b",?1),?("a",?1)])
          print(rdd.take(5))
          #?[('a',?1),?('b',?1),?('a',?1)]

          #?7.?saveAsTextFile:?保存rdd成text文件到本地
          text_file?=?"./data/rdd.txt"
          rdd?=?sc.parallelize(range(5))
          rdd.saveAsTextFile(text_file)

          #?8.?takeSample:?隨機(jī)取數(shù)
          rdd?=?sc.textFile("./test/data/hello_samshare.txt",?4)??#?這里的?4?指的是分區(qū)數(shù)量
          rdd_sample?=?rdd.takeSample(True,?2,?0)??# withReplacement 參數(shù)1:代表是否是有放回抽樣
          rdd_sample

          #?9.?foreach:?對每一個(gè)元素執(zhí)行某種操作,不生成新的RDD
          rdd?=?sc.parallelize(range(10),?5)
          accum?=?sc.accumulator(0)
          rdd.foreach(lambda?x:?accum.add(x))
          print(accum.value)
          #?45

          ?? Spark SQL使用

          在講Spark SQL前,先解釋下這個(gè)模塊。這個(gè)模塊是Spark中用來處理結(jié)構(gòu)化數(shù)據(jù)的,提供一個(gè)叫SparkDataFrame的東西并且自動解析為分布式SQL查詢數(shù)據(jù)。我們之前用過Python的Pandas庫,也大致了解了DataFrame,這個(gè)其實(shí)和它沒有太大的區(qū)別,只是調(diào)用的API可能有些不同罷了。

          我們通過使用Spark SQL來處理數(shù)據(jù),會讓我們更加地熟悉,比如可以用SQL語句、用SparkDataFrame的API或者Datasets API,我們可以按照需求隨心轉(zhuǎn)換,通過SparkDataFrame API 和 SQL 寫的邏輯,會被Spark優(yōu)化器Catalyst自動優(yōu)化成RDD,即便寫得不好也可能運(yùn)行得很快(如果是直接寫RDD可能就掛了哈哈)。

          創(chuàng)建SparkDataFrame

          開始講SparkDataFrame,我們先學(xué)習(xí)下幾種創(chuàng)建的方法,分別是使用RDD來創(chuàng)建使用python的DataFrame來創(chuàng)建、使用List來創(chuàng)建、讀取數(shù)據(jù)文件來創(chuàng)建、通過讀取數(shù)據(jù)庫來創(chuàng)建。

          1. 使用RDD來創(chuàng)建

          主要使用RDD的toDF方法。

          rdd?=?sc.parallelize([("Sam",?28,?88),?("Flora",?28,?90),?("Run",?1,?60)])
          df?=?rdd.toDF(["name",?"age",?"score"])
          df.show()
          df.printSchema()

          #?+-----+---+-----+
          #?|?name|age|score|
          #?+-----+---+-----+
          #?|??Sam|?28|???88|
          #?|Flora|?28|???90|
          #?|??Run|??1|???60|
          #?+-----+---+-----+
          #?root
          #??|--?name:?string?(nullable?=?true)
          #??|--?age:?long?(nullable?=?true)
          #??|--?score:?long?(nullable?=?true)
          2. 使用python的DataFrame來創(chuàng)建
          df?=?pd.DataFrame([['Sam',?28,?88],?['Flora',?28,?90],?['Run',?1,?60]],
          ??????????????????columns=['name',?'age',?'score'])
          print(">>?打印DataFrame:")
          print(df)
          print("\n")
          Spark_df?=?spark.createDataFrame(df)
          print(">>?打印SparkDataFrame:")
          Spark_df.show()
          #?>>?打印DataFrame:
          #?????name??age??score
          #?0????Sam???28?????88
          #?1??Flora???28?????90
          #?2????Run????1?????60
          #?>>?打印SparkDataFrame:
          #?+-----+---+-----+
          #?|?name|age|score|
          #?+-----+---+-----+
          #?|??Sam|?28|???88|
          #?|Flora|?28|???90|
          #?|??Run|??1|???60|
          #?+-----+---+-----+
          3. 使用List來創(chuàng)建
          list_values?=?[['Sam',?28,?88],?['Flora',?28,?90],?['Run',?1,?60]]
          Spark_df?=?spark.createDataFrame(list_values,?['name',?'age',?'score'])
          Spark_df.show()
          #?+-----+---+-----+
          #?|?name|age|score|
          #?+-----+---+-----+
          #?|??Sam|?28|???88|
          #?|Flora|?28|???90|
          #?|??Run|??1|???60|
          #?+-----+---+-----+
          4. 讀取數(shù)據(jù)文件來創(chuàng)建
          #?4.1?CSV文件
          df?=?spark.read.option("header",?"true")\
          ????.option("inferSchema",?"true")\
          ????.option("delimiter",?",")\
          ????.csv("./test/data/titanic/train.csv")
          df.show(5)
          df.printSchema()

          #?4.2?json文件
          df?=?spark.read.json("./test/data/hello_samshare.json")
          df.show(5)
          df.printSchema()
          5. 通過讀取數(shù)據(jù)庫來創(chuàng)建
          #?5.1?讀取hive數(shù)據(jù)
          spark.sql("CREATE?TABLE?IF?NOT?EXISTS?src?(key?INT,?value?STRING)?USING?hive")
          spark.sql("LOAD?DATA?LOCAL?INPATH?'data/kv1.txt'?INTO?TABLE?src")
          df?=?spark.sql("SELECT?key,?value?FROM?src?WHERE?key?<?10?ORDER?BY?key")
          df.show(5)

          #?5.2?讀取mysql數(shù)據(jù)
          url?=?"jdbc:mysql://localhost:3306/test"
          df?=?spark.read.format("jdbc")?\
          ?.option("url",?url)?\
          ?.option("dbtable",?"runoob_tbl")?\
          ?.option("user",?"root")?\
          ?.option("password",?"8888")?\
          ?.load()\
          df.show()

          常用的SparkDataFrame API

          這里我大概是分成了幾部分來看這些APIs,分別是查看DataFrame的APIs、簡單處理DataFrame的APIs、DataFrame的列操作APIs、DataFrame的一些思路變換操作APIsDataFrame的一些統(tǒng)計(jì)操作APIs,這樣子也有助于我們了解這些API的功能,以后遇見實(shí)際問題的時(shí)候可以解決。

          首先我們這小節(jié)全局用到的數(shù)據(jù)集如下:

          from?pyspark.sql?import?functions?as?F
          from?pyspark.sql?import?SparkSession
          # SparkSQL的許多功能封裝在SparkSession的方法接口中, SparkContext則不行的。
          spark?=?SparkSession.builder?\
          ????.appName("sam_SamShare")?\
          ????.config("master",?"local[4]")?\
          ????.enableHiveSupport()?\
          ????.getOrCreate()
          sc?=?spark.sparkContext

          #?創(chuàng)建一個(gè)SparkDataFrame
          rdd?=?sc.parallelize([("Sam",?28,?88,?"M"),
          ??????????????????????("Flora",?28,?90,?"F"),
          ??????????????????????("Run",?1,?60,?None),
          ??????????????????????("Peter",?55,?100,?"M"),
          ??????????????????????("Mei",?54,?95,?"F")])
          df?=?rdd.toDF(["name",?"age",?"score",?"sex"])
          df.show()
          df.printSchema()

          #?+-----+---+-----+----+
          #?|?name|age|score|?sex|
          #?+-----+---+-----+----+
          #?|??Sam|?28|???88|???M|
          #?|Flora|?28|???90|???F|
          #?|??Run|??1|???60|null|
          #?|Peter|?55|??100|???M|
          #?|??Mei|?54|???95|???F|
          #?+-----+---+-----+----+
          #?root
          #??|--?name:?string?(nullable?=?true)
          #??|--?age:?long?(nullable?=?true)
          #??|--?score:?long?(nullable?=?true)
          #??|--?sex:?string?(nullable?=?true)
          1. 查看DataFrame的APIs
          #?DataFrame.collect
          #?以列表形式返回行
          df.collect()
          #?[Row(name='Sam',?age=28,?score=88,?sex='M'),
          #?Row(name='Flora',?age=28,?score=90,?sex='F'),
          #?Row(name='Run',?age=1,?score=60,?sex=None),
          #?Row(name='Peter',?age=55,?score=100,?sex='M'),
          #?Row(name='Mei',?age=54,?score=95,?sex='F')]

          #?DataFrame.count
          df.count()
          #?5

          #?DataFrame.columns
          df.columns
          #?['name',?'age',?'score',?'sex']

          #?DataFrame.dtypes
          df.dtypes
          #?[('name',?'string'),?('age',?'bigint'),?('score',?'bigint'),?('sex',?'string')]

          #?DataFrame.describe
          #?返回列的基礎(chǔ)統(tǒng)計(jì)信息
          df.describe(['age']).show()
          #?+-------+------------------+
          #?|summary|???????????????age|
          #?+-------+------------------+
          #?|??count|?????????????????5|
          #?|???mean|??????????????33.2|
          #?|?stddev|22.353970564532826|
          #?|????min|?????????????????1|
          #?|????max|????????????????55|
          #?+-------+------------------+
          df.describe().show()
          #?+-------+-----+------------------+------------------+----+
          #?|summary|?name|???????????????age|?????????????score|?sex|
          #?+-------+-----+------------------+------------------+----+
          #?|??count|????5|?????????????????5|?????????????????5|???4|
          #?|???mean|?null|??????????????33.2|??????????????86.6|null|
          #?|?stddev|?null|22.353970564532826|15.582040944625966|null|
          #?|????min|Flora|?????????????????1|????????????????60|???F|
          #?|????max|??Sam|????????????????55|???????????????100|???M|
          #?+-------+-----+------------------+------------------+----+

          #?DataFrame.select
          #?選定指定列并按照一定順序呈現(xiàn)
          df.select("sex",?"score").show()

          #?DataFrame.first
          #?DataFrame.head
          #?查看第1條數(shù)據(jù)
          df.first()
          #?Row(name='Sam',?age=28,?score=88,?sex='M')
          df.head(1)
          #?[Row(name='Sam',?age=28,?score=88,?sex='M')]


          #?DataFrame.freqItems
          #?查看指定列的枚舉值
          df.freqItems(["age","sex"]).show()
          #?+---------------+-------------+
          #?|??age_freqItems|sex_freqItems|
          #?+---------------+-------------+
          #?|[55,?1,?28,?54]|??????[M,?F,]|
          #?+---------------+-------------+

          #?DataFrame.summary
          df.summary().show()
          #?+-------+-----+------------------+------------------+----+
          #?|summary|?name|???????????????age|?????????????score|?sex|
          #?+-------+-----+------------------+------------------+----+
          #?|??count|????5|?????????????????5|?????????????????5|???4|
          #?|???mean|?null|??????????????33.2|??????????????86.6|null|
          #?|?stddev|?null|22.353970564532826|15.582040944625966|null|
          #?|????min|Flora|?????????????????1|????????????????60|???F|
          #?|????25%|?null|????????????????28|????????????????88|null|
          #?|????50%|?null|????????????????28|????????????????90|null|
          #?|????75%|?null|????????????????54|????????????????95|null|
          #?|????max|??Sam|????????????????55|???????????????100|???M|
          #?+-------+-----+------------------+------------------+----+

          #?DataFrame.sample
          #?按照一定規(guī)則從df隨機(jī)抽樣數(shù)據(jù)
          df.sample(0.5).show()
          #?+-----+---+-----+----+
          #?|?name|age|score|?sex|
          #?+-----+---+-----+----+
          #?|??Sam|?28|???88|???M|
          #?|??Run|??1|???60|null|
          #?|Peter|?55|??100|???M|
          #?+-----+---+-----+----+
          2. 簡單處理DataFrame的APIs
          #?DataFrame.distinct
          #?對數(shù)據(jù)集進(jìn)行去重
          df.distinct().show()

          #?DataFrame.dropDuplicates
          #?對指定列去重
          df.dropDuplicates(["sex"]).show()
          #?+-----+---+-----+----+
          #?|?name|age|score|?sex|
          #?+-----+---+-----+----+
          #?|Flora|?28|???90|???F|
          #?|??Run|??1|???60|null|
          #?|??Sam|?28|???88|???M|
          #?+-----+---+-----+----+

          #?DataFrame.exceptAll
          #?DataFrame.subtract
          #?根據(jù)指定的df對df進(jìn)行去重
          df1?=?spark.createDataFrame(
          ????????[("a",?1),?("a",?1),?("b",??3),?("c",?4)],?["C1",?"C2"])
          df2?=?spark.createDataFrame([("a",?1),?("b",?3)],?["C1",?"C2"])
          df3?=?df1.exceptAll(df2)??#?沒有去重的功效
          df4?=?df1.subtract(df2)??#?有去重的奇效
          df1.show()
          df2.show()
          df3.show()
          df4.show()
          #?+---+---+
          #?|?C1|?C2|
          #?+---+---+
          #?|??a|??1|
          #?|??a|??1|
          #?|??b|??3|
          #?|??c|??4|
          #?+---+---+
          #?+---+---+
          #?|?C1|?C2|
          #?+---+---+
          #?|??a|??1|
          #?|??b|??3|
          #?+---+---+
          #?+---+---+
          #?|?C1|?C2|
          #?+---+---+
          #?|??a|??1|
          #?|??c|??4|
          #?+---+---+
          #?+---+---+
          #?|?C1|?C2|
          #?+---+---+
          #?|??c|??4|
          #?+---+---+

          #?DataFrame.intersectAll
          #?返回兩個(gè)DataFrame的交集
          df1?=?spark.createDataFrame(
          ????????[("a",?1),?("a",?1),?("b",??3),?("c",?4)],?["C1",?"C2"])
          df2?=?spark.createDataFrame([("a",?1),?("b",?4)],?["C1",?"C2"])
          df1.intersectAll(df2).show()
          #?+---+---+
          #?|?C1|?C2|
          #?+---+---+
          #?|??a|??1|
          #?+---+---+

          #?DataFrame.drop
          #?丟棄指定列
          df.drop('age').show()

          #?DataFrame.withColumn
          #?新增列
          df1?=?df.withColumn("birth_year",?2021?-?df.age)
          df1.show()
          #?+-----+---+-----+----+----------+
          #?|?name|age|score|?sex|birth_year|
          #?+-----+---+-----+----+----------+
          #?|??Sam|?28|???88|???M|??????1993|
          #?|Flora|?28|???90|???F|??????1993|
          #?|??Run|??1|???60|null|??????2020|
          #?|Peter|?55|??100|???M|??????1966|
          #?|??Mei|?54|???95|???F|??????1967|
          #?+-----+---+-----+----+----------+

          #?DataFrame.withColumnRenamed
          #?重命名列名
          df1?=?df.withColumnRenamed("sex",?"gender")
          df1.show()
          #?+-----+---+-----+------+
          #?|?name|age|score|gender|
          #?+-----+---+-----+------+
          #?|??Sam|?28|???88|?????M|
          #?|Flora|?28|???90|?????F|
          #?|??Run|??1|???60|??null|
          #?|Peter|?55|??100|?????M|
          #?|??Mei|?54|???95|?????F|
          #?+-----+---+-----+------+


          #?DataFrame.dropna
          #?丟棄空值,DataFrame.dropna(how='any',?thresh=None,?subset=None)
          df.dropna(how='all',?subset=['sex']).show()
          #?+-----+---+-----+---+
          #?|?name|age|score|sex|
          #?+-----+---+-----+---+
          #?|??Sam|?28|???88|??M|
          #?|Flora|?28|???90|??F|
          #?|Peter|?55|??100|??M|
          #?|??Mei|?54|???95|??F|
          #?+-----+---+-----+---+

          #?DataFrame.fillna
          #?空值填充操作
          df1?=?spark.createDataFrame(
          ????????[("a",?None),?("a",?1),?(None,??3),?("c",?4)],?["C1",?"C2"])
          #?df2?=?df1.na.fill({"C1":?"d",?"C2":?99})
          df2?=?df1.fillna({"C1":?"d",?"C2":?99})
          df1.show()
          df2.show()

          #?DataFrame.filter
          #?根據(jù)條件過濾
          df.filter(df.age>50).show()
          #?+-----+---+-----+---+
          #?|?name|age|score|sex|
          #?+-----+---+-----+---+
          #?|Peter|?55|??100|??M|
          #?|??Mei|?54|???95|??F|
          #?+-----+---+-----+---+
          df.where(df.age==28).show()
          #?+-----+---+-----+---+
          #?|?name|age|score|sex|
          #?+-----+---+-----+---+
          #?|??Sam|?28|???88|??M|
          #?|Flora|?28|???90|??F|
          #?+-----+---+-----+---+
          df.filter("age<18").show()
          #?+----+---+-----+----+
          #?|name|age|score|?sex|
          #?+----+---+-----+----+
          #?|?Run|??1|???60|null|
          #?+----+---+-----+----+


          #?DataFrame.join
          #?這個(gè)不用多解釋了,直接上案例來看看具體的語法即可,DataFrame.join(other,?on=None,?how=None)
          df1?=?spark.createDataFrame(
          ????????[("a",?1),?("d",?1),?("b",??3),?("c",?4)],?["id",?"num1"])
          df2?=?spark.createDataFrame([("a",?1),?("b",?3)],?["id",?"num2"])
          df1.join(df2,?df1.id?==?df2.id,?'left').select(df1.id.alias("df1_id"),
          ???????????????????????????????????????????????df1.num1.alias("df1_num"),
          ???????????????????????????????????????????????df2.num2.alias("df2_num")
          ???????????????????????????????????????????????).sort(["df1_id"],?ascending=False)\
          ????.show()


          #?DataFrame.agg(*exprs)
          #?聚合數(shù)據(jù),可以寫多個(gè)聚合方法,如果不寫groupBy的話就是對整個(gè)DF進(jìn)行聚合
          #?DataFrame.alias
          #?設(shè)置列或者DataFrame別名
          #?DataFrame.groupBy
          #?根據(jù)某幾列進(jìn)行聚合,如有多列用列表寫在一起,如?df.groupBy(["sex",?"age"])
          df.groupBy("sex").agg(F.min(df.age).alias("最小年齡"),
          ??????????????????????F.expr("avg(age)").alias("平均年齡"),
          ??????????????????????F.expr("collect_list(name)").alias("姓名集合")
          ??????????????????????).show()
          #?+----+--------+--------+------------+
          #?|?sex|最小年齡|平均年齡|????姓名集合|
          #?+----+--------+--------+------------+
          #?|???F|??????28|????41.0|[Flora,?Mei]|
          #?|null|???????1|?????1.0|???????[Run]|
          #?|???M|??????28|????41.5|[Sam,?Peter]|
          #?+----+--------+--------+------------+


          #?DataFrame.foreach
          #?對每一行進(jìn)行函數(shù)方法的應(yīng)用
          def?f(person):
          ????print(person.name)
          df.foreach(f)
          #?Peter
          #?Run
          #?Sam
          #?Flora
          #?Mei

          #?DataFrame.replace
          #?修改df里的某些值
          df1?=?df.na.replace({"M":?"Male",?"F":?"Female"})
          df1.show()

          #?DataFrame.union
          #?相當(dāng)于SQL里的union?all操作
          df1?=?spark.createDataFrame(
          ????????[("a",?1),?("d",?1),?("b",??3),?("c",?4)],?["id",?"num"])
          df2?=?spark.createDataFrame([("a",?1),?("b",?3)],?["id",?"num"])
          df1.union(df2).show()
          df1.unionAll(df2).show()
          #?這里union沒有去重,不知道為啥,有知道的朋友麻煩解釋下,謝謝了。
          #?+---+---+
          #?|?id|num|
          #?+---+---+
          #?|??a|??1|
          #?|??d|??1|
          #?|??b|??3|
          #?|??c|??4|
          #?|??a|??1|
          #?|??b|??3|
          #?+---+---+

          #?DataFrame.unionByName
          #?根據(jù)列名來進(jìn)行合并數(shù)據(jù)集
          df1?=?spark.createDataFrame([[1,?2,?3]],?["col0",?"col1",?"col2"])
          df2?=?spark.createDataFrame([[4,?5,?6]],?["col1",?"col2",?"col0"])
          df1.unionByName(df2).show()
          #?+----+----+----+
          #?|col0|col1|col2|
          #?+----+----+----+
          #?|???1|???2|???3|
          #?|???6|???4|???5|
          #?+----+----+----+
          3. DataFrame的列操作APIs

          這里主要針對的是列進(jìn)行操作,比如說重命名、排序、空值判斷、類型判斷等,這里就不展開寫demo了,看看語法應(yīng)該大家都懂了。

          Column.alias(*alias,?**kwargs)??#?重命名列名
          Column.asc()??#?按照列進(jìn)行升序排序
          Column.desc()??#?按照列進(jìn)行降序排序
          Column.astype(dataType)??#?類型轉(zhuǎn)換
          Column.cast(dataType)??#?強(qiáng)制轉(zhuǎn)換類型
          Column.between(lowerBound,?upperBound)??#?返回布爾值,是否在指定區(qū)間范圍內(nèi)
          Column.contains(other)??#?是否包含某個(gè)關(guān)鍵詞
          Column.endswith(other)??#?以什么結(jié)束的值,如?df.filter(df.name.endswith('ice')).collect()
          Column.isNotNull()??#?篩選非空的行
          Column.isNull()
          Column.isin(*cols)??#?返回包含某些值的行?df[df.name.isin("Bob",?"Mike")].collect()
          Column.like(other)??#?返回含有關(guān)鍵詞的行
          Column.when(condition,?value)??#?給True的賦值
          Column.otherwise(value)??#?與when搭配使用,df.select(df.name,?F.when(df.age?>?3,?1).otherwise(0)).show()
          Column.rlike(other)??#?可以使用正則的匹配?df.filter(df.name.rlike('ice$')).collect()
          Column.startswith(other)??#?df.filter(df.name.startswith('Al')).collect()
          Column.substr(startPos,?length)??#?df.select(df.name.substr(1,?3).alias("col")).collect()
          4. DataFrame的一些思路變換操作APIs
          #?DataFrame.createOrReplaceGlobalTempView
          #?DataFrame.dropGlobalTempView
          #?創(chuàng)建全局的試圖,注冊后可以使用sql語句來進(jìn)行操作,生命周期取決于Spark?application本身
          df.createOrReplaceGlobalTempView("people")
          spark.sql("select?*?from?global_temp.people?where?sex?=?'M'?").show()
          #?+-----+---+-----+---+
          #?|?name|age|score|sex|
          #?+-----+---+-----+---+
          #?|??Sam|?28|???88|??M|
          #?|Peter|?55|??100|??M|
          #?+-----+---+-----+---+

          #?DataFrame.createOrReplaceTempView
          #?DataFrame.dropTempView
          #?創(chuàng)建本地臨時(shí)試圖,生命周期取決于用來創(chuàng)建此數(shù)據(jù)集的SparkSession
          df.createOrReplaceTempView("tmp_people")
          spark.sql("select?*?from?tmp_people?where?sex?=?'F'?").show()
          #?+-----+---+-----+---+
          #?|?name|age|score|sex|
          #?+-----+---+-----+---+
          #?|Flora|?28|???90|??F|
          #?|??Mei|?54|???95|??F|
          #?+-----+---+-----+---+

          #?DataFrame.cache\DataFrame.persist
          #?可以把一些數(shù)據(jù)放入緩存中,default?storage?level?(MEMORY_AND_DISK).
          df.cache()
          df.persist()
          df.unpersist()

          #?DataFrame.crossJoin
          #?返回兩個(gè)DataFrame的笛卡爾積關(guān)聯(lián)的DataFrame
          df1?=?df.select("name",?"sex")
          df2?=?df.select("name",?"sex")
          df3?=?df1.crossJoin(df2)
          print("表1的記錄數(shù)",?df1.count())
          print("表2的記錄數(shù)",?df2.count())
          print("笛卡爾積后的記錄數(shù)",?df3.count())
          #?表1的記錄數(shù)?5
          #?表2的記錄數(shù)?5
          #?笛卡爾積后的記錄數(shù)?25

          #?DataFrame.toPandas
          #?把SparkDataFrame轉(zhuǎn)為?Pandas的DataFrame
          df.toPandas()

          #?DataFrame.rdd
          #?把SparkDataFrame轉(zhuǎn)為rdd,這樣子可以用rdd的語法來操作數(shù)據(jù)
          df.rdd
          5. DataFrame的一些統(tǒng)計(jì)操作APIs
          #?DataFrame.cov
          #?計(jì)算指定兩列的樣本協(xié)方差
          df.cov("age",?"score")
          #?324.59999999999997

          #?DataFrame.corr
          #?計(jì)算指定兩列的相關(guān)系數(shù),DataFrame.corr(col1,?col2,?method=None),目前method只支持Pearson相關(guān)系數(shù)
          df.corr("age",?"score",?method="pearson")
          #?0.9319004030498815

          #?DataFrame.cube
          #?創(chuàng)建多維度聚合的結(jié)果,通常用于分析數(shù)據(jù),比如我們指定兩個(gè)列進(jìn)行聚合,比如name和age,那么這個(gè)函數(shù)返回的聚合結(jié)果會
          #?groupby("name",?"age")
          #?groupby("name")
          #?groupby("age")
          #?groupby(all)
          #?四個(gè)聚合結(jié)果的union?all?的結(jié)果

          df1?=?df.filter(df.name?!=?"Run")
          print(df1.show())
          df1.cube("name",?"sex").count().show()
          #?+-----+---+-----+---+
          #?|?name|age|score|sex|
          #?+-----+---+-----+---+
          #?|??Sam|?28|???88|??M|
          #?|Flora|?28|???90|??F|
          #?|Peter|?55|??100|??M|
          #?|??Mei|?54|???95|??F|
          #?+-----+---+-----+---+
          #?cube?聚合之后的結(jié)果
          #?+-----+----+-----+
          #?|?name|?sex|count|
          #?+-----+----+-----+
          #?|?null|???F|????2|
          #?|?null|null|????4|
          #?|Flora|null|????1|
          #?|Peter|null|????1|
          #?|?null|???M|????2|
          #?|Peter|???M|????1|
          #?|??Sam|???M|????1|
          #?|??Sam|null|????1|
          #?|??Mei|???F|????1|
          #?|??Mei|null|????1|
          #?|Flora|???F|????1|
          #?+-----+----+-----+

          保存數(shù)據(jù)/寫入數(shù)據(jù)庫

          這里的保存數(shù)據(jù)主要是保存到Hive中的栗子,主要包括了overwrite、append等方式。

          1. 當(dāng)結(jié)果集為SparkDataFrame的時(shí)候
          import?pandas?as?pd
          from?datetime?import?datetime
          from?pyspark?import?SparkConf
          from?pyspark?import?SparkContext
          from?pyspark.sql?import?HiveContext

          conf?=?SparkConf()\
          ??????.setAppName("test")\
          ??????.set("hive.exec.dynamic.partition.mode",?"nonstrict")?#?動態(tài)寫入hive分區(qū)表
          sc?=?SparkContext(conf=conf)
          hc?=?HiveContext(sc)
          sc.setLogLevel("ERROR")
          ????
          list_values?=?[['Sam',?28,?88],?['Flora',?28,?90],?['Run',?1,?60]]
          Spark_df?=?spark.createDataFrame(list_values,?['name',?'age',?'score'])
          print(Spark_df.show())
          save_table?=?"tmp.samshare_pyspark_savedata"

          #?方式1:直接寫入到Hive
          Spark_df.write.format("hive").mode("overwrite").saveAsTable(save_table)?#?或者改成append模式
          print(datetime.now().strftime("%y/%m/%d?%H:%M:%S"),?"測試數(shù)據(jù)寫入到表"?+?save_table)

          #?方式2:注冊為臨時(shí)表,使用SparkSQL來寫入分區(qū)表
          Spark_df.createOrReplaceTempView("tmp_table")
          write_sql?=?"""
          insert?overwrite?table?{0}?partitions?(pt_date='{1}')
          select?*?from?tmp_table
          """
          .format(save_table,?"20210520")
          hc.sql(write_sql)
          print(datetime.now().strftime("%y/%m/%d?%H:%M:%S"),?"測試數(shù)據(jù)寫入到表"?+?save_table)
          2. 當(dāng)結(jié)果集為Python的DataFrame的時(shí)候

          如果是Python的DataFrame,我們就需要多做一步把它轉(zhuǎn)換為SparkDataFrame,其余操作就一樣了。

          import?pandas?as?pd
          from?datetime?import?datetime
          from?pyspark?import?SparkConf
          from?pyspark?import?SparkContext
          from?pyspark.sql?import?HiveContext

          conf?=?SparkConf()\
          ??????.setAppName("test")\
          ??????.set("hive.exec.dynamic.partition.mode",?"nonstrict")?#?動態(tài)寫入hive分區(qū)表
          sc?=?SparkContext(conf=conf)
          hc?=?HiveContext(sc)
          sc.setLogLevel("ERROR")
          ????
          result_df?=?pd.DataFrame([1,2,3],?columns=['a'])
          save_table?=?"tmp.samshare_pyspark_savedata"

          #?獲取DataFrame的schema
          c1?=?list(result_df.columns)
          #?轉(zhuǎn)為SparkDataFrame
          result?=?hc.createDataFrame(result_df.astype(str),?c1)
          result.write.format("hive").mode("overwrite").saveAsTable(save_table)?#?或者改成append模式
          print(datetime.now().strftime("%y/%m/%d?%H:%M:%S"),?"測試數(shù)據(jù)寫入到表"?+?save_table)

          ?? Spark調(diào)優(yōu)思路

          這一小節(jié)的內(nèi)容算是對pyspark入門的一個(gè)ending了,全文主要是參考學(xué)習(xí)了美團(tuán)Spark性能優(yōu)化指南的基礎(chǔ)篇和高級篇內(nèi)容,主體脈絡(luò)和這兩篇文章是一樣的,只不過是基于自己學(xué)習(xí)后的理解進(jìn)行了一次總結(jié)復(fù)盤,而原文中主要是用Java來舉例的,我這邊主要用pyspark來舉例。文章主要會從4個(gè)方面(或者說4個(gè)思路)來優(yōu)化我們的Spark任務(wù),主要就是下面的圖片所示:

          18e8cd0291ab89bedb3fda6fa28eae73.webp

          開發(fā)習(xí)慣調(diào)優(yōu)

          1. 盡可能復(fù)用同一個(gè)RDD,避免重復(fù)創(chuàng)建,并且適當(dāng)持久化數(shù)據(jù)

          這種開發(fā)習(xí)慣是需要我們對于即將要開發(fā)的應(yīng)用邏輯有比較深刻的思考,并且可以通過code review來發(fā)現(xiàn)的,講白了就是要記得我們創(chuàng)建過啥數(shù)據(jù)集,可以復(fù)用的盡量廣播(broadcast)下,能很好提升性能。

          #?最低級寫法,相同數(shù)據(jù)集重復(fù)創(chuàng)建。
          rdd1?=?sc.textFile("./test/data/hello_samshare.txt",?4)?#?這里的?4?指的是分區(qū)數(shù)量
          rdd2?=?sc.textFile("./test/data/hello_samshare.txt",?4)?#?這里的?4?指的是分區(qū)數(shù)量
          print(rdd1.take(10))
          print(rdd2.map(lambda?x:x[0:1]).take(10))

          #?稍微進(jìn)階一些,復(fù)用相同數(shù)據(jù)集,但因中間結(jié)果沒有緩存,數(shù)據(jù)會重復(fù)計(jì)算
          rdd1?=?sc.textFile("./test/data/hello_samshare.txt",?4)?#?這里的?4?指的是分區(qū)數(shù)量
          print(rdd1.take(10))
          print(rdd1.map(lambda?x:x[0:1]).take(10))

          #?相對比較高效,使用緩存來持久化數(shù)據(jù)
          rdd?=?sc.parallelize(range(1,?11),?4).cache()??#?或者persist()
          rdd_map?=?rdd.map(lambda?x:?x*2)
          rdd_reduce?=?rdd.reduce(lambda?x,?y:?x+y)
          print(rdd_map.take(10))
          print(rdd_reduce)

          下面我們就來對比一下使用緩存能給我們的Spark程序帶來多大的效率提升吧,我們先構(gòu)造一個(gè)程序運(yùn)行時(shí)長測量器。

          import?time
          #?統(tǒng)計(jì)程序運(yùn)行時(shí)間
          def?time_me(info="used"):
          ????def?_time_me(fn):
          [email protected](fn)
          ????????def?_wrapper(*args,?**kwargs):
          ????????????start?=?time.time()
          ????????????fn(*args,?**kwargs)
          ????????????print("%s?%s?%s"?%?(fn.__name__,?info,?time.time()?-?start),?"second")
          ????????return?_wrapper
          ????return?_time_me

          下面我們運(yùn)行下面的代碼,看下使用了cache帶來的效率提升:

          @time_me()
          def?test(types=0):
          ????if?types?==?1:
          ????????print("使用持久化緩存")
          ????????rdd?=?sc.parallelize(range(1,?10000000),?4)
          ????????rdd1?=?rdd.map(lambda?x:?x*x?+?2*x?+?1).cache()??#?或者?persist(StorageLevel.MEMORY_AND_DISK_SER)
          ????????print(rdd1.take(10))
          ????????rdd2?=?rdd1.reduce(lambda?x,?y:?x+y)
          ????????rdd3?=?rdd1.reduce(lambda?x,?y:?x?+?y)
          ????????rdd4?=?rdd1.reduce(lambda?x,?y:?x?+?y)
          ????????rdd5?=?rdd1.reduce(lambda?x,?y:?x?+?y)
          ????????print(rdd5)
          ????else:
          ????????print("不使用持久化緩存")
          ????????rdd?=?sc.parallelize(range(1,?10000000),?4)
          ????????rdd1?=?rdd.map(lambda?x:?x?*?x?+?2?*?x?+?1)
          ????????print(rdd1.take(10))
          ????????rdd2?=?rdd1.reduce(lambda?x,?y:?x?+?y)
          ????????rdd3?=?rdd1.reduce(lambda?x,?y:?x?+?y)
          ????????rdd4?=?rdd1.reduce(lambda?x,?y:?x?+?y)
          ????????rdd5?=?rdd1.reduce(lambda?x,?y:?x?+?y)
          ????????print(rdd5)

          ????????
          test()???#?不使用持久化緩存
          time.sleep(10)
          test(1)??#?使用持久化緩存
          #?output:
          #?使用持久化緩存
          #?[4,?9,?16,?25,?36,?49,?64,?81,?100,?121]
          #?333333383333334999999
          #?test?used?26.36529278755188?second
          #?使用持久化緩存
          #?[4,?9,?16,?25,?36,?49,?64,?81,?100,?121]
          #?333333383333334999999
          #?test?used?17.49532413482666?second

          同時(shí)我們打開YARN日志來看看:http://localhost:4040/jobs/

          9edb181fb015edb2ac868499f1fdd1d0.webp

          因?yàn)槲覀兊拇a是需要重復(fù)調(diào)用RDD1的,當(dāng)沒有對RDD1進(jìn)行持久化的時(shí)候,每次當(dāng)它被action算子消費(fèi)了之后,就釋放了,等下一個(gè)算子計(jì)算的時(shí)候要用,就從頭開始計(jì)算一下RDD1。代碼中需要重復(fù)調(diào)用RDD1 五次,所以沒有緩存的話,差不多每次都要6秒,總共需要耗時(shí)26秒左右,但是,做了緩存,每次就只需要3s不到,總共需要耗時(shí)17秒左右。

          另外,這里需要提及一下一個(gè)知識點(diǎn),那就是持久化的級別,一般cache的話就是放入內(nèi)存中,就沒有什么好說的,需要講一下的就是另外一個(gè) persist(),它的持久化級別是可以被我們所配置的:

          持久化級別含義解釋
          MEMORY_ONLY將數(shù)據(jù)保存在內(nèi)存中。如果內(nèi)存不夠存放所有的數(shù)據(jù),則數(shù)據(jù)可能就不會進(jìn)行持久化。使用cache()方法時(shí),實(shí)際就是使用的這種持久化策略,性能也是最高的。
          MEMORY_AND_DISK優(yōu)先嘗試將數(shù)據(jù)保存在內(nèi)存中,如果內(nèi)存不夠存放所有的數(shù)據(jù),會將數(shù)據(jù)寫入磁盤文件中。
          MEMORY_ONLY_SER基本含義同MEMORY_ONLY。唯一的區(qū)別是,會將RDD中的數(shù)據(jù)進(jìn)行序列化,RDD的每個(gè)partition會被序列化成一個(gè)字節(jié)數(shù)組。這種方式更加節(jié)省內(nèi)存,從而可以避免持久化的數(shù)據(jù)占用過多內(nèi)存導(dǎo)致頻繁GC。
          MEMORY_AND_DISK_SER基本含義同MEMORY_AND_DISK。唯一的區(qū)別是會先序列化,節(jié)約內(nèi)存。
          DISK_ONLY使用未序列化的Java對象格式,將數(shù)據(jù)全部寫入磁盤文件中。一般不推薦使用。
          MEMORY_ONLY_2, MEMORY_AND_DISK_2, 等等.對于上述任意一種持久化策略,如果加上后綴_2,代表的是將每個(gè)持久化的數(shù)據(jù),都復(fù)制一份副本,并將副本保存到其他節(jié)點(diǎn)上。這種基于副本的持久化機(jī)制主要用于進(jìn)行容錯(cuò)。假如某個(gè)節(jié)點(diǎn)掛掉,節(jié)點(diǎn)的內(nèi)存或磁盤中的持久化數(shù)據(jù)丟失了,那么后續(xù)對RDD計(jì)算時(shí)還可以使用該數(shù)據(jù)在其他節(jié)點(diǎn)上的副本。如果沒有副本的話,就只能將這些數(shù)據(jù)從源頭處重新計(jì)算一遍了。一般也不推薦使用。
          2. 盡量避免使用低性能算子

          shuffle類算子算是低性能算子的一種代表,所謂的shuffle類算子,指的是會產(chǎn)生shuffle過程的操作,就是需要把各個(gè)節(jié)點(diǎn)上的相同key寫入到本地磁盤文件中,然后其他的節(jié)點(diǎn)通過網(wǎng)絡(luò)傳輸拉取自己需要的key,把相同key拉到同一個(gè)節(jié)點(diǎn)上進(jìn)行聚合計(jì)算,這種操作必然就是有大量的數(shù)據(jù)網(wǎng)絡(luò)傳輸與磁盤讀寫操作,性能往往不是很好的。

          那么,Spark中有哪些算子會產(chǎn)生shuffle過程呢?

          操作類別shuffle類算子備注
          分區(qū)操作repartition()、repartitionAndSortWithinPartitions()、coalesce(shuffle=true)重分區(qū)操作一般都會shuffle,因?yàn)樾枰獙λ械姆謪^(qū)數(shù)據(jù)進(jìn)行打亂。
          聚合操作reduceByKey、groupByKey、sortByKey需要對相同key進(jìn)行操作,所以需要拉到同一個(gè)節(jié)點(diǎn)上。
          關(guān)聯(lián)操作join類操作需要把相同key的數(shù)據(jù)shuffle到同一個(gè)節(jié)點(diǎn)然后進(jìn)行笛卡爾積
          去重操作distinct等需要對相同key進(jìn)行操作,所以需要shuffle到同一個(gè)節(jié)點(diǎn)上。
          排序操作sortByKey等需要對相同key進(jìn)行操作,所以需要shuffle到同一個(gè)節(jié)點(diǎn)上。

          這里進(jìn)一步介紹一個(gè)替代join的方案,因?yàn)閖oin其實(shí)在業(yè)務(wù)中還是蠻常見的。

          #?原則2:盡量避免使用低性能算子
          rdd1?=?sc.parallelize([('A1',?211),?('A1',?212),?('A2',?22),?('A4',?24),?('A5',?25)])
          rdd2?=?sc.parallelize([('A1',?11),?('A2',?12),?('A3',?13),?('A4',?14)])
          #?低效的寫法,也是傳統(tǒng)的寫法,直接join
          rdd_join?=?rdd1.join(rdd2)
          print(rdd_join.collect())
          #?[('A4',?(24,?14)),?('A2',?(22,?12)),?('A1',?(211,?11)),?('A1',?(212,?11))]
          rdd_left_join?=?rdd1.leftOuterJoin(rdd2)
          print(rdd_left_join.collect())
          #?[('A4',?(24,?14)),?('A2',?(22,?12)),?('A5',?(25,?None)),?('A1',?(211,?11)),?('A1',?(212,?11))]
          rdd_full_join?=?rdd1.fullOuterJoin(rdd2)
          print(rdd_full_join.collect())
          #?[('A4',?(24,?14)),?('A3',?(None,?13)),?('A2',?(22,?12)),?('A5',?(25,?None)),?('A1',?(211,?11)),?('A1',?(212,?11))]

          #?高效的寫法,使用廣播+map來實(shí)現(xiàn)相同效果
          #?tips1:?這里需要注意的是,用來broadcast的RDD不可以太大,最好不要超過1G
          #?tips2:?這里需要注意的是,用來broadcast的RDD不可以有重復(fù)的key的
          rdd1?=?sc.parallelize([('A1',?11),?('A2',?12),?('A3',?13),?('A4',?14)])
          rdd2?=?sc.parallelize([('A1',?211),?('A1',?212),?('A2',?22),?('A4',?24),?('A5',?25)])

          # step1:?先將小表進(jìn)行廣播,也就是collect到driver端,然后廣播到每個(gè)Executor中去。
          rdd_small_bc?=?sc.broadcast(rdd1.collect())

          # step2:從Executor中獲取存入字典便于后續(xù)map操作
          rdd_small_dict?=?dict(rdd_small_bc.value)

          # step3:定義join方法
          def?broadcast_join(line,?rdd_small_dict,?join_type):
          ????k?=?line[0]
          ????v?=?line[1]
          ????small_table_v?=?rdd_small_dict[k]?if?k?in?rdd_small_dict?else?None
          ????if?join_type?==?'join':
          ????????return?(k,?(v,?small_table_v))?if?k?in?rdd_small_dict?else?None
          ????elif?join_type?==?'left_join':
          ????????return?(k,?(v,?small_table_v?if?small_table_v?is?not?None?else?None))
          ????else:
          ????????print("not?support?join?type!")

          # step4:使用 map 實(shí)現(xiàn)?兩個(gè)表join的功能
          rdd_join?=?rdd2.map(lambda?line:?broadcast_join(line,?rdd_small_dict,?"join")).filter(lambda?line:?line?is?not?None)
          rdd_left_join?=?rdd2.map(lambda?line:?broadcast_join(line,?rdd_small_dict,?"left_join")).filter(lambda?line:?line?is?not?None)
          print(rdd_join.collect())
          print(rdd_left_join.collect())
          #?[('A1',?(211,?11)),?('A1',?(212,?11)),?('A2',?(22,?12)),?('A4',?(24,?14))]
          #?[('A1',?(211,?11)),?('A1',?(212,?11)),?('A2',?(22,?12)),?('A4',?(24,?14)),?('A5',?(25,?None))]

          上面的RDD join被改寫為 broadcast+map的PySpark版本實(shí)現(xiàn),不過里面有兩個(gè)點(diǎn)需要注意:

          • tips1: 用來broadcast的RDD不可以太大,最好不要超過1G
          • tips2: 用來broadcast的RDD不可以有重復(fù)的key
          3. 盡量使用高性能算子

          上一節(jié)講到了低效算法,自然地就會有一些高效的算子。

          原算子高效算子(替換算子)說明
          mapmapPartitions直接map的話,每次只會處理一條數(shù)據(jù),而mapPartitions則是每次處理一個(gè)分區(qū)的數(shù)據(jù),在某些場景下相對比較高效。(分區(qū)數(shù)據(jù)量不大的情況下使用,如果有數(shù)據(jù)傾斜的話容易發(fā)生OOM)
          groupByKeyreduceByKey/aggregateByKey這類算子會在原節(jié)點(diǎn)先map-side預(yù)聚合,相對高效些。
          foreachforeachPartitions同第一條記錄一樣。
          filterfilter+coalesce當(dāng)我們對數(shù)據(jù)進(jìn)行filter之后,有很多partition的數(shù)據(jù)會劇減,然后直接進(jìn)行下一步操作的話,可能就partition數(shù)量很多但處理的數(shù)據(jù)又很少,task數(shù)量沒有減少,反而整體速度很慢;但如果執(zhí)行了coalesce算子,就會減少一些partition數(shù)量,把數(shù)據(jù)都相對壓縮到一起,用更少的task處理完全部數(shù)據(jù),一定場景下還是可以達(dá)到整體性能的提升。
          repartition+sortrepartitionAndSortWithinPartitions直接用就是了。
          4. 廣播大變量

          如果我們有一個(gè)數(shù)據(jù)集很大,并且在后續(xù)的算子執(zhí)行中會被反復(fù)調(diào)用,那么就建議直接把它廣播(broadcast)一下。當(dāng)變量被廣播后,會保證每個(gè)executor的內(nèi)存中只會保留一份副本,同個(gè)executor內(nèi)的task都可以共享這個(gè)副本數(shù)據(jù)。如果沒有廣播,常規(guī)過程就是把大變量進(jìn)行網(wǎng)絡(luò)傳輸?shù)矫恳粋€(gè)相關(guān)task中去,這樣子做,一來頻繁的網(wǎng)絡(luò)數(shù)據(jù)傳輸,效率極其低下;二來executor下的task不斷存儲同一份大數(shù)據(jù),很有可能就造成了內(nèi)存溢出或者頻繁GC,效率也是極其低下的。

          #?原則4:廣播大變量
          rdd1?=?sc.parallelize([('A1',?11),?('A2',?12),?('A3',?13),?('A4',?14)])
          rdd1_broadcast?=?sc.broadcast(rdd1.collect())
          print(rdd1.collect())
          print(rdd1_broadcast.value)
          #?[('A1',?11),?('A2',?12),?('A3',?13),?('A4',?14)]
          #?[('A1',?11),?('A2',?12),?('A3',?13),?('A4',?14)]

          資源參數(shù)調(diào)優(yōu)

          如果要進(jìn)行資源調(diào)優(yōu),我們就必須先知道Spark運(yùn)行的機(jī)制與流程。

          22c0e31404d8a9df3d396ec1c752490f.webp

          下面我們就來講解一些常用的Spark資源配置的參數(shù)吧,了解其參數(shù)原理便于我們依據(jù)實(shí)際的數(shù)據(jù)情況進(jìn)行配置。

          1)num-executors

          指的是執(zhí)行器的數(shù)量,數(shù)量的多少代表了并行的stage數(shù)量(假如executor是單核的話),但也并不是越多越快,受你集群資源的限制,所以一般設(shè)置50-100左右吧。

          2)executor-memory

          這里指的是每一個(gè)執(zhí)行器的內(nèi)存大小,內(nèi)存越大當(dāng)然對于程序運(yùn)行是很好的了,但是也不是無節(jié)制地大下去,同樣受我們集群資源的限制。假設(shè)我們集群資源為500core,一般1core配置4G內(nèi)存,所以集群最大的內(nèi)存資源只有2000G左右。num-executors x executor-memory 是不能超過2000G的,但是也不要太接近這個(gè)值,不然的話集群其他同事就沒法正常跑數(shù)據(jù)了,一般我們設(shè)置4G-8G。

          3)executor-cores

          這里設(shè)置的是executor的CPU core數(shù)量,決定了executor進(jìn)程并行處理task的能力。

          4)driver-memory

          設(shè)置driver的內(nèi)存,一般設(shè)置2G就好了。但如果想要做一些Python的DataFrame操作可以適當(dāng)?shù)匕堰@個(gè)值設(shè)大一些。

          5)driver-cores

          與executor-cores類似的功能。

          6)spark.default.parallelism

          設(shè)置每個(gè)stage的task數(shù)量。一般Spark任務(wù)我們設(shè)置task數(shù)量在500-1000左右比較合適,如果不去設(shè)置的話,Spark會根據(jù)底層HDFS的block數(shù)量來自行設(shè)置task數(shù)量。有的時(shí)候會設(shè)置得偏少,這樣子程序就會跑得很慢,即便你設(shè)置了很多的executor,但也沒有用。

          下面說一個(gè)基本的參數(shù)設(shè)置的shell腳本,一般我們都是通過一個(gè)shell腳本來設(shè)置資源參數(shù)配置,接著就去調(diào)用我們的主函數(shù)。

          #!/bin/bash
          basePath=$(cd?"$(dirname?)"$(cd?"$(dirname?"$0"):?pwd)")":?pwd)

          spark-submit?\
          ????--master?yarn?\
          ????--queue?samshare?\
          ????--deploy-mode?client?\
          ????--num-executors?100?\
          ????--executor-memory?4G?\
          ????--executor-cores?4?\
          ????--driver-memory?2G?\
          ????--driver-cores?2?\
          ????--conf?spark.default.parallelism=1000?\
          ????--conf?spark.yarn.executor.memoryOverhead=8G?\
          ????--conf?spark.sql.shuffle.partitions=1000?\
          ????--conf?spark.network.timeout=1200?\
          ????--conf?spark.python.worker.memory=64m?\
          ????--conf?spark.sql.catalogImplementation=hive?\
          ????--conf?spark.sql.crossJoin.enabled=True?\
          ????--conf?spark.dynamicAllocation.enabled=True?\
          ????--conf?spark.shuffle.service.enabled=True?\
          ????--conf?spark.scheduler.listenerbus.eventqueue.size=100000?\
          ????--conf?spark.pyspark.driver.python=python3?\
          ????--conf?spark.pyspark.python=python3?\
          ????--conf?spark.yarn.appMasterEnv.PYSPARK_PYTHON=python3?\
          ????--conf?spark.sql.pivotMaxValues=500000?\
          ????--conf?spark.hadoop.hive.exec.dynamic.partition=True?\
          ????--conf?spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict?\
          ????--conf?spark.hadoop.hive.exec.max.dynamic.partitions.pernode=100000?\
          ????--conf?spark.hadoop.hive.exec.max.dynamic.partitions=100000?\
          ????--conf?spark.hadoop.hive.exec.max.created.files=100000?\
          ????${bashPath}/project_name/main.py?$v_var1?$v_var2

          數(shù)據(jù)傾斜調(diào)優(yōu)

          相信我們對于數(shù)據(jù)傾斜并不陌生了,很多時(shí)間數(shù)據(jù)跑不出來有很大的概率就是出現(xiàn)了數(shù)據(jù)傾斜,在Spark開發(fā)中無法避免的也會遇到這類問題,而這不是一個(gè)嶄新的問題,成熟的解決方案也是有蠻多的,今天來簡單介紹一些比較常用并且有效的方案。

          首先我們要知道,在Spark中比較容易出現(xiàn)傾斜的操作,主要集中在distinct、groupByKey、reduceByKey、aggregateByKey、join、repartition等,可以優(yōu)先看這些操作的前后代碼。而為什么使用了這些操作就容易導(dǎo)致數(shù)據(jù)傾斜呢?大多數(shù)情況就是進(jìn)行操作的key分布不均,然后使得大量的數(shù)據(jù)集中在同一個(gè)處理節(jié)點(diǎn)上,從而發(fā)生了數(shù)據(jù)傾斜。

          查看Key 分布
          #?針對Spark?SQL
          hc.sql("select?key,?count(0)?nums?from?table_name?group?by?key")

          #?針對RDD
          RDD.countByKey()
          Plan A: 過濾掉導(dǎo)致傾斜的key

          這個(gè)方案并不是所有場景都可以使用的,需要結(jié)合業(yè)務(wù)邏輯來分析這個(gè)key到底還需要不需要,大多數(shù)情況可能就是一些異常值或者空串,這種就直接進(jìn)行過濾就好了。

          Plan B: 提前處理聚合

          如果有些Spark應(yīng)用場景需要頻繁聚合數(shù)據(jù),而數(shù)據(jù)key又少的,那么我們可以把這些存量數(shù)據(jù)先用hive算好(每天算一次),然后落到中間表,后續(xù)Spark應(yīng)用直接用聚合好的表+新的數(shù)據(jù)進(jìn)行二度聚合,效率會有很高的提升。

          Plan C: 調(diào)高shuffle并行度
          #?針對Spark?SQL?
          --conf?spark.sql.shuffle.partitions=1000??#?在配置信息中設(shè)置參數(shù)
          #?針對RDD
          rdd.reduceByKey(1000)?#?默認(rèn)是200
          Plan D: 分配隨機(jī)數(shù)再聚合

          大概的思路就是對一些大量出現(xiàn)的key,人工打散,從而可以利用多個(gè)task來增加任務(wù)并行度,以達(dá)到效率提升的目的,下面是代碼demo,分別從RDD 和 SparkSQL來實(shí)現(xiàn)。

          #?Way1:?PySpark?RDD實(shí)現(xiàn)
          import?pyspark
          from?pyspark?import?SparkContext,?SparkConf,?HiveContext
          from?random?import?randint
          import?pandas?as?pd

          # SparkSQL的許多功能封裝在SparkSession的方法接口中, SparkContext則不行的。
          from?pyspark.sql?import?SparkSession
          spark?=?SparkSession.builder?\
          ????.appName("sam_SamShare")?\
          ????.config("master",?"local[4]")?\
          ????.enableHiveSupport()?\
          ????.getOrCreate()

          conf?=?SparkConf().setAppName("test_SamShare").setMaster("local[4]")
          sc?=?SparkContext(conf=conf)
          hc?=?HiveContext(sc)

          #?分配隨機(jī)數(shù)再聚合
          rdd1?=?sc.parallelize([('sam',?1),?('sam',?1),?('sam',?1),?('sam',?1),?('sam',?1),?('sam',?1)])

          #?給key分配隨機(jī)數(shù)后綴
          rdd2?=?rdd1.map(lambda?x:?(x[0]?+?"_"?+?str(randint(1,5)),?x[1]))
          print(rdd.take(10))
          #?[('sam_5',?1),?('sam_5',?1),?('sam_3',?1),?('sam_5',?1),?('sam_5',?1),?('sam_3',?1)]

          #?局部聚合
          rdd3?=?rdd2.reduceByKey(lambda?x,y?:?(x+y))
          print(rdd3.take(10))
          #?[('sam_5',?4),?('sam_3',?2)]

          #?去除后綴
          rdd4?=?rdd3.map(lambda?x:?(x[0][:-2],?x[1]))
          print(rdd4.take(10))
          #?[('sam',?4),?('sam',?2)]

          #?全局聚合
          rdd5?=?rdd4.reduceByKey(lambda?x,y?:?(x+y))
          print(rdd5.take(10))
          #?[('sam',?6)]


          #?Way2:?PySpark?SparkSQL實(shí)現(xiàn)
          df?=?pd.DataFrame(5*[['Sam',?1],['Flora',?1]],
          ??????????????????columns=['name',?'nums'])
          Spark_df?=?spark.createDataFrame(df)
          print(Spark_df.show(10))

          Spark_df.createOrReplaceTempView("tmp_table")?#?注冊為視圖供SparkSQl使用

          sql?=?"""
          with?t1?as?(
          ????select?concat(name,"_",int(10*rand()))?as?new_name,?name,?nums
          ????from?tmp_table
          ),
          t2?as?(
          ????select?new_name,?sum(nums)?as?n
          ????from?t1
          ????group?by?new_name
          ),
          t3?as?(
          ????select?substr(new_name,0,length(new_name)?-2)?as?name,?sum(n)?as?nums_sum?
          ????from?t2
          ????group?by?substr(new_name,0,length(new_name)?-2)
          )
          select?*
          from?t3
          """

          tt?=?hc.sql(sql).toPandas()
          tt

          下面是原理圖。

          06cd1951174fe5b1679c9c4fbf3dc97f.webp


          全文終!

          如果想下載PDF,可以在后臺輸入 “pyspark” 獲取

          ??學(xué)習(xí)資源推薦

          1)edureka about PySpark Tutorial

          印度老哥的課程,B站可直接看,不過口音略難聽懂不過還好有字幕。

          https://www.bilibili.com/video/BV1i4411i79a?p=1

          2)eat_pyspark_in_10_days

          梁云大哥的課程,講得超級清晰,建議精讀。

          https://github.com/lyhue1991/eat_pyspark_in_10_days

          3)官方文檔

          http://spark.apache.org/docs/latest/api/python/reference/index.html

          4)《Spark性能優(yōu)化指南——基礎(chǔ)篇》

          https://tech.meituan.com/2016/04/29/spark-tuning-basic.html

          5)《Spark性能優(yōu)化指南——高級篇》

          https://tech.meituan.com/2016/05/12/spark-tuning-pro.html


          瀏覽 58
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  亚洲少妞视频 | 色婷婷丁香五月亚洲 | 男女日逼视频 | 亚洲色综合视频 | 国产欧美毛片 |