别再问我ConcurrentHashMap了
以下concurrenthashmap以jdk8中为例进行分析,concurrenthashmap是一个线程安全、基于数组+链表(或者红黑树)的kv容器,主要特性如下:
- 线程安全,数组中单个slot元素个数超过8个时会将链表结构转换成红黑树,注意树节点之间还是有next指针的;
- 当元素个数超过n(
n = tab.length - tab.length>>>2,达到0.75阈值时
)个时触发rehash,成倍扩容; - 当线程扩容时,其他线程put数据时会加入帮助扩容,加快扩容速度;
- put时对单个slot头节点元素进行synchronized加锁,concurrenthashmap中的加锁粒度是针对slot节点的,rehash过程中加锁粒度也是如此;
- get时一般是不加锁。如果slot元素为链表,直接读取返回即可;如果slot元素为红黑树,并且此时该树在进行再平衡或者节点删除操作,读取操作会按照树节点的next指针进行读取,也是不加锁的(因为红黑树中节点也是有链表串起来的);如果该树并没有进行平衡或者节点删除操作,那么会用cas加读锁,防止读取过程中其他线程该树进行更新操作(主要是防止破坏红黑树节点之间的链表特性),破坏“读视图”。
concurrenthashmap默认数组长度16,map最大容量为maximum_capacity = 1 << 30
。创建concurrenthashmap并不是涉及数组的初始化,数组初始化是在第一次put数据才进行的。(注意:jdk1.8中舍弃了之前的分段锁技术,改用cas+synchronized机制)
node结构
concurrenthashmap中一个重要的类就是node,该类存储键值对,所有插入concurrenthashmap的数据都包装在这里面。它与hashmap中的定义很相似,但是有一些差别是concurrenthashmap的value和next属性都是volatile的(保证了get数据时直接返回即可,volatile保证了更新的可见性
),且不允许调用setvalue方法直接改变node的value域,增加了find方法辅助map.get()方法,可在get方法返回的结果中更改对应的value值。
1 static class node<k,v> implements map.entry<k,v> { 2 final int hash; 3 final k key; 4 volatile v val; 5 volatile node<k,v> next; 6 }
concurrenthashmap定义了三个原子操作,用于对数组指定位置的节点进行操作。正是这些原子操作保证了concurrenthashmap的线程安全。
1 //获得在i位置上的node节点 2 static final <k,v> node<k,v> tabat(node<k,v>[] tab, int i) { 3 return (node<k,v>)u.getobjectvolatile(tab, ((long)i << ashift) + abase); 4 } 5 //利用cas算法设置i位置上的node节点。之所以能实现并发是因为他指定了原来这个节点的值是多少 6 //在cas算法中,会比较内存中的值与你指定的这个值是否相等,如果相等才接受你的修改,否则拒绝你的修改 7 //因此当前线程中的值并不是最新的值,这种修改可能会覆盖掉其他线程的修改结果 8 static final <k,v> boolean castabat(node<k,v>[] tab, int i, 9 node<k,v> c, node<k,v> v) { 10 return u.compareandswapobject(tab, ((long)i << ashift) + abase, c, v); 11 } 12 //利用volatile方法设置节点位置的值 13 static final <k,v> void settabat(node<k,v>[] tab, int i, node<k,v> v) { 14 u.putobjectvolatile(tab, ((long)i << ashift) + abase, v);
下面就按照concurrenthashmap的 put / get / remove 来分析下其实现原理,中间涉及rehash、红黑树转换等。
put流程
put操作流程如下:
- 首先根据key的hashcode计算hash,然后根据hash计算应该在数组中存储位置,如果数据为null,新建数组;
- 然后通过tabat(&操作)直接获取对应slot。如果slot为null,则新建kv节点(node类型)放到slot;
- 如果当前slot节点的hash值等于moved(等于-1),表示其类型为forwardingnode,证明其他线程在进行rehash扩容操作,当前线程也会帮助一起进行扩容操作;
- 然后对slot节点进行synchronized加锁,如果slot节点hash值大于等于0,表示当前slot对应元素为链表结构,遍历当前链表,如果key存在则更新,否则添加到链表尾部;如果slot节点类型为treebin(其hash值为-2),表示slot对应元素为红黑树,则在红黑树中进行更新节点或者添加节点操作,注意,最后如果树不平衡会进行树的再平衡操作,此时对树root节点加cas写锁。
- 最后,如果新添加了节点,会统计map size值;如果当前map数量超过了阈值(
n = tab.length - tab.length>>>2
)会触发rehash扩容,按照成倍扩容。
注意:因为往map中添加元素和增加元素统计值是两个步骤,不是原子的,所以获取map.size()时可能不是准确值。
对key的hashcode计算hash
存到map中的key并不是直接按照hashcode计算的,因为hashcode有可能为负的,并且不合理的hashcode实现可能导致较多冲突,因此concurrenthashmap中会对key对hashcode进行hash操作:
1 // int hash = spread(key.hashcode()); 2 // hash_bits = 0x7fffffff 符号位设置为0 3 static final int spread(int h) { 4 return (h ^ (h >>> 16)) & hash_bits; 5 }
红黑树节点比较
既然使用到了红黑树,这就涉及到节点的大小比较问题(节点数据包含key、value信息)。进行节点的大小比较时,首先是比较节点的hash值,注意hash值不是hashcode,因为hash值是对象hashcode与自己无符号右移16位进行异或后的值。如果节点的hash值相等,判断节点的key对象是否实现了comparable接口,实现的话就是用comparable逻辑比较节点之间的大小。如果key对象未实现comparable接口,则调用tiebreakorder方法进行判断:
1 // dir = tiebreakorder(k, pk); k/pk,带比较两个节点,命名还是挺有意思的 2 static int tiebreakorder(object a, object b) { 3 int d; 4 if (a == null || b == null || 5 (d = a.getclass().getname(). 6 compareto(b.getclass().getname())) == 0) 7 d = (system.identityhashcode(a) <= system.identityhashcode(b) ? 8 -1 : 1); 9 return d; 10 }
这里调用了system.identityhashcode,将由默认方法hashcode()返回,如果对象的hashcode()被重写,则system.identityhashcode和hashcode()的返回值就不一样了。
put源码
1 final v putval(k key, v value, boolean onlyifabsent) { 2 // key value非空 3 if (key == null || value == null) throw new nullpointerexception(); 4 int hash = spread(key.hashcode()); 5 // slot对应元素个数,链表转换成红黑树时用 6 int bincount = 0; 7 for (node<k,v>[] tab = table;;) { 8 node<k,v> f; int n, i, fh; 9 if (tab == null || (n = tab.length) == 0) 10 tab = inittable(); 11 else if ((f = tabat(tab, i = (n - 1) & hash)) == null) { 12 if (castabat(tab, i, null, 13 new node<k,v>(hash, key, value, null))) 14 break; // no lock when adding to empty bin 15 } 16 else if ((fh = f.hash) == moved) 17 // 在rehash扩容,帮助扩容,扩容完成之后才能继续进行put操作 18 tab = helptransfer(tab, f); 19 else { 20 v oldval = null; 21 synchronized (f) { // 加锁 22 if (tabat(tab, i) == f) { // 可能已经被更新需要再次进行判断 23 if (fh >= 0) { // 节点更新或插入 24 bincount = 1; 25 for (node<k,v> e = f;; ++bincount) { 26 k ek; 27 if (e.hash == hash && 28 ((ek = e.key) == key || 29 (ek != null && key.equals(ek)))) { 30 oldval = e.val; 31 if (!onlyifabsent) 32 e.val = value; 33 break; 34 } 35 node<k,v> pred = e; 36 if ((e = e.next) == null) { 37 pred.next = new node<k,v>(hash, key, 38 value, null); 39 break; 40 } 41 } 42 } 43 else if (f instanceof treebin) { // 红黑树更新或插入 44 node<k,v> p; 45 bincount = 2; 46 if ((p = ((treebin<k,v>)f).puttreeval(hash, key, 47 value)) != null) { 48 oldval = p.val; 49 if (!onlyifabsent) 50 p.val = value; 51 } 52 } 53 } 54 } 55 if (bincount != 0) { 56 if (bincount >= treeify_threshold) 57 treeifybin(tab, i); 58 if (oldval != null) 59 return oldval; 60 break; 61 } 62 } 63 } 64 // 增加统计值,可能触发rehash扩容 65 addcount(1l, bincount); 66 return null; 67 } 68 69 private final void addcount(long x, int check) { 70 countercell[] as; long b, s; 71 /** 72 * countercells非空表示当前put并发较大,按照countercells进行分线程统计 73 * 参考longaddr思想 74 */ 75 if ((as = countercells) != null || 76 !u.compareandswaplong(this, basecount, b = basecount, s = b + x)) { 77 countercell a; long v; int m; 78 boolean uncontended = true; 79 if (as == null || (m = as.length - 1) < 0 || 80 (a = as[threadlocalrandom.getprobe() & m]) == null || 81 !(uncontended = 82 u.compareandswaplong(a, cellvalue, v = a.value, v + x))) { 83 fulladdcount(x, uncontended); 84 return; 85 } 86 if (check <= 1) 87 return; 88 s = sumcount(); 89 } 90 if (check >= 0) { 91 node<k,v>[] tab, nt; int n, sc; 92 // 大于等于阈值数时进行扩容操作 93 while (s >= (long)(sc = sizectl) && (tab = table) != null && 94 (n = tab.length) < maximum_capacity) { 95 int rs = resizestamp(n); 96 if (sc < 0) { 97 if ((sc >>> resize_stamp_shift) != rs || sc == rs + 1 || 98 sc == rs + max_resizers || (nt = nexttable) == null || 99 transferindex <= 0) 100 break; 101 if (u.compareandswapint(this, sizectl, sc, sc + 1)) 102 transfer(tab, nt); 103 } 104 else if (u.compareandswapint(this, sizectl, sc, 105 (rs << resize_stamp_shift) + 2)) 106 transfer(tab, null); 107 s = sumcount(); 108 } 109 } 110 }
get流程
get方法比较简单,给定一个key来确定value的时候,必须满足两个条件hash值相同同时 key相同(equals) ,对于节点可能在链表或树上的情况,需要分别去查找。
get时一般是不加锁(node节点中value数据类型是volatile的,保证了内存可见性)。如果slot元素为链表,直接读取返回即可;如果slot元素为红黑树,并且此时该树在进行再平衡或者节点删除操作,读取操作会按照树节点的next指针进行读取,也是不加锁的;如果该树并没有进行平衡或者节点删除操作,那么会用cas加读锁,防止读取过程中其他线程该树进行更新操作,破坏“读视图”。
remove流程
remove流程就是根据key找到对应节点,将该节点从链表(更改节点前后关系)或者红黑树移除的过程,注意,从红黑树中删除元素后,不会将红黑树转换为列表的,只能在put元素时列表可能有转换红黑树操作,不会有反向操作。
注意:hashmap有自动rehash扩容机制,但是当元素remove之后并没有自动缩容机制,如果数组经过多次扩容变得很大,并且当前元素较少,请将这些元素转移到一个新的hashmap中。
rehash流程
rehash时是成倍扩容(老table和新tablenew),对于table中i位置的所有元素,扩容后会被分配到i和i+table.length这两个位置中。rehash主要的流程transfer方法中,具体不再展开。
推荐阅读:
更多文档可扫描以下二维码:
上一篇: C++类的继承
下一篇: 服务端高并发分布式架构演进之路