欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Netty源码分析之NioEventLoop执行流程

程序员文章站 2022-04-23 11:53:21
...

NioEventLoop启动触发条件:

1.服务端绑定本地端口

2.新连接接入通过chooser绑定一个NioEventLoop

服务端绑定本地端口

  绑定本地端口,使用下面方法;

ChannelFuture future = bootstrap.bind(host, port).sync();

  最终会调用doBind0()方法:

private static void doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) {
    channel.eventLoop().execute(new Runnable() {
        public void run() {
            if(regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }

        }
    });
}

  这个时候就会调用channel对应NioEventLoop的execute方法,会判断是否在当前的eventloop对应的thread中,如果在,直接向任务队列中添加绑定端口的任务,如果不在,首先要start当前eventLoop对应的thread,再将任务放到任务队列中。这里的excute(task)方法,并不是让线程直接执行它,而是将它放到线程的任务队列中,等待线程去执行它。

public void execute(Runnable task) {
    if(task == null) {
        throw new NullPointerException("task");
    } else {
        boolean inEventLoop = this.inEventLoop();
        if(inEventLoop) {
            this.addTask(task);
        } else {
            this.startThread();
            this.addTask(task);
            if(this.isShutdown() && this.removeTask(task)) {
                reject();
            }
        }

        if(!this.addTaskWakesUp && this.wakesUpForTask(task)) {
            this.wakeup(inEventLoop);
        }

    }
}

  这里会调用startThread去启动一个线程,首先会根据状态判断线程是否创建成功,否则使用CAS去创建线程,并调用一个doStartThread去创建一个FastThreadLocalThread,并且这个函数会将一个NioEventLoop与一个thread进行绑定。

private void startThread() {
    if (this.state == 1 && STATE_UPDATER.compareAndSet(this, 1, 2)) {
        this.doStartThread();
    }
}

NioEventLoop线程执行逻辑

  NioEventLoop对应线程的run方法,run()方法里面是一个死循环,主要的逻辑是首先采用select检查是否有IO事件,如果有IO事件,就采用processSelectedKey()对IO事件进行处理,最后调用runAllTasks()处理任务队列中的任务。

