
Analyzing the Spark source code: task submission in yarn-cluster mode

Contents
1. Run command
2. Task submission flow diagram
3. Launch script
4. Entry class org.apache.spark.deploy.SparkSubmit
5. The org.apache.spark.deploy.yarn.YarnClusterApplication class
6. The org.apache.spark.deploy.yarn.ApplicationMaster class

1. Run command

bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--class org.apache.spark.examples.SparkPi \
examples/jars/spark-examples_2.11-2.3.1.3.0.1.0-187.jar
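
For comparison, the same submission can also be triggered programmatically. Below is a minimal sketch using the public SparkLauncher API; the object name SubmitSparkPi is made up for illustration, and it assumes the jar path above is valid and that SPARK_HOME and the Hadoop/YARN configuration are set up in the environment. It is not part of the source walkthrough that follows.

import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}

object SubmitSparkPi {
  def main(args: Array[String]): Unit = {
    // Forks a spark-submit process under the hood, just like the shell command above.
    val handle: SparkAppHandle = new SparkLauncher()
      .setMaster("yarn")
      .setDeployMode("cluster")
      .setMainClass("org.apache.spark.examples.SparkPi")
      .setAppResource("examples/jars/spark-examples_2.11-2.3.1.3.0.1.0-187.jar")
      .startApplication()

    // Poll the handle until the application reaches a final state.
    while (!handle.getState.isFinal) {
      Thread.sleep(1000)
    }
    println(s"Final state: ${handle.getState}")
  }
}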

2. Task submission flow diagram

[Figure: yarn-cluster mode task submission flow diagram]

3. Launch script

Looking at the spark-submit script, the program entry point is:


exec "${spark_home}"/bin/spark-class org.apache.spark.deploy.sparksubmit "$@“

Looking at ${SPARK_HOME}/bin/spark-class shows that it ultimately runs a java -cp ... <main-class> command, starting a Java process named SparkSubmit whose main function lives in the main class org.apache.spark.deploy.SparkSubmit.

The actual command executed is:

/etc/alternatives/jre/bin/java -Dhdp.version=3.0.1.0-187 -cp /usr/hdp/3.0.1.0-187/spark2/conf/:/usr/hdp/3.0.1.0-187/spark2/jars/*:/usr/hdp/3.0.1.0-187/hadoop/conf/ -Xmx1g org.apache.spark.deploy.SparkSubmit --master yarn --class org.apache.spark.examples.SparkPi examples/jars/spark-examples_2.11-2.3.1.3.0.1.0-187.jar

4. Entry class org.apache.spark.deploy.SparkSubmit

This class has a companion object whose main function creates a SparkSubmit instance and calls doSubmit():

override def main(args: Array[String]): Unit = {
  val submit = new SparkSubmit() {...}
  submit.doSubmit(args)
}

doSubmit parses the args into an appArgs: SparkSubmitArguments object and then calls submit(appArgs, uninitLog).

def doSubmit(args: Array[String]): Unit = {
  // Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
  // be reset before the application starts.
  val uninitLog = initializeLogIfNecessary(true, silent = true)

  val appArgs = parseArguments(args)
  if (appArgs.verbose) {
    logInfo(appArgs.toString)
  }
  appArgs.action match {
    case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    case SparkSubmitAction.PRINT_VERSION => printVersion()
  }
}

submit(appArgs, uninitLog) in turn calls runMain(args: SparkSubmitArguments, uninitLog: Boolean):

private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
  val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
  ...
  try {
    mainClass = Utils.classForName(childMainClass)
  } catch {...}
  val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
    mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
  } else {
    new JavaMainApplication(mainClass)
  }
  ...
  try {
    app.start(childArgs.toArray, sparkConf)
  } catch {
    case t: Throwable =>
      throw findCause(t)
  }
}

mainClass is the important piece here. It is first checked against SparkApplication: if mainClass is a subclass of SparkApplication, an instance is created through its constructor via reflection.

If not, it is wrapped in a JavaMainApplication (itself a SparkApplication), whose override def start(args: Array[String], conf: SparkConf) uses reflection to invoke the main function of mainClass.

Once the SparkApplication has been created, its start(childArgs.toArray, sparkConf) method is invoked.

/**
 * Entry point for a Spark application. Implementations must provide a no-argument constructor.
 */
private[spark] trait SparkApplication {
  def start(args: Array[String], conf: SparkConf): Unit
}

/**
 * Implementation of SparkApplication that wraps a standard Java class with a "main" method.
 *
 * Configuration is propagated to the application via system properties, so running multiple
 * of these in the same JVM may lead to undefined behavior due to configuration leaks.
 */
private[deploy] class JavaMainApplication(klass: Class[_]) extends SparkApplication {

  override def start(args: Array[String], conf: SparkConf): Unit = {
    val mainMethod = klass.getMethod("main", new Array[String](0).getClass)
    if (!Modifier.isStatic(mainMethod.getModifiers)) {
      throw new IllegalStateException("The main method in the given main class must be static")
    }

    val sysProps = conf.getAll.toMap
    sysProps.foreach { case (k, v) =>
      sys.props(k) = v
    }

    mainMethod.invoke(null, args)
  }
}

If --deploy-mode is client, mainClass is determined by the --class command-line argument, which here would be org.apache.spark.examples.SparkPi.

In that case the client code runs inside the current JVM; the remaining combinations are more involved.

With the command given at the top of this article, however, mainClass is the Class object of org.apache.spark.deploy.yarn.YarnClusterApplication:

private[deploy] val YARN_CLUSTER_SUBMIT_CLASS =
  "org.apache.spark.deploy.yarn.YarnClusterApplication"
...
if (isYarnCluster) {
  childMainClass = YARN_CLUSTER_SUBMIT_CLASS
  if (args.isPython) {
    childArgs += ("--primary-py-file", args.primaryResource)
    childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
  } else if (args.isR) {
    val mainFile = new Path(args.primaryResource).getName
    childArgs += ("--primary-r-file", mainFile)
    childArgs += ("--class", "org.apache.spark.deploy.RRunner")
  } else {
    if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
      childArgs += ("--jar", args.primaryResource)
    }
    childArgs += ("--class", args.mainClass)
  }
  if (args.childArgs != null) {
    args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
  }
}
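
For the SparkPi command in section 1 (a plain JVM jar, so neither the Python nor the R branch applies), the branch above produces roughly the following values. This is a reconstructed illustration, not output captured from a real run, and the object name ChildArgsExample is made up:

object ChildArgsExample {
  // Reconstructed, illustrative values for the SparkPi submission above.
  val childMainClass = "org.apache.spark.deploy.yarn.YarnClusterApplication"
  val childArgs = Seq(
    "--jar", "examples/jars/spark-examples_2.11-2.3.1.3.0.1.0-187.jar",
    "--class", "org.apache.spark.examples.SparkPi")

  def main(args: Array[String]): Unit =
    println(s"$childMainClass ${childArgs.mkString(" ")}")
}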

5. The org.apache.spark.deploy.yarn.YarnClusterApplication class

This class lives in the spark-yarn module:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-yarn_${scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>

Execution proceeds to its override def start(args: Array[String], conf: SparkConf) method.

private[spark] class YarnClusterApplication extends SparkApplication {

  override def start(args: Array[String], conf: SparkConf): Unit = {
    // SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
    // so remove them from sparkConf here for yarn mode.
    conf.remove(JARS)
    conf.remove(FILES)

    new Client(new ClientArguments(args), conf, null).run()
  }
}

Inside the SparkSubmit process a Client is created; this class wraps a YarnClient, and its run() method is then executed.

It submits the application to the YARN cluster's ResourceManager and, on success, receives an appId.

If spark.submit.deployMode=cluster and spark.yarn.submit.waitAppCompletion=true, the SparkSubmit process keeps logging the application state periodically until the job finishes (monitorApplication(appId)); otherwise it logs the report once and exits. A standalone sketch of such a polling loop follows the run() listing below.

def run(): Unit = {
  this.appId = submitApplication()
  if (!launcherBackend.isConnected() && fireAndForget) {
    val report = getApplicationReport(appId)
    val state = report.getYarnApplicationState
    logInfo(s"Application report for $appId (state: $state)")
    logInfo(formatReportDetails(report))
    if (state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) {
      throw new SparkException(s"Application $appId finished with status: $state")
    }
  } else {
    val YarnAppReport(appState, finalState, diags) = monitorApplication(appId)
    if (appState == YarnApplicationState.FAILED || finalState == FinalApplicationStatus.FAILED) {
      diags.foreach { err =>
        logError(s"Application diagnostics message: $err")
      }
      throw new SparkException(s"Application $appId finished with failed status")
    }
    if (appState == YarnApplicationState.KILLED || finalState == FinalApplicationStatus.KILLED) {
      throw new SparkException(s"Application $appId is killed")
    }
    if (finalState == FinalApplicationStatus.UNDEFINED) {
      throw new SparkException(s"The final status of application $appId is undefined")
    }
  }
}
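
As promised above, here is a self-contained sketch of the kind of polling loop monitorApplication(appId) runs, written directly against Hadoop's YarnClient. The object name MonitorSketch is made up, the one-second interval and the minimal error handling are simplifications, and this is not the exact Spark implementation:

import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration

object MonitorSketch {
  // Polls the ResourceManager until the given application reaches a final state,
  // logging the state each round, roughly what monitorApplication(appId) does.
  def waitForFinalState(appId: ApplicationId): YarnApplicationState = {
    val client = YarnClient.createYarnClient()
    client.init(new YarnConfiguration())
    client.start()
    try {
      var state = client.getApplicationReport(appId).getYarnApplicationState
      while (state != YarnApplicationState.FINISHED &&
             state != YarnApplicationState.FAILED &&
             state != YarnApplicationState.KILLED) {
        println(s"Application report for $appId (state: $state)")
        Thread.sleep(1000) // Spark derives its interval from configuration instead
        state = client.getApplicationReport(appId).getYarnApplicationState
      }
      state
    } finally {
      client.stop()
    }
  }
}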

Continuing into submitApplication():

def submitApplication(): ApplicationId = {
  ResourceRequestHelper.validateResources(sparkConf)
  var appId: ApplicationId = null
  try {
    launcherBackend.connect()
    yarnClient.init(hadoopConf)
    yarnClient.start()

    logInfo("Requesting a new application from cluster with %d NodeManagers"
      .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))

    // Get a new application from our RM
    val newApp = yarnClient.createApplication()
    val newAppResponse = newApp.getNewApplicationResponse()
    appId = newAppResponse.getApplicationId()

    // The app staging dir based on the STAGING_DIR configuration if configured
    // otherwise based on the users home directory.
    val appStagingBaseDir = sparkConf.get(STAGING_DIR)
      .map { new Path(_, UserGroupInformation.getCurrentUser.getShortUserName) }
      .getOrElse(FileSystem.get(hadoopConf).getHomeDirectory())
    stagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))

    new CallerContext("CLIENT", sparkConf.get(APP_CALLER_CONTEXT),
      Option(appId.toString)).setCurrentContext()

    // Verify whether the cluster has enough resources for our AM
    verifyClusterResources(newAppResponse)

    // Set up the appropriate contexts to launch our AM
    val containerContext = createContainerLaunchContext(newAppResponse)
    val appContext = createApplicationSubmissionContext(newApp, containerContext)

    // Finally, submit and monitor the application
    logInfo(s"Submitting application $appId to ResourceManager")
    yarnClient.submitApplication(appContext)
    launcherBackend.setAppId(appId.toString)
    reportLauncherState(SparkAppHandle.State.SUBMITTED)

    appId
  } catch {
    case e: Throwable =>
      if (stagingDirPath != null) {
        cleanupStagingDir()
      }
      throw e
  }
}

This method does the following (corresponding to steps 1, 2 and 3 in the task submission flow diagram); a bare-bones YarnClient sketch follows the list:
1. Sends a request to the ResourceManager to create an application and obtains a globally unique appId.
2. Builds the application staging directory stagingDirPath from the configured staging base directory plus the appId.
3. verifyClusterResources checks that the cluster has enough resources available and throws an exception otherwise.
4. createContainerLaunchContext creates the container launch context, which wraps the launch command for the AM container process.
5. Submits the appContext.
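
To make steps 1, 4 and 5 concrete, the sketch below shows the raw Hadoop YarnClient calls that Client.submitApplication() wraps. The object name RawYarnSubmit is made up, the launch command, memory and cores are placeholders, and all of Spark's staging-dir, classpath and security handling is deliberately omitted:

import java.util.Collections

import org.apache.hadoop.yarn.api.records.{ContainerLaunchContext, Resource}
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.Records

object RawYarnSubmit {
  def main(args: Array[String]): Unit = {
    val yarnClient = YarnClient.createYarnClient()
    yarnClient.init(new YarnConfiguration())
    yarnClient.start()                                    // talk to the ResourceManager

    val newApp = yarnClient.createApplication()           // step 1: get a new appId
    val appContext = newApp.getApplicationSubmissionContext
    println(s"Got application id ${appContext.getApplicationId}")

    // Step 4 (simplified): a ContainerLaunchContext holding the AM launch command.
    val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
    amContainer.setCommands(Collections.singletonList("echo placeholder-am-command"))

    appContext.setApplicationName("raw-yarn-submit-demo")
    appContext.setAMContainerSpec(amContainer)
    appContext.setResource(Resource.newInstance(1024, 1)) // 1 GB, 1 vcore for the AM

    val appId = yarnClient.submitApplication(appContext)  // step 5: submit to the RM
    println(s"Submitted $appId")
  }
}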

Looking at the createContainerLaunchContext(newAppResponse) code:

val amClass =
  if (isClusterMode) {
    Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
  } else {
    Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
  }
...
// Command for the ApplicationMaster
val commands = prefixEnv ++
  Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
  javaOpts ++ amArgs ++
  Seq(
    "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
    "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")

// TODO: it would be nicer to just make sure there are no null commands here
val printableCommands = commands.map(s => if (s == null) "null" else s).toList
amContainer.setCommands(printableCommands.asJava)

The container launch command therefore looks roughly like:
bin/java -server org.apache.spark.deploy.yarn.ApplicationMaster --class ...

6. The org.apache.spark.deploy.yarn.ApplicationMaster class

A NodeManager in the YARN cluster receives the ResourceManager's command and starts the ApplicationMaster process, which corresponds to step 4 in the task submission flow diagram.
Look at the main method of the ApplicationMaster companion object:

def main(args: Array[String]): Unit = {
  SignalUtils.registerLogger(log)
  val amArgs = new ApplicationMasterArguments(args)
  val sparkConf = new SparkConf()
  if (amArgs.propertiesFile != null) {
    Utils.getPropertiesFromFile(amArgs.propertiesFile).foreach { case (k, v) =>
      sparkConf.set(k, v)
    }
  }
  // Set system properties for each config entry. This covers two use cases:
  // - The default configuration stored by the SparkHadoopUtil class
  // - The user application creating a new SparkConf in cluster mode
  //
  // Both cases create a new SparkConf object which reads these configs from system properties.
  sparkConf.getAll.foreach { case (k, v) =>
    sys.props(k) = v
  }

  val yarnConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))
  master = new ApplicationMaster(amArgs, sparkConf, yarnConf)

  val ugi = sparkConf.get(PRINCIPAL) match {
    // We only need to log in with the keytab in cluster mode. In client mode, the driver
    // handles the user keytab.
    case Some(principal) if master.isClusterMode =>
      val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
      SparkHadoopUtil.get.loginUserFromKeytab(principal, sparkConf.get(KEYTAB).orNull)
      val newUGI = UserGroupInformation.getCurrentUser()

      if (master.appAttemptId == null || master.appAttemptId.getAttemptId > 1) {
        // Re-obtain delegation tokens if this is not a first attempt, as they might be outdated
        // as of now. Add the fresh tokens on top of the original user's credentials (overwrite).
        // Set the context class loader so that the token manager has access to jars
        // distributed by the user.
        Utils.withContextClassLoader(master.userClassLoader) {
          val credentialManager = new HadoopDelegationTokenManager(sparkConf, yarnConf, null)
          credentialManager.obtainDelegationTokens(originalCreds)
        }
      }

      // Transfer the original user's tokens to the new user, since it may contain needed tokens
      // (such as those user to connect to YARN).
      newUGI.addCredentials(originalCreds)
      newUGI
    case _ =>
      SparkHadoopUtil.get.createSparkUser()
  }

  ugi.doAs(new PrivilegedExceptionAction[Unit]() {
    override def run(): Unit = System.exit(master.run())
  })
}

An ApplicationMaster object is created and its run() method is executed.

final def run(): Int = {
  try {
    val attemptID = if (isClusterMode) {
      // Set the web ui port to be ephemeral for yarn so we don't conflict with
      // other spark processes running on the same box
      System.setProperty(UI_PORT.key, "0")

      // Set the master and deploy mode property to match the requested mode.
      System.setProperty("spark.master", "yarn")
      System.setProperty(SUBMIT_DEPLOY_MODE.key, "cluster")

      // Set this internal configuration if it is running on cluster mode, this
      // configuration will be checked in SparkContext to avoid misuse of yarn cluster mode.
      System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())

      Option(appAttemptId.getAttemptId.toString)
    } else {
      None
    }

    new CallerContext(
      "APPMASTER", sparkConf.get(APP_CALLER_CONTEXT),
      Option(appAttemptId.getApplicationId.toString), attemptID).setCurrentContext()

    logInfo("ApplicationAttemptId: " + appAttemptId)

    // This shutdown hook should run *after* the SparkContext is shut down.
    val priority = ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY - 1
    ShutdownHookManager.addShutdownHook(priority) { () =>
      val maxAppAttempts = client.getMaxRegAttempts(sparkConf, yarnConf)
      val isLastAttempt = appAttemptId.getAttemptId() >= maxAppAttempts

      if (!finished) {
        // The default state of ApplicationMaster is failed if it is invoked by shut down hook.
        // This behavior is different compared to 1.x version.
        // If user application is exited ahead of time by calling System.exit(N), here mark
        // this application as failed with EXIT_EARLY. For a good shutdown, user shouldn't call
        // System.exit(0) to terminate the application.
        finish(finalStatus,
          ApplicationMaster.EXIT_EARLY,
          "Shutdown hook called before final status was reported.")
      }

      if (!unregistered) {
        // we only want to unregister if we don't want the RM to retry
        if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {
          unregister(finalStatus, finalMsg)
          cleanupStagingDir(new Path(System.getenv("SPARK_YARN_STAGING_DIR")))
        }
      }
    }

    if (isClusterMode) {
      runDriver()
    } else {
      runExecutorLauncher()
    }
  } catch {
    case e: Exception =>
      // catch everything else if not specifically handled
      logError("Uncaught exception: ", e)
      finish(FinalApplicationStatus.FAILED,
        ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
        "Uncaught exception: " + StringUtils.stringifyException(e))
  } finally {
    try {
      metricsSystem.foreach { ms =>
        ms.report()
        ms.stop()
      }
    } catch {
      case e: Exception =>
        logWarning("Exception during stopping of the metric system: ", e)
    }
  }

  exitCode
}

Next comes runDriver().
userClassThread = startUserApplication() starts a thread named Driver; inside it, reflection invokes the main function of the class given by --class on the command line (org.apache.spark.examples.SparkPi), which initializes the SparkContext. Once the main thread wakes up, it registers the ApplicationMaster with the ResourceManager (step 5); a standalone sketch of this thread handoff appears after the listing below.

private def runDriver(): Unit = {
  addAmIpFilter(None, System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV))
  userClassThread = startUserApplication()

  // This a bit hacky, but we need to wait until the spark.driver.port property has
  // been set by the Thread executing the user class.
  logInfo("Waiting for spark context initialization...")
  val totalWaitTime = sparkConf.get(AM_MAX_WAIT_TIME)
  try {
    val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
      Duration(totalWaitTime, TimeUnit.MILLISECONDS))
    if (sc != null) {
      val rpcEnv = sc.env.rpcEnv

      val userConf = sc.getConf
      val host = userConf.get(DRIVER_HOST_ADDRESS)
      val port = userConf.get(DRIVER_PORT)
      registerAM(host, port, userConf, sc.ui.map(_.webUrl), appAttemptId)

      val driverRef = rpcEnv.setupEndpointRef(
        RpcAddress(host, port),
        YarnSchedulerBackend.ENDPOINT_NAME)
      createAllocator(driverRef, userConf, rpcEnv, appAttemptId, distCacheConf)
    } else {
      // Sanity check; should never happen in normal operation, since sc should only be null
      // if the user app did not create a SparkContext.
      throw new IllegalStateException("User did not initialize spark context!")
    }
    resumeDriver()
    userClassThread.join()
  } catch {
    case e: SparkException if e.getCause().isInstanceOf[TimeoutException] =>
      logError(
        s"SparkContext did not initialize after waiting for $totalWaitTime ms. " +
         "Please check earlier log output for errors. Failing the application.")
      finish(FinalApplicationStatus.FAILED,
        ApplicationMaster.EXIT_SC_NOT_INITED,
        "Timed out waiting for SparkContext.")
  } finally {
    resumeDriver()
  }
}

private def startUserApplication(): Thread = {
  logInfo("Starting the user application in a separate Thread")

  var userArgs = args.userArgs
  if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) {
    // When running pyspark, the app is run using PythonRunner. The second argument is the list
    // of files to add to PYTHONPATH, which Client.scala already handles, so it's empty.
    userArgs = Seq(args.primaryPyFile, "") ++ userArgs
  }
  if (args.primaryRFile != null &&
      (args.primaryRFile.endsWith(".R") || args.primaryRFile.endsWith(".r"))) {
    // TODO(davies): add R dependencies here
  }

  val mainMethod = userClassLoader.loadClass(args.userClass)
    .getMethod("main", classOf[Array[String]])

  val userThread = new Thread {
    override def run(): Unit = {
      try {
        if (!Modifier.isStatic(mainMethod.getModifiers)) {
          logError(s"Could not find static main method in object ${args.userClass}")
          finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_EXCEPTION_USER_CLASS)
        } else {
          mainMethod.invoke(null, userArgs.toArray)
          finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
          logDebug("Done running user class")
        }
      } catch {
        case e: InvocationTargetException =>
          e.getCause match {
            case _: InterruptedException =>
              // Reporter thread can interrupt to stop user class
            case SparkUserAppException(exitCode) =>
              val msg = s"User application exited with status $exitCode"
              logError(msg)
              finish(FinalApplicationStatus.FAILED, exitCode, msg)
            case cause: Throwable =>
              logError("User class threw exception: " + cause, cause)
              finish(FinalApplicationStatus.FAILED,
                ApplicationMaster.EXIT_EXCEPTION_USER_CLASS,
                "User class threw exception: " + StringUtils.stringifyException(cause))
          }
          sparkContextPromise.tryFailure(e.getCause())
      } finally {
        // Notify the thread waiting for the SparkContext, in case the application did not
        // instantiate one. This will do nothing when the user code instantiates a SparkContext
        // (with the correct master), or when the user code throws an exception (due to the
        // tryFailure above).
        sparkContextPromise.trySuccess(null)
      }
    }
  }
  userThread.setContextClassLoader(userClassLoader)
  userThread.setName("Driver")
  userThread.start()
  userThread
}
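
As mentioned above, the AM main thread and the user Driver thread synchronize through sparkContextPromise. Below is a self-contained sketch of that handoff pattern using plain scala.concurrent instead of Spark's ThreadUtils; the object name PromiseHandoffDemo is made up, and the sleep plus the String payload merely stand in for the user code building its SparkContext:

import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object PromiseHandoffDemo {
  def main(args: Array[String]): Unit = {
    val scPromise = Promise[String]()          // stands in for sparkContextPromise

    val userThread = new Thread(new Runnable {
      override def run(): Unit = {
        Thread.sleep(500)                      // "user code initializing the SparkContext"
        scPromise.trySuccess("SparkContext")   // what the Driver thread does via trySuccess(sc)
      }
    })
    userThread.setName("Driver")
    userThread.start()

    // What the AM main thread does via ThreadUtils.awaitResult(sparkContextPromise.future, ...)
    val sc = Await.result(scPromise.future, 10.seconds)
    println(s"Got $sc, the AM would now register itself and create the allocator")
    userThread.join()
  }
}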

After registration completes, the main thread handles the resources returned by YARN via createAllocator(driverRef, userConf, rpcEnv, appAttemptId, distCacheConf).

private def createAllocator(
    driverRef: RpcEndpointRef,
    _sparkConf: SparkConf,
    rpcEnv: RpcEnv,
    appAttemptId: ApplicationAttemptId,
    distCacheConf: SparkConf): Unit = {
  // In client mode, the AM may be restarting after delegation tokens have reached their TTL. So
  // always contact the driver to get the current set of valid tokens, so that local resources can
  // be initialized below.
  if (!isClusterMode) {
    val tokens = driverRef.askSync[Array[Byte]](RetrieveDelegationTokens)
    if (tokens != null) {
      SparkHadoopUtil.get.addDelegationTokens(tokens, _sparkConf)
    }
  }

  val appId = appAttemptId.getApplicationId().toString()
  val driverUrl = RpcEndpointAddress(driverRef.address.host, driverRef.address.port,
    CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
  val localResources = prepareLocalResources(distCacheConf)

  // Before we initialize the allocator, let's log the information about how executors will
  // be run up front, to avoid printing this out for every single executor being launched.
  // Use placeholders for information that changes such as executor IDs.
  logInfo {
    val executorMemory = _sparkConf.get(EXECUTOR_MEMORY).toInt
    val executorCores = _sparkConf.get(EXECUTOR_CORES)
    val dummyRunner = new ExecutorRunnable(None, yarnConf, _sparkConf, driverUrl, "<executorId>",
      "<hostname>", executorMemory, executorCores, appId, securityMgr, localResources,
      ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
    dummyRunner.launchContextDebugInfo()
  }

  allocator = client.createAllocator(
    yarnConf,
    _sparkConf,
    appAttemptId,
    driverUrl,
    driverRef,
    securityMgr,
    localResources)

  // Initialize the AM endpoint *after* the allocator has been initialized. This ensures
  // that when the driver sends an initial executor request (e.g. after an AM restart),
  // the allocator is ready to service requests.
  rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverRef))

  allocator.allocateResources()
  val ms = MetricsSystem.createMetricsSystem(MetricsSystemInstances.APPLICATION_MASTER,
    sparkConf, securityMgr)
  val prefix = _sparkConf.get(YARN_METRICS_NAMESPACE).getOrElse(appId)
  ms.registerSource(new ApplicationMasterSource(prefix, allocator))
  // do not register static sources in this case as per SPARK-25277
  ms.start(false)
  metricsSystem = Some(ms)
  reporterThread = launchReporterThread()
}

Focus on the key call, allocator.allocateResources(), which handles the allocated resources.

def allocateResources(): Unit = synchronized {
  updateResourceRequests()

  val progressIndicator = 0.1f
  // Poll the ResourceManager. This doubles as a heartbeat if there are no pending container
  // requests.
  val allocateResponse = amClient.allocate(progressIndicator)

  val allocatedContainers = allocateResponse.getAllocatedContainers()
  allocatorBlacklistTracker.setNumClusterNodes(allocateResponse.getNumClusterNodes)

  if (allocatedContainers.size > 0) {
    logDebug(("Allocated containers: %d. Current executor count: %d. " +
      "Launching executor count: %d. Cluster resources: %s.")
      .format(
        allocatedContainers.size,
        runningExecutors.size,
        numExecutorsStarting.get,
        allocateResponse.getAvailableResources))

    handleAllocatedContainers(allocatedContainers.asScala)
  }

  val completedContainers = allocateResponse.getCompletedContainersStatuses()
  if (completedContainers.size > 0) {
    logDebug("Completed %d containers".format(completedContainers.size))
    processCompletedContainers(completedContainers.asScala)
    logDebug("Finished processing %d completed containers. Current running executor count: %d."
      .format(completedContainers.size, runningExecutors.size))
  }
}

If the number of allocated containers is greater than zero, handleAllocatedContainers(allocatedContainers.asScala) is called:

def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = {
  val containersToUse = new ArrayBuffer[Container](allocatedContainers.size)

  // Match incoming requests by host
  val remainingAfterHostMatches = new ArrayBuffer[Container]
  for (allocatedContainer <- allocatedContainers) {
    matchContainerToRequest(allocatedContainer, allocatedContainer.getNodeId.getHost,
      containersToUse, remainingAfterHostMatches)
  }

  // Match remaining by rack. Because YARN's RackResolver swallows thread interrupts
  // (see SPARK-27094), which can cause this code to miss interrupts from the AM, use
  // a separate thread to perform the operation.
  val remainingAfterRackMatches = new ArrayBuffer[Container]
  if (remainingAfterHostMatches.nonEmpty) {
    var exception: Option[Throwable] = None
    val thread = new Thread("spark-rack-resolver") {
      override def run(): Unit = {
        try {
          for (allocatedContainer <- remainingAfterHostMatches) {
            val rack = resolver.resolve(allocatedContainer.getNodeId.getHost)
            matchContainerToRequest(allocatedContainer, rack, containersToUse,
              remainingAfterRackMatches)
          }
        } catch {
          case e: Throwable =>
            exception = Some(e)
        }
      }
    }
    thread.setDaemon(true)
    thread.start()

    try {
      thread.join()
    } catch {
      case e: InterruptedException =>
        thread.interrupt()
        throw e
    }

    if (exception.isDefined) {
      throw exception.get
    }
  }

  // Assign remaining that are neither node-local nor rack-local
  val remainingAfterOffRackMatches = new ArrayBuffer[Container]
  for (allocatedContainer <- remainingAfterRackMatches) {
    matchContainerToRequest(allocatedContainer, ANY_HOST, containersToUse,
      remainingAfterOffRackMatches)
  }

  if (remainingAfterOffRackMatches.nonEmpty) {
    logDebug(s"Releasing ${remainingAfterOffRackMatches.size} unneeded containers that were " +
      s"allocated to us")
    for (container <- remainingAfterOffRackMatches) {
      internalReleaseContainer(container)
    }
  }

  runAllocatedContainers(containersToUse)

  logInfo("Received %d containers from YARN, launching executors on %d of them."
    .format(allocatedContainers.size, containersToUse.size))
}

Containers are matched against requests by host, then by rack, and finally off-rack. Once matching is complete, the selected containers are launched via runAllocatedContainers(containersToUse).

private val launcherPool = ThreadUtils.newDaemonCachedThreadPool(
  "ContainerLauncher", sparkConf.get(CONTAINER_LAUNCH_MAX_THREADS))

This creates the launcherPool thread pool.
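
For readers unfamiliar with it, the sketch below shows roughly what such a daemon cached thread pool amounts to, using plain java.util.concurrent instead of Spark's ThreadUtils. The object name DaemonPoolSketch is made up, and Spark's real helper also caps the number of threads (the CONTAINER_LAUNCH_MAX_THREADS value read above), which is omitted here:

import java.util.concurrent.{ExecutorService, Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger

object DaemonPoolSketch {
  // A cached pool whose worker threads are daemons, so they never keep the AM JVM alive.
  def daemonCachedPool(prefix: String): ExecutorService = {
    val counter = new AtomicInteger(0)
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, s"$prefix-${counter.incrementAndGet()}")
        t.setDaemon(true)
        t
      }
    }
    Executors.newCachedThreadPool(factory)
  }

  def main(args: Array[String]): Unit = {
    val pool = daemonCachedPool("ContainerLauncher")
    pool.execute(new Runnable {
      override def run(): Unit = println(s"launching on ${Thread.currentThread().getName}")
    })
    pool.shutdown()
  }
}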

/**
 * Launches executors in the allocated containers.
 */
private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = {
  for (container <- containersToUse) {
    executorIdCounter += 1
    val executorHostname = container.getNodeId.getHost
    val containerId = container.getId
    val executorId = executorIdCounter.toString
    assert(container.getResource.getMemory >= resource.getMemory)
    logInfo(s"Launching container $containerId on host $executorHostname " +
      s"for executor with ID $executorId")

    def updateInternalState(): Unit = synchronized {
      runningExecutors.add(executorId)
      numExecutorsStarting.decrementAndGet()
      executorIdToContainer(executorId) = container
      containerIdToExecutorId(container.getId) = executorId

      val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
        new HashSet[ContainerId])
      containerSet += containerId
      allocatedContainerToHostMap.put(containerId, executorHostname)
    }

    if (runningExecutors.size() < targetNumExecutors) {
      numExecutorsStarting.incrementAndGet()
      if (launchContainers) {
        launcherPool.execute(() => {
          try {
            new ExecutorRunnable(
              Some(container),
              conf,
              sparkConf,
              driverUrl,
              executorId,
              executorHostname,
              executorMemory,
              executorCores,
              appAttemptId.getApplicationId.toString,
              securityMgr,
              localResources,
              ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID // use until fully supported
            ).run()
            updateInternalState()
          } catch {
            case e: Throwable =>
              numExecutorsStarting.decrementAndGet()
              if (NonFatal(e)) {
                logError(s"Failed to launch executor $executorId on container $containerId", e)
                // Assigned container should be released immediately
                // to avoid unnecessary resource occupation.
                amClient.releaseAssignedContainer(containerId)
              } else {
                throw e
              }
          }
        })
      } else {
        // For test only
        updateInternalState()
      }
    } else {
      logInfo(("Skip launching executorRunnable as running executors count: %d " +
        "reached target executors count: %d.").format(
        runningExecutors.size, targetNumExecutors))
    }
  }
}

Looking at the ExecutorRunnable class: nmClient = NMClient.createNMClient() creates the NodeManager client, responsible for talking to the NodeManager; its prepareCommand() method assembles a process launch command of roughly this form:

bin/java -server org.apache.spark.executor.YarnCoarseGrainedExecutorBackend ...


The launcherPool thread pool in the ApplicationMaster process starts one ExecutorRunnable per container. The nmClient inside each ExecutorRunnable sends the assembled JVM launch command to the corresponding NodeManager, which starts the container process, named YarnCoarseGrainedExecutorBackend.
The complete ExecutorRunnable code:

private[yarn] class ExecutorRunnable(
    container: Option[Container],
    conf: YarnConfiguration,
    sparkConf: SparkConf,
    masterAddress: String,
    executorId: String,
    hostname: String,
    executorMemory: Int,
    executorCores: Int,
    appId: String,
    securityMgr: SecurityManager,
    localResources: Map[String, LocalResource],
    resourceProfileId: Int) extends Logging {

  var rpc: YarnRPC = YarnRPC.create(conf)
  var nmClient: NMClient = _

  def run(): Unit = {
    logDebug("Starting Executor Container")
    nmClient = NMClient.createNMClient()
    nmClient.init(conf)
    nmClient.start()
    startContainer()
  }

  def launchContextDebugInfo(): String = {
    val commands = prepareCommand()
    val env = prepareEnvironment()

    s"""
    |===============================================================================
    |Default YARN executor launch context:
    |  env:
    |${Utils.redact(sparkConf, env.toSeq).map { case (k, v) => s"    $k -> $v\n" }.mkString}
    |  command:
    |    ${Utils.redactCommandLineArgs(sparkConf, commands).mkString(" \\ \n      ")}
    |
    |  resources:
    |${localResources.map { case (k, v) => s"    $k -> $v\n" }.mkString}
    |===============================================================================""".stripMargin
  }

  def startContainer(): java.util.Map[String, ByteBuffer] = {
    val ctx = Records.newRecord(classOf[ContainerLaunchContext])
      .asInstanceOf[ContainerLaunchContext]
    val env = prepareEnvironment().asJava

    ctx.setLocalResources(localResources.asJava)
    ctx.setEnvironment(env)

    val credentials = UserGroupInformation.getCurrentUser().getCredentials()
    val dob = new DataOutputBuffer()
    credentials.writeTokenStorageToStream(dob)
    ctx.setTokens(ByteBuffer.wrap(dob.getData()))

    val commands = prepareCommand()

    ctx.setCommands(commands.asJava)
    ctx.setApplicationACLs(
      YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr).asJava)

    // If external shuffle service is enabled, register with the YARN shuffle service already
    // started on the NodeManager and, if authentication is enabled, provide it with our secret
    // key for fetching shuffle files later
    if (sparkConf.get(SHUFFLE_SERVICE_ENABLED)) {
      val secretString = securityMgr.getSecretKey()
      val secretBytes =
        if (secretString != null) {
          // This conversion must match how the YarnShuffleService decodes our secret
          JavaUtils.stringToBytes(secretString)
        } else {
          // Authentication is not enabled, so just provide dummy metadata
          ByteBuffer.allocate(0)
        }
      ctx.setServiceData(Collections.singletonMap("spark_shuffle", secretBytes))
    }

    // Send the start request to the ContainerManager
    try {
      nmClient.startContainer(container.get, ctx)
    } catch {
      case ex: Exception =>
        throw new SparkException(s"Exception while starting container ${container.get.getId}" +
          s" on host $hostname", ex)
    }
  }

  private def prepareCommand(): List[String] = {
    // Extra options for the JVM
    val javaOpts = ListBuffer[String]()

    // Set the JVM memory
    val executorMemoryString = executorMemory + "m"
    javaOpts += "-Xmx" + executorMemoryString

    // Set extra Java options for the executor, if defined
    sparkConf.get(EXECUTOR_JAVA_OPTIONS).foreach { opts =>
      val subsOpt = Utils.substituteAppNExecIds(opts, appId, executorId)
      javaOpts ++= Utils.splitCommandString(subsOpt).map(YarnSparkHadoopUtil.escapeForShell)
    }

    // Set the library path through a command prefix to append to the existing value of the
    // env variable.
    val prefixEnv = sparkConf.get(EXECUTOR_LIBRARY_PATH).map { libPath =>
      Client.createLibraryPathPrefix(libPath, sparkConf)
    }

    javaOpts += "-Djava.io.tmpdir=" +
      new Path(Environment.PWD.$$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)

    // Certain configs need to be passed here because they are needed before the Executor
    // registers with the Scheduler and transfers the spark configs. Since the Executor backend
    // uses RPC to connect to the scheduler, the RPC settings are needed as well as the
    // authentication settings.
    sparkConf.getAll
      .filter { case (k, v) => SparkConf.isExecutorStartupConf(k) }
      .foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") }

    // Commenting it out for now - so that people can refer to the properties if required. Remove
    // it once cpuset version is pushed out.
    // The context is, default gc for server class machines end up using all cores to do gc - hence
    // if there are multiple containers in same node, spark gc effects all other containers
    // performance (which can also be other spark containers)
    // Instead of using this, rely on cpusets by YARN to enforce spark behaves 'properly' in
    // multi-tenant environments. Not sure how default java gc behaves if it is limited to subset
    // of cores on a node.
    /*
        else {
          // If no java_opts specified, default to using -XX:+CMSIncrementalMode
          // It might be possible that other modes/config is being done in
          // spark.executor.extraJavaOptions, so we don't want to mess with it.
          // In our expts, using (default) throughput collector has severe perf ramifications in
          // multi-tenant machines
          // The options are based on
          // http://www.oracle.com/technetwork/java/gc-tuning-5-138395.html#0.0.0.%20when%20to%20use
          // %20the%20concurrent%20low%20pause%20collector|outline
          javaOpts += "-XX:+UseConcMarkSweepGC"
          javaOpts += "-XX:+CMSIncrementalMode"
          javaOpts += "-XX:+CMSIncrementalPacing"
          javaOpts += "-XX:CMSIncrementalDutyCycleMin=0"
          javaOpts += "-XX:CMSIncrementalDutyCycle=10"
        }
    */

    // For log4j configuration to reference
    javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR)

    val userClassPath = Client.getUserClasspath(sparkConf).flatMap { uri =>
      val absPath =
        if (new File(uri.getPath()).isAbsolute()) {
          Client.getClusterPath(sparkConf, uri.getPath())
        } else {
          Client.buildPath(Environment.PWD.$(), uri.getPath())
        }
      Seq("--user-class-path", "file:" + absPath)
    }.toSeq

    YarnSparkHadoopUtil.addOutOfMemoryErrorArgument(javaOpts)
    val commands = prefixEnv ++
      Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
      javaOpts ++
      Seq("org.apache.spark.executor.YarnCoarseGrainedExecutorBackend",
        "--driver-url", masterAddress,
        "--executor-id", executorId,
        "--hostname", hostname,
        "--cores", executorCores.toString,
        "--app-id", appId,
        "--resourceProfileId", resourceProfileId.toString) ++
      userClassPath ++
      Seq(
        s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout",
        s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr")

    // TODO: it would be nicer to just make sure there are no null commands here
    commands.map(s => if (s == null) "null" else s).toList
  }

  private def prepareEnvironment(): HashMap[String, String] = {
    val env = new HashMap[String, String]()
    Client.populateClasspath(null, conf, sparkConf, env, sparkConf.get(EXECUTOR_CLASS_PATH))

    System.getenv().asScala.filterKeys(_.startsWith("SPARK"))
      .foreach { case (k, v) => env(k) = v }

    sparkConf.getExecutorEnv.foreach { case (key, value) =>
      if (key == Environment.CLASSPATH.name()) {
        // If the key of env variable is CLASSPATH, we assume it is a path and append it.
        // This is kept for backward compatibility and consistency with hadoop
        YarnSparkHadoopUtil.addPathToEnvironment(env, key, value)
      } else {
        // For other env variables, simply overwrite the value.
        env(key) = value
      }
    }

    env
  }
}

This concludes the walkthrough of task submission in yarn-cluster mode in the Spark source code.