欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

netty4源码分析-connect

程序员文章站 2022-04-22 18:04:43
...

本文为原创,转载请注明出处

netty4源码分析-connect

 

 

客户端向服务端发起connect请求由以下代码触发:

ChannelFuture f = b.connect(host, port).sync();

 调用Bootstrap的connect方法:

//Bootstrap
public ChannelFuture connect(String inetHost, int inetPort) {
        return connect(new InetSocketAddress(inetHost, inetPort));
}

public ChannelFuture connect(SocketAddress remoteAddress) {
        if (remoteAddress == null) {
            throw new NullPointerException("remoteAddress");
        }

        validate();
        return doConnect(remoteAddress, localAddress());
    }

private ChannelFuture doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        final ChannelPromise promise = channel.newPromise();
        if (regFuture.isDone()) {
            doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
        } else {
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
                }
            });
        }

        return promise;
    }

 1、 首先分析initAndRegister()方法

// AbstractBootstrap
final ChannelFuture initAndRegister() {
        final Channel channel = channelFactory().newChannel();
        try {
            init(channel);
        } catch (Throwable t) {
            channel.unsafe().closeForcibly();
            return channel.newFailedFuture(t);
        }

        ChannelPromise regPromise = channel.newPromise();
        group().register(channel, regPromise);
        if (regPromise.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

        return regPromise;
    }

 1.1 final Channel channel = channelFactory().newChannel()的执行逻辑与socket那篇文章中分析的差不多

//BootstrapChannelFactory       
public T newChannel() {
            try {
                return clazz.newInstance();
            } catch (Throwable t) {
                throw new ChannelException("Unable to create Channel from class " + clazz, t);
            }
        }

 区别在于:a、本次根据反射创建的channel的类型是SocketChannel

 

//BootstrapChannelFactory       
public NioSocketChannel() {
        this(newSocket());
}
private static SocketChannel newSocket() {
        try {
            return SocketChannel.open();
        } catch (IOException e) {
            throw new ChannelException("Failed to open a socket.", e);
        }
    }
public NioSocketChannel(Channel parent, SocketChannel socket) {
        super(parent, socket);
        config = new DefaultSocketChannelConfig(this, socket.socket());
    }

 b、NioSocketChannel的父类是AbstractNioByteChannel,设置的interestOps为SelectionKey.OP_READ。(注:NioServerSocketChannel的父类是AbstractNioMessageChannel,设置的interestOps为SelectionKey.OP_ACCEPT)

 

//AbstractNioByteChannel
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
        super(parent, ch, SelectionKey.OP_READ);
    }

 c、最后分析父类AbstractNioChannel的构造函数

//AbstractNioChannel
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            ch.configureBlocking(false);
        } catch (IOException e) {
            try {
                ch.close();
            } catch (IOException e2) {
                if (logger.isWarnEnabled()) {
                    logger.warn(
                            "Failed to close a partially initialized socket.", e2);
                }
            }

            throw new ChannelException("Failed to enter non-blocking mode.", e);
        }
}

// AbstractChannel
protected AbstractChannel(Channel parent) {
        this.parent = parent;
        unsafe = newUnsafe();
        pipeline = new DefaultChannelPipeline(this);
    }

 newUnsafe()调用AbstractNioByteChannel的实现,实例化一个NioByteUnsafe(注:在NioServerSocketChannel中实例化的是NioMessageUnsafe)。其他点都一样,将channel设置为非阻塞,然后为channel创建管道。

 

1.2 init(channel)

该方法由Bootstrap实现(注:在NioServerSocketChannel中,该方法由ServerBootstrap实现)

//Bootstrap
void init(Channel channel) throws Exception {
        ChannelPipeline p = channel.pipeline();
        p.addLast(handler());

        final Map<ChannelOption<?>, Object> options = options();
        synchronized (options) {
            for (Entry<ChannelOption<?>, Object> e: options.entrySet()) {
                try {
                    if (!channel.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                        logger.warn("Unknown channel option: " + e);
                    }
                } catch (Throwable t) {
                    logger.warn("Failed to set a channel option: " + channel, t);
                }
            }
        }

        final Map<AttributeKey<?>, Object> attrs = attrs();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }
        }
    }

  该方法主要做了两件事:

  a) 为管道增加一个Inbound处理器ChannelInitializer。经过此步骤后,管道中的处理器链表为:head(outbound)->ChannelInitializer(inbound)->tail(inbound)。注意ChannelInitializer的实现方法initChannel,里面会当channelRegistered事件发生时将EchoClientHandler加入到管道中。

  b) 设置NioSocketChannel的options和attrs

 

