欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Hadoop RPC解析

程序员文章站 2022-07-11 23:46:28
...

        RPC(Remote Procedure Call)即是远程过程调用,简单的理解就是调用远程计算机上的服务,就像调用本地服务一样。而不需要了解底层网络技术的协议。RPC采用的是C/S模式;请求部分是一个客户端,而远程服务提供程序就是一个服务器;客户端首先发送一个函数的请求调用到服务器,然后等待服务器上执行并返回响应信息。在服务器端,首先会保持监听状态直到有客户端请求到达服务端为止,之后服务便会执行调用请求的函数,计算结果并发送响应信息给客户端。


RPC内部结构一般如图所示:

Hadoop RPC解析

  1. 客户端程序以本地调用的形式调用本地产生的Stub程序;
  2. 该Stub程序会将该函数的调用按照网络通信封装成对应的网络消息包(主要包含调用的类、函数、参数等)发送到远程的服务端;
  3. 远程服务端接受到此消息后,会解析还原此消息,并调用对应的服务端的Stub程序;
  4. Stub程序会按照被调用的形式调用具体对应的函数以及传递的参数,并将结果返回给Stub程序;
  5. 服务端的Stub程序会将此结果封装成消息,通过网络通信的方式返回给客户端程序;

Hadoop RPC

        Hadoop RPC主要由三个大类组成,其即是RPC、Client类和Server类;其主要结构在hadoop-common包下的ipc.RPC类;其类的基本组织结构与提供的函数如下:

Hadoop RPC解析

Hadoop RPC对外主要提供了两类接口,分别是:

  • public static <T>ProtocolProxy <T> getProxy/waitForProxy() : 构造一个客户端代理对象,用于向服务器发送RPC请求
  • public static Server RPC.Builder (Configuration).build() : 为某个协议实例构造一个服务器对象,用于处理客户端发送的请求

        其中大量的函数waitForProxy()、getProxy()方法是用来获取对应RPC协议的客户端代理;在waitForProxy()函数中,其会采用java中的动态代理模式。首先会根据选择的序列化协议来初始化一个RpcEngine,主要用于序列化封装函数以及传递的参数;之后便会创建一个叫invoker的RpcInvocationHandler,里面包含了本次连接的remoteId、客户端client、调用协议protocolName等,并且重写了对应的invoke()方法,在invoke()方法中主要是将调用的method方法、调用协议protocolName以及客户端协议版本clientProtocolVersion等封装在rpcRequestHeader中,最终会将对应封装的请求头rpcRequestHeader以及对应的传递args封装成RpcMessageWithHeader;并通过client.call()发送到服务端。在客户端创建好了invocationHandler后,会通过Proxy.newProxyInstance()创建对应的代理类实例,最后在客户端根据生成的代理类实例,即可调用对应的方法。

1、首先在waitForProxy()中其会先根据conf配置文件来使用相应的Rpc引擎来选择不同的序列化方式,其主要是使用RpcKind来标识当前RPC框架使用的引擎,其主要包含WritableRpcEngine和ProtobufRpcEngine;其获取方法getProtocolEngine(protocol, conf).getProxy()如下;我们主要分析ProtobufRpcEngine的方式:

  // return the RpcEngine configured to handle a protocol
  static synchronized RpcEngine getProtocolEngine(Class<?> protocol,
      Configuration conf) {
    RpcEngine engine = PROTOCOL_ENGINES.get(protocol);
    if (engine == null) {
      Class<?> impl = conf.getClass(ENGINE_PROP+"."+protocol.getName(),
                                    WritableRpcEngine.class);
      engine = (RpcEngine)ReflectionUtils.newInstance(impl, conf);
      PROTOCOL_ENGINES.put(protocol, engine);
    }
    return engine;
  }

  // ProtobufRpcEngine
  public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
      InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
      SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy,
      AtomicBoolean fallbackToSimpleAuth) throws IOException {

    final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory,
        rpcTimeout, connectionRetryPolicy, fallbackToSimpleAuth);
    return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(
        protocol.getClassLoader(), new Class[]{protocol}, invoker), false);
  }

