7 5
-9999 1
3 95
-9999 5
2 7
1 2
4 62
4 13
2 99
1 8
7 8888
-9999 1
-9999 5
1 2
1 8
2 7
2 99
3 95
4 13
4 62
7 5
7 8888
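假设每一行以空格分隔的两个int型数据分别为int1、int2,那么最简单的思路是:mapper以每一行数据作为输入,输出键值对<int1, int2>。由于在reducer运行之前,数据会先按照key(也就是int1)排序,int1相同的数据会被合并到一起,在同一次reduce函数调用中处理,因此我们只需在reduce函数中把输入<int1, [int2-list]>中的int2列表按升序排序即可。但这种做法需要在reduce函数中把同一个int1对应的所有int2缓存到内存中再排序,数据量大时可能内存不足。更好的做法是把int1和int2组合成一个新的key(即下文的intpair),让框架在shuffle阶段完成排序,并为这个组合key定制以下三种逻辑: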
- 其分区逻辑为:只对int1进行分区(默认的分区操作是以整个key进行哈希操作的,这就可能把有同样int1的组合key发送给不同的reducer,这显然不是我们想要的);
- 其排序逻辑为:先对int1排序,在int1相同的基础上再对int2排序(这就是二次排序的逻辑);
- 其分组逻辑为:只对int1进行分组(保持与原逻辑一致,int1相同的数据可以在一次reduce函数被调用时一同被处理)。
- 自定义的组合key需要实现WritableComparable接口。要可序列化,就必须实现readFields()和write()这两个函数,分别负责反序列化和序列化;
- 要可比较,就必须实现compareTo()函数,该函数就是排序规则的实现。
public static class intpair implements writablecomparable<intpair> { private int first = 0; private int second = 0; public void set(int left, int right) { first = left; second = right; } public int getfirst() { return first; } public int getsecond() { return second; } @override public void readfields(datainput in) throws ioexception { first = in.readint(); second = in.readint(); }
@override public void write(dataoutput out) throws ioexception { out.writeint(first); out.writeint(second); } @override public int compareto(intpair other) { if (first != other.first) { return first < other.first ? -1 : 1; } else if (second != other.second) { return second < other.second ? -1 : 1; } else { return 0; } } }
public static class firstpartitioner extends partitioner<intpair,intwritable>{ @override public int getpartition(intpair key, intwritable value, int numpartitions) { return math.abs(key.getfirst() % numpartitions);
由于分区只针对int1,所以这里计算分区时只用到了key.getfirst()。分区编号只能是0到numPartitions-1之间的整数,所以getPartition()函数中需要进行取模操作。而Java中负数取模的结果不会为正(例如 -9999 % 2 == -1),为了保证分区编号非负,这里在取模之后再取绝对值;先取模再取绝对值还能避免Math.abs(Integer.MIN_VALUE)仍为负数的问题。如果返回值不在0到numPartitions-1范围内,作业就会报Illegal partition错误。
然后在驱动程序中添加 job.setPartitionerClass(firstpartitioner.class); 即可启用这个分区器。
/**
 * Hashes on {@code first} only. This satisfies the hashCode contract because two
 * pairs that are equal (see {@link #equals(Object)}) always share the same {@code first}.
 */
@Override
public int hashCode() {
    return first;
}

/** Two pairs are equal when both {@code first} and {@code second} match. */
@Override
public boolean equals(Object other) {
    if (!(other instanceof intpair)) {
        return false;
    }
    intpair o = (intpair) other;
    return o.first == first && o.second == second;
}
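在Java中hashCode()和equals()基本上是成对实现的,关于hashCode()的设计方式可参考《hashCode 方法及 equals 方法的规范》。一般对于int型数据,其哈希值就是它本身的值,所以这里直接返回first,而不需要进行乘法或取模运算。只用first计算哈希值同样满足契约:equals()相等的两个intpair的first必然相同,哈希值也就相同。另外,默认的HashPartitioner(见下)是按key.hashCode()分区的,这样即使不设置自定义分区器,数据也只会按int1分区。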
public class hashpartitioner<k2, v2> implements partitioner<k2, v2> { public void configure(jobconf job) {} /** use {@link object#hashcode()} to partition. */ public int getpartition(k2 key, v2 value, int numreducetasks) { return (key.hashcode() & integer.max_value) % numreducetasks; } }
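将key的哈希值与Integer.MAX_VALUE(0x7fffffff)按位与,可以清除符号位,使结果非负,这样再取模得到的分区编号就一定落在0到numReduceTasks-1之间(这里不能用Math.abs,因为Math.abs(Integer.MIN_VALUE)仍为负数)。详细讨论可参考:What does that mean for Text.hashCode() & Integer.MAX_VALUE?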
分组比较器有两种实现方式。实现方式一:继承(extends)WritableComparator 类
public static class firstgroupingcomparator extends writablecomparator{ protected firstgroupingcomparator() { super(intpair.class, true); } @override public int compare(writablecomparable w1, writablecomparable w2) { intpair key1 = (intpair) w1; intpair key2 = (intpair) w2; int l = key1.getfirst(); int r = key2.getfirst(); return l == r ? 0 : (l < r ? -1 : 1); } }
实现方式二:实现(implements)RawComparator 接口
public static class firstgroupingcomparator implements rawcomparator<intpair> { @override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { return writablecomparator.comparebytes(b1, s1, integer.size/8, b2, s2, integer.size/8); } @override public int compare(intpair o1, intpair o2) { int l = o1.getfirst(); int r = o2.getfirst(); return l == r ? 0 : (l < r ? -1 : 1); } }
- 第一个compare函数直接在序列化后的字节流上进行比较,省去了反序列化操作,效率会高一些,但也相对难懂一些。首先来看它在父接口RawComparator中的参数定义说明:
/**
 * Compares two objects in their serialized (binary) form.
 * The first object occupies bytes b1[s1, s1 + l1) and the second b2[s2, s2 + l2);
 * note that l1 and l2 are lengths, not end indices.
 *
 * @param b1 the first byte array.
 * @param s1 the index in b1 at which the first object starts.
 * @param l1 the length, in bytes, of the first object in b1.
 * @param b2 the second byte array.
 * @param s2 the index in b2 at which the second object starts.
 * @param l2 the length, in bytes, of the second object in b2.
 * @return an integer result of the comparison (negative, zero or positive, following
 *         the java.util.Comparator convention).
 */
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
/**
 * Lexicographic order of binary data: compares b1[s1, s1 + l1) with b2[s2, s2 + l2)
 * byte by byte as unsigned values. If one range is a prefix of the other, the shorter
 * range orders first.
 */
public static int compareBytes(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
    return FastByteComparisons.compareTo(b1, s1, l1, b2, s2, l2);
}
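观察compare函数以及它调用的compareBytes函数,两者的参数命名完全一致,含义也一样:s1、s2是对象在字节数组中的起始位置,l1、l2是对象的字节长度。由于intpair的write()先写入first,每个对象的前4个字节就是first。因此传给compareBytes的长度参数 l1 和 l2 只需设为Integer.SIZE/8(即4个字节,正好是一个int的长度),就能只比较intpair的 first 部分,从而实现分组逻辑。需要注意,compareBytes按无符号字节逐一比较,对于负数(如-9999)得到的大小顺序与int的数值顺序并不一致;不过分组只关心两个key是否相等,所以这里不受影响。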
/**
 * Decodes {@code len} bytes starting at {@code b[start]} as a big-endian int:
 * the byte at {@code start} is the most significant one.
 *
 * <p>Intended for {@code len <= 4}. For longer spans Java keeps only the low 5 bits of
 * each shift distance, so the result is not meaningful.
 */
public static int bytes2int(byte[] b, int start, int len) {
    int acc = 0;
    for (int i = start, end = start + len; i < end; i++) {
        // Each step closer to the end of the range carries 8 fewer bits of weight.
        int shift = (end - 1 - i) * 8;
        acc += (b[i] & 0xff) << shift;
    }
    return acc;
}
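- 第二个compare函数直接比较intpair的first值,思路简单。但有个问题是:这个函数必须实现,可是在二次排序中实际上并没有被调用,为什么还要实现它?原因是RawComparator<T>继承了java.util.Comparator<T>,compare(T, T)是其抽象方法,不实现就无法通过编译;而Hadoop在分组时调用的是基于字节的那个compare函数。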
无论采用哪种方式,都需要通过 job.setGroupingComparatorClass(firstgroupingcomparator.class); 进行设置才能生效。
/** * read two integers from each line and generate a key, value pair * as ((left, right), right). */ public static class mapclass extends mapper<longwritable, text, intpair, intwritable> { private final intpair key = new intpair(); private final intwritable value = new intwritable(); @override public void map(longwritable inkey, text invalue, context context) throws ioexception, interruptedexception { stringtokenizer itr = new stringtokenizer(invalue.tostring()); int left = 0; int right = 0; if (itr.hasmoretokens()) { left = integer.parseint(itr.nexttoken()); if (itr.hasmoretokens()) { right = integer.parseint(itr.nexttoken()); } key.set(left, right); value.set(right); context.write(key, value); } } }
实现思路很简单,就是读取每一行的数据,得到其中的两个int数据left和right,进一步得到键值对<(left, right), right>。
但实现时有一点需要注意:mapper的map函数会被调用很多次(每行输入调用一次),所以能够声明在map函数之外的变量就不要声明在map函数里面。比如这里的key(intpair类型)和value(IntWritable类型)就作为private final成员变量声明在map函数之外,每次调用map函数时只是为它们设置新的值;context.write执行后,这两个对象的内容已经被写出,因此可以继续复用。如果声明在map函数里面,每处理一行输入就要新建一次对象,产生大量短生命周期的对象,增加垃圾回收的开销,对效率有一定影响。
/** * a reducer class that just emits the sum of the input values. */ public static class reduce extends reducer<intpair, intwritable, text, intwritable> { private static final text separator = new text("------------------------------------------------"); private final text first = new text(); @override public void reduce(intpair key, iterable<intwritable> values, context context) throws ioexception, interruptedexception { context.write(separator, null); first.set(integer.tostring(key.getfirst())); for(intwritable value: values) { context.write(first, value); } } }
package test.linzch3; import; import; import; import java.util.comparator; import java.util.stringtokenizer; import java.util.function.function; import java.util.function.todoublefunction; import java.util.function.tointfunction; import java.util.function.tolongfunction; import org.apache.hadoop.conf.configuration; import org.apache.hadoop.fs.filesystem; import org.apache.hadoop.fs.path; import; import; import; import; import; import; import org.apache.hadoop.mapreduce.lib.input.fileinputformat; import org.apache.hadoop.mapreduce.lib.output.fileoutputformat; import org.apache.hadoop.mapreduce.job; import org.apache.hadoop.mapreduce.mrjobconfig; import org.apache.hadoop.mapreduce.mapper; import org.apache.hadoop.mapreduce.partitioner; import org.apache.hadoop.mapreduce.reducer; import org.apache.hadoop.util.genericoptionsparser; /** * this is an example hadoop map/reduce application. * it reads the text input files that must contain two integers per a line. * the output is sorted by the first and second number and grouped on the * first number. * * to run: bin/hadoop jar build/hadoop-examples.jar secondarysort * <i>in-dir</i> <i>out-dir</i> */ public class secondarysort { /** * define a pair of integers that are writable. * they are serialized in a byte comparable format. */ public static class intpair implements writablecomparable<intpair> { private int first = 0; private int second = 0; public void set(int left, int right) { first = left; second = right; } public int getfirst() { return first; } public int getsecond() { return second; } @override public void readfields(datainput in) throws ioexception { first = in.readint(); second = in.readint(); } @override public void write(dataoutput out) throws ioexception { out.writeint(first); out.writeint(second); } @override public int compareto(intpair other) { if (first != other.first) { return first < other.first ? -1 : 1; } else if (second != other.second) { return second < other.second ? 
-1 : 1; } else { return 0; } } // @override // public int hashcode() { // return first; // } // // @override // public boolean equals(object other) { // if (other instanceof intpair) { // intpair o = (intpair) other; // return o.first == first && o.second == second; // } else { // return false; // } // } } /** * partition based on the first part of the pair. */ public static class firstpartitioner extends partitioner<intpair,intwritable>{ @override public int getpartition(intpair key, intwritable value, int numpartitions) { return math.abs(key.getfirst() % numpartitions); } } /** * compare only the first part of the pair, so that reduce is called once * for each value of the first part. */ // public static class firstgroupingcomparator extends writablecomparator{ // protected firstgroupingcomparator() // { // super(intpair.class, true); // } // @override // public int compare(writablecomparable w1, writablecomparable w2) // { // intpair key1 = (intpair) w1; // intpair key2 = (intpair) w2; // int l = key1.getfirst(); // int r = key2.getfirst(); // return l == r ? 0 : (l < r ? -1 : 1); // } // } public static class firstgroupingcomparator implements rawcomparator<intpair> { @override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { return writablecomparator.comparebytes(b1, s1, integer.size/8, b2, s2, integer.size/8); } @override public int compare(intpair o1, intpair o2) { int l = o1.getfirst(); int r = o2.getfirst(); return l == r ? 0 : (l < r ? -1 : 1); } } /** * read two integers from each line and generate a key, value pair * as ((left, right), right). 
*/ public static class mapclass extends mapper<longwritable, text, intpair, intwritable> { private final intpair key = new intpair(); private final intwritable value = new intwritable(); @override public void map(longwritable inkey, text invalue, context context) throws ioexception, interruptedexception { stringtokenizer itr = new stringtokenizer(invalue.tostring()); int left = 0; int right = 0; if (itr.hasmoretokens()) { left = integer.parseint(itr.nexttoken()); if (itr.hasmoretokens()) { right = integer.parseint(itr.nexttoken()); } key.set(left, right); value.set(right); context.write(key, value); } } } /** * a reducer class that just emits the sum of the input values. */ public static class reduce extends reducer<intpair, intwritable, text, intwritable> { private static final text separator = new text("------------------------------------------------"); private final text first = new text(); @override public void reduce(intpair key, iterable<intwritable> values, context context) throws ioexception, interruptedexception { context.write(separator, null); first.set(integer.tostring(key.getfirst())); for(intwritable value: values) { context.write(first, value); } } } public static void main(string[] args) throws exception { configuration conf = new configuration(); string[] otherargs = new genericoptionsparser(conf, args).getremainingargs(); if (otherargs.length != 2) { system.err.println("usage: secondarysort <in> <out>"); system.exit(2); } job job = job.getinstance(conf, "secondary sort"); job.setjarbyclass(secondarysort.class); job.setmapperclass(mapclass.class); job.setreducerclass(reduce.class); // group and partition by the first int in the pair job.setpartitionerclass(firstpartitioner.class); job.setgroupingcomparatorclass(firstgroupingcomparator.class); // the map output is intpair, intwritable job.setmapoutputkeyclass(intpair.class); job.setmapoutputvalueclass(intwritable.class); // the reduce output is text, intwritable job.setoutputkeyclass(text.class); 
job.setoutputvalueclass(intwritable.class); fileinputformat.addinputpath(job, new path(otherargs[0])); fileoutputformat.setoutputpath(job, new path(otherargs[1])); /*delete the output directory if exists*/ path out = new path(otherargs[otherargs.length - 1]); filesystem filesystem = filesystem.get(conf); if (filesystem.exists(out)) { filesystem.delete(out, true); } system.exit(job.waitforcompletion(true) ? 0 : 1); } }