欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Netty 4 源码分析——EventExecutor

程序员文章站 2023-12-29 14:13:04
...
先从EventExecutor开始,因为它是一个很基础的工具类,是对I/O线程的包装。先了解下它的源码会对后面的分析有更好的理解。

Netty 4 源码分析——EventExecutor
            
    
    博客分类: Netty netty 

先看下EventExecutor的类关系图,这里只是简单的画出了类和接口的继承和实现关系,还有其他的聚合关系没有画出来,为的是便于分析思路的清晰。

说到Executor,很容易联想到jdk中 java.util.concurrent.Executor 接口,这个接口非常简单,就一个方法
void execute(Runnable command);

从方法签名上就能看出这个是为了支持异步模式的。command表示一个命令。当前线程就是命令者角色,Executor内部的去运行Runnable的线程就是执行者。这里没有提供明确的地方供命令者去取得命令的执行结果。

ExecutorService 继承了Executor 接口,增加了对自身生命周期管理的方法,同时提供了一个Future给命令者去获取命令的执行结果。

ScheduledExecutorService 继承了ExecutorService接口,增加了对定时任务的支持。

EventExecutorGroup 继承了ScheduledExecutorService接口,对原来的ExecutorService的关闭接口提供了增强,提供了优雅的关闭接口。从接口名称上可以看出它是对多个EventExecutor的集合,提供了对多个EventExecutor的迭代访问接口。

EventExecutor 继承EventExecutorGroup 看着这个关系真心有些纠结啊。不过细想下还是能理解的。A是B中的一员,但是A也能迭代访问B中的其他成员。这个继承关系支持了迭代访问这个行为。自然的他提供了一个parent接口,来获取所属的EventExecutorGroup 。另外提供了inEventLoop 方法支持查询某个线程是否在EventExecutor所管理的线程中。还有其他一些创建Promise和Future的方法。

AbstractEventExecutor 只是对EventExecutor中某些方法的简单实现

下面重点分析下非常有意思SingleThreadEventExecutor,它也是个抽象类,但是提供了很多重要方法的实现。弄清楚了这个对整个EventExecutor体系都非常有帮助。从类名上可知里面只有一个线程,先搞清楚一个线程的处理过程再理解多线程的就轻松些了。

先从execute方法入口分析
@Override
public void execute(Runnable task) {
	if (task == null) {
		throw new NullPointerException("task");
	}

	boolean inEventLoop = inEventLoop();
	if (inEventLoop) {
		addTask(task);
	} else {
		startThread();
		addTask(task);
		if (isShutdown() && removeTask(task)) {
			reject();
		}
	}

	if (!addTaskWakesUp) {
		wakeup(inEventLoop);
	}
}

先不看这个源码,我们分析下,作为一个Executor,A让你执行命令A,B让你执行命令B。。。命令不是说执行就能执行的吧。总得有个地方保存还没来得及执行的命令吧,总得有个先来后到吧。而这个地方又会被多线程访问,得保证多线程访问可见性,操作的原子性。jdk中提供的BlockingQueue就是为此而生的。BlockingQueue是一个接口,jdk中有很多他的实现。SingleThreadEventExcutor中提供的这个地方叫tasksQueue,类型是jdk中的LinkedBlockingQueue。上面代码中的addTask就是往tasksQueue中添加。

另外SingleThreadEventExcutor实现了ScheduledExecutorService 接口,支持执行定时任务。得有个地方存放定时任务信息。类中的实现是delayedTaskQueue,它是一个PriorityQueue ,也是一个BlockingQueue。不过它里面的元素不是按照先来后到的顺序存取的,而是按照各个元素的优先级判断的。

