欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Java并发编程的艺术(十二)——并发容器和框架

程序员文章站 2022-05-05 09:38:25
...

ConcurrentHashMap

为什么需要ConcurrentHashMap

  • HashMap线程不安全,因为HashMap的Entry是以链表的形式存储的,如果多线程操作可能会形成环,那样就会死循环。
  • HashTable效率低,利synchronized保证线程安全,同时只有一个线程访问其同步方法,其他线程都会被阻塞。

特点

  • 锁分段技术提高并发访问率:将数据分成一段一段的,然后对每一段分别加锁,这样,两个线程在访问不同段的数据的时候,就不会出现竞争。

结构

  • ConcurrentHashMap由Segment数组结构和HashEntry数组结构组成。
  • 每个Segmengt包含一个HashEntry数组,并对其进行加锁操作。Segmengt继承自ReentrantLock,是可重入锁。
  • HashEntry就是一个包含数据的链表。
    Java并发编程的艺术(十二)——并发容器和框架
    Java并发编程的艺术(十二)——并发容器和框架

操作

1.get

  • 经过一次再散列,先定位到segment,然后再通过散列运算定位到元素。
  • 不需要加锁,除非读到的值是空的才会加锁重读。
  • 将共享变量定义为volatile。

2.put

  • 先定位到segment,然后在segment里进行插入操作。
  • 是否需要扩容?
    先判断HashEntry数组是否超过阈值,如果超过了就扩容,扩容之后再插入数据。而HashMap是先插入再扩容,这样不好,因为下次可能就没有数据进来了,那就白扩容了。
  • 如何扩容?
    先创建一个容量是原来2倍的数组,然后通过对原数组元素进行再散列后插入到新数组。扩容只会对某个segment进行。

3.size

  • 如何保证多线程下安全统计
    在每个segment中有一个volatile修饰的count属性,表示这个segment中的元素个数,先充实通过2次不加锁的方法统计所有count的总和,如果两次结果不相等,或者容器被修改过了,就将Segment加锁,再将进行第三次统计。
  • 如何判断统计的时候ConcurrentHashMap是否被修改了?
    ConcurrentHashMap中有一个modCount变量,每次put\remove\clean操作,都会对这个值加一,通过比较这个值,就知道是否容器是否被修改了。

ConcurrentLinkedQueue

是一个基于连接节点的*线程安全队列,采用CAS算法实现。

结构

Java并发编程的艺术(十二)——并发容器和框架
继承了AbstractQueue,不是阻塞队列。

ConcurrentLinkedQueue由head节点和tail节点组成。head、tail、next、item均使用volatile修饰,保证其内存可见性。

操作

入队

  • 通过CAS的算法进行入队。
  • 如果tail节点的next不为空,则将入队节点设置为tial节点,如果tail节点的next为空,则将入队节点的设置为tail的next节点。所以tail节点不总是尾节点。
    Java并发编程的艺术(十二)——并发容器和框架
  • 为什么不总是让tail节点指向 尾节点?
    如果将tail节点永远作为尾节点,这样每次都需要循环CAS更新tail节点,而设置一个到尾节点的距离,当tail到尾节点的距离大于某个值(通常为1)的时候再更新tail,这样可以减少更新的次数,提高入队的效率。

出队

  • 不是每次出队都更新head节点,当head中有元素,就直接弹出head的元素,如果没有元素,就弹出head的next,然后更新head节点。
  • 也是通过控制距离的方式,减少CAS更新节点的消耗。
  • Java并发编程的艺术(十二)——并发容器和框架

阻塞队列

什么是阻塞队列

阻塞队列是支持两个阻塞操作的队列:

  1. 支持阻塞的插入:当队列满了,插入操作的线程就被阻塞,直到队列不满。
  2. 支持阻塞的移除:当队列空了,移除操作线程就会被阻塞,直到队列有元素。

阻塞队列的使用场景

  • 用于消费者和生产者的场景,生产者向队列添加元素,消费者向队列读取元素。阻塞队列就是作为二者的中间缓冲。

不可用时的处理方式

Java并发编程的艺术(十二)——并发容器和框架

JDK 提供的阻塞队列

一共七个:

  • ArrayBlockingQueue
  • LinkedBlockingQueue:*
  • LinkedBlockingDeque
  • PriorityBlockingQueue : *
  • DeleyQueue
  • SynchronousQueue:不存储数组
  • LinkedTransferQueue : *

