欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Netty中FastThreadLocal源码分析

程序员文章站 2022-07-06 18:57:01
Netty中使用FastThreadLocal替代JDK中的ThreadLocal【JAVA】ThreadLocal源码分析,其用法和ThreadLocal 一样,只不过从名字FastThreadLocal来看,其处理效率要比JDK中的ThreadLocal要高 在类加载的时候,先初始化了一个静态成 ......

netty中使用fastthreadlocal替代jdk中的threadlocal【java】threadlocal源码分析,其用法和threadlocal 一样,只不过从名字fastthreadlocal来看,其处理效率要比jdk中的threadlocal要高

在类加载的时候,先初始化了一个静态成员:

1 private static final int variablestoremoveindex = internalthreadlocalmap.nextvariableindex();

实际上fastthreadlocal的操作都是通过对internalthreadlocalmap的操作来实现的,

而internalthreadlocalmap是unpaddedinternalthreadlocalmap的子类,unpaddedinternalthreadlocalmap的定义比较简单:

 1 class unpaddedinternalthreadlocalmap {
 2     static final threadlocal<internalthreadlocalmap> slowthreadlocalmap = new threadlocal();
 3     static final atomicinteger nextindex = new atomicinteger();
 4     object[] indexedvariables;
 5     int futurelistenerstackdepth;
 6     int localchannelreaderstackdepth;
 7     map<class<?>, boolean> handlersharablecache;
 8     integerholder counterhashcode;
 9     threadlocalrandom random;
10     map<class<?>, typeparametermatcher> typeparametermatchergetcache;
11     map<class<?>, map<string, typeparametermatcher>> typeparametermatcherfindcache;
12     stringbuilder stringbuilder;
13     map<charset, charsetencoder> charsetencodercache;
14     map<charset, charsetdecoder> charsetdecodercache;
15     arraylist<object> arraylist;
16 
17     unpaddedinternalthreadlocalmap(object[] indexedvariables) {
18         this.indexedvariables = indexedvariables;
19     }
20 }

可以看到在类加载时,会初始化一个泛型为internalthreadlocalmap的jdk的threadlocal对象作为其静态成员slowthreadlocalmap ,还有一个原子化的integer静态成员nextindex

internalthreadlocalmap的定义如下:

1 public final class internalthreadlocalmap extends unpaddedinternalthreadlocalmap {
2     private static final internallogger logger = internalloggerfactory.getinstance(internalthreadlocalmap.class);
3     private static final int default_array_list_initial_capacity = 8;
4     private static final int string_builder_initial_size = systempropertyutil.getint("io.netty.threadlocalmap.stringbuilder.initialsize", 1024);
5     private static final int string_builder_max_size;
6     public static final object unset = new object();
7     private bitset cleanerflags;

internalthreadlocalmap的nextvariableindex方法:

1 public static int nextvariableindex() {
2     int index = nextindex.getandincrement();
3     if (index < 0) {
4         nextindex.decrementandget();
5         throw new illegalstateexception("too many thread-local indexed variables");
6     } else {
7         return index;
8     }
9 }

这是一个cas滞后自增操作,获取nextindex自增前的值,那么variablestoremoveindex初始化时就是0,且恒为0,nextindex此时变成了1

fastthreadlocal对象的初始化:

1 private final int index = internalthreadlocalmap.nextvariableindex();
2 
3 public fastthreadlocal() {
4 }

由上面可知,index成员恒等于nextvariableindex的返回值,nextindex 的cas操作保障了每个fastthreadlocal对象的index是不同的

首先看到set方法:

 1 public final void set(v value) {
 2     if (value != internalthreadlocalmap.unset) {
 3         internalthreadlocalmap threadlocalmap = internalthreadlocalmap.get();
 4         if (this.setknownnotunset(threadlocalmap, value)) {
 5             this.registercleaner(threadlocalmap);
 6         }
 7     } else {
 8         this.remove();
 9     }
10 
11 }

只要set的value不是internalthreadlocalmap.unset,会先调用internalthreadlocalmap的get方法:

1 public static internalthreadlocalmap get() {
2     thread thread = thread.currentthread();
3     return thread instanceof fastthreadlocalthread ? fastget((fastthreadlocalthread)thread) : slowget();
4 }

判断当前线程是否是fastthreadlocalthread,是则调用fastget,否则调用slowget
fastthreadlocalthread是经过包装后的thread:

 1 public class fastthreadlocalthread extends thread {
 2     private final boolean cleanupfastthreadlocals;
 3     private internalthreadlocalmap threadlocalmap;
 4 
 5     public fastthreadlocalthread() {
 6         this.cleanupfastthreadlocals = false;
 7     }
 8 
 9     public fastthreadlocalthread(runnable target) {
10         super(fastthreadlocalrunnable.wrap(target));
11         this.cleanupfastthreadlocals = true;
12     }
13 
14     public fastthreadlocalthread(threadgroup group, runnable target) {
15         super(group, fastthreadlocalrunnable.wrap(target));
16         this.cleanupfastthreadlocals = true;
17     }
18 
19     public fastthreadlocalthread(string name) {
20         super(name);
21         this.cleanupfastthreadlocals = false;
22     }
23 
24     public fastthreadlocalthread(threadgroup group, string name) {
25         super(group, name);
26         this.cleanupfastthreadlocals = false;
27     }
28 
29     public fastthreadlocalthread(runnable target, string name) {
30         super(fastthreadlocalrunnable.wrap(target), name);
31         this.cleanupfastthreadlocals = true;
32     }
33 
34     public fastthreadlocalthread(threadgroup group, runnable target, string name) {
35         super(group, fastthreadlocalrunnable.wrap(target), name);
36         this.cleanupfastthreadlocals = true;
37     }
38 
39     public fastthreadlocalthread(threadgroup group, runnable target, string name, long stacksize) {
40         super(group, fastthreadlocalrunnable.wrap(target), name, stacksize);
41         this.cleanupfastthreadlocals = true;
42     }
43 
44     public final internalthreadlocalmap threadlocalmap() {
45         return this.threadlocalmap;
46     }
47 
48     public final void setthreadlocalmap(internalthreadlocalmap threadlocalmap) {
49         this.threadlocalmap = threadlocalmap;
50     }
51 
52     public boolean willcleanupfastthreadlocals() {
53         return this.cleanupfastthreadlocals;
54     }
55 
56     public static boolean willcleanupfastthreadlocals(thread thread) {
57         return thread instanceof fastthreadlocalthread && ((fastthreadlocalthread)thread).willcleanupfastthreadlocals();
58     }
59 }

如果看过我之前写的threadlocal源码分析,看到这就明白,jdk的threadlocal中很重要的一点是在thread类中有一个threadlocalmap类型的成员,每个线程都维护这一张threadlocalmap,通过threadlocalmap来和threadlocal对象产生映射关系;而这里和jdk同理绑定的就是internalthreadlocalmap。

fastget方法:

1 private static internalthreadlocalmap fastget(fastthreadlocalthread thread) {
2    internalthreadlocalmap threadlocalmap = thread.threadlocalmap();
3     if (threadlocalmap == null) {
4         thread.setthreadlocalmap(threadlocalmap = new internalthreadlocalmap());
5     }
6 
7     return threadlocalmap;
8 }

这里也和jdk的threadlocal类似,判断fastthreadlocalthread 线程的threadlocalmap成员是否为null,若是null,则先创建一个internalthreadlocalmap实例:

1 private internalthreadlocalmap() {
2     super(newindexedvariabletable());
3 }

先调用newindexedvariabletable方法:

1 private static object[] newindexedvariabletable() {
2     object[] array = new object[32];
3     arrays.fill(array, unset);
4     return array;
5 }

创建了一个大小为32的数组,并且用unset这个object填充了整个数组,然后调用unpaddedinternalthreadlocalmap的构造,令indexedvariables成员保存该数组

再来看slowget方法:

 1 private static internalthreadlocalmap slowget() {
 2     threadlocal<internalthreadlocalmap> slowthreadlocalmap = unpaddedinternalthreadlocalmap.slowthreadlocalmap;
 3     internalthreadlocalmap ret = (internalthreadlocalmap)slowthreadlocalmap.get();
 4     if (ret == null) {
 5         ret = new internalthreadlocalmap();
 6         slowthreadlocalmap.set(ret);
 7     }
 8 
 9     return ret;
10 }

可以看到,其实这里为了提高效率,并没有直接使用jdk的threadlocal,而是给当前非fastthreadlocalthread线程绑定了一个threadlocal<internalthreadlocalmap>对象,避免直接使用jdk的threadlocal效率低。

回到fastthreadlocal的set方法,在取得到了当前线程的internalthreadlocalmap成员后,调用setknownnotunset方法:

1 private boolean setknownnotunset(internalthreadlocalmap threadlocalmap, v value) {
2     if (threadlocalmap.setindexedvariable(this.index, value)) {
3         addtovariablestoremove(threadlocalmap, this);
4         return true;
5     } else {
6         return false;
7     }
8 }

首先调用了internalthreadlocalmap的setindexedvariable方法:

 1 public boolean setindexedvariable(int index, object value) {
 2     object[] lookup = this.indexedvariables;
 3     if (index < lookup.length) {
 4         object oldvalue = lookup[index];
 5         lookup[index] = value;
 6         return oldvalue == unset;
 7     } else {
 8         this.expandindexedvariabletableandset(index, value);
 9         return true;
10     }
11 }

因为index是不可更改的常量,所以这里有两种情况:
当indexedvariables这个object数组的长度大于index时,直接将value放在indexedvariables数组下标为index的位置,返回oldvalue是否等于unset,若是不等于unset,说明已经set过了,直进行替换,若是等于unset,还要进行后续的registercleaner
当indexedvariables这个object数组的长度小于等于index时,调用expandindexedvariabletableandset方法扩容

expandindexedvariabletableandset方法:

 1 private void expandindexedvariabletableandset(int index, object value) {
 2     object[] oldarray = this.indexedvariables;
 3     int oldcapacity = oldarray.length;
 4     int newcapacity = index | index >>> 1;
 5     newcapacity |= newcapacity >>> 2;
 6     newcapacity |= newcapacity >>> 4;
 7     newcapacity |= newcapacity >>> 8;
 8     newcapacity |= newcapacity >>> 16;
 9     ++newcapacity;
10     object[] newarray = arrays.copyof(oldarray, newcapacity);
11     arrays.fill(newarray, oldcapacity, newarray.length, unset);
12     newarray[index] = value;
13     this.indexedvariables = newarray;
14 }

如果读过hashmap源码的话对上述的位运算操作因该不陌生,这个位运算产生的newcapacity的值是大于oldcapacity的最小的二的整数幂(【java】hashmap中的tablesizefor方法

然后申请一个newcapacity大小的数组,将原数组的内容拷贝到新数组,并且用unset填充剩余部分,还是将value放在下标为index的位置,用indexedvariables保存新数组。

setindexedvariable成立后,setknownnotunset继续调用addtovariablestoremove方法:

 1 private static void addtovariablestoremove(internalthreadlocalmap threadlocalmap, fastthreadlocal<?> variable) {
 2     object v = threadlocalmap.indexedvariable(variablestoremoveindex);
 3     set variablestoremove;
 4     if (v != internalthreadlocalmap.unset && v != null) {
 5         variablestoremove = (set)v;
 6     } else {
 7         variablestoremove = collections.newsetfrommap(new identityhashmap());
 8         threadlocalmap.setindexedvariable(variablestoremoveindex, variablestoremove);
 9     }
10 
11     variablestoremove.add(variable);
12 }

上面说过variablestoremoveindex恒为0,调用internalthreadlocalmap的indexedvariable方法:

1 public object indexedvariable(int index) {
2     object[] lookup = this.indexedvariables;
3     return index < lookup.length ? lookup[index] : unset;
4 }

由于variablestoremoveindex恒等于0,所以这里判断indexedvariables这个object数组是否为空,若是为空,则返回第0个元素,若不是则返回unset

在addtovariablestoremove中,接着对indexedvariables的返回值进行了判断,
判断不是unset,并且不等于null,则说明是set过的,然后将刚才的返回值强转为set类型
若上述条件不成立,创建一个identityhashmap,将其包装成set赋值给variablestoremove,然后调用internalthreadlocalmap的setindexedvariable方法,这里就和上面不一样了,上面是将value放在下标为index的位置,而这里是将set放在下标为0的位置。

看到这,再结合上面来看,其实已经有一个大致的想法了,一开始在set时,是将value放在internalthreadlocalmap的object数组下标为index的位置,然后在这里获取下标为0的set,说明value是暂时放在下标为index的位置,然后判断下标为0的位置有没有set,若是有,取出这个set ,将当前fastthreadlocal对象放入set中,则说明这个set中存放的是fastthreadlocal集合
那么就有如下关系:

Netty中FastThreadLocal源码分析

回到fastthreadlocal的set方法,在setknownnotunset成立后,调用registercleaner方法:

1 private void registercleaner(internalthreadlocalmap threadlocalmap) {
2     thread current = thread.currentthread();
3     if (!fastthreadlocalthread.willcleanupfastthreadlocals(current) && !threadlocalmap.iscleanerflagset(this.index)) {
4         threadlocalmap.setcleanerflag(this.index);
5     }
6 }

willcleanupfastthreadlocals的返回值在前面fastthreadlocalthread的初始化时就确定了,看到iscleanerflagset方法:

1 public boolean iscleanerflagset(int index) {
2     return this.cleanerflags != null && this.cleanerflags.get(index);
3 }

cleanerflags 是一个bitset对象,在internalthreadlocalmap初始化时是null,
若不是第一次的set操作,则根据index,获取index在bitset对应位的值

这里使用bitset,使其持有的位和indexedvariables这个object数组形成了一一对应关系,每一位都是0和1代表当前indexedvariables的对应下标位置的使用情况,0表示没有使用对应unset,1则代表有value

在上面条件成立的情况下,调用setcleanerflag方法:

1 public void setcleanerflag(int index) {
2     if (this.cleanerflags == null) {
3         this.cleanerflags = new bitset();
4     }
5 
6     this.cleanerflags.set(index);
7 }

逻辑比较简单,判断cleanerflags是否初始化,若没有,则立即初始化,再将cleanerflags中对应index位的值设为1;

这里通过registercleaner直接标记了所有set了value的下标可,为以后的removeall 清除提高效率。

下来看fastthreadlocal的get方法:

 1 public final v get() {
 2     internalthreadlocalmap threadlocalmap = internalthreadlocalmap.get();
 3     object v = threadlocalmap.indexedvariable(this.index);
 4     if (v != internalthreadlocalmap.unset) {
 5         return v;
 6     } else {
 7         v value = this.initialize(threadlocalmap);
 8         this.registercleaner(threadlocalmap);
 9         return value;
10     }
11 }

和上面一样,先取得当前线程持有的internalthreadlocalmap ,调用indexedvariable方法,根据当前fastthreadlocal的index定位,判断是否是unset(set过),若没有set过则和jdk一样调用initialize先set:

 1 private v initialize(internalthreadlocalmap threadlocalmap) {
 2     object v = null;
 3 
 4     try {
 5         v = this.initialvalue();
 6     } catch (exception var4) {
 7         platformdependent.throwexception(var4);
 8     }
 9 
10     threadlocalmap.setindexedvariable(this.index, v);
11     addtovariablestoremove(threadlocalmap, this);
12     return v;
13 }

initialvalue()方法就是对外提供的,需要手动覆盖:

1 protected v initialvalue() throws exception {
2     return null;
3 }

后面的操作就和set的逻辑一样。

 

remove方法:

1 public final void remove() {
2     this.remove(internalthreadlocalmap.getifset());
3 }

getifset方法:

1 public static internalthreadlocalmap getifset() {
2     thread thread = thread.currentthread();
3     return thread instanceof fastthreadlocalthread ? ((fastthreadlocalthread)thread).threadlocalmap() : (internalthreadlocalmap)slowthreadlocalmap.get();
4 }

和上面的get方法思路相似,只不过在这里如果获取不到不会创建
然后调用remove重载:

 1 public final void remove(internalthreadlocalmap threadlocalmap) {
 2     if (threadlocalmap != null) {
 3         object v = threadlocalmap.removeindexedvariable(this.index);
 4         removefromvariablestoremove(threadlocalmap, this);
 5         if (v != internalthreadlocalmap.unset) {
 6             try {
 7                 this.onremoval(v);
 8             } catch (exception var4) {
 9                 platformdependent.throwexception(var4);
10             }
11         }
12 
13     }
14 }

先检查threadlocalmap是否存在,若存在才进行后续操作:
调用removeindexedvariable方法:

 1 public object removeindexedvariable(int index) {
 2     object[] lookup = this.indexedvariables;
 3     if (index < lookup.length) {
 4         object v = lookup[index];
 5         lookup[index] = unset;
 6         return v;
 7     } else {
 8         return unset;
 9     }
10 }

和之前的setindexedvariable逻辑相似,只不过现在是把index位置的元素设置为unset

接着调用removefromvariablestoremove方法:

1 private static void removefromvariablestoremove(internalthreadlocalmap threadlocalmap, fastthreadlocal<?> variable) {
2     object v = threadlocalmap.indexedvariable(variablestoremoveindex);
3     if (v != internalthreadlocalmap.unset && v != null) {
4         set<fastthreadlocal<?>> variablestoremove = (set)v;
5         variablestoremove.remove(variable);
6     }
7 }

之前说过variablestoremoveindex恒为0,在object数组中下标为0存储的set<fastthreadlocal<?>>集合(不为unset情况下),从集合中,将当前fastthreadlocal移除掉
最后调用了onremoval方法,该方法需要由用户去覆盖:

1 protected void onremoval(v value) throws exception {
2 }


removeall方法,是一个静态方法:

 1 public static void removeall() {
 2     internalthreadlocalmap threadlocalmap = internalthreadlocalmap.getifset();
 3     if (threadlocalmap != null) {
 4         try {
 5             object v = threadlocalmap.indexedvariable(variablestoremoveindex);
 6             if (v != null && v != internalthreadlocalmap.unset) {
 7                 set<fastthreadlocal<?>> variablestoremove = (set)v;
 8                 fastthreadlocal<?>[] variablestoremovearray = (fastthreadlocal[])variablestoremove.toarray(new fastthreadlocal[0]);
 9                 fastthreadlocal[] var4 = variablestoremovearray;
10                 int var5 = variablestoremovearray.length;
11 
12                 for(int var6 = 0; var6 < var5; ++var6) {
13                     fastthreadlocal<?> tlv = var4[var6];
14                     tlv.remove(threadlocalmap);
15                 }
16             }
17         } finally {
18             internalthreadlocalmap.remove();
19         }
20 
21     }
22 }

首先获取当前线程的internalthreadlocalmap,若是存在继续后续操作:
通过indexedvariable方法,取出object数组中下标为0的set集合(如果不是unset情况下),将其转换为fastthreadlocal数组,遍历这个数组调用上面的remove方法。

fastthreadlocal源码分析到此结束。