1.3 group().register(channel, regPromise);

 

实际是调用MultithreadEventLoopGroup的register方法

//MultithreadEventLoopGroup
public ChannelFuture register(Channel channel, ChannelPromise promise) {
        return next().register(channel, promise);
    }

 next方法从group中选择一个EventExecutor(实际是一个SingleThreadEventLoop),然后执行register方法

//SingleThreadEventLoop
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
        if (channel == null) {
            throw new NullPointerException("channel");
        }
        if (promise == null) {
            throw new NullPointerException("promise");
        }
        channel.unsafe().register(this, promise);
        return promise;
    }

 channel.unsafe().register(this, promise)这里会调用AbstractChannel的内部类AbstractUnsafe的register方法

//AbstractUnsafe
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            if (eventLoop == null) {
                throw new NullPointerException("eventLoop");
            }
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }
            AbstractChannel.this.eventLoop = eventLoop;
            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    closeForcibly();
                    promise.setFailure(t);
                }
            }
        }

 此处开启了eventloop中的线程(即启动了netty客户端的一个线程),并将register0任务加入到线程的任务队列中。经过此步骤后,线程的任务队列仅含有一个任务,即register0任务,且正在被执行。

接着分析register0任务具体干了什么事情

//AbstractUnsafe
private void register0(ChannelPromise promise) {
            try {
                // check if the channel is still open as it could be closed in the mean time when the register
                // call was outside of the eventLoop
                if (!ensureOpen(promise)) {
                    return;
                }
                Runnable postRegisterTask = doRegister();
                registered = true;
                promise.setSuccess();
                pipeline.fireChannelRegistered();
                if (postRegisterTask != null) {
                    postRegisterTask.run();
                }
                if (isActive()) {
                    pipeline.fireChannelActive();
                }
            } catch (Throwable t) {
                // Close the channel directly to avoid FD leak.
                closeForcibly();
                if (!promise.tryFailure(t)) {
                    logger.warn(
                            "Tried to fail the registration promise, but it is complete already. " +
                                    "Swallowing the cause of the registration failure:", t);
                }
                closeFuture.setClosed();
            }
        }

protected Runnable doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().selector, 0, this);
                return null;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    throw e;
                }
            }
        }
    }

 doRegister中的这行代码selectionKey = javaChannel().register(eventLoop().selector, 0, this)将SocketChannel、0、以及this注册到selector中并得到对应的selectionkey

接着promise.setSuccess()将promise设置为success,就会触发异步回调,回调之前main函数所在的线程中为ChannelPromise添加的listener,即Bootstrap的以下代码:

//Bootstrap.java
private ChannelFuture doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        final ChannelPromise promise = channel.newPromise();
        if (regFuture.isDone()) {
            doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
        } else {
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
                }
            });
        }

 经过此步骤后,线程的任务队列数量由原来的1个增加到了2个,即正在执行的register0任务以及本次新增的doConnect0任务。

再接着分析register0任务中的此行代码pipeline.fireChannelRegistered()

//DefaultChannelPipeline
public ChannelPipeline fireChannelRegistered() {
        head.fireChannelRegistered();
        return this;
    }

//DefaultChannelHandlerContext
public ChannelHandlerContext fireChannelRegistered() {
        final DefaultChannelHandlerContext next = findContextInbound();
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRegistered();
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRegistered();
                }
            });
        }
        return this;
}

private void invokeChannelRegistered() {
        try {
            ((ChannelInboundHandler) handler).channelRegistered(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    }

 ChannelRegistered是一个Inbound事件,因此会按照head->tail的顺序执行所有的inbound处理器,目前有三个处理器:head-> ChannelInitializer ->tail,ChannelInitializer和tail都是inbound处理器,所以看一下ChannelInitializer的invokeChannelRegistered方法

//ChannelInitializer
public final void channelRegistered(ChannelHandlerContext ctx)
            throws Exception {
        boolean removed = false;
        boolean success = false;
        try {
            initChannel((C) ctx.channel());
            ctx.pipeline().remove(this);
            removed = true;
            ctx.fireChannelRegistered();
            success = true;
        } catch (Throwable t) {
            logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t);
        } finally {
            if (!removed) {
                ctx.pipeline().remove(this);
            }
            if (!success) {
                ctx.close();
            }
        }
    }
}

 该方法主要做了以下几件事:

   a) initChannel方法是在此处实例化内部类ChannelInitializer实现的

