欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Netty优雅退出机制shutdownGracefully源码分析

程序员文章站 2022-04-22 21:56:14
...

 使用Netty开发的小伙伴肯定对下面这两句代码非常熟悉了

bossGroup.shutdownGracefully();
			workerGroup.shutdownGracefully();
那就是Netty中大名鼎鼎的优雅退出,顾名思义它的作用就是使线程池退出,用我们都用过,那么它到底是如何工作的呢?

由于Netty处理的是线程池,线程池的关闭要求其中的每一个线程关闭。而线程的实现实在SingleThreadEventExecutor类,

所以我们将再次回到这个类,首先看其中的shutdownGracefully()方法,其中的参数quietPeriod为静默时间,timeout为截至时间,

第三个参数是TimeUnit unit为时间单位,其中该类还定义了一个变量gracefulShutdownStartTime,即优雅关闭开始时间,代码如下:

    @Override
    public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
        if (isShuttingDown()) {
            return terminationFuture(); // 正在关闭阻止其他线程
        }

        boolean inEventLoop = inEventLoop();
        boolean wakeup;
        int oldState;
        for (;;) {
            if (isShuttingDown()) {
                return terminationFuture(); // 正在关闭阻止其他线程
            }
            int newState;
            wakeup = true;
            oldState = STATE_UPDATER.get(this);
            if (inEventLoop) {
                newState = ST_SHUTTING_DOWN;
            } else {
                switch (oldState) {
                    case ST_NOT_STARTED:
                    case ST_STARTED:
                        newState = ST_SHUTTING_DOWN;
                        break;
                    default: // 一个线程已修改好线程状态,此时这个线程才执行16行代码
                        newState = oldState;
                        wakeup = false; // 已经有线程唤醒,所以不用再唤醒
                }
            }
            if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
                break;  // 保证只有一个线程将oldState修改为newState
            }
            // 隐含STATE_UPDATER已被修改,则在下一次循环返回
        }
         // 在default情况下会更新这两个值
        gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
        gracefulShutdownTimeout = unit.toNanos(timeout);

        if (oldState == ST_NOT_STARTED) {
            thread.start();
        }
        if (wakeup) {
            wakeup(inEventLoop);
        }
        return terminationFuture();
    }
这段代码考虑了多线程同时调用关闭的情况,我们抓住其中的关键点,该方法只是将线程状态修改为ST_SHUTTING_DOWN

并不执行具体的关闭操作(类似shutdown方法将线程状态修改为ST_SHUTDOWN)。for()循环是为了保证修改state的线程(原生

线程或者外部线程)有且只有一个,并且通过CAS操作来确保线程安全。所以这段代码的关键在于修改STATE_UPDATER状态,

这个状态有什么作用呢?

想想如果我们需要让阻塞的线程,从阻塞状态退出,使其从一个EventLoop循环中退出,有什么办法来通知它?

就是通过设置一个状态变量,来告诉它现在线程池的状态,从而它判断接下来是退出还是等待执行。

本人上一篇博文讲解NioEventLoop中讲到,run()方法中有这么一段代码

f (this.isShuttingDown()) {
                this.closeAll();
                if (this.confirmShutdown()) {
                    this.cleanupAndTerminate(true);
                    return;
                }
            }
其实查询线程状态的方法有三个:

    public boolean isShuttingDown() {
        return STATE_UPDATER.get(this) >= ST_SHUTTING_DOWN;
    }

    public boolean isShutdown() {
        return STATE_UPDATER.get(this) >= ST_SHUTDOWN;
    }

    public boolean isTerminated() {
        return STATE_UPDATER.get(this) == ST_TERMINATED;
    }
正是使用了刚才的那个 STATE_UPDATER状态变量。

需要注意的是调用shutdownGracefully()方法后线程状态为ST_SHUTTING_DOWN,调用shutdown()方法后线程状态为

ST_SHUTDOWN,isShuttingDown()可以一并判断这两种状态。closeAll()方法关闭注册到NioEventLoop的所有Channel,

代码不在列出,在博主上一篇博文有。confirmShutdown()方法在SingleThreadEventExecutor类,确定是否可以关闭或者

说是否可以从NioEventLoop循环中跳出。代码如下:

    protected boolean confirmShutdown() {
        if (!isShuttingDown()) {
            return false;   // 没有调用shutdown相关的方法直接返回
        }
        if (!inEventLoop()) {   // 必须是原生线程
            throw new IllegalStateException("must be invoked from an event loop");
        }

        cancelScheduledTasks(); // 取消调度任务
        if (gracefulShutdownStartTime == 0) {   // 优雅关闭开始时间,这也是一个标记
            gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();
        }

        // 执行完普通任务或者没有普通任务时执行完shutdownHook任务
        if (runAllTasks() || runShutdownHooks()) {
            if (isShutdown()) {
                return true;    // 调用shutdown()方法直接退出
            }
            if (gracefulShutdownQuietPeriod == 0) {
                return true;    // 优雅关闭静默时间为0也直接退出
            }
            wakeup(true);   // 优雅关闭但有未执行任务,唤醒线程执行
            return false;
        }

        final long nanoTime = ScheduledFutureTask.nanoTime();
        // shutdown()方法调用直接返回,优雅关闭截止时间到也返回
        if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
            return true;
        }
        // 在静默期间每100ms唤醒线程执行期间提交的任务
        if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
            wakeup(true);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                // Ignore
            }
            return false;
        }
        // 静默时间内没有任务提交,可以优雅关闭,此时若用户又提交任务则不会被执行
        return true;
    }
