欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Netty源码分析----NioEventLoop之处理请求

程序员文章站 2022-04-23 11:55:17
...

(*文章基于Netty4.1.22版本)
之前已经讲了NioEventLoop的初始化以及核心的队列了,最后还涉及到的就是非常核心的一部分,就是NioEventLoop如何去处理请求。

其核心逻辑是run方法,记得之前说过,NioEventLoop的线程初始化的时候,会调用一下run方法开始处理请求

    private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                    // ....
                    SingleThreadEventExecutor.this.run();
                    // ....
            }
        });
    }

接下来就看下run方法的实现

    @Override
    protected void run() {
        for (;;) {
           // ....
        }
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
    }

上面是run方法的大体结构,可以看到除非Shutdown了,否则会在一个无限循环中不停的处理事情。Nio也是类似的,不停的从selector中获取selectionKey,然后进行处理,而这里,大同小异,不过是Netty封装了很多东西(例如处理队列任务),接下来看下for循环中的逻辑

switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
    case SelectStrategy.CONTINUE:
        continue;
    case SelectStrategy.SELECT:
        select(wakenUp.getAndSet(false));// 获取准备好的selectionKey;类似selector.select方法,这里做了额外的一些操作

        if (wakenUp.get()) {// 是否需要唤醒selector
            selector.wakeup();
        }
    default:
}

cancelledKeys = 0;
needsToSelectAgain = false;
// 这里就是io任务占用时间与非io任务占用时间的占比,之前一直提起的就是这个值
// 其值=100 * io时间 /(io时间-非io时间) 
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {// 根据算法,非io时间为0,那么则全部进行io任务
    try {
        processSelectedKeys();
    } finally {// 执行完io任务后才会执行非io任务,且不需要限制时间
        runAllTasks();
    }
} else {// 如果设置了占比,那么需要
    final long ioStartTime = System.nanoTime();
    try {
        processSelectedKeys();
    } finally {// 执行完io任务,根据占比求得非io任务运行的时间限制
        final long ioTime = System.nanoTime() - ioStartTime;
        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
    }
}

前半部分主要是获取准备就绪的selectionKey,后半部分主要是执行io任务和非io任务,根据ioRatio的值将非io任务区分为无限制时间执行和有限制的执行。

runAllTasks方法在任务队列的文章中分析过,这里便不再分析。在nio的程序中,使用selector.select方法可以获取到准备就绪的selectionKey,netty在这里是调用了select,进行了除selector.select之后的一些处理,代码比较长,截取了主要的部分

    private void select(boolean oldWakenUp) throws IOException {
        Selector selector = this.selector;
        try {
            int selectCnt = 0;// select次数
            long currentTimeNanos = System.nanoTime();
            // delayNanos返回最近一个延时离现在的间隔时间,
            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
            for (;;) {
                //  等于(delayNanos+500000L)/ 1000000L
                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                if (timeoutMillis <= 0) {// 如果delayNanos<500000L,那么timeoutMillis<=0,该情况直接调用selectNow返回,因为有任务需要执行
                    if (selectCnt == 0) {
                        selector.selectNow();
                        selectCnt = 1;
                    }
                    break;
                }
                // 否则调用select方法,阻塞timeoutMillis时间
                int selectedKeys = selector.select(timeoutMillis);
                selectCnt ++;//次数+1
                //....
                //满足如下条件则返回
                if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                    // - Selected something,
                    // - waken up by user, or
                    // - the task queue has a pending task.
                    // - a scheduled task is ready for processing
                    break;
                }

                long time = System.nanoTime();
                if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {// 即 当前时间-开始执行的时间>=超时时间,到达这里证明没有获取到准备就绪的selectionKey
                    selectCnt = 1;
                } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                    // 如果到达这个分支,证明在timeoutMillis时间内,没有获取到时间,但是有没有阻塞,可能触发了NIO selector空转的BUG
                    // 判断select的次数是否大于SELECTOR_AUTO_REBUILD_THRESHOLD,如果满足了,代表可能触发了NIO的bug,那么调用rebuildSelector重建selector
                    rebuildSelector();
                    selector = this.selector;

                    // 再用新的selector获取selectionKey.
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }
                currentTimeNanos = time;
            }
            //....
            }
        } catch (CancelledKeyException e) {
        //....
        }
    }

