从源码角度看 Spark 任务提交流程(上)
前言
最近阅读一下 Spark 的部分源码,在这一过程中通过源码结合之前所了解的相关内容,能够对之前知识进行完整的梳理也能更一进步了解 Spark 运行的底层逻辑,由于阅读源码是一个较为艰深的过程遂将其记录下来方便日后回顾,本篇我们来讲一下我们的 Spark on Yarn 在提交一个任务后俩个框架为我们做了些什么。
Spark 向 Yarn 提交任务的流程
在 Linux 上安装完 Spark 后都会用一个官方提供的 example 来测一测我们的 Spark 安装成功了,比如下面这段命令:
bin/spark-submit \
--master yarn \
./examples/jars/spark-examples_2.11-2.1.1.jar \
--class org.apache.spark.examples.SparkPi \
100
上面命令的具体参数含义就不说了,从我们执行的 shell 名 spark-submit
就可以看出我们是在做一个提交行为,而我们又是向 Yarn 提交的任务因此我们就会得到如下的这张流程图,而本文的目的就是通过源码来看看我们的 Spark 到底是如何一步一步执行图中的步骤的,为了有助于理解我们还是先用文字略微详细的描述一下下图。
- Spark Client 将我们需要执行的应用提交给 Yarn 集群中的 RM (resource manager)
- RM 收到后在一台 NM (node manager) 中启动 AM (application master)
- AM 在计算所需要的资源后向 RM 申请资源
- RM 返回可用的资源列表给 AM
- AM 根据资源列表在相应的 NM 上启动 Container 并在其中启动 Executor
- Executor 向 AM 注册自己
- AM 将任务分解并分发给各个 executor
一般来说至此我们就大致了解了整个流程,如果你对 Yarn 很了解那么整个流程就更加明了了,因为在 Yarn 上提交任务基本都是差不多的流程,但就如开头所说我们要从源码的角度来看看究竟为啥是这些流程,因此我们可以先记住这种图再读完接下来的内容后再回过头来看看是否相符。
Spark 提交任务源码
Spark Submit
我们本次使用的工具是 IDEA 其方便的搜索功能能够帮助我们顺畅的进行阅读,首先让我们回到开头的那个命令
bin/spark-submit \
--master yarn \
./examples/jars/spark-examples_2.11-2.1.1.jar \
--class org.apache.spark.examples.SparkPi \
100
我们执行的 shell 叫 spark-submit
当我们执行后其启动了一个进程,安装我们对于 Java 相关的知识其一定是执行了一个 main()
函数,于是我们打开这个 shell 看看其执行的是那个类,我们打开后就会发现有如下的这行代码
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "aaa@qq.com"
可以看到我们就找到了执行的主类org.apache.spark.deploy.SparkSubmit
,就是通过它来完成我们的应用提交的,接下来我们来看看这个类中的 main()
函数做了那些事情,代码如下:
def main(args: Array[String]): Unit = {
// 封装 Spark 提交参数
val appArgs = new SparkSubmitArguments(args)
if (appArgs.verbose) {
// scalastyle:off println
printStream.println(appArgs)
// scalastyle:on println
}
appArgs.action match {
// 执行提交
case SparkSubmitAction.SUBMIT => submit(appArgs)
case SparkSubmitAction.KILL => kill(appArgs)
case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
}
}
比较重要的两行代码我已经注释出来了,可以看到主要是做了两件事情,1)将我们提交的参数封装成一个对象;2)将参数封装成对象后执行提交,我们先来看第一步,代码如下:
private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, String] = sys.env)
extends SparkSubmitArgumentsParser {
var master: String = null
var deployMode: String = null
var executorMemory: String = null
var executorCores: String = null
var totalExecutorCores: String = null
.....
var mainClass: String = null
var primaryResource: String = null
var name: String = null
var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
var jars: String = null
......
进到 SparkSubmitArguments
这个类立马就会看到上述代码内容 (有省略),我们很快就能发现有一些变量是我们在命令行中输入的比如 master -> --master, mainClass-> --Class
可见我们在 spark-submit
后添加的参数都被封装进了这个对象中,知道了我们的参数是如何封装的之后我们就可以进入到提交了我们进入 submit
来看看其做了那些事情代码如下 (方便阅读有部分省略):
private def submit(args: SparkSubmitArguments): Unit = {
// 准备提交环境
val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
def doRunMain(): Unit = {
if (args.proxyUser != null) {
...
try {
proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
override def run(): Unit = {
// 根据提交环境运行 main 函数
runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
}
})
} catch {
...
} else {
throw e
}
}
} else {
runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
}
}
if (args.isStandaloneCluster && args.useRest) {
try {
...
// 执行 doRunMain()
doRunMain()
} catch {
...
}
// In all other modes, just run the main class as prepared
} else {
// 执行 doRunMain()
doRunMain()
}
}
同样比较关键的部分用注释标注出来了,和 submit
中的 main()
函数一样,这里比较重要的依然是上面标注的两行,1)准备运行环境;2)根据运行环境来运行 main()
这里的主函数当然不是我们之前的那个但具体是什么我们稍后会看到,这次我们调换一下顺序先来看看 runMain()
函数做了那些事情,我们进到函数中就会看到如下代码 (为了方便阅读进行了省略):
private def runMain(
childArgs: Seq[String],
childClasspath: Seq[String],
sysProps: Map[String, String],
childMainClass: String,
verbose: Boolean): Unit = {
// scalastyle:off println
if (verbose) {
...
}
// scalastyle:on println
val loader =
if (sysProps.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
new ChildFirstURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
} else {
new MutableURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
}
Thread.currentThread.setContextClassLoader(loader)
// 加载 jar 包
for (jar <- childClasspath) {
addJarToClasspath(jar, loader)
}
// 加载系统设置参数
for ((key, value) <- sysProps) {
System.setProperty(key, value)
}
var mainClass: Class[_] = null
try {
// 通过反射获得主类
mainClass = Utils.classForName(childMainClass)
} catch {
...
}
System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
}
if (classOf[scala.App].isAssignableFrom(mainClass)) {
printWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
}
// 获得主方法
val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
// 判断主方法是不是静态 static 的
if (!Modifier.isStatic(mainMethod.getModifiers)) {
throw new IllegalStateException("The main method in the given main class must be static")
}
@tailrec
def findCause(t: Throwable): Throwable = t match {
...
}
try {
// 执行主方法
mainMethod.invoke(null, childArgs.toArray)
} catch {
...
}
}
}
同样代码中重要的部分已经用注释标注出来了,我们可以看到首先加载了我们传入的 jar 包然后加载了我们传入的系统参数设置,然后关键的来了 mainClass = Utils.classForName(childMainClass)
通过这一行我们就知道了其通过反射获得了 childMainClass
这个类,而接下来注释标注的两行代码分别从这个主类主获得了主方法然后执行该方法,至此我们知道了 runMain()
这个函数的主要作用就是执行 childMainClass
这个类的主方法,而这个类是什么我们就要到 prepareSubmitEnvironment
中去看看了(以下只截取了关键部分)
private[deploy] def prepareSubmitEnvironment(args: SparkSubmitArguments)
: (Seq[String], Seq[String], Map[String, String], String) = {
// Return values
val childArgs = new ArrayBuffer[String]()
val childClasspath = new ArrayBuffer[String]()
val sysProps = new HashMap[String, String]()
var childMainClass = ""
// Set the cluster manager
// 判断master 即我们在命令行中指定的 --master 参数
val clusterManager: Int = args.master match {
case "yarn" => YARN
case "yarn-client" | "yarn-cluster" =>
printWarning(s"Master ${args.master} is deprecated since 2.0." +
" Please use master \"yarn\" with specified deploy mode instead.")
YARN
case m if m.startsWith("spark") => STANDALONE
case m if m.startsWith("mesos") => MESOS
case m if m.startsWith("local") => LOCAL
case _ =>
printErrorAndExit("Master must either be yarn or start with spark, mesos, local")
-1
}
if (isYarnCluster) {
// 可以清楚的看到在 Yarn 集群模式下我们的 childMainClass 是以下这个 yarn Client 类
childMainClass = "org.apache.spark.deploy.yarn.Client"
if (args.isPython) {
childArgs += ("--primary-py-file", args.primaryResource)
childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
} else if (args.isR) {
val mainFile = new Path(args.primaryResource).getName
childArgs += ("--primary-r-file", mainFile)
childArgs += ("--class", "org.apache.spark.deploy.RRunner")
} else {
if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
// 我们在命令行指定的 --jar
childArgs += ("--jar", args.primaryResource)
}
// 我们在命令行指定的 --calss
childArgs += ("--class", args.mainClass)
}
if (args.childArgs != null) {
args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
}
}
}
从上面代码中注释的地方可以清楚的看到在准备环境时我们 Spark 将我们在命令行传入的参数封装成了字符串数组,而我们之前执行的 childMainClass
就是这里的 org.apache.spark.deploy.yarn.Client
,就是说接下来我们就要去看看这个类的 main()
方法做了那些事情,需要注意的是虽然这里运行的是 main()
方法但其实并不像我们刚开始的命令行那样会启动一个新的进程其只是在我们的 submit
进程中被调用了。
Yarn Client
根据上一节所述我们在 runMain()
方法中执行的 main()
方法是 org.apache.spark.deploy.yarn.Client
的方法因此我们直接搜索一下便可看到以下代码:
def main(argStrings: Array[String]) {
if (!sys.props.contains("SPARK_SUBMIT")) {
logWarning("WARNING: This client is deprecated and will be removed in a " +
"future version of Spark. Use ./bin/spark-submit with \"--master yarn\"")
}
System.setProperty("SPARK_YARN_MODE", "true")
// spark 配置对象
val sparkConf = new SparkConf
sparkConf.remove("spark.jars")
sparkConf.remove("spark.files")
// 设置 Client 客户端参数
val args = new ClientArguments(argStrings)
// 新建客户端对象并 run()
new Client(args, sparkConf).run()
}
OK,从注释就可以发现依然是我们熟悉的两步走,1)封装我们的参数到一个对象中;2)以我们传入的参数为参数创建一个 client 对象并运行,首先来看我们的参数封装:
private[spark] class ClientArguments(args: Array[String]) {
var userJar: String = null
var userClass: String = null
var primaryPyFile: String = null
var primaryRFile: String = null
var userArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
parseArgs(args.toList)
private def parseArgs(inputArgs: List[String]): Unit = {
...
}
}
这里又看到了我们熟悉的 Jar
和 Class
那不用多说,我们命令行的传入的那些参数在这里被封装成了 Client 对象的参数,接下来看下 run()
做了那些事情
def run(): Unit = {
// 提交应用
this.appId = submitApplication()
...
}
进入 run()
方法第一行就是提交应用,而其返回了一个 appId,熟悉的读者此时就能意识到这个 appId 就是我们在 UI 界面上看到的那个,我们接着来看 submitApplication()
做了什么
def submitApplication(): ApplicationId = {
var appId: ApplicationId = null
try {
launcherBackend.connect()
// Setup the credentials before doing anything else,
// so we have don't have issues at any point.
setupCredentials()
yarnClient.init(yarnConf)
yarnClient.start()
logInfo("Requesting a new application from cluster with %d NodeManagers"
.format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))
// Get a new application from our RM
val newApp = yarnClient.createApplication()
val newAppResponse = newApp.getNewApplicationResponse()
appId = newAppResponse.getApplicationId()
reportLauncherState(SparkAppHandle.State.SUBMITTED)
launcherBackend.setAppId(appId.toString)
new CallerContext("CLIENT", Option(appId.toString)).setCurrentContext()
// Verify whether the cluster has enough resources for our AM
verifyClusterResources(newAppResponse)
// Set up the appropriate contexts to launch our AM
val containerContext = createContainerLaunchContext(newAppResponse)
val appContext = createApplicationSubmissionContext(newApp, containerContext)
...
}
从官方的注释可以很清楚的看出这里是在从 RM 中获得我们提交的 APP 并准备 AM 的环境这之中最关键的就是createContainerLaunchContext()
这个方法做的事情来看一下(方便阅读有部分省略):
private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
...
// Add Xmx for AM memory
javaOpts += "-Xmx" + amMemory + "m"
val tmpDir = new Path(
YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR
)
javaOpts += "-Djava.io.tmpdir=" + tmpDir
...
// Include driver-specific java options if we are launching a driver
...
// For log4j configuration to reference
...
val userClass =
if (isClusterMode) {
Seq("--class", YarnSparkHadoopUtil.escapeForShell(args.userClass))
} else {
Nil
}
val userJar =
if (args.userJar != null) {
Seq("--jar", args.userJar)
} else {
Nil
}
val primaryPyFile =
if (isClusterMode && args.primaryPyFile != null) {
Seq("--primary-py-file", new Path(args.primaryPyFile).getName())
} else {
Nil
}
val primaryRFile =
if (args.primaryRFile != null) {
Seq("--primary-r-file", args.primaryRFile)
} else {
Nil
}
val amClass =
if (isClusterMode) {
// 如果是集群模式
Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
} else {
// 客户端模式
Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
}
if (args.primaryRFile != null && args.primaryRFile.endsWith(".R")) {
args.userArgs = ArrayBuffer(args.primaryRFile) ++ args.userArgs
}
val userArgs = args.userArgs.flatMap { arg =>
Seq("--arg", YarnSparkHadoopUtil.escapeForShell(arg))
}
val amArgs =
Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++
userArgs ++ Seq(
"--properties-file", buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
LOCALIZED_CONF_DIR, SPARK_CONF_FILE))
// Command for the ApplicationMaster
val commands = prefixEnv ++ Seq(
YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java", "-server"
) ++
javaOpts ++ amArgs ++
Seq(
"1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
"2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
...
val printableCommands = commands.map(s => if (s == null) "null" else s).toList
amContainer.setCommands(printableCommands.asJava)
...
amContainer
}
上面的这段程序中中关键的就是下面这句,
YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java", "-server")
++ javaOpts ++ amArgs
我们可以很清楚的看到这里是在拼接一个 java
指令,javaOpts
是在设置 JVM 的参数,而 amArgs
则在将我们从命令行传入的那些参数例如 Ja,Class
等也拼接起来,而我们所启动的主类则可以从下面的代码中看到
if (isClusterMode) {
// 如果是集群模式
Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
} else {
// 客户端模式
Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
}
在集群模式和非集群模式下我们启动的是不同的主类(事实上是一回事),接下来我们就得到了一条形如command = bin/java org.apache.spark.deploy.yarn.ApplicationMaster
的指令(省略参数部分)用于启动我们的 AM,至此我们知道了我们提交应用实际上启动了一个 Yarn Client
并发送了一条指令 给 RM 让其启动 AM,而 AM 启动之后就会和 RM 交互并启动 container 来真正执行我们想要的计算了,这就留到下篇继续分析了,完。