欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Netty源码分析 (七)----- read过程 源码分析

程序员文章站 2022-10-26 08:54:32
在上一篇文章中,我们分析了processSelectedKey这个方法中的accept过程,本文将分析一下work线程中的read过程。 该方法主要是对SelectionKey k进行了检查,有如下几种不同的情况 1)OP_ACCEPT,接受客户端连接 2)OP_READ, 可读事件, 即 Chan ......

在上一篇文章中,我们分析了processselectedkey这个方法中的accept过程,本文将分析一下work线程中的read过程。

private static void processselectedkey(selectionkey k, abstractniochannel ch) {
    final niounsafe unsafe = ch.unsafe();
    //检查该selectionkey是否有效,如果无效,则关闭channel
    if (!k.isvalid()) {
        // close the channel if the key is not valid anymore
        unsafe.close(unsafe.voidpromise());
        return;
    }

    try {
        int readyops = k.readyops();
        // also check for readops of 0 to workaround possible jdk bug which may otherwise lead
        // to a spin loop
        // 如果准备好read或accept则触发unsafe.read() ,检查是否为0,如上面的源码英文注释所说:解决jdk可能会产生死循环的一个bug。
        if ((readyops & (selectionkey.op_read | selectionkey.op_accept)) != 0 || readyops == 0) {
            unsafe.read();
            if (!ch.isopen()) {//如果已经关闭,则直接返回即可,不需要再处理该channel的其他事件
                // connection already closed - no need to handle write.
                return;
            }
        }
        // 如果准备好了write则将缓冲区中的数据发送出去,如果缓冲区中数据都发送完成,则清除之前关注的op_write标记
        if ((readyops & selectionkey.op_write) != 0) {
            // call forceflush which will also take care of clear the op_write once there is nothing left to write
            ch.unsafe().forceflush();
        }
        // 如果是op_connect,则需要移除op_connect否则selector.select(timeout)将立即返回不会有任何阻塞,这样可能会出现cpu 100%
        if ((readyops & selectionkey.op_connect) != 0) {
            // remove op_connect as otherwise selector.select(..) will always return without blocking
            // see https://github.com/netty/netty/issues/924
            int ops = k.interestops();
            ops &= ~selectionkey.op_connect;
            k.interestops(ops);

            unsafe.finishconnect();
        }
    } catch (cancelledkeyexception ignored) {
        unsafe.close(unsafe.voidpromise());
    }
}

该方法主要是对selectionkey k进行了检查,有如下几种不同的情况

1)op_accept,接受客户端连接

2)op_read, 可读事件, 即 channel 中收到了新数据可供上层读取。

3)op_write, 可写事件, 即上层可以向 channel 写入数据。

4)op_connect, 连接建立事件, 即 tcp 连接已经建立, channel 处于 active 状态。

本篇博文主要来看下当work 线程 selector检测到op_read事件时,内部干了些什么。

if ((readyops & (selectionkey.op_read | selectionkey.op_accept)) != 0 || readyops == 0) {
    unsafe.read();
    if (!ch.isopen()) {//如果已经关闭,则直接返回即可,不需要再处理该channel的其他事件
        // connection already closed - no need to handle write.
        return;
    }
} 

从代码中可以看到,当selectionkey发生的事件是selectionkey.op_read,执行unsafe的read方法。注意这里的unsafe是niobyteunsafe的实例

为什么说这里的unsafe是niobyteunsafe的实例呢?在上篇博文netty源码分析:accept中我们知道boss nioeventloopgroup中的nioeventloop只负责accpt客户端连接,然后将该客户端注册到work nioeventloopgroup中的nioeventloop中,即最终是由work线程对应的selector来进行read等时间的监听,即work线程中的channel为socketchannel,socketchannel的unsafe就是niobyteunsafe的实例

下面来看下niobyteunsafe中的read方法

