并发编程之ThreadLocal源码分析
当访问共享的可变数据时,通常需要使用同步。一种避免同步的方式就是不共享数据,仅在单线程内部访问数据,就不需要同步。该技术称之为线程封闭。
当数据封装到线程内部,即使该数据不是线程安全的,也会实现自动线程安全性。
维持线程封闭性可以通过ad-hoc线程封闭、栈封闭来实现,一种更加规范的方法是使用threadlocal类。threadlocal类提供线程局部变量,通过get、set等方法访问变量,为每个使用该变量的线程创建一个独立的副本。
一、threadlocal使用案例
案例中只开启了一个线程threada,展示了在线程内部设置、获取、清除局部变量。
public class threadlocaltest { // 初始化threadlocal变量 static threadlocal<string> localvariable = new threadlocal<>(); static void print(string str) { // 打印当前线程本地内存中的变量值 system.out.println(str + ": " + localvariable.get()); // 清除当前线程本地内存中的变量 localvariable.remove(); } public static void main(string[] args) { // 创建线程a thread threada = new thread(new runnable() { @override public void run() { // 设置线程a中的本地变量的值 localvariable.set("threada localvariable"); print("threada"); // 获取线程a中的本地变量的值 system.out.println("threada remove after: " + localvariable.get()); } }); // 启动线程 threada.start(); } } 运行结果: threada: threada localvariable threada remove after: null
案例二:线程唯一标识符生成器,为每个调用threadid.get()方法的线程创建id。
public class threadid { // 下一个要被分配的线程id private static final atomicinteger nextid = new atomicinteger(0); // 线程局部变量 private static final threadlocal<integer> threadid = new threadlocal<integer>() { @override protected integer initialvalue() { return nextid.getandincrement(); } }; // 返回当前线程唯一的id public static int get() { return threadid.get(); } }
二、threadlocal类的实现原理
在thread类中有一个threadlocals成员变量,其类型是threadlocalmap,默认情况下为null。
threadlocal.threadlocalmap threadlocals = null;
当某线程首次调用threadlocal变量的get或set方法时,会进行对象创建。在线程退出时,当前线程的threadlocals变量被清空。
private void exit() { ... threadlocals = null; inheritablethreadlocals = null; ... }
每个线程的局部变量不是存放于threadlocal实例中,而是存放于线程的threadlocals变量,即线程内存空间中。threadlocals变量本质上是map数据结构,可以存放多个threadlocal变量键值对。
【助解】threadlocal类可以看出一个外壳,线程中调用某threadlocal变量的set方法可以将变量值放入到该线程的threadlocals变量中,数据格式是<当前线程中该threadlocal变量的this引用,变量值>。当调用线程调用threadlocal变量的get方法时,从当前线程的threadlocals变量中取出key(引用)对应的value值。
1. threadlocal类核心方法--set()
将threadlocal变量的当前线程副本的值设置为指定value值。
public void set(t value) { // 获取调用方法的当前线程 thread t = thread.currentthread(); // 获取当前线程自身的threadlocals变量 threadlocalmap map = getmap(t); if (map != null) // map不为空,则设置 map.set(this, value); else // map为空,说明第一次调用,初始化线程的threadlocals变量 createmap(t, value); } threadlocalmap getmap(thread t) { return t.threadlocals; } void createmap(thread t, t firstvalue) { t.threadlocals = new threadlocalmap(this, firstvalue); }
线程的threadlocals变量,即threadlocal.threadlocalmap,是hashmap结构,它的key是当前threadlocal的实例对象引用,value值是该threadlocal实例对象调用set方法设置的值。
2. threadlocal类核心方法--t get()
返回threadlocal变量在当前线程副本中的值。如果当前线程中没有该变量的值,返回值会被首次初始化为initialvalue()方法的值。
public t get() { // 获取当前线程以及其threadlocals变量 thread t = thread.currentthread(); threadlocalmap map = getmap(t); // 如果threadlocals变量不为空 if (map != null) { // 根据当前threadlocal对象应用获取entry,存在则直接返回value值 threadlocalmap.entry e = map.getentry(this); if (e != null) { @suppresswarnings("unchecked") t result = (t)e.value; return result; } } // threadlocals为空,初始化当前线程threadlocals变量 return setinitialvalue(); } // threadlocals存在,设置初始值;不存在,初始化threadlocals变量 private t setinitialvalue() { // 返回当前threadlocal变量的当前线程初始值 t value = initialvalue(); thread t = thread.currentthread(); threadlocalmap map = getmap(t); if (map != null) map.set(this, value); else createmap(t, value); return value; } protected t initialvalue() { return null; }
3. threadlocal类核心方法--void remove()
当前线程threadlocals变量存在的话,删除当前线程的threadlocal实例对象。
public void remove() { threadlocal.threadlocalmap var1 = this.getmap(thread.currentthread()); if (var1 != null) { var1.remove(this); } }
三、threadlocal.threadlocalmap结构分析
threadlocal类图
threadlocal是一种存储变量与线程绑定的方式,在每个线程中用自己的threadlocalmap安全隔离变量,实现线程封闭。
threadlocalmap是threadlocal内的一个map实现,没有实现任何接口,仅用于线程内部存储threadlocal变量值。
static class threadlocalmap { ... } static class entry extends weakreference<threadlocal<?>> { object value; // threadlocal变量值 entry(threadlocal<?> k, object v) { super(k); value = v; } }
底层是entry数组,entry的key为threadlocal,value是线程的该threadlocal变量值。entry内部类继承了weakreference类。当entry.get()方法得到的threadlocal引用为空,表示该key不再被引用,此时entry对象视为【过期】,在数组中删除。
threadlocalmap类的字段
private entry[] table; // 表,必须为2的幂次方大小 private int size = 0; // 初始entry数 private int threshold; // resize操作,元素个数阈值 // 负载因子固定为2/3 private void setthreshold(int len) { threshold = len * 2 / 3; }
构造方法:
threadlocalmap(threadlocal<?> firstkey, object firstvalue) { // 初始化大小为16的entry数组 table = new entry[initial_capacity]; // 取模对应数组索引 int i = firstkey.threadlocalhashcode & (initial_capacity - 1); table[i] = new entry(firstkey, firstvalue); // 插入元素 size = 1; // 更新size setthreshold(initial_capacity); // 设置resize阈值,此时为10 }
1. threadlocalmap类之hashcode的计算
threadlocal变量与当前线程绑定,在hashmap中作为key,通过threadlocalhashcode值来查找。
// 自定义hashcode,可以用来解决同一线程连续构造threadlocal对象引起的冲突。 private final int threadlocalhashcode = nexthashcode() // 下一个hashcode值,原子更新,初始值为0 private static atomicinteger nexthashcode = new atomicinteger(); // 返回下一个hashcode private static int nexthashcode() { return nexthashcode.getandadd(hash_increment); } // 魔数:相邻两个hashcode之间的偏移 // 对2的幂次方大小的表,产生近似最优的hash值 private static final int hash_increment = 0x61c88647;
2. threadlocalmap类之set()方法
设置threadlocal变量值。
private void set(threadlocal<?> key, object value) { entry[] tab = table; int len = tab.length; int i = key.threadlocalhashcode & (len-1); // 计算索引位置 // 开放地址法 for (entry e = tab[i]; e != null; // entry不为空 e = tab[i = nextindex(i, len)]) { threadlocal<?> k = e.get(); // 获取entry的key--threadlocal // 如果当前entry的key与形参key相等,更新value值 if (k == key) { e.value = value; return; } // 如果当前entry的key为空,说明已过期,做清理! if (k == null) { // 清理过期entry,继续探索放置位置 replacestaleentry(key, value, i); return; } } // 若没有找到对应key,则在空位置创建entry tab[i] = new entry(key, value); int sz = ++size; // 更新size // 清理一些过期的位置,判断是否需要扩容 if (!cleansomeslots(i, sz) && sz >= threshold) rehash(); }
从set方法中,可以看出threadlocalmap中哈希冲突解决方法是开放地址法,而不是hashmap等采用链地址法。
前后索引位置--循环
private static int nextindex(int i, int len) { return ((i + 1 < len) ? i + 1 : 0); } private static int previndex(int i, int len) { return ((i - 1 >= 0) ? i - 1 : len - 1); }
(1) replacestaleentry()方法
清理过期entry,设置输入键值对。
// staleslot:key == null的位置 private void replacestaleentry(threadlocal<?> key, object value, int staleslot) { entry[] tab = table; int len = tab.length; entry e; // 从前一个位置开始向前寻找过期entry,直到entry不为空 // 不断向前移动清理位置 int slottoexpunge = staleslot; // 清理元素的最开始位置 for (int i = previndex(staleslot, len); (e = tab[i]) != null; // entry不为空 i = previndex(i, len)) if (e.get() == null) slottoexpunge = i; // 从后一个位置向后探索 for (int i = nextindex(staleslot, len); (e = tab[i]) != null; // entry不为空 i = nextindex(i, len)) { threadlocal<?> k = e.get(); // 如果找到key,需要将它与过期位置元素做交换,来维持哈希表顺序。 if (k == key) { e.value = value; // 更新value tab[i] = tab[staleslot]; // 元素交换 tab[staleslot] = e; // 如果相等,表示向前遍历时没有找到key为null的元素 // 前面语句进行了元素交换,此时位置i之前的元素均不需要清理 if (slottoexpunge == staleslot) slottoexpunge = i; // 更新清理开始位置 // 从slottoexpunge位置清理过期entry(key == null) cleansomeslots(expungestaleentry(slottoexpunge), len); return; } // 如果遍历中找到key为null的元素,并且向前没有找到key为null的位置 // 更新清理开始位置slottoexpunge为当前位置i if (k == null && slottoexpunge == staleslot) slottoexpunge = i; } // 如果没找到key,新建entry发在staleslot位置 tab[staleslot].value = null; // 清理动作 tab[staleslot] = new entry(key, value); // 如果除了staleslot位置,还有其他位置元素要清理(key == null的entry) if (slottoexpunge != staleslot) cleansomeslots(expungestaleentry(slottoexpunge), len); }
【注】slottoexpunge变量记录着元素清理的最开始位置。
(2)cleansomeslots方法
尝试扫描一些过期元素的位置,当添加新元素、清理其他过期元素时被调用。
从i位置开始扫描,i位置不是过期元素。而参数n用来限制扫描次数,如果过期entry没有发现,可以扫描log(n)次,log(n)的设定出于性能的考虑。
private boolean cleansomeslots(int i, int n) { boolean removed = false; entry[] tab = table; int len = tab.length; do { i = nextindex(i, len); // 从下一个位置开始 entry e = tab[i]; // 遍历到key==null的entry if (e != null && e.get() == null) { n = len; // 重置n removed = true; // 标志有清理元素 i = expungestaleentry(i); // 清理 } } while ( (n >>>= 1) != 0); // log(n) 限制--对数次 return removed; }
(3)expungestaleentry(int staleslot) 方法
作用:从staleslot位置开始,清理key为null的entry,直到entry为null的位置。过程中,遇到的entry不为空,并且entry.get()不为空的元素,进行rehash重新确定该元素位置。
private int expungestaleentry(int staleslot) { entry[] tab = table; int len = tab.length; // 清理staleslot位置的entry对象 tab[staleslot].value = null; tab[staleslot] = null; size--; // rehash直到entry为null entry e; int i; for (i = nextindex(staleslot, len); (e = tab[i]) != null; // entry不为null i = nextindex(i, len)) { threadlocal<?> k = e.get(); if (k == null) { // 过期entry,处理方式同staleslot e.value = null; tab[i] = null; size--; } else { // rehash计算出当前entry索引 int h = k.threadlocalhashcode & (len - 1); if (h != i) { // 不相等,说明原位置是探测出的。 tab[i] = null; // 原位置对象置空 // 如果h索引位置不为null,向后探测,直到找到null位置 while (tab[h] != null) h = nextindex(h, len); tab[h] = e; } } } return i; // i位置 (entry = tab[i]) == null }
3. threadlocalmap类之getentry()方法
根据threadlocal获取对应entry对象。
private entry getentry(threadlocal<?> key) { // 根据hashcode计算直接hash索引 int i = key.threadlocalhashcode & (table.length - 1); entry e = table[i]; // 如果是找到并且是有效entry对象,直接返回 if (e != null && e.get() == key) return e; else // 不是目标entry return getentryaftermiss(key, i, e); } private entry getentryaftermiss(threadlocal<?> key, int i, entry e) { entry[] tab = table; int len = tab.length; while (e != null) { threadlocal<?> k = e.get(); if (k == key) // 找到目标entry return e; if (k == null) // 清理过期entry expungestaleentry(i); else // 当前位置是有效entry,索引右移 i = nextindex(i, len); e = tab[i]; } return null; // 找不到,返回null }
4. threadlocalmap类之remove()方法
移除threadlocal变量对应的entry对象。
private void remove(threadlocal<?> key) { entry[] tab = table; int len = tab.length; int i = key.threadlocalhashcode & (len-1); // 遍历找到指定key的entry对象 for (entry e = tab[i]; e != null; e = tab[i = nextindex(i, len)]) { if (e.get() == key) { e.clear(); // 清理引用 expungestaleentry(i); // 清理过期entry return; } } }
5. threadlocalmap类之rehash()方法
private void rehash() { // 清理table中所有过期entry对象。 expungestaleentries(); // 如果元素个数超过阈值的3/4时,进行扩容 // 注:阈值本身是数组长度的2/3 if (size >= threshold - threshold / 4) resize(); } // 扩容至原容量的2倍 private void resize() { entry[] oldtab = table; int oldlen = oldtab.length; int newlen = oldlen * 2; entry[] newtab = new entry[newlen]; int count = 0; // 遍历老表,设置新表 for (int j = 0; j < oldlen; ++j) { entry e = oldtab[j]; if (e != null) { threadlocal<?> k = e.get(); if (k == null) { // 过期entry,清空value e.value = null; // 利于垃圾回收 } else { // 计算索引 int h = k.threadlocalhashcode & (newlen - 1); // 如果直接hash索引位置不为空,继续向后探索 while (newtab[h] != null) h = nextindex(h, newlen); newtab[h] = e; count++; } } } setthreshold(newlen); // 更新阈值 size = count; table = newtab; }
四、threadlocal内存泄漏问题
每个线程的threadlocal变量都存放在该线程的threadlocals变量中,如果当前线程一直不退出,这些threadlocal变量会一直存在,因此可能会导致内存泄漏。通过调用threadlocal类的remove方法避免这一问题。
参考博客的见解
threadlocalmap中采用threadlocal弱引用作为entry的key,如果一个threadlocal没有外部强引用来引用它,下一次系统gc时,这个threadlocal必然会被回收,threadlocalmap中就会出现key为null的entry。
threadlocal类的set、get、remove方法都可能触发对key为null的entry清理操作。expungestaleentry方法会清空entry及其value,entry会在下次gc被回收。
如果当前线程一直在运行,并且一直不执行get、set、remove方法,这些key为null的entry的value就会一直存在一条强引用链:thread ref -> thread -> threadlocalmap -> entry -> value,导致这些key为null的entry的value永远无法回收,造成内存泄漏。
参考资料:
- 《java并发编程实战》
- 《java并发编程之美》
- https://blog.csdn.net/v123411739/article/details/78698834
推荐阅读