ConcurrentHashMap源码分析
1、前言
我们知道HashMap
是一个线程不安全的集合类,在多线程中我们是不能使用的,HashTable
虽然是线程安全的,但是通过同步方法来实现的,那在JDK中是否存在性能好的Map集合呢?答案是存在的,这便是我们今天所要讲解的ConcurrentHashMap
。本文参考的是JAVA 8
。
2、数据结构
ConcurrentHashMap的底层使用的数据结构同HashMap一样,均是数组+链表+红黑树,对HashMap不熟悉的可查看我之前介绍HashMap的文章,其结构如下所示:
Node节点的定义如下:
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}
......
}
3、属性变量
在ConcurrentHashMap中的字段要比HashMap的字段多好多,这里一些常量就列出了,只给出几个成员变量,如下
// 数组
transient volatile Node<K,V>[] table;
// 在进行扩容是会进行初始化 将table上的数据迁移至该数组,然后将其复制给table
private transient volatile Node<K,V>[] nextTable;
// 当不存在冲突时 用来记录元素数量
private transient volatile long baseCount;
// 用于控制数组的初始化和扩容,不同情况有不同的值
// 0 默认值
// -1 代表有线程在进行初始化
// >0 数组为null时 代表数组的初始化长度
// >0 数组不为null时 代表扩容的临界值 数组长度*0.75
// <0 高十六位代表扩容标记 低十六位表示扩容的线程
private transient volatile int sizeCtl;
// 扩容时使用 代表处理数据的下标
private transient volatile int transferIndex;
// 判断CounterCells数组是否出入初始化或扩容状态
private transient volatile int cellsBusy;
// 在并发的情况下 修改数组元素的值来累加元素的个数
private transient volatile CounterCell[] counterCells;
这里需要注意的是sizeCtl
、baseCount
、counterCells
三个属性,在后面的源码中我们会讲解到这三个字段的用处,这里简单的说一下,sizeCtl用来控制数组的初始化和扩容,baseCount和counterCells用来记录元素的个数
,链表的总数等于baseCount + counterCells中元素值的和。
4、构造方法
在ConcurrentHashMap中提供了五种构造方法,如下
// 空参,采用默认大小16
public ConcurrentHashMap() {
}
// 指定数组长度 调用tableSizeFor计算出数组的初始长度 传递的为 1.5initialCapacity + 1
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
// 将数组的初始化值存入sizeCtl中
this.sizeCtl = cap;
}
// 通过Map创建
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}
// 传入数组长度和负载因子
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
}
// 传入数组长度、负载因子和加权因子
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
// 数组长度传入sizeCtrl中
this.sizeCtl = cap;
}
通过构造方法我们知道在传递了数组长度的构造方法中,sizeCtl存放的是数组的初始化长度
,该变量的其他值我们在后面会一个个的讲解到。
5、插入数据 put方法
在这部分我们开始学习ConcurrentHashMap插入数据的逻辑,我们会了解到数组的初始化、插入、扩容及如何保证的线程安全,其源码如下:
public V put(K key, V value) {
return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
// key和value不能为null 否则抛出NullPointException
if (key == null || value == null) throw new NullPointerException();
// 通过key的哈希值计算出node节点的hash属性值
int hash = spread(key.hashCode());
int binCount = 0;
// 自旋
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
// 初始化数组
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 元素在数组对应的位置上为null 使用cas操作将节点插入该位置
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// 数组对应的位置不为null 该节点的hash值为-1
// 在transfer方法中进行设置
else if ((fh = f.hash) == MOVED)
// 帮助扩容
tab = helpTransfer(tab, f);
else {
// 解决hash冲突
V oldVal = null;
// 同步代码块,锁住的是数组指定位置上的Node对象,不影响其他线程操作别的位置上的数据
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
// 用于记录插入该数据的链表的长度 最后判断是否需要转换为红黑树
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 链表上存在对应的key 覆盖value值
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
// 不存在key 在链表尾部插入新节点
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// key不存在新增加节点时进入该逻辑
if (binCount != 0) {
// 链表长度大于等于8
if (binCount >= TREEIFY_THRESHOLD)
// 转为红黑树或者扩容
// 数组的长度小于64进行扩容
treeifyBin(tab, i);
// 若key存在 返回原值
if (oldVal != null)
return oldVal;
// 退出自旋
break;
}
}
}
// 元素数据增加 扩容的逻辑
addCount(1L, binCount);
return null;
}
通过源码我们发现在put方法中ConcurrentHashMap是通过自旋加CAS及同步代码块来保证的线程安全,代码块锁住的是数组上的某个元素,这样就允许了对数组上其他元素的操作不被阻塞,可以提高并发。其整个处理流程如下图所示
在了解了put方法的处理过程后,我们再来详细的介绍下几个方法
-
initTable
数组初始化 -
helpTransfer
帮助扩容 -
treeifyBin
转换为红黑树 -
addCount
维护集合长度和扩容
5.1 数组的初始化
数组的初始化是在initTable
方法中处理的,该方法的源码如下:
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
// 自旋
while ((tab = table) == null || tab.length == 0) {
// sizeCtl == -1代表有其他线程在进行初始化
if ((sc = sizeCtl) < 0)
// 释放cpu资源
Thread.yield(); // lost initialization race; just spin
// 通过CAS操作 将sizeCtl设置为-1
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
// 创建数组
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// 数组长度的四分之三
sc = n - (n >>> 2);
}
} finally {
// 初始化完成后 将sizeCtl设置为数组长度的四分之三
sizeCtl = sc;
}
// 退出自旋
break;
}
}
// 返回数组
return tab;
}
在该方法中可以看到使用的自旋加CAS操作保证的线程安全,在初始化时首先通过CAS操作将sizeCtl的值设置为-1,此时再有其他线程过来进行初始化时会直接释放掉CPU资源,在这里我们有看到了sizeCtl的其他意义
- sizeCtl == 0 代表未指定数组大小 初始化长度为16的数组
- sizeCtl == -1 代表有线程在进行初始化
- sizeCtl > 0 且 数组为null 代表初始化数组的长度
- 数组不为null sizeCtl > 0 需要扩容的值,为数组长度的四分之三
5.2、addCount方法
通过上面的流程图我们会发现,当插入新的节点时会调用该方法,该方法是用来记录集合中的数据个数并且处理是否需要扩容的逻辑,其源码如下:
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
// counterCells为null且CAS修改baseCount的值成功,代表不存在线程竞争
// 此时用baseCount累加的方式来记录元素的数量
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
// counterCells是否为空
// counterCells的长度是否小于1
// 随机从counterCells中取一个元素 判断是否为null
// CAS操作取出的元素的值 判断是否成功
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
// 该方法用来处理CounterCells的初始化或扩容
fullAddCount(x, uncontended);
return;
}
// check代表链表插入数据之前的长度
if (check <= 1)
return;
// 统计元素个数
s = sumCount();
}
// 链表原来的长度不为0 即原来在数据的对应位置存在数据
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
// 自旋 判断数据个数是否达到扩容值(数组长度*0.75)
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
// 获取扩容戳 算法为
// Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1))
// Integer.numberOfLeadingZeros获取n的高位连续的非0位的个数
// RESIZE_STAMP_BITS = 16
int rs = resizeStamp(n);
if (sc < 0) {
// 正在进行扩容
// sc左移16位是否等于得到的扩容标记
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
// 扩容的逻辑
transfer(tab, nt);
}
// 第一次进行扩容时将sizeCtl设置为rs右移16位加2
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
到这里我们看到了CounterCell
数组,在前面的属性字段介绍时提到过该数组是用来记录元素的个数的,在大多数的集合中都维护了一个记录集合大小的字段,但在ConcurrentHashMap中使用的是一个baseCount和一个数组来记录元素的个数,这是为什么呢?这是因为ConcurrentHashMap是一个多线程安全的集合,采用这种方式,可以减少在高并发的情况下维护数据个数而带来的性能消耗,其源码如下:
@sun.misc.Contended static final class CounterCell {
// 记录元素的数量
volatile long value;
CounterCell(long x) { value = x; }
}
// 获取元素的数量 遍历counterCell获取累加其value值加上baseCount值即为元素的总数
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
// 遍历counterCells在baseCount的基础上累加
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
// 返回数据的总数
return sum;
}
在扩容那里我们再次看到了sizeCtl
的身影,该字段在这部分存储的数据如下:
- 高十六位 扩容标记
- 低十六位 进行扩容的线程数量
5.2.1、扩容逻辑 transfer方法
transfer
方法是用来进行扩容操作的逻辑,也是挺复杂的一个逻辑,源码如下:
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// 第一个进行扩容的线程传递的该参数为null
if (nextTab == null) { // initiating
// 初始化 nextTable 的逻辑
try {
@SuppressWarnings("unchecked")
// 创建一个长度为数组两倍的Node数组
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
// 赋值给nextTable变量
nextTable = nextTab;
// 设置transferIndex的值
transferIndex = n;
}
int nextn = nextTab.length;
// 创建一个ForwardingNode hash值为-1 用来表示一个需要移动的节点
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// 为当前线程分配操作的区域
while (advance) {
int nextIndex, nextBound;
// --i获取下个数组上的位置
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
// CAS操作 修改transferIndex的值
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
// 退出扩容的逻辑
// 在上个循环中会为i进行赋值 初始为15
// 小于0代表节点处理完成
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
// 扩容完成
if (finishing) {
// nextTable设置为null
nextTable = null;
// 将nextTable赋值给数组
table = nextTab;
// 修改sizeCtl 是新数组长度的0.75
sizeCtl = (n << 1) - (n >>> 1);
return;
}
// 线程执行完扩容后会通过CAS操作将sizeCtl的值减1
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 判断是否为最后一个在执行扩容的线程线程
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
// 不是最后一个扩容,直接退出
return;
// 是最后一个扩容的 退出
finishing = advance = true;
// 赋值为原数组长度 再进行一次检查
i = n; // recheck before commit
}
}
// 数据迁移的逻辑 将table上的数据迁移值nextTab上
// 节点不存在数据 直接添加占位的节点
else if ((f = tabAt(tab, i)) == null)
// 将fwd放入该位置进行占位 表示处理完成
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
// 表示已经处理过 直接略过
advance = true; // already processed
else {
// 数据迁移 锁对象是当前节点
// 是将数据迁入nextTab中
synchronized (f) {
if (tabAt(tab, i) == f) {
// 两个链表,分别存储 在新链表位置不变和改变的节点
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
// 处理完成后会将该位置设置为fwd节点 进行占位
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
通过ConcurrentHashMap
的扩容代码我们能够看出,扩容时是新建一个长度为原数组长度两倍的数组,然后遍历原数组,挨个将节点移至新创建的数组,处理完成后用新数组替换原数组。逻辑比较复杂,需要静下心来查看这段代码。
5.3、帮助扩容 helpTransfer
通过5.2部分的分析,我们了解了ConcurrentHashMap的扩容过程,也知道了在什么时候会将节点的hash设置为MOVED,接下来我们看一下插入时遇到这种情况的处理逻辑,helpTransfer方法的逻辑如下:
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
// tab != null 代表数组已经初始化
// ForwardingNode 代表在扩容时已经对该节点处理完成且扩容尚未结束
// f.nextTable != null 代表扩容尚未完成
// rs 为扩容标记 算法已在前面讲解过
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
// 判断是否出入同一次扩容中
// 扩容是否完成
// 扩容线程是否达到最大值
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
// CAS操作将sizeCtl的值加1
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
// 调用扩容逻辑
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
至此,ConcurrentHashMap的put方法逻辑我们已经学习完了,虽然数据结构同HashMap一直,但为了保证集合的多线程安全,处理逻辑要比HashMap复杂许多,需要大家静下心来慢慢看及思考。在学习了put方法的逻辑后,我们已经将最复杂的一个逻辑学完了,接下来的内容就比较轻松了。
更多文章可以关注我的公众号,会更新Java相关的技术文章
推荐阅读
-
jdk源码——集合(ConcurrentHashMap)
-
ConcurrentHashMap源码整理
-
ConcurrentHashMap源码分析
-
ConcurrentHashMap源码深入解析
-
简述python 的模块的分析
-
awstats日志分析小结(1) nginxAccesslighttpdTomcatApache
-
java流的总结——从原理分析 博客分类: java IO javaio数据源字节字符
-
windows下Elasticsearch+Logstash+Kibana日志收集分析系统安装教程 博客分类: 开源软件 elasticsearchkibanalogstash
-
elasticsearch2.0源码在开发环境eclipse中启动的问题及解决方案 博客分类: 开源软件 elasticsearch
-
elasticsearch2.0源码在开发环境eclipse中启动的问题及解决方案 博客分类: 开源软件 elasticsearch