@override
    public void read() {
        final channelconfig config = config();
        if (!config.isautoread() && !isreadpending()) {
            // channelconfig.setautoread(false) was called in the meantime
            removereadop();
            return;
        }

        final channelpipeline pipeline = pipeline();
        final bytebufallocator allocator = config.getallocator();
        final int maxmessagesperread = config.getmaxmessagesperread();
        recvbytebufallocator.handle allochandle = this.allochandle;
        if (allochandle == null) {
            this.allochandle = allochandle = config.getrecvbytebufallocator().newhandle();
        }

        bytebuf bytebuf = null;
        int messages = 0;
        boolean close = false;
        try {
            int totalreadamount = 0;
            boolean readpendingreset = false;
            do {
                //1、分配缓存
                bytebuf = allochandle.allocate(allocator);
                int writable = bytebuf.writablebytes();//可写的字节容量
                //2、将socketchannel数据写入缓存
                int localreadamount = doreadbytes(bytebuf);
                if (localreadamount <= 0) {
                    // not was read release the buffer
                    bytebuf.release();
                    close = localreadamount < 0;
                    break;
                }
                if (!readpendingreset) {
                    readpendingreset = true;
                    setreadpending(false);
                }
                //3、触发pipeline的channelread事件来对bytebuf进行后续处理
                pipeline.firechannelread(bytebuf);
                bytebuf = null;

                if (totalreadamount >= integer.max_value - localreadamount) {
                    // avoid overflow.
                    totalreadamount = integer.max_value;
                    break;
                }

                totalreadamount += localreadamount;

                // stop reading
                if (!config.isautoread()) {
                    break;
                }

                if (localreadamount < writable) {
                    // read less than what the buffer can hold,
                    // which might mean we drained the recv buffer completely.
                    break;
                }
            } while (++ messages < maxmessagesperread);

            pipeline.firechannelreadcomplete();
            allochandle.record(totalreadamount);

            if (close) {
                closeonread(pipeline);
                close = false;
            }
        } catch (throwable t) {
            handlereadexception(pipeline, bytebuf, t, close);
        } finally {
            if (!config.isautoread() && !isreadpending()) {
                removereadop();
            }
        }
    }
} 

下面一一介绍比较重要的代码

allochandler的实例化过程

allochandle负责自适应调整当前缓存分配的大小,以防止缓存分配过多或过少,先看allochandler的实例化过程

recvbytebufallocator.handle allochandle = this.allochandle;
if (allochandle == null) {
    this.allochandle = allochandle = config.getrecvbytebufallocator().newhandle();
}

其中, config.getrecvbytebufallocator()得到的是一个 adaptiverecvbytebufallocator实例default。

public static final adaptiverecvbytebufallocator default = new adaptiverecvbytebufallocator();

而adaptiverecvbytebufallocator中的newhandler()方法的代码如下:

@override
public handle newhandle() {
    return new handleimpl(minindex, maxindex, initial);
}

handleimpl(int minindex, int maxindex, int initial) {
    this.minindex = minindex;
    this.maxindex = maxindex;

    index = getsizetableindex(initial);
    nextreceivebuffersize = size_table[index];
}

其中,上面方法中所用到参数:minindex maxindex initial是什么意思呢?含义如下:minindex是最小缓存在size_table中对应的下标。maxindex是最大缓存在size_table中对应的下标,initial为初始化缓存大小。

adaptiverecvbytebufallocator的相关常量字段

public class adaptiverecvbytebufallocator implements recvbytebufallocator {

        static final int default_minimum = 64;
        static final int default_initial = 1024;
        static final int default_maximum = 65536;

        private static final int index_increment = 4;
        private static final int index_decrement = 1;

        private static final int[] size_table; 

上面这些字段的具体含义说明如下:

1)、size_table:按照从小到大的顺序预先存储可以分配的缓存大小。 
从16开始,每次累加16,直到496,接着从512开始,每次增大一倍,直到溢出。size_table初始化过程如下。

static {
    list<integer> sizetable = new arraylist<integer>();
    for (int i = 16; i < 512; i += 16) {
        sizetable.add(i);
    }

    for (int i = 512; i > 0; i <<= 1) {
        sizetable.add(i);
    }

    size_table = new int[sizetable.size()];
    for (int i = 0; i < size_table.length; i ++) {
        size_table[i] = sizetable.get(i);
    }
}

