spark学习-75-源代码:Endpoint模型介绍(6)-Endpoint的消息的接收(2)
关于Endpoint如何处理消息的,我一直找不到初始调用点
请问这个图中消息接受这个点在哪里?我想从这里看,但是找不到最初的起点
1。消息的产生点
没找到,消息是怎么产生的还不清楚,知道的底下评论一下,谢谢
2。消息的分发
Endpoint Inbox处理流程
Spark在Endpoint的设计上核心设计即为Inbox与Outbox,其中Inbox核心要点为
内部的处理流程拆分为多个消息指令(InboxMessage)存放入Inbox
当Dispatcher启动最后,会启动一个名为【dispatcher-event-loop】的线程扫描Inbox待处理InboxMessage,并调用Endpoint根据InboxMessage类型做相应处理
当Dispatcher启动最后,默认会向Inbox存入OnStart类型的InboxMessage,Endpoint在根据OnStart指令做相关的额外启动工作,三端启动后所有的工作都是对OnStart指令处理衍生出来的,因此可以说OnStart指令是相互通信的源头
后来看到代码创建org.apache.spark.rpc.netty.Dispatcher的时候,初始化了ThreadPoolExecutor,并且启动线程
/** Thread pool used for dispatching messages.
*
* // dispatcher的线程池
* dispatcher会用java.util.concurrent.newFixedThreadPool创建一个属于自己的
* ThreadPoolExecutor线程池,然后不停的会去拿去messages链表队列里的消息数据,
* 并根据消息的类型执行message的模式匹配做对应的处理
* */
private val threadpool: ThreadPoolExecutor = {
// 通过配置项拿到dispatcher的线程数量
val numThreads = nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads",
math.max(2, Runtime.getRuntime.availableProcessors()))
// 最后会生成Java的ThreadPoolExecutor线程池
val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")
for (i <- 0 until numThreads) {
// 直接调用execute执行线程MessageLoop 类型是Runnable
// 这不是启动,而是把线程丢到 线程池队列,等待Thread触发start,注意:丢进去的不是线程,而是Runnable 的实现类MessageLoop
// 放进去之后会在ThreadPoolExecutor线程池中并行的启动(虽然是队列,但是却是并行调用MessageLoop线程的start方法,不然线程池的优越性就没有了)
// 这里相当于直接启动了MessageLoop线程
pool.execute(new MessageLoop)
}
pool
}
MessageLoop是从private val receivers = new LinkedBlockingQueue[EndpointData]这个里面不停地取出数据,所以receivers里面的数据量,要么1,要么0,如果超过了,就说明你的系统太卡了,产生了很多消息,却没有消费。
// Track the receivers whose inboxes may contain messages.
// 跟踪收件箱中可能包含消息的接收方。
// 数据存储结构:java.util.concurrent下的LinkedBlockingQueue
// 里面维护着EndpointData的线程阻塞链表
// receivers貌似始终为0,里面一放入数据,就会被MessageLoop取走
private val receivers = new LinkedBlockingQueue[EndpointData]
/** Message loop used for dispatching messages. 用于发送消息的消息循环 */
private class MessageLoop extends Runnable {
override def run(): Unit = {
try {
// 线程会不停的去处理过来的messages
while (true) {
try {
// 因为receivers是一个路由器,所以来的消息(Response)和去的消息(request)都在这里面,拿出来一个消息
// OutputCommitCoordinator
val data = receivers.take()
// 如果取出的信息数据是空的
if (data == PoisonPill) {
// Put PoisonPill back so that other MessageLoops can see it.
// 就放回去LinkedBlockingQueue
receivers.offer(PoisonPill)
return
}
// 调用inbox的process方法,根据需要处理的消息类型message的模式匹配来执行对应的处理方式
// 调用收件箱的处理方法
data.inbox.process(Dispatcher.this)
} catch {
case NonFatal(e) => logError(e.getMessage, e)
}
}
} catch {
case ie: InterruptedException => // exit
}
}
}
主要看看 data.inbox.process(Dispatcher.this)这个方法
/**
* Process stored messages.
*
* 因为process里面涉及到一些组件真正调用单向和双向消息的具体实现,模式匹配+偏函数经典搭配;
* 包括还有远程消息体的处理方式
*/
def process(dispatcher: Dispatcher): Unit = {
var message: InboxMessage = null
// 这里是增加numActiveThreads 下面有减少的 应该是为了保证一个消息 ,只能发送一次(比如增加executor的消息,本来发送消息只是增加一个,
// 但是不这样做,有可能这个消息,因为某些原因重试了好几次,那么就增加了很多的executor,造成浪费)
inbox.synchronized {
// 这个(真 && 假) 结果为false
if (!enableConcurrent && numActiveThreads != 0) {
return
}
// 先从messages链表里poll出一条消息数据
message = messages.poll()
// 这个numActiveThreads只要来的消息不为null就设置为1,但是返回后,又变成0了 在1和0之间转换
if (message != null) {
numActiveThreads += 1
} else {
return
}
}
// message:"RpcMessage(192.168.2.89:59605,ExpireDeadHost,aaa@qq.com)"
// message:"RpcMessage(192.168.2.89:59605,CheckExistence(HeartbeatReceiver),aaa@qq.com)"
// message:"RpcMessage(192.168.2.89:59605,RegisterBlockManager(BlockManagerId(driver,192.168.2.89,59296,none),915406848,NettyRpcEndpointRef(Spark://aaa@qq.com:59065)),aaa@qq.com)"
// message:"RpcMessage(192.168.2.89:59605,Heartbeat(driver[Lscala,Tuple2:@33a2cec3,nonullne),aaa@qq.com)"
// message:"RpcMessage(192.168.2.89:59605,ExecutorRegistered(driver),aaa@qq.com)"
// message:"RpcMessage(192.168.2.89:59605,BlockManagerHeartbeat(driver(driver,192.168.2.89,59296,none),aaa@qq.com)"
// message:"RpcMessage(192.168.2.89:59605,UpdateBlockInfo(BlockManagerId(driver,192.168.2.89,59296,none),broadcast_0_piece0,StorageLevel(memory,1 replicas),29064,0),aaa@qq.com)"
// message:"OneWayMessage(192.168.2.89:59605,StatusUpdate(1,RUNNING,java.nio,HeartByteBuffer[pos=0,lim=0,cap=0]))"
// message:"RpcMessage(192.168.2.89:59605,StopMapOutTracker,aaa@qq.com)"
// message:"OneWayMessage(192.168.2.89:59605,StopCoordinator)"
while (true) {
// 循环调用RpcEndpoint
safelyCall(endpoint) {
// 对poll出来的message做模式匹配,调用对应的处理机制
message match {
// message:"RpcMessage(192.168.2.89:59605,ExpireDeadHost,aaa@qq.com)"
// message:"RpcMessage(192.168.2.89:59605,CheckExistence(HeartbeatReceiver),aaa@qq.com)"
// message:"RpcMessage(192.168.2.89:59605,RegisterBlockManager(BlockManagerId(driver,192.168.2.89,59296,none),915406848,NettyRpcEndpointRef(Spark://aaa@qq.com:59065)),aaa@qq.com)"
// message:"RpcMessage(192.168.2.89:59605,Heartbeat(driver[Lscala,Tuple2:@33a2cec3,nonullne),aaa@qq.com)"
// message:"RpcMessage(192.168.2.89:59605,ExecutorRegistered(driver),aaa@qq.com)"
// message:"RpcMessage(192.168.2.89:59605,BlockManagerHeartbeat(driver(driver,192.168.2.89,59296,none),aaa@qq.com)"
// message:"RpcMessage(192.168.2.89:59605,UpdateBlockInfo(BlockManagerId(driver,192.168.2.89,59296,none),broadcast_0_piece0,StorageLevel(memory,1 replicas),29064,0),aaa@qq.com)"
// message:"RpcMessage(192.168.2.89:59605,StopMapOutTracker,aaa@qq.com)"
// 匹配到一条普通的Rpc消息
// 这里说一下,一下所有匹配的消息类型都实现了trait InboxMessage,包括这条RpcMessage
case RpcMessage(_sender, content, context) =>
try {
// 这个方法是接收并返回的双向消息体,是通过sender调用对应的Ref的ask方法触发的
// 包括在下个章节会提及的blockmanager中的BlockManagerSlaveEndpoint组件在执行
// RemoveBlock,GetBlockStatus等操作时都是调用receiveAndReply
// 这里补充一下:receiveAndReply是一个PartialFunction(偏函数),当endpoint调用
// receiveAndReply时会根据case 到的类型执行对应的操作
// 如果是Heartbeat==》org.apache.spark.Heartbeat.receiveAndReply()
// 如果是BlockManagerHeartbeat==》org.apache.spark.storage.BlockManagerMasterEndpoint.receiveAndReply()
endpoint.receiveAndReply(context).applyOrElse[Any, Unit](content, { msg =>
throw new SparkException(s"Unsupported message $message from ${_sender}")
})
} catch {
case NonFatal(e) =>
context.sendFailure(e)
// Throw the exception -- this exception will be caught by the safelyCall function.
// The endpoint's onError function will be called.
throw e
}
// message:"OneWayMessage(192.168.2.89:59605,StatusUpdate(1,RUNNING,java.nio,HeartByteBuffer[pos=0,lim=0,cap=0]))"
// 匹配一个单向的消息处理机制
case OneWayMessage(_sender, content) =>
// 这就是刚刚说到的单向消息体的具体实现
// 调用偏函数receive处理一个Ref调用send或者reply发送过过来的消息
endpoint.receive.applyOrElse[Any, Unit](content, { msg =>
throw new SparkException(s"Unsupported message $message from ${_sender}")
})
// 匹配一个开启endpoint接收消息的方法
case OnStart =>
// 在endpoint接收任何消息之前调用,启动它的接收消息功能
endpoint.onStart()
// 如果它的实例不是ThreadSafeRpcEndpoint类型就强制关闭
if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {
inbox.synchronized {
if (!stopped) {
enableConcurrent = true
}
}
}
// 匹配一个停止endpoint接收消息的方法,当匹配到这个方法后,它的send和ask都不能用了
case OnStop =>
val activeThreads = inbox.synchronized { inbox.numActiveThreads }
// 做个断言
assert(activeThreads == 1,
s"There should be only a single active thread but found $activeThreads threads.")
// 移除掉RpcEndpointRef
dispatcher.removeRpcEndpointRef(endpoint)
// 停止接收消息
endpoint.onStop()
// 断言是否为空
assert(isEmpty, "OnStop should be the last message")
// 匹配到一条告诉所有节点的消息,一个远程进程已连接
case RemoteProcessConnected(remoteAddress) =>
endpoint.onConnected(remoteAddress)
// 匹配到一条告诉所有节点的消息,一个远程进程已断开连接
case RemoteProcessDisconnected(remoteAddress) =>
endpoint.onDisconnected(remoteAddress)
// 匹配到一条告诉所有节点的消息,一个远程进程连接发生错误状态
case RemoteProcessConnectionError(cause, remoteAddress) =>
endpoint.onNetworkError(cause, remoteAddress)
}
}
// 这里减少numActiveThreads
inbox.synchronized {
// "enableConcurrent" will be set to false after `onStop` is called, so we should check it
// every time.
if (!enableConcurrent && numActiveThreads != 1) {
// If we are not the only one worker, exit
numActiveThreads -= 1
return
}
message = messages.poll()
if (message == null) {
numActiveThreads -= 1
return
}
}
}
}
注意这个方法,最先开始的第一个消息永远是onStart()方法,最后一个永远是onStop()方法,所以第一次走
// 匹配一个开启endpoint接收消息的方法
case OnStart =>
// 在endpoint接收任何消息之前调用,启动它的接收消息功能
endpoint.onStart()
// 如果它的实例不是ThreadSafeRpcEndpoint类型就强制关闭
if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {
inbox.synchronized {
if (!stopped) {
enableConcurrent = true
}
}
}
最后一次走onStop()
// 匹配一个停止endpoint接收消息的方法,当匹配到这个方法后,它的send和ask都不能用了
case OnStop =>
val activeThreads = inbox.synchronized { inbox.numActiveThreads }
// 做个断言
assert(activeThreads == 1,
s"There should be only a single active thread but found $activeThreads threads.")
// 移除掉RpcEndpointRef
dispatcher.removeRpcEndpointRef(endpoint)
// 停止接收消息
endpoint.onStop()
// 断言是否为空
assert(isEmpty, "OnStop should be the last message")
然后其他路径看注释,分发到不同的Endpoint.receiveAndReply()方法去处理
这里有个疑问第一个onStart()方法,调用的是谁?我80%的猜想,如果endpoint传入的是master就是master中的onStart()方法,但是调试的时候,第一个总是跑到特质里面,无法跳到继承类里面。
endpoints列表中,有如下信息
private val endpoints: ConcurrentMap[String, EndpointData] =
new ConcurrentHashMap[String, EndpointData]
而且每次多一个,就执行一次onStart方法,匹配到下面
// 匹配一个开启endpoint接收消息的方法
case OnStart =>
// 在endpoint接收任何消息之前调用,启动它的接收消息功能
endpoint.onStart()
// 如果它的实例不是ThreadSafeRpcEndpoint类型就强制关闭
if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {
inbox.synchronized {
if (!stopped) {
enableConcurrent = true
}
}
}
我进截图,里面的类,发现有的有onStart()方法,有的没有,但是都和RpcEndpointRef有关系,而RpcEndpointRef中没有onStart方法,而org.apache.spark.rpc.RpcEndpoint中有
/**
* Invoked before [[RpcEndpoint]] starts to handle any message.
* 开始处理任何消息之前调用[[RpcEndpoint]]。
*
* 启动RpcEndpoint处理任何消息
*/
def onStart(): Unit = {
// By default, do nothing.
}
因此两种猜测:
1。7个类中有onStart()就执行,否则就执行RpcEndpoint的onStart()方法
2。全部由RpcEndpoint的onStart()方法执行,什么都不做,仅仅是一个形式