欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Netty源码分析 (八)----- write过程 源码分析

程序员文章站 2022-07-02 13:02:26
上一篇文章主要讲了netty的read过程,本文主要分析一下write和writeAndFlush。 主要内容 本文分以下几个部分阐述一个java对象最后是如何转变成字节流,写到socket缓冲区中去的 pipeline中的标准链表结构 一个标准的pipeline链式结构如下 数据从head节点流入 ......

上一篇文章主要讲了netty的read过程,本文主要分析一下write和writeandflush。

主要内容

本文分以下几个部分阐述一个java对象最后是如何转变成字节流,写到socket缓冲区中去的

  1. pipeline中的标准链表结构
  2. java对象编码过程
  3. write:写队列
  4. flush:刷新写队列
  5. writeandflush: 写队列并刷新

pipeline中的标准链表结构

一个标准的pipeline链式结构如下

Netty源码分析 (八)----- write过程 源码分析

数据从head节点流入,先拆包,然后解码成业务对象,最后经过业务handler处理,调用write,将结果对象写出去。而写的过程先通过tail节点,然后通过encoder节点将对象编码成bytebuf,最后将该bytebuf对象传递到head节点,调用底层的unsafe写到jdk底层管道

java对象编码过程

为什么我们在pipeline中添加了encoder节点,java对象就转换成netty可以处理的bytebuf,写到管道里?

我们先看下调用write的code

businesshandler

protected void channelread0(channelhandlercontext ctx, request request) throws exception {
    response response = dobusiness(request);

    if (response != null) {
        ctx.channel().write(response);
    }
}

业务处理器接受到请求之后,做一些业务处理,返回一个response,然后,response在pipeline中传递,落到 encoder节点,我们来跟踪一下 ctx.channel().write(response);

public channelfuture write(object msg) {
    return this.pipeline.write(msg);
}

调用了channel中的pipeline中的write方法,我们接着看

public final channelfuture write(object msg) {
    return this.tail.write(msg);
}

pipeline中有属性tail,调用tail中的write,由此我们知道write消息的时候,从tail开始,接着往下看

private void write(object msg, boolean flush, channelpromise promise) {
    abstractchannelhandlercontext next = this.findcontextoutbound();
    object m = this.pipeline.touch(msg, next);
    eventexecutor executor = next.executor();
    if (executor.ineventloop()) {
        if (flush) {
            next.invokewriteandflush(m, promise);
        } else {
            next.invokewrite(m, promise);
        }
    } else {
        object task;
        if (flush) {
            task = abstractchannelhandlercontext.writeandflushtask.newinstance(next, m, promise);
        } else {
            task = abstractchannelhandlercontext.writetask.newinstance(next, m, promise);
        }

        safeexecute(executor, (runnable)task, promise, m);
    }

}

中间我省略了几个重载的方法,我们来看看第一行代码,next = this.findcontextoutbound();

private abstractchannelhandlercontext findcontextoutbound() {
    abstractchannelhandlercontext ctx = this;

    do {
        ctx = ctx.prev;
    } while(!ctx.outbound);

    return ctx;
}

通过 ctx = ctx.prev; 我们知道从tail开始找到pipeline中的第一个outbound的handler,然后调用 invokewrite(m, promise),此时找到的第一个outbound的handler就是我们自定义的编码器encoder

我们接着看 next.invokewrite(m, promise);

private void invokewrite(object msg, channelpromise promise) {
    if (this.invokehandler()) {
        this.invokewrite0(msg, promise);
    } else {
        this.write(msg, promise);
    }

}
private void invokewrite0(object msg, channelpromise promise) {
    try {
        ((channeloutboundhandler)this.handler()).write(this, msg, promise);
    } catch (throwable var4) {
        notifyoutboundhandlerexception(var4, promise);
    }

}

一路代码跟下来,我们可以知道是调用了第一个outbound类型的handler中的write方法,也就是第一个调用的是我们自定义编码器encoder的write方法

我们来看看自定义encoder

public class encoder extends messagetobyteencoder<response> {
    @override
    protected void encode(channelhandlercontext ctx, response response, bytebuf out) throws exception {
        out.writebyte(response.getversion());
        out.writeint(4 + response.getdata().length);
        out.writebytes(response.getdata());
    }
}

