netty ByteBuf对象池和内存泄漏检测实现走读
ByteBuf存放在堆外内存中,采用引用计数法的方式进行内存回收,具体的实现在AbstractReferenceCountByteBuf中。
private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater;
static {
AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> updater =
PlatformDependent.newAtomicIntegerFieldUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");
if (updater == null) {
updater = AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");
}
refCntUpdater = updater;
}
private volatile int refCnt = 1;
其中refCntUpdater中作为一个静态变量,并没有具体的存放该ByteBuf对应的堆外内存的引用计数,而是封装了通过unsafe来操作成员变量refCnt的操作。
而refCnt,在初始化的之后就为1,默认为该堆外内存原始的ByteBuf就存在一次引用。
UnsafeAtomicIntegerFieldUpdater(Unsafe unsafe, Class<? super T> tClass, String fieldName)
throws NoSuchFieldException {
Field field = tClass.getDeclaredField(fieldName);
if (!Modifier.isVolatile(field.getModifiers())) {
throw new IllegalArgumentException("Must be volatile");
}
this.unsafe = unsafe;
offset = unsafe.objectFieldOffset(field);
}
@Override
public boolean compareAndSet(T obj, int expect, int update) {
return unsafe.compareAndSwapInt(obj, offset, expect, update);
}
以上是UnsafeAtomicIntegerFieldUpdater中对于refCnt的操作的实现。
而AbstractReferenceCountByteBuf正是通过UnsafeAtomicIntegerFieldUpdater来进行引用计数的增加和减少,达到避免内存泄露的目的。
@Override
public boolean release(int decrement) {
return release0(checkPositive(decrement, "decrement"));
}
private boolean release0(int decrement) {
for (;;) {
int refCnt = this.refCnt;
if (refCnt < decrement) {
throw new IllegalReferenceCountException(refCnt, -decrement);
}
if (refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement)) {
if (refCnt == decrement) {
deallocate();
return true;
}
return false;
}
}
}
在初始化后的AbstractReferenceCountByteBuf的refCnt为1,只有通过调用其release()方法,才会通过UnsafeAtomicIntegerFieldUpdater来对refCnt进行减1,当refCnt与相减的值的差为0时,将会调用deallocate()方法对堆外内存进行回收,deallocate()的实现就会根据具体场景实现。对于非池化且没有采用nocleaner的ByteBuf,当调用到deallocate()的时候,将会把这块堆外内存直接释放,在忘记release的情况下,也会在gc中凭借cleaner被回收掉,而对于池化的ByteBuf(必定采用nocleaner策略),调用了deallocate()的时候,将会把这块内存回收到内存对象池recycler中,而不是直接释放,存在内存泄漏的风险。
而如果需要增加计数,则可以通过retain()方法进行增加计数,这样即使调用了一次release(1),也不会导致维护的这块堆外内存被释放。
关于池化ButeBuf的内存对象池Recycler的实现。
内存对象池的核心主要是两个数据结构,一个是每个线程通过ThreadLocal实现的私有的堆栈Stack,用来存放由该线程申请并又由该线程释放的内存对象,可以有效减少对象的申请和回收的开销。
而另一个对象,则是其他线程回收别的线程对象的专属队列。
private static final FastThreadLocal<Map<Stack<?>, WeakOrderQueue>> DELAYED_RECYCLED =
new FastThreadLocal<Map<Stack<?>, WeakOrderQueue>>() {
@Override
protected Map<Stack<?>, WeakOrderQueue> initialValue() {
return new WeakHashMap<Stack<?>, WeakOrderQueue>();
}
};
WeakOrderQueue则是一个链表,其中每个节点都是一个与Stack中堆栈实现方式一样的堆栈节点,当别的线程回收该线程的申请的对象的时候,将不会将该对象直接放到其Stack中,避免回收对象时候遇到的线程竞争,而是直接申请一个当前线程私有的目标线程的链表队列,将回收对象存放到对应的队列中,避免回收过程中的并发竞争。
说起来比较复杂绕口,举个例子。
举个例子,线程A申请了一个对象,当线程B需要将其回收时,将会根据这个对象的申请线程A在线程B的私有空间中寻找对应专属线程A的回收队列,如果没有,则创建,并将该队列作为线程A回收链表队列的首节点,如果有则直接把需要回收的对象存放到该队列中,由于该队列为线程B私有专门针对线程A的,所以不存在资源竞争。
当A本身的Stack不存在对象时,将会依次从其回收链表的首队列 中将对象放回到自己持有的Stack中,达到对象池的目的。
Netty本身对ByteBuf的内存泄漏,进行了检测手段。
在默认的场景下,会以一定的比例对ByteBuf进行采样,对于采样到的ByteBuf增加一个虚引用,以便进行内存泄漏的检测。
private final class DefaultResourceLeak extends PhantomReference<Object> implements ResourceLeak
核心在于,将会把需要进行内存检测的ByteBuf增加一个虚引用PhantomReference,这样便可以通过定时轮询引用队列的方式,当对象即将被回收而出现引用队列中的时候,判断是否已经将对应的堆外内存释放,即可判断是否出现了内存泄漏。同时leak可以在相应的buffer引用技术发生改变的时候将改动记录在其堆栈中,方便内存对象完整的调用链路展现。
本文地址:https://blog.csdn.net/weixin_40318210/article/details/107651161