欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Spark性能调优系列一:Spark的作业模型

程序员文章站 2024-03-15 11:31:41
...

Job

Spark的整个作业体系中,处于顶层的是Job, Job和Spark中的Action是一一对应的,每一个Action都会触发一个Job的执行,这个Job包含的处理逻辑是Action以及Action之前的所有Transformation, 所有这些逻辑会被Spark转换成一张关于RDD的DAG(有向无环图),这个DAG也就是实际意义上的Job的执行计划。本文原文出处: 本文原文链接: http://blog.csdn.net/bluishglc/article/details/80653801 转载请注明出处。

Stage

Job的下一个层级是Stage,一个Stage是由一群task组成的,这些task执行的是完全一样的代码,分散到多个executor上执行。Stage对标的是shuffle, 或者叫宽依赖的transformation, 也就是每当在Job执行中遇到一个需要shuffle的transformation时,就将这个transformation和它之前的所有动作放到一个Stage里(当然,如果只action,自然也会触发一个stage),这样的安排是一种很自然的处理方式,究其本质,在一系列的分布式计算中,当遇到shuffle之前,所有的操作都可以在一个单一分区里独立完成,并不需要依赖到其他的分区(例如map,filter这类操作),这些操作可以放在一个流水线(pipline)中执行,这样有助于性能优化,而一但作业中遇到一个需要基于全体数据集(spark称之为all-to-all 操作)进行计算的操作(例如groupByKey, reduceByKey),就需要进行shuffle, shuffle会对全体数据重新洗牌并分区,所以后续的操作不可能和前面的操作放在同一个流水线上,所以每个stage是以一个shuffle来终结的。这里需要对前面提到的transformation的窄依赖和宽依赖进行一个说明:

  • 窄依赖指的这个计算在每一个partition上都只依赖于partition自有的数据就可以完成,再直白地解释就是如果某种操作在每一个局部(each partition)做完就相当于全集做完,这就是窄依赖,例如map操作。
  • 宽依赖指的这个计算只有在数据全集(all partitions)上执行才能得到结果,例如groupByKey等。

Task

Task是Spark执行作业的最小单位,一个Task只对应一个Partition, 前面提到过,对于同一个RDD下的某个transformation,它有多少个partition就会对应生成多少个task, 这些task执行的是完全一样的代码。所以控制一个Stage会产生多少个Task的唯一因素就是Spark配置了多个个partition,这个配置参数是spark.sql.shuffle.partitions, 它的默认值是200,也就是默认一个stage总是产生200个task,对于一些只有一个节点的小集群来说,这个数值是比较大的,在性能调优时往往会调小。

其他:Slot & Batch

既然Task是一个作业被切分成的最小粒度,那也就是执行的最小单位。我们可以把Executor的JVM看成是一个用于执行Task的Pool!如果一个Task只占用一个Core/Thread, 那么所谓的slot就是这个Executor的核数!但是spark中有一个参数:spark.task.cpus,用来控制分配给每一个Task的核数,这样,对于一个Spark集群来说,可以用来执行Task的可用slot数量就是

slots = spark.num.executors * spark.executor.cores / spark.task.cpus

所以宏观上看,slot才是Spark的并行能力,假定一个集群有4个executor,每个executor有8个core, 每个task分配一个core, 则同一时刻,这个集群可以并行处理4*8=32个task, 假设在某个时刻只有一个作业在执行,而这个作业正在运行的stage由200个task组成,那么这个作业要分成200/32+1=7个batch才能执行完成!

一个“立体”的视图

需要注意的一点是Job->Stage和Stage->Task的切分的维度是不太一样的,如果我们把一个作业涉及到的一系列的操作(transformation和action)看成是横向排列的,那么Stage就是对作业的“垂直”切分,分解的依据就是遇到需要shuffle的transformaion或action就切一刀。而Stage->Task的切分则是一种“水平”切分了,每一个task要执行的操作就是Stage圈定的所有的操作,只是一个task基于一个partition的数据去执行这些操作,如果Spark配置的是200个分区,则一个Stage就会被“水平”切分成200个Task去并行的执行!

所以下图展示了本文自定义的“垂直”与“水平”切分的定义,这样,我们可以清晰地认识到Spark是如何将一系列连续的数据操作在一个分布式环境上“拆解”成了一个操作的“矩阵”,垂直方向上是可以在单一partition上连续执行的操作,水平方向是可以并行处理的多个分区。

Spark性能调优系列一:Spark的作业模型

注:上图由两张图拼接面来,一张是来自:https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-taskscheduler-tasks.html, 另一张来自:http://litaotao.github.io/deep-into-spark-exection-model