【Spark八十四】Spark零碎知识点记录
程序员文章站
2024-01-29 21:09:40
...
1. ShuffleMapTask的shuffle数据在什么地方记录到MapOutputTracker中的
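ShuffleMapTask的runTask方法负责把数据写到shuffle map文件中。当任务成功执行完成后,DAGScheduler会收到通知,并在handleTaskCompletion方法中处理:每个ShuffleMapTask完成时,先通过stage.addOutputLoc把该任务的MapStatus记录到Stage上;等到该Stage不再有pendingTasks时,再调用mapOutputTracker.registerMapOutputs把整个Stage的map输出位置统一注册到MapOutputTracker中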
event.reason match {
case Success =>
listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
event.reason, event.taskInfo, event.taskMetrics))
stage.pendingTasks -= task
task match {
case rt: ResultTask[_, _] =>
stage.resultOfJob match {
case Some(job) =>
if (!job.finished(rt.outputId)) {
updateAccumulators(event)
job.finished(rt.outputId) = true
job.numFinished += 1
// If the whole job has finished, remove it
if (job.numFinished == job.numPartitions) {
markStageAsFinished(stage)
cleanupStateForJobAndIndependentStages(job)
listenerBus.post(SparkListenerJobEnd(job.jobId, JobSucceeded))
}
// taskSucceeded runs some user code that might throw an exception. Make sure
// we are resilient against that.
try {
job.listener.taskSucceeded(rt.outputId, event.result)
} catch {
case e: Exception =>
// TODO: Perhaps we want to mark the stage as failed?
job.listener.jobFailed(new SparkDriverExecutionException(e))
}
}
case None =>
logInfo("Ignoring result from " + rt + " because its job has finished")
}
case smt: ShuffleMapTask =>
updateAccumulators(event)
///从通知事件中获得MapStatus对象
val status = event.result.asInstanceOf[MapStatus]
////ExecutorId
val execId = status.location.executorId
logDebug("ShuffleMapTask finished on " + execId)
if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + execId)
} else {
stage.addOutputLoc(smt.partitionId, status)
}
if (runningStages.contains(stage) && stage.pendingTasks.isEmpty) {
markStageAsFinished(stage)
logInfo("looking for newly runnable stages")
logInfo("running: " + runningStages)
logInfo("waiting: " + waitingStages)
logInfo("failed: " + failedStages)
if (stage.shuffleDep.isDefined) {
// We supply true to increment the epoch number here in case this is a
// recomputation of the map outputs. In that case, some nodes may have cached
// locations with holes (from when we detected the error) and will need the
// epoch incremented to refetch them.
// TODO: Only increment the epoch number if this is not the first time
// we registered these map outputs.
///在此处将MapOutput注册到mapOutputTracker中
mapOutputTracker.registerMapOutputs(
stage.shuffleDep.get.shuffleId,
stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray,
changeEpoch = true)
}
clearCacheLocs()
if (stage.outputLocs.exists(_ == Nil)) {
// Some tasks had failed; let's resubmit this stage
// TODO: Lower-level scheduler should also deal with this
logInfo("Resubmitting " + stage + " (" + stage.name +
") because some of its tasks had failed: " +
stage.outputLocs.zipWithIndex.filter(_._1 == Nil).map(_._2).mkString(", "))
submitStage(stage)
} else {
val newlyRunnable = new ArrayBuffer[Stage]
for (stage <- waitingStages) {
logInfo("Missing parents for " + stage + ": " + getMissingParentStages(stage))
}
for (stage <- waitingStages if getMissingParentStages(stage) == Nil) {
newlyRunnable += stage
}
waitingStages --= newlyRunnable
runningStages ++= newlyRunnable
for {
stage <- newlyRunnable.sortBy(_.id)
jobId <- activeJobForStage(stage)
} {
logInfo("Submitting " + stage + " (" + stage.rdd + "), which is now runnable")
submitMissingTasks(stage, jobId)
}
}
}
}
2. ShuffleMapTask在写shuffle map数据时(调用SortShuffleWriter.write方法),首先写内存,当内存不够使用时,将spill到磁盘;
/**
 * Writes the given records as this map task's shuffle output.
 *
 * The records are inserted into an `ExternalSorter`, the sorter's contents are written to the
 * data file obtained from `shuffleBlockManager`, an index file is written from the resulting
 * partition lengths, and `mapStatus` is set from those same lengths.
 *
 * @param records the key/value records to write
 */
override def write(records: Iterator[_ <: Product2[K, V]]): Unit = {
  // Only the sorter's configuration depends on map-side combine, so build it as one expression.
  // The field is assigned before any record is inserted, exactly as in both original branches.
  sorter = if (dep.mapSideCombine) {
    require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
    new ExternalSorter[K, V, C](
      dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
  } else {
    // Neither an aggregator nor an ordering is handed to the sorter here: we do not care
    // whether keys end up sorted inside each partition, since that is done on the reduce
    // side when the operation being run is sortByKey.
    new ExternalSorter[K, V, V](None, Some(dep.partitioner), None, dep.serializer)
  }

  // Author's note on the original: inserting records is the step that may spill to disk.
  sorter.insertAll(records)

  val dataFile = shuffleBlockManager.getDataFile(dep.shuffleId, mapId)
  val consolidatedId = shuffleBlockManager.consolidateId(dep.shuffleId, mapId)
  // Write the sorter's contents to the data file and keep the per-partition lengths it returns.
  val lengths = sorter.writePartitionedFile(consolidatedId, context, dataFile)
  shuffleBlockManager.writeIndexFile(dep.shuffleId, mapId, lengths)
  mapStatus = MapStatus(blockManager.shuffleServerId, lengths)
}
3. ResultTask在读取ShuffledRDD中的数据时(通过调用HashShuffleReader),首先读取到内存,当内存不够使用时,将spill到磁盘
/**
 * Reads the shuffle data for this reader and returns it as an iterator of key/combiner pairs.
 *
 * The fetched records are combined through the dependency's aggregator when one is defined,
 * and then sorted with an `ExternalSorter` when the dependency defines a key ordering.
 *
 * @return the combined (and, if an ordering is defined, sorted) records
 */
override def read(): Iterator[Product2[K, C]] = {
  val ser = Serializer.getSerializer(dep.serializer)
  // Expose the fetched shuffle data as a traversable Iterator.
  val fetched = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context, ser)

  val aggregatedIter: Iterator[Product2[K, C]] =
    if (dep.aggregator.isDefined) {
      val agg = dep.aggregator.get
      // Records that were already combined on the map side are merged as combiners;
      // otherwise the raw values are combined here.
      // Author's note on the original: this combine step may spill to disk.
      val combined =
        if (dep.mapSideCombine) {
          agg.combineCombinersByKey(fetched, context)
        } else {
          agg.combineValuesByKey(fetched, context)
        }
      new InterruptibleIterator(context, combined)
    } else {
      require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
      // Downstream RDDs currently expect pairs, so turn each Product2 into a tuple.
      fetched.asInstanceOf[Iterator[Product2[K, C]]].map(kv => (kv._1, kv._2))
    }

  // Sort the output only when a key ordering is defined.
  // Author's note on the original: sorting may spill to disk.
  dep.keyOrdering match {
    case None =>
      aggregatedIter
    case Some(keyOrd: Ordering[K]) =>
      // An ExternalSorter does the sorting. Note that if spark.shuffle.spill is disabled,
      // the ExternalSorter won't spill to disk.
      val externalSorter =
        new ExternalSorter[K, C, C](ordering = Some(keyOrd), serializer = Some(ser))
      externalSorter.insertAll(aggregatedIter)
      // Add the sorter's spill counters to this task's metrics.
      context.taskMetrics.memoryBytesSpilled += externalSorter.memoryBytesSpilled
      context.taskMetrics.diskBytesSpilled += externalSorter.diskBytesSpilled
      externalSorter.iterator
  }
}
4. 任务本地性处理
a.DriverActor收到
上一篇: JavaScript 零碎知识点
下一篇: word加入foxmail word
推荐阅读
-
【Spark八十四】Spark零碎知识点记录
-
Spark异常:A master URL must be set in your configuration处理记录
-
spark基础知识点介绍
-
大数据核心知识点:Hbase、Spark、Hive、MapReduce概念理解,特点及机制
-
spark学习使用记录
-
Spark异常:A master URL must be set in your configuration处理记录
-
关于华为FusionInsight Manager安全模式下执行spark任务的一个坑--记录
-
Hanlp分词1.7版本在Spark中分布式使用记录
-
面试分享(spark 实现每天访问的记录数和用户数)
-
spark基础知识点介绍