欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

netty源码分析系列——Bootstrap

程序员文章站 2023-12-30 18:09:34
...

引言

我们在使用netty4的时候,总是从使用它的两个启动器开始的,它们是我们最熟悉的netty的API,也是netty程序的入口,因此我们从启动器来开启研究netty4源码是非常明智的选择,从我们最熟悉的内容开始,让我们一起来探究它背后是怎么实现的。

客户端启动器

简单用法演示

我们知道Bootstrap是netty4的客户端启动器,首先我们一起来回顾一下我们是怎么使用它的。

public class TimeClient {
    public static void main(String[] args) throws Exception {
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap(); // (1)
            b.group(workerGroup); // (2)
            b.channel(NioSocketChannel.class); // (3)
            b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });

            // Start the client.
            ChannelFuture f = b.connect(host, port).sync(); // (5)

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg; // (1)
        try {
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        } finally {
            m.release();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

上述代码演示了一个最简单的netty client程序编写方式。我们可以看出来它就是构造了一个Bootstrap对象,然后对它进行一些配置,最后调用connect()连接方法连接到远程服务器。看起来是非常简单的。让我们一起来探究一下它背后发生了什么。

Bootstrap类图

netty源码分析系列——Bootstrap

Bootstrap源码分析

构造函数

先通过构造函数来构造它的对象。它有两个构造方法。

    public Bootstrap() { } //无参构造函数,默认使用这个,没有做任何事情。

    private Bootstrap(Bootstrap bootstrap) {//传递另外一个模版boostrap对象来初始化,将该对象中的属性设置到当前对象中。
        super(bootstrap);
        resolver = bootstrap.resolver;
        remoteAddress = bootstrap.remoteAddress;
    }

通过上述2个构造函数,我们调用它们就可以获得一个Bootstrap对象,我们就可以调用它的方法来继续。

属性配置方法

Bootstrap以及它继承的父类AbstractBootstrap提供了一系列配置它的参数和属性的方法,而且为了方便配置大量的参数,它使用了现在非常流行的链式编程风格,直接返回了当前对象。

配置EvnetLoopGroup对象

    public B group(EventLoopGroup group) {
        if (group == null) {//不允许为空。
            throw new NullPointerException("group");
        }
        if (this.group != null) {
            throw new IllegalStateException("group set already");
        }
        this.group = group;//赋值
        return (B) this;//返回当前对象,链式编程。
    }

group属性就是用于处理I/O事件的事件循环组。是异步事件模型的重要实现。

配置连接通道类和工厂对象

    public B channel(Class<? extends C> channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        }
        return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
    }

    public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {
        return channelFactory((ChannelFactory<C>) channelFactory);
    }

    public B channelFactory(ChannelFactory<? extends C> channelFactory) {
        if (channelFactory == null) {
            throw new NullPointerException("channelFactory");
        }
        if (this.channelFactory != null) {
            throw new IllegalStateException("channelFactory set already");
        }

        this.channelFactory = channelFactory;
        return (B) this;
    }

从上述代码可以看到最后通过先配置连接通道类,最后生成了一个连接通道工厂对象的属性。该属性用于生产新的连接通道类。不同的连接通道类对应不同的socket传输协议,用户根据需求进行选择。最后默认生成的连接工厂对象是ReflectiveChannelFactory类的。它就是一个使用类的无参数构造函数反射生成连接通道对象的工厂类。

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {

    private final Class<? extends T> clazz;

    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        if (clazz == null) {
            throw new NullPointerException("clazz");
        }
        this.clazz = clazz;
    }

    @Override
    public T newChannel() {
        try {
            return clazz.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
    }
}

上述关键方法是newChannel产生新的连接通道对象,调用了类的反射方法clazz.newInstance方法获得新的对象。

配置通道选项ChannelOption

ChannelOption通道选项对象,是对通道的属性和行为进行定制的一种方式。如例子中通过b.option(ChannelOption.SO_KEEPALIVE, true)来设置连接通道保持连接,它也是一个非常重要的方法。

    public <T> B option(ChannelOption<T> option, T value) {
        if (option == null) {
            throw new NullPointerException("option");
        }
        if (value == null) {
            synchronized (options) {
                options.remove(option);
            }
        } else {
            synchronized (options) {
                options.put(option, value);
            }
        }
        return (B) this;
    }

