欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

ThreadLocal&InheritableThreadLocal&transmittable-thread-local

程序员文章站 2022-04-01 15:47:57
...

ThreadLocal

http://blog.csdn.net/yinbingqiu/article/details/71159394

特性

  • 一个线程使用一个对象,线程之间数据隔离

适用场景

  • 线程内参数传递
  • 线程之间数据隔离

init

private final int threadLocalHashCode = nextHashCode();
private static AtomicInteger nextHashCode = new AtomicInteger();
private static final int HASH_INCREMENT = 0x61c88647;   
//斐波那契散列  黄金分割,哈希分布比较均匀
private static int nextHashCode() {
        return nextHashCode.getAndAdd(HASH_INCREMENT);
    }

threadLocalHashCode 每个ThreadLocal都有一个,不会重复

ThreadLocal 只介绍get set方法,其中已包含了初始化方法

get

public T get() {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null) {
            ThreadLocalMap.Entry e = map.getEntry(this);
            if (e != null) {
                @SuppressWarnings("unchecked")
                T result = (T)e.value;
                return result;
            }
        }
        return setInitialValue();
    }

思路:

  • 获取当前线程,然后线程中持有引用ThreadLocal.ThreadLocalMap
  • 如果 ThreadLocalMap 为null,则进行初始化setInitialValue,初始化代码会再次确认ThreadLocalMap是否为null。初始化代码即新建一个对象
void createMap(Thread t, T firstValue) {
        t.threadLocals = new ThreadLocalMap(this, firstValue);
    }  this 就是 ThreadLocal
  • 如果 ThreadLocalMap 不为null,则以ThreadLocal为key,获取 ThreadLocalMap.Entry e

set

set方法也是如此判断ThreadLocalMap是否为null,是否需要初始化,只是最后直接设置指定值

public void set(T value) {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null)
            map.set(this, value);
        else
            createMap(t, value);
    }

ThreadLocalMap

ThreadLocalMap 是定义的map,其Entry继承了WeakReference,使用ThreadLocal作为键值,

static class Entry extends WeakReference<ThreadLocal<?>> {
            /** The value associated with this ThreadLocal. */
            Object value;

            Entry(ThreadLocal<?> k, Object v) {
                super(k);
                value = v;
            }
        }
        
   ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
            table = new Entry[INITIAL_CAPACITY];
            int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
            table[i] = new Entry(firstKey, firstValue);
            size = 1;
            setThreshold(INITIAL_CAPACITY);
        }

ThreadLocalMap内部存储结构为Entry数组 索引hash值为i(关于散列算法,可再写一篇), 散列值相同的,顺序后放

getEntry
private Entry getEntry(ThreadLocal<?> key) {
            int i = key.threadLocalHashCode & (table.length - 1);
            Entry e = table[i];
            if (e != null && e.get() == key)
                return e;
            else
                return getEntryAfterMiss(key, i, e);
        }
        
        private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
            Entry[] tab = table;
            int len = tab.length;

            while (e != null) {//循环条件
                ThreadLocal<?> k = e.get();
                if (k == key){
                     return e;//跳出循环
                }
                   
                if (k == null){
                    expungeStaleEntry(i);//清除过期数据 
                }else{
                    i = nextIndex(i, len);//获取下一个位置 
                }
                e = tab[i];//循环判断条件值更新
            }
            return null;
        }
        
        private static int nextIndex(int i, int len) {
            return ((i + 1 < len) ? i + 1 : 0);
        }

散列函数找到数组对应的位置,得到Entry,不为空并且key相等,即得到对应entry 否则调用 getEntryAfterMiss

nextIndex不断加1,获取下一个位置的entry,然后 while条件e!=null,进而判断e.get == key,如果 k==null,则expungeStaleEntry

expungeStaleEntry清除所有过期数据

private int expungeStaleEntry(int staleSlot) {
            Entry[] tab = table;
            int len = tab.length;

            // expunge entry at staleSlot 清除 e.get()null的数据
            tab[staleSlot].value = null;
            tab[staleSlot] = null;
            size--;

            // Rehash until we encounter null  重新hash直到遇到null
            Entry e;
            int i;
            for (i = nextIndex(staleSlot, len);
                 (e = tab[i]) != null;
                 i = nextIndex(i, len)) {
                ThreadLocal<?> k = e.get();
                if (k == null) {//清除 e.get()null的数据
                    e.value = null;
                    tab[i] = null;
                    size--;
                } else {//判断k(e.get())的位置是否和i相同,不相同则表示位置过期了,需要清除i位置的数据,并找到e的新位置
                    int h = k.threadLocalHashCode & (len - 1);
                    if (h != i) {
                        tab[i] = null;

                        // Unlike Knuth 6.4 Algorithm R, we must scan until
                        // null because multiple entries could have been stale.
                        //找到tabl[h]== null的位置,放入数据
                        while (tab[h] != null)
                            h = nextIndex(h, len);
                        tab[h] = e;
                    }
                }
            }
            return i;
        }
