欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Hadoop和Spark的Shuffer过程对比解析

程序员文章站 2023-11-13 22:28:10
Hadoop Shuffer     Hadoop 的shuffer主要分为两个阶段:Map、Reduce。 Map Shuffer:     这个阶段发生在map阶段之后,数据写入内存之前,在数据写入内存的过程就已经开 ......

hadoop shuffer

    hadoop 的shuffer主要分为两个阶段:map、reduce。

map-shuffer:

    这个阶段发生在map阶段之后,数据写入内存之前,在数据写入内存的过程就已经开始shuffer,通过设置mapreduce.task.io.sort.mb的参数,可改变内存的大小,默认为100m。数据在写入内存大于80%时,会发生溢写spill)过程,将数据整体落地到磁盘,这个过程中默认调用快速排序算法进行排序,否则调用用户自定义的 combiner()方法,将数据按照排序的规则分布在分区。然后进入mapshuffer最后一个阶段merge,当磁盘中某一个分区的文件数量>=3个,自动触发文件合并合并程序,这个过程将一个分区的所有数据进行排序合并成一个文件目录(归并算法),以供reduce抓取。

(k,v,p) :一条数据,其中p是分区号。

reduce-shuffer:

​ 通过拷贝线程copy merge中的数据到reduce端,调用归并算法,生成一个个iterator,再通过分组程序,将同一个key的分组放在一起,聚合为一个iterator。

Hadoop和Spark的Shuffer过程对比解析


spark-shuffer

    spark hashshuffle 是它以前的版本,现在1.6x 版本默应是 sort-based shuffle。有分布式就一定会有 shuffle,而且 hashshuffle 是 spark以前的版本,亦即是 sort-based shuffle 的前身,因为有 hashshuffle 的不足,才会有后续的 sorted-based shuffle,以及现在的 tungsten-sort shuffle。

spark可以基于内存、也可以基于磁盘或者是第三方的储存空间进行计算:

第一、spark框架的架构设计和设计模式上是倾向于在内存中计算数据的。

第二、这也表达了人们对数据处理的一种美好的愿望,就是希望计算数据的时候,数据就在内存中

shuffle 是分布式系统的天敌

    spark 运行分成两部分,第一部分是 driver program,里面的核心是 sparkcontext,它驱动著一个程序的开始,负责指挥,另外一部分是 worker 节点上的 task,它是实际运行任务的,当程序运行时,不间断地由 driver 与所在的进程进行交互,交互什么,有几点,第一、是让你去干什么,第二、是具体告诉 task 数据在那里,例如说有三个 stage,第二个 task 要拿数据,它就会向 driver 要数据,所以在整个工作的过程中,executor 中的 task 会不断地与 driver 进行沟通,这是一个网络传输的过程。

Hadoop和Spark的Shuffer过程对比解析

关于这种架构有几点有用的注意事项:

  1. 每个应用程序都有自己的执行程序进程,这些进程在整个应用程序的持续时间内保持不变并在多个线程中运行任务。这样可以在调度方(每个驱动程序调度自己的任务)和执行方(在不同jvm中运行的不同应用程序中的任务)之间隔离应用程序。但是,这也意味着无法在不将spark应用程序(sparkcontext实例)写入外部存储系统的情况下共享数据。
  2. spark与底层集群管理器无关。只要它可以获取执行程序进程,并且这些进程相互通信,即使在也支持其他应用程序的集群管理器(例如mesos / yarn)上运行它也相对容易。
  3. 驱动程序必须在其生命周期内监听并接受来自其执行程序的传入连接(例如,请参阅)。因此,驱动程序必须是来自工作节点的网络可寻址的。
  4. 因为驱动程序在集群上调度任务,所以它应该靠近工作节点运行,最好是在同一局域网上运行。如果您想远程向群集发送请求,最好向驱动程序打开rpc并让它从附近提交操作,而不是远离工作节点运行驱动程序。

    在这个过程中一方面是 driver 跟 executor 进行网络传输,另一方面是task要从 driver 抓取其他上游的 task 的数据结果,所以有这个过程中就不断的产生网络结果。其中,下一个 stage 向上一个 stage 要数据这个过程,我们就称之为 shuffle。

    每一个节点计算一部份数据,如果不对各个节点上独立的部份进行汇聚的话,我们是计算不到最终的结果。这就是因为我们需要利用分布式来发挥它本身并行计算的能力,而后续又需要计算各节点上最终的结果,所以需要把数据汇聚集中,这就会导致 shuffle,这也是说为什么 shuffle 是分布式不可避免的命运。

