Implementing Word Count with Hadoop MapReduce (WordCount)


Environment: CentOS 7 + IDEA


This program is built with Maven inside IDEA, mainly because Maven saves the trouble of setting up a local Hadoop environment: the required libraries are simply declared in the build configuration file. If you have not installed IDEA yet, see the companion article on installing IntelliJ IDEA on Linux.

(1) Create a new Java project and name it WordCount. If you are unsure how to create a Maven-based Java project in IDEA, see the companion article on creating your first Java program with IDEA's Maven support.

Add the dependencies the project needs to pom.xml, as follows:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.miaozhen.lyf</groupId>
    <artifactId>Test</artifactId>
    <version>1.0-SNAPSHOT</version>
<!-- Everything above this line is generated when the project is created; everything below is what we add. -->
    <!-- Maven Central. Note that http://maven.apache.org is the Maven web site,
         not a repository; the URL must point at an actual artifact repository.
         (This block is optional, since Maven Central is the default.) -->
    <repositories>
        <repository>
            <id>apache</id>
            <url>https://repo.maven.apache.org/maven2</url>
        </repository>
    </repositories>

    <properties>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

        <hadoop.version>2.6.4</hadoop.version>

        <shade.plugin.version>3.0.0</shade.plugin.version>
        <compiler.plugin.version>3.6.1</compiler.plugin.version>
    </properties>

    <dependencies>
        <!-- hadoop-client aggregates hadoop-common and the MapReduce client
             libraries. Do not mix the Hadoop 1.x hadoop-core artifact with
             Hadoop 2.x jars: they contain overlapping classes and cause
             classpath conflicts. -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
    </dependencies>

    <build>
        <finalName>wordcount</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>${shade.plugin.version}</version>
                <configuration>
                    <outputDirectory>/tmp</outputDirectory>
                    <createDependencyReducedPom>false</createDependencyReducedPom>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>${compiler.plugin.version}</version>
                <configuration>
                    <source>${maven.compiler.source}</source>
                    <target>${maven.compiler.target}</target>
                    <encoding>${project.build.sourceEncoding}</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>


</project>
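With this pom.xml in place, running mvn clean package builds the project; during the package phase the shade plugin bundles all dependencies into a single runnable jar and, per the <finalName> and <outputDirectory> settings above, writes it to /tmp/wordcount.jar.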


(2) Create WCMapper.java with the following code:

package com.miaozhen.dmp.test.wordcount;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.StringTokenizer;

public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    // Reuse the output key/value objects rather than allocating new ones per token.
    private final Text outputKey = new Text();
    private final LongWritable outputValue = new LongWritable(1);

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split the input line on whitespace and emit (word, 1) for every token.
        StringTokenizer st = new StringTokenizer(value.toString());
        while (st.hasMoreTokens()) {
            outputKey.set(st.nextToken());
            context.write(outputKey, outputValue);
        }
    }
}
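As a quick sanity check (with a made-up input line), the mapper turns one line of text into one (word, 1) pair per whitespace-separated token:

    input line: hello world hello
    map output: (hello, 1), (world, 1), (hello, 1)

The framework then sorts these pairs and groups them by key before the reduce phase.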


(3) Create WCReducer.java with the following code:

package com.miaozhen.dmp.test.wordcount;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    private final LongWritable outputValue = new LongWritable();

    @Override
    public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        // Sum the counts for this word; a primitive long avoids boxing on every addition.
        long count = 0L;
        for (LongWritable value : values) {
            count += value.get();
        }
        outputValue.set(count);
        context.write(key, outputValue);
    }
}
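Continuing the made-up example, the reducer receives each distinct word together with all of its 1s and sums them:

    reduce input:  (hello, [1, 1]), (world, [1])
    reduce output: (hello, 2), (world, 1)

Since this reducer merely sums values, it could also be registered as a combiner (wcjob.setCombinerClass(WCReducer.class)) to pre-aggregate counts on the map side; that optional optimization is not used in this walkthrough.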


(4) Create WCRunner.java with the following code:

package com.miaozhen.dmp.test.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WCRunner extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();

        Job wcjob = Job.getInstance(conf);

        // Tell Hadoop which jar (and which classes) implement the job.
        wcjob.setJarByClass(WCRunner.class);
        wcjob.setMapperClass(WCMapper.class);
        wcjob.setReducerClass(WCReducer.class);

        // Output types of the reducer...
        wcjob.setOutputKeyClass(Text.class);
        wcjob.setOutputValueClass(LongWritable.class);

        // ...and of the mapper (they happen to match here).
        wcjob.setMapOutputKeyClass(Text.class);
        wcjob.setMapOutputValueClass(LongWritable.class);

        // args[0] = input path, args[1] = output path (must not exist yet).
        FileInputFormat.addInputPath(wcjob, new Path(args[0]));
        FileOutputFormat.setOutputPath(wcjob, new Path(args[1]));

        // Submit the job and block until it finishes, printing progress.
        boolean rt = wcjob.waitForCompletion(true);

        return rt ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Propagate the job's exit status to the JVM exit code.
        System.exit(ToolRunner.run(conf, new WCRunner(), args));
    }
}
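If you would rather run outside IDEA, the shaded jar from step (1) can be submitted with the hadoop command (the paths here are hypothetical):

    hadoop jar /tmp/wordcount.jar com.miaozhen.dmp.test.wordcount.WCRunner /path/to/input /path/to/output

Because ToolRunner parses generic options, standard flags such as -D key=value configuration overrides also work on this command line.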


(5) Run

First go to Run -> Edit Configurations and set up the run configuration, chiefly the input and output paths passed as program arguments. (Note that the output folder is generated automatically; do not create it yourself, and if it already exists the program will fail with an error.) My input path is shown below:

[Screenshots: the Run/Debug configuration dialog and the input directory]
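For reference, the Program arguments field takes the two paths separated by a space; with hypothetical local paths it might read:

    /home/lyf/input /home/lyf/output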


Finally, click Run -> Run "wordcount" to launch the job.

(6) Results

The input text and the resulting output are shown below:

[Screenshots: the input text file and the word-count output]
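To illustrate the shape of the result (with made-up input), a file containing the line

    hello world hello hadoop

yields a part-r-00000 file in the output directory with one tab-separated word/count pair per line, sorted by word:

    hadoop	1
    hello	2
    world	1

A successful run also leaves an empty _SUCCESS marker file in the output directory.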