Netty源码分析之核心线程处理
程序员文章站
2023-12-30 18:18:04
...
核心线程是NioEventLoop,在第一次往任务队列中添加任务时开始启动线程
abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor
thread = threadFactory.newThread(new Runnable() {
@Override
public void run() {
boolean success = false;
updateLastExecutionTime();
SingleThreadEventExecutor.this.run();
success = true;
}
});
任务添加完成后会设置唤醒状态为true,这个时候是主线程,所以inEventLoop为false,唤醒选择器的等待。
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
protected boolean wakesUpForTask(Runnable task) {
return !(task instanceof NonWakeupRunnable);
}
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
}
具体的run方法实现为
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and
// 'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
判断任务队列中是否由需要执行的任务
protected boolean hasTasks() {
assert inEventLoop();
return !taskQueue.isEmpty();
}
计算对应的返回值
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
如果有任务的话先检查是否有需要处理的网络请求,有的话返回大于0的数字
private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
return selectNow();
}
};
int selectNow() throws IOException {
try {
return selector.selectNow();
} finally {
// restore wakeup state if needed
if (wakenUp.get()) {
selector.wakeup();
}
}
}
判断选择的返回值数值,当为SelectStrategy.CONTINUE时需要重新检查一次,当没有任务时会返回SelectStrategy.SELECT,获取选择器检查请求的超时时间,根据延迟任务来确认时间的具体数值,再次判断是否有任务和唤醒的状态,阻塞超时等待,继续判断返回的key值是否有事件发生,判断之前是否已经有唤醒,判断当前的唤醒状态,判断是否有任务或者定时任务
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
// If a task was submitted when wakenUp value was true, the task didn't get a chance to call
// Selector#wakeup. So we need to check task queue again before executing select operation.
// If we don't, the task might be pended until select operation was timed out.
// It might be pended until idle timeout if IdleStateHandler existed in pipeline.
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
// - Selected something,
// - waken up by user, or
// - the task queue has a pending task.
// - a scheduled task is ready for processing
break;
}
if (Thread.interrupted()) {
// Thread was interrupted so reset selected keys and break so we not run into a busy loop.
// As this is most likely a bug in the handler of the user or it's client library we will
// also log it.
//
// See https://github.com/netty/netty/issues/2426
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
selectCnt = 1;
break;
}
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The selector returned prematurely many times in a row.
// Rebuild the selector to work around the problem.
logger.warn(
"Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
selectCnt, selector);
rebuildSelector();
selector = this.selector;
// Select again to populate selectedKeys.
selector.selectNow();
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
// Harmless exception - log anyway
}
}
判断nio是否发生了空循环,也就是选择器多次没有等待足够的时间就直接返回结果,默认次数为512,然后需要重建选择器
public void rebuildSelector() {
if (!inEventLoop()) {
execute(new Runnable() {
@Override
public void run() {
rebuildSelector0();
}
});
return;
}
rebuildSelector0();
}
把旧的选择器的key,关注点以及附件等信息重新注册到新的选择器上,最后关闭旧的选择器。
private void rebuildSelector0() {
final Selector oldSelector = selector;
final SelectorTuple newSelectorTuple;
if (oldSelector == null) {
return;
}
try {
newSelectorTuple = openSelector();
} catch (Exception e) {
logger.warn("Failed to create a new Selector.", e);
return;
}
// Register all channels to the new Selector.
int nChannels = 0;
for (SelectionKey key: oldSelector.keys()) {
Object a = key.attachment();
try {
if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
continue;
}
int interestOps = key.interestOps();
key.cancel();
SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
if (a instanceof AbstractNioChannel) {
// Update SelectionKey
((AbstractNioChannel) a).selectionKey = newKey;
}
nChannels ++;
} catch (Exception e) {
logger.warn("Failed to re-register a Channel to the new Selector.", e);
if (a instanceof AbstractNioChannel) {
AbstractNioChannel ch = (AbstractNioChannel) a;
ch.unsafe().close(ch.unsafe().voidPromise());
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
invokeChannelUnregistered(task, key, e);
}
}
}
selector = newSelectorTuple.selector;
unwrappedSelector = newSelectorTuple.unwrappedSelector;
try {
// time to close the old selector as everything else is registered to the new one
oldSelector.close();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close the old Selector.", t);
}
}
logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
}
选择器检查等待结束后开始执行任务和处理请求事件,ioRatio用来控制网络io操作占用的时间比例,先处理网络请求
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
处理完后分配对应的时间来处理任务
protected boolean runAllTasks() {
boolean fetchedAll;
do {
fetchedAll = fetchFromScheduledTaskQueue();
Runnable task = pollTask();
if (task == null) {
return false;
}
for (;;) {
try {
task.run();
} catch (Throwable t) {
logger.warn("A task raised an exception.", t);
}
task = pollTask();
if (task == null) {
break;
}
}
} while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
lastExecutionTime = ScheduledFutureTask.nanoTime();
return true;
}
取出第一个任务开始执行,判断是否需要立刻再次执行选择器检查网络请求,这里的任务就包含之前的处理器上下文添加操作,注册网络通道,**通道活跃事件等
protected Runnable pollTask() {
Runnable task = super.pollTask();
if (needsToSelectAgain) {
selectAgain();
}
return task;
}
protected Runnable pollTask() {
assert inEventLoop();
for (;;) {
Runnable task = taskQueue.poll();
if (task == WAKEUP_TASK) {
continue;
}
return task;
}
}
推荐阅读
-
Netty源码分析之核心线程处理
-
Netty源码分析之ChannelPipeline(二)—ChannelHandler的添加与删除
-
Netty源码分析 (十)----- 拆包器之LineBasedFrameDecoder
-
netty之NioEventLoopGroup源码分析二
-
jQuery源码分析之构造jQuery对象-源码结构和核心函数
-
Android消息机制之线程间存储ThreadLocal源码分析
-
Netty源码分析之Server bindAsync
-
啃下MyBatis源码 - MyBatis核心流程三大阶段之代理阶段(binding模块分析)
-
JAVA核心知识之ConcurrentHashMap源码分析
-
Java并发之线程池ThreadPoolExecutor源码分析学习