欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

netty源码解析(4.0)-28 ByteBuf内存池:PooledByteBufAllocator-把一切组装起来

程序员文章站 2022-06-20 17:42:44
PooledByteBufAllocator负责初始化PoolArena(PA)和PoolThreadCache(PTC)。它提供了一系列的接口,用来创建使用堆内存或直接内存的PooledByteBuf对象,这些接口只是一张皮,内部完全使用了PA和PTC的能力。初始化过程分两个步骤,首先初始化一系列 ......

 

  pooledbytebufallocator负责初始化poolarena(pa)和poolthreadcache(ptc)。它提供了一系列的接口,用来创建使用堆内存或直接内存的pooledbytebuf对象,这些接口只是一张皮,内部完全使用了pa和ptc的能力。初始化过程分两个步骤,首先初始化一系列的默认参数,然后初始化ptc对象和pa数组。

 

默认参数和它们的值

  default_page_size: poolchunk中的page的大小-pagesize,  使用-dio.netty.allocator.pagesize设置, 默认值:8192。

  default_max_order: poolchunk中二叉树的高度: maxorder, 使用-dio.netty.allocator.maxorder设置,默认值:11。

  default_num_heap_arena: 使用堆内存的pa数组的长度,使用-dio.netty.allocator.numheaparenas设置,默认值: cpu核心数 * 2。

  default_num_direct_arena: 使用直接内存的pa数组的长度,使用-dio.netty.allocator.numheaparenas设置,默认值: cpu核心数 * 2。

  default_tiny_cache_size:  ptc对象中每个用来缓存tiny内存的memoryregioncache对象中queue的长度,使用-dio.netty.allocator.tinycachesize设置,默认值:512。

  default_small_cache_size: ptc对象中每个用来缓存small内存的memoryregioncache对象中queue的长度,使用-dio.netty.allocator.smallcachesize设置,默认值:256。

  default_normal_cache_size: ptc对象中每个用来缓存normal内存的memoryregioncache对象中queue的长度,使用-dio.netty.allocator.normalcachesize设置,默认值:64。

  default_max_cached_buffer_capacity: ptc对象中缓存normal内存的大小上限。使用-dio.netty.allocator.maxcachedbuffercapacity设置,默认值32 * 1024。

  default_cache_trim_interval:  ptc对象中释放缓存的内存阈值。当ptc分配内存次数大于这个值时会释放缓存的内存。使用-dio.netty.allocator.cachetriminterval设置,默认值:8192。

  default_use_cache_for_all_threads: 是否对所有的线程使用缓存。使用-dio.netty.allocator.usecacheforallthreads设置,默认值:true。

  default_direct_memory_cache_alignment: 直接内存的对齐参数,分配直接内存的大小必须是它的整数倍。使用-dio.netty.allocator.directmemorycachealignment设置,默认值:0, 表示不对齐。

 

初始化poolarena数组

  pooledbytebufallocator维护了两个数组:

poolarena<byte[]>[] heaparenas; 
poolarena<bytebuffer>[] directarenas;

  heaparenas用来管理堆内存,directarenas用来管理直接内存。这两个数组在构造方法中初始化,构造方法的定义是:

    public pooledbytebufallocator(boolean preferdirect, int nheaparena, int ndirectarena, int pagesize, int maxorder,
                                  int tinycachesize, int smallcachesize, int normalcachesize,
                                  boolean usecacheforallthreads, int directmemorycachealignment)

  prefredirect: 创建pooledbytebuf时,是否优先使用直接内存。

  nheaparena: 默认使用default_num_heap_arena

  ndirectarena: 默认使用default_num_direct_arena

  pagesize: 默认使用的default_page_size

  maxorder: 默认使用default_max_order

  tinycachesize:  默认使用default_tiny_cache_size

  smallcachesize: 默认使用default_small_cache_size

  normalcachesize: 默认使用default_normal_cache_size。

  usecacheforallthreads: 默认使用default_use_cache_for_all_threads

  directmemorycachealignment: 默认使用default_direct_memory_cache_alignment

  

  这两数组的初始化代码如下:

 1       int pageshifts = validateandcalculatepageshifts(pagesize);
 2 
 3         if (nheaparena > 0) {
 4             heaparenas = newarenaarray(nheaparena);
 5             list<poolarenametric> metrics = new arraylist<poolarenametric>(heaparenas.length);
 6             for (int i = 0; i < heaparenas.length; i ++) {
 7                 poolarena.heaparena arena = new poolarena.heaparena(this,
 8                         pagesize, maxorder, pageshifts, chunksize,
 9                         directmemorycachealignment);
10                 heaparenas[i] = arena;
11                 metrics.add(arena);
12             }
13             heaparenametrics = collections.unmodifiablelist(metrics);
14         } else {
15             heaparenas = null;
16             heaparenametrics = collections.emptylist();
17         }
18 
19         if (ndirectarena > 0) {
20             directarenas = newarenaarray(ndirectarena);
21             list<poolarenametric> metrics = new arraylist<poolarenametric>(directarenas.length);
22             for (int i = 0; i < directarenas.length; i ++) {
23                 poolarena.directarena arena = new poolarena.directarena(
24                         this, pagesize, maxorder, pageshifts, chunksize, directmemorycachealignment);
25                 directarenas[i] = arena;
26                 metrics.add(arena);
27             }
28             directarenametrics = collections.unmodifiablelist(metrics);
29         } else {
30             directarenas = null;
31             directarenametrics = collections.emptylist();
32         }

  1行,计算pageshifts,算法是pageshifts = integer.size - 1 - integer.numberofleadingzeros(pagesize) = 31 - integer.numberofleadingzeros(pagesize)。 integer.numberofleadingzeros(pagesize)是pagesize(32位整数)从最高位起连续是0的位数,因此pageshifts可以简化为pageshifts = log2(pagesize)。

  4,20行,创建数组,new poolarena[size]。  

  6-12,22-17行, 初始化数组中的poolarena对象,分别使用pooarena的两个内部类: heaparena, directarena。

 

