欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

根据ReentrantLock -- 解析AQS原理

程序员文章站 2022-06-24 20:06:18
java.util.concurrent.locks.AbstractQueuedSynchronizer是什么aqs,这是一个队列同步器框架,JUC中的公平锁、非公平锁、重入锁都是以aqs作为基础框架的,定义了加锁、释放锁,加共享锁等一些逻辑AQS是一个抽象类,内部使用了一个FIFO的等待队列,用于多线程等待锁排队,通过state表示当前资源的加锁状态;aqs是基础类,类中定义了模板方法,只需要实现对应的模板方法即可;aqs的作者是Doug Leaaqs内部维护的双向队列,大致是这样的,其中...
java.util.concurrent.locks.AbstractQueuedSynchronizer

根据ReentrantLock -- 解析AQS原理

是什么

aqs,这是一个队列同步器框架,JUC中的公平锁、非公平锁、重入锁都是以aqs作为基础框架的,定义了加锁、释放锁,加共享锁等一些逻辑
AQS是一个抽象类,内部使用了一个FIFO的等待队列,用于多线程等待锁排队,通过state表示当前资源的加锁状态;
aqs是基础类,类中定义了模板方法,只需要实现对应的模板方法即可;aqs的作者是Doug Lea

aqs内部维护的双向队列,大致是这样的,其中,比较特殊的是:第一个节点对应的thread是null,链表中的第二个节点,才是第一个排队的线程;这里的意思是:第一个节点是正在执行的线程,无需排队;图中少画了waitStatus信息,每个节点都会有一个waitStatus信息,用来存储下一个节点的等待状态
根据ReentrantLock -- 解析AQS原理

AQS核心属性


volatile int waitStatus;
    0,这是初始化状态,新Node会处于这种状态,初始化状态
static final int CANCELLED =  1;
    因为超时或者中断,Node被设置为取消状态,被取消的Node不应该去竞争锁,只能保持取消状态不变,不能转换为其他状态,处于这种状态的Node会被踢出队列,被GC回收
static final int SIGNAL = -1;
    由当前节点在队列中的后一个节点将当前节点的waitStatus设置为-1,表示告诉当前节点,如果当前节点unLock之后,通知后面的节点执行锁竞争(通过unPark()来唤醒)
static final int CONDITION = -2;
    表示这个Node在条件队列中,因为等待某个条件而被阻塞
static final int PROPAGATE = -3;
    使用在共享模式头Node有可能处于这种状态, 表示锁的下一次获取可以无条件传播
    
    
volatile Node prev;
    表示队列中当前Node节点的前一个节点
    
volatile Node next;
    表示队列中当前node节点的后一个节点
    
volatile Thread thread;
    这个node持有的线程,在new Node()的时候,需要把线程传进去
    
Node nextWaiter;
    表示下一个等待condition的Node
    
private transient volatile Node head;
    FIFO队列中的头结点
    
private transient volatile Node tail;
    FIFO队列中的尾节点
    
private volatile int state;
同步状态,0表示未加锁;1表示有一个线程加锁,2表示有两个线程加锁,一般是重入锁的场景
getState():获取同步状态
setState():设置同步状态
compareAndSetState(): 利用CAS进行同步状态的设置
spinForTimeoutThreshold = 1000L; :线程自旋等待时间
private Node enq(final Node node) {}: 
	是将当前排队的node节点放到FIFO的队尾;如果队列为空,就在node节点前面创建一个空节点,然后将node节点放到队尾

AQS使用说明

1、aqs中的node节点在排队的时候,waitStatus是0,在下一个排队的节点进来的时候,会把上一个节点的waitStatus设置为-1
2、笔记中说的aqs阻塞和唤醒指的是park()和unpark(); unpark()之后,在哪里park(),就从哪一行代码接着往下执行
3、笔记中说的第一个排队的节点指的是head节点的next;AQS队列中的一个节点head对应的thread永远是null
4、对于非公平锁,加锁失败,去排队之后,就不存在插队的情况;我们所说的加锁,其实就是尝试将state从0变成1;如果是重入锁,那就是在1的基础上再加1
5、AQS为什么要创建一个虚拟空节点?
因为在队列里面,每一个节点都要将前一个节点的waitStatus设置为-1,只有在前一个节点是-1的时候,在前一个节点释放锁的时候,会唤醒后面排队的线程;
那第一个节点没有前置节点,所以,就创建一个空节点,空节点可以理解为当前在执行的线程对应的node节点,在当前线程释放资源之后,会根据head的ws来判断是否需要唤醒下一个排队线程(也可以理解为第一个节点是当前执行线程的站队节点)

