Spark ShuffleMap Tasks: Generation, Execution, and Data Tracking
Definition of the RDD Abstract Class
As we know, a dataset that Spark can compute on is abstracted as an RDD. Below is the RDD class definition (only the key member fields are kept):
abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {

  /** A unique ID for this RDD (within its SparkContext). */
  val id: Int = sc.newRddId()

  /** A friendly name for this RDD */
  @transient var name: String = _

  // Our dependencies and partitions will be gotten by calling subclass's methods below, and will
  // be overwritten when we're checkpointed
  private var dependencies_ : Seq[Dependency[_]] = _
  @transient private var partitions_ : Array[Partition] = _

  private var storageLevel: StorageLevel = StorageLevel.NONE

  private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None

  private val checkpointAllMarkedAncestors =
    Option(sc.getLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS)).exists(_.toBoolean)

  // From performance concern, cache the value to avoid repeatedly compute `isBarrier()` on a long
  // RDD chain.
  @transient protected lazy val isBarrier_ : Boolean =
    dependencies.filter(!_.isInstanceOf[ShuffleDependency[_, _, _]]).exists(_.rdd.isBarrier())

  private[spark] final lazy val outputDeterministicLevel: DeterministicLevel.Value = {
    if (isReliablyCheckpointed) {
      DeterministicLevel.DETERMINATE
    } else {
      getOutputDeterministicLevel
    }
  }
}
From the code above we can see that when an RDD is serialized, only the fields id, dependencies_, storageLevel, checkpointAllMarkedAncestors, and outputDeterministicLevel are serialized (everything marked @transient is skipped), plus whatever other serializable fields the concrete RDD subclass defines.
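To illustrate that behaviour, the following is a minimal, self-contained sketch (plain Scala, not Spark code; the Node class and TransientDemo object are hypothetical names) showing that Java serialization skips @transient fields while serializing the rest:

import java.io._

// Minimal sketch: @transient fields are dropped during Java serialization,
// which is exactly why references such as ShuffledRDD's `prev` never travel to executors.
class Node(val id: Int, @transient val prev: Node) extends Serializable

object TransientDemo {
  def main(args: Array[String]): Unit = {
    val a = new Node(1, null)
    val b = new Node(2, a)

    val bytes = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bytes)
    oos.writeObject(b)
    oos.close()

    val restored = new ObjectInputStream(
      new ByteArrayInputStream(bytes.toByteArray)).readObject().asInstanceOf[Node]

    println(restored.id)            // 2: serialized normally
    println(restored.prev == null)  // true: the @transient field was skipped
  }
}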
Creating a ShuffledRDD
When a shuffle-inducing method is called through the RDD API, an instance of this class is created. For example, the partitionBy(...) method is defined as follows:
/**
 * Return a copy of the RDD partitioned using the specified partitioner.
 */
def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
  if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
    throw new SparkException("HashPartitioner cannot partition array keys.")
  }
  if (self.partitioner == Some(partitioner)) {
    self
  } else {
    new ShuffledRDD[K, V, V](self, partitioner)
  }
}
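For reference, here is a minimal usage sketch (the object and application names are hypothetical, local mode is assumed) showing that partitionBy on a pair RDD produces a ShuffledRDD whose single dependency is a ShuffleDependency:

import org.apache.spark.{HashPartitioner, ShuffleDependency, SparkConf, SparkContext}

// Minimal usage sketch: partitionBy yields a ShuffledRDD backed by a ShuffleDependency.
object PartitionByDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("partitionBy-demo").setMaster("local[2]"))

    val pairs = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c")))
    val shuffled = pairs.partitionBy(new HashPartitioner(4))

    println(shuffled.getClass.getSimpleName)                                      // ShuffledRDD
    println(shuffled.dependencies.head.isInstanceOf[ShuffleDependency[_, _, _]])  // true
    println(shuffled.partitions.length)                                           // 4

    sc.stop()
  }
}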
ShuffledRDD is a node in the overall RDD dependency graph (a directed graph). In the class definition below, ShuffledRDD holds a member variable prev, a reference to its predecessor RDD. Note that this variable is marked @transient, so it is not serialized.
Spark generates a series of tasks from the RDD dependency graph; a ShuffledRDD corresponds to a ShuffleMapTask (covered in detail in a later section), which usually needs to exchange data across nodes.
To make it easy to resolve an RDD's dependencies when tasks are created, the RDD abstract class declares the getDependencies method, which ShuffledRDD implements to build its ShuffleDependency. Besides recording the current RDD and its predecessor, this dependency also records the current RDD's partitioner part, the data serializer, the key ordering keyOrdering, the aggregation function aggregator, and the map-side combine flag mapSideCombine.
class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient var prev: RDD[_ <: Product2[K, V]],
    part: Partitioner)
  extends RDD[(K, C)](prev.context, Nil) {

  override def getDependencies: Seq[Dependency[_]] = {
    val serializer = userSpecifiedSerializer.getOrElse {
      val serializerManager = SparkEnv.get.serializerManager
      if (mapSideCombine) {
        serializerManager.getSerializer(implicitly[ClassTag[K]], implicitly[ClassTag[C]])
      } else {
        serializerManager.getSerializer(implicitly[ClassTag[K]], implicitly[ClassTag[V]])
      }
    }
    List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))
  }
}
Creating a ShuffleDependency
ShuffleDependency describes the relationship between an RDD and its direct parent RDD; its definition is shown below. The parts that matter most here are how the ShuffleHandle is obtained and how the shuffle-data cleanup logic is registered; see the comments in the code for details.
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Serializer = SparkEnv.get.serializer,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false)
  extends Dependency[Product2[K, V]] {

  if (mapSideCombine) {
    require(aggregator.isDefined, "Map-side combine without Aggregator specified!")
  }

  override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]

  private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
  private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
  // Note: It's possible that the combiner class tag is null, if the combineByKey
  // methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
  private[spark] val combinerClassName: Option[String] =
    Option(reflect.classTag[C]).map(_.runtimeClass.getName)

  // When a ShuffleDependency is instantiated, it obtains an ID that is unique within
  // the current SparkContext, so other components can map this id back to the instance.
  val shuffleId: Int = _rdd.context.newShuffleId()

  // It also registers itself with the ShuffleManager of the current execution
  // environment. What comes back is a suitable ShuffleHandle, which later decides
  // how the map-side output is buffered when the map side processes its data:
  //   BypassMergeSortShuffleHandle: serialize and write directly to files
  //   SerializedShuffleHandle: keep records serialized in memory
  //   BaseShuffleHandle: buffer records without serializing them
  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, _rdd.partitions.length, this)

  // Register this dependency for cleanup, so the shuffle data can be removed once it
  // is no longer used.
  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}
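How one of the three handles is chosen lives in SortShuffleManager.registerShuffle. The following is a simplified, hedged sketch of that decision logic (paraphrased from the Spark sources; method names shouldBypassMergeSort and canUseSerializedShuffle exist there, but details vary across versions):

// Simplified sketch of SortShuffleManager.registerShuffle (not the verbatim source):
// the handle type decides which ShuffleWriter the map task will use later.
override def registerShuffle[K, V, C](
    shuffleId: Int,
    numMaps: Int,
    dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
  if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
    // No map-side combine and few enough reduce partitions: write one file per
    // reduce partition and concatenate them at the end.
    new BypassMergeSortShuffleHandle[K, V](
      shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
    // Serializer supports relocation, no aggregation, partition count within the
    // serialized-shuffle limit: keep records serialized in memory.
    new SerializedShuffleHandle[K, V](
      shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else {
    // Fall back to buffering deserialized records (the ExternalSorter path).
    new BaseShuffleHandle(shuffleId, numMaps, dependency)
  }
}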
Creating ShuffleMapTasks
When an action is invoked on an RDD, DAGScheduler::handleJobSubmitted(...) is triggered. It walks the dependency graph of the ShuffledRDDs described above, creates tasks of two types, ShuffleMapTask and ResultTask, and finally submits them to the cluster. The core of the job submission method is shown below:
private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: ResultStage = null
  // New stage creation may throw an exception if, for example, jobs are run on a
  // HadoopRDD whose underlying HDFS files have been deleted.
  // Starting from the final RDD, recursively build the stage graph. In short: this
  // method follows the RDD dependency graph, i.e. the ShuffleDependency graph, and
  // produces a graph of ShuffleMapStages, one ShuffleMapStage per ShuffleDependency
  // node, ending with the ResultStage.
  finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  ...
  // Create an ActiveJob instance: the JOB shown on the Web UI. Each submitted
  // dependency graph produces a new job.
  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  ...
  val jobSubmissionTime = clock.getTimeMillis()
  // Cache the ActiveJob information
  jobIdToActiveJob(jobId) = job
  activeJobs += job
  // Associate the final stage with this job
  finalStage.setActiveJob(job)
  val stageIds = jobIdToStageIds(jobId).toArray
  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
  // Notify all event listeners that a new job has been submitted
  listenerBus.post(
    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
  // Submit the final stage
  submitStage(finalStage)
}
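To make the trigger concrete, here is a minimal, hedged example (a hypothetical word count in local mode): reduceByKey introduces a ShuffleDependency, and the collect() action is what eventually drives DAGScheduler::handleJobSubmitted, yielding one ShuffleMapStage plus one ResultStage:

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical word count used only to illustrate job submission:
// one shuffle (reduceByKey) => ShuffleMapStage, one action (collect) => ResultStage.
object WordCountDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("wordcount-demo").setMaster("local[2]"))

    val counts = sc.parallelize(Seq("a b", "b c", "a a"))
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)   // creates a ShuffledRDD with a ShuffleDependency

    // The action below goes through SparkContext.runJob -> DAGScheduler.submitJob,
    // which posts a JobSubmitted event handled by handleJobSubmitted(...).
    println(counts.collect().toMap)   // Map(a -> 3, b -> 2, c -> 1)

    sc.stop()
  }
}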
Creating ShuffleMapStages
createShuffleMapStage is a method defined in DAGScheduler. It recursively builds a ShuffleMapStage from the given ShuffleDependency instance and registers it in the various bookkeeping maps, so that state and data can be exchanged while tasks run. Its definition is:
// Each ShuffleDependency instance corresponds to exactly one ShuffleMapStage
def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
  val rdd = shuffleDep.rdd
  checkBarrierStageWithDynamicAllocation(rdd)
  checkBarrierStageWithNumSlots(rdd)
  checkBarrierStageWithRDDChainPattern(rdd, rdd.getNumPartitions)
  val numTasks = rdd.partitions.length
  // Recursively create the parent ShuffleMapStages first
  val parents = getOrCreateParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  val stage = new ShuffleMapStage(
    id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)
  // Cache the stage-id-to-stage mapping
  stageIdToStage(id) = stage
  // Cache the shuffle-id-to-stage mapping
  shuffleIdToMapStage(shuffleDep.shuffleId) = stage
  // Add this stage to the set of stages belonging to jobId
  updateJobIdStageIdMaps(jobId, stage)
  if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
    logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
    // All parent dependencies of this stage have been handled; register the shuffle
    // with the MapOutputTracker, i.e. cache a mapping from the shuffle id to a
    // ShuffleStatus instance, which records the status of every map output partition
    // the upstream RDD is about to produce.
    mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
  }
  stage
}
Submitting the Stage Graph
Through the steps above, DAGScheduler has turned the RDD dependency graph into a stage dependency graph. To run the stages in order, the whole graph has to be converted into concrete tasks. The submitStage(stage: Stage) method starts from the last stage of the graph, the final stage, and builds the different task types. Its definition is:
/** Submits stage, but first recursively submits any missing parents. */
private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    // If this stage is not already waiting, running, or failed, check its parents:
    // any parent stage that is not ready yet is submitted recursively first, and the
    // current stage is parked in waitingStages until those parents finish.
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      if (missing.isEmpty) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        // All parent stages have finished, so tasks for this stage can be created and run
        submitMissingTasks(stage, jobId.get)
      } else {
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}
Building Tasks from the Current Stage
The method below first finds all the partitions the current stage needs to compute, then creates either ShuffleMapTasks or ResultTasks depending on the stage type, fills each task instance with the information it needs, and finally, if there are tasks left to run, submits them via TaskScheduler::submitTasks(...).
/** Called when stage's parents are available and we can now do its task. */
private def submitMissingTasks(stage: Stage, jobId: Int) {
  logDebug("submitMissingTasks(" + stage + ")")
  // First figure out the indexes of partition ids to compute.
  val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
  // Use the scheduling pool, job group, description, etc. from an ActiveJob associated
  // with this Stage
  val properties = jobIdToActiveJob(jobId).properties
  runningStages += stage
  // SparkListenerStageSubmitted should be posted before testing whether tasks are
  // serializable. If tasks are not serializable, a SparkListenerStageCompleted event
  // will be posted, which should always come after a corresponding SparkListenerStageSubmitted
  // event.
  stage match {
    case s: ShuffleMapStage =>
      outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
    case s: ResultStage =>
      // For a ResultStage, the max partition id comes from the final RDD's partitions
      outputCommitCoordinator.stageStart(
        stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
  }
  // Compute the preferred locations for each partition to run on
  val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
    stage match {
      case s: ShuffleMapStage =>
        partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
      case s: ResultStage =>
        partitionsToCompute.map { id =>
          val p = s.partitions(id)
          (id, getPreferredLocs(stage.rdd, p))
        }.toMap
    }
  } catch {
    case NonFatal(e) =>
      stage.makeNewStageAttempt(partitionsToCompute.size)
      listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
      abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
      runningStages -= stage
      return
  }
  stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)
  // If there are tasks to execute, record the submission time of the stage. Otherwise,
  // post the event without the submission time, which indicates that this stage was
  // skipped.
  if (partitionsToCompute.nonEmpty) {
    stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
  }
  listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
  // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
  // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast
  // the serialized copy of the RDD and for each task we will deserialize it, which means each
  // task gets a different copy of the RDD. This provides stronger isolation between tasks that
  // might modify state of objects referenced in their closures. This is necessary in Hadoop
  // where the JobConf/Configuration object is not thread-safe.
  var taskBinary: Broadcast[Array[Byte]] = null
  var partitions: Array[Partition] = null
  try {
    // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
    // For ResultTask, serialize and broadcast (rdd, func).
    var taskBinaryBytes: Array[Byte] = null
    // taskBinaryBytes and partitions are both affected by the checkpoint status. We need
    // this synchronization in case another concurrent job is checkpointing this RDD, so we get a
    // consistent view of both variables.
    RDDCheckpointData.synchronized {
      taskBinaryBytes = stage match {
        case stage: ShuffleMapStage =>
          JavaUtils.bufferToArray(
            closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
        case stage: ResultStage =>
          JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
      }
      partitions = stage.rdd.partitions
    }
    // Broadcast the serialized task payload: the current stage.rdd plus its dependency
    // information stage.shuffleDep (or the result function stage.func)
    taskBinary = sc.broadcast(taskBinaryBytes)
  } catch {
    // In the case of a failure during serialization, abort the stage.
    case e: NotSerializableException =>
      abortStage(stage, "Task not serializable: " + e.toString, Some(e))
      runningStages -= stage
      // Abort execution
      return
    case e: Throwable =>
      abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e))
      runningStages -= stage
      // Abort execution
      return
  }
  // Create the tasks
  val tasks: Seq[Task[_]] = try {
    val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
    stage match {
      // For a ShuffleMapStage, create one ShuffleMapTask per partition to compute.
      // Each ShuffleMapTask carries:
      //   stage.id: which stage this task belongs to
      //   stage.latestInfo.attemptNumber: the attempt number of the stage
      //   taskBinary: the broadcast payload describing what to run on the partition
      //   part: which partition this task will compute
      //   locs: preferred locations of the partition data, local or remote
      //   ...
      case stage: ShuffleMapStage =>
        stage.pendingPartitions.clear()
        partitionsToCompute.map { id =>
          val locs = taskIdToLocations(id)
          val part = partitions(id)
          stage.pendingPartitions += id
          new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
            taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
            Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
        }
      // For a ResultStage, create ResultTasks
      case stage: ResultStage =>
        partitionsToCompute.map { id =>
          val p: Int = stage.partitions(id)
          val part = partitions(p)
          val locs = taskIdToLocations(id)
          new ResultTask(stage.id, stage.latestInfo.attemptNumber,
            taskBinary, part, locs, id, properties, serializedTaskMetrics,
            Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
            stage.rdd.isBarrier())
        }
    }
  } catch {
    case NonFatal(e) =>
      abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
      runningStages -= stage
      return
  }
  if (tasks.size > 0) {
    logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
      s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
    // Every partition this stage needs to compute gets a ShuffleMapTask or ResultTask.
    // The tasks are wrapped in a TaskSet, a group of tasks that can run in parallel.
    // Deciding which workers run them and tracking their state is handed over to a
    // TaskSetManager.
    taskScheduler.submitTasks(new TaskSet(
      tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
  } else {
    // Because we posted SparkListenerStageSubmitted earlier, we should mark
    // the stage as completed here in case there are no tasks to run
    markStageAsFinished(stage, None)
    stage match {
      case stage: ShuffleMapStage =>
        logDebug(s"Stage ${stage} is actually done; " +
          s"(available: ${stage.isAvailable}," +
          s"available outputs: ${stage.numAvailableOutputs}," +
          s"partitions: ${stage.numPartitions})")
        markMapStageJobsAsFinished(stage)
      case stage: ResultStage =>
        logDebug(s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})")
    }
    submitWaitingChildStages(stage)
  }
}
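What happens next on the scheduling side is only summarized here. As a rough, hedged sketch (paraphrased from TaskSchedulerImpl; field and helper names are approximations, not the verbatim source), submitTasks wraps the TaskSet in a TaskSetManager, registers it with the scheduling pool, and asks the backend for resource offers:

// Rough sketch of TaskSchedulerImpl.submitTasks (simplified):
// a TaskSetManager tracks retries, locality and status for one TaskSet.
override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    taskSetsByStageIdAndAttempt.getOrElseUpdate(
      taskSet.stageId, new HashMap[Int, TaskSetManager])(taskSet.stageAttemptId) = manager
    // Hand the manager to the FIFO/FAIR scheduling pool
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
  }
  // Ask the cluster backend (e.g. CoarseGrainedSchedulerBackend) to offer resources;
  // the offers eventually launch the ShuffleMapTasks/ResultTasks on executors.
  backend.reviveOffers()
}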
Executing a ShuffleMapTask
As described above, on the driver side DAGScheduler walks the RDD dependency graph and ultimately produces TaskSets, which are managed by TaskSetManagers; each ShuffleMapTask is eventually dispatched to an idle worker for execution. The task's run logic is defined in ShuffleMapTask, shown below:
class ShuffleMapTask { // signature simplified here; the real class extends Task[MapStatus]
  override def runTask(context: TaskContext): MapStatus = {
    // Deserialize the RDD using the broadcast variable.
    val threadMXBean = ManagementFactory.getThreadMXBean
    val deserializeStartTime = System.currentTimeMillis()
    val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime
    } else 0L
    val ser = SparkEnv.get.closureSerializer.newInstance()
    // Deserialize the current task's RDD and its ShuffleDependency from the taskBinary
    // broadcast variable carried by the task
    val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
    _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
    } else 0L
    var writer: ShuffleWriter[Any, Any] = null
    try {
      val manager = SparkEnv.get.shuffleManager
      // Based on the ShuffleHandle carried by the dependency, obtain a writer bound to
      // this task's partitionId
      writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
      // rdd.iterator processes the bound partition. The RDD may be the tail of a
      // pipelined chain, e.g. (where <-- means the right side depends on the left):
      //   ShuffledRDD <-- MapPartitionsRDD <-- MapPartitionsRDD <-- current_rdd
      // rdd.iterator therefore recurses back to the leftmost ShuffledRDD, which reads
      // its input remotely through the ShuffleManager; the two MapPartitionsRDDs then
      // apply their functions in turn, and finally the writer selected by
      // dep.shuffleHandle writes the processed partition data out to its destination.
      writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
      writer.stop(success = true).get
    } catch {
      case e: Exception =>
        try {
          if (writer != null) {
            writer.stop(success = false)
          }
        } catch {
          case e: Exception =>
            log.debug("Could not stop writer", e)
        }
        throw e
    }
  }
}
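The pipelined recursion described in the comments above comes from how each RDD implements compute on top of its parent's iterator. A simplified sketch of MapPartitionsRDD (close to, but not necessarily identical to, the version in your Spark release) makes this concrete:

// Simplified sketch of MapPartitionsRDD: compute() just wraps the parent's iterator,
// so a chain of map/filter operations is evaluated lazily inside a single task.
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    var prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U])
  extends RDD[U](prev) {

  override def getPartitions: Array[Partition] = firstParent[T].partitions

  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    // The recursion happens here: firstParent.iterator(...) may itself call compute()
    // on its parent, all the way back to a ShuffledRDD, which instead asks the
    // ShuffleManager for a reader and fetches the previous stage's map output.
    f(context, split.index, firstParent[T].iterator(split, context))
}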
Summary
The flow from the RDD dependency graph to task execution can be simplified as follows (where <-- means the right side depends on the completion of the left side):
1. Calling the RDD or DataFrame API builds an RDD dependency graph. For example, df.read().filter(...).partitionBy(...).write() produces a chain like ShuffledRDD <-- MapPartitionsRDD <-- ShuffledRDD <-- ShuffledRDD.
2. DAGScheduler derives the dependency graph: ShuffleDependency <-- ShuffleDependency <-- ShuffleDependency.
3. DAGScheduler builds the stage dependency graph: ShuffleMapStage <-- ShuffleMapStage <-- ResultStage.
4. DAGScheduler creates the task sets (TaskSet): ShuffleMapTask_1 <-- ShuffleMapTask_2 <-- ResultTask.
5. TaskSetManager manages the tasks and sends them to workers for execution:
   ShuffleMapTask_1 runs on a worker.
   ShuffleMapTask_2 runs on a worker.
   The ResultTask also runs on a worker; its result is sent back to the driver.
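To see these stage boundaries for yourself, here is a small hedged sketch (a hypothetical pipeline in local mode) that contains two shuffles and therefore produces two ShuffleMapStages plus a ResultStage; toDebugString prints the lineage with the shuffle boundaries that DAGScheduler will cut on:

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical two-shuffle pipeline matching the summary above:
// two ShuffleDependencies => two ShuffleMapStages, plus one ResultStage for collect().
object StageGraphDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("stage-graph-demo").setMaster("local[2]"))

    val rdd = sc.parallelize(1 to 100)
      .map(i => (i % 10, i))
      .reduceByKey(_ + _)                    // shuffle 1 -> ShuffleMapStage
      .map { case (k, sum) => (sum % 3, k) }
      .groupByKey()                          // shuffle 2 -> ShuffleMapStage

    // toDebugString prints the lineage with its shuffle boundaries, i.e. where the
    // DAGScheduler will cut the graph into stages.
    println(rdd.toDebugString)

    rdd.collect()                            // ResultStage with ResultTasks
    sc.stop()
  }
}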