protected void run() {
    while(true) {
        boolean oldWakenUp = this.wakenUp.getAndSet(false);

        try {
            if(this.hasTasks()) {
                this.selectNow();
            } else {
                this.select(oldWakenUp);
                if(this.wakenUp.get()) {
                    this.selector.wakeup();
                }
            }

            this.cancelledKeys = 0;
            this.needsToSelectAgain = false;
            int t = this.ioRatio;
            if(t == 100) {
                this.processSelectedKeys();
                this.runAllTasks();
            } else {
                long e = System.nanoTime();
                this.processSelectedKeys();
                long ioTime = System.nanoTime() - e;
                this.runAllTasks(ioTime * (long)(100 - t) / (long)t);
            }

            if(this.isShuttingDown()) {
                this.closeAll();
                if(this.confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable var8) {
            logger.warn("Unexpected exception in the selector loop.", var8);

            try {
                Thread.sleep(1000L);
            } catch (InterruptedException var7) {
                ;
            }
        }
    }
}

  这段代码中的ioRadio是控制执行IO事件和执行任务队列中的任务的一个事件比,默认是50,代表执行IO事件处理和执行任务队列的任务事件比是1:1。

1)使用select检测IO事件

  通过Selector的select()方法可以选择已经准备就绪的通道 (这些通道包含你感兴趣的的事件)。比如你对读就绪的通道感兴趣,那么select()方法就会返回读事件已经就绪的那些通道。Java中的Selector几个重载的select()方法:

  • int select():阻塞到至少有一个通道在你注册的事件上就绪了。
  • int select(long timeout):和select()一样,但最长阻塞时间为timeout毫秒。
  • int selectNow():非阻塞,只要有通道就绪就立刻返回。

  select()方法返回的int值表示有多少通道已经就绪,是自上次调用select()方法后有多少通道变成就绪状态。之前在select()调用时进入就绪的通道不会在本次调用中被记入,而在前一次select()调用进入就绪但现在已经不在处于就绪的通道也不会被记入。例如:首次调用select()方法,如果有一个通道变成就绪状态,返回了1,若再次调用select()方法,如果另一个通道就绪了,它会再次返回1。如果对第一个就绪的channel没有做任何操作,现在就有两个就绪的通道,但在每次select()方法调用之间,只有一个通道就绪了。

  一旦调用select()方法,并且返回值不为0时,则可以通过调用Selector的selectedKeys()方法来访问已选择键集合 。如下: 

Set selectedKeys=selector.selectedKeys(); 

  Netty中首先判断任务队列是否为空,如果为空的话,就采用select(ltimeout)有超时设置的阻塞方法,如果不为空的话,就调用非阻塞的selectNow()方法,因为即使没有IO事件处理,也可以对任务队列中的任务进行处理。Netty中NioEventLoop的select和selectNow方法其实底层还是依靠selector的select方法。

void selectNow() throws IOException {
    try {
        this.selector.selectNow();
    } finally {
        if(this.wakenUp.get()) {
            this.selector.wakeup();
        }

    }

}

private void select(boolean oldWakenUp) throws IOException {
    Selector selector = this.selector;

    try {
        int e = 0;
        long currentTimeNanos = System.nanoTime();
        long selectDeadLineNanos = currentTimeNanos + this.delayNanos(currentTimeNanos);

        while(true) {
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
            if(timeoutMillis <= 0L) {
                if(e == 0) {
                    selector.selectNow();
                    e = 1;
                }
                break;
            }

            int selectedKeys = selector.select(timeoutMillis);
            ++e;
            if(selectedKeys != 0 || oldWakenUp || this.wakenUp.get() || this.hasTasks() || this.hasScheduledTasks()) {
                break;
            }

            if(Thread.interrupted()) {
                if(logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely because Thread.currentThread().interrupt() was called. Use NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                }

                e = 1;
                break;
            }

            long time = System.nanoTime();
            if(time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                e = 1;
            } else if(SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && e >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding selector.", Integer.valueOf(e));
                this.rebuildSelector();
                selector = this.selector;
                selector.selectNow();
                e = 1;
                break;
            }

            currentTimeNanos = time;
        }

        if(e > 3 && logger.isDebugEnabled()) {
            logger.debug("Selector.select() returned prematurely {} times in a row.", Integer.valueOf(e - 1));
        }
    } catch (CancelledKeyException var13) {
        if(logger.isDebugEnabled()) {
            logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector - JDK bug?", var13);
        }
    }

}

  可以看到调用selectNow方法是直接调用java nio的select.selectNow方法,而Netty的select方法中有一个参数oldWakeUp记录当前操作是否是唤醒状态(不太清楚这个唤醒状态的作用),每次进行select操作之前,会将其标志位false,表示要进行select操作,而且是未唤醒状态。

  Netty中的select方法首先是根据当前时间时间去计算截止时间,这里使用到了超时队列(超时队列的作用也不太清楚),然后根据截止时间去计算超时时间,如果超时时间小于0,就执行selectNow操作,并退出此次select操作,否则执行带有超时时间的select方法,如果返回的selectKey不等于0,也就是有channel在select上注册了,或者该select操作被唤醒了(?),或者任务队列中有了任务,定时任务队列中有了任务,都会break出来。

  接下来的代码逻辑是避免JDK空轮询的,当JDK发生了空轮训,select会直接返回,这时并没有IO事件到达,也没有超过超时时间,这样会导致线程进入死循环,CPU利用率飙升至100%,JDK到现在也并没有解决这个问题。

  而Netty是通过记录空轮询的次数,如果这个次数达到了一个上限,上限默认是512,那么就新建一个selector,将注册在老selector上的channel注册到新的selector上,并且关闭老的selector,将新的selector替代老的selector。Netty通过rebuildSelector方法重建selector。

public void rebuildSelector() {
    if(!this.inEventLoop()) {
        this.execute(new Runnable() {
            public void run() {
                NioEventLoop.this.rebuildSelector();
            }
        });
    } else {
        Selector oldSelector = this.selector;
        if(oldSelector != null) {
            Selector newSelector;
            try {
                newSelector = this.openSelector();
            } catch (Exception var9) {
                logger.warn("Failed to create a new Selector.", var9);
                return;
            }

            int nChannels = 0;

            label69:
            while(true) {
                try {
                    Iterator t = oldSelector.keys().iterator();

                    while(true) {
                        if(!t.hasNext()) {
                            break label69;
                        }

                        SelectionKey key = (SelectionKey)t.next();
                        Object a = key.attachment();

                        try {
                            if(key.isValid() && key.channel().keyFor(newSelector) == null) {
                                int e = key.interestOps();
                                key.cancel();
                                SelectionKey var14 = key.channel().register(newSelector, e, a);
                                if(a instanceof AbstractNioChannel) {
                                    ((AbstractNioChannel)a).selectionKey = var14;
                                }

                                ++nChannels;
                            }
                        } catch (Exception var11) {
                            logger.warn("Failed to re-register a Channel to the new Selector.", var11);
                            if(a instanceof AbstractNioChannel) {
                                AbstractNioChannel var13 = (AbstractNioChannel)a;
                                var13.unsafe().close(var13.unsafe().voidPromise());
                            } else {
                                NioTask task = (NioTask)a;
                                invokeChannelUnregistered(task, key, var11);
                            }
                        }
                    }
                } catch (ConcurrentModificationException var12) {
                    ;
                }
            }

            this.selector = newSelector;

            try {
                oldSelector.close();
            } catch (Throwable var10) {
                if(logger.isWarnEnabled()) {
                    logger.warn("Failed to close the old Selector.", var10);
                }
            }

            logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
        }
    }
}

2)processSelectedKey()

