通過WordCount學(xué)習(xí)MapReduce
文章已收錄到我的Github精選,歡迎Star:https://github.com/yehongzhi/learningSummary
MapReduce介紹
MapReduce主要分為兩個(gè)部分,分別是map和reduce,采用的是“分而治之”的思想,Mapper負(fù)責(zé)“分”,把一個(gè)龐大的任務(wù)分成若干個(gè)小任務(wù)來(lái)進(jìn)行處理,而Reduce則是負(fù)責(zé)對(duì)map階段的結(jié)果進(jìn)行匯總。
比如我們要統(tǒng)計(jì)一個(gè)很大的文本,里面每個(gè)單詞出現(xiàn)的頻率,也就是WordCount。怎么工作呢?請(qǐng)看下圖:

在map階段把input輸入的文本拆成一個(gè)一個(gè)的單詞,key是單詞,value則是出現(xiàn)的次數(shù)。接著到Reduce階段匯總,相同的key則次數(shù)加1。最后得到結(jié)果,輸出到文件保存。
WordCount例子
下面進(jìn)入實(shí)戰(zhàn),怎么實(shí)現(xiàn)WordCount的功能呢?
創(chuàng)建項(xiàng)目
首先我們得創(chuàng)建一個(gè)maven項(xiàng)目,依賴如下:
<project?xmlns="http://maven.apache.org/POM/4.0.0"
?????????xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
?????????xsi:schemaLocation="http://maven.apache.org/POM/4.0.0?http://maven.apache.org/xsd/maven-4.0.0.xsd">
????<modelVersion>4.0.0modelVersion>
????<groupId>io.github.yehongzhigroupId>
????<artifactId>hadooptestartifactId>
????<version>1.0-SNAPSHOTversion>
????<packaging>jarpackaging>
????<repositories>
????????<repository>
????????????<id>apacheid>
????????????<url>http://maven.apache.orgurl>
????????repository>
????repositories>
????<dependencies>
????????<dependency>
????????????<groupId>org.apache.hadoopgroupId>
????????????<artifactId>hadoop-commonartifactId>
????????????<version>2.6.5version>
????????dependency>
????????<dependency>
????????????<groupId>org.apache.hadoopgroupId>
????????????<artifactId>hadoop-hdfsartifactId>
????????????<version>2.6.5version>
????????dependency>
????????<dependency>
????????????<groupId>org.apache.hadoopgroupId>
????????????<artifactId>hadoop-mapreduce-client-coreartifactId>
????????????<version>2.6.5version>
????????dependency>
????????<dependency>
????????????<groupId>org.apache.hadoopgroupId>
????????????<artifactId>hadoop-mapreduce-client-jobclientartifactId>
????????????<version>2.6.5version>
????????dependency>
????????<dependency>
????????????<groupId>org.apache.hadoopgroupId>
????????????<artifactId>hadoop-mapreduce-client-commonartifactId>
????????????<version>2.6.5version>
????????dependency>
????????<dependency>
????????????<groupId>org.apache.hadoopgroupId>
????????????<artifactId>hadoop-clientartifactId>
????????????<version>2.6.5version>
????????dependency>
????????<dependency>
????????????<groupId>org.apache.hadoopgroupId>
????????????<artifactId>hadoop-coreartifactId>
????????????<version>1.2.0version>
????????dependency>
????dependencies>
project>
第一步是Mapper階段,創(chuàng)建類WordcountMapper:
/**
?*?Mapper有四個(gè)泛型參數(shù)需要填寫
?*?第一個(gè)參數(shù)KEYIN:默認(rèn)情況下,是mr框架所讀到的一行文本的起始偏移量,類型為L(zhǎng)ongWritable
?*?第二個(gè)參數(shù)VALUEIN:默認(rèn)情況下,是mr框架所讀的一行文本的內(nèi)容,類型為Text
?*?第三個(gè)參數(shù)KEYOUT:是邏輯處理完成之后輸出數(shù)據(jù)的key,在此處是每一個(gè)單詞,類型為Text
?*?第四個(gè)參數(shù)VALUEOUT:是邏輯處理完成之后輸出數(shù)據(jù)的value,在此處是次數(shù),類型為Intwriterable
?*?*/
public?class?WordcountMapper?extends?Mapper<LongWritable,?Text,?Text,?IntWritable>?{
????@Override
????protected?void?map(LongWritable?key,?Text?value,?Context?context)?throws?IOException,?InterruptedException?{
????????//將輸入的文本轉(zhuǎn)成String
????????String?string?=?value.toString();
????????//使用空格分割每個(gè)單詞
????????String[]?words?=?string.split("?");
????????//輸出為<單詞,1>
????????for?(String?word?:?words)?{
????????????//將單詞作為key,次數(shù)1作為value
????????????context.write(new?Text(word),?new?IntWritable(1));
????????}
????}
}
接著到Reduce階段,創(chuàng)建類WordcountReduce:
/**
?*?KEYIN,?VALUEIN,?對(duì)應(yīng)mapper階段的KEYOUT,VALUEOUT的類型
?*
?*?KEYOUT,?VALUEOUT,則是reduce邏輯處理結(jié)果的輸出數(shù)據(jù)類型
?*
?*?KEYOUT是單詞,類型為Text
?*?VALUEOUT是總次數(shù),類型為IntWritable
?*/
public?class?WordcountReduce?extends?Reducer<Text,?IntWritable,?Text,?IntWritable>?{
????@Override
????protected?void?reduce(Text?key,?Iterable?values,?Context?context) ?throws?IOException,?InterruptedException?{
????????int?count?=?0;
????????//次數(shù)相加
????????for?(IntWritable?value?:?values)?{
????????????count?+=?value.get();
????????}
????????//輸出<單詞,總次數(shù)>
????????context.write(key,?new?IntWritable(count));
????}
}
最后再創(chuàng)建類WordCount,提供入口:
public?class?WordCount?{
????public?static?void?main(String[]?args)?throws?Exception?{
????????Configuration?configuration?=?new?Configuration();
????????Job?job?=?Job.getInstance(configuration);
????????//指定本程序的jar包所在的本地路徑??把jar包提交到y(tǒng)arn
????????job.setJarByClass(WordCount.class);
????????/*
???*?告訴框架調(diào)用哪個(gè)類
???*?指定本業(yè)務(wù)job要用的mapper/Reducer業(yè)務(wù)類
???*/
????????job.setMapperClass(WordcountMapper.class);
????????job.setReducerClass(WordcountReduce.class);
????????/*
???*?指定mapper輸出數(shù)據(jù)KV類型
???*/
????????job.setMapOutputKeyClass(Text.class);
????????job.setMapOutputValueClass(IntWritable.class);
????????//指定最終的輸出數(shù)據(jù)的kv類型
????????job.setOutputKeyClass(Text.class);
????????job.setOutputValueClass(IntWritable.class);
????????//指定job?的輸入文件所在的目錄
????????FileInputFormat.setInputPaths(job,?new?Path(args[0]));
????????//?指定job?的輸出結(jié)果所在的目錄
????????FileOutputFormat.setOutputPath(job,?new?Path(args[1]));
????????boolean?completion?=?job.waitForCompletion(true);
????????System.exit(completion???0?:?1);
????}
}
寫到這里就完成了。接下來(lái)使用maven打包成jar包,上傳到部署了hadoop的服務(wù)器。