总结一下,调用shutdown()方法只要满足下列任一条件既能从循环跳出:

1.执行完普通任务

2.没有普通任务,执行完shutdownHook任务

3.既没有普通任务,也没有shutdownHook任务

调用shutdownGracefully()方法只要满足下列任一条件既能从循环跳出:

1.执行完普通任务且静默时间为0

2.没有普通任务,执行完shutdownHook任务且静默时间为0

3.静默期间没有任务提交

4.优雅关闭截至时间已到

我们可以将静默时间看作为一段观察期,在此期间如果没有任务执行,说明可以跳出循环;如果此期间有任务执行,

执行完后立即进入下一个观察期继续观察;如果连续多个观察期一直有任务执行,那么截止时间到则跳出循环。

我们看看shutdownGracefully()的默认参数

    public Future<?> shutdownGracefully() {
        return shutdownGracefully(2, 15, TimeUnit.SECONDS);
    }

可知,Netty默认为,在2秒的静默时间内如果没有任务,则关闭;否则15秒截止时间到达时关闭。

至此,我们已经清楚了从EventLoop循环中跳出的机制,最后,我们抵达终点站:线程结束机制。这一部分的代码实现

在线程工厂的生成方法中:

    thread = threadFactory.newThread(new Runnable() {
            @Override
            public void run() {
                boolean success = false;
                updateLastExecutionTime();
                try {
                    SingleThreadEventExecutor.this.run();   // 模板方法,EventLoop实现
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    for (;;) {
                        int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
                        // 用户调用了关闭的方法或者抛出异常
                        if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                            break;  // 抛出异常也将状态置为ST_SHUTTING_DOWN
                        }
                    }
                    if (success && gracefulShutdownStartTime == 0) {
                        // time=0,说明confirmShutdown()方法没有调用,记录日志
                    }

                    try {
                        for (;;) {
                            // 抛出异常时,将普通任务和shutdownHook任务执行完毕
                            // 正常关闭时,结合前述的循环跳出条件
                            if (confirmShutdown()) {
                                break;
                            }
                        }
                    } finally {
                        try {
                            cleanup();
                        } finally {
                            // 线程状态设置为ST_TERMINATED,线程终止
                            STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                            threadLock.release();
                            if (!taskQueue.isEmpty()) {
                                //  关闭时,任务队列中添加了任务,记录日志
                            }
                            terminationFuture.setSuccess(null); // 异步结果设置为成功
                        }
                    }
                }
            }
        });
20-22行代码说明子类在实现模板方法run()时,须调用confirmShutdown()方法,不调用的话会有错误日志。25-31行的

for()循环主要是对异常情况的处理,但同时也兼顾了正常调用关闭方法的情况。可以将抛出异常的情况视为静默时间为0的

shutdownGracefully()方法,这样便于理解循环跳出条件。34行代码cleanup()的默认实现什么也不做,NioEventLoop覆盖了基类。

实现关闭NioEventLoop持有的selector:

    protected void cleanup() {
        try {
            selector.close();
        } catch (IOException e) {
            logger.warn("Failed to close a selector.", e);
        }
    }
关于Netty优雅关闭的机制,还有最后一点细节,那就是runShutdownHooks()方法:

    private boolean runShutdownHooks() {
        boolean ran = false;
        while (!shutdownHooks.isEmpty()) {
            // 使用copy是因为shutdwonHook任务中可以添加或删除shutdwonHook任务
            List<Runnable> copy = new ArrayList<Runnable>(shutdownHooks);
            shutdownHooks.clear();
            for (Runnable task: copy) {
                try {
                    task.run();
                } catch (Throwable t) {
                    logger.warn("Shutdown hook raised an exception.", t);
                } finally {
                    ran = true;
                }
            }
        }
        if (ran) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
        }
        return ran;
    }
此外,还有threadLock.release()方法,threadLock是一个初始值为0的信号量。一个初值为0的信号量,当线程请求锁时只会阻塞,

这又什么作用呢?从awaitTermination()方法揭晓答案,用来使其它线程阻塞等待原生线程关闭:

    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        // 由于tryAcquire()永远不会成功,所以必定阻塞timeout时间
        if (threadLock.tryAcquire(timeout, unit)) {
            threadLock.release();
        }
        return isTerminated();
    }
关于shutdownGracefully的分析就到此,欢迎一起讨论