高并发下ConcurrentHashMap到底是怎么实现线程安全的?
前言
这几天一直在看Java并发相关的知识,又学习了很多新的知识。看见大佬们一个个更博速度如此之快,可想知识渊博如滔滔江水。这一周就学习一些并发知识,搭建了一个ftp服务器,不知道小可爱们这周学习了那些知识呢?微信公众号【JustKeepCoding】
抛出正题:ConcurrentHashMap是如何实现线程安全
,这里面又隐含那些玄机呢?
上一节讲述了Hashtable这个结构虽然线程安全
,但是效率不高
,就是因为他的每个操作都使用了synchronized
同步块。因为synchronized同步块的线程如果拿到了锁,就会阻塞其他线程,阻塞线程会使操作系统从用户态转为阻塞态,从而加大性能消耗。
所以在高并发的场景下,使用ConcurrentHashMap是最合适不过的了。那么问题来了,为什么都喜欢使用ConcurrentHashMap这个结构,它到底是怎么实现线程安全和并发度的呢?
ConcurrentHashMap的底层到底是怎么实现的?
ConcurrentHashMap是面试必问的知识点,里面涵盖的知识也比较多,为此我们还是通过源码分析再切入到Doug Lea
(作者)使用的其他技术。
JDK 1.7的ConcurrentHashMap用了哪些技术?
1.7函数构造和变量等
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
implements ConcurrentMap<K, V>, Serializable {
static final int DEFAULT_INITIAL_CAPACITY = 16;//默认容量,segments的个数
static final float DEFAULT_LOAD_FACTOR = 0.75f;//负载因子
static final int DEFAULT_CONCURRENCY_LEVEL = 16;//
static final int MAXIMUM_CAPACITY = 1 << 30;//最大容量,还是左移30位
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;//最小的segment容量为2
final Segment<K,V>[] segments;//重要组成
}
采用分段锁
技术,将ConcurrentHashMap的内部分成多个Segment段,每个Segment可以
Segment组成
static final class Segment<K,V> extends ReentrantLock implements Serializable {
static final int MAX_SCAN_RETRIES =
Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
transient volatile HashEntry<K,V>[] table;//类似于HashMap中的Entry
transient int modCount;//修改次数
transient int threshold;//阈值
final float loadFactor;//加载因子
}
segment内部是一个HashEntry<K,V>[] table
,跟JDK1.7版本的HashMap中的Entry类似,都是数组加链表的形式
。但是这里多了一个volatile关键字修饰,这样做到底有什么好处呢?
volatile关键字
要弄清楚volatile关键字的来龙去脉需要具备Java内存模型、CPU缓存模型等知识。那么首先从并发编程的三个重要特性讲起:原子性、有序性和可见性。
- 原子性
原子性就是指在一次操作或多次操作中,要么所有的操作同时成功,那么同时失败。这个在上一篇MySQL的基础讲到过,比如我向淘宝买牙刷,我支出20,那么淘宝就要收到20。要么我转不出来,淘宝也收不到转。要么我支出成功,淘宝也收到成功。不可能存在,我支出但淘宝收不到的操作。 - 有序性
有序性就是指代码执行过程中的先后顺序,由于Java在编译期间的优化,导致代码执行顺序未必是顺序执行,因为JVM可能会指令重排序。 - 可见性
可见性是指当一个线程对一个变量进行修改,另一个线程可以立即看到修改的最新值。这就类似于MySQL的读取未提交的事务隔离级别,防止读取脏数据。
那么JVM是如何来保证可见性
的呢?
- 通过volatile关键字,对于共享资源的读操作会直接在主内存中进行,对于共享数据的写操作当然是先修改工作内存,但是修改后会主动刷新到主内存中。
- 通过synchronized关键字,synchronized只能保证同一时刻只有一个线程获得锁,然后执行同步方法,并且还会保证在锁释放之前,会将对变量的修改会主动刷新到主内存。
- 通过JUC提供的显式锁Lock也能保证,Lock的lock方法能够保证在同一时刻只有一个线程获得锁然后执行同步方法,并保证在锁释放之前(Lock的unlock)将修改的变量刷新到主内存。
我们假设一个场景,当一个读线程自身有个Init_value的值,每次都需要跟当前值比较,如果值被修改了就打印输出,没修改就不输出。当一个修改线程,每次都会修改当前的值,修改会会进行sleep睡眠10毫秒,用于等待读线程输出,即使其交替输出。(因为代码太长就不贴了)
你猜结果是怎样的?
结果是修改线程一直输出,读线程在控制台没有任何打印。这是因为,共享数据开始是缓存到CPU的cache快里,即读线程的本地内存中。即使修改线程修改数据,读线程也是从本地取,而本地内存值没有被修改,所以不会输出。那么使用volatile关键字,就可以强制读线程从内存刷新到本地线程内存(CPU的cache中)。
volatile关键字语义:
- 保证可见性
- 保证顺序性
- 不保证原子性
所以回到HashEntry这里,采用volatile关键字就是为了并发环境下保证HashEntry的有序性和可见性。
ReentrantLock 的使用与理解
从源码可以看到Segment继承ReentrantLock,ReentrantLock 又是一个很重要的知识点,我们都知道可以用互斥同步
来保证线程的安全性,而最基本的同步互斥手段就是synchronized
和java.util.concurrent
(J.U.C)包下的ReentrantLock 来实现同步。在基本用法上,ReentrantLock 和synchronized很相似,但在JDK1.5之前ReentrantLock 的功能是比较丰富的,在JDK1.8中运用广泛。
JDK 1.8的ConcurrentHashMap又采用了哪些技术呢?
悲观锁策略(阻塞同步)
互斥同步最主要的问题就是线程阻塞和唤醒带来的性能问题,因此这种同步也称阻塞同步。从处理方式来讲,互斥同步是一种悲观的并发策略,总是认为只要不去做正确的的同步措施(列入加锁),那肯定会出现问题,无论共享数据是否出现竞争,都需要加锁。(需要线程挂起
)
乐观锁策略(非阻塞同步)
基于冲突检测的乐观并发策略就是先进行操作,如果没有其他线程争用共享资源,那么操作就成功了。如果共享数据有争用,出现冲突,那就再采取其他的不就措施。(不需要线程挂起
)
put操作
public V put(K key, V value) {//定位到segment
Segment<K,V> s;
if (value == null)
throw new NullPointerException();//如果为NULL则会出现异常
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject
(segments, (j << SSHIFT) + SBASE)) == null)
s = ensureSegment(j);
return s.put(key, hash, value, false);//再从HashEntry中插入
}
/**
**segment数组中的put操作,类似于HashMap中的put
**/
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);//自旋锁策略
V oldValue;//保存旧值
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;//计算segment[i]中的table的具体位置,这些就类似hashmap的操作了
HashEntry<K,V> first = entryAt(tab, index);//寻找table其中一个头结点
for (HashEntry<K,V> e = first;;) {//遍历链表
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {//存在值相同的结点
oldValue = e.value;//直接覆盖
if (!onlyIfAbsent) {//状态存在
e.value = value;
++modCount;//修改次数加一
}
break;
}
e = e.next;//更换下一个节点
}
else {
if (node != null)//证明存在值
node.setNext(first);//头插法
else
node = new HashEntry<K,V>(hash, key, value, first);//空节点则直接复制进去
int c = count + 1;//
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
自旋锁
/**
**扫描HashEntry节点去获取锁(自旋锁)
**/
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
HashEntry<K,V> first = entryForHash(this, hash);//获取插入值当前链表头结点
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;//新的节点
int retries = -1; // 尝试获取锁的次数,初始为-1
while (!tryLock()) {//没有获取到锁的话
HashEntry<K,V> f; // to recheck first below
if (retries < 0) {//进入第一种情况
if (e == null) {//头结点为空,创建新的节点
if (node == null) // speculatively create node
node = new HashEntry<K,V>(hash, key, value, null);//为了少做一点事,以后直接使用
retries = 0;
}
else if (key.equals(e.key))
retries = 0;
else
e = e.next;
}
else if (++retries > MAX_SCAN_RETRIES) {//第二种情况,超过自旋次数加以阻塞
lock();//获取不到锁,则阻塞等待
break;
}
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
//因为当前没有获取锁,可能头结点被更另一个线程更改,需要判断是否是之前的头结点。如果不是则重新循环判断
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}
tryLock()
会尝试获取锁,获取不到就返回false,获取的到就返回true,做其他事,并且不会阻塞。Lock()
方法如果获取不到锁就会阻塞。
流程图:
扩容操作
/**
**扩容操作,不需要rehash,直接使用之前的hash
**/
private void rehash(HashEntry<K,V> node) {
HashEntry<K,V>[] oldTable = table;//旧的hash表
int oldCapacity = oldTable.length;//之前的hash表长
int newCapacity = oldCapacity << 1;//扩容两倍
threshold = (int)(newCapacity * loadFactor);
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];//扩容后的hash表
int sizeMask = newCapacity - 1;//为了计算数组下标
for (int i = 0; i < oldCapacity ; i++) {
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
int idx = e.hash & sizeMask;
if (next == null) // Single node on list
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
HashEntry<K,V> lastRun = e;//每次记录最后一串index相同的节点,重新赋值
int lastIdx = idx;
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {//遍历节点,记录最后index值相同的一串节点
int k = last.hash & sizeMask;
if (k != lastIdx) {//最后的节点的index不跟数组的index值相同时
lastIdx = k;//更新到最后节点的index
lastRun = last;
}
}
newTable[lastIdx] = lastRun;
// Clone remaining nodes
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {//将最后一串index值相同的重新移到新的数组下标里
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
int nodeIndex = node.hash & sizeMask; // 对新增的节点进行获取index
node.setNext(newTable[nodeIndex]);//头插法
newTable[nodeIndex] = node;
table = newTable;
}
get操作
get 就比较简单啦,它是将 Key
通过Hash
之后定位到具体的 Segment
,再通过一次 Hash 定位到具体的元素上。
即使多个线程对HashEntry修改,有volatile
修饰的HashEntry,每次都会get到最新值。
结论
为什么线程安全呢,就是因为在1.7版本它的每个方法都会加锁,put采用自旋锁不会使线程阻塞(操作状态切换会消耗性能),从而性能比hashtable好很多。而且它的每个HashEntry[i]都是被volatile
修饰,可以保证线程操作的可见性,即每次不会脏读,即使其他线程修改了值,都会强制刷新到本地内存
。
那为什么并发度高呢?
因为是对单个segment[i]进行加锁,意思就是segment如果有16个,那么可以同时有16个线程修改
而且还是线程安全的。相对于Hashtable的锁,是锁定整个Hashtable对象,那么多个线程访问就需要被阻塞。
JDK1.8的ConcurrentHashMap
1.8版本取消了1.7中的分段锁策略,使用了CAS + synchronized
来保证安全性。这跟1.8版本的HashMap也很相似,引入了红黑树,在链表节点数大于8的时候会转换成红黑树。
1.8源码构造
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
implements ConcurrentMap<K,V>, Serializable {
private static final int MAXIMUM_CAPACITY = 1 << 30;//最大容量
private static final int DEFAULT_CAPACITY = 16;//默认容量
private static final float LOAD_FACTOR = 0.75f;//负载因子
static final int TREEIFY_THRESHOLD = 8;//链表节点数大于8 转红黑树
static final int UNTREEIFY_THRESHOLD = 6;//删除红黑树节点小于6转链表
transient volatile Node<K,V>[] table;//节点桶,volatile修饰
private transient volatile Node<K,V>[] nextTable;//下一张表
}
1.8的put方法
final V putVal(K key, V value, boolean onlyIfAbsent) {//put中调用putVal
if (key == null || value == null) throw new NullPointerException();//插入的值不能为空
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {//遍历表节点
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();//初始化表
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {//获取table[i]的头结点是空
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))//CAS操作,写入node
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)//当此时另一个线程正在扩容
tab = helpTransfer(tab, f);//当前线程帮助扩容,增加效率
else {//进入table节点,判断当前节点是链表节点还是红黑树节点
V oldVal = null;
synchronized (f) {//对表头结点获取锁资源
if (tabAt(tab, i) == f) {//重新判断一下是否被修改,被修改返回上级重新获取
if (fh >= 0) {//链表节点
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {//遍历链表节点
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {//相同直接覆盖
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {//尾插法 插入节点
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {//红黑树节点
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)//链表节点大于8
treeifyBin(tab, i);//转红黑树
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);//统计节点数
return null;
}
TreeBin
保证了即使红黑树自平衡,也不会释放锁资源。因为如果使用HashMap的红黑树结构的话,当插入节点会自平衡使头结点更换,而头结点是获取到锁了,如果头结点更换,其他线程此时就会获取到该节点的锁,造成插入的异常。
put流程:
- 1.先判断key和value是否为空,为空抛出异常
- 2.计算出hash值,遍历表结构,进入下面三种情况
- 是否需要初始化表,空则初始化,再进入2
- 获取table[i]的头节点,空则用
CAS
插入(key,value)
,进入2 - 看是否当前有线程正在扩容,有则帮忙扩容
helpTransfer
,进入2 -
synchronized
获锁资源,判断是链表节点还是红黑树节点,插入
- 3.统计节点数
- 返回空
比较并替换(Compare-and-swap,简称为CAS)
乐观锁策略需要CAS指令来保证进行,就是实现乐观锁
的一种方式,CAS操作是由sun.misc.Unsafe类中的compareAndSwapInt()
和compareAndSwapLong()
等几个方法包装提供,是一种轻量级锁,J.U.C中很多工具类就是基于CAS实现的,比如CompareAndSet
和getAndIncrement()
等都使用了Unsafe类
的CAS操作。
CAS操作流程就是线程在读取数据时不进行加锁,在准备写回数据时,比较原值是否修改,若未被其他线程修改则写回,若已被修改,则重新执行读取流程。
1.8 get操作
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {//hash桶存在,且当前的桶不为空
if ((eh = e.hash) == h) {//如果是红黑树,按照红黑树获取值
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)//按照链表获取
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
总结:
1.8在1.7上做的改动还是挺大的,ConcurrentHashMap这个数据结构的知识还是挺多的,比如CAS的操作和理解、1.8中synchronized关键字的优化、1.8中的扩容方法插入方法、红黑树自平衡等等。都是Java程序员面试的必备知识点,这节又写了11000多字,挺辛苦的,所以能够给我点个赞吗?
最后抛出两个问题:
- 1.8中的ConcurrentHashMap为什么没有使用
Lastrun
(还记得1.7中扩容的Lastrun吗) - unsafe类你属于吗?Java程序中最牛逼的类,为什么最牛逼呢?
你的点赞就是我最大的努力!关注我的公众号【JustKeepCoding】,每周两篇为你推送不一样的知识!
本文地址:https://blog.csdn.net/liyuanbo1997/article/details/107580664