主要的流程看一**释就好了,和NIO程序差不多,Netty在这之上,根据其他条件,控制了select阻塞时间,或者说控制是否阻塞,另外在触发了NIO的BUG的情况下,如何解决,这个主要是rebuildSelector

    private void rebuildSelector0() {
        final Selector oldSelector = selector;
        final SelectorTuple newSelectorTuple;
        //....
        try {
            newSelectorTuple = openSelector();//创建一个新的SelectorTuple
        } catch (Exception e) {
            return;
        }

        // 将对应的Channel注册到新的Selector上.
        int nChannels = 0;
        for (SelectionKey key: oldSelector.keys()) {
            Object a = key.attachment();
            try {
                //....
                int interestOps = key.interestOps();
                key.cancel();
                SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
                if (a instanceof AbstractNioChannel) {
                    // Update SelectionKey
                    ((AbstractNioChannel) a).selectionKey = newKey;
                }
                nChannels ++;
            } catch (Exception e) {
                //....
            }
        }

        selector = newSelectorTuple.selector;
        unwrappedSelector = newSelectorTuple.unwrappedSelector;

        try {
            // 关闭旧的selector
            oldSelector.close();
        } catch (Throwable t) {
            //....
        }
    }

然后看下一下如何处理SelectionKey

    private void processSelectedKeys() {
        if (selectedKeys != null) {
            processSelectedKeysOptimized();
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }

两个分支,根据是否设置了selectedKeys 来判断是否要使用Netty优化的策略进行处理selectionKey,两种方式的获取的SelectionKey集合不一样,在优化的策略中,使用的是SelectedSelectionKeySet类型,与原生NIO的不一样,这里先看下selectedKeys的初始化
在NioEventLoop初始化的时候,会调用openSelector创建一个Selector,这里判断了是否要使用优化的策略(只保留核心代码)

    private static final boolean DISABLE_KEYSET_OPTIMIZATION =
            SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
    private SelectorTuple openSelector() {
        final Selector unwrappedSelector;
                // ....
            unwrappedSelector = provider.openSelector();
               // ....

        if (DISABLE_KEYSET_OPTIMIZATION) {
            return new SelectorTuple(unwrappedSelector);
        }

        final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
        // ....
        Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                        // ....
                    Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                    Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
                            // ....
                    selectedKeysField.set(unwrappedSelector, selectedKeySet);
                    publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                    return null;
                        // ....
            }
        });

        selectedKeys = selectedKeySet;
        // ....
    }

首先创建一个selector,然后会判断DISABLE_KEYSET_OPTIMIZATION这个变量,这里意思是是否禁用SelectionKey的优化,那么如果false,则会先创建SelectedSelectionKeySet类型的变量,后面,使用反射,将这个类型的变量替换调原生Selector中的SelectionKey集合,其原生类型是一个Set,而SelectedSelectionKeySet也是Set的子类,所以这样转换没有问题,到最后,再其引用赋值给NioEventLoop的成员变量selectedKeys,那么此时selectedKeys与Selector中的Key集合都是指向同一个实例,也就是说,当Selector进行select操作获取到SelectionKey之后,selectedKeys就会有值。

Selector在进行select操作之后,会调用Set的add方法将selectionKey放入其中,Netty实现了自己的Set的原因在于原生是使用HashSet,其底层是HashMap,遍历的效率较低,而SelectedSelectionKeySet底层是一个数组,遍历相对较快,而我们在得到selectionKey集合的时候需要遍历集合进行处理

final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
    SelectionKey[] keys;
    int size;

    SelectedSelectionKeySet() {
        keys = new SelectionKey[1024];
    }

    @Override
    public boolean add(SelectionKey o) {
        if (o == null) {
            return false;
        }

        keys[size++] = o;
        if (size == keys.length) {
            increaseCapacity();
        }

        return true;
    }
  //....
}

可以看到内部是一个SelectionKey类型的数组

