欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Spark源码 —— 从 SparkSubmit 到 Driver启动

程序员文章站 2024-02-21 20:57:52
...

前言

本文主要是以笔记的整理方式写的,
仅以分享的方式供你阅读,
如有不对的地方欢迎指点错误。
读完本文可以学到:
当你用 shell 命令执行 spark-submit 之后,
到你的代码开始正式运行的一些列知识和细节,
恩...粗略的,要看的更细,可以按照流程自己撸源码哈~~~~

SparkSubmit

  • Spark-Submit脚本执行后,
    会执行到org.apache.spark.deploy.SparkSubmit
    所以我们从SparkSubmit 类开始,
    以下是org.apache.spark.deploy.SparkSubmit简单的时序图
Spark源码 —— 从 SparkSubmit 到 Driver启动
image.png
  • main方法:
    1. 解析我们传入的参数
    2. 根据 action 执行相对应的功能
      当然这里我们的 Action 是:SparkSubmitAction.SUBMIT
  def main(args: Array[String]): Unit = {
    val appArgs = new SparkSubmitArguments(args)
    if (appArgs.verbose) {
      // scalastyle:off println
      printStream.println(appArgs)
      // scalastyle:on println
    }
    appArgs.action match {
      case SparkSubmitAction.SUBMIT => submit(appArgs)
      case SparkSubmitAction.KILL => kill(appArgs)
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    }
  }
  • prepareSubmitEnvironment:
    该方法主要是进行四个参数的解析:
···
private[deploy] def prepareSubmitEnvironment(args: 
SparkSubmitArguments)    : (Seq[String], Seq[String], Map[String, String], String) = {  // Return values  
val childArgs = new ArrayBuffer[String]() 
val childClasspath = new ArrayBuffer[String]()  
val sysProps = new HashMap[String, String]()  
var childMainClass = ""
 ...
 返回值
  (childArgs, childClasspath, sysProps, childMainClass)
  1. childArgs: 主要就是一些参数的
  2. childClasspath:这个就是classPath,jvm运行的class路径
  3. sysProps:一些系统参数
  4. childMainClass:接下来将要运行的主类
    • 如果是 Client模式,则该类就是我们自己编写的
    • 如果是Cluster 模式,则根据集群的不同返回不同的类:
      isStandaloneCluster:org.apache.spark.deploy.Client
      isYarnCluster: org.apache.spark.deploy.yarn.Client
  • runMain
    1. 加载 childClasspath下的 jars
    2. 设置系统参数 sysProps
    3. 运行 mainMethod,并传递参数
    mainMethod.invoke(null, childArgs.toArray)
    

至此Sumbmit任务完成,接下来我们以 Standalone Client为列,
进行org.apache.spark.deploy.Client相关源码分析

ClientEendpoint

Spark源码 —— 从 SparkSubmit 到 Driver启动
image.png
  • 创建ClientEendpoint,并将Master注册到 ClientEendpoint
  • ClientEendpointonstart 方法被调起,
    构建 DriverDescription,
    并指定 Drive r的主类是org.apache.spark.deploy.worker.DriverWrapper
    向 Master 申请RequestSubmitDriver(driverDescription)
  • Master 端收到请求后构建DriverInfo并加入到队列:
    waitingDrivers += driver
    drivers.add(driver)
  • master开始调度schedule(),并回复客户端申请成功的消息
  • 在Master的 schedule() 里面开始准备启动Driver

这里主要是将整条线理清楚了,
没有纠结细节,
如果有兴趣你可以按照这个线自己去看下源码
那么接下来就是启动Driver的过程了

Master调度

注意查看源码里面写的注释,
千万不要略过,
要不然本文就没啥意思了~~~

 private def schedule(): Unit = {
    if (state != RecoveryState.ALIVE) { return }
    // 将worker打乱,主要就是为了负载均衡
    val shuffledWorkers = Random.shuffle(workers)
    //筛选存活的 worker
    for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
      // 遍历等待启动的Driver
      for (driver <- waitingDrivers) {
        //如果该worker内存和core都满足要求
        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
         // 启动Driver
          launchDriver(worker, driver)
          waitingDrivers -= driver
        }
      }
    }
    //实际上还会去启动Executor,
    //但是我们目前不关注这里,略过
    startExecutorsOnWorkers()
  }

我们重点看下 launchDriver做了什么

  private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
   //将driver的信息记录下来
    worker.addDriver(driver)
    // driver现在知道他该在哪个worker启动了
    driver.worker = Some(worker)
    // 向worker节点发送 LaunchDriver
    worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
    // 标记driver已经运行,实际是Driver可能还没启动呢!!!
    driver.state = DriverState.RUNNING
  }

launchDriver主要就是给 Worker 发送了启动 Driver 的消息
接下来就可以看看 Worker 端是怎么处理 LaunchDriver 这个消息的了。

Wroker调度

//节选代码,不要介意是 case 开头
 case LaunchDriver(driverId, driverDesc) => {
      logInfo(s"Asked to launch driver $driverId")
      //new一个 DriverRunner 
      val driver = new DriverRunner(
        conf,
        driverId,
        workDir,
        sparkHome,
        driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
        self,
        workerUri,
        securityMgr)
        
      //DriverRunner start
      drivers(driverId) = driver
      driver.start()
      //记录下消耗的资源
      coresUsed += driverDesc.cores
      memoryUsed += driverDesc.mem
    }

重点看到 DriverRunner 的 Start 方法

new Thread("DriverRunner for " + driverId) {
    override def run(){
        ...
       //创建driver工作目录
       val driverDir = createWorkingDirectory()
       //下载一些jars
       val localJarFilename = downloadUserJar(driverDir)
        ...
        // 通过系统的指令创建 jvm进程,至此正式启动
        launchDriver(builder, driverDir, driverDesc.supervise)
    }
}.start()

driver的启动是通过一个 DriverRunner类开启一个线程异步启动的,
其过程没有什么特殊的地方,
至此 Driver 正式启动完成了。
接下来就是分析 Driver 主类的启动了
org.apache.spark.deploy.worker.DriverWrapper
而实际上,该类主要的作用就是会:

// Delegate to supplied main class
val clazz = Utils.classForName(mainClass)
val mainMethod = clazz.getMethod("main", classOf[Array[String]])
mainMethod.invoke(null, extraArgs.toArray[String])

调起我们自己写的主类方法,
至此,从我们敲下Spark-Submit之后,
终于执行到我们自己所写的代码了。

结言

Spark这部分源码流程比较简单清楚,
基本没有太多弯弯道道,
但是就算简单,那也是需要你自己去琢磨去看的,
否则你还是不能清楚的知道,
你的那个 spark-submit 敲下之后,
怎么就执行到你的代码了呢?
OK,就到这里了,
如果没有意外,
本人应该会继续更新一系列的Spark源码文章,
如果你有兴趣,不妨关注一下,

最后,求赞 ~~~~