欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Hadoop MapReduce 实现二次排序

程序员文章站 2022-03-24 13:46:11
...

在CSDN上看了很多做二次排序的,但总感觉自己看的都很蒙蔽,最后请教了老师,自己完成了二次排序,在这里和大家分享一下。

先上代码:

这个是我的 主函数、自定义mapper、自定义reducer

public class myGameFive {
	
	public static void main(String [] args) throws IOException, ClassNotFoundException, InterruptedException {
		//设置一个config对象  连接集群
		Configuration conf = new Configuration();
		conf.set("fs.defaultFS","hdfs://locahost:9000");
		//设置job对象 指定jar map reduce 输出函数类型 格式化方法 输入路径 输出路径 退出
		Job job = Job.getInstance(conf,"myGameThree");
		job.setJarByClass(myGameFive.class);
		job.setMapperClass(myGameMapper.class);
		job.setReducerClass(myGameReducer.class);
		
		
		//如果mapper的 3,4 参数 和reduce的3,4参数一样,则可以省去前两行
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);
		//格式化方法:一、第一个参数为偏移量 第二个参数为正航内容的值 类型  LongWritable Text
		job.setInputFormatClass(TextInputFormat.class);
		//输入路径
	        FileInputFormat.addInputPath(job, new Path("/pro/mygame/Input/game.log"));
	        //输出路径 如果输出路径存在则删除路径 新建一个空的路径
	        Path outputPath=new Path("/pro/mygame/output/3.51/");
	        FileSystem.get(conf).delete(outputPath,true);
	        FileOutputFormat.setOutputPath(job,outputPath);
	      
	        System.exit(job.waitForCompletion(true)?0:1);
		
		
	}
	

	 public static class myGameMapper extends Mapper<LongWritable, Text , Text, Text>{
		 Text t = new Text();
		 @Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			 String[] strs = value.toString().split("\\s+");
			 t.set(strs[3] + "\t"+strs[strs.length-1]);
			 context.write(new Text(strs[0]),t);
			 
		 }
	  }

	 
	 public static class myGameReducer extends Reducer<Text, Text, Text, NullWritable> {
		
		 ArrayList<String> list = new ArrayList<String>();
		 Text text = new Text();
		 @Override
		protected void reduce(Text key, Iterable<Text> iterable,Context context)
				throws IOException, InterruptedException {
			long sum = 0;
			int loginTimes=0;
			list.clear();
			for (Text text : iterable) {
				list.add(text.toString().split("\\s+")[0]);
				loginTimes++;
				sum +=Integer.parseInt(text.toString().split("\\s+")[1]);
			}
			list.sort(new Comparator<String>() {
				public int compare(String o1, String o2) {
					return o1.compareTo(o2);
				};
			});
			
			text .set(key.toString()+"\t"+sum+"\t"+loginTimes+"\t"+list.get(0));
			context.write(text, NullWritable.get());
		}
	 }
}

 

这个是继承自WritableComparator的自定义排序类

public class SecondSortJob extends WritableComparator {
	String uid;
	Long totalTime;
	Integer loginTimes;
	String earyTime;
	public SecondSortJob() {
		// TODO Auto-generated constructor stub
	}
	public SecondSortJob(String uid, long totalTime, int loginTimes, String earyTime) {
		this.uid = uid;
		this.totalTime = totalTime;
		this.loginTimes = loginTimes;
		this.earyTime = earyTime;
	}
	
    public int compareTo(SecondSortJob o) {
    	int ref = -this.totalTime.compareTo(o.totalTime);
        // 如果totalTime比较结果相同,再比较loginTimes
        if(ref == 0) {
            ref = -this.loginTimes.compareTo(o.loginTimes);
            if (ref == 0) {
				ref = this.earyTime.compareTo(o.earyTime);
			}
        }
        return ref;
    }
    
    @Override
    public int hashCode() {
    	// TODO Auto-generated method stub
    	return super.hashCode();
    }
    
    @Override
    public boolean equals(Object obj) {
    	// TODO Auto-generated method stub
    	return super.equals(obj);
    }
    
	@Override
    public String toString() {
        return uid + "\t" + totalTime + "\t" + loginTimes + "\t" + earyTime ;
    }
	
}

 

实际上比普通的MapReduce程序只多了一个继承自WritableComparator的自定义类来对map里输出的数据进行一个封装,就实现了二次排序,这是为什么呢?

因为map本身就有对key值排序的功能。

而它在排序的时候会调用key对象的compareTo方法,因此,我们只要重写了这个自定义类的compareTo方法,并在map输出时,将数据封装在自定义类中,以key的形式输出,mapReduce就会自行完成二次排序。

就是这么简单。

如果有同学想掌握更深层次的二次排序,那还需要看看其他博友的文章了,如果只是要简单实现二次排序的功能,那这篇就够用了。