Hadoop (4-3) - MapReduce Example: Finding the Highest Temperature of Each Year
1. Software Environment
The software versions I used are as follows:
1. IntelliJ IDEA 2017.1
2. Hadoop 2.6.5
2. Creating and Configuring the Maven Project
2.1 Creating the Project
Open IDEA, go to File -> New -> Project, and select a Maven project in the left panel. (If you only want to run MapReduce, a plain Java project is enough and you do not need to check "Create from archetype"; check it if you want to create a web project or build from an archetype.)
Once the project has been created and the job has finished running, the project root will contain the input and output directories described below.
2.2 Adding Maven Dependencies
Add the dependencies to pom.xml. For Hadoop 2.6.5, the following jars are required:
• hadoop-common
• hadoop-hdfs
• hadoop-mapreduce-client-jobclient
Note:
This program reads its input from and writes its output to Hadoop files, so it needs the file system and therefore the hadoop-hdfs package. To submit the job to a MapReduce cluster we need the MapReduce client, hence the hadoop-mapreduce-client-jobclient package. In addition, the code uses Hadoop data types such as IntWritable and Text, which are provided by the hadoop-common package.
The dependency section of pom.xml is as follows:
<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.6.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.6.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
        <version>2.6.5</version>
    </dependency>
</dependencies>
2.3 Preparing the Input Data
Create an input folder in the project root to hold the input files. Inside it, create a file named temperature.txt with the following contents (each line has the form yyyyMMddTT: the first four characters are the year and the last two are the temperature):
2014010114
2014010216
2014010317
2014010410
2014010506
2012010609
2012010732
2012010812
2012010919
2012011023
2001010116
2001010212
2001010310
2001010411
2001010529
2013010619
2013010722
2013010812
2013010929
2013011023
2008010105
2008010216
2008010337
2008010414
2008010516
2007010619
2007010712
2007010812
2007010999
2007011023
2010010114
2010010216
2010010317
2010010410
2010010506
2015010649
2015010722
2015010812
2015010999
2015011023
2.4 Writing the MapReduce Program
Temperature.java:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class Temperature {

    /**
     * The four type parameters are:
     * KeyIn    key of the Mapper's input: the byte offset at which the line starts in the file
     * ValueIn  value of the Mapper's input: the text of the line
     * KeyOut   key of the Mapper's output: the "year" taken from the line
     * ValueOut value of the Mapper's output: the "temperature" taken from the line
     */
    static class TempMapper extends
            Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Sample output: Before Mapper: 0, 2014010114
            System.out.print("Before Mapper: " + key + ", " + value);
            String line = value.toString();
            String year = line.substring(0, 4);
            int temperature = Integer.parseInt(line.substring(8));
            context.write(new Text(year), new IntWritable(temperature));
            // Sample output: After Mapper:2014, 14
            System.out.println(
                    "======" +
                    "After Mapper:" + new Text(year) + ", " + new IntWritable(temperature));
        }
    }

    /**
     * The four type parameters are:
     * KeyIn    key of the Reducer's input: the "year"
     * ValueIn  value of the Reducer's input: the "temperature"
     * KeyOut   key of the Reducer's output: each distinct "year"
     * ValueOut value of the Reducer's output: the highest temperature of that year
     */
    static class TempReducer extends
            Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values,
                           Context context) throws IOException, InterruptedException {
            int maxValue = Integer.MIN_VALUE;
            StringBuffer sb = new StringBuffer();
            // Take the maximum of the values
            for (IntWritable value : values) {
                maxValue = Math.max(maxValue, value.get());
                sb.append(value).append(", ");
            }
            // Sample output: Before Reduce: 2001, 12, 10, 11, 29, 16
            System.out.print("Before Reduce: " + key + ", " + sb.toString());
            context.write(key, new IntWritable(maxValue));
            // Sample output: After Reduce: 2001, 29
            System.out.println(
                    "======" +
                    "After Reduce: " + key + ", " + maxValue);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration hadoopConfig = new Configuration();
        String[] path = {"input/temperature.txt", "output"};
        Job job = Job.getInstance(hadoopConfig, "Temperature_Max");
        // Needed if the program is packaged into a jar and run on a cluster
        job.setJarByClass(Temperature.class);
        // Input and output paths used by the job
        FileInputFormat.addInputPath(job, new Path(path[0]));
        FileOutputFormat.setOutputPath(job, new Path(path[1]));
        // Use our own Mapper and Reducer for the two phases
        job.setMapperClass(TempMapper.class);
        job.setReducerClass(TempReducer.class);
        // Key and value types of the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Run the job and wait until it finishes
        job.waitForCompletion(true);
        System.out.println("Finished");
    }
}
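One practical detail: FileOutputFormat fails the job if the output directory already exists, so the output folder has to be removed before re-running. Below is a minimal sketch of how that could be done with Hadoop's FileSystem API (the class name CleanOutput and the hard-coded "output" path are illustrative assumptions matching path[1] above); the same lines could also be placed at the start of main() before the job is submitted.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CleanOutput {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path output = new Path("output");            // hypothetical: same as path[1] in Temperature.main
        FileSystem fs = output.getFileSystem(conf);  // resolves to the local file system here
        if (fs.exists(output)) {
            fs.delete(output, true);                 // recursive delete of the previous run's output
        }
    }
}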
Notes:
1. In the code above, note that the Mapper's type parameters are not plain Java types but Hadoop's own data types, Text and IntWritable. You can roughly think of them as Hadoop counterparts of Java's String and int; a small sketch illustrating the correspondence follows.
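The standalone sketch below (not part of the program above; the class name WritableDemo is just for illustration) shows how Text and IntWritable wrap and unwrap ordinary Java values:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class WritableDemo {
    public static void main(String[] args) {
        Text year = new Text("2014");            // wraps a Java String
        IntWritable temp = new IntWritable(14);  // wraps a Java int

        String y = year.toString();              // back to a String
        int t = temp.get();                      // back to an int

        System.out.println(y + " -> " + t);      // prints: 2014 -> 14
    }
}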
2. The Mapper's type parameters are, in order, LongWritable, Text, Text, IntWritable, matching the input key/value and output key/value types described in the comments above.
Running the program produces the following console output:
Before Mapper: 0, 2014010114======After Mapper:2014, 14
Before Mapper: 12, 2014010216======After Mapper:2014, 16
Before Mapper: 24, 2014010317======After Mapper:2014, 17
Before Mapper: 36, 2014010410======After Mapper:2014, 10
Before Mapper: 48, 2014010506======After Mapper:2014, 6
Before Mapper: 60, 2012010609======After Mapper:2012, 9
Before Mapper: 72, 2012010732======After Mapper:2012, 32
Before Mapper: 84, 2012010812======After Mapper:2012, 12
Before Mapper: 96, 2012010919======After Mapper:2012, 19
Before Mapper: 108, 2012011023======After Mapper:2012, 23
Before Mapper: 120, 2001010116======After Mapper:2001, 16
Before Mapper: 132, 2001010212======After Mapper:2001, 12
Before Mapper: 144, 2001010310======After Mapper:2001, 10
Before Mapper: 156, 2001010411======After Mapper:2001, 11
Before Mapper: 168, 2001010529======After Mapper:2001, 29
Before Mapper: 180, 2013010619======After Mapper:2013, 19
Before Mapper: 192, 2013010722======After Mapper:2013, 22
Before Mapper: 204, 2013010812======After Mapper:2013, 12
Before Mapper: 216, 2013010929======After Mapper:2013, 29
Before Mapper: 228, 2013011023======After Mapper:2013, 23
Before Mapper: 240, 2008010105======After Mapper:2008, 5
Before Mapper: 252, 2008010216======After Mapper:2008, 16
Before Mapper: 264, 2008010337======After Mapper:2008, 37
Before Mapper: 276, 2008010414======After Mapper:2008, 14
Before Mapper: 288, 2008010516======After Mapper:2008, 16
Before Mapper: 300, 2007010619======After Mapper:2007, 19
Before Mapper: 312, 2007010712======After Mapper:2007, 12
Before Mapper: 324, 2007010812======After Mapper:2007, 12
Before Mapper: 336, 2007010999======After Mapper:2007, 99
Before Mapper: 348, 2007011023======After Mapper:2007, 23
Before Mapper: 360, 2010010114======After Mapper:2010, 14
Before Mapper: 372, 2010010216======After Mapper:2010, 16
Before Mapper: 384, 2010010317======After Mapper:2010, 17
Before Mapper: 396, 2010010410======After Mapper:2010, 10
Before Mapper: 408, 2010010506======After Mapper:2010, 6
Before Mapper: 420, 2015010649======After Mapper:2015, 49
Before Mapper: 432, 2015010722======After Mapper:2015, 22
Before Mapper: 444, 2015010812======After Mapper:2015, 12
Before Mapper: 456, 2015010999======After Mapper:2015, 99
Before Mapper: 468, 2015011023======After Mapper:2015, 23
Before Reduce: 2001, 12, 10, 11, 29, 16, ======After Reduce: 2001, 29
Before Reduce: 2007, 23, 19, 12, 12, 99, ======After Reduce: 2007, 99
Before Reduce: 2008, 16, 14, 37, 16, 5, ======After Reduce: 2008, 37
Before Reduce: 2010, 10, 6, 14, 16, 17, ======After Reduce: 2010, 17
Before Reduce: 2012, 19, 12, 32, 9, 23, ======After Reduce: 2012, 32
Before Reduce: 2013, 23, 29, 12, 22, 19, ======After Reduce: 2013, 29
Before Reduce: 2014, 14, 6, 10, 17, 16, ======After Reduce: 2014, 17
Before Reduce: 2015, 23, 49, 22, 12, 99, ======After Reduce: 2015, 99
Finished
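Besides the console log, the job writes its result into the output directory; with a single reducer the result file is typically named part-r-00000. Based on the "After Reduce" lines above, it should contain:
2001	29
2007	99
2008	37
2010	17
2012	32
2013	29
2014	17
2015	99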