ArrayBlockingQueue

是一个 数组实现的 线程安全的 有限 阻塞队列。

ArrayBlockingQueue继承自AbstractQueue,并实现了BlockingQueue接口。
ArrayBlockingQueue由ReentrantLock实现队列的互斥访问,并由notEmpty、notFull这两个Condition分别实现队空、队满的阻塞。
ReentrantLock分为公平锁和非公平锁,可以在构造ArrayBlockingQueue时指定。默认为非公平锁。

阻塞唤醒与原理
  • 如果队列满:添加元素的时候,通过调用notFull.await()阻塞当前线程;移除元素额时候,用notFull.signal()唤醒在notFull上等待的线程。
  • 如果队列空:读取元素的时候,通过notEmpty.await()阻塞当前线程;当添加元素时,调用notEmpty.signal()唤醒在notEmpty上等待的线程。

LinkedBlockingQueue

一共单链表实现的*阻塞队列。

LinkedBlockingQueue继承自AbstractQueue,实现了BlockingQueue接口。
LinkedBlockingQueue由单链表实现,因此是个无限队列。但为了方式无限膨胀,构造时可以加上容量加以限制。
LinkedBlockingQueue分别采用读取锁和插入锁控制读取/删除 和 插入过程的并发访问,并采用notEmpty和notFull两个Condition实现队满队空的阻塞与唤醒。

阻塞唤醒与原理
  • 如果队列满:插入元素的时候需要获取putLock,然后和上面一样,调用notFull.await(),阻塞插入线程;当队列不满了,调用signal进行唤醒,最后释放putLock。
  • 如果队列空:删除获取元素需要获取takeLock,抵用await,阻塞读取线程;当队列有数据了,再调用signal唤醒线程,最后释放takeLock。

DelayQueue

是一个支持延时获取元素的*队列。

  • 使用了PriorityQueue实现。
  • 队列中的元素必须实现Delayed接口,只有延时时间满了,才能提取当前元素。
应用场景
  • 缓存系统设计:用DelayQueue保存缓存元素的有效期,如果能够获取到该元素,说明其有效期到了。
  • 定时任务调度:用DelayQueue保存任务和任务执行时间,当获取到任务,就开始执行。

SynchronousQueue

  • 不存储元素的阻塞队列。
  • 每一个put必须等待另一个get,不然就不能继续添加元素。
  • 支持公平访问队列。
  • 适合数据需要直接传递的场景。

LinkedTransferQueue

相比其他阻塞队列,多了transfer和 tryTransfer方法。

  • transfer():当消费者真在接受元素的时候,直接用transfer方法把数据传给消费者;如果没有消费者在等待,就入队。
  • tryTransfer():用于是否有消费者在接受元素。

阻塞队列实现原理

利用Condition来实现的。

notEmpty = lock.newCondition();
notFull = lock.newChonditon();
  • 如果队列满:添加元素的时候,通过调用notFull.await()阻塞当前线程;移除元素额时候,用notFull.signal()唤醒在notFull上等待的线程。
  • 如果队列空:读取元素的时候,通过notEmpty.await()阻塞当前线程;当添加元素时,调用notEmpty.signal()唤醒在notEmpty上等待的线程。

Fork/Join框架

什么是Fork/Jion

是Java7提供的用于并行执行任务的框架,把大任务分为多个小任务,然后处理好了之后再汇总得到大任务的结果。

工作窃取算法

从某个线程中窃取任务来执行。也就是用来任务分割的算法。

  • 优点:充分利用线程进行并行计算,减少线程的竞争。
  • 缺点:消耗更多资源,比如创建了更多的线程和队列。

工作流程

  1. 分割任务
    通过一个fork类把大任务分割成子任务,直到分割得足够小。
  2. 任务的的执行和结果的合并
    分割好的子任务方法双端队列中,然后启动多个线程去队列中获取任务执行。执行的结果统一放在一个队列里,启动一个线程去合并结果。

实现原理

fork方法

  • 调用pushTask方法异步地执行这个任务,然后返回结果。
  • pushTask方法把当前任务放在ForkJoinTask数组队列中,然后调用signalWork方法唤醒一个工作线程来执行任务。

jion方法

用于阻塞当前线程并等待结果获取