欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Spark源码系列(四) DAGScheduler源码分析与Task最佳位置算法源码

程序员文章站 2024-02-23 09:21:40
...

Spark源码系列(四) DAGScheduler源码分析与Task最佳位置算法源码

在Spark源码系列(三)中已经说到了DAGScheduler中的stage划分算法。每当执行到RDD的action算子时就会触发runJob方法,代码逻辑最后去调用DAGScheduler的runJob方法,最后会走到DAGScheduler的handleJobSubmitted方法,这个方法内部会根据当前的RDD创建一个ResultStage,然后根据这个ResultStage对象创建一个Job。再将这个Stage对象传入submitStage方法,这个方法内部会调用一些其他方法,会根据当前stage中的RDD的Dependency向前推,依据RDD之间的Dependency,碰到ShuffleDependency就创建一个新的stage,NarrowDependency就将当前RDD加入到当前Stage中,一直到所有RDD都遍历完毕,所有stage也就划分完了。

这里贴一张经典的图:

Spark源码系列(四) DAGScheduler源码分析与Task最佳位置算法源码

窄依赖:指父RDD的每一个分区最多被一个子RDD的分区所用,表现为一个父RDD的分区对于一个子RDD的分区,和两个父RDD的分区对应于一个子RDD的分区。

宽依赖:指子RDD的分区依赖于父RDD的所有分区,这是因为shuffle操作,如图中的groupByKey和未经协同划分的join。

划分Stage是以shuffle和result两种类型来划分。在Spark中有两类task,一类是shuffleMapTask,一类是resultTask。

第一类task的输出是shuffle所需数据,第二类task的输出是result,stage的划分也是以此为依据。shuffle之前的所有变换是一个stage,shuffle之后的操作是另一个stage。比如rdd.parallize(1 to 10).foreach(println),算子没有涉及到shuffle,因此直接输出,task也就只有resultTask,stage只有一个;如果rdd.map(x => (x, 1)).reduceByKey(_ + _).foreach(println),这个job因为有reduceBykey算子,因此就会有shuffle过程,在reduceBykey之前的是一个stage,执行shuffleMapTask,输出shuffle所需的数据,reduceBykey到最后是一个stage,直接就输出结果。如果job中有多次shuffle,那么每个shuffle之前都是一个stage。

下面我们以一个具体例子进行Stage划分的说明:

Spark源码系列(四) DAGScheduler源码分析与Task最佳位置算法源码

图中的final RDD为RDD G,是触发Job执行的finalRDD。首先RDD G会通过getMissingParentStages方法判断RDD G是否含有父Stage。getMissingParentStages方法中先创建了一个存放RDD的栈。

val waitingForVisit = new Stack[RDD[_]]

接着向Stack中推入RDD G

//首先往栈中,推入了stage最后的一个rdd  finalRDD
    waitingForVisit.push(stage.rdd)

由于栈中推入了RDD G,因此判断不为空,弹出RDD G并调用内部visit方法

/进行while循环
    while (waitingForVisit.nonEmpty) {
      // 对stage的最后一个rdd,调用自己内部定义的visit()方法
      visit(waitingForVisit.pop())
    }

判断rdd是否被访问过,若符合条件,先讲该RDD添加到已访问,继续向下执行

if (!visited(rdd)) {
        visited += rdd
        val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)

下面我们需要判断RDD G的Dependency是宽依赖还是窄依赖。从上图中可以看到,RDD B和RDD G是窄依赖,将RDD B放入栈中,RDD F和RDD G是宽依赖。于是调用getOrCreateShuffleMapStage方法创建一个Stage,RDD G和RDD F被划分成了两个Stage,将RDD G所在Stage划分成了Stage 3,将RDD F所在的Stage划分成了Stage 2,返回一个list[Stage]。

继续从RDD B开始,RDD B向前找到它的父RDD,父RDD A与RDD B是宽依赖关系,所以RDD A与RDD B中间也创建了一个Stage,RDD A与RDD B被划分成了两个Stage,RDD A被划分成Stage 1。RDD F与它的父RDD D/E都是窄依赖关系,不产生新的Stage,RDD D与RDD C也是窄依赖关系,也不产生新的Stage,所以RDD C、D、E、F都在Stage 2中。

if (rddHasUncachedPartitions) {
          for (dep <- rdd.dependencies) {
            dep match {
                //如果是宽依赖
              case shufDep: ShuffleDependency[_, _, _] =>
                // 那么使用宽依赖的那个RDD,使用getOrCreateShuffleMapStage()方法去创建一个stage
                // 默认最后一个stage,不是shuffleMap stage
                // 但是finalStage之前所有的stage,都是shuffleMap Stage
                val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
                if (!mapStage.isAvailable) {
                  missing += mapStage
                }
                //如果是窄依赖,将rdd放入栈中
              case narrowDep: NarrowDependency[_] =>
                waitingForVisit.push(narrowDep.rdd)
            }
          }
        }

其实简单来讲Stage划分算法核心思想很简单:遇到ShuffleDependency就断开,划分为一个Stage;遇到NarrowDependency就把对应的RDD加入该stage中。

Task最佳位置计算

在前面的submitStage方法中会找到划分出的stage中的第一个stage,然后调用submitMissingTasks方法。

if (missing.isEmpty) {
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          // 找到第一个 stage 去调用 submitMissingTasks 方法
          submitMissingTasks(stage, jobId.get)
        }

