欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Java并发编程—ConcurrentHashMap源码学习

程序员文章站 2022-03-26 16:38:12
1.JDK1.7版本1.1 设计思路ConcurrentHashMap是conccurrent家族中的一个类,由于它可以高效地支持并发操作,以及被广泛使用,经典的开源框架Spring的底层数据结构就是使用ConcurrentHashMap实现的,它采用了分段锁的设计,只有在一个分段中存在竞争关系,不同分段锁之间没有竞争,提高了并发环境下的处理能力,降低了锁的粒度。相比于HashMap,在高并发的环境下会导致出现环状,死锁的产生;而HashTable虽然保证了并发安全,但是它在实现每个方法的时候都加上了锁...



1.JDK1.7版本

1.1 设计思路

ConcurrentHashMap是conccurrent家族中的一个类,由于它可以高效地支持并发操作,以及被广泛使用,经典的开源框架Spring的底层数据结构就是使用ConcurrentHashMap实现的,它采用了分段锁的设计,只有在一个分段中存在竞争关系,不同分段锁之间没有竞争,提高了并发环境下的处理能力,降低了锁的粒度。相比于HashMap,在高并发的环境下会导致出现环状,死锁的产生;而HashTable虽然保证了并发安全,但是它在实现每个方法的时候都加上了锁,会加剧锁的冲突,导致性能降低。

ConcurrentHashMap中的分段锁称为Segment,是一个可重入锁(继承了ReentrantLock),内部拥有一个Entry数组,数组中的每个元素又是一个链表,HashEntry相比于HashMap中的Entry有所不同:HashEntry中的value以及next都被volatile修饰,这样在多线程读写过程中能够保持他们的可见性

static final class HashEntry<K,V> { final int hash; final K key; volatile V value; volatile HashEntry<K,V> next; 

1.2 并发度(Concurrency Level)

并发度可以理解为ConcurrentHashMap中的分段锁个数,即Segment[]的数组长度。ConcurrentHashMap默认的并发度为16,但用户也可以在构造函数中设置并发度。当用户设置并发度时,ConcurrentHashMap会使用大于等于该值的最小2幂指数作为实际并发度(假如用户设置并发度为17,实际并发度则为32)。运行时通过将key的高n位(n = 32 – segmentShift)和并发度减1(segmentMask)做位与运算定位到所在的Segment。segmentShift与segmentMask都是在构造过程中根据concurrency level被相应的计算出来。

如果并发度设置的过小,会带来严重的锁竞争问题;如果并发度设置的过大,原本位于同一个Segment内的访问会扩散到不同的Segment中,CPU cache命中率会下降,从而引起程序性能下降

1.3 创建分段锁

Segments分段锁的初始化采用的是懒汉式加载模式,只有当每次put之前会检查key对应的Segment是否为null,如果是则调用ensureSegment()以确保对应的Segment被创建。

1.4 put/putlfabsent/putall

在真正申请锁之前,put方法会通过tryLock()方法尝试获得锁,在尝试获得锁的过程中会对对应hashcode的链表进行遍历,如果遍历完毕仍然找不到与key相同的HashEntry节点,则为后续的put操作提前创建一个HashEntry。当tryLock一定次数后仍无法获得锁,则通过lock申请锁


2.JDK1.8版本

1.基本结构

Java并发编程—ConcurrentHashMap源码学习

ConcurrentHashMap在1.8中的实现有几个变化:

  • 结构:取消分段锁Segment设计,改用数组+链表/红黑树的实现
  • 同步机制:从分段锁继承ReentrantLock改为CAS+Synchronized并发
  • Put操作
  • size实现

2.几个重要属性

1.table

默认为null,初始化是在第一次插入元素时进行,默认是大小为16的数组,用来存储node节点,总共存放三种节点:

  • node:普通结点
  • treebin:包装红黑树结构的结点
  • forwardingNode:扩容时用到的结点类型,并发扩容实现的关键之一

2.sizeCtl

默认为0,用来控制table的初始化和扩容操作

当前未初始化: = 0 //未指定初始容量 > 0 //由指定的初始容量计算而来,再找最近的2的幂次方。 //比如传入6,计算公式为6+6/2+1=10,最近的2的幂次方为16,所以sizeCtl就为16。 初始化中: = -1 //table正在初始化 = -N //N是int类型,分为两部分,高15位是指定容量标识,低16位表示 //并行扩容线程数+1,具体在resizeStamp函数介绍。 初始化完成: =table.length * 0.75 //扩容阈值调为table容量大小的0.75倍 

3.nextCtl

扩容时存放数据的变量,扩容后置null

3.构造函数

