Spark Action Operators
IV. Action Operators
1. reduce
Reduces the whole dataset down to a single value, i.e. a summary of the entire dataset.
reduce is completely different from reduceByKey: reduce is an action, not a shuffle operation. Essentially, reduce first evaluates each partition locally and then combines the per-partition results into the final value (see the sketch after the examples below).
For example:
- scala:
val p1 = sc.parallelize(Seq(1, 2, 3, 4, 6))
println(
p1.reduce(_ + _)
)
- java:
JavaRDD<Integer> p1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 6));
System.out.println(
p1.reduce(Integer::sum)
);
- python:
p1 = sc.parallelize((1, 2, 3, 4, 6))
print(
p1.reduce(lambda i1, i2: i1 + i2)
)
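A minimal Scala sketch of the per-partition evaluation described above (using an explicit 2-partition layout for illustration): each partition is folded locally first, and the partial results are then combined, which is why the function passed to reduce should be associative and commutative.
val nums = sc.parallelize(Seq(1, 2, 3, 4, 6), 2)
// glom() exposes the per-partition contents, e.g. List(1, 2) and List(3, 4, 6)
println(nums.glom().collect().map(_.toList).toList)
// the local partial sums (3 and 13) are then combined into the final result
println(nums.reduce(_ + _)) // 16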
2. collect
Returns all elements of the dataset to the driver as an array. For example:
- scala:
val p1 = sc.parallelize(Seq(1, 2, 3, 4, 6))
println(
p1.collect().toList // convert to a List so the elements print readably (a Scala Array prints as a reference)
)
- java:
JavaRDD<Integer> p1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 6));
System.out.println(
p1.collect()
);
- python:
p1 = sc.parallelize((1, 2, 3, 4, 6))
print(
p1.collect()
)
3. count
Returns the number of elements in the dataset.
For example:
- scala:
val p1 = sc.parallelize(Seq(1, 2, 3, 4, 6))
println(
p1.count()
)
- java:
JavaRDD<Integer> p1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 6));
System.out.println(
p1.count()
);
- python:
p1 = sc.parallelize((1, 2, 3, 4, 6))
print(
p1.count()
)
4. first
Returns the first element of the dataset.
For example:
- scala:
val p1 = sc.parallelize(Seq(1, 2, 3, 4, 6))
println(
p1.first()
)
- java:
JavaRDD<Integer> p1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 6));
System.out.println(
p1.first()
);
- python:
p1 = sc.parallelize((1, 2, 3, 4, 6))
print(
p1.first()
)
5. countByKey
Returns every key in the dataset together with the number of times it occurs, as a map on the driver (see the note after the examples below).
For example:
- scala:
val s1 = sc.parallelize(Seq("abc", "abc", "hello", "hello", "word", "word"))
println(
s1.map((_,1)).countByKey()
)
- java:
JavaRDD<String> s1 = sc.parallelize(Arrays.asList("abc", "abc", "hello", "hello", "word", "word"));
System.out.println(
s1.mapToPair(s -> new Tuple2<>(s, 1)).countByKey()
);
- python:
s1 = sc.parallelize(("abc", "abc", "hello", "hello", "word", "word"))
print(
s1.map(lambda s: (s, 1)).countByKey()
)
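Note that countByKey() builds its result as a Map on the driver, so it is only suitable when the number of distinct keys is small. A minimal Scala sketch of the distributed alternative for large key sets, using reduceByKey:
val pairs = sc.parallelize(Seq("abc", "abc", "hello")).map((_, 1))
println(pairs.countByKey()) // Map(abc -> 2, hello -> 1), assembled on the driver
println(pairs.reduceByKey(_ + _).collect().toList) // the same counts, computed through a distributed shuffle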
6. take
Returns the first N elements of the dataset.
For example:
- scala:
val s1 = sc.parallelize(Seq("abc", "abc", "hello", "hello", "word", "word"))
println(
s1.take(3).toList
)
- java:
JavaRDD<String> s1 = sc.parallelize(Arrays.asList("abc", "abc", "hello", "hello", "word", "word"));
System.out.println(
s1.take(3)
);
- python:
s1 = sc.parallelize(("abc", "abc", "hello", "hello", "word", "word"))
print(
s1.take(3)
)
7. saveAsTextFile
Saves the result to the directory given by path (a sketch of what the output looks like follows the examples below).
For example:
- scala:
val s1 = sc.parallelize(Seq("abc", "abc", "hello", "hello", "word", "word"))
s1.saveAsTextFile("D:/test/output/text/")
- java:
JavaRDD<String> s1 = sc.parallelize(Arrays.asList("abc", "abc", "hello", "hello", "word", "word"));
s1.saveAsTextFile("D:/test/output/text/");
- python:
s1 = sc.parallelize(("abc", "abc", "hello", "hello", "word", "word"))
s1.saveAsTextFile("D:/test/output/text/")
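A minimal Scala sketch (hypothetical local paths) of what the output looks like: saveAsTextFile writes a directory containing one part file per partition rather than a single file, and the directory can later be read back with textFile.
val words = sc.parallelize(Seq("abc", "hello", "word"), 2)
words.saveAsTextFile("D:/test/output/parts/") // creates e.g. part-00000, part-00001 and _SUCCESS
words.coalesce(1).saveAsTextFile("D:/test/output/single/") // forces a single part file, at the cost of parallelism
println(sc.textFile("D:/test/output/parts/").collect().toList) // read the directory back as an RDD[String]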
8. lookup
Looks up the values associated with a given key.
For example:
- scala:
val s1 = sc.parallelize(Seq("小明:15.5", "小明:13.3", "張三:14.4", "張三:37.6", "李四:95.9", "李四:45.4"))
println(
s1.map(s=>(s.split(":")(0),s.split(":")(1).toDouble))
.lookup("小明").toList
)
- java:
JavaRDD<String> s1 = sc.parallelize(Arrays.asList("小明:15.5", "小明:13.3", "張三:14.4", "張三:37.6", "李四:95.9", "李四:45.4"));
System.out.println(
s1.mapToPair(s -> new Tuple2<>(s.split(":")[0], Double.parseDouble(s.split(":")[1])))
.lookup("小明")
);
- python:
s1 = sc.parallelize(("小明:15.5", "小明:13.3", "張三:14.4", "張三:37.6", "李四:95.9", "李四:45.4"))
print(
s1.map(lambda s: (s.split(":")[0], float(s.split(":")[1])))
.lookup("小明")
)
V. Supplementary Operators
1. RDD Persistence
RDDs that are reused can be cached to avoid recomputation. There are three main persistence operators: cache, persist, and checkpoint. persist lets you choose the storage level (memory and/or disk), cache calls persist under the hood and stores the data in memory by default, and checkpoint can write the data to HDFS (a sketch of the typical call order follows the examples below):
For example:
- scala:
import org.apache.spark.storage.StorageLevel

val s1 = sc.parallelize(Seq("小明:15.5", "小明:13.3", "張三:14.4", "張三:37.6", "李四:95.9", "李四:45.4"))
// cache
s1.cache() // calls persist(StorageLevel.MEMORY_ONLY) under the hood
// persist (note: an RDD's storage level can only be set once; the calls below show the alternatives)
s1.persist(StorageLevel.MEMORY_AND_DISK) // memory plus disk (disk is used only when memory is insufficient)
s1.persist(StorageLevel.MEMORY_ONLY) // memory only
// checkpoint: store the data on HDFS
sc.setCheckpointDir("/data/spark/") // use an HDFS directory in practice
s1.checkpoint()
// remove from the cache
s1.unpersist()
- java:
JavaRDD<String> s1 = sc.parallelize(Arrays.asList("小明:15.5", "小明:13.3", "張三:14.4", "張三:37.6", "李四:95.9", "李四:45.4"));
// cache
s1.cache(); // calls persist(StorageLevel.MEMORY_ONLY()) under the hood
// persist (requires import org.apache.spark.storage.StorageLevel; an RDD's storage level can only be set once, so the calls below show the alternatives)
s1.persist(StorageLevel.MEMORY_AND_DISK()); // memory plus disk (disk is used only when memory is insufficient)
s1.persist(StorageLevel.MEMORY_ONLY()); // memory only
// checkpoint: store the data on HDFS
sc.setCheckpointDir("/data/spark/"); // use an HDFS directory in practice
s1.checkpoint();
// remove from the cache
s1.unpersist();
- python:
from pyspark import StorageLevel

s1 = sc.parallelize(("小明:15.5", "小明:13.3", "張三:14.4", "張三:37.6", "李四:95.9", "李四:45.4"))
# cache
s1.cache()  # calls persist() under the hood, storing the data in memory by default
# persist (note: an RDD's storage level can only be set once; the calls below show the alternatives)
s1.persist(StorageLevel.MEMORY_AND_DISK)  # memory plus disk (disk is used only when memory is insufficient)
s1.persist(StorageLevel.MEMORY_ONLY)  # memory only
# checkpoint: store the data on HDFS
sc.setCheckpointDir("/data/spark/")  # use an HDFS directory in practice
s1.checkpoint()
# remove from the cache
s1.unpersist()
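A minimal Scala sketch (hypothetical input path, StorageLevel imported as above) of the typical call order when an RDD is actually reused: persist first, then checkpoint, then trigger an action, so the data is computed once, served from the cache afterwards, and written to the checkpoint directory without recomputation.
sc.setCheckpointDir("/data/spark/") // use an HDFS directory in practice
val errors = sc.textFile("/data/input/").filter(_.contains("ERROR"))
errors.persist(StorageLevel.MEMORY_AND_DISK) // keep the filtered data around for reuse
errors.checkpoint() // lazy: the checkpoint is written when the next action runs
println(errors.count()) // action: computes the RDD, fills the cache, materializes the checkpoint
println(errors.take(5).toList) // served from the cached data, no recomputation
errors.unpersist() // release the cached blocks once they are no longer needed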
2. Shared Variables: Accumulators
Accumulators support global additive computation across all worker nodes (the examples below only call add on the driver; see the executor-side sketch after them).
For example:
- scala:
// create a counter/accumulator
val ljq = sc.longAccumulator("mycounter")
ljq.add(2)
println(ljq.value)
- java:
SparkContext sparkContext = JavaSparkContext.toSparkContext(sc);
// create a counter/accumulator
LongAccumulator ljq = sparkContext.longAccumulator("mycounter");
ljq.add(2);
System.out.println(ljq.value());
- python:
ljq = sc.accumulator(0)  # PySpark's sc.accumulator takes an initial value, not a name
ljq.add(2)
print(ljq.value)
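The calls above update the accumulator on the driver only; the usual pattern is to add to it inside a task and read the total on the driver after an action has run. A minimal Scala sketch (hypothetical data) counting records that fail to parse:
val badRecords = sc.longAccumulator("badRecords")
val parsed = sc.parallelize(Seq("1", "2", "x", "4", "y")).flatMap { s =>
  try Some(s.toInt)
  catch { case _: NumberFormatException => badRecords.add(1); None }
}
println(parsed.count()) // the action runs the tasks that update the accumulator
println(badRecords.value) // 2; read accumulator values on the driver, after an action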
3. Shared Variables: Broadcast Variables
Broadcast variables ship a read-only value to every node once, so each executor keeps a single copy instead of receiving it with every task (see the sketch after the examples below for reading the value inside a task).
For example:
- scala:
val list = Seq(1, 2, 3, 4, 6)
val broadcast = sc.broadcast(list)
val value = broadcast.value
println(value.toList)
- java:
List<Integer> list = Arrays.asList(1, 2, 3, 4, 6);
Broadcast<List<Integer>> broadcast = sc.broadcast(list);
List<Integer> value = broadcast.getValue();
System.out.println(value);
- python:
nums = (1, 2, 3, 4, 6)  # avoid shadowing the built-in name list
broadcast = sc.broadcast(nums)
value = broadcast.value
print(value)
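The value of a broadcast variable is meant to be read inside tasks, so each executor receives it once instead of once per task. A minimal Scala sketch (hypothetical lookup data) that uses a broadcast Map inside map:
val codes = sc.broadcast(Map("bj" -> "Beijing", "sh" -> "Shanghai"))
val users = sc.parallelize(Seq(("u1", "bj"), ("u2", "sh"), ("u3", "bj")))
val resolved = users.map { case (user, code) => (user, codes.value.getOrElse(code, "unknown")) } // read-only lookup on the executor
println(resolved.collect().toList)
codes.destroy() // release the broadcast blocks once no longer needed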