自定义encoder继承 messagetobyteencoder ,并且重写了 encode方法,这就是编码器的核心,我们先来看 messagetobyteencoder

public abstract class messagetobyteencoder<i> extends channeloutboundhandleradapter {

我们看到 messagetobyteencoder 继承了 channeloutboundhandleradapter,说明了 encoder 是一个 outbound的handler

我们来看看 encoder 的父类 messagetobyteencoder中的write方法

messagetobyteencoder

@override
public void write(channelhandlercontext ctx, object msg, channelpromise promise) throws exception {
    bytebuf buf = null;
    try {
        // 判断当前handelr是否能处理写入的消息
        if (acceptoutboundmessage(msg)) {
            @suppresswarnings("unchecked")
            // 强制换换
            i cast = (i) msg;
            // 分配一段butebuf
            buf = allocatebuffer(ctx, cast, preferdirect);
            try {
            // 调用encode,这里就调回到  `encoder` 这个handelr中    
                encode(ctx, cast, buf);
            } finally {
                // 既然自定义java对象转换成bytebuf了,那么这个对象就已经无用了,释放掉
                // (当传入的msg类型是bytebuf的时候,就不需要自己手动释放了)
                referencecountutil.release(cast);
            }
            // 如果buf中写入了数据,就把buf传到下一个节点
            if (buf.isreadable()) {
                ctx.write(buf, promise);
            } else {
            // 否则,释放buf,将空数据传到下一个节点    
                buf.release();
                ctx.write(unpooled.empty_buffer, promise);
            }
            buf = null;
        } else {
            // 如果当前节点不能处理传入的对象,直接扔给下一个节点处理
            ctx.write(msg, promise);
        }
    } catch (encoderexception e) {
        throw e;
    } catch (throwable e) {
        throw new encoderexception(e);
    } finally {
        // 当buf在pipeline中处理完之后,释放
        if (buf != null) {
            buf.release();
        }
    }
}

这里,我们详细阐述一下encoder是如何处理传入的java对象的

1.判断当前handler是否能处理写入的消息,如果能处理,进入下面的流程,否则,直接扔给下一个节点处理
2.将对象强制转换成encoder可以处理的 response对象
3.分配一个bytebuf
4.调用encoder,即进入到 encoderencode方法,该方法是用户代码,用户将数据写入bytebuf
5.既然自定义java对象转换成bytebuf了,那么这个对象就已经无用了,释放掉,(当传入的msg类型是bytebuf的时候,就不需要自己手动释放了)
6.如果buf中写入了数据,就把buf传到下一个节点,否则,释放buf,将空数据传到下一个节点
7.最后,当buf在pipeline中处理完之后,释放节点

总结一点就是,encoder节点分配一个bytebuf,调用encode方法,将java对象根据自定义协议写入到bytebuf,然后再把bytebuf传入到下一个节点,在我们的例子中,最终会传入到head节点,因为head节点是一个outbount类型的handler

headcontext

public void write(channelhandlercontext ctx, object msg, channelpromise promise) throws exception {
    unsafe.write(msg, promise);
}

这里的msg就是前面在encoder节点中,载有java对象数据的自定义bytebuf对象,进入下一节

write:写队列

我们来看看channel中unsafe的write方法,先来看看其中的一个属性

abstractunsafe

protected abstract class abstractunsafe implements unsafe {
    private volatile channeloutboundbuffer outboundbuffer = new channeloutboundbuffer(abstractchannel.this);

我们来看看 channeloutboundbuffer 这个类

public final class channeloutboundbuffer {
    private final channel channel;
    private channeloutboundbuffer.entry flushedentry;
    private channeloutboundbuffer.entry unflushedentry;
    private channeloutboundbuffer.entry tailentry;

channeloutboundbuffer内部维护了一个entry链表,并使用entry封装msg。其中的属性我们下面会详细讲

我们回到正题,接着看 unsafe.write(msg, promise);

abstractunsafe

@override
public final void write(object msg, channelpromise promise) {
    asserteventloop();

    channeloutboundbuffer outboundbuffer = this.outboundbuffer;

    int size;
    try {
        msg = filteroutboundmessage(msg);
        size = pipeline.estimatorhandle().size(msg);
        if (size < 0) {
            size = 0;
        }
    } catch (throwable t) {
        safesetfailure(promise, t);
        referencecountutil.release(msg);
        return;
    }

    outboundbuffer.addmessage(msg, size, promise);
}

1.调用 filteroutboundmessage() 方法,将待写入的对象过滤,把非bytebuf对象和fileregion过滤,把所有的非直接内存转换成直接内存directbuffer

@override
protected final object filteroutboundmessage(object msg) {
    if (msg instanceof bytebuf) {
        bytebuf buf = (bytebuf) msg;
        if (buf.isdirect()) {
            return msg;
        }

        return newdirectbuffer(buf);
    }

