MapReduce二次排序
程序员文章站
2022-03-02 16:58:13
...
默认情况下,Map 输出的结果会对 Key 进行默认的排序,但是有时候需要对 Key 排序的同时再对 Value 进行排序,这时候就要用到二次排序了。下面让我们来介绍一下什么是二次排序。
二次排序原理
我们把二次排序主要分为以下几个阶段。
Map 起始阶段
在Map阶段,使用 job.setInputFormatClass() 定义的 InputFormat ,将输入的数据集分割成小数据块 split,同时 InputFormat 提供一个 RecordReader的实现。本课程中使用的是 TextInputFormat,它提供的 RecordReader 会将文本的行号作为 Key,这一行的文本作为 Value。这就是自定义 Mapper 的输入是 < LongWritable,Text> 的原因。 然后调用自定义 Mapper 的map方法,将一个个< LongWritable,Text>键值对输入给 Mapper 的 map方法。
Map 最后阶段
在 Map 阶段的最后,会先调用 job.setPartitionerClass() 对这个 Mapper 的输出结果进行分区,每个分区映射到一个Reducer。每个分区内又调用 job.setSortComparatorClass() 设置的 Key 比较函数类排序。 可以看到,这本身就是一个二次排序。如果没有通过 job.setSortComparatorClass() 设置 Key 比较函数类,则使用 Key 实现的 compareTo() 方法。我们既可以使用 IntPair 实现的 compareTo() 方法,也可以专门定义 Key 比较函数类。
Reduce 阶段
在 Reduce 阶段,reduce() 方法接受所有映射到这个 Reduce 的 map 输出后,也是会调用 job.setSortComparatorClass()方法设置的 Key 比较函数类,对所有数据进行排序。然后开始构造一个 Key 对应的 Value 迭代器。 这时就要用到分组,使用 job.setGroupingComparatorClass()方法设置分组函数类。只要这个比较器比较的两个 Key 相同,它们就属于同一组,它们的 Value 放在一个 Value 迭代器,而这个迭代器的 Key 使用属于同一个组的所有Key的第一个Key。最后就是进入 Reducer 的 reduce() 方法,reduce() 方法的输入是所有的 Key 和它的 Value 迭代器,同样注意输入与输出的类型必须与自定义的 Reducer 中声明的一致。
接下来我们通过数据示例,可以很直观的了解二次排序的原理。
输入文件sort.txt(下载)内容为:
40 20
40 10
40 30
40 5
30 30
30 20
30 10
30 40
50 20
50 50
50 10
50 60
输出文件的内容(从小到大排序)如下:
30 10
30 20
30 30
30 40
==============================
40 5
40 10
40 20
40 30
==============================
50 10
50 20
50 50
50 60
二次排序的具体流程
在 MapReduce 中,所有的 Key 是需要被比较和排序的,而且是二次,先根据 Partitioner,再根据大小。而本例中也是要比较两次。先按照第一字段排序,然后再对第一字段相同的按照第二字段排序。根据这一点,我们可以构造一个复合类 IntPair ,它有两个字段,先利用分区对第一字段排序,再利用分区内的比较对第二字段排序。
代码实现
Hadoop 的 example 包中自带了一个 MapReduce 的二次排序算法,下面这个示例对 example 包中的二次排序源码的改进。 我们按照以下几步完成二次排序:
第一步:自定义IntPair类,将示例数据中的key/value封装成一个整体作为Key,同时实现 WritableComparable 接口并重写其方法。
第二步:自定义分区函数类FirstPartitioner,根据 IntPair 中的first实现分区。
第三步:自定义 SortComparator 实现 IntPair 类中的first和second排序。本次中没有使用这种方法,而是使用 IntPair 中的compareTo()方法实现的。
第四步:自定义 GroupingComparator 类,实现分区内的数据分组。
第五步:编写 MapReduce 主程序实现二次排序。
二次排序原理
我们把二次排序主要分为以下几个阶段。
Map 起始阶段
在Map阶段,使用 job.setInputFormatClass() 定义的 InputFormat ,将输入的数据集分割成小数据块 split,同时 InputFormat 提供一个 RecordReader的实现。本课程中使用的是 TextInputFormat,它提供的 RecordReader 会将文本的行号作为 Key,这一行的文本作为 Value。这就是自定义 Mapper 的输入是 < LongWritable,Text> 的原因。 然后调用自定义 Mapper 的map方法,将一个个< LongWritable,Text>键值对输入给 Mapper 的 map方法。
Map 最后阶段
在 Map 阶段的最后,会先调用 job.setPartitionerClass() 对这个 Mapper 的输出结果进行分区,每个分区映射到一个Reducer。每个分区内又调用 job.setSortComparatorClass() 设置的 Key 比较函数类排序。 可以看到,这本身就是一个二次排序。如果没有通过 job.setSortComparatorClass() 设置 Key 比较函数类,则使用 Key 实现的 compareTo() 方法。我们既可以使用 IntPair 实现的 compareTo() 方法,也可以专门定义 Key 比较函数类。
Reduce 阶段
在 Reduce 阶段,reduce() 方法接受所有映射到这个 Reduce 的 map 输出后,也是会调用 job.setSortComparatorClass()方法设置的 Key 比较函数类,对所有数据进行排序。然后开始构造一个 Key 对应的 Value 迭代器。 这时就要用到分组,使用 job.setGroupingComparatorClass()方法设置分组函数类。只要这个比较器比较的两个 Key 相同,它们就属于同一组,它们的 Value 放在一个 Value 迭代器,而这个迭代器的 Key 使用属于同一个组的所有Key的第一个Key。最后就是进入 Reducer 的 reduce() 方法,reduce() 方法的输入是所有的 Key 和它的 Value 迭代器,同样注意输入与输出的类型必须与自定义的 Reducer 中声明的一致。
接下来我们通过数据示例,可以很直观的了解二次排序的原理。
输入文件sort.txt(下载)内容为:
40 20
40 10
40 30
40 5
30 30
30 20
30 10
30 40
50 20
50 50
50 10
50 60
输出文件的内容(从小到大排序)如下:
30 10
30 20
30 30
30 40
==============================
40 5
40 10
40 20
40 30
==============================
50 10
50 20
50 50
50 60
二次排序的具体流程
在 MapReduce 中,所有的 Key 是需要被比较和排序的,而且是二次,先根据 Partitioner,再根据大小。而本例中也是要比较两次。先按照第一字段排序,然后再对第一字段相同的按照第二字段排序。根据这一点,我们可以构造一个复合类 IntPair ,它有两个字段,先利用分区对第一字段排序,再利用分区内的比较对第二字段排序。
代码实现
Hadoop 的 example 包中自带了一个 MapReduce 的二次排序算法,下面这个示例对 example 包中的二次排序源码的改进。 我们按照以下几步完成二次排序:
第一步:自定义IntPair类,将示例数据中的key/value封装成一个整体作为Key,同时实现 WritableComparable 接口并重写其方法。
/** * 自己定义的key类应该实现WritableComparable接口 */ public class IntPair implements WritableComparable<IntPair>{ int first;//第一个成员变量 int second;//第二个成员变量 public void set(int left, int right){ first = left; second = right; } public int getFirst(){ return first; } public int getSecond(){ return second; } @Override //反序列化,从流中的二进制转换成IntPair public void readFields(DataInput in) throws IOException{ first = in.readInt(); second = in.readInt(); } @Override //序列化,将IntPair转化成使用流传送的二进制 public void write(DataOutput out) throws IOException{ out.writeInt(first); out.writeInt(second); } @Override //key的比较 public int compareTo(IntPair o) { // TODO Auto-generated method stub if (first != o.first){ return first < o.first ? -1 : 1; }else if (second != o.second){ return second < o.second ? -1 : 1; }else{ return 0; } } @Override public int hashCode(){ return first * 157 + second; } @Override public boolean equals(Object right){ if (right == null) return false; if (this == right) return true; if (right instanceof IntPair){ IntPair r = (IntPair) right; return r.first == first && r.second == second; }else{ return false; } } }
第二步:自定义分区函数类FirstPartitioner,根据 IntPair 中的first实现分区。
第三步:自定义 SortComparator 实现 IntPair 类中的first和second排序。本次中没有使用这种方法,而是使用 IntPair 中的compareTo()方法实现的。
第四步:自定义 GroupingComparator 类,实现分区内的数据分组。
/** *继承WritableComparator */ public static class GroupingComparator extends WritableComparator{ protected GroupingComparator(){ super(IntPair.class, true); } @Override //Compare two WritableComparables. public int compare(WritableComparable w1, WritableComparable w2){ IntPair ip1 = (IntPair) w1; IntPair ip2 = (IntPair) w2; int l = ip1.getFirst(); int r = ip2.getFirst(); return l == r ? 0 : (l < r ? -1 : 1); } }
第五步:编写 MapReduce 主程序实现二次排序。
public class SecondarySort{ // 自定义map public static class Map extends Mapper<LongWritable, Text, IntPair, IntWritable>{ private final IntPair intkey = new IntPair(); private final IntWritable intvalue = new IntWritable(); public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{ String line = value.toString(); StringTokenizer tokenizer = new StringTokenizer(line); int left = 0; int right = 0; if (tokenizer.hasMoreTokens()){ left = Integer.parseInt(tokenizer.nextToken()); if (tokenizer.hasMoreTokens()) right = Integer.parseInt(tokenizer.nextToken()); intkey.set(left, right); intvalue.set(right); context.write(intkey, intvalue); } } } // 自定义reduce public static class Reduce extends Reducer< IntPair, IntWritable, Text, IntWritable>{ private final Text left = new Text(); public void reduce(IntPair key, Iterable< IntWritable> values,Context context) throws IOException, InterruptedException{ left.set(Integer.toString(key.getFirst())); for (IntWritable val : values){ context.write(left, val); } } } /** * @param args */ public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException{ // TODO Auto-generated method stub Configuration conf = new Configuration(); Job job = new Job(conf, "secondarysort"); job.setJarByClass(SecondarySort.class); FileInputFormat.setInputPaths(job, new Path(args[0]));//输入路径 FileOutputFormat.setOutputPath(job, new Path(args[1]));//输出路径 job.setMapperClass(Map.class);// Mapper job.setReducerClass(Reduce.class);// Reducer job.setPartitionerClass(FirstPartitioner.class);// 分区函数 //job.setSortComparatorClass(KeyComparator.Class);//本课程并没有自定义SortComparator,而是使用IntPair自带的排序 job.setGroupingComparatorClass(GroupingComparator.class);// 分组函数 job.setMapOutputKeyClass(IntPair.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
上一篇: HDFS 和YARN HA 简介
下一篇: cdh集群数据恢复