Hadoop RPC Explained
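RPC (Remote Procedure Call) lets a program call a service on a remote computer as if it were calling a local service, without having to know the underlying network protocols. RPC uses a client/server (C/S) model: the requester is the client and the remote service provider is the server. The client first sends a function-call request to the server, then waits for the server to execute it and return a response. The server stays in a listening state until a client request arrives. It then executes the requested function, computes the result, and sends a response back to the client.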
A typical RPC system is structured as shown in the figure, and a call proceeds as follows:
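- The client program calls the locally generated Stub as if it were making a local call;
- The Stub packs the call into a network message (mainly the class, method, and arguments being invoked) and sends it to the remote server;
- When the remote server receives the message, it decodes it and invokes the corresponding server-side Stub;
- The server Stub calls the actual target function with the passed arguments, and the result is returned to the Stub;
- The server Stub packs the result into a message and sends it back over the network to the client program.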
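To make the stub steps above concrete, here is a minimal in-process sketch. The wire format (method name, argument count, then each argument as a UTF string) and the names EchoService, EchoServiceImpl and StubCodec are hypothetical; they only illustrate how a client stub packs a call into bytes and how a server stub decodes it and dispatches it with reflection. Hadoop's real wire format is different.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.lang.reflect.Method;

// Hypothetical service interface; arguments are limited to String for simplicity.
interface EchoService {
    String echo(String msg);
    String concat(String a, String b);
}

class EchoServiceImpl implements EchoService {
    public String echo(String msg) { return msg; }
    public String concat(String a, String b) { return a + b; }
}

class StubCodec {
    // Client stub side: pack the method name and arguments into a byte message.
    static byte[] encode(String methodName, String... args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(methodName);
            out.writeInt(args.length);
            for (String arg : args) {
                out.writeUTF(arg);
            }
        }
        return bytes.toByteArray();
    }

    // Server stub side: decode the message and invoke the matching method on the target.
    static Object dispatch(byte[] message, Class<?> iface, Object target) throws Exception {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(message))) {
            String methodName = in.readUTF();
            int argc = in.readInt();
            Object[] args = new Object[argc];
            Class<?>[] types = new Class<?>[argc];
            for (int i = 0; i < argc; i++) {
                args[i] = in.readUTF();
                types[i] = String.class;
            }
            Method method = iface.getMethod(methodName, types);
            return method.invoke(target, args);
        }
    }
}
```

In a real system the byte array would travel over a socket between encode() and dispatch(); here both run in one process so the marshaling step is easy to see.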
Hadoop RPC
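Hadoop RPC consists of three main classes: RPC, Client, and Server. Its main entry point is the ipc.RPC class in the hadoop-common module; the class's basic organization and the methods it provides are summarized below: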
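Hadoop RPC exposes two main kinds of interfaces:
- public static <T> T getProxy()/waitForProxy() (and the getProtocolProxy()/waitForProtocolProxy() variants, which return a ProtocolProxy<T>): build a client-side proxy object used to send RPC requests to the server
- new RPC.Builder(conf).build(): builds a Server object for a protocol instance, used to handle requests sent by clients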
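The many overloads of waitForProxy() and getProxy() obtain a client-side proxy for a given RPC protocol. waitForProxy() relies on Java's dynamic proxy mechanism. First, it initializes an RpcEngine according to the chosen serialization protocol; the engine is mainly responsible for serializing the invoked method and its arguments. It then creates an RpcInvocationHandler called invoker, which holds the connection's remoteId, the Client, the protocol name protocolName, and so on, and overrides invoke(). Inside invoke(), the invoked method, protocolName, and the client protocol version clientProtocolVersion are packed into rpcRequestHeader; the header and the passed args are then wrapped into an RpcMessageWithHeader and sent to the server via client.call(). Once the invocation handler exists, Proxy.newProxyInstance() creates the proxy instance, and the client simply calls methods on that proxy.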
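The dynamic-proxy mechanism just described can be sketched with the JDK's java.lang.reflect.Proxy. This is not Hadoop's Invoker: GreeterProtocol, RequestHeader, Transport and SketchInvoker are hypothetical stand-ins for a real protocol interface, rpcRequestHeader, client.call() and the invoker. The handler packs the method name, protocol name and client version into a header and hands the header plus the arguments to the transport.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical protocol interface the client wants to call remotely.
interface GreeterProtocol {
    String greet(String name);
}

// Hypothetical stand-in for rpcRequestHeader.
final class RequestHeader {
    final String methodName;
    final String protocolName;
    final long clientProtocolVersion;

    RequestHeader(String methodName, String protocolName, long clientProtocolVersion) {
        this.methodName = methodName;
        this.protocolName = protocolName;
        this.clientProtocolVersion = clientProtocolVersion;
    }
}

// Hypothetical stand-in for client.call(): sends header + args, returns the response.
interface Transport {
    Object call(RequestHeader header, Object[] args) throws Exception;
}

final class SketchInvoker implements InvocationHandler {
    private final String protocolName;
    private final long clientProtocolVersion;
    private final Transport transport;

    SketchInvoker(Class<?> protocol, long version, Transport transport) {
        this.protocolName = protocol.getName();
        this.clientProtocolVersion = version;
        this.transport = transport;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // Pack the invoked method, protocol name and version into a header,
        // then hand the header and the arguments to the transport.
        RequestHeader header =
                new RequestHeader(method.getName(), protocolName, clientProtocolVersion);
        return transport.call(header, args == null ? new Object[0] : args);
    }

    // Create a proxy that routes every interface call through SketchInvoker.
    @SuppressWarnings("unchecked")
    static <T> T newProxy(Class<T> protocol, long version, Transport transport) {
        return (T) Proxy.newProxyInstance(protocol.getClassLoader(),
                new Class<?>[] {protocol}, new SketchInvoker(protocol, version, transport));
    }
}
```

Note that a JDK proxy also routes toString(), hashCode() and equals() through invoke(), so a production handler usually special-cases them.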
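1. In waitForProxy(), an RPC engine is first selected according to the conf configuration, and that engine determines the serialization method. RpcKind identifies the engine the RPC framework is using; the two main engines are WritableRpcEngine and ProtobufRpcEngine. The engine is obtained with getProtocolEngine(protocol, conf), whose getProxy() is then called, as shown below; we focus on ProtobufRpcEngine: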
```java
// return the RpcEngine configured to handle a protocol
static synchronized RpcEngine getProtocolEngine(Class<?> protocol,
    Configuration conf) {
  RpcEngine engine = PROTOCOL_ENGINES.get(protocol);
  if (engine == null) {
    Class<?> impl = conf.getClass(ENGINE_PROP + "." + protocol.getName(),
        WritableRpcEngine.class);
    engine = (RpcEngine) ReflectionUtils.newInstance(impl, conf);
    PROTOCOL_ENGINES.put(protocol, engine);
  }
  return engine;
}

// ProtobufRpcEngine
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
    InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
    SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy,
    AtomicBoolean fallbackToSimpleAuth) throws IOException {
  final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory,
      rpcTimeout, connectionRetryPolicy, fallbackToSimpleAuth);
  return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(
      protocol.getClassLoader(), new Class[]{protocol}, invoker), false);
}
```
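getProtocolEngine() is a per-protocol cache: the implementation class is read from a configuration key built from the protocol name, with WritableRpcEngine as the fallback. The sketch below reproduces only that pattern. A plain Map<String, String> stands in for Configuration, Engine/WritableEngine/ProtobufEngine/EngineRegistry are hypothetical, and the key prefix "rpc.engine" is an assumption made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical engine abstraction standing in for RpcEngine.
interface Engine { String name(); }

class WritableEngine implements Engine {
    public String name() { return "writable"; }
}

class ProtobufEngine implements Engine {
    public String name() { return "protobuf"; }
}

class EngineRegistry {
    // Assumed key prefix; the engine class is configured per protocol name.
    static final String ENGINE_PROP = "rpc.engine";

    private final Map<Class<?>, Engine> engines = new HashMap<>();
    private final Map<String, String> conf; // stand-in for Configuration

    EngineRegistry(Map<String, String> conf) { this.conf = conf; }

    // Look up the cached engine for a protocol, creating it on first use.
    synchronized Engine getProtocolEngine(Class<?> protocol) {
        Engine engine = engines.get(protocol);
        if (engine == null) {
            // Fall back to the Writable engine when nothing is configured.
            String implName = conf.getOrDefault(ENGINE_PROP + "." + protocol.getName(),
                    WritableEngine.class.getName());
            try {
                engine = (Engine) Class.forName(implName).getDeclaredConstructor().newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalArgumentException("Cannot create engine " + implName, e);
            }
            engines.put(protocol, engine);
        }
        return engine;
    }
}
```

In Hadoop itself a protocol is typically bound to the protobuf engine with RPC.setProtocolEngine(conf, protocol, ProtobufRpcEngine.class), which sets the per-protocol key that getProtocolEngine() reads.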
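When ProtobufRpcEngine is selected, it constructs a ProtocolProxy whose proxy object delegates every call to the Invoker class. Invoker serializes the client's request arguments with the corresponding serialization method and sends them through the Client, over a socket, to the remote server. The important parts of Invoker are: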
```java
private static class Invoker implements RpcInvocationHandler {
  private final Client.ConnectionId remoteId;
  private final Client client;
  private final long clientProtocolVersion;
  private final String protocolName;

  @Override
  public Object invoke(Object proxy, Method method, Object[] args)
      throws ServiceException {
    // .........
    // Build the request header with the chosen serialization: invoked method,
    // protocol name, protocol version, etc.
    RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method);
    // The request message (args[0] is the RpcController)
    Message theRequest = (Message) args[1];
    final RpcResponseWrapper val;
    try {
      // Send the request via client.call()
      val = (RpcResponseWrapper) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
          new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId,
          fallbackToSimpleAuth);
    } catch (Throwable e) {
      // .........
    } finally {
      if (traceScope != null) traceScope.close();
    }
    // .........
  }
}
```
2. RPC.Builder is the builder for RPC servers: RPC.Builder.build() quickly constructs an RPC Server object. Basic server construction code looks like this:
```java
// BindAddress: host address the server binds to
// Port: listening port; 0 lets the system pick a free port
// NumHandlers: number of handler threads that process requests
Server server = new RPC.Builder(conf).setProtocol(ClientProtocol.class)
    .setInstance(new ClientProtocolImpl()).setBindAddress(ADDRESS).setPort(0)
    .setNumHandlers(5).build();
server.start();
```
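Passing port 0 asks the operating system for a free ephemeral port, which is why the Listener constructor reads the real port back with getLocalPort() after binding. A small sketch with plain java.nio, independent of Hadoop (EphemeralBind is a hypothetical name):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

class EphemeralBind {
    // Bind a non-blocking server channel to port 0 and report the port the OS picked.
    static int bindAndReportPort() throws IOException {
        try (ServerSocketChannel channel = ServerSocketChannel.open()) {
            channel.configureBlocking(false);
            channel.bind(new InetSocketAddress("127.0.0.1", 0)); // 0 = let the OS choose
            return channel.socket().getLocalPort();            // the port actually bound
        }
    }
}
```

A caller that needs to publish the server address (as a NameNode or ResourceManager does) must therefore read the port after binding rather than reuse the configured value.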
The following sections describe in detail how the Client sends requests and how the Server processes them and responds.
RPC Client
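The Client class sends requests and receives responses, and its only entry point is the call() method. The client proxy built by waitForProxy() calls Client.call() to send the RPC request to the remote server and then waits for the server's response. Client.call() sends the request and receives the response as follows: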
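- Client.call() wraps the RPC request in a Call object, which holds the information about the RPC invocation; call() also obtains a Connection object, which manages the socket connection between Client and Server;
- The client caches and reuses socket connections in Hashtable<ConnectionId, Connection> connections, because establishing a socket connection to the Server is relatively expensive;
- Client.call() calls Connection.setupIOstreams() to establish the socket connection between Client and Server and starts the Connection thread, which listens on the socket and reads the responses sent back by the server;
- Client.call() then calls Connection.sendRpcRequest() to send the RPC request;
- Client.call() calls Call.wait() to wait on the Call object until the Server returns a response;
- When the Connection thread receives the Server's response, it sets the return value on the corresponding Call object and calls Call.notify() to wake the thread blocked in Client.call(), which then reads the return value from the Call object.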
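The wait/notify handshake between the caller thread and the Connection thread fits in a few lines. SketchCall below is a hypothetical simplification of Client.Call: the caller blocks in waitForResult() until another thread calls setResponse() or setError(), mirroring how the Connection thread completes a call in receiveRpcResponse().

```java
import java.io.IOException;

class SketchCall {
    final int id;
    private Object response;
    private IOException error;
    private boolean done;

    SketchCall(int id) { this.id = id; }

    // Called by the connection (reader) thread when the response arrives.
    synchronized void setResponse(Object value) {
        response = value;
        done = true;
        notify(); // wake the thread blocked in waitForResult()
    }

    // Called by the connection thread when the call failed.
    synchronized void setError(IOException e) {
        error = e;
        done = true;
        notify();
    }

    // Called by the caller thread; the loop guards against spurious wake-ups.
    synchronized Object waitForResult() throws IOException, InterruptedException {
        while (!done) {
            wait();
        }
        if (error != null) {
            throw error;
        }
        return response;
    }
}
```

Unlike Client.call(), this sketch simply propagates InterruptedException instead of remembering the interrupt and restoring the flag after the call completes.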
The source of Client.call() is:
```java
public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
    ConnectionId remoteId, int serviceClass,
    AtomicBoolean fallbackToSimpleAuth) throws IOException {
  // Build the Call object from the RPC kind (serialization engine) and the request
  final Call call = createCall(rpcKind, rpcRequest);
  // Get a socket connection to the server address remoteId; connections are
  // cached and reused in connections, and the Connection thread is started
  Connection connection = getConnection(remoteId, call, serviceClass,
      fallbackToSimpleAuth);
  try {
    // Send the remote RPC call to the server
    connection.sendRpcRequest(call);                 // send the rpc request
  } catch (RejectedExecutionException e) {
    throw new IOException("connection has been closed", e);
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    LOG.warn("interrupted waiting to send rpc request to server", e);
    throw new IOException(e);
  }

  boolean interrupted = false;
  synchronized (call) {
    // Until the call is done, wait for the Connection thread to call call.notify()
    while (!call.done) {
      try {
        // Block the current thread until the Connection thread receives the
        // response in receiveRpcResponse() and calls call.notify()
        call.wait();                           // wait for the result
      } catch (InterruptedException ie) {
        // save the fact that we were interrupted
        interrupted = true;
      }
    }

    if (interrupted) {
      // set the interrupt flag now that we are done waiting
      Thread.currentThread().interrupt();
    }

    if (call.error != null) {
      if (call.error instanceof RemoteException) {
        call.error.fillInStackTrace();
        throw call.error;
      } else { // local exception
        InetSocketAddress address = connection.getRemoteAddress();
        throw NetUtils.wrapException(address.getHostName(),
                address.getPort(),
                NetUtils.getHostname(),
                0,
                call.error);
      }
    } else {
      // Return the server's RPC response
      return call.getRpcResponse();
    }
  }
}
```
```java
/** Get a connection from the pool, or create a new one and add it to the
 * pool.  Connections to a given ConnectionId are reused. */
private Connection getConnection(ConnectionId remoteId,
    Call call, int serviceClass, AtomicBoolean fallbackToSimpleAuth)
    throws IOException {
  if (!running.get()) {
    // the client is stopped
    throw new IOException("The client is stopped");
  }
  Connection connection;
  /* we could avoid this allocation for each RPC by having a
   * connectionsId object and with set() method. We need to manage the
   * refs for keys in HashMap properly. For now its ok.
   */
  do {
    synchronized (connections) {
      connection = connections.get(remoteId);
      if (connection == null) {
        connection = new Connection(remoteId, serviceClass);
        connections.put(remoteId, connection);
      }
    }
  } while (!connection.addCall(call));

  //we don't invoke the method below inside "synchronized (connections)"
  //block above. The reason for that is if the server happens to be slow,
  //it will take longer to establish a connection and that will slow the
  //entire system down.
  // This calls Thread.start() to start the Connection thread
  connection.setupIOstreams(fallbackToSimpleAuth);
  return connection;
}
```
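getConnection() relies on a get-or-create loop: a cached Connection may already be shutting down and refuse new calls, in which case addCall() returns false and the loop retries. The hypothetical sketch below (PooledConnection, ConnectionPool and the closing flag are illustrative names) assumes that a closing connection evicts itself from the pool, so the retry creates a fresh one instead of spinning.

```java
import java.util.HashMap;
import java.util.Map;

class PooledConnection {
    final String remoteId;
    private final Map<Integer, Object> calls = new HashMap<>();
    private boolean closing;

    PooledConnection(String remoteId) { this.remoteId = remoteId; }

    // Returns false once the connection is closing, so the caller must retry.
    synchronized boolean addCall(int callId, Object call) {
        if (closing) {
            return false;
        }
        calls.put(callId, call);
        return true;
    }

    synchronized void markClosing() { closing = true; }
}

class ConnectionPool {
    private final Map<String, PooledConnection> connections = new HashMap<>();

    // Get-or-create under the pool lock, retrying if the connection refuses the call.
    PooledConnection getConnection(String remoteId, int callId, Object call) {
        PooledConnection connection;
        do {
            synchronized (connections) {
                connection = connections.get(remoteId);
                if (connection == null) {
                    connection = new PooledConnection(remoteId);
                    connections.put(remoteId, connection);
                }
            }
        } while (!connection.addCall(callId, call));
        // Socket setup would happen here, outside the pool lock.
        return connection;
    }

    // Mark the connection as closing and evict it so the next lookup creates a new one.
    void close(PooledConnection connection) {
        connection.markClosing();
        synchronized (connections) {
            connections.remove(connection.remoteId, connection);
        }
    }
}
```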
```java
// The Connection thread's run loop is simple
@Override
public void run() {
  try {
    while (waitForWork()) { // wait here for work - read or close connection
      receiveRpcResponse();
    }
  } catch (Throwable t) {
    // ..........
  }
}

private void receiveRpcResponse() {
  // .........
  try {
    // .........
    // Look up the pending rpc call in connection.calls by its callId
    Call call = calls.get(callId);
    // Store the result (or error) in the call according to the status and
    // call call.notify() to wake the waiting client thread;
    // setRpcResponse(), setException(), etc. all end up calling call.notify()
    if (status == RpcStatusProto.SUCCESS) {
      Writable value = ReflectionUtils.newInstance(valueClass, conf);
      value.readFields(in);                 // read value
      calls.remove(callId);
      call.setRpcResponse(value);
    } else { // Rpc Request failed
      if (status == RpcStatusProto.ERROR) {
        calls.remove(callId);
        call.setException(re);
      } else if (status == RpcStatusProto.FATAL) {
        // Close the connection
        markClosed(re);
      }
    }
  } catch (IOException e) {
    markClosed(e);
  }
}
```
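receiveRpcResponse() routes each response to its pending call by callId and treats the three statuses differently: SUCCESS completes the call, ERROR fails only that call, and FATAL closes the whole connection. A hypothetical sketch of that routing over a simple frame of callId (int), status (byte) and payload (UTF string); Hadoop's actual response header is a protobuf RpcResponseHeaderProto, and CompletableFuture stands in for Call here.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class ResponseRouter {
    static final byte SUCCESS = 0, ERROR = 1, FATAL = 2;

    final Map<Integer, CompletableFuture<String>> calls = new ConcurrentHashMap<>();
    volatile boolean closed;

    // Reads one response frame and completes the matching pending call.
    void receiveOne(DataInputStream in) throws IOException {
        int callId = in.readInt();
        byte status = in.readByte();
        String payload = in.readUTF();
        if (status == SUCCESS) {
            CompletableFuture<String> call = calls.remove(callId);
            if (call != null) {
                call.complete(payload);
            }
        } else if (status == ERROR) {
            // Only this call fails; the connection stays usable.
            CompletableFuture<String> call = calls.remove(callId);
            if (call != null) {
                call.completeExceptionally(new IOException(payload));
            }
        } else {
            // FATAL: close the connection and fail every pending call.
            closed = true;
            IOException fatal = new IOException(payload);
            calls.values().forEach(c -> c.completeExceptionally(fatal));
            calls.clear();
        }
    }
}
```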
RPC Server
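The Server class is the RPC server side. To achieve high performance, Hadoop's Server uses several techniques that raise its concurrency, mainly thread pools, event-driven I/O, and the Reactor design pattern.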
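A typical Reactor pattern has the following roles:
- Reactor: dispatches I/O events
- Acceptor: accepts connections from clients, creates a Handler for each client, and registers that Handler with the Reactor
- Handler: the entity that communicates with one client and carries out the business processing in defined steps
- Reader/Sender: to speed up processing, the Reactor pattern often keeps a pool of data-processing threads, so data can be handed to the pool as soon as it is read. For this reason the Reactor pattern usually splits a Handler's reading and writing into separate read and write events, handled by dedicated Reader and Sender threads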
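The class design of the Hadoop RPC Server is also a typical Reactor pattern:
- Listener: the whole Server has a single Listener thread that listens for client socket connections. The Listener holds a Selector that watches for SelectionKey.OP_ACCEPT events. When getConnection() inside a client's Client.call() establishes a socket connection, the event fires and wakes the Listener thread, which calls ServerSocketChannel.accept() to create a new SocketChannel;
- The Listener picks a Reader from the readers pool in round-robin order and registers OP_READ on that Reader's readSelector;
- When the client sends an RPC request, the event fires on the Reader's readSelector and wakes the Reader thread;
- The Reader thread reads data from the SocketChannel, wraps it in a Call object, and puts it into the shared queue callQueue;
- Every Handler thread in the handlers pool blocks on callQueue via BlockingQueue.take(). When a Call is put into callQueue, one Handler thread wakes up, calls Server.call() based on the information in the Call, and then tries to write the response to the SocketChannel;
- If the response cannot be written to the SocketChannel completely, OP_WRITE is registered on the Responder thread's writeSelector; when the channel becomes writable, the Responder wakes up and continues writing the response.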
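Next, let's go step by step through RPC.Builder.build(), which initializes the Listener thread (whose constructor creates the Reader threads), the Responder thread, and so on, and through server.start(), which starts the server's listener, responder, and handler threads: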
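Listener: when the Listener is constructed, it creates the server socket and binds it to the configured address and port, creates its pool of Reader threads, opens a Selector, and registers interest in SelectionKey.OP_ACCEPT on the channel. After the Server creates the Listener and calls start(), the Listener thread executes run(), which loops waiting for OP_ACCEPT events on the channel; whenever a new connection request arrives, it calls doAccept() to handle it.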
```java
private class Listener extends Thread {
  private ServerSocketChannel acceptChannel = null; //the accept channel
  private Selector selector = null; //the selector that we use for the server
  private Reader[] readers = null; // pool of Reader threads

  public Listener() throws IOException {
    address = new InetSocketAddress(bindAddress, port);
    // Create a new server socket and set to non blocking mode
    acceptChannel = ServerSocketChannel.open();
    acceptChannel.configureBlocking(false);

    // Bind the server socket to the local host and port
    bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig);
    port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
    // create a selector;
    selector = Selector.open();
    readers = new Reader[readThreads];
    for (int i = 0; i < readThreads; i++) {
      Reader reader = new Reader(
          "Socket Reader #" + (i + 1) + " for port " + port);
      readers[i] = reader;
      reader.start();
    }

    // Register accepts on the server socket with the selector.
    acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
    this.setName("IPC Server listener on " + port);
    this.setDaemon(true);
  }

  @Override
  public void run() {
    LOG.info(Thread.currentThread().getName() + ": starting");
    SERVER.set(Server.this);
    connectionManager.startIdleScan();
    while (running) {
      SelectionKey key = null;
      try {
        getSelector().select();
        Iterator<SelectionKey> iter = getSelector().selectedKeys().iterator();
        while (iter.hasNext()) {
          key = iter.next();
          iter.remove();
          try {
            if (key.isValid()) {
              // A new connection request has arrived: handle it in doAccept()
              if (key.isAcceptable())
                doAccept(key);
            }
          } catch (IOException e) {
          }
          key = null;
        }
      } catch (OutOfMemoryError e) {
        // .........
      }
    }
    // .........
  }
  // .........
}
```
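doAccept() accepts the client's socket connection and initializes it. It then picks a Reader thread from the readers pool in round-robin order to read this client's RPC requests: reader.addConnection(c) adds the Connection object to the Reader's queue of pending connections, pendingConnections, and readSelector.wakeup() wakes the readSelector.select() call the Reader is blocked on. From then on, all reads on this channel are handled by the Reader it was assigned to and never by any other Reader thread; responses are written by the Handler and Responder threads.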
```java
void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOfMemoryError {
  ServerSocketChannel server = (ServerSocketChannel) key.channel();
  SocketChannel channel;
  while ((channel = server.accept()) != null) {
    channel.configureBlocking(false);
    channel.socket().setTcpNoDelay(tcpNoDelay);
    channel.socket().setKeepAlive(true);

    Reader reader = getReader();
    Connection c = connectionManager.register(channel);
    key.attach(c);  // so closeCurrentConnection can get the object
    reader.addConnection(c);
  }
}
```
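doAccept() calls getReader() to spread connections evenly across the Reader threads in round-robin order. A minimal sketch of that selection with a hypothetical generic RoundRobin class; it assumes, as in Hadoop 2.x as far as we know, that the selection simply advances an index modulo the number of readers.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical round-robin picker, mirroring how doAccept() spreads connections over Readers.
class RoundRobin<T> {
    private final List<T> items;
    private int current = -1;

    RoundRobin(List<T> items) {
        if (items.isEmpty()) {
            throw new IllegalArgumentException("need at least one item");
        }
        this.items = new ArrayList<>(items);
    }

    // Advance the index modulo the number of items and return that item.
    // Only the Listener thread calls getReader() in the server; synchronized keeps the sketch safe for any caller.
    synchronized T next() {
        current = (current + 1) % items.size();
        return items.get(current);
    }
}
```

Round-robin ignores how busy each Reader is, but it needs no coordination and keeps each connection pinned to one Reader, which is what the Reader design relies on.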
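Reader: each Reader has its own readSelector, and the Reader thread's main loop is doRunLoop(). After reader.addConnection(c) adds a connection and wakes readSelector.select(), the Reader registers SelectionKey.OP_READ for that connection on its readSelector. It then waits on readSelector for read events, that is, for the client's RPC requests to arrive, reads each request, wraps it in a Call object, and puts it into callQueue to wait for a Handler thread.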
```java
private synchronized void doRunLoop() {
  while (running) {
    SelectionKey key = null;
    try {
      // consume as many connections as currently queued to avoid
      // unbridled acceptance of connections that starves the select
      int size = pendingConnections.size();
      for (int i = size; i > 0; i--) {
        Connection conn = pendingConnections.take();
        conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
      }
      readSelector.select(); // block until a request arrives or we are woken up

      Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
      while (iter.hasNext()) {
        key = iter.next();
        iter.remove();
        if (key.isValid()) {
          // Readable: handle it in doRead()
          if (key.isReadable()) {
            doRead(key);
          }
        }
        key = null;
      }
    } catch (InterruptedException e) {
      if (running) {                      // unexpected -- log it
        LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e);
      }
    } catch (IOException ex) {
      LOG.error("Error in Reader", ex);
    }
  }
}
```
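The Listener-to-Reader hand-off (queue the channel, wake the selector, and let the Reader thread register OP_READ itself) can be reproduced with plain java.nio. The sketch is hypothetical: SketchReader and its byte-collecting behavior are illustrative, and it accepts any selectable, readable channel so a Pipe can replace a socket when trying it out.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class SketchReader extends Thread {
    private final Selector readSelector;
    private final BlockingQueue<SelectableChannel> pending = new LinkedBlockingQueue<>();
    // Bytes read from all channels; a real Reader would build Calls instead.
    final BlockingQueue<Byte> received = new LinkedBlockingQueue<>();
    private volatile boolean running = true;

    SketchReader() throws IOException {
        readSelector = Selector.open();
        setDaemon(true);
    }

    // Called from the acceptor thread: queue the channel and wake the selector.
    void addConnection(SelectableChannel channel) {
        pending.add(channel);
        readSelector.wakeup();
    }

    void shutdown() {
        running = false;
        readSelector.wakeup();
    }

    @Override
    public void run() {
        ByteBuffer buf = ByteBuffer.allocate(1024);
        try {
            while (running) {
                // Register queued channels from this thread, as doRunLoop() does.
                SelectableChannel ch;
                while ((ch = pending.poll()) != null) {
                    ch.configureBlocking(false);
                    ch.register(readSelector, SelectionKey.OP_READ);
                }
                readSelector.select(); // blocks until readable or wakeup()
                Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    iter.remove();
                    if (key.isValid() && key.isReadable()) {
                        buf.clear();
                        int n = ((ReadableByteChannel) key.channel()).read(buf);
                        if (n < 0) { // peer closed the channel
                            key.cancel();
                            key.channel().close();
                            continue;
                        }
                        buf.flip();
                        while (buf.hasRemaining()) {
                            received.add(buf.get());
                        }
                    }
                }
            }
        } catch (IOException e) {
            // A real server would log and keep serving; the sketch just stops.
        } finally {
            try { readSelector.close(); } catch (IOException ignored) { }
        }
    }
}
```

Registering from the Reader thread avoids the classic NIO pitfall where register() blocks while another thread sits in select() on the same selector; wakeup() guarantees the queued channel is picked up on the next loop iteration.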
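The call chain of doRead() is:
- doRead() -> connection.readAndProcess() -> processOneRpc() -> processRpcRequest()
doRead() delegates the reading to the Connection object, which maintains the socket connection between Server and Client. The Reader thread calls readAndProcess() to read an RPC request from the IO stream: it first reads the connection header from the socket stream (once per connection), then reads one complete RPC request, and finally calls processOneRpc() to process it. processOneRpc() reads the RPC request header and then calls processRpcRequest() to process the request body.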
```java
private void processRpcRequest(RpcRequestHeaderProto header,
    DataInputStream dis) throws WrappedRpcServerException,
    InterruptedException {
  // .........
  // Read the rpc request body
  Writable rpcRequest;
  try { //Read the rpc request
    rpcRequest = ReflectionUtils.newInstance(rpcRequestClass, conf);
    rpcRequest.readFields(dis);
  } catch (Throwable t) { // includes runtime exception from newInstance
    // ......
  }

  Span traceSpan = null;
  if (header.hasTraceInfo()) {
    // If the incoming RPC included tracing info, always continue the trace
    TraceInfo parentSpan = new TraceInfo(header.getTraceInfo().getTraceId(),
        header.getTraceInfo().getParentId());
    traceSpan = Trace.startSpan(rpcRequest.toString(), parentSpan).detach();
  }

  // Build a Call object wrapping the rpc request
  Call call = new Call(header.getCallId(), header.getRetryCount(),
      rpcRequest, this, ProtoUtil.convert(header.getRpcKind()),
      header.getClientId().toByteArray(), traceSpan);

  // Put the call into callQueue to wait for a Handler
  callQueue.put(call);              // queue the call; maybe blocked here
  incRpcCount();  // Increment the rpc count
}
```
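Because a non-blocking read may return only part of a request, readAndProcess() has to accumulate bytes across reads until one complete request is available. A hypothetical sketch of that idea for frames made of a 4-byte big-endian length followed by the payload; FrameDecoder and the simplified framing are assumptions, and the real code also deals with the connection header and SASL.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical decoder for frames of [4-byte big-endian length][payload].
class FrameDecoder {
    private final ByteBuffer lengthBuffer = ByteBuffer.allocate(4);
    private ByteBuffer data; // null while the length prefix is incomplete

    // Feed the bytes returned by one (possibly partial) read and
    // return every frame that those bytes complete.
    List<byte[]> feed(ByteBuffer chunk) {
        List<byte[]> frames = new ArrayList<>();
        while (true) {
            if (data == null) {
                while (lengthBuffer.hasRemaining() && chunk.hasRemaining()) {
                    lengthBuffer.put(chunk.get());
                }
                if (lengthBuffer.hasRemaining()) {
                    return frames; // length prefix still incomplete
                }
                lengthBuffer.flip();
                int length = lengthBuffer.getInt();
                lengthBuffer.clear();
                if (length < 0) {
                    throw new IllegalStateException("negative frame length: " + length);
                }
                data = ByteBuffer.allocate(length);
            }
            // Copy as much of the payload as this chunk holds.
            int n = Math.min(data.remaining(), chunk.remaining());
            ByteBuffer slice = chunk.slice();
            slice.limit(n);
            data.put(slice);
            chunk.position(chunk.position() + n);
            if (data.hasRemaining()) {
                return frames; // payload still incomplete
            }
            frames.add(data.array());
            data = null;
            if (!chunk.hasRemaining()) {
                return frames;
            }
        }
    }
}
```

Keeping the partial length and payload buffers as state between calls is what lets a single Reader thread serve many connections without blocking on any one of them.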
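Handler: the thread class that actually processes requests. It executes the local function corresponding to an RPC request and sends the result back to the client. The Server runs several Handler threads at once; they concurrently take pending Call objects from the shared callQueue and call Server.call() to execute the local function for each RPC. The Handler then calls setupResponse() to build the RPC response and returns it to the client via responder.doRespond().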
```java
/** Handles queued calls . */
private class Handler extends Thread {
  @Override
  public void run() {
    while (running) {
      try {
        // Take the next request from callQueue
        final Call call = callQueue.take(); // pop the queue; maybe blocked here
        // .........
        try {
          // Make the call as the user via Subject.doAs, thus associating
          // the call with the Subject
          // Invoke the local call() method, which runs the local function for this RPC
          if (call.connection.user == null) {
            value = call(call.rpcKind, call.connection.protocolName, call.rpcRequest,
                call.timestamp);
          } else {
            value =
                call.connection.user.doAs
                    (new PrivilegedExceptionAction<Writable>() {
                      @Override
                      public Writable run() throws Exception {
                        // make the call
                        return call(call.rpcKind, call.connection.protocolName,
                            call.rpcRequest, call.timestamp);
                      }
                    }
                );
          }
        } catch (Throwable e) {
          // .........
        }
        CurCall.set(null);
        synchronized (call.connection.responseQueue) {
          // setupResponse() needs to be sync'ed together with
          // responder.doResponse() since setupResponse may use
          // SASL to encrypt response data and SASL enforces
          // its own message ordering.
          // Build the RPC response, covering both normal and error results
          setupResponse(buf, call, returnStatus, detailedErr,
              value, errorClass, error);

          // Discard the large buf and reset it back to smaller size
          // to free up heap
          // .........

          // Hand the response to responder.doRespond()
          responder.doRespond(call);
        }
      } catch (InterruptedException e) {
        // .........
      }
    }
    // .........
  }
}
```
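The Handlers form a classic producer/consumer pool: Readers put() Calls into a bounded queue (blocking when it is full, which throttles the Readers) and several Handlers take() them concurrently. A hypothetical sketch with ArrayBlockingQueue standing in for callQueue, a Function standing in for Server.call(), and a Consumer standing in for responder.doRespond(); HandlerPool and SketchServerCall are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;
import java.util.function.Function;

final class SketchServerCall {
    final int callId;
    final String request;

    SketchServerCall(int callId, String request) {
        this.callId = callId;
        this.request = request;
    }
}

class HandlerPool {
    private final BlockingQueue<SketchServerCall> callQueue;
    private final List<Thread> handlers = new ArrayList<>();

    HandlerPool(int queueCapacity, int numHandlers,
                Function<String, String> service, Consumer<String> responder) {
        callQueue = new ArrayBlockingQueue<>(queueCapacity);
        for (int i = 0; i < numHandlers; i++) {
            Thread t = new Thread(() -> {
                try {
                    while (true) {
                        SketchServerCall call = callQueue.take(); // blocks until a Call arrives
                        String value;
                        try {
                            value = service.apply(call.request);  // stands in for Server.call()
                        } catch (RuntimeException e) {
                            value = "ERROR: " + e.getMessage();  // error response
                        }
                        responder.accept(call.callId + ":" + value); // stands in for doRespond()
                    }
                } catch (InterruptedException e) {
                    // interrupted by shutdown(): leave the handler loop
                }
            }, "Handler-" + i);
            t.setDaemon(true);
            t.start();
            handlers.add(t);
        }
    }

    // Reader side: blocks when the queue is full, throttling the readers.
    void put(SketchServerCall call) throws InterruptedException {
        callQueue.put(call);
    }

    void shutdown() {
        handlers.forEach(Thread::interrupt);
    }
}
```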
Responder: also a thread class. The server has exactly one Responder object, which holds a writeSelector that listens for SelectionKey.OP_WRITE events on channels. In doRunLoop(), the Responder thread blocks on writeSelector.select(PURGE_INTERVAL); once a channel becomes writable, it runs doRunLoop() -> doAsyncWrite() -> processResponse() to write the rest of the response.
```java
// Processes one response. Returns true if there are no more pending
// data for this channel.
//
private boolean processResponse(LinkedList<Call> responseQueue,
                                boolean inHandler) throws IOException {
  // ......... (declarations of done and numElements, and the empty-queue check, omitted)
  Call call = null;
  try {
    synchronized (responseQueue) {
      // FIFO LinkedList: take the first Call from responseQueue
      call = responseQueue.removeFirst();
      SocketChannel channel = call.connection.channel;
      //
      // Send as much data as we can in the non-blocking fashion
      //
      // Write the data in call.rpcResponse to the channel
      int numBytes = channelWrite(channel, call.rpcResponse);
      if (numBytes < 0) {
        return true;
      }
      if (!call.rpcResponse.hasRemaining()) { // everything written; clean up
        //Clear out the response buffer so it can be collected
        call.rpcResponse = null;
        call.connection.decRpcCount();
        if (numElements == 1) {    // last call fully processes.
          done = true;             // no more data for this channel.
        } else {
          done = false;            // more calls pending to be sent.
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug(Thread.currentThread().getName() + ": responding to " + call
              + " Wrote " + numBytes + " bytes.");
        }
      } else {
        //
        // If we were unable to write the entire response out, then
        // insert in Selector queue.
        //
        // Not fully written: put the Call back at the head of responseQueue
        // so the remaining data is sent next time
        call.connection.responseQueue.addFirst(call);

        if (inHandler) {
          // When called from a Handler thread, wake the writeSelector and register
          // the channel on it, so Responder.doRunLoop() detects the writable
          // SocketChannel and sends the rest of the data to the client
          // set the serve time when the response has to be sent later
          call.timestamp = Time.now();

          incPending();
          try {
            // Wakeup the thread blocked on select, only then can the call
            // to channel.register() complete.
            writeSelector.wakeup();
            channel.register(writeSelector, SelectionKey.OP_WRITE, call);
          } catch (ClosedChannelException e) {
            //Its ok. channel might be closed else where.
            done = true;
          } finally {
            decPending();
          }
        }
      }
    }
  } finally {
    // .........
  }
  return done;
}
```
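The key point of processResponse() is that a non-blocking write may accept only part of the buffer; the unfinished response then goes back to the head of responseQueue, so the next attempt (by the Responder, once OP_WRITE fires) resumes where the last one stopped without reordering responses. A hypothetical sketch of just that bookkeeping, with ThrottledChannel imitating a full socket send buffer and the selector registration omitted; ResponseWriter and ThrottledChannel are illustrative names.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.LinkedList;

// Hypothetical channel that accepts at most maxPerWrite bytes per call,
// imitating a socket whose send buffer is nearly full.
class ThrottledChannel implements WritableByteChannel {
    final ByteArrayOutputStream sink = new ByteArrayOutputStream();
    private final int maxPerWrite;

    ThrottledChannel(int maxPerWrite) { this.maxPerWrite = maxPerWrite; }

    @Override
    public int write(ByteBuffer src) {
        int n = Math.min(maxPerWrite, src.remaining());
        for (int i = 0; i < n; i++) {
            sink.write(src.get());
        }
        return n;
    }

    @Override public boolean isOpen() { return true; }
    @Override public void close() { }
}

class ResponseWriter {
    // Writes as much of the head response as the channel accepts.
    // Returns true once the queue has been fully drained.
    static boolean processResponse(LinkedList<ByteBuffer> responseQueue,
                                   WritableByteChannel channel) throws IOException {
        synchronized (responseQueue) {
            if (responseQueue.isEmpty()) {
                return true;
            }
            ByteBuffer response = responseQueue.removeFirst();
            channel.write(response);
            if (response.hasRemaining()) {
                // Partial write: put it back at the head so ordering is preserved.
                responseQueue.addFirst(response);
                return false;
            }
            return responseQueue.isEmpty();
        }
    }
}
```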
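As the source shows, when a Handler adds a Call to its Connection's responseQueue in doRespond(), and that Call is the only one waiting for a response, the Handler calls processResponse() directly in its own thread. If the result cannot be written to the client in one go, OP_WRITE is registered on the writeSelector so that the Responder thread can send the remainder asynchronously.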