
Creating the SchedulerBackend and TaskScheduler: the YARN case

程序员文章站 2022-04-19 15:54:32

1. The command below asks for yarn mode, but how does the scheduling actually get set up?

$SPARK_HOME/bin/spark-submit --name "lcc_sparkSql_check" --master yarn --class HbaseDataCheck.HbaseDataCheck /home/lcc/hbaseCount/SparkOnHbaseScala.jar

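2. Reading the code shows that the catch-all case masterUrl branch is the one that uses an external scheduler.

Whatever cluster manager the master URL names, the matching kind of external scheduler is created.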

      // Matches any other value, that is, an external resource manager such as YARN, Mesos, and others.
      // YARN is used for this walkthrough.
      case masterUrl =>
        val cm = getClusterManager(masterUrl) match {
          case Some(clusterMgr) => clusterMgr
          case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
        }
        try {
          // cm is an ExternalClusterManager, the cluster manager interface for plugging in external schedulers.
          // It is a trait, so look at its implementations; in yarn mode that is YarnClusterManager.
          val scheduler = cm.createTaskScheduler(sc, masterUrl)
          val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
          cm.initialize(scheduler, backend)
          (backend, scheduler)
        } catch {
          case se: SparkException => throw se
          case NonFatal(e) =>
            throw new SparkException("External scheduler cannot be instantiated", e)
        }

The three lines that matter are these:

          // cm is an ExternalClusterManager, the cluster manager interface for plugging in external schedulers.
          // It is a trait, so look at its implementations; in yarn mode that is YarnClusterManager.
          val scheduler = cm.createTaskScheduler(sc, masterUrl)
          val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
          cm.initialize(scheduler, backend)

Both createTaskScheduler and createSchedulerBackend are called through an interface: the cluster manager interface used to plug in external schedulers.

/**
 * A cluster manager interface to plugin external scheduler.
 */
private[spark] trait ExternalClusterManager {

  /**
   * Check if this cluster manager instance can create scheduler components
   * for a certain master URL.
   * @param masterURL the master URL
   * @return True if the cluster manager can create scheduler backend/
   */
  def canCreate(masterURL: String): Boolean

  /**
   * Create a task scheduler instance for the given SparkContext
   * @param sc SparkContext
   * @param masterURL the master URL
   * @return TaskScheduler that will be responsible for task handling
   */
  def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler

  /**
   * Create a scheduler backend for the given SparkContext and scheduler. This is
   * called after task scheduler is created using `ExternalClusterManager.createTaskScheduler()`.
   * @param sc SparkContext
   * @param masterURL the master URL
   * @param scheduler TaskScheduler that will be used with the scheduler backend.
   * @return SchedulerBackend that works with a TaskScheduler
   */
  def createSchedulerBackend(sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler): SchedulerBackend

  /**
   * Initialize task scheduler and backend scheduler. This is called after the
   * scheduler components are created
   * @param scheduler TaskScheduler that will be responsible for task handling
   * @param backend SchedulerBackend that works with a TaskScheduler
   */
  def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit
}
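
To make the call order concrete, here is a minimal sketch of the same three-step wiring using toy stand-ins. Every name in it (ToyScheduler, ToyBackend, ToyClusterManager, ToyWiring) is hypothetical and only imitates the shape of the private[spark] types above; it is not Spark's implementation.

```scala
// Hypothetical stand-ins for TaskScheduler, SchedulerBackend and ExternalClusterManager.
trait ToyScheduler { def initialize(backend: ToyBackend): Unit }
trait ToyBackend { def name: String }

trait ToyClusterManager {
  def canCreate(masterURL: String): Boolean
  def createTaskScheduler(masterURL: String): ToyScheduler
  def createSchedulerBackend(masterURL: String, scheduler: ToyScheduler): ToyBackend
  def initialize(scheduler: ToyScheduler, backend: ToyBackend): Unit
}

object ToyWiring {
  // Remembers the backend it was initialized with so the wiring can be inspected.
  final class RecordingScheduler extends ToyScheduler {
    var backend: Option[ToyBackend] = None
    override def initialize(b: ToyBackend): Unit = {
      backend = Some(b)
    }
  }

  final case class NamedBackend(name: String) extends ToyBackend

  object ToyYarnManager extends ToyClusterManager {
    override def canCreate(masterURL: String): Boolean = masterURL == "yarn"
    override def createTaskScheduler(masterURL: String): ToyScheduler = new RecordingScheduler
    override def createSchedulerBackend(masterURL: String, scheduler: ToyScheduler): ToyBackend =
      NamedBackend(s"backend-for-$masterURL")
    override def initialize(scheduler: ToyScheduler, backend: ToyBackend): Unit =
      scheduler.initialize(backend)
  }

  // Same order as the three lines in SparkContext: scheduler, then backend, then initialize.
  def wire(cm: ToyClusterManager, masterURL: String): (ToyBackend, ToyScheduler) = {
    val scheduler = cm.createTaskScheduler(masterURL)
    val backend = cm.createSchedulerBackend(masterURL, scheduler)
    cm.initialize(scheduler, backend)
    (backend, scheduler)
  }
}
```

The point of the sketch is the ordering: the backend is built with a reference to the scheduler, and only afterwards is the scheduler told about the backend through initialize.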

Because yarn mode was specified, the implementation to look at is YarnClusterManager.

/**
 * Cluster Manager for creation of Yarn scheduler and backend
 */
private[spark] class YarnClusterManager extends ExternalClusterManager {

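  // SparkContext.createTaskScheduler does not know in advance which cluster manager applies, so every
  // registered one (YARN, Mesos, and so on) is asked; this method answers "is this master URL mine?"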
  override def canCreate(masterURL: String): Boolean = {
    masterURL == "yarn"
  }

  override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
    sc.deployMode match {
        // yarn-cluster mode
      case "cluster" => new YarnClusterScheduler(sc)
        // yarn-client mode
      case "client" => new YarnScheduler(sc)
      case _ => throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
    }
  }

  override def createSchedulerBackend(sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler): SchedulerBackend = {
    sc.deployMode match {
      case "cluster" =>
        new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
      case "client" =>
        new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
      case  _ =>
        throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
    }
  }

  override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
    scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
  }
}
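
The selection logic can be sketched in isolation. The following is a simplified illustration under stated assumptions: an explicit list stands in for however Spark discovers registered cluster managers, the Manager case class and the "mesos://" prefix rule are made up for the example, and the deploy-mode match returns class names as strings instead of constructing schedulers.

```scala
object ManagerLookup {
  // Hypothetical stand-in that keeps only the canCreate part of ExternalClusterManager.
  final case class Manager(name: String, accepts: String => Boolean) {
    def canCreate(masterURL: String): Boolean = accepts(masterURL)
  }

  // Assumption: an explicit list replaces discovery of the registered implementations.
  val registered: Seq[Manager] = Seq(
    Manager("yarn", _ == "yarn"),
    Manager("mesos", _.startsWith("mesos://"))
  )

  // Ask every manager whether it can handle the URL; at most one may say yes.
  def find(masterURL: String): Option[Manager] = {
    val matches = registered.filter(_.canCreate(masterURL))
    require(matches.size <= 1, s"Multiple cluster managers accept '$masterURL'")
    matches.headOption
  }

  // Mirrors the deployMode match in YarnClusterManager, returning class names only.
  def schedulerClassFor(deployMode: String): String = deployMode match {
    case "cluster" => "YarnClusterScheduler"
    case "client" => "YarnScheduler"
    case other => throw new IllegalArgumentException(s"Unknown deploy mode '$other' for Yarn")
  }
}
```

With this sketch, find("yarn") selects the yarn entry, while a URL nobody accepts yields None, which corresponds to the "Could not parse Master URL" error path shown earlier.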

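Yarn mode is further split into yarn-cluster and yarn-client. Start with yarn-cluster:

   // yarn-cluster mode
      case "cluster" => new YarnClusterScheduler(sc)

I have not fully worked out YarnClusterScheduler yet, so its source is listed here for reference: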

/**
 * This is a simple extension to ClusterScheduler - to ensure that appropriate initialization of
 * ApplicationMaster, etc is done
 */
private[spark] class YarnClusterScheduler(sc: SparkContext) extends YarnScheduler(sc) {

  logInfo("Created YarnClusterScheduler")

  override def postStartHook() {
    ApplicationMaster.sparkContextInitialized(sc)
    super.postStartHook()
    logInfo("YarnClusterScheduler.postStartHook done")
  }

}

Next, the backend created is a YarnClusterSchedulerBackend:

  case "cluster" =>
        new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)

I have not fully understood YarnClusterSchedulerBackend either; here is its source:

private[spark] class YarnClusterSchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext)
  extends YarnSchedulerBackend(scheduler, sc) {

  override def start() {
    val attemptId = ApplicationMaster.getAttemptId
    bindToYarn(attemptId.getApplicationId(), Some(attemptId))
    super.start()
    totalExpectedExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sc.conf)
  }

  override def getDriverLogUrls: Option[Map[String, String]] = {
    var driverLogs: Option[Map[String, String]] = None
    try {
      val yarnConf = new YarnConfiguration(sc.hadoopConfiguration)
      val containerId = YarnSparkHadoopUtil.get.getContainerId

      val httpAddress = System.getenv(Environment.NM_HOST.name()) +
        ":" + System.getenv(Environment.NM_HTTP_PORT.name())
      // lookup appropriate http scheme for container log urls
      val yarnHttpPolicy = yarnConf.get(
        YarnConfiguration.YARN_HTTP_POLICY_KEY,
        YarnConfiguration.YARN_HTTP_POLICY_DEFAULT
      )
      val user = Utils.getCurrentUserName()
      val httpScheme = if (yarnHttpPolicy == "HTTPS_ONLY") "https://" else "http://"
      val baseUrl = s"$httpScheme$httpAddress/node/containerlogs/$containerId/$user"
      logDebug(s"Base URL for logs: $baseUrl")
      driverLogs = Some(Map(
        "stdout" -> s"$baseUrl/stdout?start=-4096",
        "stderr" -> s"$baseUrl/stderr?start=-4096"))
    } catch {
      case e: Exception =>
        logInfo("Error while building AM log links, so AM" +
          " logs link will not appear in application UI", e)
    }
    driverLogs
  }
}
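
The URL construction in getDriverLogUrls is easier to follow as a pure function. This is a minimal sketch, assuming that any policy value other than "HTTPS_ONLY" should yield plain http; the object name DriverLogUrls, its parameters, and the sample host, container, and user values are hypothetical, and the environment lookups from the listing are replaced by plain arguments.

```scala
object DriverLogUrls {
  // Only the "HTTPS_ONLY" policy selects https; everything else falls back to http.
  def scheme(yarnHttpPolicy: String): String =
    if (yarnHttpPolicy == "HTTPS_ONLY") "https://" else "http://"

  // Builds the stdout/stderr links the same way the listing does, from explicit inputs.
  def build(
      yarnHttpPolicy: String,
      nodeHost: String,
      httpPort: String,
      containerId: String,
      user: String): Map[String, String] = {
    val httpAddress = nodeHost + ":" + httpPort
    val baseUrl = s"${scheme(yarnHttpPolicy)}$httpAddress/node/containerlogs/$containerId/$user"
    Map(
      "stdout" -> s"$baseUrl/stdout?start=-4096",
      "stderr" -> s"$baseUrl/stderr?start=-4096")
  }
}
```

For example, DriverLogUrls.build("HTTP_ONLY", "nm1.example.com", "8042", "container_1", "lcc") gives links that start with http://nm1.example.com:8042/node/containerlogs/.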

Now the yarn-client mode:

  // yarn-client mode
      case "client" => new YarnScheduler(sc)

YarnScheduler is short enough to read on its own:

private[spark] class YarnScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {

  // RackResolver logs an INFO message whenever it resolves a rack, which is way too often.
  if (Logger.getLogger(classOf[RackResolver]).getLevel == null) {
    Logger.getLogger(classOf[RackResolver]).setLevel(Level.WARN)
  }

  // By default, rack is unknown
  override def getRackForHost(hostPort: String): Option[String] = {
    val host = Utils.parseHostPort(hostPort)._1
    Option(RackResolver.resolve(sc.hadoopConfiguration, host).getNetworkLocation)
  }
}
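
The shape of getRackForHost (strip the port, resolve, wrap a possibly-null result in Option) can be sketched without Hadoop. This is an illustration only: RackLookupSketch, hostOf, and the resolve function argument are hypothetical, and the host parsing here simply cuts at the last colon, which may differ from what Utils.parseHostPort does in edge cases.

```scala
object RackLookupSketch {
  // Assumption: "host:port" is split at the last colon; a bare host is returned unchanged.
  def hostOf(hostPort: String): String = {
    val idx = hostPort.lastIndexOf(':')
    if (idx < 0) hostPort else hostPort.substring(0, idx)
  }

  // resolve stands in for the rack resolver and may return null for unknown hosts;
  // Option(...) turns that null into None, as in the listing.
  def rackFor(hostPort: String, resolve: String => String): Option[String] =
    Option(resolve(hostOf(hostPort)))
}
```

Passing the resolver as a function keeps the sketch testable with a plain Map lookup in place of a real cluster topology.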

The YarnClientSchedulerBackend created here is the class that does most of the work:

case "client" =>
        new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)

The part to focus on is the start method of YarnClientSchedulerBackend:

/**
   * Create a Yarn client to submit an application to the ResourceManager.
   * This waits until the application is running.
   */
  override def start() {
    val driverHost = conf.get("spark.driver.host") // the driver's host address
    val driverPort = conf.get("spark.driver.port") // the driver's port
    val hostport = driverHost + ":" + driverPort
    sc.ui.foreach { ui => conf.set("spark.driver.appUIAddress", ui.webUrl) }  // record the web UI address in the conf

    val argsArrayBuf = new ArrayBuffer[String]() // collect the launch arguments
    argsArrayBuf += ("--arg", hostport)

    logDebug("ClientArguments called with: " + argsArrayBuf.mkString(" "))
    val args = new ClientArguments(argsArrayBuf.toArray)
    totalExpectedExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(conf) // number of executors requested at startup
    client = new Client(args, conf)  // create the driver-side client
    bindToYarn(client.submitApplication(), None)  // submit the application through the client

    // SPARK-8687: Ensure all necessary properties have already been set before
    // we initialize our driver scheduler backend, which serves these properties
    // to the executors
    super.start()  // ultimately calls start in CoarseGrainedSchedulerBackend
    waitForApplication() // wait until the application is running

    // SPARK-8851: In yarn-client mode, the AM still does the credentials refresh. The driver
    // reads the credentials from HDFS, just like the executors and updates its own credentials
    // cache.
    if (conf.contains("spark.yarn.credentials.file")) {
      YarnSparkHadoopUtil.get.startCredentialUpdater(conf)
    }
    monitorThread = asyncMonitorApplication()
    monitorThread.start()
  }
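
The first few lines of start() only assemble the arguments handed to ClientArguments. A minimal sketch of that step, assuming nothing beyond what the listing shows: ClientArgsSketch and build are hypothetical names, the sample host and port are made up, and `++=` with a Seq is used in place of the multi-argument `+=` from the listing.

```scala
import scala.collection.mutable.ArrayBuffer

object ClientArgsSketch {
  // Builds the "--arg host:port" pair from the driver's host and port.
  def build(driverHost: String, driverPort: String): Array[String] = {
    val hostport = driverHost + ":" + driverPort
    val argsArrayBuf = new ArrayBuffer[String]()
    argsArrayBuf ++= Seq("--arg", hostport)
    argsArrayBuf.toArray
  }
}
```

Calling ClientArgsSketch.build("10.0.0.5", "7077") produces the two-element array that, in the listing, tells the application on the YARN side where to reach the driver.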