Learning Hadoop with Me (4)
This installment collects five Avro MapReduce listings: a max-temperature job written against Avro generic records, the same job written against a generated specific class, the NcdcRecordParser helper that both share, and two schema-driven utilities, AvroProjection and AvroSort.

// cc AvroGenericMaxTemperature MapReduce program to find the maximum temperature, creating Avro output
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroCollector;
import org.apache.avro.mapred.AvroJob;
import org.apache.avro.mapred.AvroMapper;
import org.apache.avro.mapred.AvroReducer;
import org.apache.avro.mapred.AvroUtf8InputFormat;
import org.apache.avro.mapred.Pair;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

//vv AvroGenericMaxTemperature
public class AvroGenericMaxTemperature extends Configured implements Tool {

  private static final Schema SCHEMA = new Schema.Parser().parse(
      "{" +
      "  \"type\": \"record\"," +
      "  \"name\": \"WeatherRecord\"," +
      "  \"doc\": \"A weather reading.\"," +
      "  \"fields\": [" +
      "    {\"name\": \"year\", \"type\": \"int\"}," +
      "    {\"name\": \"temperature\", \"type\": \"int\"}," +
      "    {\"name\": \"stationId\", \"type\": \"string\"}" +
      "  ]" +
      "}"
  );

  public static class MaxTemperatureMapper
      extends AvroMapper<Utf8, Pair<Integer, GenericRecord>> {

    private NcdcRecordParser parser = new NcdcRecordParser();
    private GenericRecord record = new GenericData.Record(SCHEMA);

    @Override
    public void map(Utf8 line,
        AvroCollector<Pair<Integer, GenericRecord>> collector,
        Reporter reporter) throws IOException {
      parser.parse(line.toString());
      if (parser.isValidTemperature()) {
        record.put("year", parser.getYearInt());
        record.put("temperature", parser.getAirTemperature());
        record.put("stationId", parser.getStationId());
        collector.collect(
            new Pair<Integer, GenericRecord>(parser.getYearInt(), record));
      }
    }
  }

  public static class MaxTemperatureReducer
      extends AvroReducer<Integer, GenericRecord, GenericRecord> {

    @Override
    public void reduce(Integer key, Iterable<GenericRecord> values,
        AvroCollector<GenericRecord> collector, Reporter reporter)
        throws IOException {
      GenericRecord max = null;
      for (GenericRecord value : values) {
        if (max == null ||
            (Integer) value.get("temperature") > (Integer) max.get("temperature")) {
          max = newWeatherRecord(value);
        }
      }
      collector.collect(max);
    }

    private GenericRecord newWeatherRecord(GenericRecord value) {
      GenericRecord record = new GenericData.Record(SCHEMA);
      record.put("year", value.get("year"));
      record.put("temperature", value.get("temperature"));
      record.put("stationId", value.get("stationId"));
      return record;
    }
  }

  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.printf("Usage: %s [generic options] <input> <output>\n",
          getClass().getSimpleName());
      ToolRunner.printGenericCommandUsage(System.err);
      return -1;
    }

    JobConf conf = new JobConf(getConf(), getClass());
    conf.setJobName("Max temperature");

    FileInputFormat.addInputPath(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    AvroJob.setInputSchema(conf, Schema.create(Schema.Type.STRING));
    AvroJob.setMapOutputSchema(conf,
        Pair.getPairSchema(Schema.create(Schema.Type.INT), SCHEMA));
    AvroJob.setOutputSchema(conf, SCHEMA);

    conf.setInputFormat(AvroUtf8InputFormat.class);

    AvroJob.setMapperClass(conf, MaxTemperatureMapper.class);
    AvroJob.setReducerClass(conf, MaxTemperatureReducer.class);

    JobClient.runJob(conf);
    return 0;
  }

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new AvroGenericMaxTemperature(), args);
    System.exit(exitCode);
  }
}
// ^^ AvroGenericMaxTemperature
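A quick way to try the job is to run it over a sample of NCDC data and dump the Avro output as JSON with avro-tools. The JAR name and paths below are illustrative, not from the original post:

% hadoop jar avro-examples.jar AvroGenericMaxTemperature \
    input/ncdc/sample.txt output
% java -jar avro-tools-*.jar tojson output/part-00000.avro

The same job can also be written against a code-generated class. The next listing, AvroSpecificMaxTemperature, does exactly that with a specific.WeatherRecord class, and additionally wires in a combiner: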
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.mapred.AvroCollector;
import org.apache.avro.mapred.AvroJob;
import org.apache.avro.mapred.AvroMapper;
import org.apache.avro.mapred.AvroReducer;
import org.apache.avro.mapred.AvroUtf8InputFormat;
import org.apache.avro.mapred.Pair;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import specific.WeatherRecord;

public class AvroSpecificMaxTemperature extends Configured implements Tool {

  public static class MaxTemperatureMapper
      extends AvroMapper<Utf8, Pair<Integer, WeatherRecord>> {

    private NcdcRecordParser parser = new NcdcRecordParser();
    private WeatherRecord record = new WeatherRecord();

    @Override
    public void map(Utf8 line,
        AvroCollector<Pair<Integer, WeatherRecord>> collector,
        Reporter reporter) throws IOException {
      parser.parse(line.toString());
      if (parser.isValidTemperature()) {
        record.year = parser.getYearInt();
        record.temperature = parser.getAirTemperature();
        record.stationId = new Utf8(parser.getStationId());
        collector.collect(
            new Pair<Integer, WeatherRecord>(parser.getYearInt(), record));
      }
    }
  }

  public static class MaxTemperatureReducer
      extends AvroReducer<Integer, WeatherRecord, WeatherRecord> {

    @Override
    public void reduce(Integer key, Iterable<WeatherRecord> values,
        AvroCollector<WeatherRecord> collector, Reporter reporter)
        throws IOException {
      WeatherRecord max = null;
      for (WeatherRecord value : values) {
        if (max == null || value.temperature > max.temperature) {
          max = newWeatherRecord(value);
        }
      }
      collector.collect(max);
    }
  }

  public static class MaxTemperatureCombiner
      extends AvroReducer<Integer, WeatherRecord, Pair<Integer, WeatherRecord>> {

    @Override
    public void reduce(Integer key, Iterable<WeatherRecord> values,
        AvroCollector<Pair<Integer, WeatherRecord>> collector,
        Reporter reporter) throws IOException {
      WeatherRecord max = null;
      for (WeatherRecord value : values) {
        if (max == null || value.temperature > max.temperature) {
          max = newWeatherRecord(value);
        }
      }
      collector.collect(new Pair<Integer, WeatherRecord>(key, max));
    }
  }

  private static WeatherRecord newWeatherRecord(WeatherRecord value) {
    WeatherRecord record = new WeatherRecord();
    record.year = value.year;
    record.temperature = value.temperature;
    record.stationId = value.stationId;
    return record;
  }

  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.printf("Usage: %s [generic options] <input> <output>\n",
          getClass().getSimpleName());
      ToolRunner.printGenericCommandUsage(System.err);
      return -1;
    }

    JobConf conf = new JobConf(getConf(), getClass());
    conf.setJobName("Max temperature");

    FileInputFormat.addInputPath(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    AvroJob.setInputSchema(conf, Schema.create(Schema.Type.STRING));
    AvroJob.setMapOutputSchema(conf, Pair.getPairSchema(
        Schema.create(Schema.Type.INT), WeatherRecord.SCHEMA$));
    AvroJob.setOutputSchema(conf, WeatherRecord.SCHEMA$);

    conf.setInputFormat(AvroUtf8InputFormat.class);

    AvroJob.setMapperClass(conf, MaxTemperatureMapper.class);
    AvroJob.setCombinerClass(conf, MaxTemperatureCombiner.class);
    AvroJob.setReducerClass(conf, MaxTemperatureReducer.class);

    JobClient.runJob(conf);
    return 0;
  }

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new AvroSpecificMaxTemperature(), args);
    System.exit(exitCode);
  }
}
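The generated specific.WeatherRecord class is not shown in the post. Judging from the inline schema in the generic version, the Avro schema it would be generated from looks something like the following; the "namespace" value is an assumption, chosen only to match the import specific.WeatherRecord:

{
  "type": "record",
  "name": "WeatherRecord",
  "namespace": "specific",
  "doc": "A weather reading.",
  "fields": [
    {"name": "year", "type": "int"},
    {"name": "temperature", "type": "int"},
    {"name": "stationId", "type": "string"}
  ]
}

The class itself can be generated with avro-tools, e.g. java -jar avro-tools-*.jar compile schema WeatherRecord.avsc src/main/java. Both variants of the job delegate fixed-width field extraction to the NcdcRecordParser helper: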
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.io.Text;

public class NcdcRecordParser {

  private static final int MISSING_TEMPERATURE = 9999;

  private static final DateFormat DATE_FORMAT =
      new SimpleDateFormat("yyyyMMddHHmm");

  private String stationId;
  private String observationDateString;
  private String year;
  private String airTemperatureString;
  private int airTemperature;
  private boolean airTemperatureMalformed;
  private String quality;

  public void parse(String record) {
    stationId = record.substring(4, 10) + "-" + record.substring(10, 15);
    observationDateString = record.substring(15, 27);
    year = record.substring(15, 19);
    airTemperatureMalformed = false;
    // Remove the leading plus sign, as parseInt doesn't like it
    if (record.charAt(87) == '+') {
      airTemperatureString = record.substring(88, 92);
      airTemperature = Integer.parseInt(airTemperatureString);
    } else if (record.charAt(87) == '-') {
      airTemperatureString = record.substring(87, 92);
      airTemperature = Integer.parseInt(airTemperatureString);
    } else {
      airTemperatureMalformed = true;
    }
    quality = record.substring(92, 93);
  }

  public void parse(Text record) {
    parse(record.toString());
  }

  public boolean isValidTemperature() {
    return !airTemperatureMalformed && airTemperature != MISSING_TEMPERATURE
        && quality.matches("[01459]");
  }

  public boolean isMalformedTemperature() {
    return airTemperatureMalformed;
  }

  public boolean isMissingTemperature() {
    return airTemperature == MISSING_TEMPERATURE;
  }

  public String getStationId() {
    return stationId;
  }

  public Date getObservationDate() {
    try {
      return DATE_FORMAT.parse(observationDateString);
    } catch (ParseException e) {
      throw new IllegalArgumentException(e);
    }
  }

  public String getYear() {
    return year;
  }

  public int getYearInt() {
    return Integer.parseInt(year);
  }

  public int getAirTemperature() {
    return airTemperature;
  }

  public String getAirTemperatureString() {
    return airTemperatureString;
  }

  public String getQuality() {
    return quality;
  }
}
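A quick way to sanity-check the parser's fixed-width offsets is to feed it a synthetic record. The record below is hypothetical, padded with 9s and with only the offsets the parser reads filled in; real NCDC lines carry many more fields:

public class NcdcRecordParserExample {
  public static void main(String[] args) {
    // Build a padded, hypothetical fixed-width record.
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < 105; i++) {
      sb.append('9');
    }
    sb.replace(4, 15, "02907099999");   // station identifier fields
    sb.replace(15, 27, "195001010600"); // observation timestamp, yyyyMMddHHmm
    sb.replace(87, 92, "-0011");        // signed air temperature, tenths of a degree Celsius
    sb.replace(92, 93, "1");            // a quality code accepted by isValidTemperature()

    NcdcRecordParser parser = new NcdcRecordParser();
    parser.parse(sb.toString());
    System.out.println(parser.getStationId());        // 029070-99999
    System.out.println(parser.getYearInt());          // 1950
    System.out.println(parser.getAirTemperature());   // -11
    System.out.println(parser.isValidTemperature());  // true
  }
}

The last two listings are schema-driven utilities: AvroProjection, a map-only job (identity mapper, zero reduce tasks) that rewrites an Avro data file using a reader schema, and AvroSort, which sorts an Avro data file: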
import java.io.File;

import org.apache.avro.Schema;
import org.apache.avro.mapred.AvroJob;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AvroProjection extends Configured implements Tool {

  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 3) {
      System.err.printf("Usage: %s [generic options] <input> <output> <schema-file>\n",
          getClass().getSimpleName());
      ToolRunner.printGenericCommandUsage(System.err);
      return -1;
    }

    String input = args[0];
    String output = args[1];
    String schemaFile = args[2];

    JobConf conf = new JobConf(getConf(), getClass());
    conf.setJobName("Avro projection");

    FileInputFormat.addInputPath(conf, new Path(input));
    FileOutputFormat.setOutputPath(conf, new Path(output));

    Schema schema = new Schema.Parser().parse(new File(schemaFile));
    AvroJob.setInputSchema(conf, schema);
    AvroJob.setMapOutputSchema(conf, schema);
    AvroJob.setOutputSchema(conf, schema);
    conf.setNumReduceTasks(0);

    JobClient.runJob(conf);
    return 0;
  }

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new AvroProjection(), args);
    System.exit(exitCode);
  }
}

// cc AvroSort A MapReduce program to sort an Avro data file
import java.io.File;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.mapred.AvroCollector;
import org.apache.avro.mapred.AvroJob;
import org.apache.avro.mapred.AvroMapper;
import org.apache.avro.mapred.AvroReducer;
import org.apache.avro.mapred.Pair;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

//vv AvroSort
public class AvroSort extends Configured implements Tool {

  static class SortMapper<K> extends AvroMapper<K, Pair<K, K>> {
    public void map(K datum, AvroCollector<Pair<K, K>> collector,
        Reporter reporter) throws IOException {
      // Emit the datum as both key and value, so sorting follows
      // the key's schema ordering and the value survives the shuffle
      collector.collect(new Pair<K, K>(datum, null, datum, null));
    }
  }

  static class SortReducer<K> extends AvroReducer<K, K, K> {
    public void reduce(K key, Iterable<K> values,
        AvroCollector<K> collector,
        Reporter reporter) throws IOException {
      for (K value : values) {
        collector.collect(value);
      }
    }
  }

  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 3) {
      System.err.printf(
          "Usage: %s [generic options] <input> <output> <schema-file>\n",
          getClass().getSimpleName());
      ToolRunner.printGenericCommandUsage(System.err);
      return -1;
    }

    String input = args[0];
    String output = args[1];
    String schemaFile = args[2];

    JobConf conf = new JobConf(getConf(), getClass());
    conf.setJobName("Avro sort");

    FileInputFormat.addInputPath(conf, new Path(input));
    FileOutputFormat.setOutputPath(conf, new Path(output));

    Schema schema = new Schema.Parser().parse(new File(schemaFile));
    AvroJob.setInputSchema(conf, schema);
    Schema intermediateSchema = Pair.getPairSchema(schema, schema);
    AvroJob.setMapOutputSchema(conf, intermediateSchema);
    AvroJob.setOutputSchema(conf, schema);

    AvroJob.setMapperClass(conf, SortMapper.class);
    AvroJob.setReducerClass(conf, SortReducer.class);

    JobClient.runJob(conf);
    return 0;
  }

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new AvroSort(), args);
    System.exit(exitCode);
  }
}
// ^^ AvroSort
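Both utilities take a schema file as their third argument. For AvroProjection, a natural reader schema is a cut-down version of the writer schema. The sketch below is an assumption based on the WeatherRecord schema shown earlier, not a file from the original post; it keeps temperature and stationId and drops year:

{
  "type": "record",
  "name": "WeatherRecord",
  "doc": "A weather reading.",
  "fields": [
    {"name": "temperature", "type": "int"},
    {"name": "stationId", "type": "string"}
  ]
}

For AvroSort, the ordering comes from the schema itself: Avro compares records field by field in declaration order, and an individual field may carry an "order" attribute of "ascending" (the default), "descending", or "ignore". A run with the illustrative JAR and paths used earlier might look like:

% hadoop jar avro-examples.jar AvroSort input/records.avro sorted records.avsc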