欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

荐 深入理解Spark远程通信组件RPC及消息处理机制

程序员文章站 2022-06-22 13:47:54
1.简介在Spark中,不同组件像driver,executor,worker,master(stanalone模式)之间的通信是基于RPC来实现的。Spark 1.6之前,Spark的RPC是基于Akka来实现的。Akka是一个基于scala语言的异步的消息框架。Spark1.6后,spark借鉴Akka的设计自己实现了一个基于Netty的rpc框架。本文主要对spark1.6之后基于nett......

1.简介

在Spark中,不同组件像driver,executor,worker,master(stanalone模式)之间的通信是基于RPC来实现的。Spark 1.6之前,Spark的RPC是基于Akka来实现的。Akka是一个基于scala语言的异步的消息框架。Spark1.6后,spark借鉴Akka的设计自己实现了一个基于Netty的rpc框架。本文主要对spark1.6之后基于netty新开发的rpc框架做一个较为深入的分析。

2.整体架构

荐
                                                        深入理解Spark远程通信组件RPC及消息处理机制

3.核心组件

3.1 RpcEnv

在介绍RpcEnv之前,我们先介绍SparkEnv。SparkEnv是Spark的执行环境对象,其中包括与众多Executor执行相关的对象。Spark 对任务的计算都依托于 Executor 的能力,所有的 Executor 都有自己的 Spark 的执行环境 SparkEnv。有了 SparkEnv,就可以将数据存储在存储体系中;就能利用计算引擎对计算任务进行处理,就可以在节点间进行通信等。

RpcEnv为RpcEndpoint提供处理消息的环境。RpcEnv负责RpcEndpoint整个生命周期的管理,包括:注册endpoint,endpoint之间消息的路由,以及停止endpoint。
荐
                                                        深入理解Spark远程通信组件RPC及消息处理机制

private[spark] abstract class RpcEnv(conf: SparkConf) {
	/**
     * RPC远程终端查找的默认超时时间,默认为120s
     */
   private[spark] val defaultLookupTimeout = RpcUtils.lookupRpcTimeout(conf)

   /**
     * 返回已经注册的[[RpcEndpoint]]的RpcEndpointRef。
     * 该方法只用于[[RpcEndpoint.self]]方法实现中。
     * 如果终端相关的[[RpcEndpointRef]]不存在,则返回null。
     */
   private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef

   /**
     * 如果是服务器模式,则返回当前服务器监听的地址;否则为空
     */
   def address: RpcAddress

   /**
     * 使用一个name来注册一个[[RpcEndpoint]],并且返回它的[[RpcEndpointRef]]对象。
     * [[RpcEnv]]并不保证线程安全性。
     */
   def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef

   /**
     * 通过一个URI来异步检索[[RpcEndpointRef]]对象
     */
   def asyncSetupEndpointRefByURI(uri: String): Future[RpcEndpointRef]

   /**
     * 通过一个URI来同步检索[[RpcEndpointRef]]对象
     */
   def setupEndpointRefByURI(uri: String): RpcEndpointRef = {
       defaultLookupTimeout.awaitResult(asyncSetupEndpointRefByURI(uri))
   }

   /**
     * 根据`address` 和 `endpointName`对 [[RpcEndpointRef]]进行同步检索。
     */
   def setupEndpointRef(address: RpcAddress, endpointName: String): RpcEndpointRef = {
       setupEndpointRefByURI(RpcEndpointAddress(address, endpointName).toString) // URI:
   }

   /**
     * 停止指定的[[RpcEndpoint]]对象。
     */
   def stop(endpoint: RpcEndpointRef): Unit

   /**
     * 异步关闭当前的[[RpcEnv]]。
     * 如果需要确保成功地退出[[RpcEnv]],在执行[[shutdown()]]之后需要调用[[awaitTermination()]]。
     */
   def shutdown(): Unit

   /**
     * 等待直到[[RpcEnv]]退出。
     * TODO do we need a timeout parameter?
     */
   def awaitTermination(): Unit

   /**
     * 如果没有[[RpcEnv]]对象,那么[[RpcEndpointRef]]将不能被反序列化。
     * 因此,如果任何反序列化的对象中包含了[[RpcEndpointRef]],那么这些反序列化的代码都应该在该方法中执行。
     */
   def deserialize[T](deserializationAction: () => T): T

   /**
     * 用于返回文件服务器的实例。
     * 如果RpcEnv不是以服务器模式运行,那么该项可能为null。
     *
     */
   def fileServer: RpcEnvFileServer

   /**
     * 打开一个通道从给定的URI下载文件。
     * 如果由RpcEnvFileServer返回的URI使用"spark"模式,那么该方法将会被工具类调用来进行文件检索。
     *
     * @param uri URI with location of the file.
     */
   def openChannel(uri: String): ReadableByteChannel

}

