#####################################################################
我们知道多线程操作共享资源时,会出现三个问题:可见性、有序性以及原子性。
一般情况下,我们采用synchronized同步锁(独占锁、互斥锁),即同一时间只有一个线程能够修改共享变量,其他线程必须等待。但是这样的话就相当于单线程,体现不出来多线程的优势。
那么我们有没有另一种方式来解决这三个问题呢?
在上一章中,我们提到了一个volatile关键字,它可以解决可见性和有序性的问题。而且如果操作的共享变量是基本数据类型,并且同一时间只对变量进行读取或者写入的操作,那么原子性问题也得到了解决,就不会产生多线程问题了。
但是通常,我们都要先读取共享变量,然后操作共享变量,最后写入共享变量,那么这个时候怎么保证整个操作的原子性呢?一种解决方式就是CAS技术。
CAS(Compare and Swap)即比较并交换。在讲解这个之前,先了解两个重要概念:悲观锁与乐观锁。
一. 悲观锁与乐观锁
- 悲观锁: 假定会发生并发冲突,即共享资源会被某个线程更改。所以当某个线程获取共享资源时,会阻止别的线程获取共享资源。也称独占锁或者互斥锁,例如java中的synchronized同步锁。
- 乐观锁: 假设不会发生并发冲突,只有在最后更新共享资源的时候会判断一下在此期间有没有别的线程修改了这个共享资源。如果发生冲突就重试,直到没有冲突,更新成功。CAS就是一种乐观锁实现方式。
悲观锁会阻塞其他线程。乐观锁不会阻塞其他线程,如果发生冲突,采用死循环的方式一直重试,直到更新成功。
二. CAS的实现原理
CAS的原理很简单,包含三个值当前内存值(V)、预期原来的值(A)以及期待更新的值(B)。
如果内存位置V的值与预期原值A相匹配,那么处理器会自动将该位置值更新为新值B,返回true。否则处理器不做任何操作,返回false。
实现CAS最重要的一点,就是比较和交换操作的一致性,否则就会产生歧义。
比如当前线程比较成功后,准备更新共享变量值的时候,这个共享变量值被其他线程更改了,那么CAS函数必须返回false。
要实现这个需求,java中提供了Unsafe类,它提供了三个函数,分别用来操作基本类型int和long,以及引用类型Object
public final native boolean compareAndSwapObject
(Object obj, long valueOffset, Object expect, Object update);
public final native boolean compareAndSwapInt
(Object obj, long valueOffset, int expect, int update);
public final native boolean compareAndSwapLong
(Object obj, long valueOffset, long expect, long update);
参数的意义:
- obj 和 valueOffset:表示这个共享变量的内存地址。这个共享变量是obj对象的一个成员属性,valueOffset表示这个共享变量在obj类中的内存偏移量。所以通过这两个参数就可以直接在内存中修改和读取共享变量值。
- expect: 表示预期原来的值。
- update: 表示期待更新的值。
接下来我们来看看java并发框架下的atomic包是如何使用CAS的。
三. JUC并发框架下的原子类(atomic)
调用JUC并发框架下原子类的方法时,不需要考虑多线程问题。那么我们分析它是怎么解决多线程问题的。以AtomicInteger类为例
3.1 成员变量
// 通过它来实现CAS操作的。因为是int类型,所以调用它的compareAndSwapInt方法
private static final Unsafe unsafe = Unsafe.getUnsafe();
// value这个共享变量在AtomicInteger对象上内存偏移量,
// 通过它直接在内存中修改value的值,compareAndSwapInt方法中需要这个参数
private static final long valueOffset;
// 通过静态代码块,在AtomicInteger类加载时就会调用
static {
try {
// 通过unsafe类,获取value变量在AtomicInteger对象上内存偏移量
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
// 共享变量,AtomicInteger就保证了对它多线程操作的安全性。
// 使用volatile修饰,解决了可见性和有序性问题。
private volatile int value;
有三个重要的属性:
- unsafe: 通过它实现CAS操作,因为共享变量是int类型,所以调用compareAndSwapInt方法。
- valueOffset: 共享变量value在AtomicInteger对象上内存偏移量
- value: 共享变量,使用volatile修饰,解决了可见性和有序性问题。
3.2 重要方法
3.2.1 get与set方法
// 直接读取。因为是volatile关键子修饰的,总是能看到(任意线程)对这个volatile变量最新的写入
public final int get() {
return value;
}
// 直接写入。因为是volatile关键子修饰的,所以它修改value变量也会立即被别的线程读取到。
public final void set(int newValue) {
value = newValue;
}
因为value变量是volatile关键字修饰的,它总是能读取(任意线程)对这个volatile变量最新的写入。它修改value变量也会立即被别的线程读取到。
3.2.2 compareAndSet方法
// 如果value变量的当前值(内存值)等于期望值(expect),那么就把update赋值给value变量,返回true。
// 如果value变量的当前值(内存值)不等于期望值(expect),就什么都不做,返回false。
// 这个就是CAS操作,使用unsafe.compareAndSwapInt方法,保证整个操作过程的原子性
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
通过调用unsafe的compareAndSwapInt方法实现CAS函数的。但是CAS函数只能保证比较并交换操作的原子性,但是更新操作并不一定会执行。比如我们想让共享变量value自增。
共享变量value自增是三个操作,1.读取value值,2.计算value+1的值,3.将value+1的值赋值给value。分析这三个操作:
- 读取value值,因为value变量是volatile关键字修饰的,能够读取到任意线程对它最后一次修改的值,所以没问题。
- 计算value+1的值:这个时候就有问题了,可能在计算这个值的时候,其他线程更改了value值,因为没有加同步锁,所以其他线程可以更改value值。
- 将value+1的值赋值给value: 使用CAS函数,如果返回false,说明在当前线程读取value值到调用CAS函数方法前,共享变量被其他线程修改了,那么value+1的结果值就不是我们想要的了,因为要重新计算。
3.2.3 getAndAddInt方法
public final int getAndAddInt(Object obj, long valueOffset, int var) {
int expect;
// 利用循环,直到更新成功才跳出循环。
do {
// 获取value的最新值
expect = this.getIntVolatile(obj, valueOffset);
// expect + var表示需要更新的值,如果compareAndSwapInt返回false,说明value值被其他线程更改了。
// 那么就循环重试,再次获取value最新值expect,然后再计算需要更新的值expect + var。直到更新成功
} while(!this.compareAndSwapInt(obj, valueOffset, expect, expect + var));
// 返回当前线程在更改value成功后的,value变量原先值。并不是更改后的值
return expect;
}
这个方法在Unsafe类中,利用do_while循环,先利用当前值,计算更新值,然后通过compareAndSwapInt方法设置value变量,如果compareAndSwapInt方法返回失败,表示value变量的值被别的线程更改了,所以循环获取value变量最新值,再通过compareAndSwapInt方法设置value变量。直到设置成功。跳出循环,返回更新前的值。
// 将value的值当前值的基础上加1,并返回当前值
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
// 将value的值当前值的基础上加-1,并返回当前值
public final int getAndDecrement() {
return unsafe.getAndAddInt(this, valueOffset, -1);
}
// 将value的值当前值的基础上加delta,并返回当前值
public final int getAndAdd(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta);
}
// 将value的值当前值的基础上加1,并返回更新后的值(即当前值加1)
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
// 将value的值当前值的基础上加-1,并返回更新后的值(即当前值加-1)
public final int decrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, -1) - 1;
}
// 将value的值当前值的基础上加delta,并返回更新后的值(即当前值加delta)
public final int addAndGet(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
}
都是利用unsafe.getAndAddInt方法实现的。
四.重要示例
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
class Data {
AtomicInteger num;
public Data(int num) {
this.num = new AtomicInteger(num);
}
public int getAndDecrement() {
return num.getAndDecrement();
}
}
class MyRun implements Runnable {
private Data data;
// 用来记录所有卖出票的编号
private List<Integer> list;
private CountDownLatch latch;
public MyRun(Data data, List<Integer> list, CountDownLatch latch) {
this.data = data;
this.list = list;
this.latch = latch;
}
@Override
public void run() {
try {
action();
} finally {
// 释放latch共享锁
latch.countDown();
}
}
// 进行买票操作,注意这里没有使用data.num>0作为判断条件,直到卖完线程退出。
// 那么做会导致这两处使用了共享变量data.num,那么做多线程同步时,就要考虑更多条件。
// 这里只for循环了5次,表示每个线程只卖5张票,并将所有卖出去编号存入list集合中。
public void action() {
for (int i = 0; i < 5; i++) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
int newNum = data.getAndDecrement();
System.out.println("线程"+Thread.currentThread().getName()+" num=="+newNum);
list.add(newNum);
}
}
}
public class ThreadTest {
public static void startThread(Data data, String name, List<Integer> list,CountDownLatch latch) {
Thread t = new Thread(new MyRun(data, list, latch), name);
t.start();
}
public static void main(String[] args) {
// 使用CountDownLatch来让主线程等待子线程都执行完毕时,才结束
CountDownLatch latch = new CountDownLatch(6);
long start = System.currentTimeMillis();
// 这里用并发list集合
List<Integer> list = new CopyOnWriteArrayList();
Data data = new Data(30);
startThread(data, "t1", list, latch);
startThread(data, "t2", list, latch);
startThread(data, "t3", list, latch);
startThread(data, "t4", list, latch);
startThread(data, "t5", list, latch);
startThread(data, "t6", list, latch);
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 处理一下list集合,进行排序和翻转
Collections.sort(list);
Collections.reverse(list);
System.out.println(list);
long time = System.currentTimeMillis() - start;
// 输出一共花费的时间
System.out.println("\n主线程结束 time=="+time);
}
}
结果输出
线程t1 num==30
线程t2 num==29
线程t3 num==28
线程t5 num==26
线程t4 num==27
线程t6 num==25
线程t1 num==24
线程t2 num==23
线程t6 num==22
线程t4 num==19
线程t5 num==20
线程t3 num==21
线程t2 num==18
线程t3 num==13
线程t5 num==14
线程t1 num==15
线程t6 num==17
线程t4 num==16
线程t2 num==12
线程t1 num==9
线程t5 num==10
线程t3 num==11
线程t4 num==7
线程t6 num==8
线程t5 num==5
线程t4 num==4
线程t1 num==6
线程t2 num==3
线程t3 num==2
线程t6 num==1
[30, 29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1]
主线程结束 time==58
我们使用AtomicInteger,代替同步锁来解决多线程安全的。
######################################################################
1、举例说明。
假设现在线程A和线程B同时执行getAndAdd操作:
1.AtomicInteger里面的value原始值为3,即主内存中AtomicInteger的value为3,根据Java内存模型,线程A和线程B各自持有一份value的副本,值为3。
2.线程A通过getIntVolatile(var1, var2)方法获取到value值3,线程切换,线程A挂起。
3.线程B通过getIntVolatile(var1, var2)方法获取到value值3,并利用compareAndSwapInt方法比较内存值也为3,比较成功,修改内存值为2,线程切换,线程B挂起。
4.线程A恢复,利用compareAndSwapInt方法比较,发现手里的值3和内存值2不一致,此时value正在被另外一个线程修改,线程A不能修改value值。
5.线程的compareAndSwapInt实现,循环判断,重新获取value值,因为value是volatile变量,所以线程对它的修改,线程A总是能够看到。线程A继续利用compareAndSwapInt进行比较并替换,直到compareAndSwapInt修改成功返回true。
整个过程中,利用CAS保证了对于value的修改的线程安全性。
不过由于CAS编码确实稍微复杂,而且jdk作者本身也不希望你直接使用unsafe(后面会讲到)来进行代码的编写,所以如果不能深刻理解CAS以及unsafe还是要慎用,使用一些别人已经实现好的无锁类或者框架就好了。
2、CAS问题。
CAS虽然很高效的解决原子操作,但是CAS仍然存在三大问题。
(1)ABA问题。
循环时间长开销大和只能保证一个共享变量的原子操作ABA问题。因为CAS需要在操作值的时候检查下值有没有发生变化,如果没有发生变化则更新,但是如果一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时会发现它的值没有发生变化,但是实际上却变化了。ABA问题的解决思路就是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加一,那么A-B-A 就会变成1A-2B-3A。从Java1.5开始JDK的atomic包里提供了一个类AtomicStampedReference来解决ABA问题。这个类的compareAndSet方法作用是首先检查当前引用是否等于预期引用,并且当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。
(2)循环时间长开销大。
自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销。如果JVM能支持处理器提供的pause指令那么效率会有一定的提升,pause指令有两个作用,第一它可以延迟流水线执行指令(de-pipeline),使CPU不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零。第二它可以避免在退出循环的时候因内存顺序冲突(memory order violation)而引起CPU流水线被清空(CPU pipeline flush),从而提高CPU的执行效率。
(3)只能保证对一个共享变量的原子操作。
当对一个共享变量执行操作时,我们可以使用循环CAS的方式来保证原子操作,但是对多个共享变量操作时,循环CAS就无法保证操作的原子性,这个时候就可以用锁,或者有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作。比如有两个共享变量i=2,j=a,合并一下ij=2a,然后用CAS来操作ij。从Java1.5开始JDK提供了AtomicReference类来保证引用对象之间的原子性,你可以把多个变量放在一个对象里来进行CAS操作。
##########################################################################
##########################################################################