NioEventLoop启动触发条件:
1.服务端绑定本地端口
2.新连接接入通过chooser绑定一个NioEventLoop
服务端绑定本地端口
绑定本地端口,使用下面方法;
ChannelFuture future = bootstrap.bind(host, port).sync();
最终会调用doBind0()方法:
private static void doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
public void run() {
if(regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
这个时候就会调用channel对应NioEventLoop的execute方法,会判断是否在当前的eventloop对应的thread中,如果在,直接向任务队列中添加绑定端口的任务,如果不在,首先要start当前eventLoop对应的thread,再将任务放到任务队列中。这里的excute(task)方法,并不是让线程直接执行它,而是将它放到线程的任务队列中,等待线程去执行它。
public void execute(Runnable task) {
if(task == null) {
throw new NullPointerException("task");
} else {
boolean inEventLoop = this.inEventLoop();
if(inEventLoop) {
this.addTask(task);
} else {
this.startThread();
this.addTask(task);
if(this.isShutdown() && this.removeTask(task)) {
reject();
}
}
if(!this.addTaskWakesUp && this.wakesUpForTask(task)) {
this.wakeup(inEventLoop);
}
}
}
这里会调用startThread去启动一个线程,首先会根据状态判断线程是否创建成功,否则使用CAS去创建线程,并调用一个doStartThread去创建一个FastThreadLocalThread,并且这个函数会将一个NioEventLoop与一个thread进行绑定。
private void startThread() {
if (this.state == 1 && STATE_UPDATER.compareAndSet(this, 1, 2)) {
this.doStartThread();
}
}
NioEventLoop线程执行逻辑
NioEventLoop对应线程的run方法,run()方法里面是一个死循环,主要的逻辑是首先采用select检查是否有IO事件,如果有IO事件,就采用processSelectedKey()对IO事件进行处理,最后调用runAllTasks()处理任务队列中的任务。
protected void run() {
while(true) {
boolean oldWakenUp = this.wakenUp.getAndSet(false);
try {
if(this.hasTasks()) {
this.selectNow();
} else {
this.select(oldWakenUp);
if(this.wakenUp.get()) {
this.selector.wakeup();
}
}
this.cancelledKeys = 0;
this.needsToSelectAgain = false;
int t = this.ioRatio;
if(t == 100) {
this.processSelectedKeys();
this.runAllTasks();
} else {
long e = System.nanoTime();
this.processSelectedKeys();
long ioTime = System.nanoTime() - e;
this.runAllTasks(ioTime * (long)(100 - t) / (long)t);
}
if(this.isShuttingDown()) {
this.closeAll();
if(this.confirmShutdown()) {
return;
}
}
} catch (Throwable var8) {
logger.warn("Unexpected exception in the selector loop.", var8);
try {
Thread.sleep(1000L);
} catch (InterruptedException var7) {
;
}
}
}
}
这段代码中的ioRadio是控制执行IO事件和执行任务队列中的任务的一个事件比,默认是50,代表执行IO事件处理和执行任务队列的任务事件比是1:1。
1)使用select检测IO事件
通过Selector的select()方法可以选择已经准备就绪的通道 (这些通道包含你感兴趣的的事件)。比如你对读就绪的通道感兴趣,那么select()方法就会返回读事件已经就绪的那些通道。Java中的Selector几个重载的select()方法:
- int select():阻塞到至少有一个通道在你注册的事件上就绪了。
- int select(long timeout):和select()一样,但最长阻塞时间为timeout毫秒。
- int selectNow():非阻塞,只要有通道就绪就立刻返回。
select()方法返回的int值表示有多少通道已经就绪,是自上次调用select()方法后有多少通道变成就绪状态。之前在select()调用时进入就绪的通道不会在本次调用中被记入,而在前一次select()调用进入就绪但现在已经不在处于就绪的通道也不会被记入。例如:首次调用select()方法,如果有一个通道变成就绪状态,返回了1,若再次调用select()方法,如果另一个通道就绪了,它会再次返回1。如果对第一个就绪的channel没有做任何操作,现在就有两个就绪的通道,但在每次select()方法调用之间,只有一个通道就绪了。
一旦调用select()方法,并且返回值不为0时,则可以通过调用Selector的selectedKeys()方法来访问已选择键集合 。如下:
Set selectedKeys=selector.selectedKeys();
Netty中首先判断任务队列是否为空,如果为空的话,就采用select(ltimeout)有超时设置的阻塞方法,如果不为空的话,就调用非阻塞的selectNow()方法,因为即使没有IO事件处理,也可以对任务队列中的任务进行处理。Netty中NioEventLoop的select和selectNow方法其实底层还是依靠selector的select方法。
void selectNow() throws IOException {
try {
this.selector.selectNow();
} finally {
if(this.wakenUp.get()) {
this.selector.wakeup();
}
}
}
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int e = 0;
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + this.delayNanos(currentTimeNanos);
while(true) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if(timeoutMillis <= 0L) {
if(e == 0) {
selector.selectNow();
e = 1;
}
break;
}
int selectedKeys = selector.select(timeoutMillis);
++e;
if(selectedKeys != 0 || oldWakenUp || this.wakenUp.get() || this.hasTasks() || this.hasScheduledTasks()) {
break;
}
if(Thread.interrupted()) {
if(logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because Thread.currentThread().interrupt() was called. Use NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
e = 1;
break;
}
long time = System.nanoTime();
if(time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
e = 1;
} else if(SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && e >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding selector.", Integer.valueOf(e));
this.rebuildSelector();
selector = this.selector;
selector.selectNow();
e = 1;
break;
}
currentTimeNanos = time;
}
if(e > 3 && logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row.", Integer.valueOf(e - 1));
}
} catch (CancelledKeyException var13) {
if(logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector - JDK bug?", var13);
}
}
}
可以看到调用selectNow方法是直接调用java nio的select.selectNow方法,而Netty的select方法中有一个参数oldWakeUp记录当前操作是否是唤醒状态(不太清楚这个唤醒状态的作用),每次进行select操作之前,会将其标志位false,表示要进行select操作,而且是未唤醒状态。
Netty中的select方法首先是根据当前时间时间去计算截止时间,这里使用到了超时队列(超时队列的作用也不太清楚),然后根据截止时间去计算超时时间,如果超时时间小于0,就执行selectNow操作,并退出此次select操作,否则执行带有超时时间的select方法,如果返回的selectKey不等于0,也就是有channel在select上注册了,或者该select操作被唤醒了(?),或者任务队列中有了任务,定时任务队列中有了任务,都会break出来。
接下来的代码逻辑是避免JDK空轮询的,当JDK发生了空轮训,select会直接返回,这时并没有IO事件到达,也没有超过超时时间,这样会导致线程进入死循环,CPU利用率飙升至100%,JDK到现在也并没有解决这个问题。
而Netty是通过记录空轮询的次数,如果这个次数达到了一个上限,上限默认是512,那么就新建一个selector,将注册在老selector上的channel注册到新的selector上,并且关闭老的selector,将新的selector替代老的selector。Netty通过rebuildSelector方法重建selector。
public void rebuildSelector() {
if(!this.inEventLoop()) {
this.execute(new Runnable() {
public void run() {
NioEventLoop.this.rebuildSelector();
}
});
} else {
Selector oldSelector = this.selector;
if(oldSelector != null) {
Selector newSelector;
try {
newSelector = this.openSelector();
} catch (Exception var9) {
logger.warn("Failed to create a new Selector.", var9);
return;
}
int nChannels = 0;
label69:
while(true) {
try {
Iterator t = oldSelector.keys().iterator();
while(true) {
if(!t.hasNext()) {
break label69;
}
SelectionKey key = (SelectionKey)t.next();
Object a = key.attachment();
try {
if(key.isValid() && key.channel().keyFor(newSelector) == null) {
int e = key.interestOps();
key.cancel();
SelectionKey var14 = key.channel().register(newSelector, e, a);
if(a instanceof AbstractNioChannel) {
((AbstractNioChannel)a).selectionKey = var14;
}
++nChannels;
}
} catch (Exception var11) {
logger.warn("Failed to re-register a Channel to the new Selector.", var11);
if(a instanceof AbstractNioChannel) {
AbstractNioChannel var13 = (AbstractNioChannel)a;
var13.unsafe().close(var13.unsafe().voidPromise());
} else {
NioTask task = (NioTask)a;
invokeChannelUnregistered(task, key, var11);
}
}
}
} catch (ConcurrentModificationException var12) {
;
}
}
this.selector = newSelector;
try {
oldSelector.close();
} catch (Throwable var10) {
if(logger.isWarnEnabled()) {
logger.warn("Failed to close the old Selector.", var10);
}
}
logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
}
}
}
2)processSelectedKey()
netty中selectedKey的优化
通过调用Selector的selectedKeys()方法来访问已选择键集合,此时返回的是HashSet。但是netty是通过反射的方式,将HashSet替换成数组pssSelectedKeysOptimized去处理IO事件。
private Selector openSelector() {
AbstractSelector selector;
try {
selector = this.provider.openSelector();
} catch (IOException var7) {
throw new ChannelException("failed to open a new selector", var7);
}
if(DISABLE_KEYSET_OPTIMIZATION) {
return selector;
} else {
try {
SelectedSelectionKeySet t = new SelectedSelectionKeySet();
Class selectorImplClass = Class.forName("sun.nio.ch.SelectorImpl", false, PlatformDependent.getSystemClassLoader());
if(!selectorImplClass.isAssignableFrom(selector.getClass())) {
return selector;
}
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
selectedKeysField.setAccessible(true);
publicSelectedKeysField.setAccessible(true);
selectedKeysField.set(selector, t);
publicSelectedKeysField.set(selector, t);
this.selectedKeys = t;
logger.trace("Instrumented an optimized java.util.Set into: {}", selector);
} catch (Throwable var6) {
this.selectedKeys = null;
logger.trace("Failed to instrument an optimized java.util.Set into: {}", selector, var6);
}
return selector;
}
}
首先会调用JDK的openSelector方法返回创建的selector,然后会判断是否要对keySet进行优化,通过判断DISABLE_KEYSET_OPTIMIZATION,是否要对keyset进行优化,默认是要对keyset进行优化的。这里的SelectedSelectionKeySet是优化过后的keyset,底层是通过两个数组加上两个数组的大小进行实现的,这样可以使得add操作达到o(1)的时间复杂度(但是是HashSet的add操作时间复杂度不也是o(1))嘛,
processSelectedKey调用processSelectedKeysOptimized
该方法的流程就是遍历数组中所有的selectedKey,一旦遍历完,就将该引用指向为空。获取每一个selectorKey对应的channel,然后通过调用processSelectedKey去处理该channel上感兴趣的事件。
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
int i = 0;
//遍历SelectedKsys
while(true) {
SelectionKey k = selectedKeys[i];
if(k == null) {
return;
}
selectedKeys[i] = null;
//获取selectKey对应的channel
Object a = k.attachment();
if(a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel)((AbstractNioChannel)a));
} else {
NioTask task = (NioTask)a;
processSelectedKey(k, (NioTask)task);
}
if(this.needsToSelectAgain) {
while(selectedKeys[i] != null) {
selectedKeys[i] = null;
++i;
}
this.selectAgain();
selectedKeys = this.selectedKeys.flip();
i = -1;
}
++i;
}
}
这里处理selector上面的IO事件,底层其实都是通过channel的unsafe类进行操作的,这里read和accept事件对应的都是channel的read方法。
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if(!k.isValid()) {
unsafe.close(unsafe.voidPromise());
} else {
try {
int ignored = k.readyOps();
//如果是read或者accept事件就对channel进行读操作
if((ignored & 17) != 0 || ignored == 0) {
unsafe.read();
if(!ch.isOpen()) {
return;
}
}
//write事件
if((ignored & 4) != 0) {
ch.unsafe().forceFlush();
}
//connect事件
if((ignored & 8) != 0) {
int ops = k.interestOps();
ops &= -9;
k.interestOps(ops);
unsafe.finishConnect();
}
} catch (CancelledKeyException var5) {
unsafe.close(unsafe.voidPromise());
}
}
}
3)使用runAllTasks()执行任务队列中的事件
定时任务队列是一个PriorityQueue(优先级队列),定时的任务的排序是按照任务的截止时间排序的,也是非线程安全的队列。
private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
if(task == null) {
throw new NullPointerException("task");
} else {
if(this.inEventLoop()) {
this.delayedTaskQueue.add(task);
} else {
this.execute(new Runnable() {
public void run() {
SingleThreadEventExecutor.this.delayedTaskQueue.add(task);
}
});
}
return task;
}
}
runAllTask首先从定时任务队列中拉取定时任务,将需要执行的定时任务加入到普通任务队列中,并计算截止时间,然后循环的从普通任务队列中拉取任务,并执行任务,这里判断是否到达超时时间,是每相隔64个任务,就判断是否到达最大任务执行时间。为啥要每隔64个任务判断是否超时呢?因为nanoTime也是比较费时的。
protected boolean runAllTasks(long timeoutNanos) {
this.fetchFromDelayedQueue();
Runnable task = this.pollTask();
if(task == null) {
return false;
} else {
long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0L;
long lastExecutionTime;
while(true) {
try {
task.run();
} catch (Throwable var11) {
logger.warn("A task raised an exception.", var11);
}
++runTasks;
if((runTasks & 63L) == 0L) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if(lastExecutionTime >= deadline) {
break;
}
}
task = this.pollTask();
if(task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
this.lastExecutionTime = lastExecutionTime;
return true;
}
}
从定时队列中拉取任务,这里拉取的任务是拉取截止时间不超过nanoTime的任务,将任务从定时任务队列中删除,将任务加入到普通任务队列中。这个while循环执行完成之后,所有需要执行的定时任务全部都加入到普通任务队列中。
private void fetchFromDelayedQueue() {
long nanoTime = 0L;
while (true) {
ScheduledFutureTask delayedTask = (ScheduledFutureTask) this.delayedTaskQueue.peek();
if (delayedTask == null) {
break;
}
if (nanoTime == 0L) {
nanoTime = ScheduledFutureTask.nanoTime();
}
if (delayedTask.deadlineNanos() > nanoTime) {
break;
}
this.delayedTaskQueue.remove();
this.taskQueue.add(delayedTask);
}
}
定时任务队列是一个优先级队列,队列按照优先级进行排序,这里的优先级是每个任务的截止时间,队列是按照截止时间的早晚对任务进行排序的。
public int compareTo(Delayed o) {
if(this == o) {
return 0;
} else {
ScheduledFutureTask that = (ScheduledFutureTask)o;
long d = this.deadlineNanos() - that.deadlineNanos();
if(d < 0L) {
return -1;
} else if(d > 0L) {
return 1;
} else if(this.id < that.id) {
return -1;
} else if(this.id == that.id) {
throw new Error();
} else {
return 1;
}
}
}
总结:
1.默认情况下,NioEventLoopGroup会创建2*cpu个数的线程池,在调用NioEventLoop.execute(task)的时候,如果当前的NioEventLoop没有创建自己的线程,就会创建线程。
2.Netty如何解决JDK空轮训bug?通过计算空轮训操作的个数,这里的空轮训的判断是既没有IO事件的到达,也没有达到超时时间,如果空轮训的个数超过阈值(512),就会新建一个selector,将旧selector的selectorKey注册到新的selector上,将旧的selector关闭,用新的selector替代旧的selector。
3.Netty在所有外部线程调用NioEventLoop的操作时,如果通过InEventLoop判断是否在NioEventLoop所属的线程,如果不在通过startThread启动NioEventLoop的线程,并且将任务添加到NioEventLoop的任务队列中,所有NioEventLoop对应一个线程,其中的操作只会被一个线程所执行,实现了异步串行无锁化。
4.当NioEventLoop第一次调用execute()方法,会新建一个FastThreadLocalThread与NioEventLoop绑定。