Join源代码注释
程序员文章站
2022-06-06 17:53:46
...
package org.apache.hadoop.examples; import java.io.IOException; import java.util.*; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.*; import org.apache.hadoop.mapred.join.*; import org.apache.hadoop.mapred.lib.IdentityMapper; import org.apache.hadoop.mapred.lib.IdentityReducer; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; /** hadoop的map/reduce例子,演示了用CompositeInputFormat类来对两个数据集进行join操作,可以是inner join;outerjoin;也可以是override join。 * This is the trivial map/reduce program that does absolutely nothing * other than use the framework to fragment and sort the input values. * * To run: bin/hadoop jar build/hadoop-examples.jar join * [-m <i>maps</i>] [-r <i>reduces</i>] * [-inFormat <i>input format class</i>] * [-outFormat <i>output format class</i>] * [-outKey <i>output key class</i>] * [-outValue <i>output value class</i>] * [-joinOp <inner|outer|override>] * [<i>in-dir</i>]* <i>in-dir</i> <i>out-dir</i> */ public class Join extends Configured implements Tool { static int printUsage() { System.out.println("join [-m <maps>] [-r <reduces>] " + "[-inFormat <input format class>] " + "[-outFormat <output format class>] " + "[-outKey <output key class>] " + "[-outValue <output value class>] " + "[-joinOp <inner|outer|override>] " + "[input]* <input> <output>"); ToolRunner.printGenericCommandUsage(System.out); return -1; } /**driver代码 * The main driver for sort program. * Invoke this method to submit the map/reduce job. * @throws IOException When there is communication problems with the * job tracker. */ public int run(String[] args) throws Exception { JobConf jobConf = new JobConf(getConf(), Sort.class); jobConf.setJobName("join"); jobConf.setMapperClass(IdentityMapper.class); //设置map类是IdentityMapper jobConf.setReducerClass(IdentityReducer.class);//设置reduce类是IdentityReducer JobClient client = new JobClient(jobConf); ClusterStatus cluster = client.getClusterStatus(); int num_maps = cluster.getTaskTrackers() * jobConf.getInt("test.sort.maps_per_host", 10); int num_reduces = (int) (cluster.getMaxReduceTasks() * 0.9); String sort_reduces = jobConf.get("test.sort.reduces_per_host"); if (sort_reduces != null) { num_reduces = cluster.getTaskTrackers() * Integer.parseInt(sort_reduces); } Class<? extends InputFormat> inputFormatClass = SequenceFileInputFormat.class; Class<? extends OutputFormat> outputFormatClass = SequenceFileOutputFormat.class; Class<? extends WritableComparable> outputKeyClass = BytesWritable.class; Class<? extends Writable> outputValueClass = TupleWritable.class; String op = "inner"; //join是inner join List<String> otherArgs = new ArrayList<String>(); for(int i=0; i < args.length; ++i) { try { if ("-m".equals(args[i])) { num_maps = Integer.parseInt(args[++i]); } else if ("-r".equals(args[i])) { num_reduces = Integer.parseInt(args[++i]); } else if ("-inFormat".equals(args[i])) { inputFormatClass = Class.forName(args[++i]).asSubclass(InputFormat.class); } else if ("-outFormat".equals(args[i])) { outputFormatClass = Class.forName(args[++i]).asSubclass(OutputFormat.class); } else if ("-outKey".equals(args[i])) { outputKeyClass = Class.forName(args[++i]).asSubclass(WritableComparable.class); } else if ("-outValue".equals(args[i])) { outputValueClass = Class.forName(args[++i]).asSubclass(Writable.class); } else if ("-joinOp".equals(args[i])) { op = args[++i]; } else { otherArgs.add(args[i]); } } catch (NumberFormatException except) { System.out.println("ERROR: Integer expected instead of " + args[i]); return printUsage(); } catch (ArrayIndexOutOfBoundsException except) { System.out.println("ERROR: Required parameter missing from " + args[i-1]); return printUsage(); // exits } } // Set user-supplied (possibly default) job configs jobConf.setNumMapTasks(num_maps); jobConf.setNumReduceTasks(num_reduces); if (otherArgs.size() < 2) { System.out.println("ERROR: Wrong number of parameters: "); return printUsage(); } FileOutputFormat.setOutputPath(jobConf, new Path(otherArgs.remove(otherArgs.size() - 1))); List<Path> plist = new ArrayList<Path>(otherArgs.size()); for (String s : otherArgs) { plist.add(new Path(s)); } jobConf.setInputFormat(CompositeInputFormat.class);//设置输入格式类是CompositeInputFormat jobConf.set("mapred.join.expr", CompositeInputFormat.compose( op, inputFormatClass, plist.toArray(new Path[0]))); //设置CompositeInputFormat的参数,包括join类型,输入格式类型。 jobConf.setOutputFormat(outputFormatClass); jobConf.setOutputKeyClass(outputKeyClass); jobConf.setOutputValueClass(outputValueClass); Date startTime = new Date(); System.out.println("Job started: " + startTime); JobClient.runJob(jobConf); Date end_time = new Date(); System.out.println("Job ended: " + end_time); System.out.println("The job took " + (end_time.getTime() - startTime.getTime()) /1000 + " seconds."); return 0; } public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new Join(), args); System.exit(res); } }