
Hadoop Custom Grouping (GroupingComparator)

Input data (the mapper splits each line into key and value on a tab):
hadoop  a  
spark   a  
hive    a  
hbase   a  
tachyon a  
storm   a  
redis   a  


Custom grouping

In MapReduce, the grouping comparator decides which map output keys are fed to the same reduce() call. The job below installs a custom one via job.setGroupingComparatorClass:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;


public class MyGroup {
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		Configuration conf = new Configuration();
		String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
		if (otherArgs.length != 2) {
			System.err.println("Usage: MyGroup <inputpath> <outputpath>");
			System.exit(2);
		}
		
		Job job = Job.getInstance(conf, MyGroup.class.getSimpleName() + "1");
		job.setJarByClass(MyGroup.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		job.setMapperClass(MyMapper1.class);
		// use the custom comparator to group keys on the reduce side
		job.setGroupingComparatorClass(MyGroupComparator.class);
		job.setReducerClass(MyReducer1.class);
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
		job.waitForCompletion(true);
	}
	public static class MyMapper1 extends Mapper<LongWritable, Text, Text, Text>{
		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			// each input line is "key<TAB>value"; emit the pair unchanged
			String[] spl = value.toString().split("\t");
			context.write(new Text(spl[0].trim()), new Text(spl[1].trim()));
		}
	}
	public static class MyReducer1 extends Reducer<Text, Text, Text, Text>{
		@Override
		protected void reduce(Text k2, Iterable<Text> v2s, Reducer<Text, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			// count the values in this group: one "in--" line per value,
			// plus a single "out--" line when the group is exhausted
			Long count = 0L;
			for (@SuppressWarnings("unused") Text v2 : v2s) {
				count++;
				context.write(new Text("in--" + k2), new Text(count.toString()));
			}
			context.write(new Text("out--" + k2), new Text(count.toString()));
		}
	}
	public static class MyGroupComparator extends WritableComparator {
		public MyGroupComparator() {
			super(Text.class, true);
		}
		@SuppressWarnings("rawtypes")
		@Override
		public int compare(WritableComparable a, WritableComparable b) {
			// report every pair of keys as equal, so all records fall
			// into a single reduce group regardless of the actual key
			return 0;
		}
	}
}
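
To run the job, something like the following should work (the jar name mygroup.jar and the HDFS paths are placeholders, not from the original article):

hadoop jar mygroup.jar MyGroup /user/hadoop/group/in /user/hadoop/group/out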


Result:
in--hadoop      1
in--hbase       2
in--hive        3
in--redis       4
in--spark       5
in--storm       6
in--tachyon     7
out--tachyon    7


Because compare() always returns 0, all seven records fall into a single reduce() call. Inside that call, Hadoop reuses the key object and updates it as the value iterator advances, which is why each "in--" line shows a different key and the final "out--" line shows the last key, tachyon. Now compare this with the default grouping:
public static class MyGroupComparator extends WritableComparator {
	public MyGroupComparator() {
		super(Text.class, true);
	}
	@SuppressWarnings("rawtypes")
	@Override
	public int compare(WritableComparable a, WritableComparable b) {
		Text p1 = (Text) a;
		Text p2 = (Text) b;
		// full key comparison: only identical keys share a reduce group,
		// which matches the default behavior for Text keys
		return p1.compareTo(p2);
	}
}


Result:
in--hadoop      1
out--hadoop     1
in--hbase       1
out--hbase      1
in--hive        1
out--hive       1
in--redis       1
out--redis      1
in--spark       1
out--spark      1
in--storm       1
out--storm      1
in--tachyon     1
out--tachyon    1


Comparing the two outputs makes the grouping comparator easy to understand: it decides which map output keys are treated as equal on the reduce side, and therefore how many reduce() calls are made and what each one sees.
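
As a further illustration (this variant is not from the original article), a comparator can also group on part of the key. The sketch below groups keys by their first character, so with the sample data above hadoop, hbase and hive would share one reduce group, and spark and storm another:

public static class FirstCharGroupComparator extends WritableComparator {
	public FirstCharGroupComparator() {
		super(Text.class, true);
	}
	@SuppressWarnings("rawtypes")
	@Override
	public int compare(WritableComparable a, WritableComparable b) {
		// compare only the first character of each key; keys sharing a
		// first letter are treated as equal and grouped together
		// (assumes non-empty keys, as in the sample data)
		char c1 = ((Text) a).toString().charAt(0);
		char c2 = ((Text) b).toString().charAt(0);
		return Character.compare(c1, c2);
	}
}

Register it with job.setGroupingComparatorClass(FirstCharGroupComparator.class). Note that grouping only merges keys that are adjacent in sort order; since the default sort for Text is lexicographic, keys sharing a first character are adjacent, so this works as expected.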