欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Hadoop入门(二十一)Mapreduce的求和程序

程序员文章站 2022-03-05 12:05:29
...

一、简介

求和是统计中最常使用到的,现在使用Mapreduce在海量数据中统计数据的求和。

 

二、例子

(1)实例描述
给出三个文件,每个文件中都存储了若干个数值,求所有数值中的求和。

样例输入:                                            
1)file1:  

1
2
3
7
9
-99
2


2)file2:  

11
2
23
17
9
199
22


3)file3:  

21
12
3
17
2
39
12


 期望输出:

314

 

(2)问题分析
实现统计海量数据的求和,不能将所有的数据加载到内存,计算只能使用类似外部排序的方式,加载一部分数据统计求和,接着加载另一部分进行统计。

(3)实现步骤

1)Map过程 
    首先使用默认的TextInputFormat类对输入文件进行处理,得到文本中每行的偏移量及其内容。显然,Map过程首先必须分析输入的<key,value>对,得到数值,然后在mapper中统计单个分块的求和。

2)Reduce过程 
    经过map方法处理后,Reduce过程将获取每个mapper的求和进行统计,分行统计出总的求和。

 

(3)关键代码

package com.mk.mapreduce;


import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;

public class SumValue {

    public static class SumValueMapper extends Mapper<LongWritable, Text, IntWritable, NullWritable> {

        private int sumValue = 0;

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            if (StringUtils.isBlank(value.toString())) {
                System.out.println("空白行");
                return;
            }

            int v = Integer.parseInt(value.toString().trim());

            sumValue = sumValue + v;
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
             context.write( new IntWritable(sumValue), NullWritable.get());
        }
    }


    public static class SumValueReducer extends Reducer< IntWritable, NullWritable,IntWritable, NullWritable> {

        private int sumValue = 0;
        @Override
        protected void reduce(IntWritable key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {

            int v = key.get();
            sumValue = sumValue + v;

        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
             context.write( new IntWritable(sumValue), NullWritable.get());
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        String uri = "hdfs://192.168.150.128:9000";
        String input = "/sumValue/input";
        String output = "/sumValue/output";
        Configuration conf = new Configuration();
        if (System.getProperty("os.name").toLowerCase().contains("win"))
            conf.set("mapreduce.app-submission.cross-platform", "true");

        FileSystem fileSystem = FileSystem.get(URI.create(uri), conf);
        Path path = new Path(output);
        fileSystem.delete(path, true);

        Job job = new Job(conf, "SumValue");
        job.setJar("./out/artifacts/hadoop_test_jar/hadoop-test.jar");
        job.setJarByClass(SumValue.class);
        job.setMapperClass(SumValueMapper.class);
        job.setReducerClass(SumValueReducer.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPaths(job, uri + input);
        FileOutputFormat.setOutputPath(job, new Path(uri + output));


        boolean ret = job.waitForCompletion(true);
        System.out.println(job.getJobName() + "-----" + ret);
    }
}