Netty源码分析-Bootstrap
Bootstrap 是 Netty 提供的客户端和服务器启动入口类,其提供了完善的服务配置参数,供开发者合理化配置自身服务。源码分析将从Bootstrap开始着手,逐个分析Netty的各个核心组建及设计理念。
写在前面
Java NIO已经将Java对于网络IO的操作提供了完善的API,Netty只是将其包装更方便使用,因此Netty源码分析的重点应该是Java NIO,因此需要先行了解Java NIO的API。
入手
开发一个TCP服务一般从EchoServer和EcheClient开始,顾名思义就是讲发送的内容原封不动的返回至客户端。
客户端代码核心:
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new EchoClientHandler());
}
});
// Start the client.
ChannelFuture f = b.connect(HOST, PORT).sync();
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down the event loop to terminate all threads.
group.shutdownGracefully();
}
服务器代码核心:
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new EchoServerHandler());
}
});
ChannelFuture f = b.bind(PORT).sync();
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
上述demo是一个netty的入门demo,但是这里包含了很多接下来将会重点介绍的内容,包括Reactor模型的NioEventLoop,抽象连接的Channel,处理数据的ChannelHandler和非阻塞相关的ChannelFuture。
1.EventLoop
客户端和服务器初始化EventLoopGroup时明显存在差异,服务器会初始化两个EventLoopGroup,其中bossGroup用户处理连接请求,workerGroup用于处理后续连接上的IO事件。因此客户端只需要一个workerGroup去处理IO事件。
在初始化bootstrap时,传入的EventLoop类型都是NioEventLoopGroup,分析其构造函数最终调用的是MultithreadEventExecutorGroup的构造函数。Netty允许我们设置EventLoop中线程数量,如果没有设置则使用处理器核心数 * 2。
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
...
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
...
}
}
chooser = chooserFactory.newChooser(children);
...
}
根据代码, 我们可以发现生成了一个大小为 nThreads 的 EventExecutor数组,newChild方法的实现在NioEventLoopGroup中,生成的实例是NioEventLoop。另外根据 nThreads是否是2的幂选择chooser,如果是2的幂使用PowerOfTwoEventExecutorChooser,反之使用 GenericEventExecutorChooser。chooser的功能是每当 Netty 需要一个 EventLoop 时,会调用 next() 方法获取一个可用的 EventLoop,其实现方式基本类似。
public EventExecutor next() {
return chooser.next();
}
2.Channel
我们以TCP为例,Netty使用Channel抽象一个Socket连接,它提供了完善的Socket连接状态查询以及网络IO读写等操作。另外Netty也提供了很多不同阻塞类型和不同协议的Channel,其中NIO为前缀的表示非阻塞,OIO为前缀的便是阻塞:
- NioSocketChannel, 代表异步的客户端 TCP Socket 连接.
- NioServerSocketChannel, 异步的服务器端 TCP Socket 连接.
- NioDatagramChannel, 异步的 UDP 连接
- NioSctpChannel, 异步的客户端 Sctp 连接.
- NioSctpServerChannel, 异步的 Sctp 服务器端连接.
- OioSocketChannel, 同步的客户端 TCP Socket 连接.
- OioServerSocketChannel, 同步的服务器端 TCP Socket 连接.
- OioDatagramChannel, 同步的 UDP 连接
- OioSctpChannel, 同步的 Sctp 服务器端连接.
- OioSctpServerChannel, 同步的客户端 TCP Socket 连接.
Bootstrap通过channel()方法设置创建的Channel类型,其实现过程是设置了ChannelFactory
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new BootstrapChannelFactory<C>(channelClass));
}
而 BootstrapChannelFactory 实现了 ChannelFactory 接口, 它提供了唯一的方法, 即 newChannel()。其核心代码如下:
@Override
public T newChannel() {
try {
return clazz.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
客户端和服务器的行为是不同的,因此在实例化Channel这个动作也存在不同,客户端通过connect()触发,而服务器通过bind()触发,这两个操作对于客户端或者服务器来说虽然不同但是却必须的操作。
初始化Channel的函数是AbstractBootstrap#initAndRegister(),共三步:
- 创建Channel实例
- 第二步初始化Channel
- Channel注册至EventLoop。
final ChannelFuture initAndRegister() {
final Channel channel = channelFactory().newChannel();
try {
init(channel);
} catch (Throwable t) {
...
}
ChannelFuture regFuture = group().register(channel);
...
}
2.1创建Channel
ChannelFactory#newChannel 中,通过newInstance 来创建一个新 Channel 实例,客户端和服务器传入的Channel类型不同,其中客户端传入的是NioSocketChannel ,而服务器传入的是ServerSocketChannel。
我们首先分析NioSocketChannel 默认构造器代码如下:
public NioSocketChannel() {
this(DEFAULT_SELECTOR_PROVIDER);
}
/**
* Create a new instance using the given {@link SelectorProvider}.
*/
public NioSocketChannel(SelectorProvider provider) {
this(newSocket(provider));
}
/**
* Create a new instance using the given {@link SocketChannel}.
*/
public NioSocketChannel(SocketChannel socket) {
this(null, socket);
}
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new NioSocketChannelConfig(this, socket.socket());
}
其调用的父类构造函数过程:NioSocketChannel->AbstractNioByteChannel->AbstractNioChannel->AbstractChannel。AbstractNioByteChannel的构造函数中可以看到客户端主动连接的channel的监听事件是OP_READ
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ);
}
其中newSocket方法会通过创建一个新的Java NIO socketChannel
private static SocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openSocketChannel();
} catch (IOException e) {
throw new ChannelException("Failed to open a socket.", e);
}
}
AbstractNioChannel中可以看到将channel设置为非阻塞。
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
这里我们可以得到几个结论:
- parent为null
- 每个channel都会生成一个id
- 在创建channel时会创建一个与之相关联的unsafe。unsafe是java对于底层实现的封装,不同类型的channel的不同表现主要通过unsafe来体现,关于unsafe会在后续章节详细介绍。
- 同样每个channel还会创建出与之相关联的ChannelPipeline。ChannelPipeline是netty对于数据流转及数据读写的重要通道,其根据IO读写分为两个数据流向,是Netty中非常关键的核心流程。关于ChannelPipeline会在后续章节详细介绍。
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
而NioServerSocketChannel的默认构造函数代码如下,可以看到服务器创建的channel的监听事件是OP_ACCEPT。:
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
public NioServerSocketChannel(SelectorProvider provider) {
this(newSocket(provider));
}
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
NioServerSocketChannel的构造过程NioServerSocketChannel->AbstractNioMessageChannel->AbstractNioChannel->AbstractChannel。可以看到服务器和客户端拥有不同的channel父类,可想而知其unsafe的实现也是不一样的。
2.2初始化Channel
其中init(channel)方法是个抽象方法,由子类Bootstrap和ServerBootstrap
abstract void init(Channel channel) throws Exception;
Bootstrap中init的方法的主要功能是设置Channel的属性
void init(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
p.addLast(handler());
final Map<ChannelOption<?>, Object> options = options();
synchronized (options) {
for (Entry<ChannelOption<?>, Object> e: options.entrySet()) {
try {
if (!channel.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
} catch (Throwable t) {
logger.warn("Failed to set a channel option: " + channel, t);
}
}
}
final Map<AttributeKey<?>, Object> attrs = attrs();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
}
}
而ServerBootstrap的init方法不同点在于,客户端init的是主动连接的channel的option和attrs,而服务器需要初始化两部分channel,第一个是用于接收客户端连接请求,也就是OP_ACCEPT事件,第二部分则是接收客户端的数据,也就是OP_READ事件。
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options();
synchronized (options) {
channel.config().setOptions(options);
}
final Map<AttributeKey<?>, Object> attrs = attrs();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
if (handler() != null) {
p.addLast(handler());
}
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
其中ServerBootstrapAcceptor既是负责接收客户端连接请求:
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter
可以看到ServerBootstrapAcceptor是关心Inbound事件的ChannelHandler,关于ChannelHandler会在后面章节详细介绍,其需要实现的主要函数channelRead如下:
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
for (Entry<ChannelOption<?>, Object> e: childOptions) {
try {
if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
} catch (Throwable t) {
logger.warn("Failed to set a channel option: " + child, t);
}
}
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
...
});
} catch (Throwable t) {
...
}
}
其主要既是设置channel的channelHandler,option和attrs并将其注册至对应EventLoop。
最后我们来总结一下服务器端的 handler 与 childHandler 的区别与联系:
- 在服务器 NioServerSocketChannel 的 pipeline 中添加的是 handler 与 ServerBootstrapAcceptor。
- 当有新的客户端连接请求时, ServerBootstrapAcceptor.channelRead 中负责新建此连接的 NioSocketChannel 并添加 childHandler 到 NioSocketChannel 对应的 pipeline 中, 并将此 channel 绑定到 workerGroup 中的某个 eventLoop 中。
- handler 是在 accept 阶段起作用, 它处理客户端的连接请求。
- childHandler 是在客户端连接建立以后起作用, 它负责处理客户端的 IO 事件。
2.3将Channel注册至EventLoop
当Channel 初始化后,会紧接着调用 group().register() 方法来注册 Channel, 我们继续跟踪的话, 会发现其调用链如下:
AbstractBootstrap#initAndRegister -> SingleThreadEventLoop#register -> AbstractUnsafe#register->AbstractNioChannel#doRegister
@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
unsafe方法中有着Netty特色的代码块if(inEvenLoop) {…}else {execute},这是Netty线程模型的体现,会在后续章节EventLoop中重点介绍。
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
...
}
}
}
其最终调用的方法,其中pipeline.invokeXXX和pipeline.fireXXX是NettyChannel事件传递的前缀写法。
private void register0(ChannelPromise promise) {
try {
...
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
...
}
}
其中doRegister最终我们将Channel注册到与 eventLoop 关联的 selector 上。
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
...
}
}
}
另外register事件会通过fireChannelRegistered()在ChannelPipeline中传递,其中ChannelInitializer就是在channelRegistered事件中将用户定义的ChannelHandler加入ChannelPipeline中:
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ChannelPipeline pipeline = ctx.pipeline();
boolean success = false;
try {
initChannel((C) ctx.channel());
pipeline.remove(this);
ctx.fireChannelRegistered();
success = true;
} catch (Throwable t) {
logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t);
} finally {
if (pipeline.context(this) != null) {
pipeline.remove(this);
}
if (!success) {
ctx.close();
}
}
}
同时通过pipeline.remove(this)方法将自身从pipeline中删除,即ChannelInitializer在channel的register事件中会被删除。
总的来说,Channel 注册过程所做的工作就是将 Channel 与对应的 EventLoop 关联,之后这个 Channel 中的所有 IO 操作都是在这个 EventLoop 中执行的,最终的注册还是将 Java NIO SocketChannel 注册到指定的 selector 中,并通过selector完成网络事件的轮询。
3.connect
对于客户端,最核心的工作就是发起TCP连接,其核心方法是connect,最终调用的是 doConnect方法, 其实现如下:
private static void doConnect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {
final Channel channel = connectPromise.channel();
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (localAddress == null) {
channel.connect(remoteAddress, connectPromise);
} else {
channel.connect(remoteAddress, localAddress, connectPromise);
}
connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
});
}
channel的具体类型是NioSocketChannel ,其源码如下:
@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return pipeline.connect(remoteAddress, promise);
}
pipeline的默认实现是DefaultChannelPipeline,其源码如下:
@Override
public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return tail.connect(remoteAddress, promise);
}
在DefaultChannelPipeline由一个head和tail组成的双向链表,其链表上的entry是ChannelHandlerContext,像demo中的EchoClientHandler就会挂在这条双向链表中。IO写事件是由tail->head的,IO读事件是由head->tail的,因此unsafe操作都在head中实现。
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
unsafe.connect(remoteAddress, localAddress, promise);
}
最终的发送代码都在unsafe中实现,而不同的Channel也会拥有不同的unsafe,根据我们的NioSocketChannel实例可以找到其unsafe的connect实现:
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
...
boolean wasActive = isActive();
if (doConnect(remoteAddress, localAddress)) {
fulfillConnectPromise(promise, wasActive);
} else {
...
}
} catch (Throwable t) {
...
}
}
其中doConnect实现在NioSocketChannel中:
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
if (localAddress != null) {
doBind0(localAddress);
}
boolean success = false;
try {
boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
if (!connected) {
selectionKey().interestOps(SelectionKey.OP_CONNECT);
}
success = true;
return connected;
} finally {
if (!success) {
doClose();
}
}
}
到此我们终于找到了Netty封装的Java NIO函数完成了具体的socket连接。
4.bind
bind方法是服务器必经方法,其最终调用代码如下:
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
...
if (regFuture.isDone()) {
...
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
...
return promise;
}
}
其中initAndRegister在上述文章中已经介绍,主要完成channel初始化及注册至Eventloop。
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
...
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
接着又是我们熟悉的流程,从channel->tail->head->unsafe
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
pipeline的bind实现在DefaultChannelPipeline的tail节点中函数中:
@Override
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return tail.bind(localAddress, promise);
}
bind函数传递过程:
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
...
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeBind(localAddress, promise);
} else {
...
}
return promise;
}
最后走到pipeline中的head节点:
@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
unsafe.bind(localAddress, promise);
}
unsafe的定义在AbstractChannel中:
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
...
try {
doBind(localAddress);
} catch (Throwable t) {
...
}
...
safeSetSuccess(promise);
}
其中doBind是个抽象方法,因为在服务器端指定的channel类型是NioServerSocketChannel,因此该方法的具体实现在NioServerSocketChannel中:
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
总结
我们从bootstrap为源头分析了Netty作为客户端和服务器的核心流程,以及揭开了Netty众多模块的面纱,之后的章节将会逐个详细分析。希望读者在阅读源码之前能够有一定的Netty开发经验,并且不必陷入复杂琐碎的细节中,先了解整体流程,再逐个击破,实在无法理解的可以先有个概念,日后再慢慢思考。