MapReduce原理及编程(含词频统计编程实例)
目录
1、MapReduce基础
1、什么是MapReduce
是一个分布式计算框架,它将大型数据操作作业分解为可以跨服务器集群并行执行的单个任务;
适用于大规模数据处理场景;
每个节点处理存储在该节点的数据。
2、MapReduce的设计思想
1、分而治之
简化并行计算的编程模型
2、构建抽象模型map和reduce
开发人员专注于实现Mapper和Reducer函数
3、隐藏系统层细节
开发人员专注于业务逻辑实现
3、MapReduce特点
1、优点
易于编程
可扩展性
高容错性
高吞吐量
2、不适用领域
难以实时计算
不适合流式计算
2、MapReduce编程模型
MapReduce由Map和Reduce组成,Map表示映射,Reduce表示归约;
编程模型图如下:
MapReduce编程三部曲
(1)输入Input。MapReduce输入一系列k1/v1。
(2)Map和Reduce阶段:
map: (k1,v1) → list (k2,v2)
reduce: (k2,list(v2)) → list (k3,v3)
其中k2/v2是中间结果对。
(3)输出Output:MapReduce输出一系列k2/v3
3、MapReduce词频统计编程实例
需求:使用MapReduce编程完成词频统计,文档内容如下:
Hello Hadoop BigData
Hello Hadoop MapReduce
Hello Hadoop HDFS
BigData Perfect
1、新建maven工程,添加pom文件依赖关系
版本号根据需求填写,代码如下:
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<version>2.6.0</version>
</dependency>
</dependencies>
2、编写WCMapper类
MapReduce API中提供了抽象类Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>,需要继承该抽象类并重写map方法,如下:
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
LongWritable key:表示输入的key;
Text value:一行文本内容,表示输入的value;
Context context:记录的是Map端的整个上下文,可以使用该对象将数据写到磁盘
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
* @Date 2020/9/10
* @Description
public class WCMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] words = line.split(" ");
for (String word : words) {
context.write(new Text(word),new IntWritable(1));
}
}
}
Map阶段:并行读取文件,对每个单词执行重写后map(),形成<key,value>对。
读第一行:Hello Hadoop Bigdata,读取结果:<Hello,1><Hadoop,1><BigData,1>
读第二行:hello Hadoop MapReduce,读取结果:<Hello,1><Hadoop,1><MapReduce,1>
读第三行:Hello Hadoop HDFS,读取结果:<Hello,1><Hadoop,1><HDFS,1>
读第四行:BigData Perfect,读取结果:<BigData,1><Perfect,1>
3、编写WCReducer类
MapReduce API中提供了抽象类Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>,需要继承该抽象类并重写map方法,如下:
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
Text key:Map端输出key值
Iterable<IntWritable> values:Map端输出value集合(相同key的集合)
Context context:Reduce端上下文,与Map端参数作用一致
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WCReducer extends Reducer<Text, IntWritable,Text,IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int total=0;
for (IntWritable value : values) {
total+=value.get();
}
context.write(key,new IntWritable(total));
}
}
Reduce阶段:对Map的结果进行排序、合并,最后得出词频结果。注意:reduce()执行前会经过Shuffle(混排)等过程,将形成的Map中间结果中相同的key组合成value数组,如下:
<BigData,[1,1]><Hadoop,[1,1,1]><HDFS,[1]><Hello,[1,1,1]><MapReduce,[1]><Perfect,[1]>
然后Reduce端循环执行Reduce(K,V[]),分别统计每个单词出现的次数,得到结果如下:
<BigData,2><Hadoop,3><HDFS,1><Hello,3><MapReduce,1><Perfect,1>
4、编写WCDriver驱动类
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.FileOutputStream;
import java.io.IOException;
public class WCDriver {
public static void main(String[] args) throws Exception {
//1、建立连接
Configuration cfg=new Configuration();
Job job = Job.getInstance(cfg,"job_wc");
job.setJarByClass(WCDriver.class);
//2、指定mapper和reducer
job.setMapperClass(WCMapper.class);
job.setReducerClass(WCReducer.class);
//指定mapper输出类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//指定partitioner
//job.setNumReduceTasks(4);
//job.setPartitionerClass(WCPartitioner.class);
//指定reduce输出类型
job.setOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//指定输入输出路径
FileInputFormat.setInputPaths(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
//3、运行
boolean result = job.waitForCompletion(true);
System.out.println(result ? "成功" : "失败");
System.exit(result?0:1);
}
}
如果想在Map端对Key进行分区,可编写WCPartitioner类,如下:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class WCPartitioner extends Partitioner<Text, IntWritable> {
@Override
public int getPartition(Text text, IntWritable intWritable, int i) {
return Math.abs(text.hashCode())%i;
}
}
5、运行
可以直接在java运行,args[0]是本地文件路径,args[1]是结果写入文件路径。
可以打jar包上传至Linux上操作:(jar包名wc.jar,假设传到opt目录下)
上传测试数据:hdfs dfs -mkdir /test
,hdfs dfs -put a.txt /test
opt目录下运行:hadoop jar wc.jar cn.kgc.kb09.mr.WCDriver /test/a.txt /test/Result
运行后会生成Result目录,
Result目录下生成的文件,
查看结果:hdfs dfs -cat /test/Result/part-*