ConcurrentHashMap源码刨析(基于jdk1.7)
看源码前我们必须先知道一下concurrenthashmap的基本结构。concurrenthashmap是采用分段锁来进行并发控制的。
其中有一个内部类为segment类用来表示锁。而segment类里又有一个hashentry<k,v>[]数组,这个数组才是真正用
来存放我们的key-value的。
大概为如下图结构。一个segment数组,而segment数组每个元素为一个hashentry数组
看源码前我们还必须了解的几个默认的常量值:
-
default_initial_capacity = 16 容器默认容量为16
-
default_load_factor = 0.75f 默认扩容因子是0.75
-
default_concurrency_level = 16 默认并发度是16
-
maximum_capacity = 1 << 30 容器最大容量为1073741824
-
min_segment_table_capacity = 2 段的最小大小
-
max_segments = 1 << 16 段的最大大小
-
retries_before_lock = 2 通过不获取锁的方式尝试获取size的次数
以上以及默认值是concurrenthashmap中定义好的,下面我们很多地方会用到他们。
先从初始化开始说起
通过我们使用concurrenthashmap都是通过 concurrenthashmap<string,string> map = new concurrenthashmap<>();的方式
我们点进去跟踪下源码
/** * creates a new, empty map with a default initial capacity (16), * load factor (0.75) and concurrencylevel (16). */ public concurrenthashmap() { this(default_initial_capacity, default_load_factor, default_concurrency_level); }
可以看到,默认无参构造函数内调用了另一个带参构造函数,而这个构造函数也就是不管你初始化时传进来什么参数,最终都会跳到那个带参构造函数。
点进去看看这个带参构造函数实现了什么功能
public concurrenthashmap(int initialcapacity, float loadfactor, int concurrencylevel) { if (!(loadfactor > 0) || initialcapacity < 0 || concurrencylevel <= 0) throw new illegalargumentexception(); if (concurrencylevel > max_segments) concurrencylevel = max_segments; // find power-of-two sizes best matching arguments int sshift = 0; int ssize = 1; while (ssize < concurrencylevel) { ++sshift; ssize <<= 1; } this.segmentshift = 32 - sshift; this.segmentmask = ssize - 1; if (initialcapacity > maximum_capacity) initialcapacity = maximum_capacity; int c = initialcapacity / ssize; if (c * ssize < initialcapacity) ++c; int cap = min_segment_table_capacity; while (cap < c) cap <<= 1; // create segments and segments[0] segment<k,v> s0 = new segment<k,v>(loadfactor, (int)(cap * loadfactor), (hashentry<k,v>[])new hashentry[cap]); segment<k,v>[] ss = (segment<k,v>[])new segment[ssize]; unsafe.putorderedobject(ss, sbase, s0); // ordered write of segments[0] this.segments = ss; }
我们看到该构造函数一共有三个参数,分别是容器的初始化大小、负载因子、并发度,这三个参数如果我们new 一个concurrenthashmap时没有指定,
那么将会采用默认的参数,也就是我们本文开始说的那几个常量值。
在这里我对这三个参数做下解释。容器初始化大小是整个map的容量。负载因子是用来计算每个segment里的hashentry数组扩容时的阈值的。并发度是
用来设置segment数组的长度的。
开头这两个if没什么好说的。就是用来判断我们传进来的参数的正确性。当负载因子,初始容量和并发度不按照规范来时会抛出算术异常。第二个if时当传进来的
并发度大于最大段大小的时候,就将其设置为最大段大小。
这段就比较有意思了。由于segment数组要求长度必须为2的n次方,当我们传进来的并发度不是2的n次方时会计算出一个最接近它的2的n次方值
比如如何我们传进来的并发度为14 15那么通过计算segment数组长度就是16。在上图中我们可以看到两个局部变量ssize和sshift,在循环中如果ssize小于
并发度就将其二进制左移一位,即乘2。因此ssize就是用来保存我们计算出来的最接近并发度的2的n次方值。而ssfhit是用来计算偏移量的。在这里我们又
要说两个很重要的全局常量。segmentmask和segmentshift。其中segmentmask为ssize - 1,由于ssize为2的倍数。那么segmentmask就是奇数。化为
二进制就是全1,而segmentshift为32 - sshift大小。32是key值经过再hash求出来的值的二进制位。segmentmask和segmentshift是用来定位当前元素
在segment数组那个位置,和在hashentry数组的哪个位置,后面我们会详细说说怎么算的。
这一段代码就是用来确定每个segment里面的hashentry的一些参数和初始化segment数组了。第一个if是防止我们设置的初始化
容量大于最大容量。而c是用来计算每个hashentry数组的容量。由于每个hashentry数组容量也需要为2的n次方,因此这里也需要
一个cap和循环来计算一个2的n次方值,方法和上面一样。这里计算出来的cap值就是最终hashentry数组实际的大小了。
初始化就做了这些工作了。
那么我们在说说最简单的get方法。
get方法就需要用到定位我们的元素了。而定位元素就需要我们上面初始化时设置好的两个值:segmentmask和segmentshift
上面说了,并发度默认值为16,那么ssize也为16,因此segmentmask为15.由于ssize二进制往左移了4位,那么sshift就是4,
segmentshift就是32-4=28.下面我们就用segmentmask=15,segmentshift为28来说说怎么确定元素位置的。
在这里我们要说下hash值,这里的hash值不是key的hashcode值,而是经过再hash确定下来的一个hash值,目的是为了减少hash冲突。
hash值二进制为32位。
上图两个红框就是分别确定segment数组中的位置和hashentry数组中的位置。
我们可以看到确定segment数组是采用 (h >>> segmentshift) & segmentmask,其中h为再hash过的hash值。将32为的hash值往右移segmentshift位。这里我们假设移了28位。
而segmentmask为15,就是4位都为一的二进制。将高4位与segmentmask相与会等到一个小于16的值,就是当前元素再的segment位置。
确定了所属的segment后。就要确认在的hashentry位置了。通过第二个红框处,我们可以看到确定hashentry的位置没有使用上面两个值了。而是直接使用当前hashentry数组的长度减一
和hash值想与。通过两种不同的算法分别定位segment和hashenrty可以保证元素在segment数组和hashentry数组里面都散列开了。
put方法
public v put(k key, v value) { segment<k,v> s; if (value == null) throw new nullpointerexception(); int hash = hash(key); int j = (hash >>> segmentshift) & segmentmask; if ((s = (segment<k,v>)unsafe.getobject // nonvolatile; recheck (segments, (j << sshift) + sbase)) == null) // in ensuresegment s = ensuresegment(j); return s.put(key, hash, value, false); }
final v put(k key, int hash, v value, boolean onlyifabsent) { hashentry<k,v> node = trylock() ? null : scanandlockforput(key, hash, value); v oldvalue; try { hashentry<k,v>[] tab = table; int index = (tab.length - 1) & hash; hashentry<k,v> first = entryat(tab, index); for (hashentry<k,v> e = first;;) { if (e != null) { k k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldvalue = e.value; if (!onlyifabsent) { e.value = value; ++modcount; } break; } e = e.next; } else { if (node != null) node.setnext(first); else node = new hashentry<k,v>(hash, key, value, first); int c = count + 1; if (c > threshold && tab.length < maximum_capacity) rehash(node); else setentryat(tab, index, node); ++modcount; count = c; oldvalue = null; break; } } } finally { unlock(); } return oldvalue; }
上面两片代码就是put一个元素的过程。由于put方法里需要对共享变量进行写入操作,因此为了安全,需要在操作共享变量时加锁。put时先定位到segment,然后在segment里及逆行擦汗如操作。
插入有两个步骤,第一步判断是否需要对segment里的hashenrty数组进行扩容。第二步是定位添加元素的位置,然后将其放在hashenrty数组里。
我们先说说扩容。
在插入元素的时候会先判断segment里面的hashenrty数组是否超过容量threshold。这个容量是我们刚开始初始化hashenrty数组时采用容量大小和负载因子计算出来的。
如果超过这个阈值(threshold)那么就会进行扩容。扩容括的时当前hashenrty而不是整个map。
如何扩容
扩容的时候会先创建一个容量是原来两个容量大小的数组,然后将原数组里的元素进行再散列后插入到新的数组里。
size方法
由于map里的元素是遍布所有hashenrty的。因此统计size的时候需要统计每个hashenrty的大小。由于是并发环境下,可能出现有线程在插入或者删除的情况。因此会出现
错误。我们能想到的就是使用size方法时把所有的segment的put,remove和clean方法都锁起来。但是这种方法时很低效的。因此concurrenthashmap采用了以下办法:
先尝试2次通过不加锁的方式来统计各个segment大小,如果统计的过程中,容器的count发生了变化,再采用加锁的方式来统计所有segment的大小。
concurrenthashmap时使用modcount变量来判断再统计的时候容器是否放生了变化。在put、remove、clean方法里操作数据前都会将辩能力modcount进行加一,那么在统计
size千后比较modcount是否发生变化,就可以知道容器大小是否发生变化了。