Netty源码分析-NioEventLoop
程序员文章站
2022-04-23 11:49:30
...
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.nio;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopException;
import io.netty.channel.EventLoopTaskQueueFactory;
import io.netty.channel.SelectStrategy;
import io.netty.channel.SingleThreadEventLoop;
import io.netty.util.IntSupplier;
import io.netty.util.concurrent.RejectedExecutionHandler;
import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.ReflectionUtil;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.Selector;
import java.nio.channels.SelectionKey;
import java.nio.channels.spi.SelectorProvider;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
/**
* {@link SingleThreadEventLoop} implementation which register the {@link Channel}'s to a
* {@link Selector} and so does the multi-plexing of these in the event loop.
*
*/
public final class NioEventLoop extends SingleThreadEventLoop {
// Logger shared by all NioEventLoop instances.
private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioEventLoop.class);
// Number of cancelled keys after which cancel(SelectionKey) requests another selectNow() pass.
private static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization.
// Whether replacing the JDK selector's selected-key sets is disabled (system property, default false).
private static final boolean DISABLE_KEY_SET_OPTIMIZATION =
SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
// Smallest usable auto-rebuild threshold; also the streak length above which run() logs premature returns.
private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;
// Number of consecutive premature select() returns that triggers a selector rebuild.
// Assigned in the static initializer: 512 unless overridden, 0 when the rebuild is switched off.
private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;
// Supplier handed to the SelectStrategy in run(): performs a non-blocking selectNow() on this
// loop's selector and returns the value it reports (the number of ready keys).
private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
return selectNow();
}
};
// Workaround for JDK NIO bug (the selector empty-poll / busy-spin problem).
//
// See:
// - http://bugs.sun.com/view_bug.do?bug_id=6427854
// - https://github.com/netty/netty/issues/203
static {
    final String bugLevelKey = "sun.nio.ch.bugLevel";
    if (SystemPropertyUtil.get(bugLevelKey) == null) {
        // The property is not defined yet: define it as an empty string inside a privileged action.
        try {
            AccessController.doPrivileged(new PrivilegedAction<Void>() {
                @Override
                public Void run() {
                    System.setProperty(bugLevelKey, "");
                    return null;
                }
            });
        } catch (final SecurityException denied) {
            logger.debug("Unable to get/set System Property: " + bugLevelKey, denied);
        }
    }

    // Defaults to 512. A configured value below MIN_PREMATURE_SELECTOR_RETURNS is replaced by 0,
    // which disables the automatic rebuild (see unexpectedSelectorWakeup).
    final int configuredThreshold =
            SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
    SELECTOR_AUTO_REBUILD_THRESHOLD =
            configuredThreshold < MIN_PREMATURE_SELECTOR_RETURNS ? 0 : configuredThreshold;

    if (logger.isDebugEnabled()) {
        logger.debug("-Dio.netty.noKeySetOptimization: {}", DISABLE_KEY_SET_OPTIMIZATION);
        logger.debug("-Dio.netty.selectorAutoRebuildThreshold: {}", SELECTOR_AUTO_REBUILD_THRESHOLD);
    }
}
/**
* The NIO {@link Selector}.
*/
// Selector used by the loop: the key-set-optimized wrapper when openSelector() applied the
// optimization, otherwise the very same instance as unwrappedSelector.
private Selector selector;
// The raw JDK NIO selector obtained from the provider; channels are registered against this one.
private Selector unwrappedSelector;
// Array-backed replacement for the selector's selected-key sets; null when the replacement was
// not applied, in which case processSelectedKeys() falls back to selector.selectedKeys().
private SelectedSelectionKeySet selectedKeys;
// Factory used to open NIO selectors.
private final SelectorProvider provider;
private static final long AWAKE = -1L;
private static final long NONE = Long.MAX_VALUE;
// nextWakeupNanos is:
// AWAKE when EL is awake
// NONE when EL is waiting with no wakeup scheduled
// other value T when EL is waiting with wakeup scheduled at time T
private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);
// Strategy consulted at the top of every loop iteration
// (presumably DefaultSelectStrategy unless the caller supplies another one - confirm at the call site).
private final SelectStrategy selectStrategy;
// Desired percentage of loop time spent on I/O (1-100); see setIoRatio(int).
private volatile int ioRatio = 50;
// Keys cancelled through cancel(SelectionKey) since the counter was last reset.
private int cancelledKeys;
// Set by cancel(SelectionKey) once CLEANUP_INTERVAL keys were cancelled; makes the key-processing
// loops call selectAgain().
private boolean needsToSelectAgain;
/**
 * Creates the event loop and opens its {@link Selector}.
 *
 * @param parent the group this loop belongs to
 * @param executor the executor handed to the superclass
 * @param selectorProvider provider used to open selectors; must not be {@code null}
 * @param strategy select strategy consulted by {@link #run()}; must not be {@code null}
 * @param rejectedExecutionHandler handler handed to the superclass
 * @param queueFactory optional factory for the two task queues; {@code null} selects the built-in queue
 */
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
             EventLoopTaskQueueFactory queueFactory) {
    super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
            rejectedExecutionHandler);
    // The provider must be in place before openSelector() runs, because openSelector() reads it.
    provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
    selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");

    final SelectorTuple opened = openSelector();
    // Raw JDK selector.
    unwrappedSelector = opened.unwrappedSelector;
    // Selector the loop actually selects on (possibly the optimized wrapper).
    selector = opened.selector;
}
/**
 * Creates a task queue bounded by {@code DEFAULT_MAX_PENDING_TASKS}, delegating to the given factory
 * when one is supplied and to {@link #newTaskQueue0(int)} otherwise.
 */
