Multiple reduce outputs with Hadoop
The example below extends the classic WordCount job so that a single reducer writes to several output files at once, using org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.
import java.io.File;
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MultipOutputWordCount extends Configured implements Tool {

    /*
     * Mapper<Object, Text, Text, IntWritable>
     * Object:      byte offset of the current line in the input
     * Text:        the line of text the map reads
     * Text:        map output key
     * IntWritable: map output value
     */
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // The input is processed one line at a time; tokenize each line,
            // e.g. "Hello World, Hello Hadoop"
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one); // <Hello,1>, <World,1>, <Hello,1>, <Hadoop,1>
            }
        }
    }

    /**
     * Reducer<Text, IntWritable, Text, IntWritable>
     * Text:        reduce input key
     * IntWritable: reduce input value
     * Text:        reduce output key
     * IntWritable: reduce output value
     */
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        @SuppressWarnings("rawtypes")
        private MultipleOutputs multipleOutputs;

        protected void setup(Context context) throws IOException, InterruptedException {
            multipleOutputs = new MultipleOutputs<Text, IntWritable>(context);
        }

        protected void cleanup(Context context) throws IOException, InterruptedException {
            multipleOutputs.close();
        }

        @SuppressWarnings("unchecked")
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            // Each write targets a different base output path, so the reducer
            // produces files 1-r-xxxxx, 2-r-xxxxx and 3-r-xxxxx under the job
            // output directory. Values must be Writable, hence the Text wrappers.
            multipleOutputs.write(NullWritable.get(), new Text(key.toString() + ":" + result), "1");
            multipleOutputs.write(NullWritable.get(), key, "2");
            multipleOutputs.write(NullWritable.get(), new Text("I am your grandpa"), "3"); // arbitrary demo line
        }
    }

    // Unused in this job; note it belongs to the old mapred API.
    public static class MultipOutputWordFormat extends MultipleTextOutputFormat<Text, IntWritable> {
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new MultipOutputWordCount(), args));
    }

    @Override
    public int run(String[] args) throws Exception {
        // EJob is a local helper that packs the classes under bin/ into a
        // temporary jar so the job can be submitted straight from Eclipse.
        File jarFile = EJob.createTempJar("bin");
        ClassLoader classLoader = EJob.getClassLoader();
        Thread.currentThread().setContextClassLoader(classLoader);

        // Hadoop runtime configuration
        Configuration conf = new Configuration();
        conf.set("mapred.job.tracker", "bfdbjc1:12001");

        // Job setup
        // a. create the job and give it a name so it can be tracked
        Job job = new Job(conf, "word count");
        // b. set the driver class, the Mapper and the Reducer
        job.setJarByClass(MultipOutputWordCount.class);
        job.setMapperClass(MultipOutputWordCount.TokenizerMapper.class);
        job.setReducerClass(MultipOutputWordCount.IntSumReducer.class);
        // The map output types differ from the job output types set below,
        // so they must be declared explicitly.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // c. job (reduce) output types; MultipleOutputs writes <NullWritable, Text>
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        // HDFS input; if the path is a directory, every file under it is read
        FileInputFormat.addInputPath(job, new Path("hdfs://bfdbjc1:12000/user/work/a.txt"));
        // reduce output path
        FileOutputFormat.setOutputPath(job, new Path("hdfs://bfdbjc1:12000/user/work/output/2da1"));

        // submit from Eclipse with the locally built jar
        ((JobConf) job.getConfiguration()).setJar(jarFile.toString());

        // wait for the job to finish
        job.waitForCompletion(true);
        return 0;
    }
}
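The write(key, value, baseOutputPath) calls above need no up-front declaration and reuse the job's output format and types for every stream. MultipleOutputs also supports named outputs, which are declared in the driver and may each carry their own output format and key/value classes. Below is a minimal sketch of that variant; the names "counts" and "words" and the two helper methods are illustrative, not part of the original example.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class NamedOutputsSketch {

    // Driver side: declare each named output before submitting the job.
    public static void declareOutputs(Job job) {
        MultipleOutputs.addNamedOutput(job, "counts",
                TextOutputFormat.class, Text.class, IntWritable.class);
        MultipleOutputs.addNamedOutput(job, "words",
                TextOutputFormat.class, NullWritable.class, Text.class);
    }

    // Reducer side: write to the declared names; files appear as
    // counts-r-00000, words-r-00000, ... beside the regular part files.
    public static void emit(MultipleOutputs<NullWritable, Text> out,
                            Text key, IntWritable sum)
            throws IOException, InterruptedException {
        out.write("counts", key, sum);               // <word, count>
        out.write("words", NullWritable.get(), key); // word only
    }
}

Because each named output declares its own key/value classes, this variant avoids the raw-typed MultipleOutputs field (and the @SuppressWarnings annotations) used in IntSumReducer above.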