源码

我们通过对ReentrantLock的源码解析,来记录AQS的源码

ReentrantLock.lock()

ReentrantLock重入锁分为了公平锁和非公平锁,两者的区别是:在尝试加锁的时候,公平锁会判断当前线程是否可以加锁,如果可以加锁,就尝试cas,否则就去排队;非公平锁是,无论是否可以加锁,都强制cas加锁,加锁失败,就去排队

公平锁尝试加锁
/**
* 公平锁加锁流程:
*  1、先尝试加锁
*   1.1、如果加锁成功,就返回,流程结束
*   1.2、如果加锁失败,就判断是否可重入,可重入,就加锁成功,流程结束
*   1.3、如果未加锁成功,且不可重入,就去排队
*  2、排队addWaiter()
*   2.1、在排队的时候,先判断当前是否有在排队的节点,或者是否有空白的node节点;如果有,就直接将当前线程加入到队列中排队
*   2.2、如果没有在排队的节点、或者没有空白节点,就先new一个空白节点,插入到队头,然后将当前线程插入到队列,并设置为队尾
*  3、acquireQueued()方法:将排队节点的上一个节点的waitStatus设置为-1,然后进行park()
*
*
* 这里是尝试加锁,如果加锁失败,就放入到队列中
*
* 返回true,表示加锁成功,无需排队
* 如果tryAcquire返回false,表示需要去排队
*
*
* @param arg
*/
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

我们分开来看这几个方法