private static Queue<Runnable> newTaskQueue(
        EventLoopTaskQueueFactory queueFactory) {
    return queueFactory == null
            ? newTaskQueue0(DEFAULT_MAX_PENDING_TASKS)
            : queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
}
/**
 * Pairs the raw JDK selector with the selector the event loop should use. Both references point to
 * the same object when no wrapper is in place.
 */
private static final class SelectorTuple {
    final Selector unwrappedSelector;
    final Selector selector;

    /** Creates a tuple without a wrapper: the raw selector fills both roles. */
    SelectorTuple(Selector unwrappedSelector) {
        this(unwrappedSelector, unwrappedSelector);
    }

    /** Creates a tuple from the raw selector and the selector that wraps it. */
    SelectorTuple(Selector unwrappedSelector, Selector selector) {
        this.unwrappedSelector = unwrappedSelector;
        this.selector = selector;
    }
}
/**
* Opens a new JDK {@link Selector} through {@code provider} and, unless disabled or not possible,
* swaps the selector's {@code selectedKeys} / {@code publicSelectedKeys} fields for a
* {@link SelectedSelectionKeySet}.
* <p>
* Side effect: on success the {@code selectedKeys} field of this loop is set to the new key set; when
* the field replacement reports an {@link Exception} it is set to {@code null}. The two earlier
* fallback returns leave the field untouched.
*
* @return a tuple with the raw selector and the selector the loop should use
* @throws ChannelException if the provider fails to open a selector
*/
private SelectorTuple openSelector() {
final Selector unwrappedSelector;
try {
// The raw JDK selector.
unwrappedSelector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
// The key-set optimization has been switched off via system property.
if (DISABLE_KEY_SET_OPTIMIZATION) {
// Wrap the raw selector as-is; both tuple members refer to it.
return new SelectorTuple(unwrappedSelector);
}
// Look up sun.nio.ch.SelectorImpl reflectively. The action returns either the Class or the
// Throwable that prevented loading it.
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
return Class.forName(
"sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader());
} catch (Throwable cause) {
return cause;
}
}
});
// Not a Class: the lookup above failed.
if (!(maybeSelectorImplClass instanceof Class) ||
// ensure the current selector implementation is what we can instrument.
// (i.e. the opened selector must be an instance of SelectorImpl)
!((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
if (maybeSelectorImplClass instanceof Throwable) {
Throwable t = (Throwable) maybeSelectorImplClass;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
}
// Cannot instrument this selector: continue without the optimization.
return new SelectorTuple(unwrappedSelector);
}
// Safe to cast now.
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
// Array-backed Set implementation that will stand in for the selector's key sets.
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
// The action returns null on success, or the Throwable/Exception describing the failure.
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
// Field "selectedKeys" of SelectorImpl (a HashSet<SelectionKey> by default, per the original note).
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
// Field "publicSelectedKeys" of SelectorImpl (likewise set-based by default).
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
// Pick the replacement mechanism by Java version: Unsafe on 9+, plain reflection otherwise.
if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
// Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.
// This allows us to also do this in Java9+ without any extra flags.
long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
long publicSelectedKeysFieldOffset =
PlatformDependent.objectFieldOffset(publicSelectedKeysField);
if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
// Overwrite both fields with the array-backed set, which is intended to be
// cheaper than the default hash-based sets.
PlatformDependent.putObject(
unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
PlatformDependent.putObject(
unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
return null;
}
// We could not retrieve the offset, lets try reflection as last-resort.
}
Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
if (cause != null) {
return cause;
}
cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
if (cause != null) {
return cause;
}
// Overwrite both fields with the array-backed set through plain reflection.
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
return null;
} catch (NoSuchFieldException e) {
return e;
} catch (IllegalAccessException e) {
return e;
}
}
});
// An Exception result means the replacement failed: continue without the optimization.
if (maybeException instanceof Exception) {
selectedKeys = null;
Exception e = (Exception) maybeException;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
return new SelectorTuple(unwrappedSelector);
}
selectedKeys = selectedKeySet;
logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
// Optimization applied: the raw selector now stores its selected keys in the array-backed set,
// and SelectedSelectionKeySetSelector is handed out as the selector that wraps the raw one.
return new SelectorTuple(unwrappedSelector,
new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}
/**
 * Gives access to the {@link SelectorProvider} from which this {@link NioEventLoop} opens its
 * {@link Selector}s.
 *
 * @return the provider passed to the constructor
 */
