hadoop编程(6)-MapReduce案例:Partitioner应用实例——全局排序
目标
以<温度,年份>
格式,且按温度由低到高的顺序输出
-333 1901
-328 1901
-328 1902
-322 1902
-311 1902
-311 1902
-300 1902
-300 1902
……
306 1901
311 1901
311 1901
317 1901
最简单的方式:一个分区
方案:map函数的输出是<温度,年份>
,只使用一个分区,这样只有一个输出文件,这个文件中的数据就是前面提出的目标格式。
为什么这样可以?
因为只有一个分区,map的所有输出都交给一个reduce任务来处理,数据进入reduce之前会被归并,而归并就是要按key来排序的,相同温度的年份会归并为列表,而reduce的默认输出行为是迭代value-list按行输出。
代码
public class SortUsingHashPartition extends Configured implements Tool {
public static class SortUsingTextMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
private static final int MISSING = 9999;
@Override
protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException {
String line = value.toString(); // 整行的数据
String year = line.substring(15, 19); // 年份
int airTemperature; // 某次记录的温度
if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93); // 质量代码
//提取有效数据
if (airTemperature != MISSING && quality.matches("[01459]")) {
context.write(new IntWritable(airTemperature),new Text(year));
}
}
}
@Override
public int run(String[] args) throws Exception {
Job job = JobUtil.getJob(this, args);
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(SortUsingTextMapper.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Text.class);
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
Tool tool = new SortUsingHashPartition();
DriverUtil.runLocal(tool,args);
}
}
程序参数
src/main/resources/weather output
问题:牺牲了并行性
很显然,这只适用于小数据量,因为大数据量发送给一个reduce程序,肯定会内存溢出。而且这样做,没有扩展性,无法通过增加reduce来加快数据处理速度。
使用HashPartitioner,设置多分区
我们自然会想到,使用多分区即可,这样就有了水平扩展性,但这带来新的问题,数据是局部有序(一个reduce任务的输出文件),整体上是无序的。
我们可以这样调整程序参数来试验:
-D mapreduce.job.reduces=30 src/main/resources/weather output
-D mapreduce.job.reduces=30
设置使用30个Reducer来归约,也即使用30个分区,但结果是这样的:
生成了30个输出文件,每个文件里面的数据是有序的,但是part0并不整体小于part1,所以我们的输出是非全局有序的。
这是因为我们不设置任何分区器的话,MapReduce框架默认采用HashPartitioner来做分区,而它的行为是对key求hash分配到30个桶但并不保障桶的整体有序:
public class HashPartitioner<K, V> extends Partitioner<K, V> {
/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K key, V value,
int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
自定义分区器
思路:我们需要改变hash函数,用不同的方式对key进行分区。我们采用类似桶排序的思路来对关键字进行运算,0号桶的全部元素整体小于1号桶。
这个hash算法是这样的:
// 根据桶的个数来确定hash函数,这份代码适合桶的个数等于数组长度
private static int hash(int element, int max, int length) {
return (element * length) / (max + 1);
}
这需要我们事先取得所有关键字中的最大值;
另外,因为温度中有负数,而分区编号不能有负数,我们还要进行一点特殊的处理。
整体的分区器代码如下:
public class TotalSortPartitioner extends Partitioner<IntWritable,Text> {
@Override
public int getPartition(IntWritable intWritable, Text text, int numPartitions) {
//333:最低温度-333;317:最高温度317
int pa= ((intWritable.get()+333)* numPartitions) /(317+333+1);
return pa;
}
}
关联分区器
还是上一份代码SortUsingHashPartition
,里面增加一句代码:
@Override
public int run(String[] args) throws Exception {
Job job = JobUtil.getJob(this, args);
……
job.setPartitionerClass(TotalSortPartitioner.class);
……
return job.waitForCompletion(true) ? 0 : 1;
}
运行参数与运行结果
参数:
-D mapreduce.job.reduces=30 src/main/resources/weather output
结果:
30个分区文件
part0:
-333 1901
-328 1901
-328 1902
-322 1902
part1:
-311 1902
-311 1902
-300 1902
-300 1902
-300 1902
-300 1901
-294 1902
……
part29:
300 1901
300 1901
306 1901
306 1901
306 1901
306 1901
311 1901
311 1901
317 1901
这30个文件按顺序合并就是一个整体按温度排序的结果了。
总结
默认的HashPartitioner是按hashcode来对key分区的,这会导致数值接近的key无规律散落在各桶中,从而无法做到全局有序。
因此我们需要自定义分区器,让桶与桶之内的元素整体有序(桶0整体<桶1整体……<桶29整体)。
具体实现在文中,为配合分区算法,我们事先需要求得最大和最小值(因为有负数)。
本例实现方式的问题在于:关键值key的数值并非均匀分布在30个桶内,这将导致reduce任务不均衡,数据量越大,处理时间上的差别越明显。更好的方式是先取样统计,然后来划分每个桶所装的数值范围,尽量让每个桶处理的数据量均衡。
下一章我们将解读《Hadoop权威指南》中的实现方式。