2)、default_minimum:最小缓存(64),在size_table中对应的下标为3。

3)、default_maximum :最大缓存(65536),在size_table中对应的下标为38。

4)、default_initial :初始化缓存大小,第一次分配缓存时,由于没有上一次实际收到的字节数做参考,需要给一个默认初始值。

5)、index_increment:上次预估缓存偏小,下次index的递增值。

6)、index_decrement :上次预估缓存偏大,下次index的递减值。

构造函数:

private adaptiverecvbytebufallocator() {
    this(default_minimum, default_initial, default_maximum);
}

public adaptiverecvbytebufallocator(int minimum, int initial, int maximum) {
    if (minimum <= 0) {
        throw new illegalargumentexception("minimum: " + minimum);
    }
    if (initial < minimum) {
        throw new illegalargumentexception("initial: " + initial);
    }
    if (maximum < initial) {
        throw new illegalargumentexception("maximum: " + maximum);
    }

    int minindex = getsizetableindex(minimum);
    if (size_table[minindex] < minimum) {
        this.minindex = minindex + 1;
    } else {
        this.minindex = minindex;
    }

    int maxindex = getsizetableindex(maximum);
    if (size_table[maxindex] > maximum) {
        this.maxindex = maxindex - 1;
    } else {
        this.maxindex = maxindex;
    }

    this.initial = initial;
}

该构造函数对参数进行了有效性检查,然后初始化了如下3个字段,这3个字段就是上面用于产生allochandle对象所要用到的参数。

private final int minindex;
private final int maxindex;
private final int initial;

其中,getsizetableindex函数的代码如下,该函数的功能为:找到size_table中的元素刚好大于或等于size的位置。

private static int getsizetableindex(final int size) {
    for (int low = 0, high = size_table.length - 1;;) {
        if (high < low) {
            return low;
        }
        if (high == low) {
            return high;
        }

        int mid = low + high >>> 1;
        int a = size_table[mid];
        int b = size_table[mid + 1];
        if (size > b) {
            low = mid + 1;
        } else if (size < a) {
            high = mid - 1;
        } else if (size == a) {
            return mid;
        } else { //这里的情况就是 a < size <= b 的情况
            return mid + 1;
        }
    }
}

bytebuf = allochandle.allocate(allocator);

申请一块指定大小的内存

adaptiverecvbytebufallocator#handlerimpl

@override
public bytebuf allocate(bytebufallocator alloc) {
    return alloc.iobuffer(nextreceivebuffersize);
}

直接调用了iobuffer方法,继续看

abstractbytebufallocator.java

@override
public bytebuf iobuffer(int initialcapacity) {
    if (platformdependent.hasunsafe()) {
        return directbuffer(initialcapacity);
    }
    return heapbuffer(initialcapacity);
}

iobuffer函数中主要逻辑为:看平台是否支持unsafe,选择使用直接物理内存还是堆上内存。先看 heapbuffer

abstractbytebufallocator.java 

@override
public bytebuf heapbuffer(int initialcapacity) {
    return heapbuffer(initialcapacity, integer.max_value);
}

@override
public bytebuf heapbuffer(int initialcapacity, int maxcapacity) {
    if (initialcapacity == 0 && maxcapacity == 0) {
        return emptybuf;
    }
    validate(initialcapacity, maxcapacity);
    return newheapbuffer(initialcapacity, maxcapacity);
} 

这里的newheapbuffer有两种实现:至于具体用哪一种,取决于我们对系统属性io.netty.allocator.type的设置,如果设置为: “pooled”,则缓存分配器就为:pooledbytebufallocator,进而利用对象池技术进行内存分配。如果不设置或者设置为其他,则缓存分配器为:unpooledbytebufallocator,则直接返回一个unpooledheapbytebuf对象。

unpooledbytebufallocator.java

@override
protected bytebuf newheapbuffer(int initialcapacity, int maxcapacity) {
    return new unpooledheapbytebuf(this, initialcapacity, maxcapacity);
}

pooledbytebufallocator.java

