
MapReduce: Finding the Maximum and Minimum Values

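The code below computes a global minimum and maximum in a single MapReduce job. Each map task keeps a running local minimum and maximum of the key field (the fourth tab-separated column, strs[3]) in memory, and emits its two local extremes exactly once, from cleanup(). The reducer then merges the per-mapper candidates arriving under the constant keys "min" and "max" into the global result, so only two records per map task cross the shuffle regardless of input size.
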
import java.io.File;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class GetMinMaxKeyMapReduce {
	
	public static class GetMinMaxKeyMap extends Mapper<Object, Text, Text, Text> {
		private Text min = new Text();
		private Text max = new Text();
		private long i = 0;

		@Override
		public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
			String[] strs = value.toString().split("\t");
			// Only consider well-formed records: at least 6 tab-separated fields,
			// with a key in column 3 longer than 20 characters and free of ' ' and '='.
			if (strs.length > 5 && strs[3].length() > 20 && strs[3].indexOf(" ") == -1 && strs[3].indexOf("=") == -1) {
				if (i == 0) {
					// The first valid record seeds both bounds.
					min.set(strs[3]);
					max.set(strs[3]);
				}
				if (strs[3].compareTo(min.toString()) < 0) {
					min.set(strs[3]);
				}
				if (strs[3].compareTo(max.toString()) > 0) {
					max.set(strs[3]);
				}
				i++;
			}
		}

		@Override
		protected void cleanup(Context context) throws IOException, InterruptedException {
			// Emit this task's local extremes once, after all input has been processed.
			// Skip map tasks that saw no valid records, so empty strings never reach the reducer.
			if (i > 0) {
				context.write(new Text("min"), min);
				context.write(new Text("max"), max);
			}
		}
	}

	public static class GetMinMaxKeyReducer extends Reducer<Text, Text, Text, Text> {
		@Override
		public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
			String result = "";
			for (Text value : values) {
				// The first value seeds the running result.
				if (result.equals("")) {
					result = value.toString();
				}
				if ("min".equals(key.toString())) {
					// Keep the smallest of the per-mapper minima.
					if (value.toString().compareTo(result) < 0) {
						result = value.toString();
					}
				} else if ("max".equals(key.toString())) {
					// Keep the largest of the per-mapper maxima.
					if (value.toString().compareTo(result) > 0) {
						result = value.toString();
					}
				} else {
					System.err.println("Unexpected reduce input key: " + key.toString());
				}
			}
			context.write(key, new Text(result));
		}
	}

	public static void main(String[] args) throws Exception {
		// EJob is a small helper (a sketch is given after this listing) that packages
		// the compiled classes under "bin" into a temporary jar, so the job can be
		// submitted to a remote cluster directly from Eclipse.
		File jarFile = EJob.createTempJar("bin");
		ClassLoader classLoader = EJob.getClassLoader();
		Thread.currentThread().setContextClassLoader(classLoader);

		// Hadoop runtime environment (Hadoop 1.x style JobTracker address)
		Configuration conf = new Configuration();
		conf.set("mapred.job.tracker", "bfdbjc1:12001");
		
		// Job configuration
		Job job = new Job(conf, "GetMinMaxKey");

		job.setJarByClass(GetMinMaxKeyMapReduce.class);
		job.setMapperClass(GetMinMaxKeyMap.class);
		job.setReducerClass(GetMinMaxKeyReducer.class);

		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		
		FileInputFormat.addInputPath(job, new Path("hdfs://bfdbjc1:12000/user/work/tables2/raw_kafka/l_date=2013-09-15"));
		FileOutputFormat.setOutputPath(job, new Path("hdfs://bfdbjc1:12000/user/work/output/minmaxkey/"));
		
		// Submit from Eclipse: point the job at the temp jar built above.
		// (A Job created as new Job(conf, ...) wraps its configuration in a
		// JobConf, so this cast is valid on Hadoop 1.x.)
		((JobConf) job.getConfiguration()).setJar(jarFile.toString());
		
		// Wait for the job to finish.
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}
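
A note on EJob: this helper is not part of Hadoop, and its source is not included in the original post. Below is a minimal sketch (in its own EJob.java) of what such a helper might look like, given only the two calls used in main(): createTempJar(String) packs a class-output directory such as Eclipse's bin into a temporary jar, and getClassLoader() returns a usable class loader. The method bodies are illustrative assumptions, not the original author's implementation.

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;

public class EJob {

	// Pack everything under rootDir (e.g. Eclipse's "bin" output folder) into a temp jar.
	public static File createTempJar(String rootDir) throws IOException {
		File root = new File(rootDir);
		if (!root.exists()) {
			return null;
		}
		final File jarFile = File.createTempFile("EJob-", ".jar");
		// Delete the temporary jar when the JVM exits.
		Runtime.getRuntime().addShutdownHook(new Thread() {
			public void run() {
				jarFile.delete();
			}
		});
		Manifest manifest = new Manifest();
		manifest.getMainAttributes().putValue("Manifest-Version", "1.0");
		JarOutputStream out = new JarOutputStream(new FileOutputStream(jarFile), manifest);
		addFiles(out, "", root);
		out.close();
		return jarFile;
	}

	// Recursively add files, keeping entry paths relative to the root directory.
	private static void addFiles(JarOutputStream out, String prefix, File dir) throws IOException {
		for (File f : dir.listFiles()) {
			if (f.isDirectory()) {
				addFiles(out, prefix + f.getName() + "/", f);
			} else {
				out.putNextEntry(new JarEntry(prefix + f.getName()));
				InputStream in = new FileInputStream(f);
				byte[] buffer = new byte[8192];
				int n;
				while ((n = in.read(buffer)) != -1) {
					out.write(buffer, 0, n);
				}
				in.close();
				out.closeEntry();
			}
		}
	}

	// Prefer the current thread's context class loader, falling back to this class's own.
	public static ClassLoader getClassLoader() {
		ClassLoader cl = Thread.currentThread().getContextClassLoader();
		return cl == null ? EJob.class.getClassLoader() : cl;
	}
}

When the job completes, the output directory holds one tab-separated line per key: min followed by the smallest key seen, and max followed by the largest. Because each map task pre-aggregates in memory and emits only two records from cleanup(), the shuffle stays tiny no matter how large the input is.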