上傳文件到hadoop
接著上傳需要統(tǒng)計(jì)單詞的文本文件上去hadoop,這里我隨便拿一個(gè)redis的配置文件(字?jǐn)?shù)夠多,哈哈)上傳上去。
先改個(gè)名字為input.txt然后用ftp工具上傳到/usr/local/hadoop-3.2.2/input目錄,接著在hadoop創(chuàng)建/user/root文件夾。
hdfs?dfs?-mkdir?/user
hdfs?dfs?-mkdir?/user/root
hadoop?fs?-mkdir?input
//上傳文件到hdfs
hadoop?fs?-put?/usr/local/hadoop-3.2.2/input/input.txt?input
//上傳成功之后,可以使用下面的命令查看
hadoop?fs?-ls?/user/root/input
執(zhí)行程序
第一步先啟動(dòng)hadoop,到sbin目錄下使用命令./start-all.sh,啟動(dòng)成功后,使用jps查看到以下進(jìn)程。

執(zhí)行以下命令執(zhí)行jar包:
hadoop?jar?/usr/local/hadoop-3.2.2/jar/hadooptest-1.0-SNAPSHOT.jar?WordCount?input?output
#?/usr/local/hadoop-3.2.2/jar/hadooptest-1.0-SNAPSHOT.jar?表示jar包的位置
#?WordCount?是類名
#?input?是輸入文件所在的文件夾
#?output?輸出的文件夾

這表示運(yùn)行成功,我們打開web管理界面,找到output文件夾。

輸出結(jié)果就是這個(gè)文件,下載下來(lái)。

然后打開該文件,可以看到統(tǒng)計(jì)結(jié)果,以下截圖為其中一部分結(jié)果:

遇到的問題
如果出現(xiàn)Running Job一直沒有響應(yīng),更改mapred-site.xml文件內(nèi)容:
更改前:
<configuration>
????<property>
???????????<name>mapreduce.framework.namename>
???????????<value>yarnvalue>
????property>
configuration>
更改后:
<configuration>
????<property>
??????????<name>mapreduce.job.trackername>
??????????<value>hdfs://192.168.1.4:8001value>
??????????<final>truefinal>
?????property>
configuration>
然后重新啟動(dòng)hadoop,再執(zhí)行命令運(yùn)行jar包任務(wù)。
總結(jié)
WordCount相當(dāng)于大數(shù)據(jù)的HelloWord程序,對(duì)剛?cè)腴T的同學(xué)來(lái)說能夠通過這個(gè)例子學(xué)習(xí)MapReduce的基本操作,還有搭建環(huán)境,還是很有幫助的。接下來(lái)還會(huì)繼續(xù)學(xué)習(xí)大數(shù)據(jù)相關(guān)的知識(shí),希望這篇文章對(duì)你有所幫助。
覺得有用就點(diǎn)個(gè)贊吧,你的點(diǎn)贊是我創(chuàng)作的最大動(dòng)力~
我是一個(gè)努力讓大家記住的程序員。我們下期再見?。?!
能力有限,如果有什么錯(cuò)誤或者不當(dāng)之處,請(qǐng)大家批評(píng)指正,一起學(xué)習(xí)交流!
