Spark Source Code Analysis - 1: Cluster Architecture Overview and SparkContext
Before diving into the Spark source code, it is worth reviewing Spark's cluster architecture and SparkContext; this will help with understanding the source code later.
Spark Cluster Architecture
This article assumes the Standalone deployment mode; Yarn and Mesos are similar.
Spark architecture:
Terminology:
Client: the client process, responsible for submitting jobs to the Master. The Client process runs on the node where the spark-submit command is executed.
Master: the control node in Standalone mode. It receives jobs submitted by the Client, manages the Workers, and instructs Workers to launch Drivers and Executors. Started via start-master.sh.
Worker: the daemon running on each slave node in Standalone mode. It manages that node's resources, sends periodic heartbeats to the Master, and launches Drivers and Executors on the Master's command. Started via start-slaves.sh; start-all.sh starts the Master and all Workers together.
Driver: every running Spark job includes one Driver process, the job's main process. It parses the job, builds Stages, and schedules Tasks onto Executors. It contains the DAGScheduler, TaskScheduler, and related components.
Executor: a process on a Worker node; this is where Tasks actually execute for an application. A cluster generally contains multiple Executors; each Executor receives "launch task" commands from the Driver and can run one or more Tasks.
Job: an action triggers the lazy computation of an RDD; one such computation is called a Job.
Stage: a Spark Job generally contains one or more Stages, split at wide dependencies (covered later).
Task: the unit of work sent to an Executor. A Stage contains one or more Tasks; running multiple Tasks is how parallelism is achieved.
DAGScheduler: splits a Spark job into one or more Stages. The number of Tasks in each Stage is determined by the number of partitions of the RDD; the resulting task set is handed to the TaskScheduler.
TaskScheduler: assigns Tasks to Executors for execution.
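To make the terms above concrete, here is a minimal sketch (a hypothetical example, assuming a local Spark installation; the object name and data are made up) showing how one action produces one Job, which the DAGScheduler splits into two Stages at a wide dependency:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object JobStageTaskDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("JobStageTaskDemo").setMaster("local[2]")
    val sc = new SparkContext(conf)

    val words = sc.parallelize(Seq("a", "b", "a", "c"), numSlices = 2) // 2 partitions -> 2 Tasks per Stage
    val counts = words.map(w => (w, 1))   // narrow dependency: stays in the same Stage
      .reduceByKey(_ + _)                 // wide dependency (shuffle): starts a new Stage

    // collect() is an action: it triggers exactly one Job,
    // which the DAGScheduler splits into two Stages at the shuffle boundary.
    println(counts.collect().toMap)       // e.g. Map(a -> 2, b -> 1, c -> 1)
    sc.stop()
  }
}
```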
Q&A:
Q: Can the Master and a Worker run on the same node?
A: Yes, just add the current hostname or IP to the slaves configuration file.
Q: Where does the Driver run?
A: It depends on deploy-mode. In cluster mode the Driver runs on a Worker node, i.e. a slave; in client mode it runs on the Client node. Of course, the Client and a Worker can be the same node, for example when the application is submitted with spark-submit from a slave node.
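As an illustration (the master host name, class, and jar path are placeholders), the deploy mode is chosen with the --deploy-mode flag of spark-submit:

```shell
# client mode: the Driver runs inside this spark-submit process, on the submitting node
spark-submit --master spark://master-host:7077 --deploy-mode client \
  --class com.example.MyApp myapp.jar

# cluster mode: the Master asks a Worker to launch the Driver on a slave node
spark-submit --master spark://master-host:7077 --deploy-mode cluster \
  --class com.example.MyApp myapp.jar
```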
SparkContext
SparkContext is the main entry point of a Spark program. It represents a connection to a Spark cluster and can be used to create RDDs, accumulators, and broadcast variables on that cluster. Only one SparkContext may be active per JVM; you must call stop() on the active SparkContext before creating a new one.
Let's look at the SparkContext source code.
Structure of SparkContext.scala
SparkContext.scala contains the SparkContext class and its companion object, as well as the SparkMasterRegex object, which holds the regular expressions for parsing the Spark master URL; the remaining two companion class/object pairs are not needed for now.
The SparkContext source is long (2906 lines), so we analyze selected parts.
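A minimal sketch of that lifecycle rule (local mode assumed; the names are illustrative): create RDDs, an accumulator, and a broadcast variable from one context, and stop it before creating another in the same JVM:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ContextLifecycleDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("demo").setMaster("local[*]")
    val sc1 = new SparkContext(conf)

    val rdd = sc1.parallelize(1 to 10)            // an RDD
    val acc = sc1.longAccumulator("counter")      // an accumulator
    val lookup = sc1.broadcast(Map(1 -> "one"))   // a broadcast variable

    rdd.foreach(_ => acc.add(1))
    println(acc.value)                            // 10

    sc1.stop()                        // must stop the active context first...
    val sc2 = new SparkContext(conf)  // ...otherwise this throws in the same JVM
    sc2.stop()
  }
}
```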
Creating the TaskScheduler and DAGScheduler
```scala
// Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
```
The TaskScheduler is created first, and only then is a DAGScheduler instantiated. The order cannot be reversed, because initializing the DAGScheduler requires the TaskScheduler:
DAGScheduler.scala:

```scala
def this(sc: SparkContext) = this(sc, sc.taskScheduler)
```
Now let's look at the createTaskScheduler code. It pattern-matches on the master string and creates the corresponding TaskScheduler for each mode.
The cases are:
1. case "local": local single-threaded mode
2. case LOCAL_N_REGEX(threads): local multi-threaded mode with the given number of threads; * means one thread per available core
3. case LOCAL_N_FAILURES_REGEX: local multi-threaded mode with the given number of threads and a maximum number of task failures
4. case SPARK_REGEX(sparkUrl): Spark Standalone mode
5. case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave): local-cluster mode, i.e. pseudo-distributed mode
6. case masterUrl: any other cluster manager, such as Yarn or Mesos
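As a self-contained sketch of how these cases are distinguished (the regex patterns are copied, slightly simplified, from SparkContext's SparkMasterRegex object; the classify helper and its labels are made up for illustration), we can check which case various master strings would hit:

```scala
object MasterUrlDemo {
  // Patterns modeled on SparkMasterRegex (simplified copies, for illustration only)
  val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
  val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
  val SPARK_REGEX = """spark://(.*)""".r
  val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*\]""".r

  def classify(master: String): String = master match {
    case "local"                            => "local single-threaded"
    case LOCAL_N_REGEX(threads)             => s"local, $threads threads"
    case LOCAL_N_FAILURES_REGEX(t, f)       => s"local, $t threads, $f max failures"
    case SPARK_REGEX(url)                   => s"standalone at $url"
    case LOCAL_CLUSTER_REGEX(n, cores, mem) => s"local-cluster, $n workers"
    case other                              => s"external cluster manager: $other"
  }

  def main(args: Array[String]): Unit = {
    Seq("local", "local[*]", "local[4,2]", "spark://host:7077",
        "local-cluster[2,1,1024]", "yarn")
      .foreach(m => println(s"$m -> ${classify(m)}"))
  }
}
```

Note that a Scala Regex used as a match pattern must match the entire string, which is why "local[4,2]" falls through LOCAL_N_REGEX and is captured by LOCAL_N_FAILURES_REGEX.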
```scala
private def createTaskScheduler(
    sc: SparkContext,
    master: String,
    deployMode: String): (SchedulerBackend, TaskScheduler) = {
  import SparkMasterRegex._

  // When running locally, don't try to re-execute tasks on failure.
  val MAX_LOCAL_TASK_FAILURES = 1

  master match {
    case "local" =>
      val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
      val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
      scheduler.initialize(backend)
      (backend, scheduler)

    case LOCAL_N_REGEX(threads) =>
      def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
      // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
      val threadCount = if (threads == "*") localCpuCount else threads.toInt
      if (threadCount <= 0) {
        throw new SparkException(s"Asked to run locally with $threadCount threads")
      }
      val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
      val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
      scheduler.initialize(backend)
      (backend, scheduler)

    case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
      def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
      // local[*, M] means the number of cores on the computer with M failures
      // local[N, M] means exactly N threads with M failures
      val threadCount = if (threads == "*") localCpuCount else threads.toInt
      val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
      val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
      scheduler.initialize(backend)
      (backend, scheduler)

    case SPARK_REGEX(sparkUrl) =>
      val scheduler = new TaskSchedulerImpl(sc)
      val masterUrls = sparkUrl.split(",").map("spark://" + _)
      val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
      scheduler.initialize(backend)
      (backend, scheduler)

    case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
      // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
      val memoryPerSlaveInt = memoryPerSlave.toInt
      if (sc.executorMemory > memoryPerSlaveInt) {
        throw new SparkException(
          "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
            memoryPerSlaveInt, sc.executorMemory))
      }

      val scheduler = new TaskSchedulerImpl(sc)
      val localCluster = new LocalSparkCluster(
        numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
      val masterUrls = localCluster.start()
      val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
      scheduler.initialize(backend)
      backend.shutdownCallback = (backend: StandaloneSchedulerBackend) => {
        localCluster.stop()
      }
      (backend, scheduler)

    case masterUrl =>
      val cm = getClusterManager(masterUrl) match {
        case Some(clusterMgr) => clusterMgr
        case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
      }
      try {
        val scheduler = cm.createTaskScheduler(sc, masterUrl)
        val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
        cm.initialize(scheduler, backend)
        (backend, scheduler)
      } catch {
        case se: SparkException => throw se
        case NonFatal(e) =>
          throw new SparkException("External scheduler cannot be instantiated", e)
      }
  }
}
```