CAS与ABA问题产生和优雅解决
本人免费整理了java高级资料,涵盖了java、redis、mongodb、mysql、zookeeper、spring cloud、dubbo高并发分布式等教程,一共30g,需要自己领取。
传送门:https://mp.weixin.qq.com/s/jzddfh-7ynudmkjt0irl8q
独占锁:是一种悲观锁,synchronized就是一种独占锁,会导致其它所有需要锁的线程挂起,等待持有锁的线程释放锁。
乐观锁:每次不加锁,假设没有冲突去完成某项操作,如果因为冲突失败就重试,直到成功为止。
一、cas 操作
乐观锁用到的机制就是cas,compare and swap。
cas有3个操作数,内存值v,旧的预期值a,要修改的新值b。当且仅当预期值a和内存值v相同时,将内存值v修改为b,否则什么都不做。
1、非阻塞算法 (nonblocking algorithms)
一个线程的失败或者挂起不应该影响其他线程的失败或挂起的算法。
现代的cpu提供了特殊的指令,可以自动更新共享数据,而且能够检测到其他线程的干扰,而 compareandset() 就用这些代替了锁定。
2、atomicinteger示例
拿出atomicinteger来研究在没有锁的情况下是如何做到数据正确性的。
private volatile int value;
在没有锁的机制下需要借助volatile原语,保证线程间的数据是可见的(共享的)。
这样才获取变量的值的时候才能直接读取。
public final int get() { return value; }
然后来看看 ++i 是怎么做到的。
public final int incrementandget() { for (;;) { int current = get(); int next = current + 1; if (compareandset(current, next)) return next; } }
在这里采用了cas操作,每次从内存中读取数据然后将此数据和+1后的结果进行cas操作,如果成功就返回结果,否则重试直到成功为止。
而compareandset利用jni来完成cpu指令的操作。
public final boolean compareandset(int expect, int update) { return unsafe.compareandswapint(this, valueoffset, expect, update); }
整体的过程就是这样子的,利用cpu的cas指令,同时借助jni来完成java的非阻塞算法。其它原子操作都是利用类似的特性完成的。
而整个j.u.c都是建立在cas之上的,因此对于synchronized阻塞算法,j.u.c在性能上有了很大的提升。参考资料的文章中介绍了如果利用cas构建非阻塞计数器、队列等数据结构。
二、aba问题
cas看起来很爽,但是会导致“aba问题”。
cas算法实现一个重要前提需要取出内存中某时刻的数据,而在下时刻比较并替换,那么在这个时间差类会导致数据的变化。
比如说一个线程one从内存位置v中取出a,这时候另一个线程two也从内存中取出a,并且two进行了一些操作变成了b,然后two又将v位置的数据变成a,这时候线程one进行cas操作发现内存中仍然是a,然后one操作成功。尽管线程one的cas操作成功,但是不代表这个过程就是没有问题的。
如果链表的头在变化了两次后恢复了原值,但是不代表链表就没有变化。因此前面提到的原子操作atomicstampedreference/atomicmarkablereference就很有用了。这允许一对变化的元素进行原子操作。
在运用cas做lock-free操作中有一个经典的aba问题:
线程1准备用cas将变量的值由a替换为b,在此之前,线程2将变量的值由a替换为c,又由c替换为a,然后线程1执行cas时发现变量的值仍然为a,所以cas成功。但实际上这时的现场已经和最初不同了,尽管cas成功,但可能存在潜藏的问题,例如下面的例子:
现有一个用单向链表实现的堆栈,栈顶为a,这时线程t1已经知道a.next为b,然后希望用cas将栈顶替换为b:
head.compareandset(a,b);
在t1执行上面这条指令之前,线程t2介入,将a、b出栈,再pushd、c、a,此时堆栈结构如下图,而对象b此时处于游离状态:
此时轮到线程t1执行cas操作,检测发现栈顶仍为a,所以cas成功,栈顶变为b,但实际上b.next为null,所以此时的情况变为:
其中堆栈中只有b一个元素,c和d组成的链表不再存在于堆栈中,平白无故就把c、d丢掉了。
以上就是由于aba问题带来的隐患,各种乐观锁的实现中通常都会用版本戳version来对记录或对象标记,避免并发操作带来的问题,在java中,atomicstampedreference<e>也实现了这个作用,它通过包装[e,integer]的元组来对对象标记版本戳stamp,从而避免aba问题,例如下面的代码分别用atomicinteger和atomicstampedreference来对初始值为100的原子整型变量进行更新,atomicinteger会成功执行cas操作,而加上版本戳的atomicstampedreference对于aba问题会执行cas失败:
package concur.lock; import java.util.concurrent.timeunit; import java.util.concurrent.atomic.atomicinteger; import java.util.concurrent.atomic.atomicstampedreference; public class aba { private static atomicinteger atomicint = new atomicinteger(100); private static atomicstampedreference<integer> atomicstampedref = new atomicstampedreference<integer>(100, 0); public static void main(string[] args) throws interruptedexception { thread intt1 = new thread(new runnable() { @override public void run() { atomicint.compareandset(100, 101); atomicint.compareandset(101, 100); } }); thread intt2 = new thread(new runnable() { @override public void run() { try { timeunit.seconds.sleep(1); } catch (interruptedexception e) { e.printstacktrace(); } boolean c3 = atomicint.compareandset(100, 101); system.out.println(c3); //true } }); intt1.start(); intt2.start(); intt1.join(); intt2.join(); thread reft1 = new thread(new runnable() { @override public void run() { try { timeunit.seconds.sleep(1); } catch (interruptedexception e) { e.printstacktrace(); } atomicstampedref.compareandset(100, 101, atomicstampedref.getstamp(), atomicstampedref.getstamp()+1); atomicstampedref.compareandset(101, 100, atomicstampedref.getstamp(), atomicstampedref.getstamp()+1); } }); thread reft2 = new thread(new runnable() { @override public void run() { int stamp = atomicstampedref.getstamp(); system.out.println("before sleep : stamp = " + stamp); // stamp = 0 try { timeunit.seconds.sleep(2); } catch (interruptedexception e) { e.printstacktrace(); } system.out.println("after sleep : stamp = " + atomicstampedref.getstamp());//stamp = 1 boolean c3 = atomicstampedref.compareandset(100, 101, stamp, stamp+1); system.out.println(c3); //false } }); reft1.start(); reft2.start(); } }