欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

SparkContext源码分析

程序员文章站 2024-02-23 09:17:34
...

Spark源码是1.6.0版本

今天我们来分析一下SparkContext
SparkContext主要有三个功能:

  1. TaskScheduler task调度器 (主要讲)
  2. DAGScheduler stage调度器
  3. SparkUI 显示application的运行状态

SparkContext源码分析

TaskScheduler

// Create and start the scheduler
  val (sched, ts) = SparkContext.createTaskScheduler(this, master)

我们以standalone模式为例,进入createTaskScheduler

      case SPARK_REGEX(sparkUrl) =>
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(ba

创建TaskSchedulerImpl对象,TaskSchedulerImpl是TaskScheduler实现类,TaskScheduler是一个接口,接下来创建了一个SparkDeploySchedulerBackend对象,然后初始化scheduler,

  def initialize(backend: SchedulerBackend) {
    this.backend = backend
    // temporarily set rootPool name to empty
    rootPool = new Pool("", schedulingMode, 0, 0)
    schedulableBuilder = {
      schedulingMode match {
        case SchedulingMode.FIFO =>
          new FIFOSchedulableBuilder(rootPool)
        case SchedulingMode.FAIR =>
          new FairSchedulableBuilder(rootPool, conf)
      }
    }
    schedulableBuilder.buildPools()
  }

初始化的工作是给TaskSchedulerImpl赋值,然后设置调度器的调度模式FIFO、Fair,创建一个Scheduler的调度池,给TaskSchedulerImpl赋值后,程序会进入SparkDeploySchedulerBackend

    /**
      *
      *     
      *      ApplicationDescription它就代表当前执行的这个application的一些情况
      *      包括application最大需要多少cpu core 每个slave上需要多少内存
      *
      */

    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
      command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)

    //创建appclient application和spark通信的组件
    client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
    client.start()

然后跳到AppClient中的start方法

  def start() {
    // Just launch an rpcEndpoint; it will call back into the listener.
    endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
  }

接着进入ClientEndpoint类中的registerWithMaster方法,registerWithMaster又调用tryRegisterAllMasters,进入

 masterRef.send(RegisterApplication(appDescription, self))

在tryRegisterAllMasters调用RegisterApplication,RegisterApplication把application的信息注册到TaskSchedulerImpl

相关标签: spark 源码