netty中selectedKey的优化

  通过调用Selector的selectedKeys()方法来访问已选择键集合,此时返回的是HashSet。但是netty是通过反射的方式,将HashSet替换成数组pssSelectedKeysOptimized去处理IO事件。

 

 

private Selector openSelector() {
    AbstractSelector selector;
    try {
        selector = this.provider.openSelector();
    } catch (IOException var7) {
        throw new ChannelException("failed to open a new selector", var7);
    }

    if(DISABLE_KEYSET_OPTIMIZATION) {
        return selector;
    } else {
        try {
            SelectedSelectionKeySet t = new SelectedSelectionKeySet();
            Class selectorImplClass = Class.forName("sun.nio.ch.SelectorImpl", false, PlatformDependent.getSystemClassLoader());
            if(!selectorImplClass.isAssignableFrom(selector.getClass())) {
                return selector;
            }

            Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
            Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
            selectedKeysField.setAccessible(true);
            publicSelectedKeysField.setAccessible(true);
            selectedKeysField.set(selector, t);
            publicSelectedKeysField.set(selector, t);
            this.selectedKeys = t;
            logger.trace("Instrumented an optimized java.util.Set into: {}", selector);
        } catch (Throwable var6) {
            this.selectedKeys = null;
            logger.trace("Failed to instrument an optimized java.util.Set into: {}", selector, var6);
        }

        return selector;
    }
}

 

  首先会调用JDK的openSelector方法返回创建的selector,然后会判断是否要对keySet进行优化,通过判断DISABLE_KEYSET_OPTIMIZATION,是否要对keyset进行优化,默认是要对keyset进行优化的。这里的SelectedSelectionKeySet是优化过后的keyset,底层是通过两个数组加上两个数组的大小进行实现的,这样可以使得add操作达到o(1)的时间复杂度(但是是HashSet的add操作时间复杂度不也是o(1))嘛,

 

processSelectedKey调用processSelectedKeysOptimized

  该方法的流程就是遍历数组中所有的selectedKey,一旦遍历完,就将该引用指向为空。获取每一个selectorKey对应的channel,然后通过调用processSelectedKey去处理该channel上感兴趣的事件。

private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
    int i = 0;
  
  //遍历SelectedKsys while(true) { SelectionKey k = selectedKeys[i]; if(k == null) { return; } selectedKeys[i] = null;
     //获取selectKey对应的channel Object a = k.attachment(); if(a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel)((AbstractNioChannel)a)); } else { NioTask task = (NioTask)a; processSelectedKey(k, (NioTask)task); } if(this.needsToSelectAgain) { while(selectedKeys[i] != null) { selectedKeys[i] = null; ++i; } this.selectAgain(); selectedKeys = this.selectedKeys.flip(); i = -1; } ++i; } }

  这里处理selector上面的IO事件,底层其实都是通过channel的unsafe类进行操作的,这里read和accept事件对应的都是channel的read方法。

