Java并发编程的艺术(十二)——并发容器和框架
ConcurrentHashMap
为什么需要ConcurrentHashMap
- HashMap线程不安全,因为HashMap的Entry是以链表的形式存储的,如果多线程操作可能会形成环,那样就会死循环。
- HashTable效率低,利synchronized保证线程安全,同时只有一个线程访问其同步方法,其他线程都会被阻塞。
特点
- 锁分段技术提高并发访问率:将数据分成一段一段的,然后对每一段分别加锁,这样,两个线程在访问不同段的数据的时候,就不会出现竞争。
结构
- ConcurrentHashMap由Segment数组结构和HashEntry数组结构组成。
- 每个Segmengt包含一个HashEntry数组,并对其进行加锁操作。Segmengt继承自ReentrantLock,是可重入锁。
- HashEntry就是一个包含数据的链表。
操作
1.get
- 经过一次再散列,先定位到segment,然后再通过散列运算定位到元素。
- 不需要加锁,除非读到的值是空的才会加锁重读。
- 将共享变量定义为volatile。
2.put
- 先定位到segment,然后在segment里进行插入操作。
- 是否需要扩容?
先判断HashEntry数组是否超过阈值,如果超过了就扩容,扩容之后再插入数据。而HashMap是先插入再扩容,这样不好,因为下次可能就没有数据进来了,那就白扩容了。 - 如何扩容?
先创建一个容量是原来2倍的数组,然后通过对原数组元素进行再散列后插入到新数组。扩容只会对某个segment进行。
3.size
- 如何保证多线程下安全统计
在每个segment中有一个volatile修饰的count属性,表示这个segment中的元素个数,先充实通过2次不加锁的方法统计所有count的总和,如果两次结果不相等,或者容器被修改过了,就将Segment加锁,再将进行第三次统计。 - 如何判断统计的时候ConcurrentHashMap是否被修改了?
ConcurrentHashMap中有一个modCount变量,每次put\remove\clean操作,都会对这个值加一,通过比较这个值,就知道是否容器是否被修改了。
ConcurrentLinkedQueue
是一个基于连接节点的*线程安全队列,采用CAS算法实现。
结构
继承了AbstractQueue,不是阻塞队列。
ConcurrentLinkedQueue由head节点和tail节点组成。head、tail、next、item均使用volatile修饰,保证其内存可见性。
操作
入队
- 通过CAS的算法进行入队。
- 如果tail节点的next不为空,则将入队节点设置为tial节点,如果tail节点的next为空,则将入队节点的设置为tail的next节点。所以tail节点不总是尾节点。
- 为什么不总是让tail节点指向 尾节点?
如果将tail节点永远作为尾节点,这样每次都需要循环CAS更新tail节点,而设置一个到尾节点的距离,当tail到尾节点的距离大于某个值(通常为1)的时候再更新tail,这样可以减少更新的次数,提高入队的效率。
出队
- 不是每次出队都更新head节点,当head中有元素,就直接弹出head的元素,如果没有元素,就弹出head的next,然后更新head节点。
- 也是通过控制距离的方式,减少CAS更新节点的消耗。
阻塞队列
什么是阻塞队列
阻塞队列是支持两个阻塞操作的队列:
- 支持阻塞的插入:当队列满了,插入操作的线程就被阻塞,直到队列不满。
- 支持阻塞的移除:当队列空了,移除操作线程就会被阻塞,直到队列有元素。
阻塞队列的使用场景
- 用于消费者和生产者的场景,生产者向队列添加元素,消费者向队列读取元素。阻塞队列就是作为二者的中间缓冲。
不可用时的处理方式
JDK 提供的阻塞队列
一共七个:
- ArrayBlockingQueue
- LinkedBlockingQueue:*
- LinkedBlockingDeque
- PriorityBlockingQueue : *
- DeleyQueue
- SynchronousQueue:不存储数组
- LinkedTransferQueue : *
ArrayBlockingQueue
是一个 数组实现的 线程安全的 有限 阻塞队列。
ArrayBlockingQueue继承自AbstractQueue,并实现了BlockingQueue接口。
ArrayBlockingQueue由ReentrantLock实现队列的互斥访问,并由notEmpty、notFull这两个Condition分别实现队空、队满的阻塞。
ReentrantLock分为公平锁和非公平锁,可以在构造ArrayBlockingQueue时指定。默认为非公平锁。
阻塞唤醒与原理
- 如果队列满:添加元素的时候,通过调用notFull.await()阻塞当前线程;移除元素额时候,用notFull.signal()唤醒在notFull上等待的线程。
- 如果队列空:读取元素的时候,通过notEmpty.await()阻塞当前线程;当添加元素时,调用notEmpty.signal()唤醒在notEmpty上等待的线程。
LinkedBlockingQueue
一共单链表实现的*阻塞队列。
LinkedBlockingQueue继承自AbstractQueue,实现了BlockingQueue接口。
LinkedBlockingQueue由单链表实现,因此是个无限队列。但为了方式无限膨胀,构造时可以加上容量加以限制。
LinkedBlockingQueue分别采用读取锁和插入锁控制读取/删除 和 插入过程的并发访问,并采用notEmpty和notFull两个Condition实现队满队空的阻塞与唤醒。
阻塞唤醒与原理
- 如果队列满:插入元素的时候需要获取putLock,然后和上面一样,调用notFull.await(),阻塞插入线程;当队列不满了,调用signal进行唤醒,最后释放putLock。
- 如果队列空:删除获取元素需要获取takeLock,抵用await,阻塞读取线程;当队列有数据了,再调用signal唤醒线程,最后释放takeLock。
DelayQueue
是一个支持延时获取元素的*队列。
- 使用了PriorityQueue实现。
- 队列中的元素必须实现Delayed接口,只有延时时间满了,才能提取当前元素。
应用场景
- 缓存系统设计:用DelayQueue保存缓存元素的有效期,如果能够获取到该元素,说明其有效期到了。
- 定时任务调度:用DelayQueue保存任务和任务执行时间,当获取到任务,就开始执行。
SynchronousQueue
- 不存储元素的阻塞队列。
- 每一个put必须等待另一个get,不然就不能继续添加元素。
- 支持公平访问队列。
- 适合数据需要直接传递的场景。
LinkedTransferQueue
相比其他阻塞队列,多了transfer和 tryTransfer方法。
- transfer():当消费者真在接受元素的时候,直接用transfer方法把数据传给消费者;如果没有消费者在等待,就入队。
- tryTransfer():用于是否有消费者在接受元素。
阻塞队列实现原理
利用Condition来实现的。
notEmpty = lock.newCondition();
notFull = lock.newChonditon();
- 如果队列满:添加元素的时候,通过调用notFull.await()阻塞当前线程;移除元素额时候,用notFull.signal()唤醒在notFull上等待的线程。
- 如果队列空:读取元素的时候,通过notEmpty.await()阻塞当前线程;当添加元素时,调用notEmpty.signal()唤醒在notEmpty上等待的线程。
Fork/Join框架
什么是Fork/Jion
是Java7提供的用于并行执行任务的框架,把大任务分为多个小任务,然后处理好了之后再汇总得到大任务的结果。
工作窃取算法
从某个线程中窃取任务来执行。也就是用来任务分割的算法。
- 优点:充分利用线程进行并行计算,减少线程的竞争。
- 缺点:消耗更多资源,比如创建了更多的线程和队列。
工作流程
- 分割任务
通过一个fork类把大任务分割成子任务,直到分割得足够小。 - 任务的的执行和结果的合并
分割好的子任务方法双端队列中,然后启动多个线程去队列中获取任务执行。执行的结果统一放在一个队列里,启动一个线程去合并结果。
实现原理
fork方法
- 调用pushTask方法异步地执行这个任务,然后返回结果。
- pushTask方法把当前任务放在ForkJoinTask数组队列中,然后调用signalWork方法唤醒一个工作线程来执行任务。
jion方法
用于阻塞当前线程并等待结果获取
推荐阅读
-
Java并发编程,互斥同步和线程之间的协作
-
Java并发编程的艺术-----Java并发编程基础(线程间通信)
-
《Java并发编程的艺术》笔记
-
<
>-阅读笔记和思维导图 -
[转]Java7中的ForkJoin并发框架初探(上)——需求背景和设计原理
-
Java并发编程中CountDownLatch和CyclicBarrier的使用
-
Java多线程并发编程中并发容器第二篇之List的并发类讲解
-
干货!4面阿里java后端,才发现并发编程和JVM是必不可少,特此分享给为金九银十备战的你们
-
读书笔记之《Java 并发编程的艺术》
-
Java并发编程:Java线程池核心ThreadPoolExecutor的使用和原理分析