欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

hadoop编程(6)-MapReduce案例:Partitioner应用实例——全局排序

程序员文章站 2022-07-14 13:30:08
...

目标

<温度,年份>格式,且按温度由低到高的顺序输出

-333    1901
-328    1901
-328    1902
-322    1902
-311    1902
-311    1902
-300    1902
-300    1902
……
306 1901
311 1901
311 1901
317 1901

最简单的方式:一个分区

方案:map函数的输出是<温度,年份>,只使用一个分区,这样只有一个输出文件,这个文件中的数据就是前面提出的目标格式。

为什么这样可以?

因为只有一个分区,map的所有输出都交给一个reduce任务来处理,数据进入reduce之前会被归并,而归并就是要按key来排序的,相同温度的年份会归并为列表,而reduce的默认输出行为是迭代value-list按行输出。

代码

public class SortUsingHashPartition extends Configured implements Tool {
  public static class SortUsingTextMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
    private static final int MISSING = 9999;
    @Override
    protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException {
      String line = value.toString(); // 整行的数据
      String year = line.substring(15, 19); // 年份
      int airTemperature;  // 某次记录的温度
      if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
        airTemperature = Integer.parseInt(line.substring(88, 92));
      } else {
        airTemperature = Integer.parseInt(line.substring(87, 92));
      }
      String quality = line.substring(92, 93); // 质量代码
      //提取有效数据
      if (airTemperature != MISSING && quality.matches("[01459]")) {
        context.write(new IntWritable(airTemperature),new Text(year));
      }
    }
  }
  @Override
  public int run(String[] args) throws Exception {
    Job job = JobUtil.getJob(this, args);
    job.setInputFormatClass(TextInputFormat.class);
    job.setMapperClass(SortUsingTextMapper.class);
    job.setMapOutputKeyClass(IntWritable.class);
    job.setMapOutputValueClass(Text.class);

    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(Text.class);

    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    Tool tool = new SortUsingHashPartition();
    DriverUtil.runLocal(tool,args);
  }
}

程序参数

src/main/resources/weather output

问题:牺牲了并行性

很显然,这只适用于小数据量,因为大数据量发送给一个reduce程序,肯定会内存溢出。而且这样做,没有扩展性,无法通过增加reduce来加快数据处理速度。

使用HashPartitioner,设置多分区

我们自然会想到,使用多分区即可,这样就有了水平扩展性,但这带来新的问题,数据是局部有序(一个reduce任务的输出文件),整体上是无序的。

我们可以这样调整程序参数来试验:

-D mapreduce.job.reduces=30 src/main/resources/weather output

-D mapreduce.job.reduces=30设置使用30个Reducer来归约,也即使用30个分区,但结果是这样的:

hadoop编程(6)-MapReduce案例:Partitioner应用实例——全局排序

生成了30个输出文件,每个文件里面的数据是有序的,但是part0并不整体小于part1,所以我们的输出是非全局有序的。

这是因为我们不设置任何分区器的话,MapReduce框架默认采用HashPartitioner来做分区,而它的行为是对key求hash分配到30个桶但并不保障桶的整体有序:

public class HashPartitioner<K, V> extends Partitioner<K, V> {

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

}

自定义分区器

思路:我们需要改变hash函数,用不同的方式对key进行分区。我们采用类似桶排序的思路来对关键字进行运算,0号桶的全部元素整体小于1号桶。
这个hash算法是这样的:

  // 根据桶的个数来确定hash函数,这份代码适合桶的个数等于数组长度
  private static int hash(int element, int max, int length) {
    return (element * length) / (max + 1);
  }

这需要我们事先取得所有关键字中的最大值;
另外,因为温度中有负数,而分区编号不能有负数,我们还要进行一点特殊的处理。

整体的分区器代码如下:

public class TotalSortPartitioner extends Partitioner<IntWritable,Text> {
  @Override
  public int getPartition(IntWritable intWritable, Text text, int numPartitions) {
    //333:最低温度-333;317:最高温度317
    int pa= ((intWritable.get()+333)* numPartitions) /(317+333+1);
    return pa;
  }
}

关联分区器

还是上一份代码SortUsingHashPartition,里面增加一句代码:

  @Override
  public int run(String[] args) throws Exception {
    Job job = JobUtil.getJob(this, args);
    ……
    job.setPartitionerClass(TotalSortPartitioner.class);
    ……
    return job.waitForCompletion(true) ? 0 : 1;
  }

运行参数与运行结果

参数:

-D mapreduce.job.reduces=30 src/main/resources/weather output

结果:
30个分区文件
part0:

-333    1901
-328    1901
-328    1902
-322    1902

part1:

-311    1902
-311    1902
-300    1902
-300    1902
-300    1902
-300    1901
-294    1902

……
part29:

300 1901
300 1901
306 1901
306 1901
306 1901
306 1901
311 1901
311 1901
317 1901

这30个文件按顺序合并就是一个整体按温度排序的结果了。

总结

默认的HashPartitioner是按hashcode来对key分区的,这会导致数值接近的key无规律散落在各桶中,从而无法做到全局有序。

因此我们需要自定义分区器,让桶与桶之内的元素整体有序(桶0整体<桶1整体……<桶29整体)。

具体实现在文中,为配合分区算法,我们事先需要求得最大和最小值(因为有负数)。

本例实现方式的问题在于:关键值key的数值并非均匀分布在30个桶内,这将导致reduce任务不均衡,数据量越大,处理时间上的差别越明显。更好的方式是先取样统计,然后来划分每个桶所装的数值范围,尽量让每个桶处理的数据量均衡。

下一章我们将解读《Hadoop权威指南》中的实现方式。