public SelectorProvider selectorProvider() {
    return this.provider;
}
/**
 * Creates the task queue for this loop by delegating to {@link #newTaskQueue0(int)}.
 *
 * @param maxPendingTasks capacity requested for the queue
 */
@Override
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
    final Queue<Runnable> taskQueue = newTaskQueue0(maxPendingTasks);
    return taskQueue;
}
/**
 * Obtains a queue from {@code PlatformDependent.newMpscQueue}: the no-argument variant when
 * {@code maxPendingTasks} is {@link Integer#MAX_VALUE}, the capacity-taking variant otherwise.
 */
private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
    // This event loop never calls takeTask()
    if (maxPendingTasks == Integer.MAX_VALUE) {
        return PlatformDependent.<Runnable>newMpscQueue();
    }
    return PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
}
/**
 * Registers an arbitrary {@link SelectableChannel} (it does not have to be created by Netty) with the
 * {@link Selector} of this event loop. After registration the given {@code task} is run by this event
 * loop whenever the {@link SelectableChannel} becomes ready.
 *
 * @param ch the channel to register; must not be {@code null}
 * @param interestOps the interest set; must be non-zero and a subset of {@code ch.validOps()}
 * @param task the task attached to the registration; must not be {@code null}
 * @throws IllegalArgumentException if {@code interestOps} is zero or contains unsupported operations
 * @throws IllegalStateException if this event loop has been shut down
 */
