欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

AQS详解

程序员文章站 2022-03-08 19:06:46
...

一、 AQS的类图结构

AQS全称AbstractQueuedSynchronizer,即抽象同步队列,是一个基于先进先出(FIFO)等待队列的实现阻塞锁和同步器的框架。

下面看一下AQS的类图结构:

AQS详解

AQS的子类在其他类中的引用:

AQS详解

1. state

在AQS中维持了一个单一的共享状态state,来实现同步器同步。state用volatile修饰,保证多线程中的可见性。

  • getState():获取当前的同步状态

  • setState(int newState):设置当前同步状态

  • compareAndSetState(int expect,int update):使用CAS设置当前状态,该方法能够保证状态设置的原子性

    以上3种方法都是采用final修饰的,不允许子类重写。

相关代码如下 :

  /**
   * The synchronization state.
   */
  private volatile int state;

  /**
   * Returns the current value of synchronization state.
   * This operation has memory semantics of a {@code volatile} read.
   * @return current state value
   */
  protected final int getState() {
      return state;
  }

  /**
   * Sets the value of synchronization state.
   * This operation has memory semantics of a {@code volatile} write.
   * @param newState the new state value
   */
  protected final void setState(int newState) {
      state = newState;
  }

  /**
   * Atomically sets synchronization state to the given updated
   * value if the current state value equals the expected value.
   * This operation has memory semantics of a {@code volatile} read
   * and write.
   *
   * @param expect the expected value
   * @param update the new value
   * @return {@code true} if successful. False return indicates that the actual
   *         value was not equal to the expected value.
   */
  protected final boolean compareAndSetState(int expect, int update) {
      // See below for intrinsics setup to support this
      return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
  }

2. Node

AQS是基于FIFO队列的存储结构,它是以内部类Node节点(储节点)的形式进行存储。这个等待队列是CLH同步队列。

CLH同步队列中,一个节点表示一个线程,它保存着线程的引用(thread)、状态(waitStatus)、前驱节点(prev)、后继节点(next),**condition队列的后续节点(nextWaiter)**如下图:

AQS详解
waitStatus几种状态状态:

AQS详解

static final class Node {
    /** 共享节点模式下的节点 */
    static final Node SHARED = new Node();
    
    /** 独占模式下的节点 */
    static final Node EXCLUSIVE = null;
    
    /** 取消状态 */
    // 由于超时或中断而导致当前线程(对应同步队列或条件队列中的一个节点)被取消
    // CANCELLED是终态
    // 被取消了的节点对应的线程永远不会阻塞,放弃竞争锁
    static final int CANCELLED =  1;
    
    /** 后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行 */
    // 当前节点的后继节点通过park操作被阻塞(或将要被阻塞)
    // 因此当前节点在它们释放锁或被取消的时候,需要通过unpark操作唤醒它的后继节点
    // 为了避免竞争(依据等待状态进行筛选,无需全部唤醒),
    // 执行竞争锁的方法(acquire methods)的线程首先需要表明它们需要被唤醒,
    // 如果竞争锁失败,它们就会被阻塞,等待被唤醒
    // 是否需要被唤醒,其实是记录在当前节点的前驱节点的等待状态中
    // 因此SIGNAL表示后继节点需要被唤醒,这一点非常重要!!
    static final int SIGNAL    = -1;
    
    /** waitStatus value to indicate thread is waiting on condition */
    // 当前线程对应的节点处于条件队列中
  	// 在当前线程对应的节点转移到同步队列之前,同步队列不会使用当前线程对应的节点
  	// 在当前线程对应的节点转移到同步队列的时候,等待状态会首先被设置为0
    static final int CONDITION = -2;
    /**
     * 下一次共享式同步状态获取将会无条件地传播下去
     */
    static final int PROPAGATE = -3;

    /**
     * Status field, taking on only the values:
     *   SIGNAL:     The successor of this node is (or will soon be)
     *               blocked (via park), so the current node must
     *               unpark its successor when it releases or
     *               cancels. To avoid races, acquire methods must
     *               first indicate they need a signal,
     *               then retry the atomic acquire, and then,
     *               on failure, block.
     *   CANCELLED:  This node is cancelled due to timeout or interrupt.
     *               Nodes never leave this state. In particular,
     *               a thread with cancelled node never again blocks.
     *   CONDITION:  This node is currently on a condition queue.
     *               It will not be used as a sync queue node
     *               until transferred, at which time the status
     *               will be set to 0. (Use of this value here has
     *               nothing to do with the other uses of the
     *               field, but simplifies mechanics.)
     *   PROPAGATE:  A releaseShared should be propagated to other
     *               nodes. This is set (for head node only) in
     *               doReleaseShared to ensure propagation
     *               continues, even if other operations have
     *               since intervened.
     *   0:          None of the above
     *
     * The values are arranged numerically to simplify use.
     * Non-negative values mean that a node doesn't need to
     * signal. So, most code doesn't need to check for particular
     * values, just for sign.
     *
     * The field is initialized to 0 for normal sync nodes, and
     * CONDITION for condition nodes.  It is modified using CAS
     * (or when possible, unconditional volatile writes).
     */
    volatile int waitStatus;

    /**
     * 同步队列中的前驱节点
     */
    volatile Node prev;

    /**
     * 同步队列中的后继节点
     */
    volatile Node next;

    /**
     * 获取同步状态的线程(请求锁或等待Condition的线程)
     */
    volatile Thread thread;

    /**
     * Link to next node waiting on condition, or the special
     * value SHARED.  Because condition queues are accessed only
     * when holding in exclusive mode, we just need a simple
     * linked queue to hold nodes while they are waiting on
     * conditions. They are then transferred to the queue to
     * re-acquire. And because conditions can only be exclusive,
     * we save a field by using special value to indicate shared
     * mode.
     */
    //条件队列的后继节点
    Node nextWaiter;

