Solving Data Skew with a Custom Partitioner that Distributes Keys Randomly
1. Stage one: three text files to count (number of partitions set to 3)
package com.cr.skew;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SkewMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        System.out.println("进入mapper");
        // split each line on spaces and emit (word, 1) for every word
        String line = value.toString();
        String[] arr = line.split(" ");
        Text keyOut = new Text();
        IntWritable valueOut = new IntWritable();
        for (String s : arr) {
            keyOut.set(s);
            valueOut.set(1);
            context.write(keyOut, valueOut);
        }
    }
}
package com.cr.skew;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SkewReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // sum the counts for each word
        int count = 0;
        for (IntWritable iw : values) {
            count += iw.get();
        }
        context.write(key, new IntWritable(count));
    }
}
package com.cr.skew;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class SkewApp {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // run as a local (standalone) job
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "file:///");
        Job job = Job.getInstance(conf);
        System.setProperty("hadoop.home.dir", "E:\\hadoop-2.7.5");

        // job properties
        job.setJobName("SkewApp");                         // job name
        job.setJarByClass(SkewApp.class);                  // main class
        job.setInputFormatClass(TextInputFormat.class);

        // input path
        FileInputFormat.addInputPath(job, new Path("D:\\skew"));

        // output path (delete it first if it already exists)
        Path path = new Path("D:\\skew\\out");
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, path);

        job.setMapperClass(SkewMapper.class);              // mapper class
        job.setReducerClass(SkewReducer.class);            // reducer class
        job.setMapOutputKeyClass(Text.class);              // map output key type
        job.setMapOutputValueClass(IntWritable.class);     // map output value type
        job.setOutputKeyClass(Text.class);                 // job output key type
        job.setOutputValueClass(IntWritable.class);        // job output value type
        job.setNumReduceTasks(3);                          // 3 reducers => 3 partitions

        job.waitForCompletion(true);
    }
}

Output:
part-r-00000:
world3 3

part-r-00001:
world1 3
world4 3

part-r-00002:
hello 15
world2 3
world5 3
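The skew is easy to see: all 15 hello records land in a single partition, because the default HashPartitioner always routes identical keys to the same reducer. Its core logic is essentially the following (paraphrasing Hadoop's HashPartitioner):

// default partitioner: the same key always maps to the same partition
public class HashPartitioner<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}

With one very hot key like hello, the reducer that owns it does most of the work while the other reducers sit nearly idle.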
2. Stage two: set up a random partition function
package com.cr.skew1;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

import java.util.Random;

// custom partitioner: scatter records across partitions at random
public class RandomPartition extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text text, IntWritable intWritable, int numPartitioners) {
        // return a random partition index in [0, numPartitioners)
        return new Random().nextInt(numPartitioners);
    }
}

The three output partitions:
part-r-00000:
hello 7
world1 2
world2 1
world3 1
world5 1

part-r-00001:
hello 4
world2 2
world3 2

part-r-00002:
hello 4
world1 1
world4 3
world5 2
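For the random partitioner to take effect, it also has to be registered on the stage-one job. The modified driver isn't shown above; a minimal sketch of the extra line, assuming the same SkewApp driver as in section 1, would be:

// route each map output record to a random reducer instead of hashing the key
job.setPartitionerClass(RandomPartition.class);
job.setNumReduceTasks(3);   // still 3 reducers, so partial counts are spread over 3 partitions

As a design note, creating a new Random on every getPartition call works but is wasteful; reusing a single Random instance (or ThreadLocalRandom.current()) would be the more usual choice.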
3. Run a second mapper/reducer job to aggregate the reduce output above
package com.cr.skew1_stage2;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SkewMapper2 extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        System.out.println("进入mapper");
        // each input line is "word<TAB>partialCount" written by the previous job
        String line = value.toString();
        String[] arr = line.split("\t");
        context.write(new Text(arr[0]), new IntWritable(Integer.parseInt(arr[1])));
    }
}
package com.cr.skew1_stage2;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SkewReducer1 extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // sum the partial counts for each word
        int count = 0;
        for (IntWritable iw : values) {
            count += iw.get();
        }
        context.write(key, new IntWritable(count));
    }
}
package com.cr.skew1_stage2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class SkewApp2 {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // run as a local (standalone) job
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "file:///");
        Job job = Job.getInstance(conf);
        System.setProperty("hadoop.home.dir", "E:\\hadoop-2.7.5");

        // job properties
        job.setJobName("SkewApp2");                        // job name
        job.setJarByClass(SkewApp2.class);                 // main class
        job.setInputFormatClass(TextInputFormat.class);

        // input paths: the three partition files produced by stage one
        FileInputFormat.addInputPath(job, new Path("D:\\skew\\out\\part-r-00000"));
        FileInputFormat.addInputPath(job, new Path("D:\\skew\\out\\part-r-00001"));
        FileInputFormat.addInputPath(job, new Path("D:\\skew\\out\\part-r-00002"));

        // output path (delete it first if it already exists)
        Path path = new Path("D:\\skew\\out2");
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, path);

        job.setMapperClass(SkewMapper2.class);             // mapper class
        job.setReducerClass(SkewReducer1.class);           // reducer class
        job.setMapOutputKeyClass(Text.class);              // map output key type
        job.setMapOutputValueClass(IntWritable.class);     // map output value type
        job.setOutputKeyClass(Text.class);                 // job output key type
        job.setOutputValueClass(IntWritable.class);        // job output value type
        job.setNumReduceTasks(3);

        job.waitForCompletion(true);
    }
}
Output:

part-r-00000:
world3 3

part-r-00001:
world1 3
world4 3

part-r-00002:
hello 15
world2 3
world5 3

As you can see, the result is identical to the earlier run that did not use the partition function: for example, hello adds up to 7 + 4 + 4 = 15 across the three stage-one partitions.
4. If the stage-two job's input format is switched to KeyValueTextInputFormat, the first stage's output can be consumed directly as key-value pairs, with no manual splitting needed.
package com.cr.skew1_stage_version2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class SkewApp2 {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // run as a local (standalone) job
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "file:///");
        Job job = Job.getInstance(conf);
        System.setProperty("hadoop.home.dir", "E:\\hadoop-2.7.5");

        // job properties
        job.setJobName("SkewApp2");                               // job name
        job.setJarByClass(SkewApp2.class);                        // main class
        job.setInputFormatClass(KeyValueTextInputFormat.class);   // read each line as key<TAB>value

        // input paths: the three partition files produced by stage one
        FileInputFormat.addInputPath(job, new Path("D:\\skew\\out\\part-r-00000"));
        FileInputFormat.addInputPath(job, new Path("D:\\skew\\out\\part-r-00001"));
        FileInputFormat.addInputPath(job, new Path("D:\\skew\\out\\part-r-00002"));

        // output path (delete it first if it already exists)
        Path path = new Path("D:\\skew\\out2");
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, path);

        job.setMapperClass(SkewMapper2.class);             // mapper class
        job.setReducerClass(SkewReducer1.class);           // reducer class
        job.setMapOutputKeyClass(Text.class);              // map output key type
        job.setMapOutputValueClass(IntWritable.class);     // map output value type
        job.setOutputKeyClass(Text.class);                 // job output key type
        job.setOutputValueClass(IntWritable.class);        // job output value type
        job.setNumReduceTasks(3);

        job.waitForCompletion(true);
    }
}

A look at the KeyValueTextInputFormat source code shows:
public class KeyValueTextInputFormat extends FileInputFormat<Text, Text> {
    public KeyValueTextInputFormat() {
    }
    // ...
}

so both the input key and the input value of the mapper are Text:
package com.cr.skew1_stage_version2;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SkewMapper2 extends Mapper<Text, Text, Text, IntWritable> {
    @Override
    protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
        System.out.println("进入mapper");
        // key is the word, value is the partial count written by the previous job
        context.write(key, new IntWritable(Integer.parseInt(value.toString())));
    }
}

The reducer stays the same.
The result is exactly the same as above, so switching the job's input format to KeyValueTextInputFormat saves a lot of work.
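One related detail: KeyValueTextInputFormat splits each line at the first separator character, which defaults to a tab, and a tab is exactly what the previous job's default TextOutputFormat writes between key and value, so nothing extra is needed here. If an upstream job ever used a different separator, it could be configured on the Configuration before the job is created; a minimal sketch using the standard Hadoop 2.x property name:

// only needed when the upstream output uses something other than "\t"
conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "\t");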