【Spark】编程实战之模拟SparkRPC原理实现自定义RPC
1. 什么是RPC
RPC(Remote Procedure Call)远程过程调用。在Hadoop和Spark中都使用了PRC,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。简单来说,就是有A、B两台机器,A机器可以调用B机器上的程序。
2. Spark 的RPC
Master和Worker的启动流程:
(1) 启动Master,会启动一个定时器,定时检查超时的Worker,并移除超时Worker信息。
(2) 启动Worker,向Master发送注册信息。
(3) Master收到Worker发来的注册信息后,保存到内存中,并返回一个响应信息,这个信息就是自己的masterUrl。
(4) Worker接收到Master发来的响应信息(masterUrl)之后,保存到内存中,并开启一个定时器,定时向Master发送心跳信息。
(5) Master 不断的接收Worker发来的心跳信息,并将每个Worker的最后一次心跳时间为当前接收到心跳信息的时间。
流程如下图。
3. 编程实战
3.1 项目代码(Scala语言)
WorkInfo.scala
package com.nova.rpc /** * @author Supernova * @date 2018/06/15 */ class WorkerInfo(val id: String, val host: String, val port: Int,val memory: Int, val cores: Int) { // 记录最后一次心跳时间 var lastHeartbeatTime: Long = _ }
RemoteMsg.scala
package com.nova.rpc /** * @author Supernova * @date 2018/06/15 */ trait RemoteMsg extends Serializable{ } // Master 向自己发送检查超时Worker的信息 case object CheckTimeOutWorker // Worker向Master发送的注册信息 case class RegisterWorker(id: String, host: String,port: Int, memory: Int, cores: Int) extends RemoteMsg // Master向Worker发送的响应信息 case class RegisteredWorker(masterUrl: String) extends RemoteMsg // Worker向Master发送的心跳信息 case class Heartbeat(workerId: String) extends RemoteMsg // Worker向自己发送的要执行发送心跳信息的消息 case object SendHeartbeat
Master.scala
package com.nova.rpc import akka.actor.{Actor, ActorSystem, Props} import com.typesafe.config.{Config, ConfigFactory} import scala.collection.mutable import scala.concurrent.duration._ /** * @author Supernova * @date 2018/06/15 */ class Master(val masterHost: String, val masterPort: Int) extends Actor{ // 用来存储Worker的注册信息: <workerId, WorkerInfo> val idToWorker = new mutable.HashMap[String, WorkerInfo]() // 用来存储Worker的信息,必须使用可变的HashSet val workers = new mutable.HashSet[WorkerInfo]() // Worker的超时时间间隔 val checkInterval: Long = 15000 /** * 重写生命周期preStart方法 * 作用:当Master启动时,开启定时器,定时检查超时Worker */ override def preStart(): Unit = { // 启动定时器,定时检查超时的Worker import context.dispatcher context.system.scheduler.schedule(0 millis,checkInterval millis, self,CheckTimeOutWorker) } /** * 重写生命周期receive方法 * 作用: * 1.接收Worker发来的注册信息 * 2.不断接收Worker发来的心跳信息,并更新最后一次心跳时间 * 3.过滤出超时的Worker并移除 */ override def receive = { // 接收Worker给Master发送过来的注册信息 case RegisterWorker(id, host, port, memory, cores) => { //判断改Worker是否已经注册过,已注册的不执行任何操作,未注册的将进行注册 if (!idToWorker.contains(id)) { val workerInfo = new WorkerInfo(id, host, port, memory, cores) idToWorker += (id -> workerInfo) workers += workerInfo println("一个新的Worker注册成功") //向Worker发送响应信息,将masterUrl返回 sender ! RegisteredWorker(s"akka.tcp://${Master.MASTER_SYSTEM}" + s"@${masterHost}:${masterPort}/user/${Master.MASTER_ACTOR}") } } //接收Worker发来的心跳信息 case Heartbeat(workerId) => { // 通过传输过来的workerId获取对应的WorkerInfo val workerInfo = idToWorker(workerId) // 获取当前时间 val currentTime = System.currentTimeMillis() // 更新最后一次心跳时间 workerInfo.lastHeartbeatTime = currentTime } //检查超时Worker并移除 case CheckTimeOutWorker => { val currentTime = System.currentTimeMillis() // 把超时的Worker过滤出来 val toRemove: mutable.HashSet[WorkerInfo] = workers.filter(w => currentTime - w.lastHeartbeatTime > checkInterval) // 将超时的Worker移除 toRemove.foreach(deadWorker => { idToWorker -= deadWorker.id workers -= deadWorker }) } println(s"当前Worker的数量: ${workers.size}") } } object Master{ val MASTER_SYSTEM = "MasterSystem" val MASTER_ACTOR = "Master" def main(args: Array[String]): Unit = { val host = args(0) // 通过main方法参数制定master主机名 val port = args(1).toInt //通过main方法参数指定Master的端口号 //akka配置信息 val configStr: String = s""" |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname = "$host" |akka.remote.netty.tcp.port = "$port" """.stripMargin // 配置创建Actor需要的配置信息 val config: Config = ConfigFactory.parseString(configStr) // 创建ActorSystem val actorSystem: ActorSystem = ActorSystem(MASTER_SYSTEM, config) // 用actorSystem实例创建Actor actorSystem.actorOf(Props(new Master(host, port)), MASTER_ACTOR) actorSystem.awaitTermination() } }
Worker.scala
package com.nova.rpc import java.util.UUID import akka.actor.{Actor, ActorSelection, ActorSystem, Props} import com.typesafe.config.{Config, ConfigFactory} import scala.concurrent.duration._ /** * @author Supernova * @date 2018/06/15 */ class Worker(val host: String, val port: Int, val masterHost: String,val masterPort: Int, val memory: Int, val cores: Int) extends Actor{ // 生成一个Worker ID val workerId: String = UUID.randomUUID().toString // 用来存储MasterUrl var masterUrl: String = _ // 心跳时间间隔 val heartbeat_interval: Long = 10000 // Master的Actor var master: ActorSelection = _ /** * 生命周期preStart方法 * 作用:当启动Worker时,向master发送注册信息 */ override def preStart(): Unit = { // 获取Master的Actor master = context.actorSelection(s"akka.tcp://${Master.MASTER_SYSTEM}" + s"@${masterHost}:${masterPort}/user/${Master.MASTER_ACTOR}") master ! RegisterWorker(workerId, host, port, memory, cores) } /** * 生命周期receive方法 * 作用: * 定时向Master发送心跳信息 */ override def receive: Receive = { // Worker接收到Master发送过来的注册成功的信息(masterUrl) case RegisteredWorker(masterUrl) => { this.masterUrl = masterUrl // 启动一个定时器, 定时的给Master发送心跳 import context.dispatcher context.system.scheduler.schedule( 0 millis, heartbeat_interval millis, self, SendHeartbeat) } case SendHeartbeat => { // 向Master发送心跳信息 master ! Heartbeat(workerId) } } } object Worker{ val WORKER_SYSTEM = "WorkerSystem" val WORKER_ACTOR = "Worker" def main(args: Array[String]): Unit = { /** * 通过main方法参数指定相应的 * worker主机名、端口号,master主机名、端口号,使用的内存和核数 */ val host = args(0) val port = args(1).toInt val masterHost = args(2) val masterPort = args(3).toInt val memory = args(4).toInt val cores = args(5).toInt //akka配置信息 val configStr = s""" |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname = "$host" |akka.remote.netty.tcp.port = "$port" """.stripMargin // 配置创建Actor需要的配置信息 val config: Config = ConfigFactory.parseString(configStr) // 创建ActorSystem val actorSystem: ActorSystem = ActorSystem(WORKER_SYSTEM, config) // 用actorSystem实例创建Actor actorSystem.actorOf(Props(new Worker( host, port, masterHost, masterPort, memory, cores)), WORKER_ACTOR) actorSystem.awaitTermination() } }
3.2 测试运行
由于Master 和Worker的运行都是使用main方法参数传入相应的主机名端口等参数,所以在运行前要在IDEA中的Edit Configurations 窗口中传入相应的参数。在本次测试中,我指定的参数如图:
【Master端】
【Worker端】
【运行结果】
1. 先运行Master,可以看到一旦运行Master,就启动了定时器检查超时Worker,因为还没有Worker进行注册,所以结果一直为0
2. 启动Worker
3. 启动Worker后,再看Master的窗口可以发现Worker注册成功,并且数量为1
4. 关闭Worker,此时Worker已经宕掉了,可以发现Master窗口会收到一条警告信息,并且Master在定时检查超时Worker的时候移除了过期未收到心跳的Worker
上一篇: 数据库优化
下一篇: 【读书笔记】iOS-解析XML