    if (msg instanceof fileregion) {
        return msg;
    }

    throw new unsupportedoperationexception(
            "unsupported message type: " + stringutil.simpleclassname(msg) + expected_types);
}

2.接下来,估算出需要写入的bytebuf的size
3.最后,调用 channeloutboundbuffer 的addmessage(msg, size, promise) 方法,所以,接下来,我们需要重点看一下这个方法干了什么事情

channeloutboundbuffer

public void addmessage(object msg, int size, channelpromise promise) {
    // 创建一个待写出的消息节点
    entry entry = entry.newinstance(msg, size, total(msg), promise);
    if (tailentry == null) {
        flushedentry = null;
        tailentry = entry;
    } else {
        entry tail = tailentry;
        tail.next = entry;
        tailentry = entry;
    }
    if (unflushedentry == null) {
        unflushedentry = entry;
    }

    incrementpendingoutboundbytes(size, false);
}

想要理解上面这段代码,必须得掌握写缓存中的几个消息指针,如下图

Netty源码分析 (八)----- write过程 源码分析

channeloutboundbuffer 里面的数据结构是一个单链表结构,每个节点是一个 entryentry 里面包含了待写出bytebuf 以及消息回调 promise,下面分别是三个指针的作用

1.flushedentry 指针表示第一个被写到操作系统socket缓冲区中的节点
2.unflushedentry 指针表示第一个未被写入到操作系统socket缓冲区中的节点
3.tailentry指针表示channeloutboundbuffer缓冲区的最后一个节点

初次调用 addmessage 之后,各个指针的情况为

Netty源码分析 (八)----- write过程 源码分析

fushedentry指向空,unfushedentry和 tailentry 都指向新加入的节点

第二次调用 addmessage之后,各个指针的情况为

Netty源码分析 (八)----- write过程 源码分析

第n次调用 addmessage之后,各个指针的情况为

Netty源码分析 (八)----- write过程 源码分析

可以看到,调用n次addmessage,flushedentry指针一直指向null,表示现在还未有节点需要写出到socket缓冲区,而unfushedentry之后有n个节点,表示当前还有n个节点尚未写出到socket缓冲区中去

flush:刷新写队列

不管调用channel.flush(),还是ctx.flush(),最终都会落地到pipeline中的head节点

headcontext

@override
public void flush(channelhandlercontext ctx) throws exception {
    unsafe.flush();
}

之后进入到abstractunsafe

abstractunsafe

public final void flush() {
   asserteventloop();

   channeloutboundbuffer outboundbuffer = this.outboundbuffer;
   if (outboundbuffer == null) {
       return;
   }

   outboundbuffer.addflush();
   flush0();
}

flush方法中,先调用 outboundbuffer.addflush();

channeloutboundbuffer

public void addflush() {
    entry entry = unflushedentry;
    if (entry != null) {
        if (flushedentry == null) {
            flushedentry = entry;
        }
        do {
            flushed ++;
            if (!entry.promise.setuncancellable()) {
                int pending = entry.cancel();
                decrementpendingoutboundbytes(pending, false, true);
            }
            entry = entry.next;
        } while (entry != null);
        unflushedentry = null;
    }
}

可以结合前面的图来看,首先拿到 unflushedentry 指针,然后将 flushedentry 指向unflushedentry所指向的节点,调用完毕之后,三个指针的情况如下所示

Netty源码分析 (八)----- write过程 源码分析

 

相当于所有的节点都即将开始推送出去

接下来,调用 flush0();

abstractunsafe

protected void flush0() {
    dowrite(outboundbuffer);
}

发现这里的核心代码就一个 dowrite,继续跟

abstractniobytechannel

protected void dowrite(channeloutboundbuffer in) throws exception {
    int writespincount = -1;

    boolean setopwrite = false;
    for (;;) {
        // 拿到第一个需要flush的节点的数据
        object msg = in.current();

        if (msg instanceof bytebuf) {
            // 强转为bytebuf,若发现没有数据可读,直接删除该节点
            bytebuf buf = (bytebuf) msg;

            boolean done = false;
            long flushedamount = 0;
            // 拿到自旋锁迭代次数
            if (writespincount == -1) {
                writespincount = config().getwritespincount();
            }
            // 自旋,将当前节点写出
            for (int i = writespincount - 1; i >= 0; i --) {
                int localflushedamount = dowritebytes(buf);
                if (localflushedamount == 0) {
                    setopwrite = true;
                    break;
                }

                flushedamount += localflushedamount;
                if (!buf.isreadable()) {
                    done = true;
                    break;
                }
            }

            in.progress(flushedamount);

            // 写完之后,将当前节点删除
            if (done) {
                in.remove();
            } else {
                break;
            }
        } 
    }
}

这里略微有点复杂,我们分析一下

1.第一步,调用current()先拿到第一个需要flush的节点的数据