/**
* 这个方法的作用是:判断当前线程是否可以加锁,
*   如果未加锁,尝试加锁,加锁成功,就无需排队,加锁失败,就去排队
*   如果已经加过锁了,判断是否可重入,可重入,就state+1;不可重入就去排队
* 拿到当前是否已经加锁的标识:state(为0,表示未加锁;为1表示已经加锁)
* 1.如果未加锁:
*   1.1 尝试加锁,如果加锁成功,使用cas更新state的值
*     hasQueuedPredecessors();只有这个方法返回false,才表示当前线程可以加锁;否则,就会去排队
* 2.如果已经加锁
*   2.1 判断当前锁和已经加锁的是否是同一把锁,如果是同一把锁,可以重入,重入需要对state+1
* @param acquires
* @return
*/
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    //state为0表示当前没有线程加锁
    if (c == 0) {
        /**
             * hasQueuedPredecessors();判断当前线程是否可以加锁,如果返回false,表示可以加锁,返回true,表示需要排队
             * compareAndSetState:尝试加锁,加锁就是把aqs的state+1
             * setExclusiveOwnerThread:将当前线程设置为持有锁的线程
             */
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    /**
         * 如果当前已经有线程加锁了,判断当前加锁的线程和持有锁的线程是否是同一个,也就是所谓的锁重入
         * 如果是重入,就把state+1
         */
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

在尝试加锁的方法中,有一个最为核心的方法,就是hasQueuedPredecessors();这个方法,该方法是来判断当前线程是否可以加锁,如果这个方法返回false,表示可以加锁;如果返回true,表示需要去排队

/**
* 只有这个方法返回false,表示当前线程可以加锁,无需排队;如果返回true,表示需要排队
* 由于这里是&判断条件,所以,只有两个条件都为true,返回返回true;否则就是false
* 那我们来说不需要排队,可以直接加锁的场景:
* h != t 这个判断false有两种场景
*  1、当前队列中,还没有排队的线程,且队头的空白node节点还没有生成,这时候,都是null
*  2、当前队列中,已经生成了空白的node节点,但是没有排队的队列,这时候tail == head,这种场景有可能是在排队的节点都执行完了,然后这时候又来一个线程进行加锁
*
* 如果 h != t为true(为true表示最少有一个排队的线程),就需要判断后面的条件,只要后面的返回false,就可以不排队
* ((s = h.next) == null || s.thread != Thread.currentThread());
*  3.1、首先,来判断这个条件,说明前面的 h!=t是true(否则,如果h!=t为false,就无需判断后面的条件了),也就是说,当前队列中,已经有一个或者多个线程在排队
* 所以我觉得这里,如果在h!=t为true的情况下,不会存在h.next == null的这种场景;因为如果说head.next为null,就表明,当前只有一个空白节点,只有一个空白节点,在h!=t的时候,就直接返回false了
*
*  3.2、这个判断条件返回false,需要两个或条件都为false,否则,就会返回true,让线程去排队
*   两个条件都为false,简单:头结点后面有排队的节点,且头结点的下一个节点的thread是当前线程,这时候,会返回false,无需排队,直接尝试加锁;其实就是:当前来加锁的线程,和第二个节点(第一个排队的节点)
*   是同一个线程,这时候,可以加;因为我自己就是排在第二个的(第一个是空节点),为什么不能加锁呢?意思就是,我已经排到队了,就该我加锁
*
* 假如说,这时候一个线程过来加锁, h!=t,但是当前线程不是排队中第二个节点的thread,就会让排队,原因就是,这是公平锁,已经有人在排队了,为什么你不排队?这种情况就是 s.thread != Thread.currentThread()为true
*
*  3.3、如果h != t为true;后面的这个条件中,只要有一个为true,就需要去排队
*   h.next == null为true 这个条件我觉得在前面条件为true的情况下,这里不太可能为true;所以,后面这个条件为true
*   ,就是当前尝试加锁的线程和排队的线程不是同一个,这时候,就需要去排队,因为,你来加锁的时候,我已经在排队了,并且你和我又不是同一个线程,你为什么不排队?
*
* 所以总结而言,对于公平锁,无需排队的有这三种情况:
*  1、当前aqs队列为空
*  2、当前aqs队列不为空,只有一个空白的node节点
*  3、当前aqs队列不为空,有多个线程在排队,只有当前来加锁的线程是第一个排队的加锁线程时,可以不排队,直接尝试加锁(这个场景之前在网上看到一个帖子:是这样的,假如说现在在排队打水,A排在第一个等待打水的位置,如果A
*  的父母、亲戚来打水了,可以不排队,直接站到A的位置,因为他们是亲戚,一家人,也就是可重入)
*
* @return
*/
public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

对于非公平锁和公平锁来说,在加锁的时候有一个区别,就是非公平锁无需执行hasQueuedPredecessors()这个方法,也就是说非公平锁无需判断是否需要加锁,可以直接尝试CAS加锁

截止到这里,尝试加锁的方法就说完了,我们来说下后面的方法,如果tryAcquire(arg)方法返回true,表示加锁成功,就不需要后面去排队的方法了,假如返回的是false,表示当前线程需要去排队,排队的方法就是acquireQueued(addWaiter(Node.EXCLUSIVE), arg);这一行代码,有两个方法,我们拆开来看

非公平锁加锁

非公平锁加锁和公平锁加锁的区别是:
1、在调用lock()方法的时候,会先cas加锁,而公平锁是直接去判断是否允许加锁的
2、在判断到state为0 的时候,公平锁调用hasQueuedPredecessors()去判断是否可以加锁,而非公平锁不会调用这个方法,直接cas
除了这两个地方,其他都是一样的加锁流程
非公平锁只要去排队了,执行的流程和公平锁是一样的

final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}s

排队-addWaiter(Node.EXCLUSIVE);

首先,先描述一下这个方法的作用:当线程需要排队的时候,会先new一个node节点,如果当前aqs队列不为空,就直接把new出来的node节点,加到队尾;如果aqs队列为空,就先new一个空白的node节点,作为head队头,然后把需要排队的当前线程,加到head后面;

