欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Netty源码分析之核心线程处理

程序员文章站 2023-12-30 18:18:04
...

核心线程是NioEventLoop,在第一次往任务队列中添加任务时开始启动线程

abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor
thread = threadFactory.newThread(new Runnable() {
    @Override
    public void run() {
        boolean success = false;
        updateLastExecutionTime();
        SingleThreadEventExecutor.this.run();
        success = true;
    }
});

任务添加完成后会设置唤醒状态为true,这个时候是主线程,所以inEventLoop为false,唤醒选择器的等待。


if (!addTaskWakesUp && wakesUpForTask(task)) {
    wakeup(inEventLoop);
}

protected boolean wakesUpForTask(Runnable task) {
    return !(task instanceof NonWakeupRunnable);
}

protected void wakeup(boolean inEventLoop) {
    if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
        selector.wakeup();
    }
}

具体的run方法实现为


protected void run() {
  for (;;) {
      try {
          switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
              case SelectStrategy.CONTINUE:
                  continue;
              case SelectStrategy.SELECT:
                  select(wakenUp.getAndSet(false));

                  // 'wakenUp.compareAndSet(false, true)' is always evaluated
                  // before calling 'selector.wakeup()' to reduce the wake-up
                  // overhead. (Selector.wakeup() is an expensive operation.)
                  //
                  // However, there is a race condition in this approach.
                  // The race condition is triggered when 'wakenUp' is set to
                  // true too early.
                  //
                  // 'wakenUp' is set to true too early if:
                  // 1) Selector is waken up between 'wakenUp.set(false)' and
                  //    'selector.select(...)'. (BAD)
                  // 2) Selector is waken up between 'selector.select(...)' and
                  //    'if (wakenUp.get()) { ... }'. (OK)
                  //
                  // In the first case, 'wakenUp' is set to true and the
                  // following 'selector.select(...)' will wake up immediately.
                  // Until 'wakenUp' is set to false again in the next round,
                  // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                  // any attempt to wake up the Selector will fail, too, causing
                  // the following 'selector.select(...)' call to block
                  // unnecessarily.
                  //
                  // To fix this problem, we wake up the selector again if wakenUp
                  // is true immediately after selector.select(...).
                  // It is inefficient in that it wakes up the selector for both
                  // the first case (BAD - wake-up required) and the second case
                  // (OK - no wake-up required).

                  if (wakenUp.get()) {
                      selector.wakeup();
                  }
                  // fall through
              default:
          }
      } catch (Throwable t) {
          handleLoopException(t);
      }
  }
}

判断任务队列中是否由需要执行的任务

protected boolean hasTasks() {
    assert inEventLoop();
    return !taskQueue.isEmpty();
}

计算对应的返回值


public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
    return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}

如果有任务的话先检查是否有需要处理的网络请求,有的话返回大于0的数字

private final IntSupplier selectNowSupplier = new IntSupplier() {
    @Override
    public int get() throws Exception {
        return selectNow();
    }
};

int selectNow() throws IOException {
    try {
        return selector.selectNow();
    } finally {
        // restore wakeup state if needed
        if (wakenUp.get()) {
            selector.wakeup();
        }
    }
}

判断选择的返回值数值,当为SelectStrategy.CONTINUE时需要重新检查一次,当没有任务时会返回SelectStrategy.SELECT,获取选择器检查请求的超时时间,根据延迟任务来确认时间的具体数值,再次判断是否有任务和唤醒的状态,阻塞超时等待,继续判断返回的key值是否有事件发生,判断之前是否已经有唤醒,判断当前的唤醒状态,判断是否有任务或者定时任务

