欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

netty源码分析(一)

程序员文章站 2022-04-22 22:18:10
...

nio的源码都是和具体的操作系统底层的io模型有关,都是底层的东西,分析意义不大。
linux的底层支持以及几种io对比
比如nio就是linux的多路复用io模型,aio是信号驱动io模型。

先说结论:

  1. 一个EventLoopGroup,可以看作一个线程池,里面维护了一个EventLoop数组,一个EventLoop其实也是一个线程池,只不过EventLoop是一个单线程池而已,所以可以把EventLoop看做一个线程。

  2. Server端,主线程会使用一个EventLoop去启动一个线程去循环select,然后主线程注册

服务端:

public static void main(String[] args) {
        EventLoopGroup  bossstrap = new NioEventLoopGroup();
        EventLoopGroup  workerstrap = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossstrap, workerstrap)
                .channel(NioServerSocketChannel.class).handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new MyServerInitializer());
        try {
            ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossstrap.shutdownGracefully();
            workerstrap.shutdownGracefully();
        }
    }

从上往下分析:EventLoopGroup继承于MultithreadEventLoopGroup

   EventLoopGroup  bossstrap = new NioEventLoopGroup();

public NioEventLoopGroup() {
        this(0);
    }
public NioEventLoopGroup(int nThreads) {
        this(nThreads, (Executor) null);
    }
public NioEventLoopGroup(int nThreads, Executor executor) {
        this(nThreads, executor, SelectorProvider.provider());
    }
 public NioEventLoopGroup(
            int nThreads, Executor executor, final SelectorProvider selectorProvider) {
        this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
    }
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory) {
        super(nThreads, executor, selectorProvider, selectStrategyFactory);
    }
// 注意,这里,如果不传参,则默认的线程池大小为cpu线程数*2。
    private static final int DEFAULT_EVENT_LOOP_THREADS;
static {
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
    }
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }
        // 不传入executor,则executor为ThreadPerTaskExecutor,这个线程池其实
//是给单个EventLoop用的,ThreadPerTaskExecutor线程池execute一个任务,
//启动一个线程池,但是其实Eventloop只会运行一次ThreadPerTaskExecutor的execute方法,也就是单线程运行。
// 这个executor是后面创建EventLoop的传参,也就是EventLoop里的线程池是这个线程池
        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
        // 维护了一个EventExecutor数组,EventLoop是EventExecutor的子接口,
// 所以其实就是维护了一个EventLoop数组
        children = new EventExecutor[nThreads];
         // 根据数组长度是否是2次方,创建线程池选择线程的选择器,
        // 了解就好
        if (isPowerOfTwo(children.length)) {
            chooser = new PowerOfTwoEventExecutorChooser();
        } else {
            chooser = new GenericEventExecutorChooser();
        }
        // 循环创建EventLoop
        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
            /**
newChild创建EventLoop,调用的是NioEventLoopGroup的方法
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy());
    }
*/
// 把前面提到的ThreadPerTaskExecutor线程池传过去了,
//ThreadPerTaskExecutor其实是EventLoop维护的,用来真正启动线程的
// 这里很重要,这里生成eventLoop,EventLoop里面维护了select,select不是淡
// 出的nio的select,而是反射修改了nio select的成员变量selectkey,使得后面,
// 直接select()方法,netty能够直接获取到selectKey
// 看下文
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
        // 不成功,则关掉资源,不看
                if (!success) {
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }

                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            // Let the caller handle the interruption.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }
    // 下面添加结束监听器,暂时忽略
        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };

        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }

        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }

EventLoop的构造。

// 反射修改select的成员变量selectedKeys为netty的NioEventLoop的
// selectedKeySet成员变量,这样Nio的select执行select堵塞方法的时候,
// NioEventLoop的selectedKeySet可以直接获取到selectKey。
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy) {
        super(parent, executor, false);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        if (strategy == null) {
            throw new NullPointerException("selectStrategy");
        }
        provider = selectorProvider;
        selector = openSelector();
        selectStrategy = strategy;
    }

