欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

spark学习-75-源代码:Endpoint模型介绍(6)-Endpoint的消息的接收(2)

程序员文章站 2022-07-15 10:09:14
...

关于Endpoint如何处理消息的,我一直找不到初始调用点
spark学习-75-源代码:Endpoint模型介绍(6)-Endpoint的消息的接收(2)
请问这个图中消息接受这个点在哪里?我想从这里看,但是找不到最初的起点

1。消息的产生点

没找到,消息是怎么产生的还不清楚,知道的底下评论一下,谢谢

2。消息的分发

Endpoint Inbox处理流程
Spark在Endpoint的设计上核心设计即为Inbox与Outbox,其中Inbox核心要点为
内部的处理流程拆分为多个消息指令(InboxMessage)存放入Inbox
当Dispatcher启动最后,会启动一个名为【dispatcher-event-loop】的线程扫描Inbox待处理InboxMessage,并调用Endpoint根据InboxMessage类型做相应处理
当Dispatcher启动最后,默认会向Inbox存入OnStart类型的InboxMessage,Endpoint在根据OnStart指令做相关的额外启动工作,三端启动后所有的工作都是对OnStart指令处理衍生出来的,因此可以说OnStart指令是相互通信的源头

spark学习-75-源代码:Endpoint模型介绍(6)-Endpoint的消息的接收(2)

后来看到代码创建org.apache.spark.rpc.netty.Dispatcher的时候,初始化了ThreadPoolExecutor,并且启动线程

/** Thread pool used for dispatching messages.
    *
    * //  dispatcher的线程池
    *   dispatcher会用java.util.concurrent.newFixedThreadPool创建一个属于自己的
    * ThreadPoolExecutor线程池,然后不停的会去拿去messages链表队列里的消息数据,
    * 并根据消息的类型执行message的模式匹配做对应的处理
    * */
  private val threadpool: ThreadPoolExecutor = {
    //  通过配置项拿到dispatcher的线程数量
    val numThreads = nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads",
      math.max(2, Runtime.getRuntime.availableProcessors()))

    // 最后会生成Java的ThreadPoolExecutor线程池
    val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")
    for (i <- 0 until numThreads) {
      // 直接调用execute执行线程MessageLoop  类型是Runnable
      // 这不是启动,而是把线程丢到  线程池队列,等待Thread触发start,注意:丢进去的不是线程,而是Runnable 的实现类MessageLoop
      // 放进去之后会在ThreadPoolExecutor线程池中并行的启动(虽然是队列,但是却是并行调用MessageLoop线程的start方法,不然线程池的优越性就没有了)
      // 这里相当于直接启动了MessageLoop线程
      pool.execute(new MessageLoop)
    }
    pool
  }

MessageLoop是从private val receivers = new LinkedBlockingQueue[EndpointData]这个里面不停地取出数据,所以receivers里面的数据量,要么1,要么0,如果超过了,就说明你的系统太卡了,产生了很多消息,却没有消费。

  // Track the receivers whose inboxes may contain messages.
  // 跟踪收件箱中可能包含消息的接收方。
  // 数据存储结构:java.util.concurrent下的LinkedBlockingQueue
  // 里面维护着EndpointData的线程阻塞链表
  // receivers貌似始终为0,里面一放入数据,就会被MessageLoop取走
  private val receivers = new LinkedBlockingQueue[EndpointData]
/** Message loop used for dispatching messages. 用于发送消息的消息循环 */
  private class MessageLoop extends Runnable {
    override def run(): Unit = {
      try {
        // 线程会不停的去处理过来的messages
        while (true) {
          try {
            // 因为receivers是一个路由器,所以来的消息(Response)和去的消息(request)都在这里面,拿出来一个消息
            // OutputCommitCoordinator
            val data = receivers.take()
            // 如果取出的信息数据是空的
            if (data == PoisonPill) {
              // Put PoisonPill back so that other MessageLoops can see it.
              // 就放回去LinkedBlockingQueue
              receivers.offer(PoisonPill)
              return
            }
            // 调用inbox的process方法,根据需要处理的消息类型message的模式匹配来执行对应的处理方式
            // 调用收件箱的处理方法
            data.inbox.process(Dispatcher.this)
          } catch {
            case NonFatal(e) => logError(e.getMessage, e)
          }
        }
      } catch {
        case ie: InterruptedException => // exit
      }
    }
  }

主要看看 data.inbox.process(Dispatcher.this)这个方法

