Spark source code reading: after the Master receives the ClientActor's registration, it allocates worker resources

程序员文章站 2024-01-31 13:49:58
First, look at the appActor's preStart method:
override def preStart() {
  context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
  try {
    registerWithMaster()
  } catch {
    case e: Exception =>
      logWarning("Failed to connect to master", e)
      markDisconnected()
      context.stop(self)
  }
}

preStart calls registerWithMaster to register the application with the Master:

def registerWithMaster() {
  tryRegisterAllMasters()
  import context.dispatcher
  var retries = 0
  registrationRetryTimer = Some {
    context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
      Utils.tryOrExit {
        retries += 1
        if (registered) {
          registrationRetryTimer.foreach(_.cancel())
        } else if (retries >= REGISTRATION_RETRIES) {
          markDead("All masters are unresponsive! Giving up.")
        } else {
          tryRegisterAllMasters()
        }
      }
    }
  }
}
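
The retry behaviour of registerWithMaster can be easier to follow without the Akka scheduler. The sketch below is a simplified model under stated assumptions, not Spark's code: RegistrationRetrySketch, run, maxRetries and attemptSucceeds are hypothetical names, and each recursive step stands in for one firing of the REGISTRATION_TIMEOUT timer.

```scala
import scala.annotation.tailrec

// Simplified, Akka-free model of the retry loop in registerWithMaster.
// All names in this object are hypothetical; only the control flow mirrors the snippet above.
object RegistrationRetrySketch {
  sealed trait Outcome
  case object Registered extends Outcome
  case object GaveUp extends Outcome

  /**
   * @param maxRetries      stands in for REGISTRATION_RETRIES
   * @param attemptSucceeds attemptSucceeds(n) tells whether the n-th registration
   *                        attempt (1-based) was acknowledged before the next tick
   * @return the outcome and the number of registration attempts that were sent
   */
  def run(maxRetries: Int, attemptSucceeds: Int => Boolean): (Outcome, Int) = {
    @tailrec
    def tick(retries: Int, attempts: Int, registered: Boolean): (Outcome, Int) = {
      val r = retries + 1                        // retries += 1 runs first on every tick
      if (registered) (Registered, attempts)     // timer would be cancelled here
      else if (r >= maxRetries) (GaveUp, attempts) // corresponds to markDead(...)
      else {
        val next = attempts + 1                  // corresponds to tryRegisterAllMasters()
        tick(r, next, attemptSucceeds(next))
      }
    }
    // The first attempt is sent immediately, before the timer is scheduled.
    tick(0, 1, attemptSucceeds(1))
  }
}
```

Because the counter is incremented before it is compared, a limit of 3 in this model yields three attempts in total: the immediate one plus two retries, with the third tick giving up.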

Now look at tryRegisterAllMasters:

def tryRegisterAllMasters() {
  for (masterAkkaUrl <- masterAkkaUrls) {
    logInfo("Connecting to master " + masterAkkaUrl + "...")
    val actor = context.actorSelection(masterAkkaUrl)
    actor ! RegisterApplication(appDescription)
  }
}

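The code above iterates over every masterAkkaUrl, obtains an actor connection to each master, and sends it a RegisterApplication message. Next, find the matching case in the Master: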

case RegisterApplication(description) => {
  if (state == RecoveryState.STANDBY) {
    // ignore, don't send response
  } else {
    logInfo("Registering app " + description.name)
    val app = createApplication(description, sender)
    registerApplication(app)
    logInfo("Registered app " + description.name + " with ID " + app.id)
    persistenceEngine.addApplication(app)
    sender ! RegisteredApplication(app.id, masterUrl)
    schedule()
  }
}

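createApplication mainly builds the app's description and keeps it in memory; sender here refers to the ClientActor. After replying with RegisteredApplication, the Master calls schedule(), which is where worker resources get allocated.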
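
To see why the client can safely send RegisterApplication to every master, here is a minimal sketch of the handler's decision logic with the actor machinery removed. It is an illustration under assumptions, not the Master's implementation: the Master class, handleRegister, registerWithAll, the "app-0000" ID format and the example URLs are all hypothetical, and persistence and schedule() are left out.

```scala
// Akka-free model of how masters react to a registration request.
// Every name below is a hypothetical stand-in for the real Spark classes.
object MasterRegistrationSketch {
  sealed trait RecoveryState
  case object Standby extends RecoveryState
  case object Alive extends RecoveryState

  final case class AppDescription(name: String)
  final case class RegisteredApplication(appId: String, masterUrl: String)

  final class Master(val masterUrl: String, var state: RecoveryState) {
    private var nextAppNumber = 0
    private val apps =
      scala.collection.mutable.LinkedHashMap.empty[String, AppDescription]

    def registeredApps: Map[String, AppDescription] = apps.toMap

    // Returns None when the request is ignored (standby master),
    // otherwise the reply that would be sent back to the sender.
    def handleRegister(description: AppDescription): Option[RegisteredApplication] =
      if (state == Standby) None
      else {
        val appId = f"app-$nextAppNumber%04d" // made-up ID format for this sketch
        nextAppNumber += 1
        apps(appId) = description             // keep the app in memory
        Some(RegisteredApplication(appId, masterUrl))
      }
  }

  // Models tryRegisterAllMasters: send to all masters, collect whatever replies arrive.
  def registerWithAll(masters: Seq[Master],
                      description: AppDescription): Seq[RegisteredApplication] =
    masters.flatMap(_.handleRegister(description))
}
```

In this model, registering against one standby and one active master produces exactly one reply, from the active one, which matches the "ignore, don't send response" branch in the handler above.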