接下来看下processSelectedKeysOptimized的处理逻辑

    private void processSelectedKeysOptimized() {
        for (int i = 0; i < selectedKeys.size; ++i) {// 遍历集合
            final SelectionKey k = selectedKeys.keys[i];
            // null out entry in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys.keys[i] = null;

            final Object a = k.attachment();

            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (needsToSelectAgain) {
                // null out entries in the array to allow to have it GC'ed once the Channel close
                // See https://github.com/netty/netty/issues/2363
                selectedKeys.reset(i + 1);

                selectAgain();
                i = -1;
            }
        }
    }

在注册的时候,att参数类型是ServerSocketChannel,所以这里是AbstractNioChannel类型,调用processSelectedKey处理

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        // 一个Channel会对应一个Unsafe对象,负责底层的操作
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
//....

        //....
      unsafe.read();
//....
    }

底层也是通过unsafe来处理请求,就例如,NIO程序中key.isReadable()判断为true之后的处理.

        public void read() {
            assert eventLoop().inEventLoop();
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
            allocHandle.reset(config);

            boolean closed = false;
            Throwable exception = null;
            try {
                try {
                    do {
                      //readBuf为List,最多放了一个元素,为NioSocketChannel对象
                        int localRead = doReadMessages(readBuf);
                       //....
                        allocHandle.incMessagesRead(localRead);
                    } while (allocHandle.continueReading());
                } catch (Throwable t) {
                    exception = t;
                }

                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {// 触发pipeline的read事件的传播
                    readPending = false;
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                readBuf.clear();
                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();//触发read complete事件传播

                if (exception != null) {
                    closed = closeOnReadError(exception);
                    // 如果有异常的话,触发异常事件传播
                    pipeline.fireExceptionCaught(exception);
                }
            } finally {
                //....
            }
        }
    }

doReadMessages方法如下:

    protected int doReadMessages(List<Object> buf) throws Exception {
        SocketChannel ch = SocketUtils.accept(javaChannel());
        try {
            if (ch != null) {
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
            //....
        }

        return 0;
    }

将原生的socketchannel转换成NioSocketChannel,处理类似NIO程序的代码

  ServerSocketChannel channel = (ServerSocketChannel) key.channel();
  SocketChannel socketChannel = channel.accept();

另外看下,触发的两个事件,一个是read事件,一个是readComplete事件,先看下read事件的传播。

回顾一下服务启动和pipeline文章相关的分析,这个时候pipeline只有3个Hander(Context),如下:
HeadContext -> ServerBootstrapAcceptor -> TailContext
HeadContext没有对read事件进行处理,直接传播到下一个Handler,即ServerBootstrapAcceptor,看下其channelRead方法:

        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            final Channel child = (Channel) msg;
            // 为当前Channel的pipeline添加自定义Handler
            child.pipeline().addLast(childHandler);
            // 设置属性
            setChannelOptions(child, childOptions, logger);

            for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }

            try {
                // 将Channel注册到workergroup的EventLoop上
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }

ServerBootstrapAcceptor做的事情很简单:
1. 为pipeline添加程序中自定义的childHandler(ChannelInitializer)
2. 设置一下Channel属性
3. 将Channel注册到workerGroup上

ServerBootstrapAcceptor主要是衔接boss线程组和worker线程组。

另外要记住的第一点,是我们一开始调用childHandler加入的Handler(ChannelInitializer)

.childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch)
                            throws Exception {
                        ch.pipeline()
                            .addLast(new StringDecoder())
                            .addLast(new ServerHandler())
                            ;
                    }
                });

在服务启动文章中分析过

其实ChannelInitializer实现了Channel注册后的为每一个Channel添加ChannelHandler的功能,但是其本质也是也是一个ChannelHandler

在注册前,会调用注册前添加的Handler,然后会调用到Handler的handlerAdded方法,其中会调用上面的initChannel,这样就给当前Channel添加了自定义的Handler。
流程是和之前分析的一样的,细节不同点如下:
1. 这时候的Channel是NioSocketChannel,服务启动的时候是NioServerSocketChanel
2. 服务启动的时候,通过ChannelInitializer来为pipeline添加ServerBootstrapAcceptor这个Handler,而此时通过ChannelInitializer为pipeline添加业务上自定义的Handler
3. 两者添加的Handler功能不一样,一个是衔接boss和worker线程组,是系统内置功能;一个是处理业务请求,编码者自定义

