欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

从源码角度看 Spark 任务提交流程(上)

程序员文章站 2024-03-24 09:54:46
...

前言

 最近阅读一下 Spark 的部分源码,在这一过程中通过源码结合之前所了解的相关内容,能够对之前知识进行完整的梳理也能更一进步了解 Spark 运行的底层逻辑,由于阅读源码是一个较为艰深的过程遂将其记录下来方便日后回顾,本篇我们来讲一下我们的 Spark on Yarn 在提交一个任务后俩个框架为我们做了些什么。

Spark 向 Yarn 提交任务的流程

 在 Linux 上安装完 Spark 后都会用一个官方提供的 example 来测一测我们的 Spark 安装成功了,比如下面这段命令:

bin/spark-submit \
--master yarn \
./examples/jars/spark-examples_2.11-2.1.1.jar \
--class org.apache.spark.examples.SparkPi \
100

 上面命令的具体参数含义就不说了,从我们执行的 shell 名 spark-submit 就可以看出我们是在做一个提交行为,而我们又是向 Yarn 提交的任务因此我们就会得到如下的这张流程图,而本文的目的就是通过源码来看看我们的 Spark 到底是如何一步一步执行图中的步骤的,为了有助于理解我们还是先用文字略微详细的描述一下下图。

从源码角度看 Spark 任务提交流程(上)

  1. Spark Client 将我们需要执行的应用提交给 Yarn 集群中的 RM (resource manager)
  2. RM 收到后在一台 NM (node manager) 中启动 AM (application master)
  3. AM 在计算所需要的资源后向 RM 申请资源
  4. RM 返回可用的资源列表给 AM
  5. AM 根据资源列表在相应的 NM 上启动 Container 并在其中启动 Executor
  6. Executor 向 AM 注册自己
  7. AM 将任务分解并分发给各个 executor

一般来说至此我们就大致了解了整个流程,如果你对 Yarn 很了解那么整个流程就更加明了了,因为在 Yarn 上提交任务基本都是差不多的流程,但就如开头所说我们要从源码的角度来看看究竟为啥是这些流程,因此我们可以先记住这种图再读完接下来的内容后再回过头来看看是否相符。

Spark 提交任务源码

Spark Submit

 我们本次使用的工具是 IDEA 其方便的搜索功能能够帮助我们顺畅的进行阅读,首先让我们回到开头的那个命令

bin/spark-submit \
--master yarn \
./examples/jars/spark-examples_2.11-2.1.1.jar \
--class org.apache.spark.examples.SparkPi \
100

我们执行的 shell 叫 spark-submit 当我们执行后其启动了一个进程,安装我们对于 Java 相关的知识其一定是执行了一个 main() 函数,于是我们打开这个 shell 看看其执行的是那个类,我们打开后就会发现有如下的这行代码

exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "aaa@qq.com"

可以看到我们就找到了执行的主类org.apache.spark.deploy.SparkSubmit,就是通过它来完成我们的应用提交的,接下来我们来看看这个类中的 main() 函数做了那些事情,代码如下:

def main(args: Array[String]): Unit = {
    // 封装 Spark 提交参数
    val appArgs = new SparkSubmitArguments(args)
    if (appArgs.verbose) {
        // scalastyle:off println
        printStream.println(appArgs)
        // scalastyle:on println
    }
    appArgs.action match {
        // 执行提交
        case SparkSubmitAction.SUBMIT => submit(appArgs)
        case SparkSubmitAction.KILL => kill(appArgs)
        case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    }
}

比较重要的两行代码我已经注释出来了,可以看到主要是做了两件事情,1)将我们提交的参数封装成一个对象;2)将参数封装成对象后执行提交,我们先来看第一步,代码如下:

private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, String] = sys.env)
  extends SparkSubmitArgumentsParser {
  var master: String = null
  var deployMode: String = null
  var executorMemory: String = null
  var executorCores: String = null
  var totalExecutorCores: String = null
  .....
  var mainClass: String = null
  var primaryResource: String = null
  var name: String = null
  var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
  var jars: String = null
  ......

进到 SparkSubmitArguments 这个类立马就会看到上述代码内容 (有省略),我们很快就能发现有一些变量是我们在命令行中输入的比如 master -> --master, mainClass-> --Class 可见我们在 spark-submit 后添加的参数都被封装进了这个对象中,知道了我们的参数是如何封装的之后我们就可以进入到提交了我们进入 submit 来看看其做了那些事情代码如下 (方便阅读有部分省略):

private def submit(args: SparkSubmitArguments): Unit = {
    // 准备提交环境
    val (childArgs, childClasspath, sysProps, childMainClass) = 			    			 												prepareSubmitEnvironment(args)
    def doRunMain(): Unit = {
        if (args.proxyUser != null) {
            ...
            try {
                proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
                    override def run(): Unit = {
                        // 根据提交环境运行 main 函数
                        runMain(childArgs, childClasspath, sysProps, childMainClass, 										args.verbose)
                    }
                })
            } catch {
                ...
                } else {
                    throw e
                }
            }
        } else {
            runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
        }
    }
    if (args.isStandaloneCluster && args.useRest) {
        try {
			...
            // 执行 doRunMain()
            doRunMain()
        } catch {
			...
        }
        // In all other modes, just run the main class as prepared
    } else {
        // 执行 doRunMain()
        doRunMain()
    }
}

同样比较关键的部分用注释标注出来了,和 submit 中的 main() 函数一样,这里比较重要的依然是上面标注的两行,1)准备运行环境;2)根据运行环境来运行 main() 这里的主函数当然不是我们之前的那个但具体是什么我们稍后会看到,这次我们调换一下顺序先来看看 runMain() 函数做了那些事情,我们进到函数中就会看到如下代码 (为了方便阅读进行了省略):

private def runMain(
    childArgs: Seq[String],
    childClasspath: Seq[String],
    sysProps: Map[String, String],
    childMainClass: String,
    verbose: Boolean): Unit = {
    // scalastyle:off println
    if (verbose) {
        ...
    }
    // scalastyle:on println
    val loader =
    if (sysProps.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
        new ChildFirstURLClassLoader(new Array[URL](0),
                                     Thread.currentThread.getContextClassLoader)
    } else {
        new MutableURLClassLoader(new Array[URL](0),
                                  Thread.currentThread.getContextClassLoader)
    }
    Thread.currentThread.setContextClassLoader(loader)
	// 加载 jar 包
    for (jar <- childClasspath) {
        addJarToClasspath(jar, loader)
    }
	// 加载系统设置参数
    for ((key, value) <- sysProps) {
        System.setProperty(key, value)
    }

    var mainClass: Class[_] = null
    try {
        // 通过反射获得主类
        mainClass = Utils.classForName(childMainClass)
    } catch {
        ...
        }
        System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
    }
    if (classOf[scala.App].isAssignableFrom(mainClass)) {
        printWarning("Subclasses of scala.App may not work correctly. Use a main() method 						instead.")
    }
	// 获得主方法
    val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
	// 判断主方法是不是静态 static 的
    if (!Modifier.isStatic(mainMethod.getModifiers)) {
        throw new IllegalStateException("The main method in the given main class must be 										 static")
    }
    @tailrec
    def findCause(t: Throwable): Throwable = t match {
		...
    }
    try {
        // 执行主方法
        mainMethod.invoke(null, childArgs.toArray)
    } catch {
        ...
       }
    }
}

同样代码中重要的部分已经用注释标注出来了,我们可以看到首先加载了我们传入的 jar 包然后加载了我们传入的系统参数设置,然后关键的来了 mainClass = Utils.classForName(childMainClass) 通过这一行我们就知道了其通过反射获得了 childMainClass 这个类,而接下来注释标注的两行代码分别从这个主类主获得了主方法然后执行该方法,至此我们知道了 runMain() 这个函数的主要作用就是执行 childMainClass 这个类的主方法,而这个类是什么我们就要到 prepareSubmitEnvironment 中去看看了(以下只截取了关键部分)

