欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Hadoop入门(二十三)Mapreduce的求数量最大程序

程序员文章站 2022-07-07 21:32:56
...

一、简介

在文件中统计出现最多个数的单词,将其输出到hdfs文件上。

 

二、例子

(1)实例描述
给出三个文件,每个文件中都若干个单词以空白符分隔,需要统计出现最多的单词                                            

样例输入:                                            
1)file1:  

MapReduce is simple

2)file2:  

MapReduce is powerful is simple 

3)file3:  

Hello MapReduce bye MapReduce

期望输出:

MapReduce      4

(2)问题分析
实现"统计出现最多个数的单词"只要关注的信息为:单词、词频。

 

(3)实现步骤

1)Map过程 

首先使用默认的TextInputFormat类对输入文件进行处理,得到文本中每行的偏移量及其内容。显然,Map过程首先必须分析输入的<key,value>对,得到倒排索引中需要的三个信息:单词、词频

2)Combine过程 
    经过map方法处理后,Combine过程将key值相同的value值累加,得到一个单词在文档在文档中的词频,输出作为Reduce过程的输入。

3)Reduce过程 
经过上述两个过程后,Reduce过程只需将相同key值的value值累加,保留最大词频的单词输出。

 

(4)代码实现

package com.mk.mapreduce;


import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.util.*;

public class MaxWord {

    public static class MaxWordMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        private final Text newKey = new Text();
        private final IntWritable newValue = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            if (StringUtils.isBlank(value.toString())) {
                System.out.println("空白行");
                return;
            }
            StringTokenizer tokenizer = new StringTokenizer(value.toString());
            while (tokenizer.hasMoreTokens()) {
                String word = tokenizer.nextToken();
                newKey.set(word);
                context.write(newKey, newValue);
            }
        }
    }

    public static class MaxWordCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {

        private final IntWritable newValue = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {


            int count = 0;
            for (IntWritable v : values) {
                count += v.get();
            }
            newValue.set(count);
            context.write(key, newValue);

        }
    }

    public static class MaxWordReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        private String word = null;
        private int count = 0;

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int c = 0;
            for (IntWritable v : values) {
                c += v.get();
            }
            if (word == null || count < c) {
                word = key.toString();
                count = c;
            }

        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            if (word != null) {
                context.write(new Text(word), new IntWritable(count));
            }

        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        String uri = "hdfs://192.168.150.128:9000";
        String input = "/maxWord/input";
        String output = "/maxWord/output";
        Configuration conf = new Configuration();
        if (System.getProperty("os.name").toLowerCase().contains("win"))
            conf.set("mapreduce.app-submission.cross-platform", "true");

        FileSystem fileSystem = FileSystem.get(URI.create(uri), conf);
        Path path = new Path(output);
        fileSystem.delete(path, true);

        Job job = new Job(conf, "MaxWord");
        job.setJar("./out/artifacts/hadoop_test_jar/hadoop-test.jar");
        job.setJarByClass(MaxWord.class);
        job.setMapperClass(MaxWordMapper.class);
        job.setCombinerClass(MaxWordCombiner.class);
        job.setReducerClass(MaxWordReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPaths(job, uri + input);
        FileOutputFormat.setOutputPath(job, new Path(uri + output));


        boolean ret = job.waitForCompletion(true);
        System.out.println(job.getJobName() + "-----" + ret);
    }
}