
Spark on Yarn Analysis


All Spark jobs are submitted through spark-submit, so we start the analysis from this class.

SparkSubmit

override def main(args: Array[String]): Unit = {
    // Parse and assemble the launch arguments
    val appArgs = new SparkSubmitArguments(args)
    // Optionally print the arguments; not important here
    if (appArgs.verbose) {
        printStream.println(appArgs)
    }
    // Dispatch on the action type; the default is SUBMIT (explained below)
    appArgs.action match {
        case SparkSubmitAction.SUBMIT => submit(appArgs)
        case SparkSubmitAction.KILL => kill(appArgs)
        case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    }
}

The SparkSubmitArguments class

This class initializes many of our launch parameters, including these familiar ones:

var master: String = null  
var deployMode: String = null
var executorMemory: String = null
var executorCores: String = null
var totalExecutorCores: String = null

These fields are assigned by the following code block:

// Set parameters from command line arguments
// Parse the arguments passed in on the command line
try {
    parse(args.asJava)
} catch {
    case e: IllegalArgumentException =>
    SparkSubmit.printErrorAndExit(e.getMessage())
}

parse calls handle (only part of it is shown here):

override protected def handle(opt: String, value: String): Boolean = {
    opt match {
      case NAME =>
        name = value

      case MASTER =>
        master = value

      case CLASS =>
        mainClass = value

      case DEPLOY_MODE =>
        if (value != "client" && value != "cluster") {
          SparkSubmit.printErrorAndExit("--deploy-mode must be either \"client\" or \"cluster\"")
        }
        deployMode = value

      case NUM_EXECUTORS =>
        numExecutors = value

      case TOTAL_EXECUTOR_CORES =>
        totalExecutorCores = value

Of course, some parameters are not passed on the command line, for example those coming from configuration files; these are loaded by calling loadEnvironmentArguments:

// Roughly the following assignments:
master = Option(master)
  .orElse(sparkProperties.get("spark.master"))
  .orElse(env.get("MASTER"))
  .orNull
driverExtraClassPath = Option(driverExtraClassPath)
  .orElse(sparkProperties.get("spark.driver.extraClassPath"))
  .orNull
driverExtraJavaOptions = Option(driverExtraJavaOptions)
  .orElse(sparkProperties.get("spark.driver.extraJavaOptions"))
  .orNull
driverExtraLibraryPath = Option(driverExtraLibraryPath)
  .orElse(sparkProperties.get("spark.driver.extraLibraryPath"))
  .orNull
driverMemory = Option(driverMemory)
  .orElse(sparkProperties.get("spark.driver.memory"))
  .orElse(env.get("SPARK_DRIVER_MEMORY"))
  .orNull
driverCores = Option(driverCores)
  .orElse(sparkProperties.get("spark.driver.cores"))
  .orNull

// Some global defaults are also filled in:
// Global defaults. These should be keep to minimum to avoid confusing behavior.
master = Option(master).getOrElse("local[*]") // default master: local mode, parallelism = number of cores

// In YARN mode, app name can be set via SPARK_YARN_APP_NAME (see SPARK-5222)
if (master.startsWith("yarn")) {
    name = Option(name).orElse(env.get("SPARK_YARN_APP_NAME")).orNull
}

// Set name from main class if not given
name = Option(name).orElse(Option(mainClass)).orNull
if (name == null && primaryResource != null) {
    name = Utils.stripDirectory(primaryResource)
}

// Action should be SUBMIT unless otherwise specified
// If the action was not set, default to SUBMIT
action = Option(action).getOrElse(SUBMIT)
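The Option#orElse chains above encode a simple precedence rule: an explicit command-line value beats a spark-defaults property, which beats an environment variable, which beats the hard-coded default. A tiny stand-alone sketch of that rule (illustrative only, not Spark code):

// Illustrative only: the same precedence rule as the assignments above.
object PrecedenceSketch {
  def resolve(cli: Option[String], prop: Option[String],
              env: Option[String], default: String): String =
    cli.orElse(prop).orElse(env).getOrElse(default)

  def main(args: Array[String]): Unit = {
    // CLI value absent, spark-defaults has "yarn", env var absent -> "yarn"
    println(resolve(None, Some("yarn"), None, "local[*]"))   // yarn
    // Nothing set anywhere -> fall back to the default
    println(resolve(None, None, None, "local[*]"))           // local[*]
  }
}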

At this point all launch parameters have been assembled, and we return to SparkSubmit:

appArgs.action match {
    case SparkSubmitAction.SUBMIT => submit(appArgs)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
}

Entering the submit method

@tailrec
private def submit(args: SparkSubmitArguments): Unit = {
    // Prepare the runtime environment
    val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)

    def doRunMain(): Unit = {
        // The normal launch path: run the prepared main class
        runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
    }

    // In standalone cluster mode the REST gateway may be tried first, with the
    // legacy protocol as a fallback; in all other modes, just run the main class as prepared.
    doRunMain()
}

The prepareSubmitEnvironment method

Since production deployments generally use yarn-cluster mode, we will focus on that mode first.

// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
// In yarn-cluster mode, Client is just a wrapper around our main class (the job we wrote ourselves)
if (isYarnCluster) {
    childMainClass = "org.apache.spark.deploy.yarn.Client"
}
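In yarn-client mode, by contrast, prepareSubmitEnvironment keeps the user's own main class as childMainClass. A simplified, condensed sketch of the decision (only the YARN branches are shown; org.example.MyJob is a placeholder for the user class):

// Simplified sketch of how childMainClass is chosen; the standalone and mesos
// branches of prepareSubmitEnvironment are omitted here.
object ChildMainClassSketch {
  def chooseChildMainClass(isYarnCluster: Boolean, userMainClass: String): String =
    if (isYarnCluster) {
      // the user class is handed to yarn.Client (via --class) instead of being run directly
      "org.apache.spark.deploy.yarn.Client"
    } else {
      // yarn-client mode: run the user's main class in this JVM
      userMainClass
    }

  def main(args: Array[String]): Unit = {
    println(chooseChildMainClass(isYarnCluster = true,  "org.example.MyJob"))  // yarn.Client
    println(chooseChildMainClass(isYarnCluster = false, "org.example.MyJob"))  // org.example.MyJob
  }
}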

The runMain method

mainClass = Utils.classForName(childMainClass)   // load the class to run via reflection
val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)  // look up its main method
mainMethod.invoke(null, childArgs.toArray)  // invoke it reflectively
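That is the whole launch mechanism: runMain loads childMainClass by name, looks up its static main(String[]) method, and invokes it via reflection. A self-contained sketch of the same pattern; HelloJob here is just a stand-in for the real childMainClass:

// Stand-alone illustration of the reflective launch pattern used by runMain:
// load a class by name, look up its static main(String[]) and invoke it.
object HelloJob {
  def main(args: Array[String]): Unit = println("Hello from " + args.mkString(", "))
}

object ReflectiveLaunchSketch {
  def main(args: Array[String]): Unit = {
    // "HelloJob" stands in for childMainClass, e.g. org.apache.spark.deploy.yarn.Client
    val clazz = Class.forName("HelloJob")
    val mainMethod = clazz.getMethod("main", classOf[Array[String]])
    mainMethod.invoke(null, Array("yarn-cluster"))   // static method, so the receiver is null
  }
}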

At this point we know that the main class to run is **"org.apache.spark.deploy.yarn.Client"**.

org.apache.spark.deploy.yarn.Client

So let's look at its main method:

private object Client extends Logging {

  def main(argStrings: Array[String]) {
    System.setProperty("SPARK_YARN_MODE", "true")
    val sparkConf = new SparkConf
    // SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
    // so remove them from sparkConf here for yarn mode.
    sparkConf.remove("spark.jars")
    sparkConf.remove("spark.files")
    val args = new ClientArguments(argStrings)
    new Client(args, sparkConf).run()
  }

ClientArguments() holds basic information such as --jar and --class. This shows that what actually gets invoked later is the jar we submitted; this Client is just a wrapper (a decorator).
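ClientArguments is essentially a small recursive option parser. A simplified sketch of the idea follows; this is not the real class, and the field and option names are illustrative:

// Illustrative sketch of --jar / --class / --arg parsing in the style of ClientArguments.
class ClientArgumentsSketch(args: Array[String]) {
  var userJar: String = _
  var userClass: String = _
  var userArgs: Seq[String] = Nil

  private def parse(list: List[String]): Unit = list match {
    case "--jar" :: value :: tail   => userJar = value;    parse(tail)
    case "--class" :: value :: tail => userClass = value;  parse(tail)
    case "--arg" :: value :: tail   => userArgs :+= value; parse(tail)
    case Nil                        => // done
    case unknown :: _               =>
      throw new IllegalArgumentException(s"Unknown option: $unknown")
  }

  parse(args.toList)
}

For example, new ClientArgumentsSketch(Array("--jar", "my-job.jar", "--class", "org.example.MyJob")) would populate userJar and userClass, which is exactly the information the Client needs to wrap our job.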

new Client()

// 1. Create the YarnClient
private val yarnClient = YarnClient.createYarnClient

public static YarnClient createYarnClient() {
    YarnClient client = new YarnClientImpl();
    return client;
}

// 2. Configuration for connecting to the YARN cluster
protected ApplicationClientProtocol rmClient;
protected InetSocketAddress rmAddress;
protected long statePollIntervalMillis;

private static final String ROOT = "root";

public YarnClientImpl() {
    super(YarnClientImpl.class.getName());
}

The run method

def run(): Unit = {
    this.appId = submitApplication()   // submit the application and get the globally unique application id
}

submitApplication()

def submitApplication(): ApplicationId = {
    var appId: ApplicationId = null
    // Connect the launcher backend, which reports the application's execution state back to the launcher
    launcherBackend.connect()
    // Setup the credentials before doing anything else,
    // so we have don't have issues at any point.
    // 1. Establish the connection to the YARN cluster
    setupCredentials()
    yarnClient.init(yarnConf)
    yarnClient.start()

    // 2. Ask the ResourceManager for a new application
    val newApp = yarnClient.createApplication()
    val newAppResponse = newApp.getNewApplicationResponse()
    appId = newAppResponse.getApplicationId()

    new CallerContext("CLIENT", sparkConf.get(APP_CALLER_CONTEXT),
                      Option(appId.toString)).setCurrentContext()

    // Verify whether the cluster has enough resources for our AM
    verifyClusterResources(newAppResponse)

    // Set up the appropriate contexts to launch our AM

    // 3. Build the launch context for our application
    // containerContext wraps the command (a JVM launch command) that runs our application:
    //   if (isClusterMode) {
    //     // cluster mode starts an ApplicationMaster
    //     Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
    //   } else {
    //     // client mode starts an ExecutorLauncher
    //     Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
    //   }
    // The resulting command is essentially: bin/java org.apache.spark.deploy.yarn.ApplicationMaster
    // i.e. after submission an ApplicationMaster process will be started
    val containerContext = createContainerLaunchContext(newAppResponse)
    // appContext
    val appContext = createApplicationSubmissionContext(newApp, containerContext)

    // Finally, submit and monitor the application
    logInfo(s"Submitting application $appId to ResourceManager")

    // 4. Submit the application to the YARN cluster for execution
    yarnClient.submitApplication(appContext)

    launcherBackend.setAppId(appId.toString)
    reportLauncherState(SparkAppHandle.State.SUBMITTED)

    appId
}
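To make the comment on createContainerLaunchContext concrete, the command it places into the ContainerLaunchContext looks roughly like the sketch below. This is a simplification: the real code also sets the classpath, extra JVM options and YARN log redirection, and the placeholder values (org.example.MyJob, my-job.jar) are not Spark's.

// Rough shape of the AM launch command; values are placeholders for the user's --class and --jar.
object AmCommandSketch {
  def amCommand(isClusterMode: Boolean, amMemoryMb: Int,
                userClass: String, userJar: String): Seq[String] = {
    val amClass =
      if (isClusterMode) "org.apache.spark.deploy.yarn.ApplicationMaster"
      else "org.apache.spark.deploy.yarn.ExecutorLauncher"
    Seq("{{JAVA_HOME}}/bin/java", "-server", s"-Xmx${amMemoryMb}m",
        amClass, "--class", userClass, "--jar", userJar)
  }

  def main(args: Array[String]): Unit =
    println(amCommand(isClusterMode = true, 1024, "org.example.MyJob", "my-job.jar").mkString(" "))
}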

org.apache.spark.deploy.yarn.ApplicationMaster

The main method

def main(args: Array[String]): Unit = {
    // As before, wrap the command-line arguments
    val amArgs = new ApplicationMasterArguments(args)
    SparkHadoopUtil.get.runAsSparkUser { () =>
        // Create the ApplicationMaster,
        // passing a YarnRMClient, the object used to talk to the ResourceManager
        master = new ApplicationMaster(amArgs, new YarnRMClient)
        // Call run; the process exits once run returns
        System.exit(master.run())
    }
}
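A note on SparkHadoopUtil.get.runAsSparkUser: it runs the given block under the identity of the submitting user. A rough sketch of the idea, assuming it boils down to Hadoop's UserGroupInformation.doAs (the real helper does more, such as credential handling; the SPARK_USER fallback here is an assumption):

import java.security.PrivilegedExceptionAction
import org.apache.hadoop.security.UserGroupInformation

// Simplified sketch: create a UGI for the submitting user and execute the block
// under doAs, so Hadoop/YARN calls see that user's identity.
object RunAsSparkUserSketch {
  def runAsSparkUser(func: () => Unit): Unit = {
    val user = sys.env.getOrElse("SPARK_USER", UserGroupInformation.getCurrentUser.getShortUserName)
    val ugi  = UserGroupInformation.createRemoteUser(user)
    ugi.doAs(new PrivilegedExceptionAction[Unit] {
      override def run(): Unit = func()
    })
  }
}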

new ApplicationMaster(amArgs, new YarnRMClient)

rpcEnv		// the RPC environment used for message passing
amEndpoint  // the ApplicationMaster endpoint

run()

runDriver(securityMgr)
↓
userClassThread = startUserApplication()
    ↓
    # Get the main method of the user-submitted class
    val mainMethod = userClassLoader.loadClass(args.userClass)
    .getMethod("main", classOf[Array[String]])

    val userThread = new Thread {
         mainMethod.invoke(null, userArgs.toArray)
    }
    userThread.setContextClassLoader(userClassLoader)
    # The main method is executed on a thread named "Driver".
    # This is why the job we write is usually called the Driver: under the hood it is
    # invoked by reflection and runs on a thread whose name is literally Driver.
    # It also shows that in yarn-cluster mode the Driver runs inside the ApplicationMaster.
    userThread.setName("Driver")
    userThread.start()
↓
registerAM(sc.getConf, rpcEnv, driverRef, sc.ui.map(_.webUrl), securityMgr)
# Register the ApplicationMaster (with the ResourceManager); the interaction goes through RPC
    ↓
    # This client is client: YarnRMClient, the client of the ResourceManager,
    # so here the AM registers with the RM to request resources
    allocator = client.register
    # Obtain the resources and allocate them
    allocator.allocateResources()
        ↓
        # Get the Containers (the containers that will execute tasks)
        val allocatedContainers = allocateResponse.getAllocatedContainers()
        # Plan where the containers should run
        handleAllocatedContainers(allocatedContainers.asScala)
            ↓
            # Run the containers
            runAllocatedContainers(containersToUse)
                ↓
                # Take a thread from the thread pool and execute
                launcherPool.execute
                # What gets executed is an ExecutorRunnable
                new ExecutorRunnable.run()
                    ↓
                    # Create the RPC object
                    var rpc: YarnRPC = YarnRPC.create(conf)
                    # The client used to talk to the NodeManager
                    var nmClient: NMClient = _
                    # After connecting to the NM
                    nmClient = NMClient.createNMClient()
                    nmClient.init(conf)
                    nmClient.start()
                    # Start the container
                    startContainer()
                        ↓
                        # Prepare the launch command.
                        # This is essentially the JVM launch arguments for the container,
                        # which start a dedicated process (class):
                        # org.apache.spark.executor.CoarseGrainedExecutorBackend
                        prepareCommand()
↓
# If the job's main method has not finished yet, the current thread blocks here
userClassThread.join()   (this call is inside runDriver)
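prepareCommand is the executor-side counterpart of the AM launch command sketched earlier: it assembles the JVM command that starts CoarseGrainedExecutorBackend inside the allocated container. Roughly, with classpath, JVM options and log redirection omitted (the flag names follow CoarseGrainedExecutorBackend's arguments; the values below are placeholders):

// Rough shape of the executor launch command built by ExecutorRunnable.prepareCommand.
object ExecutorCommandSketch {
  def command(executorMemoryMb: Int, driverUrl: String, executorId: String,
              hostname: String, cores: Int, appId: String): Seq[String] =
    Seq("{{JAVA_HOME}}/bin/java", "-server", s"-Xmx${executorMemoryMb}m",
        "org.apache.spark.executor.CoarseGrainedExecutorBackend",
        "--driver-url", driverUrl,
        "--executor-id", executorId,
        "--hostname", hostname,
        "--cores", cores.toString,
        "--app-id", appId)

  def main(args: Array[String]): Unit =
    println(command(2048, "spark://CoarseGrainedScheduler@driver-host:7078",
                    "1", "nm-host", 2, "application_0000000000000_0001").mkString(" "))
}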

org.apache.spark.executor.CoarseGrainedExecutorBackend

The main method

run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)

The run method

val env = SparkEnv.createExecutorEnv(
    driverConf, executorId, hostname, port, cores, cfg.ioEncryptionKey, isLocal = false)
// create the CoarseGrainedExecutorBackend endpoint, which hosts the Executor and runs the computation
env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
    env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
workerUrl.foreach { url =>
    env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
}
new CoarseGrainedExecutorBackend

override def onStart() {
    rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
      // This is a very fast action so we can use "ThreadUtils.sameThread"
      driver = Some(ref)
// Reverse-register with the Driver by sending RegisterExecutor
      ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
    }(ThreadUtils.sameThread).onComplete {
      // This is a very fast action so we can use "ThreadUtils.sameThread"
      case Success(msg) =>
        // Always receive `true`. Just ignore it
      case Failure(e) =>
        exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
    }(ThreadUtils.sameThread)
}


override def receive: PartialFunction[Any, Unit] = {
    // Once the reverse registration is acknowledged, create the Executor
    case RegisteredExecutor =>
        executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
    // When a LaunchTask message arrives from the driver, run the task on the Executor
    case LaunchTask(data) =>
        if (executor == null) {
            exitExecutor(1, "Received LaunchTask command but executor was null")
        } else {
            // run the computation
            executor.launchTask(this, taskDesc)
        }
}
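executor.launchTask wraps the task description in a TaskRunner and submits it to the executor's thread pool; that is where the actual computation runs. A minimal sketch of the pattern, with simplified types rather than Spark's real Executor:

import java.util.concurrent.Executors

// Minimal illustration of the launchTask pattern: wrap the task in a Runnable and
// submit it to a thread pool. Spark's real TaskRunner also deserializes the task,
// runs it, and reports the result back to the driver.
object LaunchTaskSketch {
  final case class TaskDescription(taskId: Long, name: String)

  private val threadPool = Executors.newCachedThreadPool()

  def launchTask(task: TaskDescription): Unit =
    threadPool.execute(new Runnable {
      override def run(): Unit =
        println(s"running task ${task.taskId} (${task.name})")
    })

  def main(args: Array[String]): Unit = {
    launchTask(TaskDescription(0L, "example-task"))
    threadPool.shutdown()
  }
}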

Process diagram

[Figure: Spark on Yarn process diagram]