@override
protected bytebuf newheapbuffer(int initialcapacity, int maxcapacity) {
    poolthreadcache cache = threadcache.get();
    poolarena<byte[]> heaparena = cache.heaparena;

    bytebuf buf;
    if (heaparena != null) {
        buf = heaparena.allocate(cache, initialcapacity, maxcapacity);
    } else {
        buf = new unpooledheapbytebuf(this, initialcapacity, maxcapacity);
    }

    return toleakawarebuffer(buf);
}

再看directbuffer

abstractbytebufallocator.java

@override
public bytebuf directbuffer(int initialcapacity) {
    return directbuffer(initialcapacity, integer.max_value);
}  

@override
public bytebuf directbuffer(int initialcapacity, int maxcapacity) {
    if (initialcapacity == 0 && maxcapacity == 0) {
        return emptybuf;
    }
    validate(initialcapacity, maxcapacity);//参数的有效性检查
    return newdirectbuffer(initialcapacity, maxcapacity);
}

与newheapbuffer一样,这里的newdirectbuffer方法也有两种实现:至于具体用哪一种,取决于我们对系统属性io.netty.allocator.type的设置,如果设置为: “pooled”,则缓存分配器就为:pooledbytebufallocator,进而利用对象池技术进行内存分配。如果不设置或者设置为其他,则缓存分配器为:unpooledbytebufallocator。这里主要看下unpooledbytebufallocator. newdirectbuffer的内部实现

unpooledbytebufallocator.java

@override
protected bytebuf newdirectbuffer(int initialcapacity, int maxcapacity) {
    bytebuf buf;
    if (platformdependent.hasunsafe()) {
        buf = new unpooledunsafedirectbytebuf(this, initialcapacity, maxcapacity);
    } else {
        buf = new unpooleddirectbytebuf(this, initialcapacity, maxcapacity);
    }

    return toleakawarebuffer(buf);
}

unpooledunsafedirectbytebuf是如何实现缓存管理的?对nio的bytebuffer进行了封装,通过bytebuffer的allocatedirect方法实现缓存的申请。

protected unpooledunsafedirectbytebuf(bytebufallocator alloc, int initialcapacity, int maxcapacity) {
    super(maxcapacity);
    //省略了部分参数检查的代码
    this.alloc = alloc;
    setbytebuffer(allocatedirect(initialcapacity));
}
protected bytebuffer allocatedirect(int initialcapacity) {
    return bytebuffer.allocatedirect(initialcapacity);
}

private void setbytebuffer(bytebuffer buffer) {
    bytebuffer oldbuffer = this.buffer;
    if (oldbuffer != null) {
        if (donotfree) {
            donotfree = false;
        } else {
            freedirect(oldbuffer);
        }
    }

    this.buffer = buffer;
    memoryaddress = platformdependent.directbufferaddress(buffer);
    tmpniobuf = null;
    capacity = buffer.remaining();
}

上面代码的主要逻辑为:

1、先利用bytebuffer的allocatedirect方法分配了大小为initialcapacity的缓存

2、然后判断将旧缓存给free掉

3、最后将新缓存赋给字段buffer上

其中:memoryaddress = platformdependent.directbufferaddress(buffer) 获取buffer的address字段值,指向缓存地址。
capacity = buffer.remaining() 获取缓存容量。

接下来看toleakawarebuffer(buf)方法

protected static bytebuf toleakawarebuffer(bytebuf buf) {
    resourceleak leak;
    switch (resourceleakdetector.getlevel()) {
        case simple:
            leak = abstractbytebuf.leakdetector.open(buf);
            if (leak != null) {
                buf = new simpleleakawarebytebuf(buf, leak);
            }
            break;
        case advanced:
        case paranoid:
            leak = abstractbytebuf.leakdetector.open(buf);
            if (leak != null) {
                buf = new advancedleakawarebytebuf(buf, leak);
            }
            break;
    }
    return buf;
}

方法toleakawarebuffer(buf)对申请的buf又进行了一次包装。

上面一长串的分析,得到了缓存后,回到abstractniobytechannel.read方法,继续看。

doreadbytes方法

下面看下doreadbytes方法:将socketchannel数据写入缓存。

niosocketchannel.java

