【MapReduce】一个没有足够经验无法解决的简单基础MapReduce问题
一. 事件起因
我在学习Hadopp相关知识时做了一道极为简单的练习题,然而在解决问题的过程中,却引发了一个对我而言,极为困难,一个礼拜才勉强解决的惊天大BUG。(小弟才疏学浅,各位的独到见解和批评讨论都可以留言,虚心接受)
二.练习原题以及数据准备
问题题目
(两表 inner join)
在 hdfs 目录/tmp/table/student 中存在 stu.txt 文件,按 tab分隔,字段名为(学号,姓名,课程号,班级名称),
hdfs 目录/tmp/table/student_location 中存在 stu_location.txt 文件,按 tab 分隔,字段名为(学号,省份,城市,区名),
对两个 hdfs目录的按学号求交集,输出结果结构按 tab 分隔后的四个字段为(学号,姓名,课程号,班级名称)
文件内容
stu.txt 文件内容:
stu_location.txt 文件内容:
三.初次实现代码
(略长可选择式查看,或只看实现思路)
实现思路:
(1)在map阶段读取参数文件,即stu.txt和stu_location.txt,如果这个文件的记录切割完有4个元素,说明他是学生信息,有5个元素说明他是地址信息,将两个信息都按照学号分组向reduce输出。
(2)在reduce阶段按学号分组,接收分组信息,并存入"inflist"集合中,如果inflist中的这个元素的size==2,说明他既有学生信息,也有地址信息,那么就是需要的元素,拼接成字符串输出即可。
public class DIY15 {
public static void main(String[] args) throws Exception {
Configuration conf=new Configuration();
Job job=Job.getInstance(conf);
job.setJarByClass(DIY15.class);
job.setMapperClass(JoinMapper.class);
job.setReducerClass(JoinReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(SinfoWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileInputFormat.addInputPath(job, new Path(args[1]));
FileOutputFormat.setOutputPath(job, new Path(args[2]));
System.exit(job.waitForCompletion(true)?0:1);
}
//Mapper
public static class JoinMapper extends Mapper<LongWritable, Text, IntWritable, SinfoWritable>{
SinfoWritable sin=new SinfoWritable();
@Override
protected void map(
LongWritable key,Text value,Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
String[] fileds=value.toString().split("\\t");
if(fileds!=null&&fileds.length==4){
sin.set(Integer.parseInt(fileds[0]),fileds[1], Integer.parseInt(fileds[2]), fileds[3],
"", "", "", "");
}else if (fileds!=null&&fileds.length==5) {
sin.set(Integer.parseInt(fileds[0]), "", 0, "",
fileds[1], fileds[2],fileds[3],fileds[4]);
}else {
return;
}
context.write(new IntWritable(Integer.parseInt(fileds[0])), sin);
}
}
//Reducer
public static class JoinReducer extends Reducer<IntWritable, SinfoWritable, Text, NullWritable>{
@Override
protected void reduce(
IntWritable sno,Iterable<SinfoWritable> values,Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
Iterator<SinfoWritable> it=values.iterator();
List<SinfoWritable> inflist=new ArrayList<SinfoWritable>();
StringBuilder rsBuilder=new StringBuilder();
while(it.hasNext()){
SinfoWritable inf=it.next();
inflist.add(inf);
}
if(inflist.size()==2){
//放了兩個元素
if(!inflist.get(0).getSname().equals("")){
//放入第一個元素 (1是學生信息 2是住處信息)
rsBuilder.append(inflist.get(0).getSno());
rsBuilder.append("\t");
rsBuilder.append(inflist.get(0).getSname());
rsBuilder.append("\t");
rsBuilder.append(inflist.get(0).getCourseno());
rsBuilder.append("\t");
rsBuilder.append(inflist.get(0).getClassname());
context.write(new Text(rsBuilder.toString()), NullWritable.get());
}else if (!inflist.get(1).getSname().equals("")) {
rsBuilder.append(inflist.get(1).getSno());
rsBuilder.append("\t");
rsBuilder.append(inflist.get(1).getSname());
rsBuilder.append("\t");
rsBuilder.append(inflist.get(1).getCourseno());
rsBuilder.append("\t");
rsBuilder.append(inflist.get(1).getClassname());
context.write(new Text(rsBuilder.toString()), NullWritable.get());
}
}
}
}
//自定义的学生信息序列化类
public static class SinfoWritable implements Writable{
private int sno;
private String sname;
private int courseno;
private String classname;
private String province;
private String city;
private String area;
private String addr;
public SinfoWritable() {
// TODO Auto-generated constructor stub
}
public int getSno() {
return sno;
}
public String getSname() {
return sname;
}
public int getCourseno() {
return courseno;
}
public String getClassname() {
return classname;
}
public String getProvince() {
return province;
}
public String getCity() {
return city;
}
public String getArea() {
return area;
}
public String getAddr() {
return addr;
}
public void set(int sno, String sname, int courseno, String classname, String province, String city,
String area, String addr){
this.sno = sno;
this.area = area;
this.city = city;
this.classname = classname;
this.courseno = courseno;
this.province = province;
this.sname = sname;
this.addr = addr;
}
@Override
public void write(DataOutput out) throws IOException {
// TODO Auto-generated method stub
out.writeInt(sno);
out.writeInt(courseno);
out.writeUTF(classname);
out.writeUTF(sname);
out.writeUTF(addr);
out.writeUTF(area);
out.writeUTF(city);
out.writeUTF(province);
}
@Override
public void readFields(DataInput in) throws IOException {
// TODO Auto-generated method stub
this.sno = in.readInt();
this.courseno = in.readInt();
this.classname = in.readUTF();
this.sname = in.readUTF();
this.addr = in.readUTF();
this.area = in.readUTF();
this.city = in.readUTF();
this.province = in.readUTF();
}
@Override
public String toString() {
return "SinfoWritable [sno=" + sno + ", sname=" + sname
+ ", courseno=" + courseno + ", classname=" + classname
+ ", province=" + province + ", city=" + city + ", area="
+ area + ", addr=" + addr + "]";
}
}
}
四.问题出现
运行代码
当我自信满满的以为这个练习小case的时候,我运行了我的程序,没有任何异常和报错出现了这样的结果
运行结果
五.结果分析和尝试解决
结果分析
在stu.txt中有1~4四条信息,在stu_location.txt中有1,4两条信息,如果能够成功的进行链接join操作应该有2两条信息,分别是【1 张三 1 一班】【4 赵六 2 四班】但是只有 "1号张三" 的信息,运行结果明显是不正确的。
1.错误猜想
第一次看到执行结果,我以为是自己代码出了什么纰漏?
- 1 .write(DataOutput out)方法和readFields(DataInput in) 的参数变量的顺序是否一致(一致)
@Override
public void write(DataOutput out) throws IOException {
// TODO Auto-generated method stub
out.writeInt(sno);
out.writeUTF(sname);
out.writeInt(courseno);
out.writeUTF(classname);
out.writeUTF(province);
out.writeUTF(city);
out.writeUTF(area);
out.writeUTF(addr);
}
@Override
public void readFields(DataInput in) throws IOException {
// TODO Auto-generated method stub
this.sno = in.readInt();
this.sname = in.readUTF();
this.courseno = in.readInt();
this.classname = in.readUTF();
this.province = in.readUTF();
this.city = in.readUTF();
this.area = in.readUTF();
this.addr = in.readUTF();
}
2.在map阶段的记录读取是否正确(正确,这里不做太多代码展示了)
3.reduce阶段的记录输出是否正确(问题所在)
我发现只有reduce在接受map传输过来的分组信息时是正确的 但是输出时完全不正确,即缺少了数据。
第一次尝试
为了发现问题的细节,我进行了如下操作
(1)我为SinfoWritable类添加了一个 getAll() 方法,为了获取其中的成员变量信息,便于输出观察
public String getAll(){
return sno + "," + sname
+ "," + courseno + "," + classname+","+province+","+city+","+area+","+","+addr;
}
(2)将reduce阶段的代码重写了一遍,并进行了分阶段输出(写的可能有些潦草,但是能说明问题)
①在map传过来分组信息后直接输出他的学号
②将每组的信息输出到inflist中,并且当时输出他的所有信息
③放入Inflist,之后再一次输出所有长度为2的Inflist元素信息
这里一定会有人问,你这不脱裤子放气——多此一举么,输出一次又输出一次?
不要心急,这里的“多此一举”就是我发现的问题所在
//Reducere
public static class JoinReducer extends Reducer<IntWritable, SinfoWritable, Text, NullWritable>{
@Override
protected void reduce(
IntWritable sno,Iterable<SinfoWritable> values,Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
Iterator<SinfoWritable> it=values.iterator();
List<SinfoWritable> inflist=new ArrayList<SinfoWritable>();
StringBuilder rsBuilder=new StringBuilder();
context.write(new Text(sno.get()+"号"), NullWritable.get());
while(it.hasNext()){
SinfoWritable inf=it.next();
inflist.add(inf);
//inf中的属性,在放入inflist之后丢失了?
context.write(new Text("list["+(inflist.size()-1)+"]---"+inflist.get(inflist.size()-1).getAll()), NullWritable.get());
}
if(inflist.size()==2){
context.write(new Text("list[0] :"+inflist.get(0).getAll()), NullWritable.get());
context.write(new Text("list[1] :"+inflist.get(1).getAll()), NullWritable.get());
}
}
}
第二次运行结果
第二次结果分析
不晓得各位发现问题没有,放入inflist集合中的元素
1.在第一次输出的时候,没有任何问题,不管是学生信息还是地址信息都输出了出来
(FIRST- -list[0]和list[1]),
2.然而同样的集合我在第二次输出的时候,怎么就都变成了同样的内容了呢???
(SECOND- -list[0]和list[1]都是学生信息 或都是地址信息)
哪怕一直到我最后解决问题,也没有弄明白个所以然,放入list中的元素,上一秒还是两条不同的信息,下一秒就变成一样的了。。。。这个问题我也是一直在调试和查阅资料,也没找到个所以然,最后找了个投机取巧的解决办法。
六.问题解决方案(不完善)
1.解决思路
将一切实例化对象推翻,直接都转换成字符串进行操作。
2.关键代码
(全部代码我会在最后贴出来,这里我们只看Reduce阶段的代码)
实现步骤
1.我将map向reduce发送的分组信息,从<IntWritable, SinfoWritable>类型改成了<IntWritable, Text>类型
2.我将Inflist的元素泛型由Text改变成了String类型,一切以字符串来操作(这里吧StringBuilder也删除了,一切从简)。
//Reducer
public static class JoinReducer extends Reducer<IntWritable, Text, Text, NullWritable>{
@Override
protected void reduce(
IntWritable sno,Iterable<Text> values,Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
Iterator<Text> it= values.iterator();
List<String> list=new ArrayList<String>();
context.write(new Text(sno.get()+"号"), NullWritable.get());
//要把接受过来的信息toString一下,不然可能会出现特殊的bug,传递对象的问题
while(it.hasNext()){
list.add(it.next().toString());
context.write(new Text("FIRST--list["+(list.size()-1)+"]---"+list.get(list.size()-1)), NullWritable.get());
//输出信息
}
if(list.size()==2){
context.write(new Text("SECOND--list[0] :"+list.get(0)), NullWritable.get());
context.write(new Text("SECOND--list[1] :"+list.get(1)), NullWritable.get());
}
}
}
3.最终结果
这里可以看到最终的输出结果,无论是第一次输出还是第二次输出,它的结果都是一样的。
七.思考
这个问题虽然说是投机取巧解决了,不过也引发了我的一些思索。我觉得最大的可能还是和Text这种可序列化类型有关系,涉及到一些文件流的操作,将它转换为字符串之后就没有这样的问题了。
然而我现阶段也没能理解的很透彻,也是阅历和经验不大足够,欢迎各位大佬留言指点,文章略长,也比较啰嗦,也是头一回分享,看到这里也是不容易了:),感谢!