欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

MapReduce框架详解

程序员文章站 2022-06-30 11:46:30
...

觉得有帮助的,请多多支持博主,点赞关注哦~

MapReduce框架详解

1、Job提交源码分析

MapReduce框架详解

waitForCompletion()
submit();
// 1 建立连接
connect();
// 1)创建提交 job 的代理
new Cluster(getConfiguration());
// (1)判断是本地 yarn 还是远程
initialize(jobTrackAddr, conf);
// 2 提交 job
submitter.submitJobInternal(Job.this, cluster)
// 1)创建给集群提交数据的 Stag 路径
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
// 2)获取 jobid ,并创建 job 路径
JobID jobId = submitClient.getNewJobID();
// 3)拷贝 jar 包到集群
copyAndConfigureFiles(job, submitJobDir);
rUploader.uploadFiles(job, jobSubmitDir);
// 4)计算切片,生成切片规划文件
writeSplits(job, submitJobDir);
maps = writeNewSplits(job, jobSubmitDir);
input.getSplits(job);
// 5)向 Stag 路径写 xml 配置文件
writeConf(conf, submitJobFile);
conf.writeXml(out);
// 6)提交 job,返回提交状态
status = submitClient.submitJob(jobId,submitJobDir.toString(),job.getCredentials());

MapReduce框架详解

2、输入端InputFormat

2.1、FilelnputFormat切片原则(默认)

2.1.1、切片的原则

FileInputFormat 中默认的切片机制:

  1. 简单地按照文件的内容长度进行切片
  2. 切片大小,默认等于 block 大小
  3. 切片时不考虑数据集整体,而是逐个针对每一个文件单独切片

比如待处理数据有两个文件:

file1.txt 320M
file2.txt 10M

经过 FileInputFormat 的切片机制运算后,形成的切片信息如下:

file1.txt.split1-- 0~128
file1.txt.split2-- 128~256
file1.txt.split3-- 256~320
file2.txt.split1-- 0~10M

2.1.2、修改切片大小

通过分析源码,在 FileInputFormat 中,计算切片大小的逻辑:Math.max(minSize,Math.min(maxSize, blockSize));
切片主要由这几个值来运算决定

mapreduce.input.fileinputformat.split.minsize=1 默认值为 1
mapreduce.input.fileinputformat.split.maxsize= Long.MAXValue 默认值 Long.MAXValue

因此,默认情况下,切片大小=blocksize
maxsize(切片最大值):参数如果调得比 blocksize 小,则会让切片变小,而<就等于配置的这个参数的值。
minsize(切片最小值):参数调的比 blockSize 大,则可以让切片变得比 blocksize 还大。

详解:

计算每片的大小,三个数据:minSize,maxSize,blockSize

Math.max(minSize, Math.min(maxSize, blockSize));
minSize:10m   maxSize:200m--->每片大小:128M
minSize:10m   maxSize:100m--->每片大小:100M
maxSize<blockSize: 每片的大小  小于128M

hello.txt :10byte  -->分片:1片
修改了每片的maxSize:4Byte,minSize-->分片:3片

每片size  >128m?????
minSize:200m   maxSize:250m  ?-->200m

修改每片的大小:结论
1)maxSize<blockSize: 每片的大小  小于128M
2)minSize >blockSize: 每片的大小  大于 128m

分片数:影响   mapTask的并行度

思考:是不是并行度越高,就越好?
并不是。需要考虑数据量的多少及机器的配置。如果数据量很少,可能任务启动的时间都远远超过数据的处理时间。同样也不是越少越好。

2.1.3、获取切片信息 API

// 根据文件类型获取切片信息
FileSplit inputSplit = (FileSplit) context.getInputSplit();
// 获取切片的文件名称
String name = inputSplit.getPath().getName();

2.1.4、代码测试

