Spark Source Code Reading: Worker Resource Allocation After the Master Receives the ClientActor's Registration
2024-01-31 13:49:58
Let's look at the preStart method of appActor:
override def preStart() {
  context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
  try {
    registerWithMaster()
  } catch {
    case e: Exception =>
      logWarning("Failed to connect to master", e)
      markDisconnected()
      context.stop(self)
  }
}
It calls registerWithMaster to register the application with the Master:
def registerWithMaster() {
  tryRegisterAllMasters()
  import context.dispatcher
  var retries = 0
  registrationRetryTimer = Some {
    context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
      Utils.tryOrExit {
        retries += 1
        if (registered) {
          registrationRetryTimer.foreach(_.cancel())
        } else if (retries >= REGISTRATION_RETRIES) {
          markDead("All masters are unresponsive! Giving up.")
        } else {
          tryRegisterAllMasters()
        }
      }
    }
  }
}
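To make the retry behaviour easier to follow, here is a minimal sketch of the same branching logic with Akka removed. It is only a model, not Spark code: `RegistrationRetrySketch`, `Retrier`, `Outcome` and `tick()` are hypothetical names, and each call to `tick()` stands in for one firing of the scheduled timer.

```scala
// Hypothetical, Akka-free model of the timer body in registerWithMaster.
// Names here (Retrier, tick, Outcome) are illustrative and do not exist in Spark.
object RegistrationRetrySketch {
  sealed trait Outcome
  case object Cancelled extends Outcome // already registered, timer would be cancelled
  case object Retried extends Outcome   // another round of registration messages sent
  case object GaveUp extends Outcome    // retry budget exhausted, markDead would be called

  final class Retrier(maxRetries: Int) {
    private var retries = 0
    var registered = false
    // Starts at 1 because registerWithMaster calls tryRegisterAllMasters()
    // once before the timer is scheduled.
    var attempts = 1

    // One firing of the scheduled timer.
    def tick(): Outcome = {
      retries += 1
      if (registered) Cancelled
      else if (retries >= maxRetries) GaveUp
      else {
        attempts += 1
        Retried
      }
    }
  }
}
```

In this model, a limit of 3 gives three registration attempts in total: the initial one, plus two from the timer, before the third firing gives up.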
Now look at tryRegisterAllMasters:
def tryRegisterAllMasters() {
  for (masterAkkaUrl <- masterAkkaUrls) {
    logInfo("Connecting to master " + masterAkkaUrl + "...")
    val actor = context.actorSelection(masterAkkaUrl)
    actor ! RegisterApplication(appDescription)
  }
}
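The code above iterates over every masterAkkaUrl, obtains an actor selection for each master, and sends it a RegisterApplication message. Next, we find the matching case in the Master: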
case RegisterApplication(description) => {
  if (state == RecoveryState.STANDBY) {
    // ignore, don't send response
  } else {
    logInfo("Registering app " + description.name)
    val app = createApplication(description, sender)
    registerApplication(app)
    logInfo("Registered app " + description.name + " with ID " + app.id)
    persistenceEngine.addApplication(app)
    sender ! RegisteredApplication(app.id, masterUrl)
    schedule()
  }
}
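createApplication mainly builds the application's description and keeps it in memory; sender refers to the ClientActor. After replying with RegisteredApplication, the Master calls schedule(), which is where resource allocation on the workers begins.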
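The Master-side handling can be sketched without Akka as a plain function from a request to an optional reply. This is a simplified illustration, not Spark's implementation: `MasterRegistrationSketch`, the `Master` class here, its `receive` method returning an `Option`, and the `app-0000` ID format are all assumptions made for the example. Persistence and scheduling are left out.

```scala
// Hypothetical model of the RegisterApplication case in the Master.
// A standby master stays silent; an active one records the app and replies.
object MasterRegistrationSketch {
  sealed trait RecoveryState
  case object Standby extends RecoveryState
  case object Alive extends RecoveryState

  final case class RegisterApplication(name: String)
  final case class RegisteredApplication(appId: String, masterUrl: String)

  final class Master(val masterUrl: String, var state: RecoveryState) {
    private var nextId = 0
    // In-memory registry: app ID -> app name (stands in for registerApplication).
    private val apps = scala.collection.mutable.LinkedHashMap.empty[String, String]

    // Returns None where the real code sends no response,
    // Some(reply) where it sends RegisteredApplication to the sender.
    def receive(msg: RegisterApplication): Option[RegisteredApplication] =
      if (state == Standby) None
      else {
        val id = f"app-$nextId%04d" // illustrative ID format only
        nextId += 1
        apps(id) = msg.name
        Some(RegisteredApplication(id, masterUrl))
      }

    def registeredApps: Map[String, String] = apps.toMap
  }
}
```

Because the client sends the registration message to every master URL, this silence from standby masters is what lets only the active master answer.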