原始的 hashshuffle 机制

    基于 mapper 和 reducer 理解的基础上,当 reducer 去抓取数据时,它的 key 到底是怎么分配的,核心思考点是:作为上游数据是怎么去分配给下游数据的。在这张图中你可以看到有4个 task 在2个 executors 上面,它们是并行运行的,hash 本身有一套 hash算法,可以把数据的 key 进行重新分类,每个 task 对数据进行分类然后把它们不同类别的数据先写到本地磁盘,然后再经过网络传输 shuffle,把数据传到下一个 stage 进行汇聚。

hashshuffle 缺点:

  1. shuffle前在磁盘上会产生海量的小文件,此时会产生大量耗时低效的 io 操作 (因為产生过多的小文件)
  2. 内存不够用,由于内存中需要保存海量文件操作句柄和临时信息,如果数据处理的规模比较庞大的话,内存不可承受,会出现 oom 等问题。

优化后的 hashshuffle 机制

Hadoop和Spark的Shuffer过程对比解析

    有4个tasks,数据类别还是分成3种类型,因为hash算法会根据你的 key 进行分类,在同一个进程中,无论是有多少过task,都会把同样的key放在同一个buffer里,然后把buffer中的数据写入以core数量为单位的本地文件中,(一个core只有一种类型的key的数据)每1个task所在的进程中,分别写入共同进程中的3份本地文件,这里有4个mapper tasks,总共输出是 2个cores x 3个分类文件 = 6个本地小文件。consoldiated hash-shuffle的优化有一个很大的好处就是假设现在有200个mapper tasks在同一个进程中,也只会产生3个本地小文件; 如果用原始的 hash-based shuffle 的话,200个mapper tasks 会各自产生3个本地小文件,在一个进程已经产生了600个本地小文件。

这个优化后的 hashshuffle 叫 consolidatedshuffle,在实际生产环境下可以调以下参数:

spark.shuffle.consolidatefiles=true

consolidated hashshuffle 缺点:

  1. 如果 reducer 端的并行任务或者是数据分片过多的话则 core * reducer task 依旧过大,也会产生很多小文件。

shuffle影响spark性能及调优点

    shuffle 不可以避免是因为在分布式系统中的基本点就是把一个很大的的任务/作业分成一百份或者是一千份,这一百份和一千份文件在不同的机器上独自完成各自不同的部份,我们是针对整个作业要结果,所以在后面会进行汇聚,这个汇聚的过程的前一阶段到后一阶段以至网络传输的过程就叫 shuffle。

    在 spark 中为了完成 shuffle 的过程会把真正的一个作业划分为不同的 stage,这个stage 的划分是跟据依赖关系去决定的,shuffle 是整个 spark 中最消耗性能的一个地方。试试想想如果没有 shuffle 的话,spark可以完成一个纯内存式的操作。

reducebykey,它会把每个 key 对应的 value 聚合成一个 value 然后生成新的 rdd。

    因为在不同节点上我们要进行数据传输,数据在通过网络发送之前,要先存储在内存中,内存达到一定的程度,它会写到本地磁盘,(在以前 spark 的版本它没有buffer 的限制,会不断地写入 buffer 然后等内存满了就写入本地,现在的版本对 buffer 多少设定了限制,以防止出现 oom,减少了 io)。mapper 端会写入内存 buffer,这个便关乎到 gc 的问题,然后 mapper端的 block 要写入本地,大量的磁盘与io的操作和磁盘与网络io的操作,这就构成了分布式的性能杀手。

    如果要对最终计算结果进行排序的话,一般会都会进行 sortbykey,如果以最终结果来思考的话,可以认为是产生了一个很大很大的 partition,可以用 reducebykey 的时候指定它的并行度,例如把 reducebykey 的并行度变成为1,新 rdd 的数据切片就变成1,排序一般都会在很多节点上,如果把很多节点变成一个节点然后进行排序,有时候会取得更好的效果,因为数据就在一个节点上,技术层面来讲就只需要在一个进程里进行排序。

