hadoop之MapReduce推薦系統(tǒng)實(shí)戰(zhàn)
需求背景
前邊已經(jīng)使用mapReduce找到了每個(gè)月中溫度最高的兩天?,F(xiàn)在我們來(lái)看看平時(shí)的社交軟件,推薦系統(tǒng)的機(jī)制是怎樣的,其實(shí)也是非常簡(jiǎn)單。如下圖所示,我們需要給每個(gè)人推薦好友,既然是推薦的,肯定需要符合以下規(guī)則:
1 兩人不能是好友
2 兩人最有可能認(rèn)識(shí),定義關(guān)系鏈條只能跨越一個(gè)人,不能跨越兩個(gè)人。
例如,可以推薦的人:
從下圖,我們可以看出來(lái),要給劉備推薦的好友列表是:諸葛亮(徐庶推薦),馬超(劉璋推薦),貂蟬(袁紹推薦),呂布(袁紹推薦),夏侯惇(曹操推薦),龐統(tǒng)(魯肅推薦),孫權(quán)(魯肅推薦)。
不可以推薦的人:
趙子龍,因?yàn)橐呀?jīng)跨越了徐庶和諸葛亮兩個(gè)人了。曹操,因?yàn)閯浜筒懿倬褪呛糜选?/p>
接下來(lái)我們知道了規(guī)則之后,我們先將圖譜分為兩個(gè)map,其中一個(gè)map用來(lái)匹配直接關(guān)系,一個(gè)map用來(lái)匹配間接關(guān)系。然后從間接關(guān)系中排除掉直接關(guān)系,剩下的就是可以推薦的人選。
直接好友
如下圖示例,我們先列出來(lái)這22個(gè)人的直接關(guān)系數(shù)據(jù),只要是兩者相連的的,肯定是直接好友。

間接好友:
如下圖,由于一個(gè)人的朋友圈的人,很可能相互認(rèn)識(shí),所以只要是和這個(gè)人認(rèn)識(shí)的,都是間接好友關(guān)系。當(dāng)然,可能這兩個(gè)間接好友,早都已經(jīng)加了好友了。

劉備認(rèn)識(shí)曹操,曹操圈子里,劉備和夏侯惇是間接好友關(guān)系。然后我們看到劉備的直接好友里面沒(méi)有夏侯惇,這個(gè)時(shí)候我們就可以通過(guò)曹操給劉備推薦夏侯惇。
然后,雖然曹操圈子里,關(guān)羽和劉備是間接好友關(guān)系,但是我們?cè)趧浜糜蚜斜碇姓业搅岁P(guān)羽,是直接好友。所以不能通過(guò)曹操給劉備推薦關(guān)羽。
而且我們看到了,目前只有22個(gè)人的時(shí)候,我們可以很輕松的列出來(lái)直接好友關(guān)系,但是間接好友的數(shù)據(jù)量就會(huì)是比較龐大的。因此我們開(kāi)始借助程序來(lái)完成好友推薦功能。

數(shù)據(jù)模型
首先第一步當(dāng)然是根據(jù)需求來(lái)構(gòu)建我們的數(shù)據(jù)模型了,讓我們的程序能夠更好的批量處理數(shù)據(jù)之間的關(guān)系。以下便是我們此次構(gòu)建出來(lái)的數(shù)據(jù)模型,每一行代表第一個(gè)人與后邊的人都是直接好友關(guān)系。(數(shù)據(jù)模型后續(xù)有更改,不可直接搬運(yùn),不過(guò)出錯(cuò)了可以自己調(diào)節(jié)也算是鍛煉?。?!)

