netty启动源码4
接着之前讲到 final ChannelFuture regFuture = initAndRegister();
接下来讲doBind0(regFuture, channel, localAddress, promise);
private ChannelFuture doBind(final SocketAddress localAddress) {
//初始化和注册
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
跟踪**doBind0(regFuture, channel, localAddress, promise);**方法到AbstractBootstrap类的dobind0()方法
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// 在触发channelregister()之前调用此方法。给用户处理程序设置的机会
// 管道在其channelRegistered()实现中。
//这些任务最终被事件轮询线程同步调用
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
System.out.println(" channel.eventLoop().execute(new Runnable() ");
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
发现了channel.eventLoop().execute()这个方法之前就碰到;这个方法是往taskqueue里添加了任务;然后有专门的线程(称为reactor线程)死循环的处理该队列的任务;
所以接下下跟踪 channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);到AbstractChannelHandlerContext类的connect()
@Override
public ChannelFuture connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
}
if (isNotValidPromise(promise, false)) {
// cancelled
return promise;
}
final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeConnect(remoteAddress, localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeConnect(remoteAddress, localAddress, promise);
}
}, promise, null);
}
return promise;
}
这个方法是pipeline的处理链表里的tail节点的父类的方法
之前分析的处理链表:head->nettyTestHendler->ServerBootstrapAcceptor-> tail;
这行代码:final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
是表示从tail到head寻找是出栈节点的(实现ChannelOutboundHandler的类);
这里说明一下head节点是HeadContext实现了ChannelOutboundHandler,ChannelInboundHandler
其他三个节点都是只有实现了ChannelInboundHandler所以这里找到的结果就是head节点;
接着跟踪 next.invokeConnect(remoteAddress, localAddress, promise);跟踪到AbstractNioChannel类的内部类AbstractNioUnsafe的如下方法
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
// See: https://github.com/netty/netty/issues/576
if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
// Warn a user about the fact that a non-root user can't receive a
// broadcast packet on *nix if the socket is bound on non-wildcard address.
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}
boolean wasActive = isActive();
try {
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (!wasActive && isActive()) {
//线程执行调用channelActive方法
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
其中的doBind(localAddress);就是原生jdk绑定端口号,但是此时还是没有给服务端的seversocketchannel绑定刚兴趣的事件;
接着又往taskqueue里添加任务,所以会执行**pipeline.fireChannelActive();**跟踪代码
知道pipeline.fireChannelActive();这个是执行处理链的是入栈的(inboundhandler接口实现类)的节点的channelActive()方法;先执行head节点这个方法,然后发现head这个方法里有
fireChannelActive();调用下个节点的channelActive();此时这个节点是我们自定义的nettytesthendler的方法,若我们调用fireChannelActive();则回去ServerBootstrapAcceptor的channelActive();的方法但是新版的netty里面加了注解@skip表示直接跳过该方法,所以会跳到最后一个tail去;
当调用完后则会在headcontextd的channelActive();方法中还有个 readIfIsAutoRead();这个方法去找是属于outboundHandler的节点目前只有head节点是属于出栈节点所以最后会调用head的read()方法,.最终继续跟踪下去会给通道绑定监听连接事件;
head->nettyTestHendler->ServerBootstrapAcceptor-> tail;
总结:所以doBind0(regFuture, channel, localAddress, promise);主要是绑定端口.通道绑定刚兴趣事件,接下来就是等待连接事件,可以看reactor线程的那个死循环;
上一篇: 关于一段代码的疑惑
推荐阅读
-
ASP.NET MVC4+EF5+EasyUI+Unity2.x注入的后台管理系统之前端页面框架构建源码分享
-
电脑插4根内存开不了机拔掉两根内存可以正常启动
-
使用.Net Core + Vue + IdentityServer4 + Ocelot 实现一个简单的DEMO +源码
-
Spring5源码解析4-refresh方法之invokeBeanFactoryPostProcessors
-
CloudStack SSVM启动条件源码阅读与问题解决方法
-
Netty源码分析 (四)----- ChannelPipeline
-
如何正确理解和使用Activity的4种启动模式
-
Netty源码分析 (三)----- 服务端启动源码分析
-
4月底删除 微软启动器Android版将放弃Cortana服务
-
netty源码解析(4.0)-28 ByteBuf内存池:PooledByteBufAllocator-把一切组装起来