Netty服务端口的绑定
调用netty的bootstrap的bind()方法会开始netty对本地端口的绑定与监听。
在serverBootstrap的超类abstractBootstrap的bind()方法开始绑定的全过程。
public ChannelFuture bind() {
validate();
SocketAddress localAddress = this.localAddress;
if (localAddress == null) {
throw new IllegalStateException("localAddress not set");
}
return doBind(localAddress);
}
首先会在validate()方法当中对netty中的配置进行验证,主要是对事件处理循环器的eventGroup和通道工厂channelFactory是否已经被配置完毕。
然后判断是否已经配置了本地地址,然后会通过doBind()继续下面的绑定过程。
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and succesful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.executor = channel.eventLoop();
}
doBind0(regFuture, channel, localAddress, promise);
}
});
return promise;
}
}
在doBind()方法中,首先会通过initAndRegister()方法开始将所需要的通道注册到eventGroup当中去。
final ChannelFuture initAndRegister() {
final Channel channel = channelFactory().newChannel();
try {
init(channel);
} catch (Throwable t) {
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
ChannelFuture regFuture = group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
首先通过channelFactory的newChannel()方法得到新的具体的通道实例。然后调用init()方法开始对通道进行初始化。
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options();
synchronized (options) {
channel.config().setOptions(options);
}
final Map<AttributeKey<?>, Object> attrs = attrs();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
if (handler() != null) {
p.addLast(handler());
}
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
init()方法在abstractBootstrap中是作为虚拟方法实现的,而并没有给出真正的具体实现,可以把目光移到serverBootstrap里有init()方法的具体实现。
一开始则会在这里这个服务端锁生成的通道进行配置option和attr属性的相关配置。
然后则会取出通道的责任链pipline,如果这个netty已经设置了相应的channelhandler则会在这里将这个handler加入责任链。
如果还配置了相应的childGroup和childHandler作为channelInitializer中重写initChannel()方法中生成的ServerBootStrapAcceptor的构造成员则会在这里加入责任链的末尾。
在channelInitializer中,实现了channel的channelRegister方法。
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ChannelPipeline pipeline = ctx.pipeline();
boolean success = false;
try {
initChannel((C) ctx.channel());
pipeline.remove(this);
ctx.fireChannelRegistered();
success = true;
} catch (Throwable t) {
logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t);
} finally {
if (pipeline.context(this) != null) {
pipeline.remove(this);
}
if (!success) {
ctx.close();
}
}
}
在之后的注册通道过程中,将在这里被调用这个register()方法。也就是说,这里直接会调用initChannel()方法,而initChannel()方法的具体实现,在刚才serverBootstrap里的init()方法中,被实现了。而本身,提供了这个注册方法的通道初始化者也会被移除通道的责任链当中。
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
在这里,重写了initChaneel()方法。在这里,将ServerBootstrapAcceptor生成,并且加入通道的责任链当中。ServerBootstrapAcceptor重写了channelRead()方法,给出里读取通道数据的方式,但在这里并不是重点。
在初始化完毕通道之后。把目光回到abatractBootstrap的initAndRegister()方法。顾名思义,既然已经初始化完毕了通道,那么接下来将会去向eventLoopGroup注册相应的通道。
在eventLoopGroup中的NioEventLoopGroup的超类的超类里MutiThreadEventExecutorGroup中,存在着chooser根据注册通道时index选择相应的singleThreadEventLoop来注册相应所需要注册的通道。
而注册方法也恰恰实现在了singleThreadEventLoop中。
public ChannelFuture register(Channel channel) {
return register(channel, new DefaultChannelPromise(channel, this));
}
在这里根据相应的通道和eventExcutor生成相应的channelPromise并且继续注册。
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
if (channel == null) {
throw new NullPointerException("channel");
}
if (promise == null) {
throw new NullPointerException("promise");
}
channel.unsafe().register(this, promise);
return promise;
}
在这里直接通过channel的unsafe方法注册相应的eventLoop。
在unsasfe的注册过程中,核心之处在于AbstractNioUnsafe的doRegister()方法。
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
这里完成真正的java通道与eventLoop的selector的注册。
在所有注册完毕之后,也就是说initAndRegister()方法的初始化与注册都已经完毕。这样的话,在确保注册已经完毕之后,将会通过bind0()方法开始正式的绑定。
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
在这里,在eventLoop中异步的去将本地端口和通道去绑定,并添加通道监听器。
在channel的bind()方法中,在abstractChannel中实现了bind()方法,而其中是直接调用了pipeLine的bind()方法,而在pipeline中也是直接调用了tail的bind方法。在headContext中,bind()方法而是直接调用了Unsafe的bind()方法,由此可以看见,和注册的过程一样,最后还是在unsafe中完成bind()的具体实现。
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
// See: https://github.com/netty/netty/issues/576
if (!PlatformDependent.isWindows() && !PlatformDependent.isRoot() &&
Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress()) {
// Warn a user about the fact that a non-root user can't receive a
// broadcast packet on *nix if the socket is bound on non-wildcard address.
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}
boolean wasActive = isActive();
try {
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (!wasActive && isActive()) {
invokeLater(new OneTimeTask() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
在这里调用了doBind()方法。在nioServerSocketChannel中实现。
protected void doBind(SocketAddress localAddress) throws Exception {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
这样,变成了java通道的端口绑定,由此,netty的端口绑定结束。
上一篇: php获取本地连接ip
下一篇: Netty多端口绑定服务端