spark学习-71-源代码:Endpoint模型介绍(3)-Endpoint Send&Ask流程
程序员文章站
2022-07-15 10:09:26
...
1。Endpoint Send&Ask流程
Endpoint的消息发送与请求流程,如下:
从图上看send方法最先调用,我们来看看什么时候调用,下面是worker的调用语句,但是这里我们主要走的是master的send语句
workerRef.send(MasterInStandby)
send代码如下,这里主要处理是发给自己的还是发给别人的消息
/**
* 我们来看下send这个经典的发送消息的方法,里面封装了不同类型消息体之间的通信的不同实现
*/
private[netty] def send(message: RequestMessage): Unit = {
// 拿到需要发送的endpoint地址
val remoteAddr = message.receiver.address
// 判断是否是远程地址,如果remoteAddr == address远程地址和自己所在的地址是一个地址,比如,我自己是address=192.168.10.12,要发送的地址
// 是remoteAddr=192.268.10.12,这说明我是给自己发送消息,所以直接放到自己的邮箱里就可以了,不需要跑一圈
if (remoteAddr == address) {
// Message to a local RPC endpoint.
try {
// 如果消息接受者在本地就调用dispatcher来发送消息 消息放到自己的收件箱里
// message的格式:
// message:"RpcMessage(192.168.2.89:53298,TrackSchedulerIsSet,aaa@qq.com)"
dispatcher.postOneWayMessage(message)
} catch {
case e: RpcEnvStoppedException => logWarning(e.getMessage)
}
} else {
// 否则就是我自己是address=192.168.10.12,要发送的地址
// 是remoteAddr=192.268.10.56,这说明我是给别人发送消息,所以需要放到自己的发件箱里就可以了,然后被系统发送出去,别人来接收
// Message to a remote RPC endpoint.
// 如果消息接受者在远程节点就发送到对应节点的outbox
// message.serialize(this) 这里消息要序列化,因为是要放到自己的发件箱,然后发送到别的机器上,要走网络的,因此要序列化
// message.receiver==>NettyRpcEndpointRef==》比如workerRef
// message的格式:
// message:"RpcMessage(192.168.2.89:53298,TrackSchedulerIsSet,aaa@qq.com)"
postToOutbox(message.receiver, OneWayOutboxMessage(message.serialize(this)))
}
}
这里先看发给自己的
dispatcher.postOneWayMessage(message)
/** Posts a one-way message. 单向发送消息*/
def postOneWayMessage(message: RequestMessage): Unit = {
postMessage(message.receiver.name, OneWayMessage(message.senderAddress, message.content),
(e) => throw e)
}
最终放到了自己的收件箱(相当于QQ发邮件给自己)
/**
* Posts a message to a specific endpoint.
* 将消息发送到特定的endpoint。
*
* @param endpointName name of the endpoint.
* @param message the message to post
* @param callbackIfStopped callback function if the endpoint is stopped.
*/
private def postMessage(
endpointName: String,
message: InboxMessage,
callbackIfStopped: (Exception) => Unit): Unit = {
val error = synchronized {
// 拿到对应的EndpointData,从收件箱拿出第一条消息onStart
val data = endpoints.get(endpointName)
// 判断dispatcher是否停止了,如果停止了 信息肯定发不出去了
if (stopped) {
// 抛出异常
Some(new RpcEnvStoppedException())
} else if (data == null) {
// 如果data为空,那就是endpoint没有注册
Some(new SparkException(s"Could not find $endpointName."))
} else {
// 调用inbox对象把massage加入到java.util.LinkedList[InboxMessage]消息队列链表中 放到自己的收件箱里
data.inbox.post(message)
// 把EndpointData加入到receivers链表中等待被消费
receivers.offer(data)
None
}
}
// We don't need to call `onStop` in the `synchronized` block
error.foreach(callbackIfStopped)
}
然后看看发给别人的
// 否则就是我自己是address=192.168.10.12,要发送的地址
// 是remoteAddr=192.268.10.56,这说明我是给别人发送消息,所以需要放到自己的发件箱里就可以了,然后被系统发送出去,别人来接收
// Message to a remote RPC endpoint.
// 如果消息接受者在远程节点就发送到对应节点的outbox
// message.serialize(this) 这里消息要序列化,因为是要放到自己的发件箱,然后发送到别的机器上,要走网络的,因此要序列化
// message.receiver==>NettyRpcEndpointRef==》比如workerRef
// message的格式:
// message:"RpcMessage(192.168.2.89:53298,TrackSchedulerIsSet,aaa@qq.com)"
postToOutbox(message.receiver, OneWayOutboxMessage(message.serialize(this)))
主要把信息放到自己的发件箱里等待被 邮差取走后送给别人
private def postToOutbox(receiver: NettyRpcEndpointRef, message: OutboxMessage): Unit = {
// 如果接收端的TransportClient启动了 就直接调用sendWith
// 调用sendWith核心方法
// 提醒一下:这里所有的outbox里提取出的消息体都是实现了trait OutboxMessage
// 所以不同类型的message调用的sendWith实现也不同
// 也是分为单向和双向消息体
// message.receiver(receiver.client)==>NettyRpcEndpointRef==》比如workerRef
// message的格式:
// message:"RpcMessage(192.168.2.89:53298,TrackSchedulerIsSet,aaa@qq.com)"
if (receiver.client != null) {
// 如果接受信息的客户端存在,直接发送信息过去
message.sendWith(receiver.client)
} else {
// 如果接收端没有启动TransportClient就会先查询下是否包含接收者地址
require(receiver.address != null,
"Cannot send message to client endpoint with no listen address.")
val targetOutbox = {
// 通过Rpc地址从outboxes拿到接收者地址的对应的outbox
// 数据结构:Java的ConcurrentHashMap[RpcAddress, Outbox],根据要发送的远程地址获取本地发件箱
// 有点拗口:我的本地地址是 远程地址
// 192.168.10.82 192.168.10.83
// inbox(82的收件箱) <------------不断的从 右边 取消息放到 左边-------------- outBox(83的发件箱)
//
//
// outBox(82的发件箱) ------------不断的从 左边 取消息放到 右边--------------> inbox(83的收件箱)
val outbox = outboxes.get(receiver.address)
if (outbox == null) {
// 如果该地址对应的outbox不存在就构建一个 (如果发件箱不存在,就创建一个)
val newOutbox = new Outbox(this, receiver.address)
// 并加入到outboxes里面
val oldOutbox = outboxes.putIfAbsent(receiver.address, newOutbox)
if (oldOutbox == null) {
// 若为空就直接引用刚生成的newOutbox
newOutbox
} else {
// 返回老的
oldOutbox
}
} else {
// 返回
outbox
}
}
// 判断NettyRpcEnv是否停止了
if (stopped.get) {
// It's possible that we put `targetOutbox` after stopping. So we need to clean it.
outboxes.remove(receiver.address)
// 关闭发件箱
targetOutbox.stop()
} else {
// 最后生成的outbox对象会根据不同的状态执行send中不同的实现
// 包括可能也会走drainOutbox方法(里面包含在接收者端启动一个TransportClient)
// 把message添加到自己的消息队列里 ,加入 java.util.LinkedList[OutboxMessage] 的队列中,等待被线程消费
targetOutbox.send(message)
}
}
}