3.2 RpcEndpoint

RpcEndPoint 代表具体的通信节点,例如Master、Worker、CoarseGrainedSchedulerBackend中的DriverEndpoint、CoarseGrainedExecutorBackend等,都实现了该接口,在具体的函数中定义了消息传递来时的处理逻辑,整个生命周期是constructor -> onStart -> receive* -> onStop,即调用构造函数,然后向RpcEnv注册,内部调用onStart,之后如果收到消息,RpcEnv会调用receive*方法,结束时调用onStop方法,并给出了通信过程中RpcEndpoint所具有的基于事件驱动的行为(连接、断开、网络异常),实际上对于Spark框架来说主要是接收消息并处理

private[spark] trait RpcEndpoint {
  // 当前RpcEndpoint注册的RpcEnv
  val rpcEnv: RpcEnv
  /**
   * 当前[[RpcEndpoint]]的代理,当`onStart`方法被调用时`self`生效,当`onStop`被调用时,`self`变
   * 成null。
   * 注意:在`onStart`方法被调用之前,[[RpcEndpoint]]对象还未进行注册,所以就没有有效的			 *    * [[RpcEndpointRef]]。
   */
  final def self: RpcEndpointRef
  /**
   * 接收由RpcEndpointRef.send方法发送的消息,
   * 该类消息不需要进行响应消息(Reply),而只是在RpcEndpoint端进行处理。
   * 如果接收到一个不匹配的消息,将会抛出SparkException异常,并发送给`onError`。
   */
  def receive: PartialFunction[Any, Unit]
  /**
   * 处理来自`RpcEndpointRef.ask`的消息,RpcEndpoint端处理完消息后,需要给调用RpcEndpointRef.ask    * 的通信端返回响应消息。
   */
  def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit]

  //一系列的回调函数
  def onError(cause: Throwable): Unit
  def onConnected(remoteAddress: RpcAddress): Unit
  def onDisconnected(remoteAddress: RpcAddress): Unit
  def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit
  def onStart(): Unit
  def onStop(): Unit

  // 停止RpcEndpoint
  final def stop(): Unit 
}

3.3 RpcEndpointRef

RpcEndpointRef是一个对RpcEndpoint的远程引用对象,内部记录了RpcEndpoint的位置信息,通过它可以向远程的RpcEndpoint端发送消息以进行通信。

private[spark] abstract class RpcEndpointRef(conf: SparkConf)
  extends Serializable with Logging {
  // 最大重连次数(3),重新尝试的等待事件(3s),默认的超时事件(120s)
  private[this] val maxRetries = RpcUtils.numRetries(conf)
  private[this] val retryWaitMs = RpcUtils.retryWaitMs(conf)
  private[this] val defaultAskTimeout = RpcUtils.askRpcTimeout(conf)

  // 对应RpcEndpoint的地址(host:port),名称
  def address: RpcAddress
  def name: String

  // 发送一条单向的异步消息,并且发送消息后不等待响应,亦即Send-and-forget。
  def send(message: Any): Unit
  /**
   * 发送消息给相关的[[RpcEndpoint.receiveAndReply]],并且返回一个 Future,能够在timeout时间内接    * 收回复。
   * 该方法只会发送一次消息,失败后不重试。
   * 而ask方法发送消息后需要等待通信对端给予响应,通过Future来异步获取响应结果。
   */
  def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]
      
  // 发送消息到相应的`RpcEndpoint.receiveAndReply`,阻塞等待回复的结果
  def askSync[T: ClassTag](message: Any): T = askSync(message, defaultAskTimeout)
  ....
}

4.收件箱Inbox

从上面的体系结构图可知,InBox作用于服务器端。它与RpcEndpoint是一对一的关系,每一个命名唯一的RpcEndpoint对应一个线程安全的InBox。所有发送给一个RpcEndpoint的消息,都由对应的InBox进行存储。InBox提供一个process方法实现,该方法会在一个dispatcher-event-loop线程池中被调用,将InBox中的消息提供给关联的RpcEndpoint进行消费。

需要注意的是,如果通信端端点的实现是继承自ThreadSafeRpcEndpoint,则表明该Endpoint不允许并发处理消息。如果继承自RpcEndpoint,那么就可以并发的调用该服务。在具体的process方法中,如果enableConcurrent为false,即只允许单线程处理。那么执行process方法时,如果numActiveThreads大于0,说明已经至少有一个线程正在处理,则立即返回,取消本次处理操作。

4.1 InboxMessage

InboxMessage是一个特质,是Inbox中所处理的基本消息单位,OneWayMessage,RpcMessage,OnStart等消息都继承了InboxMessage