因为这个execute方法,是可以在外部线程调用,也可以在内部线程调用。也就是说外部成员可以给你下命令,内部成员也可以给你下命令。所以在上面的代码中先调用inEventLoop判断当前下命令的是外部的还是内部的。
如果是外部的,先确定内部线程是否启动,没启动就先启动内部线程同时给自己加一个定时清理的定时任务。这个从下面的代码中可以看出
private void startThread() {
	synchronized (stateLock) {
		if (state == ST_NOT_STARTED) {
			state = ST_STARTED;
			delayedTaskQueue.add(new ScheduledFutureTask<Void>(
					this, delayedTaskQueue, Executors.<Void>callable(new PurgeTask(), null),
					ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL));
			thread.start();
		}
	}
}

private final class PurgeTask implements Runnable {
	@Override
	public void run() {
		Iterator<ScheduledFutureTask<?>> i = delayedTaskQueue.iterator();
		while (i.hasNext()) {
			ScheduledFutureTask<?> task = i.next();
			if (task.isCancelled()) {
				i.remove();
			}
		}
	}
}

添加命令完成了,下面就看如何去执行命令了,这个就需要分析下内部线程的执行逻辑了。SingleThreadEventExecutor类中有一个实力变量Thread,它引用的就是当前Executor所拥有的那个thread对象。
thread = threadFactory.newThread(new Runnable() {
	@Override
	public void run() {
		boolean success = false;
		updateLastExecutionTime();
		try {
			SingleThreadEventExecutor.this.run();
			success = true;
		} catch (Throwable t) {
			logger.warn("Unexpected exception from an event executor: ", t);
		} finally {
			// 更改状态
			if (state < ST_SHUTTING_DOWN) {
				state = ST_SHUTTING_DOWN;
			}

			// Check if confirmShutdown() was called at the end of the loop.
			// 这里说明在try块中调用的SingleThreadEventExecutor.this.run();中在方法结束之前必须调用confirmShutdown方法,这个在其之类实现的run方法中得到验证
			if (success && gracefulShutdownStartTime == 0) {
				logger.error(
						"Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
						SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
						"before run() implementation terminates.");
			}

			try {
				// Run all remaining tasks and shutdown hooks.
				// 确保tasksQueue和shutDownHooks中的runable都处理完成了,这里的处理完成有可能是超时了
				for (;;) {
					if (confirmShutdown()) {
						break;
					}
				}
			} finally {
				try {
					cleanup();
				} finally {
					synchronized (stateLock) {
						state = ST_TERMINATED;
					}
					//释放信号量,使用Semaphore(0)来让另一个线程一直等待,知道内部线程调用了release()
					threadLock.release();
					if (!taskQueue.isEmpty()) {
							logger.warn(
									"An event executor terminated with " +
									"non-empty task queue (" + taskQueue.size() + ')');
						}

						terminationFuture.setSuccess(null);
					}
				}
			}
		}
	});

上面Thread内部run方法执行的是SingleThreadEventExecutor.this.run(),而这个run方法是一个抽象方法,留给了子类去实现了。不过可以肯定的是子类的run方法是不断的去tasksQueue中取出task去执行。现在重点分析下finally块中的代码。
1、首先更改状态为正在关闭状态。
2、如果子类中的run方法中的loop执行成功了,就得先调用confirmShutdown,确认任务队列中的任务是否都已经被执行了。
3、然后还得再次确认下任务队列中是否已被执行完毕,因为在关闭的过程中外部也是能添加任务的。
4、最终执行清理工作,更改状态为已关闭,释放信号量。
5、如果这个时候还是有任务没执行完,那也只能是无奈了,记个log吧
6、更新整个关闭过程为success

