Spark源码系列(四) DAGScheduler源码分析与Task最佳位置算法源码
Spark源码系列(四) DAGScheduler源码分析与Task最佳位置算法源码
在Spark源码系列(三)中已经说到了DAGScheduler中的stage划分算法。每当执行到RDD的action算子时就会触发runJob方法,代码逻辑最后去调用DAGScheduler的runJob方法,最后会走到DAGScheduler的handleJobSubmitted方法,这个方法内部会根据当前的RDD创建一个ResultStage,然后根据这个ResultStage对象创建一个Job。再将这个Stage对象传入submitStage方法,这个方法内部会调用一些其他方法,会根据当前stage中的RDD的Dependency向前推,依据RDD之间的Dependency,碰到ShuffleDependency就创建一个新的stage,NarrowDependency就将当前RDD加入到当前Stage中,一直到所有RDD都遍历完毕,所有stage也就划分完了。
这里贴一张经典的图:
窄依赖:指父RDD的每一个分区最多被一个子RDD的分区所用,表现为一个父RDD的分区对于一个子RDD的分区,和两个父RDD的分区对应于一个子RDD的分区。
宽依赖:指子RDD的分区依赖于父RDD的所有分区,这是因为shuffle操作,如图中的groupByKey和未经协同划分的join。
划分Stage是以shuffle和result两种类型来划分。在Spark中有两类task,一类是shuffleMapTask,一类是resultTask。
第一类task的输出是shuffle所需数据,第二类task的输出是result,stage的划分也是以此为依据。shuffle之前的所有变换是一个stage,shuffle之后的操作是另一个stage。比如rdd.parallize(1 to 10).foreach(println),算子没有涉及到shuffle,因此直接输出,task也就只有resultTask,stage只有一个;如果rdd.map(x => (x, 1)).reduceByKey(_ + _).foreach(println),这个job因为有reduceBykey算子,因此就会有shuffle过程,在reduceBykey之前的是一个stage,执行shuffleMapTask,输出shuffle所需的数据,reduceBykey到最后是一个stage,直接就输出结果。如果job中有多次shuffle,那么每个shuffle之前都是一个stage。
下面我们以一个具体例子进行Stage划分的说明:
图中的final RDD为RDD G,是触发Job执行的finalRDD。首先RDD G会通过getMissingParentStages方法判断RDD G是否含有父Stage。getMissingParentStages方法中先创建了一个存放RDD的栈。
val waitingForVisit = new Stack[RDD[_]]
接着向Stack中推入RDD G
//首先往栈中,推入了stage最后的一个rdd finalRDD
waitingForVisit.push(stage.rdd)
由于栈中推入了RDD G,因此判断不为空,弹出RDD G并调用内部visit方法
/进行while循环
while (waitingForVisit.nonEmpty) {
// 对stage的最后一个rdd,调用自己内部定义的visit()方法
visit(waitingForVisit.pop())
}
判断rdd是否被访问过,若符合条件,先讲该RDD添加到已访问,继续向下执行
if (!visited(rdd)) {
visited += rdd
val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
下面我们需要判断RDD G的Dependency是宽依赖还是窄依赖。从上图中可以看到,RDD B和RDD G是窄依赖,将RDD B放入栈中,RDD F和RDD G是宽依赖。于是调用getOrCreateShuffleMapStage方法创建一个Stage,RDD G和RDD F被划分成了两个Stage,将RDD G所在Stage划分成了Stage 3,将RDD F所在的Stage划分成了Stage 2,返回一个list[Stage]。
继续从RDD B开始,RDD B向前找到它的父RDD,父RDD A与RDD B是宽依赖关系,所以RDD A与RDD B中间也创建了一个Stage,RDD A与RDD B被划分成了两个Stage,RDD A被划分成Stage 1。RDD F与它的父RDD D/E都是窄依赖关系,不产生新的Stage,RDD D与RDD C也是窄依赖关系,也不产生新的Stage,所以RDD C、D、E、F都在Stage 2中。
if (rddHasUncachedPartitions) {
for (dep <- rdd.dependencies) {
dep match {
//如果是宽依赖
case shufDep: ShuffleDependency[_, _, _] =>
// 那么使用宽依赖的那个RDD,使用getOrCreateShuffleMapStage()方法去创建一个stage
// 默认最后一个stage,不是shuffleMap stage
// 但是finalStage之前所有的stage,都是shuffleMap Stage
val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
if (!mapStage.isAvailable) {
missing += mapStage
}
//如果是窄依赖,将rdd放入栈中
case narrowDep: NarrowDependency[_] =>
waitingForVisit.push(narrowDep.rdd)
}
}
}
其实简单来讲Stage划分算法核心思想很简单:遇到ShuffleDependency就断开,划分为一个Stage;遇到NarrowDependency就把对应的RDD加入该stage中。
Task最佳位置计算
在前面的submitStage方法中会找到划分出的stage中的第一个stage,然后调用submitMissingTasks方法。
if (missing.isEmpty) {
logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
// 找到第一个 stage 去调用 submitMissingTasks 方法
submitMissingTasks(stage, jobId.get)
}
submitMissingTasks方法中的功能逻辑:
- 拿到stage中没有计算的partition
- 获取task对应的partition的最佳位置(核心算法)
- 获取taskBinary,将stage的RDD和ShuffleDependency广播到Executor
- 为stage创建task
重点分析如何分配task到最优的partition上,也就是计算partitionId和taskId的对应关系。
// 计算 taskId 和 partition 的对应关系
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
stage match {
// 如果是 ShuffleMapStage
case s: ShuffleMapStage =>
partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
// 如果是 ResultStage
case s: ResultStage =>
partitionsToCompute.map { id =>
val p = s.partitions(id)
(id, getPreferredLocs(stage.rdd, p))
}.toMap
}
} catch {
case NonFatal(e) =>
stage.makeNewStageAttempt(partitionsToCompute.size)
listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
runningStages -= stage
return
}
在taskIdToLocations方法中可以看到,无论是ShuffleMapStage还是ResultStage,都是每个partition调用getPreferredLocs方法,实际上是调用了getPreferredLocsInternal方法。
private[spark]
def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = {
getPreferredLocsInternal(rdd, partition, new HashSet)
}
在getPreferredLocsInternal方法中依次做了如下逻辑判断:
- 通过HashSet判断rdd的partition是否被操作过,如果已被操作过就返回Nil,任何操作也不执行
- 通过getCacheLocs方法查看当前RDD的partition的最佳计算位置是否被缓存过,如果有被缓存过直接返回对应的缓存位置
- 如果没有缓存,就调用RDD的preferredLocations去计算最佳位置,实际上就是看看当前RDD是否被checkpoint,如果有就返回checkpoint的位置
- 如果当前RDD既没有被缓存又没有checkpoint的话,就去遍历RDD的依赖链,如果有窄依赖,就去遍历父RDD的所有partition,递归调用getPreferredLocsInternal方法。
这里实际上就是找出当前stage是否存在某个RDD被缓存或者checkpoint了,如果有就返回其缓存或者checkpoint的位置,添加到序列中,然后返回。如果当前stage中的所有RDD都没有被缓存或者checkpoint的话,那么task的最佳计算位置就返回Nil。
private def getPreferredLocsInternal(
rdd: RDD[_],
partition: Int,
visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
// 如果这个 rdd 的 partition 已经计算过了位置了就忽略
// 因为这个方法是被递归调用的
if (!visited.add((rdd, partition))) {
// Nil has already been returned for previously visited partitions.
return Nil
}
// 如果这个 partition 被缓存过就返回缓存的位置
val cached = getCacheLocs(rdd)(partition)
if (cached.nonEmpty) {
return cached
}
// 调用 RDD 内部的 preferredLocations 方法去找最佳计算位置,实际上内部是看当前
// RDD 是否 checkpoint 了,如果做了 checkpoint 就会返回 checkpoint 的位置
val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
if (rddPrefs.nonEmpty) {
return rddPrefs.map(TaskLocation(_))
}
/**
* 如果该 RDD 既没有没缓存有没有 checkpoint 的话那么就会去遍历他的依赖链,发现是窄依赖的时候
* 去就去递归调用 getPreferredLocsInternal 去看看该 RDD 是否被缓存或者 checkpoint 了。如果
* 是,就返回缓存或者 checkpoint 的位置。如果一直没找到的话就返回 Nil
**/
rdd.dependencies.foreach {
case n: NarrowDependency[_] =>
for (inPart <- n.getParents(partition)) {
val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
if (locs != Nil) {
return locs
}
}
case _ =>
}
Nil
}
当获取到task的最佳位置后,根据stage的类型进行匹配,为每个partition的数据创建一个task,如果是ShuffleMapStage就创建ShuffleMapTask,如果是ResultStage就创建ResultTask。最后将整个stage创建的所有task都放到一个Seq中。创建task的过程会将每个task前面计算出来的locations、taskBinary等参数一并放到参数中去。
val tasks: Seq[Task[_]] = try {
stage match {
// 如果是 ShuffleMapStage
case stage: ShuffleMapStage =>
partitionsToCompute.map { id =>
val locs = taskIdToLocations(id)
val part = stage.rdd.partitions(id)
// new 一个ShuffleMapTask
new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, Option(jobId),
Option(sc.applicationId), sc.applicationAttemptId)
}
// 如果是 ResultStage
case stage: ResultStage =>
partitionsToCompute.map { id =>
val p: Int = stage.partitions(id)
val part = stage.rdd.partitions(p)
val locs = taskIdToLocations(id)
// new 一个ResultTask
new ResultTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics,
Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
}
}
} catch {
case NonFatal(e) =>
abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
runningStages -= stage
return
}
创建好该stage的tasks后,判断tasks的长度大于0,会为这些task创建一个TaskSet,然后通过TaskScheduler调用submitTasks方法,提交TaskSet给TaskScheduler。
如果tasks.size小于等于0,会将当前stage标记完成,然后调用submitWaitingChildStages(stage)方法,提交当前stage的子stage。这样Stage的TaskSet已经提交给TaskScheduler。后面的文章就是要分析TaskScheduler如何对Task进行调度处理。