欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  科技

大数据处理—浅析MapReduce之shuffle

程序员文章站 2022-11-18 09:19:43
浅析MapReduce -> shuffle shuffle的意思就是洗牌,它是MapReduce的核心,也是被称为奇迹发生的地方,因为MapReduce玩的就是洗数据,然后让数据...

浅析MapReduce -> shuffle

shuffle的意思就是洗牌,它是MapReduce的核心,也是被称为奇迹发生的地方,因为MapReduce玩的就是洗数据,然后让数据出现在该出现的位置.

官方描述的shuffle过程,我们不太可能明白shuffle的过程,因为它与事实相差挺多的,细节也是错乱的.?我们现在这样理解就可以了, shuffle

描述着数据从map task输出到reduce task输入的这段过程.?

大数据处理—浅析MapReduce之shuffle

从最基本的要求来说,我们队shuffle过程的期望可以有:

1.完整地从map task端拉取数据到reduce端

2.在跨节点拉取数据时,尽可能地减少对带宽的不必要消耗

3.减少磁盘IO对task执行的影响

我们想详细的分析shuffle,那么我们先来看看map端,下面我找了一张图帮我们理解:

大数据处理—浅析MapReduce之shuffle

上图可能是某个map task的运行情况,那它与官方图的左半边比较,会发现很多不一致.?官方图没有清楚地说明partition.?sort与combiner到底作

用在那个阶段.?我找了这张图,希望让大家清晰地了解从map数据输入到map端所有数据准备好的全过程.

整个流程分为四步,简单些可以这样说,每一个map task都有一个内存缓冲区,存储着map的输出结果,当缓冲区快满的时候需要将缓冲区的数据以

一个临时文件的方式存放到磁盘中,当整个map task结束后再对磁盘中这个Map task产生的所有临时文件做合并,生产最终的正式输出文件,然后

等待reduce task来拉取数据.

1.在map task执行时,它的输入数据来源于HDFS的block,当然在MapReduce概念当中,map task只读取split. split与block的对应关系可能是多对

一,默认是一对一.?在wordCount例子里,假设map的输入数据都是像"aaa"这样的字符串.

2.在经过mapper的运行后,我们得知mapper的输出是这样一个key/value对:key是"aaa",value是数值1.?因为当前map端只做加一操作,在reduce?

task里才去合并结果集.?前面我们知道这个job有3个reduce task,到底当前"aaa"应该交由那个reduce去做,是需要现在决定的.

MapReduce提供partitioner接口,它的作用就是根据key或value以及reduce的数量来决定当前的这对输出数据最终应该交由那个reduce task处理.

默认对ley hash后再以reduce task数量取模.?默认的取模方式只是为了平均reduce的处理能力,如果用户自己对Partitioner有要求,可以制订.

在我们的例子中,"aaa"经过Partition后返回0,也就是这对值应该交由第一个reduce来处理.?接下来,需要将数据写入内存缓冲区中,缓冲区的作

用是批量收集map的结果,减少磁盘的IO影响.?我们的key/value对以及Partition的结果都会被写入缓冲区,当然写入之前,key/value值都会被序

列化成字节数组.

3.在这个内存缓冲区是有大小限制的,默认为100MB,当map task的输出结果很多时,就可能撑爆内存,所有需要在一定条件下将缓冲区中的数据临

时写入磁盘,然后重新利用这块缓冲区.?这个内存往磁盘写数据的过程被称为spill,中文可翻译为溢写,字面意思很直观.?这个溢写是由单独线程

来完成的,不影响往缓冲区写Map结果的线程.?溢写程序启动时不应该阻止map的结果输出,所以整个缓冲区有个溢写的比例spill,这个比例默认是

0.8,也就是当缓冲区的数据已经达到峰值,溢写线程启动,锁定这80MB的内存,执行溢写进程,Map task的输出结果还可以往剩下的20Mb内存中写

当溢写线程启动后,需要对这80MB空间内的key做排序(sort),排序是MapReduce模型默认的行为,这里的排序也是对序列化的字节做排序.

在这里我们可以想想,因为map task的输出是需要发送到不同的reduce端去,而内存缓冲区没有对将发送到相同reduce端的数据做合并,那么这种

合并应该是体现是磁盘文件中的。从官方图上也可以看到写到磁盘中的溢写文件是对不同的reduce端的数值做过合并。所以溢写过程一个很重要的

细节在于,如果有很多个key/value对需要发送到某个reduce端去,那么需要将这些key/value值拼接到一块,减少与partition相关的索引记录。?

在针对每个reduce端而合并数据时,有些数据可能像这样:“aaa”/1, “aaa”/1。对于WordCount例子,就是简单地统计单词出现的次数,如果