public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task) {
    ObjectUtil.checkNotNull(ch, "ch");
    if (interestOps == 0) {
        throw new IllegalArgumentException("interestOps must be non-zero.");
    }
    // Every requested operation has to be one the channel supports.
    final int validOps = ch.validOps();
    if ((interestOps & ~validOps) != 0) {
        throw new IllegalArgumentException(
                "invalid interestOps: " + interestOps + "(validOps: " + validOps + ')');
    }
    ObjectUtil.checkNotNull(task, "task");
    if (isShutdown()) {
        throw new IllegalStateException("event loop shut down");
    }

    // The actual registration always happens on the event loop thread.
    if (inEventLoop()) {
        register0(ch, interestOps, task);
        return;
    }

    // Offload to the EventLoop as otherwise java.nio.channels.spi.AbstractSelectableChannel.register
    // may block for a long time while trying to obtain an internal lock that may be hold while selecting.
    final Runnable registration = new Runnable() {
        @Override
        public void run() {
            register0(ch, interestOps, task);
        }
    };
    try {
        submit(registration).sync();
    } catch (InterruptedException ignore) {
        // Even if interrupted we did schedule it so just mark the Thread as interrupted.
        Thread.currentThread().interrupt();
    }
}
/**
 * Registers the JDK channel with the raw selector, attaching {@code task} to the resulting key.
 *
 * @throws EventLoopException wrapping any exception raised by the registration
 */
private void register0(SelectableChannel ch, int interestOps, NioTask<?> task) {
    try {
        ch.register(unwrappedSelector, interestOps, task);
    } catch (Exception cause) {
        throw new EventLoopException("failed to register a channel", cause);
    }
}
/**
 * Reports the percentage of event-loop time that should be spent on I/O.
 *
 * @return the currently configured I/O ratio
 */
public int getIoRatio() {
    return this.ioRatio;
}
/**
 * Configures the percentage of event-loop time that should be spent on I/O. Accepted values are 1-100.
 * The default of {@code 50} makes the loop aim for equal time on I/O and on non-I/O tasks; lower values
 * leave more time for non-I/O tasks. With {@code 100} the balancing is turned off and the loop does not
 * try to weigh I/O against non-I/O work.
 *
 * @param ioRatio the new ratio, {@code 0 < ioRatio <= 100}
 * @throws IllegalArgumentException if {@code ioRatio} is outside that range
 */
public void setIoRatio(int ioRatio) {
    final boolean withinRange = ioRatio > 0 && ioRatio <= 100;
    if (!withinRange) {
        throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
    }
    this.ioRatio = ioRatio;
}
/**
 * Swaps the current {@link Selector} of this event loop for a freshly created one, as a workaround
 * for the infamous epoll 100% CPU bug. When called from another thread the rebuild is queued on the
 * event loop instead of being performed inline.
 */
public void rebuildSelector() {
    if (inEventLoop()) {
        rebuildSelector0();
        return;
    }
    final Runnable rebuildTask = new Runnable() {
        @Override
        public void run() {
            rebuildSelector0();
        }
    };
    execute(rebuildTask);
}
/**
 * Returns the size of the selector's key set minus the keys counted as cancelled by
 * {@code cancel(SelectionKey)}.
 */
