欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

netty启动源码4

程序员文章站 2022-04-22 18:05:13
...

接着之前讲到 final ChannelFuture regFuture = initAndRegister();
接下来讲doBind0(regFuture, channel, localAddress, promise);

    private ChannelFuture doBind(final SocketAddress localAddress) {
        //初始化和注册
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();

                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }

跟踪**doBind0(regFuture, channel, localAddress, promise);**方法到AbstractBootstrap类的dobind0()方法

    private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {

        // 在触发channelregister()之前调用此方法。给用户处理程序设置的机会
        // 管道在其channelRegistered()实现中。
        //这些任务最终被事件轮询线程同步调用
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                System.out.println(" channel.eventLoop().execute(new Runnable() ");
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }

发现了channel.eventLoop().execute()这个方法之前就碰到;这个方法是往taskqueue里添加了任务;然后有专门的线程(称为reactor线程)死循环的处理该队列的任务;
所以接下下跟踪 channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);到AbstractChannelHandlerContext类的connect()

    @Override
    public ChannelFuture connect(
            final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {

        if (remoteAddress == null) {
            throw new NullPointerException("remoteAddress");
        }
        if (isNotValidPromise(promise, false)) {
            // cancelled
            return promise;
        }

        final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeConnect(remoteAddress, localAddress, promise);
        } else {
            safeExecute(executor, new Runnable() {
                @Override
                public void run() {
                    next.invokeConnect(remoteAddress, localAddress, promise);
                }
            }, promise, null);
        }
        return promise;
    }

这个方法是pipeline的处理链表里的tail节点的父类的方法
之前分析的处理链表:head->nettyTestHendler->ServerBootstrapAcceptor-> tail;
这行代码:final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
是表示从tail到head寻找是出栈节点的(实现ChannelOutboundHandler的类);
这里说明一下head节点是HeadContext实现了ChannelOutboundHandler,ChannelInboundHandler
其他三个节点都是只有实现了ChannelInboundHandler所以这里找到的结果就是head节点;
接着跟踪 next.invokeConnect(remoteAddress, localAddress, promise);跟踪到AbstractNioChannel类的内部类AbstractNioUnsafe的如下方法

        @Override
        public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
            assertEventLoop();

            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }

            // See: https://github.com/netty/netty/issues/576
            if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
                localAddress instanceof InetSocketAddress &&
                !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
                !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
                // Warn a user about the fact that a non-root user can't receive a
                // broadcast packet on *nix if the socket is bound on non-wildcard address.
                logger.warn(
                        "A non-root user can't receive a broadcast packet if the socket " +
                        "is not bound to a wildcard address; binding to a non-wildcard " +
                        "address (" + localAddress + ") anyway as requested.");
            }

            boolean wasActive = isActive();
            try {
                doBind(localAddress);
            } catch (Throwable t) {
                safeSetFailure(promise, t);
                closeIfClosed();
                return;
            }

            if (!wasActive && isActive()) {
                //线程执行调用channelActive方法
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.fireChannelActive();
                    }
                });
            }

            safeSetSuccess(promise);
        }

其中的doBind(localAddress);就是原生jdk绑定端口号,但是此时还是没有给服务端的seversocketchannel绑定刚兴趣的事件;
接着又往taskqueue里添加任务,所以会执行**pipeline.fireChannelActive();**跟踪代码
知道pipeline.fireChannelActive();这个是执行处理链的是入栈的(inboundhandler接口实现类)的节点的channelActive()方法;先执行head节点这个方法,然后发现head这个方法里有
fireChannelActive();调用下个节点的channelActive();此时这个节点是我们自定义的nettytesthendler的方法,若我们调用fireChannelActive();则回去ServerBootstrapAcceptor的channelActive();的方法但是新版的netty里面加了注解@skip表示直接跳过该方法,所以会跳到最后一个tail去;
当调用完后则会在headcontextd的channelActive();方法中还有个 readIfIsAutoRead();这个方法去找是属于outboundHandler的节点目前只有head节点是属于出栈节点所以最后会调用head的read()方法,.最终继续跟踪下去会给通道绑定监听连接事件;
head->nettyTestHendler->ServerBootstrapAcceptor-> tail;

总结:所以doBind0(regFuture, channel, localAddress, promise);主要是绑定端口.通道绑定刚兴趣事件,接下来就是等待连接事件,可以看reactor线程的那个死循环;