MapReduce的工作原理及实现案例
MapReduce的工作原理
1、Client向ResourceManager提交任务申请,RM找到NodeManager并启动一个AppMaster,AM通过获取到的分片信息,向RM申请资源,并启动相应数量的maptask;
2、在maptask上读取文件,由TextInputFormat指定读取规则,调用RecordReader方法按行读取,将行号和每行数据组成文件块进行返回,返回的LongWritable和Text将作为Mapper中map方法的入口数据;
3、每次获取的行的偏移量和每一行内容通过map()方法进行逻辑运算形成新的键值对,通过context对象写到OutputCollector中;
4、OutputCollector把收集到的键值对发送到环形缓冲区,就进入了shuffle过程.环形缓冲区的默认大小为100M,当数据累计到其大小的80%,就会触发溢出;
5、spill溢出前是需要对数据进行分区和排序的,如果没有设置分区器,就会调用系统默认的分区方法,对环形缓冲区的每个键值对hash一个partition值,相同值得分为同一个区,分区数跟NumReduceTasks相关。然后再对Key值进行升序排序;
6、环形缓冲区中分区排序后的数据溢出到文件中,如果map阶段处理的数据量较大,则会溢出多个小文件。如果设置了combiner,相同Key的value值相加,当处理数据量较大时,目的是让尽可能少的数据写入到磁盘,提高运行效率,否则文件中将存在多个相同的Key;
7、多个溢出的小文件会被归并排序为一个大文件,大文件中的数据仍然是分区且分区有序的;
8、当mrAppmaster监控到所有的map task任务完成后,reduce task会根据自己的分区号去每个不同的map task节点上去取相同分区的数据,然后将取过来的数据在进行merge合并成一个大文件,大文件是按照k有序的,此时,shuffle过程结束;
9、在reduce task运算过程中,首先调用groupingComparator对大文件的数据进行分组,每次取出一组键值,通过自定义的reduce()方法进行逻辑处理,然后根据OutPutFormat指定的输出格式将数据写到hdfs文件中。输出文件的个数与reduce个数一致。
WordCount的MapReduce代码实现
public class WordMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
//LongWritable, Text(输入类型,行号和每一行内容),Text, IntWritable(输出类型)
private Text word = new Text();
private IntWritable count = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] words = value.toString().replaceAll(",|\"|\\.|\\?|!|:|;","").split(" ");
for (String _word : words) {
word.set(_word);
//向reducer传输内容
context.write(word,count);
}
}
}
public class WordReducer extends Reducer<Text, IntWritable,Text,IntWritable> {
private IntWritable count = new IntWritable(0);
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for(IntWritable value:values){
sum += value.get();
}
count.set(sum);
context.write(key,count);
}
}
public class WordTest {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//创建HDFS访问路径配置信息
Configuration config = new Configuration();
config.set("fs.defaultFS","hdfs://192.168.37.200:9000");
//创建mapreduce计算任务
Job job = Job.getInstance(config,"wordCount");
//设置主类,定位外部jar资源
job.setJarByClass(WordTest.class);
//设置任务数:和文件分片数和所存储的DN数有关
job.setNumReduceTasks(2);
//设置分区器Partitioner
job.setPartitionerClass(WordPartitioner.class);
//设置Combiner
job.setCombinerClass(WordReducer.class);
//设置mapper
job.setMapperClass(WordMapper.class);
//设置reducer
job.setReducerClass(WordReducer.class);
//设置mapper输出键值类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//设置reducer输出键值类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//设置HDFS输入文件绑定job
FileInputFormat.setInputPaths(job,new Path("/kb10/Pollyanna.txt"));
FileOutputFormat.setOutputPath(job,new Path("/kb10/wordCount2"));
//等待所有job步骤完成
System.out.println(job.waitForCompletion(true));
}
}
上一篇: 临终关怀
下一篇: 第一次月测内容疑问总结