MapReduce 内部实现机制,你真的懂吗?
微信公众号:小林玩大数据
作者:林中鸟
如果你觉得此文对你有帮助,欢迎点赞!
1. MapReduce 简介
MapReduce 编程范式将数据处理拆分成了两个基本阶段:Map 阶段与 Reduce 阶段。每个阶段的输入和输出均为键值对。
Map 阶段对应的进程为Mapper。Mapper 是在 JVM 上运行的 Java 进程,通常在要处理的数据节点上启动。利用数据本地性是 MapReduce 的一个很重要的原则。对于大数据集而言,将处理的进程分配至包含数据的节点上(计算向数据移动),比通过网络传输数据要高效的多。当 Mapper 处理完输入数据之后,会进入 sort & shuffle 阶段,在此阶段,数据会进行排序与分区,产生 (k,v,p) 型数据。
Recucer 阶段会将分区后的有序数据通过网络 I/O 发送给 Reducer 所在的 JVM ,Reducer 会读取这些按照键进行分区的有序数据。Reducer 拿到这些记录,reducer()方法可以对数据执行各种操作。Reducer 通常会把处理好的数据聚合到 HDFS 或 HBase 的存储中。
总的来说,JVM 有两类进程,一类读取无序数据,一类处理分区后的有序数据。
2. MapReduce 的处理特点
- Mapper 以键值对形式处理输入数据,并且每次只能处理一个键值对。Mapper 的个数是由框架而非开发人员决定的。
- Mapper 将键值对作为输出传递给 Reducer ,但是不能将数据传递给其它 Mapper。Reducer 之间也不能通信。
- Mapper 和 Reducer 通常不会使用太多的内存,一般会将 JVM 的堆大小设置为相对较小的值。
- 一般来说,每一个 Reducer 都会有单个输出的数据流,在默认情况下,这是一个文件合集,命名形式如:Part-r-0000、Part-r-00001等。存在客户端指定的 HDFS 目录下。
- Mapper 和 Reducer 的输出均会写入磁盘。如果 Reducer 输出的结果还需要额外的处理,那么整个数据集将会写入磁盘中,再读取一遍。这种模式被成为同步屏障(synchronization barrier)。这也是使用 MapReduce 迭代式处理数据比较低效的主要原因。
注意:MapReduce 有两大缺点:其一是启动时间较长。即使 MapReduce 过程几乎什么都不做,启动也需要大概10~30秒;其二是 MapReduce 会频繁写入磁盘,以便容错。这两大弱点使 MapReduce 不适用于迭代算法。
3. MapReduce 内部实现机制
3.1 MapReduce 执行流程
-
大数据集文件在集群的 HDFS 中以块存储,并把业务逻辑的计算程序上传到 HDFS 中,客户端提交作业给ResourceManager,ResourceManager 会在集群内随机挑一个不忙的节点,随机创建一个AppMstr进程。
-
AppMstr 会从 HDFS 中获取由 InputFormat 实现好的逻辑切片清单信息,并向 ResourceManager 申请计算资源,ResourceManager 会根据切片清单信息返回一个 container 并开始作业。
-
Map 计算过程中,一个 InputSplit 对应一个 Mapper,Map 计算结束后,通过 Hash 算法,计算 key 的hashCode值,再对分区数(partition)取模,实现 (k,v,p) 型数据。(相同的 key 为一组,最后会被分配到同一个 Reducer)。
-
此时 不会直接写入磁盘,否则 I/O 切换次数过于频繁,内部采取了一个 buffer 机制,默认大小为100M;当缓存满了 80%(此参数可调)之后,读取一次 I/O ,把缓存区中的数据根据 p(分区) 溢写到磁盘,磁盘上的每个文件再根据 key 进行 sort,最终的结果是形成一个个内部由序,外部无序的小文件。
-
当 Map 输出结束,对磁盘上的小文件进行一次归并排序,归并的过程将同一分区的数据放在一起,此时仍然内部的 key 有序;各个 Reduce 会从多个 Map shuffle 属于自己分区的小文件,shuffle 到 reduce 中再一次归并成多个全排序文件进行 reduce 计算,形成以 Part-r-00000、Part-r-00001命名的文件,并写到 HDFS 中。
注意:
1.上述第5步的再一次归并成多个全排序文件,主要是为了减少 I/O 时间。
2.上述过程的时间并非线性,在溢写出采用了时间复用。举个例子:假设前面有500个 Map ,有20个 Map 跑完了,剩下480个继续跑,reduce 会把跑完的20个 Map 先 shuffle 来做归并并计算。
3.虽然上述过程有一次 sort ,两次归并。 但是 reduce 没有重新排序的能力,而是强依赖之前 Map 的 sort 结果。
MapReduce 执行流程图
MapReduce 的强大之处在于,它不仅仅是由 Map 和 Reduce 任务构成的,其中还包括协同工作的多个组件。每个组件均可以由开发人员扩展。下面从 Map 阶段、Reduce 阶段两个方面介绍一个任务执行所涉及的主要组件。
3.2 Map阶段
Map 阶段主要涉及的组件有:InputFormat、RecordReader、Mapper 的 setuP()、Mapper 的 map()、patitioner(分区器)、Mapper 的cleanup()、Combiner。
- InputFormat Map 的输入数据主要是数据的逻辑分片,此过程主要由 InputFormat 类实现。该类主要有一下两个抽象方法:
- getSplits()
这个方法实现的是将输入分步到多个 Map 进程的具体逻辑。框架默认的是 TextInputFormat ,它会为每个数据块生成一个输入分片,并将对应数据块的位置发送给 Map 任务。框架会针对每个分片启动一个 Mapper 进程执行处理。正因如此,开发人员常常假定 MapReduce 任务中,Mapper 的个数就是待处理数据集的数据块个数。
createRecordReader()
- 这一方法为 Map 任务提供了 Reader 机制,赋予了 Map 访问待处理数据的能力,该方法会返回一个记录读取器,一般常用的是 LineRecordReader 行记录读取器,
//获取 InputSplit 抽象方法
public abstract List<InputSplit> getSplits(JobContext context)
throws IOException,InterruptedException;
//获取记录读取器抽象方法
public abstract RecordReader<K,V> createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException, InterruptedException;
-
RecordReader RecordReader 类读取数据块的内容,将读取到的键值对记录返回到 Map 任务。初始化 RecordReader 实例,使用的是需要读取的数据块在文件中的起始位置,以及该文件在 HDFS 中的 URI 地址。寻址到这一起始位置之后,每次调用 nextKeyValue()方法查找下一个行分隔符,并读取下一条记录。
-
Mapper 的 setuP() 在 Map 任务的 map 方法调用之前,Mapper 的 setup()方法会被首先调用一次。这一方法可以使开发人员初始化 Map 进程中会用到的变量和文件句柄。setup()最常用的作用是从配置对象中获取配置项的值。
-
Mapper 的 map() Mapper的核心方法就是 map()方法,开发人员需要重写 map()方法。该方法有三个输入参数:键(key)、值(value)及上下文(context)。其中 key 和 value 可以通过 RecordReader 获得,它们包含map()方法需要处理的数据。context 对象为 Mapper 更多行为的实现提供了支持:将输出发送给 Reduceer,从Configuration 对象中读取值,计数器自增以汇报 Map 任务进度。
-
Partitioner 分区器实现了在 Reducer 之间进行数据分区的逻辑。默认的 Partitioner 处理逻辑:首先获得 key 的标准哈希函数散列,再与 Reducer(开发人员可控制数量) 数取模,余数则决定这条记录的目标 Reducer。
-
Combiner MapReduce 中的Combiner可以提供一种简单的方法,减少 Mapper 和 Reducer 间的数据传输。在 buffer 中,数据 sort(默认快排)完之后,如果定义了 Combiner,你就可以调用 combine()方法,对 Mapper 产生的数据进行聚合操作了,因为这个方法会在 Mapper 执行的节点上执行,所以这种聚合行为可以减少网络 I/O。
-
Mapper 的 cleanup() cleanup()方法在 map 方法处理了所有的数据之后调用。这里通常要执行文件的关闭操作,以及最后的报告和总结,比如将最终状态写入日志中。
3.3 Reduce 阶段
Reducer 任务没有 Map 任务那么复杂,主要涉及的组件有:Shuffle、Reducer 的 setup()、Reducer 的reduce()、OutputFormat、Reducer 的 cleanup()。
-
Shuffle 在 Reducer 开始之前,Reducer 任务会将 Mapper 的输出从 Map 节点复制到 Reducer 节点中,这个过程称为 Shuffle。每一个 Reducer 都需要从多个 Mapper Shuffle 数据,所以我们要让每一个 Reducer 按照 Map 任务的方式读取本地的数据,提高处理性能。
-
Reducer 的 setup() Reducer 的 setup()步骤与 Map 的 setup()相似。这一方法在 Reducer 处理单个记录之前,通常用来初始化变量和文件句柄。
-
Reducer 的 reduce() reduce()方法是进行绝大多数数据处理的地方。与 Map 的 map()方法类似:
但在输入方面有几点不同,首先,key 是有序的;其次,value 从原来的一个 value 变成了 values(从一次处理一个值,变为一次处理多个值)。
在输出方面,map()方法中,调用 context.write(k,v)方法可以将输出放入缓存区,缓存区中的数据会被 sort,然后被 Reducer 读取。而在 reduce()方法中,调用 context.write(k,v)方法会将输出发送给 OutputFormat。 -
OutputFormat 在 Map 阶段,InputFormat 负责处理输入数据的读取,OutputFormat 负责数据的格式化和把输出数据写出(通常是写到 HDFS)。对于 OutputFormat 类来说,一个 Reducer 只会写一个文件,因此在 HDFS 上看到一个 Reducer 对应一个 输出文件。
林中鸟儿们,这期就和大家分享到这儿!扫一扫来个关注,下期为大家解析 MapReduce 关键组件的源码!敬请期待!
林中鸟公众号
上一篇: 集合之浅谈HashSet
下一篇: JS事件,你真的懂吗(捕获,冒泡)?