Spark Transformation Operators
The commonly used operators are demonstrated below.
III. Transformation Operators
1. map
Converts each element of an RDD into another form, one to one:
For example:
- scala:
val num = sc.parallelize(Seq(1, 2, 3, 4, 5))
println(
num.map(_+1).collect().toList
)
- java:
JavaRDD<Integer> num = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
System.out.println(
num.map(i -> i + 1).collect()
);
- python:
num = sc.parallelize((1, 2, 3, 4, 5))
print(
num.map(lambda i:i+1).collect()
)
2. flatMap
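Similar to map, but flatMap is one-to-many: each input element may produce several outputs, and all results are flattened into a single one-dimensional collection:
For example: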
- scala:
val text = sc.parallelize(Seq("abc def", "hello word", "dfg,okh", "he,word"))
println(
text.flatMap(_.split(" ")).flatMap(_.split(",")).collect().toList
)
- java:
JavaRDD<String> text = sc.parallelize(Arrays.asList("abc def", "hello word", "dfg,okh", "he,word"));
System.out.println(
text.flatMap(s ->Arrays.asList(s.split(" ")).iterator())
.flatMap(s ->Arrays.asList(s.split(",")).iterator())
.collect()
);
- python:
text = sc.parallelize(("abc def", "hello word", "dfg,okh", "he,word"))
print(
text.flatMap(lambda s: s.split(" ")).flatMap(lambda s: s.split(",")).collect()
)
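The difference between the one-to-one and one-to-many behaviour can be sketched without a cluster. The following is a plain-Python model, not the Spark API; `map_model` and `flat_map_model` are hypothetical helper names introduced only for illustration.

```python
from itertools import chain


def map_model(func, elements):
    """Model of map: exactly one output per input element."""
    return [func(x) for x in elements]


def flat_map_model(func, elements):
    """Model of flatMap: func returns an iterable per element,
    and all of those iterables are concatenated into one flat list."""
    return list(chain.from_iterable(func(x) for x in elements))


lines = ["abc def", "hello word", "dfg,okh", "he,word"]

mapped = map_model(lambda s: s.split(" "), lines)
flattened = flat_map_model(lambda s: s.split(" "), lines)

print(mapped)     # nested: one list per input line
print(flattened)  # flat: one entry per token
```

With the same split function, the map model keeps one (nested) result per input line, while the flatMap model yields one entry per token.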
3. filter
Filters out unwanted elements, keeping only those for which the predicate returns true:
For example:
- scala:
val text = sc.parallelize(Seq("hello", "hello", "word", "word"))
println(
text.filter(_.equals("hello")).collect().toList
)
- java:
JavaRDD<String> text = sc.parallelize(Arrays.asList("hello", "hello", "word", "word"));
System.out.println(
text.filter(s -> Objects.equals(s,"hello"))
.collect()
);
- python:
text = sc.parallelize(("hello", "hello", "word", "word"))
print(
text.filter(lambda s: s == 'hello').collect()
)
4. mapPartitions
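Similar to map, but the conversion is applied to a whole partition at a time: the function receives the collection (an iterator) of each partition's elements:
For example: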
- scala:
val text = sc.parallelize(Seq("hello", "hello", "word", "word"), 2)
println(
text.mapPartitions(iter => {
iter.map(_ + "333")
}).collect().toList
)
- java:
JavaRDD<String> text = sc.parallelize(Arrays.asList("hello", "hello", "word", "word"), 2);
System.out.println(
text.mapPartitions(iter -> {
List<String> list = new ArrayList<>();
iter.forEachRemaining(s -> list.add(s+"333"));
return list.iterator();
}).collect()
);
- python:
text = sc.parallelize(("hello", "hello", "word", "word"), 2)
def partition(par):
    tmpArr = []
    for s in par:
        tmpArr.append(s + "333")
    return tmpArr
print(
text.mapPartitions(partition).collect()
)
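One common reason to work per partition rather than per element is that any setup inside the function runs once per partition. The sketch below models this in plain Python; it is not the Spark API, and `split_into_partitions`, `expensive_setup`, and `map_partitions_model` are hypothetical names used only for illustration.

```python
def split_into_partitions(elements, num_partitions):
    """Hypothetical helper: slice a list into contiguous chunks,
    loosely imitating how parallelize distributes a local collection."""
    n = len(elements)
    bounds = [(i * n) // num_partitions for i in range(num_partitions + 1)]
    return [elements[bounds[i]:bounds[i + 1]] for i in range(num_partitions)]


setup_log = []  # records every time the (pretend) expensive setup runs


def expensive_setup():
    """Stand-in for costly initialisation such as opening a connection."""
    setup_log.append("setup")
    return "333"


def per_partition(iterator):
    suffix = expensive_setup()  # executed once per partition, not per element
    for s in iterator:
        yield s + suffix


def map_partitions_model(func, partitions):
    """Model of mapPartitions: call func once with each partition's iterator."""
    out = []
    for part in partitions:
        out.extend(func(iter(part)))
    return out


partitions = split_into_partitions(["hello", "hello", "word", "word"], 2)
result = map_partitions_model(per_partition, partitions)

print(result)
print(len(setup_log))  # number of setup calls equals the number of partitions
```

In this model the setup runs twice for four elements, once for each of the two partitions.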
5. mapPartitionsWithIndex
Similar to mapPartitions, except that the function also receives the index of the partition:
For example:
- scala:
val text = sc.parallelize(Seq("hello", "hello", "word", "word"), 2)
println(
text.mapPartitionsWithIndex((index, iter) => {
println("Current partition: " + index)
iter.map(_ + "333")
}, true).collect().toList
)
- java:
JavaRDD<String> text = sc.parallelize(Arrays.asList("hello", "hello", "word", "word"), 2);
System.out.println(
text.mapPartitionsWithIndex((index, iter) -> {
System.out.println("Current partition: " + index);
List<String> list = new ArrayList<>();
iter.forEachRemaining(s -> list.add(s + "333"));
return list.iterator();
}, true).collect()
);
- python:
text = sc.parallelize(("hello", "hello", "word", "word"), 2)
def partition(index, par):
    print("Current partition: " + str(index))
    tmpArr = []
    for s in par:
        tmpArr.append(s + "333")
    return tmpArr
print(
text.mapPartitionsWithIndex(partition).collect()
)
6. mapValues
Works only on key-value data. Like map, it converts data with a function; the difference is that mapValues transforms only the value of each key-value pair and leaves the key unchanged:
For example:
- scala:
val text = sc.parallelize(Seq("abc", "bbb", "ccc", "dd"))
println(
text.map(s => (s, "v" + s))
.mapValues(_ + "66")
.collect().toList
)
- java:
JavaRDD<String> text = sc.parallelize(Arrays.asList("abc", "bbb", "ccc", "dd"));
System.out.println(
text.mapToPair(s -> new Tuple2<>(s, "v" + s))
.mapValues(v -> v + "66").collect()
);
- python:
text = sc.parallelize(("abc", "bbb", "ccc", "dd"))
print(
text.map(lambda s: (s, "v" + s)).mapValues(lambda v: v + "66").collect()
)
7. sample
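Draws a sample from a dataset. It is often used to shrink a dataset so that jobs run faster, while losing as little of the data's underlying patterns as possible:
The first parameter, withReplacement, controls whether an element is put back into the source dataset after being drawn so that it can be drawn again. Put simply, if it is true, the sample may contain duplicates.
The second parameter, fraction, is the sampling ratio.
The third parameter, seed, is the random seed that sample uses internally to generate random indices. It is usually left unspecified so that the default is used.
For example: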
- scala:
val num = sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
println(
num.sample(true,0.6,2)
.collect().toList
)
- java:
JavaRDD<Integer> num = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
System.out.println(
num.sample(true, 0.6, 2).collect()
);
- python:
num = sc.parallelize((1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
print(
num.sample(True, 0.6, 2).collect()
)
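The effect of withReplacement and fraction can be modelled in plain Python. To my understanding, Spark draws each element independently (a Bernoulli trial) when sampling without replacement and uses a Poisson-distributed count per element when sampling with replacement, so the sample size is only approximately fraction times the input size. The sketch below follows that idea; it is not the Spark implementation, the function names are hypothetical, and its random numbers will not match what Spark produces for the same seed.

```python
import math
import random


def sample_without_replacement(elements, fraction, seed):
    """Bernoulli model: keep each element independently with probability fraction."""
    rng = random.Random(seed)
    return [x for x in elements if rng.random() < fraction]


def poisson(rng, lam):
    """Draw a Poisson(lam) count using Knuth's multiplication method."""
    threshold = math.exp(-lam)
    k, p = 0, 1.0
    while True:
        p *= rng.random()
        if p <= threshold:
            return k
        k += 1


def sample_with_replacement(elements, fraction, seed):
    """Poisson model: each element appears 0, 1, 2, ... times,
    with an expected count of fraction."""
    rng = random.Random(seed)
    out = []
    for x in elements:
        out.extend([x] * poisson(rng, fraction))
    return out


nums = list(range(1, 11))
print(sample_without_replacement(nums, 0.6, 2))  # no duplicates possible
print(sample_with_replacement(nums, 0.6, 2))     # duplicates possible
```

Reusing the same seed reproduces the same sample in this model, which is the practical reason to pass seed explicitly when results must be repeatable.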
8. union
Returns the union of two datasets, similar to union in a database (duplicates are kept, as with SQL UNION ALL):
For example:
- scala:
val text1 = sc.parallelize(Seq("aa", "bb"))
val text2 = sc.parallelize(Seq("cc", "dd"))
println(
text1.union(text2).collect().toList
)
- java:
JavaRDD<String> text1 = sc.parallelize(Arrays.asList("aa", "bb"));
JavaRDD<String> text2 = sc.parallelize(Arrays.asList("cc", "dd"));
System.out.println(
text1.union(text2).collect()
);
- python:
text1 = sc.parallelize(("aa", "bb"))
text2 = sc.parallelize(("cc", "dd"))
print(
text1.union(text2).collect()
)
9. join,leftOuterJoin,rightOuterJoin
Given two (key, value) datasets, performs an inner join, left outer join, or right outer join on the key, similar to joins in a database:
For example:
- scala:
val s1 = sc.parallelize(Seq("1,3", "2,6", "3,8", "4,2"))
val s2 = sc.parallelize(Seq("1,小明", "2,小張", "3,小李", "4,小紅", "5,張三"))
val s3 = s1.map(s => (s.split(",")(0), s.split(",")(1)))
val s4 = s2.map(s => (s.split(",")(0), s.split(",")(1)))
println(s3.join(s4).collectAsMap)
println(s3.leftOuterJoin(s4).collectAsMap)
println(s3.rightOuterJoin(s4).collectAsMap)
- java:
JavaRDD<String> s1 = sc.parallelize(Arrays.asList("1,3", "2,6", "3,8", "4,2"));
JavaRDD<String> s2 = sc.parallelize(Arrays.asList("1,小明", "2,小張", "3,小李", "4,小紅", "5,張三"));
JavaPairRDD<String, String> s3 = s1.mapToPair(s -> new Tuple2<>(s.split(",")[0], s.split(",")[1]));
JavaPairRDD<String, String> s4 = s2.mapToPair(s -> new Tuple2<>(s.split(",")[0], s.split(",")[1]));
System.out.println(s3.join(s4).collectAsMap());
System.out.println(s3.leftOuterJoin(s4).collectAsMap());
System.out.println(s3.rightOuterJoin(s4).collectAsMap());
- python:
s1 = sc.parallelize(("1,3", "2,6", "3,8", "4,2"))
s2 = sc.parallelize(("1,小明", "2,小張", "3,小李", "4,小紅", "5,張三"))
s3 = s1.map(lambda s:(s.split(",")[0], s.split(",")[1]))
s4 = s2.map(lambda s:(s.split(",")[0], s.split(",")[1]))
print(s3.join(s4).collectAsMap())
print(s3.leftOuterJoin(s4).collectAsMap())
print(s3.rightOuterJoin(s4).collectAsMap())
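The three join flavours differ only in which keys survive and how a missing side is represented. The following plain-Python model illustrates this; it is not the Spark API, `join_model` is a hypothetical name, and it uses None for a missing side, which is how PySpark reports it (the Scala and Java APIs wrap the optional side in Option and Optional respectively).

```python
from collections import defaultdict


def group_values(pairs):
    """Collect the values of (key, value) pairs into a list per key."""
    grouped = defaultdict(list)
    for k, v in pairs:
        grouped[k].append(v)
    return grouped


def join_model(left, right, how="inner"):
    """Model of join / leftOuterJoin / rightOuterJoin on (key, value) pairs."""
    l, r = group_values(left), group_values(right)
    if how == "inner":
        keys = [k for k in l if k in r]   # keys present on both sides
    elif how == "left":
        keys = list(l)                    # every key of the left side
    elif how == "right":
        keys = list(r)                    # every key of the right side
    else:
        raise ValueError("how must be 'inner', 'left' or 'right'")
    out = []
    for k in keys:
        # A missing side contributes a single None placeholder.
        for lv in l.get(k, [None]):
            for rv in r.get(k, [None]):
                out.append((k, (lv, rv)))
    return out


scores = [("1", "3"), ("2", "6"), ("3", "8"), ("4", "2")]
names = [("1", "小明"), ("2", "小張"), ("3", "小李"), ("4", "小紅"), ("5", "張三")]

print(join_model(scores, names, "inner"))
print(join_model(scores, names, "left"))
print(join_model(scores, names, "right"))
```

Key "5" exists only on the right side, so it appears only in the right outer join, paired with None on the left.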
10. intersection
Returns the intersection of two collections:
For example:
- scala:
val s1 = sc.parallelize(Seq("abc", "dfe", "hello"))
val s2 = sc.parallelize(Seq("fgh", "nbv", "hello", "word", "jkl", "abc"))
println(
s1.intersection(s2).collect().toList
)
- java:
JavaRDD<String> s1 = sc.parallelize(Arrays.asList("abc", "dfe", "hello"));
JavaRDD<String> s2 = sc.parallelize(Arrays.asList("fgh", "nbv", "hello", "word", "jkl", "abc"));
System.out.println(
s1.intersection(s2).collect()
);
- python:
s1 = sc.parallelize(("abc", "dfe", "hello"))
s2 = sc.parallelize(("fgh", "nbv", "hello", "word", "jkl", "abc"))
print(
s1.intersection(s2).collect()
)
11. subtract
Returns the difference a - b: the elements of collection a that are not in collection b:
For example:
- scala:
val s1 = sc.parallelize(Seq("abc", "dfe", "hello"))
val s2 = sc.parallelize(Seq("fgh", "nbv", "hello", "word", "jkl", "abc"))
println(
s1.subtract(s2).collect().toList
)
- java:
JavaRDD<String> s1 = sc.parallelize(Arrays.asList("abc", "dfe", "hello"));
JavaRDD<String> s2 = sc.parallelize(Arrays.asList("fgh", "nbv", "hello", "word", "jkl", "abc"));
System.out.println(
s1.subtract(s2).collect()
);
- python:
s1 = sc.parallelize(("abc", "dfe", "hello"))
s2 = sc.parallelize(("fgh", "nbv", "hello", "word", "jkl", "abc"))
print(
s1.subtract(s2).collect()
)
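One detail the examples for intersection and subtract do not show is how duplicates are treated: in Spark, the result of intersection contains no duplicates even if the inputs did, whereas subtract keeps repeated elements of the left side as long as they are absent from the right side. The plain-Python model below illustrates that difference; it is not the Spark API, the function names are hypothetical, and unlike Spark (which gives no ordering guarantee after a shuffle) it preserves input order.

```python
def intersection_model(a, b):
    """Elements present in both inputs, each reported once."""
    b_set = set(b)
    seen = set()
    out = []
    for x in a:
        if x in b_set and x not in seen:
            seen.add(x)
            out.append(x)
    return out


def subtract_model(a, b):
    """Elements of a that do not occur in b; duplicates in a are kept."""
    b_set = set(b)
    return [x for x in a if x not in b_set]


s1 = ["abc", "abc", "dfe", "dfe", "hello"]
s2 = ["fgh", "nbv", "hello", "word", "jkl", "abc"]

print(intersection_model(s1, s2))
print(subtract_model(s1, s2))
```

If a duplicate-free difference is needed, applying distinct to the result of subtract is one option.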
12. distinct
Removes duplicate elements. This operation requires a shuffle:
For example:
- scala:
val s1 = sc.parallelize(Seq("abc", "abc", "hello", "hello", "word", "word"))
println(
s1.distinct().collect().toList
)
- java:
JavaRDD<String> s1 = sc.parallelize(Arrays.asList("abc", "abc", "hello", "hello", "word", "word"));
System.out.println(
s1.distinct().collect()
);
- python:
s1 = sc.parallelize(("abc", "abc", "hello", "hello", "word", "word"))
print(
s1.distinct().collect()
)
13. reduceByKey
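Works only on key-value data. Values are grouped by key, and a reduce function is then applied within each group. The function takes two arguments, the current value and the partial aggregate so far, and must return a result, which becomes the aggregate for that key. This operation requires a shuffle:
For example: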
- scala:
val s1 = sc.parallelize(Seq("abc", "abc", "hello", "hello", "word", "word"))
println(
s1.map((_, 1))
.reduceByKey(Integer.sum)
.collectAsMap
)
- java:
JavaRDD<String> s1 = sc.parallelize(Arrays.asList("abc", "abc", "hello", "hello", "word", "word"));
System.out.println(
s1.mapToPair(s -> new Tuple2<>(s, 1))
.reduceByKey(Integer::sum)
.collectAsMap()
);
- python:
s1 = sc.parallelize(("abc", "abc", "hello", "hello", "word", "word"))
print(
s1.map(lambda s: (s, 1))
.reduceByKey(lambda v1, v2: v1 + v2)
.collectAsMap()
)
14. groupByKey
Works only on key-value data and groups it by key. It is somewhat similar to reduceByKey, but groupByKey does not aggregate; it only lists all the values that belong to each key. This operation requires a shuffle.
Unlike reduceByKey, groupByKey has to list every value for a key, so it cannot combine on the map side, and it therefore performs worse than reduceByKey:
For example:
- scala:
val s1 = sc.parallelize(Seq("abc", "abc", "hello", "hello", "word", "word"))
println(
s1.map((_, 1))
.groupByKey()
.collectAsMap
)
- java:
JavaRDD<String> s1 = sc.parallelize(Arrays.asList("abc", "abc", "hello", "hello", "word", "word"));
System.out.println(
s1.mapToPair(s -> new Tuple2<>(s, 1))
.groupByKey()
.collectAsMap()
);
- python:
s1 = sc.parallelize(("abc", "abc", "hello", "hello", "word", "word"))
print(
s1.map(lambda s: (s, 1))
.groupByKey()
.mapValues(list)  # turn each group into a list so it prints readably
.collectAsMap()
)
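The performance argument about map-side combining can be made concrete by counting how many records would cross the shuffle in each case. The sketch below is a plain-Python model under simplifying assumptions (two hand-written partitions, record count as the only cost); it is not the Spark API, and `reduce_by_key_model` and `group_by_key_model` are hypothetical names.

```python
from collections import defaultdict


def reduce_by_key_model(partitions, func):
    """Combine within each partition first, then merge the partial results."""
    shuffled = []
    for part in partitions:
        local = {}
        for k, v in part:
            local[k] = func(local[k], v) if k in local else v
        shuffled.extend(local.items())  # one record per key per partition
    result = {}
    for k, v in shuffled:
        result[k] = func(result[k], v) if k in result else v
    return result, len(shuffled)


def group_by_key_model(partitions):
    """No map-side combine: every original record crosses the shuffle."""
    shuffled = [pair for part in partitions for pair in part]
    result = defaultdict(list)
    for k, v in shuffled:
        result[k].append(v)
    return dict(result), len(shuffled)


partitions = [
    [("abc", 1), ("abc", 1), ("hello", 1)],
    [("hello", 1), ("word", 1), ("word", 1)],
]

counts, reduce_records = reduce_by_key_model(partitions, lambda a, b: a + b)
groups, group_records = group_by_key_model(partitions)

print(counts, reduce_records)
print(groups, group_records)
```

In this toy input the reduce model moves four partial records while the group model moves all six original ones; the gap grows as keys repeat more often within a partition.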