上傳數(shù)據(jù)文件
數(shù)據(jù)模型建立好之后,我們就該上傳文件至hdfs了。
hdfs dfs -D dfs.blocksize=1048576 -put friend.txt /data/friend/input編寫程序
mapper
map中,我們主要是通過(guò)遍歷每一行,來(lái)將每一行的直接朋友關(guān)系和間接朋友關(guān)系寫入文件中。很顯然,每一行的第一列和后邊的所有列都是直接好友。從第二列開(kāi)始,兩兩互成間接好友關(guān)系。同時(shí)我們將兩個(gè)人關(guān)系進(jìn)行排序,否則,劉備-關(guān)羽,和關(guān)羽-劉備 程序會(huì)識(shí)別為不同的組合。
/**
* @author Ted
* @version 1.0
* @date 2022/2/2 16:19
*/
public class FMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
Text mkey = new Text();
IntWritable mval = new IntWritable();
/**
* 劉備 劉璋 關(guān)羽 徐庶 張飛 袁紹 曹操 魯肅
* 徐庶 諸葛亮 劉備
* 袁紹 呂布 劉備 曹操 貂蟬
* @param key
* @param value
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//value:劉備 劉璋 關(guān)羽 徐庶 張飛 袁紹 曹操 魯肅
String[] directFriends = StringUtils.split(value.toString(), ' ');
//間接關(guān)系為1 , 直接關(guān)系為 0
for (int i=1;i<directFriends.length;i++){
mkey.set(getFriendPair(directFriends[0],directFriends[i]));
mval.set(0);
// 寫入文件
context.write(mkey,mval);
for (int j=i+1;j<directFriends.length;j++){
mkey.set(getFriendPair(directFriends[i],directFriends[j]));
mval.set(1);
//寫入文件
context.write(mkey,mval);
}
}
}
/**
* 排序
* @param person1
* @param person2
* @return
*/
public static String getFriendPair(String person1,String person2){
if(person1.compareTo(person2)>0){
return person1+"-"+person2;
}else {
return person2+"-"+person1;
}
}
}reducer
reduce中,我們對(duì)map產(chǎn)生的文件進(jìn)行讀取,由于兩兩一組,value是0代表直接關(guān)系,value是1代表間接關(guān)系。我們判斷如果這兩人有直接關(guān)系,存在為0的情況,這組推薦關(guān)系拋棄掉。如果都是1,我們就將每一組相同的key值相加,來(lái)判斷間接關(guān)系的權(quán)重。
/**
* @author Ted
* @version 1.0
* @date 2022/2/2 16:33
*/
public class FReduce extends Reducer<Text,IntWritable,Text,IntWritable> {
IntWritable rValue = new IntWritable();
/**
* 一組key值 每個(gè)key代表的是某個(gè)兩個(gè)人,同時(shí)分到一個(gè)組,值就是這兩個(gè)人到底是直接朋友,還是間接朋友
* 如果最后循環(huán)最后,沒(méi)有一個(gè)是0的,那么兩個(gè)人肯定是間接關(guān)系,rValue就是間接關(guān)系的深度
* 如果間接關(guān)系深厚,關(guān)聯(lián)的好友很多,那么就優(yōu)先推薦。
* @param key
* @param values
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
// 劉備 關(guān)羽 0
// 劉備 關(guān)羽 0
// 劉備 關(guān)羽 1
// 劉備 關(guān)羽 1
int flag = 0;
int sum = 0;
for (IntWritable val: values){
if(val.get()==0){
flag = 1;
}
sum = val.get();
}
if(flag == 0){
rValue.set(sum);
context.write(key,rValue);
}
}
}客戶端程序
/**
* @author Ted
* @version 1.0
* @date 2022/2/2 16:16
*/
public class MyFriend {
public static void main(String[] args) throws Exception{
Configuration configuration = new Configuration(true);
// 獲取客戶端啟動(dòng) 指定參數(shù)列表
String[] params = new GenericOptionsParser(configuration, args).getRemainingArgs();
configuration.set("mapreduce.app-submission.cross-platform","true");
// 創(chuàng)建任務(wù)
Job job = Job.getInstance(configuration);
job.setJarByClass(MyFriend.class);
job.setJobName("myFriend");
job.setJar("D:\\javaEngineer\\javaproject\\hadoop\\hadoophdfs\\target\\hadoop-hdfs-1.0-0.1.jar");
// 輸入路徑
TextInputFormat.addInputPath(job,new Path(params[0]));
Path outputFile = new Path(params[1]);
if(outputFile.getFileSystem(configuration).exists(outputFile)){
outputFile.getFileSystem(configuration).delete(outputFile,true);
}
// 輸出路徑
TextOutputFormat.setOutputPath(job,outputFile);
// 設(shè)置Map 參數(shù)
job.setMapperClass(FMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//設(shè)置reduce參數(shù)
job.setReducerClass(FReduce.class);
// 執(zhí)行任務(wù) 等待結(jié)束
job.waitForCompletion(true);
}
}啟動(dòng)hadoop集群
程序?qū)懞昧?,接下?lái)當(dāng)然是啟動(dòng)我們的hadoop集群了。

