SparkContext Source Code Analysis
The Spark source analyzed here is version 1.6.0.
Today we take a look at SparkContext.
SparkContext is responsible for creating three core components (a minimal entry-point sketch follows this list):
- TaskScheduler: the task-level scheduler (our main focus)
- DAGScheduler: the stage-level scheduler
- SparkUI: displays the application's runtime status
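Before diving into the source, here is a minimal sketch of the entry point; the application name and master URL are placeholders. Constructing the SparkContext is what triggers everything analyzed below.

import org.apache.spark.{SparkConf, SparkContext}

// Building the SparkContext kicks off the creation of the TaskScheduler,
// the DAGScheduler, and the SparkUI discussed in this article.
val conf = new SparkConf()
  .setAppName("demo-app")                // placeholder application name
  .setMaster("spark://master:7077")      // placeholder standalone master URL
val sc = new SparkContext(conf)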
TaskScheduler
// Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master)
Taking standalone mode as an example, let's step into createTaskScheduler:
case SPARK_REGEX(sparkUrl) =>
  val scheduler = new TaskSchedulerImpl(sc)
  val masterUrls = sparkUrl.split(",").map("spark://" + _)
  val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
  scheduler.initialize(backend)
  (backend, scheduler)
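As an aside, the standalone URL is recognized by a simple regex defined in SparkContext.scala. A small self-contained sketch of how a comma-separated HA master list is matched and rebuilt:

// SPARK_REGEX captures everything after "spark://"; a comma-separated
// HA master list is then split, and each host gets its prefix back.
val SPARK_REGEX = """spark://(.*)""".r

"spark://host1:7077,host2:7077" match {
  case SPARK_REGEX(sparkUrl) =>
    val masterUrls = sparkUrl.split(",").map("spark://" + _)
    println(masterUrls.mkString(", ")) // spark://host1:7077, spark://host2:7077
}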
This branch creates a TaskSchedulerImpl, the concrete implementation of the TaskScheduler trait, then creates a SparkDeploySchedulerBackend and hands it to scheduler.initialize():
def initialize(backend: SchedulerBackend) {
  this.backend = backend
  // temporarily set rootPool name to empty
  rootPool = new Pool("", schedulingMode, 0, 0)
  schedulableBuilder = {
    schedulingMode match {
      case SchedulingMode.FIFO =>
        new FIFOSchedulableBuilder(rootPool)
      case SchedulingMode.FAIR =>
        new FairSchedulableBuilder(rootPool, conf)
    }
  }
  schedulableBuilder.buildPools()
}
The initialization work wires the backend into the TaskSchedulerImpl, creates the root scheduling Pool, and builds the schedulable pools according to the scheduling mode, either FIFO (the default) or FAIR.
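The mode is driven by the spark.scheduler.mode setting; a minimal sketch of switching to fair scheduling (values are illustrative):

import org.apache.spark.SparkConf

// "spark.scheduler.mode" selects the SchedulableBuilder above: FIFO is
// the default; FAIR enables fair pools, which can additionally be
// described in an XML file via "spark.scheduler.allocation.file".
val conf = new SparkConf().set("spark.scheduler.mode", "FAIR")

With the scheduler initialized, SparkContext starts it; the TaskScheduler in turn starts its backend, and execution moves into SparkDeploySchedulerBackend's start():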
/**
 * ApplicationDescription describes the application currently being run,
 * including at most how many CPU cores the application needs in total
 * and how much memory it needs on each slave (i.e. per executor).
 */
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)
// Create the AppClient, the component through which the application talks to the Spark master
client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
client.start()
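For context, here is roughly the shape of ApplicationDescription in 1.6.0, reconstructed from memory (the field order matches the call above; treat the exact signature and defaults as approximate):

import java.net.URI
import org.apache.spark.deploy.Command

// Approximate 1.6.0 shape: maxCores and coresPerExecutor are Options
// because they may be left unset; memoryPerExecutorMB is the per-executor
// memory in MB ("memory per slave" in the comment above).
private[spark] case class ApplicationDescription(
    name: String,
    maxCores: Option[Int],
    memoryPerExecutorMB: Int,
    command: Command,
    appUiUrl: String,
    eventLogDir: Option[URI] = None,
    eventLogCodec: Option[String] = None,
    coresPerExecutor: Option[Int] = None)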
client.start() then takes us into AppClient's start method:
def start() {
  // Just launch an rpcEndpoint; it will call back into the listener.
  endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
}
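setupEndpoint registers ClientEndpoint with the RpcEnv, and the RpcEnv then invokes the endpoint's onStart() lifecycle hook exactly once. A hypothetical endpoint to illustrate the two hooks involved (note that org.apache.spark.rpc is private[spark] in 1.6, so this compiles only inside the spark package; it is purely illustrative):

import org.apache.spark.rpc.{RpcEndpoint, RpcEnv}

// Hypothetical endpoint mirroring the hooks ClientEndpoint relies on:
// onStart() runs once after setupEndpoint(); in AppClient this is where
// master registration begins. receive handles one-way send() messages.
class DemoEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
  override def onStart(): Unit = {
    // kick off asynchronous work here, e.g. registering with the master
  }
  override def receive: PartialFunction[Any, Unit] = {
    case msg => println(s"got $msg")
  }
}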
From onStart(), execution enters ClientEndpoint's registerWithMaster method, which in turn calls tryRegisterAllMasters, where we reach:
masterRef.send(RegisterApplication(appDescription, self))
Inside tryRegisterAllMasters, each master endpoint is sent a RegisterApplication message carrying the application's description; the Master then registers the application and acknowledges the driver with a RegisteredApplication message.
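To close the loop, a paraphrased sketch of the Master's side, reconstructed from memory of Master.receive in 1.6.0 (treat names and details as approximate):

// Simplified sketch of the Master handling RegisterApplication: it builds
// its internal ApplicationInfo, persists it for recovery, acknowledges the
// driver with RegisteredApplication, then schedules resources.
case RegisterApplication(description, driver) =>
  if (state == RecoveryState.STANDBY) {
    // a standby master ignores registration attempts
  } else {
    val app = createApplication(description, driver)
    registerApplication(app)
    persistenceEngine.addApplication(app)
    driver.send(RegisteredApplication(app.id, self))
    schedule()
  }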