死磕 java集合之ConcurrentHashMap源码分析(一)
开篇问题
(1)concurrenthashmap与hashmap的数据结构是否一样?
(2)hashmap在多线程环境下何时会出现并发安全问题?
(3)concurrenthashmap是怎么解决并发安全问题的?
(4)concurrenthashmap使用了哪些锁?
(5)concurrenthashmap的扩容是怎么进行的?
(6)concurrenthashmap是否是强一致性的?
(7)concurrenthashmap不能解决哪些问题?
(8)concurrenthashmap中有哪些不常见的技术值得学习?
简介
concurrenthashmap是hashmap的线程安全版本,内部也是使用(数组 + 链表 + 红黑树)的结构来存储元素。
相比于同样线程安全的hashtable来说,效率等各方面都有极大地提高。
各种锁简介
这里先简单介绍一下各种锁,以便下文讲到相关概念时能有个印象。
(1)synchronized
java中的关键字,内部实现为监视器锁,主要是通过对象监视器在对象头中的字段来表明的。
synchronized从旧版本到现在已经做了很多优化了,在运行时会有三种存在方式:偏向锁,轻量级锁,重量级锁。
偏向锁,是指一段同步代码一直被一个线程访问,那么这个线程会自动获取锁,降低获取锁的代价。
轻量级锁,是指当锁是偏向锁时,被另一个线程所访问,偏向锁会升级为轻量级锁,这个线程会通过自旋的方式尝试获取锁,不会阻塞,提高性能。
重量级锁,是指当锁是轻量级锁时,当自旋的线程自旋了一定的次数后,还没有获取到锁,就会进入阻塞状态,该锁升级为重量级锁,重量级锁会使其他线程阻塞,性能降低。
(2)cas
cas,compare and swap,它是一种乐观锁,认为对于同一个数据的并发操作不一定会发生修改,在更新数据的时候,尝试去更新数据,如果失败就不断尝试。
(3)volatile(非锁)
java中的关键字,当多个线程访问同一个变量时,一个线程修改了这个变量的值,其他线程能够立即看得到修改的值。(这里牵涉到java内存模型的知识,感兴趣的同学可以自己查查相关资料)
volatile只保证可见性,不保证原子性,比如 volatile修改的变量 i,针对i++操作,不保证每次结果都正确,因为i++操作是两步操作,相当于 i = i +1,先读取,再加1,这种情况volatile是无法保证的。
(4)自旋锁
自旋锁,是指尝试获取锁的线程不会阻塞,而是循环的方式不断尝试,这样的好处是减少线程的上下文切换带来的开锁,提高性能,缺点是循环会消耗cpu。
(5)分段锁
分段锁,是一种锁的设计思路,它细化了锁的粒度,主要运用在concurrenthashmap中,实现高效的并发操作,当操作不需要更新整个数组时,就只锁数组中的一项就可以了。
(5)reentrantlock
可重入锁,是指一个线程获取锁之后再尝试获取锁时会自动获取锁,可重入锁的优点是避免死锁。
其实,synchronized也是可重入锁。
源码分析
构造方法
public concurrenthashmap() { } public concurrenthashmap(int initialcapacity) { if (initialcapacity < 0) throw new illegalargumentexception(); int cap = ((initialcapacity >= (maximum_capacity >>> 1)) ? maximum_capacity : tablesizefor(initialcapacity + (initialcapacity >>> 1) + 1)); this.sizectl = cap; } public concurrenthashmap(map<? extends k, ? extends v> m) { this.sizectl = default_capacity; putall(m); } public concurrenthashmap(int initialcapacity, float loadfactor) { this(initialcapacity, loadfactor, 1); } public concurrenthashmap(int initialcapacity, float loadfactor, int concurrencylevel) { if (!(loadfactor > 0.0f) || initialcapacity < 0 || concurrencylevel <= 0) throw new illegalargumentexception(); if (initialcapacity < concurrencylevel) // use at least as many bins initialcapacity = concurrencylevel; // as estimated threads long size = (long)(1.0 + (long)initialcapacity / loadfactor); int cap = (size >= (long)maximum_capacity) ? maximum_capacity : tablesizefor((int)size); this.sizectl = cap; }
构造方法与hashmap对比可以发现,没有了hashmap中的threshold和loadfactor,而是改用了sizectl来控制,而且只存储了容量在里面,那么它是怎么用的呢?官方给出的解释如下:
(1)-1,表示有线程正在进行初始化操作
(2)-(1 + nthreads),表示有n个线程正在一起扩容
(3)0,默认值,后续在真正初始化的时候使用默认容量
(4)> 0,初始化或扩容完成后下一次的扩容门槛
至于,官方这个解释对不对我们后面再讨论。
添加元素
public v put(k key, v value) { return putval(key, value, false); } final v putval(k key, v value, boolean onlyifabsent) { // key和value都不能为null if (key == null || value == null) throw new nullpointerexception(); // 计算hash值 int hash = spread(key.hashcode()); // 要插入的元素所在桶的元素个数 int bincount = 0; // 死循环,结合cas使用(如果cas失败,则会重新取整个桶进行下面的流程) for (node<k,v>[] tab = table;;) { node<k,v> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) // 如果桶未初始化或者桶个数为0,则初始化桶 tab = inittable(); else if ((f = tabat(tab, i = (n - 1) & hash)) == null) { // 如果要插入的元素所在的桶还没有元素,则把这个元素插入到这个桶中 if (castabat(tab, i, null, new node<k,v>(hash, key, value, null))) // 如果使用cas插入元素时,发现已经有元素了,则进入下一次循环,重新操作 // 如果使用cas插入元素成功,则break跳出循环,流程结束 break; // no lock when adding to empty bin } else if ((fh = f.hash) == moved) // 如果要插入的元素所在的桶的第一个元素的hash是moved,则当前线程帮忙一起迁移元素 tab = helptransfer(tab, f); else { // 如果这个桶不为空且不在迁移元素,则锁住这个桶(分段锁) // 并查找要插入的元素是否在这个桶中 // 存在,则替换值(onlyifabsent=false) // 不存在,则插入到链表结尾或插入树中 v oldval = null; synchronized (f) { // 再次检测第一个元素是否有变化,如果有变化则进入下一次循环,从头来过 if (tabat(tab, i) == f) { // 如果第一个元素的hash值大于等于0(说明不是在迁移,也不是树) // 那就是桶中的元素使用的是链表方式存储 if (fh >= 0) { // 桶中元素个数赋值为1 bincount = 1; // 遍历整个桶,每次结束bincount加1 for (node<k,v> e = f;; ++bincount) { k ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { // 如果找到了这个元素,则赋值了新值(onlyifabsent=false) // 并退出循环 oldval = e.val; if (!onlyifabsent) e.val = value; break; } node<k,v> pred = e; if ((e = e.next) == null) { // 如果到链表尾部还没有找到元素 // 就把它插入到链表结尾并退出循环 pred.next = new node<k,v>(hash, key, value, null); break; } } } else if (f instanceof treebin) { // 如果第一个元素是树节点 node<k,v> p; // 桶中元素个数赋值为2 bincount = 2; // 调用红黑树的插入方法插入元素 // 如果成功插入则返回null // 否则返回寻找到的节点 if ((p = ((treebin<k,v>)f).puttreeval(hash, key, value)) != null) { // 如果找到了这个元素,则赋值了新值(onlyifabsent=false) // 并退出循环 oldval = p.val; if (!onlyifabsent) p.val = value; } } } } // 如果bincount不为0,说明成功插入了元素或者寻找到了元素 if (bincount != 0) { // 如果链表元素个数达到了8,则尝试树化 // 因为上面把元素插入到树中时,bincount只赋值了2,并没有计算整个树中元素的个数 // 所以不会重复树化 if (bincount >= treeify_threshold) treeifybin(tab, i); // 如果要插入的元素已经存在,则返回旧值 if (oldval != null) return oldval; // 退出外层大循环,流程结束 break; } } } // 成功插入元素,元素个数加1(是否要扩容在这个里面) addcount(1l, bincount); // 成功插入元素返回null return null; }
整体流程跟hashmap比较类似,大致是以下几步:
(1)如果桶数组未初始化,则初始化;
(2)如果待插入的元素所在的桶为空,则尝试把此元素直接插入到桶的第一个位置;
(3)如果正在扩容,则当前线程一起加入到扩容的过程中;
(4)如果待插入的元素所在的桶不为空且不在迁移元素,则锁住这个桶(分段锁);
(5)如果当前桶中元素以链表方式存储,则在链表中寻找该元素或者插入元素;
(6)如果当前桶中元素以红黑树方式存储,则在红黑树中寻找该元素或者插入元素;
(7)如果元素存在,则返回旧值;
(8)如果元素不存在,整个map的元素个数加1,并检查是否需要扩容;
添加元素操作中使用的锁主要有(自旋锁 + cas + synchronized + 分段锁)。
为什么使用synchronized而不是reentrantlock?
因为synchronized已经得到了极大地优化,在特定情况下并不比reentrantlock差。
未完待续~~
上一篇: 乐字节-Java8新特性之函数式接口
下一篇: SpringBoot跨域问题
推荐阅读
-
死磕 java同步系列之CyclicBarrier源码解析——有图有真相
-
死磕 java同步系列之ReentrantLock源码解析(二)——条件锁
-
死磕 java集合之ArrayList源码分析
-
死磕 java集合之LinkedHashMap源码分析
-
死磕 java同步系列之ReentrantLock源码解析(一)——公平锁、非公平锁
-
死磕 java同步系列之Semaphore源码解析
-
死磕 java集合之TreeMap源码分析(二)- 内含红黑树分析全过程
-
死磕 java集合之TreeMap源码分析(三)- 内含红黑树分析全过程
-
死磕 java集合之ConcurrentHashMap源码分析(一)
-
JAVA核心知识之ConcurrentHashMap源码分析