欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Netty源码分析(二):Reactor模型在Netty中的应用

程序员文章站 2022-06-27 16:06:46
Netty源码分析主要分两部分:基础知识源码分析服务端测试代码:总体分为三步:初始化+注册+绑定其中bossGroup和workerGroup就是基于主从Reactor模型的两个路由,bossGroup主要负责管理链接,workerGroup主要负责成功链接后链路上的数据读写。EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); Ser...

Netty源码分析主要分两部分:

基础知识

Netty中的Reactor主从模型

服务端测试代码:

总体分为三步:初始化+注册+绑定
其中bossGroup和workerGroup就是基于主从Reactor模型的两个路由,bossGroup主要负责管理链接,workerGroup主要负责成功链接后链路上的数据读写。

EventLoopGroup bossGroup = new NioEventLoopGroup(); 
EventLoopGroup workerGroup = new NioEventLoopGroup(); 
ServerBootstrap sb = new ServerBootstrap();
        sb.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel sh) throws Exception {
                        sh.pipeline()
                                .addLast(new RpcDecoder(RpcRequest.class)) //解码
                                .addLast(new RpcEncoder(RpcResponse.class)) //编码
                                .addLast(new ServerHandler());
                    }
                });
        ChannelFuture future = sb.bind(port).sync();
        if (future.isSuccess()) {
            System.out.println("服务端启动成功");
        } else {
            System.out.println("服务端启动失败");
            future.cause().printStackTrace();
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
        future.channel().closeFuture().sync();

1. NioEventLoopGroup的初始化

简单总结一下,共三步骤:创建线程池、创建Reactor路由组、给每个执行器设置监听。

/**nEventLoops:     线程池大小默认大小是cpu核数*2
  *executor:        线程池,netty5.0版本开始使用的是ForkJoinPool线程池,之前版本用的是手动版线程池,就是来一个就new一个thread
  *selectorProvider:selector选择器,windows中jdk默认使用的是WindowsSelectorProvider选择器,通过调用openServerSocketChannel方法创建一个ServerSocketChannelImpl类并在里面openPiple建立通信管道,后面在介绍。
 */
public NioEventLoopGroup(int nEventLoops, Executor executor, final SelectorProvider selectorProvider) {
    super(nEventLoops, executor, selectorProvider);
}
//初始化NioEventLoopGroup最终到了这一步,在这里完成创建工作。
(1)、第一步:如果没有,则创建线程池,大小是nEventExecutors,这个线程池主要工作是用来(待补充);netty5.0版本开始使用的是ForkJoinPool线程池,之前版本有single版本的,还有的别的记不得了
(2)、第二步:创建EventExecutor执行组,这个就相当于MainReactor,里面是NioEventLoop的集合。
根据源码分析一中的Reactor相关知识回一下,IO多路复用机制需要serverSocketChannel和Selector,然后设置serverSocketChannel为非阻塞并注册到selector中去,所以这个serverSocketChannel就是一个socket,然后通过selector.select()方法开启监听。这些工作都在NioEventLoop中执行。
(3)、第三步:给每个NioEventLoop设置Future监听器。
private MultithreadEventExecutorGroup(int nEventExecutors, Executor executor, boolean shutdownExecutor, Object... args) {
    //......nEventExecutors小于0抛出异常......
    if (executor == null) {
        executor = newDefaultExecutorService(nEventExecutors); (1)
        shutdownExecutor = true;
    }
    children = new EventExecutor[nEventExecutors]; (2)
    if (isPowerOfTwo(children.length)) {
        chooser = new PowerOfTwoEventExecutorChooser();
    } else {
        chooser = new GenericEventExecutorChooser();
    }
    for (int i = 0; i < nEventExecutors; i ++) {
        boolean success = false;
        try {
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            //.....
        } finally {
            //.....如果创建不成功,优雅关闭线程池.....
        }
    }
    final boolean shutdownExecutor0 = shutdownExecutor;
    final Executor executor0 = executor;
    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
                if (shutdownExecutor0) {
                    ((ExecutorService) executor0).shutdown();
                }
            }
        }
    };
    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener); (3)
    }
    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

2. ServerBootstrap的初始化

bootstrap简单的说就是四步骤:创建socket,然后初始化,注册,绑定。

ChannelFuture future = sb.bind(port).sync();
重头戏就是这个doBind方法。
private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();......1......)
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }
    if (regFuture.isDone()) {
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);......2......return promise;
    } else {
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    promise.setFailure(cause);
                } else {
                    promise.executor = channel.eventLoop();
                }
                doBind0(regFuture, channel, localAddress, promise);
            }
        });
        return promise;
    }
}

2.1、initAndRegister()

这个方法分为三步,创建socket,然后初始化,最后注册,分三个小节讲

