netty源码解析(4.0)-28 ByteBuf内存池:PooledByteBufAllocator-把一切组装起来
pooledbytebufallocator负责初始化poolarena(pa)和poolthreadcache(ptc)。它提供了一系列的接口,用来创建使用堆内存或直接内存的pooledbytebuf对象,这些接口只是一张皮,内部完全使用了pa和ptc的能力。初始化过程分两个步骤,首先初始化一系列的默认参数,然后初始化ptc对象和pa数组。
默认参数和它们的值
default_page_size: poolchunk中的page的大小-pagesize, 使用-dio.netty.allocator.pagesize设置, 默认值:8192。
default_max_order: poolchunk中二叉树的高度: maxorder, 使用-dio.netty.allocator.maxorder设置,默认值:11。
default_num_heap_arena: 使用堆内存的pa数组的长度,使用-dio.netty.allocator.numheaparenas设置,默认值: cpu核心数 * 2。
default_num_direct_arena: 使用直接内存的pa数组的长度,使用-dio.netty.allocator.numheaparenas设置,默认值: cpu核心数 * 2。
default_tiny_cache_size: ptc对象中每个用来缓存tiny内存的memoryregioncache对象中queue的长度,使用-dio.netty.allocator.tinycachesize设置,默认值:512。
default_small_cache_size: ptc对象中每个用来缓存small内存的memoryregioncache对象中queue的长度,使用-dio.netty.allocator.smallcachesize设置,默认值:256。
default_normal_cache_size: ptc对象中每个用来缓存normal内存的memoryregioncache对象中queue的长度,使用-dio.netty.allocator.normalcachesize设置,默认值:64。
default_max_cached_buffer_capacity: ptc对象中缓存normal内存的大小上限。使用-dio.netty.allocator.maxcachedbuffercapacity设置,默认值32 * 1024。
default_cache_trim_interval: ptc对象中释放缓存的内存阈值。当ptc分配内存次数大于这个值时会释放缓存的内存。使用-dio.netty.allocator.cachetriminterval设置,默认值:8192。
default_use_cache_for_all_threads: 是否对所有的线程使用缓存。使用-dio.netty.allocator.usecacheforallthreads设置,默认值:true。
default_direct_memory_cache_alignment: 直接内存的对齐参数,分配直接内存的大小必须是它的整数倍。使用-dio.netty.allocator.directmemorycachealignment设置,默认值:0, 表示不对齐。
初始化poolarena数组
pooledbytebufallocator维护了两个数组:
poolarena<byte[]>[] heaparenas; poolarena<bytebuffer>[] directarenas;
heaparenas用来管理堆内存,directarenas用来管理直接内存。这两个数组在构造方法中初始化,构造方法的定义是:
public pooledbytebufallocator(boolean preferdirect, int nheaparena, int ndirectarena, int pagesize, int maxorder, int tinycachesize, int smallcachesize, int normalcachesize, boolean usecacheforallthreads, int directmemorycachealignment)
prefredirect: 创建pooledbytebuf时,是否优先使用直接内存。
nheaparena: 默认使用default_num_heap_arena。
ndirectarena: 默认使用default_num_direct_arena。
pagesize: 默认使用的default_page_size。
maxorder: 默认使用default_max_order。
tinycachesize: 默认使用default_tiny_cache_size。
smallcachesize: 默认使用default_small_cache_size。
normalcachesize: 默认使用default_normal_cache_size。
usecacheforallthreads: 默认使用default_use_cache_for_all_threads。
directmemorycachealignment: 默认使用default_direct_memory_cache_alignment。
这两数组的初始化代码如下:
1 int pageshifts = validateandcalculatepageshifts(pagesize); 2 3 if (nheaparena > 0) { 4 heaparenas = newarenaarray(nheaparena); 5 list<poolarenametric> metrics = new arraylist<poolarenametric>(heaparenas.length); 6 for (int i = 0; i < heaparenas.length; i ++) { 7 poolarena.heaparena arena = new poolarena.heaparena(this, 8 pagesize, maxorder, pageshifts, chunksize, 9 directmemorycachealignment); 10 heaparenas[i] = arena; 11 metrics.add(arena); 12 } 13 heaparenametrics = collections.unmodifiablelist(metrics); 14 } else { 15 heaparenas = null; 16 heaparenametrics = collections.emptylist(); 17 } 18 19 if (ndirectarena > 0) { 20 directarenas = newarenaarray(ndirectarena); 21 list<poolarenametric> metrics = new arraylist<poolarenametric>(directarenas.length); 22 for (int i = 0; i < directarenas.length; i ++) { 23 poolarena.directarena arena = new poolarena.directarena( 24 this, pagesize, maxorder, pageshifts, chunksize, directmemorycachealignment); 25 directarenas[i] = arena; 26 metrics.add(arena); 27 } 28 directarenametrics = collections.unmodifiablelist(metrics); 29 } else { 30 directarenas = null; 31 directarenametrics = collections.emptylist(); 32 }
1行,计算pageshifts,算法是pageshifts = integer.size - 1 - integer.numberofleadingzeros(pagesize) = 31 - integer.numberofleadingzeros(pagesize)。 integer.numberofleadingzeros(pagesize)是pagesize(32位整数)从最高位起连续是0的位数,因此pageshifts可以简化为pageshifts = log2(pagesize)。
4,20行,创建数组,new poolarena[size]。
6-12,22-17行, 初始化数组中的poolarena对象,分别使用pooarena的两个内部类: heaparena, directarena。
初始化poolthreadcache
poolthreadcache使用poolthreadlocalcache(ptlc)间接初始化,ptlc是pooledbytebufallocator的内部内,它的定义如下:
final class poolthreadlocalcache extends fastthreadlocal<poolthreadcache>
这个类派生自io.netty.util.concurrent.fastthreadlocal<t>, 和java.lang.threadlocal<t>功能一样,实现了线程本地存储(tls)的功能,不同的是fastthreadlocal<t>优化了访问性能。ptlc覆盖了父类的initialvalue方法,这个方法负责初始化线程本地的poolthreadcache对象。当第一次调用ptlc对象的get方法时,这个方法会被调用。
1 @override 2 protected synchronized poolthreadcache initialvalue() { 3 final poolarena<byte[]> heaparena = leastusedarena(heaparenas); 4 final poolarena<bytebuffer> directarena = leastusedarena(directarenas); 5 6 if (usecacheforallthreads || thread.currentthread() instanceof fastthreadlocalthread) { 7 return new poolthreadcache( 8 heaparena, directarena, tinycachesize, smallcachesize, normalcachesize, 9 default_max_cached_buffer_capacity, default_cache_trim_interval); 10 } 11 // no caching for non fastthreadlocalthreads. 12 return new poolthreadcache(heaparena, directarena, 0, 0, 0, 0, 0); 13 }
3,4行,分别从headarenas,directarenas中取出一个使用次数最少的poolarena对象。poolarena有一个numthreadcaches属性,这个属性是atomicinteger类型的原子变量。它的作用是在用来记录被poolthreadcache对象使用的次数。poolthreadcache对象创建时会在构造方法中会调用它的getandincrement方法,释放时在free0方法中调用他的getanddecrement方法。
6行, 如果运行每个线程都使用缓存(usercacheforallthreads==true),或者当成线程对象是fastthreadlocalthread时, 在第8行创建一个线程专用的ptc对象。
poolchunklist(pckl)
关键属性
poolchunklist<t> nextlist
poolchunklist<t> prevlist
这两个属性表明pckl对象是一个双向链表的节点。
poolchunk<t> head
这个属性表明pckl对象还维护的一个pck类型的链表,head指向这个链表的头。
int minusage;
int maxusage;
int maxcapacity;
minusage是pck链表中每个pck对象内存的最小使用率,maxuseage是pck的最大使用率。这两个值是百分比,例如:minusage=10, maxuse=50,表示pck链表中只能保存使用率在[10%,50%)的pck对象。 maxcapacity表示pck最大可分配的内存数,算法是: maxcapacity = (int)(chunksize * (100l - minuseage) / 100l)。
初始化pckl链表
pckl链表有poolarena负责维护,在poolarena的构造方法中初始化:
1 // io.netty.buffer.poolarena#poolarena(pooledbytebufallocator parent, int pagesize, 2 // int maxorder, int pageshifts, int chunksize, int cachealignment) 3 4 q100 = new poolchunklist<t>(this, null, 100, integer.max_value, chunksize); 5 q075 = new poolchunklist<t>(this, q100, 75, 100, chunksize); 6 q050 = new poolchunklist<t>(this, q075, 50, 100, chunksize); 7 q025 = new poolchunklist<t>(this, q050, 25, 75, chunksize); 8 q000 = new poolchunklist<t>(this, q025, 1, 50, chunksize); 9 qinit = new poolchunklist<t>(this, q000, integer.min_value, 25, chunksize); 10 11 q100.prevlist(q075); 12 q075.prevlist(q050); 13 q050.prevlist(q025); 14 q025.prevlist(q000); 15 q000.prevlist(null); 16 qinit.prevlist(qinit);
4-9行,初始化pckl节点。每个节点的名字q{num},其中num表示这个节点的最小使用率minusage,如q075节点的minusage=%75。
11-16行,把pckl节点组装成一个链表。
使用q(minusage, maxusage)表示一个节点,那么:
qinit = q(integer.min_value, 25%)
q000 = q(1%, 50%)
q025 = q(25%, 75%)
q075 = q(75%, 100%)
q100 = q(100%, integer.max_value)
这个链表的结构如下图所示:
poolchunk(pck)在poolchunklist(pckl)中移动
一个新创建的pck对象,它的内存使用率是usage=%0,被放进qinit节节点。每次从这个pck对象中分配内存,都会导致它的使用率增加,当usage>=25%,即大于等于qinit的maxusage时,会把它移动到q000中。继续从pck对象中分配内存,它的usage继续增加,当usage大于等于它所属pckl的maxusage时,把它移动到pkcl链表中的下一个节点,直到q100为止。下面是内存分配导致pck移动的代码:
1 //io.netty.buffer.poolchunklist#allocate 2 boolean allocate(pooledbytebuf<t> buf, int reqcapacity, int normcapacity) { 3 if (head == null || normcapacity > maxcapacity) { 4 // either this poolchunklist is empty or the requested capacity is larger then the capacity which can 5 // be handled by the poolchunks that are contained in this poolchunklist. 6 return false; 7 } 8 9 for (poolchunk<t> cur = head;;) { 10 long handle = cur.allocate(normcapacity); 11 if (handle < 0) { 12 cur = cur.next; 13 if (cur == null) { 14 return false; 15 } 16 } else { 17 cur.initbuf(buf, handle, reqcapacity); 18 if (cur.usage() >= maxusage) { 19 remove(cur); 20 nextlist.add(cur); 21 } 22 return true; 23 } 24 } 25 }
9-12行,尝试从pck链表中的所有pck节点分配所需的内存。
14行,没有找到能分配内存的pck节点。
17行,从cur节点分配到所需的内存,并初始化pooledbytebuf对象。
18-21行,如cur节点的使用率大于等于当前pckl节点maxusage,调用remove方法把cur从head链表中删除,然后调用pckl链表中的下一个节点的add方法,把cur移动到下一个节点中。
如果持续地释放内存,把内存还给pck对象,会导致usage持续减小,当usage小于它所属的pckl的minusage时,把它移动到pckl链表中的前一个节点,直到q000位为止。当释放内存导致pck对象的usage等于%0,会销毁这个pck对象,释放整个chunk的内存。下面是释放内存导致pck对象移动的代码:
1 //io.netty.buffer.poolchunklist#free 2 boolean free(poolchunk<t> chunk, long handle) { 3 chunk.free(handle); 4 if (chunk.usage() < minusage) { 5 remove(chunk); 6 // move the poolchunk down the poolchunklist linked-list. 7 return move0(chunk); 8 } 9 return true; 10 } 11 12 //io.netty.buffer.poolchunklist#move0 13 private boolean move0(poolchunk<t> chunk) { 14 if (prevlist == null) { 15 // there is no previous poolchunklist so return false which result in having the poolchunk destroyed and 16 // all memory associated with the poolchunk will be released. 17 assert chunk.usage() == 0; 18 return false; 19 } 20 return prevlist.move(chunk); 21 }
第3行,释放内存,把内存返还给pck对象。
4-7行,如pck的使用率小于当前pckl的minusage,调用remove方法把pck对象从当前pckl对象中删除,然后调用move0方法把它移动到前一个pckl节点。
13-31行,移动pck到前一个pckl。
完整的内存分配释放流程
内存分配
入口方法:
io.netty.buffer.abstractbytebufallocator#heapbuffer(int, int),创建使用堆内存的bytebuf, 调用newheapbuffer方法。
io.netty.buffer.abstractbytebufallocator#directbuffer(int, int), 创建使用直接内存的bytebuf, 调用newdirectbuffer方法。
具体实现:
io.netty.buffer.pooledbytebufallocator#newheapbuffer(int initialcapacity, int maxcapacity)。
io.netty.buffer.pooledbytebufallocator#newdirectbuffer(int initialcapacity, int maxcapacity)。
这两个方法都是从poolthreadcache对象中得到线程专用的poolarena对象,然后调用poolarena的allocate方法创建poolbytebuf对象。
poolarena入口方法:
io.netty.buffer.poolarena#allocate(io.netty.buffer.poolthreadcache, int, int),这个方法是poolarena分配内存,创建poolbytebuf对象的入口方法。它先调用子类实现的newbytebuf创建一个poolbytebuf对象,这个方法有两个实现:
io.netty.buffer.poolarena.heaparena#newbytebuf(int maxcapacity),创建使用堆内存的pooledbytebuf对象。
io.netty.buffer.poolarena.directarena#newbytebuf(int maxcapacity),创建使用直接内存pooledbytebuf对象。
然后调用io.netty.buffer.poolarena#allocate(io.netty.buffer.poolthreadcache, io.netty.buffer.pooledbytebuf<t>, int)方法为poolbytebuf对象分配内存,这个方法是分配内存的核心方法,下面来重点分析一下它的代码:
1 private void allocate(poolthreadcache cache, pooledbytebuf<t> buf, final int reqcapacity) { 2 final int normcapacity = normalizecapacity(reqcapacity); 3 if (istinyorsmall(normcapacity)) { // capacity < pagesize 4 int tableidx; 5 poolsubpage<t>[] table; 6 boolean tiny = istiny(normcapacity); 7 if (tiny) { // < 512 8 if (cache.allocatetiny(this, buf, reqcapacity, normcapacity)) { 9 // was able to allocate out of the cache so move on 10 return; 11 } 12 tableidx = tinyidx(normcapacity); 13 table = tinysubpagepools; 14 } else { 15 if (cache.allocatesmall(this, buf, reqcapacity, normcapacity)) { 16 // was able to allocate out of the cache so move on 17 return; 18 } 19 tableidx = smallidx(normcapacity); 20 table = smallsubpagepools; 21 } 22 23 final poolsubpage<t> head = table[tableidx]; 24 25 /** 26 * synchronize on the head. this is needed as {@link poolchunk#allocatesubpage(int)} and 27 * {@link poolchunk#free(long)} may modify the doubly linked list as well. 28 */ 29 synchronized (head) { 30 final poolsubpage<t> s = head.next; 31 if (s != head) { 32 assert s.donotdestroy && s.elemsize == normcapacity; 33 long handle = s.allocate(); 34 assert handle >= 0; 35 s.chunk.initbufwithsubpage(buf, handle, reqcapacity); 36 inctinysmallallocation(tiny); 37 return; 38 } 39 } 40 synchronized (this) { 41 allocatenormal(buf, reqcapacity, normcapacity); 42 } 43 44 inctinysmallallocation(tiny); 45 return; 46 } 47 if (normcapacity <= chunksize) { 48 if (cache.allocatenormal(this, buf, reqcapacity, normcapacity)) { 49 // was able to allocate out of the cache so move on 50 return; 51 } 52 synchronized (this) { 53 allocatenormal(buf, reqcapacity, normcapacity); 54 ++allocationsnormal; 55 } 56 } else { 57 // huge allocations are never served via the cache so just call allocatehuge 58 allocatehuge(buf, reqcapacity); 59 } 60 }
第2行,根据需要的内存大小reqcapacity,计算可以分配的标准内存大小normcapacity。必须满足(1)normcapacity>=reqcapacity, (2)normcapacity是directmemorycachealignment的整数倍,此外,还要根据reqcapacity的大小分3中情况:
reqcapacity>=chunksize:normcapacity取同时满足(1),(2)的最小值。
reqcapacity>=512且reqcapacity<chunksize: (3)normcapacity>=512*2k, (4)normcapacity<=chunksize,normcapacit取同时满足(1),(2),(3),(4)的最小值。
reqcapacity<412: (5)normcapacity<512, (6)normcapacity是16的整数倍,normcapacity取同时满足(1),(2),(5),(6)的最小值。
8-13行,分配tiny类型的内存(<512)。 8-10行,如果poolthreadcache缓存对象中分配到内存,分配内流程结束。12-13行,如果缓存中没有,就从tiny内存池中分配一块内存。
15-20行,分配small类型的内存(>=512且<pagesize)。和分配tiny内存的逻辑相同。
29-27行, 使用从前两个步骤中得到的tiny或small内存的索引,从子页面池中分配一块内存。33行,从子页面中分配内存。35行,使用分配到的内存初始化poolbytebuf对象,如果能到这里,分配内存流程结束。
41行,如果子页面池中还没有内存可用,调用allocatenormal方法从poolchunk对象中分配一个子页面,再从子页面中分配所需的内存。
47-55行,分配normal类型的内存(>=pagesize且<chunksize)。48,49行,从缓存中分配内存,如果成功,分配内存流程结束。53行,缓存中没有可用的内存,调用allocatenormal方法从poolchunk中分配内存。
58行,如果分配的是>chunksize的内存。这块内存不会进入pckl链表中。
上面代码中的allocatenormal方法封装了创建pck对象,从pck对象中分配内存,再把pck对象放入到pckl链表中的逻辑,也是十分重要的代码。
1 private void allocatenormal(pooledbytebuf<t> buf, int reqcapacity, int normcapacity) { 2 if (q050.allocate(buf, reqcapacity, normcapacity) || q025.allocate(buf, reqcapacity, normcapacity) || 3 q000.allocate(buf, reqcapacity, normcapacity) || qinit.allocate(buf, reqcapacity, normcapacity) || 4 q075.allocate(buf, reqcapacity, normcapacity)) { 5 return; 6 } 7 8 // add a new chunk. 9 poolchunk<t> c = newchunk(pagesize, maxorder, pageshifts, chunksize); 10 long handle = c.allocate(normcapacity); 11 assert handle > 0; 12 c.initbuf(buf, handle, reqcapacity); 13 qinit.add(c); 14 }
2-5行,依次尝试从每个pckl节点中分配内存,如果成功,分配内存流程结束。
9-13行,先创建一个新的pck对象,然后从中分配内存,使用内存初始化pooledbytebuf对象,最后把pck对象添加pckl链表头节点qinit中。pkcl对象的add方法会和allocate一样,根据pck对象的内存使用率,把它移动到链表中合适的位置。
内存释放
io.netty.buffer.pooledbytebuf#deallocate方法调用io.netty.buffer.poolarena#free方法,这个free方法负责整个内存释放过程。
1 void free(poolchunk<t> chunk, long handle, int normcapacity, poolthreadcache cache) { 2 if (chunk.unpooled) { 3 int size = chunk.chunksize(); 4 destroychunk(chunk); 5 activebyteshuge.add(-size); 6 deallocationshuge.increment(); 7 } else { 8 sizeclass sizeclass = sizeclass(normcapacity); 9 if (cache != null && cache.add(this, chunk, handle, normcapacity, sizeclass)) { 10 // cached so not free it. 11 return; 12 } 13 14 freechunk(chunk, handle, sizeclass); 15 } 16 }
这段代码重点在8-14行。第8,9行,优先把内存放到缓存中,这样下次就能快速地从缓存中直接取用。第14行,在不能放进缓存的情况下把内存返回给pck对象。
1 void freechunk(poolchunk<t> chunk, long handle, sizeclass sizeclass) { 2 final boolean destroychunk; 3 synchronized (this) { 4 switch (sizeclass) { 5 case normal: 6 ++deallocationsnormal; 7 break; 8 case small: 9 ++deallocationssmall; 10 break; 11 case tiny: 12 ++deallocationstiny; 13 break; 14 default: 15 throw new error(); 16 } 17 destroychunk = !chunk.parent.free(chunk, handle); 18 } 19 if (destroychunk) { 20 // destroychunk not need to be called while holding the synchronized lock. 21 destroychunk(chunk); 22 } 23 }
第17行,掉用pckl对象的free方法把内存还给pck对象,移动pck对象在pckl链表中位置。如果此时这个pck对象的使用率变成0,destroychunk=true。
第21行,调用destroychunk方法销毁掉pck对象。
上一篇: python与中文的那点事