b.group(group)
             .channel(NioSocketChannel.class)
             .option(ChannelOption.TCP_NODELAY, true)
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(
                             //new LoggingHandler(LogLevel.INFO),
                             new EchoClientHandler(firstMessageSize));
                 }
             });

 其功能就是将EchoClientHandler加入到管道中,并将自己(ChannelInitializer)从管道中删除,此时的处理器链表为:

      Head->EchoClientHandler->tail

  b) 接着调用EchoClientHandler和tail的channelRegistered方法,都没有做啥实质性的事情,最后以tail的空实现结束。

再分析register0任务中的以下代码

 

//AbstractUnsafe
if (isActive()) 
{
   pipeline.fireChannelActive();
 }

// NioSocketChannel
public boolean isActive() {
        SocketChannel ch = javaChannel();
        return ch.isOpen() && ch.isConnected();
    }
 由于此时还没有执行connect操作,所以isActive返回false,不会执行pipeline.fireChannelActive()

 

执行完此代码后,register0任务就执行完了,boss线程中的任务队列中仅剩下doConnect0任务。

 

2、 接下来分析doConnect0方法

 

//Bootstrap
private static void doConnect0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {

        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    if (localAddress == null) {
                        channel.connect(remoteAddress, promise);
                    } else {
                        channel.connect(remoteAddress, localAddress, promise);
                    }
                    promise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }
 channel.connect(remoteAddress, promise)调用AbstractChannel的方法,channel里会调用管道的方法

 

 

 

//AbstractChannel
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
        return pipeline.connect(remoteAddress, promise);
}

// DefaultChannelPipeline
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
        return tail.connect(remoteAddress, promise);
    }
 由于connect是一个Outbound事件,所以按照tail到head的顺序执行所有的outBound处理器。目前共有Head->EchoClientHandler->tail三个处理器,而只有Head是outbound处理器

 

 

 

//HeadHandler
public void connect(
                ChannelHandlerContext ctx,
                SocketAddress remoteAddress, SocketAddress localAddress,
                ChannelPromise promise) throws Exception {
            unsafe.connect(remoteAddress, localAddress, promise);
        }
 看一下AbstractNioUnsafe的connect方法

 

 

//AbstractNioUnsafe
public void connect(
                final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
            if (!ensureOpen(promise)) {
                return;
            }
            try {
                if (connectPromise != null) {
                    throw new IllegalStateException("connection attempt already made");
                }
                boolean wasActive = isActive();
                if (doConnect(remoteAddress, localAddress)) {
                    promise.setSuccess();
                    if (!wasActive && isActive()) {
                        pipeline().fireChannelActive();
                    }
                } else {
                    connectPromise = promise;
                    requestedRemoteAddress = remoteAddress;
                    // Schedule connect timeout.
                    int connectTimeoutMillis = config().getConnectTimeoutMillis();
                    if (connectTimeoutMillis > 0) {
                        connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                            @Override
                            public void run() {
                                ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                                ConnectTimeoutException cause =
                                        new ConnectTimeoutException("connection timed out: " + remoteAddress);
                                if (connectPromise != null && connectPromise.tryFailure(cause)) {
                                    close(voidPromise());
                                }
                            }
                        }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
                    }
                    promise.addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            if (future.isCancelled()) {
                                if (connectTimeoutFuture != null) {
                                    connectTimeoutFuture.cancel(false);
                                }
                                connectPromise = null;
                                close(voidPromise());
                            }
                        }
                    });
                }
            } catch (Throwable t) {
                if (t instanceof ConnectException) {
                    Throwable newT = new ConnectException(t.getMessage() + ": " + remoteAddress);
                    newT.setStackTrace(t.getStackTrace());
                    t = newT;
                }
                closeIfClosed();
                promise.tryFailure(t);
            }
        }
 重点分析一下doConnect方法,由NioSocketChannel实现

 

//NioSocketChannel
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
        if (localAddress != null) {
            javaChannel().socket().bind(localAddress);
        }

        boolean success = false;
        try {
            boolean connected = javaChannel().connect(remoteAddress);
            if (!connected) {
                selectionKey().interestOps(SelectionKey.OP_CONNECT);
            }
            success = true;
            return connected;
        } finally {
            if (!success) {
                doClose();
            }
        }
    }

 boolean connected = javaChannel().connect(remoteAddress)此处就向服务端发起了connect请求,准备三次握手。由于是非阻塞模式,所以该方法会立即返回。如果建立连接成功,则返回true,否则返回false,后续需要使用select来检测连接是否已建立成功。如果返回false,此种情况就需要将ops设置为SelectionKey.OP_CONNECT,等待connect的select事件通知,然后调用finishConnect方法。

    对于connect,会加一个超时调度任务,默认的超时时间是30s,超时后做什么操作,后续再分析吧。

 