配置啟動(dòng)參數(shù)
接下來(lái)我們來(lái)指定mapReduce需要的參數(shù),其中輸入路徑和輸出路徑是最基本的要求,剩下的排序分組分區(qū)我們都采用系統(tǒng)默認(rèn)的方式。

啟動(dòng)客戶端
然后我們開(kāi)始運(yùn)行客戶端,得到如下文件。
hdfs dfs -ls -R /data/friend
我們可以看看里面的內(nèi)容是不是和真實(shí)的場(chǎng)景相吻合。
hdfs dfs -cat /data/friend/output/part-r-00000亂碼

分析原因
可以看到上傳的文件也是亂碼的。

將文件另存為為UTF-8編碼格式之后,文件正常

重啟客戶端
輸出結(jié)果如下。
劉璋-關(guān)羽 1
呂布-劉備 1
周瑜-劉備 1
夏侯惇-關(guān)羽 1
夏侯惇-劉備 1
孫權(quán)-關(guān)羽 1
孫權(quán)-劉備 1
孫權(quán)-夏侯惇 1
孫權(quán)-大喬 1
孫策-周瑜 1
孫策-大喬 1
小喬-周瑜 1
小喬-孫權(quán) 1
龐統(tǒng)-劉備 1
龐統(tǒng)-周瑜 1
龐統(tǒng)-孫權(quán) 1
張飛-劉璋 1
徐庶-關(guān)羽 1
徐庶-劉璋 1
徐庶-張飛 1
曹操-劉璋 1
曹操-呂布 1
曹操-周瑜 1
曹操-孫策 1
曹操-張飛 1
曹操-徐庶 1
袁紹-關(guān)羽 1
袁紹-劉璋 1
袁紹-夏侯惇 1
袁紹-孫權(quán) 1
袁紹-張飛 1
袁紹-徐庶 1
諸葛亮-劉備 1
諸葛亮-周瑜 1
諸葛亮-孫權(quán) 1
諸葛瑾-孫策 1
諸葛瑾-徐庶 1
諸葛瑾-曹操 1
貂蟬-劉備 1
貂蟬-曹操 1
趙子龍-徐庶 1
趙子龍-諸葛瑾 1
馬超-劉備 1
魯肅-關(guān)羽 1
魯肅-劉璋 1
魯肅-大喬 1
魯肅-孫策 1
魯肅-張飛 1
魯肅-徐庶 1
魯肅-曹操 1
魯肅-袁紹 1
魯肅-諸葛瑾 1
黃忠-徐庶 1
黃忠-諸葛瑾 1
黃忠-趙子龍 1我們截取給曹操推薦的朋友圈人選
曹操-劉璋 1 曹操-呂布 1 曹操-周瑜 1 曹操-孫策 1 曹操-張飛 1 曹操-徐庶 1
魯肅-曹操 1 貂蟬-曹操 1 諸葛瑾-曹操 1
再來(lái)看下關(guān)系圖譜,可以看出來(lái)是正確的,權(quán)重關(guān)系都為1。

增加復(fù)雜度
接下來(lái)我們?cè)黾右幌玛P(guān)系復(fù)雜度,讓徐庶加張飛和關(guān)羽好友,讓諸葛亮加劉備為好友,接下來(lái),諸葛亮通過(guò)徐庶和劉備兩個(gè)人,會(huì)和張飛關(guān)羽分別產(chǎn)生兩次間接關(guān)系。圖譜變?yōu)槿缦聵幼樱?/p>
數(shù)據(jù)模型變?yōu)槿缦聵幼?,再次上傳文件,運(yùn)行客戶端程序。

得到結(jié)果如下,發(fā)現(xiàn)并不是想象的樣子。

取消reduce
接下來(lái)我們?nèi)∠鹯educe任務(wù),只進(jìn)行輸出map文件,來(lái)調(diào)節(jié)bug。
// 如果將reduce任務(wù)設(shè)置為0,那么就可以得到map輸出
job.setNumReduceTasks(0);輸出map文件如下:

很明顯,從map中,我們看到了兩個(gè)諸葛亮-關(guān)羽,兩個(gè)諸葛亮-張飛,因此reduce中的代碼是有錯(cuò)誤的。并沒(méi)有正確完成權(quán)重相加。

最后發(fā)現(xiàn)代碼書(shū)寫錯(cuò)誤,居然只寫了等于號(hào)。

修改代碼,程序正確之后,我們重新maven打包。一定要maven打包,不然jar中的代碼還是原先的代碼。

添加reduce
修改代碼,再次開(kāi)放reduce任務(wù)。獲取結(jié)果如下??梢钥吹?,諸葛亮-關(guān)羽的權(quán)重是2,諸葛亮-張飛的權(quán)重是2,應(yīng)該優(yōu)先給諸葛亮推送張飛和關(guān)羽這兩位好朋友。

最終結(jié)果如下圖,我們可以總hadoop的web端下載文件。
劉璋-關(guān)羽 1
呂布-劉備 1
周瑜-劉備 1
夏侯惇-關(guān)羽 1
夏侯惇-劉備 1
孫權(quán)-關(guān)羽 1
孫權(quán)-劉備 2
孫權(quán)-夏侯惇 1
孫權(quán)-大喬 1
孫策-周瑜 1
孫策-大喬 1
小喬-周瑜 1
小喬-孫權(quán) 1
龐統(tǒng)-劉備 1
龐統(tǒng)-周瑜 1
龐統(tǒng)-孫權(quán) 1
張飛-劉璋 1
徐庶-劉璋 1
曹操-劉璋 1
曹操-呂布 1
曹操-周瑜 1
曹操-孫策 1
曹操-張飛 2
曹操-徐庶 1
袁紹-關(guān)羽 2
袁紹-劉璋 1
袁紹-夏侯惇 1
袁紹-孫權(quán) 1
袁紹-張飛 1
袁紹-徐庶 1
諸葛亮-關(guān)羽 2
諸葛亮-劉璋 1
諸葛亮-周瑜 1
諸葛亮-孫權(quán) 1
諸葛亮-張飛 2
諸葛亮-曹操 1
諸葛亮-袁紹 1
諸葛瑾-劉備 1
諸葛瑾-孫策 1
諸葛瑾-徐庶 1
諸葛瑾-曹操 1
貂蟬-劉備 1
貂蟬-曹操 1
趙子龍-劉備 1
趙子龍-徐庶 1
趙子龍-諸葛瑾 1
馬超-劉備 1
魯肅-關(guān)羽 1
魯肅-劉璋 1
魯肅-大喬 1
魯肅-孫策 1
魯肅-張飛 1
魯肅-徐庶 1
魯肅-曹操 2
魯肅-袁紹 1
魯肅-諸葛亮 1
魯肅-諸葛瑾 1
黃忠-劉備 1
黃忠-徐庶 1
黃忠-諸葛瑾 1
黃忠-趙子龍 1