欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

【java】ConcurrentHashMap1.7源码详解

程序员文章站 2024-02-11 21:25:16
...

前言

随着高并发时代的到来,原有的HashMap已经不能满足基本的需求,在HashMap1.7中,多线程下可能出现的的死循环是致命的。但在java api的juc包中有这样一个类:ConcurrentHashMap,它基于HashMap1.7且线程是安全的,本篇博文会仔细对它进行讲解。强烈建议,在阅读本篇博文前,先阅读 HashMap1.7源码详解

博主默认读者是了解HashMap1.7基本原理的。因此,对于它们相同的部分将不再赘述。在下面所提到的HashMap和ConcurrentHashMap,如无特殊说明,都基于JDK1.7。

HashTable与ConcurrentHashMap

HashTable为保证线程安全付出的代价太大,get()、put()等方法都是synchronized的,这相当于给整个哈希表加了一把大锁。在并发调用HashTable的方法时就会造成大量的时间损耗。
ConcurrentHashMap的设计就显得非常巧妙,它采用分段加锁的方式保证线程安全,而不是将整个哈希表进行加锁,减少了线程阻塞的损耗时间。

数据结构

Segment + HashEntry

每个HashEntry结构都相当于HashMap中的一个哈希表(数组+链表)
【java】ConcurrentHashMap1.7源码详解
ConcurrentHashMap采用分段锁的机制,用Segment来分割整张哈希表;在对不同分段进行操作时,可以做到互不干扰,避免加锁。

Segment类

static final class Segment<K,V> extends ReentrantLock implements Serializable {}

Segment类继承了ReentrantLock类,ReentrantLock和synchronized都是可重入的独占锁,只允许线程互斥的访问临界区,这就验证了ConcurrentHashMap是基于Segment段来加锁的。它实现了Serializable接口,可进行对象的序列化与反序列化。

属性

// 在强制加锁前的最大尝试次数
// availableProcessors()返回java虚拟机的可用处理器数
static final int MAX_SCAN_RETRIES =     		
    Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
// 每个分段锁里有一个数组
transient volatile HashEntry<K,V>[] table;
// 数组结点数
transient int count;
transient int modCount;
// 扩容阈值
transient int threshold;
// 加载因子
final float loadFactor;

从上面来看,Segment类几乎涵盖了HashMap里的所有核心属性,这也意味着每个Segment对象都相当于一个HashMap,这也就是分段思想的核心。需要强调的是MAX_SCAN_RETRIESmodCount,在ConcurrentHashMap中,一些方法在执行时不是直接加锁,而是通过连续的多次遍历来确定原哈希表是否被别的线程修改了。判断的依据就是modCount是否改变,而循环遍历的次数也不能没有限制,MAX_SCAN_RETRIES就是确定加锁前最大尝试次数的。具体的使用将在代码中进行讲解。

put(K key, int hash, V value, boolean onlyIfAbsent)

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
	// tryLock()方法是有返回值的,它表示用来尝试获取锁,如果获取成功,
	// 则返回true,如果获取失败(即锁已被其他线程获取),则返回false,
	// 也就说这个方法无论如何都会立即返回。在拿不到锁时不会一直在那等待。
    HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        HashEntry<K,V>[] tab = table;
        // 取得数组下标
        int index = (tab.length - 1) & hash;
        // 得到链表首结点
        HashEntry<K,V> first = entryAt(tab, index);
        // 查找是否有相同的key,若有,则根据onlyIfAbsent来确定是否进行替换
        for (HashEntry<K,V> e = first;;) {
            if (e != null) {
                K k;
                // key相同或满足equals条件
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    // onlyIfAbsent: 如果key不存在才增加
                    if (!onlyIfAbsent) {
                        e.value = value;
                        // 修改次数加1
                        ++modCount;
                    }
                    break;
                }
                // 如果当前结点不为空,查询下一结点
                e = e.next;
            }
            // 链表为空,或到了末结点并未找到目标key
            else {
           		// 结点不为空,说明scanAndLockForPut()有返回值
                if (node != null)
               		// 结点前插法,hashmap1.7也用的前插法
                    node.setNext(first);
                // 第一句中tryLock()成功
                else
               		// 前插添加结点,将first作为node的下一结点
                    node = new HashEntry<K,V>(hash, key, value, first);
                int c = count + 1;
                // 达到扩容阈值且未到最大容量,进行扩容
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node);
                // 将新的头结点组成的链表放到数组中
                else
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    // 保证无论出现任何情况都要解锁
    } finally {
        unlock();
    }
    return oldValue;
}

