欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

高并发下ConcurrentHashMap到底是怎么实现线程安全的?

程序员文章站 2022-05-04 09:21:54
前言这几天一直在看Java并发相关的知识,又学习了很多新的知识。看见大佬们一个个更博速度如此之快,可想知识渊博如滔滔江水。这一周就学习一些并发知识,搭建了一个ftp服务器,不知道小可爱们这周学习了那些知识呢?微信公众号【JustKeepCoding】抛出正题:ConcurrentHashMap是如何实现线程安全,这里面又隐含那些玄机呢?上一节讲述了Hashtable这个结构虽然线程安全,但是效率不高,就是因为他的每个操作都使用了synchronized同步块。因为synchronized同步块的...

前言


这几天一直在看Java并发相关的知识,又学习了很多新的知识。看见大佬们一个个更博速度如此之快,可想知识渊博如滔滔江水。这一周就学习一些并发知识,搭建了一个ftp服务器,不知道小可爱们这周学习了那些知识呢?微信公众号【JustKeepCoding】

抛出正题:ConcurrentHashMap是如何实现线程安全,这里面又隐含那些玄机呢?

上一节讲述了Hashtable这个结构虽然线程安全,但是效率不高,就是因为他的每个操作都使用了synchronized同步块。因为synchronized同步块的线程如果拿到了锁,就会阻塞其他线程,阻塞线程会使操作系统从用户态转为阻塞态,从而加大性能消耗。

所以在高并发的场景下,使用ConcurrentHashMap是最合适不过的了。那么问题来了,为什么都喜欢使用ConcurrentHashMap这个结构,它到底是怎么实现线程安全和并发度的呢

ConcurrentHashMap的底层到底是怎么实现的?

ConcurrentHashMap是面试必问的知识点,里面涵盖的知识也比较多,为此我们还是通过源码分析再切入到Doug Lea(作者)使用的其他技术。

JDK 1.7的ConcurrentHashMap用了哪些技术?

1.7函数构造和变量等

public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
        implements ConcurrentMap<K, V>, Serializable {
static final int DEFAULT_INITIAL_CAPACITY = 16;//默认容量,segments的个数
static final float DEFAULT_LOAD_FACTOR = 0.75f;//负载因子
static final int DEFAULT_CONCURRENCY_LEVEL = 16;//
static final int MAXIMUM_CAPACITY = 1 << 30;//最大容量,还是左移30位
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;//最小的segment容量为2
final Segment<K,V>[] segments;//重要组成
        }

采用分段锁技术,将ConcurrentHashMap的内部分成多个Segment段,每个Segment可以

Segment组成

static final class Segment<K,V> extends ReentrantLock implements Serializable {
static final int MAX_SCAN_RETRIES =
            Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
transient volatile HashEntry<K,V>[] table;//类似于HashMap中的Entry
transient int modCount;//修改次数
transient int threshold;//阈值
final float loadFactor;//加载因子
}

segment内部是一个HashEntry<K,V>[] table,跟JDK1.7版本的HashMap中的Entry类似,都是数组加链表的形式。但是这里多了一个volatile关键字修饰,这样做到底有什么好处呢?

volatile关键字

要弄清楚volatile关键字的来龙去脉需要具备Java内存模型、CPU缓存模型等知识。那么首先从并发编程的三个重要特性讲起:原子性、有序性和可见性。

  • 原子性
    原子性就是指在一次操作或多次操作中,要么所有的操作同时成功,那么同时失败。这个在上一篇MySQL的基础讲到过,比如我向淘宝买牙刷,我支出20,那么淘宝就要收到20。要么我转不出来,淘宝也收不到转。要么我支出成功,淘宝也收到成功。不可能存在,我支出但淘宝收不到的操作。
  • 有序性
    有序性就是指代码执行过程中的先后顺序,由于Java在编译期间的优化,导致代码执行顺序未必是顺序执行,因为JVM可能会指令重排序。
  • 可见性
    可见性是指当一个线程对一个变量进行修改,另一个线程可以立即看到修改的最新值。这就类似于MySQL的读取未提交的事务隔离级别,防止读取脏数据。

那么JVM是如何来保证可见性的呢?

  • 通过volatile关键字,对于共享资源的读操作会直接在主内存中进行,对于共享数据的写操作当然是先修改工作内存,但是修改后会主动刷新到主内存中。
  • 通过synchronized关键字,synchronized只能保证同一时刻只有一个线程获得锁,然后执行同步方法,并且还会保证在锁释放之前,会将对变量的修改会主动刷新到主内存。
  • 通过JUC提供的显式锁Lock也能保证,Lock的lock方法能够保证在同一时刻只有一个线程获得锁然后执行同步方法,并保证在锁释放之前(Lock的unlock)将修改的变量刷新到主内存。