从代码中我们看出,它最终将值设置到options属性中,如果输入的value是null则表示是删除该选项。设置好的值可供后续连接通道创建,初始化,连接以及读写事件等操作中使用它。

配置用户自定义处理器

用户自己的业务逻辑如何迁入到netty中呢?它提供了设置自定义处理器的一种方法。

    public B handler(ChannelHandler handler) {
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
        return (B) this;
    }

上述代码可以看出,它只是设置了属性handler属性。而我们往往会设置一个ChannelInitializer抽象类的一个实现类的对象,在这个类的实现方法中我们逐一设置用户定义的各种处理器对象。

b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler());//将用户自定义的ChannelHandler逐一加入到通道绑定的处理器链中。
                }
            });

还有其它一些配置方法,我们不再一一分析。接下来我们进入到连接方法。

连接方法connect()

连接是Bootstrap最重要的一个方法,当构造以及配置好了之后,只有通过连接方法才能够真正开始建立连接,以及后续的发送和接收信息处理。提供了一系列重载的方法,逻辑完全是一样的,我们这里这分析其中一个方法。

    public ChannelFuture connect(SocketAddress remoteAddress) {//参数remoteAddress是远程服务器端的地址。
        if (remoteAddress == null) {
            throw new NullPointerException("remoteAddress");
        }

        validate();//检验。
        return doResolveAndConnect(remoteAddress, config.localAddress());解析地址并且连接。
    }

接下来进入两个方法,一个是校验,一个是解析并且连接。

校验配置信息方法validate

    public Bootstrap validate() {//子类Bootstrap的校验方法。
        super.validate();
        if (config.handler() == null) {
            throw new IllegalStateException("handler not set");
        }
        return this;
    }

    public B validate() {//父类AbstractBootstrap的校验方法。
        if (group == null) {
            throw new IllegalStateException("group not set");
        }
        if (channelFactory == null) {
            throw new IllegalStateException("channel or channelFactory not set");
        }
        return (B) this;
    }

校验方法主要是校验handler,group和channelFactory等几个属性是否为空,意味着要进行连接,这几个重要的属性必须先进行设置。可以利用上述设置方法进行设置。

整体解析并且连接方法

回到connect方法代码,我们接下进入的是doResolveAndConnect方法,该方法解析并且进行连接。

private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();//1
        final Channel channel = regFuture.channel();//2

        if (regFuture.isDone()) {//3
            if (!regFuture.isSuccess()) {
                return regFuture;
            }
            return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
        } else {//4
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {//5
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    // Direclty obtain the cause and do a null check so we only need one volatile read in case of a
                    // failure.
                    Throwable cause = future.cause();
                    if (cause != null) {//6
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {//7
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();
                        doResolveAndConnect0(channel, remoteAddress, localAddress, promise);//8
                    }
                }
            });
            return promise;//9
        }
    }

上面这个解析连接方法的代码我们分几个步骤来看,代码中标注了顺序。

1处调用了initAndRegister方法,表示初始化和注册,返回的对象时一个ChannelFuture对象,这是典型的异步编程,该步并不会阻塞线程。 稍后我们看看此处代码。

2处从ChannelFuture对象中获得了初始化得到的Channel对象。我们在initAndRegister方法中可以看到它是怎么来的。

3处立即判断ChannelFuture对象是否已经完成了,若执行成功则表示注册成功则调用doResolveAndConnect0继续执行其它逻辑,如果没有成功则直接返回ChannelFuture对象。稍后我们继续看doResolveAndConnect0方法代码。

4处若未完成则给ChannelFuture对象添加监听器,当事件到达的时候执行响应处理。我们看到方法里有构造了一个PendingRegistrationPromise对象,将Channel放入该对象中。它也是一个ChannelFuture对象的一个实现类,最终方法返回的是该对象。这是为什么呢?从注视来看是说防止万一regFuture对象未被正确填充,这是什么意思呢?还是不太了解。我们先留一个疑问。

5处添加一个监听器用来监听操作完成事件operationComplete。

6处则是判断若操作完成的时候有异常,则设置PendingRegistrationPromise为失败。

7处则是若执行成功,则设置PendingRegistrationPromise为注册成功,最后也会调用doResolveAndConnect0方法,这与第3步是一样的。

