Hadoop的MR on Yarn
1.MapReduce是什么
MapReduce
Map 映射 ?个 生产
Reduce 聚合 默认1个 生产
单词统计案例代码:
map
package com.ccj.pxj.word;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
* 1. Mapper Interface: hadoop.mapred包下 (hadoop1.x版本提供的实现)
* 2. Mapper Class : hadoop mapreduce包下(hadoop2.x版本提供的实现)
*
* 相关的组件导入Jar包,都选择com.apache.hadoop.mapreduce包下!
*
* 3. MapReduce不管是在Map阶段还是Reduce阶段,处理的数据,都是key-value格式!
* 输入数据: key-value
* 输出数据: key-value
* KEYIN: 输入数据的key类型,每行的偏移量,整数! Long===> LongWritable
* VALUEIN: 输入数据的value类型 , String====> Text
* KEYOUT: 输出数据的key类型, String
* VALUEOUT: 输出数据的value类型,int===》 IntWritable
*
* key-value都是需要在多台机器中进行传输的! 要求key和value都要使用序列化类型!
* 传统的序列化,必须实现java.io.Serrilizable接口!
* Hadoop提供了新的序列化机制! 就是Writable接口!
*
* 总结: key 和 value的数据类型,必须实现Writable接口
*
* 4. RecordReader,负责将数据,从文件的切片中读取到Map程序中!
* 默认情况,使用RecordReader,一次只会读取一行内容!
* 默认情况,系统使用的RecordReader将数据的偏移量(offset)作为key
* hello how are you
* 5. map()方法是自己定义的处理逻辑,会对每一组key-value循环调用!
* context对象,是当前Mapper任务执行的上下文(环境和配置)
* @author lenovo
*
*/
public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
Text text=new Text();
IntWritable one=new IntWritable();
@Override
protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {
System.out.println("key的值是:"+key+",value的值是:"+value);
// //value代表一行文本
//将当前的这行文本,根据空格,进行切分!
// 获取一行
String line=value.toString();
String[] words = line.split(" ");
for (String word : words) {
text.set(word);
one.set(1);
context.write(text, one);
}
}
}
reduce
package com.ccj.pxj.word;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/*
* 1. KEYIN, VALUEIN : 对应的是Map任务的输出的key-value
* KEYOUT, VALUEOUT: 最总合并和汇总完结果,输出的key-value
* KEYOUT: Text
* VALUEOUT: IntWritable
*
* 2. 针对每一对 KEYIN, VALUEIN ,也会循环调用 reduce方法
*
* 3. Map ===Shuffle(分组,分区,排序)==Reducer
* Map的输出是: hello-1,I-1,I-1,hello-1
* Reduce的输入: I-{1,1}
*/
public class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
IntWritable v=new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int count=0;
for (IntWritable value : values) {
count+=value.get();
}
v.set(count);
context.write(key, v);
}
}
package com.ccj.pxj.word;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* /*
* 1. 组件:
* Job: 一个MapReduce程序,就是一个Job,一个Job包含了Map阶段和Reduce阶段
* MapTask: Map阶段,进行运算的主进程;
* 调用Mapper类的相关方法!
* ReduceTask: Reduce阶段,进行运算的主进程;
* 调用Reducer类的相关方法!
*
* 客户端需要,将Job(MR任务)提交到YARN平台运行!
* Driver: 用来提交Job
* Driver类的主进程(main方法)中,执行Job的提交!
*
* 2. 需求,是单词统计!
* MapTask中处理出入的数据,进行每个单词的统计!
* ReduceTask中,对Map阶段的结果进行合并和归总,将最终结果输出!
*
* 3. 输入数据的格式: 使用UTF-8进行编码!
* 保存时,使用UTF-8无BOM格式保存!
*
*
*
*
* @author lenovo
*
*/
public class WCDriver {
public static void main(String[] args) throws Exception {
args=new String[] {"F:\\hadoopTest1\\wordInput","F:\\hadoopTest1\\wordoutput_88"};
// args=new String[] {"hdfs://pxj104:9000/user/pxj/wordInput","file:///F:\\hadoopTest1\\wordoutput2"};
// args=new String[] {"hdfs://pxj104:9000/user/pxj/wordInput","hdfs://pxj104:9000/user/pxj/wordoput1"};
Configuration conf = new Configuration();
//conf.set("fs.defaultFS", "hdfs://pxj104:9000");
//conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
// System.setProperty("HADOOP_USER_NAME", "pxj");
// 1.创建Job对象,代表此次WordCount的MR程序
Job job=Job.getInstance(conf);
// 2. 设置配置信息,设置到Configuration对象中
Configuration configuration=new Configuration();
// 3. MR运行的所有环节信息,使用哪个Mapper类,进行Map阶段运算,等等..
// 设置Job所在Jar包的路径,和Job的名称
job.setJobName("wordcount");
job.setJarByClass(WCDriver.class);
// 4. 设置Mapper类
job.setMapperClass(WCMapper.class);
// 5. 设置Reducer类
job.setReducerClass(WCReducer.class);
// 6. 设置计算数据的输入目录
FileInputFormat.setInputPaths(job, new Path(args[0]));
// 7. 设置计算数据的输出目录
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 8. 设置Mapper阶段输出的结果的key-value类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 设置自定义的输入格式
job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job, 3145728);
// 设置Combiner
job.setCombinerClass(WCReducer.class);
// 9. 设置最终产生结果(reduce阶段)输出的结果的key-value类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 10. 将Job提交运行!
boolean result = job.waitForCompletion(true);
if(result) {
System.out.println("程序执行完毕!");
}
System.exit(result?0:1);
}
}
MR2.x 架构设计(MR on Yarn流程、mr提交job)
如图:
Yarn: RM NM
RM:
applications Manager 应用程序管理器
resource scheduler 资源memory+cpu调度器
红色框是什么?
container 虚拟的概念 属于NM节点上,专门用来MR、spark等技术的最小单元
map task
reduce task
2.1.用户向Yarn提交应用程序(job app application),jar文件、sql;
其中包裹ApplicationMaster程序、启动ApplicationMaster的命令等等
2.2.RM为该job分配第一个container,运行job的ApplicationMaster
2.3.App Master向applications Manager注册,这样就可以
在RM WEB界面查询这个job的运行状态
2.4.App Master采用轮询的方式通过RPC协议向RM申请和领取资源
2.5.一旦App Master拿到资源,就对应的与NM通信,要求启动任务
2.6.NM为任务设置好运行环境(jar包等),将任务启动命令写在一个脚本里。
并通过该脚本启动任务 task
2.7.各个task通过rpc协议向App Master汇报自己的状态和进度,以此让App Master随时掌握各个task的运行状态。
从而在task运行失败重启任务。
2.8.App Master向applications Manager注销且关闭自己
简单地说:
分为两步:
启动App Master,申请资源;
运行任务,直到任务运行完成。
三、map task个数
Map 映射 ?个 生产
Reduce 聚合 默认1个 生产
参考博客:
http://blog.itpub.net/30089851/viewspace-2095837/
[aaa@qq.com /home/pxj/app/hadoop]$hadoop jar \
> ./share/hadoop/mapreduce2/hadoop-mapreduce-examples-2.6.0-cdh5.16.2.jar \
> wordcount \
> /wordcount/input /wordcount/output
19/12/09 00:02:01 INFO input.FileInputFormat: Total input paths to process : 1
19/12/09 00:02:01 INFO mapreduce.JobSubmitter: number of splits:1
Job Counters
Launched map tasks=1
Launched reduce tasks=1
[aaa@qq.com /home/pxj/app/hadoop]$hadoop fs -put /home/pxj/2.log /wordcount/input
19/12/09 00:07:03 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/12/09 00:07:04 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/12/09 00:07:05 INFO input.FileInputFormat: Total input paths to process : 2
19/12/09 00:07:05 INFO mapreduce.JobSubmitter: number of splits:2
Launched map tasks=2
Launched reduce tasks=1
map task:
1.log 86字节 1块
m 1份
2.log 20字节 1块
m 2份
认为: map task个数和 文件的数量有关
3.log 130M 2块
m : 4份?
尽量在生产上控制一个文件的大小稍微小于一个blocksize
比如128M 文件为120M
假如我们设定hdfs的块的大小是64mb,如果我们输入有三个文件,
大小分别是3mb、65mb和127mb,那么mapreduce会把3mb文件分为一个输入分片
(input split),65mb则是两个输入分片(input split)
而127mb也是两个输入分片(input split),
换句话说我们如果在map计算前做输入分片调整,
例如合并小文件,那么就会有5个map任务将执行,
而且每个map执行的数据大小不均,这个也是mapreduce优化计算的一个关键点。
???
应该在生产要考虑你的文件
是什么文件格式: txtfile orc parquet
压缩格式: gzip snappy lzo
hive: orc/parquet + bzip2
hbase: hfile + snappy
参考网站
https://ruozedata.github.io/2018/04/18/%E5%A4%A7%E6%95%B0%E6%8D%AE%E5%8E%8B%E7%BC%A9%EF%BC%8C%E4%BD%A0%E4%BB%AC%E7%9C%9F%E7%9A%84%E4%BA%86%E8%A7%A3%E5%90%97%EF%BC%9F/
四、shuffle过程
解释:
Map方法之后,数据首先进入到分区方法,把数据标记好分区,然后把数据发送到环形缓冲区;环形缓冲区默认大小100m,环形缓冲区达到80%时,进行溢写;溢写前对数据进行排序,排序按照对key的索引进行字典顺序排序,排序的手段快排;溢写产生大量溢写文件,需要对溢写文件进行归并排序;对溢写的文件也可以进行Combiner操作,前提是汇总操作,求平均值不行。最后将文件按照分区存储到磁盘,等待Reduce端拉取。
每个Reduce拉取Map端对应分区的数据。拉取数据后先存储到内存中,内存不够了,再存储到磁盘。拉取完所有数据后,采用归并排序将内存和磁盘中的数据都进行排序。在进入Reduce方法前,可以对数据进行分组操作。
五、压缩格式和文件格式
文件格式
压缩格式
Compression
压缩比
解压速度
压缩比=压缩/解压速度