ChannelFuture initAndRegister() {1)这个方法就是通过反射获取ServerBoostrap中配置的channel:NioServerSocketChannel
    final Channel channel = channelFactory().newChannel();
    try {2)创建完socket之后,然后初始化一下
        init(channel);
    } catch (Throwable t) {
        channel.unsafe().closeForcibly();
        return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
    }3)初始化完了之后,然后注册一下
    ChannelFuture regFuture = group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    return regFuture;
}

2.1.1 创建NioServerSocketChannel

  1. 创建socket,然后设置socket兴趣点和非阻塞
  2. 创建unsafe用来做io操作
  3. 创建一个pipeline双向链表
  4. 然后需要提供一个对外暴露的config
//在构造中创建ServerSocketChannel,openServerSocketChannel其实就是创建了一个socket
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
private static ServerSocketChannel newSocket(SelectorProvider provider) {
    try {
        return provider.openServerSocketChannel();
    } catch (IOException e) {
    }
}
public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
//然后设置OP_ACCEPT兴趣点,并对外提供config
public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;//熟悉的味道,设置兴趣点
    try {
        ch.configureBlocking(false);//熟悉的味道,设置socket为非阻塞
    } catch (IOException e) {
    ........
    }
}
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = DefaultChannelId.newInstance();//最后创建一个id
    unsafe = newUnsafe();//最后创建一个unsafa,就是NioMessageUnsafe(),这个很重要,channel的注册,绑定,链接,读写,也就是socket的相关io都是通过这个类完成。
    pipeline = new DefaultChannelPipeline(this);//这个是重点,创建pipe通道,这个是双向链表。
}
//这个从这个pipeline的构造中可以看出来,这个是双向链表,一头一尾;
    DefaultChannelPipeline(AbstractChannel channel) {
        if (channel == null) {
            throw new NullPointerException("channel");
        }
        this.channel = channel;

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }

2.1.2 初始化NioServerSocketChannel

初始化就是做一些socket的配置工作,如设置配置、设置属性

@Override
void init(Channel channel) throws Exception {
//设置socket的一些配置,详细可以查看:https://blog.csdn.net/qq_28198181/article/details/82152338 这个介绍很详细
//常用的比如AUTO_READ、SO_KEEPALIVE、SO_REUSEADDR、SO_LINGER、SO_BACKLOG、SO_TIMEOUT、TCP_NODELAY 
    final Map<ChannelOption<?>, Object> options = options();
    synchronized (options) {
        channel.config().setOptions(options);
    }
    final Map<AttributeKey<?>, Object> attrs = attrs();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }
//这里的pipeline在NioServerSocketChannel初始化的时候就已经创建了pipeline = new DefaultChannelPipeline(this);是一个双向链表,管道里添加的是handler。
    ChannelPipeline p = channel.pipeline();
    if (handler() != null) {
        p.addLast(handler()); //这个添加的是MainReactor中的handler
    }
//添加完MainReactor之后,然后将Acceptor这个handler加入到pipe中去,
//同时,将subReactor的一系列配置都添加到Accept中去,这样当Accept获取到有效链接请求之后,就能直接将channel注册到SubReactor中去了,这个思路跟基础知识一中的主从Reactor实现思路是一样的。
    final EventLoopGroup currentChildGroup = childGroup; //这个是worker的Reactor
    final ChannelHandler currentChildHandler = childHandler; //这个是SubReactor中的handler
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
    }
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(Channel ch) throws Exception {
            ch.pipeline().addLast(new ServerBootstrapAcceptor(
                    currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
        }
    });
}

2.1.3 注册NioServerSocketChannel

  1. 关于注册先整理一下思路:根据基础知识中的Reactor模型,首先创建一个线程池,将channel线注册到MianReactor中去专门用来监听客户端的请求事件,然后将有效请求的channel注册到SubReactor中的子线程中去处理IO读写操作。注册结束的标志就是将ServerSocketChannel注册到Selector中去。
    注册过程跳转很长,原因就是专门的人做专门的事,首先注册需要在MainReactor路由中统筹管理,就像大老板,他将活分给某个部门的部门经理,也就是NioEventLoop(chooser.next())去执行,这个部门经理不干太细的活,像注册、绑定、连接、读写这些细活就派给了小员工去完成,也就是unSafa这个类。
  2. 注册结束后,将handler都填充到pipeline中去
//这里注意的是group()获取的是MainReactor
ChannelFuture regFuture = group().register(channel);//通过2.1中的regist开始注册

//调用NioEventLoopGroup中的父类MultithreadEventLoopGroup中的register
//这里的next其实就是通过chooser选择器从NioEventLoop数组中获取一个loop用来执行注册事件。
public ChannelFuture register(Channel channel) {
    return next().register(channel);
}

