Netty源码分析----NioEventLoop之处理请求
(*文章基于Netty4.1.22版本)
之前已经讲了NioEventLoop的初始化以及核心的队列了,最后还涉及到的就是非常核心的一部分,就是NioEventLoop如何去处理请求。
其核心逻辑是run方法,记得之前说过,NioEventLoop的线程初始化的时候,会调用一下run方法开始处理请求
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
// ....
SingleThreadEventExecutor.this.run();
// ....
}
});
}
接下来就看下run方法的实现
@Override
protected void run() {
for (;;) {
// ....
}
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
上面是run方法的大体结构,可以看到除非Shutdown了,否则会在一个无限循环中不停的处理事情。Nio也是类似的,不停的从selector中获取selectionKey,然后进行处理,而这里,大同小异,不过是Netty封装了很多东西(例如处理队列任务),接下来看下for循环中的逻辑
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));// 获取准备好的selectionKey;类似selector.select方法,这里做了额外的一些操作
if (wakenUp.get()) {// 是否需要唤醒selector
selector.wakeup();
}
default:
}
cancelledKeys = 0;
needsToSelectAgain = false;
// 这里就是io任务占用时间与非io任务占用时间的占比,之前一直提起的就是这个值
// 其值=100 * io时间 /(io时间-非io时间)
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {// 根据算法,非io时间为0,那么则全部进行io任务
try {
processSelectedKeys();
} finally {// 执行完io任务后才会执行非io任务,且不需要限制时间
runAllTasks();
}
} else {// 如果设置了占比,那么需要
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {// 执行完io任务,根据占比求得非io任务运行的时间限制
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
前半部分主要是获取准备就绪的selectionKey,后半部分主要是执行io任务和非io任务,根据ioRatio的值将非io任务区分为无限制时间执行和有限制的执行。
runAllTasks方法在任务队列的文章中分析过,这里便不再分析。在nio的程序中,使用selector.select方法可以获取到准备就绪的selectionKey,netty在这里是调用了select,进行了除selector.select之后的一些处理,代码比较长,截取了主要的部分
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;// select次数
long currentTimeNanos = System.nanoTime();
// delayNanos返回最近一个延时离现在的间隔时间,
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
// 等于(delayNanos+500000L)/ 1000000L
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {// 如果delayNanos<500000L,那么timeoutMillis<=0,该情况直接调用selectNow返回,因为有任务需要执行
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
// 否则调用select方法,阻塞timeoutMillis时间
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;//次数+1
//....
//满足如下条件则返回
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
// - Selected something,
// - waken up by user, or
// - the task queue has a pending task.
// - a scheduled task is ready for processing
break;
}
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {// 即 当前时间-开始执行的时间>=超时时间,到达这里证明没有获取到准备就绪的selectionKey
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// 如果到达这个分支,证明在timeoutMillis时间内,没有获取到时间,但是有没有阻塞,可能触发了NIO selector空转的BUG
// 判断select的次数是否大于SELECTOR_AUTO_REBUILD_THRESHOLD,如果满足了,代表可能触发了NIO的bug,那么调用rebuildSelector重建selector
rebuildSelector();
selector = this.selector;
// 再用新的selector获取selectionKey.
selector.selectNow();
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
//....
}
} catch (CancelledKeyException e) {
//....
}
}
主要的流程看一**释就好了,和NIO程序差不多,Netty在这之上,根据其他条件,控制了select阻塞时间,或者说控制是否阻塞,另外在触发了NIO的BUG的情况下,如何解决,这个主要是rebuildSelector
private void rebuildSelector0() {
final Selector oldSelector = selector;
final SelectorTuple newSelectorTuple;
//....
try {
newSelectorTuple = openSelector();//创建一个新的SelectorTuple
} catch (Exception e) {
return;
}
// 将对应的Channel注册到新的Selector上.
int nChannels = 0;
for (SelectionKey key: oldSelector.keys()) {
Object a = key.attachment();
try {
//....
int interestOps = key.interestOps();
key.cancel();
SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
if (a instanceof AbstractNioChannel) {
// Update SelectionKey
((AbstractNioChannel) a).selectionKey = newKey;
}
nChannels ++;
} catch (Exception e) {
//....
}
}
selector = newSelectorTuple.selector;
unwrappedSelector = newSelectorTuple.unwrappedSelector;
try {
// 关闭旧的selector
oldSelector.close();
} catch (Throwable t) {
//....
}
}
然后看下一下如何处理SelectionKey
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
两个分支,根据是否设置了selectedKeys 来判断是否要使用Netty优化的策略进行处理selectionKey,两种方式的获取的SelectionKey集合不一样,在优化的策略中,使用的是SelectedSelectionKeySet类型,与原生NIO的不一样,这里先看下selectedKeys的初始化
在NioEventLoop初始化的时候,会调用openSelector创建一个Selector,这里判断了是否要使用优化的策略(只保留核心代码)
private static final boolean DISABLE_KEYSET_OPTIMIZATION =
SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
private SelectorTuple openSelector() {
final Selector unwrappedSelector;
// ....
unwrappedSelector = provider.openSelector();
// ....
if (DISABLE_KEYSET_OPTIMIZATION) {
return new SelectorTuple(unwrappedSelector);
}
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
// ....
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
// ....
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
// ....
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
return null;
// ....
}
});
selectedKeys = selectedKeySet;
// ....
}
首先创建一个selector,然后会判断DISABLE_KEYSET_OPTIMIZATION这个变量,这里意思是是否禁用SelectionKey的优化,那么如果false,则会先创建SelectedSelectionKeySet类型的变量,后面,使用反射,将这个类型的变量替换调原生Selector中的SelectionKey集合,其原生类型是一个Set,而SelectedSelectionKeySet也是Set的子类,所以这样转换没有问题,到最后,再其引用赋值给NioEventLoop的成员变量selectedKeys,那么此时selectedKeys与Selector中的Key集合都是指向同一个实例,也就是说,当Selector进行select操作获取到SelectionKey之后,selectedKeys就会有值。
Selector在进行select操作之后,会调用Set的add方法将selectionKey放入其中,Netty实现了自己的Set的原因在于原生是使用HashSet,其底层是HashMap,遍历的效率较低,而SelectedSelectionKeySet底层是一个数组,遍历相对较快,而我们在得到selectionKey集合的时候需要遍历集合进行处理
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
SelectionKey[] keys;
int size;
SelectedSelectionKeySet() {
keys = new SelectionKey[1024];
}
@Override
public boolean add(SelectionKey o) {
if (o == null) {
return false;
}
keys[size++] = o;
if (size == keys.length) {
increaseCapacity();
}
return true;
}
//....
}
可以看到内部是一个SelectionKey类型的数组
接下来看下processSelectedKeysOptimized的处理逻辑
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {// 遍历集合
final SelectionKey k = selectedKeys.keys[i];
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.keys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
在注册的时候,att参数类型是ServerSocketChannel,所以这里是AbstractNioChannel类型,调用processSelectedKey处理
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
// 一个Channel会对应一个Unsafe对象,负责底层的操作
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
//....
//....
unsafe.read();
//....
}
底层也是通过unsafe来处理请求,就例如,NIO程序中key.isReadable()判断为true之后的处理.
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
//readBuf为List,最多放了一个元素,为NioSocketChannel对象
int localRead = doReadMessages(readBuf);
//....
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {// 触发pipeline的read事件的传播
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();//触发read complete事件传播
if (exception != null) {
closed = closeOnReadError(exception);
// 如果有异常的话,触发异常事件传播
pipeline.fireExceptionCaught(exception);
}
} finally {
//....
}
}
}
doReadMessages方法如下:
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
//....
}
return 0;
}
将原生的socketchannel转换成NioSocketChannel,处理类似NIO程序的代码
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = channel.accept();
另外看下,触发的两个事件,一个是read事件,一个是readComplete事件,先看下read事件的传播。
回顾一下服务启动和pipeline文章相关的分析,这个时候pipeline只有3个Hander(Context),如下:
HeadContext -> ServerBootstrapAcceptor -> TailContext
HeadContext没有对read事件进行处理,直接传播到下一个Handler,即ServerBootstrapAcceptor,看下其channelRead方法:
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
// 为当前Channel的pipeline添加自定义Handler
child.pipeline().addLast(childHandler);
// 设置属性
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
// 将Channel注册到workergroup的EventLoop上
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
ServerBootstrapAcceptor做的事情很简单:
1. 为pipeline添加程序中自定义的childHandler(ChannelInitializer)
2. 设置一下Channel属性
3. 将Channel注册到workerGroup上
ServerBootstrapAcceptor主要是衔接boss线程组和worker线程组。
另外要记住的第一点,是我们一开始调用childHandler加入的Handler(ChannelInitializer)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline()
.addLast(new StringDecoder())
.addLast(new ServerHandler())
;
}
});
在服务启动文章中分析过
其实ChannelInitializer实现了Channel注册后的为每一个Channel添加ChannelHandler的功能,但是其本质也是也是一个ChannelHandler
在注册前,会调用注册前添加的Handler,然后会调用到Handler的handlerAdded方法,其中会调用上面的initChannel,这样就给当前Channel添加了自定义的Handler。
流程是和之前分析的一样的,细节不同点如下:
1. 这时候的Channel是NioSocketChannel,服务启动的时候是NioServerSocketChanel
2. 服务启动的时候,通过ChannelInitializer来为pipeline添加ServerBootstrapAcceptor这个Handler,而此时通过ChannelInitializer为pipeline添加业务上自定义的Handler
3. 两者添加的Handler功能不一样,一个是衔接boss和worker线程组,是系统内置功能;一个是处理业务请求,编码者自定义
worker线程的channel注册过程是一样的,最后也会执行NioEventLoop的run方法,处理SelectionKey,同样执行到unsafe的read方法,但是触发pipeline的read事件的时候,pipeline中的Handler不一样,此时的pipeline是业务上自定义的Handler,如下
private static class ServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
String body = (String)msg;
System.out.println("receive body:"+body + ",currentHandler:"+this);
ctx.write(msg);
ctx.flush();
}
}
接下来看下readComplete事件,会依次触发pipeline上的Handler的channelReadComplete方法,上面说过此时pipeline的Handler如下
HeadContext -> ServerBootstrapAcceptor -> TailContext
只有HeadContext有逻辑,另外都是直接往后传播或者空方法,看下HeadContext的逻辑
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelReadComplete();// 往后传播,后面的Handler没处理,直接无视
readIfIsAutoRead();
}
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {// autoRead是否打开
channel.read();// 这个最后会调用unsafe的beginRead方法
}
}
之前讲过,Channel初始化的时候会设置关心的事件,但是没有注册上去,在unsafe的beginRead方法里才是注册事件的地方,不过在注册的时候isActive为true,触发了channelActive方法,提前注册了事件
private void register0(ChannelPromise promise) {
try {
// ....
// 间接触发了unsafe的begin的beginRead方法
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
//....
}
}
之前说过NioServerSocketChannel和NioSocketChannel的unsafe对象不一样,NioServerSocketChannel对应的是NioMessageUnsafe,NioSocketChannel对应的是NioSocketChannelUnsafe,所以worker线程组的NioEventLoop最后处理的是NioSocketChannelUnsafe的read方法(实际是其父类的read方法)
@Override
public final void read() {
//....
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
byteBuf = allocHandle.allocate(allocator);// 分配池化内存
allocHandle.lastBytesRead(doReadBytes(byteBuf));//从Channel中读取数据到buffer
//....
pipeline.fireChannelRead(byteBuf);// 触发事件传播
byteBuf = null;
} while (allocHandle.continueReading());
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
//....
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);//异常处理
} finally {
// ....
}
}
}
逻辑和NioMessageUnsafe的read方法类似,触发的事件一样,不同的是pipeline中handler不同,这里的fireChannelRead会触发我们自定义的channelRead方法,且传递的对象不一样,这个时候是实际的业务数据。
到这里,整个处理请求的过程就完毕了,稍微总结一下:
1. Netty在selector.select之上,判断了任务,唤醒状态位等一些条件,以此决定select是否该阻塞和阻塞的时间
2. 在触发了NIO的BUG的时候,重建selector,主要是将旧的selector对应的channel注册到新的selector上,取消旧的selector和Key
3. NioEventLoop会处理io任务和非io任务,其时间占比由ioRatio决定
4. Netty通过反射将NIO原生的SelectionKey集合替换成自定义的集合,主要是数组实现
5. boss线程组主要负责连接请求,然后派发给worker线程组进行处理
6. ServerBootstrapAcceptor主要负责boss和worker的衔接,即将channel注册到woker中的NioEventLoop上
7. worker线程组负责处理数据的读取,通过pipeline将数据传递到自定义的Handler上
8. boss和worker线程组的NioEventLoop在注册前都会通过一个叫ChannelInitializer的Handler,为Channel添加pipeline
9. boss线程组的ChannelInitializer负责添加ServerBootstrapAcceptor这个Handler到pipeline上,而worker线程组的ChannelInitializer负责添加自定义的Handler到pipeline上
上一篇: java netty http https pool 例子
下一篇: ssl双向验证
推荐阅读
-
scrapy-redis源码分析之发送POST请求详解
-
Tomcat源码分析 (九)----- HTTP请求处理过程(二)
-
Netty源码分析之ChannelPipeline(二)—ChannelHandler的添加与删除
-
Netty源码分析 (十)----- 拆包器之LineBasedFrameDecoder
-
netty之NioEventLoopGroup源码分析二
-
netty中的发动机--EventLoop及其实现类NioEventLoop的源码分析
-
Netty源码分析之Server bindAsync
-
Tomcat源码分析 (八)----- HTTP请求处理过程(一)
-
以太坊源码分析之 P2P网络(六、p2p连接控制与消息处理(下))
-
以太坊源码分析之 P2P网络(五、p2p连接控制与消息处理(中))