windows下idea编写WordCount程序,并打jar包上传到hadoop集群运行
程序员文章站
2024-02-02 23:22:22
...
前提条件 1.已在虚拟机中安装了hadoop集群环境
版本
windows 10
IntelliJ IDEA 2.16.3.5
centos : 7
hadoop:2.7.0
java: 1.8
大致步骤就是在windows下的idea编辑代码,打成jar包,ftp上传到虚拟机的hadoop集群的master节点上,然后执行作业,得到结果。
1、首先在idea中新建项目hadoop-demo,pom.xml文件如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.hadoopbook</groupId>
<artifactId>hadoop-demo</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
<version>1.9.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
<version>2.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.7.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
创建的源码目录如下:
代码
MapReduce任务过程分为两个处理阶段:map阶段和reduce阶段、每个阶段都以键值对作为输入和输出,其类型邮程序员来选择。因此,我们需要写三个代码实现:一个map函数,一个reduce函数和一个用来运行作业的代码(在这里是通过一个main函数实现)。
2、map函数由继承Mapper的类来实现: src/main/java/WordcountMapper.java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* Created by zxk on 2017/6/29.
*/
public class WordcountMapper extends Mapper<LongWritable, Text,Text,IntWritable>{
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
//得到输入的每一行数据
String line = value.toString();
//通过空格分隔
String[] words = line.split(" ");
//循环遍历输出
for(String word : words){
context.write(new Text(word), new IntWritable(1));
}
}
}
这里定义了一个mapper类,其中有一个map方法。MapReduce框架每读到一行数据,就会调用一次这个map方法。
map的处理流程就是接收一个key value对儿,然后进行业务逻辑处理,最后输出一个key value对。
Mapper<LongWritable, Text, Text, IntWritable>
其中的4个类型分别是:输入key类型、输入value类型、输出key类型、输出value类型
MapReduce框架读到一行数据后以key value形式传进来,key默认情况下是mr框架所读到一行文本的起始偏移量(Long类型),value默认情况下是mr框架所读到的一行的数据内容(String类型)。map( )方法还提供了Context实例用于输出内容的写入。在这种情况下,将数据中的单词按Text对象进行读/写(把word当作键),将遍历到的每个单词的值设为 1 并封装到IntWritable类型中。最终被写入输出记录中。
输出也是key value形式的,是用户自定义逻辑处理完成后定义的key,用户自己决定用什么作为key,value是用户自定义逻辑处理完成后的value,内容和类型也是用户自己决定。
此例中,输出key就是word(字符串类型),输出value就是单词数量(整型)。
这里的数据类型和我们常用的不一样,因为MapReduce程序的输出数据需要在不同机器间传输,所以必须是可序列化的,例如java中的Long类型,Hadoop中定义了自己的可序列化类型LongWritable,String对应的是Text,Integer对应的是IntWritable。
3、reduce函数由继承Reducer的类来实现: src/main/java/WordcountReducer.java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* Created by zxk on 2017/6/29.
*/
public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
Integer count = 0;
for (IntWritable value : values) {
count += value.get();
}
context.write(key, new IntWritable(count));
}
}
这里定义了一个Reducer类和一个reduce方法。
当传给reduce方法时,就变为:
Reducer<Text, IntWritable, Text, IntWritable>
4个类型分别指:输入key的类型、输入value的类型、输出key的类型、输出value的类型。
需要注意,reduce方法接收的是:一个字符串类型的key、一个可迭代的数据集。因为reduce任务读取到的map任务处理结果是这样的:
(good,1)(good,1)(good,1)(good,1)
当传给reduce方法时,就变为:
key:good
value:(1,1,1,1)
所以,reduce方法接收到的是同一个key的一组value。
第三部分代码负责运行MapReduce作业
4、主程序:src/main/java/WordCountMapReduce.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* Created by zxk on 2017/6/29.
*/
public class WordCountMapReduce {
public static void main(String[] args)throws Exception{
//创建配置对象
Configuration conf = new Configuration();
//创建job对象
Job job = Job.getInstance(conf,"wordcount");
//设置运行job的类
job.setJarByClass(WordCountMapReduce.class);
//设置mapper 类
job.setMapperClass(WordcountMapper.class);
//设置reduce 类
job.setReducerClass(WordcountReducer.class);
//设置map输出的key value
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//设置reduce 输出的 key value
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//设置输入输出的路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//提交job
boolean b = job.waitForCompletion(true);
if(!b){
System.out.println("wordcount task fail!");
}
}
}
Job对象指定作业执行规范。我们可以用它来控制整个作业运行。我们在Hadoop集群上运行这个作业时,要发代码打包成一个JAR文件(Hadoop在集群上发布这个文件)。不必明确指定JAR文件的名称,在Jab对象的setJarByClass( )方法中传递一个类即可,Hadoop利用这个类来查找包含他的JARwenjian ,进而找到相关的JAR文件。
5、编译打包
在idea中打jar包可以参考这里:http://www.cnblogs.com/blog5277/p/5920560.html
6、运行
先把生成的jar包上传到Hadoop集群服务器,然后在Hadoop服务器的HDFS中准备测试文件:
在我的centos系统的根目录下有个input文件夹,文件夹中保存了两个txt文件,如下:
在HDFS中新建input文件夹,将本地系统input下的test1.txt和test2.txt 保存到hdfs文件系统。
[aaa@qq.com hadoop-2.7.0]# bin/hadoop fs -mkdir -p input
[aaa@qq.com hadoop-2.7.0]# bin/hadoop fs -put ~/input/* input
查看保存在hdfs中的两个文件:
执行 jar包
注意在运行下面这条命令前,/user/root/output 这个目录是不应该存在的,否则Hadoop会报错并拒绝运行作业。在我这里也就是说hdfs的/user/root/下不存在output文件夹。运行完后会自动生成output文件夹,并且output文件夹中有运行后的结果。
hadoop jar hadoop-demo.jar WordCountMapReduce /user/root/input /user/root/output
查看结果,可以看到单词数量统计结果。
以上。
上一篇: IntelliJ IDEA 设置项总结
下一篇: SD卡读写测试程序