初始化poolthreadcache

   poolthreadcache使用poolthreadlocalcache(ptlc)间接初始化,ptlc是pooledbytebufallocator的内部内,它的定义如下:

final class poolthreadlocalcache extends fastthreadlocal<poolthreadcache>

  这个类派生自io.netty.util.concurrent.fastthreadlocal<t>,  和java.lang.threadlocal<t>功能一样,实现了线程本地存储(tls)的功能,不同的是fastthreadlocal<t>优化了访问性能。ptlc覆盖了父类的initialvalue方法,这个方法负责初始化线程本地的poolthreadcache对象。当第一次调用ptlc对象的get方法时,这个方法会被调用。

 1         @override
 2         protected synchronized poolthreadcache initialvalue() {
 3             final poolarena<byte[]> heaparena = leastusedarena(heaparenas);
 4             final poolarena<bytebuffer> directarena = leastusedarena(directarenas);
 5 
 6             if (usecacheforallthreads || thread.currentthread() instanceof fastthreadlocalthread) {
 7                 return new poolthreadcache(
 8                         heaparena, directarena, tinycachesize, smallcachesize, normalcachesize,
 9                         default_max_cached_buffer_capacity, default_cache_trim_interval);
10             }
11             // no caching for non fastthreadlocalthreads.
12             return new poolthreadcache(heaparena, directarena, 0, 0, 0, 0, 0);
13         }

  3,4行,分别从headarenas,directarenas中取出一个使用次数最少的poolarena对象。poolarena有一个numthreadcaches属性,这个属性是atomicinteger类型的原子变量。它的作用是在用来记录被poolthreadcache对象使用的次数。poolthreadcache对象创建时会在构造方法中会调用它的getandincrement方法,释放时在free0方法中调用他的getanddecrement方法。

  6行,  如果运行每个线程都使用缓存(usercacheforallthreads==true),或者当成线程对象是fastthreadlocalthread时, 在第8行创建一个线程专用的ptc对象。

 

poolchunklist(pckl)

关键属性

  poolchunklist<t> nextlist

  poolchunklist<t> prevlist

  这两个属性表明pckl对象是一个双向链表的节点。

  poolchunk<t> head

  这个属性表明pckl对象还维护的一个pck类型的链表,head指向这个链表的头。

  int minusage;

  int maxusage;

  int maxcapacity;

  minusage是pck链表中每个pck对象内存的最小使用率,maxuseage是pck的最大使用率。这两个值是百分比,例如:minusage=10, maxuse=50,表示pck链表中只能保存使用率在[10%,50%)的pck对象。 maxcapacity表示pck最大可分配的内存数,算法是: maxcapacity = (int)(chunksize * (100l - minuseage) / 100l)。

 