submitMissingTasks方法中的功能逻辑:

  1. 拿到stage中没有计算的partition
  2. 获取task对应的partition的最佳位置(核心算法)
  3. 获取taskBinary,将stage的RDD和ShuffleDependency广播到Executor
  4. 为stage创建task

重点分析如何分配task到最优的partition上,也就是计算partitionId和taskId的对应关系。

// 计算 taskId 和 partition 的对应关系	
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
      stage match {
        // 如果是 ShuffleMapStage
        case s: ShuffleMapStage =>
          partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
        // 如果是 ResultStage
        case s: ResultStage =>
          partitionsToCompute.map { id =>
            val p = s.partitions(id)
            (id, getPreferredLocs(stage.rdd, p))
          }.toMap
      }
    } catch {
      case NonFatal(e) =>
        stage.makeNewStageAttempt(partitionsToCompute.size)
        listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
        abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage
        return
    }

在taskIdToLocations方法中可以看到,无论是ShuffleMapStage还是ResultStage,都是每个partition调用getPreferredLocs方法,实际上是调用了getPreferredLocsInternal方法。

private[spark]
  def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = {
    getPreferredLocsInternal(rdd, partition, new HashSet)
  }

在getPreferredLocsInternal方法中依次做了如下逻辑判断:

  1. 通过HashSet判断rdd的partition是否被操作过,如果已被操作过就返回Nil,任何操作也不执行
  2. 通过getCacheLocs方法查看当前RDD的partition的最佳计算位置是否被缓存过,如果有被缓存过直接返回对应的缓存位置
  3. 如果没有缓存,就调用RDD的preferredLocations去计算最佳位置,实际上就是看看当前RDD是否被checkpoint,如果有就返回checkpoint的位置
  4. 如果当前RDD既没有被缓存又没有checkpoint的话,就去遍历RDD的依赖链,如果有窄依赖,就去遍历父RDD的所有partition,递归调用getPreferredLocsInternal方法。

这里实际上就是找出当前stage是否存在某个RDD被缓存或者checkpoint了,如果有就返回其缓存或者checkpoint的位置,添加到序列中,然后返回。如果当前stage中的所有RDD都没有被缓存或者checkpoint的话,那么task的最佳计算位置就返回Nil。

private def getPreferredLocsInternal(
      rdd: RDD[_],
      partition: Int,
      visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
    // 如果这个 rdd 的 partition 已经计算过了位置了就忽略
  	// 因为这个方法是被递归调用的
    if (!visited.add((rdd, partition))) {
      // Nil has already been returned for previously visited partitions.
      return Nil
    }
    // 如果这个 partition 被缓存过就返回缓存的位置
    val cached = getCacheLocs(rdd)(partition)
    if (cached.nonEmpty) {
      return cached
    }
    // 调用 RDD 内部的 preferredLocations 方法去找最佳计算位置,实际上内部是看当前
    // RDD 是否 checkpoint 了,如果做了 checkpoint 就会返回 checkpoint 的位置
    val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
    if (rddPrefs.nonEmpty) {
      return rddPrefs.map(TaskLocation(_))
    }

    /**
    * 如果该 RDD 既没有没缓存有没有 checkpoint 的话那么就会去遍历他的依赖链,发现是窄依赖的时候
    * 去就去递归调用 getPreferredLocsInternal 去看看该 RDD 是否被缓存或者 checkpoint 了。如果
    * 是,就返回缓存或者 checkpoint 的位置。如果一直没找到的话就返回 Nil
    **/
    rdd.dependencies.foreach {
      case n: NarrowDependency[_] =>
        for (inPart <- n.getParents(partition)) {
          val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
          if (locs != Nil) {
            return locs
          }
        }

      case _ =>
    }

    Nil
  }

当获取到task的最佳位置后,根据stage的类型进行匹配,为每个partition的数据创建一个task,如果是ShuffleMapStage就创建ShuffleMapTask,如果是ResultStage就创建ResultTask。最后将整个stage创建的所有task都放到一个Seq中。创建task的过程会将每个task前面计算出来的locations、taskBinary等参数一并放到参数中去。

val tasks: Seq[Task[_]] = try {
      stage match {
        // 如果是 ShuffleMapStage
        case stage: ShuffleMapStage =>
          partitionsToCompute.map { id =>
            val locs = taskIdToLocations(id)
            val part = stage.rdd.partitions(id)
            // new 一个ShuffleMapTask
            new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
              taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, Option(jobId),
              Option(sc.applicationId), sc.applicationAttemptId)
          }

        // 如果是 ResultStage
        case stage: ResultStage =>
          partitionsToCompute.map { id =>
            val p: Int = stage.partitions(id)
            val part = stage.rdd.partitions(p)
            val locs = taskIdToLocations(id)
            // new 一个ResultTask
            new ResultTask(stage.id, stage.latestInfo.attemptId,
              taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics,
              Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
          }
      }
    } catch {
      case NonFatal(e) =>
        abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage
        return
    }

创建好该stage的tasks后,判断tasks的长度大于0,会为这些task创建一个TaskSet,然后通过TaskScheduler调用submitTasks方法,提交TaskSet给TaskScheduler。

如果tasks.size小于等于0,会将当前stage标记完成,然后调用submitWaitingChildStages(stage)方法,提交当前stage的子stage。这样Stage的TaskSet已经提交给TaskScheduler。后面的文章就是要分析TaskScheduler如何对Task进行调度处理。