package com.biubiubiu.demo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class TestSplit {

    static class CustomMapper extends Mapper<Object, Text,Text, Text>{
        Text text= new Text();
        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            text.set(value.toString());
            context.write(text,text);
        }
    }

    static class CustomReducer extends Reducer<Text, Text,Text, Text>{
        Text text= new Text();
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            text.set(key.toString());
            context.write(text,text);
        }
    }

    //第3部分,编写Driver(main方法)
    public static void main(String[] args) {
       try{
//           if (args.length<4){
//               System.out.println("至少4个参数");
//               System.exit(0);
//           }
           //1)创建Configuration对象,指明namespace的路径
           Configuration conf = new Configuration();
           conf.set("dfs.defaultFS","hdfs://192.168.153.231:9000");
           conf.set("mapreduce.input.fileinputformat.split.minsize","1");//最小片大小设置
           conf.set("mapreduce.input.fileinputformat.split.maxsize","30");//最大片大小设置

           //
           //2)创建Job
           Job job =Job.getInstance(conf,"testSplit");
           job.setJarByClass(TestSplit.class);

           //3)自定义Mapper进行输出参数(key,value)的配置
           job.setMapperClass(CustomMapper.class);
           job.setMapOutputKeyClass(Text.class);
           job.setMapOutputValueClass(Text.class);

           //4)自定义Reducer进行参数的配置
           job.setReducerClass(CustomReducer.class);
           job.setOutputKeyClass(Text.class);
           job.setOutputValueClass(Text.class);

           //5)配置处理的文件的路径(input)以及处理结果存放的路径(output)
           FileInputFormat.addInputPath(job,new Path("d_split/hello.txt"));
           FileOutputFormat.setOutputPath(job,new Path("d_split/2"));

           //6)让程序执行
           boolean result=job.waitForCompletion(true);
           if(result){
               System.out.println("执行正确!!!");
           }else{
               System.out.println("执行失败.....");
           }
       }catch(Exception ex){
           System.out.println("执行出错:"+ex.getMessage());
           ex.printStackTrace();
       }


    }
}

2.2、CombineTextinputFomat切片原则

2.2.1、关于大量小文件的优化策略

2.2.1.1、缺点

默认情况下 TextInputformat 对任务的切片机制是按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个 maptask,这样如果有大量小文件,就会产生大量的 maptask,处理效率极其低下。

2.2.1.2、优化策略
  1. 最好的办法,在数据处理系统的最前端(预处理/采集),将小文件先合并成大文件,再上传到 HDFS 做后续分析。
  2. 补救措施:如果已经是大量小文件在 HDFS 中了,可以使用另一种 InputFormat来做切片(CombineTextInputFormat),它的切片逻辑跟 TextFileInputFormat 不同:它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个 maptask。
  3. 优先满足最小切片大小,不超过最大切片大小
    CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
    CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m
    举例:0.5m+1m+0.3m+5m=2m + 4.8m=2m + 4m + 0.8m
具体实现步骤
// 如果不设置 InputFormat,它默认用的是 TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class)
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m

因为Hadoop设计就是适合大文件,不适合小文件,所以应该尽量避免大量小文件,不生成小文件。
总之,处理思路就是将小文件合并。

2.2.2、说明

  1. 是对集群中大量小文件的优化策略
  2. 文件:在hdfs中
  3. 它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个maptask
  4. 设置切片大小原则(先满足minsp1it,再满足maxSplit)job.setInputFormatclass(CombineTextInputFormat.class)

SPLIT_MAXSIZE=Long.MAX_VALUE SPLIT_MINSIZE=1
CombineTextInputFormat.setMaxInputSplitsize(job,12810241024);//最大

合并时,考虑:
1)n个小文件,合并成【1】个文件
    job.setInputFormatClass(CombineTextInputFormat.class)
   默认:TextInputFormat
2)合并,希望每个合并后的文件是128M
 job.setInputFormatClass(CombineTextInputFormat.class)
 CombineTextInputFormat.setMaxInputSplitSize(job, 128*1024*1024); //最大

2.2.3、案例

代码如下:

package com.biubiubiu.demo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * 合并追加文件
 * /input/1/2 中有多个文件
 * [aaa@qq.com hadoop_data]$ hadoop jar MapredTest-1.0-SNAPSHOT.jar com.biubiubiu.demo.TestSplit2 /input/1/2 /output/split
 * [aaa@qq.com hadoop_data]$ hdfs dfs -cat /output/split/*
 */
public class TestSplit2 {

