MapReduce工作流程详述
MapReduce过程
1:最简单的过程:
map - reduce
2:定制了partitioner以将map的结果送往指定reducer的过程:
map - partition - reduce
3:增加了在本地先进性一次reduce(优化)
map - combin(本地reduce) - partition - reduce
基本上,一个完整的mapreduce过程可以分为以上3中提到的4个步骤,我们以图为例,详细描述第三种
1、Map端:
每个输入分片会让一个map任务来处理,默认情况下,以HDFS的一个块的大小(默认为64M)为一个分片,当然我们也可以设置块的大小。map输出的结果会暂且放在一个环形内存缓冲区中(该缓冲区的大小默认为100M,由io.sort.mb属性控制),当该缓冲区快要溢出时(默认为缓冲区大小的80%,由io.sort.spill.percent属性控制),一个后台线程负责将结果写到硬盘,这个过程称为“spill”。Spill过程中,Map仍可以向缓存写入结果,如果缓存已经写满,那么Map进行等待。
Spill的具体过程如下:首先,后台线程根据Reducer的个数将输出结果进行分区,每一个分区对应一个Reducer。其次,每一个分区后台线程对输出结果的Key进行排序。每一次spill都会在硬盘产生一个spill文件。因此,一个Map task有可能会产生多个spill文件,当Map写出最后一个输出时,会将所有的spill文件进行合并与排序,最后合并成了一个已分区且已排序的文件。在这个过程中Combiner函数仍然会被调用。从整个过程来看,Combiner函数的调用次数是不确定的。下面我们重点分析下Shuffle阶段的排序过程:
Shuffle阶段的排序可以理解成两部分,
第一部分是对spill进行分区时,由于一个分区包含多个key值,所以要对分区内的<key,value>按照key进行排序,即key值相同的一串<key,value>存放在一起,这样一个partition内按照key值整体有序了。
第二部分并不是排序,而是进行merge,merge有两次,一次是map端将多个spill 按照分区和分区内的key进行merge,形成一个大的文件。第二次merge是在reduce端,进入同一个reduce的多个map的输出 merge在一起,该merge理解起来有点复杂,最终不是形成一个大文件,而且期间数据在内存和磁盘上都有。所以shuffle阶段的merge并不是严格的排序意义,只是将多个整体有序的文件merge成一个大的文件,由于不同的task执行map的输出会有所不同,所以merge后的结果不是每次都相同,不过还是严格要求按照分区划分,同时每个分区内的具有相同key的<key,value>对挨在一起。
Shuffle排序综述:如果只定义了map函数,没有定义reduce函数,那么输入数据经过shuffle的排序后,结果为key值相同的输出挨在一起,且key值小的一定在前面,这样整体来看key值有序(宏观意义的,不一定是按从大到小,因为如果采用默认的HashPartitioner,则key 的hash值相等的在一个分区,如果key为IntWritable的话,每个分区内的key会排序好的),而每个key对应的value不是有序的。
2、combin是在本地进行的一个reduce的过程,其目的是提高hadoop的效率。
直接将数据交给下一个步骤处理,假如文件中存在两个以hello为键的记录,在下一个步骤中需要处理2条的记录,如果先做一次combin,则只需处理一次的记录,这样做的一个好处就是,当数据量很大时,减少很多开销。(直接将partition后的结果交给reduce处理,由于tasktracker并不一定分布在本节点,过多的冗余记录会影响IO,与其在reduce时进行处理,不如在本地先进性一些优化以提高效率)
3、Redcue端
将分区中的数据拷贝给相对应的reduce任务。有人可能会问:分区中的数据怎么知道它对应的reduce是哪个呢?其实map任务一直和其父TaskTracker保持联系,而TaskTracker又一直和JobTracker保持心跳。所以JobTracker中保存了整个集群中的宏观信息。只要reduce任务向JobTracker获取对应的map输出位置就ok了哦。
到这里,map端就分析完了。那到底什么是Shuffle呢?Shuffle的中文意思是“洗牌”,如果我们这样看:一个map产生的数据,结果通过hash过程分区却分配给了不同的reduce任务,是不是一个对数据洗牌的过程呢?
2.Reduce端:
(1)Reduce会接收到不同map任务传来的数据,并且每个map传来的数据都是有序的。如果reduce端接受的数据量相当小,则直接存储在内存中(缓冲区大小由mapred.job.shuffle.input.buffer.percent属性控制,表示用作此用途的堆空间的百分比),如果数据量超过了该缓冲区大小的一定比例(由mapred.job.shuffle.merge.percent决定),则对数据合并后溢写到磁盘中。
(2)随着溢写文件的增多,后台线程会将它们合并成一个更大的有序的文件,这样做是为了给后面的合并节省时间。
(3)合并的过程中会产生许多的中间文件(写入磁盘了),但MapReduce会让写入磁盘的数据尽可能地少,并且最后一次合并的结果并没有写入磁盘,而是直接输入到reduce函数
其实不管在map端还是reduce端,MapReduce都是反复地执行排序,合并操作,现在终于明白了有些人为什么会说:排序是hadoop的灵魂。