[Spark] The Driver Startup Process in Standalone Cluster Mode

This post briefly walks through how Spark starts the Driver in standalone cluster mode. It only covers the flow up to the point where the Driver starts successfully; the subsequent task execution will be covered in a later post. I prefer to follow the code itself, so only the most important excerpts are pasted below.

1. Examining the Script

spark-submit is what triggers the job submission. Looking at the spark-submit script, the main class that is ultimately executed is:

exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
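
For context, a typical standalone cluster-mode submission looks roughly like the following (the master URL, main class, and jar path are placeholders, not taken from this article):

spark-submit \
  --master spark://master-host:7077 \
  --deploy-mode cluster \
  --class com.example.MyApp \
  --driver-memory 1g \
  --driver-cores 1 \
  --supervise \
  /path/to/my-app.jar arg1 arg2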

2. Source Code Walkthrough

Open the source (Spark 2.1.1) in IDEA and look at SparkSubmit's main method; clearly, the next thing to follow is submit().

  // Entry point of the submission main class
  override def main(args: Array[String]): Unit = {
    val uninitLog = initializeLogIfNecessary(true, silent = true)
    // Parse the submission arguments
    val appArgs = new SparkSubmitArguments(args)
    if (appArgs.verbose) {
      // scalastyle:off println
      printStream.println(appArgs)
      // scalastyle:on println
    }
    appArgs.action match {
      // A job submission matches SUBMIT
      case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
      case SparkSubmitAction.KILL => kill(appArgs)
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    }
  }

The first line of the submit() body introduces a key value, childMainClass, so we need to look at the call to prepareSubmitEnvironment().

submit()

  @tailrec
  private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
    // The following call returns a 4-tuple; pay particular attention to childMainClass.
    // Here we take standalone-cluster mode as the example.
    val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
    ......
    // Run the resolved main class
    runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose)
  }

prepareSubmitEnvironment() delegates to doPrepareSubmitEnvironment(). To keep the focus on the main logic, most of the code is omitted below; the key point is how childMainClass changes with the deployMode. The comments make this clear, and the method ends by returning a 4-tuple.

  private def doPrepareSubmitEnvironment(
    .......
    var childMainClass = ""
    .........
    // Client mode: args.mainClass is the main class of the submitted application,
    // and the Driver is launched directly on the client
    if (deployMode == CLIENT) {
      childMainClass = args.mainClass
      if (localPrimaryResource != null && isUserJar(localPrimaryResource)) {
        childClasspath += localPrimaryResource
      }
      if (localJars != null) { childClasspath ++= localJars.split(",") }
    }
    .........
    // standalone-cluster mode
    if (args.isStandaloneCluster) {
      // REST submission: the job is submitted as JSON over HTTP (supported since Spark 1.3)
      if (args.useRest) {
        childMainClass = REST_CLUSTER_SUBMIT_CLASS
        childArgs += (args.primaryResource, args.mainClass)
      } else {
        // Legacy (non-REST) submission
        childMainClass = STANDALONE_CLUSTER_SUBMIT_CLASS
        if (args.supervise) { childArgs += "--supervise" }
        Option(args.driverMemory).foreach { m => childArgs += ("--memory", m) }
        Option(args.driverCores).foreach { c => childArgs += ("--cores", c) }
        childArgs += "launch"
        childArgs += (args.master, args.primaryResource, args.mainClass)
      }
      if (args.childArgs != null) {
        childArgs ++= args.childArgs
      }
    }
    ......
    (childArgs, childClasspath, sparkConf, childMainClass)
  }

When deployMode is CLIENT, the Driver runs directly on the client. Here we focus on cluster-mode submission, which is what production environments use. STANDALONE_CLUSTER_SUBMIT_CLASS corresponds to the ClientApp class, which we will revisit later in the article; for now, back to the main flow in submit().

//org.apache.spark.deploy.ClientApp
private[deploy] val STANDALONE_CLUSTER_SUBMIT_CLASS = classOf[ClientApp].getName()
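
To make the cluster branch concrete, here is a hypothetical illustration (placeholder values, not produced by running any code) of what the relevant pieces of the returned tuple contain for a standalone cluster, non-REST submission:

// Hypothetical illustration only; all values below are placeholders
val childMainClass = "org.apache.spark.deploy.ClientApp"  // STANDALONE_CLUSTER_SUBMIT_CLASS
val childArgs = Seq(
  "--memory", "1g", "--cores", "1",   // from --driver-memory / --driver-cores
  "launch",                           // the sub-command that ClientArguments parses later
  "spark://master-host:7077",         // args.master
  "/path/to/my-app.jar",              // args.primaryResource
  "com.example.MyApp")                // args.mainClass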

The last line of submit() calls runMain(), which loads childMainClass, instantiates the referenced class (ClientApp) via reflection, and calls its start() method.

  private def runMain(
      childArgs: Seq[String],
      childClasspath: Seq[String],
      sparkConf: SparkConf,
      childMainClass: String,
      ......
    try {
      // Load the class
      mainClass = Utils.classForName(childMainClass)
    } catch {
    ......
    // Instantiate mainClass as a SparkApplication
    val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
      mainClass.newInstance().asInstanceOf[SparkApplication]
    ......
    try {
      // Call start(); here this is ClientApp's start method
      app.start(childArgs.toArray, sparkConf)
    } catch {
    ......

Continuing into ClientApp's start method (in fact ClientApp's only method), the code shows that it does three things:

1. Create the RPC environment

2. Obtain references to all Masters

3. Register the ClientEndpoint

  override def start(args: Array[String], conf: SparkConf): Unit = {
    val driverArgs = new ClientArguments(args)
    if (!conf.contains("spark.rpc.askTimeout")) {
      conf.set("spark.rpc.askTimeout", "10s")
    }
    Logger.getRootLogger.setLevel(driverArgs.logLevel)
    // Create the RPC environment
    val rpcEnv = RpcEnv.create("driverClient", Utils.localHostName(), 0, conf,
      new SecurityManager(conf))
    // Get endpoint references to all Masters
    val masterEndpoints = driverArgs.masters.map(RpcAddress.fromSparkURL)
      .map(rpcEnv.setupEndpointRef(_, Master.ENDPOINT_NAME))
    // Register the Endpoint that submits this job; once registered, ClientEndpoint's onStart() is guaranteed to run
    rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))
    rpcEnv.awaitTermination()
  }

When an endpoint is registered, its onStart() method is invoked, so next we look at ClientEndpoint's onStart(). Pay attention to mainClass, the Command object, and the DriverDescription object; at the end, asyncSendToMasterAndForwardReply is called to send the submission to the Master.

  override def onStart(): Unit = {
    driverArgs.cmd match {
      case "launch" =>
        val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
        ......
        // Wrap the DriverWrapper class in a Command
        val command = new Command(mainClass,
          Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions,
          sys.env, classPathEntries, libraryPathEntries, javaOpts)

        val driverDescription = new DriverDescription(
          driverArgs.jarUrl, driverArgs.memory, driverArgs.cores,
          driverArgs.supervise, command)
        // Ask the Master to launch the Driver; the Master's receiveAndReply method handles this request
        asyncSendToMasterAndForwardReply[SubmitDriverResponse](
          RequestSubmitDriver(driverDescription))
      ......
  }

asyncSendToMasterAndForwardReply in turn calls ask on each masterEndpoint; on the other side, the Master's receiveAndReply method handles the message.

  private def asyncSendToMasterAndForwardReply[T: ClassTag](message: Any): Unit = {
    for (masterEndpoint <- masterEndpoints) {
      masterEndpoint.ask[T](message).onComplete {
        case Success(v) => self.send(v)
        case Failure(e) =>
          logWarning(s"Error sending messages to master $masterEndpoint", e)
      }(forwardMessageExecutionContext)
    }
  }

Besides creating the Driver and replying to the client, the Master's receiveAndReply method calls one core method: schedule().

  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case RequestSubmitDriver(description) =>
      // Check the Master's state
      if (state != RecoveryState.ALIVE) {
        val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
          "Can only accept driver submissions in ALIVE state."
        context.reply(SubmitDriverResponse(self, false, None, msg))
      } else {
        logInfo("Driver submitted " + description.command.mainClass)
        val driver = createDriver(description)
        persistenceEngine.addDriver(driver)
        waitingDrivers += driver
        drivers.add(driver)
        schedule()

        // TODO: It might be good to instead have the submission client poll the master to determine
        //       the current status of the driver. For now it's simply "fire and forget".

        context.reply(SubmitDriverResponse(self, true, Some(driver.id),
          s"Driver successfully submitted as ${driver.id}"))
      }

Now into schedule(). This method matters, so I have kept all of its comments. It filters the alive workers for those that satisfy the resource requirements and does two important things: Driver scheduling and Executor resource allocation.

// schedule() is a general-purpose method.
// It also runs when a Driver launch is requested; in that case the waitingApps handled by the
// startExecutorsOnWorkers() call on the last line are empty, so only the Driver is launched.
// It runs again when the application itself is submitted; at that point there is no Driver waiting
// to be launched, and startExecutorsOnWorkers() goes straight to allocating resources for the
// current application.

  private def schedule(): Unit = {
    // Check the Master's state
    if (state != RecoveryState.ALIVE) { return }
    // Drivers take strict precedence over executors; shuffle the alive workers
    val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
    // Number of alive workers
    val numWorkersAlive = shuffledAliveWorkers.size
    var curPos = 0
    for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
      // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
      // start from the last worker that was assigned a driver, and continue onwards until we have
      // explored all alive workers.
      var launched = false
      var numWorkersVisited = 0
      while (numWorkersVisited < numWorkersAlive && !launched) {
        // Take the worker at position curPos
        val worker = shuffledAliveWorkers(curPos)
        numWorkersVisited += 1
        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
          // Launch the Driver on this worker; afterwards, resources are requested for the application
          launchDriver(worker, driver)
          waitingDrivers -= driver
          launched = true
        }
        // Advance curPos so the next worker is tried until one has sufficient resources
        curPos = (curPos + 1) % numWorkersAlive
      }
    }
    startExecutorsOnWorkers()
  }

Now it is the worker's turn: for a worker that satisfies the requirements, the Master calls launchDriver(), which sends that worker the message to start the Driver.

  private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
    logInfo("Launching driver " + driver.id + " on worker " + worker.id)
    worker.addDriver(driver)
    driver.worker = Some(worker)
    // Send the Worker a message to start the Driver; the Worker's receive method matches LaunchDriver
    worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
    driver.state = DriverState.RUNNING
  }

So we finally arrive at the Worker's receive() method, where the Driver is started on the worker node.

  override def receive: PartialFunction[Any, Unit] = synchronized {
    ........
    // The Driver launched here is the mainClass mentioned earlier:
    // "org.apache.spark.deploy.worker.DriverWrapper".
    // Launching the Driver means launching DriverWrapper: the Worker forks a Driver process
    // and then runs DriverWrapper's main method.

    case LaunchDriver(driverId, driverDesc) =>
      logInfo(s"Asked to launch driver $driverId")
      val driver = new DriverRunner(
        conf,
        driverId,
        workDir,
        sparkHome,
        driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
        self,
        workerUri,
        securityMgr)
      drivers(driverId) = driver
      // Start the Driver: this initializes org.apache.spark.deploy.worker.DriverWrapper and runs its main method
      driver.start()

      coresUsed += driverDesc.cores
      memoryUsed += driverDesc.mem
    .........
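
The excerpts stop at driver.start(); DriverWrapper itself is not shown in this post. As a rough orientation only, here is a minimal, simplified sketch of what DriverWrapper's main method does, assuming the argument layout shown earlier ({{WORKER_URL}}, {{USER_JAR}}, the user's main class, then any extra arguments). This is not the verbatim Spark source: the real class also creates an RpcEnv with a WorkerWatcher and adds the user jar to the context class loader before invoking the user class.

// Simplified sketch, NOT the verbatim org.apache.spark.deploy.worker.DriverWrapper source
object DriverWrapperSketch {
  def main(args: Array[String]): Unit = {
    args.toList match {
      case workerUrl :: userJar :: mainClass :: extraArgs =>
        // Reflectively load the user's main class and invoke its main(Array[String]) method;
        // this is where the user application's code finally starts running on the worker
        val clazz = Class.forName(mainClass)
        val mainMethod = clazz.getMethod("main", classOf[Array[String]])
        mainMethod.invoke(null, extraArgs.toArray[String])
      case _ =>
        // Mirrors the expected argument layout: workerUrl, userJar, driverMainClass, [options]
        System.err.println("Usage: DriverWrapper <workerUrl> <userJar> <driverMainClass> [options]")
        System.exit(-1)
    }
  }
}

In the actual flow, DriverRunner does not call this class in-process: it builds a separate JVM process from the Command shown earlier, which is the fork mentioned in the summary below.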

3. Summary

Reading the source straight through like this without a mental map does not stick, so here is the overall flow: a client submits a job from a node in the Spark cluster (spark-submit), specifying the deployMode (client or cluster) in the arguments; ClientApp communicates with the Master and sends it a driver description; the Master picks a worker with sufficient resources; that Worker then forks a process running DriverWrapper, and DriverWrapper executes the client's application code.

Step by step:

1. The client submits the job to the cluster: spark-submit runs SparkSubmit's main method, which calls submit().

2. The submission mode is determined from the arguments: doPrepareSubmitEnvironment() checks deployMode to decide the submission environment (this post only analyzes cluster mode).

3. In runMain(), SparkSubmit loads childMainClass and instantiates the referenced class (ClientApp) via reflection.

4. ClientApp's start() method runs and does three things: a. create the RPC environment; b. obtain references to all Masters; c. register the ClientEndpoint.

5. Once ClientEndpoint is registered, its onStart() method runs: it wraps the DriverWrapper class in a Command, wraps that Command instance in a DriverDescription, and asks the Master to launch the Driver; the Master's receiveAndReply method receives this request.

6. The Master matches the message type (RequestSubmitDriver) and handles it: it creates the driver object, replies, and, most importantly, calls schedule() to schedule resources.

7. Inside schedule(), the Master filters the workers that satisfy the requirements (memory, cores) and sends one of them (chosen from the shuffled list) the command to start the Driver.

8. The Worker matches the message type (LaunchDriver), starts the Driver, and runs DriverWrapper's main() method.