在同一个map task的结果中有很多个像“aaa”一样出现多次的key,我们就应该把它们的值合并到一块,这个过程叫reduce也叫combine。但M

apReduce的术语中,reduce只指reduce端执行从多个map task取数据做计算的过程。除reduce外,非正式地合并数据只能算做combine了。其实大家

知道的,MapReduce中将Combiner等同于Reducer.?如果client设置过combiner,那么现在就是使用combiner的时候了.?将相同的key/value对的

value想加起来,减少溢写到磁盘的数据量. combiner会优化MapReduce的中间结果,所以它在整个模型中会多次使用.?combiner的使用场景:?应该

用于那种Reducer的输入,combiner绝不改变最终的计算结果.?比如累加,最大值等等. ombiner的使用一定得慎重,如果用得好,它会对job执行效

率有帮助,反之会影响reduce的最终结果.

每次溢写会在磁盘上生成一个溢写文件,如果map的输出结果真的很大,有多次这样的溢写发生,磁盘上相应的就会有多个溢写文件存在。当map?

task真正完成时,内存缓冲区中的数据也全部溢写到磁盘中形成一个溢写文件。最终磁盘中会至少有一个这样的溢写文件存在(如果map的输出结果

很少,当map执行完成时,只会产生一个溢写文件),因为最终的文件只有一个,所以需要将这些溢写文件归并到一起,这个过程就叫做Merge。

Merge是怎样的?如前面的例子,“aaa”从某个map task读取过来时值是5,从另外一个map 读取时值是8,因为它们有相同的key,所以得merge成

group。什么是group。对于“aaa”就是像这样的:{“aaa”, [5, 8, 2, …]},数组中的值就是从不同溢写文件中读取出来的,然后再把这些值加

起来。请注意,因为merge是将多个溢写文件合并到一个文件,所以可能也有相同的key存在,在这个过程中如果client设置过Combiner,也会使用

Combiner来合并相同的key.

至此,map端的所有工作都已经结束了,最终生成的这个文件也存放在TaskTracker够得着的某个本地目录内.?每个reduce task不断的通过RPC从job

Tracker哪里获取map task是否完成的信息,如果reduce task得到通知,获知某台TaskTracker上的map task执行完成,shuffle的后半段过程开始

执行.?下面这两个图会帮我们理解map - > reduce的过程

大数据处理—浅析MapReduce之shuffle

大数据处理—浅析MapReduce之shuffle

简单的说,reduce?task在执行之前的工作就是不断地拉取当前job里面每一个map task的最终结果,然后对从不同地方拉取过来的数据不断的做

merge,也最终形成了一个文件作为reduce task的输入文件.

shuffle在reduced端的过程也能用图上表明的三点来概括. Reducer真正运行之前,所有时间都是在拉取数据,做merge,且不断重复地在做.?如前面

的方式一样,下面我也分段地描述reduce端的shuffle细节.

1.?copy过程,简单地拉取数据,reduce进程回启动一些数据copy线程,然后将文件管理在本地磁盘中.

2.Merge阶段。这里的merge如map端的merge动作,只是数组中存放的是不同map端copy来的数值。Copy过来的数据会先放入内存缓冲区中,这里的缓

冲区大小要比map端的更为灵活,它基于JVM的heap size设置,因为Shuffle阶段Reducer不运行,所以应该把绝大部分的内存都给Shuffle用。这里

需要强调的是,merge有三种形式:1)内存到内存? 2)内存到磁盘? 3)磁盘到磁盘。默认情况下第一种形式不启用,让人比较困惑,是吧。当内存中

的数据量到达一定阈值,就启动内存到磁盘的merge。与map 端类似,这也是溢写的过程,这个过程中如果你设置有Combiner,也是会启用的,然后

在磁盘中生成了众多的溢写文件。第二种merge方式一直在运行,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge方式生成最终

的那个文件。

3.?Reducer的输入文件。不断地merge后,最后会生成一个“最终文件”。为什么加引号?因为这个文件可能存在于磁盘上,也可能存在于内存中。

对我们来说,当然希望它存放于内存中,直接作为Reducer的输入,但默认情况下,这个文件是存放于磁盘中的。当Reducer的输入文件已定,整个

Shuffle才最终结束。然后就是Reducer执行,把结果放到HDFS上。?

大概就是上面的这个编程模型,这个模型让我思考了很久,总结起来还是让正确的人做正确的事情,当一个人专注的做一件事情那么,它的效率会

很高.?让一个人一直削土豆,和让一个削土豆再烧水再做饭效率肯定不一样,我们要有一个框架!?在这个框架中,让每一个进程持续做一件事情,

然后再对程序进行划分和合并,最后你会高效的得到你想要的结果.