//PowerOfTwoEventExecutorChooser选择器,位运算取余,效率更高,需要满足2的N次幂
children[childIndex.getAndIncrement() & children.length - 1];
//GenericEventExecutorChooser选择器,摩除取余,任意数取余,效率低点
children[Math.abs(childIndex.getAndIncrement() % children.length)];

//NioEventLoop中的注册方法
public ChannelFuture register(Channel channel, ChannelPromise promise) {
    channel.unsafe().register(this, promise);
    return promise;
}

//最后通过unsafa完成注册工作,上面说过,注册、绑定、链接、读写都在这个类中完成。
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
		........判空+是否注册过...........
        if (AbstractChannel.this.eventLoop == null) {
            AbstractChannel.this.eventLoop = new PausableChannelEventLoop(eventLoop);
        } else {
            AbstractChannel.this.eventLoop.unwrapped = eventLoop;
        }
        if (eventLoop.inEventLoop()) {
            register0(promise);
        } else {
            try {
            //第一次进来走这里,调用NioEventLoop中的自定义execute方法将register作为一个task任务添加到LinkedBlockingQueue队列中
            //然后先开启select.select(),同时开启死循环,不断调用run方法,执行register0任务。
                eventLoop.execute(new OneTimeTask() {
                    @Override
                    public void run() {
                        register0(promise);
                    }
                });
            } catch (Throwable t) {
            ........
            }
        }
}
//接着调用register0();这个注册工作主要做了两件事。
1、执行doRegister方法:终于看到了这句非常熟悉而又亲切的serverSocketChannel注册到selector的语句,到这里,注册工作就算完成了。
2、执行fireChannelRegistered()方法完成pipeline的handler填充工作
private void register0(ChannelPromise promise) {
       try {
           if (!promise.setUncancellable() || !ensureOpen(promise)) {
               return;
           }
           boolean firstRegistration = neverRegistered;
           doRegister();1)
           neverRegistered = false;
           registered = true;
           eventLoop.acceptNewTasks();
           safeSetSuccess(promise);
           pipeline.fireChannelRegistered();2if (firstRegistration && isActive()) {
               pipeline.fireChannelActive();
           }
       } catch (Throwable t) {
       		.............
       }
}
// 首先看1的注册工作,罗里吧嗦调用了一堆,最后还是调用了这句熟悉又简练的语句来结束注册工作
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionKey = javaChannel().register(((NioEventLoop) eventLoop().unwrap()).selector, 0, this);
            return;
        } catch (CancelledKeyException e) {
			........这里抛出异常..........
        }
    }
}
//注册完了之后,然后往pipeLine管道中添加handler事件,从head开始,通过next.fireChannelRegistered这个责任链模式依次寻找下一个节点执行其中的channelRegistered()方法
public ChannelPipeline fireChannelRegistered() {
    head.fireChannelRegistered();
    return this;
}
public ChannelHandlerContext fireChannelRegistered() {
    AbstractChannelHandlerContext next = findContextInbound();//其实就是找到head.next且标志位是MASKGROUP_INBOUND的节点
    next.invoker().invokeChannelRegistered(next); //这个invoker的角色就相当于Android系统的instrumentation的角色,这里负责调用handler中的一些方法。
    --------->invokeChannelRegisteredNow(ctx);
    --------->ctx.handler().channelRegistered(ctx);
    return this;
}
//如果channelRegister发现handler是ChannelInitializer,那么就执行initChannel()方法,将里面的handler添加到pipeLine中,然后继续分发责任链fireChannelRegistered。
//如果是ChannelHandlerAdapter就继续分发fireChannelRegistered(),知道添加tail结束
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        ChannelPipeline pipeline = ctx.pipeline();
        boolean success = false;
        try {
            initChannel((C) ctx.channel());
            pipeline.remove(this);
            ctx.fireChannelRegistered();
            success = true;
        } catch (Throwable t) {
            logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t);
        } finally {
            if (pipeline.context(this) != null) {
                pipeline.remove(this);
            }
            if (!success) {
                ctx.close();
            }
        }
    }

2.2、doBind0(regFuture, channel, localAddress, promise)

注册完成后,最后开始绑定端口的工作。绑定肯定是socket来绑定,最后到serverSocketChannel.bind()完成绑定工作。

private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                附加调转代码:
                --------------->pipeline.bind(localAddress, promise);
                --------------->tail.bind(localAddress, promise);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}