可以看到选择ProtobufRpcEngine序列化的方式,其会构造对应的代理类ProtocolProxy来代理实际的调用者Invoker类;在Invoker类中封装了使用对应序列化方式来序列化客户端请求参数,并通过客户端client,通过socket网络通信来进行序列化参数的发送至远程的服务响应端;Invoker类内部的重要部分如下:

private static class Invoker implements RpcInvocationHandler {
    private final Client.ConnectionId remoteId;
    private final Client client;
    private final long clientProtocolVersion;
    private final String protocolName;
 
    @Override
    public Object invoke(Object proxy, Method method, Object[] args)
        throws ServiceException {
      // .........

      // 使用对应的序列化方式来序列化请求头,包含调用函数方法,接口协议,协议版本等
      RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method);
      
      // 封装的请求参数
      Message theRequest = (Message) args[1];
      final RpcResponseWrapper val;
      try {
        // 使用client.call方式发送该请求
        val = (RpcResponseWrapper) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
            new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId,
            fallbackToSimpleAuth);

      } catch (Throwable e) {
        // .........
      } finally {
        if (traceScope != null) traceScope.close();
      }
      // .........
    }
}

2、在RPC.Builder类中,其是RPC Server的一个构造者对象,可以通过RPC.Builder.build()方法快速构建一个RPC服务端对象。其基本的构建服务端代码如下:

// BindAddress : 服务器的host地址
// Port : 监听端口号,0代表系统随机选择一个端口号
// NumHandlers : 服务端处理请求的线程数
Server server = new RPC.Builder(conf).setProtocol(ClientProtocol.class)
    .setInstance(new ClientProtocolImpl()).setBindAddress(ADDRESS).setPort(0)
    .setNumHandlers(5).build();
server.start();

接下来分别介绍一下RPC.Client和RPC.Server端的请求发送、与请求响应的详细过程;

 

RPC Client

       Client类的主要功能就是发送请求和接受响应信息;该Client类只有一个方法入口call()方法。通过waitForProxy()构造的客户端请求代理会调用Client.call()方法将RPC请求发送到远程的服务器上,然后等待远程服务器的响应信息。Client.call()方法发送请求和接受响应的流程如下:

Hadoop RPC解析

  • Client.call()方法用于将RPC请求封装成一个Call对象,Call对象中保存了RPC调用的信息;并且在Call方法中会创建一个用于管理Client和Server端之间的Socket连接的Connection对象;
  • 客户端会使用Hashtable<ConnectionId, Connection> connections;来管理和缓存复用对应的socket连接;(与Server端建立Socket连接,会比较消耗资源);
  • Client.call()会调用Connection.setupIOstreams()方法建立Client与Server之间的socket连接,并且会启动Connection线程,其会监听socket并读取server端发回的响应信息;
  • Client.call()方法最终会调用Connection.sendRpcRequest()方法来进行RPC请求的发送;
  • Client.call()方法会调用Call.wait()方法在Call对象上进行等待,等待Server端返回响应信息;
  • Connection线程收到Server端的响应信息后,会设置对应Call对象的返回值字段,并调用Call.notify()唤醒Client.call()方法所在的调用线程读取Call对象的返回值;

Client.call()方法具体的执行源码如下:

  public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
      ConnectionId remoteId, int serviceClass,
      AtomicBoolean fallbackToSimpleAuth) throws IOException {
    // 根据序列化引擎和请求构建对应的Call对象
    final Call call = createCall(rpcKind, rpcRequest);
    // 根据Call对象以及服务端地址remoteId中,进行socket连接
    // connections连接缓存复用,并启动对应的Connection线程
    Connection connection = getConnection(remoteId, call, serviceClass,
      fallbackToSimpleAuth);
    try {
      // 将远程rpc调用信息发送给server端
      connection.sendRpcRequest(call);                 // send the rpc request
    } catch (RejectedExecutionException e) {
      throw new IOException("connection has been closed", e);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      LOG.warn("interrupted waiting to send rpc request to server", e);
      throw new IOException(e);
    }

    boolean interrupted = false;
    synchronized (call) {
      // 判断call是否完成,否则等待server端通过connection进行对应call.notify()
      while (!call.done) {
        try {
          // 当前线程等待阻塞
          // 直到Connection线程中接受到receiveRpcResponse的响应调用call.notify
          call.wait();                           // wait for the result
        } catch (InterruptedException ie) {
          // save the fact that we were interrupted
          interrupted = true;
        }
      }

      if (interrupted) {
        // set the interrupt flag now that we are done waiting
        Thread.currentThread().interrupt();
      }

      if (call.error != null) {
        if (call.error instanceof RemoteException) {
          call.error.fillInStackTrace();
          throw call.error;
        } else { // local exception
          InetSocketAddress address = connection.getRemoteAddress();
          throw NetUtils.wrapException(address.getHostName(),
                  address.getPort(),
                  NetUtils.getHostname(),
                  0,
                  call.error);
        }
      } else {
        // 返回server端rpc响应的结果
        return call.getRpcResponse();
      }
    }
  }
