A Beginner's Journey Through the Spark Source Code - 3: TaskScheduler Source (Part 3)

The previous post covered the task launch flow in TaskScheduler: https://blog.csdn.net/u012543819/article/details/81510632

Reading further through the source, I found that TaskScheduler also carries quite a bit of task-management and executor-management logic, so this post continues to look at how TaskScheduler handles those responsibilities.

1. Task management

Task-result handling here has three main parts:

(1) handling returned results

(2) handling tasks that completed successfully

(3) handling tasks that failed

The source of the three methods is as follows:
def handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long): Unit = synchronized {
  taskSetManager.handleTaskGettingResult(tid)
}
def handleSuccessfulTask(
                          taskSetManager: TaskSetManager,
                          tid: Long,
                          taskResult: DirectTaskResult[_]): Unit = synchronized {
  taskSetManager.handleSuccessfulTask(tid, taskResult)
}

def handleFailedTask(
                      taskSetManager: TaskSetManager,
                      tid: Long,
                      taskState: TaskState,
                      reason: TaskFailedReason): Unit = synchronized {
  taskSetManager.handleFailedTask(tid, taskState, reason)
  if (!taskSetManager.isZombie && !taskSetManager.someAttemptSucceeded(tid)) {
    // Need to revive offers again now that the task set manager state has been updated to
    // reflect failed tasks that need to be re-run.
    backend.reviveOffers()
  }
}

All three are invoked from the TaskResultGetter.enqueueSuccessfulTask method.

/**
 * Runs a thread pool that deserializes and remotely fetches (if necessary) task results.
 */
private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedulerImpl)
  extends Logging {
def enqueueSuccessfulTask(
    taskSetManager: TaskSetManager,
    tid: Long,
    serializedData: ByteBuffer): Unit = {
  // Hand the work off to the result-getter thread pool; the task below also deserializes the result data
  getTaskResultExecutor.execute(new Runnable {
    override def run(): Unit = Utils.logUncaughtExceptions {
      try {
        val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match {
          case directResult: DirectTaskResult[_] =>
            if (!taskSetManager.canFetchMoreResults(serializedData.limit())) {
              return
            }
            // deserialize "value" without holding any lock so that it won't block other threads.
            // We should call it here, so that when it's called again in
            // "TaskSetManager.handleSuccessfulTask", it does not need to deserialize the value.
            directResult.value(taskResultSerializer.get())
            (directResult, serializedData.limit())
          case IndirectTaskResult(blockId, size) =>
            if (!taskSetManager.canFetchMoreResults(size)) {
              // dropped by executor if size is larger than maxResultSize
              sparkEnv.blockManager.master.removeBlock(blockId)
              return
            }
            logDebug("Fetching indirect task result for TID %s".format(tid))
            // Handle a task whose result is being fetched: this delegates to taskSetManager.handleTaskGettingResult,
            // which marks the task as getting its result and notifies the DAGScheduler.
            scheduler.handleTaskGettingResult(taskSetManager, tid)
            // Fetch the serialized result from the remote block manager
            val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)
            if (!serializedTaskResult.isDefined) {
              /* We won't be able to get the task result if the machine that ran the task failed
               * between when the task ended and when we tried to fetch the result, or if the
               * block manager had to flush the result. */
              // The result data could not be fetched, so report the task as failed (TaskResultLost)
              scheduler.handleFailedTask(
                taskSetManager, tid, TaskState.FINISHED, TaskResultLost)
              return
            }
            val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]](
              serializedTaskResult.get.toByteBuffer)
            // force deserialization of referenced value
            deserializedResult.value(taskResultSerializer.get())
            sparkEnv.blockManager.master.removeBlock(blockId)
            (deserializedResult, size)
        }

        // Set the task result size in the accumulator updates received from the executors.
        // We need to do this here on the driver because if we did this on the executors then
        // we would have to serialize the result again after updating the size.
        result.accumUpdates = result.accumUpdates.map { a =>
          if (a.name == Some(InternalAccumulator.RESULT_SIZE)) {
            val acc = a.asInstanceOf[LongAccumulator]
            assert(acc.sum == 0L, "task result size should not have been set on the executors")
            acc.setValue(size.toLong)
            acc
          } else {
            a
          }
        }

        // Handle the successfully completed task
        scheduler.handleSuccessfulTask(taskSetManager, tid, result)
      } catch {
        case cnf: ClassNotFoundException =>
          val loader = Thread.currentThread.getContextClassLoader
          taskSetManager.abort("ClassNotFound with classloader: " + loader)
        // Matching NonFatal so we don't catch the ControlThrowable from the "return" above.
        case NonFatal(ex) =>
          logError("Exception while getting task result", ex)
          taskSetManager.abort("Exception while getting task result: %s".format(ex))
      }
    }
  })
}

In fact, task management is all carried out by the TaskSetManager: it marks each task's state and notifies the DAGScheduler of the corresponding status.
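To make that division of labour concrete, here is a minimal, self-contained toy sketch of the pattern (mark the task's state locally, then notify the DAGScheduler). All names here are hypothetical stand-ins, not Spark classes; the real TaskSetManager also tracks attempts, locality, accumulators and much more.

// Toy model of the TaskSetManager -> DAGScheduler delegation described above; hypothetical names, NOT Spark source.
object TaskSetManagerSketch {
  sealed trait TaskEndReason
  case object Success extends TaskEndReason
  final case class Failed(reason: String) extends TaskEndReason

  // Stand-in for DAGScheduler: it just receives "task ended" notifications.
  final class ToyDagScheduler {
    def taskEnded(tid: Long, reason: TaskEndReason): Unit =
      println(s"DAGScheduler notified: task $tid ended with $reason")
  }

  // Stand-in for TaskSetManager: it owns per-task state and forwards events upward.
  final class ToyTaskSetManager(dagScheduler: ToyDagScheduler) {
    private val endedTasks = scala.collection.mutable.Map.empty[Long, TaskEndReason]

    def handleSuccessfulTask(tid: Long): Unit = {
      endedTasks(tid) = Success              // 1. mark the task's state
      dagScheduler.taskEnded(tid, Success)   // 2. tell the DAGScheduler
    }

    def handleFailedTask(tid: Long, why: String): Unit = {
      endedTasks(tid) = Failed(why)
      dagScheduler.taskEnded(tid, Failed(why))
    }
  }

  def main(args: Array[String]): Unit = {
    val manager = new ToyTaskSetManager(new ToyDagScheduler)
    manager.handleSuccessfulTask(1L)
    manager.handleFailedTask(2L, "result lost")
  }
}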

The main task states are listed in the sketch below.

States on success and on failure in this code path:

successful → TaskState.FINISHED; failed (result could not be fetched) → also TaskState.FINISHED, because the task itself ran to completion and only its result was lost.
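For reference, the task states come from the TaskState enumeration in the Spark source. Below is a paraphrased sketch of org.apache.spark.TaskState as it looks in Spark 2.x (the real object is private[spark]); consult the actual source for the authoritative definition.

// Paraphrased from org.apache.spark.TaskState (Spark 2.x); not a verbatim copy.
object TaskState extends Enumeration {
  type TaskState = Value

  // LAUNCHING and RUNNING are in-flight states; the remaining four are terminal.
  val LAUNCHING, RUNNING, FINISHED, FAILED, KILLED, LOST = Value

  private val FINISHED_STATES = Set(FINISHED, FAILED, KILLED, LOST)

  def isFailed(state: TaskState): Boolean = (LOST == state) || (FAILED == state)

  def isFinished(state: TaskState): Boolean = FINISHED_STATES.contains(state)
}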

In addition, the TaskResultGetter.enqueueFailedTask method also calls scheduler.handleFailedTask:

def enqueueFailedTask(taskSetManager: TaskSetManager, tid: Long, taskState: TaskState,
  serializedData: ByteBuffer) {
  var reason : TaskFailedReason = UnknownReason
  try {
    getTaskResultExecutor.execute(new Runnable {
      override def run(): Unit = Utils.logUncaughtExceptions {
        val loader = Utils.getContextOrSparkClassLoader
        try {
          if (serializedData != null && serializedData.limit() > 0) {
            reason = serializer.get().deserialize[TaskFailedReason](
              serializedData, loader)
          }
        } catch {
          case cnd: ClassNotFoundException =>
            // Log an error but keep going here -- the task failed, so not catastrophic
            // if we can't deserialize the reason.
            logError(
              "Could not deserialize TaskEndReason: ClassNotFound with classloader " + loader)
          case ex: Exception => // No-op
        } finally {
          // If there's an error while deserializing the TaskEndReason, this Runnable
          // will die. Still tell the scheduler about the task failure, to avoid a hang
          // where the scheduler thinks the task is still running.
          scheduler.handleFailedTask(taskSetManager, tid, taskState, reason)
        }
      }
    })
  } catch {
    case e: RejectedExecutionException if sparkEnv.isStopped =>
      // ignore it
  }
}

The difference is that here the taskState is TaskState.FAILED or TaskState.KILLED.

To put it another way: in enqueueSuccessfulTask the task itself has finished executing, and scheduler.handleFailedTask is only called because fetching the returned result went wrong; in enqueueFailedTask the task failed or was killed while it was running.

TaskSchedulerImpl also provides a number of other task-management methods, such as cancelTasks, killTaskAttempt and taskSetFinished.

There are also some executor-management methods, such as executorLost and executorHeartbeatReceived.

2. BlockManager management

The TaskSchedulerImpl.executorHeartbeatReceived method determines whether the driver knows that the given BlockManager is alive: it returns true if so, and false otherwise, in which case the BlockManager needs to re-register.

/**
  * Update metrics for in-progress tasks and let the master know that the BlockManager is still
  * alive. Return true if the driver knows about the given block manager. Otherwise, return false,
  * indicating that the block manager should re-register.
  */
override def executorHeartbeatReceived(
                                        execId: String,
                                        accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
                                        blockManagerId: BlockManagerId): Boolean = {
  // (taskId, stageId, stageAttemptId, accumUpdates)
  val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] = synchronized {
    accumUpdates.flatMap { case (id, updates) =>
      val accInfos = updates.map(acc => acc.toInfo(Some(acc.value), None))
      taskIdToTaskSetManager.get(id).map { taskSetMgr =>
        (id, taskSetMgr.stageId, taskSetMgr.taskSet.stageAttemptId, accInfos)
      }
    }
  }
  // The real work is delegated to dagScheduler.executorHeartbeatReceived, which has the same semantics as this method
  dagScheduler.executorHeartbeatReceived(execId, accumUpdatesWithTaskIds, blockManagerId)
}

Since the method above actually calls DAGScheduler.executorHeartbeatReceived, let's look at that method's implementation:

/**
 * Update metrics for in-progress tasks and let the master know that the BlockManager is still
 * alive. Return true if the driver knows about the given block manager. Otherwise, return false,
 * indicating that the block manager should re-register.
 */
def executorHeartbeatReceived(
    execId: String,
    // (taskId, stageId, stageAttemptId, accumUpdates)
    accumUpdates: Array[(Long, Int, Int, Seq[AccumulableInfo])],
    blockManagerId: BlockManagerId): Boolean = {
  listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, accumUpdates))
  // In the end a synchronous request is sent to the BlockManagerMaster driver endpoint to check whether this BlockManager is still known (i.e. has not lost its heartbeat)
  blockManagerMaster.driverEndpoint.askSync[Boolean](
    BlockManagerHeartbeat(blockManagerId), new RpcTimeout(600 seconds, "BlockManagerHeartbeat"))
}
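What does the driver do with that BlockManagerHeartbeat message? Roughly speaking, the BlockManagerMaster side replies true only if it already knows the given BlockManagerId (refreshing its last-seen timestamp), and false otherwise, so that the executor re-registers its BlockManager. Below is a minimal, self-contained sketch of just that decision; ToyBlockManagerMaster and ToyBlockManagerId are hypothetical stand-ins, not Spark classes, and corner cases are ignored.

import scala.collection.mutable

// Hypothetical stand-ins that only model the reply semantics of BlockManagerHeartbeat; NOT Spark source.
final case class ToyBlockManagerId(executorId: String, host: String, port: Int)

final class ToyBlockManagerMaster {
  // last-seen timestamp for every block manager the driver knows about
  private val lastSeenMs = mutable.Map.empty[ToyBlockManagerId, Long]

  def register(id: ToyBlockManagerId): Unit = lastSeenMs(id) = System.currentTimeMillis()

  // true  -> known block manager, refresh its last-seen time
  // false -> unknown block manager, the caller should re-register it
  def heartbeatReceived(id: ToyBlockManagerId): Boolean =
    if (lastSeenMs.contains(id)) {
      lastSeenMs(id) = System.currentTimeMillis()
      true
    } else {
      false
    }
}

// Usage:
//   val master = new ToyBlockManagerMaster
//   val id = ToyBlockManagerId("1", "host-a", 38215)
//   master.heartbeatReceived(id)   // false: unknown, should re-register
//   master.register(id)
//   master.heartbeatReceived(id)   // true: known, timestamp refreshed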

In the end, TaskScheduler.executorHeartbeatReceived is called from the HeartbeatReceiver.receiveAndReply method; the SparkContext holds a HeartbeatReceiver instance.

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {

  // Messages sent and received locally
  case ExecutorRegistered(executorId) =>
    executorLastSeen(executorId) = clock.getTimeMillis()
    context.reply(true)
  case ExecutorRemoved(executorId) =>
    executorLastSeen.remove(executorId)
    context.reply(true)
  case TaskSchedulerIsSet =>
    scheduler = sc.taskScheduler
    context.reply(true)
  case ExpireDeadHosts =>
    expireDeadHosts()
    context.reply(true)

  // Messages received from executors
  case heartbeat @ Heartbeat(executorId, accumUpdates, blockManagerId) =>
    if (scheduler != null) {
      if (executorLastSeen.contains(executorId)) {
        executorLastSeen(executorId) = clock.getTimeMillis()
        eventLoopThread.submit(new Runnable {
          override def run(): Unit = Utils.tryLogNonFatalError {
            val unknownExecutor = !scheduler.executorHeartbeatReceived(
              executorId, accumUpdates, blockManagerId)
            val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
            context.reply(response)
          }
        })
      } else {
        // This may happen if we get an executor's in-flight heartbeat immediately
        // after we just removed it. It's not really an error condition so we should
        // not log warning here. Otherwise there may be a lot of noise especially if
        // we explicitly remove executors (SPARK-4134).
        logDebug(s"Received heartbeat from unknown executor $executorId")
        context.reply(HeartbeatResponse(reregisterBlockManager = true))
      }
    } else {
      // Because Executor will sleep several seconds before sending the first "Heartbeat", this
      // case rarely happens. However, if it really happens, log it and ask the executor to
      // register itself again.
      logWarning(s"Dropping $heartbeat because TaskScheduler is not ready yet")
      context.reply(HeartbeatResponse(reregisterBlockManager = true))
    }
}

Finally, let's recap the whole flow:

    When the SparkContext's HeartbeatReceiver receives a heartbeat from a remote executor, it calls TaskScheduler.executorHeartbeatReceived to check whether the reporting BlockManager is still registered; that check is ultimately performed by the DAGScheduler via a synchronous remote call to the BlockManagerMaster.

All of these methods, in fact, do their work by calling the corresponding DAGScheduler methods.