欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

windows下idea编写WordCount程序,并打jar包上传到hadoop集群运行

程序员文章站 2024-02-02 23:22:22
...
前提条件 1.已在虚拟机中安装了hadoop集群环境
                  版本
                         windows 10
                         IntelliJ IDEA 2.16.3.5
                         centos : 7
                         hadoop:2.7.0
                         java: 1.8
大致步骤就是在windows下的idea编辑代码,打成jar包,ftp上传到虚拟机的hadoop集群的master节点上,然后执行作业,得到结果。
1、首先在idea中新建项目hadoop-demo,pom.xml文件如下
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hadoopbook</groupId>
    <artifactId>hadoop-demo</artifactId>
    <packaging>jar</packaging>

    <dependencies>
        <dependency>
            <groupId>commons-beanutils</groupId>
            <artifactId>commons-beanutils</artifactId>
            <version>1.9.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-common</artifactId>
            <version>2.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.7.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>

    </dependencies>

</project>



创建的源码目录如下:
windows下idea编写WordCount程序,并打jar包上传到hadoop集群运行
代码
MapReduce任务过程分为两个处理阶段:map阶段和reduce阶段、每个阶段都以键值对作为输入和输出,其类型邮程序员来选择。因此,我们需要写三个代码实现:一个map函数,一个reduce函数和一个用来运行作业的代码(在这里是通过一个main函数实现)。

2、map函数由继承Mapper的类来实现: src/main/java/WordcountMapper.java

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
* Created by zxk on 2017/6/29.
*/
public class WordcountMapper extends Mapper<LongWritable, Text,Text,IntWritable>{
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
        //得到输入的每一行数据
        String line = value.toString();

        //通过空格分隔
        String[] words = line.split(" ");

        //循环遍历输出
        for(String word : words){
            context.write(new Text(word), new IntWritable(1));
        }
    }

}
 



这里定义了一个mapper类,其中有一个map方法。MapReduce框架每读到一行数据,就会调用一次这个map方法。
map的处理流程就是接收一个key value对儿,然后进行业务逻辑处理,最后输出一个key value对。

Mapper<LongWritable, Text, Text, IntWritable>

其中的4个类型分别是:输入key类型、输入value类型、输出key类型、输出value类型
MapReduce框架读到一行数据后以key value形式传进来,key默认情况下是mr框架所读到一行文本的起始偏移量(Long类型),value默认情况下是mr框架所读到的一行的数据内容(String类型)。map( )方法还提供了Context实例用于输出内容的写入。在这种情况下,将数据中的单词按Text对象进行读/写(把word当作键),将遍历到的每个单词的值设为 1 并封装到IntWritable类型中。最终被写入输出记录中。

输出也是key value形式的,是用户自定义逻辑处理完成后定义的key,用户自己决定用什么作为key,value是用户自定义逻辑处理完成后的value,内容和类型也是用户自己决定。

此例中,输出key就是word(字符串类型),输出value就是单词数量(整型)。
这里的数据类型和我们常用的不一样,因为MapReduce程序的输出数据需要在不同机器间传输,所以必须是可序列化的,例如java中的Long类型,Hadoop中定义了自己的可序列化类型LongWritable,String对应的是Text,Integer对应的是IntWritable。

3、reduce函数由继承Reducer的类来实现: src/main/java/WordcountReducer.java

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
* Created by zxk on 2017/6/29.
*/
public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        Integer count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }
        context.write(key, new IntWritable(count));
    }
}



这里定义了一个Reducer类和一个reduce方法。


当传给reduce方法时,就变为:
Reducer<Text, IntWritable, Text, IntWritable>


4个类型分别指:输入key的类型、输入value的类型、输出key的类型、输出value的类型。
需要注意,reduce方法接收的是:一个字符串类型的key、一个可迭代的数据集。因为reduce任务读取到的map任务处理结果是这样的:

(good,1)(good,1)(good,1)(good,1)

当传给reduce方法时,就变为:

key:good
value:(1,1,1,1)

所以,reduce方法接收到的是同一个key的一组value。

第三部分代码负责运行MapReduce作业
4、主程序:src/main/java/WordCountMapReduce.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
* Created by zxk on 2017/6/29.
*/
public class WordCountMapReduce {

    public static void main(String[] args)throws Exception{

        //创建配置对象
        Configuration conf = new Configuration();

        //创建job对象
        Job job = Job.getInstance(conf,"wordcount");

        //设置运行job的类
        job.setJarByClass(WordCountMapReduce.class);

        //设置mapper 类
        job.setMapperClass(WordcountMapper.class);

        //设置reduce 类
        job.setReducerClass(WordcountReducer.class);

        //设置map输出的key value
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        //设置reduce 输出的 key value
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //设置输入输出的路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        //提交job
        boolean b = job.waitForCompletion(true);
        if(!b){
            System.out.println("wordcount task fail!");
        }
    }
}



Job对象指定作业执行规范。我们可以用它来控制整个作业运行。我们在Hadoop集群上运行这个作业时,要发代码打包成一个JAR文件(Hadoop在集群上发布这个文件)。不必明确指定JAR文件的名称,在Jab对象的setJarByClass( )方法中传递一个类即可,Hadoop利用这个类来查找包含他的JARwenjian ,进而找到相关的JAR文件。

5、编译打包
在idea中打jar包可以参考这里:http://www.cnblogs.com/blog5277/p/5920560.html

6、运行
先把生成的jar包上传到Hadoop集群服务器,然后在Hadoop服务器的HDFS中准备测试文件:

在我的centos系统的根目录下有个input文件夹,文件夹中保存了两个txt文件,如下:
windows下idea编写WordCount程序,并打jar包上传到hadoop集群运行
在HDFS中新建input文件夹,将本地系统input下的test1.txt和test2.txt 保存到hdfs文件系统。

[aaa@qq.com hadoop-2.7.0]# bin/hadoop fs -mkdir -p  input
[aaa@qq.com hadoop-2.7.0]# bin/hadoop fs -put ~/input/* input


查看保存在hdfs中的两个文件:
windows下idea编写WordCount程序,并打jar包上传到hadoop集群运行

执行 jar包
注意在运行下面这条命令前,/user/root/output 这个目录是不应该存在的,否则Hadoop会报错并拒绝运行作业。在我这里也就是说hdfs的/user/root/下不存在output文件夹。运行完后会自动生成output文件夹,并且output文件夹中有运行后的结果。

 hadoop jar hadoop-demo.jar WordCountMapReduce /user/root/input /user/root/output


查看结果,可以看到单词数量统计结果。
windows下idea编写WordCount程序,并打jar包上传到hadoop集群运行

以上。