Netty中 什么是NioEventLoop源码分析
interface Executor
public interface ExecutorService extends Executor
public abstract class AbstractExecutorService implements ExecutorService
public abstract class AbstractEventExecutor extends AbstractExecutorService
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor ->//这个类实现了具体的execute方法,方法实现中调用了run方法
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor
public final class NioEventLoop extends SingleThreadEventLoop ->//这个类中实现了run方法,被execute方法调用
Exector接口最重要的就是execute方法,我们找下这个方法的实现
EventLoopContext.executeAsync
->nettyEventLoop().execute(task);
->AbstractScheduledEventExecutor.execute(task)
-> AbstractScheduledEventExecutor.startThread() / addTask(task)
->NioEventLoop.run()
->AbstractScheduledEventExecutor.runAllTasks()
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor{
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
private void startThread() {
if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
doStartThread();
}
}
}
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
//调用了run方法,这个run方法的实现如下
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
}
}
}
public final class NioEventLoop extends SingleThreadEventLoop {
@Override
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
// fallthrough
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
//execute()-> startThread->run()->runAllTasks
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
}
推荐阅读
-
Flink中watermark为什么选择最小一条(源码分析)
-
Mybaits 源码解析 (六)----- 全网最详细:Select 语句的执行过程分析(上篇)(Mapper方法是如何调用到XML中的SQL的?)
-
SpringMVC中DispatchServlet是单例还是多例(附源码分析)
-
netty中的发动机--EventLoop及其实现类NioEventLoop的源码分析
-
Netty中的ChannelPipeline源码分析
-
Netty中NioEventLoopGroup的创建源码分析
-
业内人士从三方面分析,短视频源码为什么是互联网的下个风口
-
Netty中FastThreadLocal源码分析
-
Netty源码分析(二):Reactor模型在Netty中的应用
-
Flink中watermark为什么选择最小一条(源码分析)