<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          2小時(shí)入門Spark之RDD編程

          共 11617字,需瀏覽 24分鐘

           ·

          2020-12-29 18:33

          公眾號后臺回復(fù)關(guān)鍵字:pyspark,獲取本項(xiàng)目github地址。

          本節(jié)將介紹RDD數(shù)據(jù)結(jié)構(gòu)的常用函數(shù)。包括如下內(nèi)容:

          • 創(chuàng)建RDD
          • 常用Action操作
          • 常用Transformation操作
          • 常用PairRDD的轉(zhuǎn)換操作
          • 緩存操作
          • 共享變量
          • 分區(qū)操作

          這些函數(shù)中,我最常用的是如下15個(gè)函數(shù),需要認(rèn)真掌握其用法。

          • map
          • flatMap
          • mapPartitions
          • filter
          • count
          • reduce
          • take
          • saveAsTextFile
          • collect
          • join
          • union
          • persist
          • repartition
          • reduceByKey
          • aggregateByKey

          import?findspark

          #指定spark_home為剛才的解壓路徑,指定python路徑
          spark_home?=?"/Users/liangyun/ProgramFiles/spark-3.0.1-bin-hadoop3.2"
          python_path?=?"/Users/liangyun/anaconda3/bin/python"
          findspark.init(spark_home,python_path)

          import?pyspark?
          from?pyspark?import?SparkContext,?SparkConf
          conf?=?SparkConf().setAppName("rdd_tutorial").setMaster("local[4]")
          sc?=?SparkContext(conf=conf)

          print(pyspark.__version__)
          3.0.1

          一,創(chuàng)建RDD

          創(chuàng)建RDD主要有兩種方式,一個(gè)是textFile加載本地或者集群文件系統(tǒng)中的數(shù)據(jù),

          第二個(gè)是用parallelize方法將Driver中的數(shù)據(jù)結(jié)構(gòu)并行化成RDD。

          #從本地文件系統(tǒng)中加載數(shù)據(jù)
          file?=?"./data/hello.txt"
          rdd?=?sc.textFile(file,3)
          rdd.collect()
          ['hello?world',
          ?'hello?spark',
          ?'spark?love?jupyter',
          ?'spark?love?pandas',
          ?'spark?love?sql']
          #從集群文件系統(tǒng)中加載數(shù)據(jù)
          #file?=?"hdfs://localhost:9000/user/hadoop/data.txt"
          #也可以省去hdfs://localhost:9000
          #rdd?=?sc.textFile(file,3)
          #parallelize將Driver中的數(shù)據(jù)結(jié)構(gòu)生成RDD,第二個(gè)參數(shù)指定分區(qū)數(shù)
          rdd?=?sc.parallelize(range(1,11),2)
          rdd.collect()
          [1,?2,?3,?4,?5,?6,?7,?8,?9,?10]

          二,常用Action操作

          Action操作將觸發(fā)基于RDD依賴關(guān)系的計(jì)算。

          collect

          rdd?=?sc.parallelize(range(10),5)?
          #collect操作將數(shù)據(jù)匯集到Driver,數(shù)據(jù)過大時(shí)有超內(nèi)存風(fēng)險(xiǎn)
          all_data?=?rdd.collect()
          all_data
          [0,?1,?2,?3,?4,?5,?6,?7,?8,?9]

          take

          #take操作將前若干個(gè)數(shù)據(jù)匯集到Driver,相比collect安全
          rdd?=?sc.parallelize(range(10),5)?
          part_data?=?rdd.take(4)
          part_data
          [0,?1,?2,?3]

          takeSample

          #takeSample可以隨機(jī)取若干個(gè)到Driver,第一個(gè)參數(shù)設(shè)置是否放回抽樣
          rdd?=?sc.parallelize(range(10),5)?
          sample_data?=?rdd.takeSample(False,10,0)
          sample_data
          [7,?8,?1,?5,?3,?4,?2,?0,?9,?6]

          first

          #first取第一個(gè)數(shù)據(jù)
          rdd?=?sc.parallelize(range(10),5)?
          first_data?=?rdd.first()
          print(first_data)
          0

          count

          #count查看RDD元素?cái)?shù)量
          rdd?=?sc.parallelize(range(10),5)
          data_count?=?rdd.count()
          print(data_count)
          10

          reduce

          #reduce利用二元函數(shù)對數(shù)據(jù)進(jìn)行規(guī)約
          rdd?=?sc.parallelize(range(10),5)?
          rdd.reduce(lambda?x,y:x+y)

          45

          foreach

          #foreach對每一個(gè)元素執(zhí)行某種操作,不生成新的RDD
          #累加器用法詳見共享變量
          rdd?=?sc.parallelize(range(10),5)?
          accum?=?sc.accumulator(0)
          rdd.foreach(lambda?x:accum.add(x))
          print(accum.value)
          45

          countByKey

          #countByKey對Pair?RDD按key統(tǒng)計(jì)數(shù)量
          pairRdd?=?sc.parallelize([(1,1),(1,4),(3,9),(2,16)])?
          pairRdd.countByKey()
          defaultdict(int,?{1:?2,?3:?1,?2:?1})

          saveAsTextFile

          #saveAsTextFile保存rdd成text文件到本地
          text_file?=?"./data/rdd.txt"
          rdd?=?sc.parallelize(range(5))
          rdd.saveAsTextFile(text_file)

          #重新讀入會被解析文本
          rdd_loaded?=?sc.textFile(file)
          rdd_loaded.collect()
          ['2',?'3',?'4',?'1',?'0']

          三,常用Transformation操作

          Transformation轉(zhuǎn)換操作具有懶惰執(zhí)行的特性,它只指定新的RDD和其父RDD的依賴關(guān)系,只有當(dāng)Action操作觸發(fā)到該依賴的時(shí)候,它才被計(jì)算。

          map

          #map操作對每個(gè)元素進(jìn)行一個(gè)映射轉(zhuǎn)換
          rdd?=?sc.parallelize(range(10),3)
          rdd.collect()
          [0,?1,?2,?3,?4,?5,?6,?7,?8,?9]
          rdd.map(lambda?x:x**2).collect()
          [0,?1,?4,?9,?16,?25,?36,?49,?64,?81]

          filter

          #filter應(yīng)用過濾條件過濾掉一些數(shù)據(jù)
          rdd?=?sc.parallelize(range(10),3)
          rdd.filter(lambda?x:x>5).collect()
          [6,?7,?8,?9]

          flatMap

          #flatMap操作執(zhí)行將每個(gè)元素生成一個(gè)Array后壓平
          rdd?=?sc.parallelize(["hello?world","hello?China"])
          rdd.map(lambda?x:x.split("?")).collect()
          [['hello',?'world'],?['hello',?'China']]
          rdd.flatMap(lambda?x:x.split("?")).collect()
          ['hello',?'world',?'hello',?'China']

          sample

          #sample對原rdd在每個(gè)分區(qū)按照比例進(jìn)行抽樣,第一個(gè)參數(shù)設(shè)置是否可以重復(fù)抽樣
          rdd?=?sc.parallelize(range(10),1)
          rdd.sample(False,0.5,0).collect()
          [1,?4,?9]

          distinct

          #distinct去重
          rdd?=?sc.parallelize([1,1,2,2,3,3,4,5])
          rdd.distinct().collect()
          [4,?1,?5,?2,?3]

          subtract

          #subtract找到屬于前一個(gè)rdd而不屬于后一個(gè)rdd的元素
          a?=?sc.parallelize(range(10))
          b?=?sc.parallelize(range(5,15))
          a.subtract(b).collect()
          [0,?1,?2,?3,?4]

          union

          #union合并數(shù)據(jù)
          a?=?sc.parallelize(range(5))
          b?=?sc.parallelize(range(3,8))
          a.union(b).collect()
          [0,?1,?2,?3,?4,?3,?4,?5,?6,?7]

          intersection

          #intersection求交集
          a?=?sc.parallelize(range(1,6))
          b?=?sc.parallelize(range(3,9))
          a.intersection(b).collect()
          [3,?4,?5]

          cartesian

          #cartesian笛卡爾積
          boys?=?sc.parallelize(["LiLei","Tom"])
          girls?=?sc.parallelize(["HanMeiMei","Lily"])
          boys.cartesian(girls).collect()

          [('LiLei',?'HanMeiMei'),
          ?('LiLei',?'Lily'),
          ?('Tom',?'HanMeiMei'),
          ?('Tom',?'Lily')]

          sortBy

          #按照某種方式進(jìn)行排序
          #指定按照第3個(gè)元素大小進(jìn)行排序
          rdd?=?sc.parallelize([(1,2,3),(3,2,2),(4,1,1)])
          rdd.sortBy(lambda?x:x[2]).collect()

          [(4,?1,?1),?(3,?2,?2),?(1,?2,?3)]

          zip

          #按照拉鏈方式連接兩個(gè)RDD,效果類似python的zip函數(shù)
          #需要兩個(gè)RDD具有相同的分區(qū),每個(gè)分區(qū)元素?cái)?shù)量相同

          rdd_name?=?sc.parallelize(["LiLei","Hanmeimei","Lily"])
          rdd_age?=?sc.parallelize([19,18,20])

          rdd_zip?=?rdd_name.zip(rdd_age)
          print(rdd_zip.collect())
          [('LiLei',?19),?('Hanmeimei',?18),?('Lily',?20)]

          zipWithIndex

          #將RDD和一個(gè)從0開始的遞增序列按照拉鏈方式連接。
          rdd_name?=??sc.parallelize(["LiLei","Hanmeimei","Lily","Lucy","Ann","Dachui","RuHua"])
          rdd_index?=?rdd_name.zipWithIndex()
          print(rdd_index.collect())
          [('LiLei',?0),?('Hanmeimei',?1),?('Lily',?2),?('Lucy',?3),?('Ann',?4),?('Dachui',?5),?('RuHua',?6)]

          四,常用PairRDD的轉(zhuǎn)換操作

          PairRDD指的是數(shù)據(jù)為長度為2的tuple類似(k,v)結(jié)構(gòu)的數(shù)據(jù)類型的RDD,其每個(gè)數(shù)據(jù)的第一個(gè)元素被當(dāng)做key,第二個(gè)元素被當(dāng)做value.

          reduceByKey

          #reduceByKey對相同的key對應(yīng)的values應(yīng)用二元?dú)w并操作
          rdd?=?sc.parallelize([("hello",1),("world",2),
          ???????????????????????????????("hello",3),("world",5)])
          rdd.reduceByKey(lambda?x,y:x+y).collect()
          [('hello',?4),?('world',?7)]

          groupByKey

          #groupByKey將相同的key對應(yīng)的values收集成一個(gè)Iterator
          rdd?=?sc.parallelize([("hello",1),("world",2),("hello",3),("world",5)])
          rdd.groupByKey().collect()
          [('hello',?),
          ?('world',?)]

          sortByKey

          #sortByKey按照key排序,可以指定是否降序
          rdd?=?sc.parallelize([("hello",1),("world",2),
          ???????????????????????????????("China",3),("Beijing",5)])
          rdd.sortByKey(False).collect()
          [('world',?2),?('hello',?1),?('China',?3),?('Beijing',?5)]

          join

          #join相當(dāng)于根據(jù)key進(jìn)行內(nèi)連接
          age?=?sc.parallelize([("LiLei",18),
          ????????????????????????("HanMeiMei",16),("Jim",20)])
          gender?=?sc.parallelize([("LiLei","male"),
          ????????????????????????("HanMeiMei","female"),("Lucy","female")])
          age.join(gender).collect()

          [('LiLei',?(18,?'male')),?('HanMeiMei',?(16,?'female'))]

          leftOuterJoin和rightOuterJoin

          #leftOuterJoin相當(dāng)于關(guān)系表的左連接

          age?=?sc.parallelize([("LiLei",18),
          ????????????????????????("HanMeiMei",16)])
          gender?=?sc.parallelize([("LiLei","male"),
          ????????????????????????("HanMeiMei","female"),("Lucy","female")])
          age.leftOuterJoin(gender).collect()

          [('LiLei',?(18,?'male')),?('HanMeiMei',?(16,?'female'))]
          #rightOuterJoin相當(dāng)于關(guān)系表的右連接
          age?=?sc.parallelize([("LiLei",18),
          ????????????????????????("HanMeiMei",16),("Jim",20)])
          gender?=?sc.parallelize([("LiLei","male"),
          ????????????????????????("HanMeiMei","female")])
          age.rightOuterJoin(gender).collect()

          [('LiLei',?(18,?'male')),?('HanMeiMei',?(16,?'female'))]

          cogroup

          #cogroup相當(dāng)于對兩個(gè)輸入分別goupByKey然后再對結(jié)果進(jìn)行g(shù)roupByKey

          x?=?sc.parallelize([("a",1),("b",2),("a",3)])
          y?=?sc.parallelize([("a",2),("b",3),("b",5)])

          result?=?x.cogroup(y).collect()
          print(result)
          print(list(result[0][1][0]))
          [('a',?(,?)),?('b',?(,?))]
          [1,?3]

          subtractByKey

          #subtractByKey去除x中那些key也在y中的元素

          x?=?sc.parallelize([("a",1),("b",2),("c",3)])
          y?=?sc.parallelize([("a",2),("b",(1,2))])

          x.subtractByKey(y).collect()
          [('c',?3)]

          foldByKey

          #foldByKey的操作和reduceByKey類似,但是要提供一個(gè)初始值
          x?=?sc.parallelize([("a",1),("b",2),("a",3),("b",5)],1)
          x.foldByKey(1,lambda?x,y:x*y).collect()

          [('a',?3),?('b',?10)]

          五,緩存操作

          如果一個(gè)rdd被多個(gè)任務(wù)用作中間量,那么對其進(jìn)行cache緩存到內(nèi)存中對加快計(jì)算會非常有幫助。

          聲明對一個(gè)rdd進(jìn)行cache后,該rdd不會被立即緩存,而是等到它第一次被計(jì)算出來時(shí)才進(jìn)行緩存。

          可以使用persist明確指定存儲級別,常用的存儲級別是MEMORY_ONLY和EMORY_AND_DISK。

          如果一個(gè)RDD后面不再用到,可以用unpersist釋放緩存,unpersist是立即執(zhí)行的。

          緩存數(shù)據(jù)不會切斷血緣依賴關(guān)系,這是因?yàn)榫彺鏀?shù)據(jù)某些分區(qū)所在的節(jié)點(diǎn)有可能會有故障,例如內(nèi)存溢出或者節(jié)點(diǎn)損壞。

          這時(shí)候可以根據(jù)血緣關(guān)系重新計(jì)算這個(gè)分區(qū)的數(shù)據(jù)。

          #cache緩存到內(nèi)存中,使用存儲級別 MEMORY_ONLY。
          #MEMORY_ONLY意味著如果內(nèi)存存儲不下,放棄存儲其余部分,需要時(shí)重新計(jì)算。
          a?=?sc.parallelize(range(10000),5)
          a.cache()
          sum_a?=?a.reduce(lambda?x,y:x+y)
          cnt_a?=?a.count()
          mean_a?=?sum_a/cnt_a

          print(mean_a)

          #persist緩存到內(nèi)存或磁盤中,默認(rèn)使用存儲級別MEMORY_AND_DISK
          #MEMORY_AND_DISK意味著如果內(nèi)存存儲不下,其余部分存儲到磁盤中。
          #persist可以指定其它存儲級別,cache相當(dāng)于persist(MEMORY_ONLY)
          from??pyspark.storagelevel?import?StorageLevel
          a?=?sc.parallelize(range(10000),5)
          a.persist(StorageLevel.MEMORY_AND_DISK)
          sum_a?=?a.reduce(lambda?x,y:x+y)
          cnt_a?=?a.count()
          mean_a?=?sum_a/cnt_a

          a.unpersist()?#立即釋放緩存
          print(mean_a)

          六,共享變量

          當(dāng)spark集群在許多節(jié)點(diǎn)上運(yùn)行一個(gè)函數(shù)時(shí),默認(rèn)情況下會把這個(gè)函數(shù)涉及到的對象在每個(gè)節(jié)點(diǎn)生成一個(gè)副本。

          但是,有時(shí)候需要在不同節(jié)點(diǎn)或者節(jié)點(diǎn)和Driver之間共享變量。

          Spark提供兩種類型的共享變量,廣播變量和累加器。

          廣播變量是不可變變量,實(shí)現(xiàn)在不同節(jié)點(diǎn)不同任務(wù)之間共享數(shù)據(jù)。

          廣播變量在每個(gè)機(jī)器上緩存一個(gè)只讀的變量,而不是為每個(gè)task生成一個(gè)副本,可以減少數(shù)據(jù)的傳輸。

          累加器主要是不同節(jié)點(diǎn)和Driver之間共享變量,只能實(shí)現(xiàn)計(jì)數(shù)或者累加功能。

          累加器的值只有在Driver上是可讀的,在節(jié)點(diǎn)上不可見。

          #廣播變量?broadcast?不可變,在所有節(jié)點(diǎn)可讀

          broads?=?sc.broadcast(100)

          rdd?=?sc.parallelize(range(10))
          print(rdd.map(lambda?x:x+broads.value).collect())

          print(broads.value)
          [100,?101,?102,?103,?104,?105,?106,?107,?108,?109]
          100
          #累加器?只能在Driver上可讀,在其它節(jié)點(diǎn)只能進(jìn)行累加

          total?=?sc.accumulator(0)
          rdd?=?sc.parallelize(range(10),3)

          rdd.foreach(lambda?x:total.add(x))
          total.value
          45
          #?計(jì)算數(shù)據(jù)的平均值
          rdd?=?sc.parallelize([1.1,2.1,3.1,4.1])
          total?=?sc.accumulator(0.1)
          count?=?sc.accumulator(0)

          def?func(x):
          ????total.add(x)
          ????count.add(1)
          ????
          rdd.foreach(func)

          total.value/count.value
          2.625

          七,分區(qū)操作

          分區(qū)操作包括改變分區(qū)操作,以及針對分區(qū)執(zhí)行的一些轉(zhuǎn)換操作。

          glom:將一個(gè)分區(qū)內(nèi)的數(shù)據(jù)轉(zhuǎn)換為一個(gè)列表作為一行。

          coalesce:shuffle可選,默認(rèn)為False情況下窄依賴,不能增加分區(qū)。repartition和partitionBy調(diào)用它實(shí)現(xiàn)。

          repartition:按隨機(jī)數(shù)進(jìn)行shuffle,相同key不一定在同一個(gè)分區(qū)

          partitionBy:按key進(jìn)行shuffle,相同key放入同一個(gè)分區(qū)

          HashPartitioner:默認(rèn)分區(qū)器,根據(jù)key的hash值進(jìn)行分區(qū),相同的key進(jìn)入同一分區(qū),效率較高,key不可為Array.

          RangePartitioner:只在排序相關(guān)函數(shù)中使用,除相同的key進(jìn)入同一分區(qū),相鄰的key也會進(jìn)入同一分區(qū),key必須可排序。

          TaskContext: ?獲取當(dāng)前分區(qū)id方法 TaskContext.get.partitionId

          mapPartitions:每次處理分區(qū)內(nèi)的一批數(shù)據(jù),適合需要分批處理數(shù)據(jù)的情況,比如將數(shù)據(jù)插入某個(gè)表,每批數(shù)據(jù)只需要開啟一次數(shù)據(jù)庫連接,大大減少了連接開支

          mapPartitionsWithIndex:類似mapPartitions,提供了分區(qū)索引,輸入?yún)?shù)為(i,Iterator)

          foreachPartition:類似foreach,但每次提供一個(gè)Partition的一批數(shù)據(jù)

          glom

          #glom將一個(gè)分區(qū)內(nèi)的數(shù)據(jù)轉(zhuǎn)換為一個(gè)列表作為一行。
          a?=?sc.parallelize(range(10),2)
          b?=?a.glom()
          b.collect()?
          [[0,?1,?2,?3,?4],?[5,?6,?7,?8,?9]]

          coalesce

          #coalesce?默認(rèn)shuffle為False,不能增加分區(qū),只能減少分區(qū)
          #如果要增加分區(qū),要設(shè)置shuffle?=?true
          #parallelize等許多操作可以指定分區(qū)數(shù)
          a?=?sc.parallelize(range(10),3)??
          print(a.getNumPartitions())
          print(a.glom().collect())

          3
          [[0,?1,?2],?[3,?4,?5],?[6,?7,?8,?9]]
          b?=?a.coalesce(2)?
          print(b.glom().collect())
          [[0,?1,?2],?[3,?4,?5,?6,?7,?8,?9]]

          repartition

          #repartition按隨機(jī)數(shù)進(jìn)行shuffle,相同key不一定在一個(gè)分區(qū),可以增加分區(qū)
          #repartition實(shí)際上調(diào)用coalesce實(shí)現(xiàn),設(shè)置了shuffle?=?True
          a?=?sc.parallelize(range(10),3)??
          c?=?a.repartition(4)?
          print(c.glom().collect())

          [[6,?7,?8,?9],?[3,?4,?5],?[],?[0,?1,?2]]
          #repartition按隨機(jī)數(shù)進(jìn)行shuffle,相同key不一定在一個(gè)分區(qū)
          a?=?sc.parallelize([("a",1),("a",1),("a",2),("c",3)])??
          c?=?a.repartition(2)
          print(c.glom().collect())
          [[('a',?1),?('a',?2),?('c',?3)],?[('a',?1)]]

          partitionBy

          #partitionBy按key進(jìn)行shuffle,相同key一定在一個(gè)分區(qū)
          a?=?sc.parallelize([("a",1),("a",1),("a",2),("c",3)])??
          c?=?a.partitionBy(2)
          print(c.glom().collect())

          mapPartitions

          #mapPartitions可以對每個(gè)分區(qū)分別執(zhí)行操作
          #每次處理分區(qū)內(nèi)的一批數(shù)據(jù),適合需要按批處理數(shù)據(jù)的情況
          #例如將數(shù)據(jù)寫入數(shù)據(jù)庫時(shí),可以極大的減少連接次數(shù)。
          #mapPartitions的輸入分區(qū)內(nèi)數(shù)據(jù)組成的Iterator,其輸出也需要是一個(gè)Iterator
          #以下例子查看每個(gè)分區(qū)內(nèi)的數(shù)據(jù),相當(dāng)于用mapPartitions實(shí)現(xiàn)了glom的功能。
          a?=?sc.parallelize(range(10),2)
          a.mapPartitions(lambda?it:iter([list(it)])).collect()
          [[0,?1,?2,?3,?4],?[5,?6,?7,?8,?9]]

          mapPartitionsWithIndex

          #mapPartitionsWithIndex可以獲取兩個(gè)參數(shù)
          #即分區(qū)id和每個(gè)分區(qū)內(nèi)的數(shù)據(jù)組成的Iterator
          a?=?sc.parallelize(range(11),2)

          def?func(pid,it):
          ????s?=?sum(it)
          ????return(iter([str(pid)?+?"|"?+?str(s)]))
          ????[str(pid)?+?"|"?+?str]
          b?=?a.mapPartitionsWithIndex(func)
          b.collect()
          #利用TaskContext可以獲取當(dāng)前每個(gè)元素的分區(qū)
          from?pyspark.taskcontext?import?TaskContext
          a?=?sc.parallelize(range(5),3)
          c?=?a.map(lambda?x:(TaskContext.get().partitionId(),x))
          c.collect()

          [(0,?0),?(1,?1),?(1,?2),?(2,?3),?(2,?4)]

          foreachPartitions

          #foreachPartition對每個(gè)分區(qū)分別執(zhí)行操作
          #范例:求每個(gè)分區(qū)內(nèi)最大值的和
          total?=?sc.accumulator(0.0)

          a?=?sc.parallelize(range(1,101),3)

          def?func(it):
          ????total.add(max(it))
          ????
          a.foreachPartition(func)
          total.value
          199.0

          aggregate

          #aggregate是一個(gè)Action操作
          #aggregate比較復(fù)雜,先對每個(gè)分區(qū)執(zhí)行一個(gè)函數(shù),再對每個(gè)分區(qū)結(jié)果執(zhí)行一個(gè)合并函數(shù)。
          #例子:求元素之和以及元素個(gè)數(shù)
          #三個(gè)參數(shù),第一個(gè)參數(shù)為初始值,第二個(gè)為分區(qū)執(zhí)行函數(shù),第三個(gè)為結(jié)果合并執(zhí)行函數(shù)。
          rdd?=?sc.parallelize(range(1,21),3)
          def?inner_func(t,x):
          ????return((t[0]+x,t[1]+1))

          def?outer_func(p,q):
          ????return((p[0]+q[0],p[1]+q[1]))

          rdd.aggregate((0,0),inner_func,outer_func)

          (210,?20)

          aggregateByKey

          #aggregateByKey的操作和aggregate類似,但是會對每個(gè)key分別進(jìn)行操作
          #第一個(gè)參數(shù)為初始值,第二個(gè)參數(shù)為分區(qū)內(nèi)歸并函數(shù),第三個(gè)參數(shù)為分區(qū)間歸并函數(shù)

          a?=?sc.parallelize([("a",1),("b",1),("c",2),
          ?????????????????????????????("a",2),("b",3)],3)
          b?=?a.aggregateByKey(0,lambda?x,y:max(x,y),
          ????????????????????????????lambda?x,y:max(x,y))
          b.collect()
          [('b',?3),?('a',?2),?('c',?2)]


          e61a2fb3ffd175084ef9c782da9689a4.webp

          公眾號后臺回復(fù)關(guān)鍵字:pyspark,獲取本項(xiàng)目github地址。


          如果覺得這道RDD大餐味道還不錯(cuò)的話,?歡迎點(diǎn)個(gè)再看,分享給你的朋友們喲。??



          瀏覽 71
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  在线高清无码不卡 | 大鸡巴欧美 | 男女啪啪国产免费网站 | 亚洲精品乱码久久久久久蜜桃不卡 | 4438全国成人 |