    /**
     * Returns true if node is waiting in shared mode.
     */
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    /**
     * Returns previous node, or throws NullPointerException if null.
     * Use when predecessor cannot be null.  The null check could
     * be elided, but is present to help the VM.
     *
     * @return the predecessor of this node
     */
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

二、AQS同步队列(CLH)

1. AQS同步原理

CLH(Craig, Landin, and Hagersten locks) 同步队列 是一个FIFO双向队列,其内部通过节点head和tail记录队首和队尾元素,队列元素的类型为Node。AQS依赖它来完成同步状态state的管理,当前线程如果获取同步状态失败时,AQS则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。 AQS详解

2. AQS同步器的结构—入列与出列

2.1 AQS同步器的结构

同步器拥有首节点(head)和尾节点(tail)。同步队列的基本结构如下:

AQS详解
​ 图 1.同步队列的基本结构 compareAndSetTail(Node expect,Node update)

2.2 入列

同步队列设置尾节点(未获取到锁的线程加入同步队列)

​ 同步器AQS中包含两个节点类型的引用:一个指向头结点的引用(head),一个指向尾节点的引用(tail),当一个线程成功的获取到锁(同步状态),其他线程无法获取到锁而是被构造成节点(包含当前线程,等待状态)加入到同步队列中等待获取到锁的线程释放锁。

注意:

​ 加入队列的过程,必须要保证线程安全。否则如果多个线程的环境下,可能造成添加到队列等待的节点顺序错误,或者数量不对。因此同步器提供了CAS原子的设置尾节点的方法(保证一个未获取到同步状态的线程加入到同步队列后,下一个未获取的线程才能够加入)。

​ 如下图: tail指向新节点、新节点的prev指向当前最后的节点,当前最后一个节点的next指向当前节点

AQS详解 图 2.尾节点的设置 节点加入到同步队列

代码实现可以参考:2.1 独占锁的获取 addWaiter()

2.3 出列

同步队列设置首节点(原头节点释放锁,唤醒后继节点)

​ 同步队列遵循FIFO,头节点是获取锁(同步状态)成功的节点,头节点在释放同步状态的时候,会唤醒后继节点,而后继节点将会在获取锁(同步状态)成功时候将自己设置为头节点。设置头节点是由获取锁(同步状态)成功的线程来完成的,由于只有一个线程能够获取同步状态,则设置头节点的方法不需要CAS保证,只需要将头节点设置成为原首节点的后继节点 ,并断开原头结点的next引用。

​ 如下图,设置首节点:

AQS详解

​ 图 3.首节点的设置

代码实现可以参考:2.2 独占锁的释放

三、 AQS锁

1. AQS支持的锁的类别

  • 独占锁:锁在一个时间点只能被一个线程占有。根据锁的获取机制,又分为“公平锁”和“非公平锁”。等待队列中按照FIFO的原则获取锁,等待时间越长的线程越先获取到锁,这就是公平的获取锁,即公平锁。而非公平锁,线程获取的锁的时候,无视等待队列直接获取锁。ReentrantLock和ReentrantReadWriteLock.Writelock是独占锁。

  • 共享锁:同一个时候能够被多个线程获取的锁,能被共享的锁。JUC包中ReentrantReadWriteLock.ReadLock,CyclicBarrier,CountDownLatch和Semaphore都是共享锁。

2. 基于AQS实现锁(独占与共享模式)

AQS中没有实现任何的同步接口,所以一般子类通过继承AQS以内部类的形式实现锁机制。一般通过继承AQS类实现同步器,通过getState、setState、compareAndSetState来监测状态, AQS定义了的一些模板方法,子类只需要覆盖这几个方法即可注意着五个方法并不是抽象方法,因此子类并不是必须全部覆盖这些方法。JUC中实现这些方法的子类有:

  • tryAcquire():独占方式。尝试获取资源,成功则返回true,失败则返回false。AQS详解

  • tryRelease():独占方式。尝试释放资源,成功则返回true,失败则返回false。AQS详解

  • tryAcquireShared():共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。AQS详解

  • tryReleaseShared():共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

AQS详解

  • isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。

注意:AQS用到了模板方法设计模式

模板方法模式: 在一个方法中定义一个算法的骨架,而将一些步骤延迟到子类中。模板方法使得子类可以在不改变算法结构的情况下,重新定义算法中的某些步骤。

独占式:同一时刻仅有一个线程持有同步状态,如ReentrantLock。又可分为公平锁和非公平锁。