/**
   * Process stored messages.
    *
    * 因为process里面涉及到一些组件真正调用单向和双向消息的具体实现,模式匹配+偏函数经典搭配;
    *  包括还有远程消息体的处理方式
   */
  def process(dispatcher: Dispatcher): Unit = {
    var message: InboxMessage = null
    // 这里是增加numActiveThreads 下面有减少的 应该是为了保证一个消息 ,只能发送一次(比如增加executor的消息,本来发送消息只是增加一个,
    // 但是不这样做,有可能这个消息,因为某些原因重试了好几次,那么就增加了很多的executor,造成浪费)
    inbox.synchronized {
      // 这个(真 && 假) 结果为false
      if (!enableConcurrent && numActiveThreads != 0) {
        return
      }
      // 先从messages链表里poll出一条消息数据
      message = messages.poll()
      // 这个numActiveThreads只要来的消息不为null就设置为1,但是返回后,又变成0了 在1和0之间转换
      if (message != null) {
        numActiveThreads += 1
      } else {
        return
      }
    }
    // message:"RpcMessage(192.168.2.89:59605,ExpireDeadHost,aaa@qq.com)"
    // message:"RpcMessage(192.168.2.89:59605,CheckExistence(HeartbeatReceiver),aaa@qq.com)"
    // message:"RpcMessage(192.168.2.89:59605,RegisterBlockManager(BlockManagerId(driver,192.168.2.89,59296,none),915406848,NettyRpcEndpointRef(Spark://aaa@qq.com:59065)),aaa@qq.com)"
    // message:"RpcMessage(192.168.2.89:59605,Heartbeat(driver[Lscala,Tuple2:@33a2cec3,nonullne),aaa@qq.com)"
    // message:"RpcMessage(192.168.2.89:59605,ExecutorRegistered(driver),aaa@qq.com)"
    // message:"RpcMessage(192.168.2.89:59605,BlockManagerHeartbeat(driver(driver,192.168.2.89,59296,none),aaa@qq.com)"
    // message:"RpcMessage(192.168.2.89:59605,UpdateBlockInfo(BlockManagerId(driver,192.168.2.89,59296,none),broadcast_0_piece0,StorageLevel(memory,1 replicas),29064,0),aaa@qq.com)"
    // message:"OneWayMessage(192.168.2.89:59605,StatusUpdate(1,RUNNING,java.nio,HeartByteBuffer[pos=0,lim=0,cap=0]))"
    // message:"RpcMessage(192.168.2.89:59605,StopMapOutTracker,aaa@qq.com)"
    // message:"OneWayMessage(192.168.2.89:59605,StopCoordinator)"
    while (true) {
      // 循环调用RpcEndpoint
      safelyCall(endpoint) {
        //  对poll出来的message做模式匹配,调用对应的处理机制
        message match {
          // message:"RpcMessage(192.168.2.89:59605,ExpireDeadHost,aaa@qq.com)"
          // message:"RpcMessage(192.168.2.89:59605,CheckExistence(HeartbeatReceiver),aaa@qq.com)"
          // message:"RpcMessage(192.168.2.89:59605,RegisterBlockManager(BlockManagerId(driver,192.168.2.89,59296,none),915406848,NettyRpcEndpointRef(Spark://aaa@qq.com:59065)),aaa@qq.com)"
          // message:"RpcMessage(192.168.2.89:59605,Heartbeat(driver[Lscala,Tuple2:@33a2cec3,nonullne),aaa@qq.com)"
          // message:"RpcMessage(192.168.2.89:59605,ExecutorRegistered(driver),aaa@qq.com)"
          // message:"RpcMessage(192.168.2.89:59605,BlockManagerHeartbeat(driver(driver,192.168.2.89,59296,none),aaa@qq.com)"
          // message:"RpcMessage(192.168.2.89:59605,UpdateBlockInfo(BlockManagerId(driver,192.168.2.89,59296,none),broadcast_0_piece0,StorageLevel(memory,1 replicas),29064,0),aaa@qq.com)"
          // message:"RpcMessage(192.168.2.89:59605,StopMapOutTracker,aaa@qq.com)"
          // 匹配到一条普通的Rpc消息
          // 这里说一下,一下所有匹配的消息类型都实现了trait InboxMessage,包括这条RpcMessage
          case RpcMessage(_sender, content, context) =>
            try {
              // 这个方法是接收并返回的双向消息体,是通过sender调用对应的Ref的ask方法触发的
              // 包括在下个章节会提及的blockmanager中的BlockManagerSlaveEndpoint组件在执行
              // RemoveBlock,GetBlockStatus等操作时都是调用receiveAndReply
              // 这里补充一下:receiveAndReply是一个PartialFunction(偏函数),当endpoint调用
              // receiveAndReply时会根据case 到的类型执行对应的操作
              // 如果是Heartbeat==》org.apache.spark.Heartbeat.receiveAndReply()
              // 如果是BlockManagerHeartbeat==》org.apache.spark.storage.BlockManagerMasterEndpoint.receiveAndReply()
              endpoint.receiveAndReply(context).applyOrElse[Any, Unit](content, { msg =>
                throw new SparkException(s"Unsupported message $message from ${_sender}")
              })
            } catch {
              case NonFatal(e) =>
                context.sendFailure(e)
                // Throw the exception -- this exception will be caught by the safelyCall function.
                // The endpoint's onError function will be called.
                throw e
            }

          // message:"OneWayMessage(192.168.2.89:59605,StatusUpdate(1,RUNNING,java.nio,HeartByteBuffer[pos=0,lim=0,cap=0]))"
          // 匹配一个单向的消息处理机制
          case OneWayMessage(_sender, content) =>
            // 这就是刚刚说到的单向消息体的具体实现
            // 调用偏函数receive处理一个Ref调用send或者reply发送过过来的消息
            endpoint.receive.applyOrElse[Any, Unit](content, { msg =>
              throw new SparkException(s"Unsupported message $message from ${_sender}")
            })

          // 匹配一个开启endpoint接收消息的方法
          case OnStart =>
            //  在endpoint接收任何消息之前调用,启动它的接收消息功能
            endpoint.onStart()
            //  如果它的实例不是ThreadSafeRpcEndpoint类型就强制关闭
            if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {
              inbox.synchronized {
                if (!stopped) {
                  enableConcurrent = true
                }
              }
            }

          // 匹配一个停止endpoint接收消息的方法,当匹配到这个方法后,它的send和ask都不能用了
          case OnStop =>
            val activeThreads = inbox.synchronized { inbox.numActiveThreads }
            // 做个断言
            assert(activeThreads == 1,
              s"There should be only a single active thread but found $activeThreads threads.")
            // 移除掉RpcEndpointRef
            dispatcher.removeRpcEndpointRef(endpoint)
            //  停止接收消息
            endpoint.onStop()
            // 断言是否为空
            assert(isEmpty, "OnStop should be the last message")

          // 匹配到一条告诉所有节点的消息,一个远程进程已连接
          case RemoteProcessConnected(remoteAddress) =>
            endpoint.onConnected(remoteAddress)

          // 匹配到一条告诉所有节点的消息,一个远程进程已断开连接
          case RemoteProcessDisconnected(remoteAddress) =>
            endpoint.onDisconnected(remoteAddress)

          // 匹配到一条告诉所有节点的消息,一个远程进程连接发生错误状态
          case RemoteProcessConnectionError(cause, remoteAddress) =>
            endpoint.onNetworkError(cause, remoteAddress)
        }
      }

      // 这里减少numActiveThreads
      inbox.synchronized {
        // "enableConcurrent" will be set to false after `onStop` is called, so we should check it
        // every time.
        if (!enableConcurrent && numActiveThreads != 1) {
          // If we are not the only one worker, exit
          numActiveThreads -= 1
          return
        }
        message = messages.poll()
        if (message == null) {
          numActiveThreads -= 1
          return
        }
      }
    }
  }

