netty4源码分析-connect
本文为原创,转载请注明出处
netty4源码分析-connect
客户端向服务端发起connect请求由以下代码触发:
ChannelFuture f = b.connect(host, port).sync();
调用Bootstrap的connect方法:
//Bootstrap
public ChannelFuture connect(String inetHost, int inetPort) {
return connect(new InetSocketAddress(inetHost, inetPort));
}
public ChannelFuture connect(SocketAddress remoteAddress) {
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
}
validate();
return doConnect(remoteAddress, localAddress());
}
private ChannelFuture doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
final ChannelPromise promise = channel.newPromise();
if (regFuture.isDone()) {
doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
} else {
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
}
});
}
return promise;
}
1、 首先分析initAndRegister()方法
// AbstractBootstrap
final ChannelFuture initAndRegister() {
final Channel channel = channelFactory().newChannel();
try {
init(channel);
} catch (Throwable t) {
channel.unsafe().closeForcibly();
return channel.newFailedFuture(t);
}
ChannelPromise regPromise = channel.newPromise();
group().register(channel, regPromise);
if (regPromise.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regPromise;
}
1.1 final Channel channel = channelFactory().newChannel()的执行逻辑与socket那篇文章中分析的差不多
//BootstrapChannelFactory
public T newChannel() {
try {
return clazz.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
区别在于:a、本次根据反射创建的channel的类型是SocketChannel
//BootstrapChannelFactory
public NioSocketChannel() {
this(newSocket());
}
private static SocketChannel newSocket() {
try {
return SocketChannel.open();
} catch (IOException e) {
throw new ChannelException("Failed to open a socket.", e);
}
}
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new DefaultSocketChannelConfig(this, socket.socket());
}
b、NioSocketChannel的父类是AbstractNioByteChannel,设置的interestOps为SelectionKey.OP_READ。(注:NioServerSocketChannel的父类是AbstractNioMessageChannel,设置的interestOps为SelectionKey.OP_ACCEPT)
//AbstractNioByteChannel
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ);
}
c、最后分析父类AbstractNioChannel的构造函数
//AbstractNioChannel
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
// AbstractChannel
protected AbstractChannel(Channel parent) {
this.parent = parent;
unsafe = newUnsafe();
pipeline = new DefaultChannelPipeline(this);
}
newUnsafe()调用AbstractNioByteChannel的实现,实例化一个NioByteUnsafe(注:在NioServerSocketChannel中实例化的是NioMessageUnsafe)。其他点都一样,将channel设置为非阻塞,然后为channel创建管道。
1.2 init(channel)
该方法由Bootstrap实现(注:在NioServerSocketChannel中,该方法由ServerBootstrap实现)
//Bootstrap
void init(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
p.addLast(handler());
final Map<ChannelOption<?>, Object> options = options();
synchronized (options) {
for (Entry<ChannelOption<?>, Object> e: options.entrySet()) {
try {
if (!channel.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
} catch (Throwable t) {
logger.warn("Failed to set a channel option: " + channel, t);
}
}
}
final Map<AttributeKey<?>, Object> attrs = attrs();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
}
}
该方法主要做了两件事:
a) 为管道增加一个Inbound处理器ChannelInitializer。经过此步骤后,管道中的处理器链表为:head(outbound)->ChannelInitializer(inbound)->tail(inbound)。注意ChannelInitializer的实现方法initChannel,里面会当channelRegistered事件发生时将EchoClientHandler加入到管道中。
b) 设置NioSocketChannel的options和attrs
1.3 group().register(channel, regPromise);
实际是调用MultithreadEventLoopGroup的register方法
//MultithreadEventLoopGroup
public ChannelFuture register(Channel channel, ChannelPromise promise) {
return next().register(channel, promise);
}
next方法从group中选择一个EventExecutor(实际是一个SingleThreadEventLoop),然后执行register方法
//SingleThreadEventLoop
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
if (channel == null) {
throw new NullPointerException("channel");
}
if (promise == null) {
throw new NullPointerException("promise");
}
channel.unsafe().register(this, promise);
return promise;
}
channel.unsafe().register(this, promise)这里会调用AbstractChannel的内部类AbstractUnsafe的register方法
//AbstractUnsafe
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
closeForcibly();
promise.setFailure(t);
}
}
}
此处开启了eventloop中的线程(即启动了netty客户端的一个线程),并将register0任务加入到线程的任务队列中。经过此步骤后,线程的任务队列仅含有一个任务,即register0任务,且正在被执行。
接着分析register0任务具体干了什么事情
//AbstractUnsafe
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!ensureOpen(promise)) {
return;
}
Runnable postRegisterTask = doRegister();
registered = true;
promise.setSuccess();
pipeline.fireChannelRegistered();
if (postRegisterTask != null) {
postRegisterTask.run();
}
if (isActive()) {
pipeline.fireChannelActive();
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
if (!promise.tryFailure(t)) {
logger.warn(
"Tried to fail the registration promise, but it is complete already. " +
"Swallowing the cause of the registration failure:", t);
}
closeFuture.setClosed();
}
}
protected Runnable doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
return null;
} catch (CancelledKeyException e) {
if (!selected) {
eventLoop().selectNow();
selected = true;
} else {
throw e;
}
}
}
}
doRegister中的这行代码selectionKey = javaChannel().register(eventLoop().selector, 0, this)将SocketChannel、0、以及this注册到selector中并得到对应的selectionkey
接着promise.setSuccess()将promise设置为success,就会触发异步回调,回调之前main函数所在的线程中为ChannelPromise添加的listener,即Bootstrap的以下代码:
//Bootstrap.java
private ChannelFuture doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
final ChannelPromise promise = channel.newPromise();
if (regFuture.isDone()) {
doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
} else {
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
}
});
}
经过此步骤后,线程的任务队列数量由原来的1个增加到了2个,即正在执行的register0任务以及本次新增的doConnect0任务。
再接着分析register0任务中的此行代码pipeline.fireChannelRegistered()
//DefaultChannelPipeline
public ChannelPipeline fireChannelRegistered() {
head.fireChannelRegistered();
return this;
}
//DefaultChannelHandlerContext
public ChannelHandlerContext fireChannelRegistered() {
final DefaultChannelHandlerContext next = findContextInbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRegistered();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRegistered();
}
});
}
return this;
}
private void invokeChannelRegistered() {
try {
((ChannelInboundHandler) handler).channelRegistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
ChannelRegistered是一个Inbound事件,因此会按照head->tail的顺序执行所有的inbound处理器,目前有三个处理器:head-> ChannelInitializer ->tail,ChannelInitializer和tail都是inbound处理器,所以看一下ChannelInitializer的invokeChannelRegistered方法
//ChannelInitializer
public final void channelRegistered(ChannelHandlerContext ctx)
throws Exception {
boolean removed = false;
boolean success = false;
try {
initChannel((C) ctx.channel());
ctx.pipeline().remove(this);
removed = true;
ctx.fireChannelRegistered();
success = true;
} catch (Throwable t) {
logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t);
} finally {
if (!removed) {
ctx.pipeline().remove(this);
}
if (!success) {
ctx.close();
}
}
}
}
该方法主要做了以下几件事:
a) initChannel方法是在此处实例化内部类ChannelInitializer实现的
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
//new LoggingHandler(LogLevel.INFO),
new EchoClientHandler(firstMessageSize));
}
});
其功能就是将EchoClientHandler加入到管道中,并将自己(ChannelInitializer)从管道中删除,此时的处理器链表为:
Head->EchoClientHandler->tail
b) 接着调用EchoClientHandler和tail的channelRegistered方法,都没有做啥实质性的事情,最后以tail的空实现结束。
再分析register0任务中的以下代码
//AbstractUnsafe
if (isActive())
{
pipeline.fireChannelActive();
}
// NioSocketChannel
public boolean isActive() {
SocketChannel ch = javaChannel();
return ch.isOpen() && ch.isConnected();
}
由于此时还没有执行connect操作,所以isActive返回false,不会执行pipeline.fireChannelActive()
执行完此代码后,register0任务就执行完了,boss线程中的任务队列中仅剩下doConnect0任务。
2、 接下来分析doConnect0方法
//Bootstrap
private static void doConnect0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
if (localAddress == null) {
channel.connect(remoteAddress, promise);
} else {
channel.connect(remoteAddress, localAddress, promise);
}
promise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
channel.connect(remoteAddress, promise)调用AbstractChannel的方法,channel里会调用管道的方法
//AbstractChannel
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return pipeline.connect(remoteAddress, promise);
}
// DefaultChannelPipeline
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return tail.connect(remoteAddress, promise);
}
由于connect是一个Outbound事件,所以按照tail到head的顺序执行所有的outBound处理器。目前共有Head->EchoClientHandler->tail三个处理器,而只有Head是outbound处理器
//HeadHandler
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
unsafe.connect(remoteAddress, localAddress, promise);
}
看一下AbstractNioUnsafe的connect方法
//AbstractNioUnsafe
public void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (!ensureOpen(promise)) {
return;
}
try {
if (connectPromise != null) {
throw new IllegalStateException("connection attempt already made");
}
boolean wasActive = isActive();
if (doConnect(remoteAddress, localAddress)) {
promise.setSuccess();
if (!wasActive && isActive()) {
pipeline().fireChannelActive();
}
} else {
connectPromise = promise;
requestedRemoteAddress = remoteAddress;
// Schedule connect timeout.
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress);
if (connectPromise != null && connectPromise.tryFailure(cause)) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isCancelled()) {
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
close(voidPromise());
}
}
});
}
} catch (Throwable t) {
if (t instanceof ConnectException) {
Throwable newT = new ConnectException(t.getMessage() + ": " + remoteAddress);
newT.setStackTrace(t.getStackTrace());
t = newT;
}
closeIfClosed();
promise.tryFailure(t);
}
}
重点分析一下doConnect方法,由NioSocketChannel实现
//NioSocketChannel
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
if (localAddress != null) {
javaChannel().socket().bind(localAddress);
}
boolean success = false;
try {
boolean connected = javaChannel().connect(remoteAddress);
if (!connected) {
selectionKey().interestOps(SelectionKey.OP_CONNECT);
}
success = true;
return connected;
} finally {
if (!success) {
doClose();
}
}
}
boolean connected = javaChannel().connect(remoteAddress)此处就向服务端发起了connect请求,准备三次握手。由于是非阻塞模式,所以该方法会立即返回。如果建立连接成功,则返回true,否则返回false,后续需要使用select来检测连接是否已建立成功。如果返回false,此种情况就需要将ops设置为SelectionKey.OP_CONNECT,等待connect的select事件通知,然后调用finishConnect方法。
对于connect,会加一个超时调度任务,默认的超时时间是30s,超时后做什么操作,后续再分析吧。
3、 最后分析客户端线程NioEventLoop的select接收到connect事件后的处理逻辑
//NioEventLoop
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}
try {
int readyOps = k.readyOps();
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
// Connection already closed - no need to handle write.
return;
}
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
} catch (CancelledKeyException e) {
unsafe.close(unsafe.voidPromise());
}
}
当readyOps包含SelectionKey.OP_CONNECT时,必须首先将OP_CONNECT从ops中去掉,然后调用unsafe.finishConnect()。
//AbstractUnsafe
public void finishConnect() {
// Note this method is invoked by the event loop only if the connection attempt was
// neither cancelled nor timed out.
assert eventLoop().inEventLoop();
assert connectPromise != null;
try {
boolean wasActive = isActive();
doFinishConnect();
connectPromise.setSuccess();
if (!wasActive && isActive()) {
pipeline().fireChannelActive();
}
} catch (Throwable t) {
if (t instanceof ConnectException) {
Throwable newT = new ConnectException(t.getMessage() + ": " + requestedRemoteAddress);
newT.setStackTrace(t.getStackTrace());
t = newT;
}
connectPromise.setFailure(t);
closeIfClosed();
} finally {
// Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
// See https://github.com/netty/netty/issues/1770
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
}
}
doFinishConnect方法通过调用SocketChannel的finishConnect方法完成连接的建立,在NioSocketChannel中实现
//NioSocketChannel
protected void doFinishConnect() throws Exception {
if (!javaChannel().finishConnect()) {
throw new Error();
}
}
此时,isActive()返回true,所以触发ChannelActive事件,该事件是一个inbound事件,所以Inbound的处理器可以通过实现channelActive方法来进行相应的操作,netty的自带例子中,EchoClientHandler实现该方法来开始向服务端写数据。
总结:从发起connect请求到请求建立先后共经历了以下几件事情:
- 创建套接字SocketChannel
- 设置套接字为非阻塞
- 设置channel当前感兴趣的事件为SelectionKey.OP_READ
- 创建作用于SocketChannel的管道Pipeline,该管道中此时的处理器链表为:Head(outbound)->tail(inbound)。
- 设置SocketChannel的options和attrs。
- 为管道增加一个Inbound处理器ChannelInitializer。经过此步骤后,管道中的处理器链表为:head(outbound)->ChannelInitializer(inbound)->tail(inbound)。注意ChannelInitializer的实现方法initChannel,里面会当channelRegisgered事件发生时将EchoClientHandler加入到管道中。
- 启动客户端线程,并将register0任务加入到线程的任务队列中。而register0任务做的事情为:将SocketChannel、0、注册到selector中并得到对应的selectionkey。然后通过回调,将doConnect0任务加入到线程的任务队列中。线程从启动到现在这段时间内,任务队列的变化如下:register0任务->register0任务,doConnect0任务-> doConnect0任务
- 通过channelRegistered事件,将EchoClientHandler加入到管道中,并移除ChannelInitializer,经过此步骤后,管道中的处理器链表为:head(outbound)-> EchoClientHandler (inbound)->tail(inbound)。管道从创建到现在这段时间内,处理器链表的变化历史为:head->tail,head->ChannelInitializer(inbound)->tail,head-> EchoClientHandler (inbound)->tail
- doConnect0任务会触发connect事件,connect是一个Outbound事件,headHandler通过调用AbstractNioUnsafe的方法向服务端发起connect请求,并设置ops为SelectionKey.OP_CONNECT
- 客户端线程NioEventLoop中的select接收到connect事件后,将SelectionKey.OP_CONNECT从ops中移除,然后调用finishConnect方法完成连接的建立。到此,connect就正式建立了。
- 最后触发ChannelActive事件。
上一篇: netty4源码分析-bind
下一篇: netty 4源码分析-write