7道RDD編程練習(xí)題
公眾號(hào)后臺(tái)回復(fù)關(guān)鍵字:pyspark,獲取本項(xiàng)目github地址。
為強(qiáng)化RDD編程API的使用經(jīng)驗(yàn),現(xiàn)提供一些小練習(xí)題。
讀者可以使用RDD的編程API完成這些小練習(xí)題,并輸出結(jié)果。
這些練習(xí)題基本可以在15行代碼以?xún)?nèi)完成,如果遇到困難,建議回看上一節(jié)RDD的API介紹。
完成這些練習(xí)題后,可以查看本節(jié)后面的參考答案,和自己的實(shí)現(xiàn)方案進(jìn)行對(duì)比。
import findspark
#指定spark_home為剛才的解壓路徑,指定python路徑
spark_home = "/Users/liangyun/ProgramFiles/spark-3.0.1-bin-hadoop3.2"
python_path = "/Users/liangyun/anaconda3/bin/python"
findspark.init(spark_home,python_path)
import pyspark
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("rdd_tutorial").setMaster("local[4]")
sc = SparkContext(conf=conf)
print(pyspark.__version__)
一,練習(xí)題列表
1,求平均數(shù)
#任務(wù):求data的平均值
data = [1,5,7,10,23,20,6,5,10,7,10]
2,求眾數(shù)
#任務(wù):求data中出現(xiàn)次數(shù)最多的數(shù)
data = [1,5,7,10,23,20,6,5,10,7,10]
3,求TopN
#任務(wù):有一批學(xué)生信息表格,包括name,age,score, 找出score排名前3的學(xué)生, score相同可以任取
students = [("LiLei",18,87),("HanMeiMei",16,77),("DaChui",16,66),("Jim",18,77),("RuHua",18,50)]
n = 3
4,排序并返回序號(hào)
#任務(wù):排序并返回序號(hào), 大小相同的序號(hào)可以不同
data = [1,7,8,5,3,18,34,9,0,12,8]
5,二次排序
#任務(wù):有一批學(xué)生信息表格,包括name,age,score
#首先根據(jù)學(xué)生的score從大到小排序,如果score相同,根據(jù)age從大到小
students = [("LiLei",18,87),("HanMeiMei",16,77),("DaChui",16,66),("Jim",18,77),("RuHua",18,50)]
6,連接操作
#任務(wù):已知班級(jí)信息表和成績(jī)表,找出班級(jí)平均分在75分以上的班級(jí)
#班級(jí)信息表包括class,name,成績(jī)表包括name,score
classes = [("class1","LiLei"), ("class1","HanMeiMei"),("class2","DaChui"),("class2","RuHua")]
scores = [("LiLei",76),("HanMeiMei",80),("DaChui",70),("RuHua",60)]
7,分組求眾數(shù)
#任務(wù):有一批學(xué)生信息表格,包括class和age。求每個(gè)班級(jí)學(xué)生年齡的眾數(shù)。
students = [("class1",15),("class1",15),("class2",16),("class2",16),("class1",17),("class2",19)]
二,練習(xí)題參考答案
1,求平均數(shù)
#任務(wù):求data的平均值
data = [1,5,7,10,23,20,6,5,10,7,10]
rdd_data = sc.parallelize(data)
s = rdd_data.reduce(lambda x,y:x+y+0.0)
n = rdd_data.count()
avg = s/n
print("average:",avg)
average: 9.454545454545455
2,求眾數(shù)
#任務(wù):求data中出現(xiàn)次數(shù)最多的數(shù),若有多個(gè),求這些數(shù)的平均值
data = [1,5,7,10,23,20,7,5,10,7,10]
rdd_data = sc.parallelize(data)
rdd_count = rdd_data.map(lambda x:(x,1)).reduceByKey(lambda x,y:x+y)
max_count = rdd_count.map(lambda x:x[1]).reduce(lambda x,y: x if x>=y else y)
rdd_mode = rdd_count.filter(lambda x:x[1]==max_count).map(lambda x:x[0])
mode = rdd_mode.reduce(lambda x,y:x+y+0.0)/rdd_mode.count()
print("mode:",mode)
mode: 8.5
3,求TopN
#任務(wù):有一批學(xué)生信息表格,包括name,age,score, 找出score排名前3的學(xué)生, score相同可以任取
students = [("LiLei",18,87),("HanMeiMei",16,77),("DaChui",16,66),("Jim",18,77),("RuHua",18,50)]
n = 3
rdd_students = sc.parallelize(students)
rdd_sorted = rdd_students.sortBy(lambda x:x[2],ascending = False)
students_topn = rdd_sorted.take(n)
print(students_topn)
[('LiLei', 18, 87), ('HanMeiMei', 16, 77), ('Jim', 18, 77)]
4,排序并返回序號(hào)
#任務(wù):按從小到大排序并返回序號(hào), 大小相同的序號(hào)可以不同
data = [1,7,8,5,3,18,34,9,0,12,8]
rdd_data = sc.parallelize(data)
rdd_sorted = rdd_data.map(lambda x:(x,1)).sortByKey().map(lambda x:x[0])
rdd_sorted_index = rdd_sorted.zipWithIndex()
print(rdd_sorted_index.collect())
[(0, 0), (1, 1), (3, 2), (5, 3), (7, 4), (8, 5), (8, 6), (9, 7), (12, 8), (18, 9), (34, 10)]
5,二次排序
#任務(wù):有一批學(xué)生信息表格,包括name,age,score
#首先根據(jù)學(xué)生的score從大到小排序,如果score相同,根據(jù)age從大到小
students = [("LiLei",18,87),("HanMeiMei",16,77),("DaChui",16,66),("Jim",18,77),("RuHua",18,50)]
rdd_students = sc.parallelize(students)
%%writefile student.py
#為了在RDD中使用自定義類(lèi),需要將類(lèi)的創(chuàng)建代碼其寫(xiě)入到一個(gè)文件中,否則會(huì)有序列化錯(cuò)誤
class Student:
def __init__(self,name,age,score):
self.name = name
self.age = age
self.score = score
def __gt__(self,other):
if self.score > other.score:
return True
elif self.score==other.score and self.age>other.age:
return True
else:
return False
from student import Student
rdd_sorted = rdd_students \
.map(lambda t:Student(t[0],t[1],t[2]))\
.sortBy(lambda x:x,ascending = False)\
.map(lambda student:(student.name,student.age,student.score))
#參考方案:此處巧妙地對(duì)score和age進(jìn)行編碼來(lái)表達(dá)其排序優(yōu)先級(jí)關(guān)系,除非age超過(guò)100000,以下邏輯無(wú)錯(cuò)誤。
#rdd_sorted = rdd_students.sortBy(lambda x:100000*x[2]+x[1],ascending=False)
rdd_sorted.collect()
[('LiLei', 18, 87),
('Jim', 18, 77),
('HanMeiMei', 16, 77),
('DaChui', 16, 66),
('RuHua', 18, 50)]
6,連接操作
#任務(wù):已知班級(jí)信息表和成績(jī)表,找出班級(jí)平均分在75分以上的班級(jí)
#班級(jí)信息表包括class,name,成績(jī)表包括name,score
classes = [("class1","LiLei"), ("class1","HanMeiMei"),("class2","DaChui"),("class2","RuHua")]
scores = [("LiLei",76),("HanMeiMei",80),("DaChui",70),("RuHua",60)]
rdd_classes = sc.parallelize(classes).map(lambda x:(x[1],x[0]))
rdd_scores = sc.parallelize(scores)
rdd_join = rdd_scores.join(rdd_classes).map(lambda t:(t[1][1],t[1][0]))
def average(iterator):
data = list(iterator)
s = 0.0
for x in data:
s = s + x
return s/len(data)
rdd_result = rdd_join.groupByKey().map(lambda t:(t[0],average(t[1]))).filter(lambda t:t[1]>75)
print(rdd_result.collect())
[('class1', 78.0)]
7,分組求眾數(shù)
#任務(wù):有一批學(xué)生信息表格,包括class和age。求每個(gè)班級(jí)學(xué)生年齡的眾數(shù)。
students = [("class1",15),("class1",15),("class2",16),("class2",16),("class1",17),("class2",19)]
def mode(arr):
dict_cnt = {}
for x in arr:
dict_cnt[x] = dict_cnt.get(x,0)+1
max_cnt = max(dict_cnt.values())
most_values = [k for k,v in dict_cnt.items() if v==max_cnt]
s = 0.0
for x in most_values:
s = s + x
return s/len(most_values)
rdd_students = sc.parallelize(students)
rdd_classes = rdd_students.aggregateByKey([],lambda arr,x:arr+[x],lambda arr1,arr2:arr1+arr2)
rdd_mode = rdd_classes.map(lambda t:(t[0],mode(t[1])))
print(rdd_mode.collect())
[('class1', 15.0), ('class2', 16.0)]

公眾號(hào)后臺(tái)回復(fù)關(guān)鍵字:pyspark,獲取本項(xiàng)目github地址。
也可以在公眾號(hào)后臺(tái)回復(fù)關(guān)鍵字:spark加群,加入spark和大數(shù)據(jù)讀者交流群和大家討論。
猜你喜歡:
評(píng)論
圖片
表情
