Creation of schedulerBackend and taskScheduler: the YARN case
1. The command below runs the job in YARN mode, but how does the scheduling actually get set up?
$SPARK_HOME/bin/spark-submit --name "lcc_sparkSql_check" --master yarn --class HbaseDataCheck.HbaseDataCheck /home/lcc/hbaseCount/SparkOnHbaseScala.jar
2. Reading the code shows that the catch-all masterUrl case is the branch that uses an external scheduler: whatever the master URL is, the matching type of external cluster manager gets created.
// This case matches any remaining master URL, e.g. YARN, Mesos or another external
// resource manager; here we walk through the YARN case.
case masterUrl =>
  val cm = getClusterManager(masterUrl) match {
    case Some(clusterMgr) => clusterMgr
    case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
  }
  try {
    // ExternalClusterManager is the cluster manager interface used to plug in an external
    // scheduler. It is a trait, so we need to look at its implementations; for YARN the
    // implementation is YarnClusterManager, which we examine below.
    val scheduler = cm.createTaskScheduler(sc, masterUrl)
    val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
    cm.initialize(scheduler, backend)
    (backend, scheduler)
  } catch {
    case se: SparkException => throw se
    case NonFatal(e) =>
      throw new SparkException("External scheduler cannot be instantiated", e)
  }
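For reference, getClusterManager (called in the snippet above) discovers all ExternalClusterManager implementations on the classpath through Java's ServiceLoader and keeps the single one whose canCreate accepts this master URL. Roughly as it appears in Spark 2.x's SparkContext:

// Needs java.util.ServiceLoader and scala.collection.JavaConverters._ in scope.
private def getClusterManager(url: String): Option[ExternalClusterManager] = {
  val loader = Utils.getContextOrSparkClassLoader
  val serviceLoaders =
    ServiceLoader.load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url))
  if (serviceLoaders.size > 1) {
    throw new SparkException(
      s"Multiple external cluster managers registered for the url $url: $serviceLoaders")
  }
  serviceLoaders.headOption
}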
Focus on these three lines:
// ExternalClusterManager is the cluster manager interface for plugging in an external
// scheduler. It is a trait, so look at its implementations; for YARN that is
// YarnClusterManager, covered next.
val scheduler = cm.createTaskScheduler(sc, masterUrl)
val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
cm.initialize(scheduler, backend)
Both createTaskScheduler and createSchedulerBackend are called on an interface here: ExternalClusterManager, a cluster manager interface for plugging in an external scheduler.
/**
 * A cluster manager interface to plugin external scheduler.
 */
private[spark] trait ExternalClusterManager {

  /**
   * Check if this cluster manager instance can create scheduler components
   * for a certain master URL.
   * @param masterURL the master URL
   * @return True if the cluster manager can create scheduler backend/task scheduler
   */
  def canCreate(masterURL: String): Boolean

  /**
   * Create a task scheduler instance for the given SparkContext
   * @param sc SparkContext
   * @param masterURL the master URL
   * @return TaskScheduler that will be responsible for task handling
   */
  def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler

  /**
   * Create a scheduler backend for the given SparkContext and scheduler. This is
   * called after task scheduler is created using `ExternalClusterManager.createTaskScheduler()`.
   * @param sc SparkContext
   * @param masterURL the master URL
   * @param scheduler TaskScheduler that will be used with the scheduler backend.
   * @return SchedulerBackend that works with a TaskScheduler
   */
  def createSchedulerBackend(sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler): SchedulerBackend

  /**
   * Initialize task scheduler and backend scheduler. This is called after the
   * scheduler components are created
   * @param scheduler TaskScheduler that will be responsible for task handling
   * @param backend SchedulerBackend that works with a TaskScheduler
   */
  def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit
}
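This trait is the plugin point. As an illustration of how a third-party scheduler would hook in, here is a minimal hypothetical implementation; MyClusterManager and the mycluster:// prefix are made-up names, and it reuses Spark's TaskSchedulerImpl (which is private[spark], so a real plugin would have to live under an org.apache.spark package or ship its own TaskScheduler):

import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}

// Hypothetical sketch of a pluggable cluster manager; names are illustrative only.
private[spark] class MyClusterManager extends ExternalClusterManager {

  // Claim master URLs of the form "mycluster://..."
  override def canCreate(masterURL: String): Boolean =
    masterURL.startsWith("mycluster://")

  // Reuse Spark's default task scheduler implementation.
  override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler =
    new TaskSchedulerImpl(sc)

  override def createSchedulerBackend(
      sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler): SchedulerBackend = {
    // A real implementation would return a backend that talks to the external resource manager.
    ???
  }

  // Wire the backend into the task scheduler, as YarnClusterManager does below.
  override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit =
    scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
}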
Since YARN mode is specified, look at YARN's implementation of the trait, YarnClusterManager:
/**
 * Cluster Manager for creation of Yarn scheduler and backend
 */
private[spark] class YarnClusterManager extends ExternalClusterManager {

  // SparkContext.createTaskScheduler asks every registered cluster manager (YARN, Mesos, ...)
  // whether it can handle the given master URL; this one answers "yes" only for "yarn".
  override def canCreate(masterURL: String): Boolean = {
    masterURL == "yarn"
  }

  override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
    sc.deployMode match {
      // yarn-cluster mode
      case "cluster" => new YarnClusterScheduler(sc)
      // yarn-client mode
      case "client" => new YarnScheduler(sc)
      case _ => throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
    }
  }

  override def createSchedulerBackend(sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler): SchedulerBackend = {
    sc.deployMode match {
      case "cluster" =>
        new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
      case "client" =>
        new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
      case _ => throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
    }
  }

  override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
    scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
  }
}
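How does getClusterManager find YarnClusterManager in the first place? The YARN module registers the class for Java's ServiceLoader in a one-line provider-configuration resource (the directory has moved between Spark versions, so treat the exact path as approximate):

# META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
org.apache.spark.scheduler.cluster.YarnClusterManager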
YARN mode is further split into yarn-cluster and yarn-client. Start with yarn-cluster:
// yarn-cluster mode
case "cluster" => new YarnClusterScheduler(sc)
YarnClusterScheduler is a thin extension of YarnScheduler; the interesting part is postStartHook, which notifies the ApplicationMaster that the SparkContext has been initialized:
/**
 * This is a simple extension to ClusterScheduler - to ensure that appropriate initialization of
 * ApplicationMaster, etc is done
 */
private[spark] class YarnClusterScheduler(sc: SparkContext) extends YarnScheduler(sc) {

  logInfo("Created YarnClusterScheduler")

  override def postStartHook() {
    ApplicationMaster.sparkContextInitialized(sc)
    super.postStartHook()
    logInfo("YarnClusterScheduler.postStartHook done")
  }
}
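One way to read postStartHook: in cluster mode the driver runs inside the ApplicationMaster, and the AM blocks until the user code has created its SparkContext; ApplicationMaster.sparkContextInitialized(sc) completes that wait. A minimal sketch of the handshake pattern (not the actual ApplicationMaster code, just the idea, expressed with a Promise and made-up names):

import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

// Conceptual sketch of the AM <-> driver handshake; SketchApplicationMaster is a made-up name
// and String stands in for SparkContext.
object SketchApplicationMaster {
  // The AM holds a promise that is completed once the user code builds its SparkContext.
  private val sparkContextPromise = Promise[String]()

  // Called from the driver side, like YarnClusterScheduler.postStartHook does.
  def sparkContextInitialized(sc: String): Unit = {
    sparkContextPromise.success(sc)
  }

  // The AM side blocks here before it can proceed with registration.
  def waitForSparkContext(timeout: Duration = 100.seconds): String = {
    Await.result(sparkContextPromise.future, timeout)
  }
}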
Then the YarnClusterSchedulerBackend is created:
case "cluster" => new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
YarnClusterSchedulerBackend binds the backend to the current YARN application attempt in start() and builds links to the driver logs on the NodeManager:
private[spark] class YarnClusterSchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext)
  extends YarnSchedulerBackend(scheduler, sc) {

  override def start() {
    val attemptId = ApplicationMaster.getAttemptId
    bindToYarn(attemptId.getApplicationId(), Some(attemptId))
    super.start()
    totalExpectedExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sc.conf)
  }

  override def getDriverLogUrls: Option[Map[String, String]] = {
    var driverLogs: Option[Map[String, String]] = None
    try {
      val yarnConf = new YarnConfiguration(sc.hadoopConfiguration)
      val containerId = YarnSparkHadoopUtil.get.getContainerId
      val httpAddress = System.getenv(Environment.NM_HOST.name()) +
        ":" + System.getenv(Environment.NM_HTTP_PORT.name())
      // lookup appropriate http scheme for container log urls
      val yarnHttpPolicy = yarnConf.get(
        YarnConfiguration.YARN_HTTP_POLICY_KEY,
        YarnConfiguration.YARN_HTTP_POLICY_DEFAULT
      )
      val user = Utils.getCurrentUserName()
      val httpScheme = if (yarnHttpPolicy == "HTTPS_ONLY") "https://" else "http://"
      val baseUrl = s"$httpScheme$httpAddress/node/containerlogs/$containerId/$user"
      logDebug(s"Base URL for logs: $baseUrl")
      driverLogs = Some(Map(
        "stdout" -> s"$baseUrl/stdout?start=-4096",
        "stderr" -> s"$baseUrl/stderr?start=-4096"))
    } catch {
      case e: Exception =>
        logInfo("Error while building AM log links, so AM" +
          " logs link will not appear in application UI", e)
    }
    driverLogs
  }
}
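To make getDriverLogUrls concrete, here is what the generated links look like with made-up NodeManager host, container id and user values:

// Illustration only: hypothetical NodeManager address, container id and user name.
val httpScheme  = "http://"                                  // YARN_HTTP_POLICY is not HTTPS_ONLY
val httpAddress = "nm-host-01:8042"                          // NM_HOST + ":" + NM_HTTP_PORT
val containerId = "container_1510000000000_0001_01_000001"
val user        = "lcc"

val baseUrl = s"$httpScheme$httpAddress/node/containerlogs/$containerId/$user"
val driverLogs = Map(
  "stdout" -> s"$baseUrl/stdout?start=-4096",   // last 4 KB of driver stdout
  "stderr" -> s"$baseUrl/stderr?start=-4096")   // last 4 KB of driver stderr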
Next, the yarn-client mode:
// yarn-client mode
case "client" => new YarnScheduler(sc)
YarnScheduler itself is short: it is a TaskSchedulerImpl that quiets the RackResolver logger and resolves hosts to racks:
private[spark] class YarnScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {

  // RackResolver logs an INFO message whenever it resolves a rack, which is way too often.
  if (Logger.getLogger(classOf[RackResolver]).getLevel == null) {
    Logger.getLogger(classOf[RackResolver]).setLevel(Level.WARN)
  }

  // By default, rack is unknown
  override def getRackForHost(hostPort: String): Option[String] = {
    val host = Utils.parseHostPort(hostPort)._1
    Option(RackResolver.resolve(sc.hadoopConfiguration, host).getNetworkLocation)
  }
}
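getRackForHost strips the port and asks Hadoop's RackResolver for the host's network location, which feeds rack-aware task placement. A rough usage sketch; the host name and the existing SparkContext sc are assumptions, and YarnScheduler is private[spark], so this would only compile inside an org.apache.spark package:

// Hypothetical usage: resolve the rack of an executor host.
val scheduler = new YarnScheduler(sc)                       // assumes an existing SparkContext `sc`
val rack: Option[String] = scheduler.getRackForHost("worker-03:7337")
println(rack.getOrElse("unknown"))                          // with Hadoop's default topology: "/default-rack"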
The YarnClientSchedulerBackend created here is the main working class:
case "client" => new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
Focus on the start method of YarnClientSchedulerBackend:
/**
 * Create a Yarn client to submit an application to the ResourceManager.
 * This waits until the application is running.
 */
override def start() {
  val driverHost = conf.get("spark.driver.host")   // driver host
  val driverPort = conf.get("spark.driver.port")   // driver port
  val hostport = driverHost + ":" + driverPort
  sc.ui.foreach { ui => conf.set("spark.driver.appUIAddress", ui.webUrl) }  // publish the web UI address

  val argsArrayBuf = new ArrayBuffer[String]()     // assemble the launch arguments
  argsArrayBuf += ("--arg", hostport)

  logDebug("ClientArguments called with: " + argsArrayBuf.mkString(" "))
  val args = new ClientArguments(argsArrayBuf.toArray)
  totalExpectedExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(conf)  // executors requested at launch
  client = new Client(args, conf)                  // the driver-side YARN client
  bindToYarn(client.submitApplication(), None)     // submit the application through the client

  // SPARK-8687: Ensure all necessary properties have already been set before
  // we initialize our driver scheduler backend, which serves these properties
  // to the executors
  super.start()                                    // eventually calls CoarseGrainedSchedulerBackend.start()
  waitForApplication()                             // wait until the application is running

  // SPARK-8851: In yarn-client mode, the AM still does the credentials refresh. The driver
  // reads the credentials from HDFS, just like the executors and updates its own credentials
  // cache.
  if (conf.contains("spark.yarn.credentials.file")) {
    YarnSparkHadoopUtil.get.startCredentialUpdater(conf)
  }
  monitorThread = asyncMonitorApplication()
  monitorThread.start()
}
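waitForApplication is what makes start() block until YARN reports the application as running. A simplified sketch of the polling idea follows; it is not the actual Spark implementation (which delegates to Client.monitorApplication), but the YarnClient calls used are standard YARN API:

import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
import org.apache.hadoop.yarn.client.api.YarnClient

// Simplified sketch: poll the ResourceManager until the application leaves the
// submission states, then fail fast if it did not reach RUNNING.
def waitUntilRunning(yarnClient: YarnClient, appId: ApplicationId): Unit = {
  var state = yarnClient.getApplicationReport(appId).getYarnApplicationState
  while (state == YarnApplicationState.NEW ||
         state == YarnApplicationState.NEW_SAVING ||
         state == YarnApplicationState.SUBMITTED ||
         state == YarnApplicationState.ACCEPTED) {
    Thread.sleep(1000)  // interval is arbitrary here; Spark uses spark.yarn.report.interval
    state = yarnClient.getApplicationReport(appId).getYarnApplicationState
  }
  if (state != YarnApplicationState.RUNNING) {
    throw new IllegalStateException(s"Yarn application ended prematurely in state $state")
  }
}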