欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

spark学习-71-源代码:Endpoint模型介绍(3)-Endpoint Send&Ask流程

程序员文章站 2022-07-15 10:09:26
...

1。Endpoint Send&Ask流程
Endpoint的消息发送与请求流程,如下:
spark学习-71-源代码:Endpoint模型介绍(3)-Endpoint Send&Ask流程

从图上看send方法最先调用,我们来看看什么时候调用,下面是worker的调用语句,但是这里我们主要走的是master的send语句

 workerRef.send(MasterInStandby)

send代码如下,这里主要处理是发给自己的还是发给别人的消息

/**
    * 我们来看下send这个经典的发送消息的方法,里面封装了不同类型消息体之间的通信的不同实现
    */
  private[netty] def send(message: RequestMessage): Unit = {
    // 拿到需要发送的endpoint地址
    val remoteAddr = message.receiver.address
    // 判断是否是远程地址,如果remoteAddr == address远程地址和自己所在的地址是一个地址,比如,我自己是address=192.168.10.12,要发送的地址
    // 是remoteAddr=192.268.10.12,这说明我是给自己发送消息,所以直接放到自己的邮箱里就可以了,不需要跑一圈
    if (remoteAddr == address) {
      // Message to a local RPC endpoint.
      try {
        // 如果消息接受者在本地就调用dispatcher来发送消息 消息放到自己的收件箱里
        // message的格式:
        // message:"RpcMessage(192.168.2.89:53298,TrackSchedulerIsSet,aaa@qq.com)"
        dispatcher.postOneWayMessage(message)
      } catch {
        case e: RpcEnvStoppedException => logWarning(e.getMessage)
      }
    } else {
      // 否则就是我自己是address=192.168.10.12,要发送的地址
      // 是remoteAddr=192.268.10.56,这说明我是给别人发送消息,所以需要放到自己的发件箱里就可以了,然后被系统发送出去,别人来接收

      // Message to a remote RPC endpoint.
      // 如果消息接受者在远程节点就发送到对应节点的outbox
      // message.serialize(this) 这里消息要序列化,因为是要放到自己的发件箱,然后发送到别的机器上,要走网络的,因此要序列化
      // message.receiver==>NettyRpcEndpointRef==》比如workerRef
      // message的格式:
      // message:"RpcMessage(192.168.2.89:53298,TrackSchedulerIsSet,aaa@qq.com)"
      postToOutbox(message.receiver, OneWayOutboxMessage(message.serialize(this)))
    }
  }

这里先看发给自己的

 dispatcher.postOneWayMessage(message)
 /** Posts a one-way message. 单向发送消息*/
  def postOneWayMessage(message: RequestMessage): Unit = {
    postMessage(message.receiver.name, OneWayMessage(message.senderAddress, message.content),
      (e) => throw e)
  }

最终放到了自己的收件箱(相当于QQ发邮件给自己)

/**
   * Posts a message to a specific endpoint.
    * 将消息发送到特定的endpoint。
   *
   * @param endpointName name of the endpoint.
   * @param message the message to post
   * @param callbackIfStopped callback function if the endpoint is stopped.
   */
  private def postMessage(
      endpointName: String,
      message: InboxMessage,
      callbackIfStopped: (Exception) => Unit): Unit = {
    val error = synchronized {
      // 拿到对应的EndpointData,从收件箱拿出第一条消息onStart
      val data = endpoints.get(endpointName)
      // 判断dispatcher是否停止了,如果停止了 信息肯定发不出去了
      if (stopped) {
        // 抛出异常
        Some(new RpcEnvStoppedException())
      } else if (data == null) {
        // 如果data为空,那就是endpoint没有注册
        Some(new SparkException(s"Could not find $endpointName."))
      } else {
        // 调用inbox对象把massage加入到java.util.LinkedList[InboxMessage]消息队列链表中 放到自己的收件箱里
        data.inbox.post(message)
        // 把EndpointData加入到receivers链表中等待被消费
        receivers.offer(data)
        None
      }
    }
    // We don't need to call `onStop` in the `synchronized` block
    error.foreach(callbackIfStopped)
  }