 public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { //concurrencyLevel 默认并发级别,也就是table[]的默认大小 if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (initialCapacity < concurrencyLevel) // 初始化容量至少要为concurrencyLevel initialCapacity = concurrencyLevel; long size = (long)(1.0 + (long)initialCapacity / loadFactor); int cap = (size >= (long)MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int)size); this.sizeCtl = cap; } 

在创建ConcurrentHashMap时没有初始化table[]数组,只对容量和并发等级赋值

4.put操作

 public V put(K key, V value) { return putVal(key, value, false); } final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode()); //@1,讲解见下面小标题。 //i处结点数量,2: TreeBin或链表结点数, 其它:链表结点数。主要用于每次加入结点后查看是否要由链表转为红黑树 int binCount = 0; for (Node<K,V>[] tab = table;;) { //CAS经典写法,不成功无限重试,让再次进行循环进行相应操作。 Node<K,V> f; int n, i, fh; //除非构造时指定初始化集合,否则默认构造不初始化table,所以需要在添加时元素检查是否需要初始化。 if (tab == null || (n = tab.length) == 0) tab = initTable(); //@2 //CAS操作得到对应table中元素 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //@3 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; //null创建Node对象做为链表首结点 } else if ((fh = f.hash) == MOVED) //当前结点正在扩容 //让当前线程调用helpTransfer也参与到扩容过程中来,扩容完毕后tab指向新table。 tab = helpTransfer(tab, f); else { V oldVal = null; synchronized (f) { if (tabAt(tab, i) == f) { //双重检查i处结点未变化 if (fh >= 0) { //表明是链表结点类型,hash值是大于0的,即spread()方法计算而来 binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; //onlyIfAbsent表示是新元素才加入,旧值不替换,默认为fase。 if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; if ((e = e.next) == null) { //jdk1.8版本是把新结点加入链表尾部,next由volatile修饰 pred.next = new Node<K,V>(hash, key, value, null); break; } } } else if (f instanceof TreeBin) { //红黑树结点类型 Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { //@4 oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) //默认桶中结点数超过8个数据结构会转为红黑树 treeifyBin(tab, i); //@5 if (oldVal != null) return oldVal; break; } } } addCount(1L, binCount); //更新size,检测扩容 return null; } 

从上面代码可以看出,Put操作步骤如下:

  1. 参数校验
  2. 若table[]未创建,则初始化
  3. 当table[i]后面无节点时,直接创建node(不加锁)
  4. 如果当前正在扩容,则协助扩容
  5. 在链表或者红黑树中追加结点
  6. 最后判断是否链表长度到达阈值,有则转化为红黑树

@1 spread()

jdk1.8的hash策略,与以往版本一样都是为了减少hash冲突

@2 initTable()

private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { if ((sc = sizeCtl) < 0) Thread.yield(); //正在初始化时将sizeCtl设为-1 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; //DEFAULT_CAPACITY为16 @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; sc = n - (n >>> 2); //扩容阈值为新容量的0.75倍 } } finally { sizeCtl = sc; //扩容保护 } break; } } return tab; } 

初始化时会通过CAS操作将sizeCtl置为-1,而sizeCtl由volatile修饰,保证修改对后面线程可见。
这之后如果再有线程执行到此方法时检测到sizeCtl为负数,说明已经有线程在给扩容了,这个线程就会调用Thread.yield()让出一次CPU执行时间。

@3 tabAt()/casTabAt()/setTabAt()

@4 TreeBin

红黑树是一种自平衡的二叉查找树

  • 每个节点要么是红色,要么是黑色。
  • 根节点永远是黑色的。
  • 所有的叶节点都是空节点(即 null),并且是黑色的。
  • 每个红色节点的两个子节点都是黑色。(从每个叶子到- 根的路径上不会有两个连续的红色节点)
  • 从任一节点到其每个叶子的所有简单路径都包含相同数目的黑色节点。

@5 TreeifyBin()

当链表节点数量大于8的时候,改用红黑树结构代替链表,将时间复杂度从O(N)变为O(logN)

private final void treeifyBin(Node<K,V>[] tab, int index) { Node<K,V> b; int n, sc; if (tab != null) { if ((n = tab.length) < MIN_TREEIFY_CAPACITY) tryPresize(n << 1); //如果数组整体容量太小则去扩容,放弃转红黑树结构 else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { synchronized (b) { if (tabAt(tab, index) == b) { TreeNode<K,V> hd = null, tl = null; for (Node<K,V> e = b; e != null; e = e.next) { TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null, null); if ((p.prev = tl) == null) hd = p; else tl.next = p; tl = p; } setTabAt(tab, index, new TreeBin<K,V>(hd)); 

5.get操作

 public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode());// 定位到table[]中的i if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) {// 若table[i]存在 if ((eh = e.hash) == h) {// 比较链表头部 if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } else if (eh < 0)// 若为红黑树,查找树 return (p = e.find(h, key)) != null ? p.val : null; while ((e = e.next) != null) {// 循环链表查找 if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null;// 未找到 } 

从上面流程可以看出

  1. 首先定位到table[]中的i
  2. 若table[i]存在,就比较链表头部
  3. 如果头部就是要找的结点,返回该节点
  4. 如果是红黑树就查找树
  5. 如果这次没找到,则循环链表查找

本文地址:https://blog.csdn.net/qq_35580701/article/details/107899516