欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

ConcurrentHashMap你都讲不明白怎么拿offer

程序员文章站 2022-04-30 09:25:16
...

一、老朋友HashMap

  • HashMap是一个Entry对象的数组。数组中的每一个Entry元素,又是一个链表的头节点
  • 并不是线程安全的。在高并发环境下做插入容易出现环形链表
  • 处理策略使用:HashTable、Collections.synchronizedMap,但是都存在性能问题,无论是读操作还是写操作,都会给整个集合加锁,导致同一时间的其他操作阻塞

二、ConcurrentHashMap

2.1 总结:

  • JDK1.8 底层是散列表+红黑树

    JDK1.7的底层是Segments+HashEntry数组

    Segment继承了ReentrantLock,每个片段都有一个锁,叫做锁分段

  • 支持高并发的访问和更新,是线程安全的

  • 检索操作不用加锁,get方法是非阻塞的

  • key和value都是不允许为null的

  • 有太多散列碰撞时,表会动态增长

  • 再散列(扩容)非常消耗资源,最好是提前初始化装载因子和初始容量

2.2 同步原理:

  • 在部分加锁和利用CAS算法来实现同步
2.2.1 CAS(比较与交换)
  • 有名的无锁算法
  • 有三个操作数
    • 内存值V
    • 旧的预期值A
    • 要修改的新值B
  • 当且仅当预期值A和内存值V相同时,才将内存值修改为B,否则什么都不做
  • 当多个线程尝试使用CAS同时更新同一个变量时,只有其中一个线程能更新变量的值(A和内存值V相同时),其他线程都失败,失败的线程并不会被挂起。
2.2.2 volatile:
  • volatile仅仅用来保证该变量对所有线程的可见性,单不保证原子性
    • 保证该变量对所有线程的可见性:
      • 在多线程的环境下:当这个变量被修改时,所有的线程都会知道该变量被改变了
    • 不保证原子性
      • 修改变量实质上在JVM中分了好几步,而在这几步内(从装载变量到修改),它是不安全的

2.3 ConCurrentHashMap的域

ConcurrentHashMap你都讲不明白怎么拿offer

	// 散列表,迭代器就是迭代它的了
	transient volatile Node<K,V>[] table;

    //下一张表:除了在扩容的时候,其他时间都是null
    private transient volatile Node<K,V>[] nextTable;

    // 基础计数器,通过CAS来更新
    private transient volatile long baseCount;

    /**
     * 散列表初始化和扩容都是由这个变量来控制的
     * 当为负数时,它正在被初始化或者在扩容
     * -1表示正在初始化
     * -N表示N-1个线程正在扩容
     * 默认为0
     * 初始化后,保存着下一次扩容的大小
     */
    private transient volatile int sizeCtl;

    // 分割表时用到的索引值
    private transient volatile int transferIndex;

    // 与计算size有关
    private transient volatile int cellsBusy;

    // 与计算size相关
    private transient volatile CounterCell[] counterCells;

    // 视图
    private transient KeySetView<K,V> keySet;
    private transient ValuesView<K,V> values;
    private transient EntrySetView<K,V> entrySet;

构造方法

ConcurrentHashMap你都讲不明白怎么拿offer

  • 无参构造器:默认初始容量为16

  • 指定初始容量构造器

  • 省略

  • 指定初始大小和装载因子

  • 指定初始大小、装载因子和并发更新的线程数量

    其中所涉及的tableSizeFor()用来获取大于参数且最接近的2的整数次幂的数

    赋值给sizeCtl属性也说明了这是下次扩容的大小

方法