可以在调用 reducebykey()接著调用 mappartition( );
也可以用 repartitionandsortwithpartitions( );

    还有一个地方就是数据倾斜,shuffle 时会导政数据分布不均衡。数据倾斜的问题会引申很多其他问题,比如,网络带宽、各重硬件故障、内存过度消耗、文件掉失。因为 shuffle 的过程中会产生大量的磁盘 io、网络 io、以及压缩、解压缩、序列化和反序列化等等

shuffle可能面临的问题,运行 task 的时候才会产生 shuffle (shuffle 已经融化在 spark 的算子中)

  1. 几千台或者是上万台的机器进行汇聚计算,数据量会非常大,网络传输会很大
  2. 数据如何分类其实就是 partition,即如何 partition、hash 、sort 、计算
  3. 负载均衡 (数据倾斜)
  4. 网络传输效率,需要压缩或解压缩之间做出权衡,序列化 和 反序列化也是要考虑的问题

    具体的 task 进行计算的时候尽一切最大可能使得数据具备 process locality 的特性,退而求其次是增加数据分片,减少每个 task 处理的数据量**,基于shuffle 和数据倾斜所导致的一系列问题,可以延伸出很多不同的调优点,比如说:

  • mapper端的 buffer 应该设置为多大呢?
  • reducer端的 buffer 应该设置为多大呢?如果 reducer 太少的话,这会限制了抓取多少数据
  • 在数据传输的过程中是否有压缩以及该用什么方式去压缩,默应是用 snappy 的压缩方式。
  • 网络传输失败重试的次数,每次重试之间间隔多少时间。

总结

    因为想利用分布式的计算能力,所以要把数据分散到不同节点上运行,上游阶段数据是并行运行的,下游阶段要进行汇聚,所以出现shuffle,如果下游分成三类,上游也需要每个task把数据分成三类,虽然有可能有一类是没有数据,这无所谓,只要在实际运行时按照这套规则就可以了,这就是最原始的 shuffle 过程。

    hash-based shuffle 默认mapper 阶段会为reducer 阶段的每一个task单独创建一个文件来保存该task中要使用的数据,但是在一些情况下(例如说数据量非常庞大的情况) 会造成大量文件的随机磁盘io操作且会性成大量的memory消耗(极易造成oom)。

  • 原始的 hash-shuffle 所产生的小文件: mapper 端 task 的个数 x reduce 端 task 的数量
  • consolidated hash-shuffle 所产生的小文件: cpu cores 的个数 x reduce 端 task 的数量

    spark shuffle 说到底都是离不开读文件、写文件、为了高效我们需要缓存,由于有很多不同的进程,就需要一个管理者。hashshuffle 适合的埸景是小数据的埸景,对小规模数据的处理效率会比排序后的 shuffle 高。

    区别在于hadoopshuffer是sort-based,spill内存大小是100m,saprk是hash-based(hash-based故名思义也就是在shuffle的过程中写数据时不做排序操作,只是将数据根据hash的结果,将各个reduce分区的数据写到各自的磁盘文件中),内存大小是32k,hadoop的shuffle过程是明显的几个阶段:map(),spill,merge,shuffle,sort,reduce()等,是按照流程顺次执行的,属于push类型;但是,spark不一样,因为spark的shuffle过程是算子驱动的,具有懒执行的特点,属于pull类型。


参考

spark性能调优- 第二章:彻底解密spark的hashshuffle

hadoop shuffer 和 spark shuffer区别

cluster mode overview