
A Rookie's Journey Through the Spark Source Code - 3: TaskScheduler Source Code - Part 3

程序员文章站 (Programmer Article Station), 2022-07-14 11:08:55


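Reading further into the source, I found that TaskScheduler is also responsible for a lot of task management and executor management, so I decided to keep digging into how it handles them.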

1. Task management


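(1) Handling returned results

(2) Handling tasks that succeeded

(3) Handling tasks that failed

In TaskSchedulerImpl these correspond to handleTaskGettingResult, handleSuccessfulTask and handleFailedTask: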


// TaskSchedulerImpl: all three handlers take the scheduler lock and delegate to the TaskSetManager
def handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long): Unit = synchronized {
  taskSetManager.handleTaskGettingResult(tid)
}

def handleSuccessfulTask(
    taskSetManager: TaskSetManager,
    tid: Long,
    taskResult: DirectTaskResult[_]): Unit = synchronized {
  taskSetManager.handleSuccessfulTask(tid, taskResult)
}

def handleFailedTask(
    taskSetManager: TaskSetManager,
    tid: Long,
    taskState: TaskState,
    reason: TaskFailedReason): Unit = synchronized {
  taskSetManager.handleFailedTask(tid, taskState, reason)
  if (!taskSetManager.isZombie && !taskSetManager.someAttemptSucceeded(tid)) {
    // Need to revive offers again now that the task set manager state has been updated to
    // reflect failed tasks that need to be re-run.
    backend.reviveOffers()
  }
}
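The revive condition in handleFailedTask is easy to misread, so here is a minimal standalone sketch of just that decision. MiniTaskSetManager and ReviveDecision are hypothetical stand-ins, not Spark classes. The sketch assumes, as TaskSetManager does, that all attempts of one task (speculative copies included) share a single task index, and that someAttemptSucceeded checks that index.

```scala
import scala.collection.mutable

// Hypothetical stand-in for TaskSetManager: keeps only what the revive decision needs.
final class MiniTaskSetManager(val isZombie: Boolean) {
  // task id -> task index; speculative attempts of the same task share one index
  val tidToIndex: mutable.Map[Long, Int] = mutable.Map.empty
  // indices for which at least one attempt has already succeeded
  val successfulIndices: mutable.Set[Int] = mutable.Set.empty

  def someAttemptSucceeded(tid: Long): Boolean =
    tidToIndex.get(tid).exists(successfulIndices.contains)
}

object ReviveDecision {
  // Same condition as TaskSchedulerImpl.handleFailedTask: only ask the backend for new
  // resource offers if the task set is still active and the task still needs to run.
  def shouldReviveOffers(tsm: MiniTaskSetManager, tid: Long): Boolean =
    !tsm.isZombie && !tsm.someAttemptSucceeded(tid)
}
```

In this model a zombie task set never triggers a revive, and neither does a failed speculative copy whose sibling attempt has already succeeded.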


/**
 * Runs a thread pool that deserializes and remotely fetches (if necessary) task results.
 */
private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedulerImpl)
  extends Logging {

  // ... thread pool (getTaskResultExecutor) and serializer fields omitted ...

  def enqueueSuccessfulTask(
      taskSetManager: TaskSetManager,
      tid: Long,
      serializedData: ByteBuffer): Unit = {
    // Hand the work, including deserializing the result data, to a thread in the result-getter pool
    getTaskResultExecutor.execute(new Runnable {
      override def run(): Unit = Utils.logUncaughtExceptions {
        try {
          val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match {
            case directResult: DirectTaskResult[_] =>
              if (!taskSetManager.canFetchMoreResults(serializedData.limit())) {
                return
              }
              // deserialize "value" without holding any lock so that it won't block other threads.
              // We should call it here, so that when it's called again in
              // "TaskSetManager.handleSuccessfulTask", it does not need to deserialize the value.
              directResult.value(taskResultSerializer.get())
              (directResult, serializedData.limit())
            case IndirectTaskResult(blockId, size) =>
              if (!taskSetManager.canFetchMoreResults(size)) {
                // dropped by executor if size is larger than maxResultSize
                sparkEnv.blockManager.master.removeBlock(blockId)
                return
              }
              logDebug("Fetching indirect task result for TID %s".format(tid))
              // handleTaskGettingResult marks the task as "getting result" and notifies the DAGScheduler
              scheduler.handleTaskGettingResult(taskSetManager, tid)
              val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)
              if (!serializedTaskResult.isDefined) {
                /* We won't be able to get the task result if the machine that ran the task failed
                 * between when the task ended and when we tried to fetch the result, or if the
                 * block manager had to flush the result. */
                // Report the task whose result could not be fetched as failed
                scheduler.handleFailedTask(
                  taskSetManager, tid, TaskState.FINISHED, TaskResultLost)
                return
              }
              val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]](
                serializedTaskResult.get.toByteBuffer)
              // force deserialization of referenced value
              deserializedResult.value(taskResultSerializer.get())
              sparkEnv.blockManager.master.removeBlock(blockId)
              (deserializedResult, size)
          }

          // Set the task result size in the accumulator updates received from the executors.
          // We need to do this here on the driver because if we did this on the executors then
          // we would have to serialize the result again after updating the size.
          result.accumUpdates = result.accumUpdates.map { a =>
            if (a.name == Some(InternalAccumulator.RESULT_SIZE)) {
              val acc = a.asInstanceOf[LongAccumulator]
              assert(acc.sum == 0L, "task result size should not have been set on the executors")
              acc.setValue(size.toLong)
              acc
            } else {
              a
            }
          }

          scheduler.handleSuccessfulTask(taskSetManager, tid, result)
        } catch {
          case cnf: ClassNotFoundException =>
            val loader = Thread.currentThread.getContextClassLoader
            taskSetManager.abort("ClassNotFound with classloader: " + loader)
          // Matching NonFatal so we don't catch the ControlThrowable from the "return" above.
          case NonFatal(ex) =>
            logError("Exception while getting task result", ex)
            taskSetManager.abort("Exception while getting task result: %s".format(ex))
        }
      }
    })
  }
}
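Here is a rough model of the branching in enqueueSuccessfulTask. The Spark types are replaced by hypothetical ones: SketchResult, Outcome, ResultBudget and ResultGetterSketch. ResultBudget follows the idea behind TaskSetManager.canFetchMoreResults: it keeps a running total of result sizes and refuses once the limit (spark.driver.maxResultSize) is exceeded. In Spark that refusal also aborts the task set. Two further simplifications apply: the size of a direct result is taken as the payload length, and fetchRemote stands in for blockManager.getRemoteBytes.

```scala
// Hypothetical, simplified model of the branching in TaskResultGetter.enqueueSuccessfulTask.
sealed trait SketchResult
// Result small enough to travel inside the status update itself
final case class DirectResult(payload: String) extends SketchResult
// Result stored in the executor's BlockManager; only its block id and size are sent
final case class IndirectResult(blockId: String, size: Long) extends SketchResult

sealed trait Outcome
case object Aborted extends Outcome    // running total exceeded the limit
case object ResultLost extends Outcome // Spark: handleFailedTask(..., TaskState.FINISHED, TaskResultLost)
final case class Fetched(payload: String, size: Long) extends Outcome // Spark: handleSuccessfulTask

// Keeps a running total of result sizes and refuses once it exceeds maxResultSize
// (a value <= 0 means "no limit").
final class ResultBudget(maxResultSize: Long) {
  private var totalResultSize = 0L
  def canFetchMoreResults(size: Long): Boolean = synchronized {
    totalResultSize += size
    !(maxResultSize > 0 && totalResultSize > maxResultSize)
  }
}

object ResultGetterSketch {
  def handle(
      result: SketchResult,
      budget: ResultBudget,
      fetchRemote: String => Option[String]): Outcome = result match {
    case DirectResult(payload) =>
      val size = payload.length.toLong // simplification: Spark uses the serialized buffer size
      if (budget.canFetchMoreResults(size)) Fetched(payload, size) else Aborted
    case IndirectResult(blockId, size) =>
      if (!budget.canFetchMoreResults(size)) {
        Aborted
      } else {
        fetchRemote(blockId) match {
          case Some(payload) => Fetched(payload, size)
          case None          => ResultLost // block is gone, e.g. the executor's machine died
        }
      }
  }
}
```

The size check runs before any remote fetch in this sketch, so an oversized indirect result is rejected without ever pulling its bytes over the network.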





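On both paths above the task state is TaskState.FINISHED. A successful result goes to handleSuccessfulTask. A result that cannot be fetched goes to handleFailedTask, still with TaskState.FINISHED but with the reason TaskResultLost.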

TaskResultGetter.enqueueFailedTask also calls scheduler.handleFailedTask:

// TaskResultGetter
def enqueueFailedTask(taskSetManager: TaskSetManager, tid: Long, taskState: TaskState,
  serializedData: ByteBuffer) {
  var reason : TaskFailedReason = UnknownReason
  try {
    getTaskResultExecutor.execute(new Runnable {
      override def run(): Unit = Utils.logUncaughtExceptions {
        val loader = Utils.getContextOrSparkClassLoader
        try {
          if (serializedData != null && serializedData.limit() > 0) {
            reason = serializer.get().deserialize[TaskFailedReason](
              serializedData, loader)
          }
        } catch {
          case cnd: ClassNotFoundException =>
            // Log an error but keep going here -- the task failed, so not catastrophic
            // if we can't deserialize the reason.
            logError(
              "Could not deserialize TaskEndReason: ClassNotFound with classloader " + loader)
          case ex: Exception => // No-op
        } finally {
          // If there's an error while deserializing the TaskEndReason, this Runnable
          // will die. Still tell the scheduler about the task failure, to avoid a hang
          // where the scheduler thinks the task is still running.
          scheduler.handleFailedTask(taskSetManager, tid, taskState, reason)
        }
      }
    })
  } catch {
    case e: RejectedExecutionException if sparkEnv.isStopped =>
      // ignore it
  }
}
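The key design point in enqueueFailedTask is that the scheduler hears about the failure from a finally block. A reason that cannot be deserialized therefore never leaves the task looking as if it were still running. The sketch below shows just that pattern. It assumes a hypothetical decode function in place of the closure serializer and a report callback in place of scheduler.handleFailedTask. Described is a made-up reason type, and unlike Spark the sketch runs synchronously rather than on the result-getter thread pool.

```scala
// Hypothetical failure reasons; UnknownReason mirrors the default used by enqueueFailedTask.
sealed trait FailReason
case object UnknownReason extends FailReason
final case class Described(message: String) extends FailReason

object FailedTaskSketch {
  // decode stands in for deserializing the TaskFailedReason;
  // report stands in for scheduler.handleFailedTask.
  def enqueueFailedTask(
      tid: Long,
      serializedReason: Option[String],
      decode: String => FailReason,
      report: (Long, FailReason) => Unit): Unit = {
    var reason: FailReason = UnknownReason
    try {
      serializedReason.filter(_.nonEmpty).foreach { data => reason = decode(data) }
    } catch {
      case _: Exception => // keep UnknownReason: the task has failed either way
    } finally {
      // Always report, so the scheduler never keeps waiting on a task that is already dead
      report(tid, reason)
    }
  }
}
```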

The difference is that here taskState is TaskState.FAILED or KILLED. TaskSchedulerImpl.statusUpdate also routes TaskState.LOST to this method.


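In short: in enqueueSuccessfulTask the task itself ran to completion, and scheduler.handleFailedTask is called only because its result could not be fetched. In enqueueFailedTask, the task failed or was killed while it was running.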
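The task state in TaskSchedulerImpl.statusUpdate decides which of the two methods runs. Here is a small sketch of that routing. It uses a local copy of the TaskState values, and MiniTaskState, Route and StatusRouting are hypothetical names.

```scala
// Local copy of Spark's TaskState values (hypothetical object name).
object MiniTaskState extends Enumeration {
  val LAUNCHING, RUNNING, FINISHED, FAILED, KILLED, LOST = Value
}

sealed trait Route
case object EnqueueSuccessful extends Route // taskResultGetter.enqueueSuccessfulTask
case object EnqueueFailed extends Route     // taskResultGetter.enqueueFailedTask
case object NotFinished extends Route       // still launching or running: nothing to handle yet

object StatusRouting {
  // Sketch of the branch in statusUpdate that picks the result handler for a finished task.
  def route(state: MiniTaskState.Value): Route = state match {
    case MiniTaskState.FINISHED => EnqueueSuccessful
    case MiniTaskState.FAILED | MiniTaskState.KILLED | MiniTaskState.LOST => EnqueueFailed
    case _ => NotFinished
  }
}
```

Even a task routed to EnqueueSuccessful can still end up in handleFailedTask, with TaskState.FINISHED and TaskResultLost, if its result block cannot be fetched.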






2. BlockManager management

TaskScheduler.executorHeartbeatReceived tells whether the driver knows that a given BlockManager is alive. It returns true if so and false otherwise, and false means the BlockManager must re-register.

// TaskSchedulerImpl
/**
 * Update metrics for in-progress tasks and let the master know that the BlockManager is still
 * alive. Return true if the driver knows about the given block manager. Otherwise, return false,
 * indicating that the block manager should re-register.
 */
override def executorHeartbeatReceived(
    execId: String,
    accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
    blockManagerId: BlockManagerId): Boolean = {
  // (taskId, stageId, stageAttemptId, accumUpdates)
  val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] = synchronized {
    accumUpdates.flatMap { case (id, updates) =>
      val accInfos = updates.map(acc => acc.toInfo(Some(acc.value), None))
      taskIdToTaskSetManager.get(id).map { taskSetMgr =>
        (id, taskSetMgr.stageId, taskSetMgr.taskSet.stageAttemptId, accInfos)
      }
    }
  }
  dagScheduler.executorHeartbeatReceived(execId, accumUpdatesWithTaskIds, blockManagerId)
}
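The flatMap above attaches stage information to each task's accumulator updates. It silently drops updates for task IDs the scheduler no longer tracks, such as tasks that have already finished. Here is a plain-collections sketch of that join. MiniTsm and HeartbeatJoin are hypothetical, and a Seq[String] stands in for Seq[AccumulableInfo].

```scala
// Hypothetical stand-in for the two TaskSetManager fields the join reads.
final case class MiniTsm(stageId: Int, stageAttemptId: Int)

object HeartbeatJoin {
  // For each (taskId, updates) pair, look up the task's TaskSetManager and emit
  // (taskId, stageId, stageAttemptId, updates). Unknown task ids give None and are dropped.
  def withTaskIds(
      accumUpdates: Seq[(Long, Seq[String])],
      taskIdToTaskSetManager: Map[Long, MiniTsm]): Seq[(Long, Int, Int, Seq[String])] =
    accumUpdates.flatMap { case (tid, updates) =>
      taskIdToTaskSetManager.get(tid).map(tsm => (tid, tsm.stageId, tsm.stageAttemptId, updates))
    }
}
```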


// DAGScheduler
/**
 * Update metrics for in-progress tasks and let the master know that the BlockManager is still
 * alive. Return true if the driver knows about the given block manager. Otherwise, return false,
 * indicating that the block manager should re-register.
 */
def executorHeartbeatReceived(
    execId: String,
    // (taskId, stageId, stageAttemptId, accumUpdates)
    accumUpdates: Array[(Long, Int, Int, Seq[AccumulableInfo])],
    blockManagerId: BlockManagerId): Boolean = {
  listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, accumUpdates))
  blockManagerMaster.driverEndpoint.askSync[Boolean](
    BlockManagerHeartbeat(blockManagerId), new RpcTimeout(600 seconds, "BlockManagerHeartbeat"))
}
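DAGScheduler's answer ultimately comes from the BlockManagerMaster endpoint, which it asks synchronously with a 600-second timeout. The sketch below imitates that blocking ask with a Future and Await.result. MiniBlockManagerMaster, BmId and AskSyncSketch are hypothetical. The rule "unknown BlockManager returns false; known one has its last-seen time refreshed and returns true" simplifies BlockManagerMasterEndpoint's heartbeat handling.

```scala
import scala.collection.concurrent.TrieMap
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical stand-in for BlockManagerId.
final case class BmId(executorId: String, host: String)

// Hypothetical, simplified state of the BlockManagerMaster endpoint.
final class MiniBlockManagerMaster {
  private val lastSeenMs = TrieMap.empty[BmId, Long]

  def register(id: BmId, nowMs: Long): Unit = lastSeenMs.update(id, nowMs)
  def lastSeen(id: BmId): Option[Long] = lastSeenMs.get(id)

  // Unknown BlockManager: false (it must re-register). Known: refresh last-seen, return true.
  def heartbeatReceived(id: BmId, nowMs: Long): Boolean =
    if (lastSeenMs.contains(id)) { lastSeenMs.update(id, nowMs); true } else false
}

object AskSyncSketch {
  // Imitates a blocking ask: run the handler asynchronously, then wait up to `timeout`
  // (600 seconds by default, matching the BlockManagerHeartbeat timeout above).
  def askHeartbeat(
      master: MiniBlockManagerMaster,
      id: BmId,
      nowMs: Long,
      timeout: FiniteDuration = 600.seconds): Boolean =
    Await.result(Future(master.heartbeatReceived(id, nowMs)), timeout)
}
```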


// HeartbeatReceiver
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {

  // Messages sent and received locally
  case ExecutorRegistered(executorId) =>
    executorLastSeen(executorId) = clock.getTimeMillis()
    context.reply(true)
  case ExecutorRemoved(executorId) =>
    executorLastSeen.remove(executorId)
    context.reply(true)
  case TaskSchedulerIsSet =>
    scheduler = sc.taskScheduler
    context.reply(true)
  case ExpireDeadHosts =>
    expireDeadHosts()
    context.reply(true)

  // Messages received from executors
  case heartbeat @ Heartbeat(executorId, accumUpdates, blockManagerId) =>
    if (scheduler != null) {
      if (executorLastSeen.contains(executorId)) {
        executorLastSeen(executorId) = clock.getTimeMillis()
        eventLoopThread.submit(new Runnable {
          override def run(): Unit = Utils.tryLogNonFatalError {
            val unknownExecutor = !scheduler.executorHeartbeatReceived(
              executorId, accumUpdates, blockManagerId)
            val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
            context.reply(response)
          }
        })
      } else {
        // This may happen if we get an executor's in-flight heartbeat immediately
        // after we just removed it. It's not really an error condition so we should
        // not log warning here. Otherwise there may be a lot of noise especially if
        // we explicitly remove executors (SPARK-4134).
        logDebug(s"Received heartbeat from unknown executor $executorId")
        context.reply(HeartbeatResponse(reregisterBlockManager = true))
      }
    } else {
      // Because Executor will sleep several seconds before sending the first "Heartbeat", this
      // case rarely happens. However, if it really happens, log it and ask the executor to
      // register itself again.
      logWarning(s"Dropping $heartbeat because TaskScheduler is not ready yet")
      context.reply(HeartbeatResponse(reregisterBlockManager = true))
    }
}
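The Heartbeat branch has three outcomes, which can be summarized in a small synchronous model. MiniHeartbeatReceiver is hypothetical. Its scheduler field is an Option holding a function that stands in for scheduler.executorHeartbeatReceived. Unlike Spark, the reply is computed inline rather than on eventLoopThread.

```scala
import scala.collection.mutable

final case class HeartbeatResponse(reregisterBlockManager: Boolean)

// Hypothetical, synchronous model of the Heartbeat branch of HeartbeatReceiver.receiveAndReply.
final class MiniHeartbeatReceiver(clock: () => Long) {
  // None until TaskSchedulerIsSet; the function stands in for scheduler.executorHeartbeatReceived
  var scheduler: Option[String => Boolean] = None
  val executorLastSeen: mutable.Map[String, Long] = mutable.Map.empty

  def executorRegistered(executorId: String): Unit = executorLastSeen.update(executorId, clock())
  def executorRemoved(executorId: String): Unit = executorLastSeen -= executorId

  def onHeartbeat(executorId: String): HeartbeatResponse = scheduler match {
    case None =>
      // TaskScheduler not ready yet: drop the heartbeat and ask the executor to re-register
      HeartbeatResponse(reregisterBlockManager = true)
    case Some(knowsBlockManager) if executorLastSeen.contains(executorId) =>
      executorLastSeen.update(executorId, clock())
      // Re-register only if the driver does not know this executor's BlockManager
      HeartbeatResponse(reregisterBlockManager = !knowsBlockManager(executorId))
    case Some(_) =>
      // Unknown or just-removed executor
      HeartbeatResponse(reregisterBlockManager = true)
  }
}
```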


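SparkContext's HeartbeatReceiver calls TaskScheduler.executorHeartbeatReceived whenever a heartbeat arrives from a remote executor, to check whether that executor's BlockManager is still registered. The final check is made by the DAGScheduler, which sends a synchronous BlockManagerHeartbeat RPC to the BlockManagerMaster endpoint.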
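On the executor side, a response with reregisterBlockManager = true makes Spark's Executor re-register its BlockManager: Executor.reportHeartBeat calls blockManager.reregister(). Here is a hypothetical sketch of that reaction. The send parameter stands in for the Heartbeat RPC to HeartbeatReceiver, and a counter stands in for the actual registration.

```scala
final case class HeartbeatResponse(reregisterBlockManager: Boolean)

// Hypothetical executor-side model: `send` stands in for the Heartbeat RPC to the driver's
// HeartbeatReceiver, and `registrations` counts BlockManager registrations.
final class MiniExecutor(executorId: String, send: String => HeartbeatResponse) {
  var registrations = 1 // registered once at startup

  def reportHeartbeat(): Unit = {
    val response = send(executorId)
    if (response.reregisterBlockManager) {
      // In Spark: logInfo("Told to re-register on heartbeat"); env.blockManager.reregister()
      registrations += 1
    }
  }
}
```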
