Hadoop 统计单词个数
创建项目
按下图所示在resources目录下创建文件夹input,在其中提供文件wc.txt:
注意:不要创建output目录,系统会自动创建。否则会报目录已存在的错。wc.txt文件的内容:
hello hadoop and hello java
I love java
Liang He Cai
Mapper类
public class TokenizerMapper extends Mapper<Object,Text,Text,IntWritable> {
@Override
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();//读取一行数据
StringTokenizer st = new StringTokenizer(line);//使用空格分隔
while (st.hasMoreTokens()) {
String word = st.nextToken();//单词
context.write(new Text(word), new IntWritable(1));//单词---> 1
}
}
}
Mapper的4个类型分别是:输入key的类型、输入的value类型、输出key的类型、输出value的类型。
mapper类的map()方法,MapReduce框架每读到一行数据,就会调用一次这个map方法。map的处理流程就是接收一个key value对儿,然后进行业务逻辑处理,最后输出一个key value对儿。map()方法的三个参数key、value、context分别代表map输入的键、map输入的值、map执行的上下文。
MapReduce框架将读到的一行数据侯以key value形式传进来,key默认情况下是MapReduce所读到一行文本的起始偏移量(Long类型),value默认情况下是MapReduce所读到的一行的数据内容(String类型)。
输出也是key value形式的,key是用户自定义逻辑处理完成后定义的key,用户自己决定用什么作为key,value是用户自定义逻辑处理完成后的value,内容和类型也是用户自己决定。统计单词个数案例中,输出key就是word(字符串类型),输出value就是单词数量(整型)。
这里的数据类型和我们常用的不一样,因为MapReduce程序的输出数据需要在不同机器间传输,所以必须是可序列化的,例如Long类型,Hadoop中定义了自己的可序列化类型LongWritable,String对应的是Text,int对应的是IntWritable。
Reducer类
public class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;//汇总
for (IntWritable val : values) {
sum += val.get();
}
context.write(key, new IntWritable(sum));//单词--->数量
}
}
上面代码定义了一个Reducer类和一个reduce方法。
Reducer<Text, IntWritable, Text, IntWritable>4个类型分别指:输入key的类型、输入value的类型、输出key的类型、输出value的类型。
需要注意,reduce方法接收的是:一个字符串类型的key、一个可迭代的数据集。因为reduce任务读取到map任务处理结果是:(good,1)(good,1)(good,1)(good,1)
当传给reduce方法时,就变为:
key:good
value:(1,1,1,1)
所以,reduce方法接收到的是同一个key的一组value。
运行部署
Driver编程套路:
- 获取配置信息,获取job对象实例
- 指定本程序的jar包所在的本地路径
- 关联Mapper/Reducer业务类
- 指定Mapper输出数据的KV类型
- 指定最终输出数据的KV类型
- 指定job的输入原始文件所在目录
- 指定job的输出结果所在目录
- 提交作业
本地模式部署
本地运行模式下:mapreduce程序是被提交给LocalJobRunner在本地以单进程的形式运行,而处理的数据及输出结果可以在本地文件系统,也可以在hdfs上。
实现本地运行,不要带集群的配置文件(本质是mr程序的conf中是否有mapreduce.framework.name=local以及yarn.resourcemanager.hostname参数)。
本地模式非常便于进行业务逻辑的debug,只要在IDE中打断点即可。在windows下想运行本地模式来测试程序逻辑,需要在windows中配置环境变量,将hadoop的lib和bin目录替换成windows平台编译的版本
Configuration conf = new Configuration();
conf.set("mapreduce.framework.name","local");
//本地模式运行mr,输入输出的数据可以在本地,也可以在hdfs
//conf.set("fs.defaultFS","hdfs://hcmaster:9000");
conf.set("fs.defaultFS","file:///");
具体代码:
public class WordCount {
public static void main(String[] args) throws Exception {
Configuration cfg = new Configuration();//创建配置对象
Job job = Job.getInstance(cfg, "word count");//创建job对象
job.setJarByClass(WordCount.class);//创建运行job的类
job.setMapperClass(TokenizerMapper.class);//设置mapper类
job.setReducerClass(IntSumReducer.class); //设置Reduce类
job.setOutputKeyClass(Text.class);//设置Reduce输出的key
job.setOutputValueClass(IntWritable.class);//设置Reduce输出的value
FileInputFormat.addInputPath(job, new Path(args[0]));//设置输入路径
Path op1 = new Path(args[1]);
FileSystem fs = FileSystem.get(cfg);
if (fs.exists(op1)) {
fs.delete(op1, true);
System.out.println("存在此输出路径,已删除!!!");
}
FileOutputFormat.setOutputPath(job, op1);//设置输出路径
boolean b = job.waitForCompletion(true); //提交job
System.exit(b ? 0 : 1);
}
}
上面代码args[0]的值为src/main/resources/input/,args[1]的值为src/main/resources/output,可以按照下图所示进行配置:
运行程序,在resources目录下生成output文件夹,其中part-r-0000是任务结果,_SUCCESS是任务成功标志。打开part-r-00000,结果如下图所示:
将Mapper中间结果作为最终结果输出
在src/main/resources/input/目录下面准备三个写有单词的文本文件,
public static void main(String[] args) throws Exception {
args=new String[2];
args[0]="src/main/resources/input/";
args[1]="src/main/resources/output";
Configuration cfg = new Configuration();
Job job = Job.getInstance(cfg);
job.setJobName( "word count");//作业名称
job.setJarByClass(WordCountTest2.class);
job.setInputFormatClass(TextInputFormat.class);//设置输入格式
job.setOutputFormatClass(TextOutputFormat.class);
job.setMapperClass(WordCountMapper.class);
job.setNumReduceTasks(0); // 设置Reducer的个数为0
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
小知识:
当job.setNumReduceTasks(0)时,即没有reduce阶段,此时唯一影响的就是map结果的输出方式。
- 如果有reduce阶段,map的结果被flush到硬盘,作为reduce的输入;reduce的结果将被OutputFormat的RecordWriter写到指定的地方(setOutputPath),作为整个程序的输出。
- 如果没有reduce阶段,map的结果将直接被OutputFormat的RecordWriter写到指定的地方(setOutputPath),作为整个程序的输出。
OutputFormat可以是普通的FileOutputFormat,也可以是一个空的OutputFormat如NullOutputFormat。所以有无reduce和OutputFormat的多样性将组合出现以下情形(这个组合其实没什么意义,只是为了更加清楚而已)
- 有reduce
reduce的结果不需要输出到文件,如reduce里直接将结果插入HBase,此时可以采用NullOutputFormat,当然就不需要setOutputPath。reduce的结果需要输出到文件,如采用FileOutputFormat,需要setOutputPath。 - 无reduce
map的结果需要不输出到文件,如map里直接将结果插入HBase,此时可以采用NullOutputFormat,当然就不需要setOutputPath。map的结果需要输出到文件,如采用FileOutputFormat,需要setOutputPath。
有无reduce决定map结果的输出方式。有reduce时reduce的结果作为整个程序的输出;无reduce时,map的结果作为整个程序的输出。如果能在map阶段解决的问题尽量不要丢给直接输出的reduce。如NullOutputFormat层面上OutputFormat的不需要指定OutputPath;其他如FileOutputFormat需要指定,不然报错。
运行程序,结果如下图所示:
集群模式部署
在资源路径下面添加core-site.xml:
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://hcmaster:9000</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/usr/local/hadoop/hadoop-3.1.2/data/tmp/</value>
</property>
</configuration>
修改测试代码如下所示:
public class WordCountTest extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
Configuration cfg = new Configuration(); //获取我们的配置
Job job = Job.getInstance(cfg, this.getClass().getSimpleName());
job.setJarByClass(WordCountTest.class);
//设置map与需要设置的内容类 + 输出key与value
job.setMapperClass(WordCountMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//设置reduce
job.setReducerClass(WordCountReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//设置input与output
FileInputFormat.addInputPath(job, new Path(args[0]));
Path op1 = new Path(args[1]);
FileOutputFormat.setOutputPath(job, op1);
FileSystem fs = FileSystem.get(cfg);
if (fs.exists(op1)) {
fs.delete(op1, true);
System.out.println("存在此输出路径,已删除!!!");
}
//将job交给Yarn
boolean issucess = job.waitForCompletion(true);
return issucess ? 0 : 1;
}
public static void main(String[] args) throws Exception {
//需要在resources下面提供core-site.xml文件
args = new String[]{ "test/", "test/output/"};
int status = new WordCountTest().run(args); //跑我们的任务
System.exit(status);
}
}
在pom.xml中添加如下语句:
<packaging>jar</packaging>
打包:
找到打包好的jar包,并将上传到Linux中,然后运行