private[deploy] def prepareSubmitEnvironment(args: SparkSubmitArguments)
: (Seq[String], Seq[String], Map[String, String], String) = {
    // Return values
    val childArgs = new ArrayBuffer[String]()
    val childClasspath = new ArrayBuffer[String]()
    val sysProps = new HashMap[String, String]()
    var childMainClass = ""

    // Set the cluster manager
    // 判断master 即我们在命令行中指定的 --master 参数
    val clusterManager: Int = args.master match {
        case "yarn" => YARN
        case "yarn-client" | "yarn-cluster" =>
        printWarning(s"Master ${args.master} is deprecated since 2.0." +
                     " Please use master \"yarn\" with specified deploy mode instead.")
        YARN
        case m if m.startsWith("spark") => STANDALONE
        case m if m.startsWith("mesos") => MESOS
        case m if m.startsWith("local") => LOCAL
        case _ =>
        printErrorAndExit("Master must either be yarn or start with spark, mesos, local")
        -1
    }
    if (isYarnCluster) {
      // 可以清楚的看到在 Yarn 集群模式下我们的 childMainClass 是以下这个 yarn Client 类
      childMainClass = "org.apache.spark.deploy.yarn.Client"
      if (args.isPython) {
        childArgs += ("--primary-py-file", args.primaryResource)
        childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
      } else if (args.isR) {
        val mainFile = new Path(args.primaryResource).getName
        childArgs += ("--primary-r-file", mainFile)
        childArgs += ("--class", "org.apache.spark.deploy.RRunner")
      } else {
        if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
          // 我们在命令行指定的 --jar
          childArgs += ("--jar", args.primaryResource)
        }
        // 我们在命令行指定的 --calss
        childArgs += ("--class", args.mainClass)
      }
      if (args.childArgs != null) {
        args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
      }
    }
}

从上面代码中注释的地方可以清楚的看到在准备环境时我们 Spark 将我们在命令行传入的参数封装成了字符串数组,而我们之前执行的 childMainClass 就是这里的 org.apache.spark.deploy.yarn.Client,就是说接下来我们就要去看看这个类的 main() 方法做了那些事情,需要注意的是虽然这里运行的是 main() 方法但其实并不像我们刚开始的命令行那样会启动一个新的进程其只是在我们的 submit 进程中被调用了。

Yarn Client

 根据上一节所述我们在 runMain() 方法中执行的 main() 方法是 org.apache.spark.deploy.yarn.Client 的方法因此我们直接搜索一下便可看到以下代码:

def main(argStrings: Array[String]) {
    if (!sys.props.contains("SPARK_SUBMIT")) {
      logWarning("WARNING: This client is deprecated and will be removed in a " +
        "future version of Spark. Use ./bin/spark-submit with \"--master yarn\"")
    }
    System.setProperty("SPARK_YARN_MODE", "true")
    // spark 配置对象
    val sparkConf = new SparkConf
    sparkConf.remove("spark.jars")
    sparkConf.remove("spark.files")
    // 设置 Client 客户端参数
    val args = new ClientArguments(argStrings)
    // 新建客户端对象并 run()
    new Client(args, sparkConf).run()
  }

OK,从注释就可以发现依然是我们熟悉的两步走,1)封装我们的参数到一个对象中;2)以我们传入的参数为参数创建一个 client 对象并运行,首先来看我们的参数封装:

private[spark] class ClientArguments(args: Array[String]) {

  var userJar: String = null
  var userClass: String = null
  var primaryPyFile: String = null
  var primaryRFile: String = null
  var userArgs: ArrayBuffer[String] = new ArrayBuffer[String]()

  parseArgs(args.toList)

  private def parseArgs(inputArgs: List[String]): Unit = {
      ...
  }
}

这里又看到了我们熟悉的 JarClass 那不用多说,我们命令行的传入的那些参数在这里被封装成了 Client 对象的参数,接下来看下 run() 做了那些事情

def run(): Unit = {
    // 提交应用
    this.appId = submitApplication()
    ...
  }

进入 run() 方法第一行就是提交应用,而其返回了一个 appId,熟悉的读者此时就能意识到这个 appId 就是我们在 UI 界面上看到的那个,我们接着来看 submitApplication() 做了什么