注意这个方法,最先开始的第一个消息永远是onStart()方法,最后一个永远是onStop()方法,所以第一次走

 // 匹配一个开启endpoint接收消息的方法
          case OnStart =>
            //  在endpoint接收任何消息之前调用,启动它的接收消息功能
            endpoint.onStart()
            //  如果它的实例不是ThreadSafeRpcEndpoint类型就强制关闭
            if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {
              inbox.synchronized {
                if (!stopped) {
                  enableConcurrent = true
                }
              }
            }

最后一次走onStop()

 // 匹配一个停止endpoint接收消息的方法,当匹配到这个方法后,它的send和ask都不能用了
          case OnStop =>
            val activeThreads = inbox.synchronized { inbox.numActiveThreads }
            // 做个断言
            assert(activeThreads == 1,
              s"There should be only a single active thread but found $activeThreads threads.")
            // 移除掉RpcEndpointRef
            dispatcher.removeRpcEndpointRef(endpoint)
            //  停止接收消息
            endpoint.onStop()
            // 断言是否为空
            assert(isEmpty, "OnStop should be the last message")

然后其他路径看注释,分发到不同的Endpoint.receiveAndReply()方法去处理

这里有个疑问第一个onStart()方法,调用的是谁?我80%的猜想,如果endpoint传入的是master就是master中的onStart()方法,但是调试的时候,第一个总是跑到特质里面,无法跳到继承类里面。

endpoints列表中,有如下信息

  private val endpoints: ConcurrentMap[String, EndpointData] =
    new ConcurrentHashMap[String, EndpointData]

spark学习-75-源代码:Endpoint模型介绍(6)-Endpoint的消息的接收(2)

而且每次多一个,就执行一次onStart方法,匹配到下面

 // 匹配一个开启endpoint接收消息的方法
          case OnStart =>
            //  在endpoint接收任何消息之前调用,启动它的接收消息功能
            endpoint.onStart()
            //  如果它的实例不是ThreadSafeRpcEndpoint类型就强制关闭
            if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {
              inbox.synchronized {
                if (!stopped) {
                  enableConcurrent = true
                }
              }
            }

我进截图,里面的类,发现有的有onStart()方法,有的没有,但是都和RpcEndpointRef有关系,而RpcEndpointRef中没有onStart方法,而org.apache.spark.rpc.RpcEndpoint中有

  /**
   * Invoked before [[RpcEndpoint]] starts to handle any message.
    * 开始处理任何消息之前调用[[RpcEndpoint]]。
    *
    * 启动RpcEndpoint处理任何消息
   */
  def onStart(): Unit = {
    // By default, do nothing.
  }

因此两种猜测:
1。7个类中有onStart()就执行,否则就执行RpcEndpoint的onStart()方法
2。全部由RpcEndpoint的onStart()方法执行,什么都不做,仅仅是一个形式