@Override
public int registeredChannels() {
    final int totalKeys = selector.keys().size();
    return totalKeys - cancelledKeys;
}
// Works around the JDK empty-poll (busy-spin) bug: opens a new selector, moves every valid
// registration from the old selector to the new one, then closes the old selector.
// Callers in this file are run() and rebuildSelector(), the latter dispatching it onto the loop thread.
private void rebuildSelector0() {
// The selector currently in use; all channels are registered with it.
final Selector oldSelector = selector;
final SelectorTuple newSelectorTuple;
if (oldSelector == null) {
return;
}
try {
// Open the replacement selector.
newSelectorTuple = openSelector();
} catch (Exception e) {
logger.warn("Failed to create a new Selector.", e);
return;
}
// Register all channels to the new Selector.
int nChannels = 0;
// Walk every key of the old selector; each key stands for one registered channel.
for (SelectionKey key: oldSelector.keys()) {
Object a = key.attachment();
try {
// Skip keys that are no longer valid and channels already registered with the new selector.
if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
continue;
}
// Remember the interest set, then cancel the old key.
int interestOps = key.interestOps();
key.cancel();
// Re-register the channel with the new selector, carrying over interest set and attachment.
SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
// Point the Netty channel at its new key.
if (a instanceof AbstractNioChannel) {
// Update SelectionKey
((AbstractNioChannel) a).selectionKey = newKey;
}
nChannels ++;
} catch (Exception e) {
logger.warn("Failed to re-register a Channel to the new Selector.", e);
// Migration failed for this key: close the Netty channel, or notify the NioTask.
if (a instanceof AbstractNioChannel) {
AbstractNioChannel ch = (AbstractNioChannel) a;
ch.unsafe().close(ch.unsafe().voidPromise());
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
invokeChannelUnregistered(task, key, e);
}
}
}
// Switch this loop over to the new selector pair.
selector = newSelectorTuple.selector;
unwrappedSelector = newSelectorTuple.unwrappedSelector;
try {
// Close the old selector.
// time to close the old selector as everything else is registered to the new one
oldSelector.close();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close the old Selector.", t);
}
}
if (logger.isInfoEnabled()) {
logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
}
}
/**
* Main loop of the event loop. Each iteration asks the {@link SelectStrategy} what to do, optionally
* blocks in {@code select}, processes ready keys, runs queued tasks within a budget derived from
* {@code ioRatio}, tracks consecutive premature select returns, and finally checks for shutdown.
* Returns only once {@code confirmShutdown()} reports {@code true} while shutting down.
*/
@Override
protected void run() {
// Consecutive iterations that selected without producing I/O events or running tasks.
int selectCnt = 0;
for (;;) {
try {
int strategy;
try {
// hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
// selectSupplier.get() = selector.selectNow();
// (Per the note above, describing the default strategy:) with pending tasks the ready-key
// count is obtained without blocking; otherwise SelectStrategy.SELECT is returned.
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO
// With the default strategy, SELECT indicates the task queue was empty.
case SelectStrategy.SELECT:
// Deadline of the scheduled task that is due next.
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
// No scheduled task either.
curDeadlineNanos = NONE; // nothing on the calendar
}
// Publish when this loop plans to wake up, so other threads can decide whether to wake it.
nextWakeupNanos.set(curDeadlineNanos);
try {
// Re-check for tasks that arrived in the meantime.
if (!hasTasks()) {
// Block until the deadline (or indefinitely when it is NONE).
strategy = select(curDeadlineNanos);
}
} finally {
// This update is just to help block unnecessary selector wakeups
// so use of lazySet is ok (no race condition)
nextWakeupNanos.lazySet(AWAKE);
}
// fall through
default:
}
} catch (IOException e) {
// If we receive an IOException here its because the Selector is messed up. Let's rebuild
// the selector and retry. https://github.com/netty/netty/issues/8566
rebuildSelector0();
selectCnt = 0;
handleLoopException(e);
continue;
}
selectCnt++;
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
boolean ranTasks;
// ioRatio == 100: process I/O first, then run all queued tasks without a time budget.
if (ioRatio == 100) {
try {
// Only process keys when the select reported ready events.
if (strategy > 0) {
processSelectedKeys();
}
} finally {
// Then run every pending task.
// Ensure we always run tasks.
ranTasks = runAllTasks();
}
} else if (strategy > 0) {
// Start of the I/O processing, used to measure its duration.
final long ioStartTime = System.nanoTime();
try {
// Ready events exist: process them first.
processSelectedKeys();
} finally {
// Ensure we always run tasks.
// ioTime is the time spent processing the ready keys above.
final long ioTime = System.nanoTime() - ioStartTime;
// Task budget = ioTime * (100 - ioRatio) / ioRatio. Example: ioRatio = 80 gives a
// factor of 20/80 = 1/4, so 20 seconds of I/O allow at most 5 seconds of tasks.
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
// No I/O events this round: pass a zero time budget.
ranTasks = runAllTasks(0); // This will run the minimum number of tasks
}
if (ranTasks || strategy > 0) {
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
// Tasks ran or I/O events were seen, so this was a productive iteration:
selectCnt = 0; // reset the streak counter
// Otherwise the select returned with nothing to do (a spurious wakeup / empty poll).
} else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
selectCnt = 0;
}
} catch (CancelledKeyException e) {
// Harmless exception - log anyway
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
/**
 * Handles a select that returned without I/O events and without any task having run.
 *
 * @param selectCnt number of such returns in a row, including the current one
 * @return {@code true} if the caller should reset {@code selectCnt}
 */
private boolean unexpectedSelectorWakeup(int selectCnt) {
    // Thread.interrupted() also clears the interrupt flag.
    if (Thread.interrupted()) {
        // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
        // As this is most likely a bug in the handler of the user or it's client library we will
        // also log it.
        //
        // See https://github.com/netty/netty/issues/2426
        if (logger.isDebugEnabled()) {
            logger.debug("Selector.select() returned prematurely because " +
                    "Thread.currentThread().interrupt() was called. Use " +
                    "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
        }
        return true;
    }

    // A threshold of 0 means the automatic rebuild is disabled; the default threshold is 512.
    final boolean rebuildDue =
            SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD;
    if (!rebuildDue) {
        return false;
    }

    // The selector returned prematurely many times in a row.
    // Rebuild the selector to work around the problem.
    logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
            selectCnt, selector);
    rebuildSelector();
    return true;
}
/**
 * Logs an exception that escaped the selector loop and then pauses for one second.
 *
 * @param t the failure to report
 */
private static void handleLoopException(Throwable t) {
    logger.warn("Unexpected exception in the selector loop.", t);

    // Back off for a second: without the pause, a failure that repeats immediately
    // would make the loop spin and burn CPU.
    try {
        Thread.sleep(1000);
    } catch (InterruptedException ignored) {
        // Deliberately ignored.
    }
}
/**
 * Processes the keys selected in this iteration, using the array-backed key set when it is installed
 * and the selector's own selected-key set otherwise.
 */
private void processSelectedKeys() {
    if (selectedKeys == null) {
        processSelectedKeysPlain(selector.selectedKeys());
        return;
    }
    processSelectedKeysOptimized();
}
/**
 * Closes the selector of this loop; a failure to close is logged, not propagated.
 */
@Override
protected void cleanup() {
    try {
        selector.close();
    } catch (IOException closeFailure) {
        logger.warn("Failed to close a selector.", closeFailure);
    }
}
/**
 * Cancels the key and counts the cancellation. Once {@code CLEANUP_INTERVAL} cancellations have
 * accumulated, the counter is reset and a re-select is requested through {@code needsToSelectAgain}.
 */
void cancel(SelectionKey key) {
    key.cancel();
    if (++cancelledKeys >= CLEANUP_INTERVAL) {
        cancelledKeys = 0;
        needsToSelectAgain = true;
    }
}
/**
* Processes the ready keys of a regular selected-key {@link Set}: each key is removed from the set
* and dispatched by attachment type. When a re-select has been requested during processing, the
* selector is polled again and iteration restarts on the refreshed set.
*
* @param selectedKeys the selector's selected-key set
*/
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
// check if the set is empty and if so just return to not create garbage by
// creating a new Iterator every time even if there is nothing to process.
// See https://github.com/netty/netty/issues/597
if (selectedKeys.isEmpty()) {
return;
}
Iterator<SelectionKey> i = selectedKeys.iterator();
for (;;) {
final SelectionKey k = i.next();
final Object a = k.attachment();
// Remove the key from the selected set before it is handled.
i.remove();
// The attachment is either a Netty channel or a user-supplied NioTask.
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (!i.hasNext()) {
break;
}
// Set by cancel(SelectionKey) after enough cancellations; refresh the selected keys.
if (needsToSelectAgain) {
selectAgain();
selectedKeys = selector.selectedKeys();
// Create the iterator again to avoid ConcurrentModificationException
if (selectedKeys.isEmpty()) {
break;
} else {
i = selectedKeys.iterator();
}
}
}
}
/**
* Processes the ready keys held in the array-backed {@code selectedKeys} set, dispatching each by
* attachment type. When a re-select has been requested during processing, the remaining slots are
* reset, the selector is polled again and iteration restarts from the first slot.
*/
private void processSelectedKeysOptimized() {
// Walk the keys collected for this batch of I/O events.
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.keys[i] = null;
// The object attached to the key at registration time.
final Object a = k.attachment();
// The attachment is either a Netty channel or a user-supplied NioTask.
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);
selectAgain();
// -1 so that the loop's ++i restarts the scan at index 0.
i = -1;
}
}
}
/**
* Handles one ready key whose attachment is a Netty channel. An invalid key closes the channel if it
* is still registered with this loop; otherwise the ready operations are handled in the order
* connect, write, read/accept.
*
* @param k the selected key
* @param ch the channel attached to {@code k}
*/
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
// The channel's unsafe object performs the low-level socket operations.
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
// Check that the key is still valid.
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
}
// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if (eventLoop == this) {
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
}
return;
}
try {
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
// OP_CONNECT is ready: the outbound connection attempt can be completed.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
// Complete the connect through the unsafe object.
unsafe.finishConnect();
}
// OP_WRITE is ready: flush pending outbound data.
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
// OP_READ or OP_ACCEPT is ready: hand over to the unsafe object's read().
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
// The key was cancelled while being processed: close the channel.
unsafe.close(unsafe.voidPromise());
}
}
/**
* Handles one ready key whose attachment is a {@link NioTask}: invokes {@code channelReady} and makes
* sure {@code channelUnregistered} is signalled when the task fails or the key ends up cancelled.
*
* @param k the selected key
* @param task the task attached to {@code k}
*/
private static void processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task) {
// 0 = channelReady did not complete and no Exception was caught (a non-Exception Throwable),
// 1 = channelReady completed normally, 2 = an Exception was caught and already handled.
int state = 0;
try {
task.channelReady(k.channel(), k);
state = 1;
} catch (Exception e) {
k.cancel();
invokeChannelUnregistered(task, k, e);
state = 2;
} finally {
switch (state) {
case 0:
// Something other than an Exception escaped: cancel the key and notify without a cause.
k.cancel();
invokeChannelUnregistered(task, k, null);
break;
case 1:
if (!k.isValid()) { // Cancelled by channelReady()
invokeChannelUnregistered(task, k, null);
}
break;
}
}
}
/**
 * Closes everything registered with the selector: keys that carry a {@link NioTask} are cancelled and
 * notified straight away, while Netty channels are collected first and closed after the key set has
 * been walked.
 */
