Java并发编程之CAS
在Java并发编程的世界里,synchronized 和 Lock 是控制多线程并发环境下对共享资源同步访问的两大手段。其中 Lock 是 JDK 层面的锁机制,是轻量级锁,底层使用大量的自旋+CAS操作实现的。
学习并发推荐《Java并发编程的艺术》
那什么是CAS呢?CAS,compare and swap,即比较并交换,什么是比较并交换呢?在Lock锁的理念中,采用的是一种乐观锁的形式,即多线程去修改共享资源时,不是在修改之前就加锁,而是乐观的认为没有别的线程和自己争锁,就是通过CAS的理念去保障共享资源的安全性的。CAS的基本思想是,拿变量的原值和内存中的值进行比较,如果相同,则原值没有被修改过,那么就将原值修改为新值,这两步是原子的,能够保证同一时间只有一个线程修改成功。这就是CAS的理念。
Java中要想使用CAS原子的修改某值,怎么做呢?幸运的是Java提供了这样的API,就是在sun.misc.Unsafe.java类中。Unsafe,中文名不安全的,也被称为魔术类,魔法类。
Unsafe类介绍
Unsafe类使Java拥有了像C语言的指针一样操作内存空间的能力,一旦能够直接操作内存,这也就意味着
(1)不受JVM管理,意思就是使用Unsafe操作内存无法被JVM GC,需要我们手动GC,稍有不慎就会出现内存泄漏。
(2)Unsafe的不少方法中必须提供原始地址(内存地址)和被替换对象的地址,并且偏移量要自己计算(其提供的有计算偏移量的方法),所以一旦出现问题就是JVM崩溃级别的异常,会导致整个JVM实例崩溃,表现为应用程序直接crash掉。
(3)直接操作内存,所以速度更快,在高并发的条件之下能够很好地提高效率。
因此,从上面三个角度来看,虽然在一定程度上提升了效率但是也带来了指针的不安全性。这也是它被取名为Unsafe的原因吧。
下面我们深入到源码中看看,提供了什么方法直接操作内存。
打开Unsafe这个类,我们会发现里面有大量的被native关键字修饰的方法,这意味着这些方法是C语言提供的实现,底层调的是C语言的库函数,我们无法直接看到他的源码实现,需要去从OpenJDK去看了。另外还有一些基于native方法封装的其他方法,整个Unsafe中的方法大致可以归结为以下几类:
(1)初始化操作
(2)操作对象属性
(3)操作数组元素
(4)线程挂起和恢复
(5)CAS机制
CAS的使用
如果你学过java并发编程的话,稍微阅读过JUC并发包里面的源码的话,对这个Unsafe类一定不陌生,因为整个java并发包底层实现的核心就是靠它。JUC并发包中主要使用它提供的CAS(compare and swap,比较并交换)操作,原子的修改锁的状态和一些队列元素。
没看过JUC源码的读者也不用担心,今天我们就是简单介绍Unsafe类中的CAS操作,那么我们接下来就会通过一个简单的例子来看看Unsafe的CAS是怎么使用的。
首先,使用这个类我们第一个要做的事情就是拿到这个类的实例,下面我们自定义了一个Util类用来获取Unsafe的实例
import sun.misc.Unsafe;
import java.lang.reflect.Field;
public class UnsafeUtil {
public static Unsafe reflectGetUnsafe() {
try {
Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
return (Unsafe) field.get(null);
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}
这个工具类通过反射的方式拿到Unsafe类中的一个名为theUnsafe字段,该字段是Unsafe类型,并在static块中new一个Unsafe对象初始化这个字段(单例模式)。
然后我们定义了一个AtomicState类,这个类很简单,有一个int型的state字段,还有一个Unsafe的常量,以及int型的offsetState,用来记录state字段在AtomicState对象中的偏移量。具体代码如下:
import com.walking.juc.util.UnsafeUtil;
import sun.misc.Unsafe;
public class AtomicState {
private volatile int state = 0;
public int getState() {
return state;
}
private static final Unsafe UNSAFE = UnsafeUtil.reflectGetUnsafe();
private static final long offsetState;
static {
try {
offsetState = UNSAFE.objectFieldOffset(AtomicState.class.getDeclaredField("state"));
} catch (NoSuchFieldException e) {
throw new Error(e);
}
}
public final boolean compareAndSetState(int oldVal, int newVal) {
return UNSAFE.compareAndSwapInt(this, offsetState, oldVal, newVal);
}
}
我们定义了一个compareAndSetState
方法,需要传两个参数,分别是state的旧值和新值,也就是读到的state的之前的值,以及想要把它修改成什么值,该方法内部调用的是Unsafe类的compareAndSwapInt
方法,它有四个参数,分别是要修改的类实例对象、要修改的值的偏移量、旧值、新值。解释一下偏移量,刚才我们提到Unsafe提供给我们直接访问内存的能力,那么访问内存肯定是要知道内存的地址在哪才能去修改其相应的值吧,我们看,第一个参数是对象实例引用,也就是说,已经知道这个对象的地址了,那么我们想修改这个对象里的state的值,就只需要计算出state在这个对象的偏移量就能找到state所在的内存地址,那就可以修改它了。
然后,我们通过一个测试类来验证Unsafe的CAS操作。这个测试类我来解释下大致的思想,我们弄5个线程,让这个5个线程一个个启动,我们无法保证线程同时开始启动,那么我们有办法保证这个5个线程同时执行我们的代码,就是使用JUC包里的CyclicBarrier
工具来实现的,这个工具初始化时需要传入一个int值n,我们在线程的run方法内部在业务代码执行之前调用CyclicBarrie
r的await方法,当指定数量n的线程都调用了这个方法那么这n个线程将同时往下执行,就像设置了一个屏障,所有人都达到这个屏障后,一起通过屏障,依次来模拟多线程并发
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
@Slf4j
public class TestAtomicState {
static int tNum = 5;//线程数 我们开10个线程模拟多线程并发
static CyclicBarrier cyclicBarrier = new CyclicBarrier(tNum);//栅栏
static CountDownLatch countDownLatch = new CountDownLatch(tNum);//计数器
static AtomicState atomicState = new AtomicState();
public static void main(String[] args) throws InterruptedException {
for (int i = 1; i <= tNum; i++) {
new Thread(new MyTask(),"t-"+i).start();
}
countDownLatch.await();//为的是让主线程在这句阻塞住,等待所有线程执行完毕(计数器减到0)再往下走
log.info("state最后的值:" + atomicState.getState());
}
static class MyTask implements Runnable{
@Override
public void run() {
try {
log.info(Thread.currentThread().getName() + "到达起跑线");
String name = Thread.currentThread().getName();
String substring = name.substring(name.indexOf("-") + 1);
int i1 = Integer.parseInt(substring);
cyclicBarrier.await();//设置一个屏障,所有线程达到这后开始一起往下执行 模拟并发
boolean b = atomicState.compareAndSetState(0, i1);
if (b) {
log.info("修改成功,tName:{}" ,Thread.currentThread().getName());
} else {
log.info("修改失败,tName:{}" ,Thread.currentThread().getName());
}
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();//线程执行完毕计数器减一
}
}
}
}
在cyclicBarrier.await();
之后我们调用AtomicState
的compareAndSetState
方法传入旧值0和新值,新值就是线程名t-n中的n,哪个线程修改成功,最后state值就是线程名中的数字。
至于CountDownLatch
使用它的目的是让mian线程等到t-1到t-5的线程全部执行完后打印state的值。我们的重点不是CyclicBarrier
和CountDownLatch
,知道它们是干什么的就行。
然后我们运行这个测试程序:
13:57:46.619 [t-2] INFO com.walking.castest.TestAtomicState - t-2到达起跑线
13:57:46.619 [t-3] INFO com.walking.castest.TestAtomicState - t-3到达起跑线
13:57:46.619 [t-5] INFO com.walking.castest.TestAtomicState - t-5到达起跑线
13:57:46.619 [t-1] INFO com.walking.castest.TestAtomicState - t-1到达起跑线
13:57:46.619 [t-4] INFO com.walking.castest.TestAtomicState - t-4到达起跑线
13:57:46.628 [t-1] INFO com.walking.castest.TestAtomicState - 修改失败,tName:t-1
13:57:46.628 [t-4] INFO com.walking.castest.TestAtomicState - 修改成功,tName:t-4
13:57:46.628 [t-2] INFO com.walking.castest.TestAtomicState - 修改失败,tName:t-2
13:57:46.628 [t-5] INFO com.walking.castest.TestAtomicState - 修改失败,tName:t-5
13:57:46.628 [t-3] INFO com.walking.castest.TestAtomicState - 修改失败,tName:t-3
13:57:46.636 [main] INFO com.walking.castest.TestAtomicState - state最后的值:4
可以看到只有一个线程执行成功,这就是CAS的基本使用。
CAS的ABA问题
何为ABA问题呢?举个例子,小明和小花合伙卖煎饼,不就后攒了10万元,他们一起去银行把钱存在他们公共的账户里,但是小明听说最近牛市来了,就偷偷的把钱转移到了股票市场,公共账户余额是0。1个月后股票赚了一笔钱,然后小明把之前转移的10万元又存到他们的公共账户。小明和小花一个月后又去存钱,去查账户余额是10万。这就是ABA问题,简单来说就是一个值本来是A,两个线程同时都看到是A,然后线程1把A改成B后又改成A,线程1结束了。然后线程2去修改时,看到的是A,无法感知到这个过程中值发生过变化,对于线程2来说就发生了ABA的问题。
模拟ABA问题:
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class CAS_ABA_Stampe {
static AtomicInteger atomicInteger = new AtomicInteger(10);
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
try {
log.info("{}拿到state的值为:{}", Thread.currentThread().getName(), atomicInteger.get());
log.info("{}第一次修改", Thread.currentThread().getName());
atomicInteger.getAndSet(0);
Thread.sleep(2000);
log.info("{}第二次修改", Thread.currentThread().getName());
atomicInteger.getAndSet(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t1");
t1.start();
Thread t2 = new Thread(() -> {
try {
log.info("{}第一次拿到state的值为:{}", Thread.currentThread().getName(), atomicInteger.get());
Thread.sleep(2500);
log.info("{}第二次拿到state的值为:{}", Thread.currentThread().getName(), atomicInteger.get());
log.info("{}开始修改state的值为2", Thread.currentThread().getName());
atomicInteger.getAndSet(20);
log.info("{}修改成功", Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t2");
t2.start();
t1.join();
t2.join();
log.info("最终state的值:{}", atomicInteger.get());
}
}
//结果t2也能修改成功,并没有发现这种变化
15:12:35.999 [t1] INFO com.walking.castest.CAS_ABA_Stampe - t1拿到state的值为:10 15:12:35.999 [t2] INFO com.walking.castest.CAS_ABA_Stampe - t2第一次拿到state的值为:10 15:12:36.014 [t1] INFO com.walking.castest.CAS_ABA_Stampe - t1第一次修改 15:12:38.015 [t1] INFO com.walking.castest.CAS_ABA_Stampe - t1第二次修改 15:12:38.515 [t2] INFO com.walking.castest.CAS_ABA_Stampe - t2第二次拿到state的值为:10 15:12:38.515 [t2] INFO com.walking.castest.CAS_ABA_Stampe - t2开始修改state的值为2 15:12:38.516 [t2] INFO com.walking.castest.CAS_ABA_Stampe - t2修改成功 15:12:38.516 [main] INFO com.walking.castest.CAS_ABA_Stampe - 最终state的值:20
怎么解决CAS的ABA问题呢?
那就是基于版本号去解决,增加一个版本号的概念,每次被修改这个版本号就加1,版本号是一直向前的,版本号变了,就说明被修改过。
JUC包中提供了解决ABA问题的工具:
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.atomic.AtomicStampedReference;
@Slf4j
public class CAS_ABA_Stampe {
static AtomicStampedReference<Integer> stampedReference = new AtomicStampedReference<>(10, 1);
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
try {
int stamp = stampedReference.getStamp();
int intValue = stampedReference.getReference().intValue();
log.info("{}私挪公款拿到stamp的值为:{},余额:{}", Thread.currentThread().getName(), stamp,intValue);
stampedReference.compareAndSet(10, 0, stamp, stamp + 1);
Thread.sleep(2000);
stamp = stampedReference.getStamp();
intValue = stampedReference.getReference().intValue();
log.info("{}还回公款拿到stamp的值为:{},余额:{}", Thread.currentThread().getName(), stamp,intValue);
stampedReference.compareAndSet(0, 10, stamp, stamp + 1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t1");
t1.start();
Thread t2 = new Thread(() -> {
try {
int stamp = stampedReference.getStamp();
int intValue = stampedReference.getReference().intValue();
log.info("{}拿到stamp的值为:{},余额:{}", Thread.currentThread().getName(), stamp, intValue);
Thread.sleep(3000);
log.info("{}开始存款", Thread.currentThread().getName());
if (stampedReference.compareAndSet(10, 20, stamp, stamp + 1)) {
log.info("{}款款成功", Thread.currentThread().getName());
}else {
log.info("{}存款失败,发现账户异常!!oldStamp:{},currentStamp:{}", Thread.currentThread().getName(),stamp,stampedReference.getStamp());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t2");
t2.start();
t1.join();
t2.join();
log.info("最终账户余额:{}W", stampedReference.getReference().intValue());
}
}
运行结果:
15:32:37.488 [t1] INFO com.walking.castest.CAS_ABA_Stampe - t1私挪公款拿到stamp的值为:1,余额:10
15:32:37.476 [t2] INFO com.walking.castest.CAS_ABA_Stampe - t2拿到stamp的值为:1,余额:10
15:32:39.500 [t1] INFO com.walking.castest.CAS_ABA_Stampe - t1还回公款拿到stamp的值为:2,余额:0
15:32:40.498 [t2] INFO com.walking.castest.CAS_ABA_Stampe - t2开始存款
15:32:40.498 [t2] INFO com.walking.castest.CAS_ABA_Stampe - t2存款失败,发现账户异常!!oldStamp:1,currentStamp:3
15:32:40.498 [main] INFO com.walking.castest.CAS_ABA_Stampe - 最终账户余额:10W
t2存款时就发现账户异常,因为版本号已经变成了3,和t2刚开始拿到的不一样,说明已经被别人修改过,从而解决ABA问题。
到这里CAS就完啦。别忘了点赞,转发。
往期热文:
欢迎关注公众号,谢谢支持。
上一篇: 详解SWAP分区