Spark Source Code Walkthrough: Task Submission in yarn-cluster Mode
1. The run command
bin/spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --class org.apache.spark.examples.SparkPi \
  examples/jars/spark-examples_2.11-2.3.1.3.0.1.0-187.jar
2. Task submission flow diagram
3. Launch scripts
Looking at the spark-submit script, the program entry point is:
exec "${spark_home}"/bin/spark-class org.apache.spark.deploy.sparksubmit "$@“
Looking at ${SPARK_HOME}/bin/spark-class, we can see that it runs a java -cp <classpath> <main-class> command, starting a Java process named SparkSubmit whose main function lives in the main class org.apache.spark.deploy.SparkSubmit.
The actual command executed is:
/etc/alternatives/jre/bin/java -Dhdp.version=3.0.1.0-187 -cp /usr/hdp/3.0.1.0-187/spark2/conf/:/usr/hdp/3.0.1.0-187/spark2/jars/*:/usr/hdp/3.0.1.0-187/hadoop/conf/ -Xmx1g org.apache.spark.deploy.SparkSubmit --master yarn --class org.apache.spark.examples.SparkPi examples/jars/spark-examples_2.11-2.3.1.3.0.1.0-187.jar
4. The entry class org.apache.spark.deploy.SparkSubmit
This class has a companion object containing the main function, which creates a SparkSubmit instance and calls doSubmit():
override def main(args: Array[String]): Unit = {
  val submit = new SparkSubmit() {...}
  submit.doSubmit(args)
}
doSubmit parses the args, wraps them into an appArgs: SparkSubmitArguments object, and then calls submit(appArgs, uninitLog).
def doSubmit(args: Array[String]): Unit = {
  // Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
  // be reset before the application starts.
  val uninitLog = initializeLogIfNecessary(true, silent = true)

  val appArgs = parseArguments(args)
  if (appArgs.verbose) {
    logInfo(appArgs.toString)
  }
  appArgs.action match {
    case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    case SparkSubmitAction.PRINT_VERSION => printVersion()
  }
}
submit(appArgs, uninitLog) in turn calls runMain(args: SparkSubmitArguments, uninitLog: Boolean):
private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
  val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
  ...
  try {
    mainClass = Utils.classForName(childMainClass)
  } catch {...}

  val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
    mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
  } else {
    new JavaMainApplication(mainClass)
  }
  ...
  try {
    app.start(childArgs.toArray, sparkConf)
  } catch {
    case t: Throwable =>
      throw findCause(t)
  }
}
The mainClass here is crucial. The code first checks whether mainClass implements SparkApplication; if it does, it is instantiated through its constructor via reflection.
If it does not, a JavaMainApplication (an implementation of SparkApplication) is created, and its override def start(args: Array[String], conf: SparkConf) method uses reflection to invoke the main function of mainClass.
Once the SparkApplication has been created, its start(childArgs.toArray, sparkConf) method is invoked.
/**
 * Entry point for a Spark application. Implementations must provide a no-argument constructor.
 */
private[spark] trait SparkApplication {

  def start(args: Array[String], conf: SparkConf): Unit

}

/**
 * Implementation of SparkApplication that wraps a standard Java class with a "main" method.
 *
 * Configuration is propagated to the application via system properties, so running multiple
 * of these in the same JVM may lead to undefined behavior due to configuration leaks.
 */
private[deploy] class JavaMainApplication(klass: Class[_]) extends SparkApplication {

  override def start(args: Array[String], conf: SparkConf): Unit = {
    val mainMethod = klass.getMethod("main", new Array[String](0).getClass)
    if (!Modifier.isStatic(mainMethod.getModifiers)) {
      throw new IllegalStateException("The main method in the given main class must be static")
    }

    val sysProps = conf.getAll.toMap
    sysProps.foreach { case (k, v) =>
      sys.props(k) = v
    }

    mainMethod.invoke(null, args)
  }

}
If --deploy-mode is client, mainClass is taken from the --class command-line option, i.e. org.apache.spark.examples.SparkPi. In that case the client code runs in the current JVM; the other combinations are more involved.
With the command shown at the beginning, mainClass is the Class object of org.apache.spark.deploy.yarn.YarnClusterApplication:
private[deploy] val YARN_CLUSTER_SUBMIT_CLASS =
  "org.apache.spark.deploy.yarn.YarnClusterApplication"
...
if (isYarnCluster) {
  childMainClass = YARN_CLUSTER_SUBMIT_CLASS
  if (args.isPython) {
    childArgs += ("--primary-py-file", args.primaryResource)
    childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
  } else if (args.isR) {
    val mainFile = new Path(args.primaryResource).getName
    childArgs += ("--primary-r-file", mainFile)
    childArgs += ("--class", "org.apache.spark.deploy.RRunner")
  } else {
    if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
      childArgs += ("--jar", args.primaryResource)
    }
    childArgs += ("--class", args.mainClass)
  }
  if (args.childArgs != null) {
    args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
  }
}
5. The org.apache.spark.deploy.yarn.YarnClusterApplication class
This class lives in the spark-yarn module:
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-yarn_${scala.version}</artifactId>
  <version>${spark.version}</version>
</dependency>
Its override def start(args: Array[String], conf: SparkConf) method is then executed:
private[spark] class YarnClusterApplication extends SparkApplication {

  override def start(args: Array[String], conf: SparkConf): Unit = {
    // SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
    // so remove them from sparkConf here for yarn mode.
    conf.remove(JARS)
    conf.remove(FILES)

    new Client(new ClientArguments(args), conf, null).run()
  }

}
The SparkSubmit process creates a Client. This class is essentially a wrapper that holds a YarnClient, and its run() method is invoked.
It submits the application to the YARN cluster's ResourceManager and, on success, gets back an appId.
If spark.submit.deployMode=cluster and spark.yarn.submit.waitAppCompletion=true,
the SparkSubmit process keeps logging the application report for that appId until the job finishes (monitorApplication(appId)); otherwise it logs the report once and exits.
def run(): Unit = {
  this.appId = submitApplication()
  if (!launcherBackend.isConnected() && fireAndForget) {
    val report = getApplicationReport(appId)
    val state = report.getYarnApplicationState
    logInfo(s"Application report for $appId (state: $state)")
    logInfo(formatReportDetails(report))
    if (state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) {
      throw new SparkException(s"Application $appId finished with status: $state")
    }
  } else {
    val YarnAppReport(appState, finalState, diags) = monitorApplication(appId)
    if (appState == YarnApplicationState.FAILED || finalState == FinalApplicationStatus.FAILED) {
      diags.foreach { err =>
        logError(s"Application diagnostics message: $err")
      }
      throw new SparkException(s"Application $appId finished with failed status")
    }
    if (appState == YarnApplicationState.KILLED || finalState == FinalApplicationStatus.KILLED) {
      throw new SparkException(s"Application $appId is killed")
    }
    if (finalState == FinalApplicationStatus.UNDEFINED) {
      throw new SparkException(s"The final status of application $appId is undefined")
    }
  }
}
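For reference, monitorApplication(appId) is essentially a polling loop against the ResourceManager. The following is a simplified sketch of that loop, not the verbatim Spark source; the interval comes from spark.yarn.report.interval, and the staging-dir cleanup and returnOnRunning handling are omitted:

// Simplified sketch: poll the RM until the application reaches a terminal state.
def monitorApplicationSketch(appId: ApplicationId, interval: Long): YarnAppReport = {
  while (true) {
    Thread.sleep(interval)
    val report = yarnClient.getApplicationReport(appId)
    val state = report.getYarnApplicationState
    logInfo(s"Application report for $appId (state: $state)")
    if (state == YarnApplicationState.FINISHED ||
        state == YarnApplicationState.FAILED ||
        state == YarnApplicationState.KILLED) {
      return YarnAppReport(state, report.getFinalApplicationStatus,
        Option(report.getDiagnostics))
    }
  }
  throw new SparkException("unreachable") // keeps the compiler happy after while(true)
}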
Following submitApplication():
def submitApplication(): ApplicationId = {
  ResourceRequestHelper.validateResources(sparkConf)

  var appId: ApplicationId = null
  try {
    launcherBackend.connect()
    yarnClient.init(hadoopConf)
    yarnClient.start()

    logInfo("Requesting a new application from cluster with %d NodeManagers"
      .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))

    // Get a new application from our RM
    val newApp = yarnClient.createApplication()
    val newAppResponse = newApp.getNewApplicationResponse()
    appId = newAppResponse.getApplicationId()

    // The app staging dir based on the STAGING_DIR configuration if configured
    // otherwise based on the users home directory.
    val appStagingBaseDir = sparkConf.get(STAGING_DIR)
      .map { new Path(_, UserGroupInformation.getCurrentUser.getShortUserName) }
      .getOrElse(FileSystem.get(hadoopConf).getHomeDirectory())
    stagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))

    new CallerContext("CLIENT", sparkConf.get(APP_CALLER_CONTEXT),
      Option(appId.toString)).setCurrentContext()

    // Verify whether the cluster has enough resources for our AM
    verifyClusterResources(newAppResponse)

    // Set up the appropriate contexts to launch our AM
    val containerContext = createContainerLaunchContext(newAppResponse)
    val appContext = createApplicationSubmissionContext(newApp, containerContext)

    // Finally, submit and monitor the application
    logInfo(s"Submitting application $appId to ResourceManager")
    yarnClient.submitApplication(appContext)
    launcherBackend.setAppId(appId.toString)
    reportLauncherState(SparkAppHandle.State.SUBMITTED)

    appId
  } catch {
    case e: Throwable =>
      if (stagingDirPath != null) {
        cleanupStagingDir()
      }
      throw e
  }
}
This method does the following (steps 1, 2 and 3 in the task submission flow diagram):
1. Sends a request to the ResourceManager to create an application and obtains a globally unique appId.
2. Builds the application's staging directory stagingDirPath from the configured staging base directory and the appId.
3. verifyClusterResources checks that the cluster has enough resources available, and throws an exception if it does not.
4. createContainerLaunchContext builds the AM container launch context, which wraps the command used to start the container process.
5. Submits the appContext (a sketch of how it is built follows this list).
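The appContext submitted in step 5 is produced by createApplicationSubmissionContext. Below is a trimmed sketch of the main fields it fills in, based on the Spark 3.0 Client; attempt-count, priority and tag handling are omitted, and amMemory/amMemoryOverhead/amCores are the AM sizing values computed elsewhere in the Client:

// Trimmed sketch: build the submission context that carries the AM container spec.
val appContext = newApp.getApplicationSubmissionContext
appContext.setApplicationName(sparkConf.get("spark.app.name", "Spark"))
appContext.setQueue(sparkConf.get(QUEUE_NAME))      // spark.yarn.queue
appContext.setAMContainerSpec(containerContext)     // command, env and local resources built above
appContext.setApplicationType("SPARK")

val capability = Records.newRecord(classOf[Resource])
capability.setMemory(amMemory + amMemoryOverhead)   // AM heap plus overhead
capability.setVirtualCores(amCores)
appContext.setResource(capability)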
Looking at the createContainerLaunchContext(newAppResponse) code:
val amClass =
  if (isClusterMode) {
    Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
  } else {
    Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
  }
...
// Command for the ApplicationMaster
val commands = prefixEnv ++
  Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
  javaOpts ++ amArgs ++
  Seq(
    "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
    "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")

// TODO: it would be nicer to just make sure there are no null commands here
val printableCommands = commands.map(s => if (s == null) "null" else s).toList
amContainer.setCommands(printableCommands.asJava)
The container launch command is therefore roughly:
bin/java -server org.apache.spark.deploy.yarn.ApplicationMaster --class ...
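The amArgs segment elided in the listing above is what carries --class (and any --jar/--arg values) through to the AM process. Roughly, as a sketch following the surrounding Client code with the path constants abbreviated:

// Sketch: userClass/userJar/userArgs mirror the --class/--jar/--arg pairs that
// SparkSubmit placed into childArgs earlier.
val userClass =
  if (isClusterMode) Seq("--class", YarnSparkHadoopUtil.escapeForShell(args.userClass)) else Nil
val amArgs =
  Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++ userArgs ++
  Seq("--properties-file",
    buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, SPARK_CONF_FILE)) ++
  Seq("--dist-cache-conf",
    buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, DIST_CACHE_CONF_FILE))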
6. The org.apache.spark.deploy.yarn.ApplicationMaster class
One of the NodeManagers in the YARN cluster receives the ResourceManager's command and starts the ApplicationMaster process (step 4 in the task submission flow diagram).
Look at the main method in the ApplicationMaster companion object:
def main(args: Array[String]): Unit = {
  SignalUtils.registerLogger(log)
  val amArgs = new ApplicationMasterArguments(args)
  val sparkConf = new SparkConf()
  if (amArgs.propertiesFile != null) {
    Utils.getPropertiesFromFile(amArgs.propertiesFile).foreach { case (k, v) =>
      sparkConf.set(k, v)
    }
  }
  // Set system properties for each config entry. This covers two use cases:
  // - The default configuration stored by the SparkHadoopUtil class
  // - The user application creating a new SparkConf in cluster mode
  //
  // Both cases create a new SparkConf object which reads these configs from system properties.
  sparkConf.getAll.foreach { case (k, v) =>
    sys.props(k) = v
  }

  val yarnConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))
  master = new ApplicationMaster(amArgs, sparkConf, yarnConf)

  val ugi = sparkConf.get(PRINCIPAL) match {
    // We only need to log in with the keytab in cluster mode. In client mode, the driver
    // handles the user keytab.
    case Some(principal) if master.isClusterMode =>
      val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
      SparkHadoopUtil.get.loginUserFromKeytab(principal, sparkConf.get(KEYTAB).orNull)
      val newUGI = UserGroupInformation.getCurrentUser()

      if (master.appAttemptId == null || master.appAttemptId.getAttemptId > 1) {
        // Re-obtain delegation tokens if this is not a first attempt, as they might be outdated
        // as of now. Add the fresh tokens on top of the original user's credentials (overwrite).
        // Set the context class loader so that the token manager has access to jars
        // distributed by the user.
        Utils.withContextClassLoader(master.userClassLoader) {
          val credentialManager = new HadoopDelegationTokenManager(sparkConf, yarnConf, null)
          credentialManager.obtainDelegationTokens(originalCreds)
        }
      }

      // Transfer the original user's tokens to the new user, since it may contain needed tokens
      // (such as those user to connect to YARN).
      newUGI.addCredentials(originalCreds)
      newUGI
    case _ =>
      SparkHadoopUtil.get.createSparkUser()
  }

  ugi.doAs(new PrivilegedExceptionAction[Unit]() {
    override def run(): Unit = System.exit(master.run())
  })
}
It creates an ApplicationMaster instance and calls its run() method:
final def run(): Int = {
  try {
    val attemptID = if (isClusterMode) {
      // Set the web ui port to be ephemeral for yarn so we don't conflict with
      // other spark processes running on the same box
      System.setProperty(UI_PORT.key, "0")

      // Set the master and deploy mode property to match the requested mode.
      System.setProperty("spark.master", "yarn")
      System.setProperty(SUBMIT_DEPLOY_MODE.key, "cluster")

      // Set this internal configuration if it is running on cluster mode, this
      // configuration will be checked in SparkContext to avoid misuse of yarn cluster mode.
      System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())

      Option(appAttemptId.getAttemptId.toString)
    } else {
      None
    }

    new CallerContext(
      "APPMASTER", sparkConf.get(APP_CALLER_CONTEXT),
      Option(appAttemptId.getApplicationId.toString), attemptID).setCurrentContext()

    logInfo("ApplicationAttemptId: " + appAttemptId)

    // This shutdown hook should run *after* the SparkContext is shut down.
    val priority = ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY - 1
    ShutdownHookManager.addShutdownHook(priority) { () =>
      val maxAppAttempts = client.getMaxRegAttempts(sparkConf, yarnConf)
      val isLastAttempt = appAttemptId.getAttemptId() >= maxAppAttempts

      if (!finished) {
        // The default state of ApplicationMaster is failed if it is invoked by shut down hook.
        // This behavior is different compared to 1.x version.
        // If user application is exited ahead of time by calling System.exit(N), here mark
        // this application as failed with EXIT_EARLY. For a good shutdown, user shouldn't call
        // System.exit(0) to terminate the application.
        finish(finalStatus,
          ApplicationMaster.EXIT_EARLY,
          "Shutdown hook called before final status was reported.")
      }

      if (!unregistered) {
        // we only want to unregister if we don't want the RM to retry
        if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {
          unregister(finalStatus, finalMsg)
          cleanupStagingDir(new Path(System.getenv("SPARK_YARN_STAGING_DIR")))
        }
      }
    }

    if (isClusterMode) {
      runDriver()
    } else {
      runExecutorLauncher()
    }
  } catch {
    case e: Exception =>
      // catch everything else if not specifically handled
      logError("Uncaught exception: ", e)
      finish(FinalApplicationStatus.FAILED,
        ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
        "Uncaught exception: " + StringUtils.stringifyException(e))
  } finally {
    try {
      metricsSystem.foreach { ms =>
        ms.report()
        ms.stop()
      }
    } catch {
      case e: Exception =>
        logWarning("Exception during stopping of the metric system: ", e)
    }
  }

  exitCode
}
run() then executes runDriver().
userClassThread = startUserApplication() starts a thread named Driver, which uses reflection to invoke the main function of the class specified by --class on the command line (org.apache.spark.examples.SparkPi); that main function initializes the SparkContext. Once the main thread wakes up, it registers the ApplicationMaster with the ResourceManager (step 5):
private def runDriver(): Unit = {
  addAmIpFilter(None, System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV))
  userClassThread = startUserApplication()

  // This a bit hacky, but we need to wait until the spark.driver.port property has
  // been set by the Thread executing the user class.
  logInfo("Waiting for spark context initialization...")
  val totalWaitTime = sparkConf.get(AM_MAX_WAIT_TIME)
  try {
    val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
      Duration(totalWaitTime, TimeUnit.MILLISECONDS))
    if (sc != null) {
      val rpcEnv = sc.env.rpcEnv

      val userConf = sc.getConf
      val host = userConf.get(DRIVER_HOST_ADDRESS)
      val port = userConf.get(DRIVER_PORT)
      registerAM(host, port, userConf, sc.ui.map(_.webUrl), appAttemptId)

      val driverRef = rpcEnv.setupEndpointRef(
        RpcAddress(host, port),
        YarnSchedulerBackend.ENDPOINT_NAME)
      createAllocator(driverRef, userConf, rpcEnv, appAttemptId, distCacheConf)
    } else {
      // Sanity check; should never happen in normal operation, since sc should only be null
      // if the user app did not create a SparkContext.
      throw new IllegalStateException("User did not initialize spark context!")
    }
    resumeDriver()
    userClassThread.join()
  } catch {
    case e: SparkException if e.getCause().isInstanceOf[TimeoutException] =>
      logError(
        s"SparkContext did not initialize after waiting for $totalWaitTime ms. " +
         "Please check earlier log output for errors. Failing the application.")
      finish(FinalApplicationStatus.FAILED,
        ApplicationMaster.EXIT_SC_NOT_INITED,
        "Timed out waiting for SparkContext.")
  } finally {
    resumeDriver()
  }
}
private def startUserApplication(): Thread = {
  logInfo("Starting the user application in a separate Thread")

  var userArgs = args.userArgs
  if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) {
    // When running pyspark, the app is run using PythonRunner. The second argument is the list
    // of files to add to PYTHONPATH, which Client.scala already handles, so it's empty.
    userArgs = Seq(args.primaryPyFile, "") ++ userArgs
  }
  if (args.primaryRFile != null &&
      (args.primaryRFile.endsWith(".R") || args.primaryRFile.endsWith(".r"))) {
    // TODO(davies): add R dependencies here
  }

  val mainMethod = userClassLoader.loadClass(args.userClass)
    .getMethod("main", classOf[Array[String]])

  val userThread = new Thread {
    override def run(): Unit = {
      try {
        if (!Modifier.isStatic(mainMethod.getModifiers)) {
          logError(s"Could not find static main method in object ${args.userClass}")
          finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_EXCEPTION_USER_CLASS)
        } else {
          mainMethod.invoke(null, userArgs.toArray)
          finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
          logDebug("Done running user class")
        }
      } catch {
        case e: InvocationTargetException =>
          e.getCause match {
            case _: InterruptedException =>
              // Reporter thread can interrupt to stop user class
            case SparkUserAppException(exitCode) =>
              val msg = s"User application exited with status $exitCode"
              logError(msg)
              finish(FinalApplicationStatus.FAILED, exitCode, msg)
            case cause: Throwable =>
              logError("User class threw exception: " + cause, cause)
              finish(FinalApplicationStatus.FAILED,
                ApplicationMaster.EXIT_EXCEPTION_USER_CLASS,
                "User class threw exception: " + StringUtils.stringifyException(cause))
          }
          sparkContextPromise.tryFailure(e.getCause())
      } finally {
        // Notify the thread waiting for the SparkContext, in case the application did not
        // instantiate one. This will do nothing when the user code instantiates a SparkContext
        // (with the correct master), or when the user code throws an exception (due to the
        // tryFailure above).
        sparkContextPromise.trySuccess(null)
      }
    }
  }
  userThread.setContextClassLoader(userClassLoader)
  userThread.setName("Driver")
  userThread.start()
  userThread
}
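The handshake between the Driver thread and the AM main thread goes through sparkContextPromise. In cluster mode, the SparkContext startup path (via the YARN cluster scheduler's post-start hook) calls back into the ApplicationMaster, which completes the promise and then parks the Driver thread until runDriver has finished registering with the ResourceManager and creating the allocator. A simplified sketch of that pairing, based on ApplicationMaster in Spark 3.0 with error handling omitted:

// Called (indirectly) from SparkContext initialization in the Driver thread.
private def sparkContextInitialized(sc: SparkContext) = {
  sparkContextPromise.synchronized {
    // Notify runDriver() that the SparkContext is available.
    sparkContextPromise.success(sc)
    // Pause the user class thread until runDriver() has registered the AM.
    sparkContextPromise.wait()
  }
}

// Called by runDriver() after registerAM/createAllocator (and again in its finally block).
private def resumeDriver(): Unit = {
  sparkContextPromise.synchronized {
    sparkContextPromise.notify()
  }
}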
After registration, the main thread handles the resources returned by YARN in createAllocator(driverRef, userConf, rpcEnv, appAttemptId, distCacheConf):
private def createAllocator(
    driverRef: RpcEndpointRef,
    _sparkConf: SparkConf,
    rpcEnv: RpcEnv,
    appAttemptId: ApplicationAttemptId,
    distCacheConf: SparkConf): Unit = {
  // In client mode, the AM may be restarting after delegation tokens have reached their TTL. So
  // always contact the driver to get the current set of valid tokens, so that local resources can
  // be initialized below.
  if (!isClusterMode) {
    val tokens = driverRef.askSync[Array[Byte]](RetrieveDelegationTokens)
    if (tokens != null) {
      SparkHadoopUtil.get.addDelegationTokens(tokens, _sparkConf)
    }
  }

  val appId = appAttemptId.getApplicationId().toString()
  val driverUrl = RpcEndpointAddress(driverRef.address.host, driverRef.address.port,
    CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
  val localResources = prepareLocalResources(distCacheConf)

  // Before we initialize the allocator, let's log the information about how executors will
  // be run up front, to avoid printing this out for every single executor being launched.
  // Use placeholders for information that changes such as executor IDs.
  logInfo {
    val executorMemory = _sparkConf.get(EXECUTOR_MEMORY).toInt
    val executorCores = _sparkConf.get(EXECUTOR_CORES)
    val dummyRunner = new ExecutorRunnable(None, yarnConf, _sparkConf, driverUrl, "<executorId>",
      "<hostname>", executorMemory, executorCores, appId, securityMgr, localResources,
      ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
    dummyRunner.launchContextDebugInfo()
  }

  allocator = client.createAllocator(
    yarnConf,
    _sparkConf,
    appAttemptId,
    driverUrl,
    driverRef,
    securityMgr,
    localResources)

  // Initialize the AM endpoint *after* the allocator has been initialized. This ensures
  // that when the driver sends an initial executor request (e.g. after an AM restart),
  // the allocator is ready to service requests.
  rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverRef))

  allocator.allocateResources()
  val ms = MetricsSystem.createMetricsSystem(MetricsSystemInstances.APPLICATION_MASTER,
    sparkConf, securityMgr)
  val prefix = _sparkConf.get(YARN_METRICS_NAMESPACE).getOrElse(appId)
  ms.registerSource(new ApplicationMasterSource(prefix, allocator))
  // do not register static sources in this case as per SPARK-25277
  ms.start(false)
  metricsSystem = Some(ms)
  reporterThread = launchReporterThread()
}
The key call is allocator.allocateResources(), which processes the allocated resources:
def allocateResources(): Unit = synchronized {
  updateResourceRequests()

  val progressIndicator = 0.1f
  // Poll the ResourceManager. This doubles as a heartbeat if there are no pending container
  // requests.
  val allocateResponse = amClient.allocate(progressIndicator)

  val allocatedContainers = allocateResponse.getAllocatedContainers()
  allocatorBlacklistTracker.setNumClusterNodes(allocateResponse.getNumClusterNodes)

  if (allocatedContainers.size > 0) {
    logDebug(("Allocated containers: %d. Current executor count: %d. " +
      "Launching executor count: %d. Cluster resources: %s.")
      .format(
        allocatedContainers.size,
        runningExecutors.size,
        numExecutorsStarting.get,
        allocateResponse.getAvailableResources))

    handleAllocatedContainers(allocatedContainers.asScala)
  }

  val completedContainers = allocateResponse.getCompletedContainersStatuses()
  if (completedContainers.size > 0) {
    logDebug("Completed %d containers".format(completedContainers.size))
    processCompletedContainers(completedContainers.asScala)
    logDebug("Finished processing %d completed containers. Current running executor count: %d."
      .format(completedContainers.size, runningExecutors.size))
  }
}
If the number of allocated containers is greater than 0, handleAllocatedContainers(allocatedContainers.asScala) is called:
def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = {
  val containersToUse = new ArrayBuffer[Container](allocatedContainers.size)

  // Match incoming requests by host
  val remainingAfterHostMatches = new ArrayBuffer[Container]
  for (allocatedContainer <- allocatedContainers) {
    matchContainerToRequest(allocatedContainer, allocatedContainer.getNodeId.getHost,
      containersToUse, remainingAfterHostMatches)
  }

  // Match remaining by rack. Because YARN's RackResolver swallows thread interrupts
  // (see SPARK-27094), which can cause this code to miss interrupts from the AM, use
  // a separate thread to perform the operation.
  val remainingAfterRackMatches = new ArrayBuffer[Container]
  if (remainingAfterHostMatches.nonEmpty) {
    var exception: Option[Throwable] = None
    val thread = new Thread("spark-rack-resolver") {
      override def run(): Unit = {
        try {
          for (allocatedContainer <- remainingAfterHostMatches) {
            val rack = resolver.resolve(allocatedContainer.getNodeId.getHost)
            matchContainerToRequest(allocatedContainer, rack, containersToUse,
              remainingAfterRackMatches)
          }
        } catch {
          case e: Throwable =>
            exception = Some(e)
        }
      }
    }
    thread.setDaemon(true)
    thread.start()

    try {
      thread.join()
    } catch {
      case e: InterruptedException =>
        thread.interrupt()
        throw e
    }

    if (exception.isDefined) {
      throw exception.get
    }
  }

  // Assign remaining that are neither node-local nor rack-local
  val remainingAfterOffRackMatches = new ArrayBuffer[Container]
  for (allocatedContainer <- remainingAfterRackMatches) {
    matchContainerToRequest(allocatedContainer, ANY_HOST, containersToUse,
      remainingAfterOffRackMatches)
  }

  if (remainingAfterOffRackMatches.nonEmpty) {
    logDebug(s"Releasing ${remainingAfterOffRackMatches.size} unneeded containers that were " +
      s"allocated to us")
    for (container <- remainingAfterOffRackMatches) {
      internalReleaseContainer(container)
    }
  }

  runAllocatedContainers(containersToUse)

  logInfo("Received %d containers from YARN, launching executors on %d of them."
    .format(allocatedContainers.size, containersToUse.size))
}
Containers are matched against outstanding requests first by host, then by rack, and finally without locality; any leftover containers are released. Once matching is done, the selected containers are launched via runAllocatedContainers(containersToUse).
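Each of those matching passes goes through matchContainerToRequest, which asks the AMRMClient for an outstanding request at the given locality and, if one exists, consumes it. A condensed sketch based on YarnAllocator (here resource is the allocator's default container resource, and the resource-normalization details are elided):

// Condensed sketch of the per-container matching step.
private def matchContainerToRequest(
    allocatedContainer: Container,
    location: String,                       // host name, rack name, or ANY_HOST
    containersToUse: ArrayBuffer[Container],
    remaining: ArrayBuffer[Container]): Unit = {
  val matchingResource = Resource.newInstance(
    allocatedContainer.getResource.getMemory, resource.getVirtualCores)
  val matchingRequests = amClient.getMatchingRequests(
    allocatedContainer.getPriority, location, matchingResource)

  if (!matchingRequests.isEmpty) {
    // Consume one outstanding request at this locality level and keep the container.
    val containerRequest = matchingRequests.get(0).iterator.next
    amClient.removeContainerRequest(containerRequest)
    containersToUse += allocatedContainer
  } else {
    // No request at this locality level; let the next (looser) pass try it.
    remaining += allocatedContainer
  }
}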
private val launcherPool = ThreadUtils.newDaemonCachedThreadPool(
  "ContainerLauncher", sparkConf.get(CONTAINER_LAUNCH_MAX_THREADS))
This creates the launcherPool thread pool; runAllocatedContainers hands each container launch off to it:
/**
 * Launches executors in the allocated containers.
 */
private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = {
  for (container <- containersToUse) {
    executorIdCounter += 1
    val executorHostname = container.getNodeId.getHost
    val containerId = container.getId
    val executorId = executorIdCounter.toString
    assert(container.getResource.getMemory >= resource.getMemory)
    logInfo(s"Launching container $containerId on host $executorHostname " +
      s"for executor with ID $executorId")

    def updateInternalState(): Unit = synchronized {
      runningExecutors.add(executorId)
      numExecutorsStarting.decrementAndGet()
      executorIdToContainer(executorId) = container
      containerIdToExecutorId(container.getId) = executorId

      val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
        new HashSet[ContainerId])
      containerSet += containerId
      allocatedContainerToHostMap.put(containerId, executorHostname)
    }

    if (runningExecutors.size() < targetNumExecutors) {
      numExecutorsStarting.incrementAndGet()
      if (launchContainers) {
        launcherPool.execute(() => {
          try {
            new ExecutorRunnable(
              Some(container),
              conf,
              sparkConf,
              driverUrl,
              executorId,
              executorHostname,
              executorMemory,
              executorCores,
              appAttemptId.getApplicationId.toString,
              securityMgr,
              localResources,
              ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID // use until fully supported
            ).run()
            updateInternalState()
          } catch {
            case e: Throwable =>
              numExecutorsStarting.decrementAndGet()
              if (NonFatal(e)) {
                logError(s"Failed to launch executor $executorId on container $containerId", e)
                // Assigned container should be released immediately
                // to avoid unnecessary resource occupation.
                amClient.releaseAssignedContainer(containerId)
              } else {
                throw e
              }
          }
        })
      } else {
        // For test only
        updateInternalState()
      }
    } else {
      logInfo(("Skip launching executorRunnable as running executors count: %d " +
        "reached target executors count: %d.").format(
        runningExecutors.size, targetNumExecutors))
    }
  }
}
Looking at the ExecutorRunnable class: nmClient = NMClient.createNMClient() creates the NodeManager client that talks to the NodeManager, and its prepareCommand() method assembles a process launch command of roughly this form:
bin/java -server org.apache.spark.executor.YarnCoarseGrainedExecutorBackend ...
The launcherPool in the ApplicationMaster process runs one ExecutorRunnable per allocated container; each ExecutorRunnable's NMClient sends the assembled JVM launch command to the corresponding NodeManager, which starts the container process, named YarnCoarseGrainedExecutorBackend.
The full ExecutorRunnable code:
private[yarn] class ExecutorRunnable(
    container: Option[Container],
    conf: YarnConfiguration,
    sparkConf: SparkConf,
    masterAddress: String,
    executorId: String,
    hostname: String,
    executorMemory: Int,
    executorCores: Int,
    appId: String,
    securityMgr: SecurityManager,
    localResources: Map[String, LocalResource],
    resourceProfileId: Int) extends Logging {

  var rpc: YarnRPC = YarnRPC.create(conf)
  var nmClient: NMClient = _

  def run(): Unit = {
    logDebug("Starting Executor Container")
    nmClient = NMClient.createNMClient()
    nmClient.init(conf)
    nmClient.start()
    startContainer()
  }

  def launchContextDebugInfo(): String = {
    val commands = prepareCommand()
    val env = prepareEnvironment()

    s"""
    |===============================================================================
    |Default YARN executor launch context:
    |  env:
    |${Utils.redact(sparkConf, env.toSeq).map { case (k, v) => s"    $k -> $v\n" }.mkString}
    |  command:
    |    ${Utils.redactCommandLineArgs(sparkConf, commands).mkString(" \\ \n      ")}
    |
    |  resources:
    |${localResources.map { case (k, v) => s"    $k -> $v\n" }.mkString}
    |===============================================================================""".stripMargin
  }

  def startContainer(): java.util.Map[String, ByteBuffer] = {
    val ctx = Records.newRecord(classOf[ContainerLaunchContext])
      .asInstanceOf[ContainerLaunchContext]
    val env = prepareEnvironment().asJava

    ctx.setLocalResources(localResources.asJava)
    ctx.setEnvironment(env)

    val credentials = UserGroupInformation.getCurrentUser().getCredentials()
    val dob = new DataOutputBuffer()
    credentials.writeTokenStorageToStream(dob)
    ctx.setTokens(ByteBuffer.wrap(dob.getData()))

    val commands = prepareCommand()

    ctx.setCommands(commands.asJava)
    ctx.setApplicationACLs(
      YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr).asJava)

    // If external shuffle service is enabled, register with the Yarn shuffle service already
    // started on the NodeManager and, if authentication is enabled, provide it with our secret
    // key for fetching shuffle files later
    if (sparkConf.get(SHUFFLE_SERVICE_ENABLED)) {
      val secretString = securityMgr.getSecretKey()
      val secretBytes =
        if (secretString != null) {
          // This conversion must match how the YarnShuffleService decodes our secret
          JavaUtils.stringToBytes(secretString)
        } else {
          // Authentication is not enabled, so just provide dummy metadata
          ByteBuffer.allocate(0)
        }
      ctx.setServiceData(Collections.singletonMap("spark_shuffle", secretBytes))
    }

    // Send the start request to the ContainerManager
    try {
      nmClient.startContainer(container.get, ctx)
    } catch {
      case ex: Exception =>
        throw new SparkException(s"Exception while starting container ${container.get.getId}" +
          s" on host $hostname", ex)
    }
  }

  private def prepareCommand(): List[String] = {
    // Extra options for the JVM
    val javaOpts = ListBuffer[String]()

    // Set the JVM memory
    val executorMemoryString = executorMemory + "m"
    javaOpts += "-Xmx" + executorMemoryString

    // Set extra Java options for the executor, if defined
    sparkConf.get(EXECUTOR_JAVA_OPTIONS).foreach { opts =>
      val subsOpt = Utils.substituteAppNExecIds(opts, appId, executorId)
      javaOpts ++= Utils.splitCommandString(subsOpt).map(YarnSparkHadoopUtil.escapeForShell)
    }

    // Set the library path through a command prefix to append to the existing value of the
    // env variable.
    val prefixEnv = sparkConf.get(EXECUTOR_LIBRARY_PATH).map { libPath =>
      Client.createLibraryPathPrefix(libPath, sparkConf)
    }

    javaOpts += "-Djava.io.tmpdir=" +
      new Path(Environment.PWD.$$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)

    // Certain configs need to be passed here because they are needed before the Executor
    // registers with the Scheduler and transfers the spark configs. Since the Executor backend
    // uses RPC to connect to the scheduler, the RPC settings are needed as well as the
    // authentication settings.
    sparkConf.getAll
      .filter { case (k, v) => SparkConf.isExecutorStartupConf(k) }
      .foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") }

    // Commenting it out for now - so that people can refer to the properties if required. Remove
    // it once cpuset version is pushed out.
    // The context is, default gc for server class machines end up using all cores to do gc - hence
    // if there are multiple containers in same node, spark gc effects all other containers
    // performance (which can also be other spark containers)
    // Instead of using this, rely on cpusets by YARN to enforce spark behaves 'properly' in
    // multi-tenant environments. Not sure how default java gc behaves if it is limited to subset
    // of cores on a node.
    /*
        else {
          // If no java_opts specified, default to using -XX:+CMSIncrementalMode
          // It might be possible that other modes/config is being done in
          // spark.executor.extraJavaOptions, so we don't want to mess with it.
          // In our expts, using (default) throughput collector has severe perf ramifications in
          // multi-tenant machines
          // The options are based on
          // http://www.oracle.com/technetwork/java/gc-tuning-5-138395.html#0.0.0.%20When%20to%20Use
          // %20the%20Concurrent%20Low%20Pause%20Collector|outline
          javaOpts += "-XX:+UseConcMarkSweepGC"
          javaOpts += "-XX:+CMSIncrementalMode"
          javaOpts += "-XX:+CMSIncrementalPacing"
          javaOpts += "-XX:CMSIncrementalDutyCycleMin=0"
          javaOpts += "-XX:CMSIncrementalDutyCycle=10"
        }
    */

    // For log4j configuration to reference
    javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR)

    val userClassPath = Client.getUserClasspath(sparkConf).flatMap { uri =>
      val absPath =
        if (new File(uri.getPath()).isAbsolute()) {
          Client.getClusterPath(sparkConf, uri.getPath())
        } else {
          Client.buildPath(Environment.PWD.$(), uri.getPath())
        }
      Seq("--user-class-path", "file:" + absPath)
    }.toSeq

    YarnSparkHadoopUtil.addOutOfMemoryErrorArgument(javaOpts)
    val commands = prefixEnv ++
      Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
      javaOpts ++
      Seq("org.apache.spark.executor.YarnCoarseGrainedExecutorBackend",
        "--driver-url", masterAddress,
        "--executor-id", executorId,
        "--hostname", hostname,
        "--cores", executorCores.toString,
        "--app-id", appId,
        "--resourceProfileId", resourceProfileId.toString) ++
      userClassPath ++
      Seq(
        s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout",
        s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr")

    // TODO: it would be nicer to just make sure there are no null commands here
    commands.map(s => if (s == null) "null" else s).toList
  }

  private def prepareEnvironment(): HashMap[String, String] = {
    val env = new HashMap[String, String]()
    Client.populateClasspath(null, conf, sparkConf, env, sparkConf.get(EXECUTOR_CLASS_PATH))

    System.getenv().asScala.filterKeys(_.startsWith("SPARK"))
      .foreach { case (k, v) => env(k) = v }

    sparkConf.getExecutorEnv.foreach { case (key, value) =>
      if (key == Environment.CLASSPATH.name()) {
        // If the key of env variable is CLASSPATH, we assume it is a path and append it.
        // This is kept for backward compatibility and consistency with hadoop
        YarnSparkHadoopUtil.addPathToEnvironment(env, key, value)
      } else {
        // For other env variables, simply overwrite the value.
        env(key) = value
      }
    }

    env
  }
}
That concludes this walkthrough of task submission in yarn-cluster mode in the Spark source code.