scanAndLockForPut(K key, int hash, V value)

// 方法作用:创建新结点或找到key相同的结点并加锁,为添加做准备
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
	// 找到该hashcode对应的链表的头结点
    HashEntry<K,V> first = entryForHash(this, hash);
    HashEntry<K,V> e = first;
    HashEntry<K,V> node = null;
    int retries = -1; // 定位节点为负
    // 循环获取锁,线程安全
    while (!tryLock()) {
        HashEntry<K,V> f; // 请在下面重新检查
        if (retries < 0) {
       		// 链表为空或遍历完链表未找到key相同的结点
            if (e == null) {
           		// 在最下面的else if中可能会将retries置为-1
           		// 所以还可能再次进入这里,需要判断node是否为空
           		// 这里虽然创建了新的结点,但是并没有链在链表上,e依然为空             		
                if (node == null) // 创建新结点
                    node = new HashEntry<K,V>(hash, key, value, null);
                retries = 0;
            }
            // 如果键相同,就不用new新结点
            else if (key.equals(e.key))
                retries = 0;
            // 未到末结点,且不为当前结点,则查询下一个
            else
                e = e.next;
        }
        // 下面的代码出现情况:1、new出新结点 2、找到key相同的结点
        
        // 如果扫描次数大于阈值,则强制获取锁
        else if (++retries > MAX_SCAN_RETRIES) {
       		// 超过最大尝试次数就强制加锁,若获取不了锁,则会阻塞(对本Segment对象加锁)
            lock();
            break;
        }
        // (retries & 1) == 0,当最低位不为1时成立			
		// (retries & 1) == 0,没有这一句将可能会造成死循环
		// 死循环:如果上面的if总是将retries置为0,而MAX_SCAN_RETRIES >= 1
		// 若没有(retries & 1) == 0限制,那下面总是进行重置扫描,即死循环
        else if ((retries & 1) == 0 &&
   				 // 如果链表发生变化,代表先于当前线程的线程对链表进行了修改
   				 // 一旦链表关系发生变化,重新遍历查询加锁是必然的
                 (f = entryForHash(this, hash)) != first) {
            e = first = f;
            retries = -1;
        }
    }
    return node;
}

// 此方法与上面的方法基本一致,但更高效,用来加锁
private void scanAndLock(Object key, int hash) {
    HashEntry<K,V> first = entryForHash(this, hash);
    HashEntry<K,V> e = first;
    int retries = -1;
    while (!tryLock()) {
        HashEntry<K,V> f;
        if (retries < 0) {
            if (e == null || key.equals(e.key))
                retries = 0;
            else
                e = e.next;
        }
        else if (++retries > MAX_SCAN_RETRIES) {
            lock();
            break;
        }
        else if ((retries & 1) == 0 &&
                 (f = entryForHash(this, hash)) != first) {
            e = first = f;
            retries = -1;
        }
    }
}

上面这个方法难度是比较大的,读者需要深刻理解每一行代码,有利于后面源码的阅读,它也验证了MAX_SCAN_RETRIES的使用场景。

entryAt(HashEntry<K,V>[] tab, int i)

// 使用volatile获取给定表的第i个元素,直接通过偏移地址读取
@SuppressWarnings("unchecked")
static final <K,V> HashEntry<K,V> entryAt(HashEntry<K,V>[] tab, int i) {
    return (tab == null) ? null :
   		// getObjectVolatile()根据tab对象和偏移长度获得对应的属性
   		// 这里就是获得数组偏移长度的元素,Volatile保证可见性和有序性
        (HashEntry<K,V>) UNSAFE.getObjectVolatile
        // TSHIFT:数组每个元素的偏移长度,TBASE:数组首地址的偏移量
        // 由这两个参数可以得到该元素的物理地址
        (tab, ((long)i << TSHIFT) + TBASE);
}

setEntryAt(HashEntry<K,V>[] tab, int i, HashEntry<K,V> e)

// 使用volatile写设置给定表的第i个元素。
static final <K,V> void setEntryAt(HashEntry<K,V>[] tab, int i,
                                   HashEntry<K,V> e) {
    // 通过偏移地址将e放到tab的指定位置
    UNSAFE.putOrderedObject(tab, ((long)i << TSHIFT) + TBASE, e);
}

关于Unsafe类,本文不进行讲解。

private static final sun.misc.Unsafe UNSAFE;
private static final long SBASE;
private static final int SSHIFT;
private static final long TBASE;
private static final int TSHIFT;
private static final long HASHSEED_OFFSET;
private static final long SEGSHIFT_OFFSET;
private static final long SEGMASK_OFFSET;
private static final long SEGMENTS_OFFSET;

static {
    int ss, ts;
    try {
        UNSAFE = sun.misc.Unsafe.getUnsafe();
        Class tc = HashEntry[].class;
        Class sc = Segment[].class;
        // 数组第一个元素的偏移量,就是首地址
        TBASE = UNSAFE.arrayBaseOffset(tc);
        SBASE = UNSAFE.arrayBaseOffset(sc);
        // 数组中每个元素的大小,对象(指针)大小
        ts = UNSAFE.arrayIndexScale(tc);
        ss = UNSAFE.arrayIndexScale(sc);
        // 获得属性在对象中的偏移地址
        HASHSEED_OFFSET = UNSAFE.objectFieldOffset(
            ConcurrentHashMap.class.getDeclaredField("hashSeed"));
        SEGSHIFT_OFFSET = UNSAFE.objectFieldOffset(
            ConcurrentHashMap.class.getDeclaredField("segmentShift"));
        SEGMASK_OFFSET = UNSAFE.objectFieldOffset(
            ConcurrentHashMap.class.getDeclaredField("segmentMask"));
        SEGMENTS_OFFSET = UNSAFE.objectFieldOffset(
            ConcurrentHashMap.class.getDeclaredField("segments"));
    } catch (Exception e) {
        throw new Error(e);
    }
    // 元素长度必须是2的指数幂,数组元素是对象(指针)
    // (ss & (ss-1)) != 0 保证二进制位中只有一个1,其余全为0
    if ((ss & (ss-1)) != 0 || (ts & (ts-1)) != 0)
        throw new Error("data type scale not a power of two");
    
    // numberOfLeadingZeros(ss),从高位到低位第一个1的0的个数
    // 若参数为:0000 0100,那结果为:5
    SSHIFT = 31 - Integer.numberOfLeadingZeros(ss);
    TSHIFT = 31 - Integer.numberOfLeadingZeros(ts);
}

rehash(HashEntry<K,V> node)

// 扩容,扩大数组容量并将旧数组中的结点转移到新数组中
// 本方法在锁中进行,所以线程安全,避免死循环
@SuppressWarnings("unchecked")
private void rehash(HashEntry<K,V> node) {
    HashEntry<K,V>[] oldTable = table;
    int oldCapacity = oldTable.length;
    // 在原有容量条件限制下,扩容1倍
    int newCapacity = oldCapacity << 1;
    // 新的扩容阈值
    threshold = (int)(newCapacity * loadFactor);
    // 创建新的数组
    HashEntry<K,V>[] newTable =
        (HashEntry<K,V>[]) new HashEntry[newCapacity];
    int sizeMask = newCapacity - 1;
    // 遍历数组,每一条链都进行转移
    for (int i = 0; i < oldCapacity ; i++) {
   		// 得到原数组下标的首结点
        HashEntry<K,V> e = oldTable[i];
        // 如果链表不为空,则转移
        if (e != null) {
            HashEntry<K,V> next = e.next;
            // 重新定位在新数组中的下标
            int idx = e.hash & sizeMask;
            // 链表只有一个结点
            if (next == null)
                newTable[idx] = e;
            // 如果有多个结点,则将所有的结点都进行转移
            else {
           		// 保存上一个结点
                HashEntry<K,V> lastRun = e;
                // 保存上一个结点的下标
                int lastIdx = idx;
                // 遍历链表        
                for (HashEntry<K,V> last = next;
                     last != null;
                     last = last.next) {
					// 每个结点都需要重定位
                    int k = last.hash & sizeMask;
                    // 这里仅对下标做了比较,意味着虽然有多个结点但lastRun一直没变
                    if (k != lastIdx) {
                        lastIdx = k;
                        lastRun = last;
                    }
                }
                newTable[lastIdx] = lastRun;
                // 上面对下标做了标记,所以这里仅仅转移了一条链
                for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                    V v = p.value;
                    int h = p.hash;
                    // 重定位
                    int k = h & sizeMask;
                    // 前插法
                    HashEntry<K,V> n = newTable[k];
                    newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                }
            }
        }
    }
    // 将新结点加到链表上
    int nodeIndex = node.hash & sizeMask;
    node.setNext(newTable[nodeIndex]);
    newTable[nodeIndex] = node;
    table = newTable;
}

由于rehash()方法的算法奇特,这里需要着重讲解。下面以一条链表为例,做出演示。

原数组某下标中的一条链表,x、y代表重定位后的下标
【java】ConcurrentHashMap1.7源码详解
下面来追踪 lastRunlastIdx 的变化过程

lastRun = Entry0    lastIdx = x
lastRun = Entry1    lastIdx = y
lastRun = Entry2    lastIdx = x
lastRun = Entry4    lastIdx = x

newTable[lastIdx] = lastRun
作用:将最后两个下标为y的结点放到新数组的下标y中

遍历Entry4之前的结点,将它们依次放入新数组中的对应下标中
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
    V v = p.value;
    int h = p.hash;
    // 重定位
    int k = h & sizeMask;
    // 前插法
    HashEntry<K,V> n = newTable[k];
    newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}

从上面的演示结果来看,如果一条链表中后面的结点重定位后的下标相同,那就可以将后面下标相同的一条链直接转移,避免了一个结点一个结点的转移;当整条链重定位后都相同时,性能最好。

remove(Object key, int hash, Object value)

// 链表删除的基本操作
final V remove(Object key, int hash, Object value) {
    if (!tryLock())
   		// 加锁
        scanAndLock(key, hash);
    V oldValue = null;
    try {
        HashEntry<K,V>[] tab = table;
        int index = (tab.length - 1) & hash;
        // 找到目标下标的首结点
        HashEntry<K,V> e = entryAt(tab, index);
        // 前一结点
        HashEntry<K,V> pred = null;
        while (e != null) {
            K k;
            HashEntry<K,V> next = e.next;
            if ((k = e.key) == key ||
                (e.hash == hash && key.equals(k))) {
                V v = e.value;
                // 如果值为null或值相等
                if (value == null || value == v || value.equals(v)) {
               		// 当前结点为头结点
                    if (pred == null)
                        setEntryAt(tab, index, next);                     
                    else
                        pred.setNext(next);
                    ++modCount;
                    --count;
                    oldValue = v;
                }
                break;
            }
            pred = e;
            e = next;
        }
    } finally {
        unlock();
    }
    return oldValue;
}

HashEntry类

与HashMap的Entry作用一致,只是使用了Unsafe来提高读写速度

static final class HashEntry<K,V> {
    final int hash;
    final K key;
    // 值和结点指向可以改变
    volatile V value;
    volatile HashEntry<K,V> next;

    HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
        this.hash = hash;
        this.key = key;
        this.value = value;
        this.next = next;
    }

	// 在本对象的next成员的偏移地址处放入n
    final void setNext(HashEntry<K,V> n) {
        UNSAFE.putOrderedObject(this, nextOffset, n);
    }

    static final sun.misc.Unsafe UNSAFE;
    // next成员的偏移地址
    static final long nextOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class k = HashEntry.class;
            // 得到next成员在对象中的偏移量,用来进行链接操作
            // 利用UnSafe类可以通过直接操作内存来提高速度
            nextOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

属性

static final int DEFAULT_INITIAL_CAPACITY = 16;
static final float DEFAULT_LOAD_FACTOR = 0.75f;
// 此表的默认并发级别
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
static final int MAXIMUM_CAPACITY = 1 << 30;
// 每个段表的最小容量
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
// 最大段数,略显保守
static final int MAX_SEGMENTS = 1 << 16;
// 加锁前最大尝试次数,与modCount相配合
static final int RETRIES_BEFORE_LOCK = 2;

// 段的掩码值,通过它进行与运算来定位段下标
final int segmentMask;
// 用来确定哈希值中参与段定位的高位的位数
final int segmentShift;
// 段,每个段都是一个专门的哈希表
final Segment<K,V>[] segments;

构造方法

public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    // concurrencyLevel:并发水平,即,Segment分段数,不能超过最大段数
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
    int sshift = 0;
    int ssize = 1;
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    // segmentShift:hash值中段所占的位数
    // 若concurrencyLevel = 16,则segmentShift = 27
    this.segmentShift = 32 - sshift;
    // 段的数量-1,用来通过hash值算目标段的下标
    this.segmentMask = ssize - 1;
    // 数组大小不能超过最大容量
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    // c : 每个段需要的哈希表的大小
    int c = initialCapacity / ssize;
    // 如果initialCapacity / ssize为浮点数,需要向上扩展
    if (c * ssize < initialCapacity)
        ++c;
    // 每个段表的数组最小容量为2
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    // 扩大得到满足要求的cap
    while (cap < c)
        cap <<= 1;

	// 虽然创建了段的数组,但是只实例化了第一个元素,类似于懒加载
    Segment<K,V> s0 =
        new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                         (HashEntry<K,V>[])new HashEntry[cap]);
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    // 将s0放到ss数组的第一个元素位置
    UNSAFE.putOrderedObject(ss, SBASE, s0);
    this.segments = ss;
}

public ConcurrentHashMap(int initialCapacity, float loadFactor) {
    this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL);
}

public ConcurrentHashMap(int initialCapacity) {
    this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}

public ConcurrentHashMap() {
    this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}

// 以默认的加载因子和同步水平进行初始化
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
		// threshold = capacity * loadFactor
		// -> capacity = m.size / loadFactor
    this(Math.max((int) (m.size() / DEFAULT_LOAD_FACTOR) + 1,
                  DEFAULT_INITIAL_CAPACITY),
         DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
    putAll(m);
}

核心方法

put(K key, V value)

// put()方法,若存在原始的键值对,则替换值,不存在直接添加
// key、value都不能为null
@SuppressWarnings("unchecked")
public V put(K key, V value) {
    Segment<K,V> s;
    // 值不能为null
    if (value == null)
        throw new NullPointerException();
    int hash = hash(key);
    // 通过散列值的高n位来确定段下标
    int j = (hash >>> segmentShift) & segmentMask;
    // 根据物理地址来取得目标段
	// SSHIFT :Segment数组每个元素的偏移量,SBASE :Segment数组第一个元素的偏移量
	// 由上面两个偏移量可以算出目标下标元素的物理地址
    if ((s = (Segment<K,V>)UNSAFE.getObject          
         (segments, (j << SSHIFT) + SBASE)) == null)
        // 确认该段存在,若不存在,该方法会进行初始化
        s = ensureSegment(j);
    // false : boolean onlyIfAbsent
    return s.put(key, hash, value, false);
}

// 若存在原始的键值对,则不替换值,不存在直接添加
@SuppressWarnings("unchecked")
public V putIfAbsent(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    int hash = hash(key);
    int j = (hash >>> segmentShift) & segmentMask;
    if ((s = (Segment<K,V>)UNSAFE.getObject
         (segments, (j << SSHIFT) + SBASE)) == null)
        s = ensureSegment(j);
    // true : boolean onlyIfAbsent
    return s.put(key, hash, value, true);
}

// 将目标map的所有映射添加到本对象中
public void putAll(Map<? extends K, ? extends V> m) {
    for (Map.Entry<? extends K, ? extends V> e : m.entrySet())
        put(e.getKey(), e.getValue());
}

ensureSegment(int k)

