欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

SecondarySort代码的注释  

程序员文章站 2022-06-06 18:19:40
...
package org.apache.hadoop.examples; 

import java.io.DataInput; 
import java.io.DataOutput; 
import java.io.IOException; 
import java.util.StringTokenizer; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.RawComparator; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.io.WritableComparable; 
import org.apache.hadoop.io.WritableComparator; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Partitioner; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.util.GenericOptionsParser; 

/** 
* hadoop的map/reduce自带的例子代码,目的是演示二次排序, 
* 输入是文本文件,文本的每行是两个用空格分隔的整数,程序的输出结果是 
* 按前一个整数排序,然后按后一个整数排序。应用场景是:数据join的reduce 
* 端的算法。 
* 
* To run: bin/hadoop jar build/hadoop-examples.jar secondarysort 
*            in-dir out-dir 
*/ 
public class SecondarySort { 

  /** 
   * 自定义map的输出key类型,而不是默认的Text类型,他必须实现接口comparable ,作用是输出中对key自定义排序(倒序),实现中必须提供数据的
读写方法(readFields,write)。等于(equals)和比较方法(compareTo)。 
   * 
   */ 
  public static class IntPair 
                      implements WritableComparable { 
    private int first = 0; 
    private int second = 0; 
    
    /** 
     * Set the left and right values. 
     */ 
    public void set(int left, int right) { 
      first = left; 
      second = right; 
    } 
    public int getFirst() { 
      return first; 
    } 
    public int getSecond() { 
      return second; 
    } 
    /** 
     * 按编码读两个整数,编码是:读第一个整数+ Integer.MIN_VALUE+读第二个整数 +  Integer.MIN_VALUE 

     * Encoded as: MIN_VALUE -> 0, 0 -> -MIN_VALUE, MAX_VALUE-> -1 
     */ 
    @Override 
    public void readFields(DataInput in) throws IOException { 
      first = in.readInt() + Integer.MIN_VALUE; 
      second = in.readInt() + Integer.MIN_VALUE; 
    } 
   /*按编码写两个整数。编码是:写第一个整数-Integer.MIN_VALUE +写第二个整数 -  Integer.MIN_VALUE 
*/   
    @Override 
    public void write(DataOutput out) throws IOException { 
      out.writeInt(first - Integer.MIN_VALUE); 
      out.writeInt(second - Integer.MIN_VALUE); 
    } 

  /*哈希方法,估计用于对象比较 
*/    @Override 
    public int hashCode() { 
      return first * 157 + second; 
    } 

/*等于方法,和compareTo方法用于排序,而且两者必须一致,不然,排序的结果可能不对。 
*/    @Override 
    public boolean equals(Object right) { 
      if (right instanceof IntPair) { 
        IntPair r = (IntPair) right; 
        return r.first == first && r.second == second; 
      } else { 
        return false; 
      } 
    } 
    /** 内置的静态类,用于对象的排序比较。如果没有这个类,排序的话 
   用 IntPair类自身的等于和比较函数来排序*/ 
    public static class Comparator extends WritableComparator { 
      public Comparator() { 
        super(IntPair.class); 
      } 

     /*排序方法,b1第一个对象的字节数组第一个元素,s1首个字节的长度, 
    l1整个字节数组的长度, 
    b2第二个对象的字节数组第一个元素,s2首个字节的长度, 
    l2整个字节数组的长度, 
*/      public int compare(byte[] b1, int s1, int l1, 
                         byte[] b2, int s2, int l2) { 
        return compareBytes(b1, s1, l1, b2, s2, l2);//比较对象的字节数组,遇到第一个不同立即返回 
      } 
    } 

    static {                                        // 登记比较器comparator,对象     IntPair按比较器的定义排序 
      WritableComparator.define(IntPair.class, new Comparator()); 
    } 
/*对象的比较方法 
*/    @Override 
     public int compareTo(IntPair o) {
      if (first != o.first) {
        return first < o.first ? -1 : 1;
      } else if (second != o.second) {
        return second < o.second ? -1 : 1;
      } else {
        return 0;
      }
    }
  }
 /** 
   * pair对象的Partitioner方法。默认是hashpartition方法。 
   目的是按前一个整数(firest)来决定让那个reducer来处理   */ 
  public static class FirstPartitioner extends Partitioner{ 
    @Override 
    public int getPartition(IntPair key, IntWritable value, 
                            int numPartitions) { 
      return Math.abs(key.getFirst() * 127) % numPartitions; 
    } 
  } 