可以看到绑定跟注册其实是一样的,只不过一个从head开始往后遍历,一个从tail开始往前遍历。
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
    AbstractChannelHandlerContext next = findContextOutbound(); //就是通过this.prev上一个节点,this指的是tail。
    next.invoker().invokeBind(next, localAddress, promise);//这个跟注册一样,调用pipeline上的节点
    附加调转代码:
    --------------->invokeBindNow(ctx, localAddress, promise);
    --------------->ctx.handler().bind(ctx, localAddress, promise);
    --------------->unsafe.bind(localAddress, promise); //绑定工作交给unsafe
    --------------->doBind(localAddress);
    --------------->javaChannel().socket().bind(localAddress, config.getBacklog());
    return promise;
}
绑定完成之后,通过pipeline.fireChannelActive()方法来给每个handler挨个发送通知。从head开始,到目前pipeline中只有一个ServerBootstrapAccept,没有channelActive方法,所以可以直接结束了。

channel.read();注册结束之后,将accept打开,开始读取数据
从read到doBeginRead中间跳转的过程比较曲折,简单来讲还是一样的,读写从pipeLine管道的tail开始,然后通过invoke管理工具同意操作handler,最终交给unsafe最具体的操作。 

最后就到了Reactor中熟悉的画面:在2.1.3中已经注册完socket了,2.2中已经bin完socket了,这里将兴趣点设置成OP_ACCEPT。
protected void doBeginRead() throws Exception {
    final SelectionKey selectionKey = this.selectionKey;   
    if (!selectionKey.isValid()) {
        return;
    }
    readPending = true;
    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) { 
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

3. Accept的功能

那么到这里为止MainReactor的结构和注册逻辑就分析完了,上面2.1.2 初始化NioServerSocketChannel的时候已经分析了,所有child的一些配置项都包装到Accept中去了,所以当Accept成功接收到一个客户端的请求时,就会将这个socket注册到subReactor中去,init和register的步骤跟Main是一样的。

这里的步骤跟Main中的初始化socket的步骤一样,添加pipeline、配置soket、最后注册channel到Reactor中去。
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;
    child.pipeline().addLast(childHandler);
    for (Entry<ChannelOption<?>, Object> e: childOptions) {
        try {
            if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                logger.warn("Unknown channel option: " + e);
            }
        } catch (Throwable t) {
            logger.warn("Failed to set a channel option: " + child, t);
        }
    }
    for (Entry<AttributeKey<?>, Object> e: childAttrs) {
        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    }
    try {
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}

上面的都是分析,看着很散,最后总结一下吧:
简单回忆一下基于IO多路复用机制的Reactor的设计模型:在主从Reactor模型中,MainReactor随机选择一个线程启动Accept,专门负责接收来自客户端的连接请求,并在Accept中封装了SubReactor,这样当Accept接收到有效请求后,将socket注册到SubReactor中去,职责分工完毕。
结合上面的分析先理一下流程:

  1. 主从Reactor需要new两个NioEventLoopGroup对象,分别为boss和worker;
  2. 在NioEventLoopGroup中,创建一个线程池,一个EventExecutor执行组(大小是内核数两倍)、每个执行组中添加NioEventLoop执行者,一个chooser选择器(专门用来挑选执行者的)、添加listener监听;
  3. 初始化ServerBootstrap总指挥,然后配置一些参数,如group(封装Reactor)、channel(socket对象)、handler(boss用的)、option(boss的socket的参数),childOption(subReactor的socket的参数)、childHandler(worker用的)
  4. 创建socket,通过反射创建的NioServerSocketChannel对象,里面构造的时候,通过select的provider创建ServerSocketChannel,同时创建一个unSafe对象,一个PipeLine对象(是一个双向链表head和tail,通过fireChannelXXX责任链将消息分发给各个节点处理)
  5. 初始化socket,就是添加socket配置和相关属性,如添加AUTO_READ、SO_KEEPLIVE、SO_BACKLOG等待,同时填充pipeLine。
  6. 注册socket,通过一系列的调用:选择一个NioEvnetLoop,通过其中的unSafe最终socket注册到selector选择器上,这是通过forkJoinPool线程池执行的异步任务,任务中开始执行selector的select方法,然后调用doRegister执行注册;此外通过责任链依次遍历节点,填充pipeLine管道,此时在MainReactor中填充的就是ServerBootstrapAccepter,同时将SubReactor中的所有信息都封装进去,你懂的~~。
  7. socket准备就绪后,开始绑定端口了,跟上面一样一系列分工调用,最后在通过socket.bind完成绑定工作,同时将Accepter的兴趣点设置为OP_ACCEPT;
  8. 因为在第6点的异步任务中已经执行了select()阻塞监听,因此当有连接进入的时候,在Accept这个handler中回调channelRead方法,接收消socket,并将socket注册到SubReactor中去,同是完成subReactor中的一系列的创建初始化和注册功能。过程跟MainReactor过程是一样的。

关于Reactor模型在Netty中的应用就分析完了,打完收工。

本文地址:https://blog.csdn.net/massonJ/article/details/108173436