netty源码分析(6)-NioEventLoop创建过程
前面几章,在我们分析的时候注册Selector相关代码的时候,提到过一部分NioEventLoop
的创建过程。接下来详细分析。new NioEventLoopGroup();
public NioEventLoopGroup(int nThreads, Executor executor) {
//从jdk地层中获取了一个SelectorProvider
this(nThreads, executor, SelectorProvider.provider());
}
public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider) {
//接着提供了一种默认的SelectStrategy工厂
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
//提供了一种拒绝执行的异常处理器
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
//如果没有指定线程数,那么就使用默认的值,通过断点发现DEFAULT_EVENT_LOOP_THREADS的值是8
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
// 提供了默认的EventExecutor选择器工厂
//该值为:new DefaultEventExecutorChooserFactory();
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
最终走到了MultithreadEventExecutorGroup
的构造方法。主要做了几件事
new ThreadPerTaskExecutor(newDefaultThreadFactory())
- 根据线程数初始化
EventExecutor
数组,并提供足够的具体EventLoop
,这里是NioEventLoop
- 初始化
chooser
,提供不同的事件执行的选择器,已选择EventExecutor数组种的EventLoop
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
//省略代码
if (executor == null) {
//初始化executor : 每个任务的执行器
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
//实例化具体的NioEventLoop
children[i] = newChild(executor, args);
}
//初始化 EventExecutorChooserFactory.EventExecutorChooser chooser
chooser = chooserFactory.newChooser(children);
//省略相关代码
}
-
new ThreadPerTaskExecutor(newDefaultThreadFactory())
任务执行器给了一个默认的工厂,定义了该具体每个线程的名字,类似nioEventLoopGroup-2-1
这样的线程名字,第一个数字在初始化的时候便定义了为poolId
,第二个数字这是在执行newThread()
的时候定义的nextId
。ThreadPerTaskExecutor
还定义了execute
方法。
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}
@Override
public void execute(Runnable command) {
//调用工厂具体的执行方法执行 该task: command
threadFactory.newThread(command).start();
}
}
关于poolId
和nextId
我们发现前者是静态的,因此,每次new NioEventLoopGroup()
的时候递增,因此代表线程池,而后者并非静态的,因此仅仅是调用递增而已。
private static final AtomicInteger poolId = new AtomicInteger();
private final AtomicInteger nextId = new AtomicInteger();
跟进最终调用的是DefaultThreadFactory#newThread()
,并且发现,结果是返回了FastThreadLocalThread
,该线程为netty自定义线程。执行的也是这类型的线程。因此们该对象每次执行任务(调用ThreadPerTaskExecutor#execute()
),其实都创建了一个线程实体。
@Override
public Thread newThread(Runnable r) {
Thread t = newThread(new DefaultRunnableDecorator(r), prefix + nextId.incrementAndGet());
//省略代码
return t;
}
protected Thread newThread(Runnable r, String name) {
return new FastThreadLocalThread(threadGroup, r, name);
}
至于初始化
chooser
,线程选择器,前面章节《Selector注册》有提到过,其实就是分开两种策略去循环使用EventExecutor
数组。调用
newChild()
初始化NioEventLoop
的时候,这里创建并绑定了selector
用于去轮询注册到它上面的一些连接。由此可见一个 selector 对应一个 eventLoop
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
值得注意的是,父类构造方法SingleThreadEventLoop
初始化了tailTasks
。该队列用于在外部线程执行一些netty任务的时候,判断是否在NioEventLoop
中,如果不在的话,则放到该队列中去执行。讲这些任务放到一个线程去执行。
private final Queue<Runnable> tailTasks;
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
tailTasks = newTaskQueue(maxPendingTasks);
}
//由于本例NioEventLoop重写了该方法,因此调用的子类
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
}
@Override
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
// This event loop never calls takeTask()
return PlatformDependent.newMpscQueue(maxPendingTasks);
}
再看看super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
再初始化tailTasks
之前还干了一件事:初始化taskQueue
//成员变量
private final Queue<Runnable> taskQueue;
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
this.executor = ObjectUtil.checkNotNull(executor, "executor");
//初始化taskQueue
taskQueue = newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
taskQueue
是在NioEventLoop
启动后,直接存放任务的地方,存放eventLoop
任务
推荐阅读
-
Netty源码分析 (七)----- read过程 源码分析
-
netty中的发动机--EventLoop及其实现类NioEventLoop的源码分析
-
Bootstrap初始化过程源码分析--netty客户端的启动
-
Netty中NioEventLoopGroup的创建源码分析
-
MyBatis源码分析之——配置解析创建SqlSessionFactory的过程
-
Netty源码分析 (八)----- write过程 源码分析
-
Netty源码分析 (六)----- 客户端接入accept过程
-
Spring Bean的创建初始化过程–源码分析
-
Netty源码分析 (七)----- read过程 源码分析
-
netty中的发动机--EventLoop及其实现类NioEventLoop的源码分析