欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  科技

sparkRPC通信框架

程序员文章站 2022-07-03 20:56:22
业务需求 1,master worker 都要启动 2,worker在启动之后,需要向master发送注册请求 附带信息 workerId, cores 内存 可以使用 case cals...
业务需求
1,master worker 都要启动
2,worker在启动之后,需要向master发送注册请求 附带信息 workerId, cores 内存 可以使用 case calss 封装数据
3,master接收到worker的请求信息之后,保存worker的注册信息,向worker发送响应信息(注册成功)
4,worker收到注册成功的信息之后,要定时发送心跳(报活) 定时任务 case class workerId
5,master收到worker发送的心跳信息之后,就要更新worker的心跳时间
6,master启动之后,定时检测worker的状态 (如果检测出来worker挂掉了,那 删除该worker的注册信息) 程序实现
## Master ##
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

import scala.collection.mutable


class Master extends Actor{

// 启动定时任务,检测超时的worker,把超时的worker删除掉   本身就是发送给自己的
  override def preStart(): Unit = {

    import scala.concurrent.duration._
    import context.dispatcher
    // 检测的时间 15s >  心跳的时间(10s)
    context.system.scheduler.schedule(0 seconds,15 seconds,self,CheckWorkerStatus)
  }
  //HashMap存放worker的注册信息
  val workersMap = new mutable.HashMap[String,WorkerInfor]()
  override def receive: Receive = {

    case RegisterToMaster(workerHost,memory,cpu) => {
      println(workerHost,memory,cpu)

      val infor: WorkerInfor = new WorkerInfor(workerHost,memory,cpu)
      workersMap(workerHost) = infor
      println(s"There is recieve ${workerHost} register massage: Worker-num=${workersMap.size}")

      //返回注册成功的信息
      sender() ! RegisterSuccess
    }
      //worker注册成功以后开始定时发送心跳信息,master要实时更新新的worker信息
    case HeartBeat(workerHost) => {
      //判断master是否有worker的信息
      if(workersMap.contains(workerHost)){
        //将worker信息替换成最新的
        val workerInfo: WorkerInfor = workersMap(workerHost)

        workerInfo.LastHeartBeatTime = System.currentTimeMillis()
      }
    }
      //检测master中的worker状态,将超时没有更新信息的worker剔除
    case CheckWorkerStatus => {
      //超时规则2次未注册则剔除 当前时间 - worker最后一次注册的时间 > 2次未注册的时间
      val deadWorker = workersMap.filter({
        case (workerHost,info) => {
          System.currentTimeMillis() - info.LastHeartBeatTime > 20*1000
        }
      })
      //剔除已经死亡的worker
      /*deadWorker.foreach({
        case (workerHost,_) => {
          workersMap -= (workerHost)
        }
      })*/
      workersMap --= deadWorker.map(_._1)
      println(s"完成检测,最新的worker的数量为——${workersMap.size}")
    }
  }
}

object Master {
  val MASTER_ACS_NAME = "Master_acs_name"
  val MASTER_AC_NAME = "Master_ac_name"

  def main(args: Array[String]): Unit = {
    if(args.size!=2){
      println("cn.edu360.scala_maven.MasterAndWorker.Master   ")
      sys.exit(1)//输入参数错误时,结束程序
    }
    val Array(masterHost,masterPort) = args
    val src =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = ${masterHost}
         |akka.remote.netty.tcp.port = ${masterPort}
      """.stripMargin
    //获取以个CongigFactory工厂
    val conf: Config = ConfigFactory.parseString(src)
    //创建一个ActorSystem
    val asc: ActorSystem = ActorSystem.create(MASTER_ACS_NAME,conf)

    val masteracname: ActorRef = asc.actorOf(Props(new Master()),MASTER_AC_NAME)
  }
}

## Worker ##

import java.util.UUID

import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}

class Worker(val masterHost:String,val masterPost:Int,var memory:Int,var cpu:Int) extends Actor{
  var selection: ActorSelection = null
  val workerHost:String = UUID.randomUUID().toString
  override def preStart(): Unit = {
    println("run preStart")
    //连接master
    val path = s"akka.tcp://${Master.MASTER_ACS_NAME}@${masterHost}:${masterPost}/user/${Master.MASTER_AC_NAME}"
    selection = context.actorSelection(path)
    //向Master发送注册信息
    selection ! RegisterToMaster(workerHost,memory,cpu)
  }
  override def receive: Receive = {
    case RegisterSuccess => {
      println("Register success! begin to start schedule!")

      import scala.concurrent.duration._
      import context.dispatcher

      //启动定时任务
      /**
        * initialDelay: FiniteDuration, 启动延时
          interval:     FiniteDuration, 间隔时间
          receiver:     ActorRef, 信息接收方
          message:      Any       信息内容
        */
      //ActorRef类型不匹配
      //context.system.scheduler.schedule(0 seconds,10 seconds,selection,HeartBeat(workerHost))
      context.system.scheduler.schedule(0 seconds,10 seconds,self,SendHeartBeat)
    }
    case SendHeartBeat => {
      selection ! HeartBeat(workerHost)
      println("worker实时向master发送心跳信息!")
    }
  }
}

object Worker {
  val WORKER_ACS_NAME = "Worker_acs_name"
  val WORKER_AC_NAME = "Worker_ac_name"
  def main(args: Array[String]): Unit = {
    if(args.size!=6){
      println("cn.edu360.scala_maven.MasterAndWorker.Master  masterHost,masterPost,workerHost,workerPost,memory,cpu")
      sys.exit(1)//输入参数错误时,结束程序
    }
    val Array(masterHost,masterPost,workerHost,workerPost,memory,cpu) = args
    val src =
      s"""
        |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
        |akka.remote.netty.tcp.hostname = ${workerHost}
        |akka.remote.netty.tcp.port = ${workerPost}
      """.stripMargin
    //获取以个CongigFactory工厂
    val conf: Config = ConfigFactory.parseString(src)
    //创建一个ActorSystem
    val acs: ActorSystem = ActorSystem.create(WORKER_ACS_NAME,conf)

    val workeracname: ActorRef = acs.actorOf(Props(new Worker(masterHost,masterPost.toInt,memory.toInt,cpu.toInt)),WORKER_AC_NAME)
  }
}

## 封装类 ##

class MassagePassing {

}
//worker向master发送注册信息
case class RegisterToMaster(workerHost:String,memory:Int,cpu:Int)

//向worker发送注册成功的信息
case object RegisterSuccess

//向master发送心跳信息证明该worker正常工作
case class HeartBeat(workerHost:String)

//向master发送心跳信息证明该worker正常工作
case object SendHeartBeat

//检测worker注册信息
case object CheckWorkerStatus
case class WorkerInfor(val workerHost:String,var memory:Int,var cpu:Int) {
  var LastHeartBeatTime:Long = _
}
windows运行结果
sparkRPC通信框架
sparkRPC通信框架 集群运行
sparkRPC通信框架

sparkRPC通信框架

sparkRPC通信框架

sparkRPC通信框架

sparkRPC通信框架