Hadoop MapReduce 实现二次排序
程序员文章站
2022-03-24 13:46:11
...
在CSDN上看了很多做二次排序的,但总感觉自己看的都很蒙蔽,最后请教了老师,自己完成了二次排序,在这里和大家分享一下。
先上代码:
这个是我的 主函数、自定义mapper、自定义reducer
public class myGameFive {
public static void main(String [] args) throws IOException, ClassNotFoundException, InterruptedException {
//设置一个config对象 连接集群
Configuration conf = new Configuration();
conf.set("fs.defaultFS","hdfs://locahost:9000");
//设置job对象 指定jar map reduce 输出函数类型 格式化方法 输入路径 输出路径 退出
Job job = Job.getInstance(conf,"myGameThree");
job.setJarByClass(myGameFive.class);
job.setMapperClass(myGameMapper.class);
job.setReducerClass(myGameReducer.class);
//如果mapper的 3,4 参数 和reduce的3,4参数一样,则可以省去前两行
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
//格式化方法:一、第一个参数为偏移量 第二个参数为正航内容的值 类型 LongWritable Text
job.setInputFormatClass(TextInputFormat.class);
//输入路径
FileInputFormat.addInputPath(job, new Path("/pro/mygame/Input/game.log"));
//输出路径 如果输出路径存在则删除路径 新建一个空的路径
Path outputPath=new Path("/pro/mygame/output/3.51/");
FileSystem.get(conf).delete(outputPath,true);
FileOutputFormat.setOutputPath(job,outputPath);
System.exit(job.waitForCompletion(true)?0:1);
}
public static class myGameMapper extends Mapper<LongWritable, Text , Text, Text>{
Text t = new Text();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] strs = value.toString().split("\\s+");
t.set(strs[3] + "\t"+strs[strs.length-1]);
context.write(new Text(strs[0]),t);
}
}
public static class myGameReducer extends Reducer<Text, Text, Text, NullWritable> {
ArrayList<String> list = new ArrayList<String>();
Text text = new Text();
@Override
protected void reduce(Text key, Iterable<Text> iterable,Context context)
throws IOException, InterruptedException {
long sum = 0;
int loginTimes=0;
list.clear();
for (Text text : iterable) {
list.add(text.toString().split("\\s+")[0]);
loginTimes++;
sum +=Integer.parseInt(text.toString().split("\\s+")[1]);
}
list.sort(new Comparator<String>() {
public int compare(String o1, String o2) {
return o1.compareTo(o2);
};
});
text .set(key.toString()+"\t"+sum+"\t"+loginTimes+"\t"+list.get(0));
context.write(text, NullWritable.get());
}
}
}
这个是继承自WritableComparator的自定义排序类
public class SecondSortJob extends WritableComparator {
String uid;
Long totalTime;
Integer loginTimes;
String earyTime;
public SecondSortJob() {
// TODO Auto-generated constructor stub
}
public SecondSortJob(String uid, long totalTime, int loginTimes, String earyTime) {
this.uid = uid;
this.totalTime = totalTime;
this.loginTimes = loginTimes;
this.earyTime = earyTime;
}
public int compareTo(SecondSortJob o) {
int ref = -this.totalTime.compareTo(o.totalTime);
// 如果totalTime比较结果相同,再比较loginTimes
if(ref == 0) {
ref = -this.loginTimes.compareTo(o.loginTimes);
if (ref == 0) {
ref = this.earyTime.compareTo(o.earyTime);
}
}
return ref;
}
@Override
public int hashCode() {
// TODO Auto-generated method stub
return super.hashCode();
}
@Override
public boolean equals(Object obj) {
// TODO Auto-generated method stub
return super.equals(obj);
}
@Override
public String toString() {
return uid + "\t" + totalTime + "\t" + loginTimes + "\t" + earyTime ;
}
}
实际上比普通的MapReduce程序只多了一个继承自WritableComparator的自定义类来对map里输出的数据进行一个封装,就实现了二次排序,这是为什么呢?
因为map本身就有对key值排序的功能。
而它在排序的时候会调用key对象的compareTo方法,因此,我们只要重写了这个自定义类的compareTo方法,并在map输出时,将数据封装在自定义类中,以key的形式输出,mapReduce就会自行完成二次排序。
就是这么简单。
如果有同学想掌握更深层次的二次排序,那还需要看看其他博友的文章了,如果只是要简单实现二次排序的功能,那这篇就够用了。
推荐阅读
-
大数据-Hadoop生态(18)-MapReduce框架原理-WritableComparable排序和GroupingComparator分组
-
mapreduce二次排序详解
-
Hadoop分布环境搭建步骤,及自带MapReduce单词计数程序实现
-
Hadoop学习之路(7)MapReduce自定义排序
-
Hadoop学习之路(6)MapReduce自定义分区实现
-
MapReduce编程实例:二次排序
-
hadoop编程(6)-MapReduce案例:Partitioner应用实例——全局排序
-
Hadoop对文本文件的快速全局排序实现方法及分析
-
Hadoop之MapReduce应用实例2(分组排序)
-
MapReduce实际案例,MapTask运行机制,ReduceTask运行机制,MapReduce执行流程,hadoop数据压缩,Join算法的实现