
Spark ShuffleMap Task Generation, Execution, and Data Tracking Flow


Definition of the RDD Abstract Class

We know that in Spark, a dataset that users can compute on is abstracted as an RDD. Below is the RDD class definition (only the key member variables are kept):

abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {
  /** A unique ID for this RDD (within its SparkContext). */
  val id: Int = sc.newRddId()

  /** A friendly name for this RDD */
  @transient var name: String = _
  // Our dependencies and partitions will be gotten by calling subclass's methods below, and will
  // be overwritten when we're checkpointed
  private var dependencies_ : Seq[Dependency[_]] = _
  @transient private var partitions_ : Array[Partition] = _

  private var storageLevel: StorageLevel = StorageLevel.NONE

  private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None

  private val checkpointAllMarkedAncestors =
    Option(sc.getLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS)).exists(_.toBoolean)

  // For performance reasons, cache the value to avoid repeatedly computing `isBarrier()` on a long
  // RDD chain.
  @transient protected lazy val isBarrier_ : Boolean =
    dependencies.filter(!_.isInstanceOf[ShuffleDependency[_, _, _]]).exists(_.rdd.isBarrier())

  private[spark] final lazy val outputDeterministicLevel: DeterministicLevel.Value = {
    if (isReliablyCheckpointed) {
      DeterministicLevel.DETERMINATE
    } else {
      getOutputDeterministicLevel
    }
  }
}

From the code above we can see that when an RDD is serialized, only member variables such as id, dependencies_, storageLevel, checkpointAllMarkedAncestors, and outputDeterministicLevel are serialized, plus any other serializable member variables declared by concrete RDD subclasses. Fields marked @transient, such as name and partitions_, are skipped.
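
To make the @transient behavior concrete, here is a minimal sketch using plain Java serialization (which Spark's default JavaSerializer builds on); the Node class and its fields are invented for illustration and are not Spark code:

import java.io._

// A class with one @transient field and one ordinary field, mirroring how
// RDD marks prev/name/partitions_ as @transient.
class Node(@transient var prev: String, var id: Int) extends Serializable

object TransientDemo {
  def main(args: Array[String]): Unit = {
    // Serialize an instance to a byte array.
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(new Node("predecessor", 42))
    out.close()

    // Deserialize it and inspect the fields.
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    val copy = in.readObject().asInstanceOf[Node]
    println(copy.id)   // 42   -- serialized as usual
    println(copy.prev) // null -- @transient fields are dropped on the wire
  }
}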

Creating a ShuffledRDD

When we call a method on the RDD API that involves a shuffle, a ShuffledRDD instance is created. The partitionBy(…) method is defined as follows:

  /**
   * Return a copy of the RDD partitioned using the specified partitioner.
   */
  def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
    if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
      throw new SparkException("HashPartitioner cannot partition array keys.")
    }
    if (self.partitioner == Some(partitioner)) {
      self
    } else {
      new ShuffledRDD[K, V, V](self, partitioner)
    }
  }
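
As a quick sanity check of the two branches above, the hypothetical local-mode snippet below (PartitionByDemo and its values are made up, not taken from the article) re-partitions a pair RDD that has no partitioner, which takes the else branch and yields a ShuffledRDD whose single dependency is a ShuffleDependency:

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object PartitionByDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("partitionBy-demo").setMaster("local[2]"))
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

    // pairs has no partitioner, so partitionBy falls into the else branch above.
    val shuffled = pairs.partitionBy(new HashPartitioner(4))
    println(shuffled.getClass.getSimpleName)                   // ShuffledRDD
    println(shuffled.dependencies.head.getClass.getSimpleName) // ShuffleDependency

    // Re-partitioning with an equal partitioner hits the first branch and returns the same RDD.
    println(shuffled.partitionBy(new HashPartitioner(4)) eq shuffled) // true
    sc.stop()
  }
}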

A ShuffledRDD is one node in the overall RDD dependency graph (a directed graph). In the class definition below, ShuffledRDD has a member variable prev, a reference to its predecessor RDD; note that this variable is @transient and therefore is not serialized.
Spark generates a series of tasks from the RDD dependency graph. A ShuffledRDD corresponds to a ShuffleMapTask (described in detail in a later section), which usually has to exchange data across nodes.
To make it easy to resolve an RDD's dependencies when tasks are created, the RDD abstract class provides the getDependencies method, which produces the dependency class ShuffleDependency. Besides recording the current RDD and its predecessor, this dependency also records the current RDD's partitioner part, the data serializer, the key ordering keyOrdering, the aggregation function aggregator, and the map-side combine flag mapSideCombine.

class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient var prev: RDD[_ <: Product2[K, V]],
    part: Partitioner)
  extends RDD[(K, C)](prev.context, Nil) {
  override def getDependencies: Seq[Dependency[_]] = {
    val serializer = userSpecifiedSerializer.getOrElse {
      val serializerManager = SparkEnv.get.serializerManager
      if (mapSideCombine) {
        serializerManager.getSerializer(implicitly[ClassTag[K]], implicitly[ClassTag[C]])
      } else {
        serializerManager.getSerializer(implicitly[ClassTag[K]], implicitly[ClassTag[V]])
      }
    }
    List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))
  }
}
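
To see how these fields actually land on the ShuffleDependency, the sketch below (reusing the hypothetical sc and pairs from the previous snippet) inspects the dependency created by reduceByKey, which enables map-side combine and supplies an aggregator but no key ordering:

import org.apache.spark.ShuffleDependency

// reduceByKey goes through combineByKeyWithClassTag and builds a ShuffledRDD under the hood.
val counts = pairs.reduceByKey(_ + _, 4)
val dep = counts.dependencies.head.asInstanceOf[ShuffleDependency[String, Int, Int]]

println(dep.mapSideCombine)            // true  -- values are pre-combined on the map side
println(dep.aggregator.isDefined)      // true  -- the (_ + _) function wrapped in an Aggregator
println(dep.keyOrdering)               // None  -- reduceByKey does not need sorted keys
println(dep.partitioner.numPartitions) // 4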

Creating a ShuffleDependency

A ShuffleDependency describes the relationship between an RDD and its direct parent RDD; the code below shows its definition. The main points of interest here are how the ShuffleHandle is obtained and how the cleanup logic for shuffle data is registered; see the comments in the code for details.

class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Serializer = SparkEnv.get.serializer,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false)
  extends Dependency[Product2[K, V]] {

  if (mapSideCombine) {
    require(aggregator.isDefined, "Map-side combine without Aggregator specified!")
  }
  override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]

  private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
  private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
  // Note: It's possible that the combiner class tag is null, if the combineByKey
  // methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
  private[spark] val combinerClassName: Option[String] =
    Option(reflect.classTag[C]).map(_.runtimeClass.getName)
  // When a ShuffleDependency instance is created, an ID that is unique within the current
  // SparkContext is generated, so that this instance can be looked up by that id elsewhere
  val shuffleId: Int = _rdd.context.newShuffleId()
  // At construction time the dependency also registers itself with the ShuffleManager of the
  // current execution environment. This actually returns a suitable ShuffleHandle, which decides
  // how the map-side output is buffered once the map-side processing is done:
  // BypassMergeSortShuffleHandle: serialize and write straight out to files
  // SerializedShuffleHandle: buffer records in memory in serialized form
  // BaseShuffleHandle: buffer records without serializing them
  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, _rdd.partitions.length, this)
  // Register the cleanup callback here, so that the data belonging to this ShuffleDependency is cleaned up once it is no longer in use
  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}
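
The shuffleId above is drawn from an atomic counter on the SparkContext, so every shuffle in an application gets a distinct id. The hedged sketch below (reusing the hypothetical sc and pairs from earlier; the concrete numbers depend on how many shuffles the context has already seen) shows two shuffles receiving two different ids:

import org.apache.spark.{HashPartitioner, ShuffleDependency}
import org.apache.spark.rdd.RDD

def shuffleIdOf(rdd: RDD[_]): Int =
  rdd.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleId

val a = pairs.partitionBy(new HashPartitioner(2))
val b = a.mapValues(_ + 1).partitionBy(new HashPartitioner(3))

println(shuffleIdOf(a)) // e.g. 0
println(shuffleIdOf(b)) // e.g. 1 -- always different from a's id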

Creating ShuffleMapTasks

When an action is invoked on an RDD, DAGScheduler::handleJobSubmitted(…) is triggered. It walks the ShuffledRDD dependency graph described above, creates tasks of two kinds, ShuffleMapTask and ResultTask, and finally submits them to the cluster for execution. The core of the job-submission method is as follows:

  private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
    var finalStage: ResultStage = null
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    // Starting from the final RDD, recursively build the Stage graph. This is covered in my other
    // Spark articles, so only a brief note here: based on the RDD dependency graph, i.e. the
    // ShuffleDependency graph, this method builds a graph of ShuffleMapStages, where every node of
    // the ShuffleDependency graph becomes one ShuffleMapStage
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
    ...
    // Create an ActiveJob instance, i.e. the JOB we see on the WEB UI; every
    // dependency graph produces a new JOB.
    val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
    ...
    val jobSubmissionTime = clock.getTimeMillis()
    // Cache the ActiveJob information
    jobIdToActiveJob(jobId) = job
    activeJobs += job
    // Associate the Job with finalStage
    finalStage.setActiveJob(job)
    val stageIds = jobIdToStageIds(jobId).toArray
    val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
    // Notify all event listeners that a new job has been submitted
    listenerBus.post(
      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
    // Submit the stage
    submitStage(finalStage)
  }
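
For orientation, the hedged snippet below (reusing the hypothetical sc and pairs from before) is the kind of user code that reaches this path: count() is an action, it calls SparkContext.runJob, which posts a JobSubmitted event that the DAGScheduler event loop dispatches to handleJobSubmitted:

import org.apache.spark.HashPartitioner

// One shuffle in the lineage, so this single action produces one job
// consisting of one ShuffleMapStage followed by one ResultStage.
val repartitioned = pairs.partitionBy(new HashPartitioner(4)).mapValues(_ + 1)
val n = repartitioned.count() // action: triggers handleJobSubmitted on the driver
println(n)                    // 3 records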

Creating ShuffleMapStages

createShuffleMapStage is a method defined in DAGScheduler. Starting from the current ShuffleDependency instance, it recursively builds every ShuffleMapStage and registers them in the various bookkeeping data structures so that state and data can be exchanged while tasks run. Its definition is as follows:

  // As you can see, every ShuffleDependency instance corresponds to exactly one ShuffleMapStage instance
  def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
    val rdd = shuffleDep.rdd
    checkBarrierStageWithDynamicAllocation(rdd)
    checkBarrierStageWithNumSlots(rdd)
    checkBarrierStageWithRDDChainPattern(rdd, rdd.getNumPartitions)
    val numTasks = rdd.partitions.length
    // Recursively create the stages (ShuffleMapStages) for the parent dependencies
    val parents = getOrCreateParentStages(rdd, jobId)
    val id = nextStageId.getAndIncrement()
    val stage = new ShuffleMapStage(
      id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)
    // Cache the id-to-stage mapping for the current stage
    stageIdToStage(id) = stage
    // Cache the mapping from this shuffle's id to the current stage
    shuffleIdToMapStage(shuffleDep.shuffleId) = stage
    // Add the current stage to the set of stages belonging to jobId
    updateJobIdStageIdMaps(jobId, stage)

    if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
      logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
      // All of this stage's parent dependencies have been handled, so register the current
      // shuffle stage with the MapOutputTracker. This essentially caches a mapping from this
      // shuffle to a ShuffleStatus instance, which tracks the status of every partition that
      // the upstream rdd is about to output
      mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
    }
    stage
  }

Submitting the Stage Graph

Through the steps above, DAGScheduler has turned the RDD dependency graph into Stage dependencies. For the Stages to execute in order, the whole dependency graph has to be converted into individual Tasks. The submitStage(stage: Stage) method starts from the last Stage of the Stage dependency graph, i.e. the FinalStage, and builds the different kinds of tasks. The method is defined as follows:

  /** Submits stage, but first recursively submits any missing parents. */
  private def submitStage(stage: Stage) {
    val jobId = activeJobForStage(stage)
    if (jobId.isDefined) {
      logDebug("submitStage(" + stage + ")")
      // If the current stage is not already waiting, running, or failed, look for missing parent
      // stages; any ancestors that are not ready yet are recursively built and submitted first,
      // and this stage is parked until they finish.
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
        val missing = getMissingParentStages(stage).sortBy(_.id)
        logDebug("missing: " + missing)
        if (missing.isEmpty) {
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          // All the stages this one depends on have completed, so tasks can run starting from this stage
          submitMissingTasks(stage, jobId.get)
        } else {
          for (parent <- missing) {
            submitStage(parent)
          }
          waitingStages += stage
        }
      }
    } else {
      abortStage(stage, "No active job for stage " + stage.id, None)
    }
  }
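
The recursion is easier to see stripped of Spark's bookkeeping. The following is a minimal, non-Spark sketch (MiniStage and every name in it are invented) of the same parent-first rule: a stage is turned into tasks only when none of its parents are missing; otherwise its missing parents are submitted first and the stage itself is parked in a waiting set:

import scala.collection.mutable

case class MiniStage(id: Int, parents: Seq[MiniStage], var done: Boolean = false)

object SubmitOrderDemo {
  val waiting   = mutable.LinkedHashSet[MiniStage]()
  val submitted = mutable.ArrayBuffer[Int]()

  def submit(stage: MiniStage): Unit = {
    val missing = stage.parents.filterNot(_.done).sortBy(_.id)
    if (missing.isEmpty) {
      submitted += stage.id     // analogous to submitMissingTasks(stage, ...)
    } else {
      missing.foreach(submit)   // recurse into the ancestors first
      waiting += stage          // analogous to waitingStages += stage
    }
  }

  def main(args: Array[String]): Unit = {
    val s0 = MiniStage(0, Nil, done = true) // already computed
    val s1 = MiniStage(1, Seq(s0))
    val s2 = MiniStage(2, Seq(s1))
    submit(s2)
    println(submitted)         // ArrayBuffer(1): only s1 can run right now
    println(waiting.map(_.id)) // LinkedHashSet(2): s2 waits until s1 completes
  }
}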

Building Tasks from the Current Stage

The method below first finds all the partitions the current stage needs to compute, then creates either ShuffleMapTasks or ResultTasks depending on the stage type, filling each task instance with the information it needs, and finally, when there are tasks left to run, submits them via TaskScheduler::submitTasks(…).

  /** Called when stage's parents are available and we can now do its task. */
  private def submitMissingTasks(stage: Stage, jobId: Int) {
    logDebug("submitMissingTasks(" + stage + ")")

    // First figure out the indexes of partition ids to compute.
    // Find all the partitions that need to be computed
    val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

    // Use the scheduling pool, job group, description, etc. from an ActiveJob associated
    // with this Stage
    val properties = jobIdToActiveJob(jobId).properties

    runningStages += stage
    // SparkListenerStageSubmitted should be posted before testing whether tasks are
    // serializable. If tasks are not serializable, a SparkListenerStageCompleted event
    // will be posted, which should always come after a corresponding SparkListenerStageSubmitted
    // event.
    stage match {
      case s: ShuffleMapStage =>
        outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
      case s: ResultStage =>
        // For a ResultStage, the highest partition id comes from the final RDD's partition count
        outputCommitCoordinator.stageStart(
          stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
    }
    val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
      stage match {
        case s: ShuffleMapStage =>
          partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
        case s: ResultStage =>
          partitionsToCompute.map { id =>
            val p = s.partitions(id)
            (id, getPreferredLocs(stage.rdd, p))
          }.toMap
      }
    } catch {
      case NonFatal(e) =>
        stage.makeNewStageAttempt(partitionsToCompute.size)
        listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
        abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage
        return
    }

    stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)

    // If there are tasks to execute, record the submission time of the stage. Otherwise,
    // post the event without the submission time, which indicates that this stage was
    // skipped.
    if (partitionsToCompute.nonEmpty) {
      stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
    }
    listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

    // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
    // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast
    // the serialized copy of the RDD and for each task we will deserialize it, which means each
    // task gets a different copy of the RDD. This provides stronger isolation between tasks that
    // might modify state of objects referenced in their closures. This is necessary in Hadoop
    // where the JobConf/Configuration object is not thread-safe.
    var taskBinary: Broadcast[Array[Byte]] = null
    var partitions: Array[Partition] = null
    try {
      // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
      // For ResultTask, serialize and broadcast (rdd, func).
      var taskBinaryBytes: Array[Byte] = null
      // taskBinaryBytes and partitions are both effected by the checkpoint status. We need
      // this synchronization in case another concurrent job is checkpointing this RDD, so we get a
      // consistent view of both variables.
      RDDCheckpointData.synchronized {
        taskBinaryBytes = stage match {
          case stage: ShuffleMapStage =>
            JavaUtils.bufferToArray(
              closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
          case stage: ResultStage =>
            JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
        }

        partitions = stage.rdd.partitions
      }
      // Broadcast the serialized task payload, which contains the current stage.rdd and its dependency information stage.shuffleDep
      taskBinary = sc.broadcast(taskBinaryBytes)
    } catch {
      // In the case of a failure during serialization, abort the stage.
      case e: NotSerializableException =>
        abortStage(stage, "Task not serializable: " + e.toString, Some(e))
        runningStages -= stage

        // Abort execution
        return
      case e: Throwable =>
        abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage

        // Abort execution
        return
    }
    // Create the tasks
    val tasks: Seq[Task[_]] = try {
      val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
      stage match {
        // For a ShuffleMapStage, create ShuffleMapTasks.
        // One task is created per partition, so each ShuffleMapTask carries:
        // stage.id: the stage this task belongs to
        // stage.latestInfo.attemptNumber: the attempt number of the stage
        // taskBinary: the operations the task must apply to its partition of the data
        // part: the Partition of the dataset this task will run on
        // locs: the preferred locations of the partition data, which may be local or on remote nodes
        // ...
        case stage: ShuffleMapStage =>
          stage.pendingPartitions.clear()
          partitionsToCompute.map { id =>
            val locs = taskIdToLocations(id)
            val part = partitions(id)
            stage.pendingPartitions += id
            new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
              taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
              Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
          }
        // For a ResultStage, create ResultTasks
        case stage: ResultStage =>
          partitionsToCompute.map { id =>
            val p: Int = stage.partitions(id)
            val part = partitions(p)
            val locs = taskIdToLocations(id)
            new ResultTask(stage.id, stage.latestInfo.attemptNumber,
              taskBinary, part, locs, id, properties, serializedTaskMetrics,
              Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
              stage.rdd.isBarrier())
          }
      }
    } catch {
      case NonFatal(e) =>
        abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage
        return
    }

    if (tasks.size > 0) {
      logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
        s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
      // Every Partition the current Stage depends on gets a corresponding ShuffleMapTask or
      // ResultTask. These tasks are grouped into a TaskSet, which represents a set of tasks that
      // can be computed in parallel; deciding which Workers the tasks run on and tracking their
      // state is handed off to the TaskSetManager.
      taskScheduler.submitTasks(new TaskSet(
        tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
    } else {
      // Because we posted SparkListenerStageSubmitted earlier, we should mark
      // the stage as completed here in case there are no tasks to run
      markStageAsFinished(stage, None)

      stage match {
        case stage: ShuffleMapStage =>
          logDebug(s"Stage ${stage} is actually done; " +
              s"(available: ${stage.isAvailable}," +
              s"available outputs: ${stage.numAvailableOutputs}," +
              s"partitions: ${stage.numPartitions})")
          markMapStageJobsAsFinished(stage)
        case stage : ResultStage =>
          logDebug(s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})")
      }
      submitWaitingChildStages(stage)
    }
  }

Executing a ShuffleMapTask

As described earlier, on the Driver side the DAGScheduler walks the RDD dependency graph and eventually produces TaskSets, which are managed by a TaskSetManager. Each ShuffleMapTask is then assigned to an idle Worker to run, and the task's execution logic is defined in ShuffleMapTask, as follows:

class ShuffleMapTask {
  override def runTask(context: TaskContext): MapStatus = {
    // Deserialize the RDD using the broadcast variable.
    val threadMXBean = ManagementFactory.getThreadMXBean
    val deserializeStartTime = System.currentTimeMillis()
    val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime
    } else 0L
    val ser = SparkEnv.get.closureSerializer.newInstance()
    // Deserialize, from the broadcast variable taskBinary carried by the task, the RDD this task operates on plus its dependency information
    val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)

    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
    _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
    } else 0L

    var writer: ShuffleWriter[Any, Any] = null
    try {
      val manager = SparkEnv.get.shuffleManager
      // Based on the shuffle handle type carried by this task's dependency, get a usable writer bound to partitionId
      writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
      // Process the bound partition data by iterating through rdd.iterator. This RDD may be the
      // end of a pipeline of operations; for example, given the dependencies below (where <--
      // means the right side depends on the left):
      // ShuffledRDD <-- MapPartitionsRDD <-- MapPartitionsRDD <-- current_rdd
      // rdd.iterator recurses from the leftmost ShuffledRDD, which reads its data remotely through
      // the ShuffleManager, then applies the functions bound to the two MapPartitionsRDDs in turn,
      // and finally the writer chosen from dep.shuffleHandle writes the processed partition data
      // out to its destination.
      writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
      writer.stop(success = true).get
    } catch {
      case e: Exception =>
        try {
          if (writer != null) {
            writer.stop(success = false)
          }
        } catch {
          case e: Exception =>
            log.debug("Could not stop writer", e)
        }
        throw e
    }
  }
}
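
The key point in the comment above is that narrow transformations are pipelined inside a single task: each RDD's compute wraps its parent's iterator, so nothing is materialized between the steps. The following is a minimal, non-Spark sketch (MiniRDD, Source and Mapped are invented names) of that iterator chaining:

// Each node wraps its parent's iterator, so a chain of map-like steps runs
// element by element inside one task, with no intermediate collections.
trait MiniRDD[T] { def compute(): Iterator[T] }

class Source[T](data: Seq[T]) extends MiniRDD[T] {
  def compute(): Iterator[T] = data.iterator
}

class Mapped[T, U](parent: MiniRDD[T], f: T => U) extends MiniRDD[U] {
  def compute(): Iterator[U] = parent.compute().map(f)
}

object PipelineDemo {
  def main(args: Array[String]): Unit = {
    // Source <-- Mapped(* 2) <-- Mapped(+ 1), evaluated lazily element by element.
    val chain = new Mapped(new Mapped(new Source(1 to 5), (_: Int) * 2), (_: Int) + 1)
    println(chain.compute().toList) // List(3, 5, 7, 9, 11)
  }
}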

Summary

The flow from an RDD dependency graph to task execution can be simplified as follows (where <-- means the right side depends on the completion of the left side):

Calling the RDD or DataFrame API builds an RDD dependency graph; for example, df.read().filter(…).partitionBy(…).write()
produces ShuffledRDD <-- MapPartitionsRDD <-- ShuffledRDD <-- ShuffledRDD

Through the DAGScheduler, a dependency graph is generated:
ShuffleDependency <-- ShuffleDependency <-- ShuffleDependency

Through the DAGScheduler, a Stage dependency graph is generated:
ShuffleMapStage <-- ShuffleMapStage <-- ResultStage

Through the DAGScheduler, the TaskSets of tasks are generated:
ShuffleMapTask_1 <-- ShuffleMapTask_2 <-- ResultTask

The tasks are managed by a TaskSetManager and dispatched to Workers for execution

ShuffleMapTask_1 runs on a Worker

ShuffleMapTask_2 runs on a Worker

The ResultTask runs on a Worker, and its result is returned to the Driver
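
A convenient way to see the first half of this picture from user code is RDD.toDebugString, which prints the lineage and marks shuffle boundaries, i.e. the future stage boundaries, with extra indentation. A hypothetical word-count pipeline (reusing the hypothetical sc, with a made-up input path) might print something like the abbreviated output below; RDD ids, paths, and partition counts will differ:

val counts = sc.textFile("hdfs://.../input.txt")
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)

println(counts.toDebugString)
// (2) ShuffledRDD[4] at reduceByKey ...                       <-- downstream stage
//  +-(2) MapPartitionsRDD[3] at map ...                       <-- map side of the shuffle
//     |  MapPartitionsRDD[2] at flatMap ...
//     |  hdfs://.../input.txt MapPartitionsRDD[1] at textFile ...
//     |  hdfs://.../input.txt HadoopRDD[0] at textFile ...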