worker线程的channel注册过程是一样的,最后也会执行NioEventLoop的run方法,处理SelectionKey,同样执行到unsafe的read方法,但是触发pipeline的read事件的时候,pipeline中的Handler不一样,此时的pipeline是业务上自定义的Handler,如下

    private static class ServerHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg)
                throws Exception {
            String body = (String)msg;
            System.out.println("receive body:"+body + ",currentHandler:"+this);
            ctx.write(msg);
            ctx.flush();
        }
    }

接下来看下readComplete事件,会依次触发pipeline上的Handler的channelReadComplete方法,上面说过此时pipeline的Handler如下
HeadContext -> ServerBootstrapAcceptor -> TailContext
只有HeadContext有逻辑,另外都是直接往后传播或者空方法,看下HeadContext的逻辑

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelReadComplete();// 往后传播,后面的Handler没处理,直接无视
            readIfIsAutoRead();
        }

        private void readIfIsAutoRead() {
            if (channel.config().isAutoRead()) {// autoRead是否打开
                channel.read();// 这个最后会调用unsafe的beginRead方法
            }
        }

之前讲过,Channel初始化的时候会设置关心的事件,但是没有注册上去,在unsafe的beginRead方法里才是注册事件的地方,不过在注册的时候isActive为true,触发了channelActive方法,提前注册了事件

        private void register0(ChannelPromise promise) {
            try {
                // ....
                // 间接触发了unsafe的begin的beginRead方法
                if (isActive()) {
                    if (firstRegistration) {
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                //....
            }
        }

之前说过NioServerSocketChannel和NioSocketChannel的unsafe对象不一样,NioServerSocketChannel对应的是NioMessageUnsafe,NioSocketChannel对应的是NioSocketChannelUnsafe,所以worker线程组的NioEventLoop最后处理的是NioSocketChannelUnsafe的read方法(实际是其父类的read方法)

        @Override
        public final void read() {
            //....
            ByteBuf byteBuf = null;
            boolean close = false;
            try {
                do {
                    byteBuf = allocHandle.allocate(allocator);// 分配池化内存
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));//从Channel中读取数据到buffer
                    //....
                    pipeline.fireChannelRead(byteBuf);// 触发事件传播
                    byteBuf = null;
                } while (allocHandle.continueReading());

                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();
                //....
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close, allocHandle);//异常处理
            } finally {
                // ....
            }
        }
    }

逻辑和NioMessageUnsafe的read方法类似,触发的事件一样,不同的是pipeline中handler不同,这里的fireChannelRead会触发我们自定义的channelRead方法,且传递的对象不一样,这个时候是实际的业务数据。

到这里,整个处理请求的过程就完毕了,稍微总结一下:
1. Netty在selector.select之上,判断了任务,唤醒状态位等一些条件,以此决定select是否该阻塞和阻塞的时间
2. 在触发了NIO的BUG的时候,重建selector,主要是将旧的selector对应的channel注册到新的selector上,取消旧的selector和Key
3. NioEventLoop会处理io任务和非io任务,其时间占比由ioRatio决定
4. Netty通过反射将NIO原生的SelectionKey集合替换成自定义的集合,主要是数组实现
5. boss线程组主要负责连接请求,然后派发给worker线程组进行处理
6. ServerBootstrapAcceptor主要负责boss和worker的衔接,即将channel注册到woker中的NioEventLoop上
7. worker线程组负责处理数据的读取,通过pipeline将数据传递到自定义的Handler上
8. boss和worker线程组的NioEventLoop在注册前都会通过一个叫ChannelInitializer的Handler,为Channel添加pipeline
9. boss线程组的ChannelInitializer负责添加ServerBootstrapAcceptor这个Handler到pipeline上,而worker线程组的ChannelInitializer负责添加自定义的Handler到pipeline上

相关标签: Netty 源码分析