欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

netty源码分析(6)-NioEventLoop创建过程

程序员文章站 2022-04-23 11:49:36
...

前面几章,在我们分析的时候注册Selector相关代码的时候,提到过一部分NioEventLoop的创建过程。接下来详细分析。new NioEventLoopGroup();

    public NioEventLoopGroup(int nThreads, Executor executor) {
        //从jdk地层中获取了一个SelectorProvider
        this(nThreads, executor, SelectorProvider.provider());
    }

    public NioEventLoopGroup(
            int nThreads, Executor executor, final SelectorProvider selectorProvider) {
        //接着提供了一种默认的SelectStrategy工厂
        this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
    }

    public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory) {
        //提供了一种拒绝执行的异常处理器
        super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
    }

    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        //如果没有指定线程数,那么就使用默认的值,通过断点发现DEFAULT_EVENT_LOOP_THREADS的值是8
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
        // 提供了默认的EventExecutor选择器工厂   
        //该值为:new DefaultEventExecutorChooserFactory();
        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
    }

最终走到了MultithreadEventExecutorGroup的构造方法。主要做了几件事

  1. new ThreadPerTaskExecutor(newDefaultThreadFactory())
  2. 根据线程数初始化EventExecutor数组,并提供足够的具体EventLoop,这里是NioEventLoop
  3. 初始化chooser,提供不同的事件执行的选择器,已选择EventExecutor数组种的EventLoop
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
       //省略代码

        if (executor == null) {
            //初始化executor : 每个任务的执行器
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
        
        children = new EventExecutor[nThreads];
        for (int i = 0; i < nThreads; i ++) {
            //实例化具体的NioEventLoop
            children[i] = newChild(executor, args);
            
        }
        //初始化 EventExecutorChooserFactory.EventExecutorChooser chooser
        chooser = chooserFactory.newChooser(children);

         //省略相关代码
    }
  • new ThreadPerTaskExecutor(newDefaultThreadFactory())
    任务执行器给了一个默认的工厂,定义了该具体每个线程的名字,类似nioEventLoopGroup-2-1这样的线程名字,第一个数字在初始化的时候便定义了为poolId,第二个数字这是在执行newThread()的时候定义的nextIdThreadPerTaskExecutor还定义了execute方法。
public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        this.threadFactory = threadFactory;
    }

    @Override
    public void execute(Runnable command) {
        //调用工厂具体的执行方法执行 该task: command
        threadFactory.newThread(command).start();
    }
}

关于poolIdnextId我们发现前者是静态的,因此,每次new NioEventLoopGroup()的时候递增,因此代表线程池,而后者并非静态的,因此仅仅是调用递增而已。

 private static final AtomicInteger poolId = new AtomicInteger();
 private final AtomicInteger nextId = new AtomicInteger();

跟进最终调用的是DefaultThreadFactory#newThread(),并且发现,结果是返回了FastThreadLocalThread,该线程为netty自定义线程。执行的也是这类型的线程。因此们该对象每次执行任务(调用ThreadPerTaskExecutor#execute()),其实都创建了一个线程实体。

@Override
    public Thread newThread(Runnable r) {
        Thread t = newThread(new DefaultRunnableDecorator(r), prefix + nextId.incrementAndGet());
       //省略代码
        return t;
    }

    protected Thread newThread(Runnable r, String name) {
        return new FastThreadLocalThread(threadGroup, r, name);
    }
  • 至于初始化chooser,线程选择器,前面章节《Selector注册》有提到过,其实就是分开两种策略去循环使用EventExecutor数组。

  • 调用newChild()初始化NioEventLoop的时候,这里创建并绑定了selector用于去轮询注册到它上面的一些连接。由此可见一个 selector 对应一个 eventLoop

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        if (strategy == null) {
            throw new NullPointerException("selectStrategy");
        }
        provider = selectorProvider;
        final SelectorTuple selectorTuple = openSelector();
        selector = selectorTuple.selector;
        unwrappedSelector = selectorTuple.unwrappedSelector;
        selectStrategy = strategy;
    }

值得注意的是,父类构造方法SingleThreadEventLoop初始化了tailTasks。该队列用于在外部线程执行一些netty任务的时候,判断是否在NioEventLoop中,如果不在的话,则放到该队列中去执行。讲这些任务放到一个线程去执行。

    private final Queue<Runnable> tailTasks;
    protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
                                    boolean addTaskWakesUp, int maxPendingTasks,
                                    RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
        tailTasks = newTaskQueue(maxPendingTasks);
    }
    
    //由于本例NioEventLoop重写了该方法,因此调用的子类
    protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
        return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
    }

    @Override
    protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
        // This event loop never calls takeTask()
        return PlatformDependent.newMpscQueue(maxPendingTasks);
    }

再看看super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);再初始化tailTasks之前还干了一件事:初始化taskQueue

    //成员变量
    private final Queue<Runnable> taskQueue;

    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                        boolean addTaskWakesUp, int maxPendingTasks,
                                        RejectedExecutionHandler rejectedHandler) {
        super(parent);
        this.addTaskWakesUp = addTaskWakesUp;
        this.maxPendingTasks = Math.max(16, maxPendingTasks);
        this.executor = ObjectUtil.checkNotNull(executor, "executor");
        //初始化taskQueue
        taskQueue = newTaskQueue(this.maxPendingTasks);
        rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
    }

taskQueue是在NioEventLoop启动后,直接存放任务的地方,存放eventLoop任务