private Selector openSelector() {
        final Selector selector;
        try {
            selector = provider.openSelector();
        } catch (IOException e) {
            throw new ChannelException("failed to open a new selector", e);
        }
        if (DISABLE_KEYSET_OPTIMIZATION) {
            return selector;
        }
        try {
            SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
            Class<?> selectorImplClass =
                    Class.forName("sun.nio.ch.SelectorImpl", false, PlatformDependent.getSystemClassLoader());
            // Ensure the current selector implementation is what we can instrument.
            if (!selectorImplClass.isAssignableFrom(selector.getClass())) {
                return selector;
            }
            // 反射修改select的成员变量selectedKeys为netty的NioEventLoop的selectedKeySet成员变量,这样Nio的select执行select堵塞方法的时候,NioEventLoop的selectedKeySet可以直接获取到selectKey。
            Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
            Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

            selectedKeysField.setAccessible(true);
            publicSelectedKeysField.setAccessible(true);

            selectedKeysField.set(selector, selectedKeySet);
            publicSelectedKeysField.set(selector, selectedKeySet);

            selectedKeys = selectedKeySet;
            logger.trace("Instrumented an optimized java.util.Set into: {}", selector);
        } catch (Throwable t) {
            selectedKeys = null;
            logger.trace("Failed to instrument an optimized java.util.Set into: {}", selector, t);
        }

        return selector;
    }

总结:new EventLoopGroup其实里面维护了一个EventLoop数组,默认数组大小为,cpu线程数*2,初始化了一下EventLoop数组,每个EventLoop维护了一个ThreadPerTaskExecutor线程池。

new ServerBootstrap()构造方法里什么都没做。

        ServerBootstrap serverBootstrap = new ServerBootstrap();
    public ServerBootstrap() { }

serverBootstrap.group(bossstrap, workerstrap)就是给ServerBootstrap设置两个成员变量,group和child,group为一个bossstrap线程池,child为一个workerstrap线程池。

serverBootstrap.group(bossstrap, workerstrap);

public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
        super.group(parentGroup);
        if (childGroup == null) {
            throw new NullPointerException("childGroup");
        }
        if (this.childGroup != null) {
            throw new IllegalStateException("childGroup set already");
        }
        this.childGroup = childGroup;
        return this;
    }
public B group(EventLoopGroup group) {
        if (group == null) {
            throw new NullPointerException("group");
        }
        if (this.group != null) {
            throw new IllegalStateException("group set already");
        }
        this.group = group;
        return (B) this;
    }

serverBootstrap.channel(NioServerSocketChannel.class)就是
初始化serverBootstrap的channelFactory变量,指定了工厂创建channel的
类型为传入的参数NioServerSocketChanne
l

serverBootstrap.group(bossstrap, workerstrap)
                .channel(NioServerSocketChannel.class);


public B channel(Class<? extends C> channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        }
        // 初始化serverBootstrap的channelFactory变量,指定了工厂创建channel的
// 类型为传入的参数NioServerSocketChannel
        return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
    }
public ReflectiveChannelFactory(Class<? extends T> clazz) {
        if (clazz == null) {
            throw new NullPointerException("clazz");
        }
        this.clazz = clazz;
    }
public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {
        return channelFactory((ChannelFactory<C>) channelFactory);
    }
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
        if (channelFactory == null) {
            throw new NullPointerException("channelFactory");
        }
        if (this.channelFactory != null) {
            throw new IllegalStateException("channelFactory set already");
        }

        this.channelFactory = channelFactory;
        return (B) this;
    }

serverBootstrap.handler(new LoggingHandler(LogLevel.INFO)).childHandler(new MyServerInitializer());
指定了serverBootstrap的成员变量handler为传入的handler,指定了serverBootstrap的成员变量childHandler 为传入的childHandler

serverBootstrap.handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new MyServerInitializer());

// 指定了serverBootstrap的成员变量handler为传入的handler
public B handler(ChannelHandler handler) {
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
        return (B) this;
    }
//  指定了serverBootstrap的成员变量childHandler 为传入的childHandler 
 public ServerBootstrap childHandler(ChannelHandler childHandler) {
        if (childHandler == null) {
            throw new NullPointerException("childHandler");
        }
        this.childHandler = childHandler;
        return this;
    }

总结:
ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();之前的一系列操作,其实都在初始化一些变量和类,做准备工作而已。

EventLoopGroup  bossstrap = new NioEventLoopGroup();
        EventLoopGroup  workerstrap = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossstrap, workerstrap)
                .channel(NioServerSocketChannel.class).handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new MyServerInitializer());

转载于:https://www.jianshu.com/p/cfdd7f821767