欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  科技

自定义分区随机分配数据倾斜问题的解决

程序员文章站 2022-04-14 10:15:30
1、第一阶段有三个文本待统计(设置分区的个数为3) package com.cr.skew; import org.apache.hadoop.io.IntWritable;...

1、第一阶段有三个文本待统计(设置分区的个数为3)

package com.cr.skew;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SkewMapper extends Mapper {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        System.out.println("进入mapper");
        String line = value.toString();
        String[] arr = line.split(" ");
        Text keyOut = new Text();
        IntWritable valueOut = new IntWritable();

        for (String s : arr){
            keyOut.set(s);
            valueOut.set(1);
            context.write(keyOut,valueOut);
        }



    }
}
package com.cr.skew;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SkewReducer extends Reducer {
    @Override
    protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
        int count = 0;
       for(IntWritable iw : values){
           count += iw.get();
       }
       context.write(key,new IntWritable(count));
    }
}
package com.cr.skew;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class SkewApp {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        //单例作业
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS","file:///");
        Job job = Job.getInstance(conf);
        System.setProperty("hadoop.home.dir","E:\\hadoop-2.7.5");

        //设置job的各种属性
        job.setJobName("SkewApp");                 //设置job名称
        job.setJarByClass(SkewApp.class);              //设置搜索类
        job.setInputFormatClass(TextInputFormat.class);

        //设置输入路径
        FileInputFormat.addInputPath(job,new Path(("D:\\skew")));
        //设置输出路径
        Path path = new Path("D:\\skew\\out");
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job,path);

        job.setMapperClass(SkewMapper.class);               //设置mapper类
        job.setReducerClass(SkewReducer.class);               //设置reduecer类

        job.setMapOutputKeyClass(Text.class);            //设置之map输出key
        job.setMapOutputValueClass(IntWritable.class);   //设置map输出value

        job.setOutputKeyClass(Text.class);               //设置mapreduce 输出key
        job.setOutputValueClass(IntWritable.class);      //设置mapreduce输出value


        job.setNumReduceTasks(3);
        job.waitForCompletion(true);

    }

}
输出

part-r-00000

world3	3
part-r-00001

world1	3
world4	3
part-r-00002

hello	15
world2	3
world5	3

2、第二阶段设置随机分区函数

package com.cr.skew1;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

import java.util.Random;

//自定义分区数
public class RandomPartition extends Partitioner{
    @Override
    public int getPartition(Text text, IntWritable intWritable, int numPartitioners) {
        //生成0-numPartitioners的随机数
        return new Random().nextInt(numPartitioners);
    }
}
输出三个分区

hello	7
world1	2
world2	1
world3	1
world5	1
hello	4
world2	2
world3	2
hello	4
world1	1
world4	3
world5	2

3、对上面的reduce聚合进行再次mapper_reducer聚合

package com.cr.skew1_stage2;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SkewMapper2 extends Mapper {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        System.out.println("进入mapper");
        String line = value.toString();
        String[] arr = line.split("\t");

        context.write(new Text(arr[0]),new IntWritable(Integer.parseInt(arr[1])));

    }
}
package com.cr.skew1_stage2;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SkewReducer1 extends Reducer {
    @Override
    protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
        int count = 0;
       for(IntWritable iw : values){
           count += iw.get();
       }
       context.write(key,new IntWritable(count));
    }
}
package com.cr.skew1_stage2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class SkewApp2 {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        //单例作业
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS","file:///");
        Job job = Job.getInstance(conf);
        System.setProperty("hadoop.home.dir","E:\\hadoop-2.7.5");

        //设置job的各种属性
        job.setJobName("SkewApp2");                 //设置job名称
        job.setJarByClass(SkewApp2.class);              //设置搜索类
        job.setInputFormatClass(TextInputFormat.class);

        //设置输入路径
        FileInputFormat.addInputPath(job,new Path(("D:\\skew\\out\\part-r-00000")));
        FileInputFormat.addInputPath(job,new Path(("D:\\skew\\out\\part-r-00001")));
        FileInputFormat.addInputPath(job,new Path(("D:\\skew\\out\\part-r-00002")));
        //设置输出路径
        Path path = new Path("D:\\skew\\out2");
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job,path);

        job.setMapperClass(SkewMapper2.class);               //设置mapper类
        job.setReducerClass(SkewReducer1.class);               //设置reduecer类

        job.setMapOutputKeyClass(Text.class);            //设置之map输出key
        job.setMapOutputValueClass(IntWritable.class);   //设置map输出value

        job.setOutputKeyClass(Text.class);               //设置mapreduce 输出key
        job.setOutputValueClass(IntWritable.class);      //设置mapreduce输出value

        job.setNumReduceTasks(3);
        job.waitForCompletion(true);

    }

}
world3	3
world1	3
world4	3
hello	15
world2	3
world5	3
可以看到这里的结果和上面没有使用分区函数的结果是一样的

4、如果在stage2阶段将job输入格式转为KeyValueTextInputForma

就可以直接将第一阶段的输出作为key-value,而不用进行切割了

package com.cr.skew1_stage_version2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class SkewApp2 {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        //单例作业
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS","file:///");
        Job job = Job.getInstance(conf);
        System.setProperty("hadoop.home.dir","E:\\hadoop-2.7.5");

        //设置job的各种属性
        job.setJobName("SkewApp2");                 //设置job名称
        job.setJarByClass(SkewApp2.class);              //设置搜索类
        job.setInputFormatClass(KeyValueTextInputFormat.class);

        //设置输入路径
        FileInputFormat.addInputPath(job,new Path(("D:\\skew\\out\\part-r-00000")));
        FileInputFormat.addInputPath(job,new Path(("D:\\skew\\out\\part-r-00001")));
        FileInputFormat.addInputPath(job,new Path(("D:\\skew\\out\\part-r-00002")));
        //设置输出路径
        Path path = new Path("D:\\skew\\out2");
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job,path);

        job.setMapperClass(SkewMapper2.class);               //设置mapper类
        job.setReducerClass(SkewReducer1.class);               //设置reduecer类

        job.setMapOutputKeyClass(Text.class);            //设置之map输出key
        job.setMapOutputValueClass(IntWritable.class);   //设置map输出value

        job.setOutputKeyClass(Text.class);               //设置mapreduce 输出key
        job.setOutputValueClass(IntWritable.class);      //设置mapreduce输出value

        job.setNumReduceTasks(3);
        job.waitForCompletion(true);

    }

}
查看源码可知
public class KeyValueTextInputFormat extends FileInputFormat {
    public KeyValueTextInputFormat() {
    }
这里的mapper输入为类型,text>

package com.cr.skew1_stage_version2;

import org.apache.commons.httpclient.methods.multipart.Part;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SkewMapper2 extends Mapper {

    @Override
    protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
        System.out.println("进入mapper");

        context.write(key,new IntWritable(Integer.parseInt(value.toString())));

    }
}
这里的reducer不变

发现结果和上面也是一摸一样的,所以换成job的输入格式为KeyValueTextInputFormat,可以省很多事