  1. 公平锁: 按照线程在队列中的排队顺序,有礼貌的,先到者先拿到锁。
  2. 非公平锁: 当线程要获取锁时,无视队列顺序直接去抢锁,不讲道理的,谁抢到就是谁的。

共享式:多个线程可同时执行,如Semaphore/CountDownLatch等都是共享式的产物。

一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。

2.1 独占锁的获取

参考: https://www.cnblogs.com/200911/p/6031350.html

调用同步器的acquire(int arg)方法可以获取同步状态,该方法对中断不敏感,即线程获取同步状态失败后进入同步队列,后续对线程进行中断操作时,线程不会从同步队列中移除。

(1) 当前线程实现通过tryAcquire()方法尝试获取锁,获取成功的话直接返回,如果尝试失败的话,进入等待队列排队等待,可以保证线程安全(CAS)的获取同步状态。

(2) 如果尝试获取锁失败的话,构造同步节点(独占式的Node.EXCLUSIVE),通过addWaiter(Node node,int args)方法,将节点加入到同步队列的队列尾部。

(3) 最后调用acquireQueued(final Node node, int args)方法,使该节点以死循环的方式获取同步状态,如果获取不到,则阻塞节点中的线程。acquireQueued方法当前线程在死循环中获取同步状态,而只有前驱节点是头节点的时候才能尝试获取锁(同步状态)( p == head && tryAcquire(arg))。

原因:

1.头结点是成功获取同步状态的节点,而头结点的线程释放锁以后,将唤醒后继节点,后继节点线程被唤醒后要检查自己的前驱节点是否为头结点。

2.维护同步队列的FIFO原则,节点进入同步队列以后,就进入了一个自旋的过程,每个节点(后者说每个线程)都在自省的观察。

2.1.1 acquire()

#java.util.concurrent.locks.AbstractQueuedSynchronizer
/**
 * Acquires in exclusive(互斥) mode, ignoring(忽视) interrupts.  Implemented
 * by invoking at least once {@link #tryAcquire},
 * returning on success.  Otherwise the thread is queued(排队), possibly
 * repeatedly(反复的) blocking and unblocking, invoking {@link
 * #tryAcquire} until success.  This method can be used
 * to implement method {@link Lock#lock}.
 *
 * @param arg the acquire argument.  This value is conveyed(传达) to
 *        {@link #tryAcquire} but is otherwise uninterpreted and
 *        can represent anything you like.
 *        
 *  独占式的获取同步状态      
 *        
 */
public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

2.1.2 tryAcquire()

尝试获取锁:tryAcquire方法:如果获取到了锁,tryAcquire返回true,反之,返回false。

#java.util.concurrent.locks.ReentrantLock.FairSync
    protected final boolean tryAcquire(int acquires) {
    //获取当前线程
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {//当前锁没被占用
        if (!hasQueuedPredecessors() &&//1.判断同步队列中是否有节点在等待
            compareAndSetState(0, acquires)) {//2.如果上面!1成立,修改state值(表明当前锁已被占用)
            setExclusiveOwnerThread(current);//3.如果2成立,修改当前占用锁的线程为当前线程
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {//占用锁线程==当前线程(重入)
        int nextc = c + acquires;//如果是,state继续加1,这里nextc的结果就会 > 1,这个判断表示获取到的锁的线程,还可以再获取锁,这里就是说的可重入的意思
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);//修改status
        return true;
    }
    return false;//直接获取锁失败
}

2.1.3 addWaiter()

如果尝试获取同步状态失败的话,则构造同步节点(独占式的Node.EXCLUSIVE),通过addWaiter(Node node,int args)方法将该节点加入到同步队列的队尾。

#java.util.concurrent.locks.AbstractQueuedSynchronizer
/**
    * Creates and enqueues node for current thread and given mode.
    *
    * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
    * @return the new node
    * 
    */
private Node addWaiter(Node mode) {
    // 用当前线程构造一个Node对象,mode是一个表示Node类型的字段,或者说是这个节点是独占的还是共享的,或者说AQS的这个队列中,哪些节点是独占的,哪些节点是共享的。
  	Node node = new Node(Thread.currentThread(), mode);
  	//Try the fast path of enq; backup to full enq on failure(快速尝试添加尾节点)
  	Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        //CAS设置尾节点
        // 确保节点能够被线程安全的添加,使用CAS方法
        // 尝试修改为节点为最新的节点,如果修改失败,意味着有并发,这个时候进入enq中的死循环,进行“自旋”的方式修改
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //多次尝试
    enq(node);
    return node;
}

由以上代码可得,addWaiter设置尾节点失败的话,调用enq(Node node)方法设置尾节点。

2.1.4 enq()

同步器通过死循环的方式来保证节点的正确添加,在“死循环” 中通过CAS将节点设置成为尾节点之后,当前线程才能从该方法中返回,否则当前线程不断的尝试设置。enq方法将并发添加节点的请求通过CAS变得“串行化”了。

#java.util.concurrent.locks.AbstractQueuedSynchronizer
/**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     * 
     * 同步器通过死循环的方式来保证节点的正确添加,在“死循环” 中通过CAS将节点设置成为尾节点之后,当前线程才能从该方法中返回,否则当前线程不断的尝试设置。
     * enq方法将并发添加节点的请求通过CAS变得“串行化”了。
     * 
     */
private Node enq(final Node node) {
    //死循环尝试,知道成功为止
    for (;;) {
        Node t = tail;
        // 如果 tail 是 null,就创建一个虚拟节点,同时指向 head 和 tail,称为 初始化。
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 如果不是 null
        	// 和上个方法逻辑一样,将新节点追加到tail节点后面,并更新队列的 tail 为新节点。
        	// 只不过这里是死循环的,失败了还可以再来 。
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

为什么要创建一个虚拟节点 ?

参考: https://juejin.im/post/5ae755606fb9a07ab97942a4

原因:每个节点都必须设置前置节点的 ws 状态(这个状态为是为了保证数据一致性)为 SIGNAL,所以必须要一个前置节点,而这个前置节点,实际上就是当前持有锁的节点。 而第一个节点是没有前置节点的,所以需要创建一个虚拟节点。 AQS 使用的 CLH 锁,需要一个虚拟 head 节点,这个节点的作用是防止重复释放锁。当第一个进入队列的节点没有前置节点的时候,就会创建一个虚拟的。

2.1.5 acquireQueued()

在队列中的线程获取锁的过程:

  1. 获取当前节点的前置节点,如果前置节点是头结点,说明自己可以开始竞争锁资源了,如果成功,将当前节点置为头节点返回;
  2. 如果当前节点不是头结点,调用shouldParkAfterFailedAcquire判断该节点是否可以进入休眠状态,分支逻辑如下:
    • 前置节点的waitStatus如果是SIGNAL,说明后续节点可以休息返回ture;
    • 如果前置节点的waitStatus是CANCELLED状态(>0),通过循环跳过当前节点前置所有的CANCELLED节点;(之后结合释放锁来看)
    • 否则将前置节点的状态CAS改为SIGNAL,acquireQueued下一次自旋,节点就会返回true;
  3. 如果返回允许挂起后调用parkAndCheckInterrupt(),通过LockSupport.lock()方法挂起当前线程;

acquireQueued方法当前线程在死循环中获取同步状态,而只有前驱节点是头节点才能尝试获取同步状态(锁)( p == head && tryAcquire(arg))
原因:

1.头结点是成功获取同步状态(锁)的节点,而头节点的线程释放了同步状态以后,将会唤醒其后继节点,后继节点的线程被唤醒后要检查自己的前驱节点是否为头结点。

2.维护同步队列的FIFO原则,节点进入同步队列之后,就进入了一个自旋的过程,每个节点(或者说是每个线程)都在自省的观察

#java.util.concurrent.locks.AbstractQueuedSynchronizer
/**
    * Acquires in exclusive uninterruptible mode for thread already in
    * queue. Used by condition wait methods as well as acquire.
    *
    * @param node the node
    * @param arg the acquire argument
    * @return {@code true} if interrupted while waiting
    * 
    */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;            
            //死循环检查(自旋检查)当前节点的前驱节点是否为头结点,才能获取锁
            for (;;) {                
                // 获取节点的前驱节点
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    //节点中的线程循环的检查,自己的前驱节点是否为头节点                    
                    //将当前节点设置为头结点,移除之前的头节点
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }                
                // 否则检查前一个节点的状态,看当前获取锁失败的线程是否要挂起
                if (shouldParkAfterFailedAcquire(p, node) &&                    
                    //如果需要挂起,借助JUC包下面的LockSupport类的静态方法park挂起当前线程,直到被唤醒                    
                    parkAndCheckInterrupt())//利用unsafe.park()进行空转(阻塞)
                    interrupted = true;//如果Thread.interrupt()被调用,(不会真的被打断,会继续循环空转直到获取到锁)
            }
        } finally {            
            //如果有异常
            if (failed)                
                //取消请求,将当前节点从队列中移除
                cancelAcquire(node);
        }
    }

2.1.6 shouldParkAfterFailedAcquire()

shouldParkAfterFailedAcquire()是改变waitStatus状态的方法,通过以下规则,判断“当前线程”是否需要被阻塞。

规则1:如果前继节点状态为SIGNAL,表明当前节点需要被unpark(唤醒),此时则返回true。
规则2:如果前继节点状态为CANCELLED(ws>0),说明前继节点已经被取消,则通过先前回溯找到一个有效(非CANCELLED状态)的节点,并返回false。
规则3:如果前继节点状态为非SIGNAL、非CANCELLED,则设置前继的状态为SIGNAL,并返回false。

    /**
    * Checks and updates status for a node that failed to acquire.
    * Returns true if thread should block. This is the main signal
    * control in all acquire loops.  Requires that pred == node.prev
    *
    * @param pred node's predecessor holding status
    * @param node the node
    * @return {@code true} if thread should block
    * 返回当前线程是否应该阻塞
    * 
    * 说明:
    (01) 关于waitStatus请参考下表(中扩号内为waitStatus的值),更多关于waitStatus的内容,可以参考前面的Node类的介绍。

    CANCELLED[1]  -- 当前线程已被取消
    SIGNAL[-1]    -- “当前线程的后继线程需要被unpark(唤醒)”。一般发生情况是:当前线程的后继线程处于阻塞状态,而当前线程被release或cancel掉,因此需要唤醒当前线程的后继线程。
    CONDITION[-2] -- 当前线程(处在Condition休眠状态)在等待Condition唤醒
    PROPAGATE[-3] -- (共享锁)其它线程获取到“共享锁”
    [0]           -- 当前线程不属于上面的任何一种状态。
    * 
    */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 前驱节点的状态
        int ws = pred.waitStatus;
        // 如果前驱节点是SIGNAL状态,则意味着当前线程需要unpark唤醒,此时返回true
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release to signal it, so it can safely park.
             */
            return true;
        // 如果前继节点是取消的状态,则设置当前节点的“当前前继节点为”原节点的前继节点
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE. Indicate that we need a signal, but don't park yet. Caller will need to retry to make sure
             * it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

独占式的获取同步状态的流程如下:

AQS详解

2.2. 独占锁的释放

release()是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。

2.2.1 unlock()

unlock()在ReentrantLock.java中实现的,源码如下:

/* 
 * 1. unlock():unlock()是解锁函数,它是通过AQS的release()函数来实现的。
 * 在这里,“1”的含义和“获取锁的函数acquire(1)的含义”一样,它是设置“释放锁的状态”的参数。
 * 由于“公平锁”是可重入的,所以对于同一个线程,每释放锁一次,锁的状态-1。
 */
public void unlock() {
    sync.release(1);
}

2.2.2 release()

release()会调用tryRelease方法尝试释放当前线程持有的锁(同步状态),成功的话唤醒后继线程,并返回true,否则直接返回false

/**
    * Releases in exclusive mode.  Implemented by unblocking one or
    * more threads if {@link #tryRelease} returns true.
    * This method can be used to implement method {@link Lock#unlock}.
    *
    * @param arg the release argument.  This value is conveyed to
    *        {@link #tryRelease} but is otherwise uninterpreted and
    *        can represent anything you like.
    * @return the value returned from {@link #tryRelease}
    */
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

2.2.3 tryRelease()

tryRelease() 尝试释放当前线程的同步状态(锁)

protected final boolean tryRelease(int releases) {
    //c为释放后的同步状态
    int c = getState() - releases;
    //判断当前释放锁的线程是否为获取到锁(同步状态)的线程,不是抛出异常(非法监视器状态异常)
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    //如果锁(同步状态)已经被当前线程彻底释放,则设置锁的持有者为null,同步状态(锁)变的可获取
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}
      

2.2.4 unparkSuccessor()

释放锁成功后,找到AQS的头结点,并唤醒它即可:

// 4. 唤醒头结点的后继节点
private void unparkSuccessor(Node node) {
    //获取头结点(线程)的状态
    int ws = node.waitStatus;
    //如果状态<0,设置当前线程对应的锁的状态为0
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next;

    //解释:Thread to unpark is held in successor, which is normally just the next node. 
    //But if cancelled or apparently(显然) null, traverse backwards(向后遍历) from tail to find the actual(实际的) non-cancelled successor(前继节点).
    //从队列尾部开始往前去找最前面的一个waitStatus小于0的节点。
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    //唤醒后继节点对应的线程
    if (s != null)
        LockSupport.unpark(s.thread);
}
    

2.3 共享锁的获取

acquireShared(int)是共享模式下线程获取共享资源的顶层入口。它会获取指定量的资源,获取成功则直接返回,获取失败则进入等待队列,直到获取到资源为止,整个过程忽略中断。

2.3.1 acquireShared()

public final void acquireShared(int arg) {
     if (tryAcquireShared(arg) < 0)
         doAcquireShared(arg);
}

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);//加入队列尾部
    boolean failed = true;//是否成功标志
    try {
        boolean interrupted = false;//等待过程中是否被中断过的标志
        for (;;) {
            final Node p = node.predecessor();//前驱
            //如果到head的下一个,因为head是拿到资源的线程,此时node被唤醒,很可能是head用完资源来唤醒自己的
            if (p == head) {
                int r = tryAcquireShared(arg);//尝试获取资源
                if (r >= 0) {//成功
                    //将head指向自己,还有剩余资源可以再唤醒之后的线程
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)//如果等待过程中被打断过,此时将中断补上。
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }

            //判断状态,寻找安全点,进入waiting状态,等着被unpark()或interrupt()
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

这里tryAcquireShared()依然需要自定义同步器去实现。但是AQS已经把其返回值的语义定义好了:负值代表获取失败;0代表获取成功,但没有剩余资源;正数表示获取成功,还有剩余资源,其他线程还可以去获取。所以这里acquireShared()的流程就是:

  1. tryAcquireShared()尝试获取资源,成功则直接返回;

  2. 失败则通过doAcquireShared()进入等待队列park(),直到被unpark()/interrupt()并成功获取到资源才返回。整个等待过程也是忽略中断的。

  3. doAcquireShared(int)此方法用于将当前线程加入等待队列尾部休息,直到其他线程释放资源唤醒自己,自己成功拿到相应量的资源后才返回。

2.4 共享锁的释放

releaseShared()是共享模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果成功释放且允许唤醒等待线程,它会唤醒等待队列里的其他线程来获取资源。

2.4.1 releaseShared()

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {//尝试释放资源
        doReleaseShared();//唤醒后继结点
        return true;
    }
    return false;
}

此方法的流程也比较简单,一句话:释放掉资源后,唤醒后继。跟独占模式下的release()相似,但有一点稍微需要注意:独占模式下的tryRelease()在完全释放掉资源(state=0)后,才会返回true去唤醒其他线程,这主要是基于独占下可重入的考量;而共享模式下的releaseShared()则没有这种要求,共享模式实质就是控制一定量的线程并发执行,那么拥有资源的线程在释放掉部分资源时就可以唤醒后继等待结点。例如,资源总量是13,A(5)和B(7)分别获取到资源并发运行,C(4)来时只剩1个资源就需要等待。A在运行过程中释放掉2个资源量,然后tryReleaseShared(2)返回true唤醒C,C一看只有3个仍不够继续等待;随后B又释放2个,tryReleaseShared(2)返回true唤醒C,C一看有5个够自己用了,然后C就可以跟A和B一起运行。而ReentrantReadWriteLock读锁的tryReleaseShared()只有在完全释放掉资源(state=0)才返回true,所以自定义同步器可以根据需要决定tryReleaseShared()的返回值。

四、ConditionObject

参考: http://zhongmingmao.me/2016/08/11/concurrent-conditionobject/

synchronized控制同步的时候,可以配合Object的wait()、notify(),notifyAll() 系列方法可以实现等待/通知模式。

而Lock呢?它提供了条件Condition接口,配合await(),signal(),signalAll() 等方法也可以实现等待/通知机制。ConditionObject实现了Condition接口,给AQS提供条件变量的支持

1. Condition接口

public interface Condition {
    // ===== await方法列表
    // 使当前线程进入等待状态直到被signal或中断,相当于synchronized等待唤醒机制中的wait()方法
    void await() throws InterruptedException;
    // 使当前线程进入等待状态直到被signal,不响应中断
    void awaitUninterruptibly();
    // 使当前线程进入等待状态直到被signal、或被中断、或超时(相对时间)
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    // 与awaitNanos类似,可以指明时间单位
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    // 与awaitNanos类似,只是采用的是绝对时间
    boolean awaitUntil(Date deadline) throws InterruptedException;

    // ===== signal方法列表
    // 唤醒一个等待在某个Condition实例上的线程,必须首先持有与Condition相关联的锁,相当于notify()
    void signal();
    // 与signal类似,相当于notifyAll()
    void signalAll();
}

一个很关键的地方:Condition的实现类在重写Condition所有方法都建议先持有与Condition关联的锁AQS中的ConditionObject就满足这一点,因此调用ConditionObject的方法是线程安全的,这里说的线程安全有一个前提,就是线程必须先持有独占锁

newCondition()实际创建的是ConditionObject对象

// From ReentrantLock
public Condition newCondition() {
    return sync.newCondition();
}
// From Sync
final ConditionObject newCondition() {
    return new ConditionObject();
}

2. 核心结构

public class ConditionObject implements Condition, java.io.Serializable {    
	// 条件队列(condition queue)的头结点    
    private transient Node firstWaiter;    
    // 条件队列(condition queue)的尾节点    
    private transient Node lastWaiter;    
    // 中断模式是为了对不同的中断情况做不同的处理,进而告诉上层调用者中断情况,有2种模式    
    // 中断模式:需要重新设置线程的中断状态    
    private static final int REINTERRUPT =  1;    
    // 中断模式:需要抛出InterruptedException异常    
    private static final int THROW_IE    = -1;
}

3. Condition(条件)队列与CLH(同步)队列

  1. 条件队列仅有nextWaiter,因此**条件队列是单向非循环队列,而同步队列是双向非循环队列**
  2. 条件队列中节点只有3种等待状态:
    • CANCELLED:需要从条件队列中移除
    • CONDITION:等待被转移到同步队列
    • 0:转移过程中或已经转移完成,在_transferAfterCancelledWaittransferForSignal_中设置,后面会详细分析
  3. AQS只能拥有**1个同步队列_,但可以拥有多个条件队列**_

条件队列同步队列的关系大致如下:
AQS详解
简单说明(后面源码分析将详细介绍):

  1. ReentrantLock.newCondition():创建一个新的ConditionObject实例,每个ConditionObject拥有firstWaiter属性和lastWaiter属性,对应一个Condition(条件)队列
  2. ConditionObject.await():将当前线程包装成节点后加入到对应的Condition(条件)队列并进行阻塞,然后等待被转移同步队列中。并且唤醒CLH(同步)队列中head节点的下一个节点。
  3. ConditionObject.signal():将ConditionObject实例对应的条件队列中的节点(从头结点开始往后遍历筛选)转移到AQS同步队列的队尾等待获得独占锁(等待被唤醒)。获得独占锁后,上面的ConditionObject.await()方法return,继续执行

区别:

  • ConditionObject对象都维护了一个单独的等待队列 ,AQS所维护的CLH队列是同步队列,它们节点类型相同,都是Node。

4. 源码分析

4.1 await

该方法的分析在本文中是最复杂的

4.1.1 await()

// From ConditionObject
// 需要先持有独占锁,线程安全
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        // 线程被中断则直接抛出InterruptedException,可响应中断
        throw new InterruptedException();

    // ===== 1. 创建新节点并加入条件队列的队尾
    Node node = addConditionWaiter();

    // ===== 2. 释放锁
    // 完全释放独占锁(锁是可重入的)并尝试唤醒同步队列头结点的后继节点,并返回释放锁之前的同步状态
    int savedState = fullyRelease(node);

    int interruptMode = 0;

    // ===== 3. 自旋转移节点(条件队列 -> 同步队列),并记录中断模式
    while (!isOnSyncQueue(node)) { // isOnSyncQueue:判断节点是否已经由条件队列转移到同步队列
        // 节点还在条件队列中,挂起当前线程,等待被唤醒或被中断
        LockSupport.park(this);
        // 执行到这里,说明当前线程退出休眠状态,有3种情况:
        // 1. ConditionObject.signal -> 节点从条件队列转移到同步队列(前驱节点等待状态为SIGNAL) -> 等待被前驱节点唤醒(unpark)
        // 2. ConditionObject.signal -> 节点从条件队列转移到同步队列(前驱节点等待状态为CANCELLED) -> 直接唤醒(unpark)
        // 3. 当前线程被中断(interrupt)

        // 节点转移过程中当前线程的中断情况,有3种情况:
        // 1. 当前线程没有被中断,返回0
        // 2. 当前线程被中断 + 中断发生在ConditionObject.signal()调用之前,执行自旋入队操作,记录中断模式:THROW_IE(-1)
        //    转移到同步队列后,再次抛出InterruptedException异常,然后执行cancelAcquire,将节点的等待状态置为CANCELLED
        // 3. 当前线程被中断 + 中断发生在ConditionObject.signal()调用之后,自旋等待入队操作完成,记录中断模式:REINTERRUPT(1)
        //    转移到同步队列后,仅仅设置对应线程的中断状态
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            // 由于中断而导致退出休眠状态,则退出循环
            // interruptMode = THROW_IE(-1) OR REINTERRUPT(1)
            break;
    }

    // ===== 4. 自旋请求独占锁,并维护中断模式
    // 执行到这里,说明线程被中断(被中断也会完成节点转移,下面会详细分析)或者节点转移成功,所以此时节点已经转移到了同步队列
    // fullyRelease已经释放了独占锁,下面会等待独占锁(acquireQueued方法),与包裹await()方法的unlock()方法配对
    if (acquireQueued(node, savedState) // acquireQueued是通过自旋来等待锁,并且返回退出休眠状态去竞争锁的原因是否是被中断
            && interruptMode != THROW_IE) // 执行第二个条件判断,说明已经获得锁并且当前线程被中断了,但中断标志被重置了
        // 执行到这里,说明interruptMode为0或REINTERRUPT(1)
        // 1. 对于REINTERRUPT(1),下面的语句interruptMode=REINTERRUPT,显然是没有意义的
        // 2. 对于interruptMode=0,说明上面的while(!isOnSyncQueue(node))循环没有被中断,但在acquireQueued被中断了,
        //    且中断标志被重置了,因此需要将interruptMode设置为REINTERRUPT
        // 3. 对于THROW_IE(-1),说明判断节点是否已经由条件队列转移到同步队列时发生中断,且中断发生在ConditionObject.signal()调用之前,
        //    直接抛出异常即可
        // 总结:acquireQueued被中断,但while(!isOnSyncQueue(node))没有被中断,需要记录中断模式为REINTERRUPT
        interruptMode = REINTERRUPT;

    // ===== 5. 清理条件队列
    // 执行到这里说明节点已经转移到同步队列中,且已经获得独占锁(或在acquireQueued的过程中被中断)
    // 此时节点不应该跟条件队列有关联了,而且此时节点的状态肯定不为CONDITION
    // 因此执行unlinkCancelledWaiters,从条件队列移除该节点
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();

    // ===== 6. 已经中断模式,向上层反馈中断情况
    if (interruptMode != 0) // interruptMode = THROW_IE(-1) OR REINTERRUPT(1)
        // 依据不同的中断模式,向调用方报告当前线程的中断情况
        // 1. ConditionObject.signal方法调用之前中断了当前线程,往外抛出InterruptedException异常,中断线程的后续操作
        // 2. ConditionObject.signal方法调用之后中断了当前线程,重置当前线程的中断状态,对线程不会有实际性的影响
        reportInterruptAfterWait(interruptMode);
}

4.1.2 addConditionWaiter()

// From ConditionObject
// 需要先持有独占锁,线程安全
private Node addConditionWaiter() {
    Node t = lastWaiter;

    if (t != null && t.waitStatus != Node.CONDITION) {
        // 如果条件队列尾节点是非CONDITION节点,从头结点开始遍历条件队列,并移除非CONDITION节点
        unlinkCancelledWaiters();
        // 获取条件队列最新的尾节点
        t = lastWaiter;
    }

    // 创建新节点,初始等待状态为CONDITION
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        // 条件队列为空,将firstWaiter指向刚创建的节点node
        firstWaiter = node;
    else
        // 条件队列不为空,原条件队列队尾的后继节点设置为刚创建的节点node
        t.nextWaiter = node;

    // 更新条件队列的队尾为刚创建的节点
    lastWaiter = node;
    return node;
}

4.1.3 unlinkCancelledWaiters()

// From ConditionObject
// 需要先持有独占锁,线程安全
// 从头结点开始遍历条件队列,并移除非CONDITION节点
// 很巧妙的代码!!
private void unlinkCancelledWaiters() {
    Node t = firstWaiter; // 从头结点开始遍历条件队列,t用于迭代
    Node trail = null; // 遍历过程中,用于记录最近的已遍历的CONDITION节点,初始值为null,这点非常重要!!
    while (t != null) {
        Node next = t.nextWaiter; // next为t在条件队列中的后继节点
        if (t.waitStatus != Node.CONDITION) {
            // t为非CONDITION节点,首先需要断开t与next的单线链接nextWaiter
            t.nextWaiter = null;
            if (trail == null)
                // trail等于null,说明从头结点到当前遍历节点t都是非CONDITION节点,
                // 直接将头结点设置为当前遍历节点的后继节点next
                firstWaiter = next;
            else
                // trail不为null,即已经找到CONDITION节点,
                // 将trail的后继节点设置为当前遍历节点的后继节点next,
                // 这将跳过trail(不包括)到当前遍历节点(包括),因为这些节点都明确是非CONDITION节点
                // 但在当前循环没必要判断next是不是CONDITION节点,那是下个循环的任务
                trail.nextWaiter = next;

            if (next == null)
                // next=null,说明t是原尾节点,
                // 直接将尾节点更新为trail(最近的已遍历的CONDITION节点)
                lastWaiter = trail;
        }
        else
            // trail用于记录最近的已遍历的CONDITION节点
            trail = t;
        // t是迭代节点,往后迭代
        t = next;
    }
}

4.1.4 fullyRelease()

// From AQS
// 需要先持有独占锁,线程安全
// 完全释放独占锁(锁是可重入的)并尝试唤醒同步队列头结点的后继节点,并返回释放锁之前的同步状态
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        // 释放锁之前的同步状态
        int savedState = getState();
        // 尝试释放独占锁并唤醒同步队列中头结点的后继节点
        // 释放锁调用的tryRelease方法必须首先要持有锁
        // 说明了ConditionObject.await()方法必须要先持有ConditionObject对应的锁
        if (release(savedState)) {
            failed = false;
            // 成功释放独占锁
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            // fullyRelease在addConditionWaiter之后,节点已经进入了条件队列,
            // 因此在释放独占锁失败,需要将节点的等待状态置为CANCELLED,等待被移除
            // 释放独占锁失败的场景:调用await方法时没有先持有独占锁
            node.waitStatus = Node.CANCELLED;
    }
}

4.1.5 isOnSyncQueue()

// From AQS
// 判断节点是否已经由条件队列转移到同步队列
// ConditionObject.signal()会将节点从条件队列转移到同步队列
final boolean isOnSyncQueue(Node node) {
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        // 1. 节点加入条件队列时,等待状态为CONDITION,在节点转移过程中,会将等待状态设置为0,
        //    所以如果节点的等待状态为CONDITION,说明节点一定还在条件队列中;
        // 2. 转移过程中会首先设置节点的同步队列前驱节点属性prev,
        //    如果节点的同步队列前驱节点属性为null,说明节点一定还在条件队列中,
        //    另外需要注意的是,即使节点拥有了同步队列的前驱节点属性prev也不能说明节点已经转移到了同步队列中,
        //    因为有可能compareAndSetTail失败,那么同步队列的原尾节点的后继节点依旧为null,而不是node
        //    此时node还只是单方面的连接到同步队列,同步队列中没有任何节点将其作为前驱节点或后继节点
        //    更详细的分析请参照博文:「并发 - JUC - ReentrantLock - 源码剖析」
        return false;
    if (node.next != null)
        // 如果节点拥有了同步队列的后继节点next,那么节点一定已经转移到了同步队列中
        // 更详细的分析请参照博文:「并发 - JUC - ReentrantLock - 源码剖析」
        return true;

    // 从同步队列的尾节点向前遍历,看能否找到节点node
    // 由于入队操作是在队尾,因此大部分情况下,当前节点不会离同步队列队尾太远,效率比较高
    return findNodeFromTail(node);
}