初始化和注册事件循环组方法

我们先回过来看一下initAndRegister方法代码。

    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();//1
            init(channel);//2
        } catch (Throwable t) {//3
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
            }
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }

        ChannelFuture regFuture = config().group().register(channel);//4
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regFuture;//5
    }

1处用我们在初始化的时候设置的ChannelClass产生的一个默认的ChannelFactory对象产生一个新的Channel对象。在这里用到了。

2处调用子类的具体init()初始化方法,就是对于Channel进行初始化,具体逻辑后面的章节内容会介绍。

3处是异常处理当实例化或者初始化抛出异常的情况下则关闭通道并且返回一个失败状态的ChannelFuture对象。

4处将初始化过的连接通道注册到配置的EventLoopGroup对象上,该处也是使用了异步编程,返回一个ChannelFuture对象,如果注册失败则关闭连接通道。

5处返回一个regFutre对象,表示未来注册的结果。如果成功了则表示已经注册成功,则可以安全地调用bind()或者connect方法进行下一步处理了。

初始化方法init()

Boostrape类实现了父类的该方法,它的源码如下。

    void init(Channel channel) throws Exception {
        ChannelPipeline p = channel.pipeline();//1
        p.addLast(config.handler());//1

        final Map<ChannelOption<?>, Object> options = options0();//3
        synchronized (options) {
            setChannelOptions(channel, options, logger);
        }

        final Map<AttributeKey<?>, Object> attrs = attrs0();//4
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }
        }
    }

1处获得了连接通道绑定的ChannelPipeline对象.该对象是在创建Channel的同时创建的。

2处将用户配置的ChannelHandler添加到pipeline的最末端。

3处将用户配置的选项列表设置到通道对象中。

4处将用户配置的属性类型设置到通道对象中。

将我们在初始化设置Bootstrap的一些配置信息同时设置到新生成的Channel对象中,供真正的后续使用。

我们再次跳出来,回到之前的哪个方法,当Channel对象创建,初始化以及成功注册到EventLoopGroup对象中后,接下来还需要执行真正的连接逻辑。

解析远程地址

进入到doResolveAndConnect0私有方法,这里有解析和连接的逻辑。

   private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress,
                                               final SocketAddress localAddress, final ChannelPromise promise) {
        try {
            final EventLoop eventLoop = channel.eventLoop();//1
            final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);//2
            if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {//3
                doConnect(remoteAddress, localAddress, promise);
                return promise;
            }
            final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);//4
            if (resolveFuture.isDone()) {//5
                final Throwable resolveFailureCause = resolveFuture.cause();
                if (resolveFailureCause != null) {
                    // Failed to resolve immediately
                    channel.close();
                    promise.setFailure(resolveFailureCause);
                } else {
                    // Succeeded to resolve immediately; cached? (or did a blocking lookup)
                    doConnect(resolveFuture.getNow(), localAddress, promise);
                }
                return promise;
            }
            resolveFuture.addListener(new FutureListener<SocketAddress>() {//6
                @Override
                public void operationComplete(Future<SocketAddress> future) throws Exception {
                    if (future.cause() != null) {
                        channel.close();
                        promise.setFailure(future.cause());
                    } else {
                        doConnect(future.getNow(), localAddress, promise);
                    }
                }
            });
        } catch (Throwable cause) {
            promise.tryFailure(cause);
        }
        return promise;
    }

解析和连接方法的代码如上,该方法又是一个典型的事件模型编程模式,一切耗时操作都采用了事件监听模型。

1处获得了当前Channel绑定的一个eventloop对象。

2处获得当前eventloop对象绑定的一个地址解析器对象。

3处如果地址解析起不支持或者已经解析过,则直接使用该远程地址进行连接。

4处使用地址解析器对远程地址进行解析。

5处如果解析器正好解析完成,则判断解析结果,如果解析抛出异常则关闭连接,设置状态是失败;如果解析成功则进行连接操作。

6处如果未来才会解析完成,则添加一个监听器,监听未来监听的事件。处理逻辑与5相同。

AddressResolver接口是一个负责解析SocketAddress的地址解析器,该类相对比较简单,接口方法较少,也提供了支持Future方式的接口。

最后又调用了doConnect()方法执行真正的连接逻辑。接下来继续看看它的代码逻辑。

