<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Spark Transformations 算子

          共 16056字,需瀏覽 33分鐘

           ·

          2024-04-11 10:51

          下面對相關(guān)常用算子進(jìn)行演示。

          三、Transformations 算子

          1. map

          RDD 中的數(shù)據(jù) 一對一 的轉(zhuǎn)為另一種形式:

          例如:

          • scala:
                
                val num = sc.parallelize(Seq(1, 2, 3, 4, 5))
          println(
            num.map(_+1).collect().toList
          )

          • java:
                
                JavaRDD<Integer> num = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
          System.out.println(
                 num.map(i -> i + 1).collect()
          );

          • python:
                
                num = sc.parallelize((1, 2, 3, 4, 5))
          print(
              num.map(lambda i:i+1).collect()
          )

          bd872f20d1f419cc13de8ca67770a0de.webp

          2. flatMap

          Map 算子類似,但是 FlatMap 是一對多,并都轉(zhuǎn)化為一維數(shù)據(jù):

          例如:

          • scala:
                
                val text = sc.parallelize(Seq("abc def""hello word""dfg,okh""he,word"))
          println(
            text.flatMap(_.split(" ")).flatMap(_.split(",")).collect().toList
          )

          • java:
                
                JavaRDD<String> text = sc.parallelize(Arrays.asList("abc def""hello word""dfg,okh""he,word"));
          System.out.println(
                  text.flatMap(s ->Arrays.asList(s.split(" ")).iterator())
                          .flatMap(s ->Arrays.asList(s.split(",")).iterator())
                          .collect()
          );

          • python:
                
                text = sc.parallelize(("abc def""hello word""dfg,okh""he,word"))
          print(
              text.flatMap(lambda s: s.split(" ")).flatMap(lambda s: s.split(",")).collect()
          )

          301407e77a8a33989b661bfdeb9531bb.webp

          3. filter

          過濾掉不需要的內(nèi)容:

          例如:

          • scala:
                
                val text = sc.parallelize(Seq("hello""hello""word""word"))
          println(
            text.filter(_.equals("hello")).collect().toList
          )

          • java:
                
                JavaRDD<String> text = sc.parallelize(Arrays.asList("hello""hello""word""word"));
          System.out.println(
                  text.filter(s -> Objects.equals(s,"hello"))
                          .collect()
          );

          • python:
                
                text = sc.parallelize(("hello""hello""word""word"))
          print(
              text.filter(lambda s: s == 'hello').collect()
          )

          b657425fc0db190f1bf8c89c7acdff0d.webp

          4. mapPartitions

          map 類似,針對整個(gè)分區(qū)的數(shù)據(jù)轉(zhuǎn)換,拿到的是每個(gè)分區(qū)的集合:

          例如:

          • scala:
                
                val text = sc.parallelize(Seq("hello""hello""word""word"), 2)
          println(
            text.mapPartitions(iter => {<!-- -->
              iter.map(_ + "333")
            }).collect().toList
          )

          • java:
                
                JavaRDD<String> text = sc.parallelize(Arrays.asList("hello""hello""word""word"), 2);
          System.out.println(
                  text.mapPartitions(iter -> {<!-- -->
                      List<String> list = new ArrayList<>();
                      iter.forEachRemaining(s -> list.add(s+"333"));
                      return list.iterator();
                  }).collect()
          );

          • python:
                
                 text = sc.parallelize(("hello""hello""word""word"), 2)
           
           def partition(par):
               tmpArr = []
               for s in par:
                   tmpArr.append(s + "333")
               return tmpArr

           print(
               text.mapPartitions(partition).collect()
           )

          957c23570fe9f2c1f614d505a23d5d3c.webp

          5. mapPartitionsWithIndex

          mapPartitions 類似, 只是在函數(shù)中增加了分區(qū)的 Index

          例如:

          • scala:
                
                val text = sc.parallelize(Seq("hello""hello""word""word"), 2)
          println(
            text.mapPartitionsWithIndex((index, iter) => {<!-- -->
              println("當(dāng)前分區(qū)" + index)
              iter.map(_ + "333")
            }, true).collect().toList
          )

          • java:
                
                JavaRDD<String> text = sc.parallelize(Arrays.asList("hello""hello""word""word"), 2);
          System.out.println(
                 text.mapPartitionsWithIndex((index, iter) -> {<!-- -->
                     System.out.println("當(dāng)前分區(qū)" + index);
                     List<String> list = new ArrayList<>();
                     iter.forEachRemaining(s -> list.add(s + "333"));
                     return list.iterator();
                 }, true).collect()
          );

          • python:
                
                text = sc.parallelize(("hello""hello""word""word"), 2)

          def partition(index, par):
              print("當(dāng)前分區(qū)" + str(index))
              tmpArr = []
              for s in par:
                  tmpArr.append(s + "333")
              return tmpArr

          print(
              text.mapPartitionsWithIndex(partition).collect()
          )

          fa4f803213527e674db8c638d196da17.webp

          6. mapValues

          只能作用于 Key-Value 型數(shù)據(jù), 和 Map 類似, 也是使用函數(shù)按照轉(zhuǎn)換數(shù)據(jù), 不同點(diǎn)是 MapValues 只轉(zhuǎn)換 Key-Value 中的 Value

          例如:

          • scala:
                
                val text = sc.parallelize(Seq("abc""bbb""ccc""dd"))
          println(
            text.map((_, "v" + _))
              .mapValues(_ + "66")
              .collect().toList
          )

          • java:
                
                JavaRDD<String> text = sc.parallelize(Arrays.asList("abc""bbb""ccc""dd"));
          System.out.println(
                 text.mapToPair(s -> new Tuple2<>(s, "v" + s))
                         .mapValues(v -> v + "66").collect()
          );

          • python:
                
                text = sc.parallelize(("abc""bbb""ccc""dd"))
          print(
              text.map(lambda s: (s, "v" + s)).mapValues(lambda v: v + "66").collect()
          )

          21fa756da7f491064beeaab51a9e5cac.webp

          7. sample

          可以從一個(gè)數(shù)據(jù)集中抽樣出來一部分, 常用作于減小數(shù)據(jù)集以保證運(yùn)行速度, 并且盡可能少規(guī)律的損失:

          第一個(gè)參數(shù)為withReplacement, 意為是否取樣以后是否還放回原數(shù)據(jù)集供下次使用, 簡單的說,如果這個(gè)參數(shù)的值為 true, 則抽樣出來的數(shù)據(jù)集中可能會有重復(fù)。

          第二個(gè)參數(shù)為fraction, 意為抽樣的比例。

          第三個(gè)參數(shù)為seed, 隨機(jī)數(shù)種子, 用于 Sample 內(nèi)部隨機(jī)生成下標(biāo),一般不指定,使用默認(rèn)值。

          例如:

          • scala:
                
                val num = sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
          println(
            num.sample(true,0.6,2)
              .collect().toList
          )

          • java:
                
                JavaRDD<Integer> num = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
          System.out.println(
              num.sample(true, 0.6, 2).collect()
          );

          • python:
                
                num = sc.parallelize((1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
          print(
              num.sample(True, 0.6, 2).collect()
          )

          0e99fd459da1f475dad1e0b275db51ad.webp

          8. union

          兩個(gè)數(shù)據(jù)并集,類似于數(shù)據(jù)庫的 union

          例如:

          • scala:
                
                val text1 = sc.parallelize(Seq("aa""bb"))
          val text2 = sc.parallelize(Seq("cc""dd"))
          println(
            text1.union(text2).collect().toList
          )

          • java:
                
                JavaRDD<String> text1 = sc.parallelize(Arrays.asList("aa""bb"));
          JavaRDD<String> text2 = sc.parallelize(Arrays.asList("cc""dd"));
          System.out.println(
                  text1.union(text2).collect()
          );

          • python:
                
                text1 = sc.parallelize(("aa""bb"))
          text2 = sc.parallelize(("cc""dd"))
          print(
             text1.union(text2).collect()
          )

          aaf4ee4dacf3798524463ea3a9089d35.webp

          9. join,leftOuterJoin,rightOuterJoin

          兩個(gè)(key,value)數(shù)據(jù)集,根據(jù) key 取連接、左連接、右連接,類似數(shù)據(jù)庫中的連接:

          例如:

          • scala:
                
                val s1 = sc.parallelize(Seq("1,3""2,6""3,8""4,2"))
          val s2 = sc.parallelize(Seq("1,小明""2,小張""3,小李""4,小紅""5,張三"))

          val s3 = s1.map(s => (s.split(",")(0), s.split(",")(0)))
          val s4 = s2.map(s => (s.split(",")(0), s.split(",")(1)))

          println(s3.join(s4).collectAsMap)
          println(s3.leftOuterJoin(s4).collectAsMap)
          println(s3.rightOuterJoin(s4).collectAsMap)

          • java:
                
                JavaRDD<String> s1 = sc.parallelize(Arrays.asList("1,3""2,6""3,8""4,2"));
          JavaRDD<String> s2 = sc.parallelize(Arrays.asList("1,小明""2,小張""3,小李""4,小紅""5,張三"));

          JavaPairRDD<String, String> s3 = s1.mapToPair(s -> new Tuple2<>(s.split(",")[0], s.split(",")[1]));
          JavaPairRDD<String, String> s4 = s2.mapToPair(s -> new Tuple2<>(s.split(",")[0], s.split(",")[1]));

          System.out.println(s3.join(s4).collectAsMap());
          System.out.println(s3.leftOuterJoin(s4).collectAsMap());
          System.out.println(s3.rightOuterJoin(s4).collectAsMap());

          • python:
                
                s1 = sc.parallelize(("1,3""2,6""3,8""4,2"))
          s2 = sc.parallelize(("1,小明""2,小張""3,小李""4,小紅""5,張三"))

          s3 = s1.map(lambda s:(s.split(",")[0], s.split(",")[0]))
          s4 = s2.map(lambda s:(s.split(",")[0], s.split(",")[1]))

          print(s3.join(s4).collectAsMap())
          print(s3.leftOuterJoin(s4).collectAsMap())
          print(s3.rightOuterJoin(s4).collectAsMap())

          676ef863c47a9e9366e4e228d504e5ce.webp

          10. intersection

          獲取兩個(gè)集合的交集 :

          例如:

          • scala:
                
                val s1 = sc.parallelize(Seq("abc""dfe""hello"))
          val s2 = sc.parallelize(Seq("fgh""nbv""hello""word""jkl""abc"))
          println(
            s1.intersection(s2).collect().toList
          )

          • java:
                
                JavaRDD<String> s1 = sc.parallelize(Arrays.asList("abc""dfe""hello"));
          JavaRDD<String> s2 = sc.parallelize(Arrays.asList("fgh""nbv""hello""word""jkl""abc"));
          System.out.println(
               s1.intersection(s2).collect()
          );

          • python:
                
                s1 = sc.parallelize(("abc""dfe""hello"))
          s2 = sc.parallelize(("fgh""nbv""hello""word""jkl""abc"))
          print(
              s1.intersection(s2).collect()
          )

          8c677304f883e79473c2c7df92162a76.webp

          11. subtract

          獲取差集,a - b ,取 a 集合中 b 集合沒有的元素:

          例如:

          • scala:
                
                val s1 = sc.parallelize(Seq("abc""dfe""hello"))
          val s2 = sc.parallelize(Seq("fgh""nbv""hello""word""jkl""abc"))
          println(
            s1.subtract(s2).collect().toList
          )

          • java:
                
                JavaRDD<String> s1 = sc.parallelize(Arrays.asList("abc""dfe""hello"));
          JavaRDD<String> s2 = sc.parallelize(Arrays.asList("fgh""nbv""hello""word""jkl""abc"));
          System.out.println(
                  s1.subtract(s2).collect()
          );

          • python:
                
                s1 = sc.parallelize(("abc""dfe""hello"))
          s2 = sc.parallelize(("fgh""nbv""hello""word""jkl""abc"))
          print(
              s1.subtract(s2).collect()
          )

          a7a13d9f7dd31339a88e0425dd00f2c4.webp

          12. distinct

          元素去重,是一個(gè)需要 Shuffled 的操作:

          例如:

          • scala:
                
                val s1 = sc.parallelize(Seq("abc""abc""hello""hello""word""word"))
          println(
            s1.distinct().collect().toList
          )

          • java:
                
                JavaRDD<String> s1 = sc.parallelize(Arrays.asList("abc""abc""hello""hello""word""word"));
          System.out.println(
              s1.distinct().collect()
          );

          • python:
                
                s1 = sc.parallelize(("abc""abc""hello""hello""word""word"))
          print(
            s1.distinct().collect()
          )

          641f2f04339d6cf8d6f5382147674d7b.webp

          13. reduceByKey

          只能作用于 Key-Value 型數(shù)據(jù),根據(jù) Key 分組生成一個(gè) Tuple,然后針對每個(gè)組執(zhí)行 reduce 算子,傳入兩個(gè)參數(shù),一個(gè)是當(dāng)前值,一個(gè)是局部匯總,這個(gè)函數(shù)需要有一個(gè)輸出, 輸出就是這個(gè) Key 的匯總結(jié)果,是一個(gè)需要 Shuffled 的操作:

          例如:

          • scala:
                
                val s1 = sc.parallelize(Seq("abc""abc""hello""hello""word""word"))
          println(
            s1.map((_, 1))
              .reduceByKey(Integer.sum)
              .collectAsMap
          )

          • java:
                
                JavaRDD<String> s1 = sc.parallelize(Arrays.asList("abc""abc""hello""hello""word""word"));
          System.out.println(
             s1.mapToPair(s -> new Tuple2<>(s, 1))
                     .reduceByKey(Integer::sum)
                     .collectAsMap()
          );

          • python:
                
                s1 = sc.parallelize(("abc""abc""hello""hello""word""word"))
          print(
            s1.map(lambda s: (s, 1))
                .reduceByKey(lambda v1, v2: v1 + v2)
                .collectAsMap()
          )

          4d59bf8faa6ee8e3a012ffcc9a0ac2eb.webp

          14. groupByKey

          只能作用于 Key-Value 型數(shù)據(jù),根據(jù) Key 分組, 和 ReduceByKey 有點(diǎn)類似, 但是 GroupByKey 并不求聚合, 只是列舉 Key 對應(yīng)的所有 Value,是一個(gè)需要 Shuffled 的操作。

          GroupByKeyReduceByKey 不同,因?yàn)樾枰信e Key 對應(yīng)的所有數(shù)據(jù), 所以無法在 Map 端做 Combine, 所以 GroupByKey 的性能并沒有 ReduceByKey 好:

          例如:

          • scala:
                
                val s1 = sc.parallelize(Seq("abc""abc""hello""hello""word""word"))
          println(
            s1.map((_, 1))
              .groupByKey()
              .collectAsMap
          )

          • java:
                
                JavaRDD<String> s1 = sc.parallelize(Arrays.asList("abc""abc""hello""hello""word""word"));
          System.out.println(
                  s1.mapToPair(s -> new Tuple2<>(s, 1))
                          .groupByKey()
                          .collectAsMap()
          );

          • python:
                
                s1 = sc.parallelize(("abc""abc""hello""hello""word""word"))
          print(
              s1.map(lambda s: (s, 1))
                  .reduceByKey()
                  .collectAsMap()
          )


          瀏覽 83
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  美国 日本 韩国三级三级三级黄色A在线播放 | 亚洲家庭乱轮五月天 | 五月婷婷网站 | 日本电影一区二区三区 | 人人艹人人摸人人 |