初始化pckl链表

  pckl链表有poolarena负责维护,在poolarena的构造方法中初始化:

 1 // io.netty.buffer.poolarena#poolarena(pooledbytebufallocator parent, int pagesize,
 2 //          int maxorder, int pageshifts, int chunksize, int cachealignment)
 3 
 4         q100 = new poolchunklist<t>(this, null, 100, integer.max_value, chunksize);
 5         q075 = new poolchunklist<t>(this, q100, 75, 100, chunksize);
 6         q050 = new poolchunklist<t>(this, q075, 50, 100, chunksize);
 7         q025 = new poolchunklist<t>(this, q050, 25, 75, chunksize);
 8         q000 = new poolchunklist<t>(this, q025, 1, 50, chunksize);
 9         qinit = new poolchunklist<t>(this, q000, integer.min_value, 25, chunksize);
10 
11         q100.prevlist(q075);
12         q075.prevlist(q050);
13         q050.prevlist(q025);
14         q025.prevlist(q000);
15         q000.prevlist(null);
16         qinit.prevlist(qinit); 

  4-9行,初始化pckl节点。每个节点的名字q{num},其中num表示这个节点的最小使用率minusage,如q075节点的minusage=%75。

  11-16行,把pckl节点组装成一个链表。

  使用q(minusage, maxusage)表示一个节点,那么:

  qinit = q(integer.min_value, 25%)

  q000 = q(1%, 50%)

  q025 = q(25%, 75%)

  q075 = q(75%, 100%)

  q100 = q(100%, integer.max_value)

  这个链表的结构如下图所示:

  netty源码解析(4.0)-28 ByteBuf内存池:PooledByteBufAllocator-把一切组装起来

poolchunk(pck)在poolchunklist(pckl)中移动

  一个新创建的pck对象,它的内存使用率是usage=%0,被放进qinit节节点。每次从这个pck对象中分配内存,都会导致它的使用率增加,当usage>=25%,即大于等于qinit的maxusage时,会把它移动到q000中。继续从pck对象中分配内存,它的usage继续增加,当usage大于等于它所属pckl的maxusage时,把它移动到pkcl链表中的下一个节点,直到q100为止。下面是内存分配导致pck移动的代码:

 1     //io.netty.buffer.poolchunklist#allocate
 2     boolean allocate(pooledbytebuf<t> buf, int reqcapacity, int normcapacity) {
 3         if (head == null || normcapacity > maxcapacity) {
 4             // either this poolchunklist is empty or the requested capacity is larger then the capacity which can
 5             // be handled by the poolchunks that are contained in this poolchunklist.
 6             return false;
 7         }
 8 
 9         for (poolchunk<t> cur = head;;) {
10             long handle = cur.allocate(normcapacity);
11             if (handle < 0) {
12                 cur = cur.next;
13                 if (cur == null) {
14                     return false;
15                 }
16             } else {
17                 cur.initbuf(buf, handle, reqcapacity);
18                 if (cur.usage() >= maxusage) {
19                     remove(cur);
20                     nextlist.add(cur);
21                 }
22                 return true;
23             }
24         }
25     }

  9-12行,尝试从pck链表中的所有pck节点分配所需的内存。

  14行,没有找到能分配内存的pck节点。

  17行,从cur节点分配到所需的内存,并初始化pooledbytebuf对象。

  18-21行,如cur节点的使用率大于等于当前pckl节点maxusage,调用remove方法把cur从head链表中删除,然后调用pckl链表中的下一个节点的add方法,把cur移动到下一个节点中。

 

  如果持续地释放内存,把内存还给pck对象,会导致usage持续减小,当usage小于它所属的pckl的minusage时,把它移动到pckl链表中的前一个节点,直到q000位为止。当释放内存导致pck对象的usage等于%0,会销毁这个pck对象,释放整个chunk的内存。下面是释放内存导致pck对象移动的代码:

 1     //io.netty.buffer.poolchunklist#free
 2     boolean free(poolchunk<t> chunk, long handle) {
 3         chunk.free(handle);
 4         if (chunk.usage() < minusage) {
 5             remove(chunk);
 6             // move the poolchunk down the poolchunklist linked-list.
 7             return move0(chunk);
 8         }
 9         return true;
10     }
11 
12     //io.netty.buffer.poolchunklist#move0
13     private boolean move0(poolchunk<t> chunk) {
14         if (prevlist == null) {
15             // there is no previous poolchunklist so return false which result in having the poolchunk destroyed and
16             // all memory associated with the poolchunk will be released.
17             assert chunk.usage() == 0;
18             return false;
19         }
20         return prevlist.move(chunk);
21     }

  第3行,释放内存,把内存返还给pck对象。

  4-7行,如pck的使用率小于当前pckl的minusage,调用remove方法把pck对象从当前pckl对象中删除,然后调用move0方法把它移动到前一个pckl节点。

  13-31行,移动pck到前一个pckl。

 

