MapReduce编程实战之WordCount简单案例分析
看图说话!
MapReduce 特点
MapReduce 为什么如此受欢迎?尤其现在互联网+时代,互联网+公司都在使用MapReduce。MapReduce 之所以如此受欢迎,它主要有以下几个特点。
MapReduce 易于编程 。
它简单的实现一些接口,就可以完成一个分布式程序,这个分布
式程序可以分布到大量廉价的 PC 机器运行。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一样的。就是因为这个特点使得 MapReduce 编程变得非常流行。
2、良好的 扩展性 。当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展
它的计算能力。
3、 高容错性 。MapReduce 设计的初衷就是使程序能够部署在廉价的 PC 机器上,这就要求
它具有很高的容错性。比如其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上面上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由
Hadoop 内部完成的。
4、适合 PB 级以上海量数据的 离线处理 。这里加红字体离线处理,说明它适合离线处理而不
适合在线处理。比如像毫秒级别的返回一个结果,MapReduce 很难做到。
MapReduce 虽然具有很多的优势,但是它也有不擅长的地方。这里的不擅长不代表它不能做,而是在有些场景下实现的效果差,并不适合 MapReduce 来处理,主要表现在以下几个方面:
1、实时计算。MapReduce 无法像 Mysql 一样,在毫秒或者秒级内返回结果
2、流式计算。流式计算的输入数据时动态的,而 MapReduce 的输入数据集是静态的,不能动态变化。这是因为 MapReduce 自身的设计特点决定了数据源必须是静态的。
3、DAG(有向图)计算。多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce 并不是不能做,而是使用后,每个MapReduce 作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常的低下。
代码实例
package?com.wgy.mapreduce.wc;
import?java.io.IOException;
import?java.util.StringTokenizer;
import?org.apache.hadoop.conf.Configuration;
import?org.apache.hadoop.fs.FileSystem;
import?org.apache.hadoop.fs.Path;
import?org.apache.hadoop.io.IntWritable;
import?org.apache.hadoop.io.LongWritable;
import?org.apache.hadoop.io.Text;
import?org.apache.hadoop.mapreduce.Job;
import?org.apache.hadoop.mapreduce.Mapper;
import?org.apache.hadoop.mapreduce.Reducer;
import?org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import?org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public?class?Wordcount {
public?static?class?TokenizerMapper extends
Mapper,>
//这个Mapper类是一个泛型类型,它有四个形参类型,分别指定map函数的输入键、输入值、输出键、输出值的类型。hadoop没有直接使用Java内嵌的类型,而是自己开发了一套可以优化网络序列化传输的基本类型。这些类型都在org.apache.hadoop.io包中。
//比如这个例子中的Object类型,适用于字段需要使用多种类型的时候,Text类型相当于Java中的String类型,IntWritable类型相当于Java中的Integer类型
{
//定义两个变量
//private final static LongWritable one=new LongWritable(1);
private?final?static?IntWritable one?= new?IntWritable(1);//这个1表示每个单词出现一次,map的输出value就是1.
private?Text word?= new?Text();//每行数据
//实现map函数
public?void?map(Object key, Text value, Context context)
//context它是mapper的一个内部类,简单的说*接口是为了在map或是reduce任务中跟踪task的状态,很自然的MapContext就是记录了map执行的上下文,在mapper类中,这个context可以存储一些job conf的信息,比如job运行时参数等,我们可以在map函数中处理这个信息,这也是Hadoop中参数传递中一个很经典的例子,同时context作为了map和reduce执行中各个函数的一个桥梁,这个设计和Java web中的session对象、application对象很相似
//简单的说context对象保存了作业运行的上下文信息,比如:作业配置信息、InputSplit信息、任务ID等
//我们这里最直观的就是主要用到context的write方法。
throws?IOException, InterruptedException {
//The tokenizer?uses the default delimiter set, which is " \t\n\r": the space character, the tab character, the newline character, the carriage-return character
String line= value.toString(); ??// 将输入的纯文本文件的数据转化成String
?// 将输入的数据首先按行进行分割
StringTokenizer itr?= new?StringTokenizer(line);//将Text类型的value转化成字符串类型
//StringTokenizer是字符串分隔解析类型,StringTokenizer 用来分割字符串,你可以指定分隔符,比如',',或者空格之类的字符。
while?(itr.hasMoreTokens()) {//hasMoreTokens() 方法是用来测试是否有此标记生成器的字符串可用更多的标记。
//java.util.StringTokenizer.hasMoreTokens()
word.set(itr.nextToken());//nextToken()这是 StringTokenizer 类下的一个方法,nextToken() 用于返回下一个匹配的字段。
context.write(word, one);
}
}
}
public?static?class?IntSumReducer extends
Reducer
private?IntWritable result?= new?IntWritable();
//实现reduce函数
public?void?reduce(Text key, Iterable
Context context) throws?IOException, InterruptedException {
int?sum?= 0;
for?(IntWritable val?: values) {
sum?+= val.get();
}
result.set(sum);
context.write(key, result);
}
?}
public?static?void?main(String[] args) throws?Exception {
Configuration conf?= new?Configuration();
//Configuration类代表作业的配置,该类会加载mapred-site.xml、hdfs-site.xml、core-site.xml等配置文件。
//删除已经存在的输出目录
Path mypath?= new?Path("hdfs://cdh001:8020/user/wordcount-out");//输出路径
FileSystem hdfs?= mypath.getFileSystem(conf);//获取文件系统
//如果文件系统中存在这个输出路径,则删除掉,保证输出目录不能提前存在。
if?(hdfs.isDirectory(mypath)) {
hdfs.delete(mypath, true);
}
//job对象指定了作业执行规范,可以用它来控制整个作业的运行。
Job job?= Job.getInstance();// new Job(conf, "word count");
job.setJarByClass(Wordcount.class);//我们在hadoop集群上运行作业的时候,要把代码打包成一个jar文件,然后把这个文件
//传到集群上,然后通过命令来执行这个作业,但是命令中不必指定JAR文件的名称,在这条命令中通过job对象的setJarByClass()
//中传递一个主类就行,hadoop会通过这个主类来查找包含它的JAR文件。
job.setMapperClass(TokenizerMapper.class);
//job.setReducerClass(IntSumReducer.class);
job.setCombinerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//一般情况下mapper和reducer的输出的数据类型是一样的,所以我们用上面两条命令就行,如果不一样,我们就可以用下面两条命令单独指定mapper的输出key、value的数据类型
//job.setMapOutputKeyClass(Text.class);
//job.setMapOutputValueClass(IntWritable.class);
//hadoop默认的是TextInputFormat和TextOutputFormat,所以说我们这里可以不用配置。
//job.setInputFormatClass(TextInputFormat.class);
//job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new?Path(
"hdfs://cdh001:8020/user/worldcount.txt"));//FileInputFormat.addInputPath()指定的这个路径可以是单个文件、一个目录或符合特定文件模式的一系列文件。
//从方法名称可以看出,可以通过多次调用这个方法来实现多路径的输入。
FileOutputFormat.setOutputPath(job, new?Path(
"hdfs://cdh001:8020/user/wordcount-out"));//只能有一个输出路径,该路径指定的就是reduce函数输出文件的写入目录。
//特别注意:输出目录不能提前存在,否则hadoop会报错并拒绝执行作业,这样做的目的是防止数据丢失,因为长时间运行的作业如果结果被意外覆盖掉,那肯定不是我们想要的
System.exit(job.waitForCompletion(true) ? 0 : 1);
????//使用job.waitForCompletion()提交作业并等待执行完成,该方法返回一个boolean值,表示执行成功或者失败,这个布尔值被转换成程序退出代码0或1,该布尔参数还是一个详细标识,所以作业会把进度写到控制台。
//waitForCompletion()提交作业后,每秒会轮询作业的进度,如果发现和上次报告后有改变,就把进度报告到控制台,作业完成后,如果成功就显示作业计数器,如果失败则把导致作业失败的错误输出到控制台
}
}