Hadoop系列修炼入门笔记14
程序员文章站
2022-03-15 11:19:17
核心内容:
多输入
MutltiInputs(多输入);使用多个输入作为job的输入来源。
也就是在InputFormat 前把添加各种不同的序列源
里面的方法也就是 addInput...
核心内容:
多输入
MutltiInputs(多输入);使用多个输入作为job的输入来源。
也就是在InputFormat 前把添加各种不同的序列源
里面的方法也就是 addInputPath等等。。。。
map也可以在这个流程中套进来。
1.文本件TextFile的Map函数:
package com.shujuelin.hdfs.mr.shujuelin.hdfs.mr.multiinput; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * 文本文件Text的Map */ public class WCTextMapper extends Mapper { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { Text keyOut = new Text(); IntWritable valueOut = new IntWritable(); String[] arr = value.toString().split(" "); for(String s : arr){ keyOut.set(s); valueOut.set(1); context.write(keyOut,valueOut); } } }2.序列文件SequenceFile的map函数:
package com.shujuelin.hdfs.mr.shujuelin.hdfs.mr.multiinput; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** *Seqt 序列文件的 Map */ public class WCSeqtMapper extends Mapper { protected void map(IntWritable key, Text value, Context context) throws IOException, InterruptedException { Text keyOut = new Text(); IntWritable valueOut = new IntWritable(); String[] arr = value.toString().split(" "); for(String s : arr){ keyOut.set(s); valueOut.set(1); context.write(keyOut,valueOut); } } }3、Reduce函数:
package com.shujuelin.hdfs.mr.shujuelin.hdfs.mr.multiinput; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.Reducer; import javax.xml.soap.Text; import java.io.IOException; public class WCReducer extends Reducer { @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { int count = 0 ; for(IntWritable iw : values){ count = count + iw.get() ; } String tno = Thread.currentThread().getName(); System.out.println(tno + " : WCReducer :" + key.toString() + "=" + count); context.write(key,new IntWritable(count)); } }4、主函数
package com.shujuelin.hdfs.mr.shujuelin.hdfs.mr.multiinput; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import javax.xml.soap.Text; import java.io.IOException; /** * 多个输出 */ public class WCApp { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "file:///"); Job job = Job.getInstance(conf); //设置job的各种属性 job.setJobName("WCAppMulti"); //作业名称 job.setJarByClass(WCApp.class); //搜索类 //多个输入 MultipleInputs.addInputPath(job,new Path("file///e:/compress/hadoop01"),TextInputFormat.class,WCTextMapper.class); MultipleInputs.addInputPath(job,new Path("file///e:/compress/hadoop01"),SequenceFileInputFormat.class,WCSeqtMapper.class); //设置输出 FileOutputFormat.setOutputPath(job,new Path(args[0])); job.setReducerClass(WCReducer.class); //reducer类 job.setNumReduceTasks(3); //reduce个数 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // job.waitForCompletion(true); } }end:坚持
推荐阅读
-
剑指前端(前端入门笔记系列)——DOM(元素节点)
-
剑指前端(前端入门笔记系列)——DOM(属性节点)
-
剑指前端(前端入门笔记系列)——数组(方法)
-
C# 基础知识系列- 14 IO篇之入门IO
-
iPhone 14系列配置曝光 苹果拉大差距!入门版内存劝退?
-
3000元档极致性价比 荣耀MagicBook 14系列笔记本首发
-
iPhone 14系列配置曝光 苹果拉大差距!入门版内存劝退?
-
Android入门到精通|安卓/Android开发零基础系列Ⅱ【职坐标】-学习笔记(1)-- 常用控件及资源介绍
-
RHEL8.0快速入门系列笔记--Linux高级权限管理及ACL策略(九)
-
Hadoop系列修炼笔记12