jdk1.7 ConcurrentHashMap源码
ConcurrentHashMap类的成员:
static final int DEFAULT_INITIAL_CAPACITY = 16;
//默认的map的容量
static final float DEFAULT_LOAD_FACTOR = 0.75f;
//默认负载因子
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//默认分段锁数量
static final int MAXIMUM_CAPACITY = 1 << 30;
//map最大数量
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
//每个分段锁最小容量
static final int MAX_SEGMENTS = 1 << 16;
//最大分段锁数量
static final int MAX_SCAN_RETRIES =
Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
//当一个线程在试图往一个分段锁存数据的时候如果在一开始取得锁失败之后,
//再建立键值对的基础下循环往复尝试获得锁的最大尝试次数。
final int segmentMask;
final int segmentShift;
final Segment<K,V>[] segments;
// segments是map中存储分段锁的数组
transient Set<K> keySet;
transient Set<Map.Entry<K,V>> entrySet;
transient Collection<V> values;
在这个类中,在得到key的hash之后,会取hash的高segmentShift位与segmentMask按位相与,所得值结果作为来选择map中分段锁的依据。
在map中通过内部类HashEntry来存储键值对。下面是HashEntry成员:
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;
在1.7的ConcurrentHashMap中通过链表来解决碰撞的问题。所以next指向链表下的下一个冲突键值对。
entryAt()方法:直接从所需要查找的HashEntry数组中取得相应下标的键值对:
static final <K,V> HashEntry<K,V> entryAt(HashEntry<K,V>[] tab, int i) {
return (tab == null) ? null :
(HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)i << TSHIFT) + TBASE);
}
在这其中通过unsafe在静态块中的配置直接对内存地址偏移量进行操作。
ConcurrentHashMap有大量这样的操作。
private static final sun.misc.Unsafe UNSAFE;
private static final long SBASE;
private static final int SSHIFT;
private static final long TBASE;
private static final int TSHIFT;
private static final long HASHSEED_OFFSET;
private static final long SEGSHIFT_OFFSET;
private static final long SEGMASK_OFFSET;
private static final long SEGMENTS_OFFSET;
static {
int ss, ts;
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class tc = HashEntry[].class;
Class sc = Segment[].class;
TBASE = UNSAFE.arrayBaseOffset(tc);
SBASE = UNSAFE.arrayBaseOffset(sc);
ts = UNSAFE.arrayIndexScale(tc);
ss = UNSAFE.arrayIndexScale(sc);
HASHSEED_OFFSET = UNSAFE.objectFieldOffset(
ConcurrentHashMap.class.getDeclaredField("hashSeed"));
SEGSHIFT_OFFSET = UNSAFE.objectFieldOffset(
ConcurrentHashMap.class.getDeclaredField("segmentShift"));
SEGMASK_OFFSET = UNSAFE.objectFieldOffset(
ConcurrentHashMap.class.getDeclaredField("segmentMask"));
SEGMENTS_OFFSET = UNSAFE.objectFieldOffset(
ConcurrentHashMap.class.getDeclaredField("segments"));
} catch (Exception e) {
throw new Error(e);
}
if ((ss & (ss-1)) != 0 || (ts & (ts-1)) != 0)
throw new Error("data type scale not a power of two");
SSHIFT = 31 - Integer.numberOfLeadingZeros(ss);
TSHIFT = 31 - Integer.numberOfLeadingZeros(ts);
}
以上是unsafe在静态块中通过本地方法对内存地址偏移量的设置。
下来看ConcurrentHashMap中作为内部类的分段锁segment的相关分析。
Segment直接继承了ReentrantLock类,可以直接作为锁来执行加锁解锁操作。同时,可以看下面分段锁中的成员。
transient volatile HashEntry<K,V>[] table;
// table是分段锁中存放键值对的散列数组。
transient int count;
// count为分段锁的元素个数。
transient int modCount;
// modCount作为分段锁中数据被修改的次数。
transient int threshold;
// threshold代表分段锁中容量乘负载因子的值
// 当分段锁中的元素超过这个值就会尝试给这个分段锁扩容
final float loadFactor;
// 负载因子
ConcurrentHashMap的构造方法:
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
// 构造方法需三个参数。分别是初始化的容量,负载因子,并发度。
// 这三个参数全非负,否则抛异常。
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// 若并发度大于最大分段锁数量,将会直接被赋值为最大分段锁数量。
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0);
this.segments = ss;
}
构造方法需三个参数。分别是初始化的容量,负载因子,并发度。
这三个参数全非负,否则抛异常。
并发度不能大于最大分段锁数量,否则将会直接赋为最大分段锁数量。Map的初始化容量也一样。
接下来使得map中分段锁的数量,是2的第一个大于并发锁数量的次方值。在不断扩大分段锁的数量同时不断扩大在定位分段锁的位移量。
接着给segmentShift和segmentMark依次根据之前并发度得到的结果进行赋值。
(例如说,当并发度为31时,segmentMask为31,segmentShift为5,map中一共存在32个分段锁,接下来将key所得的hash高5位与111111(31)按位相与所得到的结果(一共与32种可能)来定位要存放的分段锁)。
下来根据分段锁数量以及map的初始化容量来确定每个分段锁的容量。
接着初始化第一个分段锁(负载因子,每个分段锁容量与负载因子相乘的结果,每个分段锁容量的键值对数组)。
初始化完毕segment数组后将第一个分段锁放进去,构造方法结束。
ConcurrentHashMap的put()方法:
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject
(segments, (j << SSHIFT) + SBASE)) == null)
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
在获得key的hash之后取得hash的高segmentShift位与segmentMask按位相与,结果直接根据分段数组的内存地址偏移量取得所需要的分段锁。
如果分段锁还没初始化,则调用ensureSegment()方法确保该位置的分段锁建立以及初始化。
ensureSegment()方法:
private Segment<K,V> ensureSegment(int k) {
final Segment<K,V>[] ss = this.segments;
long u = (k << SSHIFT) + SBASE;
Segment<K,V> seg;
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
Segment<K,V> proto = ss[0];
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}
在构造方法中,我们只建立了分段锁数据的第一个分段锁,与此同时,在这里,我们将以第一个分段锁作为原型将其的信息作为依据建立新的分段锁。
当定位到了所要存放的分段锁的位置后,调用该分段锁的put()方法。
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
最初就尝试加锁,如果加锁失败就会调用scanAndLockForPut()方法来先找到分段锁中相应的位置建立键值对,再试图取得锁的过程。
scanAndLockForPut():
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1;
while (!tryLock()) {
HashEntry<K,V> f;
if (retries < 0) {
if (e == null) {
if (node == null)
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))
retries = 0;
else
e = e.next;
}
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f;
retries = -1;
}
}
return node;
}
在这期间,如果尝试取得锁的次数超过最大的数量,则会直接取得锁,执行接下来的操作。
注意,如果在这期间,相应分段锁上的hash位置上的头结点如果发生了变化,则会导致尝试次数重置。
在成功取得锁之后,则会根据键值对数组的长度减一与hash按位相与确定在键值对数组上的位置。如果已经有在该位置的结点,则直接把新节点放在整个链表后面。(如果key已经存在,则更新value值,并且modCount+1)在加入节点的过程中如果在尝试取得锁的过程中没有新建立节点,则需要重新建立节点,如果建立了,则设置该节点的前驱节点。此时如果个数已经大于分段锁的容量乘负载因子同时键值对数组的长度还是小于最大容量的,则需要调用rehash()进行分段锁的扩容操作。
private void rehash(HashEntry<K,V> node) {
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
int newCapacity = oldCapacity << 1;
threshold = (int)(newCapacity * loadFactor);
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];
int sizeMask = newCapacity - 1;
for (int i = 0; i < oldCapacity ; i++) {
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
int idx = e.hash & sizeMask;
if (next == null)
newTable[idx] = e;
else {
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
newTable[lastIdx] = lastRun;
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
int nodeIndex = node.hash & sizeMask;
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}
扩容首先直接将原先的分段锁容量往左移一位,建立新的长度为新容量的键值对数组,根据新容量来确定新的与负载因子相乘的结果来控制下次扩容操作。取容量减一作为标准来控制老节点在新键值对数组的位置。
这里假设,一个键值对的hash是7,原容量为16,原来由于7(0111)与15(1111)按位相与的结果为0111,结果就是在扩容前的键值对数组的7上。那么扩容后,容量左移一位变为32,7(00111)与31(11111)按位相与之后仍旧在原位置。但是有一个键值对hash为55,容量16的条件下,55(1110111)与15(001111)按位相与结果则是0111,则也是在7上,但扩容为32后,55(110111)与31(011111)按位相与结果则是10111,在23(10000+ 00111)上。
具体是否需要更换位置以该节点的上一个key为基准,防止第一个key过于特殊。
完成旧键值对数组中的成员往新的键值对数组的转移,将所要放入的新节点放入新的数组的相应位置。
最终解锁,整个过程完毕,在整个get()方法中都没有加锁,在之前put()里根据hash存数据的前提下取得数据。
public V get(Object key) {
Segment<K,V> s;
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
本文地址:https://blog.csdn.net/weixin_43257196/article/details/110195832