/** Get a connection from the pool, or create a new one and add it to the
   * pool.  Connections to a given ConnectionId are reused. */
  private Connection getConnection(ConnectionId remoteId,
      Call call, int serviceClass, AtomicBoolean fallbackToSimpleAuth)
      throws IOException {
    if (!running.get()) {
      // the client is stopped
      throw new IOException("The client is stopped");
    }
    Connection connection;
    /* we could avoid this allocation for each RPC by having a  
     * connectionsId object and with set() method. We need to manage the
     * refs for keys in HashMap properly. For now its ok.
     */
    do {
      synchronized (connections) {
        connection = connections.get(remoteId);
        if (connection == null) {
          connection = new Connection(remoteId, serviceClass);
          connections.put(remoteId, connection);
        }
      }
    } while (!connection.addCall(call));
    
    //we don't invoke the method below inside "synchronized (connections)"
    //block above. The reason for that is if the server happens to be slow,
    //it will take longer to establish a connection and that will slow the
    //entire system down.
    // 此处会调用Thread.start()方法启动connection线程
    connection.setupIOstreams(fallbackToSimpleAuth);
    return connection;
  }
// connection线程执行比较简单
@Override
public void run() {
  try {
	while (waitForWork()) {//wait here for work - read or close connection
	  receiveRpcResponse();
	}
  } catch (Throwable t) {
  // ..........
}

private void receiveRpcResponse() {
    // .........
    // 从对应的connection.calls对象中取出对应的rpc请求call
	Call call = calls.get(callId);
	
    // 根据成功与否,将最终的结果保存在call中,并调用call.notify()通知客户端的调用等待线程
    // setRpcResponse、setException等都会调用对应的call.notify()
	if (status == RpcStatusProto.SUCCESS) {
	  Writable value = ReflectionUtils.newInstance(valueClass, conf);
	  value.readFields(in);                 // read value
	  calls.remove(callId);
	  call.setRpcResponse(value);
	} else { // Rpc Request failed
	  if (status == RpcStatusProto.ERROR) {
		calls.remove(callId);
		call.setException(re);
	  } else if (status == RpcStatusProto.FATAL) {
		// Close the connection
		markClosed(re);
	  }
	}
  } catch (IOException e) {
	markClosed(e);
  }
}

 

RPC Server

        Server类即RPC的服务端。Hadoop Server为了保证高性能采用了很多提高并发处理能力的技术,主要包括线程池、事件驱动和Reactor设计模式等;

典型的Reactor设计模式中主要包括以下几个角色 : 

  • Reactor:I/O事件的派发者
  • Acceptor:接受来自Client的连接,建立与Client对应的Handler,并向Reactor注册此Handler
  • Handler:与一个Client通信的实体,并按一定的过程实现业务的处理
  • Reader/Sender:为了加速处理速度,Reactor模式往往构建一个存放数据处理线程的线程池,这样数据读出后,立即扔到线程吃中等待后续处理即可。为此,Reactor模式一般分离Handler中的读和写两个过程,分别注册成单独的读事件和写事件,并由对应的Reader和Sender线程处理

在Hadoop RPC Server的类结构设计中,其也是一个典型的Reactor设计模式:

Hadoop RPC解析

  • Listener:整个Server只有一个Listener线程,用来监听来自客户端的socket请求,Listener对象中定义了一个Selector对象,其负责监听SelectionKey.OP_ACCEPT事件。等待客户端Client.call()中的getConnection建立socket连接来触发该事件唤醒Listener线程,Listener线程会调用ServerSocketChannel.accept()创建一个新的SocketChannel;
  • Listener会轮询的从readers线程池中选取一个线程,并在Reader的readerSelector上注册OP_READ事件;
  • 当客户端Client发送RPC请求时,其会触发Reader的readerSelector并唤醒Reader线程;
  • Reader线程从SocketChannel中读取数据并封装成Call对象,然后放入共享队列callQueue中;
  • handlers线程池中的handler线程都通过BlockingQueue.take()阻塞队列方法在callQueue上阻塞,当有Call对象被放入callQueue中后,其中一个Handler线程被唤醒。然后根据Call对象上的信息,调用Server.call()方法,随后会尝试将响应信息写入SocketChannel;
  • 当响应结果无法完全写入SocketChannel时,将会在Responder线程的respondSelector上注册OP_WRITE事件,当监听到可写时,会唤醒Responder继续写响应;

接下来一步步来看其RPC.builder.build();初始化Listener(listener构造初始化时构造Reader线程池)线程、Responder线程等;server.start();启动rpc server端对应的listener、responder、handlers线程池的详细过程:

Listener类:首先Listener类构造初始化时,会建立socket并绑定监听相关的地址端口后,创建内部的Readers线程池组,并在当前listener上打开Selector,并在channel上注册对SelectionKey.OP_ACCEPT的监听事件。当Server创建Listener并调用start方法启动listener线程后,Listener线程会执行run方法,并循环监听通道上的OP_ACCEPT事件来判断是否有新的连接请求,如果有则调用doAccept()方法来处理;

  private class Listener extends Thread {
    private ServerSocketChannel acceptChannel = null; //the accept channel
    private Selector selector = null; //the selector that we use for the server
    private Reader[] readers = null; //Readers线程池组
   
    public Listener() throws IOException {
      address = new InetSocketAddress(bindAddress, port);
      // Create a new server socket and set to non blocking mode
      acceptChannel = ServerSocketChannel.open();
      acceptChannel.configureBlocking(false);

      // Bind the server socket to the local host and port
      bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig);
      port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
      // create a selector;
      selector= Selector.open();
      readers = new Reader[readThreads];
      for (int i = 0; i < readThreads; i++) {
        Reader reader = new Reader(
            "Socket Reader #" + (i + 1) + " for port " + port);
        readers[i] = reader;
        reader.start();
      }

      // Register accepts on the server socket with the selector.
      acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
      this.setName("IPC Server listener on " + port);
      this.setDaemon(true);
    }
 
    @Override
    public void run() {
      LOG.info(Thread.currentThread().getName() + ": starting");
      SERVER.set(Server.this);
      connectionManager.startIdleScan();
      while (running) {
        SelectionKey key = null;
        try {
          getSelector().select();
          Iterator<SelectionKey> iter = getSelector().selectedKeys().iterator();
          while (iter.hasNext()) {
            key = iter.next();
            iter.remove();
            try {
              if (key.isValid()) {
                // 当有新的连接请求到来时,调用doAccept方法响应该连接请求
                if (key.isAcceptable())
                  doAccept(key);
              }
            } catch (IOException e) {
            }
            key = null;
          }
        } catch (OutOfMemoryError e) {
        // .........
    }
  }

其中doAccept()方法会接受来自客户端的socket连接请求并初始化socket连接。之后doAccept()会从readers线程池组中轮询的选择出一个Reader线程来读取这个客户端的rpc请求。通过reader.addConnection(c)将这个Connection对象添加到Reader对象所维护的一个待连接处理队列pendingConnections中,并通过readSelector.wakeup();唤醒阻塞在reader上的readSelector.select()。此后,这个channel上的读与写任务将一直固定由这个分派给自己的Reader直接负责,而不会被其它Reader线程处理。

    void doAccept(SelectionKey key) throws InterruptedException, IOException,  OutOfMemoryError {
      ServerSocketChannel server = (ServerSocketChannel) key.channel();
      SocketChannel channel;
      while ((channel = server.accept()) != null) {

        channel.configureBlocking(false);
        channel.socket().setTcpNoDelay(tcpNoDelay);
        channel.socket().setKeepAlive(true);

        Reader reader = getReader();
        Connection c = connectionManager.register(channel);
        key.attach(c);  // so closeCurrentConnection can get the object
        reader.addConnection(c);
      }
    }

Reader类:reader类有自己的readSelector对象。Reader线程的主循环是在执行doRunLoop()方法;当reader.addConnection(c)有连接添加到reader上并唤醒readSelector.select()后;其会在这个reader的readSelector对象上注册监听SelectionKey.OP_READ事件;之后便会在readSelector上等待可读事件,也就是等待客户端的rpc请求到达,之后便读取该请求并用一个call对象进行封装,最后放入在callQueue中等待handler线程处理;

  private synchronized void doRunLoop() {
	while (running) {
	  SelectionKey key = null;
	  try {
		// consume as many connections as currently queued to avoid
		// unbridled acceptance of connections that starves the select
		int size = pendingConnections.size();
		for (int i=size; i>0; i--) {
		  Connection conn = pendingConnections.take();
		  conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
		}
		readSelector.select(); // 阻塞等待 有请求到来or被唤醒

		Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
		while (iter.hasNext()) {
		  key = iter.next();
		  iter.remove();
		  if (key.isValid()) {
            // 有可读事件,调用doRead()方法处理
			if (key.isReadable()) {
			  doRead(key);
			}
		  }
		  key = null;
		}
	  } catch (InterruptedException e) {
		if (running) {                      // unexpected -- log it
		  LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e);
		}
	  } catch (IOException ex) {
		LOG.error("Error in Reader", ex);
	  }
	}
  }