4.2 messages

存储InboxMessage的链表

protected val messages = new java.util.LinkedList[InboxMessage]()

4.3 消息处理

def process(dispatcher: Dispatcher): Unit = {
    var message: InboxMessage = null
    inbox.synchronized {
      //enableConcurrent默认为false
      //当前线程被占用时,即返回
      if (!enableConcurrent && numActiveThreads != 0) {
        return
      }
      
      //只有当Inbox中有新消息(InboxMessage)时,才会启用当前线程
      message = messages.poll()
      if (message != null) {
        numActiveThreads += 1
      } else {
        return
      }
    }
    while (true) {
      ...
      //消息处理
      }

      inbox.synchronized {
        //enableConcurrent会被设置为false在onStop()被调用后
        if (!enableConcurrent && numActiveThreads != 1) {
          // If we are not the only one worker, exit
          numActiveThreads -= 1
          return
        }
        message = messages.poll()
        if (message == null) {
          numActiveThreads -= 1
          return
        }
      }
    }
  }

4.4消息发送

def post(message: InboxMessage): Unit = inbox.synchronized {
    if (stopped) {
      // We already put "OnStop" into "messages", so we should drop further messages
      onDrop(message)
    } else {
      messages.add(message)
      false
    }
  }

5.消息转发路由Dispatcher

消息调度器,有效提高NettyRpcEnv对消息的处理并最大提升并行处理,主要负责将RPC消息发送到要对该消息处理的RpcEndpoint.

5.1 EndpointData

EndpointData是一个简单的JavaBean类,RPC端点数据,把RpcEndpoint,RpcEndpointRef,Inbox封装在一起。

private class EndpointData(
      val name: String,
      val endpoint: RpcEndpoint,
      val ref: NettyRpcEndpointRef) {
    val inbox = new Inbox(ref, endpoint)
  }

5.2 receivers

端点数据的阻塞队列(LinkedBlockingQueue),只有当Inbox中有新消息(InboxMessage)时,才会放入到此队列中。

private val receivers = new LinkedBlockingQueue[EndpointData]

5.3 threadpool

用于对消息进行调度的线程池,默认线程池大小为2,线程为MessageLoop

/** Message loop used for dispatching messages. */
  private class MessageLoop extends Runnable {
    override def run(): Unit = {
      try {
        while (true) {
          try {
            //从receivers队列中拿取data数据,只有当Inbox中有新消息时,data才会被放入到receivers中,由于receivers是一个阻塞队列,所以没有消息时,线程会被阻塞
            val data = receivers.take()
            //毒丸,空数据
            if (data == PoisonPill) {
              // Put PoisonPill back so that other MessageLoops can see it.
              receivers.offer(PoisonPill)
              return
            }
            data.inbox.process(Dispatcher.this)
          } catch {
            case NonFatal(e) => logError(e.getMessage, e)
          }
        }
      } catch {
        case ie: InterruptedException => // exit
      }
    }
  }

5.5. 发送消息

/**
   * Posts a message to a specific endpoint.
   *
   * @param endpointName name of the endpoint.
   * @param message the message to post
   * @param callbackIfStopped callback function if the endpoint is stopped.
   */
private def postMessage(
      endpointName: String,
      message: InboxMessage,
      callbackIfStopped: (Exception) => Unit): Unit = {
    val error = synchronized {
      val data = endpoints.get(endpointName)
      if (stopped) {
        Some(new RpcEnvStoppedException())
      } else if (data == null) {
        Some(new SparkException(s"Could not find $endpointName."))
      } else {
        data.inbox.post(message)
        receivers.offer(data)
        None
      }
    }
    // We don't need to call `onStop` in the `synchronized` block
    error.foreach(callbackIfStopped)
  }

5.5 Dispatcher内存模型

荐
                                                        深入理解Spark远程通信组件RPC及消息处理机制

6.发件箱OutBox

从上面的体系结构图可知,OnBox作用于客户端。类似于收件箱InBox,它与TransportClient是一对一的关系,而一个TransportClient对应着一个远程的RpcEndPoint。

6.1 OutBoxMessage

具体的消息发送逻辑则交由OutboxMessage的实现来完成,OutboxMessage有两个子类,OneWayOutboxMessageRpcOutboxMessage,分别对应调用RpcEndpointreceivereceiveAndReply方法。

对于OneWayOutboxMessage,由于不需要返回值,则简单地通过调用传输层client.send方法将消息发出。

private[netty] case class OneWayOutboxMessage(content: ByteBuffer) extends OutboxMessage
  with Logging {

  override def sendWith(client: TransportClient): Unit = {
    client.send(content)
  }

  override def onFailure(e: Throwable): Unit = {
    e match {
      case e1: RpcEnvStoppedException => logWarning(e1.getMessage)
      case e1: Throwable => logWarning(s"Failed to send one-way RPC.", e1)
    }
  }

}