private void closeAll() {
    selectAgain();
    final Set<SelectionKey> registered = selector.keys();
    final Collection<AbstractNioChannel> toClose =
            new ArrayList<AbstractNioChannel>(registered.size());

    for (SelectionKey key : registered) {
        final Object attachment = key.attachment();
        if (!(attachment instanceof AbstractNioChannel)) {
            key.cancel();
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) attachment;
            invokeChannelUnregistered(task, key, null);
            continue;
        }
        toClose.add((AbstractNioChannel) attachment);
    }

    for (AbstractNioChannel channel : toClose) {
        channel.unsafe().close(channel.unsafe().voidPromise());
    }
}
/**
 * Notifies the task that the channel behind {@code k} is no longer registered. An exception thrown by
 * the callback is logged and not propagated.
 *
 * @param task the task to notify
 * @param k the key whose channel was unregistered
 * @param cause the reason for the unregistration, or {@code null} when there is none
 */
private static void invokeChannelUnregistered(NioTask<SelectableChannel> task, SelectionKey k, Throwable cause) {
    try {
        task.channelUnregistered(k.channel(), cause);
    } catch (Exception callbackFailure) {
        logger.warn("Unexpected exception while running NioTask.channelUnregistered()", callbackFailure);
    }
}
/**
 * Wakes the selector when called from outside the event loop, unless {@code nextWakeupNanos} was
 * already {@code AWAKE}. The marker is switched to {@code AWAKE} as part of the check.
 */
