欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

CAS与ABA问题产生和优雅解决

程序员文章站 2022-07-04 22:49:24
独占锁:是一种悲观锁,synchronized就是一种独占锁,会导致其它所有需要锁的线程挂起,等待持有锁的线程释放锁。 乐观锁:每次不加锁,假设没有冲突去完成某项操作,如果因为冲突失败就重试,直到成功为止。 ......

本人免费整理了java高级资料,涵盖了java、redis、mongodb、mysql、zookeeper、spring cloud、dubbo高并发分布式等教程,一共30g,需要自己领取。
传送门:https://mp.weixin.qq.com/s/jzddfh-7ynudmkjt0irl8q

 

独占锁:是一种悲观锁,synchronized就是一种独占锁,会导致其它所有需要锁的线程挂起,等待持有锁的线程释放锁。

乐观锁:每次不加锁,假设没有冲突去完成某项操作,如果因为冲突失败就重试,直到成功为止。

一、cas 操作

乐观锁用到的机制就是cas,compare and swap。

cas有3个操作数,内存值v,旧的预期值a,要修改的新值b。当且仅当预期值a和内存值v相同时,将内存值v修改为b,否则什么都不做。

1、非阻塞算法 (nonblocking algorithms)

一个线程的失败或者挂起不应该影响其他线程的失败或挂起的算法。

现代的cpu提供了特殊的指令,可以自动更新共享数据,而且能够检测到其他线程的干扰,而 compareandset() 就用这些代替了锁定。

2、atomicinteger示例

拿出atomicinteger来研究在没有锁的情况下是如何做到数据正确性的。

private volatile int value;

 

在没有锁的机制下需要借助volatile原语,保证线程间的数据是可见的(共享的)。

这样才获取变量的值的时候才能直接读取。

public final int get() {
    return value;
}

 

然后来看看 ++i 是怎么做到的。

public final int incrementandget() {
    for (;;) {
        int current = get();
        int next = current + 1;
        if (compareandset(current, next))
            return next;
    }
}

 

在这里采用了cas操作,每次从内存中读取数据然后将此数据和+1后的结果进行cas操作,如果成功就返回结果,否则重试直到成功为止。

而compareandset利用jni来完成cpu指令的操作。

public final boolean compareandset(int expect, int update) {   
    return unsafe.compareandswapint(this, valueoffset, expect, update);
}

 

整体的过程就是这样子的,利用cpu的cas指令,同时借助jni来完成java的非阻塞算法。其它原子操作都是利用类似的特性完成的。

而整个j.u.c都是建立在cas之上的,因此对于synchronized阻塞算法,j.u.c在性能上有了很大的提升。参考资料的文章中介绍了如果利用cas构建非阻塞计数器、队列等数据结构。

二、aba问题

cas看起来很爽,但是会导致“aba问题”。

cas算法实现一个重要前提需要取出内存中某时刻的数据,而在下时刻比较并替换,那么在这个时间差类会导致数据的变化。

比如说一个线程one从内存位置v中取出a,这时候另一个线程two也从内存中取出a,并且two进行了一些操作变成了b,然后two又将v位置的数据变成a,这时候线程one进行cas操作发现内存中仍然是a,然后one操作成功。尽管线程one的cas操作成功,但是不代表这个过程就是没有问题的。

如果链表的头在变化了两次后恢复了原值,但是不代表链表就没有变化。因此前面提到的原子操作atomicstampedreference/atomicmarkablereference就很有用了。这允许一对变化的元素进行原子操作。

在运用cas做lock-free操作中有一个经典的aba问题:

线程1准备用cas将变量的值由a替换为b,在此之前,线程2将变量的值由a替换为c,又由c替换为a,然后线程1执行cas时发现变量的值仍然为a,所以cas成功。但实际上这时的现场已经和最初不同了,尽管cas成功,但可能存在潜藏的问题,例如下面的例子:

CAS与ABA问题产生和优雅解决

现有一个用单向链表实现的堆栈,栈顶为a,这时线程t1已经知道a.next为b,然后希望用cas将栈顶替换为b:

head.compareandset(a,b);

在t1执行上面这条指令之前,线程t2介入,将a、b出栈,再pushd、c、a,此时堆栈结构如下图,而对象b此时处于游离状态:

CAS与ABA问题产生和优雅解决

此时轮到线程t1执行cas操作,检测发现栈顶仍为a,所以cas成功,栈顶变为b,但实际上b.next为null,所以此时的情况变为:

CAS与ABA问题产生和优雅解决

其中堆栈中只有b一个元素,c和d组成的链表不再存在于堆栈中,平白无故就把c、d丢掉了。

以上就是由于aba问题带来的隐患,各种乐观锁的实现中通常都会用版本戳version来对记录或对象标记,避免并发操作带来的问题,在java中,atomicstampedreference<e>也实现了这个作用,它通过包装[e,integer]的元组来对对象标记版本戳stamp,从而避免aba问题,例如下面的代码分别用atomicinteger和atomicstampedreference来对初始值为100的原子整型变量进行更新,atomicinteger会成功执行cas操作,而加上版本戳的atomicstampedreference对于aba问题会执行cas失败:

package concur.lock;

import java.util.concurrent.timeunit;
import java.util.concurrent.atomic.atomicinteger;
import java.util.concurrent.atomic.atomicstampedreference;

public class aba {
    
    private static atomicinteger atomicint = new atomicinteger(100);
    private static atomicstampedreference<integer> atomicstampedref = 
            new atomicstampedreference<integer>(100, 0);
    
    public static void main(string[] args) throws interruptedexception {
        thread intt1 = new thread(new runnable() {
            @override
            public void run() {
                atomicint.compareandset(100, 101);
                atomicint.compareandset(101, 100);
            }
        });
        
        thread intt2 = new thread(new runnable() {
            @override
            public void run() {
                try {
                    timeunit.seconds.sleep(1);
                } catch (interruptedexception e) {
                    e.printstacktrace();
                }
                boolean c3 = atomicint.compareandset(100, 101);
                system.out.println(c3);        //true
            }
        });
        
        intt1.start();
        intt2.start();
        intt1.join();
        intt2.join();
        
        thread reft1 = new thread(new runnable() {
            @override
            public void run() {
                try {
                    timeunit.seconds.sleep(1);
                } catch (interruptedexception e) {
                    e.printstacktrace();
                }
                atomicstampedref.compareandset(100, 101, 
                        atomicstampedref.getstamp(), atomicstampedref.getstamp()+1);
                atomicstampedref.compareandset(101, 100, 
                        atomicstampedref.getstamp(), atomicstampedref.getstamp()+1);
            }
        });
        
        thread reft2 = new thread(new runnable() {
            @override
            public void run() {
                int stamp = atomicstampedref.getstamp();
                system.out.println("before sleep : stamp = " + stamp);    // stamp = 0
                try {
                    timeunit.seconds.sleep(2);
                } catch (interruptedexception e) {
                    e.printstacktrace();
                }
                system.out.println("after sleep : stamp = " + atomicstampedref.getstamp());//stamp = 1
                boolean c3 = atomicstampedref.compareandset(100, 101, stamp, stamp+1);
                system.out.println(c3);        //false
            }
        });
        
        reft1.start();
        reft2.start();
    }

}