@override
protected int doreadbytes(bytebuf bytebuf) throws exception {
    return bytebuf.writebytes(javachannel(), bytebuf.writablebytes());
}

将channel中的数据读入缓存bytebuf中。继续看

wrappedbytebuf.java

@override
public int writebytes(scatteringbytechannel in, int length) throws ioexception {
    return buf.writebytes(in, length);
} 

abstractbytebuf.java

@override
public int writebytes(scatteringbytechannel in, int length) throws ioexception {
    ensureaccessible();
    ensurewritable(length);
    int writtenbytes = setbytes(writerindex, in, length);
    if (writtenbytes > 0) {
        writerindex += writtenbytes;
    }
    return writtenbytes;
}

这里的setbytes方法有不同的实现,这里看下unpooledunsafedirectbytebuf的setbytes的实现。

unpooledunsafedirectbytebuf.java

@override
public int setbytes(int index, scatteringbytechannel in, int length) throws ioexception {
    ensureaccessible();
    bytebuffer tmpbuf = internalniobuffer();
    tmpbuf.clear().position(index).limit(index + length);
    try {
        return in.read(tmpbuf);
    } catch (closedchannelexception ignored) {
        return -1;//当channel 已经关闭,则返回-1.    
    }
} 

private bytebuffer internalniobuffer() {
    bytebuffer tmpniobuf = this.tmpniobuf;
    if (tmpniobuf == null) {
        this.tmpniobuf = tmpniobuf = buffer.duplicate();
    }
    return tmpniobuf;
}

最终底层采用bytebuffer实现read操作,无论是pooledbytebuf、还是unpooledxxxbuf,里面都将底层数据结构bufbuffer/array转换为bytebuffer 来实现read操作。即无论是unpooledxxxbuf还是pooledxxxbuf里面都有一个bytebuffer tmpniobuf,这个tmpniobuf才是真正用来存储从管道channel中读取出的内容的。到这里就完成了将channel的数据读入到了缓存buf中。

我们具体来看看 in.read(tmpbuf); filechannel和socketchannel的read最后都是依赖的ioutil来实现,代码如下

public int read(bytebuffer dst) throws ioexception {
    ensureopen();
    if (!readable)
        throw new nonreadablechannelexception();
    synchronized (positionlock) {
        int n = 0;
        int ti = -1;
        try {
            begin();
            ti = threads.add();
            if (!isopen())
                return 0;
            do {
                n = ioutil.read(fd, dst, -1, nd);
            } while ((n == iostatus.interrupted) && isopen());
            return iostatus.normalize(n);
        } finally {
            threads.remove(ti);
            end(n > 0);
            assert iostatus.check(n);
        }
    }
}

最后目的就是将socketchannel中的数据读出存放到bytebuffer dst我们看看 ioutil.read(fd, dst, -1, nd)