doRead()函数调用栈如下:

  • doRead()->connection.readAndProcess()->processOneRpc()->processRpcRequest()方法

doRead()函数会使用connection对象来进行读取处理,connection对象维护了Server和Client之间的socket连接。reader线程会调用readAndProcess()方法从IO流中读取一个RPC请求,首先其会先从socket流中读取连接头connectionhead,然后读取一个完整的RPC请求,最后会调用processOneRpc()来处理该请求。processOneRpc()会读取出rpc请求头域,然后调用processRpcRequest()处理rpc请求体。

   private void processRpcRequest(RpcRequestHeaderProto header,
        DataInputStream dis) throws WrappedRpcServerException,
        InterruptedException {
      // .........
      // 读取rpc请求体
      Writable rpcRequest;
      try { //Read the rpc request
        rpcRequest = ReflectionUtils.newInstance(rpcRequestClass, conf);
        rpcRequest.readFields(dis);
      } catch (Throwable t) { // includes runtime exception from newInstance
         // ......
      }

      Span traceSpan = null;
      if (header.hasTraceInfo()) {
        // If the incoming RPC included tracing info, always continue the trace
        TraceInfo parentSpan = new TraceInfo(header.getTraceInfo().getTraceId(),
                                             header.getTraceInfo().getParentId());
        traceSpan = Trace.startSpan(rpcRequest.toString(), parentSpan).detach();
      }

      // 构造call对象封装rpc请求信息
      Call call = new Call(header.getCallId(), header.getRetryCount(),
          rpcRequest, this, ProtoUtil.convert(header.getRpcKind()),
          header.getClientId().toByteArray(), traceSpan);
      // 将call对象放入callQueue中,等待handler处理
      callQueue.put(call);              // queue the call; maybe blocked here
      incRpcCount();  // Increment the rpc count
    }

