Hadoop入门(二十)Mapreduce的最小值程序
程序员文章站
2022-07-07 21:33:32
...
一、简介
最小值是统计中最常使用到的,现在使用Mapreduce在海量数据中统计数据的最小值。
二、例子
(1)实例描述
给出三个文件,每个文件中都存储了若干个数值,求所有数值中的最小值。
样例输入:
1)file1:
1
2
3
7
9
-99
2
2)file2:
11
2
23
17
9
199
22
3)file3:
21
12
3
17
2
39
12
期望输出:
-99
(2)问题分析
实现统计海量数据的最小值,不能将所有的数据加载到内存,计算只能使用类似外部排序的方式,加载一部分数据统计最小值,接着加载另一部分进行统计。
(3)实现步骤
1)Map过程
首先使用默认的TextInputFormat类对输入文件进行处理,得到文本中每行的偏移量及其内容。显然,Map过程首先必须分析输入的<key,value>对,得到数值,然后在mapper中统计单个分块的最小值。
2)Reduce过程
经过map方法处理后,Reduce过程将获取每个mapper的最小值进行统计,分行统计出最小值。
(3)关键代码
package com.mk.mapreduce;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.net.URI;
public class MinValue {
public static class MinValueMapper extends Mapper<LongWritable, Text, IntWritable, NullWritable> {
private Integer minValue = null;
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
if (StringUtils.isBlank(value.toString())) {
System.out.println("空白行");
return;
}
Integer v = Integer.valueOf(value.toString().trim());
if(minValue==null||minValue > v){
minValue = v;
}
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
if (minValue != null)
context.write( new IntWritable(minValue), NullWritable.get());
}
}
public static class MinValueReducer extends Reducer< IntWritable, NullWritable,IntWritable, NullWritable> {
private Integer minValue = null;
@Override
protected void reduce(IntWritable key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
Integer v = key.get();
if(minValue==null||minValue > v){
minValue = v;
}
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
if (minValue != null)
context.write( new IntWritable(minValue), NullWritable.get());
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
String uri = "hdfs://192.168.150.128:9000";
String input = "/minValue/input";
String output = "/minValue/output";
Configuration conf = new Configuration();
if (System.getProperty("os.name").toLowerCase().contains("win"))
conf.set("mapreduce.app-submission.cross-platform", "true");
FileSystem fileSystem = FileSystem.get(URI.create(uri), conf);
Path path = new Path(output);
fileSystem.delete(path, true);
Job job = new Job(conf, "MinValue");
job.setJar("./out/artifacts/hadoop_test_jar/hadoop-test.jar");
job.setJarByClass(MinValue.class);
job.setMapperClass(MinValueMapper.class);
job.setReducerClass(MinValueReducer.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.addInputPaths(job, uri + input);
FileOutputFormat.setOutputPath(job, new Path(uri + output));
boolean ret = job.waitForCompletion(true);
System.out.println(job.getJobName() + "-----" + ret);
}
}
上一篇: (二十一)Python学习之模块初识
推荐阅读
-
用PHP和Shell写Hadoop的MapReduce程序
-
Hadoop入门——MapReduce对于海量小文件的多种解决方案
-
跟我一起读《Hadoop权威指南》 第二篇 -- 入门程序,编写MapReduce处理气象数据
-
Hadoop入门(二十三)Mapreduce的求TopK程序
-
Hadoop入门(二十)Mapreduce的最小值程序
-
Hadoop入门(二十三)Mapreduce的求数量最大程序
-
Hadoop入门(二十二)Mapreduce的求平均值程序
-
Hadoop学习(3)-mapreduce快速入门加yarn的安装
-
用PHP和Shell写Hadoop的MapReduce程序
-
编写简单的Mapreduce程序并部署在Hadoop2.2.0上运行