欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  科技

Spark源码分析 集群架构介绍和SparkContext源码解析

程序员文章站 2022-07-07 22:33:50
源码分析-1集群架构介绍和sparkcontext源码分析">Spark源码分析-1.集群架构介绍和SparkContext源码分析 在分析Spark源码之前,有必要把Spark的集群架构和...

在分析Spark源码之前,有必要把Spark的集群架构和SparkContext复习下,有助于后面对源码的理解。

Spark集群架构

本文的集群架构是建立在Standalone模式的,Yarn和Mesos类似

Spark架构:
Spark源码分析 集群架构介绍和SparkContext源码解析

名词解释:

Client:客户端进程,负责提交作业到Master。Client进程运行 在执行spark-submit命令的节点上

Master:Standalone模式中主控节点,负责接收Client提交的作业,管理Worker,并命令Worker启动Driver和Executor。通过启动start-master.sh启动

Worker:Standalone模式中slave节点上的守护进程,负责管理本节点的资源,定期向Master汇报心跳,接收Master的命令,启动Driver和Executor。通过start-slaves.sh启动,可以通过start-all.sh同时启动Master和Worker

Driver: 一个Spark作业运行时包括一个Driver进程,也是作业的主进程,负责作业的解析、生成Stage并调度Task到Executor上。包括DAGScheduler,TaskSchedule等。

Executor:Worker Node上的一个进程。为APP运行Task真正执行作业的地方,一个集群一般包含多个Executor,每个Executor接收Driver的命令Launch Task,一个Executor可以执行一到多个Task。

Job:一次action操作会触发RDD的延迟计算,我们把这样的一次计算称作一个Job。

Stage:一个Spark Job一般包含一到多个Stage,通过宽依赖划分Stage,后面会提到。

Task:发送到Executor的工作单元,一个Stage包含一到多个Task,通过多个Task实现并行运行的功能。

DAGScheduler: 实现将Spark作业分解成一到多个Stage,每个Stage根据RDD的Partition个数决定Task的个数,然后生成相应的Task set放到TaskScheduler中。

TaskScheduler:实现Task分配到Executor上执行。

QA:

问:Master和Worker可不可以在一个节点
答:可以,在slaves配置文件中添加当前主机名或者IP就行了。

问:Driver运行在哪里
答:根据deploy-mode的不同,Driver运行在不同的机器节点上,如果是cluster模式,Driver运行在Worker Node上,也就是slave节点;如果是client模式,Driver运行在Client Node上。当然Client和Worker也可以是一个Node,比如在slave节点上通过spark-submit提交APP。

SparkContext

SparkContext是Spark程序的主入口,代表一个连接到Spark集群的连接,能被用来在集群上创建RDDs、计数器(accumulators)和广播(broadcast)变量等。一个JVM只能有一个active的SparkContext,创建新的SparkContext之前必须使用stop函数停止该active SparkContext。

我们看下SparkContext的源码

SparkContext.scala文件结构
Spark源码分析 集群架构介绍和SparkContext源码解析

SparkContext包含了SparkContext类及其伴生对象,SparkMasterRegex对象,这个对象用来处理SparkMaster正则表达式的,后面两对半生类和伴生对象暂时用不上。

SparkContext的源码很长,2906行,我们选取部分分析

创建TaskScheduler和DAGScheduler

// Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

先创建了TaskScheduler,然后new了一个DAGScheduler,顺序不能改变,因为DAGScheduler的初始化需要TaskScheduler

DAGScheduler.scala

def this(sc: SparkContext) = this(sc, sc.taskScheduler)

我们看下TAskScheduler的代码,内部是根据master的不同执行相关操作、生成相应的TaskScheduler。
分别是
1. case “local”:本地单线程运行模式
2. case LOCAL_N_REGEX(threads):本地多线程运行,指定核数,*代表所有的核数
3. case LOCAL_N_FAILURES_REGEX:本地多线程运行,指定核数以及失败重试次数
4. case SPARK_REGEX(sparkUrl):匹配Spark Standalone运行模式
5. case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave):匹配local-cluster运行模式即伪分布模式
6. case masterUrl:匹配其他模式,如Yarn和Mesos

  private def createTaskScheduler(
      sc: SparkContext,
      master: String,
      deployMode: String): (SchedulerBackend, TaskScheduler) = {
    import SparkMasterRegex._

    // When running locally, don't try to re-execute tasks on failure.
    val MAX_LOCAL_TASK_FAILURES = 1

    master match {
      case "local" =>
        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
        val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
        scheduler.initialize(backend)
        (backend, scheduler)

      case LOCAL_N_REGEX(threads) =>
        def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
        // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
        val threadCount = if (threads == "*") localCpuCount else threads.toInt
        if (threadCount <= 0) {
          throw new SparkException(s"Asked to run locally with $threadCount threads")
        }
        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
        val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
        scheduler.initialize(backend)
        (backend, scheduler)

      case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
        def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
        // local[*, M] means the number of cores on the computer with M failures
        // local[N, M] means exactly N threads with M failures
        val threadCount = if (threads == "*") localCpuCount else threads.toInt
        val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
        val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
        scheduler.initialize(backend)
        (backend, scheduler)

      case SPARK_REGEX(sparkUrl) =>
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        (backend, scheduler)

      case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
        // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
        val memoryPerSlaveInt = memoryPerSlave.toInt
        if (sc.executorMemory > memoryPerSlaveInt) {
          throw new SparkException(
            "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
              memoryPerSlaveInt, sc.executorMemory))
        }

        val scheduler = new TaskSchedulerImpl(sc)
        val localCluster = new LocalSparkCluster(
          numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
        val masterUrls = localCluster.start()
        val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        backend.shutdownCallback = (backend: StandaloneSchedulerBackend) => {
          localCluster.stop()
        }
        (backend, scheduler)

      case masterUrl =>
        val cm = getClusterManager(masterUrl) match {
          case Some(clusterMgr) => clusterMgr
          case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
        }
        try {
          val scheduler = cm.createTaskScheduler(sc, masterUrl)
          val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
          cm.initialize(scheduler, backend)
          (backend, scheduler)
        } catch {
          case se: SparkException => throw se
          case NonFatal(e) =>
            throw new SparkException("External scheduler cannot be instantiated", e)
        }
    }
  }