Spark Source Code Series (5): Spark Submit Job Submission


The previous articles in this series covered the DAGScheduler and analyzed how a Spark job is split into stages after submission. I did not originally plan the series around the end-to-end submission flow, which shows my lack of blogging experience. So, starting with this article, we begin at Spark job submission and look at how Spark runs internally, from the moment a job starts until it finishes.

A Spark application runs as a set of independent processes on the cluster. The overall execution flow is as follows:

[Figure: overview of how a Spark application executes on a cluster]

  1. The user submits the application. After the SparkContext object has been initialized, the SparkContext coordinates the execution of the Spark job on the cluster (a minimal application sketch follows this list)
  2. The SparkContext connects to the cluster manager to request resources and register the application. In production the cluster manager is usually YARN; the cluster manager allocates resources among applications
  3. Once connected to the cluster manager, executors are created on the cluster's worker nodes according to the resources that were granted
  4. After the executors have been created, they report back to the Driver
  5. The DAGScheduler, created and started during SparkContext initialization, splits the submitted job into stages and then into tasks; after computing the preferred locations for the tasks, it sends them to the assigned executors for execution
  6. The task results are returned to the Driver; once the computation finishes, the Spark application is shut down
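
To make this flow concrete, here is a minimal, hypothetical user program of the kind that gets handed to spark-submit (the class name and the word-count logic are illustrative only, not taken from the Spark source). Creating the SparkContext triggers steps 1-4 above, and calling an action such as collect() is what submits a job to the DAGScheduler in step 5.

import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch of a user application submitted via spark-submit (illustrative only)
object SimpleApp {
  def main(args: Array[String]): Unit = {
    // Initializing the SparkContext registers the application with the cluster manager
    // and requests executors (steps 1-4 above)
    val conf = new SparkConf().setAppName("SimpleApp")
    val sc = new SparkContext(conf)

    // Calling an action submits a job; the DAGScheduler splits it into stages and tasks
    // and ships the tasks to the executors (steps 5-6 above)
    val counts = sc.parallelize(Seq("a", "b", "a"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .collect()
    counts.foreach(println)

    sc.stop()
  }
}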

That is the rough end-to-end flow of a Spark job. Below we start from Spark Submit and work our way, step by step, through the submission process.

Client-Side Job Submission

Everything starts on the client, where the user submits their own Spark program using the spark-submit script found under $SPARK_HOME/bin.

./bin/spark-submit \
 --class <main-class> \
 --master <master-url> \
 --deploy-mode <deploy-mode> \
 --conf <key>=<value> \
 ... # other options
 <application-jar> \
 [application-arguments]
  • --class: the entry point of the application
  • --master: the master URL of the cluster (for example spark://10.142.97.4:7077)
  • --deploy-mode: whether to deploy the Driver on a worker node inside the cluster (cluster) or to run it locally as an external client (client). In production we usually choose cluster mode, with YARN as the resource manager
  • --conf: arbitrary Spark configuration properties in key=value form
  • application-jar: the path to the jar containing the user program
  • application-arguments: the arguments passed to the user program

A more concrete example:

./bin/spark-submit \
 --class org.apache.spark.examples.SparkPi \
 --master spark://10.142.97.4:7077 \
 --deploy-mode cluster \
 --supervise \
 --executor-memory 2G \
 --total-executor-cores 5 \
 /path/examples.jar \
 <program arguments>
  • --class specifies the program entry point
  • --master specifies the master URL
  • --deploy-mode cluster runs the program in cluster deploy mode
  • --supervise restarts the application automatically if it fails
  • --executor-memory 2G gives each executor 2 GB of memory
  • --total-executor-cores 5 caps the total number of executor CPU cores at 5
  • /path/examples.jar is the application jar
  • <program arguments> are the arguments the program needs

These parameters are fed into the spark-submit script for execution. Let us look at the content of spark/bin/spark-submit:

if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0

exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "aaa@qq.com"
  • First the script checks whether SPARK_HOME is set; if it is empty, it sources the find-spark-home script in the same directory to set it
  • Finally it execs "${SPARK_HOME}"/bin/spark-class with the class org.apache.spark.deploy.SparkSubmit, forwarding all of the script's arguments via "$@". In other words, spark-submit delegates to the spark-class script, which performs the actual submission

Next, the content of the spark/bin/spark-class script:

# If SPARK_HOME is not set (-z tests for an empty string), set it via find-spark-home
if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi
# Source load-spark-env.sh, which exports the variables defined in spark-env.sh into the environment
. "${SPARK_HOME}"/bin/load-spark-env.sh

# Determine the java executable to use
# Find the java binary
if [ -n "${JAVA_HOME}" ]; then
  RUNNER="${JAVA_HOME}/bin/java"
else
  if [ "$(command -v java)" ]; then
    RUNNER="java"
  else
    echo "JAVA_HOME is not set" >&2
    exit 1
  fi
fi
# Locate the Spark jars that make up the launch classpath
# Find Spark jars.
if [ -d "${SPARK_HOME}/jars" ]; then
  SPARK_JARS_DIR="${SPARK_HOME}/jars"
else
  SPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars"
fi

if [ ! -d "$SPARK_JARS_DIR" ] && [ -z "$SPARK_TESTING$SPARK_SQL_TESTING" ]; then
  echo "Failed to find Spark jars directory ($SPARK_JARS_DIR)." 1>&2
  echo "You need to build Spark with the target \"package\" before running this program." 1>&2
  exit 1
else
  LAUNCH_CLASSPATH="$SPARK_JARS_DIR/*"
fi

# Add the launcher build dir to the classpath if requested.
if [ -n "$SPARK_PREPEND_CLASSES" ]; then
  LAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
fi

# For tests
if [[ -n "$SPARK_TESTING" ]]; then
  unset YARN_CONF_DIR
  unset HADOOP_CONF_DIR
fi

# The launcher library will print arguments separated by a NULL character, to allow arguments with
# characters that would be otherwise interpreted by the shell. Read that in a while loop, populating
# an array that will be used to exec the final command.
#
# The exit code of the launcher is appended to the output, so the parent shell removes it from the
# command array and checks the value to see if the launcher succeeded.
# Run org.apache.spark.launcher.Main, which resolves and prints the final launch command
build_command() {
  "$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
  printf "%d\0" $?
}

# Turn off posix mode since it does not allow process substitution
# Collect the arguments produced by build_command into the CMD array
set +o posix
CMD=()
while IFS= read -d '' -r ARG; do
  CMD+=("$ARG")
done < <(build_command "$@")

COUNT=${#CMD[@]}
LAST=$((COUNT - 1))
LAUNCHER_EXIT_CODE=${CMD[$LAST]}

# Certain JVM failures result in errors being printed to stdout (instead of stderr), which causes
# the code that parses the output of the launcher to get confused. In those cases, check if the
# exit code is an integer, and if it's not, handle it as a special error case.
if ! [[ $LAUNCHER_EXIT_CODE =~ ^[0-9]+$ ]]; then
  echo "${CMD[@]}" | head -n-1 1>&2
  exit 1
fi

if [ $LAUNCHER_EXIT_CODE != 0 ]; then
  exit $LAUNCHER_EXIT_CODE
fi

CMD=("${CMD[@]:0:$LAST}")
exec "${CMD[@]}"

To summarize the logic of spark-class: check the SPARK_HOME environment variable; source load-spark-env.sh; determine the java executable; locate the Spark jars and set the launch classpath; run org.apache.spark.launcher.Main, which resolves the launch arguments and feeds them into CMD; check that the launcher exited successfully; and finally exec the resolved command.

In the command that spark-class ultimately executes, the program entry point is org.apache.spark.deploy.SparkSubmit.

Source Code Analysis

org.apache.spark.deploy.SparkSubmit

The command that is ultimately executed specifies org.apache.spark.deploy.SparkSubmit as the program entry point, so let us look at its main method:

 def main(args: Array[String]): Unit = {
    //  SparkSubmitArguments extends SparkSubmitArgumentsParser and parses the submitted arguments
    val appArgs = new SparkSubmitArguments(args)
    if (appArgs.verbose) {
      // scalastyle:off println
      printStream.println(appArgs)
      // scalastyle:on println
    }
    //  Pattern match on the action field of appArgs
    appArgs.action match {
        //  SUBMIT: call submit
      case SparkSubmitAction.SUBMIT => submit(appArgs)
      //  KILL: call kill
      case SparkSubmitAction.KILL => kill(appArgs)
      //  REQUEST_STATUS: call requestStatus
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    }
  }

As the main method shows, the action parsed from the arguments is pattern-matched to the corresponding operation. Since we are performing a submit, the submit method is called.

Inside submit, prepareSubmitEnvironment(args) is called to prepare the submission environment.

private def submit(args: SparkSubmitArguments): Unit = {
  val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
  ...
}

prepareSubmitEnvironment sets up the basic parameters. Most importantly, it uses the master and deploy-mode values in args to determine the cluster manager and the deploy mode:

private[deploy] def prepareSubmitEnvironment(
    args: SparkSubmitArguments,
    conf: Option[HadoopConfiguration] = None)
    : (Seq[String], Seq[String], Map[String, String], String) = {
  // Return values
  // The four values to be returned
  val childArgs = new ArrayBuffer[String]()
  val childClasspath = new ArrayBuffer[String]()
  val sysProps = new HashMap[String, String]()
  var childMainClass = ""

  // Pattern match on the master argument from the script to determine the cluster manager
  // Set the cluster manager
  val clusterManager: Int = args.master match {
    case "yarn" => YARN
    case "yarn-client" | "yarn-cluster" =>
      printWarning(s"Master ${args.master} is deprecated since 2.0." +
        " Please use master \"yarn\" with specified deploy mode instead.")
      YARN
    case m if m.startsWith("spark") => STANDALONE
    case m if m.startsWith("mesos") => MESOS
    case m if m.startsWith("local") => LOCAL
    case _ =>
      printErrorAndExit("Master must either be yarn or start with spark, mesos, local")
      -1
  }

  // Pattern match on the deployMode argument to determine the deploy mode
  // Set the deploy mode; default is client mode
  var deployMode: Int = args.deployMode match {
    case "client" | null => CLIENT
    case "cluster" => CLUSTER
    case _ => printErrorAndExit("Deploy mode must be either client or cluster"); -1
  }

  // Because the deprecated way of specifying "yarn-cluster" and "yarn-client" encapsulate both
  // the master and deploy mode, we have some logic to infer the master and deploy mode
  // from each other if only one is specified, or exit early if they are at odds.
  if (clusterManager == YARN) {
    (args.master, args.deployMode) match {
      case ("yarn-cluster", null) =>
        deployMode = CLUSTER
        args.master = "yarn"
      case ("yarn-cluster", "client") =>
        printErrorAndExit("Client deploy mode is not compatible with master \"yarn-cluster\"")
      case ("yarn-client", "cluster") =>
        printErrorAndExit("Cluster deploy mode is not compatible with master \"yarn-client\"")
      case (_, mode) =>
        args.master = "yarn"
    }
  }
  ...
  // Our production environment essentially always uses yarn-cluster mode, so we follow this deploy mode through the source
  // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
    if (isYarnCluster) {
      childMainClass = "org.apache.spark.deploy.yarn.Client"
      if (args.isPython) {
        childArgs += ("--primary-py-file", args.primaryResource)
        childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
      } else if (args.isR) {
        val mainFile = new Path(args.primaryResource).getName
        childArgs += ("--primary-r-file", mainFile)
        childArgs += ("--class", "org.apache.spark.deploy.RRunner")
      } else {
        if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
          childArgs += ("--jar", args.primaryResource)
        }
  //  --class here is the Driver started inside the ApplicationMaster, i.e. the main class of our application
        childArgs += ("--class", args.mainClass)
      }
      if (args.childArgs != null) {
        args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
      }
    }    

The difference between standalone mode and YARN mode is obvious: it is simply whether the Spark job runs on Spark's own standalone cluster or on a shared YARN cluster, so we will not dwell on it. The distinction worth explaining is yarn-client versus yarn-cluster. In yarn-client mode the Driver runs locally on the client while the ApplicationMaster runs on a YARN node; the two communicate over RPC, and the ApplicationMaster is only responsible for requesting and releasing resources while it waits for the Driver to finish. In yarn-cluster mode the Driver runs inside the ApplicationMaster's container; Driver and ApplicationMaster are two threads of the same process, they also communicate with each other, and the ApplicationMaster likewise waits for the Driver to finish before releasing resources.

In yarn-client mode the Driver (the application we wrote is its entry point) runs first. When the SparkContext is initialized, it acts as a client and asks YARN for an ApplicationMaster. Once the ApplicationMaster is running, it registers itself with YARN and requests Executor resources; from then on the local Driver communicates with it to drive the job, while the ApplicationMaster keeps monitoring the Driver. If the Driver finishes or exits unexpectedly, the ApplicationMaster releases the resources and unregisters itself. Consequently, in yarn-client mode, if the process that ran spark-submit exits, the whole job exits with it.

In yarn-cluster mode the local process is merely a client. It first asks YARN for an ApplicationMaster; when the ApplicationMaster runs, it starts the Driver, i.e. our application, via reflection. After the SparkContext has been initialized successfully, the ApplicationMaster registers itself with YARN and requests Executor resources. The Driver and the ApplicationMaster run in the same container, but as two different threads. When the Driver finishes, the ApplicationMaster releases the resources and unregisters itself. In this mode the local process only submits the job; killing it does not affect the Spark job at all, because the Driver runs remotely.
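
The reflection-based Driver launch mentioned above can be illustrated with a simplified sketch. This is not the actual ApplicationMaster source; the object and method names below are hypothetical, and it only shows the general pattern of loading the user's main class and running it in a dedicated thread.

// Simplified, hypothetical sketch of how an ApplicationMaster-style process can start
// the user's main class (the "Driver") via reflection in its own thread
object DriverLaunchSketch {
  def startUserApplication(userClassName: String, userArgs: Array[String]): Thread = {
    // Load the user class and look up its main(String[]) method
    val mainMethod = Class.forName(userClassName)
      .getMethod("main", classOf[Array[String]])

    val userThread = new Thread {
      override def run(): Unit = {
        // Invoking main() runs the user program, which creates the SparkContext
        mainMethod.invoke(null, userArgs)
      }
    }
    userThread.setName("Driver") // the Driver and the ApplicationMaster share one JVM
    userThread.start()
    userThread
  }
}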

As the code shows, in cluster mode the class that actually runs is org.apache.spark.deploy.yarn.Client. prepareSubmitEnvironment is mainly responsible for parsing the user arguments, setting up the environment, and handling Python/R dependencies; it then picks the main class to run according to the deploy mode: yarn-client -> args.mainClass, yarn-cluster -> org.apache.spark.deploy.yarn.Client. A condensed sketch of this dispatch follows.
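
This is not the actual Spark code, only a hypothetical distillation of how childMainClass is chosen for the YARN case; the object and function names are made up for illustration.

// Hypothetical condensation of the childMainClass decision in prepareSubmitEnvironment (YARN only)
object ChildMainClassSketch {
  def resolveChildMainClass(deployMode: String, userMainClass: String): String =
    deployMode match {
      // In client mode the Driver is the user class itself, run in the submitting JVM
      case "client"  => userMainClass
      // In cluster mode yarn.Client wraps the user class and submits it to YARN
      case "cluster" => "org.apache.spark.deploy.yarn.Client"
      case other     => sys.error(s"Unsupported deploy mode: $other")
    }
}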

For yarn-cluster mode, what gets executed is the main method of org.apache.spark.deploy.yarn.Client:

def main(argStrings: Array[String]) {
  if (!sys.props.contains("SPARK_SUBMIT")) {
    logWarning("WARNING: This client is deprecated and will be removed in a " +
      "future version of Spark. Use ./bin/spark-submit with \"--master yarn\"")
  }

  // Set an env variable indicating we are running in YARN mode.
  // Note that any env variable with the SPARK_ prefix gets propagated to all (remote) processes
  System.setProperty("SPARK_YARN_MODE", "true")
  val sparkConf = new SparkConf
  // SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
  // so remove them from sparkConf here for yarn mode.
  sparkConf.remove("spark.jars")
  sparkConf.remove("spark.files")
  val args = new ClientArguments(argStrings)
  new Client(args, sparkConf).run()
}

The Client companion object constructs a Client instance and then calls its run method.

/**
 * Submit an application to the ResourceManager.
 * If set spark.yarn.submit.waitAppCompletion to true, it will stay alive
 * reporting the application's status until the application has exited for any reason.
 * Otherwise, the client process will exit after submission.
 * If the application finishes with a failed, killed, or undefined status,
 * throw an appropriate SparkException.
 */
def run(): Unit = {
  // The key line is this one
  this.appId = submitApplication()
  if (!launcherBackend.isConnected() && fireAndForget) {
    val report = getApplicationReport(appId)
    val state = report.getYarnApplicationState
    logInfo(s"Application report for $appId (state: $state)")
    logInfo(formatReportDetails(report))
    if (state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) {
      throw new SparkException(s"Application $appId finished with status: $state")
    }
  } else {
    val (yarnApplicationState, finalApplicationStatus) = monitorApplication(appId)
    if (yarnApplicationState == YarnApplicationState.FAILED ||
      finalApplicationStatus == FinalApplicationStatus.FAILED) {
      throw new SparkException(s"Application $appId finished with failed status")
    }
    if (yarnApplicationState == YarnApplicationState.KILLED ||
      finalApplicationStatus == FinalApplicationStatus.KILLED) {
      throw new SparkException(s"Application $appId is killed")
    }
    if (finalApplicationStatus == FinalApplicationStatus.UNDEFINED) {
      throw new SparkException(s"The final status of application $appId is undefined")
    }
  }
}

The key call is submitApplication(), which submits the application to YARN; once the appId is returned, the client monitors the application's state.

def submitApplication(): ApplicationId = {
  var appId: ApplicationId = null
  try {
    launcherBackend.connect()
    // Setup the credentials before doing anything else,
    // so we have don't have issues at any point.
    // Obtain the submitting user's credentials, used later to fetch delegation tokens
    setupCredentials()
    yarnClient.init(yarnConf)
    yarnClient.start()

    logInfo("Requesting a new application from cluster with %d NodeManagers"
      .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))

    // Get a new application from our RM
    val newApp = yarnClient.createApplication()
    val newAppResponse = newApp.getNewApplicationResponse()
    // Obtain the application id
    appId = newAppResponse.getApplicationId()

    new CallerContext("CLIENT", sparkConf.get(APP_CALLER_CONTEXT),
      Option(appId.toString)).setCurrentContext()

    // Verify whether the cluster has enough resources for our AM
    verifyClusterResources(newAppResponse)

    // Set up the appropriate contexts to launch our AM
    // Build the launch context for the ApplicationMaster: its environment, java options, and the java command that starts it on a YARN node
    val containerContext = createContainerLaunchContext(newAppResponse)
    val appContext = createApplicationSubmissionContext(newApp, containerContext)

    // Finally, submit and monitor the application
    logInfo(s"Submitting application $appId to ResourceManager")
    // Submit the application through yarnClient
    yarnClient.submitApplication(appContext)
    launcherBackend.setAppId(appId.toString)
    reportLauncherState(SparkAppHandle.State.SUBMITTED)

    appId
  } catch {
    case e: Throwable =>
      if (appId != null) {
        cleanupStagingDir(appId)
      }
      throw e
  }
}

submitApplication requests the application from YARN, builds the ApplicationMaster context, and finally submits the job. For yarn-cluster mode, after submission the local process is nothing more than a client; the Driver runs in the same container as the ApplicationMaster.

The createContainerLaunchContext method builds the container launch context for the ApplicationMaster, and this is where it is decided whether the Driver runs inside the ApplicationMaster.

val amClass =
  if (isClusterMode) {
    Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
  } else {
    Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
  }

Only the most essential lines are shown here. Depending on whether we are in cluster mode, a different main class is chosen for the ApplicationMaster container. ExecutorLauncher is just a wrapper around ApplicationMaster, as sketched below.
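
To make the wrapper relationship explicit: in the Spark source, ExecutorLauncher does little more than delegate to ApplicationMaster. Roughly (a simplified sketch, not the exact source):

object ExecutorLauncher {
  // Exists mainly so that client-mode and cluster-mode masters show up under different
  // names in tools such as jps; it simply forwards to ApplicationMaster
  def main(args: Array[String]): Unit = {
    ApplicationMaster.main(args)
  }
}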

We will not go into the code details of yarn-client mode here, since production deployments almost always use the yarn-cluster deploy mode.

Summary

In outline, the spark-submit process looks like this:

  1. The application is submitted via spark-submit together with its parameters
  2. spark-submit invokes the spark-class script
  3. The entry point resolves to the main method of SparkSubmit
  4. Since production jobs are almost always submitted to a YARN cluster, the source matches on the submission mode, and we end up in the yarn-cluster deploy mode
  5. Client creates a YARN client and sends YARN the launch instruction: bin/java ApplicationMaster
  6. Upon receiving the instruction, YARN starts the ApplicationMaster on a designated NodeManager
  7. The ApplicationMaster starts the Driver thread, which executes the user's job