Netty 4 源码分析——EventExecutor
程序员文章站
2023-12-29 14:13:04
...
先从EventExecutor开始,因为它是一个很基础的工具类,是对I/O线程的包装。先了解下它的源码会对后面的分析有更好的理解。
先看下EventExecutor的类关系图,这里只是简单的画出了类和接口的继承和实现关系,还有其他的聚合关系没有画出来,为的是便于分析思路的清晰。
说到Executor,很容易联想到jdk中 java.util.concurrent.Executor 接口,这个接口非常简单,就一个方法
从方法签名上就能看出这个是为了支持异步模式的。command表示一个命令。当前线程就是命令者角色,Executor内部的去运行Runnable的线程就是执行者。这里没有提供明确的地方供命令者去取得命令的执行结果。
ExecutorService 继承了Executor 接口,增加了对自身生命周期管理的方法,同时提供了一个Future给命令者去获取命令的执行结果。
ScheduledExecutorService 继承了ExecutorService接口,增加了对定时任务的支持。
EventExecutorGroup 继承了ScheduledExecutorService接口,对原来的ExecutorService的关闭接口提供了增强,提供了优雅的关闭接口。从接口名称上可以看出它是对多个EventExecutor的集合,提供了对多个EventExecutor的迭代访问接口。
EventExecutor 继承EventExecutorGroup 看着这个关系真心有些纠结啊。不过细想下还是能理解的。A是B中的一员,但是A也能迭代访问B中的其他成员。这个继承关系支持了迭代访问这个行为。自然的他提供了一个parent接口,来获取所属的EventExecutorGroup 。另外提供了inEventLoop 方法支持查询某个线程是否在EventExecutor所管理的线程中。还有其他一些创建Promise和Future的方法。
AbstractEventExecutor 只是对EventExecutor中某些方法的简单实现
下面重点分析下非常有意思SingleThreadEventExecutor,它也是个抽象类,但是提供了很多重要方法的实现。弄清楚了这个对整个EventExecutor体系都非常有帮助。从类名上可知里面只有一个线程,先搞清楚一个线程的处理过程再理解多线程的就轻松些了。
先从execute方法入口分析
先不看这个源码,我们分析下,作为一个Executor,A让你执行命令A,B让你执行命令B。。。命令不是说执行就能执行的吧。总得有个地方保存还没来得及执行的命令吧,总得有个先来后到吧。而这个地方又会被多线程访问,得保证多线程访问可见性,操作的原子性。jdk中提供的BlockingQueue就是为此而生的。BlockingQueue是一个接口,jdk中有很多他的实现。SingleThreadEventExcutor中提供的这个地方叫tasksQueue,类型是jdk中的LinkedBlockingQueue。上面代码中的addTask就是往tasksQueue中添加。
另外SingleThreadEventExcutor实现了ScheduledExecutorService 接口,支持执行定时任务。得有个地方存放定时任务信息。类中的实现是delayedTaskQueue,它是一个PriorityQueue ,也是一个BlockingQueue。不过它里面的元素不是按照先来后到的顺序存取的,而是按照各个元素的优先级判断的。
因为这个execute方法,是可以在外部线程调用,也可以在内部线程调用。也就是说外部成员可以给你下命令,内部成员也可以给你下命令。所以在上面的代码中先调用inEventLoop判断当前下命令的是外部的还是内部的。
如果是外部的,先确定内部线程是否启动,没启动就先启动内部线程同时给自己加一个定时清理的定时任务。这个从下面的代码中可以看出
添加命令完成了,下面就看如何去执行命令了,这个就需要分析下内部线程的执行逻辑了。SingleThreadEventExecutor类中有一个实力变量Thread,它引用的就是当前Executor所拥有的那个thread对象。
上面Thread内部run方法执行的是SingleThreadEventExecutor.this.run(),而这个run方法是一个抽象方法,留给了子类去实现了。不过可以肯定的是子类的run方法是不断的去tasksQueue中取出task去执行。现在重点分析下finally块中的代码。
1、首先更改状态为正在关闭状态。
2、如果子类中的run方法中的loop执行成功了,就得先调用confirmShutdown,确认任务队列中的任务是否都已经被执行了。
3、然后还得再次确认下任务队列中是否已被执行完毕,因为在关闭的过程中外部也是能添加任务的。
4、最终执行清理工作,更改状态为已关闭,释放信号量。
5、如果这个时候还是有任务没执行完,那也只能是无奈了,记个log吧
6、更新整个关闭过程为success
再分析下confirmShutdown,看看是如何保证所有的task执行完成的呢
后面再看看这个类中的一些内部变量
这个WAKEUP_TASK什么也不做,为啥取名wakeup呢?我看源码也没太明白。有人理解的给解释下吧
threadLock的内部permits设置为0,也就是说acquire()永远获取不到permit,会一直被阻塞着。那有什么用呢?另一种实现wait()/notify()吧
值得注意的是类中是如何来控制定时任务的呢?秘密在这个方法中
ScheduledFutureTask类中有个变量记录这个类被加载进内存中的时间
所以fetchFromDelayedQueue()方法的逻辑就是先取出即将到期的task,判断是否已经到期,若已经到期就加入到tasksQueue中,等到被执行。
先分析到这里,后面在补上。
有什么不正确的,也请大家指正。
先看下EventExecutor的类关系图,这里只是简单的画出了类和接口的继承和实现关系,还有其他的聚合关系没有画出来,为的是便于分析思路的清晰。
说到Executor,很容易联想到jdk中 java.util.concurrent.Executor 接口,这个接口非常简单,就一个方法
void execute(Runnable command);
从方法签名上就能看出这个是为了支持异步模式的。command表示一个命令。当前线程就是命令者角色,Executor内部的去运行Runnable的线程就是执行者。这里没有提供明确的地方供命令者去取得命令的执行结果。
ExecutorService 继承了Executor 接口,增加了对自身生命周期管理的方法,同时提供了一个Future给命令者去获取命令的执行结果。
ScheduledExecutorService 继承了ExecutorService接口,增加了对定时任务的支持。
EventExecutorGroup 继承了ScheduledExecutorService接口,对原来的ExecutorService的关闭接口提供了增强,提供了优雅的关闭接口。从接口名称上可以看出它是对多个EventExecutor的集合,提供了对多个EventExecutor的迭代访问接口。
EventExecutor 继承EventExecutorGroup 看着这个关系真心有些纠结啊。不过细想下还是能理解的。A是B中的一员,但是A也能迭代访问B中的其他成员。这个继承关系支持了迭代访问这个行为。自然的他提供了一个parent接口,来获取所属的EventExecutorGroup 。另外提供了inEventLoop 方法支持查询某个线程是否在EventExecutor所管理的线程中。还有其他一些创建Promise和Future的方法。
AbstractEventExecutor 只是对EventExecutor中某些方法的简单实现
下面重点分析下非常有意思SingleThreadEventExecutor,它也是个抽象类,但是提供了很多重要方法的实现。弄清楚了这个对整个EventExecutor体系都非常有帮助。从类名上可知里面只有一个线程,先搞清楚一个线程的处理过程再理解多线程的就轻松些了。
先从execute方法入口分析
@Override public void execute(Runnable task) { if (task == null) { throw new NullPointerException("task"); } boolean inEventLoop = inEventLoop(); if (inEventLoop) { addTask(task); } else { startThread(); addTask(task); if (isShutdown() && removeTask(task)) { reject(); } } if (!addTaskWakesUp) { wakeup(inEventLoop); } }
先不看这个源码,我们分析下,作为一个Executor,A让你执行命令A,B让你执行命令B。。。命令不是说执行就能执行的吧。总得有个地方保存还没来得及执行的命令吧,总得有个先来后到吧。而这个地方又会被多线程访问,得保证多线程访问可见性,操作的原子性。jdk中提供的BlockingQueue就是为此而生的。BlockingQueue是一个接口,jdk中有很多他的实现。SingleThreadEventExcutor中提供的这个地方叫tasksQueue,类型是jdk中的LinkedBlockingQueue。上面代码中的addTask就是往tasksQueue中添加。
另外SingleThreadEventExcutor实现了ScheduledExecutorService 接口,支持执行定时任务。得有个地方存放定时任务信息。类中的实现是delayedTaskQueue,它是一个PriorityQueue ,也是一个BlockingQueue。不过它里面的元素不是按照先来后到的顺序存取的,而是按照各个元素的优先级判断的。
因为这个execute方法,是可以在外部线程调用,也可以在内部线程调用。也就是说外部成员可以给你下命令,内部成员也可以给你下命令。所以在上面的代码中先调用inEventLoop判断当前下命令的是外部的还是内部的。
如果是外部的,先确定内部线程是否启动,没启动就先启动内部线程同时给自己加一个定时清理的定时任务。这个从下面的代码中可以看出
private void startThread() { synchronized (stateLock) { if (state == ST_NOT_STARTED) { state = ST_STARTED; delayedTaskQueue.add(new ScheduledFutureTask<Void>( this, delayedTaskQueue, Executors.<Void>callable(new PurgeTask(), null), ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL)); thread.start(); } } } private final class PurgeTask implements Runnable { @Override public void run() { Iterator<ScheduledFutureTask<?>> i = delayedTaskQueue.iterator(); while (i.hasNext()) { ScheduledFutureTask<?> task = i.next(); if (task.isCancelled()) { i.remove(); } } } }
添加命令完成了,下面就看如何去执行命令了,这个就需要分析下内部线程的执行逻辑了。SingleThreadEventExecutor类中有一个实力变量Thread,它引用的就是当前Executor所拥有的那个thread对象。
thread = threadFactory.newThread(new Runnable() { @Override public void run() { boolean success = false; updateLastExecutionTime(); try { SingleThreadEventExecutor.this.run(); success = true; } catch (Throwable t) { logger.warn("Unexpected exception from an event executor: ", t); } finally { // 更改状态 if (state < ST_SHUTTING_DOWN) { state = ST_SHUTTING_DOWN; } // Check if confirmShutdown() was called at the end of the loop. // 这里说明在try块中调用的SingleThreadEventExecutor.this.run();中在方法结束之前必须调用confirmShutdown方法,这个在其之类实现的run方法中得到验证 if (success && gracefulShutdownStartTime == 0) { logger.error( "Buggy " + EventExecutor.class.getSimpleName() + " implementation; " + SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " + "before run() implementation terminates."); } try { // Run all remaining tasks and shutdown hooks. // 确保tasksQueue和shutDownHooks中的runable都处理完成了,这里的处理完成有可能是超时了 for (;;) { if (confirmShutdown()) { break; } } } finally { try { cleanup(); } finally { synchronized (stateLock) { state = ST_TERMINATED; } //释放信号量,使用Semaphore(0)来让另一个线程一直等待,知道内部线程调用了release() threadLock.release(); if (!taskQueue.isEmpty()) { logger.warn( "An event executor terminated with " + "non-empty task queue (" + taskQueue.size() + ')'); } terminationFuture.setSuccess(null); } } } } });
上面Thread内部run方法执行的是SingleThreadEventExecutor.this.run(),而这个run方法是一个抽象方法,留给了子类去实现了。不过可以肯定的是子类的run方法是不断的去tasksQueue中取出task去执行。现在重点分析下finally块中的代码。
1、首先更改状态为正在关闭状态。
2、如果子类中的run方法中的loop执行成功了,就得先调用confirmShutdown,确认任务队列中的任务是否都已经被执行了。
3、然后还得再次确认下任务队列中是否已被执行完毕,因为在关闭的过程中外部也是能添加任务的。
4、最终执行清理工作,更改状态为已关闭,释放信号量。
5、如果这个时候还是有任务没执行完,那也只能是无奈了,记个log吧
6、更新整个关闭过程为success
再分析下confirmShutdown,看看是如何保证所有的task执行完成的呢
protected boolean confirmShutdown() { // 如果state状态 state < ST_SHUTTING_DOWN则直接return false if (!isShuttingDown()) { return false; } // 这个方法必须从内部调用,从修饰符 protected也可以看出 if (!inEventLoop()) { throw new IllegalStateException("must be invoked from an event loop"); } // 取消所有的定时任务 cancelDelayedTasks(); if (gracefulShutdownStartTime == 0) { // 标记shutdown处理的开始时间 gracefulShutdownStartTime = ScheduledFutureTask.nanoTime(); } // 运行tasksQueue或者shutdownHooks中的所有Runnable都处理完成 if (runAllTasks() || runShutdownHooks()) { //分析了下源码,isShutdown()这个只能是在外部线程调用了shutdown()接口的时候才会有可能成为true //但是现在这个方法已经@Deprecated,所以这个if块是不会进入的 if (isShutdown()) { // shutdown 成功,没有更多的runnable需要执行 return true; } // There were tasks in the queue. Wait a little bit more until no tasks are queued for the quiet period. wakeup(true); return false; } final long nanoTime = ScheduledFutureTask.nanoTime(); // runAllTasks() 或者runAllTasks() + runShutdownHooks()方法执行时间操作了最大限制 if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) { return true; } // 现在时间与上个任务执行完成的时间差小于quietPeriod时间,继续检测 if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) { // Check if any tasks were added to the queue every 100ms. // TODO: Change the behavior of takeTask() so that it returns on timeout. wakeup(true); try { //内部线程sleep 100ms Thread.sleep(100); } catch (InterruptedException e) { // Ignore } return false; } // No tasks were added for last quiet period - hopefully safe to shut down. // (Hopefully because we really cannot make a guarantee that there will be no execute() calls by a user.) return true; }
后面再看看这个类中的一些内部变量
private static final Runnable WAKEUP_TASK = new Runnable() { @Override public void run() { // Do nothing. } };
这个WAKEUP_TASK什么也不做,为啥取名wakeup呢?我看源码也没太明白。有人理解的给解释下吧
private final Semaphore threadLock = new Semaphore(0);
threadLock的内部permits设置为0,也就是说acquire()永远获取不到permit,会一直被阻塞着。那有什么用呢?另一种实现wait()/notify()吧
值得注意的是类中是如何来控制定时任务的呢?秘密在这个方法中
private void fetchFromDelayedQueue() { long nanoTime = 0L; for (;;) { ScheduledFutureTask<?> delayedTask = delayedTaskQueue.peek(); if (delayedTask == null) { break; } if (nanoTime == 0L) { nanoTime = ScheduledFutureTask.nanoTime(); } if (delayedTask.deadlineNanos() <= nanoTime) { delayedTaskQueue.remove(); taskQueue.add(delayedTask); } else { break; } } }
ScheduledFutureTask类中有个变量记录这个类被加载进内存中的时间
private static final long START_TIME = System.nanoTime(); static long nanoTime() { return System.nanoTime() - START_TIME; } // 返回到期时间,到期时间在构造函数中指定了 public long deadlineNanos() { return deadlineNanos; } // 这个方法决定了排序的优先级 @Override public int compareTo(Delayed o) { if (this == o) { return 0; } ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o; long d = deadlineNanos() - that.deadlineNanos(); if (d < 0) { return -1; } else if (d > 0) { return 1; } else if (id < that.id) { return -1; } else if (id == that.id) { throw new Error(); } else { return 1; } }
所以fetchFromDelayedQueue()方法的逻辑就是先取出即将到期的task,判断是否已经到期,若已经到期就加入到tasksQueue中,等到被执行。
先分析到这里,后面在补上。
有什么不正确的,也请大家指正。
推荐阅读
-
Netty 4 源码分析——EventExecutor
-
commons-logging + log4j源码分析
-
Netty源码分析 (四)----- ChannelPipeline
-
Netty源码分析 (三)----- 服务端启动源码分析
-
SpringMVC源码分析4:DispatcherServlet如何找到正确的Controller
-
Netty源码分析之ChannelPipeline(二)—ChannelHandler的添加与删除
-
commons-logging + log4j源码分析
-
Netty源码分析 (十)----- 拆包器之LineBasedFrameDecoder
-
netty之NioEventLoopGroup源码分析二
-
Netty源码分析 (七)----- read过程 源码分析