private void select(boolean oldWakenUp) throws IOException {
    Selector selector = this.selector;
    try {
        int selectCnt = 0;
        long currentTimeNanos = System.nanoTime();
        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
        for (;;) {
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
            if (timeoutMillis <= 0) {
                if (selectCnt == 0) {
                    selector.selectNow();
                    selectCnt = 1;
                }
                break;
            }

            // If a task was submitted when wakenUp value was true, the task didn't get a chance to call
            // Selector#wakeup. So we need to check task queue again before executing select operation.
            // If we don't, the task might be pended until select operation was timed out.
            // It might be pended until idle timeout if IdleStateHandler existed in pipeline.
            if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                selector.selectNow();
                selectCnt = 1;
                break;
            }

            int selectedKeys = selector.select(timeoutMillis);
            selectCnt ++;

            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                // - Selected something,
                // - waken up by user, or
                // - the task queue has a pending task.
                // - a scheduled task is ready for processing
                break;
            }
            if (Thread.interrupted()) {
                // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
                // As this is most likely a bug in the handler of the user or it's client library we will
                // also log it.
                //
                // See https://github.com/netty/netty/issues/2426
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely because " +
                            "Thread.currentThread().interrupt() was called. Use " +
                            "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                }
                selectCnt = 1;
                break;
            }

            long time = System.nanoTime();
            if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                // timeoutMillis elapsed without anything selected.
                selectCnt = 1;
            } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                    selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                // The selector returned prematurely many times in a row.
                // Rebuild the selector to work around the problem.
                logger.warn(
                        "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
                        selectCnt, selector);

                rebuildSelector();
                selector = this.selector;

                // Select again to populate selectedKeys.
                selector.selectNow();
                selectCnt = 1;
                break;
            }

            currentTimeNanos = time;
        }

        if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
            if (logger.isDebugEnabled()) {
                logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                        selectCnt - 1, selector);
            }
        }
    } catch (CancelledKeyException e) {
        if (logger.isDebugEnabled()) {
            logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                    selector, e);
        }
        // Harmless exception - log anyway
    }
}

判断nio是否发生了空循环,也就是选择器多次没有等待足够的时间就直接返回结果,默认次数为512,然后需要重建选择器


public void rebuildSelector() {
    if (!inEventLoop()) {
        execute(new Runnable() {
            @Override
            public void run() {
                rebuildSelector0();
            }
        });
        return;
    }
    rebuildSelector0();
}

把旧的选择器的key,关注点以及附件等信息重新注册到新的选择器上,最后关闭旧的选择器。

private void rebuildSelector0() {
    final Selector oldSelector = selector;
    final SelectorTuple newSelectorTuple;

    if (oldSelector == null) {
        return;
    }

    try {
        newSelectorTuple = openSelector();
    } catch (Exception e) {
        logger.warn("Failed to create a new Selector.", e);
        return;
    }

    // Register all channels to the new Selector.
    int nChannels = 0;
    for (SelectionKey key: oldSelector.keys()) {
        Object a = key.attachment();
        try {
            if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
                continue;
            }

            int interestOps = key.interestOps();
            key.cancel();
            SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
            if (a instanceof AbstractNioChannel) {
                // Update SelectionKey
                ((AbstractNioChannel) a).selectionKey = newKey;
            }
            nChannels ++;
        } catch (Exception e) {
            logger.warn("Failed to re-register a Channel to the new Selector.", e);
            if (a instanceof AbstractNioChannel) {
                AbstractNioChannel ch = (AbstractNioChannel) a;
                ch.unsafe().close(ch.unsafe().voidPromise());
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                invokeChannelUnregistered(task, key, e);
            }
        }
    }

    selector = newSelectorTuple.selector;
    unwrappedSelector = newSelectorTuple.unwrappedSelector;

    try {
        // time to close the old selector as everything else is registered to the new one
        oldSelector.close();
    } catch (Throwable t) {
        if (logger.isWarnEnabled()) {
            logger.warn("Failed to close the old Selector.", t);
        }
    }

    logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
}

 

选择器检查等待结束后开始执行任务和处理请求事件,ioRatio用来控制网络io操作占用的时间比例,先处理网络请求


cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
    try {
        processSelectedKeys();
    } finally {
        // Ensure we always run tasks.
        runAllTasks();
    }
} else {
    final long ioStartTime = System.nanoTime();
    try {
        processSelectedKeys();
    } finally {
        // Ensure we always run tasks.
        final long ioTime = System.nanoTime() - ioStartTime;
        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
    }
}

处理完后分配对应的时间来处理任务


protected boolean runAllTasks() {
    boolean fetchedAll;
    do {
        fetchedAll = fetchFromScheduledTaskQueue();
        Runnable task = pollTask();
        if (task == null) {
            return false;
        }

        for (;;) {
            try {
                task.run();
            } catch (Throwable t) {
                logger.warn("A task raised an exception.", t);
            }

            task = pollTask();
            if (task == null) {
                break;
            }
        }
    } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.

    lastExecutionTime = ScheduledFutureTask.nanoTime();
    return true;
}

取出第一个任务开始执行,判断是否需要立刻再次执行选择器检查网络请求,这里的任务就包含之前的处理器上下文添加操作,注册网络通道,**通道活跃事件等


protected Runnable pollTask() {
    Runnable task = super.pollTask();
    if (needsToSelectAgain) {
        selectAgain();
    }
    return task;
}

protected Runnable pollTask() {
    assert inEventLoop();
    for (;;) {
        Runnable task = taskQueue.poll();
        if (task == WAKEUP_TASK) {
            continue;
        }
        return task;
    }
}

 

上一篇:

下一篇: