欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Netty架构 - EventLoop、EventLoopGroup

程序员文章站 2022-03-23 19:03:19
本文叙述NioEventLoop以及NioEventLoopGroup的主分支工作原理。...

前叙

Reactor单线程模型的应用:

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
ServerBootstrap server = new ServerBootstrap();
server.group(bossGroup);

Reactor多线程模型的应用:

EventLoopGroup bossGroup = new NioEventLoopGroup(128);
ServerBootstrap server = new ServerBootstrap();
server.group(bossGroup);

主从Reactor多线程模型的应用:

EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap server = new ServerBootstrap();
server.group(bossGroup, workerGroup);

NioEventLoopGroup的实例化

NioEventLoopGroup的父类 - MultithreadEventExecutorGroup有一个EventExecutor[] children属性。而这个属性继承ScheduledExecutorService,分析出它是一个线程池组。每个线程池的实例化,其实交给了EventLoopGroup#newChild方法实现,也就是返回了一个NioEventLoop实例。

NioEventLoopGroup的构造器实际上是调用了它的父类 - MultithreadEventExecutorGroup的构造器。如果构造器没有指定nThreads,会使用DEFAULT_EVENT_LOOP_THREADS,如下:

Netty架构 - EventLoop、EventLoopGroup

也就是说,默认的线程数是cpu核心 * 2。

NioEventLoop的实例化

通常来说,NioEventLoop负责执行两个任务:第一个任务是作为I/O线程,执行与Channel相关的I/O操作,包括调用Selector等待就绪的I/O事件、读写数据与数据处理等;第二个任务是执行任务队列中的任务,比如用户调用 EventLoop#schedule方法提交的定时任务。

Netty中,每个Channel有且仅有一个EventLoop与之关联。

NioEventLoop的构造器

首先看下它的构造器:
Netty架构 - EventLoop、EventLoopGroup

provider: NioEventLoopGroup构造器中,调用SelectorProvider#provider()方法来获取SelectorProvider对象。

Netty架构 - EventLoop、EventLoopGroup

SelectorTuple:封装了Selector unwrappedSelectorSelector selector。它的实例是由openSelector()方法赋予。

openSelector()方法如下:(有省略)

Netty架构 - EventLoop、EventLoopGroup
Netty架构 - EventLoop、EventLoopGroup
Netty架构 - EventLoop、EventLoopGroup

此外,NioEventLoop的构造器也有调用父类 - SingleThreadEventLoop的一个构造器,如下:
Netty架构 - EventLoop、EventLoopGroup

Netty架构 - EventLoop、EventLoopGroup

它的executor属性是由ThreadExecutorMap.apply(...)方法赋予。实际上就是在EventExecutor的帮助下,实例化一个Executor对象。

然后一直调用父类的构造器,直到AbstractEventExecutor的构造器,如下:
Netty架构 - EventLoop、EventLoopGroup

实际上,就是对它的EventExecutorGroup parent属性赋予一个NioEventLoopGroup

NioEventLoop的run()方法

在分析NioEventLoop#run方法之前,需要考虑下run在哪里有被调用。

NioEventLoop的父类 - SingleThreadEventExecutor,有一个private volatile Thread thread属性,使用如下方法进行设置:

Netty架构 - EventLoop、EventLoopGroup

thread设置为当前线程。然后会调用SingleThreadEventExecutor.this.run()方法,实际上是调用NioEventLoop#run()方法。

当然了,这个doStartThread方法,是由ServerBootstrap#bind(...),即绑定客户端开始,经过一层层方法,最后才调用doStartThread方法。

接下来的重头戏是==run()==方法:

(我这里将它分为了两部分)

第一部分:
Netty架构 - EventLoop、EventLoopGroup

首先看下第一部分的处理逻辑:

Netty架构 - EventLoop、EventLoopGroup
Netty架构 - EventLoop、EventLoopGroup

有任务,则调用selectNow()并返回;否则返回SelectStrategy.SELECT,也就是-1。

先来看hasTasks()方法,如下:

Netty架构 - EventLoop、EventLoopGroup

对父类的hasTasks()方法的判断,或者对Queue<Runnable> tailTasks这个队列不为空的判断。

接下来看下它的父类的hasTasks()方法,如下:
Netty架构 - EventLoop、EventLoopGroup

同样是对Queue<Runnable> taskQueue不为空的判断。

根据calculateStrategy(…)方法的判断,有如下几种结果:
Netty架构 - EventLoop、EventLoopGroup

也就是说没有任务的时候(任务队列为空),就会执行SelectStrategy.SELECT的处理逻辑。

wakeup:AtomicBoolean类型,用来控制是否在select阻塞的过程中可以被唤醒。

下面来看第二部分的处理逻辑:

Netty架构 - EventLoop、EventLoopGroup

执行 processSelectedKeys() 方法,根据io比率是否是100决定采用 runAllTasks() 还是 runAllTasks(…) 方法。

select(…)

主要用来设置下次唤醒时间、根据条件调用nio原生的selectNow()或者select(timeout)