/**
* 首先根据当前线程,生成一个node节点:
* 这个方法完成的作用就是:如果当前有节点在排队,或者已经生成了空白节点,就直接将new的node节点插入到队列中;如果当前没有节点在排队,没有生成空白节点,就先生成空白节点,插入到队列中,再将new的node查到空白节点后面
*   1、如果尾结点不为null,表示当前已经存在排队的节点;或者当前不存在排队的节点,但是已经生成了队列中的第一个空节点,这是,就不需要做其他操作,直接加到队尾即可
*   2、如果pred == null,表示当前没有尾结点;这时候,在插入到队列之前,需要先放一个空节点在最前面,然后把new出来的node加到空节点后面;这个操作,就是在enq(node);中完成的
* @param mode
* @return
*/
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    //队尾不为null
    if (pred != null) {
        //由于是双向链表,所以需要设置node.prev和pred.next
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //这里是表示当前aqs队列为空,需要先new一个空白node节点作为head
    enq(node);
    return node;
}


-------------------------------------


/**
* 代码如果进入到这里,表示当前需向队列中插入node节点,但是没有空白节点
* @param node
* @return
*/
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        /**
         * 这里还是先判断一下队尾是否为null,如果不为null,表示在这一段时间内,有其他线程已经创建了空白节点
         * 1、如果尾结点为null,就new一个空白节点,并将空白节点设置为头结点;这时,其实头结点和尾结点都是这个空白节点
         * 2、这里是一个for死循环,所以,在创建了空白节点之后,第二次循环的时候,会进入到else
         *  将node节点设置为尾结点,并将head的next设置为node,至此,这个方法结束,return
         */
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

截止到这里,需要排队的队列已经加入到了aqs队列中,在入队之后,还要完成后续的动作:将前一个节点的waitStatus设置为-1,表示前一个节点在执行结束,释放锁之后,要唤醒后面的排队界面;这里的一个点是:在当前节点插入队列之后,需要把上一个节点(node.prev)的ws设置为-1

这个把当前排队节点的上一个节点的waitStatus设置为-1是非常重要的一个操作,因为在前一个节点解锁之后,会判断自己的waitStatus是否是-1,来判断:当前是否还有排队的线程;如果自己的waitStatus是-1,就表示后面还有排队的线程,就要去唤醒;否则,就不需要去唤醒

acquireQueued()方法:休眠

这个方法主要完成了以下工作:
1.如果当前node节点是第一个来排队的线程,就尝试进行一次加锁,防止在这期间,加锁的线程执行完毕,未唤醒后面的节点
2.如果加锁失败,或者是非第一个排队节点,那就尝试将上一个节点的waitStatus设置为-1,然后进行park()即可

/**
* 这个方法完成的是:判断当前入队的node节点是否需要park
*  如果当前是第二个线程进入到这里,会尝试去竞争锁;
* @param node
* @param arg
* @return
*/
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            //这里的p是当前节点的上一个节点 prev
            final Node p = node.predecessor();
            /**
             * 如果当前节点的prev就是head头结点,就表示当前只有我自己一个线程来排队了,这时候,就尝试加一次锁,看是否可以加锁成功;因为有可能在我入队的这一段时间,线程已经执行完了,所以这里要尝试加一次锁
             *
             * 如果加锁成功,就把当前节点设置为头结点,其实就是把当前节点设置为空节点,将原来的空节点回收掉
             * 如果加锁失败,就走下面的流程
             */
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            /**
             * 将上一个节点的waitStatus设置为-1的逻辑是在shouldParkAfterFailedAcquire方法中完成的
             * 代码走到这里,表示当前我不是第一个来排队的线程,,或者我是第一个来排队的,上面尝试加锁的时候,失败了;这里判断是否需要park(),需要判断我的上一个节点的waitStatus是否是-1(待唤醒状态)
             * ;如果不是,就将上一个节点的ws设置为-1,然后再调用LockSupport.park
             * (this);并将当前线程park(),然后返回;
             *
             * parkAndCheckInterrupt();会将当前线程阻塞
             *
             * 需要注意的是:线程在哪里被park()的,等到被unpark()的时候,就会接着park()的代码执行,也就是说,线程被唤醒之后,会执行return Thread.interrupted();
             * 如果这里是false,就会接着for循环,尝试加锁,这时候,会加锁成功,因为本来就该我来加锁了,然后return interrupted,这时候的interrupted是false
             */
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
        /**
         * 这里的failed为true的条件是比较苛刻的,如果上面的线程加锁成功了,failed为false;如果未加锁成功,又会unpark掉,所以理论上,这里是不可能为true
         * 的,除非是在上面代码的执行过程中,抛出了异常
         */
            cancelAcquire(node);
    }
}

解锁过程

公平锁和非公平锁调用的是同一个解锁方法,解锁主要完成以下几个操作
1.将当前state-1,如果-1之后不为0,就return,表示当前锁进行了重入,还有线程未释放锁
2.如果state为0,表示当前线程已经将锁完成释放,尝试唤醒后面排队的节点
3.在唤醒后面排队节点的时候,会判断当前head节点的waitStatus是否是-1,如果为0,表示后面没有排队节点,为-1,就去尝试唤醒第一个排队的节点
4.在唤醒时,调用LockSupport.unpark(thread)即可

/**
* 注意,ReentrantLock.unLock()和ReentrantReadWriteLock.writeLock.unLock()都是调用的这里
*
* tryRelease()方法返回true,表示解锁成功;返回false,表示有重入锁,还未解锁成功
* 1、如果解锁成功,就判断当前头结点的ws是否为非0,正常场景下,这里的ws应该是-1
* 如果这里的ws为0,表示这是最后一个线程了,因为后面有排队的线程的话,就会把我的ws变成-1
* 换而言之,在不考虑其他情况,如果一个node节点的ws为0,就是队尾了
*
* 在解锁成功之后,需要进行下一步的操作,这里可能会有多种情况
* 1、当前aqs队列为空,没有在排队的线程,这里的head就是null
* 2、aqs队列不为空,但是只有一个空白node节点,这时候,head的ws就是0,因为我后面没有人在排队了,没有人将我的ws设置为-1;也就是说,我执行完之后,就不需要唤醒后面的节点了
* 3、aqs队列不为空,并且空白node节点后面,还有线程在排队,这就是要调用unparkSuccessor(h)的场景
*
* @param arg
* @return
*/
//如果是非公平锁,在这个方法调用之前,会先进行一次cas,如果cas失败,再走这个代码流程
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

/**
* unlock解锁的时候,公平锁和非公平锁是一样的逻辑
*  1、判断当前解锁的线程和当前加锁的线程是否是同一个;如果不是同一个,报错
*  2、如果是同一个,state-1,如果-1之后的state为0;就表示当前线程已经解锁成;如果不为0,表示锁进行了重入,就将减1之后的值,set到state
*
* @param releases
* @return
*/
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

我们再看下unparkSuccessor方法

private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next;
    /**
     * s == null的场景就是当前队列中没有排队的线程,只有一个空的node节点
     */
    if (s == null || s.waitStatus > 0) {
        s = null;
        /**
         * 这个for循环的作用是这样的:
         *  假如现在AQS队列中是这样的:
         *   空白node节点 -- A节点 --B节点 -- C节点 -- D节点
         *   假如说A节点被取消了,这时候A的ws就是0,就会进入到这个循环中
         *
         *   最终会找到B返回
         */
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    //这里的s就是head的下一个节点,这里调用unpark();有一个原则,在哪里park()了,在被unpark()之后,就接着park()后面的代码来执行
    if (s != null)
        LockSupport.unpark(s.thread);
}

所以总结而言,有几个关键点
1.在aqs队列中,第一个head节点是当前正在持有锁的线程对应的node节点
2.aqs队列中第二个节点,才是第一个排队的线程
3.非公平锁和公平锁的区别是在加锁的时候,是否会判断需要排队,如果一旦排队,就没有任何差别
4.在排队的时候,如果当前节点是第一个排队的节点(aqs中的第二个节点),需要尝试进行一次加锁,因为防止在排队的这个过程中,锁释放了,但是又没有通知node节点进行加锁
5.在排队之后,需要将上一个节点的waitStatus设置为-1
6.在释放锁之后,会尝试唤醒后面第一个排队的节点
7.所谓的加锁、解锁,就是将volatile修饰的state变量 +1 -1

本文地址:https://blog.csdn.net/CPLASF_/article/details/109627542

相关标签: JUC 并发编程