@Override
protected void wakeup(boolean inEventLoop) {
    if (inEventLoop) {
        return;
    }
    final long previous = nextWakeupNanos.getAndSet(AWAKE);
    if (previous != AWAKE) {
        selector.wakeup();
    }
}
/**
 * Reports whether the given deadline lies before the wakeup time currently published in
 * {@code nextWakeupNanos}.
 */
@Override
protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) {
    // Note this is also correct for the nextWakeupNanos == -1 (AWAKE) case
    final long plannedWakeup = nextWakeupNanos.get();
    return deadlineNanos < plannedWakeup;
}
/**
 * Reports whether the given deadline lies before the wakeup time currently published in
 * {@code nextWakeupNanos}.
 */
@Override
protected boolean afterScheduledTaskSubmitted(long deadlineNanos) {
    // Note this is also correct for the nextWakeupNanos == -1 (AWAKE) case
    final long plannedWakeup = nextWakeupNanos.get();
    return deadlineNanos < plannedWakeup;
}
/**
 * Returns the raw JDK selector currently held by this loop.
 */
Selector unwrappedSelector() {
    return this.unwrappedSelector;
}
/**
 * Performs a non-blocking select on this loop's selector.
 *
 * @return the value reported by {@code Selector.selectNow()}
 * @throws IOException if the selector fails
 */