set
private void set(ThreadLocal<?> key, Object value) {

            // We don't use a fast path as with get() because it is at
            // least as common to use set() to create new entries as
            // it is to replace existing ones, in which case, a fast
            // path would fail more often than not.

            Entry[] tab = table;
            int len = tab.length;
            int i = key.threadLocalHashCode & (len-1);//hash 找到位置
            //判断i及其后位置,有没有key的数据或者null,有则更新或者替换,然后直接返回。无则在i位置new一个
            for (Entry e = tab[i];
                 e != null;
                 e = tab[i = nextIndex(i, len)]) {
                ThreadLocal<?> k = e.get();

                if (k == key) {//相等的情况 设置值
                    e.value = value;
                    return;
                }

                if (k == null) {//null的情况,调用replaceStaleEntry清理过期数据并插入新数据
                什么情况下为null? 因为是弱引用 gc会将ThreadLocal回收掉,
                ThreadLocalMap内的表现就是出现了一个keynullEntry
                
                    replaceStaleEntry(key, value, i);
                    return;
                }
            }

            tab[i] = new Entry(key, value);
            int sz = ++size;
            if (!cleanSomeSlots(i, sz) && sz >= threshold)
            //tab中无null数据,并且size>threshold 触发rehash
                rehash();
        }
replaceStaleEntry 没看明白
遍历清洗过期数据并在index处插入新数据,其他数据后移  
 cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
 清理过期数据必须进行,slotToExpunge只是提前找个清理位置
        private void replaceStaleEntry(ThreadLocal<?> key, Object value,
                                       int staleSlot) {
            Entry[] tab = table;
            int len = tab.length;
            Entry e;

            // Back up to check for prior stale entry in current run.
            // We clean out whole runs at a time to avoid continual
            // incremental rehashing due to garbage collector freeing
            // up refs in bunches (i.e., whenever the collector runs).
            int slotToExpunge = staleSlot;
            //找到staleSlot之前第一个为null的位置 slotToExpunge
            for (int i = prevIndex(staleSlot, len);
                 (e = tab[i]) != null;
                 i = prevIndex(i, len))
                if (e.get() == null)
                    slotToExpunge = i;

            // Find either the key or trailing null slot of run, whichever
            // occurs first
            for (int i = nextIndex(staleSlot, len);
                 (e = tab[i]) != null;
                 i = nextIndex(i, len)) {
                ThreadLocal<?> k = e.get();

                // If we find key, then we need to swap it
                // with the stale entry to maintain hash table order.
                // The newly stale slot, or any other stale slot
                // encountered above it, can then be sent to expungeStaleEntry
                // to remove or rehash all of the other entries in run.
                //如果找到key,为了保持table的顺序,需要和过期entry交换位置。新的位置或者其他位置之后的数据,可以被送去清除
                if (k == key) {//找到staleSlot之后的位置,相等则设置
                    e.value = value;

                    tab[i] = tab[staleSlot];//tab[staleSlot]keynullentry,交换之后可以被清除
                    tab[staleSlot] = e;

                    // Start expunge at preceding stale entry if it exists 清除过期数据
                    if (slotToExpunge == staleSlot){
                       slotToExpunge = i; 
                    }
                    cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
                    return;
                }

                // If we didn't find stale entry on backward scan, the
                // first stale entry seen while scanning for key is the
                // first still present in the run.
                if (k == null && slotToExpunge == staleSlot)
                    slotToExpunge = i;
            }

            // If key not found, put new entry in stale slot
            //没有找到,new一个
            tab[staleSlot].value = null;
            tab[staleSlot] = new Entry(key, value);

            // If there are any other stale entries in run, expunge them
            if (slotToExpunge != staleSlot)//清除过期数据 相等则不需要清除过期数据
                cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
        }
ThreadLocalMap难点

是一个entry数组,entry是弱引用,key可能随时被gc,为了解决内存泄漏问题,需要不断的清除过期的数据。并且数组hash索引方式是递增。

清除过期数据:暂时理解就是遍历key为null的就把value置为null,并且随时比较hash出的位置,进行替换

set方法replaceStaleEntry 略懵。核心:向后遍历数组,查找是否有key=当前数据,若有则设置。找不到,就新增。然后找到过期数据的位置,调用清除方法清除数据

内存泄漏问题:https://www.cnblogs.com/onlywujun/p/3524675.html 线程一直不结束,线程池 http://www.importnew.com/22039.html importnew 应该还是线程一直不结束的问题

https://wiki.apache.org/tomcat/MemoryLeakProtection tomcat的内存泄漏案例值得看

