hadoop的wordcount实例代码
可以通过一个简单的例子来说明mapreduce到底是什么:
我们要统计一个大文件中的各个单词出现的次数。由于文件太大。我们把这个文件切分成如果小文件,然后安排多个人去统计。这个过程就是”map”。然后把每个人统计的数字合并起来,这个就是“reduce"。
上面的例子如果在mapreduce去做呢,就需要创建一个任务job,由job把文件切分成若干独立的数据块,并分布在不同的机器节点中。然后通过分散在不同节点中的map任务以完全并行的方式进行处理。mapreduce会对map的输出地行收集,再将结果输出送给reduce进行下一步的处理。
对于一个任务的具体执行过程,会有一个名为"jobtracker"的进程负责协调mapreduce执行过程中的所有任务。若干条tasktracker进程用来运行单独的map任务,并随时将任务的执行情况汇报给jobtracker。如果一个tasktracker汇报任务失败或者长时间未对本身任务进行汇报,jobtracker会启动另外一个tasktracker重新执行单独的map任务。
下面的具体的代码实现:
1. 编写wordcount的相关job
(1)eclipse下创建相关maven项目,依赖jar包如下(也可参照hadoop源码包下的hadoop-mapreduce-examples项目的pom配置)
注意:要配置一个maven插件maven-jar-plugin,并指定mainclass
<dependencies> <dependency> <groupid>junit</groupid> <artifactid>junit</artifactid> <version>4.11</version> </dependency> <dependency> <groupid>org.apache.hadoop</groupid> <artifactid>hadoop-mapreduce-client-core</artifactid> <version>2.5.2</version> </dependency> <dependency> <groupid>org.apache.hadoop</groupid> <artifactid>hadoop-common</artifactid> <version>2.5.2</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupid>org.apache.maven.plugins</groupid> <artifactid>maven-jar-plugin</artifactid> <configuration> <archive> <manifest> <mainclass>com.xxx.demo.hadoop.wordcount.wordcount</mainclass> </manifest> </archive> </configuration> </plugin> </plugins> </build>
(2)根据mapreduce的运行机制,一个job至少要编写三个类分别用来完成map逻辑、reduce逻辑、作业调度这三件事。
map的代码可继承org.apache.hadoop.mapreduce.mapper类
public static class tokenizermapper extends mapper<object, text, text, intwritable>{ private final static intwritable one = new intwritable(1); private text word = new text(); //由于该例子未用到key的参数,所以该处key的类型就简单指定为object public void map(object key, text value, context context ) throws ioexception, interruptedexception { stringtokenizer itr = new stringtokenizer(value.tostring()); while (itr.hasmoretokens()) { word.set(itr.nexttoken()); context.write(word, one); } } }
reduce的代码可继承org.apache.hadoop.mapreduce.reducer类
public class intsumreducer extends reducer<text,intwritable,text,intwritable> { private intwritable result = new intwritable(); public void reduce(text key, iterable<intwritable> values, context context ) throws ioexception, interruptedexception { int sum = 0; for (intwritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } }
编写main方法进行作业调度
public static void main(string[] args) throws exception { configuration conf = new configuration(); job job = job.getinstance(conf, "word count"); job.setjarbyclass(wordcount.class); job.setmapperclass(tokenizermapper.class); job.setcombinerclass(intsumreducer.class); job.setreducerclass(intsumreducer.class); job.setoutputkeyclass(text.class); job.setoutputvalueclass(intwritable.class); fileinputformat.addinputpath(job, new path(args[0])); fileoutputformat.setoutputpath(job, new path(args[1])); job.waitforcompletion(true) ; //system.exit(job.waitforcompletion(true) ? 0 : 1); }
2. 上传数据文件到hadoop集群环境
执行mvn install把项目打成jar文件然后上传到linux集群环境,使用hdfs dfs -mkdir命令在hdfs文件系统中创建相应的命令,使用hdfs dfs -put 把需要处理的数据文件上传到hdfs系统中,示例:hdfs dfs -put ${linux_path/数据文件} ${hdfs_path}
3. 执行job
在集群环境中执行命令: hadoop jar ${linux_path}/wordcount.jar ${hdfs_input_path} ${hdfs_output_path}
4. 查看统计结果
hdfs dfs -cat ${hdfs_output_path}/输出文件名
以上的方式在未启动hadoop集群环境时,是以local模式运行,此时hdfs和yarn都不起作用。下面是在伪分布式模式下执行mapreduce job时需要做的工作,先把官网上列的步骤摘录出来:
配置主机名
# vi /etc/sysconfig/network
例如:
networking=yes hostname=master vi /etc/hosts
填入以下内容
127.0.0.1 localhost
配置ssh免密码互通
ssh-keygen -t rsa
# cat?~/.ssh/id_rsa.pub?>>?~/.ssh/authorized_keys
配置core-site.xml文件(位于${hadoop_home}/etc/hadoop/
<configuration> <property> <name>fs.defaultfs</name> <value>hdfs://localhost:9000</value> </property> </configuration>
配置hdfs-site.xml文件
<configuration> <property> <name>dfs.replication</name> <value>1</value> </property> </configuration>
下面的命令可以在单机伪分布模式下运行mapreduce的job
1.format the filesystem:
$ bin/hdfs namenode -format
2.start namenode daemon and datanode daemon:
$ sbin/start-dfs.sh
3.the hadoop daemon log output is written to the $hadoop_log_dir directory (defaults to $hadoop_home/logs).4.browse the web interface for the namenode; by default it is available at:
namenode - http://localhost:50070/
make the hdfs directories required to execute mapreduce jobs:
$ bin/hdfs dfs -mkdir /user
$ bin/hdfs dfs -mkdir /user/<username>
5.copy the input files into the distributed filesystem:
$ bin/hdfs dfs -put etc/hadoop input
6.run some of the examples provided:
$ bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.5.2.jar grep input output 'dfs[a-z.]+'
7.examine the output files:
copy the output files from the distributed filesystem to the local filesystem and examine them:$ bin/hdfs dfs -get output output
$ cat output/*
orview the output files on the distributed filesystem:
$ bin/hdfs dfs -cat output/*
8.when you're done, stop the daemons with:
$ sbin/stop-dfs.sh
总结
以上就是本文关于hadoop的wordcount实例代码的全部内容,希望对大家有所帮助。感兴趣的朋友可以继续参阅本站其他相关专题,如有不足之处,欢迎留言指出。感谢朋友们对本站的支持!