netty(十四)源码分析之Unsafe
程序员文章站
2024-03-14 21:56:29
...
Unsafe接口实际上是Channel接口的辅助接口,它不应该被用户代码直接调用。实际的I/O读写操作都是由Unsafe接口负责完成的。
Unsafe源码分析
实际的网络I/O操作基本都是由Unsafe功能类负责实现的,我们看下它的主要功能子类和重要的API实现。
AbstractUnsafe源码分析(AbstractUnsafe是AbstractChannel的内部类)
1.register方法
register方法主要用于将当前Unsafe对应的Channel注册到EventLoop的多路复用器上,然后调用DefaultChannelPipeline的fireChannelRegistered方法。如果Channel被**,则调用DefaultChannelPipeline的fireChannelActive方法。
首先判断当前坐在线程是否是Channel对应的NioEventLoop线程,如果是同一个线程,则不存在多线程并发操作的问题,直接调用register0进行注册;如果是由用户线程或者其他线程发起的注册操作,则将注册操作封装成Runnable,放到NioEventLoop任务队列中执行。注意:如果之间诶执行register0,会存在多线程并发操作Channel的问题。
public final void register(final ChannelPromise promise) {
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
promise.setFailure(t);
}
}
}
下面继续看register0方法的实现,代码乳腺癌所示:
首先调用ensureOpen方法判断当前Channel是否打开,如果没有打开则无法注册,直接返回。校验通过后调用doRegister方法,它由AbstractNioUnsafe对应的AbstractNioChannel实现。
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!ensureOpen(promise)) {
return;
}
doRegister();
registered = true;
promise.setSuccess();
pipeline.fireChannelRegistered();
if (isActive()) {
pipeline.fireChannelActive();
}
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
该方法在前面的AbstractNioChannel源码分析中已经介绍过,此处不再赘述。如果doRegister方法没有抛出异常,则说明Channel注册成功,调用ChennelPipeline的fireChannelRegister方法,判断当前的Channel是否已经被**,则调用ChannelPipeline的fireChannelActive方法。
如果注册过程中发生了异常,则强制关闭连接,将异常堆栈信息设置到ChannelPipeline中。
2.bind方法
bind方法主要用于绑定指定的端口,对于服务端,用于绑定监听端口,可以设置backlog参数;对于客户端,主要用于指定客户端Channel的本地绑定Socket地址。代码实现如下:
boolean wasActive = isActive();
try {
doBind(localAddress);
} catch (Throwable t) {
promise.setFailure(t);
closeIfClosed();
return;
}
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
promise.setSuccess();
}
调用doBind方法,对于NioSocketChannel和NioServerSocketChannel有不同的实现,客户端的实现代码如下:
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
javaChannel().socket().bind(localAddress);
}
服务端的doBind方法实现如下: protected void doBind(SocketAddress localAddress) throws Exception {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
如果绑定本地端口发生异常,则将异常设置到ChannelPromise中用于通知ChannelFuture,随后调用closeIfClosed方法来关闭Channel。
3.disconnect方法
disconnect用于客户端或者服务端主动关闭连接,代码如下:
@Override
public final void disconnect(final ChannelPromise promise) {
boolean wasActive = isActive();
try {
doDisconnect();
} catch (Throwable t) {
promise.setFailure(t);
closeIfClosed();
return;
}
if (wasActive && !isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelInactive();
}
});
}
promise.setSuccess();
closeIfClosed(); // doDisconnect() might have closed the channel
}
4.close方法在链路关闭之前需要首先判断是否处于刷新状态,如果处于刷新状态说明还有消息尚未发送出去,需要等到所有消息发送完成再关闭链路,因此,将关闭操作封装成Runnable稍后再执行。如下:
if (inFlush0) {
invokeLater(new Runnable() {
@Override
public void run() {
close(promise);
}
});
return;
}
如果链路没有处于刷新状态,需要从closeFuture中判断关闭操作是否完成,如果已经完成,
if (closeFuture.isDone()) {
// Closed already.
promise.setSuccess();
return;
}
执行关闭操作,将消息发送缓冲数组设置为空,通知JVM进行内存回收。调用抽象方法doClose关闭链路。 boolean wasActive = isActive();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
try {
doClose();
closeFuture.setClosed();
promise.setSuccess();
} catch (Throwable t) {
closeFuture.setClosed();
promise.setFailure(t);
}
如果关闭操作成功,设置ChannelPromise结果为成功。如果操作失败,则设置异常对象到ChannelPromise中。调用ChannelOutboundBuffer的close方法释放缓冲区的消息,随后构造链路关闭通知Runnable放到NioEventLoop中执行。源码如下:
try {
outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);
outboundBuffer.close(CLOSED_CHANNEL_EXCEPTION);
} finally {
if (wasActive && !isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelInactive();
}
});
}
deregister();
}
最后,调用deregister方法,将Channel从多路复用器上取消注册,代码实现如下: @Override
protected void doDeregister() throws Exception {
eventLoop().cancel(selectionKey());
}
NioEventLoop的cancel方法实际将selectionKey对应的Channel从多路复用器上去注册,NioEventLoop的相关代码如下: void cancel(SelectionKey key) {
key.cancel();
cancelledKeys ++;
if (cancelledKeys >= CLEANUP_INTERVAL) {
cancelledKeys = 0;
needsToSelectAgain = true;
}
}
5.write方法write方法实际上将消息添加到环形发送数组中,并不是真正的写Channel,它的代码如下:
@Override
public void write(Object msg, ChannelPromise promise) {
if (!isActive()) {
// Mark the write request as failure if the channel is inactive.
if (isOpen()) {
promise.tryFailure(NOT_YET_CONNECTED_EXCEPTION);
} else {
promise.tryFailure(CLOSED_CHANNEL_EXCEPTION);
}
// release message now to prevent resource-leak
ReferenceCountUtil.release(msg);
} else {
outboundBuffer.addMessage(msg, promise);
}
}
如果Channel没有处于**状态,说明TCP链路还没有真正建立成功,当前Channel存在以下两种状态。(1)Channel打开,但是TCP链路尚未建立成功:NOT_YETCONNECTED_EXCEPTION;
(2)Channel已经关闭:CLOSED_CHANNEL_EXCEPTION。
对链路状态进行判断,给ChannelPromise设置对应的异常,然后调用ReferenceCountUtil的release方法释放发送的msg对象。
如果链路状态正常,则将需要发送的msg和promise放入发送缓冲区中(环形数组)。
6.flush方法
6.flush方法
flush方法负责将发送缓冲区中待发送的消息全部写入到Channel中,并发送给通信对方。
@Override
public void flush() {
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
return;
}
outboundBuffer.addFlush();
flush0();
}
首先将发送环形数组的unflushed指针修改为tail,标识本次要发送消息的缓冲区范围。然后调用flush0进行发送,由于flush0代码非常简单,我们重点分析doWrite方法,代码如下: @Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
for (;;) {
// Do non-gathering write for a single buffer case.
final int msgCount = in.size();
if (msgCount <= 1) {
super.doWrite(in);
return;
}
首先计算需要发送的消息的个数(unflushed-flush),如果只有1个消息需要发送,则调用父类的写操作,我们分析AbstractNioByteChannel的doWrite()方法,代码如下: @Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
int writeSpinCount = -1;
for (;;) {
Object msg = in.current(true);
if (msg == null) {
// Wrote all messages.
clearOpWrite();
break;
}
因为只有一条消息需要发送,所以直接从ChannelOutboundBuffer中获取当前需要发送的消息,如下: public Object current(boolean preferDirect) {
if (isEmpty()) {
return null;
} else {
// TODO: Think of a smart way to handle ByteBufHolder messages
Object msg = buffer[flushed].msg;
if (threadLocalDirectBufferSize <= 0 || !preferDirect) {
return msg;
}
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (buf.isDirect()) {
return buf;
} else {
int readableBytes = buf.readableBytes();
if (readableBytes == 0) {
return buf;
}
// Non-direct buffers are copied into JDK's own internal direct buffer on every I/O.
// We can do a better job by using our pooled allocator. If the current allocator does not
// pool a direct buffer, we use a ThreadLocal based pool.
ByteBufAllocator alloc = channel.alloc();
ByteBuf directBuf;
if (alloc.isDirectBufferPooled()) {
directBuf = alloc.directBuffer(readableBytes);
} else {
directBuf = ThreadLocalPooledByteBuf.newInstance();
}
首先,获取需要发送的消息,如果消息为ByteBuf且它分配的是JDK的非堆内存,则直接返回。对返回的消息进行判断,如果为空,说明该消息已经发送完成并被回收,然后执行清空OP_WRITE操作位的clearOpWrite方法。
继续分析doWrite方法,如果需要发送的ByteBuf已经没有可写的字节了,在说明已经发送完成,将该消息从环形队列中删除,然后继续循环:
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
int readableBytes = buf.readableBytes();
if (readableBytes == 0) {
in.remove();
continue;
}
AbstractNioUnsafe源码分析
AbstractNioUnsafe是AbstractUnsafe类的NIO实现,它主要实现了connect,finishConnect等方法,下面我们对API实现进行源码分析:
1.connect方法
首先获取当前的连接状态进行缓存,然后发起连接操作,doConnect()操作有三种可能的结果:
(1)连接成功,返回true;
(2)暂时没有连接上,服务端没有返回ACK应答,连接结果不确定,返回false;
(3)连接失败,直接抛出I/O异常。
如果是第二种结果,需要将NioSocketChannel中的selectionKey设置为OP_CONNECT,监听连接应答消息。
异步连接返回之后,需要判断连接结果,如果连接成功,则触发ChannelActive事件,代码如下:
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
// trySuccess() will return false if a user cancelled the connection attempt.
boolean promiseSet = promise.trySuccess();
// Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
// because what happened is what happened.
if (!wasActive && isActive()) {
pipeline().fireChannelActive();
}
// If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
if (!promiseSet) {
close(voidPromise());
}
}
这里对ChannelActive事件处理不再进行详细说明,它最终会将NioSocketChannel中的selectionKey设置为SelectionKey.OP_READ,用于监听网络读操作位。
如果没有立即连接上服务端,执行如下分支:
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress);
if (connectPromise != null && connectPromise.tryFailure(cause)) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isCancelled()) {
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
close(voidPromise());
}
}
});
上面的操作有两个目的。
(1)根据连接超时时间设置定时任务,超时时间到之后触发校验,如果发现连接并没有完成,则关闭连接句柄,释放资源,设置异常堆栈并发起去注册。
(2)设置连接结果监听器,如果接收到连接完成通知则判断连接是否被取消,如果被取消则关闭连接句柄,释放资源,发起取消注册操作。
2.finishConnect方法
客户端接收到服务端的TCP握手应答消息,通过SocketChannel的finishConnect方法对连接结果进行判断,代码如下:
@Override
public void finishConnect() {
// Note this method is invoked by the event loop only if the connection attempt was
// neither cancelled nor timed out.
assert eventLoop().inEventLoop();
assert connectPromise != null;
try {
boolean wasActive = isActive();
doFinishConnect();
首先缓存连接状态,当前返回false,然后执行doFinishConnect方法判断连接结果,代码如下:
protected void doFinishConnect() throws Exception {
if(!this.javaChannel().finishConnect()) {
throw new Error();
}
}
只要连接失败,就抛出Error(),由调用方执行句柄关闭等资源释放操作,如果返回成功,则执行fulfillConnectPromise方法,它负责将SocketChannel修改为监听读操作位,用来监听网络的读事件。
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
// trySuccess() will return false if a user cancelled the connection attempt.
boolean promiseSet = promise.trySuccess();
// Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
// because what happened is what happened.
if (!wasActive && isActive()) {
pipeline().fireChannelActive();
}
// If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
if (!promiseSet) {
close(voidPromise());
}
}
最后对连接超时进行判断,如果连接超时时仍然没有接收到服务端的ACK应答消息,则由定时任务关闭客户端连接,将SocketChannel从Reactor线程的多路复用器上摘除,释放资源:
} finally {
// Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
// See https://github.com/netty/netty/issues/1770
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
}
我们介绍了Netty最重要的接口之一————Channel的设计原理和功能列表,并对其主要实现子类NioSocketChannel和NioServerSocketChannel的源码进行了分析。
由于Channel的很多I/O操作都是通过其内部聚合的Unsafe接口及其子类实现的,如果不清楚Unsafe相关子类的代码实现,也就无法真正了解清楚Channel的实现。