连接方法doConnect

    private static void doConnect(
            final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {
        final Channel channel = connectPromise.channel();//1
        channel.eventLoop().execute(new Runnable() {//2
            @Override
            public void run() {
                if (localAddress == null) {//3
                    channel.connect(remoteAddress, connectPromise);
                } else {//4
                    channel.connect(remoteAddress, localAddress, connectPromise);
                }
                connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);//5
            }
        });
    }

连接方法也是将真正的连接这种可能阻塞的操作异步化了,提交一个任务去执行,再传递一个Future对象。

1处是获得当前的连接对象。因为doConnect方法是一个静态方法,所有的状态都从参数传递,所以当前连接也不例外,从ChannelPromise对象中获取。

2处用当前连接绑定的eventLoop去执行一个异步的任务。

3处是当本地地址为设置,则只传递远程地址调用Channel的连接方法进行真正的连接。

4处是如果设置列本地地址,则将本地地址传递给Channel调用连接方法。

5处是添加了一个当失败关闭连接的事件。

可以看到此处连接方法还是调用了Channel对象的connect方法去进行真正的连接,而此处无非是做了一个异步处理,避免阻塞主线程。

后面我们会对Channel进行深入研究,会看看该方法的实现细节。

服务端启动器

简单用法演示

public class TimerServer {
    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class) // (3)
             .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(new TimeServerHandler());
                 }
             })
             .option(ChannelOption.SO_BACKLOG, 128)          // (5)
             .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)

            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(port).sync(); // (7)
            // Wait until the server socket is closed.
            // In this example, this does not happen, but you can do that to gracefully
            // shut down your server.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(final ChannelHandlerContext ctx) { // (1)
        final ByteBuf time = ctx.alloc().buffer(4); // (2)
        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));

        final ChannelFuture f = ctx.writeAndFlush(time); // (3)
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                assert f == future;
                ctx.close();
            }
        }); // (4)
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

上述代码演示了一个简单事件服务器的代码。它只会给客户端响应一个当前服务系统时间,业务逻辑极其简单,但是它能够演示常见netty服务端程序的构造及配置过程。从演示代码可以看出,它与netty客户端的代码结构相似度极高,只有细微的差别。这是因为客户端和服务端服用了大量的接口及实现类代码,最大程度复用代码,而且也降低了用户的学习成本。

ServerBootstrap源码分析

查看源码我们可以知道ServerBootstrap和Bootstrap都继承了相同的抽象类AbstractBootstrap,它们有非常多相同的代码实现,因此相同部分代码我们不在分析,只分析差异部分的代码。

实例化和配置

ServerBootstrap实例化比较简单,就不进行分析了,我们只看看它设置属性与Bootstrap不同的部分内容。

设置事件循环器组group方法

    public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
        super.group(parentGroup);
        if (childGroup == null) {
            throw new NullPointerException("childGroup");
        }
        if (this.childGroup != null) {
            throw new IllegalStateException("childGroup set already");
        }
        this.childGroup = childGroup;
        return this;
    }

与Bootstrap不同的是,ServerBootstrap有两个group属性,当然这2个属性可以指向同一个对象,但一般不建议这么用,特意分开就是将两者相互隔离,避免相互干扰。

parentGroup负责处理ServerChannel(接收客户端连接通道)的I/O事件, 而chidGroup则是用于处理Channel(客户端连接通道)的I/O事件。

其他部分设置与Bootstrap相同,不再赘述。

绑定端口方法bind

服务端启动器绑定一个端口号,启动ServerChannel,等待客户端的连接。这是最为关键的一个方法,源码如下。

    public ChannelFuture bind(int inetPort) {
        return bind(new InetSocketAddress(inetPort));//1
    }
    public ChannelFuture bind(SocketAddress localAddress) {
        validate();//2
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        return doBind(localAddress);//3
    }

1处调用了另外一个重载方法执行。将参数端口号inetPort转换为一个InetSocketAddress对象。

2处要对配置的属性进行校验。该方法源码Bootstrap已经看过,唯一的差别是它还需要校验一下childGroup不能是空的。

3处调用了私有方法doBind执行真正的校验逻辑。需要继续查看该方法源码。

执行绑定方法doBind

   private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();//1
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);//2
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();

                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }

该方法的代码逻辑与Bootstrap的doResolveAndConnect几乎一致,我们只讲解差异部分。

1处也是调用了initAndRegister方法,该方法是初始化Channel并且注册到group上,该方法也分析过,但是调用的init方法不同,下面重点看看该方法。由于设置的channel属性的值是NioServerSocketChannel,则我们得到的channel对象也是NioServerSocketChannel类型的。

2处最终又调用了doBind0方法来执行真正的绑定逻辑,稍后继续查看。

初始化方法init

void init(Channel channel) throws Exception {
        final Map<ChannelOption<?>, Object> options = options0();//1
        synchronized (options) {
            setChannelOptions(channel, options, logger);
        }

        final Map<AttributeKey<?>, Object> attrs = attrs0();//2
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }
        ChannelPipeline p = channel.pipeline();
        final EventLoopGroup currentChildGroup = childGroup;//3
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
        }

        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();//4
                if (handler != null) {
                    pipeline.addLast(handler);
                }
                ch.eventLoop().execute(new Runnable() {//5
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

该初始化方法与Boostrap稍有些不同,它不到要负责初始化ServerSocketChannel,还需要负责接收来自于客户端的SockeChannel,并且用针对于child的属性配置它们。

1处将用户设置的选项信息初始化连接。

2处将用户设置的属性信息初始化连接。

3处获取用户配置的子选项和子属性信息,以备后面初始化客户端的连接使用。

4处将用户设置的handler追加到pipeline的尾部。

5处提交一个异步任务添加一个ServerBootstrapAcceptor处理器。该异步任务在init方法之后执行,确保ServerBootstrapAcceptor处理器位于用户设置的handler之后。

ServerBootstrapAcceptor

ServerBootstrapAcceptor是一个内部静态类。它是一个ChannelInboundHandlerAdapter的子类,说明是一个输入事件处理器。

  private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {

        private final EventLoopGroup childGroup;
        private final ChannelHandler childHandler;
        private final Entry<ChannelOption<?>, Object>[] childOptions;
        private final Entry<AttributeKey<?>, Object>[] childAttrs;

        ServerBootstrapAcceptor(//1
                EventLoopGroup childGroup, ChannelHandler childHandler,
                Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
            this.childGroup = childGroup;
            this.childHandler = childHandler;
            this.childOptions = childOptions;
            this.childAttrs = childAttrs;
        }

        @Override
        @SuppressWarnings("unchecked")
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            final Channel child = (Channel) msg;//2

            child.pipeline().addLast(childHandler);

            setChannelOptions(child, childOptions, logger);

            for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }

            try {//3
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            final ChannelConfig config = ctx.channel().config();
            if (config.isAutoRead()) {//4
                config.setAutoRead(false);
                ctx.channel().eventLoop().schedule(new Runnable() {
                    @Override
                    public void run() {
                        config.setAutoRead(true);
                    }
                }, 1, TimeUnit.SECONDS);
            }
            ctx.fireExceptionCaught(cause);
        }
    }

1处是ServerBootstrapAcceptor的构造函数,传入的参数是子连接的配置信息,包括childHandler,childGroup,childOptions和childAttrs等。这些信息用于初始化子连接。

2处是channelRead事件处理方法。ServerSocketChannel接收到的消息是从客户端过来的连接channel。然后将childHandler添加到channel的pipeline中,注意设置选项和属性等信息。

3处是将channel注册到childGroup中去。同样也是提交了一个异步任务。监听注册完成的事件,如果注册失败则强制关闭连接,若出现其它异常也会关闭连接。

4处是exceptionCaught事件处理方法。当接收到异常的事件,则将autoRead状态临时设置为false,暂停一分钟接受客户端的连接。等待1秒钟之后再接收客户端新的连接。

执行绑定doBind0

    private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {
        channel.eventLoop().execute(new Runnable() {//1
            @Override
            public void run() {
                if (regFuture.isSuccess()) {//2
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }

1处提交了一个异步任务用于执行可能阻碍的真正bian逻辑。

2处当注册成功后调用ServerScoketChannel的bind方法执行真正的绑定端口逻辑。当注册失败则设置Futrue为失败状态。

转载于:https://my.oschina.net/ywbrj042/blog/868798

上一篇:

下一篇: