Netty源码分析(二):Reactor模型在Netty中的应用
Netty源码分析主要分两部分:
基础知识
Netty中的Reactor主从模型
服务端测试代码:
总体分为三步:初始化+注册+绑定
其中bossGroup和workerGroup就是基于主从Reactor模型的两个路由,bossGroup主要负责管理链接,workerGroup主要负责成功链接后链路上的数据读写。
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap sb = new ServerBootstrap();
sb.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sh) throws Exception {
sh.pipeline()
.addLast(new RpcDecoder(RpcRequest.class)) //解码
.addLast(new RpcEncoder(RpcResponse.class)) //编码
.addLast(new ServerHandler());
}
});
ChannelFuture future = sb.bind(port).sync();
if (future.isSuccess()) {
System.out.println("服务端启动成功");
} else {
System.out.println("服务端启动失败");
future.cause().printStackTrace();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
future.channel().closeFuture().sync();
1. NioEventLoopGroup的初始化
简单总结一下,共三步骤:创建线程池、创建Reactor路由组、给每个执行器设置监听。
/**nEventLoops: 线程池大小默认大小是cpu核数*2
*executor: 线程池,netty5.0版本开始使用的是ForkJoinPool线程池,之前版本用的是手动版线程池,就是来一个就new一个thread
*selectorProvider:selector选择器,windows中jdk默认使用的是WindowsSelectorProvider选择器,通过调用openServerSocketChannel方法创建一个ServerSocketChannelImpl类并在里面openPiple建立通信管道,后面在介绍。
*/
public NioEventLoopGroup(int nEventLoops, Executor executor, final SelectorProvider selectorProvider) {
super(nEventLoops, executor, selectorProvider);
}
//初始化NioEventLoopGroup最终到了这一步,在这里完成创建工作。
(1)、第一步:如果没有,则创建线程池,大小是nEventExecutors,这个线程池主要工作是用来(待补充);netty5.0版本开始使用的是ForkJoinPool线程池,之前版本有single版本的,还有的别的记不得了
(2)、第二步:创建EventExecutor执行组,这个就相当于MainReactor,里面是NioEventLoop的集合。
根据源码分析一中的Reactor相关知识回一下,IO多路复用机制需要serverSocketChannel和Selector,然后设置serverSocketChannel为非阻塞并注册到selector中去,所以这个serverSocketChannel就是一个socket,然后通过selector.select()方法开启监听。这些工作都在NioEventLoop中执行。
(3)、第三步:给每个NioEventLoop设置Future监听器。
private MultithreadEventExecutorGroup(int nEventExecutors, Executor executor, boolean shutdownExecutor, Object... args) {
//......nEventExecutors小于0抛出异常......
if (executor == null) {
executor = newDefaultExecutorService(nEventExecutors); (1)
shutdownExecutor = true;
}
children = new EventExecutor[nEventExecutors]; (2)
if (isPowerOfTwo(children.length)) {
chooser = new PowerOfTwoEventExecutorChooser();
} else {
chooser = new GenericEventExecutorChooser();
}
for (int i = 0; i < nEventExecutors; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
//.....
} finally {
//.....如果创建不成功,优雅关闭线程池.....
}
}
final boolean shutdownExecutor0 = shutdownExecutor;
final Executor executor0 = executor;
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
if (shutdownExecutor0) {
((ExecutorService) executor0).shutdown();
}
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener); (3)
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
2. ServerBootstrap的初始化
bootstrap简单的说就是四步骤:创建socket,然后初始化,注册,绑定。
ChannelFuture future = sb.bind(port).sync();
重头戏就是这个doBind方法。
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister(); (......1......)
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);(......2......)
return promise;
} else {
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.executor = channel.eventLoop();
}
doBind0(regFuture, channel, localAddress, promise);
}
});
return promise;
}
}
2.1、initAndRegister()
这个方法分为三步,创建socket,然后初始化,最后注册,分三个小节讲
ChannelFuture initAndRegister() {
(1)这个方法就是通过反射获取ServerBoostrap中配置的channel:NioServerSocketChannel
final Channel channel = channelFactory().newChannel();
try {
(2)创建完socket之后,然后初始化一下
init(channel);
} catch (Throwable t) {
channel.unsafe().closeForcibly();
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
(3)初始化完了之后,然后注册一下
ChannelFuture regFuture = group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
2.1.1 创建NioServerSocketChannel
- 创建socket,然后设置socket兴趣点和非阻塞
- 创建unsafe用来做io操作
- 创建一个pipeline双向链表
- 然后需要提供一个对外暴露的config
//在构造中创建ServerSocketChannel,openServerSocketChannel其实就是创建了一个socket
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openServerSocketChannel();
} catch (IOException e) {
}
}
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
//然后设置OP_ACCEPT兴趣点,并对外提供config
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;//熟悉的味道,设置兴趣点
try {
ch.configureBlocking(false);//熟悉的味道,设置socket为非阻塞
} catch (IOException e) {
........
}
}
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = DefaultChannelId.newInstance();//最后创建一个id
unsafe = newUnsafe();//最后创建一个unsafa,就是NioMessageUnsafe(),这个很重要,channel的注册,绑定,链接,读写,也就是socket的相关io都是通过这个类完成。
pipeline = new DefaultChannelPipeline(this);//这个是重点,创建pipe通道,这个是双向链表。
}
//这个从这个pipeline的构造中可以看出来,这个是双向链表,一头一尾;
DefaultChannelPipeline(AbstractChannel channel) {
if (channel == null) {
throw new NullPointerException("channel");
}
this.channel = channel;
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
2.1.2 初始化NioServerSocketChannel
初始化就是做一些socket的配置工作,如设置配置、设置属性
@Override
void init(Channel channel) throws Exception {
//设置socket的一些配置,详细可以查看:https://blog.csdn.net/qq_28198181/article/details/82152338 这个介绍很详细
//常用的比如AUTO_READ、SO_KEEPALIVE、SO_REUSEADDR、SO_LINGER、SO_BACKLOG、SO_TIMEOUT、TCP_NODELAY
final Map<ChannelOption<?>, Object> options = options();
synchronized (options) {
channel.config().setOptions(options);
}
final Map<AttributeKey<?>, Object> attrs = attrs();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
//这里的pipeline在NioServerSocketChannel初始化的时候就已经创建了pipeline = new DefaultChannelPipeline(this);是一个双向链表,管道里添加的是handler。
ChannelPipeline p = channel.pipeline();
if (handler() != null) {
p.addLast(handler()); //这个添加的是MainReactor中的handler
}
//添加完MainReactor之后,然后将Acceptor这个handler加入到pipe中去,
//同时,将subReactor的一系列配置都添加到Accept中去,这样当Accept获取到有效链接请求之后,就能直接将channel注册到SubReactor中去了,这个思路跟基础知识一中的主从Reactor实现思路是一样的。
final EventLoopGroup currentChildGroup = childGroup; //这个是worker的Reactor
final ChannelHandler currentChildHandler = childHandler; //这个是SubReactor中的handler
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
2.1.3 注册NioServerSocketChannel
- 关于注册先整理一下思路:根据基础知识中的Reactor模型,首先创建一个线程池,将channel线注册到MianReactor中去专门用来监听客户端的请求事件,然后将有效请求的channel注册到SubReactor中的子线程中去处理IO读写操作。注册结束的标志就是将ServerSocketChannel注册到Selector中去。
注册过程跳转很长,原因就是专门的人做专门的事,首先注册需要在MainReactor路由中统筹管理,就像大老板,他将活分给某个部门的部门经理,也就是NioEventLoop(chooser.next())去执行,这个部门经理不干太细的活,像注册、绑定、连接、读写这些细活就派给了小员工去完成,也就是unSafa这个类。 - 注册结束后,将handler都填充到pipeline中去
//这里注意的是group()获取的是MainReactor
ChannelFuture regFuture = group().register(channel);//通过2.1中的regist开始注册
//调用NioEventLoopGroup中的父类MultithreadEventLoopGroup中的register
//这里的next其实就是通过chooser选择器从NioEventLoop数组中获取一个loop用来执行注册事件。
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
//PowerOfTwoEventExecutorChooser选择器,位运算取余,效率更高,需要满足2的N次幂
children[childIndex.getAndIncrement() & children.length - 1];
//GenericEventExecutorChooser选择器,摩除取余,任意数取余,效率低点
children[Math.abs(childIndex.getAndIncrement() % children.length)];
//NioEventLoop中的注册方法
public ChannelFuture register(Channel channel, ChannelPromise promise) {
channel.unsafe().register(this, promise);
return promise;
}
//最后通过unsafa完成注册工作,上面说过,注册、绑定、链接、读写都在这个类中完成。
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
........判空+是否注册过...........
if (AbstractChannel.this.eventLoop == null) {
AbstractChannel.this.eventLoop = new PausableChannelEventLoop(eventLoop);
} else {
AbstractChannel.this.eventLoop.unwrapped = eventLoop;
}
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
//第一次进来走这里,调用NioEventLoop中的自定义execute方法将register作为一个task任务添加到LinkedBlockingQueue队列中
//然后先开启select.select(),同时开启死循环,不断调用run方法,执行register0任务。
eventLoop.execute(new OneTimeTask() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
........
}
}
}
//接着调用register0();这个注册工作主要做了两件事。
1、执行doRegister方法:终于看到了这句非常熟悉而又亲切的serverSocketChannel注册到selector的语句,到这里,注册工作就算完成了。
2、执行fireChannelRegistered()方法完成pipeline的handler填充工作
private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();(1)
neverRegistered = false;
registered = true;
eventLoop.acceptNewTasks();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();(2)
if (firstRegistration && isActive()) {
pipeline.fireChannelActive();
}
} catch (Throwable t) {
.............
}
}
// 首先看1的注册工作,罗里吧嗦调用了一堆,最后还是调用了这句熟悉又简练的语句来结束注册工作
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(((NioEventLoop) eventLoop().unwrap()).selector, 0, this);
return;
} catch (CancelledKeyException e) {
........这里抛出异常..........
}
}
}
//注册完了之后,然后往pipeLine管道中添加handler事件,从head开始,通过next.fireChannelRegistered这个责任链模式依次寻找下一个节点执行其中的channelRegistered()方法
public ChannelPipeline fireChannelRegistered() {
head.fireChannelRegistered();
return this;
}
public ChannelHandlerContext fireChannelRegistered() {
AbstractChannelHandlerContext next = findContextInbound();//其实就是找到head.next且标志位是MASKGROUP_INBOUND的节点
next.invoker().invokeChannelRegistered(next); //这个invoker的角色就相当于Android系统的instrumentation的角色,这里负责调用handler中的一些方法。
--------->invokeChannelRegisteredNow(ctx);
--------->ctx.handler().channelRegistered(ctx);
return this;
}
//如果channelRegister发现handler是ChannelInitializer,那么就执行initChannel()方法,将里面的handler添加到pipeLine中,然后继续分发责任链fireChannelRegistered。
//如果是ChannelHandlerAdapter就继续分发fireChannelRegistered(),知道添加tail结束
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ChannelPipeline pipeline = ctx.pipeline();
boolean success = false;
try {
initChannel((C) ctx.channel());
pipeline.remove(this);
ctx.fireChannelRegistered();
success = true;
} catch (Throwable t) {
logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t);
} finally {
if (pipeline.context(this) != null) {
pipeline.remove(this);
}
if (!success) {
ctx.close();
}
}
}
2.2、doBind0(regFuture, channel, localAddress, promise)
注册完成后,最后开始绑定端口的工作。绑定肯定是socket来绑定,最后到serverSocketChannel.bind()完成绑定工作。
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
附加调转代码:
--------------->pipeline.bind(localAddress, promise);
--------------->tail.bind(localAddress, promise);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
可以看到绑定跟注册其实是一样的,只不过一个从head开始往后遍历,一个从tail开始往前遍历。
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound(); //就是通过this.prev上一个节点,this指的是tail。
next.invoker().invokeBind(next, localAddress, promise);//这个跟注册一样,调用pipeline上的节点
附加调转代码:
--------------->invokeBindNow(ctx, localAddress, promise);
--------------->ctx.handler().bind(ctx, localAddress, promise);
--------------->unsafe.bind(localAddress, promise); //绑定工作交给unsafe
--------------->doBind(localAddress);
--------------->javaChannel().socket().bind(localAddress, config.getBacklog());
return promise;
}
绑定完成之后,通过pipeline.fireChannelActive()方法来给每个handler挨个发送通知。从head开始,到目前pipeline中只有一个ServerBootstrapAccept,没有channelActive方法,所以可以直接结束了。
channel.read();注册结束之后,将accept打开,开始读取数据
从read到doBeginRead中间跳转的过程比较曲折,简单来讲还是一样的,读写从pipeLine管道的tail开始,然后通过invoke管理工具同意操作handler,最终交给unsafe最具体的操作。
最后就到了Reactor中熟悉的画面:在2.1.3中已经注册完socket了,2.2中已经bin完socket了,这里将兴趣点设置成OP_ACCEPT。
protected void doBeginRead() throws Exception {
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
3. Accept的功能
那么到这里为止MainReactor的结构和注册逻辑就分析完了,上面2.1.2 初始化NioServerSocketChannel的时候已经分析了,所有child的一些配置项都包装到Accept中去了,所以当Accept成功接收到一个客户端的请求时,就会将这个socket注册到subReactor中去,init和register的步骤跟Main是一样的。
这里的步骤跟Main中的初始化socket的步骤一样,添加pipeline、配置soket、最后注册channel到Reactor中去。
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
for (Entry<ChannelOption<?>, Object> e: childOptions) {
try {
if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
} catch (Throwable t) {
logger.warn("Failed to set a channel option: " + child, t);
}
}
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
上面的都是分析,看着很散,最后总结一下吧:
简单回忆一下基于IO多路复用机制的Reactor的设计模型:在主从Reactor模型中,MainReactor随机选择一个线程启动Accept,专门负责接收来自客户端的连接请求,并在Accept中封装了SubReactor,这样当Accept接收到有效请求后,将socket注册到SubReactor中去,职责分工完毕。
结合上面的分析先理一下流程:
- 主从Reactor需要new两个NioEventLoopGroup对象,分别为boss和worker;
- 在NioEventLoopGroup中,创建一个线程池,一个EventExecutor执行组(大小是内核数两倍)、每个执行组中添加NioEventLoop执行者,一个chooser选择器(专门用来挑选执行者的)、添加listener监听;
- 初始化ServerBootstrap总指挥,然后配置一些参数,如group(封装Reactor)、channel(socket对象)、handler(boss用的)、option(boss的socket的参数),childOption(subReactor的socket的参数)、childHandler(worker用的)
- 创建socket,通过反射创建的NioServerSocketChannel对象,里面构造的时候,通过select的provider创建ServerSocketChannel,同时创建一个unSafe对象,一个PipeLine对象(是一个双向链表head和tail,通过fireChannelXXX责任链将消息分发给各个节点处理)
- 初始化socket,就是添加socket配置和相关属性,如添加AUTO_READ、SO_KEEPLIVE、SO_BACKLOG等待,同时填充pipeLine。
- 注册socket,通过一系列的调用:选择一个NioEvnetLoop,通过其中的unSafe最终socket注册到selector选择器上,这是通过forkJoinPool线程池执行的异步任务,任务中开始执行selector的select方法,然后调用doRegister执行注册;此外通过责任链依次遍历节点,填充pipeLine管道,此时在MainReactor中填充的就是ServerBootstrapAccepter,同时将SubReactor中的所有信息都封装进去,你懂的~~。
- socket准备就绪后,开始绑定端口了,跟上面一样一系列分工调用,最后在通过socket.bind完成绑定工作,同时将Accepter的兴趣点设置为OP_ACCEPT;
- 因为在第6点的异步任务中已经执行了select()阻塞监听,因此当有连接进入的时候,在Accept这个handler中回调channelRead方法,接收消socket,并将socket注册到SubReactor中去,同是完成subReactor中的一系列的创建初始化和注册功能。过程跟MainReactor过程是一样的。
关于Reactor模型在Netty中的应用就分析完了,打完收工。
本文地址:https://blog.csdn.net/massonJ/article/details/108173436