大数据私房菜--MapReduce实战案例
1. MapReduce需求
有三个文件file1、file2、file3,文件中每一行都是一个数字,MapReduce程序读取这三个文件,对三个文件中的数字进行整体升序排序,并输出到一个结果文件中,结果文件中的每一行有两个数字(两个数字之间使用制表符分隔),第一个数字代表排序,第二个数字代表原始数据
2.需求分析
-
输入:三个文件每一行为一个数字
-
输出:对三个文件的数字进行全排序,按升序排序,并输出其对应的序号
-
分析:对三个文件的数字进行全排序,则只用到一个Reduce,那么使用默认值即可。我们需要充分运用MapRudce的排序,即Map端的快速排序,溢写合并过程的归并排序,这时候我们只需要在Map端将数字作为key并指向一个Reduce,即可完成升序的全排序(默认为升序)。另外需要输出一个序号,那么序号的来源可以直接通过索引值获取即可,因为我们的经过shuffle排序之后,就已经得到了一个升序的数列,那么递归循环这个数列就可以得到每一个数字对应的序号
-
Map的K-V设计:需要利用map的排序就要把数字作为key,那么Map输出的<K, V>我们可以设计为<IntWritable, NullWritable>
-
Reduce的K-V设计:默认的numPartitions为1,这正是我们需要的,把所有的Map输出结果集都输入到同一个ReduceTask里面去排序之后,ReduceTask通过for循环输出k的值,并把索引值定义为其序号,然后将<索引值, key>构造为一个Text对象,作为Reduce的输出即可完成需求
3.Mapper设计
package com.lagou.order;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* # _*_ coding: utf-8 _*_
* # @Time : 2020/7/7 13:00
* # @Author : Depa
* # @Version:V 0.1
**/
public class OrderMapper extends Mapper<LongWritable, Text, IntWritable, NullWritable> {
@Override
protected void map(final LongWritable key, final Text value, final Context context) throws IOException, InterruptedException {
// 1.读取一行文本转换为Int
final int num = Integer.parseInt(value.toString());
// 2.将num作为key的输出,Map输出<num , Null>
context.write( new IntWritable(num), NullWritable.get());
}
}
- 新建一个OrderMapper对象,继承Mapper;输入为<LongWritable, Text>,输出为<IntWritable, NullWritable>;Mapper的输出按照需求分析可以知道,我们把文件里面的数字放到Map的输出key当中即可,再把map的结果发到一个reduce当中去进去全排序
- 之后我们重写map方法,将文本的每一行通过Integer.parseInt方法转换成一个整型数值
- 之后我们将num作为key的输出,value赋予一个空值。即context.write( new IntWritable(num), NullWritable.get());
3.Reducer设计
package com.lagou.order;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* # _*_ coding: utf-8 _*_
* # @Time : 2020/7/7 13:12
* # @Author : Depa
* # @Version:V 0.1
**/
public class OrderReducer extends Reducer<IntWritable, NullWritable, Text, NullWritable> {
Text k = new Text();
int i = 0;
@Override
protected void reduce(final IntWritable key, final Iterable<NullWritable> values, final Context context) throws IOException, InterruptedException {
for (final NullWritable value : values) {
System.out.println("order:"+ i + "\t" + "value: "+key.toString());
i ++ ;
String tmp = i + "\t" + key.toString();
k.set(tmp);
context.write(k, NullWritable.get());
}
}
}
通过map阶段的排序之后,默认的partition分区数就是1,我们在shuffle中完成排序之后,在reduce中通过循环写reduce的输出,即索引值作为序号,key值作为也作为输出。我们设置全局变量Text作为reduce的输出,定义个i 作为序号逐渐递增。
4.DRIVER
package com.lagou.order;
import com.lagou.mr.partition.CustomPartitioner;
import com.lagou.sort.SortDriver;
import com.lagou.sort.SortMapper;
import com.lagou.sort.SortReducer;
import com.lagou.sort.SpeakBean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* # _*_ coding: utf-8 _*_
* # @Time : 2020/7/7 13:33
* # @Author : Depa
* # @Version:V 0.1
**/
public class OrderDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
/*
1. 获取配置文件对象,获取job对象实例
2. 指定程序jar的本地路径
3. 指定Mapper/Reducer类
4. 指定Mapper输出的kv数据类型
5. 指定最终输出的kv数据类型
6. 指定job处理的原始数据路径
7. 指定job输出结果路径
8. 提交作业
*/
// 1. 获取配置文件对象,获取job对象实例
final Configuration conf = new Configuration();
final Job job = Job.getInstance(conf, "OrderDriver");
// 2. 指定程序jar的本地路径
job.setJarByClass(OrderDriver.class);
// 3. 指定Mapper/Reducer类
job.setMapperClass(OrderMapper.class);
job.setReducerClass(OrderReducer.class);
// 4. 指定Mapper输出的kv数据类型
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(NullWritable.class);
// 5. 指定最终输出的kv数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// 6.1 初始化inputpath和outpath,并判断output目录是否存在,存在将其删除
Path inputPath = new Path("E:/input/");
Path outputPath = new Path("E:/output/");
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
// 6. 指定job输入数据路径
FileInputFormat.setInputPaths(job, inputPath); //指定读取数据的原始路径
// 7. 指定job输出结果路径
FileOutputFormat.setOutputPath(job, outputPath); //指定结果数据输出路径
// 8. 提交作业
final boolean flag = job.waitForCompletion(true);
//jvm退出:正常退出0,非0值则是错误退出
System.exit(flag ? 0 : 1);
}
}
通过driver去驱动MapReduce任务,在driver中我们加入了判断output目录是否存在,如果存在就删除,这样就可以方便我们一直重复运行。
5 数据验证
输入文件:
输出文件:
上一篇: Unity中Scale详解
下一篇: 让我头疼一下午的Excel合并单元格