欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Netty源码分析-NioEventLoop(二)

程序员文章站 2022-04-23 11:49:48
...

NioEventLoop源码分析。

EventLoop的本质:内部一个线程,一个有序队列存储,线程源源不断的运行队列中的任务。

 

register方法把java-nio的channel注册到selector上面。

    //把JAVA底层Channel注册到selector上
    public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task) {
        //把ch注册到java底层selector上面,代码省略了一些安全检查
        try {
            ch.register(selector, interestOps, task);
        } catch (Exception e) {
            throw new EventLoopException("failed to register a channel", e);
        }
    }

 

run方法是核心的启动方法,在父类中,创建好线程启动以后会调用run方法,在这里会处理selector的selelct事件和所有的IO事件

 @Override
    protected void run() {
        //这个方法是父类定义的抽象方法,在父类线程创建启动后调用此方法。
        for (;;) {
            try {
                        //如果底层队列存储在任务hasTasks会返回true,那么调用selelctNow返回key的数量,可能为0
                        //如果没有则返回SelectStrategy.SELECT
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:

                        //查询selector,如果没有任务则会在selector上阻塞一会,否则立即返回
                        select(wakenUp.getAndSet(false));

                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                        // fall through
                    default:
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                //IO时间的比例
                final int ioRatio = this.ioRatio;
                //如果是百分百
                if (ioRatio == 100) {
                    try {
                        //先处理所有key事件
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        //然后处理所有IO任务
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        //先处理所有key事件
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        //执行一段时间的IO任务
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }

 

processSelectedKey,处理selector的selelct到的key。

 private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) { //key无效
            final EventLoop eventLoop;
            try {
                eventLoop = ch.eventLoop();
            } catch (Throwable ignored) {
                // If the channel implementation throws an exception because there is no event loop, we ignore this
                // because we are only trying to determine if ch is registered to this event loop and thus has authority
                // to close ch.
                return;
            }
            
            //如果ch当中的el不是当前对象则return
            if (eventLoop != this || eventLoop == null) {
                return;
            }

            unsafe.close(unsafe.voidPromise());
            return;
        }

        try {
            //获取key的事件类型
            int readyOps = k.readyOps();
            // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
            // the NIO JDK channel implementation may throw a NotYetConnectedException.
            //如果是连接事件
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);
                //完成连接
                unsafe.finishConnect();
            }

            //如果是write事件,则调用底层代码写入逻辑
            // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }

            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            //如果是读事件,交给底层逻辑处理
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }