欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

netty(十四)源码分析之Unsafe

程序员文章站 2024-03-14 21:56:29
...

Unsafe接口实际上是Channel接口的辅助接口,它不应该被用户代码直接调用。实际的I/O读写操作都是由Unsafe接口负责完成的。

Unsafe源码分析

实际的网络I/O操作基本都是由Unsafe功能类负责实现的,我们看下它的主要功能子类和重要的API实现。
netty(十四)源码分析之Unsafe

AbstractUnsafe源码分析(AbstractUnsafe是AbstractChannel的内部类)

1.register方法
register方法主要用于将当前Unsafe对应的Channel注册到EventLoop的多路复用器上,然后调用DefaultChannelPipeline的fireChannelRegistered方法。如果Channel被**,则调用DefaultChannelPipeline的fireChannelActive方法。
首先判断当前坐在线程是否是Channel对应的NioEventLoop线程,如果是同一个线程,则不存在多线程并发操作的问题,直接调用register0进行注册;如果是由用户线程或者其他线程发起的注册操作,则将注册操作封装成Runnable,放到NioEventLoop任务队列中执行。注意:如果之间诶执行register0,会存在多线程并发操作Channel的问题。
        public final void register(final ChannelPromise promise) {
            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    promise.setFailure(t);
                }
            }
        }

下面继续看register0方法的实现,代码乳腺癌所示:
首先调用ensureOpen方法判断当前Channel是否打开,如果没有打开则无法注册,直接返回。校验通过后调用doRegister方法,它由AbstractNioUnsafe对应的AbstractNioChannel实现。
        private void register0(ChannelPromise promise) {
            try {
                // check if the channel is still open as it could be closed in the mean time when the register
                // call was outside of the eventLoop
                if (!ensureOpen(promise)) {
                    return;
                }
                doRegister();
                registered = true;
                promise.setSuccess();
                pipeline.fireChannelRegistered();
                if (isActive()) {
                    pipeline.fireChannelActive();
                }

    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().selector, 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }

该方法在前面的AbstractNioChannel源码分析中已经介绍过,此处不再赘述。如果doRegister方法没有抛出异常,则说明Channel注册成功,调用ChennelPipeline的fireChannelRegister方法,判断当前的Channel是否已经被**,则调用ChannelPipeline的fireChannelActive方法。
如果注册过程中发生了异常,则强制关闭连接,将异常堆栈信息设置到ChannelPipeline中。
2.bind方法
bind方法主要用于绑定指定的端口,对于服务端,用于绑定监听端口,可以设置backlog参数;对于客户端,主要用于指定客户端Channel的本地绑定Socket地址。代码实现如下:
            boolean wasActive = isActive();
            try {
                doBind(localAddress);
            } catch (Throwable t) {
                promise.setFailure(t);
                closeIfClosed();
                return;
            }
            if (!wasActive && isActive()) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.fireChannelActive();
                    }
                });
            }
            promise.setSuccess();
        }

调用doBind方法,对于NioSocketChannel和NioServerSocketChannel有不同的实现,客户端的实现代码如下:
    @Override
    protected void doBind(SocketAddress localAddress) throws Exception {
        javaChannel().socket().bind(localAddress);
    }
服务端的doBind方法实现如下:
    protected void doBind(SocketAddress localAddress) throws Exception {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }

如果绑定本地端口发生异常,则将异常设置到ChannelPromise中用于通知ChannelFuture,随后调用closeIfClosed方法来关闭Channel。

3.disconnect方法
disconnect用于客户端或者服务端主动关闭连接,代码如下:
        @Override
        public final void disconnect(final ChannelPromise promise) {
            boolean wasActive = isActive();
            try {
                doDisconnect();
            } catch (Throwable t) {
                promise.setFailure(t);
                closeIfClosed();
                return;
            }
            if (wasActive && !isActive()) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.fireChannelInactive();
                    }
                });
            }
            promise.setSuccess();
            closeIfClosed(); // doDisconnect() might have closed the channel
        }
4.close方法
在链路关闭之前需要首先判断是否处于刷新状态,如果处于刷新状态说明还有消息尚未发送出去,需要等到所有消息发送完成再关闭链路,因此,将关闭操作封装成Runnable稍后再执行。如下:
            if (inFlush0) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        close(promise);
                    }
                });
                return;
            }

如果链路没有处于刷新状态,需要从closeFuture中判断关闭操作是否完成,如果已经完成,
            if (closeFuture.isDone()) {
                // Closed already.
                promise.setSuccess();
                return;
            }
执行关闭操作,将消息发送缓冲数组设置为空,通知JVM进行内存回收。调用抽象方法doClose关闭链路。
            boolean wasActive = isActive();
            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.

            try {
                doClose();
                closeFuture.setClosed();
                promise.setSuccess();
            } catch (Throwable t) {
                closeFuture.setClosed();
                promise.setFailure(t);
            }
如果关闭操作成功,设置ChannelPromise结果为成功。如果操作失败,则设置异常对象到ChannelPromise中。

调用ChannelOutboundBuffer的close方法释放缓冲区的消息,随后构造链路关闭通知Runnable放到NioEventLoop中执行。源码如下:
            try {
                outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);
                outboundBuffer.close(CLOSED_CHANNEL_EXCEPTION);
            } finally {

                if (wasActive && !isActive()) {
                    invokeLater(new Runnable() {
                        @Override
                        public void run() {
                            pipeline.fireChannelInactive();
                        }
                    });
                }

                deregister();
            }
最后,调用deregister方法,将Channel从多路复用器上取消注册,代码实现如下:
    @Override
    protected void doDeregister() throws Exception {
        eventLoop().cancel(selectionKey());
    }
NioEventLoop的cancel方法实际将selectionKey对应的Channel从多路复用器上去注册,NioEventLoop的相关代码如下:
    void cancel(SelectionKey key) {
        key.cancel();
        cancelledKeys ++;
        if (cancelledKeys >= CLEANUP_INTERVAL) {
            cancelledKeys = 0;
            needsToSelectAgain = true;
        }
    }
5.write方法
write方法实际上将消息添加到环形发送数组中,并不是真正的写Channel,它的代码如下:

        @Override
        public void write(Object msg, ChannelPromise promise) {
            if (!isActive()) {
                // Mark the write request as failure if the channel is inactive.
                if (isOpen()) {
                    promise.tryFailure(NOT_YET_CONNECTED_EXCEPTION);
                } else {
                    promise.tryFailure(CLOSED_CHANNEL_EXCEPTION);
                }
                // release message now to prevent resource-leak
                ReferenceCountUtil.release(msg);
            } else {
                outboundBuffer.addMessage(msg, promise);
            }
        }
如果Channel没有处于**状态,说明TCP链路还没有真正建立成功,当前Channel存在以下两种状态。
(1)Channel打开,但是TCP链路尚未建立成功:NOT_YETCONNECTED_EXCEPTION;
(2)Channel已经关闭:CLOSED_CHANNEL_EXCEPTION。
对链路状态进行判断,给ChannelPromise设置对应的异常,然后调用ReferenceCountUtil的release方法释放发送的msg对象。
如果链路状态正常,则将需要发送的msg和promise放入发送缓冲区中(环形数组)。

6.flush方法
flush方法负责将发送缓冲区中待发送的消息全部写入到Channel中,并发送给通信对方。
        @Override
        public void flush() {
            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null) {
                return;
            }

            outboundBuffer.addFlush();
            flush0();
        }
