MapReduce自定义数据类型
程序员文章站
2022-06-07 18:42:13
...
MapReduce自定义数据类型
原因
- 首先我们要将自己定义的类进行MapReduce计算的时候自定义数据类型就有了它存在的必要性,自定数据类型还可以实现二次排序,自定义数据类型扩大了数据在MapReduce之间传输的局限性。
做法
- 需要在自定义的类上实现Hadoop规则的数据序列化,如果有必要的话要重新定义这个类的比较规则,因为这些数据要在不同主机上进行传输,想要实现传输那么就得必须实现它规则下的序列化。
具体的操作
- 首先这个类得实现WritableComparable接口
- 接着实现 write()方法,这个是序列化数据的方法,注意:这个类中的每个元素都得进行序列化
- 然后实现 readFields()方法,这个是反序列化数据的方法,注意:这个类中的每个数据都得进行反序列化
- 最后可以实现compareTo()方法,这个是自定义比较规则,便于reduce端的聚合。
实际的例子
例子:在input文件中存的是学生的单科目的成绩,现在要进行一个按总分的高低进行排序,排出的结果按总分的由高到低输出到output文件中去。
-
输入文件
input.txt -------------------- lh 92 68 70 zyt 94 88 75 ls 96 78 78 hgw 90 70 56 yxx 80 88 73 hz 90 98 70 xyd 60 88 73 hj 90 58 70 cs 50 58 11 --------------------
-
自定义数据类型
import org.apache.hadoop.io.WritableComparable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; /** * @description 学生的分数类 * @author: LuoDeSong [email protected] * @create: 2018-09-26 22:02:18 **/ public class StudentScore implements WritableComparable<StudentScore> {//这里是实现了WritableComparable这个接口 //这个是语数外三科分数的三个元素 private double cH; private double mT; private double eG; public StudentScore() { super(); } public StudentScore(double cH, double mT, double eG) { super(); this.cH = cH; this.mT = mT; this.eG = eG; } public double getcH() { return cH; } public void setcH(double cH) { this.cH = cH; } public double getmT() { return mT; } public void setmT(double mT) { this.mT = mT; } public double geteG() { return eG; } public void seteG(double eG) { this.eG = eG; } //实现write方法,为的是将三个元素进行序列化,便于传输 @Override public void write(DataOutput out) throws IOException { out.writeDouble(cH); out.writeDouble(mT); out.writeDouble(eG); } //实现了readFileds方法,为的是将序列化的元素反序列化回来,便于读取 @Override public void readFields(DataInput in) throws IOException { this.cH = in.readDouble(); this.mT = in.readDouble(); this.eG = in.readDouble(); } //重新定义了比较规则,是以三科的总分进行排序 @Override public int compareTo(StudentScore o) { double one = this.cH + this.mT + this.eG; double tow = o.cH + o.eG + o.mT; return one < tow ? 1 : -1; } }
-
书写mapper代码
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.qianfeng.bigdata.bean.StudentScore; import java.io.IOException; /** * @description * @author: LuoDeSong [email protected] * @create: 2018-09-27 12:59:09 **/ /* 这里的第三个参数StudentScore就是我们自己定义的数据类型,实现前面的规则的之后,我们就可以将这个数据类型使用在MapReduce中进行传输 */ public class SortStudentScoreMapper extends Mapper<LongWritable, Text, StudentScore, Text> { StudentScore studentScore = new StudentScore(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] split = value.toString().split(" "); double cH = Double.parseDouble(split[1]); double mT = Double.parseDouble(split[2]); double eG = Double.parseDouble(split[3]); studentScore.setcH(cH); studentScore.setmT(mT); studentScore.seteG(eG); context.write(studentScore, new Text(split[0]));//将使用自定数据类型传给reducer端让他聚合。 } }
-
书写reducer代码
import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import org.qianfeng.bigdata.bean.StudentScore; import java.io.IOException; /** * @description * @author: LuoDeSong [email protected] * @create: 2018-09-27 15:46:54 **/ /* 这里的StudentScore就是Mapper端传过来的自定义数据类型,因为这个类型自定义了比较规则,所以在reducer里聚合的时候就会自动按照我们定义的总分和 高低进行排序。 */ public class SortStudentScoreReducer extends Reducer<StudentScore, Text, Text, Text> { //设置输出文件的文件头格式 @Override protected void setup(Context context) throws IOException, InterruptedException { context.write(new Text("按总成绩排序后:"), new Text("")); } //输出数据到文件中去 @Override protected void reduce(StudentScore key, Iterable<Text> values, Context context) throws IOException, InterruptedException { for(Text text : values) { context.write(text, new Text( "总成绩为:" + (key.getcH() + key.geteG() + key.getmT()))); } } }
-
书写driver驱动器代码
mport org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.qianfeng.bigdata.bean.StudentScore; import org.qianfeng.bigdata.mapreduce.sortstudentscore.mapper.SortStudentScoreMapper; import org.qianfeng.bigdata.mapreduce.sortstudentscore.reducer.SortStudentScoreReducer; import java.io.IOException; /** * @description * @author: LuoDeSong [email protected] * @create: 2018-09-27 15:52:46 **/ public class SortStudentScoreDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); //构建Job类的对象 Job job = Job.getInstance(conf); //给当前job类的对象设置job名称 job.setJobName("erery subject v2 avg score"); //设置运行主类 job.setJarByClass(SortStudentScoreDriver.class); //设置job的Mapper及其输出K,V的类型 job.setMapperClass(SortStudentScoreMapper.class); job.setMapOutputKeyClass(StudentScore.class); job.setMapOutputValueClass(Text.class); //本程序不要自定义实现Reducer //设置job的输出K,V的类型,也可以说是Reducer输出的K,V的类型 job.setReducerClass(SortStudentScoreReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); //设置要处理的HDFS上的文件的路径 FileInputFormat.addInputPath(job,new Path("E:\\Test-workspace\\test\\input\\five.txt")); //设置最终输出结果的路径 FileOutputFormat.setOutputPath(job,new Path("E:\\Test-workspace\\test\\output\\six")); //等待程序完成后自动结束程序 System.exit(job.waitForCompletion(true)?0:1); } }
-
结果样例
output --------------------- 按总成绩排序后: hz 总成绩为:258.0 zyt 总成绩为:257.0 ls 总成绩为:252.0 yxx 总成绩为:241.0 lh 总成绩为:230.0 xyd 总成绩为:221.0 hj 总成绩为:218.0 hgw 总成绩为:216.0 cs 总成绩为:119.0 --------------------
总结:自定义数据类型突破了MapReduce数据传输的单一性,扩大了传输的数据类型。结合自定义比较规则也修改了reducer的聚合规则。
上一篇: 200行描述MVC