Hadoop——使用IDEA开发WordCount on Yarn
程序员文章站
2022-04-29 10:57:21
...
首先将之前配置好的mapred-site.xml和yarn-site.xml拷贝进resources文件夹,并在pom.xml中加入
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.5</version>
</dependency>
具体代码及解释如下:
package com.hadoop.mapreduce.wc;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.IOException;
/**
* @author Song X
* @date 2020/02/29
*/
public class MyWordCount {
/**
* 用于解析配置文件
*/
private static Configuration configuration = null;
/**
* 用于提交计算任务给集群
*/
private static Job job = null;
public static void init() throws Exception {
configuration = new Configuration(true);
job = Job.getInstance(configuration);
job.setJarByClass(MyWordCount.class);
//起一个job名字
job.setJobName("Test_WordCount");
//指定输入和输出数据路径
Path inputPath = new Path("/data/wc/input");
TextInputFormat.addInputPath(job, inputPath);
//写这句话时要保证这个目录不存在,不然会报错
//为防止我们事先不知道存不存在,可以加下面的代码,若存在则先删除,注意正式开发时,小心使用这个代码,防止误删数据!!!
//if(outputPath.getFileSystem(configuration).exists(outputPath)){
// outputPath.getFileSystem(configuration).delete(outputPath, true);
//}
Path outputPath = new Path("/data/wc/output");
TextOutputFormat.setOutputPath(job, outputPath);
}
private static void compute() throws Exception {
//设置Map计算类
job.setMapperClass(MyMapper.class);
//需要在这里告诉Reduce端传来的key-value都是什么类型,方便Reduce端做序列化和反序列化
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//设置Reduce计算类
job.setReducerClass(MyReduce.class);
//提交任务,并等待完成
job.waitForCompletion(true);
}
public static void main(String[] args) throws Exception{
init();
compute();
}
}
package com.hadoop.mapreduce.wc;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.util.StringTokenizer;
/**
* Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
* 不管在map还是reduce,都会牵扯到Java的序列化和反序列化
* Text和IntWritable就是Hadoop对String和int的序列化包装类型
* 如果是自己开发的的类型,必须实现序列化、反序列化接口
*
* 在map和reduce中,依照所学,排序是很重要的,而排序实际上就是比较,所以自己开发的类型也需要实现比较器接口,定义自己的比较规则
*
* @author Frank
* @date 2020/02/29
*/
public class MyMapper extends Mapper<Object, Text, Text, IntWritable> {
/**
* 比如这样的数据hello hadoop 1
* hello hadoop 2
* 经过map完的记过应该是hello 1
* hello 1
* hadoop 1
* hadoop 1
* 1 1
* 2 1
* 所以你会发现,key永远在变,而value是不变的(正是因为value不变,所以定义为static final对性能是最优的)
*/
/**
* 这就是Map输出类型key-value的value
*/
private static final IntWritable one = new IntWritable(1);
/**
* 这就是输出类型key-value的key
*/
private Text word = new Text();
/**
* hello hadoop 1
* hello hadoop 2
* @param key 是每一行字符串第一个字符面向源文件的偏移量,比如上面第一行的h偏移量是0,第二行的h偏移量是15(换行符也算,第一个h偏移量是0)
* @param value 字符串本身,比如hello hadoop 1。所以我们其实只是要处理value
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
//java的一个工具类,使用正则的方式对字符串进行空白切割,得到一个包含所有单词的迭代器
StringTokenizer itr = new StringTokenizer(value.toString());
while(itr.hasMoreTokens()){
//将单词赋给word
word.set(itr.nextToken());
//将单词和1合并输出
context.write(word, one);
}
}
}
package com.hadoop.mapreduce.wc;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
* Reducer的输入类型很明显就是Map的输出类型,Reduce的输出类型由我们自己指定
* @author
* @date 2020/02/29
*/
public class MyReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
/**
* 回想ReduceTask,是相同的key为一组,一组数据调用一次reduce方法
* 所以reduce收到的数据应该长这样
* hello 1
* hello 1
* hello 1
* ...
*
*/
/**
* 最后key-value结果的value
*/
private IntWritable result = new IntWritable();
/**
*
* @param key
* @param values 我们会得到一个value的迭代器
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{
//业务代码其实很简单,就是对同一个key做value的累加
int sum = 0;
for(IntWritable value : values){
sum += value.get();
}
result.set(sum);
context.write(key, result);
}
}
写好代码后点击package将程序打包成jar包i
然后会在目录中生成对应的jar包
将其拷贝到节点上,使用hadoop jar com.hadoop.mapreduce.wc.MyWordCount
进行运行
注意:第三个参数是程序的入口类,第四个参数和第五个参数分别是数据的input path和output path,我在程序中写死了,这里就不用传了
Hadoop入门系列到此就更新结束了
推荐阅读
-
使用IDEA配置Maven搭建开发框架ssm教程
-
使用IDEA配置Maven搭建开发框架ssm教程
-
使用IDEA开发springboot入门程序
-
hadoop使用docker安装和使用(单节点适合开发环境)
-
使用windows下的Eclipse或者IDEA远程连接Linux的Hadoop并运行wordcount
-
Hadoop学习笔记--开发WordCount
-
准备Flink开发环境-使用IntelliJ IDEA+Maven开发Flink项目
-
开发必备--盘点那些我一直在使用的Idea插件
-
[Kotlin]如何使用Intellij IDEA来开发kotlin(持续更新)
-
IDEA POJO开发神器之Groovy的使用详解