JDK源码分析(12)之 ConcurrentHashMap 详解
本文将主要讲述 jdk1.8 版本 的 concurrenthashmap,其内部结构和很多的哈希优化算法,都是和 jdk1.8 版本的 hashmap是一样的,所以在阅读本文之前,一定要先了解 hashmap,可以参考 hashmap 相关;另外 concurrenthashmap 中同样有红黑树,这部分可以先不看不影响整体结构把握,有兴趣的可以查看 ;
一、concurrenthashmap 结构概述
1. 整体概述
chm 的源码有 6k 多行,包含的内容多,精巧,不容易理解;建议在查看源码的时候,可以首先把握整体结构脉络,对于一些精巧的优化,哈希技巧可以先了解目的就可以了,不用深究;对整体把握比较清楚后,在逐步分析,可以比较快速的看懂;
jdk1.8 版本中的 chm,和 jdk1.7 版本的差别非常大,在查看资料的时候要注意区分,1.7 中主要是使用 segment 分段锁 来解决并发问题的;而在 1.8 中则完全没有这些稍显臃肿的结构,其结构基本和 hashmap 是一样的,都是 数组 + 链表 + 红黑树,如图所示:
其主要区别就在 chm 支持并发:
- 使用 unsafe 方法操作数组内部元素,保证可见性;(u.getobjectvolatile、u.compareandswapobject、u.putobjectvolatile);
- 在更新和移动节点的时候,直接锁住对应的哈希桶,锁粒度更小,且动态扩展;
- 针对扩容慢操作进行优化,
- 首先扩容过程的中,节点首先移动到过度表 nexttable ,所有节点移动完毕时替换散列表 table;
- 移动时先将散列表定长等分,然后逆序依次领取任务扩容,设置 sizectl 标记正在扩容;
- 移动完成一个哈希桶或者遇到空桶时,将其标记为 forwardingnode 节点,并指向 nexttable ;
- 后有其他线程在操作哈希表时,遇到 forwardingnode 节点,则先帮助扩容(继续领取分段任务),扩容完成后再继续之前的操作;
- 优化哈希表计数器,采用 longadder、striped64 类似思想;
- 以及大量的哈希算法优化和状态变量优化;
以上讲的这些不太清楚也没有关系,主要是有一个印象,大致清楚 chm 的实现方向,具体细节后面还会结合源码详细讲解;
2. 类定义和成员变量
public class concurrenthashmap<k,v> extends abstractmap<k,v> implements concurrentmap<k,v>, serializable { private static final int maximum_capacity = 1 << 30; // 最大容量 private static final int default_capacity = 16; // 默认初始化容量 private static final int default_concurrency_level = 16; // 并发级别,为兼容1.7,实际未用 private static final float load_factor = 0.75f; // 固定负载系数,n - (n >>> 2) static final int treeify_threshold = 8; // 链表超过8时,转为红黑树 static final int untreeify_threshold = 6; // 红黑树低于6时,转为链表 static final int min_treeify_capacity = 64; // 树化最小容量,容量小于64时,先扩容 private static final int min_transfer_stride = 16; // 扩容时拆分散列表,最小步长 private static int resize_stamp_bits = 16; private static final int max_resizers = (1 << (32 - resize_stamp_bits)) - 1; // 可参与扩容的最大线程 static final int ncpu = runtime.getruntime().availableprocessors(); // cpu 数 transient volatile node<k,v>[] table; // 散列表 private transient volatile node<k,v>[] nexttable; // 扩容时的过度表 private transient volatile int sizectl; // 最重要的状态变量,下面详讲 private transient volatile int transferindex; // 扩容进度指示 private transient volatile long basecount; // 计数器,基础基数 private transient volatile int cellsbusy; // 计数器,并发标记 private transient volatile countercell[] countercells; // 计数器,并发累计 public concurrenthashmap() { } public concurrenthashmap(int initialcapacity) { if (initialcapacity < 0) throw new illegalargumentexception(); int cap = ((initialcapacity >= (maximum_capacity >>> 1)) ? maximum_capacity : tablesizefor(initialcapacity + (initialcapacity >>> 1) + 1)); // 注意这里不是0.75,后面介绍 this.sizectl = cap; } public concurrenthashmap(map<? extends k, ? extends v> m) { this.sizectl = default_capacity; putall(m); } public concurrenthashmap(int initialcapacity, float loadfactor) { this(initialcapacity, loadfactor, 1); } public concurrenthashmap(int initialcapacity, float loadfactor, int concurrencylevel) { if (!(loadfactor > 0.0f) || initialcapacity < 0 || concurrencylevel <= 0) throw new illegalargumentexception(); if (initialcapacity < concurrencylevel) // use at least as many bins initialcapacity = concurrencylevel; // as estimated threads long size = (long)(1.0 + (long)initialcapacity / loadfactor); // 注意这里的初始化 int cap = (size >= (long)maximum_capacity) ? maximum_capacity : tablesizefor((int)size); this.sizectl = cap; } ... }
上面有几个重要的地方这里单独讲:
load_factor:
这里的负载系数,同 hashmap 等其他 map 的系数有明显区别:
通常的系数默认 0.75,可以由构造函数传入,当节点数 size 超过 loadfactor * capacity 时扩容;
而 cmh 的系数则固定 0.75(使用
n - (n >>> 2)
表示),构造函数传入的系数只影响初始化容量,见第5个构造函数;-
上面第二个构造函数中,
initialcapacity + (initialcapacity >>> 1) + 1)
,这里居然不是使用的默认0.75,可以看作bug,也可视作优化,见
sizectl:
sizectl 是 chm 中最重要的状态变量,其中包括很多中状态,这里先整体介绍帮助后面源码理解;
sizectl = 0 :初始值,还未指定初始容量;
-
sizectl > 0 :
- table 未初始化,表示初始化容量;
- table 已初始化,表示扩容阈值(0.75n);
sizectl = -1 :表示正在初始化;
-
sizectl < -1 :表示正在扩容,具体结构如图所示:
计算代码如下:
/* * n=64 * integer.numberofleadingzeros(n)=26 * resizestamp(64) = 0001 1010 | 1000 0000 0000 0000 = 1000 0000 0001 1010 */ static final int resizestamp(int n) { return integer.numberofleadingzeros(n) | (1 << (resize_stamp_bits - 1)); }
所以 resizestamp(64) << resize_stamp_shift) + 2
,表示扩容目标为 64,有一个线程正在扩容;
3. node 节点
static class node<k,v> implements map.entry<k,v> { // 哈希表普通节点 final int hash; final k key; volatile v val; volatile node<k,v> next; node<k,v> find(int h, object k) {} // 主要在扩容时,利用多态查询已转移节点 } static final class forwardingnode<k,v> extends node<k,v> { // 标识扩容节点 final node<k,v>[] nexttable; // 指向成员变量 concurrenthashmap.nexttable forwardingnode(node<k,v>[] tab) { super(moved, null, null, null); // hash = -1,快速确定 forwardingnode 节点 this.nexttable = tab; } node<k,v> find(int h, object k) {} } static final class treebin<k,v> extends node<k,v> { // 红黑树根节点 treebin(treenode<k,v> b) { super(treebin, null, null, null); // hash = -2,快速确定红黑树, ... } } static final class treenode<k,v> extends node<k,v> { } // 红黑树普通节点,其 hash 同 node 普通节点 > 0;
4. 哈希计算
static final int moved = -1; // hash for forwarding nodes static final int treebin = -2; // hash for roots of trees static final int reserved = -3; // hash for transient reservations static final int hash_bits = 0x7fffffff; // usable bits of normal node hash // 让高位16位,参与哈希桶定位运算的同时,保证 hash 为正 static final int spread(int h) { return (h ^ (h >>> 16)) & hash_bits; }
除此之外还有,
- tablesizefor : 将容量转为大于n,且最小的2的幂;
- 除留余数法 :
hash % length = hash & (length-1)
; - 扩容后哈希桶定位:
(e.hash & oldcap)
,0 - 位置不变,1 - 原来的位置 + oldcap;
以上这些哈希优化的具体原理,都在之前的博客讲过了,就不在重复了,hashmap 相关;
5. 哈希桶可见性
我们都知道一个数组即使声明为 volatile
,也只能保证这个数组引用本身的可见性,其内部元素的可见性是无法保证的,如果每次都加锁,则效率必然大大降低,在 chm 中则使用 unsafe
方法来保证:
static final <k,v> node<k,v> tabat(node<k,v>[] tab, int i) { return (node<k,v>)u.getobjectvolatile(tab, ((long)i << ashift) + abase); } static final <k,v> boolean castabat(node<k,v>[] tab, int i, node<k,v> c, node<k,v> v) { return u.compareandswapobject(tab, ((long)i << ashift) + abase, c, v); } static final <k,v> void settabat(node<k,v>[] tab, int i, node<k,v> v) { u.putobjectvolatile(tab, ((long)i << ashift) + abase, v); }
二、源码解析
1. inittable 方法
private final node<k,v>[] inittable() { node<k,v>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { if ((sc = sizectl) < 0) thread.yield(); // 有其他线程在初始化 else if (u.compareandswapint(this, sizectl, sc, -1)) { // 设置状态 -1 try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : default_capacity; // 注意此时的 sizectl 表示初始容量,完毕后表示扩容阈值 @suppresswarnings("unchecked") node<k,v>[] nt = (node<k,v>[])new node<?,?>[n]; table = tab = nt; sc = n - (n >>> 2); // 同 0.75n } } finally { sizectl = sc; // 注意这里没有 cas 更新,这就是状态变量的高明了,因为前面设置了 -1,此时这里没有竞争 } break; } } return tab; }
2. get 方法
get 方法可能看代码不是很长,但是他却能 保证无锁状态下的内存一致性 ,他的每一句代码都要仔细理解,多设想一下如果发生竞争会怎样,如此才能有所得;
public v get(object key) { node<k,v>[] tab; node<k,v> e, p; int n, eh; k ek; int h = spread(key.hashcode()); // 计算 hash if ((tab = table) != null && (n = tab.length) > 0 && // 确保 table 已经初始化 // 确保对应的哈希桶不为空,注意这里是 volatile 语义获取;因为扩容的时候,是完全拷贝,所以只要不为空,则链表必然完整 (e = tabat(tab, (n - 1) & h)) != null) { if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } // hash < 0,则必然在扩容,原来位置的节点可能全部移动到 i + oldcap 位置,所以利用多态到 nexttable 中查找 else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; while ((e = e.next) != null) { // 遍历链表 if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
3. putval 方法
注意 chm 的 key 和 value 都不能为空
final v putval(k key, v value, boolean onlyifabsent) { if (key == null || value == null) throw new nullpointerexception(); int hash = spread(key.hashcode()); // hash 计算 int bincount = 0; // 状态变量,主要表示查找链表节点数,最后判断是否转为红黑树 for (node<k,v>[] tab = table;;) { node<k,v> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) tab = inittable(); // 初始化 else if ((f = tabat(tab, i = (n - 1) & hash)) == null) { // cas 获取哈希桶 if (castabat(tab, i, null, new node<k,v>(hash, key, value, null))) // cas 更新,失败时继续循环更新 break; // no lock when adding to empty bin } else if ((fh = f.hash) == moved) tab = helptransfer(tab, f); // 正在扩容的时候,先帮助扩容 else { v oldval = null; synchronized (f) { // 注意这里只锁定了一个哈希桶,所以比 1.7 中的 segment 分段锁 粒度更低 if (tabat(tab, i) == f) { // 确认该哈希桶是否已经移动 if (fh >= 0) { // hash >=0 则必然是普通节点,直接遍历链表即可 bincount = 1; for (node<k,v> e = f;; ++bincount) { k ek; if(e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { // 查找成功 oldval = e.val; if (!onlyifabsent) e.val = value; break; } node<k,v> pred = e; if ((e = e.next) == null) { // 查找失败时,直接在末尾添加新节点 pred.next = new node<k,v>(hash, key, value, null); break; } } } else if (f instanceof treebin) { // 树根节点 node<k,v> p; bincount = 2; if ((p = ((treebin<k,v>)f).puttreeval(hash, key, value)) != null) { // 红黑树查找 oldval = p.val; if (!onlyifabsent) p.val = value; } } } } if (bincount != 0) { if (bincount >= treeify_threshold) treeifybin(tab, i); // 如果链表长度大于8,转为红黑树 if (oldval != null) return oldval; break; } } } addcount(1l, bincount); // 计数加一,注意这里使用的是计数器,普通的 atomic 变量仍然可能称为性能瓶颈; return null; }
其具体流程如图所示:
4. 扩容
扩容操作一直都是比较慢的操作,而 chm 中巧妙的利用任务划分,使得多个线程可能同时参与扩容;另外扩容条件也有两个:
- 有链表长度超过 8,但是容量小于 64 的时候,发生扩容;
- 节点数超过阈值的时候,发生扩容;
其扩容的过程可描述为:
- 首先扩容过程的中,节点首先移动到过度表 nexttable ,所有节点移动完毕时替换散列表 table;
- 移动时先将散列表定长等分,然后逆序依次领取任务扩容,设置 sizectl 标记正在扩容;
- 移动完成一个哈希桶或者遇到空桶时,将其标记为 forwardingnode 节点,并指向 nexttable ;
- 后有其他线程在操作哈希表时,遇到 forwardingnode 节点,则先帮助扩容(继续领取分段任务),扩容完成后再继续之前的操作;
图形化表示如下:
源码分析如下:
private final void transfer(node<k,v>[] tab, node<k,v>[] nexttab) { int n = tab.length, stride; if ((stride = (ncpu > 1) ? (n >>> 3) / ncpu : n) < min_transfer_stride) stride = min_transfer_stride; // 根据 cpu 数量计算任务步长 if (nexttab == null) { // 初始化 nexttab try { @suppresswarnings("unchecked") node<k,v>[] nt = (node<k,v>[])new node<?,?>[n << 1]; // 扩容一倍 nexttab = nt; } catch (throwable ex) { sizectl = integer.max_value; // 发生 oom 时,不再扩容 return; } nexttable = nexttab; transferindex = n; } int nextn = nexttab.length; forwardingnode<k,v> fwd = new forwardingnode<k,v>(nexttab); // 标记空桶,或已经转移完毕的桶 boolean advance = true; boolean finishing = false; // to ensure sweep before committing nexttab for (int i = 0, bound = 0;;) { // 逆向遍历扩容 node<k,v> f; int fh; while (advance) { // 向前获取哈希桶 int nextindex, nextbound; if (--i >= bound || finishing) // 已经取到哈希桶,或已完成时退出 advance = false; else if ((nextindex = transferindex) <= 0) { // 遍历到达头节点,已经没有待迁移的桶,线程准备退出 i = -1; advance = false; } else if (u.compareandswapint (this, transferindex, nextindex, nextbound = (nextindex > stride ? nextindex - stride : 0))) { // 当前任务完成,领取下一批哈希桶 bound = nextbound; i = nextindex - 1; // 索引指向下一批哈希桶 advance = false; } } // i < 0 :表示扩容结束,已经没有待移动的哈希桶 // i >= n :扩容结束,再次检查确认 // i + n >= nextn : 在使用 nexttable 替换 table 时,有线程进入扩容就会出现 if (i < 0 || i >= n || i + n >= nextn) { // 完成扩容准备退出 int sc; if (finishing) { // 两次检查,只有最后一个扩容线程退出时,才更新变量 nexttable = null; table = nexttab; sizectl = (n << 1) - (n >>> 1); // 0.75*2*n return; } if (u.compareandswapint(this, sizectl, sc = sizectl, sc - 1)) { // 扩容线程减一 if ((sc - 2) != resizestamp(n) << resize_stamp_shift) return; // 不是最后一个线程,直接退出 finishing = advance = true; // 最后一个线程,再次检查 i = n; // recheck before commit } } else if ((f = tabat(tab, i)) == null) // 当前节点为空,直接标记为 forwardingnode,然后继续获取下一个桶 advance = castabat(tab, i, null, fwd); // 之前的线程已经完成该桶的移动,直接跳过,正常情况下自己的任务区间,不会出现 forwardingnode 节点, else if ((fh = f.hash) == moved) // 此处为极端条件下的健壮性检查 advance = true; // already processed // 开始处理链表 else { // 注意在 get 的时候,可以无锁获取,是因为扩容是全拷贝节点,完成后最后在更新哈希桶 // 而在 put 的时候,是直接将节点加入尾部,获取修改其中的值,此时如果允许 put 操作,最后就会发生脏读, // 所以 put 和 transfer,需要竞争同一把锁,也就是对应的哈希桶,以保证内存一致性效果 synchronized (f) { if (tabat(tab, i) == f) { // 确认锁定的是同一个桶 node<k,v> ln, hn; if (fh >= 0) { // 正常节点 int runbit = fh & n; // hash & n,判断扩容后的索引 node<k,v> lastrun = f; // 此处找到链表最后扩容后处于同一位置的连续节点,这样最后一节就不用再一次复制了 for (node<k,v> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runbit) { runbit = b; lastrun = p; } } if (runbit == 0) { ln = lastrun; hn = null; } else { hn = lastrun; ln = null; } // 依次将链表拆分成,lo、hi 两条链表,即位置不变的链表,和位置 + oldcap 的链表 // 注意最后一节链表没有new,而是直接使用原来的节点 // 同时链表的顺序也被打乱了,lastrun 到最后为正序,前面一节为逆序 for (node<k,v> p = f; p != lastrun; p = p.next) { int ph = p.hash; k pk = p.key; v pv = p.val; if ((ph & n) == 0) ln = new node<k,v>(ph, pk, pv, ln); else hn = new node<k,v>(ph, pk, pv, hn); } settabat(nexttab, i, ln); // 插入 lo 链表 settabat(nexttab, i + n, hn); // 插入 hi 链表 settabat(tab, i, fwd); // 哈希桶移动完成,标记为 forwardingnode 节点 advance = true; // 继续获取下一个桶 } else if (f instanceof treebin) { // 拆分红黑树 treebin<k,v> t = (treebin<k,v>)f; treenode<k,v> lo = null, lotail = null; // 为避免最后在反向遍历,先留头结点的引用, treenode<k,v> hi = null, hitail = null; // 因为顺序的链表,可以加速红黑树构造 int lc = 0, hc = 0; // 同样记录 lo,hi 链表的长度 for (node<k,v> e = t.first; e != null; e = e.next) { // 中序遍历红黑树 int h = e.hash; treenode<k,v> p = new treenode<k,v>(h, e.key, e.val, null, null); // 构造红黑树节点 if ((h & n) == 0) { if ((p.prev = lotail) == null) lo = p; else lotail.next = p; lotail = p; ++lc; } else { if ((p.prev = hitail) == null) hi = p; else hitail.next = p; hitail = p; ++hc; } } // 判断是否需要将其转化为红黑树,同时如果只有一条链,那么就可以不用在构造 ln = (lc <= untreeify_threshold) ? untreeify(lo) : (hc != 0) ? new treebin<k,v>(lo) : t; hn = (hc <= untreeify_threshold) ? untreeify(hi) : (lc != 0) ? new treebin<k,v>(hi) : t; settabat(nexttab, i, ln); settabat(nexttab, i + n, hn); settabat(tab, i, fwd); advance = true; } } } } } }
还有其他相关方法不是很复杂,就不详细讲了,比如 trypresize,helptransfer,addcount
5. 计数器
当获取 map.size 的时候,如果使用 atomic 变量,很容易导致过度竞争,产生性能瓶颈,所以 chm 中使用了,计数器的方式:
public int size() { long n = sumcount(); return ((n < 0l) ? 0 : (n > (long)integer.max_value) ? integer.max_value : (int)n); }
private transient volatile countercell[] countercells; // 计数器 @sun.misc.contended static final class countercell { // @sun.misc.contended 避免伪缓存 volatile long value; countercell(long x) { value = x; } } final long sumcount() { countercell[] as = countercells; countercell a; long sum = basecount; if (as != null) { for (int i = 0; i < as.length; ++i) { // 累计计数 if ((a = as[i]) != null) sum += a.value; } } return sum; }
具体细节还比较多,之后在单独开一篇博客详细讲解;
总结
- 首先 jdk1.8 的 chm,没有使用 segment 分段锁,而是直接锁定单个哈希桶
- 对数组中的哈希桶使用 cas 操作,保证其可见性
- 对扩容是用,任务拆分,多线程同时扩容的方式,加速扩容
- 对 size 使用计数器思想
- chm 中对状态变量的应用,使得很多操作都得以无所化进行
上一篇: 并发编程之多线程基础篇及面试
下一篇: java数据结构和算法02(栈)