Netty中FastThreadLocal源码分析
netty中使用fastthreadlocal替代jdk中的threadlocal【java】threadlocal源码分析,其用法和threadlocal 一样,只不过从名字fastthreadlocal来看,其处理效率要比jdk中的threadlocal要高
在类加载的时候,先初始化了一个静态成员:
1 private static final int variablestoremoveindex = internalthreadlocalmap.nextvariableindex();
实际上fastthreadlocal的操作都是通过对internalthreadlocalmap的操作来实现的,
而internalthreadlocalmap是unpaddedinternalthreadlocalmap的子类,unpaddedinternalthreadlocalmap的定义比较简单:
1 class unpaddedinternalthreadlocalmap { 2 static final threadlocal<internalthreadlocalmap> slowthreadlocalmap = new threadlocal(); 3 static final atomicinteger nextindex = new atomicinteger(); 4 object[] indexedvariables; 5 int futurelistenerstackdepth; 6 int localchannelreaderstackdepth; 7 map<class<?>, boolean> handlersharablecache; 8 integerholder counterhashcode; 9 threadlocalrandom random; 10 map<class<?>, typeparametermatcher> typeparametermatchergetcache; 11 map<class<?>, map<string, typeparametermatcher>> typeparametermatcherfindcache; 12 stringbuilder stringbuilder; 13 map<charset, charsetencoder> charsetencodercache; 14 map<charset, charsetdecoder> charsetdecodercache; 15 arraylist<object> arraylist; 16 17 unpaddedinternalthreadlocalmap(object[] indexedvariables) { 18 this.indexedvariables = indexedvariables; 19 } 20 }
可以看到在类加载时,会初始化一个泛型为internalthreadlocalmap的jdk的threadlocal对象作为其静态成员slowthreadlocalmap ,还有一个原子化的integer静态成员nextindex
internalthreadlocalmap的定义如下:
1 public final class internalthreadlocalmap extends unpaddedinternalthreadlocalmap { 2 private static final internallogger logger = internalloggerfactory.getinstance(internalthreadlocalmap.class); 3 private static final int default_array_list_initial_capacity = 8; 4 private static final int string_builder_initial_size = systempropertyutil.getint("io.netty.threadlocalmap.stringbuilder.initialsize", 1024); 5 private static final int string_builder_max_size; 6 public static final object unset = new object(); 7 private bitset cleanerflags;
internalthreadlocalmap的nextvariableindex方法:
1 public static int nextvariableindex() { 2 int index = nextindex.getandincrement(); 3 if (index < 0) { 4 nextindex.decrementandget(); 5 throw new illegalstateexception("too many thread-local indexed variables"); 6 } else { 7 return index; 8 } 9 }
这是一个cas滞后自增操作,获取nextindex自增前的值,那么variablestoremoveindex初始化时就是0,且恒为0,nextindex此时变成了1
fastthreadlocal对象的初始化:
1 private final int index = internalthreadlocalmap.nextvariableindex(); 2 3 public fastthreadlocal() { 4 }
由上面可知,index成员恒等于nextvariableindex的返回值,nextindex 的cas操作保障了每个fastthreadlocal对象的index是不同的
首先看到set方法:
1 public final void set(v value) { 2 if (value != internalthreadlocalmap.unset) { 3 internalthreadlocalmap threadlocalmap = internalthreadlocalmap.get(); 4 if (this.setknownnotunset(threadlocalmap, value)) { 5 this.registercleaner(threadlocalmap); 6 } 7 } else { 8 this.remove(); 9 } 10 11 }
只要set的value不是internalthreadlocalmap.unset,会先调用internalthreadlocalmap的get方法:
1 public static internalthreadlocalmap get() { 2 thread thread = thread.currentthread(); 3 return thread instanceof fastthreadlocalthread ? fastget((fastthreadlocalthread)thread) : slowget(); 4 }
判断当前线程是否是fastthreadlocalthread,是则调用fastget,否则调用slowget
fastthreadlocalthread是经过包装后的thread:
1 public class fastthreadlocalthread extends thread { 2 private final boolean cleanupfastthreadlocals; 3 private internalthreadlocalmap threadlocalmap; 4 5 public fastthreadlocalthread() { 6 this.cleanupfastthreadlocals = false; 7 } 8 9 public fastthreadlocalthread(runnable target) { 10 super(fastthreadlocalrunnable.wrap(target)); 11 this.cleanupfastthreadlocals = true; 12 } 13 14 public fastthreadlocalthread(threadgroup group, runnable target) { 15 super(group, fastthreadlocalrunnable.wrap(target)); 16 this.cleanupfastthreadlocals = true; 17 } 18 19 public fastthreadlocalthread(string name) { 20 super(name); 21 this.cleanupfastthreadlocals = false; 22 } 23 24 public fastthreadlocalthread(threadgroup group, string name) { 25 super(group, name); 26 this.cleanupfastthreadlocals = false; 27 } 28 29 public fastthreadlocalthread(runnable target, string name) { 30 super(fastthreadlocalrunnable.wrap(target), name); 31 this.cleanupfastthreadlocals = true; 32 } 33 34 public fastthreadlocalthread(threadgroup group, runnable target, string name) { 35 super(group, fastthreadlocalrunnable.wrap(target), name); 36 this.cleanupfastthreadlocals = true; 37 } 38 39 public fastthreadlocalthread(threadgroup group, runnable target, string name, long stacksize) { 40 super(group, fastthreadlocalrunnable.wrap(target), name, stacksize); 41 this.cleanupfastthreadlocals = true; 42 } 43 44 public final internalthreadlocalmap threadlocalmap() { 45 return this.threadlocalmap; 46 } 47 48 public final void setthreadlocalmap(internalthreadlocalmap threadlocalmap) { 49 this.threadlocalmap = threadlocalmap; 50 } 51 52 public boolean willcleanupfastthreadlocals() { 53 return this.cleanupfastthreadlocals; 54 } 55 56 public static boolean willcleanupfastthreadlocals(thread thread) { 57 return thread instanceof fastthreadlocalthread && ((fastthreadlocalthread)thread).willcleanupfastthreadlocals(); 58 } 59 }
如果看过我之前写的threadlocal源码分析,看到这就明白,jdk的threadlocal中很重要的一点是在thread类中有一个threadlocalmap类型的成员,每个线程都维护这一张threadlocalmap,通过threadlocalmap来和threadlocal对象产生映射关系;而这里和jdk同理绑定的就是internalthreadlocalmap。
fastget方法:
1 private static internalthreadlocalmap fastget(fastthreadlocalthread thread) { 2 internalthreadlocalmap threadlocalmap = thread.threadlocalmap(); 3 if (threadlocalmap == null) { 4 thread.setthreadlocalmap(threadlocalmap = new internalthreadlocalmap()); 5 } 6 7 return threadlocalmap; 8 }
这里也和jdk的threadlocal类似,判断fastthreadlocalthread 线程的threadlocalmap成员是否为null,若是null,则先创建一个internalthreadlocalmap实例:
1 private internalthreadlocalmap() { 2 super(newindexedvariabletable()); 3 }
先调用newindexedvariabletable方法:
1 private static object[] newindexedvariabletable() { 2 object[] array = new object[32]; 3 arrays.fill(array, unset); 4 return array; 5 }
创建了一个大小为32的数组,并且用unset这个object填充了整个数组,然后调用unpaddedinternalthreadlocalmap的构造,令indexedvariables成员保存该数组
再来看slowget方法:
1 private static internalthreadlocalmap slowget() { 2 threadlocal<internalthreadlocalmap> slowthreadlocalmap = unpaddedinternalthreadlocalmap.slowthreadlocalmap; 3 internalthreadlocalmap ret = (internalthreadlocalmap)slowthreadlocalmap.get(); 4 if (ret == null) { 5 ret = new internalthreadlocalmap(); 6 slowthreadlocalmap.set(ret); 7 } 8 9 return ret; 10 }
可以看到,其实这里为了提高效率,并没有直接使用jdk的threadlocal,而是给当前非fastthreadlocalthread线程绑定了一个threadlocal<internalthreadlocalmap>对象,避免直接使用jdk的threadlocal效率低。
回到fastthreadlocal的set方法,在取得到了当前线程的internalthreadlocalmap成员后,调用setknownnotunset方法:
1 private boolean setknownnotunset(internalthreadlocalmap threadlocalmap, v value) { 2 if (threadlocalmap.setindexedvariable(this.index, value)) { 3 addtovariablestoremove(threadlocalmap, this); 4 return true; 5 } else { 6 return false; 7 } 8 }
首先调用了internalthreadlocalmap的setindexedvariable方法:
1 public boolean setindexedvariable(int index, object value) { 2 object[] lookup = this.indexedvariables; 3 if (index < lookup.length) { 4 object oldvalue = lookup[index]; 5 lookup[index] = value; 6 return oldvalue == unset; 7 } else { 8 this.expandindexedvariabletableandset(index, value); 9 return true; 10 } 11 }
因为index是不可更改的常量,所以这里有两种情况:
当indexedvariables这个object数组的长度大于index时,直接将value放在indexedvariables数组下标为index的位置,返回oldvalue是否等于unset,若是不等于unset,说明已经set过了,直进行替换,若是等于unset,还要进行后续的registercleaner
当indexedvariables这个object数组的长度小于等于index时,调用expandindexedvariabletableandset方法扩容
expandindexedvariabletableandset方法:
1 private void expandindexedvariabletableandset(int index, object value) { 2 object[] oldarray = this.indexedvariables; 3 int oldcapacity = oldarray.length; 4 int newcapacity = index | index >>> 1; 5 newcapacity |= newcapacity >>> 2; 6 newcapacity |= newcapacity >>> 4; 7 newcapacity |= newcapacity >>> 8; 8 newcapacity |= newcapacity >>> 16; 9 ++newcapacity; 10 object[] newarray = arrays.copyof(oldarray, newcapacity); 11 arrays.fill(newarray, oldcapacity, newarray.length, unset); 12 newarray[index] = value; 13 this.indexedvariables = newarray; 14 }
如果读过hashmap源码的话对上述的位运算操作因该不陌生,这个位运算产生的newcapacity的值是大于oldcapacity的最小的二的整数幂(【java】hashmap中的tablesizefor方法)
然后申请一个newcapacity大小的数组,将原数组的内容拷贝到新数组,并且用unset填充剩余部分,还是将value放在下标为index的位置,用indexedvariables保存新数组。
setindexedvariable成立后,setknownnotunset继续调用addtovariablestoremove方法:
1 private static void addtovariablestoremove(internalthreadlocalmap threadlocalmap, fastthreadlocal<?> variable) { 2 object v = threadlocalmap.indexedvariable(variablestoremoveindex); 3 set variablestoremove; 4 if (v != internalthreadlocalmap.unset && v != null) { 5 variablestoremove = (set)v; 6 } else { 7 variablestoremove = collections.newsetfrommap(new identityhashmap()); 8 threadlocalmap.setindexedvariable(variablestoremoveindex, variablestoremove); 9 } 10 11 variablestoremove.add(variable); 12 }
上面说过variablestoremoveindex恒为0,调用internalthreadlocalmap的indexedvariable方法:
1 public object indexedvariable(int index) { 2 object[] lookup = this.indexedvariables; 3 return index < lookup.length ? lookup[index] : unset; 4 }
由于variablestoremoveindex恒等于0,所以这里判断indexedvariables这个object数组是否为空,若是为空,则返回第0个元素,若不是则返回unset
在addtovariablestoremove中,接着对indexedvariables的返回值进行了判断,
判断不是unset,并且不等于null,则说明是set过的,然后将刚才的返回值强转为set类型
若上述条件不成立,创建一个identityhashmap,将其包装成set赋值给variablestoremove,然后调用internalthreadlocalmap的setindexedvariable方法,这里就和上面不一样了,上面是将value放在下标为index的位置,而这里是将set放在下标为0的位置。
看到这,再结合上面来看,其实已经有一个大致的想法了,一开始在set时,是将value放在internalthreadlocalmap的object数组下标为index的位置,然后在这里获取下标为0的set,说明value是暂时放在下标为index的位置,然后判断下标为0的位置有没有set,若是有,取出这个set ,将当前fastthreadlocal对象放入set中,则说明这个set中存放的是fastthreadlocal集合
那么就有如下关系:
回到fastthreadlocal的set方法,在setknownnotunset成立后,调用registercleaner方法:
1 private void registercleaner(internalthreadlocalmap threadlocalmap) { 2 thread current = thread.currentthread(); 3 if (!fastthreadlocalthread.willcleanupfastthreadlocals(current) && !threadlocalmap.iscleanerflagset(this.index)) { 4 threadlocalmap.setcleanerflag(this.index); 5 } 6 }
willcleanupfastthreadlocals的返回值在前面fastthreadlocalthread的初始化时就确定了,看到iscleanerflagset方法:
1 public boolean iscleanerflagset(int index) { 2 return this.cleanerflags != null && this.cleanerflags.get(index); 3 }
cleanerflags 是一个bitset对象,在internalthreadlocalmap初始化时是null,
若不是第一次的set操作,则根据index,获取index在bitset对应位的值
这里使用bitset,使其持有的位和indexedvariables这个object数组形成了一一对应关系,每一位都是0和1代表当前indexedvariables的对应下标位置的使用情况,0表示没有使用对应unset,1则代表有value
在上面条件成立的情况下,调用setcleanerflag方法:
1 public void setcleanerflag(int index) { 2 if (this.cleanerflags == null) { 3 this.cleanerflags = new bitset(); 4 } 5 6 this.cleanerflags.set(index); 7 }
逻辑比较简单,判断cleanerflags是否初始化,若没有,则立即初始化,再将cleanerflags中对应index位的值设为1;
这里通过registercleaner直接标记了所有set了value的下标可,为以后的removeall 清除提高效率。
下来看fastthreadlocal的get方法:
1 public final v get() { 2 internalthreadlocalmap threadlocalmap = internalthreadlocalmap.get(); 3 object v = threadlocalmap.indexedvariable(this.index); 4 if (v != internalthreadlocalmap.unset) { 5 return v; 6 } else { 7 v value = this.initialize(threadlocalmap); 8 this.registercleaner(threadlocalmap); 9 return value; 10 } 11 }
和上面一样,先取得当前线程持有的internalthreadlocalmap ,调用indexedvariable方法,根据当前fastthreadlocal的index定位,判断是否是unset(set过),若没有set过则和jdk一样调用initialize先set:
1 private v initialize(internalthreadlocalmap threadlocalmap) { 2 object v = null; 3 4 try { 5 v = this.initialvalue(); 6 } catch (exception var4) { 7 platformdependent.throwexception(var4); 8 } 9 10 threadlocalmap.setindexedvariable(this.index, v); 11 addtovariablestoremove(threadlocalmap, this); 12 return v; 13 }
initialvalue()方法就是对外提供的,需要手动覆盖:
1 protected v initialvalue() throws exception { 2 return null; 3 }
后面的操作就和set的逻辑一样。
remove方法:
1 public final void remove() { 2 this.remove(internalthreadlocalmap.getifset()); 3 }
getifset方法:
1 public static internalthreadlocalmap getifset() { 2 thread thread = thread.currentthread(); 3 return thread instanceof fastthreadlocalthread ? ((fastthreadlocalthread)thread).threadlocalmap() : (internalthreadlocalmap)slowthreadlocalmap.get(); 4 }
和上面的get方法思路相似,只不过在这里如果获取不到不会创建
然后调用remove重载:
1 public final void remove(internalthreadlocalmap threadlocalmap) { 2 if (threadlocalmap != null) { 3 object v = threadlocalmap.removeindexedvariable(this.index); 4 removefromvariablestoremove(threadlocalmap, this); 5 if (v != internalthreadlocalmap.unset) { 6 try { 7 this.onremoval(v); 8 } catch (exception var4) { 9 platformdependent.throwexception(var4); 10 } 11 } 12 13 } 14 }
先检查threadlocalmap是否存在,若存在才进行后续操作:
调用removeindexedvariable方法:
1 public object removeindexedvariable(int index) { 2 object[] lookup = this.indexedvariables; 3 if (index < lookup.length) { 4 object v = lookup[index]; 5 lookup[index] = unset; 6 return v; 7 } else { 8 return unset; 9 } 10 }
和之前的setindexedvariable逻辑相似,只不过现在是把index位置的元素设置为unset
接着调用removefromvariablestoremove方法:
1 private static void removefromvariablestoremove(internalthreadlocalmap threadlocalmap, fastthreadlocal<?> variable) { 2 object v = threadlocalmap.indexedvariable(variablestoremoveindex); 3 if (v != internalthreadlocalmap.unset && v != null) { 4 set<fastthreadlocal<?>> variablestoremove = (set)v; 5 variablestoremove.remove(variable); 6 } 7 }
之前说过variablestoremoveindex恒为0,在object数组中下标为0存储的set<fastthreadlocal<?>>集合(不为unset情况下),从集合中,将当前fastthreadlocal移除掉
最后调用了onremoval方法,该方法需要由用户去覆盖:
1 protected void onremoval(v value) throws exception { 2 }
removeall方法,是一个静态方法:
1 public static void removeall() { 2 internalthreadlocalmap threadlocalmap = internalthreadlocalmap.getifset(); 3 if (threadlocalmap != null) { 4 try { 5 object v = threadlocalmap.indexedvariable(variablestoremoveindex); 6 if (v != null && v != internalthreadlocalmap.unset) { 7 set<fastthreadlocal<?>> variablestoremove = (set)v; 8 fastthreadlocal<?>[] variablestoremovearray = (fastthreadlocal[])variablestoremove.toarray(new fastthreadlocal[0]); 9 fastthreadlocal[] var4 = variablestoremovearray; 10 int var5 = variablestoremovearray.length; 11 12 for(int var6 = 0; var6 < var5; ++var6) { 13 fastthreadlocal<?> tlv = var4[var6]; 14 tlv.remove(threadlocalmap); 15 } 16 } 17 } finally { 18 internalthreadlocalmap.remove(); 19 } 20 21 } 22 }
首先获取当前线程的internalthreadlocalmap,若是存在继续后续操作:
通过indexedvariable方法,取出object数组中下标为0的set集合(如果不是unset情况下),将其转换为fastthreadlocal数组,遍历这个数组调用上面的remove方法。
fastthreadlocal源码分析到此结束。
推荐阅读
-
PHP中array_keys和array_unique函数源码的分析
-
python中pygame针对游戏窗口的显示方法实例分析(附源码)
-
Python的socket模块源码中的一些实现要点分析
-
Java编程中ArrayList源码分析
-
Netty源码分析 (四)----- ChannelPipeline
-
Netty源码分析 (三)----- 服务端启动源码分析
-
Flink中watermark为什么选择最小一条(源码分析)
-
Python的socket模块源码中的一些实现要点分析
-
python中pygame针对游戏窗口的显示方法实例分析(附源码)
-
Netty源码分析之ChannelPipeline(二)—ChannelHandler的添加与删除