Spark Source Code Series (5): Spark Submit Job Submission
The previous articles covered the DAGScheduler and how stages are carved out after a Spark job is submitted. I did not originally plan this source-code series around the end-to-end submission flow, which shows my inexperience as a blogger. Starting with this article we begin at job submission and study how Spark runs internally, from the moment a job starts until it finishes.
A Spark application runs as a set of independent processes on the cluster. The overall execution flow is as follows (a minimal driver-side sketch follows the list):
- The user submits the job and initializes a SparkContext; the SparkContext then coordinates the job's execution on the cluster
- The SparkContext connects to the cluster manager, requests resources, and registers the Application. In production the cluster manager is usually YARN; it is responsible for allocating resources across applications
- Once connected to the cluster manager, Executors are launched on the cluster's Worker nodes according to the granted resources
- After the Executors start, they report back to the Driver
- During SparkContext initialization, the DAGScheduler is created and started; it splits the submitted job into Stages and then into Tasks, determines the preferred locations for each Task, and sends the Tasks to the assigned Executors for execution
- The Task results are returned to the Driver; once the computation finishes, the Spark job is shut down
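Here is a minimal driver-side sketch of that flow in Scala. The application name SubmitDemo and the computation are made up purely for illustration; the sketch only shows where the SparkContext sits in the process described above.
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical example application used only for illustration.
object SubmitDemo {
  def main(args: Array[String]): Unit = {
    // Creating the SparkContext connects to the cluster manager, registers the
    // application, and triggers Executor allocation.
    val conf = new SparkConf().setAppName("SubmitDemo")
    val sc = new SparkContext(conf)
    // Running an action makes the DAGScheduler split the job into stages and
    // tasks, which are shipped to the Executors for execution.
    val sum = sc.parallelize(1 to 100).reduce(_ + _)
    println(s"sum = $sum")  // the result comes back to the Driver
    sc.stop()               // shut the application down
  }
}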
That is the overall flow of a Spark job at a high level. Next we start from Spark Submit and drill into the submission process step by step.
Client-Side Job Submission
Naturally, everything starts on the client side: the user submits their own Spark program using the spark-submit script under $SPARK_HOME/bin:
./bin/spark-submit \
--class <main-class> \
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # other options
<application-jar> \
[application-arguments]
- --class: the entry point of the application (its main class)
- --master: the master URL of the cluster (for example spark://10.142.97.4:7077)
- --deploy-mode: whether to deploy the Driver onto a Worker node inside the cluster (cluster), or to run it locally as an external client (client). In production we usually choose cluster mode, with YARN as the resource manager
- --conf: Spark configuration, given as key=value pairs
- application-jar: the path to the jar containing the user program
- application-arguments: the arguments required by the user program
A more concrete example:
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://10.142.97.4:7077 \
--deploy-mode cluster \
--supervise \
--executor-memory 2G \
--total-executor-cores 5 \
/path/examples.jar \
<program arguments>
- --class specifies the program entry point
- --master specifies the master URL
- --deploy-mode cluster deploys the program in cluster mode
- --supervise restarts the application if it fails
- --executor-memory 2G gives each executor 2 GB of memory
- --total-executor-cores 5 caps the total number of executor cores at 5
- /path/examples.jar is the application jar
- <program arguments> are the arguments required by the application
These parameters are passed to the spark-submit script for execution, so let's look at what spark/bin/spark-submit actually does:
if [ -z "${SPARK_HOME}" ]; then
source "$(dirname "$0")"/find-spark-home
fi
# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "aaa@qq.com"
- First it checks whether SPARK_HOME is set; if it is empty, the find-spark-home script in the same directory is sourced to set it
- Finally it execs "${SPARK_HOME}"/bin/spark-class with the class org.apache.spark.deploy.SparkSubmit, and "$@" forwards all of the script's arguments. In other words, spark-submit simply delegates to the spark-class script, which performs the actual submission
Next, the content of spark/bin/spark-class:
# Check (-z) whether SPARK_HOME is set; source find-spark-home if not
if [ -z "${SPARK_HOME}" ]; then
source "$(dirname "$0")"/find-spark-home
fi
# Run load-spark-env.sh, which loads the settings from spark-env.sh into the environment
. "${SPARK_HOME}"/bin/load-spark-env.sh
# Check and set the java binary to use
# Find the java binary
if [ -n "${JAVA_HOME}" ]; then
RUNNER="${JAVA_HOME}/bin/java"
else
if [ "$(command -v java)" ]; then
RUNNER="java"
else
echo "JAVA_HOME is not set" >&2
exit 1
fi
fi
# Locate the Spark jars used to build the launch classpath
# Find Spark jars.
if [ -d "${SPARK_HOME}/jars" ]; then
SPARK_JARS_DIR="${SPARK_HOME}/jars"
else
SPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars"
fi
if [ ! -d "$SPARK_JARS_DIR" ] && [ -z "$SPARK_TESTING$SPARK_SQL_TESTING" ]; then
echo "Failed to find Spark jars directory ($SPARK_JARS_DIR)." 1>&2
echo "You need to build Spark with the target \"package\" before running this program." 1>&2
exit 1
else
LAUNCH_CLASSPATH="$SPARK_JARS_DIR/*"
fi
# Add the launcher build dir to the classpath if requested.
if [ -n "$SPARK_PREPEND_CLASSES" ]; then
LAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
fi
# For tests
if [[ -n "$SPARK_TESTING" ]]; then
unset YARN_CONF_DIR
unset HADOOP_CONF_DIR
fi
# The launcher library will print arguments separated by a NULL character, to allow arguments with
# characters that would be otherwise interpreted by the shell. Read that in a while loop, populating
# an array that will be used to exec the final command.
#
# The exit code of the launcher is appended to the output, so the parent shell removes it from the
# command array and checks the value to see if the launcher succeeded.
# Run org.apache.spark.launcher.Main, which prints the parsed command arguments
build_command() {
"$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "aaa@qq.com"
printf "%d\0" $?
}
# Turn off posix mode since it does not allow process substitution
# Collect the arguments produced by build_command into the CMD array
set +o posix
CMD=()
while IFS= read -d '' -r ARG; do
CMD+=("$ARG")
done < <(build_command "$@")
COUNT=${#CMD[@]}
LAST=$((COUNT - 1))
LAUNCHER_EXIT_CODE=${CMD[$LAST]}
# Certain JVM failures result in errors being printed to stdout (instead of stderr), which causes
# the code that parses the output of the launcher to get confused. In those cases, check if the
# exit code is an integer, and if it's not, handle it as a special error case.
if ! [[ $LAUNCHER_EXIT_CODE =~ ^[0-9]+$ ]]; then
echo "${CMD[@]}" | head -n-1 1>&2
exit 1
fi
if [ $LAUNCHER_EXIT_CODE != 0 ]; then
exit $LAUNCHER_EXIT_CODE
fi
CMD=("${CMD[@]:0:$LAST}")
exec "${CMD[@]}"
To summarize the spark-class script: it checks the SPARK_HOME environment variable; runs load-spark-env.sh; determines which java binary to use; locates the Spark jars and builds the launch classpath; runs org.apache.spark.launcher.Main to produce the final command and stores it in CMD; checks that the launcher exited successfully; and finally execs the resulting command, which runs the SparkSubmit class.
So the command ultimately executed by spark-class specifies org.apache.spark.deploy.SparkSubmit as the program's entry point.
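Incidentally, org.apache.spark.launcher.Main belongs to Spark's launcher module, which also offers a public API (org.apache.spark.launcher.SparkLauncher) for submitting applications programmatically instead of shelling out to spark-submit. A minimal sketch, reusing the example jar path and class from earlier (both are placeholders, and SPARK_HOME is assumed to be set in the environment):
import org.apache.spark.launcher.SparkLauncher

object LaunchDemo {
  def main(args: Array[String]): Unit = {
    // Builds and launches roughly the same command the shell scripts assemble.
    val process = new SparkLauncher()
      .setAppResource("/path/examples.jar")               // application jar (placeholder)
      .setMainClass("org.apache.spark.examples.SparkPi")  // application entry point
      .setMaster("yarn")
      .setDeployMode("cluster")
      .setConf("spark.executor.memory", "2g")
      .launch()                                           // returns a java.lang.Process
    process.waitFor()
  }
}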
Source Code Analysis
org.apache.spark.deploy.SparkSubmit
The executed command specifies org.apache.spark.deploy.SparkSubmit as the entry point, so let's look at its main method:
def main(args: Array[String]): Unit = {
// SparkSubmitArguments extends SparkSubmitArgumentsParser and parses the submit arguments
val appArgs = new SparkSubmitArguments(args)
if (appArgs.verbose) {
// scalastyle:off println
printStream.println(appArgs)
// scalastyle:on println
}
// Pattern match on appArgs.action
appArgs.action match {
// SUBMIT: call submit
case SparkSubmitAction.SUBMIT => submit(appArgs)
// KILL: call kill
case SparkSubmitAction.KILL => kill(appArgs)
// REQUEST_STATUS: call requestStatus
case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
}
}
As the main method shows, the action parsed from the arguments is pattern matched and dispatched to the corresponding operation. Since ours is a submit, the submit method is called.
Inside submit, prepareSubmitEnvironment(args) is called to prepare the submission environment.
private def submit(args: SparkSubmitArguments): Unit = {
val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
...
}
prepareSubmitEnvironment sets up the basic parameters; in particular, it derives the cluster manager and the deploy mode from the master and deploy-mode arguments.
private[deploy] def prepareSubmitEnvironment(
args: SparkSubmitArguments,
conf: Option[HadoopConfiguration] = None)
: (Seq[String], Seq[String], Map[String, String], String) = {
// Return values
// The four values to return
val childArgs = new ArrayBuffer[String]()
val childClasspath = new ArrayBuffer[String]()
val sysProps = new HashMap[String, String]()
var childMainClass = ""
// Determine the cluster manager by pattern matching on the master argument from the script
// Set the cluster manager
val clusterManager: Int = args.master match {
case "yarn" => YARN
case "yarn-client" | "yarn-cluster" =>
printWarning(s"Master ${args.master} is deprecated since 2.0." +
" Please use master \"yarn\" with specified deploy mode instead.")
YARN
case m if m.startsWith("spark") => STANDALONE
case m if m.startsWith("mesos") => MESOS
case m if m.startsWith("local") => LOCAL
case _ =>
printErrorAndExit("Master must either be yarn or start with spark, mesos, local")
-1
}
// Determine the deploy mode by pattern matching on the deployMode argument
// Set the deploy mode; default is client mode
var deployMode: Int = args.deployMode match {
case "client" | null => CLIENT
case "cluster" => CLUSTER
case _ => printErrorAndExit("Deploy mode must be either client or cluster"); -1
}
// Because the deprecated way of specifying "yarn-cluster" and "yarn-client" encapsulate both
// the master and deploy mode, we have some logic to infer the master and deploy mode
// from each other if only one is specified, or exit early if they are at odds.
if (clusterManager == YARN) {
(args.master, args.deployMode) match {
case ("yarn-cluster", null) =>
deployMode = CLUSTER
args.master = "yarn"
case ("yarn-cluster", "client") =>
printErrorAndExit("Client deploy mode is not compatible with master \"yarn-cluster\"")
case ("yarn-client", "cluster") =>
printErrorAndExit("Cluster deploy mode is not compatible with master \"yarn-client\"")
case (_, mode) =>
args.master = "yarn"
}
}
...
// Production environments almost always use yarn-cluster mode, so the analysis follows that deploy mode
// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
if (isYarnCluster) {
childMainClass = "org.apache.spark.deploy.yarn.Client"
if (args.isPython) {
childArgs += ("--primary-py-file", args.primaryResource)
childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
} else if (args.isR) {
val mainFile = new Path(args.primaryResource).getName
childArgs += ("--primary-r-file", mainFile)
childArgs += ("--class", "org.apache.spark.deploy.RRunner")
} else {
if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
childArgs += ("--jar", args.primaryResource)
}
// --class is the Driver started inside the ApplicationMaster, i.e. the main class of our application
childArgs += ("--class", args.mainClass)
}
if (args.childArgs != null) {
args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
}
}
The difference between standalone mode and running on YARN is obvious enough: it is simply whether the Spark job runs on Spark's own standalone cluster or on a shared YARN cluster, so we will not dwell on it here. The distinction worth spelling out is yarn-client versus yarn-cluster. In yarn-client mode the Driver runs locally on the client, while the ApplicationMaster runs on a YARN node; they communicate over RPC, and the ApplicationMaster only requests and releases resources and waits for the Driver to finish. In yarn-cluster mode the Driver runs inside the container of the ApplicationMaster; they are two threads of the same process, they also communicate, and the ApplicationMaster likewise waits for the Driver to finish before releasing resources.
In yarn-client mode the Driver (the application we wrote) runs first. When the SparkContext is initialized, it acts as a client and asks YARN for an ApplicationMaster. Once the ApplicationMaster is running, it registers itself with YARN and requests Executor resources; from then on the local Driver communicates with it to control the job, while the ApplicationMaster keeps monitoring the Driver. If the Driver finishes or exits unexpectedly, the ApplicationMaster releases its resources and unregisters itself. Consequently, in yarn-client mode, if the process that ran spark-submit exits, the whole job exits with it.
In yarn-cluster mode the local process is merely a client. It first asks YARN for an ApplicationMaster; when the ApplicationMaster runs, it starts the Driver (our application) via reflection, and once the SparkContext is initialized it registers itself with YARN and requests Executor resources. Here the Driver and the ApplicationMaster run in the same container, but as two different threads. When the Driver finishes, the ApplicationMaster releases its resources and unregisters itself. So in this mode the local process only submits the job; terminating it does not affect the running Spark job, because the Driver executes remotely.
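To make the "started via reflection" step concrete, the idea is roughly the following. This is a simplified sketch of how an ApplicationMaster can launch the user's main class in a dedicated thread, not the actual Spark source; the method name and parameters are made up:
// Simplified sketch: start a user main class in a "Driver" thread via reflection.
def startUserClass(userClassName: String, userArgs: Array[String]): Thread = {
  // Look up the static main(String[]) method of the user class.
  val mainMethod = Class.forName(userClassName).getMethod("main", classOf[Array[String]])
  val driverThread = new Thread {
    override def run(): Unit = {
      // main is static, so the receiver is null; userArgs is its single parameter.
      mainMethod.invoke(null, userArgs)
    }
  }
  driverThread.setName("Driver")
  driverThread.start()
  driverThread
}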
As the code shows, in cluster mode the class that actually runs is org.apache.spark.deploy.yarn.Client. prepareSubmitEnvironment mainly parses the user arguments, sets up the environment variables, and handles Python/R dependencies, and then picks the runtime main class according to the deploy mode: yarn-client -> args.mainClass, yarn-cluster -> org.apache.spark.deploy.yarn.Client.
For yarn-cluster mode, what gets executed is the main method of org.apache.spark.deploy.yarn.Client:
def main(argStrings: Array[String]) {
if (!sys.props.contains("SPARK_SUBMIT")) {
logWarning("WARNING: This client is deprecated and will be removed in a " +
"future version of Spark. Use ./bin/spark-submit with \"--master yarn\"")
}
// Set an env variable indicating we are running in YARN mode.
// Note that any env variable with the SPARK_ prefix gets propagated to all (remote) processes
System.setProperty("SPARK_YARN_MODE", "true")
val sparkConf = new SparkConf
// SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
// so remove them from sparkConf here for yarn mode.
sparkConf.remove("spark.jars")
sparkConf.remove("spark.files")
val args = new ClientArguments(argStrings)
new Client(args, sparkConf).run()
}
The Client companion object constructs a Client instance and then calls Client.run:
/**
* Submit an application to the ResourceManager.
* If set spark.yarn.submit.waitAppCompletion to true, it will stay alive
* reporting the application's status until the application has exited for any reason.
* Otherwise, the client process will exit after submission.
* If the application finishes with a failed, killed, or undefined status,
* throw an appropriate SparkException.
*/
def run(): Unit = {
// The key line is this one
this.appId = submitApplication()
if (!launcherBackend.isConnected() && fireAndForget) {
val report = getApplicationReport(appId)
val state = report.getYarnApplicationState
logInfo(s"Application report for $appId (state: $state)")
logInfo(formatReportDetails(report))
if (state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) {
throw new SparkException(s"Application $appId finished with status: $state")
}
} else {
val (yarnApplicationState, finalApplicationStatus) = monitorApplication(appId)
if (yarnApplicationState == YarnApplicationState.FAILED ||
finalApplicationStatus == FinalApplicationStatus.FAILED) {
throw new SparkException(s"Application $appId finished with failed status")
}
if (yarnApplicationState == YarnApplicationState.KILLED ||
finalApplicationStatus == FinalApplicationStatus.KILLED) {
throw new SparkException(s"Application $appId is killed")
}
if (finalApplicationStatus == FinalApplicationStatus.UNDEFINED) {
throw new SparkException(s"The final status of application $appId is undefined")
}
}
}
The key call is submitApplication(), which submits the application to YARN; once the appId is returned, the client monitors the application's state.
def submitApplication(): ApplicationId = {
var appId: ApplicationId = null
try {
launcherBackend.connect()
// Setup the credentials before doing anything else,
// so we have don't have issues at any point.
// Obtain the submitting user's credentials, used later to fetch delegation tokens
setupCredentials()
yarnClient.init(yarnConf)
yarnClient.start()
logInfo("Requesting a new application from cluster with %d NodeManagers"
.format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))
// Get a new application from our RM
val newApp = yarnClient.createApplication()
val newAppResponse = newApp.getNewApplicationResponse()
// Obtain the appId
appId = newAppResponse.getApplicationId()
new CallerContext("CLIENT", sparkConf.get(APP_CALLER_CONTEXT),
Option(appId.toString)).setCurrentContext()
// Verify whether the cluster has enough resources for our AM
verifyClusterResources(newAppResponse)
// Set up the appropriate contexts to launch our AM
// Create the ApplicationMaster's launch context: its runtime environment, java options, and the java command used to start it on a YARN node
val containerContext = createContainerLaunchContext(newAppResponse)
val appContext = createApplicationSubmissionContext(newApp, containerContext)
// Finally, submit and monitor the application
logInfo(s"Submitting application $appId to ResourceManager")
// Submit the application through yarnClient
yarnClient.submitApplication(appContext)
launcherBackend.setAppId(appId.toString)
reportLauncherState(SparkAppHandle.State.SUBMITTED)
appId
} catch {
case e: Throwable =>
if (appId != null) {
cleanupStagingDir(appId)
}
throw e
}
}
submitApplication requests a new application, creates the ApplicationMaster's launch context, and finally submits the job. In yarn-cluster mode, once the job is submitted the local process is just a client, and the Driver runs in the same container as the ApplicationMaster.
createContainerLaunchContext builds the ApplicationMaster's container launch context; this is where it is decided whether the Driver runs inside the ApplicationMaster:
val amClass =
if (isClusterMode) {
Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
} else {
Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
}
Only the most essential lines are shown here: depending on whether we are in cluster mode, the main class that the ApplicationMaster container runs is chosen accordingly. ExecutorLauncher is just a thin wrapper around ApplicationMaster.
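For reference, ExecutorLauncher amounts to little more than the following (a rough sketch, not a verbatim copy of the source); it exists mainly so that the client-mode AM and the cluster-mode AM are easy to tell apart in tools such as jps:
// Rough sketch: ExecutorLauncher simply delegates to ApplicationMaster.
object ExecutorLauncher {
  def main(args: Array[String]): Unit = {
    ApplicationMaster.main(args)
  }
}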
We will not go into the code details of yarn-client mode here, since production deployments almost always use yarn-cluster mode.
Summary
The spark-submit process, in broad strokes:
- The application is submitted via spark-submit together with its parameters
- spark-submit invokes the spark-class script
- The entry point ends up being the SparkSubmit class
- Since production jobs are almost always submitted to a YARN cluster, the source matches on the submission mode, and we followed the yarn-cluster deploy mode
- Client creates a YARN client and sends YARN the launch command: bin/java ApplicationMaster
- On receiving the command, YARN starts the ApplicationMaster on a chosen NodeManager
- The ApplicationMaster starts the Driver thread, which runs the user's job