欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Netty源码分析——EventLoopGroup建立

程序员文章站 2022-03-25 17:20:45
在上一篇中的简单代码中开头的两行代码是 1 EventLoopGroup boss = new NioEventLoopGroup(1); 2 EventLoopGroup worker = new NioEventLoopGroup(); 服务端应用要创建首先要创建两个EventLoopGroup ......

在上一篇中的简单代码中开头的两行代码是

1         eventloopgroup boss = new nioeventloopgroup(1);
2         eventloopgroup worker = new nioeventloopgroup();

服务端应用要创建首先要创建两个eventloopgroup ,为什么是两个eventloopgroup ,这就涉及netty的核心设计思想——主从reactor模型。这里不展开说,内容比较多,可以自行学习。每个eventloopgroup 可以包含一个或多个eventloop,每一个eventloop简单来说的就是一个死循环的线程,这个线程循环监听一个通道的所有事件,这个模式下面其他流程中也会反复提到。下面开始追代码。

说明一下,上面的两个构造函数一个有参数一个无参数,无参数的通过获取本机cpu的核数(注意是核数不是个数,例如我的电脑4个cpu,每个cpu都是双核的,所以无参的时候应该创建8个eventloop)来决定创建线程的个数,有参数就是指定在这个eventloopgroup 中创建具体个数的eventloop。

  • 一步一步追nioeventloopgroup的构造函数可以发现,代码中,如果指定nthreads 就是用指定的,例如第一个构造函数中那样,不然就是用default_event_loop_threads,default_event_loop_threads在本类的静态代码块中初始化
  protected multithreadeventloopgroup(int nthreads, threadfactory threadfactory, object... args) {
        super(nthreads == 0 ? default_event_loop_threads : nthreads, threadfactory, args);
    }
    static {
        default_event_loop_threads = math.max(1, systempropertyutil.getint(
                "io.netty.eventloopthreads", runtime.getruntime().availableprocessors() * 2));  //获取本机cup核数

        if (logger.isdebugenabled()) {
            logger.debug("-dio.netty.eventloopthreads: {}", default_event_loop_threads);
        }
    }
  • 再追一步会看到,defaulteventexecutorchooserfactory.instance使用默认的选择器工厂,在下一步中会提到,这个东东决定了在eventloopgroup 执行next()时候,也就是切换到下一个eventloop的时候,是以怎样的算法找到下一个eventloop,默认就是按照eventloop数组中的顺序查找下一个的
    protected multithreadeventexecutorgroup(int nthreads, executor executor, object... args) {
        this(nthreads, executor, defaulteventexecutorchooserfactory.instance, args);
    }
  • 再追下一步构造函数,真正干活的来了,注意!!!在主要代码会有注释
        protected multithreadeventexecutorgroup(int nthreads, executor executor,
                                                eventexecutorchooserfactory chooserfactory, object... args) {
            if (nthreads <= 0) {
                throw new illegalargumentexception(string.format("nthreads: %d (expected: > 0)", nthreads));
            }
    
            if (executor == null) {
                executor = new threadpertaskexecutor(newdefaultthreadfactory());   //创建默认的线程执行器
            }
    
            children = new eventexecutor[nthreads];  //创建保存eventloop的数组
    
            for (int i = 0; i < nthreads; i ++) {
                boolean success = false;
                try {
                    children[i] = newchild(executor, args);   //循环填充数组,在这里实例化每一个eventloop
                    success = true;
                } catch (exception e) {
                    // todo: think about if this is a good exception type
                    throw new illegalstateexception("failed to create a child event loop", e);
                } finally {
                    if (!success) {
                        for (int j = 0; j < i; j ++) {
                            children[j].shutdowngracefully();
                        }
    
                        for (int j = 0; j < i; j ++) {
                            eventexecutor e = children[j];
                            try {
                                while (!e.isterminated()) {
                                    e.awaittermination(integer.max_value, timeunit.seconds);
                                }
                            } catch (interruptedexception interrupted) {
                                // let the caller handle the interruption.
                                thread.currentthread().interrupt();
                                break;
                            }
                        }
                    }
                }
            }
    
            chooser = chooserfactory.newchooser(children);  //将创建好的eventloop数组注册到选择器工厂
    
            final futurelistener<object> terminationlistener = new futurelistener<object>() {
                @override
                public void operationcomplete(future<object> future) throws exception {
                    if (terminatedchildren.incrementandget() == children.length) {
                        terminationfuture.setsuccess(null);
                    }
                }
            };
    
            for (eventexecutor e: children) {
                e.terminationfuture().addlistener(terminationlistener);
            }
    
            set<eventexecutor> childrenset = new linkedhashset<eventexecutor>(children.length);
            collections.addall(childrenset, children);
            readonlychildren = collections.unmodifiableset(childrenset);
        }
    Netty源码分析——EventLoopGroup建立
    eventexecutor是一个接口,是eventloop接口的子接口,当children[i] = newchild(executor, args)执行时,实际上执行的是nioeventloopgroup中重写的,可以发现创建的nioeventloop的实例,而nioeventloop间接实现了eventloop
        @override
        protected eventloop newchild(executor executor, object... args) throws exception {
            return new nioeventloop(this, executor, (selectorprovider) args[0],
                ((selectstrategyfactory) args[1]).newselectstrategy(), (rejectedexecutionhandler) args[2]);
        }

    至此eventloopgroup以及包含的多个eventloop创建完成,这里只是在eventloopgroup中创建了多个nioeventloop,nioeventloop并没有run起来,需要在注册通道后才能run起来监控通道的事件和执行task。后面章节还会介绍nioeventloop类中的在其他方法。ok,这个部分就到这。