Driver-Side Processing After a Spark Task Finishes
Recap
In the previous post we analyzed how a task runs on the executor side. When a task finishes, at the end of the TaskRunner started by Executor.launchTask, execBackend.statusUpdate is called to send the task's result and final state back to the driver. Back on the driver, we look for the handling of the StatusUpdate message in the receive method of DriverEndpoint, the driver's RPC endpoint.
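Before diving into the receive side, it helps to recall the shape of the message being sent. Below is a self-contained sketch assuming simplified types: the real definition lives in CoarseGrainedClusterMessages, and TaskState and SerializableBuffer are mocked here for illustration.

import java.nio.ByteBuffer

object StatusUpdateSketch {
  object TaskState extends Enumeration {
    val LAUNCHED, RUNNING, FINISHED, FAILED, KILLED, LOST = Value
    // FINISHED, FAILED, KILLED and LOST are the terminal states checked by the driver
    def isFinished(state: Value): Boolean =
      Set(FINISHED, FAILED, KILLED, LOST).contains(state)
  }
  // Wraps the serialized task result so it can cross the RPC boundary
  case class SerializableBuffer(value: ByteBuffer)
  case class StatusUpdate(
      executorId: String, taskId: Long,
      state: TaskState.Value, data: SerializableBuffer)

  def main(args: Array[String]): Unit = {
    val msg = StatusUpdate("exec-1", 42L, TaskState.FINISHED,
      SerializableBuffer(ByteBuffer.wrap(Array[Byte](1, 2, 3))))
    println(s"terminal state? ${TaskState.isFinished(msg.state)}") // true
  }
}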
DriverEndpoint.receive
case StatusUpdate(executorId, taskId, state, data) =>
  // Notify the TaskScheduler that the task has completed
  scheduler.statusUpdate(taskId, state, data.value)
  // If the task has reached a terminal state (FINISHED, FAILED, KILLED or LOST),
  // the resources it occupied have been released, so we can reclaim them
  // and offer them to new tasks
  if (TaskState.isFinished(state)) {
    executorDataMap.get(executorId) match {
      case Some(executorInfo) =>
        executorInfo.freeCores += scheduler.CPUS_PER_TASK
        makeOffers(executorId)
      case None =>
        // Ignoring the update since we don't know about the executor.
        logWarning(s"Ignored task status update ($taskId state $state) " +
          s"from unknown executor with ID $executorId")
    }
  }
So the key call here is scheduler.statusUpdate.
TaskSchedulerImpl.statusUpdate
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
  var failedExecutor: Option[String] = None
  var reason: Option[ExecutorLossReason] = None
  synchronized {
    try {
      taskIdToTaskSetManager.get(tid) match {
        case Some(taskSet) =>
          if (state == TaskState.LOST) {
            // TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling mode,
            // where each executor corresponds to a single task, so mark the executor as failed.
            val execId = taskIdToExecutorId.getOrElse(tid, throw new IllegalStateException(
              "taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)"))
            if (executorIdToRunningTaskIds.contains(execId)) {
              reason = Some(
                SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
              removeExecutor(execId, reason.get)
              failedExecutor = Some(execId)
            }
          }
          // The task has reached a terminal state: FINISHED, FAILED, KILLED or LOST
          if (TaskState.isFinished(state)) {
            // Clean up the bookkeeping kept for this task
            cleanupTaskState(tid)
            // Remove the task from the set of running tasks
            taskSet.removeRunningTask(tid)
            if (state == TaskState.FINISHED) {
              // Hand off to a separate thread to process the successful task asynchronously
              taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
            } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
              taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
            }
          }
        case None =>
          logError(
            ("Ignoring update with state %s for TID %s because its task set is gone (this is " +
              "likely the result of receiving duplicate task finished status updates) or its " +
              "executor has been marked as failed.")
              .format(state, tid))
      }
    } catch {
      case e: Exception => logError("Exception in statusUpdate", e)
    }
  }
  // Update the DAGScheduler without holding a lock on this, since that can deadlock
  if (failedExecutor.isDefined) {
    assert(reason.isDefined)
    dagScheduler.executorLost(failedExecutor.get, reason.get)
    backend.reviveOffers()
  }
}
Here an asynchronous task is enqueued to handle the success case, so let's look at that asynchronous processing logic.
TaskResultGetter.enqueueSuccessfulTask
def enqueueSuccessfulTask(
    taskSetManager: TaskSetManager,
    tid: Long,
    serializedData: ByteBuffer): Unit = {
  // Run the processing asynchronously on a dedicated thread pool
  getTaskResultExecutor.execute(new Runnable {
    override def run(): Unit = Utils.logUncaughtExceptions {
      try {
        // Deserialize the data sent back from the executor
        val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match {
          // If the result was sent back directly, just read it out of the deserialized object
          case directResult: DirectTaskResult[_] =>
            // First check that the result size does not exceed the threshold
            // (spark.driver.maxResultSize, 1g by default), i.e. the largest
            // result the driver is willing to accept
            if (!taskSetManager.canFetchMoreResults(serializedData.limit())) {
              return
            }
            // deserialize "value" without holding any lock so that it won't block other threads.
            // We should call it here, so that when it's called again in
            // "TaskSetManager.handleSuccessfulTask", it does not need to deserialize the value.
            directResult.value(taskResultSerializer.get())
            (directResult, serializedData.limit())
          case IndirectTaskResult(blockId, size) =>
            // Check that the result size does not exceed the limit
            if (!taskSetManager.canFetchMoreResults(size)) {
              // dropped by executor if size is larger than maxResultSize
              // Since this result is abandoned, remove the block that backs it
              // from the BlockManager
              sparkEnv.blockManager.master.removeBlock(blockId)
              return
            }
            logDebug("Fetching indirect task result for TID %s".format(tid))
            // This eventually posts a TaskGettingResult event to the event bus
            // via the DAGScheduler
            scheduler.handleTaskGettingResult(taskSetManager, tid)
            // Fetch the result data remotely through the BlockManager;
            // the location of this block was sent back earlier by the executor
            val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)
            if (!serializedTaskResult.isDefined) {
              /* We won't be able to get the task result if the machine that ran the task failed
               * between when the task ended and when we tried to fetch the result, or if the
               * block manager had to flush the result. */
              // The fetch can fail for two reasons: the executor dropped the
              // block because the serialized result was too large, or the
              // executor node had a network failure. Both count as a task
              // failure, and handleFailedTask will re-run the failed task.
              scheduler.handleFailedTask(
                taskSetManager, tid, TaskState.FINISHED, TaskResultLost)
              return
            }
            // Deserialize the data fetched from the BlockManager
            val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]](
              serializedTaskResult.get.toByteBuffer)
            // force deserialization of referenced value
            deserializedResult.value(taskResultSerializer.get())
            // Remove the block now that the data has been fetched
            sparkEnv.blockManager.master.removeBlock(blockId)
            (deserializedResult, size)
        }

        // Set the task result size in the accumulator updates received from the executors.
        // We need to do this here on the driver because if we did this on the executors then
        // we would have to serialize the result again after updating the size.
        // Process the accumulators; the result-size metric needs special handling
        result.accumUpdates = result.accumUpdates.map { a =>
          if (a.name == Some(InternalAccumulator.RESULT_SIZE)) {
            val acc = a.asInstanceOf[LongAccumulator]
            assert(acc.sum == 0L, "task result size should not have been set on the executors")
            acc.setValue(size.toLong)
            acc
          } else {
            a
          }
        }

        // Hand the deserialized result to TaskSchedulerImpl for further processing
        scheduler.handleSuccessfulTask(taskSetManager, tid, result)
      } catch {
        case cnf: ClassNotFoundException =>
          val loader = Thread.currentThread.getContextClassLoader
          taskSetManager.abort("ClassNotFound with classloader: " + loader)
        // Matching NonFatal so we don't catch the ControlThrowable from the "return" above.
        case NonFatal(ex) =>
          logError("Exception while getting task result", ex)
          taskSetManager.abort("Exception while getting task result: %s".format(ex))
      }
    }
  })
}
There are several deserialization steps here because the result went through several serialization steps on the executor side:

- First, the task's result value is serialized and, together with the accumulator updates, wrapped into a DirectTaskResult object.
- The DirectTaskResult object is then serialized in turn.
- If the result is too large and has to travel through the BlockManager, an IndirectTaskResult object is created to point at it.
- Finally, the IndirectTaskResult object is serialized as well.

Once the result arrives at the driver, it is deserialized in exactly the reverse order, as the sketch below illustrates.
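To make the layering concrete, here is a minimal, self-contained sketch of the executor-side wrapping. DirectTaskResult and IndirectTaskResult are simplified stand-ins for the real Spark classes, and the threshold and block ID are made up for illustration.

import java.io._

object ResultWrappingSketch {
  case class DirectTaskResult(valueBytes: Array[Byte])
  case class IndirectTaskResult(blockId: String, size: Int)

  def serialize(obj: java.io.Serializable): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(obj)
    oos.close()
    bos.toByteArray
  }

  def main(args: Array[String]): Unit = {
    val maxDirectResultSize = 1 << 20               // stand-in threshold; the real limits are
                                                    // spark.task.maxDirectResultSize and
                                                    // spark.driver.maxResultSize (1g default)
    val valueBytes  = serialize("task value")       // 1. serialize the task's result value
    val direct      = DirectTaskResult(valueBytes)  // 2. wrap it into a DirectTaskResult
                                                    //    (accumulator updates omitted here)
    val directBytes = serialize(direct)             // 3. serialize the DirectTaskResult
    // 4. if the serialized result is too large, it is put into the BlockManager
    //    and only a small IndirectTaskResult(blockId, size) is serialized and
    //    sent back over RPC ("taskresult_42" is a hypothetical block id)
    val wireBytes =
      if (directBytes.length > maxDirectResultSize) {
        serialize(IndirectTaskResult("taskresult_42", directBytes.length))
      } else {
        directBytes
      }
    println(s"sending ${wireBytes.length} bytes back to the driver")
    // the driver deserializes in reverse: the TaskResult first, then (after an
    // optional remote fetch) the DirectTaskResult, then the value itself
  }
}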
After the driver finally has the deserialized result data in hand, it passes it to TaskSchedulerImpl for further processing.
TaskSchedulerImpl.handleSuccessfulTask
def handleSuccessfulTask(
    taskSetManager: TaskSetManager,
    tid: Long,
    taskResult: DirectTaskResult[_]): Unit = synchronized {
  taskSetManager.handleSuccessfulTask(tid, taskResult)
}
TaskSetManager.handleSuccessfulTask
def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
  // Update bookkeeping
  val info = taskInfos(tid)
  val index = info.index
  info.markFinished(TaskState.FINISHED, clock.getTimeMillis())
  if (speculationEnabled) {
    successfulTaskDurations.insert(info.duration)
  }
  removeRunningTask(tid)

  // Kill any other attempts for the same task (since those are unnecessary now that one
  // attempt completed successfully).
  // Speculative execution may run several copies of the same task concurrently;
  // now that one copy has succeeded, all the others must be killed
  for (attemptInfo <- taskAttempts(index) if attemptInfo.running) {
    logInfo(s"Killing attempt ${attemptInfo.attemptNumber} for task ${attemptInfo.id} " +
      s"in stage ${taskSet.id} (TID ${attemptInfo.taskId}) on ${attemptInfo.host} " +
      s"as the attempt ${info.attemptNumber} succeeded on ${info.host}")
    killedByOtherAttempt(index) = true
    // Send the kill request through the scheduler backend
    sched.backend.killTask(
      attemptInfo.taskId,
      attemptInfo.executorId,
      interruptThread = true,
      reason = "another attempt succeeded")
  }
  // Only the first attempt to succeed may update these bookkeeping values;
  // this prevents multiple copies of the same task from updating them
  // repeatedly and leaving them inconsistent
  if (!successful(index)) {
    tasksSuccessful += 1
    logInfo(s"Finished task ${info.id} in stage ${taskSet.id} (TID ${info.taskId}) in" +
      s" ${info.duration} ms on ${info.host} (executor ${info.executorId})" +
      s" ($tasksSuccessful/$numTasks)")
    // Mark successful and stop if all the tasks have succeeded.
    successful(index) = true
    // If every task has completed, this task set (i.e. this stage) is done
    if (tasksSuccessful == numTasks) {
      isZombie = true
    }
  } else {
    logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
      " because task " + index + " has already completed successfully")
  }
  // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
  // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
  // "deserialize" the value when holding a lock to avoid blocking other threads. So we call
  // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
  // Note: "result.value()" only deserializes the value when it's called at the first time, so
  // here "result.value()" just returns the value and won't block other threads.
  // Notify the DAGScheduler so it can take further action. Note the symmetry:
  // on submission the call chain runs
  // DAGScheduler -> TaskScheduler -> SchedulerBackend -> Executor,
  // and when results come back the chain runs in exactly the reverse order.
  // The TaskScheduler thus also acts as a middleman relaying messages between
  // the DAGScheduler and the SchedulerBackend.
  sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info)
  // Update bookkeeping
  maybeFinishTaskSet()
}
This method mainly updates bookkeeping and kills the remaining running copies of the task; it then notifies the DAGScheduler for further processing. The sketch below illustrates why only the first successful attempt may update the bookkeeping.
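A toy sketch (not Spark code) of the !successful(index) guard: with speculation enabled, two attempts of the same partition can both report success before one is killed, and only the first may count toward stage completion.

object FirstAttemptWinsSketch {
  val numTasks = 4
  val successful = Array.fill(numTasks)(false) // per-partition "already succeeded" flags
  var tasksSuccessful = 0

  def recordSuccess(index: Int): Unit = {
    if (!successful(index)) { // only the first completed attempt counts
      successful(index) = true
      tasksSuccessful += 1
      if (tasksSuccessful == numTasks) println("all tasks done, stage complete")
    } else {
      println(s"ignoring duplicate task-finished event for partition $index")
    }
  }

  def main(args: Array[String]): Unit = {
    recordSuccess(0) // original attempt
    recordSuccess(0) // speculative copy finishing a moment later -- ignored
    (1 until numTasks).foreach(recordSuccess)
  }
}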
DAGScheduler.handleTaskCompletion
This method is long, so here is a summary of its main logic first:

- Handle accumulators. Updates from a ResultTask are applied at most once per partition, while updates from a ShuffleMapTask can be applied repeatedly (a consequence of speculative execution).
- Post a task-end event to the event bus.
- Handle the success case. For a ResultTask, update the bookkeeping, and when every task of the stage has finished, mark the stage as finished and notify the job listener. For a ShuffleMapTask the handling is more involved: besides the bookkeeping updates, the task's output block information is registered with the MapOutputTracker, and once all partitions have completed, the stage is marked as finished.
- Handle fetch failures. Besides updating bookkeeping, the main decision is whether the stage can be resubmitted. If it cannot (the number of resubmission attempts has exceeded the threshold), the associated job is cancelled; otherwise the stage is resubmitted. Note that resubmitting a stage does not re-run all of its tasks, only those that failed to complete; the outputs of shuffle map tasks are tracked through the MapOutputTrackerMaster component (see the sketch after this list).
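A toy sketch of "resubmit only what is missing": ShuffleMapStage.findMissingPartitions is the real entry point; the per-partition output table here stands in for the bookkeeping that MapOutputTrackerMaster maintains, and the data is made up.

object ResubmitMissingSketch {
  // partition index -> executor holding its map output, or None after a fetch
  // failure caused the output to be unregistered
  val outputs: Array[Option[String]] =
    Array(Some("exec-1"), None, Some("exec-2"), None, Some("exec-1"))

  def findMissingPartitions(): Seq[Int] =
    outputs.indices.filter(outputs(_).isEmpty)

  def main(args: Array[String]): Unit =
    // only partitions 1 and 3 would be re-run on resubmission
    println(s"resubmitting partitions ${findMissingPartitions().mkString(", ")}")
}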
private[scheduler] def handleTaskCompletion(event: CompletionEvent) {
  val task = event.task
  val taskId = event.taskInfo.id
  val stageId = task.stageId
  val taskType = Utils.getFormattedClassName(task)

  // Tell the OutputCommitCoordinator about the task completion, in particular
  // so it can deal with failed tasks
  outputCommitCoordinator.taskCompleted(
    stageId,
    task.partitionId,
    event.taskInfo.attemptNumber, // this is a task attempt number
    event.reason)

  if (!stageIdToStage.contains(task.stageId)) {
    // The stage may have already finished when we get this event -- eg. maybe it was a
    // speculative task. It is important that we send the TaskEnd event in any case, so listeners
    // are properly notified and can chose to handle it. For instance, some listeners are
    // doing their own accounting and if they don't get the task end event they think
    // tasks are still running when they really aren't.
    // The stage may already be done by the time this event arrives, for example
    // if this task was a speculative copy. We must still post a task-end event
    // so listeners are notified correctly: some listeners count every finished
    // task (speculative ones included), and without the event they would
    // believe the task is still running.
    postTaskEnd(event)
    // Skip all the actions if the stage has been cancelled.
    // The stage has already been dealt with, so just return
    return
  }

  val stage = stageIdToStage(task.stageId)

  // Make sure the task's accumulators are updated before any other processing happens, so that
  // we can post a task end event before any jobs or stages are updated. The accumulators are
  // only updated in certain cases.
  // A question worth pausing on: several copies of the same task may finish at
  // nearly the same time and post task-end events at nearly the same time, so
  // couldn't this method effectively run concurrently with itself? There is no
  // lock here, no CAS, no other synchronization. The answer lies in the
  // EventLoop class: it processes events on a single thread, so all events are
  // handled serially and there is no thread-safety issue (see the sketch at
  // the end of this post). This first step processes the accumulators.
  event.reason match {
    case Success =>
      task match {
        case rt: ResultTask[_, _] =>
          val resultStage = stage.asInstanceOf[ResultStage]
          resultStage.activeJob match {
            case Some(job) =>
              // Only update the accumulator once for each result task.
              // A ResultTask's accumulators are applied exactly once, never repeatedly
              if (!job.finished(rt.outputId)) {
                updateAccumulators(event)
              }
            case None => // Ignore update if task's job has finished.
          }
        case _ =>
          // For a ShuffleMapTask there is no such deduplication, which means
          // accumulators used inside a ShuffleMapTask can be counted more than once
          updateAccumulators(event)
      }
    case _: ExceptionFailure => updateAccumulators(event)
    case _ =>
  }

  // Post a task-end event to the event bus
  postTaskEnd(event)

  // This step maintains the job-level bookkeeping: if every partition of the
  // job has finished, the job is removed, together with the information of any
  // of its stages that no other job depends on
  event.reason match {
    case Success =>
      task match {
        case rt: ResultTask[_, _] =>
          // Cast to ResultStage here because it's part of the ResultTask
          // TODO Refactor this out to a function that accepts a ResultStage
          val resultStage = stage.asInstanceOf[ResultStage]
          resultStage.activeJob match {
            case Some(job) =>
              if (!job.finished(rt.outputId)) {
                job.finished(rt.outputId) = true
                job.numFinished += 1
                // If the whole job has finished, remove it
                // and drop the stages of the job that no other job depends on
                if (job.numFinished == job.numPartitions) {
                  // Mark this stage as finished
                  markStageAsFinished(resultStage)
                  // Drop the stages of the job that no other job depends on
                  cleanupStateForJobAndIndependentStages(job)
                  // Post a job-end event to the event bus
                  listenerBus.post(
                    SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))
                }

                // taskSucceeded runs some user code that might throw an exception. Make sure
                // we are resilient against that.
                // Finally, invoke the job listener's callback to notify it
                try {
                  job.listener.taskSucceeded(rt.outputId, event.result)
                } catch {
                  case e: Exception =>
                    // TODO: Perhaps we want to mark the resultStage as failed?
                    job.listener.jobFailed(new SparkDriverExecutionException(e))
                }
              }
            case None =>
              logInfo("Ignoring result from " + rt + " because its job has finished")
          }

        // Handle the ShuffleMapTask case
        case smt: ShuffleMapTask =>
          val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
          val status = event.result.asInstanceOf[MapStatus]
          val execId = status.location.executorId
          logDebug("ShuffleMapTask finished on " + execId)
          if (stageIdToStage(task.stageId).latestInfo.attemptNumber == task.stageAttemptId) {
            // This task was for the currently running attempt of the stage. Since the task
            // completed successfully from the perspective of the TaskSetManager, mark it as
            // no longer pending (the TaskSetManager may consider the task complete even
            // when the output needs to be ignored because the task's epoch is too small below.
            // In this case, when pending partitions is empty, there will still be missing
            // output locations, which will cause the DAGScheduler to resubmit the stage below.)
            // If the task's stageAttemptId matches the latest attempt of the
            // stage, the task counts as complete for the current attempt
            shuffleStage.pendingPartitions -= task.partitionId
          }
          // If the task's epoch is older than the epoch at which the executor
          // was marked as failed, ignore this result
          if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
            logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
          } else {
            // The epoch of the task is acceptable (i.e., the task was launched after the most
            // recent failure we're aware of for the executor), so mark the task's output as
            // available.
            // The task's epoch is accepted, so register its output with the
            // MapOutputTracker; the result status of this partition can then be
            // looked up through the MapOutputTracker
            mapOutputTracker.registerMapOutput(
              shuffleStage.shuffleDep.shuffleId, smt.partitionId, status)
            // Remove the task's partition from pending partitions. This may have already been
            // done above, but will not have been done yet in cases where the task attempt was
            // from an earlier attempt of the stage (i.e., not the attempt that's currently
            // running). This allows the DAGScheduler to mark the stage as complete when one
            // copy of each task has finished successfully, even if the currently active stage
            // still has tasks running.
            // Mark this partition as finished here as well
            shuffleStage.pendingPartitions -= task.partitionId
          }

          // If every partition of the stage has finished, mark the stage as finished
          if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
            markStageAsFinished(shuffleStage)
            logInfo("looking for newly runnable stages")
            logInfo("running: " + runningStages)
            logInfo("waiting: " + waitingStages)
            logInfo("failed: " + failedStages)

            // This call to increment the epoch may not be strictly necessary, but it is retained
            // for now in order to minimize the changes in behavior from an earlier version of the
            // code. This existing behavior of always incrementing the epoch following any
            // successful shuffle map stage completion may have benefits by causing unneeded
            // cached map outputs to be cleaned up earlier on executors. In the future we can
            // consider removing this call, but this will require some extra investigation.
            // See https://github.com/apache/spark/pull/17955/files#r117385673 for more details.
            mapOutputTracker.incrementEpoch()

            // Clear the cached locations of RDD partition results, so that the
            // next access re-fetches the locations from the BlockManager
            clearCacheLocs()

            if (!shuffleStage.isAvailable) {
              // Some tasks had failed; let's resubmit this shuffleStage.
              // TODO: Lower-level scheduler should also deal with this
              logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name +
                ") because some of its tasks had failed: " +
                shuffleStage.findMissingPartitions().mkString(", "))
              submitStage(shuffleStage)
            } else {
              // Mark any map-stage jobs waiting on this stage as finished
              // i.e. mark every job that depends on this stage as finished
              if (shuffleStage.mapStageJobs.nonEmpty) {
                val stats = mapOutputTracker.getStatistics(shuffleStage.shuffleDep)
                for (job <- shuffleStage.mapStageJobs) {
                  markMapStageJobAsFinished(job, stats)
                }
              }
              // Submit the downstream child stages
              submitWaitingChildStages(shuffleStage)
            }
          }
      }

    // Handle the case where the task was resubmitted
    case Resubmitted =>
      logInfo("Resubmitted " + task + ", so marking it as still running")
      stage match {
        case sms: ShuffleMapStage =>
          sms.pendingPartitions += task.partitionId
        case _ =>
          assert(false, "TaskSetManagers should only send Resubmitted task statuses for " +
            "tasks in ShuffleMapStages.")
      }

    // Handle fetch failures
    case FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage) =>
      val failedStage = stageIdToStage(task.stageId)
      val mapStage = shuffleIdToMapStage(shuffleId)
      // If the task's attempt ID differs from the stage's most recent attempt
      // ID, ignore the failure: a newer attempt of the stage is already running
      if (failedStage.latestInfo.attemptNumber != task.stageAttemptId) {
        logInfo(s"Ignoring fetch failure from $task as it's from $failedStage attempt" +
          s" ${task.stageAttemptId} and there is a more recent attempt for that stage " +
          s"(attempt ${failedStage.latestInfo.attemptNumber}) running")
      } else {
        // It is likely that we receive multiple FetchFailed for a single stage (because we have
        // multiple tasks running concurrently on different executors). In that case, it is
        // possible the fetch failure has already been handled by the scheduler.
        // Mark the failed stage as finished
        if (runningStages.contains(failedStage)) {
          logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
            s"due to a fetch failure from $mapStage (${mapStage.name})")
          markStageAsFinished(failedStage, Some(failureMessage))
        } else {
          logDebug(s"Received fetch failure from $task, but its from $failedStage which is no " +
            s"longer running")
        }
        // Record the attempt ID of the stage attempt whose fetch failed
        failedStage.fetchFailedAttemptIds.add(task.stageAttemptId)
        // If the stage has already been retried more times than allowed, abort it outright
        val shouldAbortStage =
          failedStage.fetchFailedAttemptIds.size >= maxConsecutiveStageAttempts ||
          disallowStageRetryForTest

        if (shouldAbortStage) {
          val abortMessage = if (disallowStageRetryForTest) {
            "Fetch failure will not retry stage due to testing config"
          } else {
            s"""$failedStage (${failedStage.name})
               |has failed the maximum allowable number of
               |times: $maxConsecutiveStageAttempts.
               |Most recent failure reason: $failureMessage""".stripMargin.replaceAll("\n", " ")
          }
          // Abort the stage and do the associated cleanup
          abortStage(failedStage, abortMessage, None)
        } else { // update failedStages and make sure a ResubmitFailedStages event is enqueued
          // TODO: Cancel running tasks in the failed stage -- cf. SPARK-17064
          val noResubmitEnqueued = !failedStages.contains(failedStage)
          // Add the stages to the failed-stage queue,
          // i.e. the queue of stages waiting to be resubmitted
          failedStages += failedStage
          failedStages += mapStage
          if (noResubmitEnqueued) {
            // We expect one executor failure to trigger many FetchFailures in rapid succession,
            // but all of those task failures can typically be handled by a single resubmission of
            // the failed stage. We avoid flooding the scheduler's event queue with resubmit
            // messages by checking whether a resubmit is already in the event queue for the
            // failed stage. If there is already a resubmit enqueued for a different failed
            // stage, that event would also be sufficient to handle the current failed stage, but
            // producing a resubmit for each failed stage makes debugging and logging a little
            // simpler while not producing an overwhelming number of scheduler events.
            logInfo(
              s"Resubmitting $mapStage (${mapStage.name}) and " +
              s"$failedStage (${failedStage.name}) due to fetch failure"
            )
            // 200 ms later, post a ResubmitFailedStages event to the internal
            // event-processing thread, asking the DAGScheduler to resubmit the
            // failed stages
            messageScheduler.schedule(
              new Runnable {
                override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
              },
              DAGScheduler.RESUBMIT_TIMEOUT,
              TimeUnit.MILLISECONDS
            )
          }
        }
        // Mark the map whose fetch failed as broken in the map stage
        // i.e. remove this task's map output from the MapOutputTracker
        if (mapId != -1) {
          mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
        }

        // TODO: mark the executor as failed only if there were lots of fetch failures on it
        // Remove the executor hosting the block whose fetch failed: tell the
        // DriverEndpoint to drop it, and remove all of its block information
        // from the BlockManagerMaster
        if (bmAddress != null) {
          val hostToUnregisterOutputs = if (env.blockManager.externalShuffleServiceEnabled &&
              unRegisterOutputOnHostOnFetchFailure) {
            // We had a fetch failure with the external shuffle service, so we
            // assume all shuffle data on the node is bad.
            Some(bmAddress.host)
          } else {
            // Unregister shuffle data just for one executor (we don't have any
            // reason to believe shuffle data has been lost for the entire host).
            None
          }
          removeExecutorAndUnregisterOutputs(
            execId = bmAddress.executorId,
            fileLost = true,
            hostToUnregisterOutputs = hostToUnregisterOutputs,
            maybeEpoch = Some(task.epoch))
        }
      }

    case commitDenied: TaskCommitDenied =>
      // Do nothing here, left up to the TaskScheduler to decide how to handle denied commit

    case exceptionFailure: ExceptionFailure =>
      // Nothing left to do, already handled above for accumulator updates.

    case TaskResultLost =>
      // Do nothing here; the TaskScheduler handles these failures and resubmits the task.

    case _: ExecutorLostFailure | _: TaskKilled | UnknownReason =>
      // Unrecognized failure - also do nothing. If the task fails repeatedly, the TaskScheduler
      // will abort the job.
  }
}
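One last point, on the thread-safety question raised in the comments of handleTaskCompletion: the DAGScheduler's event loop processes all events on a single thread, so completion events, even those from simultaneously finishing task attempts, are handled strictly one at a time. Below is a minimal sketch of that pattern; it is a simplification of org.apache.spark.util.EventLoop, not its actual code.

import java.util.concurrent.LinkedBlockingQueue

// One daemon thread drains a queue, so the handler runs strictly serially
// and its body needs no locks.
class ToyEventLoop[E](name: String)(handle: E => Unit) {
  private val queue = new LinkedBlockingQueue[E]()
  private val thread = new Thread(name) {
    override def run(): Unit = {
      try {
        while (true) handle(queue.take()) // events are processed in arrival order
      } catch {
        case _: InterruptedException => () // stop() interrupts the thread to shut it down
      }
    }
  }
  thread.setDaemon(true)
  def start(): Unit = thread.start()
  def post(event: E): Unit = queue.put(event)
  def stop(): Unit = thread.interrupt()
}

object ToyEventLoopDemo {
  def main(args: Array[String]): Unit = {
    val loop = new ToyEventLoop[String]("dag-scheduler-event-loop")(e => println(s"handling $e"))
    loop.start()
    loop.post("CompletionEvent(task 0, attempt 0)")
    loop.post("CompletionEvent(task 0, attempt 1)") // handled only after the first completes
    Thread.sleep(100)
    loop.stop()
  }
}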