我们假设一个场景,当一个读线程自身有个Init_value的值,每次都需要跟当前值比较,如果值被修改了就打印输出,没修改就不输出。当一个修改线程,每次都会修改当前的值,修改会会进行sleep睡眠10毫秒,用于等待读线程输出,即使其交替输出。(因为代码太长就不贴了)

你猜结果是怎样的?

结果是修改线程一直输出,读线程在控制台没有任何打印。这是因为,共享数据开始是缓存到CPU的cache快里,即读线程的本地内存中。即使修改线程修改数据,读线程也是从本地取,而本地内存值没有被修改,所以不会输出。那么使用volatile关键字,就可以强制读线程从内存刷新到本地线程内存(CPU的cache中)。

volatile关键字语义

  1. 保证可见性
  2. 保证顺序性
  3. 不保证原子性

所以回到HashEntry这里,采用volatile关键字就是为了并发环境下保证HashEntry的有序性可见性

高并发下ConcurrentHashMap到底是怎么实现线程安全的?

ReentrantLock 的使用与理解

从源码可以看到Segment继承ReentrantLock,ReentrantLock 又是一个很重要的知识点,我们都知道可以用互斥同步来保证线程的安全性,而最基本的同步互斥手段就是synchronizedjava.util.concurrent(J.U.C)包下的ReentrantLock 来实现同步。在基本用法上,ReentrantLock 和synchronized很相似,但在JDK1.5之前ReentrantLock 的功能是比较丰富的,在JDK1.8中运用广泛。

JDK 1.8的ConcurrentHashMap又采用了哪些技术呢?

悲观锁策略(阻塞同步)

互斥同步最主要的问题就是线程阻塞和唤醒带来的性能问题,因此这种同步也称阻塞同步。从处理方式来讲,互斥同步是一种悲观的并发策略,总是认为只要不去做正确的的同步措施(列入加锁),那肯定会出现问题,无论共享数据是否出现竞争,都需要加锁。(需要线程挂起

乐观锁策略(非阻塞同步)

基于冲突检测的乐观并发策略就是先进行操作,如果没有其他线程争用共享资源,那么操作就成功了。如果共享数据有争用,出现冲突,那就再采取其他的不就措施。(不需要线程挂起

高并发下ConcurrentHashMap到底是怎么实现线程安全的?

put操作

public V put(K key, V value) {//定位到segment
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();//如果为NULL则会出现异常
    int hash = hash(key);
    int j = (hash >>> segmentShift) & segmentMask;
    if ((s = (Segment<K,V>)UNSAFE.getObject          
         (segments, (j << SSHIFT) + SBASE)) == null) 
        s = ensureSegment(j);
    return s.put(key, hash, value, false);//再从HashEntry中插入
}
/**
**segment数组中的put操作,类似于HashMap中的put
**/
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
            HashEntry<K,V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value);//自旋锁策略
            V oldValue;//保存旧值
            try {
                HashEntry<K,V>[] tab = table;
                int index = (tab.length - 1) & hash;//计算segment[i]中的table的具体位置,这些就类似hashmap的操作了
                HashEntry<K,V> first = entryAt(tab, index);//寻找table其中一个头结点
                for (HashEntry<K,V> e = first;;) {//遍历链表
                    if (e != null) {
                        K k;
                        if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) {//存在值相同的结点
                            oldValue = e.value;//直接覆盖
                            if (!onlyIfAbsent) {//状态存在
                                e.value = value;
                                ++modCount;//修改次数加一
                            }
                            break;
                        }
                        e = e.next;//更换下一个节点
                    }
                    else {
                        if (node != null)//证明存在值
                            node.setNext(first);//头插法
                        else
                            node = new HashEntry<K,V>(hash, key, value, first);//空节点则直接复制进去
                        int c = count + 1;//
                        if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                            rehash(node);
                        else
                            setEntryAt(tab, index, node);
                        ++modCount;
                        count = c;
                        oldValue = null;
                        break;
                    }
                }
            } finally {
                unlock();
            }
            return oldValue;
        }

自旋锁