initTable初始化:
 private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            // 说明其他线程执行CAS成功,正在进行初始化
            if ((sc = sizeCtl) < 0)
                // 让出CPU使用权
                Thread.yield(); 
            // 设置为-1表示本线程正在初始化
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        // 相当于0.75*n 设置一个扩容的阈值
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }
Put方法:
  • 根据key计算出hashcode
  • 判断是否需要进行初始化
  • 即为当前key定位出的Node,如果为空表示当前位置可以写入数据,利用CAS尝试写入,失败则自旋保证成功
  • 如果当前位置hashcode==MOVED == -1则需要扩容
  • 如果都不满足,则利用synchronized锁写入数据
  • 如果数量大于TREEIFY_THRESHOLD,则要转为红黑树
  public V put(K key, V value) {
        return putVal(key, value, false);
    }

    /** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        //对key进行散列,获取哈希值
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            // f=目标位置元素 fh后面存放目标位置元素的hash值
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                // 数组桶为空,初始化数组桶(自旋+CAS)
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                // 桶内为空,CAS放入,不加锁,成功了就直接break跳出
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            // 插入位置是表的连接点时睡眠在扩容,就帮助当前线程扩容
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                // 处理散列冲突
                V oldVal = null;
                // 加锁
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        // 节点的方式处理
                        if (fh >= 0) {
                            binCount = 1;
                            // 循环加入新的或者覆盖节点
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        // 按照树的方式插入
                        else if (f instanceof TreeBin) {
                            // 红黑树
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                // 链表长度大于8,把链表结构转成树型结构
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }
Get方法:
  • 不用加锁的,是非阻塞的
  • 其中的Node节点是重写的,设置了volatile关键字修饰,致使它每次获取的都是最新设置的值
  • 根据hash值计算位置
  • 查找到指定位置,如果头结点就是要找的,直接返回它的value
  • 如果头结点hash值小于0,说明正在扩容或者红黑树,查找之
  • 如果是链表,遍历查找之
public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            // 如果指定位置元素存在,头结点hash值相同
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    // key hash值相等,key相同,直接返回元素value
                    return e.val;
            }
            
            else if (eh < 0)
                // 头结点hash值小于0,说明正在扩容或者是红黑树,find查找
                return (p = e.find(h, key)) != null ? p.val : null;
            while ((e = e.next) != null) {
                // 在链表上,遍历查找
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }
Size方法:

在1.7和1.8之间的区别

1.7

  • 底层数据结构:数组(Segment)+数组(HashEntry)+链表(HashEntry结点)

  • 基本属性

    • class Segment内部类,继承ReentrantLock,有一个HashEntry数组用来存储表头结点

      int count;     // 此对象中存放的是HashEntry个数
      int threshold; // 扩容阈值
      volatile HashEntry<K,V>[]table;  // 存储entry数组,每一个entry都是链表的头部
      float loadFactor;   // 加载因子
      
    • class HashEntry 定义的结点,里面存储数据和下一个节点

  • 主要方法

    • get():
      • 第一次哈希找到对应的Segment 段,调用Segment中的get方法
      • 再次哈希找到对应的链表
      • 最后在链表中查找
    • put ():
      • 首先确定段的位置,调用Segment中的put方法
      • 加锁
      • 检查当前Segment数组中包含的HashEntry节点的个数,如果超过阈值就重新hash
      • 然后再次hash确定放的链表
      • 在对应的链表中查找是否相同节点,如果有直接覆盖,如果没有将其放置链表尾部
  • 重哈希方式:只是对Segment对象中的HashEntry数组进行重哈希.

1.8

  • 数据结构:Node数组+链表/红黑树

    • Node数组用来存放树或者链表的头结点,当一个链表中的数量达到一定数值后会使查询速率降低,所以达到一定阈值以后,会将一个链表转换为一个红黑二叉树,以提高查询的速率
  • 主要属性

    外部类的基本属性
    volatile Node<K,V>[] table; // Node数组用于存放链表或者树的头结点
    static final int TREEIFY_THRESHOLD = 8; // 链表转红黑树的阈值 > 8 时
    static final int UNTREEIFY_THRESHOLD = 6; // 红黑树转链表的阈值 <= 6 时
    static final int TREEBIN = -2; // 树根节点的hash值
    static final float LOAD_FACTOR = 0.75f;// 负载因子
    static final int DEFAULT_CAPACITY = 16; // 默认大小为16
    内部类
    class Node<K,V> implements Map.Entry<K,V> {
    int hash;
    final K key;
    volatile V val;
    volatile Node<K,V> next;
    }
    jdk1.8中虽然不在使用分段锁,但是仍然有Segment这个类,但是没有实际作用

  • 主要方法

    • put
      • 检查key或value是否为null
      • 得到key的hash值
      • 如果Node数组是空的,此时才初始化initTable()
      • 如果找的对于的下标的位置为空,直接new一个Node节点并放入,break
      • 如果对于头节点不为空,则进入代码块
        • 判断此头节点的hash值,
          • 大于0说明是链表的头结点在链表中寻找,如果有想换hash值并且key相同,则之间覆盖返回旧值,如果没有则就直接放置在链表的尾部
          • 小于0说明此节点是红黑二叉树的根节点,调用树的添加元素的方法。
    • get方法
      • 首先获取到key的hash值
      • 然后找到对应的数组下标处的元素
      • 如果此元素是要找的直接返回
      • 如果此元素时null则返回null
      • 如果key小于0,说明是红黑树

如何保证线程安全

  • jdk1.7:
    • 分段锁对整个桶数组进行了分隔分段(Segment),每一把锁只锁容器其中一部分数据,多线程访问容器里不同数据段的数据, 就不会存在锁竞争,提高并发访问率
  • jdk1.8:
    • 使用的是优化的Synchronized关键字同步代码块和cas操作维护并发
相关标签: 所有文章 Java