Netty源码分析 (七)----- read过程 源码分析
在上一篇文章中,我们分析了processselectedkey这个方法中的accept过程,本文将分析一下work线程中的read过程。
private static void processselectedkey(selectionkey k, abstractniochannel ch) { final niounsafe unsafe = ch.unsafe(); //检查该selectionkey是否有效,如果无效,则关闭channel if (!k.isvalid()) { // close the channel if the key is not valid anymore unsafe.close(unsafe.voidpromise()); return; } try { int readyops = k.readyops(); // also check for readops of 0 to workaround possible jdk bug which may otherwise lead // to a spin loop // 如果准备好read或accept则触发unsafe.read() ,检查是否为0,如上面的源码英文注释所说:解决jdk可能会产生死循环的一个bug。 if ((readyops & (selectionkey.op_read | selectionkey.op_accept)) != 0 || readyops == 0) { unsafe.read(); if (!ch.isopen()) {//如果已经关闭,则直接返回即可,不需要再处理该channel的其他事件 // connection already closed - no need to handle write. return; } } // 如果准备好了write则将缓冲区中的数据发送出去,如果缓冲区中数据都发送完成,则清除之前关注的op_write标记 if ((readyops & selectionkey.op_write) != 0) { // call forceflush which will also take care of clear the op_write once there is nothing left to write ch.unsafe().forceflush(); } // 如果是op_connect,则需要移除op_connect否则selector.select(timeout)将立即返回不会有任何阻塞,这样可能会出现cpu 100% if ((readyops & selectionkey.op_connect) != 0) { // remove op_connect as otherwise selector.select(..) will always return without blocking // see https://github.com/netty/netty/issues/924 int ops = k.interestops(); ops &= ~selectionkey.op_connect; k.interestops(ops); unsafe.finishconnect(); } } catch (cancelledkeyexception ignored) { unsafe.close(unsafe.voidpromise()); } }
该方法主要是对selectionkey k进行了检查,有如下几种不同的情况
1)op_accept,接受客户端连接
2)op_read, 可读事件, 即 channel 中收到了新数据可供上层读取。
3)op_write, 可写事件, 即上层可以向 channel 写入数据。
4)op_connect, 连接建立事件, 即 tcp 连接已经建立, channel 处于 active 状态。
本篇博文主要来看下当work 线程 selector检测到op_read事件时,内部干了些什么。
if ((readyops & (selectionkey.op_read | selectionkey.op_accept)) != 0 || readyops == 0) { unsafe.read(); if (!ch.isopen()) {//如果已经关闭,则直接返回即可,不需要再处理该channel的其他事件 // connection already closed - no need to handle write. return; } }
从代码中可以看到,当selectionkey发生的事件是selectionkey.op_read,执行unsafe的read方法。注意这里的unsafe是niobyteunsafe的实例
为什么说这里的unsafe是niobyteunsafe的实例呢?在上篇博文netty源码分析:accept中我们知道boss nioeventloopgroup中的nioeventloop只负责accpt客户端连接,然后将该客户端注册到work nioeventloopgroup中的nioeventloop中,即最终是由work线程对应的selector来进行read等时间的监听,即work线程中的channel为socketchannel,socketchannel的unsafe就是niobyteunsafe的实例
下面来看下niobyteunsafe中的read方法
@override public void read() { final channelconfig config = config(); if (!config.isautoread() && !isreadpending()) { // channelconfig.setautoread(false) was called in the meantime removereadop(); return; } final channelpipeline pipeline = pipeline(); final bytebufallocator allocator = config.getallocator(); final int maxmessagesperread = config.getmaxmessagesperread(); recvbytebufallocator.handle allochandle = this.allochandle; if (allochandle == null) { this.allochandle = allochandle = config.getrecvbytebufallocator().newhandle(); } bytebuf bytebuf = null; int messages = 0; boolean close = false; try { int totalreadamount = 0; boolean readpendingreset = false; do { //1、分配缓存 bytebuf = allochandle.allocate(allocator); int writable = bytebuf.writablebytes();//可写的字节容量 //2、将socketchannel数据写入缓存 int localreadamount = doreadbytes(bytebuf); if (localreadamount <= 0) { // not was read release the buffer bytebuf.release(); close = localreadamount < 0; break; } if (!readpendingreset) { readpendingreset = true; setreadpending(false); } //3、触发pipeline的channelread事件来对bytebuf进行后续处理 pipeline.firechannelread(bytebuf); bytebuf = null; if (totalreadamount >= integer.max_value - localreadamount) { // avoid overflow. totalreadamount = integer.max_value; break; } totalreadamount += localreadamount; // stop reading if (!config.isautoread()) { break; } if (localreadamount < writable) { // read less than what the buffer can hold, // which might mean we drained the recv buffer completely. break; } } while (++ messages < maxmessagesperread); pipeline.firechannelreadcomplete(); allochandle.record(totalreadamount); if (close) { closeonread(pipeline); close = false; } } catch (throwable t) { handlereadexception(pipeline, bytebuf, t, close); } finally { if (!config.isautoread() && !isreadpending()) { removereadop(); } } } }
下面一一介绍比较重要的代码
allochandler的实例化过程
allochandle负责自适应调整当前缓存分配的大小,以防止缓存分配过多或过少,先看allochandler的实例化过程
recvbytebufallocator.handle allochandle = this.allochandle; if (allochandle == null) { this.allochandle = allochandle = config.getrecvbytebufallocator().newhandle(); }
其中, config.getrecvbytebufallocator()
得到的是一个 adaptiverecvbytebufallocator实例default。
public static final adaptiverecvbytebufallocator default = new adaptiverecvbytebufallocator();
而adaptiverecvbytebufallocator中的newhandler()方法的代码如下:
@override public handle newhandle() { return new handleimpl(minindex, maxindex, initial); } handleimpl(int minindex, int maxindex, int initial) { this.minindex = minindex; this.maxindex = maxindex; index = getsizetableindex(initial); nextreceivebuffersize = size_table[index]; }
其中,上面方法中所用到参数:minindex maxindex initial是什么意思呢?含义如下:minindex是最小缓存在size_table
中对应的下标。maxindex是最大缓存在size_table
中对应的下标,initial为初始化缓存大小。
adaptiverecvbytebufallocator的相关常量字段
public class adaptiverecvbytebufallocator implements recvbytebufallocator { static final int default_minimum = 64; static final int default_initial = 1024; static final int default_maximum = 65536; private static final int index_increment = 4; private static final int index_decrement = 1; private static final int[] size_table;
上面这些字段的具体含义说明如下:
1)、size_table
:按照从小到大的顺序预先存储可以分配的缓存大小。
从16开始,每次累加16,直到496,接着从512开始,每次增大一倍,直到溢出。size_table初始化过程如下。
static { list<integer> sizetable = new arraylist<integer>(); for (int i = 16; i < 512; i += 16) { sizetable.add(i); } for (int i = 512; i > 0; i <<= 1) { sizetable.add(i); } size_table = new int[sizetable.size()]; for (int i = 0; i < size_table.length; i ++) { size_table[i] = sizetable.get(i); } }
2)、default_minimum:最小缓存(64),在size_table中对应的下标为3。
3)、default_maximum :最大缓存(65536),在size_table中对应的下标为38。
4)、default_initial :初始化缓存大小,第一次分配缓存时,由于没有上一次实际收到的字节数做参考,需要给一个默认初始值。
5)、index_increment:上次预估缓存偏小,下次index的递增值。
6)、index_decrement :上次预估缓存偏大,下次index的递减值。
构造函数:
private adaptiverecvbytebufallocator() { this(default_minimum, default_initial, default_maximum); } public adaptiverecvbytebufallocator(int minimum, int initial, int maximum) { if (minimum <= 0) { throw new illegalargumentexception("minimum: " + minimum); } if (initial < minimum) { throw new illegalargumentexception("initial: " + initial); } if (maximum < initial) { throw new illegalargumentexception("maximum: " + maximum); } int minindex = getsizetableindex(minimum); if (size_table[minindex] < minimum) { this.minindex = minindex + 1; } else { this.minindex = minindex; } int maxindex = getsizetableindex(maximum); if (size_table[maxindex] > maximum) { this.maxindex = maxindex - 1; } else { this.maxindex = maxindex; } this.initial = initial; }
该构造函数对参数进行了有效性检查,然后初始化了如下3个字段,这3个字段就是上面用于产生allochandle对象所要用到的参数。
private final int minindex; private final int maxindex; private final int initial;
其中,getsizetableindex函数的代码如下,该函数的功能为:找到size_table中的元素刚好大于或等于size的位置。
private static int getsizetableindex(final int size) { for (int low = 0, high = size_table.length - 1;;) { if (high < low) { return low; } if (high == low) { return high; } int mid = low + high >>> 1; int a = size_table[mid]; int b = size_table[mid + 1]; if (size > b) { low = mid + 1; } else if (size < a) { high = mid - 1; } else if (size == a) { return mid; } else { //这里的情况就是 a < size <= b 的情况 return mid + 1; } } }
bytebuf = allochandle.allocate(allocator);
申请一块指定大小的内存
adaptiverecvbytebufallocator#handlerimpl
@override public bytebuf allocate(bytebufallocator alloc) { return alloc.iobuffer(nextreceivebuffersize); }
直接调用了iobuffer方法,继续看
abstractbytebufallocator.java
@override public bytebuf iobuffer(int initialcapacity) { if (platformdependent.hasunsafe()) { return directbuffer(initialcapacity); } return heapbuffer(initialcapacity); }
iobuffer函数中主要逻辑为:看平台是否支持unsafe,选择使用直接物理内存还是堆上内存。先看 heapbuffer
abstractbytebufallocator.java
@override public bytebuf heapbuffer(int initialcapacity) { return heapbuffer(initialcapacity, integer.max_value); } @override public bytebuf heapbuffer(int initialcapacity, int maxcapacity) { if (initialcapacity == 0 && maxcapacity == 0) { return emptybuf; } validate(initialcapacity, maxcapacity); return newheapbuffer(initialcapacity, maxcapacity); }
这里的newheapbuffer有两种实现:至于具体用哪一种,取决于我们对系统属性io.netty.allocator.type的设置,如果设置为: “pooled”,则缓存分配器就为:pooledbytebufallocator,进而利用对象池技术进行内存分配。如果不设置或者设置为其他,则缓存分配器为:unpooledbytebufallocator,则直接返回一个unpooledheapbytebuf对象。
unpooledbytebufallocator.java
@override protected bytebuf newheapbuffer(int initialcapacity, int maxcapacity) { return new unpooledheapbytebuf(this, initialcapacity, maxcapacity); }
pooledbytebufallocator.java
@override protected bytebuf newheapbuffer(int initialcapacity, int maxcapacity) { poolthreadcache cache = threadcache.get(); poolarena<byte[]> heaparena = cache.heaparena; bytebuf buf; if (heaparena != null) { buf = heaparena.allocate(cache, initialcapacity, maxcapacity); } else { buf = new unpooledheapbytebuf(this, initialcapacity, maxcapacity); } return toleakawarebuffer(buf); }
再看directbuffer
abstractbytebufallocator.java
@override public bytebuf directbuffer(int initialcapacity) { return directbuffer(initialcapacity, integer.max_value); } @override public bytebuf directbuffer(int initialcapacity, int maxcapacity) { if (initialcapacity == 0 && maxcapacity == 0) { return emptybuf; } validate(initialcapacity, maxcapacity);//参数的有效性检查 return newdirectbuffer(initialcapacity, maxcapacity); }
与newheapbuffer一样,这里的newdirectbuffer方法也有两种实现:至于具体用哪一种,取决于我们对系统属性io.netty.allocator.type的设置,如果设置为: “pooled”,则缓存分配器就为:pooledbytebufallocator,进而利用对象池技术进行内存分配。如果不设置或者设置为其他,则缓存分配器为:unpooledbytebufallocator。这里主要看下unpooledbytebufallocator. newdirectbuffer的内部实现
unpooledbytebufallocator.java
@override protected bytebuf newdirectbuffer(int initialcapacity, int maxcapacity) { bytebuf buf; if (platformdependent.hasunsafe()) { buf = new unpooledunsafedirectbytebuf(this, initialcapacity, maxcapacity); } else { buf = new unpooleddirectbytebuf(this, initialcapacity, maxcapacity); } return toleakawarebuffer(buf); }
unpooledunsafedirectbytebuf是如何实现缓存管理的?对nio的bytebuffer进行了封装,通过bytebuffer的allocatedirect方法实现缓存的申请。
protected unpooledunsafedirectbytebuf(bytebufallocator alloc, int initialcapacity, int maxcapacity) { super(maxcapacity); //省略了部分参数检查的代码 this.alloc = alloc; setbytebuffer(allocatedirect(initialcapacity)); }
protected bytebuffer allocatedirect(int initialcapacity) { return bytebuffer.allocatedirect(initialcapacity); } private void setbytebuffer(bytebuffer buffer) { bytebuffer oldbuffer = this.buffer; if (oldbuffer != null) { if (donotfree) { donotfree = false; } else { freedirect(oldbuffer); } } this.buffer = buffer; memoryaddress = platformdependent.directbufferaddress(buffer); tmpniobuf = null; capacity = buffer.remaining(); }
上面代码的主要逻辑为:
1、先利用bytebuffer的allocatedirect方法分配了大小为initialcapacity的缓存
2、然后判断将旧缓存给free掉
3、最后将新缓存赋给字段buffer上
其中:memoryaddress = platformdependent.directbufferaddress(buffer) 获取buffer的address字段值,指向缓存地址。
capacity = buffer.remaining() 获取缓存容量。
接下来看toleakawarebuffer(buf)方法
protected static bytebuf toleakawarebuffer(bytebuf buf) { resourceleak leak; switch (resourceleakdetector.getlevel()) { case simple: leak = abstractbytebuf.leakdetector.open(buf); if (leak != null) { buf = new simpleleakawarebytebuf(buf, leak); } break; case advanced: case paranoid: leak = abstractbytebuf.leakdetector.open(buf); if (leak != null) { buf = new advancedleakawarebytebuf(buf, leak); } break; } return buf; }
方法toleakawarebuffer(buf)对申请的buf又进行了一次包装。
上面一长串的分析,得到了缓存后,回到abstractniobytechannel.read方法,继续看。
doreadbytes方法
下面看下doreadbytes方法:将socketchannel数据写入缓存。
niosocketchannel.java
@override protected int doreadbytes(bytebuf bytebuf) throws exception { return bytebuf.writebytes(javachannel(), bytebuf.writablebytes()); }
将channel中的数据读入缓存bytebuf中。继续看
wrappedbytebuf.java
@override public int writebytes(scatteringbytechannel in, int length) throws ioexception { return buf.writebytes(in, length); }
abstractbytebuf.java
@override public int writebytes(scatteringbytechannel in, int length) throws ioexception { ensureaccessible(); ensurewritable(length); int writtenbytes = setbytes(writerindex, in, length); if (writtenbytes > 0) { writerindex += writtenbytes; } return writtenbytes; }
这里的setbytes方法有不同的实现,这里看下unpooledunsafedirectbytebuf的setbytes的实现。
unpooledunsafedirectbytebuf.java
@override public int setbytes(int index, scatteringbytechannel in, int length) throws ioexception { ensureaccessible(); bytebuffer tmpbuf = internalniobuffer(); tmpbuf.clear().position(index).limit(index + length); try { return in.read(tmpbuf); } catch (closedchannelexception ignored) { return -1;//当channel 已经关闭,则返回-1. } } private bytebuffer internalniobuffer() { bytebuffer tmpniobuf = this.tmpniobuf; if (tmpniobuf == null) { this.tmpniobuf = tmpniobuf = buffer.duplicate(); } return tmpniobuf; }
最终底层采用bytebuffer实现read操作,无论是pooledbytebuf、还是unpooledxxxbuf,里面都将底层数据结构bufbuffer/array转换为bytebuffer 来实现read操作。即无论是unpooledxxxbuf还是pooledxxxbuf里面都有一个bytebuffer tmpniobuf,这个tmpniobuf才是真正用来存储从管道channel中读取出的内容的。到这里就完成了将channel的数据读入到了缓存buf中。
我们具体来看看 in.read(tmpbuf); filechannel和socketchannel的read最后都是依赖的ioutil来实现,代码如下
public int read(bytebuffer dst) throws ioexception { ensureopen(); if (!readable) throw new nonreadablechannelexception(); synchronized (positionlock) { int n = 0; int ti = -1; try { begin(); ti = threads.add(); if (!isopen()) return 0; do { n = ioutil.read(fd, dst, -1, nd); } while ((n == iostatus.interrupted) && isopen()); return iostatus.normalize(n); } finally { threads.remove(ti); end(n > 0); assert iostatus.check(n); } } }
最后目的就是将socketchannel中的数据读出存放到bytebuffer dst中,我们看看 ioutil.read(fd, dst, -1, nd)
static int read(filedescriptor var0, bytebuffer var1, long var2, nativedispatcher var4) throws ioexception { if (var1.isreadonly()) { throw new illegalargumentexception("read-only buffer"); //如果最终承载数据的buffer是directbuffer,则直接将数据读入到堆外内存中 } else if (var1 instanceof directbuffer) { return readintonativebuffer(var0, var1, var2, var4); } else { // 分配临时的堆外内存 bytebuffer var5 = util.gettemporarydirectbuffer(var1.remaining()); int var7; try { // socket i/o 操作会将数据读入到堆外内存中 int var6 = readintonativebuffer(var0, var5, var2, var4); var5.flip(); if (var6 > 0) { // 将堆外内存的数据拷贝到堆内存中(用户定义的缓存,在jvm中分配内存) var1.put(var5); } var7 = var6; } finally { // 里面会调用directbuffer.cleaner().clean()来释放临时的堆外内存 util.offerfirsttemporarydirectbuffer(var5); } return var7; } }
2、如果缓存内存是堆内存,则先申请一块和缓存同大小的临时 directbytebuffer var5。
3、将内核缓存中的数据读到堆外缓存var5,底层由nativedispatcher的read实现。
4、把堆外缓存var5的数据拷贝到堆内存var1(用户定义的缓存,在jvm中分配内存)。
private static int readintonativebuffer(filedescriptor filedescriptor, bytebuffer bytebuffer, long l, nativedispatcher nativedispatcher, object obj) throws ioexception { int i = bytebuffer.position(); int j = bytebuffer.limit(); //如果断言开启,buffer的position大于limit,则抛出断言错误 if(!$assertionsdisabled && i > j) throw new assertionerror(); //获取需要读的字节数 int k = i > j ? 0 : j - i; if(k == 0) return 0; int i1 = 0; //从输入流读取k个字节到buffer if(l != -1l) i1 = nativedispatcher.pread(filedescriptor, ((directbuffer)bytebuffer).address() + (long)i, k, l, obj); else i1 = nativedispatcher.read(filedescriptor, ((directbuffer)bytebuffer).address() + (long)i, k); //重新定位buffer的position if(i1 > 0) bytebuffer.position(i + i1); return i1; }
回到abstractniobytechannel.read方法,继续看。
@override public void read() { //... try { int totalreadamount = 0; boolean readpendingreset = false; do { bytebuf = allochandle.allocate(allocator); int writable = bytebuf.writablebytes(); int localreadamount = doreadbytes(bytebuf); if (localreadamount <= 0) { // not was read release the buffer bytebuf.release(); close = localreadamount < 0; break; } if (!readpendingreset) { readpendingreset = true; setreadpending(false); } pipeline.firechannelread(bytebuf); bytebuf = null; if (totalreadamount >= integer.max_value - localreadamount) { // avoid overflow. totalreadamount = integer.max_value; break; } totalreadamount += localreadamount; // stop reading if (!config.isautoread()) { break; } if (localreadamount < writable) { // read less than what the buffer can hold, // which might mean we drained the recv buffer completely. break; } } while (++ messages < maxmessagesperread); pipeline.firechannelreadcomplete(); allochandle.record(totalreadamount); if (close) { closeonread(pipeline); close = false; } } catch (throwable t) { handlereadexception(pipeline, bytebuf, t, close); } finally { if (!config.isautoread() && !isreadpending()) { removereadop(); } } } }
int localreadamount = doreadbytes(bytebuf);
1、如果返回0,则表示没有读取到数据,则退出循环。
2、如果返回-1,表示对端已经关闭连接,则退出循环。
3、否则,表示读取到了数据,数据读入缓存后,触发pipeline的channelread事件,bytebuf作为参数进行后续处理,这时自定义inbound类型的handler就可以进行业务处理了。pipeline的事件处理在我之前的博文中有详细的介绍。处理完成之后,再一次从channel读取数据,直至退出循环。
4、循环次数超过maxmessagesperread时,即只能在管道中读取maxmessagesperread次数据,既是还没有读完也要退出。在上篇博文中,boss线程接受客户端连接也用到了此变量,即当boss线程 selector检测到op_accept事件后一次只能接受maxmessagesperread个客户端连接
上一篇: C/S架构和B/S架构的概念和区别
推荐阅读
-
PHP源码分析之变量的存储过程分解
-
Netty源码分析 (四)----- ChannelPipeline
-
Netty源码分析 (三)----- 服务端启动源码分析
-
Tomcat源码分析 (九)----- HTTP请求处理过程(二)
-
Tomcat源码分析三:Tomcat启动加载过程(一)的源码解析
-
Qt事件分发机制源码分析之QApplication对象构建过程
-
Netty源码分析之ChannelPipeline(二)—ChannelHandler的添加与删除
-
Mybaits 源码解析 (六)----- 全网最详细:Select 语句的执行过程分析(上篇)(Mapper方法是如何调用到XML中的SQL的?)
-
Netty源码分析 (十)----- 拆包器之LineBasedFrameDecoder
-
vuex 源码分析(七) module和namespaced 详解