欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

《Java并发编程的艺术》读书笔记 第六章 Java并发容器和框架

程序员文章站 2022-05-05 09:38:31
...

《Java并发编程的艺术》读书笔记 第六章 Java并发容器和框架

1.ConcurrentHashMap的实现原理与使用

1.1为什么使用ConcurrentHashMap

​ ​ ​ ​ ​ ​ ​ HashMap可能导致死循环,线程安全的HashTable效率低。

线程不安全的HashMap
​ ​ ​ ​ ​ ​ ​ HashMap在并发执行put操作的时候会引起死循环,是因为多线程会导致HashMap的Entry链表形成环形数据结构,一旦形成环形数据结构,Entry的next节点永远不为空,就会产生死循环。

效率低下的HashTable
​ ​ ​ ​ ​ ​ ​ HashTable容易使用synchronized来保证线程安全,但在线程竞争激烈的情况下HashTable的效率低。一个线程访问HashTable的同步方法时,其它线程也访问HashTable的同步方法时,会进入阻塞或者轮询状态,如线程1使用put进行元素添加,线程2非但不能使用put方法也不能使用get方法。

ConcurrentHashMap的锁分段技术可提高并发访问效率
​ ​ ​ ​ ​ ​ ​ 所有访问HashTable的线程都必须竞争同1把锁,所以HashTable在并发环境下表现效率低。假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么当多线程访问容器里不同数据段的数据时,线程间就不会存在锁竞争,从而可以有效提高并发访问效率,这就是ConcurrentHashMap所使用的锁分段技术。
​ ​ ​ ​ ​ ​ ​ 首先将数据分成一段一段的存储,然后每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其它段的数据也能被其它线程访问。

1.2 ConcurrentHashMap的结构

​ ​ ​ ​ ​ ​ ​ ConcurrntHashMap是Segment数组结构和HashEntry数组结构组成。Segment是一种可重入锁,在ConcurrentHashMap里扮演锁的角色,HashEntry则用于存储键值对数据。一个ConcurrentHashMap里包含一个Segment数组,Segment的结构和HashMap类似,是一种数组和链表结构,一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元素,每个Segment守护一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时,必须首先获得与它对应的Segment锁。

1.3 ConcurrentHashMap的初始化

​ ​ ​ ​ ​ ​ ​ 通过initialCapacity、loadFactor和concurrencyLevel等几个参数来初始化segment数组、段偏移量segmentShift、段掩码segmentMask和每个segment里的HashEntry数组实现的。

初始化segments数组

​ ​ ​ ​ ​ ​ ​ ​ segments数组的长度ssize是通过concurrencyLevel计算得到的。为了能通过按位与的散列算法来定位segments数组的索引,必须保证segment数组的长度是2的n次方,所以必须计算出一个大于或等于concurrencyLevel的最小的2的N次方值来作为segments数组的长度。

初始化segmentShift和segmentMask

​ ​ ​ ​ ​ ​ ​ 这两个全局变量需要在定位segment时的散列算法里使用,sshift等于ssize从1向左移位的次数,在默认情况下concurrencyLevel等于16,也就是1需要向左移位4次,所以sshift为4。
​ ​ ​ ​ ​ ​ ​ segmentShift用于定位参与散列运算的位数,segmentShift等于32-shift,也就是28。因为ConcurrentHashMap里的hash()方法输出的最大数是32位的所以这里用32-shift。
​ ​ ​ ​ ​ ​ ​ segmentMask是散列运算的掩码,等于ssize-1,也就是15。
​ ​ ​ ​ ​ ​ ​ ssize的最大长度为65536,所以segmentMask最大值为65535,segmentShift最大值是16。

初始化每个segment

​ ​ ​ ​ ​ ​ ​ 在构造方法里需要通过initialCapacity(初始化容量)和loadfactor(每个segment的负载因子)来初始化数组中的每个segment。

1.4 定位 Segment

​ ​ ​ ​ ​ ​ ​ 首先使用Wang/Jenkins hash的变种算法对元素的hashCode进行一次再散列。

​ ​ ​ ​ ​ ​ ​ 之所以进行再散列,目的是减少散列冲突,使元素能够均匀地分布在不同的Segment上,从而提高容器的存取效率。

1.5 ConcurrentHashMap的操作
  • get操作

​ ​ ​ ​ ​ ​ ​ 先经过一次再散列,然后使用这个散列值通过散列运算定位到Segment,再通过散列算法定位到元素。
​ ​ ​ ​ ​ ​ ​ get过程中不需要加锁,除非读的值使空才会加锁重新读。get方法里将要使用的共享变量都定义成volatile类型,如用于统计当前Segement大小的count字段和用于存储值的HashEntry的value。定义成volatile的变量,能够在线程之间保持可见性,能够被多线程同时读,并且保证不会读到过期的值,但是只能被单线程写,get操作里只读不写,所以不用加锁。

  • put操作

​ ​ ​ ​ ​ ​ ​ 在操作共享变量的时候必须加锁,put方法首先定位到Segment,然后在Segment中进行插入操作:先判断是否需要对Segment里的HashEntry数组进行扩容,第二步定位添加元素的位置,然后将其放在HashEntry数组里面。

  • 是否需要扩容插入元素前先判断Segment里的HashEntry数组是否超过容量(threshold),如果超过阈值,则对数组进行扩容,值得一提的是,Segment的扩容判断比HashMap更恰当,因为HashMap是在插入元素后判断元素是否已经到达容量的,如果达到了就扩容,但很有可能扩容后没有新元素插入,这时候HashMap就进行了一次无效的扩容。

  • 如何扩容:首先创建一个容量是原来容量2倍的数组,然后将数组里的元素进行再散列后插入到新的数组里。为了高效,ConcurrentHashMap不会对整个容器进行扩容,而只对某个segment扩容。

  • size操作

​ ​ ​ ​ ​ ​ ​ 执行该方法,也就是要统计所有Segement里元素的大小后求和。Segment里的全局变量count是一个volatile变量。在多线程场景下,ConcurrentHashMap的做法是先尝试2次通过不锁住Segment的方式来统计各个Segement的大小,如果统计过程中,count发生变化,再采用加锁的方法来统计所有Segment的大小。如何判断统计的过程中容器发生变化?使用modCount变量,在put、remove和clean方法里操作元素前都会将变量modCount进行+1,那么在统计size前后比较modCount是否发生变化,从而得知容器的大小是否发生变化。

2.ConcurrentLinkedQueue

2.1ConcurrentLinkedQueue的结构

​ ​ ​ ​ ​ ​ ​ 默认情况下,head节点存储的元素为空,tail节点等于head节点。

2.2入队列

入队列的过程

​ ​ ​ ​ ​ ​ ​ 入队列就是将入队节点添加到队列的尾部。
​ ​ ​ ​ ​ ​ ​ 入队主要做2件事情(单线程入队的角度):第一是将入队节点设置成当前队列尾节点的的下一个节点,第二十更新tail节点,如果tail节点的next节点不为空则将如对节点设置成tail节点,如果tail节点的next节点为空,则将入队节点设置成tail的next节点,所以tail节点不总是为节点。
​ ​ ​ ​ ​ ​ ​ 下面是offer方法的源码,将通过源码分析下如何使用CAS算法来入队的。该方法没有任何的锁操作,线程安全完全是由CAS操作和队列的算法来保证的,整个方法的核心是for循环,这个循环没有出口,一直到尝试成功。

public boolean offer(E e) { 
  checkNotNull(e);
  final Node<E> newNode = new Node<E>(e);
  for (Node<E> t = tail, p = t;;) {
    Node<E> q = p.next;
    if (q == null) {
        //1、如果第一次进来,次条件成立 
        // 1、程序再次进入,此时,t和p还在head位置上,(由于第一次插入了数据)q!=null 
        // 3、p取到了下一个节点(“直到”最后的节点)q==null
      if (p.casNext(null, newNode)) {// p是最后一个节点 
        //每2次更新一下tail
      if (p != t) 
          // 2、如果是第一次进来,那么t==p是成立,所以不会执行下一句话,所以呢,就不会更新t节点,也就是不会更新tail 
          // 4、经过第二步,p已经不能t了,所以,更新tail节点,由此可以看书,不是每次都更新tail的
        casTail(t, newNode); // Failure is OK.
        return true;
      }
      // cas竞争失败,再次尝试,
    }
    else if (p == q)// 遇到哨兵节点,从head开始遍历(哨兵节点的产生是因为,链表进行了poll()“提取删除”操作)
          //但如果tail被修改了,则使用tail(因为可能被修改正确了),
          // t!=(t=tail)这个操作在并行中是永false,但是在串行中,由于可能对比的时候,t的值被其他线程修改,所以,此值可能为turl
    p = (t != (t = tail)) ? t : head;
    else
    //2、p取下一个节点或者最后一个节点
    p = (p != t && t != (t = tail)) ? t : q;
  }
}

​ ​ ​ ​ ​ ​ ​ 整个入队过程主要做2件事:第一定位出尾节点,第二是使用CAS算法将入队节点设置成尾节点的next节点,如果不成功就重试。

定位尾节点

​ ​ ​ ​ ​ ​ ​ 刚刚也说了,tail节点并不总是尾节点,所以每次入队都必须先通过tail节点找到尾节点。在源码中的循环的第一个if是判断tail是否有next节点,如果有那就表示next节点可能是尾节点,获取tail节点的next节点时需要注意p节点等于p.next的情况(这意味着p和p.next都等于空,这个队列刚刚初始化,正要添加节点),这时候应该返回head节点。