对于RpcOutboxMessage,由于需要服务器的响应结果,因此需要实现传输层提供的RpcResponseCallback接口,并提供onFailure和onSuccess的方法实现。在实际的发送消息时会使用client.sendRpc方法,将消息内容和RpcResponseCallback对象传递给传输层,该方法会立即返回一个requestId。

而传输层底层会有独立的线程负责将消息序列化并且发送出去,每个Message都会返回一个UUID,由底层来维护一个发送出去消息与其Callback的HashMap。

  1. 如果请求超时,会通过requestId在传输层中移除该RPC请求,从而达到取消消息发送的效果;
  2. 如果请求的消息成功返回,则会使用RpcResponseCallback对象根据返回的状态回调对应的onFailure和onSuccess的方法,进而回调Spark core中的业务逻辑,执行Promise/Future的done方法,上层退出阻塞。
private[netty] case class RpcOutboxMessage(
    content: ByteBuffer,
    _onFailure: (Throwable) => Unit,
    _onSuccess: (TransportClient, ByteBuffer) => Unit)
  extends OutboxMessage with RpcResponseCallback with Logging {

  private var client: TransportClient = _
  private var requestId: Long = _

  override def sendWith(client: TransportClient): Unit = {
    this.client = client
    this.requestId = client.sendRpc(content, this)
  }

  def onTimeout(): Unit = {
    if (client != null) {
      client.removeRpcRequest(requestId)
    } else {
      logError("Ask timeout before connecting successfully")
    }
  }

  override def onFailure(e: Throwable): Unit = {
    _onFailure(e)
  }

  override def onSuccess(response: ByteBuffer): Unit = {
    _onSuccess(client, response)
  }

}

6.2 发送消息

/**
 * 用于发送消息。
 * - 如果目前没有可用的连接,则将消息缓存并建立一个连接。
 * - 如果[[Outbox]]已经停止,那么sender将会抛出一个[[SparkException]]
 */
  def send(message: OutboxMessage): Unit = {
    val dropped = synchronized {
      if (stopped) {
        true
      } else {
        //将RpcEnv中所有转发给某个RpcEndPoint的消息都先放到一个messages链表中
        messages.add(message)
        false
      }
    }
    if (dropped) {
      message.onFailure(new SparkException("Message is dropped because Outbox is stopped"))
    } else {
      drainOutbox()
    }
  }

6.3 清空发件箱

drainOutbox主要用于清空发件箱中的消息,消息会通过传输层TransportClient发送给远端服务器。该方法会在开始处进行一系列的检查,需要保证传输层的连接已经建立,如果没有建立,则向nettyEnv.clientConnectionExecutor提交建立连接的任务后并返回,待连接任务完成后会再次调用drainOutox方法。另外,drainOutox会保证线程安全性,通过布尔值draining可以保证同一时刻只会有一个线程能够进行消息的处理和发送。

6.4 关闭发件箱

网络连接错误和RpcEnv的停止运行都会触发OutBox的关闭和资源的清理,OutBox关闭的处理逻辑如下:

  1. 如果connectFuture不为空,说明这会正在执行连接任务,那么调用connectFuture.cancel(true)方法,将任务取消。

  2. 调用closeClient方法,关闭客户端,这里仅仅将client引用置为null,但并不是真正的关闭,因为需要重用连接。

  3. 调用nettyEnv.removeOutbox(remoteAddress)方法,从nettyEnv中移除OutBox,因此将来的消息将会使用一个新的或原有的client连接并创建一个新的OutBox。

  4. 执行所有还未处理的消息的onFailure方法,并告知失败的原因。

会再次调用drainOutox方法。另外,drainOutox会保证线程安全性,通过布尔值draining可以保证同一时刻只会有一个线程能够进行消息的处理和发送。

6.4 关闭发件箱

网络连接错误和RpcEnv的停止运行都会触发OutBox的关闭和资源的清理,OutBox关闭的处理逻辑如下:

  1. 如果connectFuture不为空,说明这会正在执行连接任务,那么调用connectFuture.cancel(true)方法,将任务取消。

  2. 调用closeClient方法,关闭客户端,这里仅仅将client引用置为null,但并不是真正的关闭,因为需要重用连接。

  3. 调用nettyEnv.removeOutbox(remoteAddress)方法,从nettyEnv中移除OutBox,因此将来的消息将会使用一个新的或原有的client连接并创建一个新的OutBox。

  4. 执行所有还未处理的消息的onFailure方法,并告知失败的原因。

本文地址:https://blog.csdn.net/wangsl754/article/details/104978021

相关标签: Spark