MapReduce Study Notes (1): Word Count
1 Introduction to MapReduce
MapReduce is a programming framework for distributed computation and the core framework for developing "Hadoop-based data-analysis applications". Its key function is to weave the user's business-logic code together with the framework's built-in default components into one complete distributed program that runs concurrently on a Hadoop cluster.
1.1 Why Use MapReduce
- Hardware limits make a single machine incapable of processing truly massive data sets.
- Yet extending a single-machine program to run distributed across a cluster vastly increases its complexity and development difficulty.
- With the MapReduce framework in place, developers can focus the bulk of their effort on the business logic and hand the complexity of distributed computation over to the framework.
1.2 Data Processing Flow
- A distributed computation typically splits into two phases.
- In phase 1 (the map phase), the concurrent task instances (map tasks) each handle their own share of the input, fully in parallel and independently of one another.
- In phase 2 (the reduce phase), the concurrent task instances (reduce tasks) are likewise independent of one another, but their input depends on the output of every task instance of the previous phase.
- The MapReduce programming model allows only one map phase and one reduce phase per job; if the business logic is more complex than that, the only option is to chain several MapReduce jobs and run them serially, as the sketch below shows.
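Chaining happens at the driver level: run the first job to completion, then point the second job's input at the first job's output. A minimal sketch, assuming two hypothetical stages (ChainDriver and the StageOne*/StageTwo* classes are placeholders, not code from these notes; the kv-type setters are omitted for brevity):
package com.tzb.mr.chain;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class ChainDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Stage 1: configured like any single job (StageOneMapper/StageOneReducer are hypothetical)
        Job job1 = Job.getInstance(conf, "stage-1");
        job1.setJarByClass(ChainDriver.class);
        job1.setMapperClass(StageOneMapper.class);
        job1.setReducerClass(StageOneReducer.class);
        FileInputFormat.setInputPaths(job1, new Path(args[0]));
        Path intermediate = new Path(args[1] + "/stage1");
        FileOutputFormat.setOutputPath(job1, intermediate);
        // Block until stage 1 finishes; abort the chain if it fails
        if (!job1.waitForCompletion(true)) System.exit(1);
        // Stage 2 reads stage 1's output directory as its input
        Job job2 = Job.getInstance(conf, "stage-2");
        job2.setJarByClass(ChainDriver.class);
        job2.setMapperClass(StageTwoMapper.class);
        job2.setReducerClass(StageTwoReducer.class);
        FileInputFormat.setInputPaths(job2, intermediate);
        FileOutputFormat.setOutputPath(job2, new Path(args[1] + "/final"));
        System.exit(job2.waitForCompletion(true) ? 0 : 1);
    }
}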
2 Exercise: Counting Words in Text Files
package com.tzb.mr.wordcount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/*
 * KEYIN: by default, the byte offset at which the line of text read by the MR framework starts; logically a Long,
 * but Hadoop ships its own leaner serialization interface, so LongWritable is used instead of Long.
 *
 * VALUEIN: by default, the content of the line of text read by the MR framework; logically a String, so Text for the same reason.
 *
 * KEYOUT: the key of the output once the user-defined logic has run; here it is the word, a String (Text).
 * VALUEOUT: the value of the output once the user-defined logic has run; here it is the word count, an Integer (IntWritable).
 */
public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    /*
     * The business logic of the map phase goes in the overridden map() method.
     * The map task calls map() once for every line of input data.
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Convert the text handed over by the map task to a String first
        String line = value.toString();
        // Split the line into words on spaces
        // (note: split(" ") yields empty tokens for consecutive spaces; split("\\s+") would be more robust)
        String[] words = line.split(" ");
        // Emit each word as <word, 1>
        for (String word : words) {
            // Using the word as the key and the count 1 as the value makes the later
            // data distribution straightforward: records are partitioned by word,
            // so identical words end up at the same reduce task
            context.write(new Text(word), new IntWritable(1));
        }
    }
}
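For example, the input line hello world hello makes this mapper emit <hello,1> <world,1> <hello,1>; nothing is aggregated yet at this stage.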
package com.tzb.mr.wordcount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/*
 * KEYIN and VALUEIN correspond to the mapper's KEYOUT and VALUEOUT types.
 *
 * KEYOUT and VALUEOUT are the output types of the user-defined reduce logic:
 * KEYOUT is the word,
 * VALUEOUT is its total count.
 */
public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    /*
     * key is the key shared by one group of kv pairs with the same word, e.g. <hello,1><hello,1>
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count = 0;
        /* Equivalent explicit-iterator version:
        Iterator<IntWritable> iterator = values.iterator();
        while (iterator.hasNext()) {
            count += iterator.next().get();
        } */
        for (IntWritable value : values) {
            count += value.get();
        }
        context.write(key, new IntWritable(count));
    }
}
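Continuing the example above: the framework groups the mapper output by key, so reduce() is called once with key hello and values [1, 1], and writes out <hello,2>.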
package com.tzb.mr.wordcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/*
 * Acts as the client of the YARN cluster:
 * it bundles up the MR job's runtime parameters, points at the jar,
 * and finally submits the whole thing to YARN.
 */
public class WordcountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        /* conf.set("mapreduce.framework.name", "yarn");
        conf.set("yarn.resourcemanager.hostname", "node1"); */
        Job job = Job.getInstance(conf);
        // job.setJar("/home/hadoop/wc.jar");
        // Locate the local path of this program's jar via the driver class
        job.setJarByClass(WordcountDriver.class);
        // Specify the mapper and reducer business classes this job uses
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReducer.class);
        // Specify the kv types of the mapper output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Specify the kv types of the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Input directory of the job's raw files: the first command-line argument
        // (the original args[1]/args[2] indices were off by one for the two-argument
        // command line used below, pointing the job at the wrong directories)
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // Output directory for the job's results: the second command-line argument
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Hand the job's configuration, plus the jar containing its classes, to YARN and wait
        // job.submit();
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
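One tuning knob this driver leaves unset is a combiner; the counters below show Combine input records=0, confirming none ran. Because summing word counts is associative and commutative, the existing WordcountReducer could double as a map-side combiner and shrink the shuffle volume. An optional one-line addition alongside the other job.set* calls:
// Optional: pre-aggregate <word,1> pairs on the map side before the shuffle;
// safe for word count because integer addition is associative and commutative
job.setCombinerClass(WordcountReducer.class);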
2.1 Package the project as a jar
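The packaging step itself isn't shown in these notes; assuming a Maven project (an IDE's build-artifact export works just as well), it is typically:
mvn clean package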
2.2 Upload the jar to node1
The jar was uploaded here with the SSH Secure File Transfer Client.
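From a plain terminal, scp achieves the same thing (the hadoop user and target directory here are assumptions):
scp wordcount.jar hadoop@node1:~/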
2.3 Upload the files to be counted
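The input files must end up in HDFS under the directory the job reads from; assuming a handful of local .txt files, something like:
hadoop fs -mkdir -p /wordcount/input
hadoop fs -put *.txt /wordcount/input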
2.4 Run the jar
hadoop jar wordcount.jar com.tzb.mr.wordcount.WordcountDriver /wordcount/input /wordcount/output
18/07/30 10:57:56 INFO client.RMProxy: Connecting to ResourceManager at node1/192.168.154.131:8032
Exception in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://node1:9000/wordcount/input already exists
at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:146)
at org.apache.hadoop.mapreduce.JobSubmitter.checkSpecs(JobSubmitter.java:266)
at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:139)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1290)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1287)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1758)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:1287)
at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1308)
at com.tzb.mr.wordcount.WordcountDriver.main(WordcountDriver.java:57)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
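FileOutputFormat refuses to submit a job whose output directory already exists, failing fast rather than silently overwriting earlier results. In this first attempt, the path handed to it resolved to the existing /wordcount/input directory: the command line and the driver's original args[1]/args[2] indexing were out of step, which the args[0]/args[1] fix above addresses. With the driver corrected, remove any stale output directory before rerunning:
hadoop fs -rm -r /wordcount/output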
[aaa@qq.com ~]$ hadoop jar wordcount.jar com.tzb.mr.wordcount.WordcountDriver /wordcount/input /wordcount/output
18/07/30 11:01:31 INFO client.RMProxy: Connecting to ResourceManager at node1/192.168.154.131:8032
18/07/30 11:01:32 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
18/07/30 11:01:33 INFO input.FileInputFormat: Total input paths to process : 3
18/07/30 11:01:33 INFO mapreduce.JobSubmitter: number of splits:3
18/07/30 11:01:33 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1532959530485_0002
18/07/30 11:01:34 INFO impl.YarnClientImpl: Submitted application application_1532959530485_0002
18/07/30 11:01:35 INFO mapreduce.Job: The url to track the job: http://node1:8088/proxy/application_1532959530485_0002/
18/07/30 11:01:35 INFO mapreduce.Job: Running job: job_1532959530485_0002
18/07/30 11:01:53 INFO mapreduce.Job: Job job_1532959530485_0002 running in uber mode : false
18/07/30 11:01:53 INFO mapreduce.Job: map 0% reduce 0%
18/07/30 11:02:13 INFO mapreduce.Job: map 33% reduce 0%
18/07/30 11:02:28 INFO mapreduce.Job: map 67% reduce 0%
18/07/30 11:02:29 INFO mapreduce.Job: map 100% reduce 0%
18/07/30 11:02:36 INFO mapreduce.Job: map 100% reduce 100%
18/07/30 11:02:36 INFO mapreduce.Job: Job job_1532959530485_0002 completed successfully
18/07/30 11:02:36 INFO mapreduce.Job: Counters: 50
File System Counters
FILE: Number of bytes read=204659
FILE: Number of bytes written=900031
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=103096
HDFS: Number of bytes written=30296
HDFS: Number of read operations=12
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Killed map tasks=1
Launched map tasks=4
Launched reduce tasks=1
Data-local map tasks=4
Total time spent by all maps in occupied slots (ms)=80861
Total time spent by all reduces in occupied slots (ms)=18871
Total time spent by all map tasks (ms)=80861
Total time spent by all reduce tasks (ms)=18871
Total vcore-milliseconds taken by all map tasks=80861
Total vcore-milliseconds taken by all reduce tasks=18871
Total megabyte-milliseconds taken by all map tasks=82801664
Total megabyte-milliseconds taken by all reduce tasks=19323904
Map-Reduce Framework
Map input records=2062
Map output records=16982
Map output bytes=170688
Map output materialized bytes=204671
Input split bytes=328
Combine input records=0
Combine output records=0
Reduce input groups=2409
Reduce shuffle bytes=204671
Reduce input records=16982
Reduce output records=2409
Spilled Records=33964
Shuffled Maps =3
Failed Shuffles=0
Merged Map outputs=3
GC time elapsed (ms)=1223
CPU time spent (ms)=8590
Physical memory (bytes) snapshot=693231616
Virtual memory (bytes) snapshot=8310362112
Total committed heap usage (bytes)=385667072
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=102768
File Output Format Counters
Bytes Written=30296
[aaa@qq.com ~]$
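With the job finished, the reducer's results sit in the output directory as part files and can be read straight out of HDFS:
hadoop fs -cat /wordcount/output/part-r-00000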