private void select(boolean oldWakenUp) throws IOException {
    /* nio原生Selector */
    Selector selector = this.selector;
    try {
        int selectCnt = 0;
        long currentTimeNanos = System.nanoTime();
        /* 当前时间 + 延迟时间 */
        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

        /* 当前时间 + 延迟时间 - 开始时间 */
        long normalizedDeadlineNanos = selectDeadLineNanos - initialNanoTime();
        if (nextWakeupTime != normalizedDeadlineNanos) {
            /* 重置下次唤醒时间 */
            nextWakeupTime = normalizedDeadlineNanos;
        }

        for (;;) {
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
            if (timeoutMillis <= 0) {
                if (selectCnt == 0) {
                	/* 非阻塞的select方式 */
                    selector.selectNow();
                    selectCnt = 1;
                }
                break;
            }

            /* 任务队列里有任务 并且 wakenUp成功设置为true */
            if (hasTasks() && wakenUp.compareAndSet(false, true)) {
            	/* 非阻塞的select方式 */
                selector.selectNow();
                selectCnt = 1;
                break;
            }

			/* 阻塞、超时等待的select方式 */
            int selectedKeys = selector.select(timeoutMillis);
            selectCnt ++;

            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() 
                || hasScheduledTasks()) {
                // - Selected something,
                // - waken up by user, or
                // - the task queue has a pending task.
                // - a scheduled task is ready for processing
                break;
            }

			/* 如果线程中断,结束select过程 */
            if (Thread.interrupted()) {           
                selectCnt = 1;
                break;
            }

            long time = System.nanoTime();
            if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                selectCnt = 1;
            /* 从"io.netty.selectorAutoRebuildThreshold"属性中获取重建Selector的阈值,默认是512 */  
             /* 为了避免无谓的空轮询,轮询的次数达到阈值之后,需要重建Selector */      
            } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                    selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                /* 创建一个新的Selector,将旧的Selector上注册的Channel迁移到新的Selector上 */
                selector = selectRebuildSelector(selectCnt);
                selectCnt = 1;
                break;
            }
            currentTimeNanos = time;
        }

    } catch (CancelledKeyException e) {
        。。。。。。
    }
}
rebuildSelector0()

从"io.netty.selectorAutoRebuildThreshold"属性中获取重建Selector的阈值,默认是512。

为了避免无谓的空轮询,轮询的次数达到阈值之后,需要重建Selector。

private void rebuildSelector0() {
    final Selector oldSelector = selector;
    final SelectorTuple newSelectorTuple;
    if (oldSelector == null) {
        return;
    }
    try {
        newSelectorTuple = openSelector();
    } catch (Exception e) {
        logger.warn("Failed to create a new Selector.", e);
        return;
    }
    // Register all channels to the new Selector.
    int nChannels = 0;
    for (SelectionKey key: oldSelector.keys()) {
        Object a = key.attachment();
        try {
            if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
                continue;
            }
            int interestOps = key.interestOps();
            key.cancel();
            /* 在新的Selector上,重新注册原有Selector上已注册的所有Channel */
            SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
            if (a instanceof AbstractNioChannel) {
                // Update SelectionKey
                ((AbstractNioChannel) a).selectionKey = newKey;
            }
            nChannels ++;
        } catch (Exception e) {
            logger.warn("Failed to re-register a Channel to the new Selector.", e);
            if (a instanceof AbstractNioChannel) {
                AbstractNioChannel ch = (AbstractNioChannel) a;
                ch.unsafe().close(ch.unsafe().voidPromise());
            } else {
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                invokeChannelUnregistered(task, key, e);
            }
        }
    }
    selector = newSelectorTuple.selector;
    unwrappedSelector = newSelectorTuple.unwrappedSelector;
    try {
        oldSelector.close();
    } catch (Throwable t) {
        if (logger.isWarnEnabled()) {
            logger.warn("Failed to close the old Selector.", t);
        }
    }
    if (logger.isInfoEnabled()) {
        logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
    }
}

processSelectedKeys()

Netty架构 - EventLoop、EventLoopGroup

selectedKeys: SelectedSelectionKeySet类型。其封装了如下内容:
Netty架构 - EventLoop、EventLoopGroup

processSelectedKeysOptimized()

Netty架构 - EventLoop、EventLoopGroup

selectedKeys.key[i] = null:数组元素的帮助gc操作。

processSelectedKey(...):实际的处理操作,下面有分析。

reset(...): 对seletedKeys数组从指定索引开始,重置其元素的值为null。

processSelectedKey

尝试channelReady(…)操作,并且根据状态值的不同,而执行不同的后续操作。
Netty架构 - EventLoop、EventLoopGroup

channelReady(SelectableChannel ch, SelectionKey key): 当SelectableChannel被Selector选出的时候,调用该方法。

invokeChannelUnregistered(...): 实际上就是channelUnregistered(…)方法,当指定的SelectableChannel的SeletionKey被取消时,调用该方法。

processSelectedKeysPlain(…)

