mapreduce剖析气象站平均气温
一、气象数据
按行并以 ASCII 格式存储,每一行是一条记录。下图展示了一行采样数据。
1998 #year
03 #month
09 #day
17 #hour
11 #temperature
-100 #dew
10237 #pressure
60 #wind_direction
72 #wind_speed
0 #sky_condition
0 #rain_1h
-9999 #rain_6h
将每个气象站的数据文件拼接成一个单独的文件更容易处理,预处理过的数据文件示例如下所示:
1985 07 31 02 200 94 10137 220 26 1 0 -9999
1985 07 31 03 172 94 10142 240 0 0 0 -9999
1985 07 31 04 156 83 10148 260 10 0 0 -9999
1985 07 31 05 133 78 -9999 250 0 -9999 0 -9999
1985 07 31 06 122 72 -9999 90 0 -9999 0 0
1985 07 31 07 117 67 -9999 60 0 -9999 0 -9999
1985 07 31 08 111 61 -9999 90 0 -9999 0 -9999
1985 07 31 09 111 61 -9999 60 5 -9999 0 -9999
1985 07 31 10 106 67 -9999 80 0 -9999 0 -9999
1985 07 31 11 100 56 -9999 50 5 -9999 0 -9999
二、数据导入hdfs
使用myeclipse链接集群hdfs,将数据导入hdfs系统的/home/hadoop/weather目录下(自行创建一个存放数据的目录)。
命令行访问
三、思路分析
1.map 阶段输入 原始数据。选择文本格式作为输入格式,将数据集的每一行作为文本输入。提取气象站和气温信息,并将它们作为输出。
2.reduce阶段接收map 函数的输出结果:每个气象站后面紧跟着一系列气温数据,使用reduce 函数遍历整个列表并统计出平均气温。
四、代码
1.编写 Mapper 类,实现 map() 方法,提取气象站和气温数据。
/*
* 定义一个Mapper类实现map()方法解析气象站数据
* input key=偏移量,input value =每行数据
* output key=weatherStationID, output value =temperature
*/
public class TempratureMapper extends Mapper<LongWritable,Text,Text,IntWritable>
{
public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException
{
//将每行数据转换成string类型
String line = value.toString();
//提取气温值
int temperature = Integer.parseInt(line.substring(14,19).trim());
//过滤无效数据
if(temperature != -9999)
{
//获取分片
FileSplit filesplit = (FileSplit) context.getInputSplit();
//获取气象站编号
String weatherStationID = filesplit.getPath().getName().substring(5, 10);
//输出处理好的数据
context.write(new Text(weatherStationID),new IntWritable(temperature));
}
}
2.编写reduce函数,统计平均气温
/*
* 定义一个reducer类实现reduce()方法统计各个气象站的平均气温
* input key=weatherStationID,input value =temperature
* output key=weatherStationID, output value =average(temperature)
*/
public class TemperatureReducer extends Reducer<Text,IntWritable,Text,IntWritable>
{
//自定义成员变量result用以保存输出结果
private IntWritable result = new IntWritable();
public void reduce(Text key,Iterable< IntWritable> values,Context context) throws IOException, InterruptedException
{
//统计相同气象站的所有气温值
int sum = 0;
int count = 0;
for(IntWritable i : values)
{
//对所有气温值累加
sum += i.get();
//统计集合大小
count++;
}
//求同一个气象站的平均气温
result.set(sum/count);
context.write(key, result);
}
}
ps:Iterable:迭代器,就是专门处理集合元素的对象,例如删除和获取集合中的元素。但是该对象比较特殊,不能直接创建对象(通过new),该对象是以内部类的形式存在于每个集合类的内部。
3.编写函数,运行 MapReduce 作业。
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
*
* @function 统计美国各个气象站30年来的平均气温
* @author 小讲
*
*/
public class Temperature extends Configured implements Tool {
/**
* @function 任务驱动方法
* @param args
* @return
* @throws Exception
*/
@Override
public int run(String[] args) throws Exception {
// TODO Auto-generated method stub
Configuration conf = new Configuration();//读取配置文件
Path mypath = new Path(args[1]);
FileSystem hdfs = mypath.getFileSystem(conf);
if (hdfs.isDirectory(mypath)) {
hdfs.delete(mypath, true);
}
Job job = new Job(conf, "temperature");//新建一个任务
job.setJarByClass(Temperature.class);// 设置主类
FileInputFormat.addInputPath(job, new Path(args[0]));// 输入路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));// 输出路径
job.setMapperClass(TemperatureMapper.class);// Mapper
job.setReducerClass(TemperatureReducer.class);// Reducer
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
return job.waitForCompletion(true)?0:1;//提交任务
}
/**
* @function main 方法
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
//数据输入路径和输出路径
String[] args0 = {
"hdfs://pc1:9000/home/hadoop/weather/",
"hdfs://pc1:9000/home/hadoop/weather/out/"
};
int ec = ToolRunner.run(new Configuration(), new Temperature(), args0);
System.exit(ec);
}
4.对代码进行单元测试及debug调试。
编写mapper测试方法
public class TemperatureMapperTest {
private Mapper mapper;//定义一个Mapper对象
private MapDriver driver;//定义一个MapDriver 对象
@Before
public void init() {
mapper = new Temperature.TemperatureMapper();//实例化一个Temperature中的TemperatureMapper对象
driver = new MapDriver(mapper);//实例化MapDriver对象
}
@Test
public void test() throws IOException {
//输入一行测试数据
String line = "1985 07 31 02 200 94 10137 220 26 1 0 -9999";
driver.withInput(new LongWritable(), new Text(line))//跟TemperatureMapper输入类型一致
.withOutput(new Text("weatherStationId"), new IntWritable(200))//跟TemperatureMapper输出类型一致
.runTest();
}
}
鼠标放在 TemperatureMapperTest 类上右击,选择 Run As ——> JUnit test,运行结果如下图所示:
左边的对话框里显示”Runs:1/1,Errors:0,Failures:0”,说明 Mapper 测试成功了。
编写reduce测试方法
public class TemperatureReduceTest {
private Reducer reducer;//定义一个Reducer对象
private ReduceDriver driver;//定义一个ReduceDriver对象
@Before
public void init() {
reducer = new Temperature.TemperatureReducer();//实例化一个Temperature中的TemperatureReducer对象
driver = new ReduceDriver(reducer);//实例化ReduceDriver对象
}
@Test
public void test() throws IOException {
String key = "weatherStationId";//声明一个key值
List values = new ArrayList();
values.add(new IntWritable(200));//添加第一个value值
values.add(new IntWritable(100));//添加第二个value值
driver.withInput(new Text("weatherStationId"), values)//跟TemperatureReducer输入类型一致
.withOutput(new Text("weatherStationId"), new IntWritable(150))//跟TemperatureReducer输出类型一致
.runTest();
}
}
右键 Temperature 项目,选择”Debug As” ——> “java Application”,在程序中打上断点后直接进入调试模式,如下图所示。
五、运行程序
1.在myeclipse上运行程序
2.将 MapReduce作业打成jar包即temperature.jar,然后然后上传至/home/hadoop/ 目录下,由 hadoop 脚本来执行。
保存为Temp.jar,通过rz命令将Temp.jar上传到、home/hadoop/app/hadoop目录下。执行命令:
[aaa@qq.com hadoop]$ bin/hadoop jar Temp.jar com.pc1.hadoop.test.Temperature home/hadoop/weather/ home/hadoop/weather/out/
最后在 out目录查看结果 ,与本地结果一致。