/**
**扫描HashEntry节点去获取锁(自旋锁)
**/
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
            HashEntry<K,V> first = entryForHash(this, hash);//获取插入值当前链表头结点
            HashEntry<K,V> e = first;
            HashEntry<K,V> node = null;//新的节点
            int retries = -1; // 尝试获取锁的次数,初始为-1
            while (!tryLock()) {//没有获取到锁的话
                HashEntry<K,V> f; // to recheck first below
                if (retries < 0) {//进入第一种情况
                    if (e == null) {//头结点为空,创建新的节点
                        if (node == null) // speculatively create node
                            node = new HashEntry<K,V>(hash, key, value, null);//为了少做一点事,以后直接使用
                        retries = 0;
                    }
                    else if (key.equals(e.key))
                        retries = 0;
                    else
                        e = e.next;
                }
                else if (++retries > MAX_SCAN_RETRIES) {//第二种情况,超过自旋次数加以阻塞
                    lock();//获取不到锁,则阻塞等待
                    break;
                }
                else if ((retries & 1) == 0 &&
                         (f = entryForHash(this, hash)) != first) {
                         //因为当前没有获取锁,可能头结点被更另一个线程更改,需要判断是否是之前的头结点。如果不是则重新循环判断
                    e = first = f; // re-traverse if entry changed
                    retries = -1;
                }
            }
            return node;
        }

tryLock()会尝试获取锁,获取不到就返回false,获取的到就返回true,做其他事,并且不会阻塞。
Lock()方法如果获取不到锁就会阻塞。

流程图

高并发下ConcurrentHashMap到底是怎么实现线程安全的?

扩容操作

/**
**扩容操作,不需要rehash,直接使用之前的hash
**/
private void rehash(HashEntry<K,V> node) {
            
            HashEntry<K,V>[] oldTable = table;//旧的hash表
            int oldCapacity = oldTable.length;//之前的hash表长
            int newCapacity = oldCapacity << 1;//扩容两倍
            threshold = (int)(newCapacity * loadFactor);
            HashEntry<K,V>[] newTable =
                (HashEntry<K,V>[]) new HashEntry[newCapacity];//扩容后的hash表
            int sizeMask = newCapacity - 1;//为了计算数组下标
            for (int i = 0; i < oldCapacity ; i++) {
                HashEntry<K,V> e = oldTable[i];
                if (e != null) {
                    HashEntry<K,V> next = e.next;
                    int idx = e.hash & sizeMask;
                    if (next == null)   //  Single node on list
                        newTable[idx] = e;
                    else { // Reuse consecutive sequence at same slot
                        HashEntry<K,V> lastRun = e;//每次记录最后一串index相同的节点,重新赋值
                        int lastIdx = idx;
                        for (HashEntry<K,V> last = next;
                             last != null;
                             last = last.next) {//遍历节点,记录最后index值相同的一串节点
                            int k = last.hash & sizeMask;
                            if (k != lastIdx) {//最后的节点的index不跟数组的index值相同时
                                lastIdx = k;//更新到最后节点的index
                                lastRun = last;
                            }
                        }
                        newTable[lastIdx] = lastRun;
                        // Clone remaining nodes
                        for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {//将最后一串index值相同的重新移到新的数组下标里
                            V v = p.value;
                            int h = p.hash;
                            int k = h & sizeMask;
                            HashEntry<K,V> n = newTable[k];
                            newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                        }
                    }
                }
            }
            int nodeIndex = node.hash & sizeMask; // 对新增的节点进行获取index
            node.setNext(newTable[nodeIndex]);//头插法
            newTable[nodeIndex] = node;
            table = newTable;
        }
高并发下ConcurrentHashMap到底是怎么实现线程安全的?

get操作

get 就比较简单啦,它是将 Key 通过Hash 之后定位到具体的 Segment ,再通过一次 Hash 定位到具体的元素上。
即使多个线程对HashEntry修改,有volatile修饰的HashEntry,每次都会get到最新值

结论

为什么线程安全呢,就是因为在1.7版本它的每个方法都会加锁,put采用自旋锁不会使线程阻塞(操作状态切换会消耗性能),从而性能比hashtable好很多。而且它的每个HashEntry[i]都是被volatile修饰,可以保证线程操作的可见性,即每次不会脏读,即使其他线程修改了值,都会强制刷新到本地内存
那为什么并发度高呢?
因为是对单个segment[i]进行加锁,意思就是segment如果有16个,那么可以同时有16个线程修改而且还是线程安全的。相对于Hashtable的锁,是锁定整个Hashtable对象,那么多个线程访问就需要被阻塞。

JDK1.8的ConcurrentHashMap

