netty 4源码分析-write
本文为原创,转载请注明出处
netty 4源码分析-write
Netty的写操作由两个步骤组成:
- Write:将msg存储到ChannelOutboundBuffer中
- Flush:将msg从ChannelOutboundBuffer中flush到套接字的发送缓冲区中。
本文介绍第一个步骤write
//DefaultChannelHandlerContext
public ChannelFuture write(Object msg) {
return write(msg, newPromise());
}
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
if (msg == null) {
throw new NullPointerException("msg");
}
validatePromise(promise, true);
write(msg, false, promise);
return promise;
}
private void write(Object msg, boolean flush, ChannelPromise promise) {
DefaultChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeWrite(msg, promise);
if (flush) {
next.invokeFlush();
}
} else {
int size = channel.estimatorHandle().size(msg);
if (size > 0) {
ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
// Check for null as it may be set to null if the channel is closed already
if (buffer != null) {
buffer.incrementPendingOutboundBytes(size, false);
}
}
executor.execute(WriteTask.newInstance(next, msg, size, flush, promise));
}
}
Write是一个Outbound事件,所以会调用outbound处理器的write方法。下面分析headHandler的write方法。
//HeadHandler
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
unsafe.write(msg, promise);
}
里面会调用AbstractUnsafe的write方法
// AbstractUnsafe
public void write(Object msg, ChannelPromise promise) {
if (!isActive()) {
// Mark the write request as failure if the channel is inactive.
if (isOpen()) {
promise.tryFailure(NOT_YET_CONNECTED_EXCEPTION);
} else {
promise.tryFailure(CLOSED_CHANNEL_EXCEPTION);
}
// release message now to prevent resource-leak
ReferenceCountUtil.release(msg);
} else {
outboundBuffer.addMessage(msg, promise);
}
}
outboundBuffer是AbstractUnsafe使用的一种数据结构ChannelOutboundBuffer,用来存储待发送的消息。该数据结构在实例化AbstractUnsafe的同时被初始化:
// ChannelOutboundBuffer
private static final Recycler<ChannelOutboundBuffer> RECYCLER = new Recycler<ChannelOutboundBuffer>() {
@Override
protected ChannelOutboundBuffer newObject(Handle handle) {
return new ChannelOutboundBuffer(handle);
}
};
static ChannelOutboundBuffer newInstance(AbstractChannel channel) {
ChannelOutboundBuffer buffer = RECYCLER.get();
buffer.channel = channel;
buffer.totalPendingSize = 0;
buffer.writable = 1;
return buffer;
}
ChannelOutboundBuffer的结构如下:
Buffer是用来存储msg的Entry结构数组,entry的结构如下:
ChannelOutboundBuffer实例化时,buffer数组的大小为32,nioBuffers数组的大小也为32.由于ChannelOutboundBuffer的实例化的代价实际上是很高的,看以下构造方法:
private ChannelOutboundBuffer(Handle handle) {
this.handle = handle;
buffer = new Entry[INITIAL_CAPACITY];
for (int i = 0; i < buffer.length; i++) {
buffer[i] = new Entry();
}
nioBuffers = new ByteBuffer[INITIAL_CAPACITY];
}
所以netty使用基于thread-local的轻量级对象池Recycler对ChannelOutboundBuffer进行回收。当ChannelOutboundBuffer第一次被实例化且使用完毕后,会回收到Recycler中(见下面的recyle方法),下次需要用时,直接从Recycler中取(见下面的get方法),避免了再次实例化和垃圾回收的开销。
public abstract class Recycler<T> {
private final ThreadLocal<Stack<T>> threadLocal = new ThreadLocal<Stack<T>>() {
@Override
protected Stack<T> initialValue() {
return new Stack<T>(Recycler.this, Thread.currentThread());
}
};
public final T get() {
Stack<T> stack = threadLocal.get();
T o = stack.pop();
if (o == null) {
o = newObject(stack);
}
return o;
}
public final boolean recycle(T o, Handle handle) {
@SuppressWarnings("unchecked")
Stack<T> stack = (Stack<T>) handle;
if (stack.parent != this) {
return false;
}
if (Thread.currentThread() != stack.thread) {
return false;
}
stack.push(o);
return true;
}
protected abstract T newObject(Handle handle);
public interface Handle { }
static final class Stack<T> implements Handle {
private static final int INITIAL_CAPACITY = 256;
final Recycler<T> parent;
final Thread thread;
private T[] elements;
private int size;
private final Map<T, Boolean> map = new IdentityHashMap<T, Boolean>(INITIAL_CAPACITY);
@SuppressWarnings({ "unchecked", "SuspiciousArrayCast" })
Stack(Recycler<T> parent, Thread thread) {
this.parent = parent;
this.thread = thread;
elements = newArray(INITIAL_CAPACITY);
}
T pop() {
int size = this.size;
if (size == 0) {
return null;
}
size --;
T ret = elements[size];
elements[size] = null;
map.remove(ret);
this.size = size;
return ret;
}
void push(T o) {
if (map.put(o, Boolean.TRUE) != null) {
throw new IllegalStateException("recycled already");
}
int size = this.size;
if (size == elements.length) {
T[] newElements = newArray(size << 1);
System.arraycopy(elements, 0, newElements, 0, size);
elements = newElements;
}
elements[size] = o;
this.size = size + 1;
}
@SuppressWarnings({ "unchecked", "SuspiciousArrayCast" })
private static <T> T[] newArray(int length) {
return (T[]) new Object[length];
}
}
下面接着分析ChannelOutboundBuffer的addMessage方法。
// ChannelOutboundBuffer
void addMessage(Object msg, ChannelPromise promise) {
int size = channel.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
Entry e = buffer[tail++];
e.msg = msg;
e.pendingSize = size;
e.promise = promise;
e.total = total(msg);
tail &= buffer.length - 1;
if (tail == flushed) {
addCapacity();
}
// increment pending bytes after adding message to the unflushed arrays.
// See https://github.com/netty/netty/issues/1619
incrementPendingOutboundBytes(size, true);
}
每次都会将msg作为一个Entry存储到buffer数组的tail位置,然后将tail自增1,自增后执行这行代码tail &= buffer.length – 1(譬如假设length为4,当已存储3个msg后,tail累加到4,和3执行与的结果得到0,因此下次的消息又重新存储到buffer的0位置)使得buffer数组可以循环存储。如果出现tail=flushed,说明空间不够,需要将数组扩容到原来大小的两倍.
incrementPendingOutboundBytes则会更新totalPendingSize,将其累加本次msg的大小。如果新的totalPendingSize超过了channel的高水位线writeBufferHighWaterMark(默认值为64 * 1024),则触发ChannelWritabilityChanged事件。(注意:如果网络很繁忙,套接字的发送缓冲区空间 不够,导致Msg不能及时从buffer中flush出去,那么不断的对channel执行write操作,会使得对数组不断地进行两倍扩容,最终导致OOM。所以最好在自己的Inbound处理器里捕获ChannelWritabilityChanged事件,然后调用channel的isWritable方法,根据结果来决定是否继续执行write操作)。
// ChannelOutboundBuffer
private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER =
AtomicLongFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "totalPendingSize");
void incrementPendingOutboundBytes(int size, boolean fireEvent) {
// Cache the channel and check for null to make sure we not produce a NPE in case of the Channel gets
// recycled while process this method.
Channel channel = this.channel;
if (size == 0 || channel == null) {
return;
}
long oldValue = totalPendingSize;
long newWriteBufferSize = oldValue + size;
while (!TOTAL_PENDING_SIZE_UPDATER.compareAndSet(this, oldValue, newWriteBufferSize)) {
oldValue = totalPendingSize;
newWriteBufferSize = oldValue + size;
}
int highWaterMark = channel.config().getWriteBufferHighWaterMark();
if (newWriteBufferSize > highWaterMark) {
if (WRITABLE_UPDATER.compareAndSet(this, 1, 0)) {
if (fireEvent) {
channel.pipeline().fireChannelWritabilityChanged();
}
}
}
}
需要注意的是,该方法是线程安全的,采用了一个技巧,使用AtomicLongFieldUpdater来对totalPendingSize进行更新,实现CAS的效果,达到并发安全读写。相对于synchronized同步,AtomicLongFieldUpdater的开销是比较小的。
总结可以借鉴的几个点:
1、轻量级对象池的使用
2、buffer数组的循环存储
3、ChannelWritabilityChanged事件的触发
4、AtomicLongFieldUpdater的使用
上一篇: netty4源码分析-connect
下一篇: Hibernate关联关系映射
推荐阅读
-
Netty源码分析之核心线程处理
-
Netty源码分析-Bootstrap
-
netty源码分析系列——Bootstrap
-
Netty 4 源码分析——EventExecutor
-
commons-logging + log4j源码分析
-
Netty源码分析 (四)----- ChannelPipeline
-
Netty源码分析 (三)----- 服务端启动源码分析
-
SpringMVC源码分析4:DispatcherServlet如何找到正确的Controller
-
Netty源码分析之ChannelPipeline(二)—ChannelHandler的添加与删除
-
commons-logging + log4j源码分析