首先将发送环形数组的unflushed指针修改为tail,标识本次要发送消息的缓冲区范围。然后调用flush0进行发送,由于flush0代码非常简单,我们重点分析doWrite方法,代码如下:
    @Override
    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        for (;;) {
            // Do non-gathering write for a single buffer case.
            final int msgCount = in.size();
            if (msgCount <= 1) {
                super.doWrite(in);
                return;
            }
首先计算需要发送的消息的个数(unflushed-flush),如果只有1个消息需要发送,则调用父类的写操作,我们分析AbstractNioByteChannel的doWrite()方法,代码如下:
    @Override
    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        int writeSpinCount = -1;

        for (;;) {
            Object msg = in.current(true);
            if (msg == null) {
                // Wrote all messages.
                clearOpWrite();
                break;
            }
因为只有一条消息需要发送,所以直接从ChannelOutboundBuffer中获取当前需要发送的消息,如下:
    public Object current(boolean preferDirect) {
        if (isEmpty()) {
            return null;
        } else {
            // TODO: Think of a smart way to handle ByteBufHolder messages
            Object msg = buffer[flushed].msg;
            if (threadLocalDirectBufferSize <= 0 || !preferDirect) {
                return msg;
            }
            if (msg instanceof ByteBuf) {
                ByteBuf buf = (ByteBuf) msg;
                if (buf.isDirect()) {
                    return buf;
                } else {
                    int readableBytes = buf.readableBytes();
                    if (readableBytes == 0) {
                        return buf;
                    }

                    // Non-direct buffers are copied into JDK's own internal direct buffer on every I/O.
                    // We can do a better job by using our pooled allocator. If the current allocator does not
                    // pool a direct buffer, we use a ThreadLocal based pool.
                    ByteBufAllocator alloc = channel.alloc();
                    ByteBuf directBuf;
                    if (alloc.isDirectBufferPooled()) {
                        directBuf = alloc.directBuffer(readableBytes);
                    } else {
                        directBuf = ThreadLocalPooledByteBuf.newInstance();
                    }

首先,获取需要发送的消息,如果消息为ByteBuf且它分配的是JDK的非堆内存,则直接返回。对返回的消息进行判断,如果为空,说明该消息已经发送完成并被回收,然后执行清空OP_WRITE操作位的clearOpWrite方法。

继续分析doWrite方法,如果需要发送的ByteBuf已经没有可写的字节了,在说明已经发送完成,将该消息从环形队列中删除,然后继续循环:
            if (msg instanceof ByteBuf) {
                ByteBuf buf = (ByteBuf) msg;
                int readableBytes = buf.readableBytes();
                if (readableBytes == 0) {
                    in.remove();
                    continue;
                }

AbstractNioUnsafe源码分析

AbstractNioUnsafe是AbstractUnsafe类的NIO实现,它主要实现了connect,finishConnect等方法,下面我们对API实现进行源码分析:
1.connect方法
首先获取当前的连接状态进行缓存,然后发起连接操作,doConnect()操作有三种可能的结果:
(1)连接成功,返回true;
(2)暂时没有连接上,服务端没有返回ACK应答,连接结果不确定,返回false;
(3)连接失败,直接抛出I/O异常。
如果是第二种结果,需要将NioSocketChannel中的selectionKey设置为OP_CONNECT,监听连接应答消息。

异步连接返回之后,需要判断连接结果,如果连接成功,则触发ChannelActive事件,代码如下:

        private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
            // trySuccess() will return false if a user cancelled the connection attempt.
            boolean promiseSet = promise.trySuccess();

            // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
            // because what happened is what happened.
            if (!wasActive && isActive()) {
                pipeline().fireChannelActive();
            }

            // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
            if (!promiseSet) {
                close(voidPromise());
            }
        }

这里对ChannelActive事件处理不再进行详细说明,它最终会将NioSocketChannel中的selectionKey设置为SelectionKey.OP_READ,用于监听网络读操作位。
如果没有立即连接上服务端,执行如下分支:

                    int connectTimeoutMillis = config().getConnectTimeoutMillis();
                    if (connectTimeoutMillis > 0) {
                        connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                            @Override
                            public void run() {
                                ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                                ConnectTimeoutException cause =
                                        new ConnectTimeoutException("connection timed out: " + remoteAddress);
                                if (connectPromise != null && connectPromise.tryFailure(cause)) {
                                    close(voidPromise());
                                }
                            }
                        }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
                    }

                    promise.addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            if (future.isCancelled()) {
                                if (connectTimeoutFuture != null) {
                                    connectTimeoutFuture.cancel(false);
                                }
                                connectPromise = null;
                                close(voidPromise());
                            }
                        }
                    });

上面的操作有两个目的。
(1)根据连接超时时间设置定时任务,超时时间到之后触发校验,如果发现连接并没有完成,则关闭连接句柄,释放资源,设置异常堆栈并发起去注册。
(2)设置连接结果监听器,如果接收到连接完成通知则判断连接是否被取消,如果被取消则关闭连接句柄,释放资源,发起取消注册操作。

2.finishConnect方法
客户端接收到服务端的TCP握手应答消息,通过SocketChannel的finishConnect方法对连接结果进行判断,代码如下:
        @Override
        public void finishConnect() {
            // Note this method is invoked by the event loop only if the connection attempt was
            // neither cancelled nor timed out.

            assert eventLoop().inEventLoop();
            assert connectPromise != null;

            try {
                boolean wasActive = isActive();
                doFinishConnect();

首先缓存连接状态,当前返回false,然后执行doFinishConnect方法判断连接结果,代码如下:
    protected void doFinishConnect() throws Exception {
        if(!this.javaChannel().finishConnect()) {
            throw new Error();
        }
    }

只要连接失败,就抛出Error(),由调用方执行句柄关闭等资源释放操作,如果返回成功,则执行fulfillConnectPromise方法,它负责将SocketChannel修改为监听读操作位,用来监听网络的读事件。
        private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
            // trySuccess() will return false if a user cancelled the connection attempt.
            boolean promiseSet = promise.trySuccess();

            // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
            // because what happened is what happened.
            if (!wasActive && isActive()) {
                pipeline().fireChannelActive();
            }

            // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
            if (!promiseSet) {
                close(voidPromise());
            }
        }

最后对连接超时进行判断,如果连接超时时仍然没有接收到服务端的ACK应答消息,则由定时任务关闭客户端连接,将SocketChannel从Reactor线程的多路复用器上摘除,释放资源:
            } finally {
                // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
                // See https://github.com/netty/netty/issues/1770
                if (connectTimeoutFuture != null) {
                    connectTimeoutFuture.cancel(false);
                }
                connectPromise = null;
            }

我们介绍了Netty最重要的接口之一————Channel的设计原理和功能列表,并对其主要实现子类NioSocketChannel和NioServerSocketChannel的源码进行了分析。
由于Channel的很多I/O操作都是通过其内部聚合的Unsafe接口及其子类实现的,如果不清楚Unsafe相关子类的代码实现,也就无法真正了解清楚Channel的实现。