static int read(filedescriptor var0, bytebuffer var1, long var2, nativedispatcher var4) throws ioexception {
    if (var1.isreadonly()) {
        throw new illegalargumentexception("read-only buffer");
    //如果最终承载数据的buffer是directbuffer,则直接将数据读入到堆外内存中
    } else if (var1 instanceof directbuffer) {
        return readintonativebuffer(var0, var1, var2, var4);
    } else {
        // 分配临时的堆外内存
        bytebuffer var5 = util.gettemporarydirectbuffer(var1.remaining());

        int var7;
        try {
            // socket i/o 操作会将数据读入到堆外内存中
            int var6 = readintonativebuffer(var0, var5, var2, var4);
            var5.flip();
            if (var6 > 0) {
                // 将堆外内存的数据拷贝到堆内存中(用户定义的缓存,在jvm中分配内存)
                var1.put(var5);
            }

            var7 = var6;
        } finally {
            // 里面会调用directbuffer.cleaner().clean()来释放临时的堆外内存
            util.offerfirsttemporarydirectbuffer(var5);
        }

        return var7;
    }
}
通过上述实现可以看出,基于channel的数据读取步骤如下:
1、如果缓存内存是directbuffer,就直接将channel中的数据读取到堆外内存
2、如果缓存内存是堆内存,则先申请一块和缓存同大小的临时 directbytebuffer var5。
3、将内核缓存中的数据读到堆外缓存var5,底层由nativedispatcher的read实现。
4、把堆外缓存var5的数据拷贝到堆内存var1(用户定义的缓存,在jvm中分配内存)。
5、会调用directbuffer.cleaner().clean()来释放创建的临时的堆外内存
如果abstractniobytechannel.read中第一步创建的是堆外内存,则会直接将数据读入到堆外内存,并不会先创建临时堆外内存,再将数据读入到堆外内存,最后将堆外内存拷贝到堆内存
简单的说,如果使用堆外内存,则只会复制一次数据,如果使用堆内存,则会复制两次数据
我们来看看readintonativebuffer
private static int readintonativebuffer(filedescriptor filedescriptor, bytebuffer bytebuffer, long l, nativedispatcher nativedispatcher, object obj)  throws ioexception  {  
    int i = bytebuffer.position();  
    int j = bytebuffer.limit();  
    //如果断言开启,buffer的position大于limit,则抛出断言错误  
    if(!$assertionsdisabled && i > j)  
        throw new assertionerror();  
    //获取需要读的字节数  
    int k = i > j ? 0 : j - i;  
    if(k == 0)  
        return 0;  
    int i1 = 0;  
    //从输入流读取k个字节到buffer  
    if(l != -1l)  
        i1 = nativedispatcher.pread(filedescriptor, ((directbuffer)bytebuffer).address() + (long)i, k, l, obj);  
    else  
        i1 = nativedispatcher.read(filedescriptor, ((directbuffer)bytebuffer).address() + (long)i, k);  
    //重新定位buffer的position  
    if(i1 > 0)  
        bytebuffer.position(i + i1);  
    return i1;  
}  
这个函数就是将内核缓冲区中的数据读取到堆外缓存directbuffer

回到abstractniobytechannel.read方法,继续看。

@override
public void read() {
        //...
        try {
            int totalreadamount = 0;
            boolean readpendingreset = false;
            do {
                bytebuf = allochandle.allocate(allocator);
                int writable = bytebuf.writablebytes();
                int localreadamount = doreadbytes(bytebuf);
                if (localreadamount <= 0) {
                    // not was read release the buffer
                    bytebuf.release();
                    close = localreadamount < 0;
                    break;
                }
                if (!readpendingreset) {
                    readpendingreset = true;
                    setreadpending(false);
                }
                pipeline.firechannelread(bytebuf);
                bytebuf = null;

                if (totalreadamount >= integer.max_value - localreadamount) {
                    // avoid overflow.
                    totalreadamount = integer.max_value;
                    break;
                }

                totalreadamount += localreadamount;

                // stop reading
                if (!config.isautoread()) {
                    break;
                }

                if (localreadamount < writable) {
                    // read less than what the buffer can hold,
                    // which might mean we drained the recv buffer completely.
                    break;
                }
            } while (++ messages < maxmessagesperread);

            pipeline.firechannelreadcomplete();
            allochandle.record(totalreadamount);

            if (close) {
                closeonread(pipeline);
                close = false;
            }
        } catch (throwable t) {
            handlereadexception(pipeline, bytebuf, t, close);
        } finally {
            if (!config.isautoread() && !isreadpending()) {
                removereadop();
            }
        }
    }
}

int localreadamount = doreadbytes(bytebuf);
1、如果返回0,则表示没有读取到数据,则退出循环。
2、如果返回-1,表示对端已经关闭连接,则退出循环。
3、否则,表示读取到了数据,数据读入缓存后,触发pipeline的channelread事件,bytebuf作为参数进行后续处理,这时自定义inbound类型的handler就可以进行业务处理了。pipeline的事件处理在我之前的博文中有详细的介绍。处理完成之后,再一次从channel读取数据,直至退出循环。

4、循环次数超过maxmessagesperread时,即只能在管道中读取maxmessagesperread次数据,既是还没有读完也要退出。在上篇博文中,boss线程接受客户端连接也用到了此变量,即当boss线程 selector检测到op_accept事件后一次只能接受maxmessagesperread个客户端连接