InheritableThreadLocal

父子线程信息传递

实现思路:

  • Thread类新增一个字段inheritableThreadLocals 指向 ThreadLocal.ThreadLocalMap,原来的threadLocals不再使用
  • InheritableThreadLocal 重写getMap createMap方法,使其inheritableThreadLocals 指向ThreadLocal.ThreadLocalMap
  • 当new Thread子线程时,判断inheritableThreadLocals是否为空,不空则拷贝父线程中ThreadLocalMap到子线程中,就可以获取到父线程中的值了
问题:子线程父线程中各自都有一个inheritableThreadLocals,指向各自的

ThreadLocal.ThreadLocalMap。所以子线程中InheritableThreadLocal修改了值,不会反应的父线程中。测试结果也是如此

public class InheritableThreadLocal<T> extends ThreadLocal<T> {
    protected T childValue(T parentValue) {
        return parentValue;
    }
    ThreadLocalMap getMap(Thread t) {
       return t.inheritableThreadLocals;
    }
    void createMap(Thread t, T firstValue) {
        t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
    }
}

Thread 类

if (inheritThreadLocals && parent.inheritableThreadLocals != null){
    this.inheritableThreadLocals =
                ThreadLocal.createInheritedMap(parent.inheritableThreadLocals); 
 }

ThreadLocal 类

static ThreadLocalMap createInheritedMap(ThreadLocalMap parentMap) {
        return new ThreadLocalMap(parentMap);
    }
    
    private ThreadLocalMap(ThreadLocalMap parentMap) {
        Entry[] parentTable = parentMap.table;
        int len = parentTable.length;
        setThreshold(len);
        table = new Entry[len];

        for (int j = 0; j < len; j++) {
            Entry e = parentTable[j];
            if (e != null) {
                @SuppressWarnings("unchecked")
                ThreadLocal<Object> key = (ThreadLocal<Object>) e.get();
                if (key != null) {
                    Object value = key.childValue(e.value);
                    Entry c = new Entry(key, value);
                    int h = key.threadLocalHashCode & (len - 1);
                    while (table[h] != null)
                        h = nextIndex(h, len);
                    table[h] = c;
                    size++;
                }
            }
        }
    }

transmittable-thread-local 框架

https://github.com/alibaba/transmittable-thread-local InheritableThreadLocal只能在new Thread中使用 本框架 则是封装 Runnable Callable

但对于使用线程池等会缓存线程的组件的情况,线程由线程池创建好,并且线程是缓存起来反复使用的;这时父子线程关系的ThreadLocal值传递已经没有意义,应用需要的实际上是把 任务提交给线程池时的ThreadLocal值传递到 任务执行时。

实现思路

  • 自定义 TransmittableThreadLocal 并继承InheritableThreadLocal。*
TransmittableThreadLocal 类静态参数holder持有所有
new出来的TransmittableThreadLocal
  • 自定义 TtlRunnable 实现 Runnable TtlRunnable初始化方法中保持当前线程中已有的TransmittableThreadLocal
private TtlCallable(Callable<V> callable, boolean releaseTtlValueReferenceAfterCall) {
        this.copiedRef = new AtomicReference<Map<TransmittableThreadLocal<?>, Object>>(TransmittableThreadLocal.copy());
        this.callable = callable;
        this.releaseTtlValueReferenceAfterCall = releaseTtlValueReferenceAfterCall;
    }

run方法执行前中先backup holder中所有的TransmittableThreadLocal, copiedRef中不存在,holder存在的,说明是后来加进去的,remove掉holder中的 copied中的TransmittableThreadLocal set到当前线程中

执行后再恢复backup的数据到holder中(backup中不存在,holder中存在的,remove掉holder中的) backup中的TransmittableThreadLocal set到当前线程中

public void run() {
        Map<TransmittableThreadLocal<?>, Object> copied = copiedRef.get();
        if (copied == null || releaseTtlValueReferenceAfterRun && !copiedRef.compareAndSet(copied, null)) {
            throw new IllegalStateException("TTL value reference is released after run!");
        }

        Map<TransmittableThreadLocal<?>, Object> backup = TransmittableThreadLocal.backupAndSetToCopied(copied);
        try {
            runnable.run();
        } finally {
            TransmittableThreadLocal.restoreBackup(backup);
        }
    }

agent

-使用javaAgent, 如果类是 ThreadPoolExecutor 或者ScheduledThreadPoolExecutor, 把其中方法中参数为Runnable的参数,使用javassist替换为TtlRunnable,这样所有调用Runnable的方法就都是调用 TtlRunnable 了

String code = String.format("$%d = %s.get($%d, false, true);", i + 1, TTL_RUNNABLE_CLASS_NAME, i + 1);
相关标签: 多线程