欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

netty源码分析-server启动-initAndRegister 博客分类: netty源码分析  

程序员文章站 2024-03-17 19:12:16
...
本文包括以下内容
  1. initAndRegister-createChannel分析
  2. initAndRegister-createChannel分析
  3. ChannelInitializer分析

创建channel-server端bind()调用链如下

netty源码分析-server启动-initAndRegister
            
    
    博客分类: netty源码分析    
sever端启动主要处理都在bind()处理中,其中主要代码如下
AbstractBootstrap
private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        final ChannelPromise promise;
        if (regFuture.isDone()) {
            promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            promise = new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    doBind0(regFuture, channel, localAddress, promise);
                }
            });
        }

        return promise;
    }
其中最重要的两个部分已经标出,先来分析一下initAndRegister()

分析initAndRegister方法
时序图如下
netty源码分析-server启动-initAndRegister
            
    
    博客分类: netty源码分析  
    final ChannelFuture initAndRegister() {
        Channel channel;
        try {
//创建channel,并配置其主要属性
            channel = createChannel();
        } catch (Throwable t) {
            return VoidChannel.INSTANCE.newFailedFuture(t);
        }

        try {
//初始化channel
            init(channel);
        } catch (Throwable t) {
            channel.unsafe().closeForcibly();
            return channel.newFailedFuture(t);
        }
//创建ChannelPromise
        ChannelPromise regFuture = channel.newPromise();
        channel.unsafe().register(regFuture);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regFuture;
    }
1.createChannel分析
ServerBootstrap:
    Channel createChannel() {
        EventLoop eventLoop = group().next();
        return channelFactory().newChannel(eventLoop, childGroup);
    }
ServerBootstrapChannelFactory
        @Override
        public T newChannel(EventLoop eventLoop, EventLoopGroup childGroup) {
            try {
//class=NioSocketChannel.class,调用它的构造方法。
                Constructor<? extends T> constructor = clazz.getConstructor(EventLoop.class, EventLoopGroup.class);
                return constructor.newInstance(eventLoop, childGroup);
            } catch (Throwable t) {
                throw new ChannelException("Unable to create Channel from class " + clazz, t);
            }
        }
//调用NioServerSocketChannel构造方法创建实例
    public NioServerSocketChannel(EventLoop eventLoop, EventLoopGroup childGroup) {
//调用父类构造方法
        super(null, eventLoop, childGroup, newSocket(), SelectionKey.OP_ACCEPT);
//创建默认配置
        config = new DefaultServerSocketChannelConfig(this, javaChannel().socket());
    }

    protected AbstractNioMessageServerChannel(
            Channel parent, EventLoop eventLoop, EventLoopGroup childGroup, SelectableChannel ch, int readInterestOp) {
//调用父类构造方法
//此时parent=null 。ch=ServerSocketChannelImp(java原生的)
        super(parent, eventLoop, ch, readInterestOp);
//childGroup赋值
        this.childGroup = childGroup;
    }
    protected AbstractNioMessageChannel(
            Channel parent, EventLoop eventLoop, SelectableChannel ch, int readInterestOp) {
//调用父类构造方法
        super(parent, eventLoop, ch, readInterestOp);
    }

protected AbstractNioChannel(Channel parent, EventLoop eventLoop,     SelectableChannel ch, int readInterestOp) {
//调用父类构造方法
        super(parent, eventLoop);
// 赋值 java SocketChannelImpl
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
//配置java.nio.ServerSocketChannel为非阻塞
            ch.configureBlocking(false);
        } catch (IOException e) {
//异常处理暂时不关注
            try {
                ch.close();
            } catch (IOException e2) {
                if (logger.isWarnEnabled()) {
                    logger.warn(
                            "Failed to close a partially initialized socket.", e2);
                }
            }

            throw new ChannelException("Failed to enter non-blocking mode.", e);
        }
    }
最后调用
    protected AbstractChannel(Channel parent, EventLoop eventLoop) {
        this.parent = parent;
// 非空和兼容校验
        this.eventLoop = validate(eventLoop);
        unsafe = newUnsafe();//初始化unsafe××重要×××
        pipeline = new DefaultChannelPipeline(this);//初始化pipeline××重要×××
    }

AbstractNioChannel
//兼容校验
    protected boolean isCompatible(EventLoop loop) {
        return loop instanceof NioEventLoop;
    }
小结:createChannel创建了 unsafe,pipeline,java.nio.ServerSocketChannel,然后为其属性eventLoop,childGroup,readInterestOp,config 赋值。
eventLoop是什么时候创建的  ?
创建 EventLoopGroup时创建的

2.init分析
void init(Channel channel) throws Exception {
        final Map<ChannelOption<?>, Object> options = options();
//options value= {SO_BACKLOG=128}
        synchronized (options) {
            channel.config().setOptions(options);
        }

        final Map<AttributeKey<?>, Object> attrs = attrs();
//attrs={}
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }

        ChannelPipeline p = channel.pipeline();
        if (handler() != null) {
            p.addLast(handler());
        }

        final ChannelHandler currentChildHandler = childHandler;
//获取
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
//childOptions={SO_KEEPALIVE=true}

        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
        }
//childAttrs={}
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
        }
//ChannelInitializer的作用:定义initChannel方法,便于添加handler。
//ServerBootstrapAcceptor的主要方法channelRead获取了ServerBootstrap的参数,貌似ServerBootstrapAcceptor作用只是获取ServerBootstrap的参数
//详细作用以后分析
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(Channel ch) throws Exception {
                ch.pipeline().addLast(new ServerBootstrapAcceptor(currentChildHandler, currentChildOptions,
                        currentChildAttrs));
            }
        });
    }

在一个标准的netty server childHandler 对应下面部分


3.ChannelInitializer分析

//ChannelInitializer:用于添加handler,在chnnel执行register()时调用
public abstract class ChannelInitializer<C extends Channel> extends ChannelHandlerAdapter {

//子类实现 通常用于添加handler
    protected abstract void initChannel(C ch) throws Exception;

//在chnnel执行register()时调用
    @Override
    @SuppressWarnings("unchecked")
    public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        ChannelPipeline pipeline = ctx.pipeline();
        boolean success = false;
        try {
//调用子类实现方法, 通常用于添加handler
            initChannel((C) ctx.channel());
//使用完毕 移除当前handler
            pipeline.remove(this);
    //传递ChannelRegistered事件,调用其他处理ChannelRegistered事件的handler
//实现事件传递关键的步骤,如果不写,则事件终止
            ctx.fireChannelRegistered();
            success = true;
        } catch (Throwable t) {
            logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t);
        } finally {
            if (pipeline.context(this) != null) {
                pipeline.remove(this);
            }
            if (!success) {
                ctx.close();
            }
        }
    }
}

在server启动-initAndRegister-init()中添加ServerBootstrapAcceptor和用户添加自定义handler时用到.

小结:init()将ServerBootstrap传递给channel(),并添加了一个访问ServerBootstrap属性的handler
最后看一下这两行代码
//创建ChannelPromise,ChannelPromise用途详见其分析篇
        ChannelPromise regFuture = channel.newPromise();
//注册channel到EventLoop上
        channel.unsafe().register(regFuture);
OK initAndRegister 分析完毕
总结:initAndRegister 创建了NioServerSocketChannel,初始化了ServerBootstrapAcceptor,注册channel到parent EventLoop上 。
  • netty源码分析-server启动-initAndRegister
            
    
    博客分类: netty源码分析  
  • 大小: 37.1 KB
  • netty源码分析-server启动-initAndRegister
            
    
    博客分类: netty源码分析  
  • 大小: 52.8 KB