然后看看发给别人的

 // 否则就是我自己是address=192.168.10.12,要发送的地址
      // 是remoteAddr=192.268.10.56,这说明我是给别人发送消息,所以需要放到自己的发件箱里就可以了,然后被系统发送出去,别人来接收

      // Message to a remote RPC endpoint.
      // 如果消息接受者在远程节点就发送到对应节点的outbox
      // message.serialize(this) 这里消息要序列化,因为是要放到自己的发件箱,然后发送到别的机器上,要走网络的,因此要序列化
      // message.receiver==>NettyRpcEndpointRef==》比如workerRef
      // message的格式:
      // message:"RpcMessage(192.168.2.89:53298,TrackSchedulerIsSet,aaa@qq.com)"
      postToOutbox(message.receiver, OneWayOutboxMessage(message.serialize(this)))

主要把信息放到自己的发件箱里等待被 邮差取走后送给别人

  private def postToOutbox(receiver: NettyRpcEndpointRef, message: OutboxMessage): Unit = {
    // 如果接收端的TransportClient启动了 就直接调用sendWith
    // 调用sendWith核心方法
    // 提醒一下:这里所有的outbox里提取出的消息体都是实现了trait OutboxMessage
    // 所以不同类型的message调用的sendWith实现也不同
    // 也是分为单向和双向消息体
    // message.receiver(receiver.client)==>NettyRpcEndpointRef==》比如workerRef
    // message的格式:
    // message:"RpcMessage(192.168.2.89:53298,TrackSchedulerIsSet,aaa@qq.com)"
    if (receiver.client != null) {
      // 如果接受信息的客户端存在,直接发送信息过去
      message.sendWith(receiver.client)
    } else {
      // 如果接收端没有启动TransportClient就会先查询下是否包含接收者地址
      require(receiver.address != null,
        "Cannot send message to client endpoint with no listen address.")
      val targetOutbox = {
        // 通过Rpc地址从outboxes拿到接收者地址的对应的outbox
        // 数据结构:Java的ConcurrentHashMap[RpcAddress, Outbox],根据要发送的远程地址获取本地发件箱
        // 有点拗口:我的本地地址是                                                       远程地址
        //        192.168.10.82                                                      192.168.10.83
        //          inbox(82的收件箱) <------------不断的从 右边 取消息放到 左边-------------- outBox(83的发件箱)
        //
        //
        //          outBox(82的发件箱) ------------不断的从 左边 取消息放到 右边--------------> inbox(83的收件箱)
        val outbox = outboxes.get(receiver.address)
        if (outbox == null) {
          // 如果该地址对应的outbox不存在就构建一个 (如果发件箱不存在,就创建一个)
          val newOutbox = new Outbox(this, receiver.address)
          // 并加入到outboxes里面
          val oldOutbox = outboxes.putIfAbsent(receiver.address, newOutbox)
          if (oldOutbox == null) {
            // 若为空就直接引用刚生成的newOutbox
            newOutbox
          } else {
            // 返回老的
            oldOutbox
          }
        } else {
          // 返回
          outbox
        }
      }

      // 判断NettyRpcEnv是否停止了
      if (stopped.get) {
        // It's possible that we put `targetOutbox` after stopping. So we need to clean it.
        outboxes.remove(receiver.address)
        // 关闭发件箱
        targetOutbox.stop()
      } else {
        // 最后生成的outbox对象会根据不同的状态执行send中不同的实现
        // 包括可能也会走drainOutbox方法(里面包含在接收者端启动一个TransportClient)
        // 把message添加到自己的消息队列里 ,加入 java.util.LinkedList[OutboxMessage] 的队列中,等待被线程消费
        targetOutbox.send(message)
      }
    }
  }