netty 4源码分析-write

netty 4源码分析-write



  1. Write:将msg存储到ChannelOutboundBuffer中
  2. Flush:将msg从ChannelOutboundBuffer中flush到套接字的发送缓冲区中。


public ChannelFuture write(Object msg) {
        return write(msg, newPromise());
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
        if (msg == null) {
            throw new NullPointerException("msg");
        validatePromise(promise, true);
        write(msg, false, promise);
        return promise;
private void write(Object msg, boolean flush, ChannelPromise promise) {
        DefaultChannelHandlerContext next = findContextOutbound();
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeWrite(msg, promise);
            if (flush) {
        } else {
            int size = channel.estimatorHandle().size(msg);
            if (size > 0) {
                ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
                // Check for null as it may be set to null if the channel is closed already
                if (buffer != null) {
                    buffer.incrementPendingOutboundBytes(size, false);
            executor.execute(WriteTask.newInstance(next, msg, size, flush, promise));


public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            unsafe.write(msg, promise);


// AbstractUnsafe
  public void write(Object msg, ChannelPromise promise) {
            if (!isActive()) {
                // Mark the write request as failure if the channel is inactive.
                if (isOpen()) {
                } else {
                // release message now to prevent resource-leak
            } else {
                outboundBuffer.addMessage(msg, promise);


// ChannelOutboundBuffer
private static final Recycler<ChannelOutboundBuffer> RECYCLER = new Recycler<ChannelOutboundBuffer>() {
        protected ChannelOutboundBuffer newObject(Handle handle) {
            return new ChannelOutboundBuffer(handle);

static ChannelOutboundBuffer newInstance(AbstractChannel channel) {
        ChannelOutboundBuffer buffer = RECYCLER.get();
        buffer.channel = channel;
        buffer.totalPendingSize = 0;
        buffer.writable = 1;
        return buffer;


private ChannelOutboundBuffer(Handle handle) {
        this.handle = handle;
        buffer = new Entry[INITIAL_CAPACITY];
        for (int i = 0; i < buffer.length; i++) {
            buffer[i] = new Entry();
        nioBuffers = new ByteBuffer[INITIAL_CAPACITY];


public abstract class Recycler<T> {
    private final ThreadLocal<Stack<T>> threadLocal = new ThreadLocal<Stack<T>>() {
        protected Stack<T> initialValue() {
            return new Stack<T>(Recycler.this, Thread.currentThread());
    public final T get() {
        Stack<T> stack = threadLocal.get();
        T o = stack.pop();
        if (o == null) {
            o = newObject(stack);
        return o;
    public final boolean recycle(T o, Handle handle) {
        Stack<T> stack = (Stack<T>) handle;
        if (stack.parent != this) {
            return false;
        if (Thread.currentThread() != stack.thread) {
            return false;
        return true;
    protected abstract T newObject(Handle handle);
    public interface Handle { }
    static final class Stack<T> implements Handle {
        private static final int INITIAL_CAPACITY = 256;
        final Recycler<T> parent;
        final Thread thread;
        private T[] elements;
        private int size;
        private final Map<T, Boolean> map = new IdentityHashMap<T, Boolean>(INITIAL_CAPACITY);
        @SuppressWarnings({ "unchecked", "SuspiciousArrayCast" })
        Stack(Recycler<T> parent, Thread thread) {
            this.parent = parent;
            this.thread = thread;
            elements = newArray(INITIAL_CAPACITY);
        T pop() {
            int size = this.size;
            if (size == 0) {
                return null;
            size --;
            T ret = elements[size];
            elements[size] = null;
            this.size = size;
            return ret;

        void push(T o) {
            if (map.put(o, Boolean.TRUE) != null) {
                throw new IllegalStateException("recycled already");

            int size = this.size;
            if (size == elements.length) {
                T[] newElements = newArray(size << 1);
                System.arraycopy(elements, 0, newElements, 0, size);
                elements = newElements;

            elements[size] = o;
            this.size = size + 1;

        @SuppressWarnings({ "unchecked", "SuspiciousArrayCast" })
        private static <T> T[] newArray(int length) {
            return (T[]) new Object[length];


// ChannelOutboundBuffer
     void addMessage(Object msg, ChannelPromise promise) {
        int size = channel.estimatorHandle().size(msg);
        if (size < 0) {
            size = 0;
        Entry e = buffer[tail++];
        e.msg = msg;
        e.pendingSize = size;
        e.promise = promise;
        e.total = total(msg);

        tail &= buffer.length - 1;

        if (tail == flushed) {
        // increment pending bytes after adding message to the unflushed arrays.
        // See https://github.com/netty/netty/issues/1619
        incrementPendingOutboundBytes(size, true);

每次都会将msg作为一个Entry存储到buffer数组的tail位置,然后将tail自增1,自增后执行这行代码tail &= buffer.length – 1(譬如假设length为4,当已存储3个msg后,tail累加到4,和3执行与的结果得到0,因此下次的消息又重新存储到buffer的0位置)使得buffer数组可以循环存储。如果出现tail=flushed,说明空间不够,需要将数组扩容到原来大小的两倍.

incrementPendingOutboundBytes则会更新totalPendingSize,将其累加本次msg的大小。如果新的totalPendingSize超过了channel的高水位线writeBufferHighWaterMark(默认值为64 * 1024),则触发ChannelWritabilityChanged事件。(注意:如果网络很繁忙,套接字的发送缓冲区空间 不够,导致Msg不能及时从buffer中flush出去,那么不断的对channel执行write操作,会使得对数组不断地进行两倍扩容,最终导致OOM。所以最好在自己的Inbound处理器里捕获ChannelWritabilityChanged事件,然后调用channel的isWritable方法,根据结果来决定是否继续执行write操作)。

// ChannelOutboundBuffer
private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER =
            AtomicLongFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "totalPendingSize");

 void incrementPendingOutboundBytes(int size, boolean fireEvent) {
        // Cache the channel and check for null to make sure we not produce a NPE in case of the Channel gets
        // recycled while process this method.
        Channel channel = this.channel;
        if (size == 0 || channel == null) {

        long oldValue = totalPendingSize;
        long newWriteBufferSize = oldValue + size;
        while (!TOTAL_PENDING_SIZE_UPDATER.compareAndSet(this, oldValue, newWriteBufferSize)) {
            oldValue = totalPendingSize;
            newWriteBufferSize = oldValue + size;

        int highWaterMark = channel.config().getWriteBufferHighWaterMark();

        if (newWriteBufferSize > highWaterMark) {
            if (WRITABLE_UPDATER.compareAndSet(this, 1, 0)) {
                if (fireEvent) {






