《Java并发编程的艺术》第六章——Java并发容器和框架
知识点:
1. ConcurrentHashMap
2. ConcurrentLinkedQueue
3. Java中的阻塞队列
4. Fork/Join框架
1. ConcurrentHashMap
ConcurrentHashMap是Java5后出现的,是线程安全且高效的HashMap。普通的HashMap在多线程下可能出现环形链接的问题,导致CPU飙升,程序挂死。而Hashtable虽然支持多线程,但所有的方法都使用synchronized关键字来保证可见性,即使是在不同线程中分别调用put()和get()方法都被阻塞,导致其性能实在不敢恭维。
于是,ConcurrentHashMap华丽登场。ConcurrentHashMap使用锁分段技术,将数据分成一段一段的存储,然后给每一段数据分配一把锁,当一个线程占用锁访问一个段数据时,其他段数据也能被其他线程访问。
ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment是一种可重入锁,在ConcurrentHashMap中扮演锁的角色。HashEntry则用于存储键值对数据。一个ConcurrentHashMap里包含一个Segment数组,是一种数组和链表的结构。一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元素。每个Segment守护着一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时,必须首先获得与它对应的Segment锁。其结构图如下:
初始化
ConcurrentHashMap初始化方法是通过initialCapacity,loadFactor, concurrencyLevel几个参数来初始化segments数组,段偏移量segmentShift,段掩码segmentMask和每个segment里的HashEntry数组 。
- 初始化segments数组:
由上面的代码可知segments数组的长度ssize通过concurrencyLevel计算得出。为了能通过按位与的哈希算法来定位segments数组的索引,必须保证segments数组的长度是2的N次方(power-of-two size),所以必须计算出一个是大于或等于concurrencyLevel的最小的2的N次方值来作为segments数组的长度。假如concurrencyLevel等于14,15或16,ssize都会等于16,即容器里锁的个数也是16。
- 初始化segmentShift和segmentMask:
这两个全局变量在定位segment时的哈希算法里需要使用,sshift等于ssize从1向左移位的次数,在默认情况下concurrencyLevel等于16,1需要向左移位移动4次,所以sshift等于4。segmentShift用于定位参与hash运算的位数,segmentShift等于32减sshift,所以等于28,这里之所以用32是因为ConcurrentHashMap里的hash()方法输出的最大数是32位的,后面的测试中我们可以看到这点。segmentMask是哈希运算的掩码,等于ssize减1,即15,掩码的二进制各个位的值都是1。因为ssize的最大长度是65536,所以segmentShift最大值是16,segmentMask最大值是65535,对应的二进制是16位,每个位都是1。
- 初始化每个segment:
输入参数initialCapacity是ConcurrentHashMap的初始化容量,loadfactor是每个segment的负载因子,在构造方法里需要通过这两个参数来初始化数组中的每个segment。
。
上面代码中的变量cap就是segment里HashEntry数组的长度,它等于initialCapacity除以ssize的倍数c,如果c大于1,就会取大于等于c的2的N次方值,所以cap不是1,就是2的N次方。segment的容量threshold=(int)cap*loadFactor,默认情况下initialCapacity等于16,loadfactor等于0.75,通过运算cap等于1,threshold等于零。
【备注】:参数concurrencyLevel是用户估计的并发级别,就是说你觉得最多有多少线程共同修改这个map,根据这个来确定Segment数组的大小,默认为16。
定位Segment
既然ConcurrentHashMap使用分段锁Segment来保护不同段的数据,那么在插入和获取元素的时候,必须先通过哈希算法定位到Segment。可以看到ConcurrentHashMap会首先使用Wang/Jenkins hash的变种算法对元素的hashCode进行一次再哈希。
之所以进行再哈希,其目的是为了减少哈希冲突,使元素能够均匀的分布在不同的Segment上,从而提高容器的存取效率。假如哈希的质量差到极点,那么所有的元素都在一个Segment中,不仅存取元素缓慢,分段锁也会失去意义。
ConcurrentHashMap的操作
- get操作:Segment的get操作实现非常简单和高效。先经过一次再散列,然后使用这个散列值通过散列运算定位到Segment,再通过散列算法定位到元素。
get操作的高效之处在于整个get过程不需要加锁,除非读到值为空才会加锁重读。因为用于统计当前Segment大小的count字段和用于存储值得HashEntry的value都被定义成volatile变量,而在get操作里只需要读不需要写共享变量count和value。
在定位元素的代码里我们可以发现,定位HashEntry和定位Segment的散列算法虽然一样,都与数组的长度减去1再相“与”,但是相“与”的值不一样,定位Segment使用的是元素的hashcode通过再散列后得到的值得高位,而定位HashEntry直接使用的是再散列后的值。其目的是避免两次散列后的值一样,虽然元素在Segment里散列开了,但是却没有在HashEntry里散列开。
- put操作
由于put方法里需要对共享变量进行写入操作,所以为了线程安全,在操作共享变量时必须加锁。put方法首先定位到Segment,然后再Segment里进行插入操作。插入操作需要经历两个步骤,第一步在插入元素之前判断Segment里的HashEntry数组是否需要扩容,如果HashEntry数组超过容量,则创建一个容量是原来容量两倍的数组,然后将原数组里的元素进行再散列后插入到新的数组里。为了高效,ConcurrentHashMap只针对某个Segment进行扩容而不是整个容器。第二步定位添加元素的位置,然后将其放在HashEntry数组里。
- size操作
如果要统计整个ConcurrentHashMap里元素的大小,就必须统计所有Segment里元素的大小后求和。Segment里的全局变量count虽然被定义为volatile变量,但如果在累加前使用的count发生了变化,那么统计结果就不准了。最安全的做法是在统计size时,锁住所有的Segment的put,remove,clean方法,但显然很低效。
ConcurrentHashMap统计size的方法是,尝试2次不锁住Segment的方式来统计各个Segment大小,如果在统计过程中,容器的count发生了变化,则再采用加锁的方式来统计所有Segment的大小。ConcurrentHashMap中modCount变量在调用put,remove和clean方法素前加1,从而来记录容器大小是否发生变化,
2 ConcurrentLinkedQueue
在并发编程中,如果要实现一个线程安全的队列,则有两种方式:
- 使用阻塞算法,入队和出队使用锁来控制。
- 非阻塞算法:即循环CAS方式来实现。
ConcurrentLinkedQueue就是使用非阻塞方式实现的基于链接节点的*线程安全队列。它采用先进先出的规则对节点进行排序。新添加的元素会被添加到对尾,获取元素时,它会返回头部的元素。
ConcurrentLinkedQueue的类图如下:
ConcurrentLinkedQueue由head节点和tail节点组成,每个节点(Node)由节点元素(item)和指向下一个节点(next)的引用组成,由此组成一张链表结构的队列。默认情况下head节点存储的元素为空,tail节点等于head节点。
入队列
入队列就是将入队节点添加到队列的尾部。
以下为添加4个元素到队列的快照图:
流程如下:
- 添加元素1:队列更新head节点的next节点为元素1节点。又因为tail节点默认情况下等于head节点,所以他们的next节点都指向元素1节点。
- 添加元素2:队列首先设置元素1节点的next节点为元素2节点,然后更新tail节点指向元素2节点。
- 添加元素3:设置tail节点的next节点为元素3节点。
- 添加元素4:设置元素3的next节点为元素4节点,然后将tail节点指向元素4节点。
通过上图可以看出入队主要做两件事情:
- 将入队节点设置为当前队列尾节点的下一个节点。
- 更新tail节点,如果tail节点的next节点不为空,则将入队节点设置为tail节点,如果tail节点的next为空,则将入队节点设置为tail节点的next节点(注意:此时并未更新tail节点为尾节点)。所以,tail节点并不总是尾节点。
如果在单线程中执行没有任何问题,但如果在多线程中可能出现插队的情况。如果有一个线程正在入队,那么首先获取尾节点,然后设置尾节点的下一个节点为入队节点,但这是如果另一个线程插队,则队列尾节点发生变化,当前线程需要暂停入队操作,重新获取新的尾节点。所以使用CAS算法来将入队节点设置为尾节点的next节点:
定位尾节点
因为尾节点并不总是tail点。尾节点可能是tail节点或tail节点的next节点。所以,节点在入队之前必须通过tail节点来找到尾节点。
如果队列尾节点p与p的next节点都为空,则表示这个队列刚初始化,正准备添加节点,所以需要返回head节点。
设置入队节点为尾节点
p.casNext(null,n)方法用于将入队节点设置为当前队列尾节点的next节点,如果p是null,表示p是当前队列的尾节点,如果不为null,表示有其他线程更新了尾节点,则需要重新获取当前队列的尾节点。
为什么不保证tail节点总是尾节点
如果保证tail节点总是尾节点的话,那么入队操作直接通过tail节点定位到尾节点,然后把尾节点next节点更新为新的入队节点,随后更新tail节点为新的尾节点不就可以了吗?但这么做的有一个很明显的缺陷:每次都需要使用循环CAS更新tail节点为尾节点。一定程度上降低了入队的效率。所以在ConcurrentLinkedQueue入队时,并不是每次更新tail节点为尾节点,只有当tail节点和尾节点距离大于等于常量HOPS的值(默认为1)时才会更新tail节点。tail节点与尾节点距离越长,使用CAS更新tail节点次数越少,但每次入队时通过tail节点定位尾节点的时间就越长。但这样仍然可以提升入队效率,因为本质上来看通过增加volatile变量的读操作来减少volatile变量的写操作,而对volatile变量写操作的开销远远大于读操作。
【备注】:入队方法永远返回true,所以不要通过返回值判断入队是否成功。
出队列
出队列就是从队列里返回一个节点元素,并清空该节点对元素的引用。
以下为从队列获取4个元素的快照图:
从上图可以看出,并不是每次出队列都需要更新head节点为首节点,当head节点为空时,更新head节点为首节点,如果head节点不为空,则直接弹出head节点里的元素,并不会更新head节点为新的首节点。之所以这样设计,同样是为了减少CAS更新head节点从而提高出队效率。
首先获取首节点,然后判断首节点是否为空,如果为空,则证明另一个线程已经进行了一次出队操作,需要获取新首节点即原首节点的next节点。如果不为空则使用CAS方式将首节点引用设置为null,如果成功,则直接返回首节点的元素,如果不成功,则表示另外一个线程已经进行了一次出队操作并更新了head节点,导致元素发生变化,需要重新获取首节点。
3.Java中的阻塞队列
什么是阻塞队列?
阻塞队列是一个支持两个附加操作的队列。
- 支持阻塞的插入方法:当队列满时,队列会阻塞插入元素的线程,直到队列不满。
- 支持阻塞的移除方法:在队列为空时,获取元素的线程会等待队列变成非空。
阻塞队列常用于生产者和消费者的场景,生产者向队列里添加元素,消费者从队列里取元素。在阻塞队列不可用时(消费者取时,队列为空,生产者添加时,队列已满),这两个附加操作提供了4中处理方式:
- 抛出异常:当队列满时,如果再往队列里插入元素,会抛出IllegalStateException(“Queuefull”)异常。当队列空时,从队列获取元素会抛出NoSuchElmentException异常。
- 返回特殊值:当往队列插入元素时,会返回元素是否插入成功,成功返回true。当从队列取元素时,如果没有则返回null。
- 一直阻塞:当队列满时,如果生产者线程往队列put元素,则队列会一直阻塞生产者线程直到队列可用或响应中断。当队列空时,如果消费者线程从队列里take元素,队列会阻塞消费者线程直到队列不为空。
- 超时退出:当队列满时,如果生产者线程往队列里插入元素,队列会阻塞生产者线程一段时间,如果超过了指定的时间,生产者线程就会退出。当队列空时,如果消费者线程从队列取元素,队列会阻塞消费者线程一段时间,直到超过指定的时间,消费者线程退出。
【备注】:如果是*阻塞队列,队列不可能出现满的情况,所以使用put或offer方法永远不会被阻塞,而且使用offer方法时,永远返回true。
JDK7 提供的7个阻塞队列
ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
此队列按照先进先出(FIFO)的原则对元素进行排序。默认情况下不保证线程公平的访问队列。但可通过以下代码创建一个公平的阻塞队列:
为了保证公平性,通常会降低吞吐量,其实现是依靠可重入锁:
LinkedBlockQueue:一个由链表结构组成的有界阻塞队列。
此队列的默认和最大长度为Integer.MAX_VALUE。按照先进先出(FIFO)的原则对元素进行排序。但不保证线程公平的访问队列,也不提供创建公平访问队列的方法。
PriorityBlockingQueue:一个支持优先级的*阻塞队列。
默认情况下元素采用自然顺序升序排序。也可自定义类实现compareTo()方法来执行元素排序规则,或初始化PriorityBlockingQueue时,执行构造参数Comparator来对元素进行排序。但PriorityBlockingQueue不能保证同优先级元素的顺序。
DelayQueue:一个支持延时获取元素的*阻塞队列。
队列使用PriorityQueue实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。
一般可以用来:
- 缓存系统的设计:用来保存缓存元素的有效期。使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。
- 定时任务调度:使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueued中获取到任务就开始执行,如TimerQueue就是使用DelayQueue实现的。
package com.lipeng.sixth;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
* DelayQueue演示Demo
* @author LiPeng
*
*/
public class DelayQueueDemo {
public static void main(String[] args) {
final DelayQueue<Node> queue=new DelayQueue<Node>();
Node node1=new Node("node-1",5000); //延迟5s
Node node2=new Node("node-2",8000); //延迟8s
Node node3=new Node("node-3",4000); //延迟4s
queue.offer(node1);
queue.offer(node2);
queue.offer(node3);
Thread thread1=new Thread(new Runnable() {
@Override
public void run() {
try {
for(;;){
//此处使用take,如果队列中无符合条件节点,则阻塞
Node node=queue.take();
System.out.println(node.toString());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
thread1.start();
}
}
class Node implements Delayed{
private String name;
private long delayTime;
private long endTime;
public Node(String name, long delayTime) {
super();
this.name = name;
this.delayTime = delayTime;
this.endTime=delayTime+System.currentTimeMillis();
}
//保证队列中,延迟时间长的元素排在队尾
@Override
public int compareTo(Delayed o) {
if(o==this)
return 0;
if(o instanceof Node){
Node node=(Node) o;
//和首节点(o)进行对比,如果当前入队节点延迟时间小于首节点,则返回-1
if(node.getEndTime()-this.getEndTime()>0)
return -1;
else
return 1;
}
return 0;
}
@Override
public long getDelay(TimeUnit unit) {
//TimeUnit 默认纳秒 此处转为毫秒
return unit.convert(endTime-System.currentTimeMillis(),TimeUnit.MILLISECONDS);
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public long getDelayTime() {
return delayTime;
}
public void setDelayTime(long delayTime) {
this.delayTime = delayTime;
}
public long getEndTime() {
return endTime;
}
public void setEndTime(long endTime) {
this.endTime = endTime;
}
@Override
public String toString() {
return "Node [name=" + name + ", delayTime=" + delayTime + ", endTime="
+ endTime + "]";
}
}
SynchronousQueue:一个不存储元素的阻塞队列。
此队列每一个put操作必须等待一个take操作,否则不能继续添加元素。默认情况下线程采用非公平性策略访问队列,但可通过以下方法设置以公平策略访问:
SynchronousQueue本身不存储任何元素,只是负责把生产者线程处理的数据直接传递给消费者线程。其吞吐量高于LinkedBlockingQueue和ArrayBlockingQueue。
LinkedTransferQueue:一个由链表结构组成的*阻塞TransferQueue队列。
相比其他的阻塞队列,LinkedTransferQueue多了tryTransfer和transfer方法。
transfer方法:如果当前有消费者正在等待接收元素,transfer方法可以把生产者传入的元素立刻transfer给消费者。如果没有消费者等待接受元素,transfer方法会将元素存放在队列的tail节点,直到该元素被消费者消费才返回。
tryTransfer方法:tryTransfer方法用来试探生产者传入的元素是否能直接传给消费者消费。如果没有消费者等待接收元素,返回false,反之,返回true。tryTransfer方法会立即返回,而不用等待元素被消费以后才返回。
对于带有时间限制的tryTransfer(E e,long timeout,TimeUnit unit)方法,试图把生产者传入的元素直接传给消费者,但如果没有消费者消费该元素则等待指定的时间再返回,如果超时还没有消费元素,则返回false,如果在超时时间内消费了元素则返回true。
LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
LinkedBlockingDeque是一个可以从队列两端插入和移除元素的队列。因为与其他阻塞队列相比,多了一个操作队列的入口,所以在多线程同时入队时,也就减少一半的竞争。
阻塞队列的原理
阻塞队列使用通知模式实现,即当生产者往满的队列里添加元素时会阻塞生产者线程,当消费者消费了队列中一个元素后,会通知生产者当前队列可用。
以下为ArrayBlockingQueue源码:
又看到了Condition,有没有觉得很熟悉呢?不知道怎么回事的,请读我的
这篇博客,此处不再讲解。
4.Fork/Join框架
Fork/Join框架的定义
Fork/Join框架是Java 7提供的一个用于并行执行任务的框架,它可以把大任务分割成若干个小任务,并最终汇总每个小任务结果后得到大任务结果的框架。 其运行流程如下:
工作窃取算法
工作窃取算法是指某个线程从其他队列里窃取任务来执行。当我们需要做一个大任务时,可以把这个任务分割成若干互不依赖的子任务,为了减少线程竞争,把子任务分别放入不同队列,并为每个队列创建一个单独的工作线程,假设,有其他线程提前把自己队列任务做完之后,还需要等待其他线程,这时,为了提高效率,已经做完任务的线程会去其他队列窃取任务执行,以帮助其他未完成的线程。此时他们访问同一个队列,为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列。被窃取任务线程永远从双端队列头部取任务执行,窃取任务线程永远从双端队列尾部拿任务执行。其运行流程图如下:
窃取算法的优点就是充分利用线程进行并行计算,减少了线程间的竞争。
其缺点就是,当双端队列里只有一个任务时,还时会存在竞争。而且该算法创建多个线程和多个双端队列,会消耗更过的系统资源。
Fork/Join框架的设计
Fork/Join框架有以下两个步骤:
- 分割任务:首先我们需要有一个fork类来把大任务分割成子任务,如果子任务仍然很大,还需要不停分割,直到分割出的子任务足够小。
- 执行任务并合并结果:分割的子任务分别放在双端队列,然后启动线程分别从队列取任务执行,执行完的结果统一放在一个队列里,启动一个线程从队列里拿数据并合并数据。
Fork/Join使用两个类来完成上述工作:
- ForkJoinTask:要使用ForkJoin框架,必须先创建一个ForkJoin任务。它提供在任务中执行fork和join操作机制,在使用时,我们通常继承ForkJoinTask的子类:RecursiveAction或RecursiveTask,两者区别是RecursiveAction用于没有返回结果的任务,RecursiveTask用于有返回结果的任务。
- ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行。被分割出来的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部窃取一个任务执行。
使用Fork/Join框架
package com.lipeng.sixth;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
/**
* ForkJoind框架Demo.计算1+2+3+4
* @author LiPeng
*
*/
public class ForkJoinDemo extends RecursiveTask<Integer>{
private static final int THERSHOLD=2; //阈值为2
private int start;
private int end;
public ForkJoinDemo(int start, int end) {
super();
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
//任务小于阈值,不需要拆分
if(end-start<=THERSHOLD){
int sum=0;
for(int i=start;i<=end;++i){
sum+=i;
}
return sum;
}else{
//拆分为两个子任务
int middle=(end+start)/2;
ForkJoinDemo fork1=new ForkJoinDemo(start,middle);
ForkJoinDemo fork2=new ForkJoinDemo(middle+1,end);
fork1.fork();
fork2.fork();
//合并任务结果
return fork1.join()+fork2.join();
}
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
ForkJoinPool pool=new ForkJoinPool();
//创建任务计算1+2+3+4
ForkJoinTask<Integer> task=pool.submit(new ForkJoinDemo(1,4));
System.out.println(task.get());
}
}
ForkJoinTask与一般任务主要区别在于它需要实现compute方法,在这个方法里,首先需要判断任务是否足够小,如果足够小就直接执行任务。如果不足够小,就分割成两个子任务,每个子任务在调用fork方法是,又会递归的进入compute方法,查看当前子任务是否需要继续分割。
Fork/Join框架的异常处理
ForkJoinTask在执行时可能抛出异常,但我们无法再主线程里捕捉异常,但可以通过ForkJoinTask的isCompletedAbnnormally()方法来检查任务是否已经抛出异常或已经被取消,调用getException()可以获取异常,此方法返回Throwable对象,如果任务取消则返回CancellationException。如果任务没有完成或者没有抛出异常则返回null。
实现原理
ForkJoinPool由ForkJoinTask数组和ForkJoinWorkerThread数组组成,ForkJoinTask数组负责存放程序提交给ForkJoinPool的任务,而ForkJoinWorkerThread数组负责执行这些任务。
- ForkJoinTask的fork方法实现原理
当我们调用ForkJoinTask的fork方法时,程序会调用ForkJoinWorkerThread的pushTask方法异步地执行这个任务,然后立即返回结果。
pushTask方法把当前任务存放在ForkJoinTask数组队列里,然后调用ForkJoinPool的signalWork()方法唤醒或创建一个工作线程来执行任务。
- ForkJoinTask的join方法实现原理
Join方法的主要作用是阻塞当前线程并等待获取结果。
首先,它调用doJoin()方法获取当前任务状态,任务有4中状态:已完成(NORMAL),被取消(CANCELLED),信号(SIGNAL)和出现异常(EXCEPTIONAL)。如果任务状态是已完成,则直接返回任务结果。如果任务被取消,则抛出CancellationException。如果任务抛出异常,则直接抛出对应异常。
doJoin()方法源代码:
在doJoin()方法中,会查看任务状态,如果任务已经执行完成,则直接返回任务状态,如果没有执行完成,则从任务数组里取出任务并执行。如果顺利执行完毕,则将任务状态设置为NORMAL,如果出现异常,则将任务状态设置为EXCEPTION。
【备注】:本文图片均摘自《Java并发编程的艺术》·方腾飞,若本文有错或不恰当的描述,请各位不吝斧正。谢谢!
推荐阅读
-
Java并发编程,互斥同步和线程之间的协作
-
Java并发编程的艺术-----Java并发编程基础(线程间通信)
-
《Java并发编程的艺术》笔记
-
<
>-阅读笔记和思维导图 -
[转]Java7中的ForkJoin并发框架初探(上)——需求背景和设计原理
-
Java并发编程中CountDownLatch和CyclicBarrier的使用
-
Java多线程并发编程中并发容器第二篇之List的并发类讲解
-
干货!4面阿里java后端,才发现并发编程和JVM是必不可少,特此分享给为金九银十备战的你们
-
读书笔记之《Java 并发编程的艺术》
-
Java并发编程:Java线程池核心ThreadPoolExecutor的使用和原理分析