1.8版本取消了1.7中的分段锁策略,使用了CAS + synchronized来保证安全性。这跟1.8版本的HashMap也很相似,引入了红黑树,在链表节点数大于8的时候会转换成红黑树。

1.8源码构造

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
    implements ConcurrentMap<K,V>, Serializable {
    private static final int MAXIMUM_CAPACITY = 1 << 30;//最大容量
    private static final int DEFAULT_CAPACITY = 16;//默认容量
    private static final float LOAD_FACTOR = 0.75f;//负载因子
    static final int TREEIFY_THRESHOLD = 8;//链表节点数大于8 转红黑树
    static final int UNTREEIFY_THRESHOLD = 6;//删除红黑树节点小于6转链表
    transient volatile Node<K,V>[] table;//节点桶,volatile修饰
    private transient volatile Node<K,V>[] nextTable;//下一张表
}

1.8的put方法

final V putVal(K key, V value, boolean onlyIfAbsent) {//put中调用putVal
        if (key == null || value == null) throw new NullPointerException();//插入的值不能为空
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {//遍历表节点
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();//初始化表
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {//获取table[i]的头结点是空
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))//CAS操作,写入node
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)//当此时另一个线程正在扩容
                tab = helpTransfer(tab, f);//当前线程帮助扩容,增加效率
            else {//进入table节点,判断当前节点是链表节点还是红黑树节点
                V oldVal = null;
                synchronized (f) {//对表头结点获取锁资源
                    if (tabAt(tab, i) == f) {//重新判断一下是否被修改,被修改返回上级重新获取
                        if (fh >= 0) {//链表节点
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {//遍历链表节点
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {//相同直接覆盖
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {//尾插法 插入节点
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {//红黑树节点 
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)//链表节点大于8
                        treeifyBin(tab, i);//转红黑树
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);//统计节点数
        return null;
    }

TreeBin保证了即使红黑树自平衡,也不会释放锁资源。因为如果使用HashMap的红黑树结构的话,当插入节点会自平衡使头结点更换,而头结点是获取到锁了,如果头结点更换,其他线程此时就会获取到该节点的锁,造成插入的异常。

高并发下ConcurrentHashMap到底是怎么实现线程安全的?

put流程

  • 1.先判断key和value是否为空,为空抛出异常
  • 2.计算出hash值,遍历表结构,进入下面三种情况
    • 是否需要初始化表,空则初始化,再进入2
    • 获取table[i]的头节点,空则用CAS插入(key,value),进入2
    • 看是否当前有线程正在扩容,有则帮忙扩容helpTransfer,进入2
    • synchronized获锁资源,判断是链表节点还是红黑树节点,插入
  • 3.统计节点数
  • 返回空

比较并替换(Compare-and-swap,简称为CAS)

乐观锁策略需要CAS指令来保证进行,就是实现乐观锁的一种方式,CAS操作是由sun.misc.Unsafe类中的compareAndSwapInt()compareAndSwapLong()等几个方法包装提供,是一种轻量级锁,J.U.C中很多工具类就是基于CAS实现的,比如CompareAndSetgetAndIncrement()等都使用了Unsafe类的CAS操作。
CAS操作流程就是线程在读取数据时不进行加锁,在准备写回数据时,比较原值是否修改,若未被其他线程修改则写回,若已被修改,则重新执行读取流程。

1.8 get操作

public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {//hash桶存在,且当前的桶不为空
            if ((eh = e.hash) == h) {//如果是红黑树,按照红黑树获取值
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            else if (eh < 0)//按照链表获取
                return (p = e.find(h, key)) != null ? p.val : null;
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

总结
1.8在1.7上做的改动还是挺大的,ConcurrentHashMap这个数据结构的知识还是挺多的,比如CAS的操作和理解、1.8中synchronized关键字的优化、1.8中的扩容方法插入方法红黑树自平衡等等。都是Java程序员面试的必备知识点,这节又写了11000多字,挺辛苦的,所以能够给我点个赞吗?

最后抛出两个问题

  • 1.8中的ConcurrentHashMap为什么没有使用Lastrun(还记得1.7中扩容的Lastrun吗)
  • unsafe类你属于吗?Java程序中最牛逼的类,为什么最牛逼呢?

你的点赞就是我最大的努力!关注我的公众号【JustKeepCoding】,每周两篇为你推送不一样的知识!

高并发下ConcurrentHashMap到底是怎么实现线程安全的?

本文地址:https://blog.csdn.net/liyuanbo1997/article/details/107580664

相关标签: Java集合 java