荐 深入理解Spark远程通信组件RPC及消息处理机制
1.简介
在Spark中,不同组件像driver,executor,worker,master(stanalone模式)之间的通信是基于RPC来实现的。Spark 1.6之前,Spark的RPC是基于Akka来实现的。Akka是一个基于scala语言的异步的消息框架。Spark1.6后,spark借鉴Akka的设计自己实现了一个基于Netty的rpc框架。本文主要对spark1.6之后基于netty新开发的rpc框架做一个较为深入的分析。
2.整体架构
3.核心组件
3.1 RpcEnv
在介绍RpcEnv之前,我们先介绍SparkEnv。SparkEnv是Spark的执行环境对象,其中包括与众多Executor执行相关的对象。Spark 对任务的计算都依托于 Executor 的能力,所有的 Executor 都有自己的 Spark 的执行环境 SparkEnv。有了 SparkEnv,就可以将数据存储在存储体系中;就能利用计算引擎对计算任务进行处理,就可以在节点间进行通信等。
RpcEnv为RpcEndpoint提供处理消息的环境。RpcEnv负责RpcEndpoint整个生命周期的管理,包括:注册endpoint,endpoint之间消息的路由,以及停止endpoint。
private[spark] abstract class RpcEnv(conf: SparkConf) {
/**
* RPC远程终端查找的默认超时时间,默认为120s
*/
private[spark] val defaultLookupTimeout = RpcUtils.lookupRpcTimeout(conf)
/**
* 返回已经注册的[[RpcEndpoint]]的RpcEndpointRef。
* 该方法只用于[[RpcEndpoint.self]]方法实现中。
* 如果终端相关的[[RpcEndpointRef]]不存在,则返回null。
*/
private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef
/**
* 如果是服务器模式,则返回当前服务器监听的地址;否则为空
*/
def address: RpcAddress
/**
* 使用一个name来注册一个[[RpcEndpoint]],并且返回它的[[RpcEndpointRef]]对象。
* [[RpcEnv]]并不保证线程安全性。
*/
def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
/**
* 通过一个URI来异步检索[[RpcEndpointRef]]对象
*/
def asyncSetupEndpointRefByURI(uri: String): Future[RpcEndpointRef]
/**
* 通过一个URI来同步检索[[RpcEndpointRef]]对象
*/
def setupEndpointRefByURI(uri: String): RpcEndpointRef = {
defaultLookupTimeout.awaitResult(asyncSetupEndpointRefByURI(uri))
}
/**
* 根据`address` 和 `endpointName`对 [[RpcEndpointRef]]进行同步检索。
*/
def setupEndpointRef(address: RpcAddress, endpointName: String): RpcEndpointRef = {
setupEndpointRefByURI(RpcEndpointAddress(address, endpointName).toString) // URI:
}
/**
* 停止指定的[[RpcEndpoint]]对象。
*/
def stop(endpoint: RpcEndpointRef): Unit
/**
* 异步关闭当前的[[RpcEnv]]。
* 如果需要确保成功地退出[[RpcEnv]],在执行[[shutdown()]]之后需要调用[[awaitTermination()]]。
*/
def shutdown(): Unit
/**
* 等待直到[[RpcEnv]]退出。
* TODO do we need a timeout parameter?
*/
def awaitTermination(): Unit
/**
* 如果没有[[RpcEnv]]对象,那么[[RpcEndpointRef]]将不能被反序列化。
* 因此,如果任何反序列化的对象中包含了[[RpcEndpointRef]],那么这些反序列化的代码都应该在该方法中执行。
*/
def deserialize[T](deserializationAction: () => T): T
/**
* 用于返回文件服务器的实例。
* 如果RpcEnv不是以服务器模式运行,那么该项可能为null。
*
*/
def fileServer: RpcEnvFileServer
/**
* 打开一个通道从给定的URI下载文件。
* 如果由RpcEnvFileServer返回的URI使用"spark"模式,那么该方法将会被工具类调用来进行文件检索。
*
* @param uri URI with location of the file.
*/
def openChannel(uri: String): ReadableByteChannel
}
3.2 RpcEndpoint
RpcEndPoint 代表具体的通信节点,例如Master、Worker、CoarseGrainedSchedulerBackend中的DriverEndpoint、CoarseGrainedExecutorBackend等,都实现了该接口,在具体的函数中定义了消息传递来时的处理逻辑,整个生命周期是constructor -> onStart -> receive* -> onStop
,即调用构造函数,然后向RpcEnv注册,内部调用onStart,之后如果收到消息,RpcEnv会调用receive*
方法,结束时调用onStop方法,并给出了通信过程中RpcEndpoint所具有的基于事件驱动的行为(连接、断开、网络异常),实际上对于Spark框架来说主要是接收消息并处理。
private[spark] trait RpcEndpoint {
// 当前RpcEndpoint注册的RpcEnv
val rpcEnv: RpcEnv
/**
* 当前[[RpcEndpoint]]的代理,当`onStart`方法被调用时`self`生效,当`onStop`被调用时,`self`变
* 成null。
* 注意:在`onStart`方法被调用之前,[[RpcEndpoint]]对象还未进行注册,所以就没有有效的 * * [[RpcEndpointRef]]。
*/
final def self: RpcEndpointRef
/**
* 接收由RpcEndpointRef.send方法发送的消息,
* 该类消息不需要进行响应消息(Reply),而只是在RpcEndpoint端进行处理。
* 如果接收到一个不匹配的消息,将会抛出SparkException异常,并发送给`onError`。
*/
def receive: PartialFunction[Any, Unit]
/**
* 处理来自`RpcEndpointRef.ask`的消息,RpcEndpoint端处理完消息后,需要给调用RpcEndpointRef.ask * 的通信端返回响应消息。
*/
def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit]
//一系列的回调函数
def onError(cause: Throwable): Unit
def onConnected(remoteAddress: RpcAddress): Unit
def onDisconnected(remoteAddress: RpcAddress): Unit
def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit
def onStart(): Unit
def onStop(): Unit
// 停止RpcEndpoint
final def stop(): Unit
}
3.3 RpcEndpointRef
RpcEndpointRef是一个对RpcEndpoint的远程引用对象,内部记录了RpcEndpoint的位置信息,通过它可以向远程的RpcEndpoint端发送消息以进行通信。
private[spark] abstract class RpcEndpointRef(conf: SparkConf)
extends Serializable with Logging {
// 最大重连次数(3),重新尝试的等待事件(3s),默认的超时事件(120s)
private[this] val maxRetries = RpcUtils.numRetries(conf)
private[this] val retryWaitMs = RpcUtils.retryWaitMs(conf)
private[this] val defaultAskTimeout = RpcUtils.askRpcTimeout(conf)
// 对应RpcEndpoint的地址(host:port),名称
def address: RpcAddress
def name: String
// 发送一条单向的异步消息,并且发送消息后不等待响应,亦即Send-and-forget。
def send(message: Any): Unit
/**
* 发送消息给相关的[[RpcEndpoint.receiveAndReply]],并且返回一个 Future,能够在timeout时间内接 * 收回复。
* 该方法只会发送一次消息,失败后不重试。
* 而ask方法发送消息后需要等待通信对端给予响应,通过Future来异步获取响应结果。
*/
def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]
// 发送消息到相应的`RpcEndpoint.receiveAndReply`,阻塞等待回复的结果
def askSync[T: ClassTag](message: Any): T = askSync(message, defaultAskTimeout)
....
}
4.收件箱Inbox
从上面的体系结构图可知,InBox作用于服务器端。它与RpcEndpoint是一对一的关系,每一个命名唯一的RpcEndpoint对应一个线程安全的InBox。所有发送给一个RpcEndpoint的消息,都由对应的InBox进行存储。InBox提供一个process方法实现,该方法会在一个dispatcher-event-loop线程池中被调用,将InBox中的消息提供给关联的RpcEndpoint进行消费。
需要注意的是,如果通信端端点的实现是继承自ThreadSafeRpcEndpoint,则表明该Endpoint不允许并发处理消息。如果继承自RpcEndpoint,那么就可以并发的调用该服务。在具体的process方法中,如果enableConcurrent为false,即只允许单线程处理。那么执行process方法时,如果numActiveThreads大于0,说明已经至少有一个线程正在处理,则立即返回,取消本次处理操作。
4.1 InboxMessage
InboxMessage是一个特质,是Inbox中所处理的基本消息单位,OneWayMessage,RpcMessage,OnStart等消息都继承了InboxMessage
4.2 messages
存储InboxMessage的链表
protected val messages = new java.util.LinkedList[InboxMessage]()
4.3 消息处理
def process(dispatcher: Dispatcher): Unit = {
var message: InboxMessage = null
inbox.synchronized {
//enableConcurrent默认为false
//当前线程被占用时,即返回
if (!enableConcurrent && numActiveThreads != 0) {
return
}
//只有当Inbox中有新消息(InboxMessage)时,才会启用当前线程
message = messages.poll()
if (message != null) {
numActiveThreads += 1
} else {
return
}
}
while (true) {
...
//消息处理
}
inbox.synchronized {
//enableConcurrent会被设置为false在onStop()被调用后
if (!enableConcurrent && numActiveThreads != 1) {
// If we are not the only one worker, exit
numActiveThreads -= 1
return
}
message = messages.poll()
if (message == null) {
numActiveThreads -= 1
return
}
}
}
}
4.4消息发送
def post(message: InboxMessage): Unit = inbox.synchronized {
if (stopped) {
// We already put "OnStop" into "messages", so we should drop further messages
onDrop(message)
} else {
messages.add(message)
false
}
}
5.消息转发路由Dispatcher
消息调度器,有效提高NettyRpcEnv对消息的处理并最大提升并行处理,主要负责将RPC消息发送到要对该消息处理的RpcEndpoint.
5.1 EndpointData
EndpointData是一个简单的JavaBean类,RPC端点数据,把RpcEndpoint,RpcEndpointRef,Inbox封装在一起。
private class EndpointData(
val name: String,
val endpoint: RpcEndpoint,
val ref: NettyRpcEndpointRef) {
val inbox = new Inbox(ref, endpoint)
}
5.2 receivers
端点数据的阻塞队列(LinkedBlockingQueue),只有当Inbox中有新消息(InboxMessage)时,才会放入到此队列中。
private val receivers = new LinkedBlockingQueue[EndpointData]
5.3 threadpool
用于对消息进行调度的线程池,默认线程池大小为2,线程为MessageLoop
/** Message loop used for dispatching messages. */
private class MessageLoop extends Runnable {
override def run(): Unit = {
try {
while (true) {
try {
//从receivers队列中拿取data数据,只有当Inbox中有新消息时,data才会被放入到receivers中,由于receivers是一个阻塞队列,所以没有消息时,线程会被阻塞
val data = receivers.take()
//毒丸,空数据
if (data == PoisonPill) {
// Put PoisonPill back so that other MessageLoops can see it.
receivers.offer(PoisonPill)
return
}
data.inbox.process(Dispatcher.this)
} catch {
case NonFatal(e) => logError(e.getMessage, e)
}
}
} catch {
case ie: InterruptedException => // exit
}
}
}
5.5. 发送消息
/**
* Posts a message to a specific endpoint.
*
* @param endpointName name of the endpoint.
* @param message the message to post
* @param callbackIfStopped callback function if the endpoint is stopped.
*/
private def postMessage(
endpointName: String,
message: InboxMessage,
callbackIfStopped: (Exception) => Unit): Unit = {
val error = synchronized {
val data = endpoints.get(endpointName)
if (stopped) {
Some(new RpcEnvStoppedException())
} else if (data == null) {
Some(new SparkException(s"Could not find $endpointName."))
} else {
data.inbox.post(message)
receivers.offer(data)
None
}
}
// We don't need to call `onStop` in the `synchronized` block
error.foreach(callbackIfStopped)
}
5.5 Dispatcher内存模型
6.发件箱OutBox
从上面的体系结构图可知,OnBox作用于客户端。类似于收件箱InBox,它与TransportClient是一对一的关系,而一个TransportClient对应着一个远程的RpcEndPoint。
6.1 OutBoxMessage
具体的消息发送逻辑则交由OutboxMessage的实现来完成,OutboxMessage有两个子类,OneWayOutboxMessage和RpcOutboxMessage,分别对应调用RpcEndpoint的receive
和receiveAndReply
方法。
对于OneWayOutboxMessage,由于不需要返回值,则简单地通过调用传输层client.send
方法将消息发出。
private[netty] case class OneWayOutboxMessage(content: ByteBuffer) extends OutboxMessage
with Logging {
override def sendWith(client: TransportClient): Unit = {
client.send(content)
}
override def onFailure(e: Throwable): Unit = {
e match {
case e1: RpcEnvStoppedException => logWarning(e1.getMessage)
case e1: Throwable => logWarning(s"Failed to send one-way RPC.", e1)
}
}
}
对于RpcOutboxMessage,由于需要服务器的响应结果,因此需要实现传输层提供的RpcResponseCallback接口,并提供onFailure和onSuccess的方法实现。在实际的发送消息时会使用client.sendRpc方法,将消息内容和RpcResponseCallback对象传递给传输层,该方法会立即返回一个requestId。
而传输层底层会有独立的线程负责将消息序列化并且发送出去,每个Message都会返回一个UUID,由底层来维护一个发送出去消息与其Callback的HashMap。
- 如果请求超时,会通过requestId在传输层中移除该RPC请求,从而达到取消消息发送的效果;
- 如果请求的消息成功返回,则会使用RpcResponseCallback对象根据返回的状态回调对应的onFailure和onSuccess的方法,进而回调Spark core中的业务逻辑,执行Promise/Future的done方法,上层退出阻塞。
private[netty] case class RpcOutboxMessage(
content: ByteBuffer,
_onFailure: (Throwable) => Unit,
_onSuccess: (TransportClient, ByteBuffer) => Unit)
extends OutboxMessage with RpcResponseCallback with Logging {
private var client: TransportClient = _
private var requestId: Long = _
override def sendWith(client: TransportClient): Unit = {
this.client = client
this.requestId = client.sendRpc(content, this)
}
def onTimeout(): Unit = {
if (client != null) {
client.removeRpcRequest(requestId)
} else {
logError("Ask timeout before connecting successfully")
}
}
override def onFailure(e: Throwable): Unit = {
_onFailure(e)
}
override def onSuccess(response: ByteBuffer): Unit = {
_onSuccess(client, response)
}
}
6.2 发送消息
/**
* 用于发送消息。
* - 如果目前没有可用的连接,则将消息缓存并建立一个连接。
* - 如果[[Outbox]]已经停止,那么sender将会抛出一个[[SparkException]]
*/
def send(message: OutboxMessage): Unit = {
val dropped = synchronized {
if (stopped) {
true
} else {
//将RpcEnv中所有转发给某个RpcEndPoint的消息都先放到一个messages链表中
messages.add(message)
false
}
}
if (dropped) {
message.onFailure(new SparkException("Message is dropped because Outbox is stopped"))
} else {
drainOutbox()
}
}
6.3 清空发件箱
drainOutbox主要用于清空发件箱中的消息,消息会通过传输层TransportClient发送给远端服务器。该方法会在开始处进行一系列的检查,需要保证传输层的连接已经建立,如果没有建立,则向nettyEnv.clientConnectionExecutor提交建立连接的任务后并返回,待连接任务完成后会再次调用drainOutox方法。另外,drainOutox会保证线程安全性,通过布尔值draining可以保证同一时刻只会有一个线程能够进行消息的处理和发送。
6.4 关闭发件箱
网络连接错误和RpcEnv的停止运行都会触发OutBox的关闭和资源的清理,OutBox关闭的处理逻辑如下:
-
如果connectFuture不为空,说明这会正在执行连接任务,那么调用connectFuture.cancel(true)方法,将任务取消。
-
调用closeClient方法,关闭客户端,这里仅仅将client引用置为null,但并不是真正的关闭,因为需要重用连接。
-
调用nettyEnv.removeOutbox(remoteAddress)方法,从nettyEnv中移除OutBox,因此将来的消息将会使用一个新的或原有的client连接并创建一个新的OutBox。
-
执行所有还未处理的消息的onFailure方法,并告知失败的原因。
会再次调用drainOutox方法。另外,drainOutox会保证线程安全性,通过布尔值draining可以保证同一时刻只会有一个线程能够进行消息的处理和发送。
6.4 关闭发件箱
网络连接错误和RpcEnv的停止运行都会触发OutBox的关闭和资源的清理,OutBox关闭的处理逻辑如下:
-
如果connectFuture不为空,说明这会正在执行连接任务,那么调用connectFuture.cancel(true)方法,将任务取消。
-
调用closeClient方法,关闭客户端,这里仅仅将client引用置为null,但并不是真正的关闭,因为需要重用连接。
-
调用nettyEnv.removeOutbox(remoteAddress)方法,从nettyEnv中移除OutBox,因此将来的消息将会使用一个新的或原有的client连接并创建一个新的OutBox。
-
执行所有还未处理的消息的onFailure方法,并告知失败的原因。
本文地址:https://blog.csdn.net/wangsl754/article/details/104978021