Netty架构 - EventLoop、EventLoopGroup

processSelectedKey(...):上面有讲到,尝试channelReady(…)操作,并且根据状态值的不同,而执行不同的后续操作。

selectAgain():对needsToSelectAgain属性值设置为false,然后调用selector.selectNow()方法。

runAllTasks()

从任务队列中拉取所有的任务,然后运行它们。
Netty架构 - EventLoop、EventLoopGroup

assert inEventLoop()是判断SingleThreadEventExecutor#Thread thread属性是否是当前线程。

fetchFromScheduledTaskQueue()方法如下:
Netty架构 - EventLoop、EventLoopGroup

单纯的判断true或者false。返回true的情况是调度任务队列为空、从调度任务队列拉取的调度任务为空。返回false的情况是调度任务添加任务队列失败,将其重新添加到调度任务队列中。

这里的nanoTime实际上是System.nanoTime() - START_TIME

通过pollScheduledTask(...)方法拉取一个调度任务。
Netty架构 - EventLoop、EventLoopGroup

从调度任务队列中获取一个调度任务,如果调度任务不为空并且执行时间有效,就返回它。

runAllTasksFrom(...):运行所有从任务队列中拉取的任务。
Netty架构 - EventLoop、EventLoopGroup

除了返回true或者false之外,对于返回true的情况,会运行所有的拉取的任务。

pollTaskFrom(...):从任务队列拉取任务,要求不是WAKEUP_TASK(run方法体为空),返回这个任务。

safeExecute(...):调度任务的run()方法,执行任务。

对于runAllTasksFrom(...)方法返回true的情况下,会将ranAtLeastOne标记为true,然后执行lastExecutionTime = ScheduledFutureTask.nanoTime()

afterRunningAllTasks():在运行完所有的任务之后调度。在SingleThreadEventExecutor里是一个未实现的protected方法,留给子类实现。

isShuttingDown()

判断状态是否是ST_SHUTTING_DOWN、ST_SHUTDOWN、ST_TERMINATED之一。
Netty架构 - EventLoop、EventLoopGroup

可选状态如下:
Netty架构 - EventLoop、EventLoopGroup

closeAll()

关闭所有的通道,取消SelectionKey,调用channelUnregistered(…)回调方法。
Netty架构 - EventLoop、EventLoopGroup

selectAgain():对needsToSelectAgain设置为false。然后调用Selector#selectNow()方法。

invokeChannelUnregistered(...):实际上是调用channelUnregistered(…)方法。也就是当SelectionKey被取消的时候,调用该方法。

confirmShutdown()

是否确认关闭。(任务队列会添加WAKEUP_TASK)
Netty架构 - EventLoop、EventLoopGroup

判断true或者false。如果是true,有可能是runAllTasks() 或者 runShutdownHooks()返回true的情况下,gracefulShutdownQuietPeriod等于0。或者是状态是ST_SHUTDOWN、ST_TERMINATED之一,或者优雅关机等待的时间超时。在run()方法,如果是true,则直接返回并跳出循环;否则一直进行循环。

isShuttingdown():判断状态是否是ST_SHUTTING_DOWN、ST_SHUTDOWN、ST_TERMINATED之一。

runAllTasks():前面有分析过,运行从任务队列拉取的任务。

isShutdown():判断状态是否是ST_SHUTDOWN、ST_TERMINATED之一。

WAKEUP_TASK:前面有说过,run方法体什么都没有实现。

cancelScheduledTasks()

取消所有的调度任务。
Netty架构 - EventLoop、EventLoopGroup

从调度任务队列中取出调度任务,进行取消。
cancelWithoutRemove(…)

Netty架构 - EventLoop、EventLoopGroup

mayInterruptIfRunning:这里传入的是false,也就是运行时不会被中断。

clearTaskAfterCompletion:如果第一个参数是true,会将PromiseTask#Object task属性设置为CANCELLED。

CANCELLED:它是一个Runnable CANCELLED = new SentinelRunnable("CANCELLED")。toString()方法返回cancelled,而run()方法什么都没有实现。

cancel(…)

Netty架构 - EventLoop、EventLoopGroup

RESULT_UPDATER:AtomicReferenceFieldUpdater类型,这里负责对result属性原子更新。

CANCELLATION_CAUSE_HOLDER:CauseHolder类型,持有Throwable参数。

checkNotifyWaiters():对于waiters大于0,调用notifyAll(),进行唤醒。再根据listeners是否不为空进行返回true或者false。

notifyListeners

Netty架构 - EventLoop、EventLoopGroup

MAX_LISTENER_STACK_DEPTH:"io.netty.defaultPromise.maxListenerStackDepth"属性与8取最小值。

notifyListenersNow():调用GenericFutureListener接口的operationComplete()方法。

runShutdownHooks()

运行所有的shutdownHooks。更新最近执行的时间。
Netty架构 - EventLoop、EventLoopGroup

shutdownHooks:LinkedHashSet类型,里面是Runnable。

本文地址:https://blog.csdn.net/qq_34561892/article/details/107214924