private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if(!k.isValid()) {
        unsafe.close(unsafe.voidPromise());
    } else {
        try {
            int ignored = k.readyOps();
       //如果是read或者accept事件就对channel进行读操作 if((ignored & 17) != 0 || ignored == 0) { unsafe.read(); if(!ch.isOpen()) { return; } }
       //write事件 if((ignored & 4) != 0) { ch.unsafe().forceFlush(); }
       //connect事件 if((ignored & 8) != 0) { int ops = k.interestOps(); ops &= -9; k.interestOps(ops); unsafe.finishConnect(); } } catch (CancelledKeyException var5) { unsafe.close(unsafe.voidPromise()); } } }

 

3)使用runAllTasks()执行任务队列中的事件

  定时任务队列是一个PriorityQueue(优先级队列),定时的任务的排序是按照任务的截止时间排序的,也是非线程安全的队列。

private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
    if(task == null) {
        throw new NullPointerException("task");
    } else {
        if(this.inEventLoop()) {
            this.delayedTaskQueue.add(task);
        } else {
            this.execute(new Runnable() {
                public void run() {
                    SingleThreadEventExecutor.this.delayedTaskQueue.add(task);
                }
            });
        }

        return task;
    }
}

  runAllTask首先从定时任务队列中拉取定时任务,将需要执行的定时任务加入到普通任务队列中,并计算截止时间,然后循环的从普通任务队列中拉取任务,并执行任务,这里判断是否到达超时时间,是每相隔64个任务,就判断是否到达最大任务执行时间。为啥要每隔64个任务判断是否超时呢?因为nanoTime也是比较费时的。

protected boolean runAllTasks(long timeoutNanos) {
    this.fetchFromDelayedQueue();
    Runnable task = this.pollTask();
    if(task == null) {
        return false;
    } else {
        long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
        long runTasks = 0L;

        long lastExecutionTime;
        while(true) {
            try {
                task.run();
            } catch (Throwable var11) {
                logger.warn("A task raised an exception.", var11);
            }

            ++runTasks;
            if((runTasks & 63L) == 0L) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                if(lastExecutionTime >= deadline) {
                    break;
                }
            }

            task = this.pollTask();
            if(task == null) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }

        this.lastExecutionTime = lastExecutionTime;
        return true;
    }
}

  从定时队列中拉取任务,这里拉取的任务是拉取截止时间不超过nanoTime的任务,将任务从定时任务队列中删除,将任务加入到普通任务队列中。这个while循环执行完成之后,所有需要执行的定时任务全部都加入到普通任务队列中。

private void fetchFromDelayedQueue() {
    long nanoTime = 0L;

    while (true) {
        ScheduledFutureTask delayedTask = (ScheduledFutureTask) this.delayedTaskQueue.peek();
        if (delayedTask == null) {
            break;
        }

        if (nanoTime == 0L) {
            nanoTime = ScheduledFutureTask.nanoTime();
        }

        if (delayedTask.deadlineNanos() > nanoTime) {
            break;
        }

        this.delayedTaskQueue.remove();
        this.taskQueue.add(delayedTask);
    }
}

   定时任务队列是一个优先级队列,队列按照优先级进行排序,这里的优先级是每个任务的截止时间,队列是按照截止时间的早晚对任务进行排序的。

public int compareTo(Delayed o) {
    if(this == o) {
        return 0;
    } else {
        ScheduledFutureTask that = (ScheduledFutureTask)o;
        long d = this.deadlineNanos() - that.deadlineNanos();
        if(d < 0L) {
            return -1;
        } else if(d > 0L) {
            return 1;
        } else if(this.id < that.id) {
            return -1;
        } else if(this.id == that.id) {
            throw new Error();
        } else {
            return 1;
        }
    }
}

总结:

1.默认情况下,NioEventLoopGroup会创建2*cpu个数的线程池,在调用NioEventLoop.execute(task)的时候,如果当前的NioEventLoop没有创建自己的线程,就会创建线程。

2.Netty如何解决JDK空轮训bug?通过计算空轮训操作的个数,这里的空轮训的判断是既没有IO事件的到达,也没有达到超时时间,如果空轮训的个数超过阈值(512),就会新建一个selector,将旧selector的selectorKey注册到新的selector上,将旧的selector关闭,用新的selector替代旧的selector。

3.Netty在所有外部线程调用NioEventLoop的操作时,如果通过InEventLoop判断是否在NioEventLoop所属的线程,如果不在通过startThread启动NioEventLoop的线程,并且将任务添加到NioEventLoop的任务队列中,所有NioEventLoop对应一个线程,其中的操作只会被一个线程所执行,实现了异步串行无锁化。

 4.当NioEventLoop第一次调用execute()方法,会新建一个FastThreadLocalThread与NioEventLoop绑定。