4.1.6 findNodeFromTail()

// From AQS
// 从同步队列的尾节点向前遍历(依据节点的prev属性,而prev属性用于连接同步队列的),看能否找到节点node
private boolean findNodeFromTail(Node node) {
    Node t = tail; // 从同步队列尾节点开始遍历
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            // t.next为head,即同步队列头结点
            return false;
        t = t.prev;
    }
}

4.1.7 checkInterruptWhileWaiting()

// From ConditionObject
// 判断节点转移过程中当前线程的中断情况
// 1. 当前线程没有被中断,返回0
// 2. 当前线程被中断 + 中断发生在ConditionObject.signal()调用之前,执行自旋入队操作,返回THROW_IE(-1)
//    时序:中断(直接中断await线程,继续执行) -> signal,
//    这种情况下,中断先发生,按照正常语义,对应线程已经没有继续执行的必要,因此转移到同步队列后,需要再次抛出异常,取消排队
// 3. 当前线程被中断 + 中断发生在ConditionObject.signal()调用之后,自旋等待入队操作的完成,返回REINTERRUPT(1)
//    时序:signal(完成节点转移才会唤醒await线程,继续执行) -> 中断,
//    这种情况下,signal先发生,按照正常语义,对应的线程应该继续执行
// 代码的套路好深啊!!
private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ? // 返回线程是否被中断,并重置中断状态
                    (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                    0; // 没有被线程没有被中断,返回0
}

4.1.8 transferAfterCancelledWait()

// From AQS
// 如果线程中断发生在ConditionObject.signal()调用之前,执行入队操作,返回true,对应THROW_IE
// 如果线程中断发生在ConditionObject.signal()调用之后,自旋等待入队操作完成,返回false,对应REINTERRUPT
// 即便发生中断,也会自旋完成节点的转移,这一点很重要!!
final boolean transferAfterCancelledWait(Node node) {
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { // 很巧妙的设计!!
        // CAS成功地将节点的等待状态从CONDITION置为0,则进入同步队列
        // 执行到这里,说明是ConditionObject.signal(实际上是transferForSignal)尚未被调用
        // 因为ConditionObject.signal在将节点转移到同步节点时也会执行同样的CAS操作(将节点的等待状态从CONDITION置为0)
        // 如果ConditionObject.signal的CAS操作成功了,上面的CAS操作就会失败
        // 因此返回true,表明中断发生在ConditionObject.signal()之前

        // 这会导致transferForSignal不会继续执行转移操作,因此这里要完成transferForSignal本该完成的工作(节点转移)
        // 自旋进入同步队列!!
        enq(node);
        return true;
    }

    // 执行到这里是因为ConditionObject.signal已经将节点的等待状态置为0,导致上面的CAS操作失败
    // 因此返回false,表明中断发生在ConditionObject.signal()调用之后,这时节点转移可能还没有完成(这个概率很低)
    // 出现这种情况,就通过自旋来等待转移操作完成(即便发生中断,依旧会转移节点)!!
    while (!isOnSyncQueue(node))
        Thread.yield(); // 尝试让出CPU资源,但不会让出锁资源
    return false;
}

