欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Netty服务端口的绑定

程序员文章站 2022-06-04 19:47:37
...

调用nettybootstrapbind()方法会开始netty对本地端口的绑定与监听

serverBootstrap的超类abstractBootstrapbind()方法开始绑定的全过程

public ChannelFuture bind() {
    validate();
    SocketAddress localAddress = this.localAddress;
    if (localAddress == null) {
        throw new IllegalStateException("localAddress not set");
    }
    return doBind(localAddress);
}

首先会在validate()方法当中对netty中的配置进行验证,主要是对事件处理循环器的eventGroup和通道工厂channelFactory是否已经被配置完毕。

然后判断是否已经配置了本地地址然后会通过doBind()继续下面的绑定过程。

private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and succesful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.executor = channel.eventLoop();
                }
                doBind0(regFuture, channel, localAddress, promise);
            }
        });
        return promise;
    }
}

doBind()方法中,首先会通过initAndRegister()方法开始将所需要的通道注册到eventGroup当中去

final ChannelFuture initAndRegister() {
    final Channel channel = channelFactory().newChannel();
    try {
        init(channel);
    } catch (Throwable t) {
        channel.unsafe().closeForcibly();
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    ChannelFuture regFuture = group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
首先通过channelFactory的newChannel()方法得到新的具体的通道实例。然后调用init()方法开始对通道进行初始化。
void init(Channel channel) throws Exception {
    final Map<ChannelOption<?>, Object> options = options();
    synchronized (options) {
        channel.config().setOptions(options);
    }

    final Map<AttributeKey<?>, Object> attrs = attrs();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }

    ChannelPipeline p = channel.pipeline();
    if (handler() != null) {
        p.addLast(handler());
    }

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
    }

    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(Channel ch) throws Exception {
            ch.pipeline().addLast(new ServerBootstrapAcceptor(
                    currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
        }
    });
}

init()方法在abstractBootstrap中是作为虚拟方法实现的而并没有给出真正的具体实现可以把目光移到serverBootstrap里有init()方法的具体实现

一开始则会在这里这个服务端锁生成的通道进行配置optionattr属性的相关配置。

然后则会取出通道的责任链pipline,如果这个netty已经设置了相应的channelhandler则会在这里将这个handler加入责任链。

如果还配置了相应的childGroupchildHandler作为channelInitializer中重写initChannel()方法中生成的ServerBootStrapAcceptor的构造成员则会在这里加入责任链的末尾

channelInitializer实现了channelchannelRegister方法

public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    ChannelPipeline pipeline = ctx.pipeline();
    boolean success = false;
    try {
        initChannel((C) ctx.channel());
        pipeline.remove(this);
        ctx.fireChannelRegistered();
        success = true;
    } catch (Throwable t) {
        logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t);
    } finally {
        if (pipeline.context(this) != null) {
            pipeline.remove(this);
        }
        if (!success) {
            ctx.close();
        }
    }
}

在之后的注册通道过程中,将在这里被调用这个register()方法。也就是说,这里直接会调用initChannel()方法,而initChannel()方法的具体实现,在刚才serverBootstrap里的init()方法中,被实现了。而本身,提供了这个注册方法的通道初始化者也会被移除通道的责任链当中。

p.addLast(new ChannelInitializer<Channel>() {
    @Override
    public void initChannel(Channel ch) throws Exception {
        ch.pipeline().addLast(new ServerBootstrapAcceptor(
                currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
    }
});

在这里,重写了initChaneel()方法。在这里,将ServerBootstrapAcceptor生成并且加入通道的责任链当中ServerBootstrapAcceptor重写了channelRead()方法给出里读取通道数据的方式但在这里并不是重点

在初始化完毕通道之后把目光回到abatractBootstrapinitAndRegister()方法顾名思义既然已经初始化完毕了通道那么接下来将会去向eventLoopGroup注册相应的通道。

eventLoopGroup中的NioEventLoopGroup的超类的超类里MutiThreadEventExecutorGroup存在着chooser根据注册通道时index选择相应的singleThreadEventLoop来注册相应所需要注册的通道

而注册方法也恰恰实现在了singleThreadEventLoop

public ChannelFuture register(Channel channel) {
    return register(channel, new DefaultChannelPromise(channel, this));
}

在这里根据相应的通道和eventExcutor生成相应的channelPromise并且继续注册。

public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
    if (channel == null) {
        throw new NullPointerException("channel");
    }
    if (promise == null) {
        throw new NullPointerException("promise");
    }

    channel.unsafe().register(this, promise);
    return promise;
}

在这里直接通过channelunsafe方法注册相应的eventLoop

unsasfe的注册过程中,核心之处在于AbstractNioUnsafedoRegister()方法。

protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionKey = javaChannel().register(eventLoop().selector, 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                // Force the Selector to select now as the "canceled" SelectionKey may still be
                // cached and not removed because no Select.select(..) operation was called yet.
                eventLoop().selectNow();
                selected = true;
            } else {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;
            }
        }
    }

这里完成真正的java通道与eventLoopselector的注册。

在所有注册完毕之后也就是说initAndRegister()方法的初始化与注册都已经完毕。这样的话,在确保注册已经完毕之后,将会通过bind0()方法开始正式的绑定。

private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

在这里,在eventLoop异步的去将本地端口和通道去绑定,并添加通道监听器。

channelbind()方法中,在abstractChannel中实现了bind()方法,而其中是直接调用了pipeLinebind()方法而在pipeline中也是直接调用了tailbind方法。在headContextbind()方法而是直接调用了Unsafebind()方法由此可以看见和注册的过程一样最后还是在unsafe中完成bind()的具体实现。

public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    if (!promise.setUncancellable() || !ensureOpen(promise)) {
        return;
    }

    // See: https://github.com/netty/netty/issues/576
    if (!PlatformDependent.isWindows() && !PlatformDependent.isRoot() &&
        Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
        localAddress instanceof InetSocketAddress &&
        !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress()) {
        // Warn a user about the fact that a non-root user can't receive a
        // broadcast packet on *nix if the socket is bound on non-wildcard address.
        logger.warn(
                "A non-root user can't receive a broadcast packet if the socket " +
                "is not bound to a wildcard address; binding to a non-wildcard " +
                "address (" + localAddress + ") anyway as requested.");
    }

    boolean wasActive = isActive();
    try {
        doBind(localAddress);
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }

    if (!wasActive && isActive()) {
        invokeLater(new OneTimeTask() {
            @Override
            public void run() {
                pipeline.fireChannelActive();
            }
        });
    }

    safeSetSuccess(promise);
}

在这里调用了doBind()方法nioServerSocketChannel中实现

protected void doBind(SocketAddress localAddress) throws Exception {
    javaChannel().socket().bind(localAddress, config.getBacklog());
}

这样,变成了java通道的端口绑定,由此,netty的端口绑定结束。



相关标签: java netty 源码