    static class CustomMapper extends Mapper<Object, Text,Text, NullWritable>{
        Text text= new Text();
        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            text.set(value.toString());
            context.write(text,NullWritable.get());
            context.getInputSplit().getLength();
            context.getInputSplit().getLocationInfo();
            context.getInputSplit().getLocations();
        }
    }


    //第3部分,编写Driver(main方法)
    public static void main(String[] args) {
       try{
//           if (args.length<2){
//               System.out.println("至少2个参数");
//               System.exit(0);
//           }
           //1)创建Configuration对象,指明namespace的路径
           Configuration conf = new Configuration();
           conf.set("dfs.defaultFS","hdfs://192.168.153.231:9000");
           //
           //2)创建Job
           Job job =Job.getInstance(conf,"CombineTextInputFormat");
           job.setJarByClass(TestSplit2.class);


           //3)自定义Mapper进行输出参数(key,value)的配置
           job.setMapperClass(CustomMapper.class);
           job.setMapOutputKeyClass(Text.class);
           job.setMapOutputValueClass(NullWritable.class);

           job.setNumReduceTasks(0);

           //4)小文件优化
           job.setInputFormatClass(CombineTextInputFormat.class);
           CombineTextInputFormat.setMaxInputSplitSize(job, 40*1024*1024); //最大

           //5)配置处理的文件的路径(input)以及处理结果存放的路径(output)
           FileInputFormat.setInputPaths(job,new Path("d_in"));//"args[0]"
           FileOutputFormat.setOutputPath(job,new Path("d_out/3"));//"args[1]"

           //6)让程序执行
           boolean result=job.waitForCompletion(true);
           if(result){
               System.out.println("执行正确!!!");
           }else{
               System.out.println("执行失败.....");
           }
       }catch(Exception ex){
           System.out.println("执行出错:"+ex.getMessage());
           ex.printStackTrace();
       }


    }
}

  1. 不使用合并优化,没有进行小文件合并前,会切割为4个
    MapReduce框架详解
  2. 使用合并,但未指定每片的最大值,会切割为1个,一个maptask处理
    MapReduce框架详解
  3. 设定了每个文件的大小(最大值),按照最大值合并切割
    MapReduce框架详解

执行结果如下:
MapReduce框架详解

3、程序过程分析

3.1、Task任务分析

3.1.1、Maplask工作原理

1)MapTask的并行度决定map阶段的任务处理并发度,进而影响到整个job的处理速度.
2)一个job的map阶段MapTask并行度(个数),由客户端提交job时的切片个数决定

3.1.2、ReduceTask工作原理

1)设置ReduceTask并行度(个数)reducetask的并行度同样影响整个job的执行并发度和执行效率,但与maptask的并发数由切片数决定不同,Reducetask数量的决定是可以直接手动设置:
//默认值是1,手动设置为4
job.setNumReduceTasks(4);
2)规则:
(1)reducetask=0,表示没有reduce阶段,输出文件个数和map个数一致。
(2)reducetask默认值就是1,所以输出文件个数为一个。
(3)如果数据分布不均匀,就有可能在reduce阶段产生数据倾斜(4)reducetask数量并不是任意设置,还要考虑业务逻辑需求,有些情况下,需要计算全局汇总结果,就只能有1个
reducetask。
(5)具体多少个reducetask,需要根据集群性能而定。
(6)如果分区数不是1,但是reducetask为1,是否执行分区过程。
答案是:不执行分区过程。因为在maptask的源码中,执行分区的前提是先判断reduceNum个数是否大于1。
不大于1肯定不执行。

3.2、自定义分区Partition

3.2.1、使用场合

使用场景:要求将统计结果按照条件输出到不同文件中(分区)。
比如:将统计结果按照年份不同输出到不同文件中(分区)

3.2.2、自定义分区步骤

1)自定义分区类,重写getPartition方法
//自定义分区类
public static class Custompartitioner extends Partitioner<Intwritable,Doublewritable>{

	@override 
	public int getpartition(Intwritable intwritable,Doublewritable doublewritable,int numpartitions){
		int year=intwritable.get);
		if(year==2016){
			return 0;
		}else if(year==2017){
			return 1;
		}else if(year==2018){
			return 2;
		}else{
			return 3;
		}
  }
}

3.3、实现本地化合井Combiner

3.3.1、Combiner原因

1)combiner:每一个maptask的输出进行局部汇总,以减小网络传输量
2)combiner组件的父类就是Reducer

3.3.2、Combiner与Reducer的区别

【所处的执行位置不同】
Combiner是在每一个maptask所在的节点运行(本地执行);
Reducer是接收全局所有Mapper的输出结果;

3.3.3、Combiner案例

1)自定义Combiner
2)给job配置Combiner

3.4、自定义排序TopN

3.5、自定义分组GroupingComparator

3.6、多文件的Join操作

4、输出端Outputformat

未完待续~
觉得有帮助的,请多多支持博主,点赞关注哦~