Handler类:是实际处理请求的线程类,负责执行RPC请求对应的本地函数,然后将结果发回客户端。在Server类中会同时存在多个Handler线程,它们并行的从共享队列callqueue中取出待处理的Call对象,然后调用Server.call()方法执行RPC调用对应的本地函数;之后handler会调用setupReponse()方法构造RPC应答结果,并通过reponse.doRespond()将响应结果返回给客户端。

  /** Handles queued calls . */
  private class Handler extends Thread { 
    @Override
    public void run() {
      while (running) {
        try {
          // 从callQueue当中读取请求
          final Call call = callQueue.take(); // pop the queue; maybe blocked here
          // .........
          try {
            // Make the call as the user via Subject.doAs, thus associating
            // the call with the Subject
            // 调用本地的call()方法执行RPC调用对应的本地函数
            if (call.connection.user == null) {
              value = call(call.rpcKind, call.connection.protocolName, call.rpcRequest,
                           call.timestamp);
            } else {
              value =
                call.connection.user.doAs
                  (new PrivilegedExceptionAction<Writable>() {
                     @Override
                     public Writable run() throws Exception {
                       // make the call
                       return call(call.rpcKind, call.connection.protocolName,
                                   call.rpcRequest, call.timestamp);

                     }
                   }
                  );
            }
          } catch (Throwable e) {
             // .........
          }
          CurCall.set(null);
          synchronized (call.connection.responseQueue) {
            // setupResponse() needs to be sync'ed together with
            // responder.doResponse() since setupResponse may use
            // SASL to encrypt response data and SASL enforces
            // its own message ordering.
            // 构造rpc响应结果信息,包含正常响应与异常响应
            setupResponse(buf, call, returnStatus, detailedErr,
                value, errorClass, error);

            // Discard the large buf and reset it back to smaller size
            // to free up heap
            // 调用responder.doRespond()返回响应
            responder.doRespond(call);
          }
        } catch (InterruptedException e) {
        // .........
  }

Responder类:也是一个线程类,server端仅有一个Responder对象,其内部包含一个writeSelector对象用于监听channel中的SelectionKey.OP_WRITE事件。此时当Responder线程循环执行doRunLoop()时,其会阻塞等待在writeSelector.select(PURGE_INTERVAL)上,等待通道可写之后,便会相应的执行doRunLoop()->doAsyncWrite()->processResponse()方法来继续执行剩余的响应写操作。

    // Processes one response. Returns true if there are no more pending
    // data for this channel.
    //
    private boolean processResponse(LinkedList<Call> responseQueue,
                                    boolean inHandler) throws IOException {
      Call call = null;
      try {
        synchronized (responseQueue) {
          // 先进先出LinkedList,从respondeQueue中取出第一个Call对象进行处理
          call = responseQueue.removeFirst();
          SocketChannel channel = call.connection.channel;
         
          //
          // Send as much data as we can in the non-blocking fashion
          //
          // 将call.rpcResponse中的数据写入到channel中
          int numBytes = channelWrite(channel, call.rpcResponse);
          if (numBytes < 0) {
            return true;
          }
          if (!call.rpcResponse.hasRemaining()) { // 数据已经写入完毕,清理工作
            //Clear out the response buffer so it can be collected
            call.rpcResponse = null;
            call.connection.decRpcCount();
            if (numElements == 1) {    // last call fully processes.
              done = true;             // no more data for this channel.
            } else {
              done = false;            // more calls pending to be sent.
            }
            if (LOG.isDebugEnabled()) {
              LOG.debug(Thread.currentThread().getName() + ": responding to " + call
                  + " Wrote " + numBytes + " bytes.");
            }
          } else {
            //
            // If we were unable to write the entire response out, then
            // insert in Selector queue.
            //
            // 如果数据没有写完,则把Call对象重新加入responseQueue中,下次会继续发送剩余的数据
            call.connection.responseQueue.addFirst(call);

            if (inHandler) {
              // 如果是inHandler中调用,则需要唤醒writeSelector,并将channel注册到writeSelector上
              // 这样就可以在Responder.doRunLoop()中检测到可写的SocketChannel,然后继续发送剩余数据至客户端
              // set the serve time when the response has to be sent later
              call.timestamp = Time.now();

              incPending();
              try {
                // Wakeup the thread blocked on select, only then can the call
                // to channel.register() complete.
                writeSelector.wakeup();
                channel.register(writeSelector, SelectionKey.OP_WRITE, call);
              } catch (ClosedChannelException e) {
                //Its ok. channel might be closed else where.
                done = true;
              } finally {
                decPending();
              }
            }
           
          }
        }
      } finally {
         // .........
      }
      return done;
    }

从源码中可以看到,当Handler将call对象加入到当前Connection的responseQueue中时,会判断是否只有当前一个call对象需要返回响应,则将直接在handler线程中调用processResponse(),如果没能将结果一次性返回给客户端时,会在对应的writeSelector上注册SelectionKey.OP_WRITE事件,从而能够使Responder线程采用异步的方式来继续发送未发送完成的结果。