再分析下confirmShutdown,看看是如何保证所有的task执行完成的呢
protected boolean confirmShutdown() {
	// 如果state状态 state < ST_SHUTTING_DOWN则直接return false
	if (!isShuttingDown()) {
		return false;
	}
	// 这个方法必须从内部调用,从修饰符 protected也可以看出
	if (!inEventLoop()) {
		throw new IllegalStateException("must be invoked from an event loop");
	}
	// 取消所有的定时任务
	cancelDelayedTasks();
	
	if (gracefulShutdownStartTime == 0) {
		// 标记shutdown处理的开始时间
		gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();
	}
	// 运行tasksQueue或者shutdownHooks中的所有Runnable都处理完成
	if (runAllTasks() || runShutdownHooks()) {
		//分析了下源码,isShutdown()这个只能是在外部线程调用了shutdown()接口的时候才会有可能成为true
		//但是现在这个方法已经@Deprecated,所以这个if块是不会进入的
		if (isShutdown()) {
			// shutdown 成功,没有更多的runnable需要执行
			return true;
		}

		// There were tasks in the queue. Wait a little bit more until no tasks are queued for the quiet period.
		wakeup(true);
		return false;
	}

	final long nanoTime = ScheduledFutureTask.nanoTime();
	// runAllTasks() 或者runAllTasks() + runShutdownHooks()方法执行时间操作了最大限制
	if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
		return true;
	}
	// 现在时间与上个任务执行完成的时间差小于quietPeriod时间,继续检测
	if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
		// Check if any tasks were added to the queue every 100ms.
		// TODO: Change the behavior of takeTask() so that it returns on timeout.
		wakeup(true);
		try {
			//内部线程sleep 100ms
			Thread.sleep(100);
		} catch (InterruptedException e) {
			// Ignore
		}

		return false;
	}

	// No tasks were added for last quiet period - hopefully safe to shut down.
	// (Hopefully because we really cannot make a guarantee that there will be no execute() calls by a user.)
	return true;
}

后面再看看这个类中的一些内部变量
private static final Runnable WAKEUP_TASK = new Runnable() {
	@Override
	public void run() {
		// Do nothing.
	}
};

这个WAKEUP_TASK什么也不做,为啥取名wakeup呢?我看源码也没太明白。有人理解的给解释下吧
private final Semaphore threadLock = new Semaphore(0);

threadLock的内部permits设置为0,也就是说acquire()永远获取不到permit,会一直被阻塞着。那有什么用呢?另一种实现wait()/notify()吧

值得注意的是类中是如何来控制定时任务的呢?秘密在这个方法中
private void fetchFromDelayedQueue() {
	long nanoTime = 0L;
	for (;;) {
		ScheduledFutureTask<?> delayedTask = delayedTaskQueue.peek();
		if (delayedTask == null) {
			break;
		}

		if (nanoTime == 0L) {
			nanoTime = ScheduledFutureTask.nanoTime();
		}

		if (delayedTask.deadlineNanos() <= nanoTime) {
			delayedTaskQueue.remove();
			taskQueue.add(delayedTask);
		} else {
			break;
		}
	}
}

ScheduledFutureTask类中有个变量记录这个类被加载进内存中的时间
private static final long START_TIME = System.nanoTime();

static long nanoTime() {
	return System.nanoTime() - START_TIME;
}
// 返回到期时间,到期时间在构造函数中指定了
public long deadlineNanos() {
	return deadlineNanos;
}
// 这个方法决定了排序的优先级
@Override
public int compareTo(Delayed o) {
	if (this == o) {
		return 0;
	}

	ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;
	long d = deadlineNanos() - that.deadlineNanos();
	if (d < 0) {
		return -1;
	} else if (d > 0) {
		return 1;
	} else if (id < that.id) {
		return -1;
	} else if (id == that.id) {
		throw new Error();
	} else {
		return 1;
	}
}


所以fetchFromDelayedQueue()方法的逻辑就是先取出即将到期的task,判断是否已经到期,若已经到期就加入到tasksQueue中,等到被执行。

先分析到这里,后面在补上。
有什么不正确的,也请大家指正。
  • Netty 4 源码分析——EventExecutor
            
    
    博客分类: Netty netty 
  • 大小: 23.7 KB
相关标签: netty

上一篇:

下一篇: