欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

ConcurrentHashMap源码分析

程序员文章站 2024-03-14 20:20:35
...

​1、前言

我们知道HashMap是一个线程不安全的集合类,在多线程中我们是不能使用的,HashTable虽然是线程安全的,但是通过同步方法来实现的,那在JDK中是否存在性能好的Map集合呢?答案是存在的,这便是我们今天所要讲解的ConcurrentHashMap。本文参考的是JAVA 8

2、数据结构

ConcurrentHashMap的底层使用的数据结构同HashMap一样,均是数组+链表+红黑树,对HashMap不熟悉的可查看我之前介绍HashMap的文章,其结构如下所示:
ConcurrentHashMap源码分析
Node节点的定义如下:

static class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    volatile V val;
    volatile Node<K,V> next;Node(int hash, K key, V val, Node<K,V> next) {
        this.hash = hash;
        this.key = key;
        this.val = val;
        this.next = next;
    }
    ......
}

3、属性变量

在ConcurrentHashMap中的字段要比HashMap的字段多好多,这里一些常量就列出了,只给出几个成员变量,如下

// 数组
transient volatile Node<K,V>[] table;
// 在进行扩容是会进行初始化 将table上的数据迁移至该数组,然后将其复制给table
private transient volatile Node<K,V>[] nextTable;
// 当不存在冲突时 用来记录元素数量
private transient volatile long baseCount;
// 用于控制数组的初始化和扩容,不同情况有不同的值
// 0 默认值
// -1 代表有线程在进行初始化
// >0 数组为null时  代表数组的初始化长度
// >0 数组不为null时  代表扩容的临界值  数组长度*0.75
// <0 高十六位代表扩容标记  低十六位表示扩容的线程
private transient volatile int sizeCtl;
// 扩容时使用 代表处理数据的下标
private transient volatile int transferIndex;
// 判断CounterCells数组是否出入初始化或扩容状态
private transient volatile int cellsBusy;
// 在并发的情况下  修改数组元素的值来累加元素的个数
private transient volatile CounterCell[] counterCells;

这里需要注意的是sizeCtlbaseCountcounterCells三个属性,在后面的源码中我们会讲解到这三个字段的用处,这里简单的说一下,sizeCtl用来控制数组的初始化和扩容,baseCount和counterCells用来记录元素的个数,链表的总数等于baseCount + counterCells中元素值的和。

4、构造方法

在ConcurrentHashMap中提供了五种构造方法,如下

// 空参,采用默认大小16
public ConcurrentHashMap() {
}
// 指定数组长度  调用tableSizeFor计算出数组的初始长度 传递的为 1.5initialCapacity + 1
public ConcurrentHashMap(int initialCapacity) {
    if (initialCapacity < 0)
        throw new IllegalArgumentException();
    int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
               MAXIMUM_CAPACITY :
               tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
    // 将数组的初始化值存入sizeCtl中
    this.sizeCtl = cap;
}
// 通过Map创建
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
    this.sizeCtl = DEFAULT_CAPACITY;
    putAll(m);
}
// 传入数组长度和负载因子
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
    this(initialCapacity, loadFactor, 1);
}
// 传入数组长度、负载因子和加权因子
public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (initialCapacity < concurrencyLevel)   // Use at least as many bins
        initialCapacity = concurrencyLevel;   // as estimated threads
    long size = (long)(1.0 + (long)initialCapacity / loadFactor);
    int cap = (size >= (long)MAXIMUM_CAPACITY) ?
        MAXIMUM_CAPACITY : tableSizeFor((int)size);
    // 数组长度传入sizeCtrl中
    this.sizeCtl = cap;
}

通过构造方法我们知道在传递了数组长度的构造方法中,sizeCtl存放的是数组的初始化长度,该变量的其他值我们在后面会一个个的讲解到。

5、插入数据 put方法

在这部分我们开始学习ConcurrentHashMap插入数据的逻辑,我们会了解到数组的初始化、插入、扩容及如何保证的线程安全,其源码如下:

public V put(K key, V value) {
    return putVal(key, value, false);
}/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
    // key和value不能为null  否则抛出NullPointException
    if (key == null || value == null) throw new NullPointerException();
    // 通过key的哈希值计算出node节点的hash属性值
    int hash = spread(key.hashCode());
    int binCount = 0;
    // 自旋
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            // 初始化数组
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // 元素在数组对应的位置上为null 使用cas操作将节点插入该位置
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        // 数组对应的位置不为null 该节点的hash值为-1
        // 在transfer方法中进行设置
        else if ((fh = f.hash) == MOVED)
            // 帮助扩容
            tab = helpTransfer(tab, f);
        else {
            // 解决hash冲突
            V oldVal = null;
            // 同步代码块,锁住的是数组指定位置上的Node对象,不影响其他线程操作别的位置上的数据
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        // 用于记录插入该数据的链表的长度  最后判断是否需要转换为红黑树
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            // 链表上存在对应的key  覆盖value值
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            // 不存在key 在链表尾部插入新节点
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            // key不存在新增加节点时进入该逻辑
            if (binCount != 0) {
                // 链表长度大于等于8
                if (binCount >= TREEIFY_THRESHOLD)
                    // 转为红黑树或者扩容
                    // 数组的长度小于64进行扩容
                    treeifyBin(tab, i);
                // 若key存在 返回原值
                if (oldVal != null)
                    return oldVal;
                // 退出自旋
                break;
            }
        }
    }
    // 元素数据增加 扩容的逻辑
    addCount(1L, binCount);
    return null;
}

通过源码我们发现在put方法中ConcurrentHashMap是通过自旋加CAS及同步代码块来保证的线程安全,代码块锁住的是数组上的某个元素,这样就允许了对数组上其他元素的操作不被阻塞,可以提高并发。其整个处理流程如下图所示
ConcurrentHashMap源码分析
在了解了put方法的处理过程后,我们再来详细的介绍下几个方法

  • initTable 数组初始化
  • helpTransfer 帮助扩容
  • treeifyBin 转换为红黑树
  • addCount 维护集合长度和扩容

5.1 数组的初始化

数组的初始化是在initTable方法中处理的,该方法的源码如下:

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    // 自旋
    while ((tab = table) == null || tab.length == 0) {
        // sizeCtl == -1代表有其他线程在进行初始化
        if ((sc = sizeCtl) < 0)
            // 释放cpu资源
            Thread.yield(); // lost initialization race; just spin
        // 通过CAS操作 将sizeCtl设置为-1
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                if ((tab = table) == null || tab.length == 0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    // 创建数组
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    // 数组长度的四分之三
                    sc = n - (n >>> 2);
                }
            } finally {
                // 初始化完成后  将sizeCtl设置为数组长度的四分之三
                sizeCtl = sc;
            }
            // 退出自旋
            break;
        }
    }
    // 返回数组
    return tab;
}

在该方法中可以看到使用的自旋加CAS操作保证的线程安全,在初始化时首先通过CAS操作将sizeCtl的值设置为-1,此时再有其他线程过来进行初始化时会直接释放掉CPU资源,在这里我们有看到了sizeCtl的其他意义

  • sizeCtl == 0 代表未指定数组大小 初始化长度为16的数组
  • sizeCtl == -1 代表有线程在进行初始化
  • sizeCtl > 0 且 数组为null 代表初始化数组的长度
  • 数组不为null sizeCtl > 0 需要扩容的值,为数组长度的四分之三

5.2、addCount方法

通过上面的流程图我们会发现,当插入新的节点时会调用该方法,该方法是用来记录集合中的数据个数并且处理是否需要扩容的逻辑,其源码如下:

private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    // counterCells为null且CAS修改baseCount的值成功,代表不存在线程竞争
    // 此时用baseCount累加的方式来记录元素的数量
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        boolean uncontended = true;
        // counterCells是否为空
        // counterCells的长度是否小于1
        // 随机从counterCells中取一个元素  判断是否为null
        // CAS操作取出的元素的值 判断是否成功
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            // 该方法用来处理CounterCells的初始化或扩容
            fullAddCount(x, uncontended);
            return;
        }
        // check代表链表插入数据之前的长度
        if (check <= 1)
            return;
        // 统计元素个数
        s = sumCount();
    }
    // 链表原来的长度不为0  即原来在数据的对应位置存在数据
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        //  自旋  判断数据个数是否达到扩容值(数组长度*0.75)
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            // 获取扩容戳 算法为
            // Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1))
            // Integer.numberOfLeadingZeros获取n的高位连续的非0位的个数
            // RESIZE_STAMP_BITS = 16
            int rs = resizeStamp(n);
            if (sc < 0) {
                // 正在进行扩容
                // sc左移16位是否等于得到的扩容标记
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    // 扩容的逻辑
                    transfer(tab, nt);
            }
            // 第一次进行扩容时将sizeCtl设置为rs右移16位加2
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            s = sumCount();
        }
    }
}

到这里我们看到了CounterCell数组,在前面的属性字段介绍时提到过该数组是用来记录元素的个数的,在大多数的集合中都维护了一个记录集合大小的字段,但在ConcurrentHashMap中使用的是一个baseCount和一个数组来记录元素的个数,这是为什么呢?这是因为ConcurrentHashMap是一个多线程安全的集合,采用这种方式,可以减少在高并发的情况下维护数据个数而带来的性能消耗,其源码如下:

@sun.misc.Contended static final class CounterCell {
    // 记录元素的数量
    volatile long value;
    CounterCell(long x) { value = x; }
}
// 获取元素的数量  遍历counterCell获取累加其value值加上baseCount值即为元素的总数
final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    long sum = baseCount;
    // 遍历counterCells在baseCount的基础上累加
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    // 返回数据的总数
    return sum;
}

在扩容那里我们再次看到了sizeCtl的身影,该字段在这部分存储的数据如下:

  • 高十六位 扩容标记
  • 低十六位 进行扩容的线程数量

5.2.1、扩容逻辑 transfer方法

transfer方法是用来进行扩容操作的逻辑,也是挺复杂的一个逻辑,源码如下:

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    // 第一个进行扩容的线程传递的该参数为null
    if (nextTab == null) {            // initiating
        // 初始化 nextTable 的逻辑
        try {
            @SuppressWarnings("unchecked")
            // 创建一个长度为数组两倍的Node数组
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        // 赋值给nextTable变量
        nextTable = nextTab;
        // 设置transferIndex的值
        transferIndex = n;
    }
    int nextn = nextTab.length;
    // 创建一个ForwardingNode hash值为-1 用来表示一个需要移动的节点
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    boolean advance = true;
    boolean finishing = false; // to ensure sweep before committing nextTab
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        // 为当前线程分配操作的区域
        while (advance) {
            int nextIndex, nextBound;
            // --i获取下个数组上的位置
            if (--i >= bound || finishing)
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            // CAS操作 修改transferIndex的值
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        // 退出扩容的逻辑
        // 在上个循环中会为i进行赋值  初始为15
        // 小于0代表节点处理完成
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            // 扩容完成
            if (finishing) {
                // nextTable设置为null
                nextTable = null;
                // 将nextTable赋值给数组
                table = nextTab;
                // 修改sizeCtl 是新数组长度的0.75
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            // 线程执行完扩容后会通过CAS操作将sizeCtl的值减1
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                // 判断是否为最后一个在执行扩容的线程线程
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    // 不是最后一个扩容,直接退出
                    return;
                // 是最后一个扩容的 退出
                finishing = advance = true;
                // 赋值为原数组长度  再进行一次检查
                i = n; // recheck before commit
            }
        }
        // 数据迁移的逻辑  将table上的数据迁移值nextTab上
        // 节点不存在数据 直接添加占位的节点
        else if ((f = tabAt(tab, i)) == null)
            // 将fwd放入该位置进行占位   表示处理完成
            advance = casTabAt(tab, i, null, fwd);
        else if ((fh = f.hash) == MOVED)
            // 表示已经处理过 直接略过
            advance = true; // already processed
        else {
            // 数据迁移  锁对象是当前节点
            // 是将数据迁入nextTab中
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    // 两个链表,分别存储 在新链表位置不变和改变的节点
                    Node<K,V> ln, hn;
                    if (fh >= 0) {
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        // 处理完成后会将该位置设置为fwd节点 进行占位
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                    else if (f instanceof TreeBin) {
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}

通过ConcurrentHashMap的扩容代码我们能够看出,扩容时是新建一个长度为原数组长度两倍的数组,然后遍历原数组,挨个将节点移至新创建的数组,处理完成后用新数组替换原数组。逻辑比较复杂,需要静下心来查看这段代码。

5.3、帮助扩容 helpTransfer

通过5.2部分的分析,我们了解了ConcurrentHashMap的扩容过程,也知道了在什么时候会将节点的hash设置为MOVED,接下来我们看一下插入时遇到这种情况的处理逻辑,helpTransfer方法的逻辑如下:

final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;
    // tab != null 代表数组已经初始化
    // ForwardingNode 代表在扩容时已经对该节点处理完成且扩容尚未结束
    // f.nextTable != null 代表扩容尚未完成
    // rs 为扩容标记  算法已在前面讲解过
    if (tab != null && (f instanceof ForwardingNode) &&
        (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
        int rs = resizeStamp(tab.length);
        // 判断是否出入同一次扩容中
        // 扩容是否完成
        // 扩容线程是否达到最大值
        while (nextTab == nextTable && table == tab &&
               (sc = sizeCtl) < 0) {
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                sc == rs + MAX_RESIZERS || transferIndex <= 0)
                break;
            // CAS操作将sizeCtl的值加1
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                // 调用扩容逻辑
                transfer(tab, nextTab);
                break;
            }
        }
        return nextTab;
    }
    return table;
}

至此,ConcurrentHashMap的put方法逻辑我们已经学习完了,虽然数据结构同HashMap一直,但为了保证集合的多线程安全,处理逻辑要比HashMap复杂许多,需要大家静下心来慢慢看及思考。在学习了put方法的逻辑后,我们已经将最复杂的一个逻辑学完了,接下来的内容就比较轻松了。

更多文章可以关注我的公众号,会更新Java相关的技术文章
ConcurrentHashMap源码分析