设置入队节点为尾节点

​ ​ ​ ​ ​ ​ ​ p.casNext(null,n)方法用于将入队节点设置为当前队列尾节点的next节点,如果p是null,表示p是当前队列的尾节点,如果不为null,表示其它线程更新了尾节点,则需要重新获取当前队列的尾节点。

2.3 出队列

​ ​ ​ ​ ​ ​ ​ 即从队列里返回一个节点元素,并清空该节点对元素的引用。
​ ​ ​ ​ ​ ​ ​ 首先获取头节点元素,然后判断头节点元素是否为空,如果为空,表示另外一个线程已经进行了一次出队操作将该节点的元素取走,如果不为空,则使用CAS的方式将头节点的引用设置为null,如果CAS成功,则直接返回头节点的元素,如果不成功,表示另外一个线程已经进行了一次出队操作更新了head节点,导致元素发生了变化,需要重新获取头节点。

3.Java中的阻塞队列

​ ​ ​ ​ ​ ​ ​ 阻塞队列是一个支持两个附加操作的队列,这两个附加的操作支持阻塞的插入和移除方法。

  • 支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,一直到队列不满

  • 支持阻塞的溢出方法:意思是当队列空的时候,队列会阻塞移除元素的线程,一直到队列不空

​ ​ ​ ​ ​ ​ ​ 阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器。

3.1 Java里的阻塞队列

​ ​ ​ ​ ​ ​ ​ JDK 7提供了7个阻塞队列。
《Java并发编程的艺术》读书笔记 第六章 Java并发容器和框架

3.2 阻塞队列的实现原理

​ ​ ​ ​ ​ ​ ​ 推荐文章1推荐文章2

4.Fork/Join框架

​ ​ ​ ​ ​ ​ ​ 该框架是用于并行执行任务的框架,是一个把大任务分割成小任务的,最终汇总每个小任务结果后得到大任务结果的框架。

4.1 工作窃取算法

​ ​ ​ ​ ​ ​ ​ 工作窃取是指某个线程从其它队列里窃取任务来执行。被窃取任务的线程和窃取任务的线程会访问同一个队列,为了减少二者之间的竞争,通常会使用双端你队列,被窃取任务线程永远从双端队列的头部拿任务执行,窃取任务的线程永远从双端队列的尾部拿任务执行。

  • 工作窃取算法的优点:充分利用线程进行并行计算,减少了线程间的竞争。
  • 工作窃取算法的缺点:某些情况下仍存在竞争且消耗更多系统资源。
4.2 Fork/join框架的设计

​ ​ ​ ​ ​ ​ ​ 步骤1 分割任务。首先需要一个fork类把大任务分成子任务,有可能子任务还是很大,所以需要不停分割,直到分割出的子任务足够小。

​ ​ ​ ​ ​ ​ ​ 步骤2 执行任务并合并结果。分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取任务执行,子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据。

ForkJoinTask和ForkJoinPool

4.3 使用Fork/Join框架

​ ​ ​ ​ ​ ​ ​ ForkJoinTask与一般任务的主要区别在于它需要实现compute方法,在这个方法里,首先需要判断任务是否足够小,如果足够小就直接执行任务。如果不足够小,就必须分割成两个子任务,每个子任务在调用fork方法时,又会进入compute方法,看看当前子任务是否需要继续分割成子任务,如果不需要继续分割,则执行当前子任务并返回结果。使用join方法会等待子任务执行完并得到其结果。

4.4 Fork/Join框架的异常处理

​ ​ ​ ​ ​ ​ ​ ForkJoinTask在执行的时候可能会抛出异常,但是我们没办法在主线程里面直接捕获异常,所以ForkJoinTask提供了isCompletedAbormally()方法来检查任务是否已经抛出异常或者已经被取消了,并且可以通过ForkJoinTask的getException()方法来获取异常(该方法返回Throwable对象,如果任务没完成或者没有抛出异常返回null)。

4.5 Fork/Join框架的实现原理

​ ​ ​ ​ ​ ​ ​ ForkJoinPool由ForkJoinTask数组和ForkJoinWorkerThread数组组成,ForkJoinTask数组负责将存放程序提交给ForkJoinPool的任务,而ForkJoinWorkerThread数组负责执行这些任务。

  • ForkJoinTask的fork方法实现原理

​ ​ ​ ​ ​ ​ ​ 当调用该方法的时候,程序会调用ForkJoinWorkerThread的pushTask方法异步执行这个任务,然后立即返回结果。

  • ForkJoinTask的join方法实现原理

    Join方法的主要作用是阻塞当前线程并等待获取结果。