完整的内存分配释放流程

内存分配

  入口方法:

  io.netty.buffer.abstractbytebufallocator#heapbuffer(int, int),创建使用堆内存的bytebuf, 调用newheapbuffer方法。

  io.netty.buffer.abstractbytebufallocator#directbuffer(int, int), 创建使用直接内存的bytebuf,  调用newdirectbuffer方法。

  具体实现:

  io.netty.buffer.pooledbytebufallocator#newheapbuffer(int initialcapacity, int maxcapacity)。

  io.netty.buffer.pooledbytebufallocator#newdirectbuffer(int initialcapacity, int maxcapacity)。 

  这两个方法都是从poolthreadcache对象中得到线程专用的poolarena对象,然后调用poolarena的allocate方法创建poolbytebuf对象。

  poolarena入口方法:

  io.netty.buffer.poolarena#allocate(io.netty.buffer.poolthreadcache, int, int),这个方法是poolarena分配内存,创建poolbytebuf对象的入口方法。它先调用子类实现的newbytebuf创建一个poolbytebuf对象,这个方法有两个实现:

  io.netty.buffer.poolarena.heaparena#newbytebuf(int maxcapacity),创建使用堆内存的pooledbytebuf对象。

  io.netty.buffer.poolarena.directarena#newbytebuf(int maxcapacity),创建使用直接内存pooledbytebuf对象。

  然后调用io.netty.buffer.poolarena#allocate(io.netty.buffer.poolthreadcache, io.netty.buffer.pooledbytebuf<t>, int)方法为poolbytebuf对象分配内存,这个方法是分配内存的核心方法,下面来重点分析一下它的代码:

 1      private void allocate(poolthreadcache cache, pooledbytebuf<t> buf, final int reqcapacity) {
 2         final int normcapacity = normalizecapacity(reqcapacity);
 3         if (istinyorsmall(normcapacity)) { // capacity < pagesize
 4             int tableidx;
 5             poolsubpage<t>[] table;
 6             boolean tiny = istiny(normcapacity);
 7             if (tiny) { // < 512
 8                 if (cache.allocatetiny(this, buf, reqcapacity, normcapacity)) {
 9                     // was able to allocate out of the cache so move on
10                     return;
11                 }
12                 tableidx = tinyidx(normcapacity);
13                 table = tinysubpagepools;
14             } else {
15                 if (cache.allocatesmall(this, buf, reqcapacity, normcapacity)) {
16                     // was able to allocate out of the cache so move on
17                     return;
18                 }
19                 tableidx = smallidx(normcapacity);
20                 table = smallsubpagepools;
21             }
22 
23             final poolsubpage<t> head = table[tableidx];
24 
25             /**
26              * synchronize on the head. this is needed as {@link poolchunk#allocatesubpage(int)} and
27              * {@link poolchunk#free(long)} may modify the doubly linked list as well.
28              */
29             synchronized (head) {
30                 final poolsubpage<t> s = head.next;
31                 if (s != head) {
32                     assert s.donotdestroy && s.elemsize == normcapacity;
33                     long handle = s.allocate();
34                     assert handle >= 0;
35                     s.chunk.initbufwithsubpage(buf, handle, reqcapacity);
36                     inctinysmallallocation(tiny);
37                     return;
38                 }
39             }
40             synchronized (this) {
41                 allocatenormal(buf, reqcapacity, normcapacity);
42             }
43 
44             inctinysmallallocation(tiny);
45             return;
46         }
47         if (normcapacity <= chunksize) {
48             if (cache.allocatenormal(this, buf, reqcapacity, normcapacity)) {
49                 // was able to allocate out of the cache so move on
50                 return;
51             }
52             synchronized (this) {
53                 allocatenormal(buf, reqcapacity, normcapacity);
54                 ++allocationsnormal;
55             }
56         } else {
57             // huge allocations are never served via the cache so just call allocatehuge
58             allocatehuge(buf, reqcapacity);
59         }
60     }

  第2行,根据需要的内存大小reqcapacity,计算可以分配的标准内存大小normcapacity。必须满足(1)normcapacity>=reqcapacity, (2)normcapacity是directmemorycachealignment的整数倍,此外,还要根据reqcapacity的大小分3中情况:

    reqcapacity>=chunksize:normcapacity取同时满足(1),(2)的最小值。

    reqcapacity>=512且reqcapacity<chunksize: (3)normcapacity>=512*2k, (4)normcapacity<=chunksize,normcapacit取同时满足(1),(2),(3),(4)的最小值。

    reqcapacity<412: (5)normcapacity<512, (6)normcapacity是16的整数倍,normcapacity取同时满足(1),(2),(5),(6)的最小值。

  8-13行,分配tiny类型的内存(<512)。 8-10行,如果poolthreadcache缓存对象中分配到内存,分配内流程结束。12-13行,如果缓存中没有,就从tiny内存池中分配一块内存。

  15-20行,分配small类型的内存(>=512且<pagesize)。和分配tiny内存的逻辑相同。

  29-27行,  使用从前两个步骤中得到的tiny或small内存的索引,从子页面池中分配一块内存。33行,从子页面中分配内存。35行,使用分配到的内存初始化poolbytebuf对象,如果能到这里,分配内存流程结束。

  41行,如果子页面池中还没有内存可用,调用allocatenormal方法从poolchunk对象中分配一个子页面,再从子页面中分配所需的内存。

  47-55行,分配normal类型的内存(>=pagesize且<chunksize)。48,49行,从缓存中分配内存,如果成功,分配内存流程结束。53行,缓存中没有可用的内存,调用allocatenormal方法从poolchunk中分配内存。

  58行,如果分配的是>chunksize的内存。这块内存不会进入pckl链表中。

  

  上面代码中的allocatenormal方法封装了创建pck对象,从pck对象中分配内存,再把pck对象放入到pckl链表中的逻辑,也是十分重要的代码。

 1     private void allocatenormal(pooledbytebuf<t> buf, int reqcapacity, int normcapacity) {
 2         if (q050.allocate(buf, reqcapacity, normcapacity) || q025.allocate(buf, reqcapacity, normcapacity) ||
 3             q000.allocate(buf, reqcapacity, normcapacity) || qinit.allocate(buf, reqcapacity, normcapacity) ||
 4             q075.allocate(buf, reqcapacity, normcapacity)) {
 5             return;
 6         }
 7 
 8         // add a new chunk.
 9         poolchunk<t> c = newchunk(pagesize, maxorder, pageshifts, chunksize);
10         long handle = c.allocate(normcapacity);
11         assert handle > 0;
12         c.initbuf(buf, handle, reqcapacity);
13         qinit.add(c);
14     }

  2-5行,依次尝试从每个pckl节点中分配内存,如果成功,分配内存流程结束。

  9-13行,先创建一个新的pck对象,然后从中分配内存,使用内存初始化pooledbytebuf对象,最后把pck对象添加pckl链表头节点qinit中。pkcl对象的add方法会和allocate一样,根据pck对象的内存使用率,把它移动到链表中合适的位置。

  

