
Multiple reduce outputs with Hadoop
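
The complete example below is a word count whose reducer writes to several output files at once through MultipleOutputs (the new mapreduce API). The job is submitted from Eclipse to a remote Hadoop 1.x cluster using a jar built on the fly by a helper class.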

import java.io.File;
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MultipOutputWordCount extends Configured implements Tool {
	/*
	 * Mapper<Object, Text, Text, IntWritable>
	 * Object:      byte offset of the current line in the input file
	 * Text:        the line of text read by the map
	 * Text:        map output key
	 * IntWritable: map output value
	 */
	public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
		private final static IntWritable one = new IntWritable(1);
		private Text word = new Text();

		public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
			// map() is invoked once per input line.
			StringTokenizer itr = new StringTokenizer(value.toString()); // split the line into words, e.g. "Hello World Hello Hadoop"
			while (itr.hasMoreTokens()) {
				word.set(itr.nextToken());
				context.write(word, one); // emits <Hello,1>, <World,1>, <Hello,1>, <Hadoop,1>
			}
		}
	}

	/**
	 * Reducer<Text, IntWritable, NullWritable, Text>
	 * Text:         reduce input key
	 * IntWritable:  reduce input value
	 * NullWritable: reduce output key (matches job.setOutputKeyClass below)
	 * Text:         reduce output value (matches job.setOutputValueClass below)
	 */
	public static class IntSumReducer extends Reducer<Text, IntWritable, NullWritable, Text> {
		private IntWritable result = new IntWritable();
		// Typed with the job's output key/value classes so that the
		// write(key, value, baseOutputPath) calls below type-check cleanly.
		private MultipleOutputs<NullWritable, Text> multipleOutputs;

		protected void setup(Context context) throws IOException, InterruptedException {
			multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
		}

		protected void cleanup(Context context) throws IOException, InterruptedException {
			// Closing flushes and releases all side-output writers.
			multipleOutputs.close();
		}

		public void reduce(Text key, Iterable<IntWritable> values,
				Context context) throws IOException, InterruptedException {
			int sum = 0;
			for (IntWritable val : values) {
				sum += val.get();
			}
			result.set(sum);
			// The third argument is a base output path: each call writes into its own
			// file set (1-r-*, 2-r-*, 3-r-*) under the job's output directory.
			multipleOutputs.write(NullWritable.get(), new Text(key.toString() + ":" + result), "1");
			multipleOutputs.write(NullWritable.get(), key, "2");
			multipleOutputs.write(NullWritable.get(), new Text("demo string"), "3");
		}
	}
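	// Note: the reducer never calls context.write, so the default part-r-* files
	// in the output directory are created empty; all data lands in the 1-r-*,
	// 2-r-* and 3-r-* files produced by MultipleOutputs (see the note after the listing).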
	
	// Unused in this job: the old "mapred" API's MultipleTextOutputFormat is a
	// subclass-based alternative for splitting reduce output across files.
	public static class MultipOutputWordFormat extends MultipleTextOutputFormat<Text, IntWritable> {
	}

	public static void main(String[] args) throws Exception {
		System.exit(ToolRunner.run(new MultipOutputWordCount(), args));
	}

	@Override
	public int run(String[] args) throws Exception {
		// EJob is a helper class (not shown here) that packs the compiled classes
		// under bin/ into a temporary jar, so the job can be submitted from the IDE.
		File jarFile = EJob.createTempJar("bin");
		ClassLoader classLoader = EJob.getClassLoader();
		Thread.currentThread().setContextClassLoader(classLoader);
		// Hadoop runtime environment: point the client at the remote JobTracker.
		Configuration conf = new Configuration();
		conf.set("mapred.job.tracker", "bfdbjc1:12001");
		
		// Job configuration
		// a. Create the job and give it a name so it can be tracked.
		Job job = new Job(conf, "word count");
		// b. Set the jar-bearing class, the Mapper, and the Reducer.
		job.setJarByClass(MultipOutputWordCount.class);
		job.setMapperClass(MultipOutputWordCount.TokenizerMapper.class);
		job.setReducerClass(MultipOutputWordCount.IntSumReducer.class);
		// The map output types differ from the job output types set below, so these
		// two lines are required (map output types default to the job output types).
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		// c. Job (reduce) output types: every record is written as <NullWritable, Text>.
		job.setOutputKeyClass(NullWritable.class);
		job.setOutputValueClass(Text.class);
		
		// HDFS input; a directory path makes the job read every file under it.
		FileInputFormat.addInputPath(job, new Path("hdfs://bfdbjc1:12000/user/work/a.txt"));
		// Reduce output directory (must not already exist).
		FileOutputFormat.setOutputPath(job, new Path("hdfs://bfdbjc1:12000/user/work/output/2da1"));
		
		// Submitting from Eclipse: attach the temp jar built above to the job.
		((JobConf) job.getConfiguration()).setJar(jarFile.toString());
		
		// Wait for the job to finish and turn the result into an exit code.
		return job.waitForCompletion(true) ? 0 : 1;
	}
}
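
Because the reducer emits records only through MultipleOutputs, the job also leaves behind empty default part-r-* files. The following driver-side sketch is not part of the original program, just two optional additions: LazyOutputFormat is a standard Hadoop class that suppresses empty default output files, and addNamedOutput shows the named-output style of MultipleOutputs, where the name "counts" is a made-up example.

import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

// Create an output file only when a record is actually written to it,
// which removes the empty part-r-* files this job would otherwise leave.
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

// Named-output alternative: declare the output in the driver...
MultipleOutputs.addNamedOutput(job, "counts", TextOutputFormat.class,
		NullWritable.class, Text.class);
// ...then write to it from the reducer with:
//   multipleOutputs.write("counts", NullWritable.get(), new Text("..."));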