int selectNow() throws IOException {
    final int readyCount = selector.selectNow();
    return readyCount;
}
/**
 * Selects until the given deadline.
 *
 * @param deadlineNanos the wakeup deadline, or {@code NONE} to block until woken by another thread
 * @return the value reported by the selector's select call
 * @throws IOException if the selector fails
 */
private int select(long deadlineNanos) throws IOException {
    if (deadlineNanos == NONE) {
        // Nothing is scheduled: block without a timeout.
        return selector.select();
    }
    // Timeout will only be 0 if deadline is within 5 microsecs
    final long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
    if (timeoutMillis <= 0) {
        return selector.selectNow();
    }
    // A scheduled task is due soon: block for at most the remaining time.
    return selector.select(timeoutMillis);
}
/**
 * Clears the re-select request and polls the selector once without blocking. A failure of the poll
 * is logged and not propagated.
 */
private void selectAgain() {
    needsToSelectAgain = false;
    try {
        selector.selectNow();
    } catch (Throwable failure) {
        logger.warn("Failed to update SelectionKeys.", failure);
    }
}
}
推荐阅读
-
三 分析easyswoole源码(启动服务&TableManager,略提及Cache工具的原理)
-
Mybatis接口Mapper内的方法为啥不能重载?Mapper的源码分析
-
Java之HashMap源码分析(第五篇:访问元素)
-
分析Netty工作流程
-
分析Netty工作流程
-
springboot bean循环依赖实现以及源码分析
-
skynet源码分析之service_logger,skynet_error
-
Vue.js 源码分析(三十) 高级应用 函数式组件 详解
-
荐 【Android 电量优化】JobScheduler 相关源码分析 ( JobSchedulerService 源码分析 | 任务检查 | 任务执行 )
-
LinkedHashMap 核心源码分析