Netty 4 Source Code Analysis: bind
This is an original article; please credit the source when reposting.
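The previous article analyzed how the listening socket, ServerSocketChannel, is created. This article continues with how it is bound to an IP address and port.
Back to the doBind logic left unfinished last time. The previous article got as far as the initAndRegister method inside doBind, which eventually triggers the callback of the listener on regPromise; that listener adds the bind task to the boss thread's task queue.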
//AbstractBootstrap
private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regPromise = initAndRegister();
    final Channel channel = regPromise.channel();
    final ChannelPromise promise = channel.newPromise();
    if (regPromise.isDone()) {
        // Registration already finished: schedule the bind right away.
        doBind0(regPromise, channel, localAddress, promise);
    } else {
        // Registration still pending: schedule the bind once it completes.
        regPromise.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                doBind0(future, channel, localAddress, promise);
            }
        });
    }
    return promise;
}
private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {
    // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}
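The scheduling pattern in doBind and doBind0 can be imitated with plain JDK classes. The following is a minimal sketch, not Netty code: CompletableFuture stands in for the registration promise, a single-threaded ExecutorService stands in for the boss event loop, and the class and method names (BindAfterRegisterSketch, doBind, run) are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class BindAfterRegisterSketch {

    // Hypothetical stand-in for doBind(): the returned future plays the role of the bind promise.
    static CompletableFuture<Void> doBind(CompletableFuture<Void> regFuture,
                                          ExecutorService eventLoop,
                                          List<String> log) {
        CompletableFuture<Void> bindPromise = new CompletableFuture<>();
        // Plays the role of regPromise.addListener(...): runs once registration is done.
        regFuture.whenComplete((ignored, cause) ->
                // Plays the role of doBind0(): the bind itself is queued on the event loop.
                eventLoop.execute(() -> {
                    if (cause == null) {
                        log.add("bind");
                        bindPromise.complete(null);
                    } else {
                        bindPromise.completeExceptionally(cause);
                    }
                }));
        return bindPromise;
    }

    static List<String> run() {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        List<String> log = new CopyOnWriteArrayList<>();
        try {
            CompletableFuture<Void> regFuture = new CompletableFuture<>();
            CompletableFuture<Void> bindPromise = doBind(regFuture, eventLoop, log);
            // Registration finishes on the event loop thread and fires the listener,
            // which in turn queues the bind task behind the current task.
            eventLoop.execute(() -> {
                log.add("register");
                regFuture.complete(null);
            });
            bindPromise.get(5, TimeUnit.SECONDS);
            return log;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            eventLoop.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [register, bind]
    }
}
```

The point of the sketch is only the ordering: the bind work never runs on the caller's thread, and it always runs after registration has completed.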
This article analyzes that bind task.
channel.bind(localAddress, promise) calls the bind method of AbstractChannel:
//AbstractChannel
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return pipeline.bind(localAddress, promise);
}
pipeline.bind(localAddress, promise) calls the method of DefaultChannelPipeline:
//DefaultChannelPipeline
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return tail.bind(localAddress, promise);
}
tail.bind(localAddress, promise) calls the method of DefaultChannelHandlerContext:
//DefaultChannelHandlerContext
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    validatePromise(promise, false);
    return findContextOutbound().invokeBind(localAddress, promise);
}

private DefaultChannelHandlerContext findContextOutbound() {
    // Walk backwards (towards head) until an outbound handler is found.
    DefaultChannelHandlerContext ctx = this;
    do {
        ctx = ctx.prev;
    } while (!(ctx.handler() instanceof ChannelOutboundHandler));
    return ctx;
}
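bind is an outbound event, so the pipeline is walked in tail->head order and every outbound handler along the way is invoked. There are currently three handlers, tail->ServerBootstrapAcceptor->head, but only head is an outbound handler, so look at head's invokeBind method: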
// DefaultChannelHandlerContext
private ChannelFuture invokeBind(final SocketAddress localAddress, final ChannelPromise promise) {
    EventExecutor executor = executor();
    if (executor.inEventLoop()) {
        invokeBind0(localAddress, promise);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                invokeBind0(localAddress, promise);
            }
        });
    }
    return promise;
}

private void invokeBind0(SocketAddress localAddress, ChannelPromise promise) {
    try {
        ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}
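The way an outbound event skips inbound-only handlers on its way from tail to head (and an inbound event does the reverse) can be reproduced with a small doubly linked list. This is a simplified sketch under stated assumptions: Handler, InboundHandler, OutboundHandler and Ctx are hypothetical stand-ins for the Netty types, and the three-node pipeline mirrors the one described in this article.

```java
final class PipelineWalkSketch {

    // Hypothetical marker interfaces standing in for Netty's handler types.
    interface Handler {}
    interface InboundHandler extends Handler {}
    interface OutboundHandler extends Handler {}

    // Hypothetical stand-in for DefaultChannelHandlerContext.
    static final class Ctx {
        final String name;
        final Handler handler;
        Ctx prev;
        Ctx next;

        Ctx(String name, Handler handler) {
            this.name = name;
            this.handler = handler;
        }

        // Same loop shape as findContextOutbound(): move towards head.
        Ctx findContextOutbound() {
            Ctx ctx = this;
            do {
                ctx = ctx.prev;
            } while (!(ctx.handler instanceof OutboundHandler));
            return ctx;
        }

        // Same loop shape as findContextInbound(): move towards tail.
        Ctx findContextInbound() {
            Ctx ctx = this;
            do {
                ctx = ctx.next;
            } while (!(ctx.handler instanceof InboundHandler));
            return ctx;
        }
    }

    // Builds head <-> ServerBootstrapAcceptor <-> tail and returns {head, tail}.
    static Ctx[] buildPipeline() {
        Ctx head = new Ctx("head", new OutboundHandler() {});
        Ctx acceptor = new Ctx("ServerBootstrapAcceptor", new InboundHandler() {});
        Ctx tail = new Ctx("tail", new InboundHandler() {});
        head.next = acceptor;
        acceptor.prev = head;
        acceptor.next = tail;
        tail.prev = acceptor;
        return new Ctx[] {head, tail};
    }

    static String outboundTargetFromTail() {
        return buildPipeline()[1].findContextOutbound().name;
    }

    static String inboundTargetFromHead() {
        return buildPipeline()[0].findContextInbound().name;
    }

    public static void main(String[] args) {
        System.out.println(outboundTargetFromTail()); // prints head
        System.out.println(inboundTargetFromHead());  // prints ServerBootstrapAcceptor
    }
}
```

Starting at tail, the outbound search passes over ServerBootstrapAcceptor and stops at head, which is why head ends up handling bind.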
The line ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise) calls the bind method of HeadHandler:
//HeadHandler
public void bind(
        ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
        throws Exception {
    unsafe.bind(localAddress, promise);
}
HeadHandler in turn calls the bind method of AbstractUnsafe:
//AbstractUnsafe
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    if (!ensureOpen(promise)) {
        return;
    }
    try {
        boolean wasActive = isActive();
        // See: https://github.com/netty/netty/issues/576
        if (!PlatformDependent.isWindows() && !PlatformDependent.isRoot() &&
            Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
            localAddress instanceof InetSocketAddress &&
            !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress()) {
            // Warn a user about the fact that a non-root user can't receive a
            // broadcast packet on *nix if the socket is bound on non-wildcard address.
            logger.warn(
                    "A non-root user can't receive a broadcast packet if the socket " +
                    "is not bound to a wildcard address; binding to a non-wildcard " +
                    "address (" + localAddress + ") anyway as requested.");
        }
        doBind(localAddress);
        promise.setSuccess();
        if (!wasActive && isActive()) {
            invokeLater(new Runnable() {
                @Override
                public void run() {
                    pipeline.fireChannelActive();
                }
            });
        }
    } catch (Throwable t) {
        promise.setFailure(t);
        closeIfClosed();
    }
}
Because AbstractUnsafe is an inner class of AbstractChannel, doBind(localAddress) resolves to the method of NioServerSocketChannel, a subclass of AbstractChannel:
//NioServerSocketChannel
protected void doBind(SocketAddress localAddress) throws Exception {
    javaChannel().socket().bind(localAddress, config.getBacklog());
}
//AbstractUnsafe
private void invokeLater(Runnable task) {
    // This method is used by outbound operation implementations to trigger an inbound event later.
    // They do not trigger an inbound event immediately because an outbound operation might have been
    // triggered by another inbound event handler method. If fired immediately, the call stack
    // will look like this for example:
    //
    //   handlerA.inboundBufferUpdated() - (1) an inbound handler method closes a connection.
    //   -> handlerA.ctx.close()
    //      -> channel.unsafe.close()
    //         -> handlerA.channelInactive() - (2) another inbound handler method called while in (1) yet
    //
    // which means the execution of two inbound handler methods of the same handler overlap undesirably.
    eventLoop().execute(task);
}
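At last we see the familiar call to ServerSocket's bind method; with it, the binding of the IP address and port is complete. Note: the backlog here (the maximum length of the completed-connection queue) defaults to 3072 in the version analyzed. That value comes from NetUtil.SOMAXCONN, which on Linux is read from /proc/sys/net/core/somaxconn when that file is available, so the effective default may differ on a given machine.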
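The JDK call that Netty ends up making can be tried in isolation. The sketch below is plain java.nio, not Netty code; the class name JdkBindSketch, the loopback address, the ephemeral port 0 and the backlog value 128 are all choices made for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

final class JdkBindSketch {

    // Binds a non-blocking server channel to an ephemeral loopback port
    // and returns the port the operating system picked.
    static int bindAndReturnPort(int backlog) {
        try (ServerSocketChannel ch = ServerSocketChannel.open()) {
            ch.configureBlocking(false);
            // Same shape as javaChannel().socket().bind(localAddress, config.getBacklog()).
            ch.socket().bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), backlog);
            return ch.socket().getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("bound to port " + bindAndReturnPort(128));
    }
}
```

The backlog argument is only a hint to the operating system, which may cap it at its own limit.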
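Because bind has now been executed, isActive returns true. However, channelActive is an inbound event, so it must not be fired directly from an outbound operation (the comment in invokeLater above explains why). Instead, the channelActive task is added to the boss thread's task queue; the boss thread finishes the bind task first and then runs the channelActive task.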
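The effect of deferring channelActive through the task queue can be seen with a single-threaded executor. This is a minimal sketch, assuming an ExecutorService as a stand-in for the boss event loop; the class name InvokeLaterSketch and the log strings are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class InvokeLaterSketch {

    static List<String> run() {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        try {
            // The "bind" task, already running on the event loop thread.
            eventLoop.execute(() -> {
                log.add("bind: start");
                // Plays the role of invokeLater(): queue the inbound event
                // instead of calling it inside the outbound operation.
                eventLoop.execute(() -> {
                    log.add("channelActive");
                    done.countDown();
                });
                log.add("bind: end");
            });
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for channelActive");
            }
            return log;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            eventLoop.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [bind: start, bind: end, channelActive]
    }
}
```

Because the queued task cannot start until the current one returns, the inbound callback never overlaps with the outbound operation that caused it.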
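channelActive is an inbound event, so the inbound handlers are invoked in head->tail order. There are currently three handlers, head->ServerBootstrapAcceptor->tail, of which ServerBootstrapAcceptor and tail are inbound handlers. Start with the pipeline's fireChannelActive method, which begins at head: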
//DefaultChannelPipeline
public ChannelPipeline fireChannelActive() {
    head.fireChannelActive();
    if (channel.config().isAutoRead()) {
        channel.read();
    }
    return this;
}
The code of head.fireChannelActive() is as follows:
// DefaultChannelHandlerContext
public ChannelHandlerContext fireChannelActive() {
    final DefaultChannelHandlerContext next = findContextInbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelActive();
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelActive();
            }
        });
    }
    return this;
}

private DefaultChannelHandlerContext findContextInbound() {
    // Walk forwards (towards tail) until an inbound handler is found.
    DefaultChannelHandlerContext ctx = this;
    do {
        ctx = ctx.next;
    } while (!(ctx.handler() instanceof ChannelInboundHandler));
    return ctx;
}

private void invokeChannelActive() {
    try {
        ((ChannelInboundHandler) handler()).channelActive(this);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}
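Neither ServerBootstrapAcceptor's nor tail's channelActive method does anything substantial; the event ends at TailHandler's empty implementation.
Next, look at channel.read(), which DefaultChannelPipeline executes after head.fireChannelActive().
It calls the following method of AbstractChannel:
//AbstractChannel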
public Channel read() {
    pipeline.read();
    return this;
}
//DefaultChannelPipeline
public ChannelPipeline read() {
    tail.read();
    return this;
}
//DefaultChannelHandlerContext
public ChannelHandlerContext read() {
    findContextOutbound().invokeRead();
    return this;
}
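read is an outbound event, so findContextOutbound() walks the pipeline in tail->head order looking for outbound handlers. There are currently three handlers, tail->ServerBootstrapAcceptor->head, but only head is an outbound handler, so look at head's invokeRead method: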
private void invokeRead() {
    EventExecutor executor = executor();
    if (executor.inEventLoop()) {
        invokeRead0();
    } else {
        // Cache the Runnable so repeated reads do not allocate a new task each time.
        Runnable task = invokeRead0Task;
        if (task == null) {
            invokeRead0Task = task = new Runnable() {
                @Override
                public void run() {
                    invokeRead0();
                }
            };
        }
        executor.execute(task);
    }
}

private void invokeRead0() {
    try {
        ((ChannelOutboundHandler) handler()).read(this);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}
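invokeBind, fireChannelActive and invokeRead all share the same guard: run inline when already on the event loop thread, otherwise hand the work to that thread. A minimal sketch of this guard follows, assuming a single-threaded executor whose worker thread is remembered by a custom thread factory; Loop, invoke and the thread name "loop" are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class InEventLoopSketch {

    // Hypothetical stand-in for an event loop with an inEventLoop() check.
    static final class Loop {
        private volatile Thread thread;
        private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "loop");
            thread = t; // remember the worker so inEventLoop() can compare against it
            return t;
        });

        boolean inEventLoop() {
            return Thread.currentThread() == thread;
        }

        // Same branching as invokeBind()/invokeRead(): inline or queued.
        void invoke(Runnable task) {
            if (inEventLoop()) {
                task.run();
            } else {
                executor.execute(task);
            }
        }

        void shutdown() {
            executor.shutdown();
        }
    }

    static List<String> run() {
        Loop loop = new Loop();
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        try {
            // Called from the caller's thread: the task is queued.
            loop.invoke(() -> {
                log.add("outer start on " + Thread.currentThread().getName());
                // Called from the loop thread: the task runs inline.
                loop.invoke(() -> log.add("inner on " + Thread.currentThread().getName()));
                log.add("outer end");
                done.countDown();
            });
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out");
            }
            return log;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            loop.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [outer start on loop, inner on loop, outer end]
    }
}
```

The nested call lands between "outer start" and "outer end", which shows it ran inline rather than being queued.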
The line ((ChannelOutboundHandler) handler()).read(this) calls the read method of HeadHandler:
//HeadHandler
public void read(ChannelHandlerContext ctx) {
    unsafe.beginRead();
}
unsafe.beginRead() calls the beginRead method of AbstractUnsafe:
//AbstractUnsafe
public void beginRead() {
    if (!isActive()) {
        return;
    }
    try {
        doBeginRead();
    } catch (final Exception e) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireExceptionCaught(e);
            }
        });
        close(voidPromise());
    }
}
Because AbstractUnsafe is an inner class of AbstractChannel, doBeginRead() resolves to the method of AbstractNioChannel, a subclass of AbstractChannel:
//AbstractNioChannel
protected void doBeginRead() throws Exception {
    if (inputShutdown) {
        return;
    }
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }
    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        // Add the read interest (OP_ACCEPT for a server channel) to the key.
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}
The value of selectionKey.interestOps() was set earlier, during registration: AbstractUnsafe invokes the doRegister method of AbstractNioChannel, which executes selectionKey = javaChannel().register(eventLoop().selector, 0, this), so the value is 0.
readInterestOp was set when the NioServerSocketChannel was created: its constructor calls super(null, newSocket(), SelectionKey.OP_ACCEPT), so the value is 16.
selectionKey.interestOps(interestOps | readInterestOp) therefore sets the interest ops to 16.
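The arithmetic above (0 | 16 = 16) can be checked against the JDK directly. The sketch below uses only java.nio and repeats the two steps described: register with interest ops 0, then OR in OP_ACCEPT. The class name InterestOpsSketch is hypothetical, and no bind is performed because it is not needed to inspect the key.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

final class InterestOpsSketch {

    // Returns {interestOps after registration, interestOps after the update}.
    static int[] run() {
        try (Selector selector = Selector.open();
             ServerSocketChannel ch = ServerSocketChannel.open()) {
            ch.configureBlocking(false);
            // Same shape as javaChannel().register(eventLoop().selector, 0, this).
            SelectionKey key = ch.register(selector, 0, null);
            int before = key.interestOps();

            // Same check and update as doBeginRead().
            int readInterestOp = SelectionKey.OP_ACCEPT;
            if ((before & readInterestOp) == 0) {
                key.interestOps(before | readInterestOp);
            }
            return new int[] {before, key.interestOps()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] ops = run();
        System.out.println(ops[0] + " -> " + ops[1]); // prints 0 -> 16
    }
}
```

Registering with 0 first and enabling OP_ACCEPT later means the selector cannot report incoming connections until the channel has been bound and become active.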
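Summary:
The following events occur in order: bind (outbound) -> channelActive (inbound) -> read (outbound).
Note: channelActive is triggered from within bind.
The boss thread's task queue evolves as: bind task -> channelActive task.
The bind task does the following:
1. Binds the listening socket to the IP address and port, and sets the maximum length of the completed-connection queue (the backlog).
2. Adds the channelActive task to the boss thread's task queue.
The channelActive task does one thing: through the read it triggers, it sets the interestOps of the selectionKey to SelectionKey.OP_ACCEPT, i.e. 16.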