sparkRPC通信框架
程序员文章站
2022-07-03 20:56:22
业务需求
1,master worker 都要启动
2,worker在启动之后,需要向master发送注册请求 附带信息 workerId, cores 内存 可以使用 case cals...
业务需求
1,master worker 都要启动
2,worker在启动之后,需要向master发送注册请求 附带信息 workerId, cores 内存 可以使用 case calss 封装数据
3,master接收到worker的请求信息之后,保存worker的注册信息,向worker发送响应信息(注册成功)
4,worker收到注册成功的信息之后,要定时发送心跳(报活) 定时任务 case class workerId
5,master收到worker发送的心跳信息之后,就要更新worker的心跳时间
6,master启动之后,定时检测worker的状态 (如果检测出来worker挂掉了,那 删除该worker的注册信息) 程序实现
## Master ##
集群运行
1,master worker 都要启动
2,worker在启动之后,需要向master发送注册请求 附带信息 workerId, cores 内存 可以使用 case calss 封装数据
3,master接收到worker的请求信息之后,保存worker的注册信息,向worker发送响应信息(注册成功)
4,worker收到注册成功的信息之后,要定时发送心跳(报活) 定时任务 case class workerId
5,master收到worker发送的心跳信息之后,就要更新worker的心跳时间
6,master启动之后,定时检测worker的状态 (如果检测出来worker挂掉了,那 删除该worker的注册信息) 程序实现
## Master ##
import akka.actor.{Actor, ActorRef, ActorSystem, Props} import com.typesafe.config.{Config, ConfigFactory} import akka.actor.{Actor, ActorSystem, Props} import com.typesafe.config.ConfigFactory import scala.collection.mutable class Master extends Actor{ // 启动定时任务,检测超时的worker,把超时的worker删除掉 本身就是发送给自己的 override def preStart(): Unit = { import scala.concurrent.duration._ import context.dispatcher // 检测的时间 15s > 心跳的时间(10s) context.system.scheduler.schedule(0 seconds,15 seconds,self,CheckWorkerStatus) } //HashMap存放worker的注册信息 val workersMap = new mutable.HashMap[String,WorkerInfor]() override def receive: Receive = { case RegisterToMaster(workerHost,memory,cpu) => { println(workerHost,memory,cpu) val infor: WorkerInfor = new WorkerInfor(workerHost,memory,cpu) workersMap(workerHost) = infor println(s"There is recieve ${workerHost} register massage: Worker-num=${workersMap.size}") //返回注册成功的信息 sender() ! RegisterSuccess } //worker注册成功以后开始定时发送心跳信息,master要实时更新新的worker信息 case HeartBeat(workerHost) => { //判断master是否有worker的信息 if(workersMap.contains(workerHost)){ //将worker信息替换成最新的 val workerInfo: WorkerInfor = workersMap(workerHost) workerInfo.LastHeartBeatTime = System.currentTimeMillis() } } //检测master中的worker状态,将超时没有更新信息的worker剔除 case CheckWorkerStatus => { //超时规则2次未注册则剔除 当前时间 - worker最后一次注册的时间 > 2次未注册的时间 val deadWorker = workersMap.filter({ case (workerHost,info) => { System.currentTimeMillis() - info.LastHeartBeatTime > 20*1000 } }) //剔除已经死亡的worker /*deadWorker.foreach({ case (workerHost,_) => { workersMap -= (workerHost) } })*/ workersMap --= deadWorker.map(_._1) println(s"完成检测,最新的worker的数量为——${workersMap.size}") } } } object Master { val MASTER_ACS_NAME = "Master_acs_name" val MASTER_AC_NAME = "Master_ac_name" def main(args: Array[String]): Unit = { if(args.size!=2){ println("cn.edu360.scala_maven.MasterAndWorker.Master ") sys.exit(1)//输入参数错误时,结束程序 } val Array(masterHost,masterPort) = args val src = s""" |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname = ${masterHost} |akka.remote.netty.tcp.port = ${masterPort} """.stripMargin //获取以个CongigFactory工厂 val conf: Config = ConfigFactory.parseString(src) //创建一个ActorSystem val asc: ActorSystem = ActorSystem.create(MASTER_ACS_NAME,conf) val masteracname: ActorRef = asc.actorOf(Props(new Master()),MASTER_AC_NAME) } }
## Worker ##
import java.util.UUID import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props} import com.typesafe.config.{Config, ConfigFactory} class Worker(val masterHost:String,val masterPost:Int,var memory:Int,var cpu:Int) extends Actor{ var selection: ActorSelection = null val workerHost:String = UUID.randomUUID().toString override def preStart(): Unit = { println("run preStart") //连接master val path = s"akka.tcp://${Master.MASTER_ACS_NAME}@${masterHost}:${masterPost}/user/${Master.MASTER_AC_NAME}" selection = context.actorSelection(path) //向Master发送注册信息 selection ! RegisterToMaster(workerHost,memory,cpu) } override def receive: Receive = { case RegisterSuccess => { println("Register success! begin to start schedule!") import scala.concurrent.duration._ import context.dispatcher //启动定时任务 /** * initialDelay: FiniteDuration, 启动延时 interval: FiniteDuration, 间隔时间 receiver: ActorRef, 信息接收方 message: Any 信息内容 */ //ActorRef类型不匹配 //context.system.scheduler.schedule(0 seconds,10 seconds,selection,HeartBeat(workerHost)) context.system.scheduler.schedule(0 seconds,10 seconds,self,SendHeartBeat) } case SendHeartBeat => { selection ! HeartBeat(workerHost) println("worker实时向master发送心跳信息!") } } } object Worker { val WORKER_ACS_NAME = "Worker_acs_name" val WORKER_AC_NAME = "Worker_ac_name" def main(args: Array[String]): Unit = { if(args.size!=6){ println("cn.edu360.scala_maven.MasterAndWorker.Master masterHost,masterPost,workerHost,workerPost,memory,cpu") sys.exit(1)//输入参数错误时,结束程序 } val Array(masterHost,masterPost,workerHost,workerPost,memory,cpu) = args val src = s""" |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname = ${workerHost} |akka.remote.netty.tcp.port = ${workerPost} """.stripMargin //获取以个CongigFactory工厂 val conf: Config = ConfigFactory.parseString(src) //创建一个ActorSystem val acs: ActorSystem = ActorSystem.create(WORKER_ACS_NAME,conf) val workeracname: ActorRef = acs.actorOf(Props(new Worker(masterHost,masterPost.toInt,memory.toInt,cpu.toInt)),WORKER_AC_NAME) } }
## 封装类 ##
class MassagePassing { } //worker向master发送注册信息 case class RegisterToMaster(workerHost:String,memory:Int,cpu:Int) //向worker发送注册成功的信息 case object RegisterSuccess //向master发送心跳信息证明该worker正常工作 case class HeartBeat(workerHost:String) //向master发送心跳信息证明该worker正常工作 case object SendHeartBeat //检测worker注册信息 case object CheckWorkerStatus
case class WorkerInfor(val workerHost:String,var memory:Int,var cpu:Int) { var LastHeartBeatTime:Long = _ }windows运行结果
集群运行
上一篇: 软件简单除广告啊D注入开刀
下一篇: Vue 实现展开折叠效果的示例代码