欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

MapReduce工作流程和工作原理

程序员文章站 2022-06-30 20:50:26
...

MapReduce是我们再进行离线大数据处理的时候经常要使用的计算模型,MapReduce的计算过程被封装的很好,我们只用使用Map和Reduce函数,所以对其整体的计算过程不是太清楚,同时MapReduce1.0和MapReduce2.0在网上有很多人混淆。

MapReduce1.0运行模型

MapReduce工作流程和工作原理

20170730014216035.png

Input

Input但是输入文件的存储位置,

但是注意这里并一定是一些博客说的当然是HDFS似的分布式文件系统位置,默认是HDFS文件系统,当然也可以修改。

,它也可以是本机上的文件位置。
我们来仔细分析下input

MapReduce工作流程和工作原理

8aab5880-d171-30f7-91d6-aaacba2d03ce.jpg

首先我们知道要和JobTracker打交道是离不开JobClient这个接口的,就如上图所示,

然后JobClient中的Run方法 会让 JobClient 把所有 Hadoop Job 的信息,比如 mapper reducer jar path, mapper / reducer class name, 输入文件的路径等等,告诉给 JobTracker,如下面的代码所示:

 

public int run(String[] args) throws Exception {
        
        //create job
        Job job = Job.getInstance(getConf(), this.getClass().getSimpleName());
        
        // set run jar class
        job.setJarByClass(this.getClass());
        
        // set input . output
        FileInputFormat.addInputPath(job, new Path(PropReader.Reader("arg1")));
        FileOutputFormat.setOutputPath(job, new Path(PropReader.Reader("arg2")));
        
        // set map
        job.setMapperClass(HFile2TabMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        
        // set reduce
        job.setReducerClass(PutSortReducer.class);
        return 0;
    }

除此以外,JobClient.runJob() 还会做一件事,使用 InputFormat类去计算如何把 input 文件 分割成一份一份,然后交给 mapper 处理。inputformat.getSplit() 函数返回一个 InputSplit 的 List, 每一个 InputSplit 就是一个 mapper 需要处理的数据。

一个 Hadoop Job的 input 既可以是一个很大的 file, 也可以是多个 file; 无论怎样,getSplit() 都会计算如何分割 input.

如果是HDFS文件系统,我们都知道其可以通过将文件分割为block的形式存放在很多台电脑上,使其可以存放很大的文件。那么Mapper是如何确定一个HDFS文件中的block存放哪几台电脑,有什么数据?

inputFormat它实际上是个 interface, 需要 类 来继承,提供分割 input 的逻辑。

Jobclient 有一个方法叫 setInputFormat(), 通过它,我们可以告诉 JobTracker 想要使用的 InputFormat 类 是什么。如果我们不设置,Hadoop默认的是 TextInputFormat, 它默认为文件在 HDFS上的每一个 Block 生成一个对应的 InputSplit. 所以大家使用 Hadoop 时,也可以编写自己的 input format, 这样可以*的选择分割 input 的算法,甚至处理存储在 HDFS 之外的数据。

JobTracker 尽量把 mapper 安排在离它要处理的数据比较近的机器上,以便 mapper 从本机读取数据,节省网络传输时间。具体实现是如何实现?

对于每个 map任务, 我们知道它的 split 包含的数据所在的主机位置,我们就把 mapper 安排在那个相应的主机上好了,至少是比较近的host. 你可能会问:split 里存储的 主机位置是 HDFS 存数据的主机,和 MapReduce 的主机 有什么相关呢?为了达到数据本地性,其实通常把MapReduce 和 HDFS 部署在同一组主机上。

既然一个 InputSplit 对应一个 map任务, 那么当 map 任务收到它所处理数据的位置信息,它就可以从 HDFS 读取这些数据了。

接下来我们再从map函数看Input

map函数接受的是一个 key value 对。

实际上,Hadoop 会把每个 mapper 的输入数据再次分割,分割成一个个 key-value对, 然后为每一个 key-value对,调用Map函数一次. 为了这一步分割,Hadoop 使用到另一个类: RecordReader. 它主要的方法是 next(), 作用就是从 InputSplit 读出一条 key-value对.

RecordReader 可以被定义在每个 InputFormat 类中。当我们通过 JobClient.setInputFormat() 告诉 Hadoop inputFormat 类名称的时候, RecordReader 的定义也一并被传递过来。

所以整个Input,

1.JobClient输入输入文件的存储位置

2.JobClient通过InputFormat接口可以设置分割的逻辑,默认是按HDFS文件分割。

3.Hadoop把文件再次分割为key-value对。

4.JobTracker负责分配对应的分割块由对应的maper处理,同时 RecordReader负责读取key-value对值。

Mapper

JobClient运行后获得所需的配置文件和客户端计算所得的输入划分信息。并将这些信息都存放在JobTracker专门为该作业创建的文件夹中。文件夹名为该作业的Job ID。JAR文件默认会有10个副本(mapred.submit.replication属性控制);

然后输入划分信息告诉了JobTracker应该为这个作业启动多少个map任务等信息。

JobTracker通过TaskTracker 向其汇报的心跳情况和slot(情况),每一个slot可以接受一个map任务,这样为了每一台机器map任务的平均分配,JobTracker会接受每一个TaskTracker所监控的slot情况。

JobTracker接收到作业后,将其放在一个作业队列里,等待作业调度器对其进行调度,当作业调度器根据自己的调度算法调度到该作业时,会根据输入划分信息为每个划分创建一个map任务,并将map任务分配给TaskTracker执行,分配时根据slot的情况作为标准。

TaskTracker每隔一段时间会给JobTracker发送一个心跳,告诉JobTracker它依然在运行,同时心跳中还携带着很多的信息,比如当前map任务完成的进度等信息。当JobTracker收到作业的最后一个任务完成信息时,便把该作业设置成“成功”。当JobClient查询状态时,它将得知任务已完成,便显示一条消息给用户。

Map通过 RecordReader 读取Input的key/value对,map根据用户自定义的任务,运行完毕后,产生另外一系列 key/value,并将其写入到Hadoop的内存缓冲取中,在内存缓冲区中的key/value对按key排序,此时会按照reduce partition进行,分到不同partition中,一旦内存满就会被写入到本地磁盘的文件里,这个文件叫spill file。

shuffle

Shuffle是我们不需要编写的模块,但却是十分关键的模块。

MapReduce工作流程和工作原理

4df193f5-e56e-308f-9689-eac035dd8a2b.png

在map中,每个 map 函数会输出一组 key/value对, Shuffle 阶段需要从所有 map主机上把相同的 key 的 key value对组合在一起,(也就是这里省去的Combiner阶段)组合后传给 reduce主机, 作为输入进入 reduce函数里。

Partitioner组件 负责计算哪些 key 应当被放到同一个 reduce 里

HashPartitioner类,它会把 key 放进一个 hash函数里,然后得到结果。如果两个 key 的哈希值 一样,他们的 key/value对 就被放到同一个 reduce 函数里。我们也把分配到同一个 reduce函数里的 key /value对 叫做一个reduce partition.

我们看到 hash 函数最终产生多少不同的结果, 这个 Hadoop job 就会有多少个 reduce partition/reduce 函数,这些 reduce函数最终被JobTracker 分配到负责 reduce 的主机上,进行处理。

我们知道map阶段可能会产生多个spill file 当 Map 结束时,这些 spill file 会被 merge 起来,不是 merge 成一个 file,而是也会按 reduce partition 分成多个。

当 Map tasks 成功结束时,他们会通知负责的 tasktracker, 然后消息通过 jobtracker 的 heartbeat 传给 jobtracker. 这样,对于每一个 job, jobtracker 知道 map output 和 map tasks 的关联。Reducer 内部有一个 thread 负责定期向 jobtracker 询问 map output 的位置,直到 reducer 得到所有它需要处理的 map output 的位置。

Reducer 的另一个 thread 会把拷贝过来的 map output file merge 成更大的 file. 如果 map task 被 configure 成需要对 map output 进行压缩,那 reduce 还要对 map 结果进行解压缩。当一个 reduce task 所有的 map output 都被拷贝到一个它的 host上时,reduce 就要开始对他们排序了。

排序并不是一次把所有 file 都排序,而是分几轮。每轮过后产生一个结果,然后再对结果排序。最后一轮就不用产生排序结果了,而是直接向 reduce 提供输入。这时,用户提供的 reduce函数 就可以被调用了。输入就是 map 任务 产生的 key value对.

同时reduce任务并不是在map任务完全结束后才开始的,Map 任务有可能在不同时间结束,所以 reduce 任务没必要等所有 map任务 都结束才开始。事实上,每个 reduce任务有一些 threads 专门负责从 map主机复制 map 输出(默认是5个)。

Reduce

MapReduce工作流程和工作原理

e1090dee-ee98-30d1-ad55-2f88f774fa73.jpg

 

reduce() 函数以 key 及对应的 value 列表作为输入,按照用户自己的程序逻辑,经合并 key 相同的 value 值后,产 生另外一系列 key/value 对作为最终输出写入 HDFS。

一定要注意以上为MapReduce1.0的过程,而且现在MapReduce已经升级到了2.0版本,具体2.0的工作流程可参考:

Yarn框架深入理解

但是并不意味着MapReduce1.0被淘汰,在Yarn中的MRYarnClild模块中基本上是是采用MapReduce1.0的解决思路,MRv2 具有与 MRv1 相同的编程模型和数据处理引擎,唯一不同的是运行时环境。MRv2 是在 MRv1 基础上经加工之后,运行于资源管理框架 YARN 之上的计算框架 MapReduce。 它的运行时环境不再由 JobTracker 和 TaskTracker 等服务组成,而是变为通用资源管理 系统 YARN 和作业控制进程 ApplicationMaster,其中,YARN 负责资源管理和调度,而 ApplicationMaster 仅负责一个作业的管理。简言之,MRv1 仅是一个独立的离线计算框架, 而 MRv2 则是运行于 YARN 之上的 MapReduce。

 

MapReduce的工作原理

 

我们知道MapReduce诞生与搜索邻域,主要解决的是海量数据处理扩展性差的问题。

MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算。它们的主要思想,都是从函数式编程语言里借来的。每次一个步骤方法会产生一个状态,这个状态会直接当参数传进下一步中。而不是使用全局变量。

MapReduce框架

  • MapReduce将复杂的,运行大规模集群上的并行计算过程高度地抽象两个函数:Map和Reduce
  • MapReduce采用“分而治之”策略,将一个分布式文件系统中的大规模数据集,分成许多独立的分片。这些分片可以被多个Map任务并行处理。
  • MapReduce设计的一个理念就是“计算向数据靠拢”,而不是“数据向计算靠拢”,原因是,移动数据需要大量的网络传输开销
  • MapReduce框架采用了Master/Slave架构,包括一个Master和若干个Slave,Master上运行JobTracker,Slave运行TaskTracker
  • Hadoop框架是用JAVA来写的,但是,MapReduce应用程序则不一定要用Java来写。

JobTracker:初始化作业,分配作业,TaskTracker与其进行通信,协调监控整个作业
TaskTracker:定期与JobTracker通信,执行Map和Reduce任务
HDFS:保存作业的数据、配置、jar包、结果

作业调度算法:

FIFO调度器(默认)、公平调度器、容量调度器

TaskTracker和JobTracker之间的通信与任务的分配是通过心跳机制完成的;

TaskTracker会主动向JobTracker询问是否有作业要做,如果自己可以做,那么就会申请到作业任务,这个任务 可以使Map也可能是Reduce任务;

TaskTraker将代码和配置信息到本地;

分别为每一个Task启动JVM运行任务

任务在运行过程中,首先会将自己的状态汇报给TaskTracker,然后由TaskTracker汇总告之JobTracker;

任务进度是通过计数器来实现的;

JobTracker是在接受到最后一个任务运行完成后,才会将作业标志为成功。

MapReduce编程模型

MapReduce 由 两 个 阶 段 组 成 :Map 和 Reduce。

  • map() 函数以 key/value 对作为输入,产生另外一系列 key/value 对作为中间输出写入本地 磁盘。MapReduce 框架会自动将这些中间数据按照 key 值进行聚集,且 key 值相同(用户可 设定聚集策略,默认情况下是对 key 值进行哈希取模)的数据被统一交给 reduce() 函数处理。
  • reduce() 函数以 key 及对应的 value 列表作为输入,经合并 key 相同的 value 值后,产 生另外一系列 key/value 对作为最终输出写入 HDFS。
  • 指定三个组件分别是 InputFormat、Partitioner 和 OutputFormat, 它们均需要用户根据自己的应用需求配置①指定输入 文件格式。将输入数据切分成若干个 split,且将每个 split 中的数据解析成一个个 map() 函数 要求的 key/value 对。②确定 map() 函数产生的每个 key/value 对发给哪个 Reduce Task 函数处 理。③指定输出文件格式,即每个 key/value 对以何种形式保存到输出文件中。

MapReduce作业运行流程

MapReduce工作流程和工作原理

8aab5880-d171-30f7-91d6-aaacba2d03ce.jpg

 

1.在客户端启动一个作业。

2.向JobTracker请求一个Job ID。

3.将运行作业所需要的资源文件复制到HDFS上,包括MapReduce程序打包的JAR文件、配置文件和客户端计算所得的输入划分信息。这些文件都存放在JobTracker专门为该作业创建的文件夹中。文件夹名为该作业的Job ID。JAR文件默认会有10个副本(mapred.submit.replication属性控制);输入划分信息告诉了JobTracker应该为这个作业启动多少个map任务等信息。

4.JobTracker接收到作业后,将其放在一个作业队列里,等待作业调度器对其进行调度(这里是不是很像微机中的进程调度呢,呵呵),当作业调度器根据自己的调度算法调度到该作业时,会根据输入划分信息为每个划分创建一个map任务,并将map任务分配给TaskTracker执行。对于map和reduce任务,TaskTracker根据主机核的数量和内存的大小有固定数量的map槽和reduce槽。这里需要强调的是:map任务不是随随便便地分配给某个TaskTracker的,这里有个概念叫:数据本地化(Data-Local)。意思是:将map任务分配给含有该map处理的数据块的TaskTracker上,同时将程序JAR包复制到该TaskTracker上来运行,这叫“运算移动,数据不移动”。而分配reduce任务时并不考虑数据本地化。

5.TaskTracker每隔一段时间会给JobTracker发送一个心跳,告诉JobTracker它依然在运行,同时心跳中还携带着很多的信息,比如当前map任务完成的进度等信息。当JobTracker收到作业的最后一个任务完成信息时,便把该作业设置成“成功”。当JobClient查询状态时,它将得知任务已完成,便显示一条消息给用户。

以上是在客户端、JobTracker、TaskTracker的层次来分析MapReduce的工作原理的,下面我们再细致一点如下图。

MapReduce工作流程和工作原理

20160428165418734.png

Map、Reduce任务中Shuffle和排序的过程

输入分片(input split):

在进行map计算之前,mapreduce会根据输入文件计算输入分片(input split),每个输入分片(input split)针对一个map任务,输入分片(input split)存储的并非数据本身,而是一个分片长度和一个记录数据的位置的数组,输入分片(input split)往往和hdfs的block(块)关系很密切,假如我们设定hdfs的块的大小是64mb,如果我们输入有三个文件,大小分别是3mb、65mb和127mb,那么mapreduce会把3mb文件分为一个输入分片(input split),65mb则是两个输入分片(input split)而127mb也是两个输入分片(input split),换句话说我们如果在map计算前做输入分片调整,例如合并小文件,那么就会有5个map任务将执行,而且每个map执行的数据大小不均,这个也是mapreduce优化计算的一个关键点。

map阶段:

就是程序员编写好的map函数了,因此map函数效率相对好控制,而且一般map操作都是本地化操作也就是在数据存储节点上进行;

combiner阶段:

combiner阶段是程序员可以选择的,combiner其实也是一种reduce操作,因此我们看见WordCount类里是用reduce进行加载的。Combiner是一个本地化的reduce操作,它是map运算的后续操作,主要是在map计算出中间文件前做一个简单的合并重复key值的操作,例如我们对文件里的单词频率做统计,map计算时候如果碰到一个hadoop的单词就会记录为1,但是这篇文章里hadoop可能会出现n多次,那么map输出文件冗余就会很多,因此在reduce计算前对相同的key做一个合并操作,那么文件会变小,这样就提高了宽带的传输效率,毕竟hadoop计算力宽带资源往往是计算的瓶颈也是最为宝贵的资源,但是combiner操作是有风险的,使用它的原则是combiner的输入不会影响到reduce计算的最终输入,例如:如果计算只是求总数,最大值,最小值可以使用combiner,但是做平均值计算使用combiner的话,最终的reduce计算结果就会出错。

shuffle阶段:

将map的输出作为reduce的输入的过程就是shuffle了,这个是mapreduce优化的重点地方。这里我不讲怎么优化shuffle阶段,讲讲shuffle阶段的原理,因为大部分的书籍里都没讲清楚shuffle阶段。Shuffle一开始就是map阶段做输出操作,一般mapreduce计算的都是海量数据,map输出时候不可能把所有文件都放到内存操作,因此map写入磁盘的过程十分的复杂,更何况map输出时候要对结果进行排序,内存开销是很大的,map在做输出时候会在内存里开启一个环形内存缓冲区,这个缓冲区专门用来输出的,默认大小是100mb,并且在配置文件里为这个缓冲区设定了一个阀值,默认是0.80(这个大小和阀值都是可以在配置文件里进行配置的),同时map还会为输出操作启动一个守护线程,如果缓冲区的内存达到了阀值的80%时候,这个守护线程就会把内容写到磁盘上,这个过程叫spill,另外的20%内存可以继续写入要写进磁盘的数据,写入磁盘和写入内存操作是互不干扰的,如果缓存区被撑满了,那么map就会阻塞写入内存的操作,让写入磁盘操作完成后再继续执行写入内存操作,前面我讲到写入磁盘前会有个排序操作,这个是在写入磁盘操作时候进行,不是在写入内存时候进行的,如果我们定义了combiner函数,那么排序前还会执行combiner操作。每次spill操作也就是写入磁盘操作时候就会写一个溢出文件,也就是说在做map输出有几次spill就会产生多少个溢出文件,等map输出全部做完后,map会合并这些输出文件。这个过程里还会有一个Partitioner操作,对于这个操作很多人都很迷糊,其实Partitioner操作和map阶段的输入分片(Input split)很像,一个Partitioner对应一个reduce作业,如果我们mapreduce操作只有一个reduce操作,那么Partitioner就只有一个,如果我们有多个reduce操作,那么Partitioner对应的就会有多个,Partitioner因此就是reduce的输入分片,这个程序员可以编程控制,主要是根据实际key和value的值,根据实际业务类型或者为了更好的reduce负载均衡要求进行,这是提高reduce效率的一个关键所在。到了reduce阶段就是合并map输出文件了,Partitioner会找到对应的map输出文件,然后进行复制操作,复制操作时reduce会开启几个复制线程,这些线程默认个数是5个,程序员也可以在配置文件更改复制线程的个数,这个复制过程和map写入磁盘过程类似,也有阀值和内存大小,阀值一样可以在配置文件里配置,而内存大小是直接使用reduce的tasktracker的内存大小,复制时候reduce还会进行排序操作和合并文件操作,这些操作完了就会进行reduce计算了。

reduce阶段:

和map函数一样也是程序员编写的,最终结果是存储在hdfs上的。

输入 --> map --> shuffle --> reduce -->输出

HDFS block和MapReduce split之间的联系?

Block:HDFS中最小的数据存储单位,默认是164M;Split:MapReduce中最小的计算单元,默认与Block一一对应。
两者的对应关系是任意的,可有用户控制。

Mapreduce将作业分成两个阶段,分别是Map阶段和Reduce阶段,请问:Partitioner,Combiner,Shuffle分别位于哪个阶段中?

Partitioner:数据分组 决定了Map task输出的每条数据交给哪个Reduce Task处理。默认实现:hash(key) mod R R是Reduce Task数目,允许用户自定义,很多情况下需要自定义Partitioner ,比如“hash(hostname(URL)) mod R”确保相同域名的网页交给同一个Reduce Task处理 属于(map)阶段。

Combiner:可以看做local reduce 合并相同的key对应的value,通常与reducer逻辑一样 ,好处是减少map task输出 数量(磁盘IO),见少Reduce-map网络传输数据量(I网络IO) 结果叠加属于(map)阶段。

Shuffle:Shuffle描述着数据从map task输出到reduce task输入的这段过程 (完整地从map task端拉取数据到reduce 端。
在跨节点拉取数据时,尽可能地减少对带宽的不必要消耗。减少磁盘IO对task执行的影响。
) 属于(reduce)阶段。

 

相关标签: 大数据

上一篇: 在HTML中使用JavaScript

下一篇: Lottie