<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Spark Action 算子

          共 8469字,需瀏覽 17分鐘

           ·

          2024-04-11 09:45

          四、Action 算子

          1. reduce

          對(duì)整個(gè)結(jié)果集規(guī)約, 最終生成一條數(shù)據(jù), 是整個(gè)數(shù)據(jù)集的匯總。

          reducereduceByKey 完全不同, reduce 是一個(gè) action, 并不是 Shuffled 操作,本質(zhì)上 reduce 就是現(xiàn)在每個(gè) partition 上求值, 最終把每個(gè) partition 的結(jié)果再匯總。

          例如:

          • scala:
                
                var p1 = sc.parallelize(Seq(1, 2, 3, 4, 6))
          println(
            p1.reduce((_+_))
          )

          • java:
                
                JavaRDD<Integer> p1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 6));
          System.out.println(
              p1.reduce(Integer::sum)
          );

          • python:
                
                 p1 = sc.parallelize((1, 2, 3, 4, 6))
           print(
               p1.reduce(lambda i1, i2: i1 + i2)
           )

          eb78bcb36d354b19cdcfb26383bc334d.webp

          2. collect

          以數(shù)組的形式返回?cái)?shù)據(jù)集中所有元素。例如:

          • scala:
                
                var p1 = sc.parallelize(Seq(1, 2, 3, 4, 6))
          println(
            p1.collect()
          )

          • java:
                
                JavaRDD<Integer> p1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 6));
          System.out.println(
                 p1.collect()
          );

          • python:
                
                p1 = sc.parallelize((1, 2, 3, 4, 6))
          print(
              p1.collect()
          )

          9f913085586d0c50cd890efb26501935.webp

          3. count

          數(shù)據(jù)元素個(gè)數(shù):

          例如:

          • scala:
                
                var p1 = sc.parallelize(Seq(1, 2, 3, 4, 6))
          println(
           p1.count()
          )

          • java:
                
                JavaRDD<Integer> p1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 6));
          System.out.println(
                p1.count()
          );

          • python:
                
                p1 = sc.parallelize((1, 2, 3, 4, 6))
          print(
             p1.count()
          )

          3a9c74b06a964dbdc3f052ba6d7aa379.webp

          4. first

          返回第一個(gè)元素:

          例如:

          • scala:
                
                var p1 = sc.parallelize(Seq(1, 2, 3, 4, 6))
          println(
            p1.first()
          )

          • java:
                
                JavaRDD<Integer> p1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 6));
          System.out.println(
                p1.first()
          );

          • python:
                
                p1 = sc.parallelize((1, 2, 3, 4, 6))
          print(
              p1.first()
          )

          3be6278561daf4fe77c09f5b1f7718f5.webp

          5. countByKey

          求得整個(gè)數(shù)據(jù)集中 Key 以及對(duì)應(yīng) Key 出現(xiàn)的次數(shù):

          例如:

          • scala:
                
                val s1 = sc.parallelize(Seq("abc""abc""hello""hello""word""word"))
          println(
           s1.map((_,1)).countByKey()
          )

          • java:
                
                JavaRDD<String> s1 = sc.parallelize(Arrays.asList("abc""abc""hello""hello""word""word"))
          System.out.println(
                s1.mapToPair(s -> new Tuple2<>(s, 1)).countByKey()
          );

          • python:
                
                s1 = sc.parallelize(("abc""abc""hello""hello""word""word"))
          print(
              s1.map(lambda s: (s, 1)).countByKey()
          )

          6a2298d704dd6b93a2428c6f7ebc858d.webp

          6. take

          返回前 N 個(gè)元素:

          例如:

          • scala:
                
                val s1 = sc.parallelize(Seq("abc""abc""hello""hello""word""word"))
          println(
            s1.take(3)
          )

          • java:
                
                JavaRDD<String> s1 = sc.parallelize(Arrays.asList("abc""abc""hello""hello""word""word"));
          System.out.println(
                 s1.take(3)
          );

          • python:
                
                s1 = sc.parallelize(("abc""abc""hello""hello""word""word"))
          print(
              s1.take(3)
          )

          f322434da3c754d8fa85825445b6d895.webp

          7. saveAsTextFile

          將結(jié)果存入 path 對(duì)應(yīng)的目錄中:

          例如:

          • scala:
                
                 val s1 = sc.parallelize(Seq("abc""abc""hello""hello""word""word"))
           s1.saveAsTextFile("D:/test/output/text/")

          • java:
                
                JavaRDD<String> s1 = sc.parallelize(Arrays.asList("abc""abc""hello""hello""word""word"));
          s1.saveAsTextFile("D:/test/output/text/");

          • python:
                
                s1 = sc.parallelize(("abc""abc""hello""hello""word""word"))
          s1.saveAsTextFile("D:/test/output/text/")

          3b3b22f4dc24c48a8d58e86784c82ca6.webp

          8. lookup

          根據(jù) key 查詢對(duì)應(yīng)的 value

          例如:

          • scala:
                
                val s1 = sc.parallelize(Seq("小明:15.5""小明:13.3""張三:14.4""張三:37.6""李四:95.9""李四:45.4"))
          println(
           s1.map(s=>(s.split(":")(0),s.split(":")(1).toDouble))
             .lookup("小明").toList
          )

          • java:
                
                JavaRDD<String> s1 = sc.parallelize(Arrays.asList("小明:15.5""小明:13.3""張三:14.4""張三:37.6""李四:95.9""李四:45.4"));
          System.out.println(
              s1.mapToPair(s -> new Tuple2<>(s.split(":")[0], Double.parseDouble(s.split(":")[1])))
                      .lookup("小明")
          );

          • python:
                
                s1 = sc.parallelize(("小明:15.5""小明:13.3""張三:14.4""張三:37.6""李四:95.9""李四:45.4"))
          print(
             s1.map(lambda s: (s.split(":")[0], float(s.split(":")[1])))
                 .lookup("小明")
          )

          4913c05871954840ab82bdd57c4ca890.webp

          五、補(bǔ)充算子

          1. RDD 持久化

          對(duì)于需要復(fù)用的RDD,可以進(jìn)行緩存,已防止重復(fù)計(jì)算,持久化主要有三個(gè)算子,cache、persist、Checkpoint,其中persist可以指定存儲(chǔ)的類(lèi)型,是硬盤(pán)還是內(nèi)存,cache 底層調(diào)用的 persist 默認(rèn)存儲(chǔ)在內(nèi)存中 ,Checkpoint 則可以存儲(chǔ)在 HDFS 中:

          例如:

          • scala:
                
                val s1 = sc.parallelize(Seq("小明:15.5""小明:13.3""張三:14.4""張三:37.6""李四:95.9""李四:45.4"))
          //緩存
          s1.cache // 底層調(diào)用的 persist
          //持久化
          s1.persist(StorageLevel.MEMORY_AND_DISK) //使用內(nèi)存和磁盤(pán)(內(nèi)存不夠時(shí)才使用磁盤(pán))
          s1.persist(StorageLevel.MEMORY_ONLY) //持久化到內(nèi)存
          // Checkpoint  應(yīng)使用Checkpoint把數(shù)據(jù)發(fā)在HDFS上
          sc.setCheckpointDir("/data/spark/") //實(shí)際中寫(xiě)HDFS目錄
          s1.checkpoint()
          //清空緩存
          s1.unpersist()

          • java:
                
                JavaRDD<String> s1 = sc.parallelize(Arrays.asList("小明:15.5""小明:13.3""張三:14.4""張三:37.6""李四:95.9""李四:45.4"));
          //緩存
          s1.cache(); // 底層調(diào)用的 persist
          //持久化
          s1.persist(StorageLevel.MEMORY_AND_DISK()); //使用內(nèi)存和磁盤(pán)(內(nèi)存不夠時(shí)才使用磁盤(pán))
          s1.persist(StorageLevel.MEMORY_ONLY()); //持久化到內(nèi)存
          // Checkpoint  應(yīng)使用Checkpoint把數(shù)據(jù)發(fā)在HDFS上
          sc.setCheckpointDir("/data/spark/");//實(shí)際中寫(xiě)HDFS目錄
          s1.checkpoint();
          //清空緩存
          s1.unpersist();

          • python:
                
                s1 = sc.parallelize(("小明:15.5""小明:13.3""張三:14.4""張三:37.6""李四:95.9""李四:45.4"))
          # 緩存
          s1.cache() # 底層調(diào)用的persist
          # 持久化
          s1.persist(StorageLevel.MEMORY_AND_DISK) # 使用內(nèi)存和磁盤(pán)(內(nèi)存不夠時(shí)才使用磁盤(pán))
          s1.persist(StorageLevel.MEMORY_ONLY) # 持久化到內(nèi)存
          # Checkpoint 使用Checkpoint把數(shù)據(jù)發(fā)在HDFS上
          sc.setCheckpointDir("/data/spark/"# 實(shí)際中寫(xiě)HDFS目錄
          s1.checkpoint()
          # 清空緩存
          s1.unpersist()

          2. 共享變量,累加器

          支持在所有 不同節(jié)點(diǎn)上進(jìn)行全局累加計(jì)算:

          例如:

          • scala:
                
                //創(chuàng)建一個(gè)計(jì)數(shù)器/累加器
          var ljq = sc.longAccumulator("mycounter")
          ljq.add(2)
          println(ljq.value)

          • java:
                
                SparkContext sparkContext = JavaSparkContext.toSparkContext(sc);
          //創(chuàng)建一個(gè)計(jì)數(shù)器/累加器
          LongAccumulator ljq = sparkContext.longAccumulator("mycounter");
          ljq.add(2);
          System.out.println(ljq.value());

          • python:
                
                ljq = sc.accumulator("mycounter")
          ljq.add(2)
          print(ljq.value)

          3. 共享變量,廣播變量

          支持在所有 不同節(jié)點(diǎn)上進(jìn)行全局累加計(jì)算:

          例如:

          • scala:
                
                val list = Seq(1, 2, 3, 4, 6)
          val broadcast = sc.broadcast(list)
          val value = broadcast.value
          println(value.toList)

          • java:
                
                List<Integer> list = Arrays.asList(1, 2, 3, 4, 6);
          Broadcast<List<Integer>> broadcast = sc.broadcast(list);
          List<Integer> value = broadcast.getValue();
          System.out.println(value);

          • python:
                
                list = (1, 2, 3, 4, 6)
          broadcast = sc.broadcast(list)
          value = broadcast.value
          print(value)


          瀏覽 52
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  欧美日韩高清一区二区 | 黄片在线免费视频 | 精品AA一级黄片 | 三级久久久 | 亚洲高清av |