MapReduce框架详解
程序员文章站
2022-06-30 11:46:30
...
觉得有帮助的,请多多支持博主,点赞关注哦~
文章目录
MapReduce框架详解
1、Job提交源码分析
waitForCompletion()
submit();
// 1 建立连接
connect();
// 1)创建提交 job 的代理
new Cluster(getConfiguration());
// (1)判断是本地 yarn 还是远程
initialize(jobTrackAddr, conf);
// 2 提交 job
submitter.submitJobInternal(Job.this, cluster)
// 1)创建给集群提交数据的 Stag 路径
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
// 2)获取 jobid ,并创建 job 路径
JobID jobId = submitClient.getNewJobID();
// 3)拷贝 jar 包到集群
copyAndConfigureFiles(job, submitJobDir);
rUploader.uploadFiles(job, jobSubmitDir);
// 4)计算切片,生成切片规划文件
writeSplits(job, submitJobDir);
maps = writeNewSplits(job, jobSubmitDir);
input.getSplits(job);
// 5)向 Stag 路径写 xml 配置文件
writeConf(conf, submitJobFile);
conf.writeXml(out);
// 6)提交 job,返回提交状态
status = submitClient.submitJob(jobId,submitJobDir.toString(),job.getCredentials());
2、输入端InputFormat
2.1、FilelnputFormat切片原则(默认)
2.1.1、切片的原则
FileInputFormat 中默认的切片机制:
- 简单地按照文件的内容长度进行切片
- 切片大小,默认等于 block 大小
- 切片时不考虑数据集整体,而是逐个针对每一个文件单独切片
比如待处理数据有两个文件:
file1.txt 320M
file2.txt 10M
经过 FileInputFormat 的切片机制运算后,形成的切片信息如下:
file1.txt.split1-- 0~128
file1.txt.split2-- 128~256
file1.txt.split3-- 256~320
file2.txt.split1-- 0~10M
2.1.2、修改切片大小
通过分析源码,在 FileInputFormat 中,计算切片大小的逻辑:Math.max(minSize,Math.min(maxSize, blockSize));
切片主要由这几个值来运算决定
mapreduce.input.fileinputformat.split.minsize=1 默认值为 1
mapreduce.input.fileinputformat.split.maxsize= Long.MAXValue 默认值 Long.MAXValue
因此,默认情况下,切片大小=blocksize。
maxsize(切片最大值):参数如果调得比 blocksize 小,则会让切片变小,而<就等于配置的这个参数的值。
minsize(切片最小值):参数调的比 blockSize 大,则可以让切片变得比 blocksize 还大。
详解:
计算每片的大小,三个数据:minSize,maxSize,blockSize
Math.max(minSize, Math.min(maxSize, blockSize));
minSize:10m maxSize:200m--->每片大小:128M
minSize:10m maxSize:100m--->每片大小:100M
maxSize<blockSize: 每片的大小 小于128M
hello.txt :10byte -->分片:1片
修改了每片的maxSize:4Byte,minSize-->分片:3片
每片size >128m?????
minSize:200m maxSize:250m ?-->200m
修改每片的大小:结论
1)maxSize<blockSize: 每片的大小 小于128M
2)minSize >blockSize: 每片的大小 大于 128m
分片数:影响 mapTask的并行度
思考:是不是并行度越高,就越好?
并不是。需要考虑数据量的多少及机器的配置。如果数据量很少,可能任务启动的时间都远远超过数据的处理时间。同样也不是越少越好。
2.1.3、获取切片信息 API
// 根据文件类型获取切片信息
FileSplit inputSplit = (FileSplit) context.getInputSplit();
// 获取切片的文件名称
String name = inputSplit.getPath().getName();
2.1.4、代码测试
package com.biubiubiu.demo;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class TestSplit {
static class CustomMapper extends Mapper<Object, Text,Text, Text>{
Text text= new Text();
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
text.set(value.toString());
context.write(text,text);
}
}
static class CustomReducer extends Reducer<Text, Text,Text, Text>{
Text text= new Text();
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
text.set(key.toString());
context.write(text,text);
}
}
//第3部分,编写Driver(main方法)
public static void main(String[] args) {
try{
// if (args.length<4){
// System.out.println("至少4个参数");
// System.exit(0);
// }
//1)创建Configuration对象,指明namespace的路径
Configuration conf = new Configuration();
conf.set("dfs.defaultFS","hdfs://192.168.153.231:9000");
conf.set("mapreduce.input.fileinputformat.split.minsize","1");//最小片大小设置
conf.set("mapreduce.input.fileinputformat.split.maxsize","30");//最大片大小设置
//
//2)创建Job
Job job =Job.getInstance(conf,"testSplit");
job.setJarByClass(TestSplit.class);
//3)自定义Mapper进行输出参数(key,value)的配置
job.setMapperClass(CustomMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//4)自定义Reducer进行参数的配置
job.setReducerClass(CustomReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//5)配置处理的文件的路径(input)以及处理结果存放的路径(output)
FileInputFormat.addInputPath(job,new Path("d_split/hello.txt"));
FileOutputFormat.setOutputPath(job,new Path("d_split/2"));
//6)让程序执行
boolean result=job.waitForCompletion(true);
if(result){
System.out.println("执行正确!!!");
}else{
System.out.println("执行失败.....");
}
}catch(Exception ex){
System.out.println("执行出错:"+ex.getMessage());
ex.printStackTrace();
}
}
}
2.2、CombineTextinputFomat切片原则
2.2.1、关于大量小文件的优化策略
2.2.1.1、缺点
默认情况下 TextInputformat 对任务的切片机制是按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个 maptask,这样如果有大量小文件,就会产生大量的 maptask,处理效率极其低下。
2.2.1.2、优化策略
- 最好的办法,在数据处理系统的最前端(预处理/采集),将小文件先合并成大文件,再上传到 HDFS 做后续分析。
- 补救措施:如果已经是大量小文件在 HDFS 中了,可以使用另一种 InputFormat来做切片(CombineTextInputFormat),它的切片逻辑跟 TextFileInputFormat 不同:它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个 maptask。
- 优先满足最小切片大小,不超过最大切片大小
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m
举例:0.5m+1m+0.3m+5m=2m + 4.8m=2m + 4m + 0.8m
具体实现步骤
// 如果不设置 InputFormat,它默认用的是 TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class)
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m
因为Hadoop设计就是适合大文件,不适合小文件,所以应该尽量避免大量小文件,不生成小文件。
总之,处理思路就是将小文件合并。
2.2.2、说明
- 是对集群中大量小文件的优化策略
- 文件:在hdfs中
- 它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个maptask
- 设置切片大小原则(先满足minsp1it,再满足maxSplit)job.setInputFormatclass(CombineTextInputFormat.class)
SPLIT_MAXSIZE=Long.MAX_VALUE SPLIT_MINSIZE=1
CombineTextInputFormat.setMaxInputSplitsize(job,12810241024);//最大
合并时,考虑:
1)n个小文件,合并成【1】个文件
job.setInputFormatClass(CombineTextInputFormat.class)
默认:TextInputFormat
2)合并,希望每个合并后的文件是128M
job.setInputFormatClass(CombineTextInputFormat.class)
CombineTextInputFormat.setMaxInputSplitSize(job, 128*1024*1024); //最大
2.2.3、案例
代码如下:
package com.biubiubiu.demo;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* 合并追加文件
* /input/1/2 中有多个文件
* [aaa@qq.com hadoop_data]$ hadoop jar MapredTest-1.0-SNAPSHOT.jar com.biubiubiu.demo.TestSplit2 /input/1/2 /output/split
* [aaa@qq.com hadoop_data]$ hdfs dfs -cat /output/split/*
*/
public class TestSplit2 {
static class CustomMapper extends Mapper<Object, Text,Text, NullWritable>{
Text text= new Text();
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
text.set(value.toString());
context.write(text,NullWritable.get());
context.getInputSplit().getLength();
context.getInputSplit().getLocationInfo();
context.getInputSplit().getLocations();
}
}
//第3部分,编写Driver(main方法)
public static void main(String[] args) {
try{
// if (args.length<2){
// System.out.println("至少2个参数");
// System.exit(0);
// }
//1)创建Configuration对象,指明namespace的路径
Configuration conf = new Configuration();
conf.set("dfs.defaultFS","hdfs://192.168.153.231:9000");
//
//2)创建Job
Job job =Job.getInstance(conf,"CombineTextInputFormat");
job.setJarByClass(TestSplit2.class);
//3)自定义Mapper进行输出参数(key,value)的配置
job.setMapperClass(CustomMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
job.setNumReduceTasks(0);
//4)小文件优化
job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job, 40*1024*1024); //最大
//5)配置处理的文件的路径(input)以及处理结果存放的路径(output)
FileInputFormat.setInputPaths(job,new Path("d_in"));//"args[0]"
FileOutputFormat.setOutputPath(job,new Path("d_out/3"));//"args[1]"
//6)让程序执行
boolean result=job.waitForCompletion(true);
if(result){
System.out.println("执行正确!!!");
}else{
System.out.println("执行失败.....");
}
}catch(Exception ex){
System.out.println("执行出错:"+ex.getMessage());
ex.printStackTrace();
}
}
}
-
不使用合并优化,没有进行小文件合并前,会切割为4个
-
使用合并,但未指定每片的最大值,会切割为1个,一个maptask处理
-
设定了每个文件的大小(最大值),按照最大值合并切割
执行结果如下:
3、程序过程分析
3.1、Task任务分析
3.1.1、Maplask工作原理
1)MapTask的并行度决定map阶段的任务处理并发度,进而影响到整个job的处理速度.
2)一个job的map阶段MapTask并行度(个数),由客户端提交job时的切片个数决定
3.1.2、ReduceTask工作原理
1)设置ReduceTask并行度(个数)reducetask的并行度同样影响整个job的执行并发度和执行效率,但与maptask的并发数由切片数决定不同,Reducetask数量的决定是可以直接手动设置:
//默认值是1,手动设置为4
job.setNumReduceTasks(4);
2)规则:
(1)reducetask=0,表示没有reduce阶段,输出文件个数和map个数一致。
(2)reducetask默认值就是1,所以输出文件个数为一个。
(3)如果数据分布不均匀,就有可能在reduce阶段产生数据倾斜(4)reducetask数量并不是任意设置,还要考虑业务逻辑需求,有些情况下,需要计算全局汇总结果,就只能有1个
reducetask。
(5)具体多少个reducetask,需要根据集群性能而定。
(6)如果分区数不是1,但是reducetask为1,是否执行分区过程。
答案是:不执行分区过程。因为在maptask的源码中,执行分区的前提是先判断reduceNum个数是否大于1。
不大于1肯定不执行。
3.2、自定义分区Partition
3.2.1、使用场合
使用场景:要求将统计结果按照条件输出到不同文件中(分区)。
比如:将统计结果按照年份不同输出到不同文件中(分区)
3.2.2、自定义分区步骤
1)自定义分区类,重写getPartition方法
//自定义分区类
public static class Custompartitioner extends Partitioner<Intwritable,Doublewritable>{
@override
public int getpartition(Intwritable intwritable,Doublewritable doublewritable,int numpartitions){
int year=intwritable.get);
if(year==2016){
return 0;
}else if(year==2017){
return 1;
}else if(year==2018){
return 2;
}else{
return 3;
}
}
}
3.3、实现本地化合井Combiner
3.3.1、Combiner原因
1)combiner:每一个maptask的输出进行局部汇总,以减小网络传输量
2)combiner组件的父类就是Reducer
3.3.2、Combiner与Reducer的区别
【所处的执行位置不同】
Combiner是在每一个maptask所在的节点运行(本地执行);
Reducer是接收全局所有Mapper的输出结果;
3.3.3、Combiner案例
1)自定义Combiner
2)给job配置Combiner
3.4、自定义排序TopN
3.5、自定义分组GroupingComparator
3.6、多文件的Join操作
4、输出端Outputformat
未完待续~
觉得有帮助的,请多多支持博主,点赞关注哦~