Netty源码分析 (六)----- 客户端接入accept过程
通读本文,你会了解到
1.netty如何接受新的请求
2.netty如何给新请求分配reactor线程
3.netty如何给每个新连接增加channelhandler
netty中的reactor线程
netty中最核心的东西莫过于两种类型的reactor线程,可以看作netty中两种类型的发动机,驱动着netty整个框架的运转
一种类型的reactor线程是boos线程组,专门用来接受新的连接,然后封装成channel对象扔给worker线程组;还有一种类型的reactor线程是worker线程组,专门用来处理连接的读写
不管是boos线程还是worker线程,所做的事情均分为以下三个步骤
- 轮询注册在selector上的io事件
- 处理io事件
- 执行异步task
对于boos线程来说,第一步轮询出来的基本都是 accept 事件,表示有新的连接,而worker线程轮询出来的基本都是read/write事件,表示网络的读写事件
新连接的建立
简单来说,新连接的建立可以分为三个步骤
1.检测到有新的连接
2.将新的连接注册到worker线程组
3.注册新连接的读事件
检测到有新连接进入
我们已经知道,当服务端绑启动之后,服务端的channel已经注册到boos reactor线程中,reactor不断检测有新的事件,直到检测出有accept事件发生
nioeventloop.java
private static void processselectedkey(selectionkey k, abstractniochannel ch) { final niounsafe unsafe = ch.unsafe(); //检查该selectionkey是否有效,如果无效,则关闭channel if (!k.isvalid()) { // close the channel if the key is not valid anymore unsafe.close(unsafe.voidpromise()); return; } try { int readyops = k.readyops(); // also check for readops of 0 to workaround possible jdk bug which may otherwise lead // to a spin loop // 如果准备好read或accept则触发unsafe.read() ,检查是否为0,如上面的源码英文注释所说:解决jdk可能会产生死循环的一个bug。 if ((readyops & (selectionkey.op_read | selectionkey.op_accept)) != 0 || readyops == 0) { unsafe.read(); if (!ch.isopen()) {//如果已经关闭,则直接返回即可,不需要再处理该channel的其他事件 // connection already closed - no need to handle write. return; } } // 如果准备好了write则将缓冲区中的数据发送出去,如果缓冲区中数据都发送完成,则清除之前关注的op_write标记 if ((readyops & selectionkey.op_write) != 0) { // call forceflush which will also take care of clear the op_write once there is nothing left to write ch.unsafe().forceflush(); } // 如果是op_connect,则需要移除op_connect否则selector.select(timeout)将立即返回不会有任何阻塞,这样可能会出现cpu 100% if ((readyops & selectionkey.op_connect) != 0) { // remove op_connect as otherwise selector.select(..) will always return without blocking // see https://github.com/netty/netty/issues/924 int ops = k.interestops(); ops &= ~selectionkey.op_connect; k.interestops(ops); unsafe.finishconnect(); } } catch (cancelledkeyexception ignored) { unsafe.close(unsafe.voidpromise()); } }
该方法主要是对selectionkey k进行了检查,有如下几种不同的情况
1)op_accept,接受客户端连接
2)op_read, 可读事件, 即 channel 中收到了新数据可供上层读取。
3)op_write, 可写事件, 即上层可以向 channel 写入数据。
4)op_connect, 连接建立事件, 即 tcp 连接已经建立, channel 处于 active 状态。
本篇博文主要来看下当boss线程 selector检测到op_accept事件时,内部干了些什么。
if ((readyops & (selectionkey.op_read | selectionkey.op_accept)) != 0 || readyops == 0) { unsafe.read(); if (!ch.isopen()) {//如果已经关闭,则直接返回即可,不需要再处理该channel的其他事件 // connection already closed - no need to handle write. return; } }
boos reactor线程已经轮询到 selectionkey.op_accept
事件,说明有新的连接进入,此时将调用channel的 unsafe
来进行实际的操作,此时的channel为 nioserversocketchannel,则unsafe为nioserversocketchannel的属性niomessageunsafe
那么,我们进入到它的read
方法,进入新连接处理的第二步
注册到reactor线程
niomessageunsafe.java
private final list<object> readbuf = new arraylist<object>(); public void read() { assert eventloop().ineventloop(); final channelpipeline pipeline = pipeline(); final recvbytebufallocator.handle allochandle = unsafe().recvbufallochandle(); do { int localread = doreadmessages(readbuf); if (localread == 0) { break; } if (localread < 0) { closed = true; break; } } while (allochandle.continuereading()); int size = readbuf.size(); for (int i = 0; i < size; i ++) { pipeline.firechannelread(readbuf.get(i)); } readbuf.clear(); pipeline.firechannelreadcomplete(); }
调用 doreadmessages
方法不断地读取消息,用 readbuf
作为容器,这里,其实可以猜到读取的是一个个连接,然后调用 pipeline.firechannelread()
,将每条新连接经过一层服务端channel的洗礼,之后清理容器,触发 pipeline.firechannelreadcomplete()
下面我们具体看下这两个方法
1.doreadmessages(list)
2.pipeline.firechannelread(niosocketchannel)
doreadmessages()
protected int doreadmessages(list<object> buf) throws exception { socketchannel ch = javachannel().accept(); try { if (ch != null) { buf.add(new niosocketchannel(this, ch)); return 1; } } catch (throwable t) { logger.warn("failed to create a new channel from an accepted socket.", t); try { ch.close(); } catch (throwable t2) { logger.warn("failed to close a socket.", t2); } } return 0; }
我们终于窥探到netty调用jdk底层nio的边界 javachannel().accept();
,由于netty中reactor线程第一步就扫描到有accept事件发生,因此,这里的accept
方法是立即返回的,返回jdk底层nio创建的一条channel
serversocketchannel有阻塞和非阻塞两种模式:
a、阻塞模式:serversocketchannel.accept() 方法监听新进来的连接,当 accept()方法返回的时候,它返回一个包含新进来的连接的 socketchannel。阻塞模式下, accept()方法会一直阻塞到有新连接到达。
b、非阻塞模式:,accept() 方法会立刻返回,如果还没有新进来的连接,返回的将是null。 因此,需要检查返回的socketchannel是否是null.
在nioserversocketchannel的构造函数分析中,我们知道,其通过ch.configureblocking(false);语句设置当前的serversocketchannel为非阻塞的。
netty将jdk的 socketchannel
封装成自定义的 niosocketchannel
,加入到list里面,这样外层就可以遍历该list,做后续处理
从上一篇文章中,我们已经知道服务端的创建过程中会创建netty中一系列的核心组件,包括pipeline,unsafe等等,那么,接受一条新连接的时候是否也会创建这一系列的组件呢?
带着这个疑问,我们跟进去
niosocketchannel.java
public niosocketchannel(channel parent, socketchannel socket) { super(parent, socket); config = new niosocketchannelconfig(this, socket.socket()); }
我们重点分析 super(parent, socket),
niosocketchannel
的父类为 abstractniobytechannel
abstractniobytechannel.java
protected abstractniobytechannel(channel parent, selectablechannel ch) { super(parent, ch, selectionkey.op_read); }
这里,我们看到jdk nio里面熟悉的影子—— selectionkey.op_read
,一般在原生的jdk nio编程中,也会注册这样一个事件,表示对channel的读感兴趣
我们继续往上,追踪到abstractniobytechannel
的父类 abstractniochannel
, 这里,我相信读了上一篇文章你对于这部分代码肯定是有印象的
protected abstractniochannel(channel parent, selectablechannel ch, int readinterestop) { super(parent); this.ch = ch; this.readinterestop = readinterestop; try { ch.configureblocking(false); } catch (ioexception e) { try { ch.close(); } catch (ioexception e2) { if (logger.iswarnenabled()) { logger.warn( "failed to close a partially initialized socket.", e2); } } throw new channelexception("failed to enter non-blocking mode.", e); } }
在创建服务端channel的时候,最终也会进入到这个方法,super(parent)
, 便是在abstractchannel
中创建一系列和该channel绑定的组件,如下
protected abstractchannel(channel parent) { this.parent = parent; id = newid(); unsafe = newunsafe(); pipeline = newchannelpipeline(); }
而这里的 readinterestop
表示该channel关心的事件是 selectionkey.op_read
,后续会将该事件注册到selector,之后设置该通道为非阻塞模式,在channel中创建 unsafe 和一条 pipeline
pipeline.firechannelread(niosocketchannel)
对于 pipeline
我们前面已经了解过,在netty的各种类型的channel中,都会包含一个pipeline,字面意思是管道,我们可以理解为一条流水线工艺,流水线工艺有起点,有结束,中间还有各种各样的流水线关卡,一件物品,在流水线起点开始处理,经过各个流水线关卡的加工,最终到流水线结束
对应到netty里面,流水线的开始就是headcontxt
,流水线的结束就是tailconext
,headcontxt
中调用unsafe
做具体的操作,tailconext
中用于向用户抛出pipeline中未处理异常以及对未处理消息的警告
通过前面的文章中,我们已经知道在服务端的channel初始化时,在pipeline中,已经自动添加了一个pipeline处理器 serverbootstrapacceptor
, 并已经将用户代码中设置的一系列的参数传入了构造函数,接下来,我们就来看下serverbootstrapacceptor
serverbootstrapacceptor.java
private static class serverbootstrapacceptor extends channelinboundhandleradapter { private final eventloopgroup childgroup; private final channelhandler childhandler; private final entry<channeloption<?>, object>[] childoptions; private final entry<attributekey<?>, object>[] childattrs; serverbootstrapacceptor( eventloopgroup childgroup, channelhandler childhandler, entry<channeloption<?>, object>[] childoptions, entry<attributekey<?>, object>[] childattrs) { this.childgroup = childgroup; this.childhandler = childhandler; this.childoptions = childoptions; this.childattrs = childattrs; } public void channelread(channelhandlercontext ctx, object msg) { final channel child = (channel) msg; child.pipeline().addlast(childhandler); for (entry<channeloption<?>, object> e: childoptions) { try { if (!child.config().setoption((channeloption<object>) e.getkey(), e.getvalue())) { logger.warn("unknown channel option: " + e); } } catch (throwable t) { logger.warn("failed to set a channel option: " + child, t); } } for (entry<attributekey<?>, object> e: childattrs) { child.attr((attributekey<object>) e.getkey()).set(e.getvalue()); } try { childgroup.register(child).addlistener(new channelfuturelistener() { @override public void operationcomplete(channelfuture future) throws exception { if (!future.issuccess()) { forceclose(child, future.cause()); } } }); } catch (throwable t) { forceclose(child, t); } } }
前面的 pipeline.firechannelread(niosocketchannel);
最终通过head->unsafe->serverbootstrapacceptor的调用链,调用到这里的 serverbootstrapacceptor
的channelread
方法,而 channelread
一上来就把这里的msg强制转换为 channel
然后,拿到该channel,也就是我们之前new出来的 niosocketchannel中
对应的pipeline,将用户代码中的 childhandler
,添加到pipeline,这里的 childhandler
在用户代码中的体现为
serverbootstrap b = new serverbootstrap(); b.group(bossgroup, workergroup) .channel(nioserversocketchannel.class) .childhandler(new channelinitializer<socketchannel>() { @override public void initchannel(socketchannel ch) throws exception { channelpipeline p = ch.pipeline(); p.addlast(new echoserverhandler()); } });
其实对应的是 channelinitializer
,到了这里,niosocketchannel
中pipeline对应的处理器为 head->channelinitializer->tail,牢记,后面会再次提到!
接着,设置 niosocketchannel
对应的 attr和option,然后进入到 childgroup.register(child)
,这里的childgroup就是我们在启动代码中new出来的nioeventloopgroup
我们进入到nioeventloopgroup
的register
方法,代理到其父类multithreadeventloopgroup
multithreadeventloopgroup.java
public channelfuture register(channel channel) { return next().register(channel); }
这里又扯出来一个 next()方法,我们跟进去
multithreadeventloopgroup.java
@override public eventloop next() { return (eventloop) super.next(); }
回到其父类
multithreadeventexecutorgroup.java
@override public eventexecutor next() { return chooser.next(); }
这里的chooser对应的类为 eventexecutorchooser
,字面意思为事件执行器选择器,放到我们这里的上下文中的作用就是从worker reactor线程组中选择一个reactor线程
public interface eventexecutorchooserfactory { /** * returns a new {@link eventexecutorchooser}. */ eventexecutorchooser newchooser(eventexecutor[] executors); /** * chooses the next {@link eventexecutor} to use. */ @unstableapi interface eventexecutorchooser { /** * returns the new {@link eventexecutor} to use. */ eventexecutor next(); } }
chooser的实现有两种
public final class defaulteventexecutorchooserfactory implements eventexecutorchooserfactory { public static final defaulteventexecutorchooserfactory instance = new defaulteventexecutorchooserfactory(); private defaulteventexecutorchooserfactory() { } @suppresswarnings("unchecked") @override public eventexecutorchooser newchooser(eventexecutor[] executors) { if (ispoweroftwo(executors.length)) { return new poweroftoweventexecutorchooser(executors); } else { return new genericeventexecutorchooser(executors); } } private static boolean ispoweroftwo(int val) { return (val & -val) == val; } private static final class poweroftoweventexecutorchooser implements eventexecutorchooser { private final atomicinteger idx = new atomicinteger(); private final eventexecutor[] executors; poweroftoweventexecutorchooser(eventexecutor[] executors) { this.executors = executors; } @override public eventexecutor next() { return executors[idx.getandincrement() & executors.length - 1]; } } private static final class genericeventexecutorchooser implements eventexecutorchooser { private final atomicinteger idx = new atomicinteger(); private final eventexecutor[] executors; genericeventexecutorchooser(eventexecutor[] executors) { this.executors = executors; } @override public eventexecutor next() { return executors[math.abs(idx.getandincrement() % executors.length)]; } } }
默认情况下,chooser通过 defaulteventexecutorchooserfactory
被创建,在创建reactor线程选择器的时候,会判断reactor线程的个数,如果是2的幂,就创建poweroftoweventexecutorchooser
,否则,创建genericeventexecutorchooser
两种类型的选择器在选择reactor线程的时候,都是通过round-robin的方式选择reactor线程,唯一不同的是,poweroftoweventexecutorchooser
是通过与运算,而genericeventexecutorchooser
是通过取余运算,与运算的效率要高于求余运算
选择完一个reactor线程,即 nioeventloop
之后,我们回到注册的地方
public channelfuture register(channel channel) { return next().register(channel); }
singlethreadeventloop.java
@override public channelfuture register(channel channel) { return register(new defaultchannelpromise(channel, this)); }
其实,这里已经和服务端启动的过程一样了,可以参考我前面的文章
abstractniochannel.java
private void register0(channelpromise promise) { boolean firstregistration = neverregistered; doregister(); neverregistered = false; registered = true; pipeline.invokehandleraddedifneeded(); safesetsuccess(promise); pipeline.firechannelregistered(); if (isactive()) { if (firstregistration) { pipeline.firechannelactive(); } else if (config().isautoread()) { beginread(); } } }
和服务端启动过程一样,先是调用 doregister();
做真正的注册过程,如下
protected void doregister() throws exception { boolean selected = false; for (;;) { try { selectionkey = javachannel().register(eventloop().selector, 0, this); return; } catch (cancelledkeyexception e) { if (!selected) { eventloop().selectnow(); selected = true; } else { throw e; } } } }
将该条channel绑定到一个selector
上去,一个selector被一个reactor线程使用,后续该channel的事件轮询,以及事件处理,异步task执行都是由此reactor线程来负责
绑定完reactor线程之后,调用 pipeline.invokehandleraddedifneeded()
前面我们说到,到目前为止niosocketchannel
的pipeline中有三个处理器,head->channelinitializer->tail,最终会调用到 channelinitializer
的 handleradded
方法
public void handleradded(channelhandlercontext ctx) throws exception { if (ctx.channel().isregistered()) { initchannel(ctx); } }
handleradded
方法调用 initchannel
方法之后,调用remove(ctx);
将自身删除,如下
abstractniochannel.java
private boolean initchannel(channelhandlercontext ctx) throws exception { if (initmap.putifabsent(ctx, boolean.true) == null) { try { initchannel((c) ctx.channel()); } catch (throwable cause) { exceptioncaught(ctx, cause); } finally { remove(ctx); } return true; } return false; }
而这里的 initchannel
方法又是神马玩意?让我们回到用户方法,比如下面这段用户代码
用户代码
serverbootstrap b = new serverbootstrap(); b.group(bossgroup, workergroup) .channel(nioserversocketchannel.class) .option(channeloption.so_backlog, 100) .handler(new logginghandler(loglevel.info)) .childhandler(new channelinitializer<socketchannel>() { @override public void initchannel(socketchannel ch) throws exception { channelpipeline p = ch.pipeline(); p.addlast(new logginghandler(loglevel.info)); p.addlast(new echoserverhandler()); } });
原来最终跑到我们自己的代码里去了啊!完了之后,niosocketchannel
绑定的pipeline的处理器就包括 head->logginghandler->echoserverhandler->tail
注册读事件
接下来,我们还剩下这些代码没有分析完
abstractniochannel.java
private void register0(channelpromise promise) { // .. pipeline.firechannelregistered(); if (isactive()) { if (firstregistration) { pipeline.firechannelactive(); } else if (config().isautoread()) { beginread(); } } }
pipeline.firechannelregistered();
,其实没有干啥有意义的事情,最终无非是再调用一下业务pipeline中每个处理器的 channelhandleradded
方法处理下回调
isactive()
在连接已经建立的情况下返回true,所以进入方法块,进入到 pipeline.firechannelactive();
在这里我详细步骤先省略,直接进入到关键环节
abstractniochannel.java
@override protected void dobeginread() throws exception { // channel.read() or channelhandlercontext.read() was called final selectionkey selectionkey = this.selectionkey; if (!selectionkey.isvalid()) { return; } readpending = true; final int interestops = selectionkey.interestops(); if ((interestops & readinterestop) == 0) { selectionkey.interestops(interestops | readinterestop); } }
这里其实就是将 selectionkey.op_read
事件注册到selector中去,表示这条通道已经可以开始处理read事件了
总结
至此,netty中关于新连接的处理已经向你展示完了,我们做下总结
1.boos reactor线程轮询到有新的连接进入
2.通过封装jdk底层的channel创建 niosocketchannel
以及一系列的netty核心组件
3.将该条连接通过chooser,选择一条worker reactor线程绑定上去
4.注册读事件,开始新连接的读写