def submitApplication(): ApplicationId = {
    var appId: ApplicationId = null
    try {
      launcherBackend.connect()
      // Setup the credentials before doing anything else,
      // so we have don't have issues at any point.
      setupCredentials()
      yarnClient.init(yarnConf)
      yarnClient.start()

      logInfo("Requesting a new application from cluster with %d NodeManagers"
        .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))

      // Get a new application from our RM
      val newApp = yarnClient.createApplication()
      val newAppResponse = newApp.getNewApplicationResponse()
      appId = newAppResponse.getApplicationId()
      reportLauncherState(SparkAppHandle.State.SUBMITTED)
      launcherBackend.setAppId(appId.toString)

      new CallerContext("CLIENT", Option(appId.toString)).setCurrentContext()

      // Verify whether the cluster has enough resources for our AM
      verifyClusterResources(newAppResponse)

      // Set up the appropriate contexts to launch our AM
      val containerContext = createContainerLaunchContext(newAppResponse)
      val appContext = createApplicationSubmissionContext(newApp, containerContext)
	  ...
  }

从官方的注释可以很清楚的看出这里是在从 RM 中获得我们提交的 APP 并准备 AM 的环境这之中最关键的就是createContainerLaunchContext() 这个方法做的事情来看一下(方便阅读有部分省略):

private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
    ...
    // Add Xmx for AM memory
    javaOpts += "-Xmx" + amMemory + "m"
	
    val tmpDir = new Path(
      YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
      YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR
    )
    javaOpts += "-Djava.io.tmpdir=" + tmpDir
	...

    // Include driver-specific java options if we are launching a driver
    ...

    // For log4j configuration to reference
    ...
    val userClass =
      if (isClusterMode) {
        Seq("--class", YarnSparkHadoopUtil.escapeForShell(args.userClass))
      } else {
        Nil
      }
    val userJar =
      if (args.userJar != null) {
        Seq("--jar", args.userJar)
      } else {
        Nil
      }
    val primaryPyFile =
      if (isClusterMode && args.primaryPyFile != null) {
        Seq("--primary-py-file", new Path(args.primaryPyFile).getName())
      } else {
        Nil
      }
    val primaryRFile =
      if (args.primaryRFile != null) {
        Seq("--primary-r-file", args.primaryRFile)
      } else {
        Nil
      }
    val amClass =
      if (isClusterMode) {
        // 如果是集群模式
        Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
      } else {
        // 客户端模式
        Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
      }
    if (args.primaryRFile != null && args.primaryRFile.endsWith(".R")) {
      args.userArgs = ArrayBuffer(args.primaryRFile) ++ args.userArgs
    }
    val userArgs = args.userArgs.flatMap { arg =>
      Seq("--arg", YarnSparkHadoopUtil.escapeForShell(arg))
    }
    val amArgs =
      Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++
        userArgs ++ Seq(
          "--properties-file", buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
            LOCALIZED_CONF_DIR, SPARK_CONF_FILE))

    // Command for the ApplicationMaster
    val commands = prefixEnv ++ Seq(
        YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java", "-server"
      ) ++
      javaOpts ++ amArgs ++
      Seq(
        "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
        "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
	...
    val printableCommands = commands.map(s => if (s == null) "null" else s).toList
    amContainer.setCommands(printableCommands.asJava)
	...
    amContainer
  }

上面的这段程序中中关键的就是下面这句,

YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java", "-server")
++ javaOpts ++ amArgs

我们可以很清楚的看到这里是在拼接一个 java 指令,javaOpts 是在设置 JVM 的参数,而 amArgs 则在将我们从命令行传入的那些参数例如 Ja,Class 等也拼接起来,而我们所启动的主类则可以从下面的代码中看到

if (isClusterMode) {
    // 如果是集群模式
    Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
} else {
    // 客户端模式
    Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
}

在集群模式和非集群模式下我们启动的是不同的主类(事实上是一回事),接下来我们就得到了一条形如
command = bin/java org.apache.spark.deploy.yarn.ApplicationMaster 的指令(省略参数部分)用于启动我们的 AM,至此我们知道了我们提交应用实际上启动了一个 Yarn Client发送了一条指令 给 RM 让其启动 AM,而 AM 启动之后就会和 RM 交互并启动 container 来真正执行我们想要的计算了,这就留到下篇继续分析了,完。