内存释放

  io.netty.buffer.pooledbytebuf#deallocate方法调用io.netty.buffer.poolarena#free方法,这个free方法负责整个内存释放过程。

 1     void free(poolchunk<t> chunk, long handle, int normcapacity, poolthreadcache cache) {
 2         if (chunk.unpooled) {
 3             int size = chunk.chunksize();
 4             destroychunk(chunk);
 5             activebyteshuge.add(-size);
 6             deallocationshuge.increment();
 7         } else {
 8             sizeclass sizeclass = sizeclass(normcapacity);
 9             if (cache != null && cache.add(this, chunk, handle, normcapacity, sizeclass)) {
10                 // cached so not free it.
11                 return;
12             }
13 
14             freechunk(chunk, handle, sizeclass);
15         }
16     }

  这段代码重点在8-14行。第8,9行,优先把内存放到缓存中,这样下次就能快速地从缓存中直接取用。第14行,在不能放进缓存的情况下把内存返回给pck对象。

 1     void freechunk(poolchunk<t> chunk, long handle, sizeclass sizeclass) {
 2         final boolean destroychunk;
 3         synchronized (this) {
 4             switch (sizeclass) {
 5             case normal:
 6                 ++deallocationsnormal;
 7                 break;
 8             case small:
 9                 ++deallocationssmall;
10                 break;
11             case tiny:
12                 ++deallocationstiny;
13                 break;
14             default:
15                 throw new error();
16             }
17             destroychunk = !chunk.parent.free(chunk, handle);
18         }
19         if (destroychunk) {
20             // destroychunk not need to be called while holding the synchronized lock.
21             destroychunk(chunk);
22         }
23     }

  第17行,掉用pckl对象的free方法把内存还给pck对象,移动pck对象在pckl链表中位置。如果此时这个pck对象的使用率变成0,destroychunk=true。

  第21行,调用destroychunk方法销毁掉pck对象。