Annotated RandomWriter Code
package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.Date;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * This is one of Hadoop's map/reduce example programs; its main purpose is
 * to generate binary files full of random data. It defines a custom
 * InputFormat that serves as a virtual input for the mappers, and it uses
 * counters to track progress statistics.
 *
 * This program uses map/reduce to just run a distributed job where there is
 * no interaction between the tasks and each task writes a large unsorted
 * random binary sequence file of BytesWritable.
 * In order for this program to generate data for terasort with 10-byte keys
 * and 90-byte values, have the following config:
 * <xmp>
 * <?xml version="1.0"?>
 * <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
 * <configuration>
 *   <property>
 *     <name>test.randomwrite.min_key</name>
 *     <value>10</value>
 *   </property>
 *   <property>
 *     <name>test.randomwrite.max_key</name>
 *     <value>10</value>
 *   </property>
 *   <property>
 *     <name>test.randomwrite.min_value</name>
 *     <value>90</value>
 *   </property>
 *   <property>
 *     <name>test.randomwrite.max_value</name>
 *     <value>90</value>
 *   </property>
 *   <property>
 *     <name>test.randomwrite.total_bytes</name>
 *     <value>1099511627776</value>
 *   </property>
 * </configuration></xmp>
 *
 * Equivalently, {@link RandomWriter} also supports all the above options
 * and ones supported by {@link GenericOptionsParser} via the command-line.
 */
public class RandomWriter extends Configured implements Tool {

  /**
   * User counters
   */
  static enum Counters { RECORDS_WRITTEN, BYTES_WRITTEN }

  /**
   * A custom input format that creates virtual inputs of a single string
   * for each map. It acts as the virtual mapper input and must implement
   * the two methods of the InputFormat interface: getSplits and
   * getRecordReader.
   */
  static class RandomInputFormat implements InputFormat<Text, Text> {

    /**
     * Generate the requested number of file splits, with the filename
     * set to the filename of the output file. Returns an array of
     * InputSplits; FileSplit is one implementation of InputSplit, and its
     * constructor takes four arguments: the file path, the starting byte
     * offset of the split, the length of the split in bytes, and the
     * split's location info (a list of hosts).
     */
    public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
      InputSplit[] result = new InputSplit[numSplits];
      Path outDir = FileOutputFormat.getOutputPath(job);
      for (int i = 0; i < result.length; ++i) {
        result[i] = new FileSplit(new Path(outDir, "dummy-split-" + i),
                                  0, 1, (String[]) null);
      }
      return result;
    }

    /**
     * A nested static class: the custom RecordReader used to read a split.
     * Returns a single record (filename, "") where the filename is taken
     * from the file split.
     */
    static class RandomRecordReader implements RecordReader<Text, Text> {
      Path name;

      public RandomRecordReader(Path p) {
        name = p;
      }

      public boolean next(Text key, Text value) {
        if (name != null) {
          key.set(name.getName());
          name = null;
          return true;
        }
        return false;
      }

      public Text createKey() {
        return new Text();
      }

      public Text createValue() {
        return new Text();
      }

      public long getPos() {
        return 0;
      }

      public void close() {}

      public float getProgress() {
        return 0.0f;
      }
    }

    public RecordReader<Text, Text> getRecordReader(InputSplit split,
                                                    JobConf job,
                                                    Reporter reporter)
        throws IOException {
      return new RandomRecordReader(((FileSplit) split).getPath());
    }
  }

  /* The Mapper class. */
  static class Map extends MapReduceBase
      implements Mapper<WritableComparable, Writable,
                        BytesWritable, BytesWritable> {

    private long numBytesToWrite;          // total number of bytes to generate
    private int minKeySize;                // minimum key size
    private int keySizeRange;              // range of key sizes
    private int minValueSize;              // minimum value size
    private int valueSizeRange;            // range of value sizes
    private Random random = new Random();  // random number generator
    private BytesWritable randomKey = new BytesWritable();
    private BytesWritable randomValue = new BytesWritable();

    /* Fill every byte in the given range with a random value. */
    private void randomizeBytes(byte[] data, int offset, int length) {
      for (int i = offset + length - 1; i >= offset; --i) {
        data[i] = (byte) random.nextInt(256);
      }
    }

    /**
     * The map method.
     * Given an output filename, write a bunch of random records to it.
     */
    public void map(WritableComparable key,
                    Writable value,
                    OutputCollector<BytesWritable, BytesWritable> output,
                    Reporter reporter) throws IOException {
      int itemCount = 0;
      while (numBytesToWrite > 0) {
        int keyLength = minKeySize +
          (keySizeRange != 0 ? random.nextInt(keySizeRange) : 0);
        randomKey.setSize(keyLength);
        randomizeBytes(randomKey.getBytes(), 0, randomKey.getLength());
        int valueLength = minValueSize +
          (valueSizeRange != 0 ? random.nextInt(valueSizeRange) : 0);
        randomValue.setSize(valueLength);
        randomizeBytes(randomValue.getBytes(), 0, randomValue.getLength());
        output.collect(randomKey, randomValue);  // emit the random key/value pair
        numBytesToWrite -= keyLength + valueLength;
        // update the statistics counters
        reporter.incrCounter(Counters.BYTES_WRITTEN, keyLength + valueLength);
        reporter.incrCounter(Counters.RECORDS_WRITTEN, 1);
        if (++itemCount % 200 == 0) {
          reporter.setStatus("wrote record " + itemCount + ". " +
                             numBytesToWrite + " bytes left.");
        }
      }
      reporter.setStatus("done with " + itemCount + " records.");
    }

    /**
     * Initialize the parameters: save the values out of the configuration
     * that we need to write the data.
     */
    @Override
    public void configure(JobConf job) {
      numBytesToWrite = job.getLong("test.randomwrite.bytes_per_map",
                                    1*1024*1024*1024);
      minKeySize = job.getInt("test.randomwrite.min_key", 10);
      keySizeRange = job.getInt("test.randomwrite.max_key", 1000) - minKeySize;
      minValueSize = job.getInt("test.randomwrite.min_value", 0);
      valueSizeRange = job.getInt("test.randomwrite.max_value", 20000) - minValueSize;
    }
  }

  /**
   * The driver method.
   * This is the main routine for launching a distributed random write job.
   * It runs 10 maps/node and each node writes 1 gig of data to a DFS file.
   * The reduce doesn't do anything.
   *
   * @throws IOException
   */
  public int run(String[] args) throws Exception {
    if (args.length == 0) {
      System.out.println("Usage: writer <out-dir>");
      ToolRunner.printGenericCommandUsage(System.out);
      return -1;
    }
    Path outDir = new Path(args[0]);
    JobConf job = new JobConf(getConf());
    job.setJarByClass(RandomWriter.class);
    job.setJobName("random-writer");
    FileOutputFormat.setOutputPath(job, outDir);
    job.setOutputKeyClass(BytesWritable.class);
    job.setOutputValueClass(BytesWritable.class);
    job.setInputFormat(RandomInputFormat.class);          // set the custom input format
    job.setMapperClass(Map.class);
    job.setReducerClass(IdentityReducer.class);
    job.setOutputFormat(SequenceFileOutputFormat.class);  // set the output format

    JobClient client = new JobClient(job);
    ClusterStatus cluster = client.getClusterStatus();
    int numMapsPerHost = job.getInt("test.randomwriter.maps_per_host", 10);
    long numBytesToWritePerMap = job.getLong("test.randomwrite.bytes_per_map",
                                             1*1024*1024*1024);
    if (numBytesToWritePerMap == 0) {
      System.err.println("Cannot have test.randomwrite.bytes_per_map set to 0");
      return -2;
    }
    long totalBytesToWrite = job.getLong("test.randomwrite.total_bytes",
        numMapsPerHost*numBytesToWritePerMap*cluster.getTaskTrackers());
    int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap);
    if (numMaps == 0 && totalBytesToWrite > 0) {
      numMaps = 1;
      job.setLong("test.randomwrite.bytes_per_map", totalBytesToWrite);
    }
    job.setNumMapTasks(numMaps);
    System.out.println("Running " + numMaps + " maps.");

    // reducer NONE
    job.setNumReduceTasks(0);  // set the number of reduce tasks to 0

    Date startTime = new Date();
    System.out.println("Job started: " + startTime);
    JobClient.runJob(job);
    Date endTime = new Date();
    System.out.println("Job ended: " + endTime);
    System.out.println("The job took " +
                       (endTime.getTime() - startTime.getTime()) / 1000 +
                       " seconds.");
    return 0;
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new RandomWriter(), args);
    System.exit(res);
  }
}
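A note on running it (not spelled out in the original listing): in Hadoop distributions that bundle this example, the job is typically launched through the examples jar, along the lines of bin/hadoop jar hadoop-*-examples.jar randomwriter <out-dir>; the exact jar name varies by version. Because the driver goes through ToolRunner, the test.randomwrite.* properties shown in the javadoc can also be overridden on the command line with -D, which GenericOptionsParser handles.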
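Each map task ends up writing one SequenceFile of BytesWritable pairs under the output directory. As a quick way to sanity-check a run, the sketch below reads a single part file back and tallies records and bytes, which should line up with the job's RECORDS_WRITTEN and BYTES_WRITTEN counters. This checker is not part of the example: the class name RandomWriterCheck and the part-file argument are made up for illustration, and it uses the classic SequenceFile.Reader constructor to match the old mapred-era API used above.

package org.apache.hadoop.examples;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;

/** Illustrative checker: counts records/bytes in one RandomWriter part file. */
public class RandomWriterCheck {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    Path part = new Path(args[0]);             // e.g. <out-dir>/part-00000
    FileSystem fs = part.getFileSystem(conf);
    // Classic reader constructor matching the old mapred-era API above.
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, part, conf);
    BytesWritable key = new BytesWritable();
    BytesWritable value = new BytesWritable();
    long records = 0;
    long bytes = 0;
    try {
      // next() fills key and value, returning false at end of file.
      while (reader.next(key, value)) {
        records++;
        bytes += key.getLength() + value.getLength();
      }
    } finally {
      reader.close();
    }
    // These totals should match the counters of the map that wrote this file.
    System.out.println(records + " records, " + bytes + " key/value bytes");
  }
}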