Hadoop分析气象温度数据
程序员文章站
2022-03-22 18:25:57
...
首先下载气象数据,然后解压数据集,并保存在文本文件中:
下载地址:ftp://ftp.ncdc.noaa.gov/pub/data/noaa
我们下载国内的气象数据,使用下面命令进行下载
wget -D --accept-regex=REGEX -P data -r -c ftp://ftp.ncdc.noaa.gov/pub/data/noaa/2017/5*
国内气象站ID区间50001-59998
详细的可以在《1951—2007年中国地面气候资料日值数据集台站信息》中查看,不过应该不全。另外《StationIDs_Global_1509》中提供了世界各国气象站编号范围。
其中第5-10位表示气象站编号,例如501360(取前五位即50136),查表可得对应的是黑龙江漠河。我们主要分析的是月份(第16-21位)和空气温度(第88-92位)的极值关系。
package com.temperature.yexin;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
/**
 * Driver for the max-temperature MapReduce job.
 *
 * Usage: TempJobMR &lt;in&gt; [&lt;in&gt;...] &lt;out&gt;
 * All arguments but the last are input paths; the last is the output path.
 */
public class TempJobMR {
    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        System.out.println("Running MapReduce");
        // Strip generic Hadoop options (-D, -files, ...) before reading the path args.
        String[] otherArgs = new GenericOptionsParser(config, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            // Fixed: message used to say "worcount" (copied from the wordcount example).
            System.err.println("Usage: TempJobMR <in> [<in>...] <out>");
            System.exit(2);
        }
        // Fixed: job name used to say "word count"; this job computes max temperature.
        Job job = Job.getInstance(config, "max temperature by yexin");
        job.setJarByClass(TempJobMR.class);
        job.setMapperClass(TempMapper.class);
        // A combiner is valid here (max is associative and commutative),
        // but it only works once TempReducer's generics accept IntWritable values.
        //job.setCombinerClass(TempReducer.class);
        job.setReducerClass(TempReducer.class);
        // Declare the map output types explicitly; they must match TempMapper's
        // emitted (Text, IntWritable) pairs or the shuffle fails at runtime.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Every argument except the last is an input path.
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
        // Block until the job finishes; exit 0 on success, 1 on failure.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
------------------------------------------------------------------------------------------------------------------
package com.temperature.yexin;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class TempReducer extends Reducer<Text, Text,Text, IntWritable>{
private IntWritable result = new IntWritable();
public void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException,InterruptedException{
int maxValue = Integer.MIN_VALUE;
for(IntWritable value : values){
maxValue = Math.max(maxValue, value.get());
System.out.println("+++++++++++++++++++++++++++++++++++++++++");
}
context.write(key, new IntWritable(maxValue));
}
}
----------------------------------------------------------------------------------------------------------------
package com.temperature.yexin;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
/**
 * Driver for the max-temperature MapReduce job.
 *
 * Usage: TempJobMR &lt;in&gt; [&lt;in&gt;...] &lt;out&gt;
 * All arguments but the last are input paths; the last is the output path.
 */
public class TempJobMR {
    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        System.out.println("Running MapReduce");
        // Strip generic Hadoop options (-D, -files, ...) before reading the path args.
        String[] otherArgs = new GenericOptionsParser(config, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            // Fixed: message used to say "worcount" (copied from the wordcount example).
            System.err.println("Usage: TempJobMR <in> [<in>...] <out>");
            System.exit(2);
        }
        // Fixed: job name used to say "word count"; this job computes max temperature.
        Job job = Job.getInstance(config, "max temperature by yexin");
        job.setJarByClass(TempJobMR.class);
        job.setMapperClass(TempMapper.class);
        // A combiner is valid here (max is associative and commutative),
        // but it only works once TempReducer's generics accept IntWritable values.
        //job.setCombinerClass(TempReducer.class);
        job.setReducerClass(TempReducer.class);
        // Declare the map output types explicitly; they must match TempMapper's
        // emitted (Text, IntWritable) pairs or the shuffle fails at runtime.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Every argument except the last is an input path.
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
        // Block until the job finishes; exit 0 on success, 1 on failure.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
------------------------------------------------------------------------------------------------------------------
package com.temperature.yexin;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class TempReducer extends Reducer<Text, Text,Text, IntWritable>{
private IntWritable result = new IntWritable();
public void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException,InterruptedException{
int maxValue = Integer.MIN_VALUE;
for(IntWritable value : values){
maxValue = Math.max(maxValue, value.get());
System.out.println("+++++++++++++++++++++++++++++++++++++++++");
}
context.write(key, new IntWritable(maxValue));
}
}
----------------------------------------------------------------------------------------------------------------
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class TempMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
private static final int MISSING = 9999;
public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{
String line = value.toString();
//System.out.println("Mapper---------------------------------------------------------");
String year = line.substring(15, 21);//the data is in 15~19
int airTemperarure;
if (line.charAt(87) == '+'){ //judge the temperature is + or -
airTemperarure = Integer.parseInt(line.substring(88,92));
}else{
airTemperarure = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);
if(airTemperarure != MISSING && quality.matches("[01459]")){
context.write(new Text(year),new IntWritable(airTemperarure));
}
}
}
------------------------------------------------------------------------------------------------
结果很奇怪,再找原因,似乎是 map 的输出数据类型与 Reducer 的泛型声明不匹配(Reducer 的第二个类型参数应为 IntWritable 而不是 Text,否则 reduce 方法不会被框架调用):
推荐阅读