Netty source code analysis: server startup, initAndRegister (blog category: Netty source code analysis)
程序员文章站
2024-03-17 19:12:16
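This article covers the following:
1. initAndRegister: createChannel analysis
2. initAndRegister: init analysis
3. ChannelInitializer analysis
Creating the channel: the server-side bind() call chain
[Figure: server-side bind() call chain]
Most of the server startup work happens inside bind(). Its core code is shown below.
AbstractBootstrap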
private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }
    final ChannelPromise promise;
    if (regFuture.isDone()) {
        promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        promise = new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                doBind0(regFuture, channel, localAddress, promise);
            }
        });
    }
    return promise;
}
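The two most important calls in this method are initAndRegister() and doBind0() (highlighted in the original post). We start with initAndRegister().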
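The branch in doBind() follows a common pattern: if the registration future is already done, bind immediately; otherwise bind from a completion callback. The sketch below imitates only that shape using the JDK's CompletableFuture. It is not Netty code: the class name, the string address, and the simplified doBind0() are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;

class BindAfterRegisterSketch {
    // Hypothetical stand-in for doBind0(): fails the promise if registration
    // failed, otherwise completes it with a description of the bind.
    static void doBind0(CompletableFuture<Void> regFuture, String address,
                        CompletableFuture<String> promise) {
        if (regFuture.isCompletedExceptionally()) {
            promise.completeExceptionally(new IllegalStateException("registration failed"));
        } else {
            promise.complete("bound to " + address);
        }
    }

    // Same shape as doBind(): bind now if registration is done,
    // otherwise defer the bind to a completion callback.
    static CompletableFuture<String> doBind(CompletableFuture<Void> regFuture, String address) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        if (regFuture.isDone()) {
            doBind0(regFuture, address, promise);
        } else {
            regFuture.whenComplete((ignored, cause) -> doBind0(regFuture, address, promise));
        }
        return promise;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> pending = new CompletableFuture<>();
        CompletableFuture<String> bound = doBind(pending, "0.0.0.0:8080");
        System.out.println(bound.isDone()); // registration still pending
        pending.complete(null);             // registration finishes, callback binds
        System.out.println(bound.join());
    }
}
```

With CompletableFuture the explicit isDone() check is not strictly required, because whenComplete runs at once on a finished future. It is kept here only to mirror the two branches of doBind().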
Analysis of the initAndRegister method
[Figure: sequence diagram of initAndRegister()]
final ChannelFuture initAndRegister() {
    Channel channel;
    try {
        // Create the channel and set up its main properties
        channel = createChannel();
    } catch (Throwable t) {
        return VoidChannel.INSTANCE.newFailedFuture(t);
    }
    try {
        // Initialize the channel
        init(channel);
    } catch (Throwable t) {
        channel.unsafe().closeForcibly();
        return channel.newFailedFuture(t);
    }
    // Create the ChannelPromise and register the channel
    ChannelPromise regFuture = channel.newPromise();
    channel.unsafe().register(regFuture);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    return regFuture;
}
1. createChannel analysis
ServerBootstrap:
Channel createChannel() {
    EventLoop eventLoop = group().next();
    return channelFactory().newChannel(eventLoop, childGroup);
}
ServerBootstrapChannelFactory
@Override
public T newChannel(EventLoop eventLoop, EventLoopGroup childGroup) {
    try {
        // clazz = NioServerSocketChannel.class; look up its (EventLoop, EventLoopGroup) constructor and invoke it.
        Constructor<? extends T> constructor = clazz.getConstructor(EventLoop.class, EventLoopGroup.class);
        return constructor.newInstance(eventLoop, childGroup);
    } catch (Throwable t) {
        throw new ChannelException("Unable to create Channel from class " + clazz, t);
    }
}
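The factory above is plain Java reflection: look up a public constructor by its parameter types, then invoke it. A minimal sketch of that technique without Netty follows. DemoServerChannel and the String parameters are hypothetical placeholders for the real channel class and the EventLoop/EventLoopGroup arguments.

```java
import java.lang.reflect.Constructor;

class ReflectiveFactorySketch {
    // Hypothetical channel type exposing the two-argument public constructor
    // that the factory looks up.
    static class DemoServerChannel {
        final String eventLoop;
        final String childGroup;

        public DemoServerChannel(String eventLoop, String childGroup) {
            this.eventLoop = eventLoop;
            this.childGroup = childGroup;
        }
    }

    // Finds the (String, String) constructor on clazz and calls it,
    // wrapping any reflection failure in an unchecked exception.
    static <T> T newChannel(Class<T> clazz, String eventLoop, String childGroup) {
        try {
            Constructor<T> constructor = clazz.getConstructor(String.class, String.class);
            return constructor.newInstance(eventLoop, childGroup);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Unable to create channel from " + clazz, e);
        }
    }

    public static void main(String[] args) {
        DemoServerChannel ch = newChannel(DemoServerChannel.class, "boss-0", "workers");
        System.out.println(ch.eventLoop + " / " + ch.childGroup);
    }
}
```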
// The NioServerSocketChannel constructor is invoked to create the instance
public NioServerSocketChannel(EventLoop eventLoop, EventLoopGroup childGroup) {
    // Call the superclass constructor
    super(null, eventLoop, childGroup, newSocket(), SelectionKey.OP_ACCEPT);
    // Create the default configuration
    config = new DefaultServerSocketChannelConfig(this, javaChannel().socket());
}
protected AbstractNioMessageServerChannel(
        Channel parent, EventLoop eventLoop, EventLoopGroup childGroup, SelectableChannel ch, int readInterestOp) {
    // Call the superclass constructor.
    // At this point parent = null and ch is the JDK's ServerSocketChannelImpl.
    super(parent, eventLoop, ch, readInterestOp);
    // Store childGroup
    this.childGroup = childGroup;
}
protected AbstractNioMessageChannel(
        Channel parent, EventLoop eventLoop, SelectableChannel ch, int readInterestOp) {
    // Call the superclass constructor
    super(parent, eventLoop, ch, readInterestOp);
}
protected AbstractNioChannel(Channel parent, EventLoop eventLoop, SelectableChannel ch, int readInterestOp) {
    // Call the superclass constructor
    super(parent, eventLoop);
    // Store the JDK channel (here a ServerSocketChannelImpl)
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        // Put the java.nio ServerSocketChannel into non-blocking mode
        ch.configureBlocking(false);
    } catch (IOException e) {
        // Error handling; not the focus here
        try {
            ch.close();
        } catch (IOException e2) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "Failed to close a partially initialized socket.", e2);
            }
        }
        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}
Finally, the AbstractChannel constructor is called:
protected AbstractChannel(Channel parent, EventLoop eventLoop) {
    this.parent = parent;
    // Non-null and compatibility check
    this.eventLoop = validate(eventLoop);
    unsafe = newUnsafe(); // initialize unsafe (important)
    pipeline = new DefaultChannelPipeline(this); // initialize pipeline (important)
}
AbstractNioChannel
// Compatibility check
protected boolean isCompatible(EventLoop loop) {
    return loop instanceof NioEventLoop;
}
Summary: createChannel creates the unsafe, the pipeline, and the underlying java.nio ServerSocketChannel, and assigns the channel's eventLoop, childGroup, readInterestOp, and config fields.
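At the JDK level, the constructor chain comes down to opening a server channel, switching it to non-blocking mode, and remembering OP_ACCEPT as the interest op. The sketch below shows just that part with java.nio. It assumes newSocket() ultimately obtains its channel from something equivalent to ServerSocketChannel.open(), which the code quoted above does not show.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;

class NonBlockingServerSocketSketch {
    // Opens an unbound JDK server channel, switches it to non-blocking mode,
    // and reports whether it is non-blocking with OP_ACCEPT as its only valid op.
    static boolean openNonBlockingAcceptOnly() {
        try (ServerSocketChannel ch = ServerSocketChannel.open()) {
            ch.configureBlocking(false);
            return !ch.isBlocking() && ch.validOps() == SelectionKey.OP_ACCEPT;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(openNonBlockingAcceptOnly());
    }
}
```

A server channel only supports accepting connections, which is why OP_ACCEPT is the readInterestOp passed up the constructor chain.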
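When is the eventLoop created?
It is created when the EventLoopGroup is constructed. createChannel only picks an existing one through group().next().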
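To make the point concrete, here is a toy group that builds all of its loops in the constructor and hands them out from next(). The round-robin selection and the "loop-N" names are assumptions chosen for illustration. This is a sketch of the idea, not Netty's EventLoopGroup implementation.

```java
import java.util.concurrent.atomic.AtomicInteger;

class RoundRobinGroupSketch {
    private final String[] loops;                       // stand-ins for EventLoop instances
    private final AtomicInteger index = new AtomicInteger();

    // All loops are created up front, when the group itself is constructed.
    RoundRobinGroupSketch(int nThreads) {
        loops = new String[nThreads];
        for (int i = 0; i < nThreads; i++) {
            loops[i] = "loop-" + i;
        }
    }

    // next() creates nothing; it only selects one of the existing loops.
    String next() {
        return loops[Math.floorMod(index.getAndIncrement(), loops.length)];
    }

    public static void main(String[] args) {
        RoundRobinGroupSketch group = new RoundRobinGroupSketch(2);
        System.out.println(group.next());
        System.out.println(group.next());
        System.out.println(group.next());
    }
}
```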
2. init analysis
void init(Channel channel) throws Exception {
    final Map<ChannelOption<?>, Object> options = options();
    // In this example, options = {SO_BACKLOG=128}
    synchronized (options) {
        channel.config().setOptions(options);
    }
    final Map<AttributeKey<?>, Object> attrs = attrs();
    // In this example, attrs = {}
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }
    ChannelPipeline p = channel.pipeline();
    if (handler() != null) {
        p.addLast(handler());
    }
    final ChannelHandler currentChildHandler = childHandler;
    // Snapshot the child options and child attributes
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    // In this example, childOptions = {SO_KEEPALIVE=true}
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
    }
    // In this example, childAttrs = {}
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
    }
    // ChannelInitializer provides the initChannel hook, which makes it easy to add handlers.
    // ServerBootstrapAcceptor's main method, channelRead, uses the ServerBootstrap parameters captured here
    // (childHandler, childOptions, childAttrs); at first glance its only job is to carry those parameters.
    // Its detailed role is analyzed in a later post.
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(Channel ch) throws Exception {
            ch.pipeline().addLast(new ServerBootstrapAcceptor(currentChildHandler, currentChildOptions,
                    currentChildAttrs));
        }
    });
}
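init() copies childOptions and childAttrs into arrays while holding the map's lock, so the acceptor later works from a fixed set of entries even if the bootstrap's maps gain new keys. The sketch below shows that snapshot step with plain JDK collections. String keys replace ChannelOption, and the class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;

class OptionSnapshotSketch {
    // Copies the current entries into an array while holding the map's monitor,
    // so keys added afterwards do not show up in the returned snapshot.
    @SuppressWarnings("unchecked")
    static Entry<String, Object>[] snapshot(Map<String, Object> options) {
        synchronized (options) {
            return options.entrySet().toArray(new Entry[options.size()]);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> childOptions = new LinkedHashMap<>();
        childOptions.put("SO_KEEPALIVE", true);
        Entry<String, Object>[] current = snapshot(childOptions);
        childOptions.put("TCP_NODELAY", true); // added after the snapshot
        System.out.println(current.length + " entry, first key = " + current[0].getKey());
    }
}
```

The array fixes which entries exist at snapshot time. It is not a deep copy: the array holds the map's own Entry objects.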
In a standard Netty server, childHandler is the handler supplied through ServerBootstrap.childHandler(...).
3. ChannelInitializer analysis
// ChannelInitializer: used to add handlers; invoked when the channel is registered via register()
public abstract class ChannelInitializer<C extends Channel> extends ChannelHandlerAdapter {
    // Implemented by subclasses, usually to add handlers
    protected abstract void initChannel(C ch) throws Exception;
    // Invoked when the channel's register() runs
    @Override
    @SuppressWarnings("unchecked")
    public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        ChannelPipeline pipeline = ctx.pipeline();
        boolean success = false;
        try {
            // Call the subclass implementation, which usually adds handlers
            initChannel((C) ctx.channel());
            // Its work is done: remove this handler from the pipeline
            pipeline.remove(this);
            // Propagate the channelRegistered event to the other handlers that handle it.
            // This call is what passes the event along; without it, propagation stops here.
            ctx.fireChannelRegistered();
            success = true;
        } catch (Throwable t) {
            logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t);
        } finally {
            if (pipeline.context(this) != null) {
                pipeline.remove(this);
            }
            if (!success) {
                ctx.close();
            }
        }
    }
}
ChannelInitializer is used in two places: in init() during server startup (initAndRegister), to add the ServerBootstrapAcceptor, and when users add their own handlers.
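The essential behavior of ChannelInitializer is "run once on registration, install the real handlers, then remove yourself". The toy pipeline below models only that. Pipeline, Handler, Acceptor, and Initializer are hypothetical types, not Netty classes, and the sketch does not forward the event to the newly added handler the way ctx.fireChannelRegistered() does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class OneShotInitializerSketch {
    interface Handler {
        void registered(Pipeline pipeline);
    }

    static class Pipeline {
        final List<Handler> handlers = new ArrayList<>();

        void fireRegistered() {
            // Iterate over a copy so handlers can modify the pipeline during the event.
            for (Handler h : new ArrayList<>(handlers)) {
                h.registered(this);
            }
        }
    }

    // Stand-in for a long-lived handler such as ServerBootstrapAcceptor.
    static class Acceptor implements Handler {
        @Override
        public void registered(Pipeline pipeline) { }
    }

    // One-shot handler: runs the init callback, then removes itself.
    static class Initializer implements Handler {
        private final Consumer<Pipeline> initChannel;

        Initializer(Consumer<Pipeline> initChannel) {
            this.initChannel = initChannel;
        }

        @Override
        public void registered(Pipeline pipeline) {
            initChannel.accept(pipeline);
            pipeline.handlers.remove(this);
        }
    }

    // Builds a pipeline holding only an initializer, fires the event,
    // and returns the handlers that remain afterwards.
    static List<Handler> run() {
        Pipeline p = new Pipeline();
        p.handlers.add(new Initializer(pl -> pl.handlers.add(new Acceptor())));
        p.fireRegistered();
        return p.handlers;
    }

    public static void main(String[] args) {
        System.out.println(run().size() + " handler left: " + run().get(0).getClass().getSimpleName());
    }
}
```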
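Summary: init() applies the ServerBootstrap's options and attrs to the channel and adds a handler (ServerBootstrapAcceptor) that carries the ServerBootstrap's child settings.
Finally, look at these two lines:
// Create the ChannelPromise; its purpose is covered in a separate analysis post
ChannelPromise regFuture = channel.newPromise();
// Register the channel with the EventLoop
channel.unsafe().register(regFuture);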
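One way to picture these two lines: the promise is created on the calling thread, the registration work runs on the event loop's thread, and completing the promise signals the caller. The sketch below models that with a single-threaded executor. The thread name "boss-loop" and the assumption that registration is handed off as a task to the loop are illustrative simplifications. This is not Netty's register() implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class RegisterOnLoopSketch {
    // Submits a "register" task to a single-threaded loop and returns the name
    // of the thread that completed the registration promise.
    static String register() {
        ExecutorService eventLoop =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "boss-loop"));
        try {
            CompletableFuture<String> regFuture = new CompletableFuture<>();
            // The registration runs on the loop thread and fulfils the promise there.
            eventLoop.execute(() -> regFuture.complete(Thread.currentThread().getName()));
            return regFuture.join();
        } finally {
            eventLoop.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(register());
    }
}
```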
That completes the analysis of initAndRegister.
Conclusion: initAndRegister creates the NioServerSocketChannel, sets up the ServerBootstrapAcceptor, and registers the channel with an EventLoop of the parent group.