 channeloutboundbuffer

public object current() {
    entry entry = flushedentry;
    if (entry == null) {
        return null;
    }

    return entry.msg;
}

2.第二步,拿到自旋锁的迭代次数

if (writespincount == -1) {
    writespincount = config().getwritespincount();
}

3.自旋的方式将bytebuf写出到jdk nio的channel

for (int i = writespincount - 1; i >= 0; i --) {
    int localflushedamount = dowritebytes(buf);
    if (localflushedamount == 0) {
        setopwrite = true;
        break;
    }

    flushedamount += localflushedamount;
    if (!buf.isreadable()) {
        done = true;
        break;
    }
}

dowritebytes 方法跟进去

protected int dowritebytes(bytebuf buf) throws exception {
    final int expectedwrittenbytes = buf.readablebytes();
    return buf.readbytes(javachannel(), expectedwrittenbytes);
}

我们发现,出现了 javachannel(),表明已经进入到了jdk nio channel的领域,我们来看看 buf.readbytes(javachannel(), expectedwrittenbytes);

public int readbytes(gatheringbytechannel out, int length) throws ioexception {
    this.checkreadablebytes(length);
    int readbytes = this.getbytes(this.readerindex, out, length);
    this.readerindex += readbytes;
    return readbytes;
}

我们来看关键代码 this.getbytes(this.readerindex, out, length)

private int getbytes(int index, gatheringbytechannel out, int length, boolean internal) throws ioexception {
    this.checkindex(index, length);
    if (length == 0) {
        return 0;
    } else {
        bytebuffer tmpbuf;
        if (internal) {
            tmpbuf = this.internalniobuffer();
        } else {
            tmpbuf = ((bytebuffer)this.memory).duplicate();
        }

        index = this.idx(index);
        tmpbuf.clear().position(index).limit(index + length);
        //将tmpbuf中的数据写到out中
        return out.write(tmpbuf);
    }
}

我们来看看out.write(tmpbuf)

public int write(bytebuffer src) throws ioexception {
    ensureopen();
    if (!writable)
        throw new nonwritablechannelexception();
    synchronized (positionlock) {
        int n = 0;
        int ti = -1;
        try {
            begin();
            ti = threads.add();
            if (!isopen())
                return 0;
            do {
                n = ioutil.write(fd, src, -1, nd);
            } while ((n == iostatus.interrupted) && isopen());
            return iostatus.normalize(n);
        } finally {
            threads.remove(ti);
            end(n > 0);
            assert iostatus.check(n);
        }
    }
}

和read实现一样,socketchannelimpl的write方法通过ioutil的write实现:关键代码 n = ioutil.write(fd, src, -1, nd);

static int write(filedescriptor var0, bytebuffer var1, long var2, nativedispatcher var4) throws ioexception {
    //如果是directbuffer,直接写,将堆外缓存中的数据拷贝到内核缓存中进行发送
    if (var1 instanceof directbuffer) {
        return writefromnativebuffer(var0, var1, var2, var4);
    } else {
        //非directbuffer
        //获取已经读取到的位置
        int var5 = var1.position();
        //获取可以读到的位置
        int var6 = var1.limit();

        assert var5 <= var6;
        //申请一个原buffer可读大小的directbytebuffer
        int var7 = var5 <= var6 ? var6 - var5 : 0;
        bytebuffer var8 = util.gettemporarydirectbuffer(var7);

        int var10;
        try {

            var8.put(var1);
            var8.flip();
            var1.position(var5);
            //通过directbuffer写,将堆外缓存的数据拷贝到内核缓存中进行发送
            int var9 = writefromnativebuffer(var0, var8, var2, var4);
            if (var9 > 0) {
                var1.position(var5 + var9);
            }

            var10 = var9;
        } finally {
            //回收分配的directbytebuffer
            util.offerfirsttemporarydirectbuffer(var8);
        }

        return var10;
    }
}

代码逻辑我们就不再讲了,代码注释已经很清楚了,这里我们关注一点,我们可以看看我们前面的一个方法 filteroutboundmessage(),将待写入的对象过滤,把非bytebuf对象和fileregion过滤,把所有的非直接内存转换成直接内存directbuffer

说明到了这一步所有的 var1 意境是直接内存directbuffer,就不需要走到else,就不需要write两次了

4.删除该节点

节点的数据已经写入完毕,接下来就需要删除该节点

channeloutboundbuffer

public boolean remove() {
    entry e = flushedentry;
    object msg = e.msg;

    channelpromise promise = e.promise;
    int size = e.pendingsize;

    removeentry(e);

    if (!e.cancelled) {
        referencecountutil.saferelease(msg);
        safesuccess(promise);
    }

    // recycle the entry
    e.recycle();

    return true;
}

首先拿到当前被flush掉的节点(flushedentry所指),然后拿到该节点的回调对象 channelpromise, 调用 removeentry()方法移除该节点

private void removeentry(entry e) {
    if (-- flushed == 0) {
        flushedentry = null;
        if (e == tailentry) {
            tailentry = null;
            unflushedentry = null;
        }
    } else {
        flushedentry = e.next;
    }
}

这里的remove是逻辑移除,只是将flushedentry指针移到下个节点,调用完毕之后,节点图示如下

Netty源码分析 (八)----- write过程 源码分析

writeandflush: 写队列并刷新

理解了write和flush这两个过程,writeandflush 也就不难了

public final channelfuture writeandflush(object msg) {
    return tail.writeandflush(msg);
}

public channelfuture writeandflush(object msg) {
    return writeandflush(msg, newpromise());
}

public channelfuture writeandflush(object msg, channelpromise promise) {
    write(msg, true, promise);

    return promise;
}

private void write(object msg, boolean flush, channelpromise promise) {
    abstractchannelhandlercontext next = findcontextoutbound();
    eventexecutor executor = next.executor();
    if (executor.ineventloop()) {
        if (flush) {
            next.invokewriteandflush(m, promise);
        } else {
            next.invokewrite(m, promise);
        }
    } 
}

可以看到,最终,通过一个boolean变量,表示是调用 invokewriteandflush,还是 invokewriteinvokewrite便是我们上文中的write过程

private void invokewriteandflush(object msg, channelpromise promise) {
    invokewrite0(msg, promise);
    invokeflush0();
}

可以看到,最终调用的底层方法和单独调用 write 和 flush 是一样的

private void invokewrite(object msg, channelpromise promise) {
        invokewrite0(msg, promise);
}

private void invokeflush(object msg, channelpromise promise) {
        invokeflush0(msg, promise);
}

由此看来,invokewriteandflush基本等价于write方法之后再来一次flush

总结

1.pipeline中的编码器原理是创建一个bytebuf,将java对象转换为bytebuf,然后再把bytebuf继续向前传递
2.调用write方法并没有将数据写到socket缓冲区中,而是写到了一个单向链表的数据结构中,flush才是真正的写出
3.writeandflush等价于先将数据写到netty的缓冲区,再将netty缓冲区中的数据写到socket缓冲区中,写的过程与并发编程类似,用自旋锁保证写成功
4.netty中的缓冲区中的bytebuf为directbytebuf