欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

AQS、CAS详解

程序员文章站 2022-03-04 19:13:34
...

AQS详解

原子性操作自:原子性在一个操作是不可中断的,要么全部执行成功要么全部执行失败,有着“同生共死”的感觉。及时在多个线程一起执行的时候,一个操作一旦开始,就不会被其他线程所干扰2113

CAS:

全称(Compare And Swap),比较交换,Unsafe类是CAS的核心类,提供硬件级别的原子操作

CAS 中有三个参数:内存值 V、旧的预期值 E、要更新的值 N ,当且仅当内存值 V 的值等于旧的预期值 A 时,才会将内存值V的值修改为 B ,否则什么都不干。

CAS比较与交换的伪代码可以表示为:

do{
		备份旧数据;
		基于旧数据构造新数据;
}while(!CAS( 内存地址,备份的旧数据,新数据 ))

AQS、CAS详解

但是存在这样一种情况:如果一个值原来是A,变成了B,然后又变成了A,那么在CAS检查的时候会发现没有改变,但是实质上它已经发生了改变,这就是所谓的ABA问题。对于ABA问题其解决方案是加上版本号,即在每个变量都加上一个版本号,每次改变时加1,即A —> B —> A,变成1A —> 2B —> 3A。
ABA问题导致的原因,是CAS过程中只简单进行了“值”的校验,再有些情况下,“值”相同不会引入错误的业务逻辑(例如库存),有些情况下,“值”虽然相同,却已经不是原来的数据了。

AQS:

AQS:AbstractQuenedSynchronizer抽象的队列式同步器。是除了java自带的synchronized关键字之外的锁机制,它提供了一种实现阻塞锁和一系列依赖FIFO等待队列的同步器的框架。

AQS的核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并将共享资源设置为锁定状态,如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列(FIFO)锁实现的,即将暂时获取不到锁的线程加入到队列中。

​ ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier等并发类均是基于AQS来实现的,具体用法是通过继承AQS实现其模板方法,然后将子类作为同步组件的内部类。

同步状态:

AQS 的主要使用方式是继承,子类通过继承同步器,并实现它的抽象方法来管理同步状态。

AQS 使用一个 int 类型的成员变量 state表示同步状态

  • state > 0 时,表示已经获取了锁。
  • state = 0 时,表示释放了锁。

它提供了三个方法,来对同步状态 state 进行操作,并且 AQS 可以确保对 state 的操作是安全的:

  • #getState()
  • #setState(int newState)
  • #compareAndSetState(int expect, int update)

锁的独占与共享:

​ 独占锁:每次只能有一个线程能持有锁,ReentrantLock就是以独占方式实现的互斥锁。
​ 共享锁:允许多个线程同时获取锁,并发访问。

​ AQS提供了独占锁和共享锁必须实现的方法,具有独占锁功能的子类,它必须实现tryAcquire、tryRelease、isHeldExclusively等;共享锁功能的子类,必须实现tryAcquireShared和tryReleaseShared等方法,带有Shared后缀的方法都是支持共享锁加锁的语义。

独占锁获取锁时,设置节点模式为Node.EXCLUSIVE

​ 独占式获取同步状态。如果当前线程获取同步状态成功,则由该方法返回;否则,将会进入同步队列等待。该方法将会调用可重写#tryAcquire(int arg) 方法

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

#acquireInterruptibly(int arg):与 #acquire(int arg) 相同,但是该方法响应中断。当前线程为获取到同步状态而进入到同步队列中,如果当前线程被中断,则该方法会抛出InterruptedException 异常并返回。

共享锁获取锁,节点模式则为Node.SHARED:
private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

主要内置方法:

  1. #tryAcquire(int arg):独占式获取同步状态,获取同步状态成功后,其他线程需要等待该线程释放同步状态才能获取同步状态。
  2. #tryRelease(int arg):独占式释放同步状态。
  3. #tryAcquireShared(int arg):共享式获取同步状态,返回值大于等于 0 ,则表示获取成功;否则,获取失败。
  4. #tryReleaseShared(int arg):共享式释放同步状态。
  5. #isHeldExclusively():当前同步器是否在独占式模式下被线程占用,一般该方法表示是否被当前线程所独占。
  6. #tryAcquireNanos(int arg, long nanos):超时获取同步状态。如果当前线程在 nanos 时间内没有获取到同步状态,那么将会返回 false ,已经获取则返回 true 。
  7. #acquireShared(int arg):共享式获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式的主要区别是在同一时刻可以有多个线程获取到同步状态;
  8. #acquireSharedInterruptibly(int arg):共享式获取同步状态,响应中断。
  9. #tryAcquireSharedNanos(int arg, long nanosTimeout):共享式获取同步状态,增加超时限制。
  10. #release(int arg):独占式释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒。
  11. #releaseShared(int arg):共享式释放同步状态。

CLH同步队列:

AQS 通过内置的 FIFO 同步队列来完成资源获取线程的排队工作

  • 如果当前线程获取同步状态失败(锁)时,AQS 则会将当前线程以及等待状态等信息构造成一个节点(Node)并将其加入同步队列,同时会阻塞当前线程
  • 当同步状态释放时,则会把节点中的线程唤醒,使其再次尝试获取同步状态。

AQS、CAS详解

​ CLH阻塞队列采用的是双向链表队列,头部节点默认获取资源获得执行权限。后续节点不断自旋方式查询前置节点是否执行完成,直到头部节点执行完成将自己的waitStatus状态修改以通知后续节点可以获取资源执行。

AQS中Node节点:

static final class Node {
    static final Node SHARED = new Node();
    static final Node EXCLUSIVE = null;
		//当该线程等待超时或者被中断,需要从同步队列中取消等待,则该线程被置1,即被取消
  	//节点进入了取消状态则不再变化
    static final int CANCELLED =  1;
		//后继的节点处于等待状态,当前节点的线程如果释放了同步状态或者被取消(当前节点状态置为-1)
  	//只要前继结点释放锁,就会通知标识为SIGNAL状态的后继结点的线程执行
    static final int SIGNAL    = -1;
  	//该节点从等待队列中转移到同步队列中,加入到对同步状态的获取中
    static final int CONDITION = -2;
		//该状态标识结点的线程处于可运行状态
    static final int PROPAGATE = -3;
		
    volatile int waitStatus;
		//前驱节点,当节点加入同步队列的时候被设置(尾部添加)
    volatile Node prev;
		//后继节点
    volatile Node next;
		//获取同步状态的线程
    volatile Thread thread;
		//该节点唤醒后依据该节点的状态判断是否依据条件唤醒下一个节点
    Node nextWaiter;
		//检查当前节点是否为共享节点
    final boolean isShared() {
        return nextWaiter == SHARED;
    }
		//查找前置节点是否存在
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}
状态 判断结果 说明
waitStatus=0 代表初始化状态 该节点尚未被初始化完成
waitStatus>0 取消状态 说明该线程中断或者等待超时,需要移除该线程
waitStatus<0 有效状态 该线程处于可以被唤醒的状态
nextWaiter状态标志 说明
SHARED(共享模式) 直接唤醒下一个节点
EXCLUSIVE(独占模式) 等待当前线程执行完成后再唤醒
其他非空值 依据条件决定怎么唤醒下一个线程。类似semaphore中控制几个线程通过

​ 首先确定自己是否为头部节点,如果是头部节点则直接获取资源开始执行,如果不是则自旋前置节点直到前置节点执行完成状态修改为CANCELLED,然后断开前置节点的链接,获取资源开始执行。
AQS、CAS详解