4.1.9 reportInterruptAfterWait()

// From ConditionObject
// 依据不同的中断模式,向调用方报告当前线程的中断情况
// 1. 如果中断模式是THROW_IE时,则抛出InterruptedException异常
// 3. 如果中断模式是REINTERRUPT时,则执行线程自我中断,重置当前线程中断状态
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException {
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}

4.2 signal

ConditionObject.signal**并不总是直接唤醒线程**,而是首先将节点从条件队列转移到同步队列,再依据在同步队列中前驱节点的等待状态做不同的处理

  1. 如果被转移的节点在同步队列中的**前驱节点没有被取消**,那么被转移的节点同步队列等待锁
  2. 如果被转移的节点在同步队列中的**前驱节点被取消**了,才会直接唤醒被转移节点的关联线程,这点比较重要,不要认为signal就是直接唤醒

4.2.1 signal()

// From ConditionObject
// 从条件队列头节点开始遍历,找出第一个需要转移的节点,并转移到同步队列
public final void signal() {
    if (!isHeldExclusively()) // 当前线程是否持有独占锁
        // 说明调用ConditionObject.signal()方法之前必须先持有与ConditionObject关联的独占锁
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        // 条件队列不为空时,从条件队列头节点开始遍历,找出第一个需要转移的节点,并转移到同步队列
        doSignal(first);
}

4.2.2 isHeldExclusively()

// From Sync
protected final boolean isHeldExclusively() {
    return getExclusiveOwnerThread() == Thread.currentThread();
}

4.2.3 doSignal()

// From ConditionObject
// 从条件队列头节点开始遍历,找出第一个需要转移的节点,并转移到同步队列
private void doSignal(Node first) {
    do {
        if ((firstWaiter = first.nextWaiter) == null)
            // 如果条件队列的头结点为null,条件队列的尾节点必为null
            lastWaiter = null;
        // first将要被转移到同步队列,需要从条件队列中断开
        first.nextWaiter = null;
    } while (
        // 没有成功转移有效节点并且未达到条件队列尾节点,继续循环
        !transferForSignal(first) && (first = firstWaiter) != null);
}

4.2.4 transferForSignal()

// From AQS
// 将节点从条件队列转移到同步队列,转移成功且没有被中断则返回true,因中断而取消则返回false
// 即成功转移有效节点返回true,否则返回false
final boolean transferForSignal(Node node) {
    // 转移节点之前首先将其等待状态设置为0
    // 这与ReentrantLock.lock()竞争锁失败时,封装成节点并准备进入同步队列的场景保持一致
    // 那时节点的等待状态也是0,因此当前节点准备进入同步队列前,等待状态也设置为0
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        // 节点的等待状态为0:在transferForSignal被调用前,线程因中断而退出休眠状态,继续执行await()后半段代码
        // 这会通过transferAfterCancelledWait来校验中断发生在transferForSignal之前还是transferForSignal之后
        // 如果是之前,那么此时的预期值为0,CAS会失败,直接返回false,
        // transferAfterCancelledWait()方法会在中断产生时完成节点转移工作,进入下一循环
        return false;

    // 节点自旋进入同步队列,并返回前驱节点
    // 更详细的分析请参照博文:「并发 - JUC - ReentrantLock - 源码剖析」
    Node p = enq(node);
    int ws = p.waitStatus; // 前驱节点的等待状态

    // 1. ws>0,说明前驱节点的等待状态为CANCELLED,放弃竞争锁,直接唤醒当前节点
    // 2. 如果ws<=0,则统一将前驱节点跟新为SIGNAL,表示当前驱节点取消时,能够唤醒当前节点,当前节点可以被安全地挂起
    //    如果CAS更新失败,则直接唤醒当前节点
    // 简单概括起来就是如果前驱节点取消了,就直接唤醒当前节点
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread); // 线程被唤醒后会继续执行await()的后半段代码
    return true;
}

4.3 awaitUninterruptibly

前面分析的await()方法是响应中断的,本节介绍的waitUninterruptibly()不响应中断

// From ConditionObject
// 与await()方法类似,仅标注不一样的地方
public final void awaitUninterruptibly() {
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    boolean interrupted = false;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if (Thread.interrupted())
            // 如果曾经由于中断而退出休眠状态,而标记被中断
            interrupted = true;
    }
    if (acquireQueued(node, savedState) || interrupted)
        selfInterrupt(); // 在节点转移过程中,如果曾经被中断,则重新设置中断标志
}

4.4 await(long time,TimeUnit unit)

前面分析的await()方法是不限时等待的,本节介绍的await(long time,TimeUnit unit)限时等待

// From ConditionObject
// 与await()方法类似,仅标注不一样的地方
public final boolean await(long time, TimeUnit unit) throws InterruptedException {
    // 剩余的等待时长(纳秒)
    long nanosTimeout = unit.toNanos(time);
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    // 超时的绝对时间
    final long deadline = System.nanoTime() + nanosTimeout;
    // 标注是否超时
    boolean timedout = false;
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        if (nanosTimeout <= 0L) {
            // 剩余的等待时长为非正值,说明超时了,则执行transferAfterCancelledWait并取消等待
            // transferAfterCancelledWait如果返回true,说明节点转移成功
            // transferAfterCancelledWait如果返回false,说明在超时发生前,ConditionObject.signal已经触发,可以归纳为没有超时
            timedout = transferAfterCancelledWait(node);
            break;
        }
        // 当剩余的等待时长不小于1000纳秒时,这选择限时挂起线程,线程在nanosTimeout会自动唤醒(假如期间没有被中断)
        // 当剩余的等待时长小于1000纳秒时,选择自旋,不挂起线程
        if (nanosTimeout >= spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanosTimeout);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
        // 更新剩余的等待时长
        nanosTimeout = deadline - System.nanoTime();
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    return !timedout; // 返回是否await等待超时
}
相关标签: Java