[spark] Driver Startup in Standalone Cluster Mode
This post walks through how Spark launches the Driver in standalone cluster mode. It only covers the flow up to the point where the Driver starts successfully; what happens afterwards (task execution) will be covered in a later post. I prefer tracing through the code, so only the most relevant excerpts are pasted below.
1. A Look at the Script
Task submission starts with spark-submit. Looking at the spark-submit script, the main class that ultimately runs is:
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
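For context, a cluster-mode submission that exercises the path traced below might look like the following (the master URL, application class, and jar path are placeholders, not values from this article):

```shell
# Hypothetical standalone cluster-mode submission; adjust host, class, and jar.
${SPARK_HOME}/bin/spark-submit \
  --master spark://master-host:7077 \
  --deploy-mode cluster \
  --class com.example.MyApp \
  --driver-memory 1g \
  --driver-cores 1 \
  --supervise \
  /path/to/my-app.jar
```

The `--deploy-mode cluster`, `--driver-memory`, `--driver-cores`, and `--supervise` flags map directly onto the `childArgs` assembled in `doPrepareSubmitEnvironment()` below.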
2. Source Code Walkthrough
Open the source in IDEA (Spark 2.1.1 here) and look at SparkSubmit's main method; clearly the next stop is the submit() method.
// Entry point for submitted applications
override def main(args: Array[String]): Unit = {
  val uninitLog = initializeLogIfNecessary(true, silent = true)
  // Parse the submission arguments
  val appArgs = new SparkSubmitArguments(args)
  if (appArgs.verbose) {
    // scalastyle:off println
    printStream.println(appArgs)
    // scalastyle:on println
  }
  appArgs.action match {
    // A task submission matches SUBMIT and dispatches to submit()
    case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
  }
}
The first line of submit() introduces a key value, childMainClass, so the call to prepareSubmitEnvironment() deserves close attention:
submit()
@tailrec
private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
  // Returns a 4-tuple; pay special attention to childMainClass.
  // We follow the standalone-cluster case here.
  val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
  // ...
  // Run the resolved main class
  runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose)
}
prepareSubmitEnvironment() delegates to doPrepareSubmitEnvironment(). Most of the body is elided below to expose the main logic; the key point is how childMainClass changes with deployMode. The comments spell it out, and the method ends by returning the 4-tuple.
private def doPrepareSubmitEnvironment(
  // ...
  var childMainClass = ""
  // ...
  // Client mode: args.mainClass is the user application's main class,
  // and the Driver is launched directly on the client
  if (deployMode == CLIENT) {
    childMainClass = args.mainClass
    if (localPrimaryResource != null && isUserJar(localPrimaryResource)) {
      childClasspath += localPrimaryResource
    }
    if (localJars != null) { childClasspath ++= localJars.split(",") }
  }
  // ...
  // Standalone-cluster mode
  if (args.isStandaloneCluster) {
    // REST-style submission: the task is submitted as JSON over HTTP (Spark 1.3+)
    if (args.useRest) {
      childMainClass = REST_CLUSTER_SUBMIT_CLASS
      childArgs += (args.primaryResource, args.mainClass)
    } else {
      // Legacy submission path
      childMainClass = STANDALONE_CLUSTER_SUBMIT_CLASS
      if (args.supervise) { childArgs += "--supervise" }
      Option(args.driverMemory).foreach { m => childArgs += ("--memory", m) }
      Option(args.driverCores).foreach { c => childArgs += ("--cores", c) }
      childArgs += "launch"
      childArgs += (args.master, args.primaryResource, args.mainClass)
    }
    if (args.childArgs != null) {
      childArgs ++= args.childArgs
    }
  }
  // ...
  (childArgs, childClasspath, sparkConf, childMainClass)
}
When deployMode is CLIENT, the driver runs directly on the client. Here we follow the cluster-mode path, which is what production deployments use. STANDALONE_CLUSTER_SUBMIT_CLASS maps to the ClientApp class; it is dissected later in this article, but for now let's jump back to the main submit() flow.
//org.apache.spark.deploy.ClientApp
private[deploy] val STANDALONE_CLUSTER_SUBMIT_CLASS = classOf[ClientApp].getName()
The last line of submit() calls runMain(). Internally, runMain() loads childMainClass (ClientApp) by name, instantiates it via reflection, and calls its start() method.
private def runMain(
    childArgs: Seq[String],
    childClasspath: Seq[String],
    sparkConf: SparkConf,
    childMainClass: String,
  // ...
  try {
    // Load the class by name
    mainClass = Utils.classForName(childMainClass)
  } catch {
  // ...
  // Instantiate mainClass as a SparkApplication
  val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
    mainClass.newInstance().asInstanceOf[SparkApplication]
  // ...
  try {
    // Invoke start() -- here that is ClientApp's start method
    app.start(childArgs.toArray, sparkConf)
  } catch {
  // ...
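The class-loading step above boils down to plain JVM reflection. A minimal, self-contained sketch of the same mechanism, with java.util.ArrayList standing in for ClientApp and java.util.List standing in for the SparkApplication trait:

```scala
// Load a class by its fully qualified name at runtime, as Utils.classForName does.
val mainClass: Class[_] = Class.forName("java.util.ArrayList")

// Spark checks assignability before casting to SparkApplication;
// the same check against a stand-in JDK interface:
val isList = classOf[java.util.List[_]].isAssignableFrom(mainClass)

// Instantiate through the no-arg constructor, as runMain() does for the app class.
val instance = mainClass.getDeclaredConstructor().newInstance()
```

This is why childMainClass can be carried around as a plain String until the very last moment: nothing is linked until Class.forName runs.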
Following into ClientApp's start() method (in fact its only method), the code shows that it does three things:
1. Create the RPC environment
2. Obtain references to all masters
3. Register a ClientEndpoint
override def start(args: Array[String], conf: SparkConf): Unit = {
  val driverArgs = new ClientArguments(args)
  if (!conf.contains("spark.rpc.askTimeout")) {
    conf.set("spark.rpc.askTimeout", "10s")
  }
  Logger.getRootLogger.setLevel(driverArgs.logLevel)
  // Create the RPC environment
  val rpcEnv = RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
  // Obtain the Masters' endpoint references (their "mailboxes")
  val masterEndpoints = driverArgs.masters.map(RpcAddress.fromSparkURL).map(rpcEnv.setupEndpointRef(_, Master.ENDPOINT_NAME))
  // Register the endpoint that submits the current task; registration
  // guarantees that the new ClientEndpoint's onStart method will run
  rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))
  rpcEnv.awaitTermination()
}
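The key point of setupEndpoint is that registration itself triggers the endpoint's lifecycle hook. A toy model of that behavior (this is not Spark's real RpcEnv API; every name here is made up for illustration):

```scala
import scala.collection.mutable

// A minimal endpoint contract: a startup hook plus a message handler.
trait ToyEndpoint {
  def onStart(): Unit
  def receive(msg: Any): Unit
}

class ToyRpcEnv {
  private val endpoints = mutable.Map[String, ToyEndpoint]()

  // Registering an endpoint fires its onStart() hook immediately,
  // before any message is delivered -- mirroring rpcEnv.setupEndpoint.
  def setupEndpoint(name: String, ep: ToyEndpoint): Unit = {
    endpoints(name) = ep
    ep.onStart()
  }

  def send(name: String, msg: Any): Unit = endpoints(name).receive(msg)
}

// An endpoint that records its lifecycle, standing in for ClientEndpoint.
class RecordingEndpoint extends ToyEndpoint {
  var started = false
  var received = List.empty[Any]
  def onStart(): Unit = started = true
  def receive(msg: Any): Unit = received = msg :: received
}

val env = new ToyRpcEnv
val client = new RecordingEndpoint
env.setupEndpoint("client", client) // onStart() runs right here
```

This is the reason the next section jumps straight to ClientEndpoint.onStart(): registering the endpoint is what kicks off the submission.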
When the endpoint is registered, its onStart() method runs, so the next stop is ClientEndpoint.onStart(). Pay attention to the mainClass value, the Command object, and the DriverDescription object; finally it calls asyncSendToMasterAndForwardReply() to send the submission to the master.
override def onStart(): Unit = {
  driverArgs.cmd match {
    case "launch" =>
      val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
      // ...
      // Wrap the DriverWrapper class into a Command
      val command = new Command(mainClass,
        Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions,
        sys.env, classPathEntries, libraryPathEntries, javaOpts)
      val driverDescription = new DriverDescription(
        driverArgs.jarUrl, driverArgs.memory, driverArgs.cores,
        driverArgs.supervise, command)
      // Ask the Master to launch the Driver; the Master's receiveAndReply
      // method will receive this request
      asyncSendToMasterAndForwardReply[SubmitDriverResponse](
        RequestSubmitDriver(driverDescription))
      // ...
}
asyncSendToMasterAndForwardReply() in turn calls ask() on each masterEndpoint; on the other side, the Master's receiveAndReply() method handles the message.
private def asyncSendToMasterAndForwardReply[T: ClassTag](message: Any): Unit = {
for (masterEndpoint <- masterEndpoints) {
masterEndpoint.ask[T](message).onComplete {
case Success(v) => self.send(v)
case Failure(e) =>
logWarning(s"Error sending messages to master $masterEndpoint", e)
}(forwardMessageExecutionContext)
}
}
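The fan-out above can be sketched with plain Scala Futures: ask every master, forward whichever reply succeeds, and only log failures. The "masters" here are stand-in functions, not real Spark endpoints:

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

// Stand-in for masterEndpoint.ask: only one "master" answers.
def ask(master: String, msg: String): Future[String] =
  if (master == "alive-master") Future.successful(s"$master ack: $msg")
  else Future.failed(new RuntimeException(s"$master unreachable"))

// Stand-in for self.send(v): the first successful reply is forwarded.
val forwarded = Promise[String]()
val masters = Seq("dead-master", "alive-master")

for (m <- masters) {
  ask(m, "RequestSubmitDriver").onComplete {
    case Success(v) => forwarded.trySuccess(v) // forward the reply to self
    case Failure(e) => ()                      // real code logs a warning per master
  }
}

val reply = Await.result(forwarded.future, 5.seconds)
```

This mirrors why the real method loops over all masterEndpoints: in an HA setup only the ALIVE master will answer usefully, and the dead or standby ones simply produce warnings.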
Besides creating the driver and replying, the Master's receiveAndReply() method calls one core method: schedule().
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case RequestSubmitDriver(description) =>
    // Check the Master's state
    if (state != RecoveryState.ALIVE) {
      val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
        "Can only accept driver submissions in ALIVE state."
      context.reply(SubmitDriverResponse(self, false, None, msg))
    } else {
      logInfo("Driver submitted " + description.command.mainClass)
      val driver = createDriver(description)
      persistenceEngine.addDriver(driver)
      waitingDrivers += driver
      drivers.add(driver)
      schedule()
      // TODO: It might be good to instead have the submission client poll the master to determine
      // the current status of the driver. For now it's simply "fire and forget".
      context.reply(SubmitDriverResponse(self, true, Some(driver.id),
        s"Driver successfully submitted as ${driver.id}"))
    }
Now into schedule() itself. This method matters a lot (its comments are translated below): it filters the alive workers down to those with sufficient resources, and does two big jobs, driver scheduling and executor resource allocation.
// schedule() is a shared method.
// It runs when a Driver launch is requested; in that case the waiting-app list
// inside the final startExecutorsOnWorkers() call is empty, so only the Driver
// is launched. It also runs when an application is submitted; then there is no
// waiting Driver, and startExecutorsOnWorkers() goes straight to allocating
// resources for the current application.
private def schedule(): Unit = {
  // Check the Master's state
  if (state != RecoveryState.ALIVE) { return }
  // Drivers take strict precedence over executors; shuffle the workers first
  val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
  // Number of available workers
  val numWorkersAlive = shuffledAliveWorkers.size
  var curPos = 0
  for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
    // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
    // start from the last worker that was assigned a driver, and continue onwards until we have
    // explored all alive workers.
    var launched = false
    var numWorkersVisited = 0
    while (numWorkersVisited < numWorkersAlive && !launched) {
      // Take the worker at curPos
      val worker = shuffledAliveWorkers(curPos)
      numWorkersVisited += 1
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        // Launch the Driver here; resources for the application are requested afterwards
        launchDriver(worker, driver)
        waitingDrivers -= driver
        launched = true
      }
      // curPos keeps advancing until a worker with enough resources is found
      curPos = (curPos + 1) % numWorkersAlive
    }
  }
  startExecutorsOnWorkers()
}
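Stripped of Spark's bookkeeping, the driver-placement loop above is a round-robin first-fit search. A simplified, deterministic sketch (the shuffle is omitted, and ToyWorker/ToyDriver are made-up stand-ins for WorkerInfo/DriverInfo):

```scala
// Simplified stand-ins for the Master's worker and driver bookkeeping.
case class ToyWorker(id: String, var memoryFree: Int, var coresFree: Int)
case class ToyDriver(id: String, mem: Int, cores: Int)

// Returns driverId -> workerId assignments, walking workers round-robin
// and launching each driver on the first worker with enough free resources.
def placeDrivers(workers: Seq[ToyWorker], drivers: Seq[ToyDriver]): Map[String, String] = {
  val assigned = scala.collection.mutable.Map[String, String]()
  var curPos = 0
  for (driver <- drivers) {
    var visited = 0
    var launched = false
    while (visited < workers.size && !launched) {
      val w = workers(curPos)
      visited += 1
      if (w.memoryFree >= driver.mem && w.coresFree >= driver.cores) {
        w.memoryFree -= driver.mem   // reserve the resources on that worker
        w.coresFree -= driver.cores
        assigned(driver.id) = w.id
        launched = true
      }
      curPos = (curPos + 1) % workers.size // keep rotating through workers
    }
  }
  assigned.toMap
}

val workers = Seq(ToyWorker("w1", 1024, 2), ToyWorker("w2", 4096, 8))
val placement = placeDrivers(workers, Seq(ToyDriver("d1", 2048, 4), ToyDriver("d2", 512, 1)))
```

Here d1 skips w1 (not enough memory) and lands on w2, then the rotation brings d2 back around to w1, illustrating how the round-robin spreads drivers across the cluster.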
Now it is the worker's turn: once a worker satisfying the constraints is found, the Master runs launchDriver(), i.e. sends that worker a message to start the driver.
private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
  logInfo("Launching driver " + driver.id + " on worker " + worker.id)
  worker.addDriver(driver)
  driver.worker = Some(worker)
  // Send the Worker a message to launch the Driver; the Worker's receive
  // method matches on LaunchDriver
  worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
  driver.state = DriverState.RUNNING
}
So finally, in the Worker's receive() method, the Driver is launched on the worker node:
override def receive: PartialFunction[Any, Unit] = synchronized {
  // ...
  // The "Driver" launched here is the mainClass mentioned earlier:
  // "org.apache.spark.deploy.worker.DriverWrapper". Launching the Driver
  // means starting DriverWrapper: the Worker forks a Driver process and
  // then runs DriverWrapper's main method.
  case LaunchDriver(driverId, driverDesc) =>
    logInfo(s"Asked to launch driver $driverId")
    val driver = new DriverRunner(
      conf,
      driverId,
      workDir,
      sparkHome,
      driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
      self,
      workerUri,
      securityMgr)
    drivers(driverId) = driver
    // Start the Driver: this initializes org.apache.spark.deploy.worker.DriverWrapper
    // and runs its main method
    driver.start()
    coresUsed += driverDesc.cores
    memoryUsed += driverDesc.mem
  // ...
3. Summary
Reading the source without a roadmap leaves little behind, so here is the overall flow: the client submits a task from a node of the Spark cluster (spark-submit), specifying the deployMode (client or cluster) in the arguments; ClientApp talks to the master and sends it a wrapped driver description; the master picks a worker with sufficient resources; that Worker then forks a DriverWrapper process locally, and DriverWrapper runs the client's application code.
Step by step:
1. The client submits the task to the cluster: spark-submit runs SparkSubmit's main method, which calls submit().
2. The submission mode is determined from the arguments: doPrepareSubmitEnvironment() inspects deployMode to decide the submission environment (this article covers only cluster mode).
3. In runMain(), SparkSubmit loads childMainClass (ClientApp) and instantiates it via reflection.
4. ClientApp's start() method runs and does three things: (a) creates the RPC environment; (b) obtains references to all masters; (c) registers a ClientEndpoint.
5. Once registered, ClientEndpoint's onStart() method runs: it wraps DriverWrapper into a Command, wraps that Command into a DriverDescription, and asks the Master to launch the Driver; the Master's receiveAndReply method receives this request.
6. The Master matches the message type (RequestSubmitDriver) and handles it: it creates the driver object, replies, and, most importantly, calls schedule() to schedule resources.
7. Inside schedule(), the Master filters for workers satisfying the constraints (memory, cores) and sends the launch-Driver command to one chosen from the shuffled list.
8. The Worker matches the message type (LaunchDriver), launches the Driver, and runs DriverWrapper's main() method.