3、 最后分析客户端线程NioEventLoop的select接收到connect事件后的处理逻辑

//NioEventLoop
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
            return;
        }
        try {
            int readyOps = k.readyOps();
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
                if (!ch.isOpen()) {
                    // Connection already closed - no need to handle write.
                    return;
                }
            }
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }
        } catch (CancelledKeyException e) {
            unsafe.close(unsafe.voidPromise());
        }
    }

 当readyOps包含SelectionKey.OP_CONNECT时,必须首先将OP_CONNECT从ops中去掉,然后调用unsafe.finishConnect()。

 

//AbstractUnsafe
public void finishConnect() {
            // Note this method is invoked by the event loop only if the connection attempt was
            // neither cancelled nor timed out.

            assert eventLoop().inEventLoop();
            assert connectPromise != null;

            try {
                boolean wasActive = isActive();
                doFinishConnect();
                connectPromise.setSuccess();
                if (!wasActive && isActive()) {
                    pipeline().fireChannelActive();
                }
            } catch (Throwable t) {
                if (t instanceof ConnectException) {
                    Throwable newT = new ConnectException(t.getMessage() + ": " + requestedRemoteAddress);
                    newT.setStackTrace(t.getStackTrace());
                    t = newT;
                }

                connectPromise.setFailure(t);
                closeIfClosed();
            } finally {
                // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
                // See https://github.com/netty/netty/issues/1770
                if (connectTimeoutFuture != null) {
                    connectTimeoutFuture.cancel(false);
                }
                connectPromise = null;
            }
        }

 doFinishConnect方法通过调用SocketChannel的finishConnect方法完成连接的建立,在NioSocketChannel中实现

//NioSocketChannel
protected void doFinishConnect() throws Exception {
        if (!javaChannel().finishConnect()) {
            throw new Error();
        }
    }

 此时,isActive()返回true,所以触发ChannelActive事件,该事件是一个inbound事件,所以Inbound的处理器可以通过实现channelActive方法来进行相应的操作,netty的自带例子中,EchoClientHandler实现该方法来开始向服务端写数据。

 

总结:从发起connect请求到请求建立先后共经历了以下几件事情:

  1. 创建套接字SocketChannel
  2. 设置套接字为非阻塞
  3. 设置channel当前感兴趣的事件为SelectionKey.OP_READ
  4. 创建作用于SocketChannel的管道Pipeline,该管道中此时的处理器链表为:Head(outbound)->tail(inbound)。
  5. 设置SocketChannel的options和attrs。
  6. 为管道增加一个Inbound处理器ChannelInitializer。经过此步骤后,管道中的处理器链表为:head(outbound)->ChannelInitializer(inbound)->tail(inbound)。注意ChannelInitializer的实现方法initChannel,里面会当channelRegisgered事件发生时将EchoClientHandler加入到管道中。
  7. 启动客户端线程,并将register0任务加入到线程的任务队列中。而register0任务做的事情为:将SocketChannel、0、注册到selector中并得到对应的selectionkey。然后通过回调,将doConnect0任务加入到线程的任务队列中。线程从启动到现在这段时间内,任务队列的变化如下:register0任务->register0任务,doConnect0任务-> doConnect0任务
  8. 通过channelRegistered事件,将EchoClientHandler加入到管道中,并移除ChannelInitializer,经过此步骤后,管道中的处理器链表为:head(outbound)-> EchoClientHandler (inbound)->tail(inbound)。管道从创建到现在这段时间内,处理器链表的变化历史为:head->tail,head->ChannelInitializer(inbound)->tail,head-> EchoClientHandler (inbound)->tail
  9. doConnect0任务会触发connect事件,connect是一个Outbound事件,headHandler通过调用AbstractNioUnsafe的方法向服务端发起connect请求,并设置ops为SelectionKey.OP_CONNECT
  10. 客户端线程NioEventLoop中的select接收到connect事件后,将SelectionKey.OP_CONNECT从ops中移除,然后调用finishConnect方法完成连接的建立。到此,connect就正式建立了。
  11. 最后触发ChannelActive事件。