Netty源码分析-NioEventLoop(二)
程序员文章站
2022-04-23 11:49:48
...
NioEventLoop源码分析。
EventLoop的本质:内部一个线程,一个有序队列存储,线程源源不断的运行队列中的任务。
register方法把java-nio的channel注册到selector上面。
//把JAVA底层Channel注册到selector上
public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task) {
//把ch注册到java底层selector上面,代码省略了一些安全检查
try {
ch.register(selector, interestOps, task);
} catch (Exception e) {
throw new EventLoopException("failed to register a channel", e);
}
}
run方法是核心的启动方法,在父类中,创建好线程启动以后会调用run方法,在这里会处理selector的selelct事件和所有的IO事件
@Override
protected void run() {
//这个方法是父类定义的抽象方法,在父类线程创建启动后调用此方法。
for (;;) {
try {
//如果底层队列存储在任务hasTasks会返回true,那么调用selelctNow返回key的数量,可能为0
//如果没有则返回SelectStrategy.SELECT
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
//查询selector,如果没有任务则会在selector上阻塞一会,否则立即返回
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
cancelledKeys = 0;
needsToSelectAgain = false;
//IO时间的比例
final int ioRatio = this.ioRatio;
//如果是百分百
if (ioRatio == 100) {
try {
//先处理所有key事件
processSelectedKeys();
} finally {
// Ensure we always run tasks.
//然后处理所有IO任务
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
//先处理所有key事件
processSelectedKeys();
} finally {
// Ensure we always run tasks.
//执行一段时间的IO任务
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
processSelectedKey,处理selector的selelct到的key。
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) { //key无效
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
}
//如果ch当中的el不是当前对象则return
if (eventLoop != this || eventLoop == null) {
return;
}
unsafe.close(unsafe.voidPromise());
return;
}
try {
//获取key的事件类型
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
//如果是连接事件
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
//完成连接
unsafe.finishConnect();
}
//如果是write事件,则调用底层代码写入逻辑
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
//如果是读事件,交给底层逻辑处理
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}