  /** 
   *  reducer的输出按第一个整数来输出,自定义的比较器   */ 
  public static class FirstGroupingComparator 
                implements RawComparator { 

/*比较字节数组,注释掉该函数会报错 
*/    @Override 
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { 
      return WritableComparator.compareBytes(b1, s1, Integer.SIZE/8, 
                                             b2, s2, Integer.SIZE/8); 
    } 

/*作用不明,注释掉改函数,不会报错。 
*/   @Override 
   public int compare(IntPair o1, IntPair o2) {
      int l = o1.getFirst();
      int r = o2.getFirst();
      return l == r ? 0 : (l < r ? -1 : 1);
    }
  }
/**mapper类,输出的key是自定义的IntPair 
   * Read two integers from each line and generate a key, value pair 
   * as ((left, right), right). 
   */ 
  public static class MapClass 
         extends Mapper { 
    
    private final IntPair key = new IntPair(); 
    private final IntWritable value = new IntWritable(); 
    
    @Override 
    public void map(LongWritable inKey, Text inValue, 
                    Context context) throws IOException, InterruptedException { 
      StringTokenizer itr = new StringTokenizer(inValue.toString()); 
      int left = 0; 
      int right = 0; 
      if (itr.hasMoreTokens()) { 
        left = Integer.parseInt(itr.nextToken()); 
        if (itr.hasMoreTokens()) { 
          right = Integer.parseInt(itr.nextToken()); 
        } 
        key.set(left, right); 
        value.set(right); 
        context.write(key, value); 
      } 
    } 
  } 
  
  /**reducer类 
   * A reducer class that just emits the sum of the input values. 
   */ 
  public static class Reduce 
         extends Reducer { 
    private static final Text SEPARATOR = 
      new Text(&quot;------------------------------------------------&quot;); 
    private final Text first = new Text(); 
    
    @Override 
    public void reduce(IntPair key, Iterable values, 
                       Context context 
                       ) throws IOException, InterruptedException { 
      context.write(SEPARATOR, null); 
      first.set(Integer.toString(key.getFirst())); 
      for(IntWritable value: values) { 
        context.write(first, value); 
      } 
    } 
  } 
  
  public static void main(String[] args) throws Exception { 
    Configuration conf = new Configuration(); 
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); 
    if (otherArgs.length != 2) { 
      System.err.println(&quot;Usage: secondarysrot &quot;); 
      System.exit(2); 
    } 
     Job job = new Job(conf, "secondary sort");//生成job对象 
    job.setJarByClass(SecondarySort.class);//指定jar包的名字 
    job.setMapperClass(MapClass.class);//设置mapper类 
    job.setReducerClass(Reduce.class);//设置reducer类 

    // group and partition by the first int in the pair 
    job.setPartitionerClass(FirstPartitioner.class);//设置自定义的partition类 
    job.setGroupingComparatorClass(FirstGroupingComparator.class); 
    //设置分组比较器。(按前一个整数来输出) 
    // the map output is IntPair, IntWritable 
    job.setMapOutputKeyClass(IntPair.class);//设置mapper输出的key类型 
    job.setMapOutputValueClass(IntWritable.class);//设置mapper输出的value类型 

    // the reduce output is Text, IntWritable 
    job.setOutputKeyClass(Text.class); //设置reducer输出的key的类型 
    job.setOutputValueClass(IntWritable.class);//设置reuducer输出的value的类型 
    
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));//设置输入路径 
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));//设置输出路径 
    System.exit(job.waitForCompletion(true) ? 0 : 1); 
  } 

}