// 确认段,若没有该段,则创建
private Segment<K,V> ensureSegment(int k) {
    final Segment<K,V>[] ss = this.segments;
    // 获得物理地址,SSHIFT:Segment元素的偏移量,SBASE:ss的第一个元素的地址
    long u = (k << SSHIFT) + SBASE;
    Segment<K,V> seg;
    // 通过物理地址得到的对象为空时,条件成立
    if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
   		// 在构造函数中对segments数组的第一个元素进行了初始化
   		// 因此第一个元素可以作为模板对其它的空段进行初始化
        Segment<K,V> proto = ss[0];
        int cap = proto.table.length;
        float lf = proto.loadFactor;
        int threshold = (int)(cap * lf);
        HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
        // 再次检查该段是否为空,以免别的线程已经创建好了
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
            == null) {
            // 创建段对象
            Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
            while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                   == null) {
                // cas操作,当目标段为空时,才进行替换,否则不替换
                // 这个判断可以防止两个线程同时到这里出现替换两次的情况
                // 若一个线程完成了替换,则另一个线程在下一次get时条件不成立退出循环
                if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                    break;
            }
        }
    }
    return seg;
}

get(Object key)

public V get(Object key) {
    Segment<K,V> s;
    HashEntry<K,V>[] tab;
    // 得到key的散列值
    int h = hash(key);
    // h >>> segmentShift : 绝对右移,相当于hash值的高n位参加了段的定位
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    // 如果该下标的段不为空且段中的数组不为空则条件成立
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {
        // UNSAFE.getObjectVolatile()得到该数组物理地址下得对象
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
        				 // (tab.length - 1) & h : 使用全部散列值进行元素的下标定位
        				 // TSHIFT :哈希表每个元素的偏移量,TBASE :哈希表第一个元素的偏移量
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    return null;
}

size()

public int size() {
    final Segment<K,V>[] segments = this.segments;
    int size;
    boolean overflow; // 如果大小溢出32位,则为真
    long sum;         // modCounts之和
    long last = 0L;   // 之前的总和
    int retries = -1; // 第一次迭代不是重试
    try {
        for (;;) {
       		// 加锁前重试次数,RETRIES_BEFORE_LOCK = 2,重试3次
       		// 当不加锁得不到理想的结果时,强制加锁进行size的统计
            if (retries++ == RETRIES_BEFORE_LOCK) {
           		// 对每一个段都进行加锁
                for (int j = 0; j < segments.length; ++j)
                    ensureSegment(j).lock();
            }
            sum = 0L;
            size = 0;
            overflow = false;
            for (int j = 0; j < segments.length; ++j) {
                Segment<K,V> seg = segmentAt(segments, j);
                if (seg != null) {
                    sum += seg.modCount;
                    int c = seg.count;
                    // 溢出
                    if (c < 0 || (size += c) < 0)
                        overflow = true;
                }
            }
            // 重试,一直到统计期间没有别的线程对哈希表进行操作
            // 当遍历两次的sum相同,说明没有别的线程进行干涉,可以返回值
            if (sum == last)
                break;
            last = sum;
        }
    } finally {
   		// 只有加锁了才有必要进行解锁
        if (retries > RETRIES_BEFORE_LOCK) {
            for (int j = 0; j < segments.length; ++j)
                segmentAt(segments, j).unlock();
        }
    }
    return overflow ? Integer.MAX_VALUE : size;
}

remove(Object key)

public V remove(Object key) {
    int hash = hash(key);
    // 根据散列值取得目标段
    Segment<K,V> s = segmentForHash(hash);
    return s == null ? null : s.remove(key, hash, null);
}

public boolean remove(Object key, Object value) {
    int hash = hash(key);
    Segment<K,V> s;
    return value != null && (s = segmentForHash(hash)) != null &&
        s.remove(key, hash, value) != null;
}

总结

本篇博文已将ConcurrentHashMap1.7的关键内部类和方法进行了讲解,通过阅读它的源码,我们发现JDK作者在保证多线程安全性的相关操作上,十分灵活,我们可以从中学习到一些思想,运用到日常的并发编程中。

相关标签: Java底层