AQS详解
文章目录
- 一、 AQS的类图结构
- 二、AQS同步队列(CLH)
- 三、 AQS锁
- 四、ConditionObject
- 1. Condition接口
- 2. 核心结构
- 3. Condition(条件)队列与CLH(同步)队列
- 4. 源码分析
- 4.1 await
- 4.1.1 await()
- 4.1.2 addConditionWaiter()
- 4.1.3 unlinkCancelledWaiters()
- 4.1.4 fullyRelease()
- 4.1.5 isOnSyncQueue()
- 4.1.6 findNodeFromTail()
- 4.1.7 checkInterruptWhileWaiting()
- 4.1.8 transferAfterCancelledWait()
- 4.1.9 reportInterruptAfterWait()
- 4.2 signal
- 4.3 awaitUninterruptibly
- 4.4 await(long time,TimeUnit unit)
一、 AQS的类图结构
AQS全称AbstractQueuedSynchronizer,即抽象同步队列,是一个基于先进先出(FIFO)等待队列的实现阻塞锁和同步器的框架。
下面看一下AQS的类图结构:
AQS的子类在其他类中的引用:
1. state
在AQS中维持了一个单一的共享状态state,来实现同步器同步。state用volatile修饰,保证多线程中的可见性。
-
getState():获取当前的同步状态
-
setState(int newState):设置当前同步状态
-
compareAndSetState(int expect,int update):使用CAS设置当前状态,该方法能够保证状态设置的原子性
以上3种方法都是采用final修饰的,不允许子类重写。
相关代码如下 :
/**
* The synchronization state.
*/
private volatile int state;
/**
* Returns the current value of synchronization state.
* This operation has memory semantics of a {@code volatile} read.
* @return current state value
*/
protected final int getState() {
return state;
}
/**
* Sets the value of synchronization state.
* This operation has memory semantics of a {@code volatile} write.
* @param newState the new state value
*/
protected final void setState(int newState) {
state = newState;
}
/**
* Atomically sets synchronization state to the given updated
* value if the current state value equals the expected value.
* This operation has memory semantics of a {@code volatile} read
* and write.
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that the actual
* value was not equal to the expected value.
*/
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
2. Node
AQS是基于FIFO队列的存储结构,它是以内部类Node节点(储节点)的形式进行存储。这个等待队列是CLH同步队列。
CLH同步队列中,一个节点表示一个线程,它保存着线程的引用(thread)、状态(waitStatus)、前驱节点(prev)、后继节点(next),**condition队列的后续节点(nextWaiter)**如下图:
waitStatus几种状态状态:
static final class Node {
/** 共享节点模式下的节点 */
static final Node SHARED = new Node();
/** 独占模式下的节点 */
static final Node EXCLUSIVE = null;
/** 取消状态 */
// 由于超时或中断而导致当前线程(对应同步队列或条件队列中的一个节点)被取消
// CANCELLED是终态
// 被取消了的节点对应的线程永远不会阻塞,放弃竞争锁
static final int CANCELLED = 1;
/** 后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行 */
// 当前节点的后继节点通过park操作被阻塞(或将要被阻塞)
// 因此当前节点在它们释放锁或被取消的时候,需要通过unpark操作唤醒它的后继节点
// 为了避免竞争(依据等待状态进行筛选,无需全部唤醒),
// 执行竞争锁的方法(acquire methods)的线程首先需要表明它们需要被唤醒,
// 如果竞争锁失败,它们就会被阻塞,等待被唤醒
// 是否需要被唤醒,其实是记录在当前节点的前驱节点的等待状态中
// 因此SIGNAL表示后继节点需要被唤醒,这一点非常重要!!
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
// 当前线程对应的节点处于条件队列中
// 在当前线程对应的节点转移到同步队列之前,同步队列不会使用当前线程对应的节点
// 在当前线程对应的节点转移到同步队列的时候,等待状态会首先被设置为0
static final int CONDITION = -2;
/**
* 下一次共享式同步状态获取将会无条件地传播下去
*/
static final int PROPAGATE = -3;
/**
* Status field, taking on only the values:
* SIGNAL: The successor of this node is (or will soon be)
* blocked (via park), so the current node must
* unpark its successor when it releases or
* cancels. To avoid races, acquire methods must
* first indicate they need a signal,
* then retry the atomic acquire, and then,
* on failure, block.
* CANCELLED: This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node
* until transferred, at which time the status
* will be set to 0. (Use of this value here has
* nothing to do with the other uses of the
* field, but simplifies mechanics.)
* PROPAGATE: A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* 0: None of the above
*
* The values are arranged numerically to simplify use.
* Non-negative values mean that a node doesn't need to
* signal. So, most code doesn't need to check for particular
* values, just for sign.
*
* The field is initialized to 0 for normal sync nodes, and
* CONDITION for condition nodes. It is modified using CAS
* (or when possible, unconditional volatile writes).
*/
volatile int waitStatus;
/**
* 同步队列中的前驱节点
*/
volatile Node prev;
/**
* 同步队列中的后继节点
*/
volatile Node next;
/**
* 获取同步状态的线程(请求锁或等待Condition的线程)
*/
volatile Thread thread;
/**
* Link to next node waiting on condition, or the special
* value SHARED. Because condition queues are accessed only
* when holding in exclusive mode, we just need a simple
* linked queue to hold nodes while they are waiting on
* conditions. They are then transferred to the queue to
* re-acquire. And because conditions can only be exclusive,
* we save a field by using special value to indicate shared
* mode.
*/
//条件队列的后继节点
Node nextWaiter;
/**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* Returns previous node, or throws NullPointerException if null.
* Use when predecessor cannot be null. The null check could
* be elided, but is present to help the VM.
*
* @return the predecessor of this node
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
二、AQS同步队列(CLH)
1. AQS同步原理
CLH(Craig, Landin, and Hagersten locks) 同步队列 是一个FIFO双向队列,其内部通过节点head和tail记录队首和队尾元素,队列元素的类型为Node。AQS依赖它来完成同步状态state的管理,当前线程如果获取同步状态失败时,AQS则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。
2. AQS同步器的结构—入列与出列
2.1 AQS同步器的结构
同步器拥有首节点(head)和尾节点(tail)。同步队列的基本结构如下:
图 1.同步队列的基本结构 compareAndSetTail(Node expect,Node update)
2.2 入列
同步队列设置尾节点(未获取到锁的线程加入同步队列)
同步器AQS中包含两个节点类型的引用:一个指向头结点的引用(head),一个指向尾节点的引用(tail),当一个线程成功的获取到锁(同步状态),其他线程无法获取到锁而是被构造成节点(包含当前线程,等待状态)加入到同步队列中等待获取到锁的线程释放锁。
注意:
加入队列的过程,必须要保证线程安全。否则如果多个线程的环境下,可能造成添加到队列等待的节点顺序错误,或者数量不对。因此同步器提供了CAS原子的设置尾节点的方法(保证一个未获取到同步状态的线程加入到同步队列后,下一个未获取的线程才能够加入)。
如下图: tail指向新节点、新节点的prev指向当前最后的节点,当前最后一个节点的next指向当前节点。
图 2.尾节点的设置 节点加入到同步队列
代码实现可以参考:2.1 独占锁的获取 addWaiter()
2.3 出列
同步队列设置首节点(原头节点释放锁,唤醒后继节点)
同步队列遵循FIFO,头节点是获取锁(同步状态)成功的节点,头节点在释放同步状态的时候,会唤醒后继节点,而后继节点将会在获取锁(同步状态)成功时候将自己设置为头节点。设置头节点是由获取锁(同步状态)成功的线程来完成的,由于只有一个线程能够获取同步状态,则设置头节点的方法不需要CAS保证,只需要将头节点设置成为原首节点的后继节点 ,并断开原头结点的next引用。
如下图,设置首节点:
图 3.首节点的设置
代码实现可以参考:2.2 独占锁的释放
三、 AQS锁
1. AQS支持的锁的类别
-
独占锁:锁在一个时间点只能被一个线程占有。根据锁的获取机制,又分为“公平锁”和“非公平锁”。等待队列中按照FIFO的原则获取锁,等待时间越长的线程越先获取到锁,这就是公平的获取锁,即公平锁。而非公平锁,线程获取的锁的时候,无视等待队列直接获取锁。ReentrantLock和ReentrantReadWriteLock.Writelock是独占锁。
-
共享锁:同一个时候能够被多个线程获取的锁,能被共享的锁。JUC包中ReentrantReadWriteLock.ReadLock,CyclicBarrier,CountDownLatch和Semaphore都是共享锁。
2. 基于AQS实现锁(独占与共享模式)
AQS中没有实现任何的同步接口,所以一般子类通过继承AQS以内部类的形式实现锁机制。一般通过继承AQS类实现同步器,通过getState、setState、compareAndSetState来监测状态, AQS定义了的一些模板方法,子类只需要覆盖这几个方法即可注意着五个方法并不是抽象方法,因此子类并不是必须全部覆盖这些方法。JUC中实现这些方法的子类有:
-
tryAcquire():独占方式。尝试获取资源,成功则返回true,失败则返回false。
-
tryRelease():独占方式。尝试释放资源,成功则返回true,失败则返回false。
-
tryAcquireShared():共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
-
tryReleaseShared():共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
- isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
注意:AQS用到了模板方法设计模式。
模板方法模式: 在一个方法中定义一个算法的骨架,而将一些步骤延迟到子类中。模板方法使得子类可以在不改变算法结构的情况下,重新定义算法中的某些步骤。
独占式:同一时刻仅有一个线程持有同步状态,如ReentrantLock。又可分为公平锁和非公平锁。
- 公平锁: 按照线程在队列中的排队顺序,有礼貌的,先到者先拿到锁。
- 非公平锁: 当线程要获取锁时,无视队列顺序直接去抢锁,不讲道理的,谁抢到就是谁的。
共享式:多个线程可同时执行,如Semaphore/CountDownLatch等都是共享式的产物。
一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。
2.1 独占锁的获取
参考: https://www.cnblogs.com/200911/p/6031350.html
调用同步器的acquire(int arg)方法可以获取同步状态,该方法对中断不敏感,即线程获取同步状态失败后进入同步队列,后续对线程进行中断操作时,线程不会从同步队列中移除。
(1) 当前线程实现通过tryAcquire()方法尝试获取锁,获取成功的话直接返回,如果尝试失败的话,进入等待队列排队等待,可以保证线程安全(CAS)的获取同步状态。
(2) 如果尝试获取锁失败的话,构造同步节点(独占式的Node.EXCLUSIVE),通过addWaiter(Node node,int args)方法,将节点加入到同步队列的队列尾部。
(3) 最后调用acquireQueued(final Node node, int args)方法,使该节点以死循环的方式获取同步状态,如果获取不到,则阻塞节点中的线程。acquireQueued方法当前线程在死循环中获取同步状态,而只有前驱节点是头节点的时候才能尝试获取锁(同步状态)( p == head && tryAcquire(arg))。
原因:
1.头结点是成功获取同步状态的节点,而头结点的线程释放锁以后,将唤醒后继节点,后继节点线程被唤醒后要检查自己的前驱节点是否为头结点。
2.维护同步队列的FIFO原则,节点进入同步队列以后,就进入了一个自旋的过程,每个节点(后者说每个线程)都在自省的观察。
2.1.1 acquire()
#java.util.concurrent.locks.AbstractQueuedSynchronizer
/**
* Acquires in exclusive(互斥) mode, ignoring(忽视) interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued(排队), possibly
* repeatedly(反复的) blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed(传达) to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*
* 独占式的获取同步状态
*
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
2.1.2 tryAcquire()
尝试获取锁:tryAcquire方法:如果获取到了锁,tryAcquire返回true,反之,返回false。
#java.util.concurrent.locks.ReentrantLock.FairSync
protected final boolean tryAcquire(int acquires) {
//获取当前线程
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {//当前锁没被占用
if (!hasQueuedPredecessors() &&//1.判断同步队列中是否有节点在等待
compareAndSetState(0, acquires)) {//2.如果上面!1成立,修改state值(表明当前锁已被占用)
setExclusiveOwnerThread(current);//3.如果2成立,修改当前占用锁的线程为当前线程
return true;
}
}
else if (current == getExclusiveOwnerThread()) {//占用锁线程==当前线程(重入)
int nextc = c + acquires;//如果是,state继续加1,这里nextc的结果就会 > 1,这个判断表示获取到的锁的线程,还可以再获取锁,这里就是说的可重入的意思
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);//修改status
return true;
}
return false;//直接获取锁失败
}
2.1.3 addWaiter()
如果尝试获取同步状态失败的话,则构造同步节点(独占式的Node.EXCLUSIVE),通过addWaiter(Node node,int args)方法将该节点加入到同步队列的队尾。
#java.util.concurrent.locks.AbstractQueuedSynchronizer
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*
*/
private Node addWaiter(Node mode) {
// 用当前线程构造一个Node对象,mode是一个表示Node类型的字段,或者说是这个节点是独占的还是共享的,或者说AQS的这个队列中,哪些节点是独占的,哪些节点是共享的。
Node node = new Node(Thread.currentThread(), mode);
//Try the fast path of enq; backup to full enq on failure(快速尝试添加尾节点)
Node pred = tail;
if (pred != null) {
node.prev = pred;
//CAS设置尾节点
// 确保节点能够被线程安全的添加,使用CAS方法
// 尝试修改为节点为最新的节点,如果修改失败,意味着有并发,这个时候进入enq中的死循环,进行“自旋”的方式修改
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//多次尝试
enq(node);
return node;
}
由以上代码可得,addWaiter设置尾节点失败的话,调用enq(Node node)方法设置尾节点。
2.1.4 enq()
同步器通过死循环的方式来保证节点的正确添加,在“死循环” 中通过CAS将节点设置成为尾节点之后,当前线程才能从该方法中返回,否则当前线程不断的尝试设置。enq方法将并发添加节点的请求通过CAS变得“串行化”了。
#java.util.concurrent.locks.AbstractQueuedSynchronizer
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*
* 同步器通过死循环的方式来保证节点的正确添加,在“死循环” 中通过CAS将节点设置成为尾节点之后,当前线程才能从该方法中返回,否则当前线程不断的尝试设置。
* enq方法将并发添加节点的请求通过CAS变得“串行化”了。
*
*/
private Node enq(final Node node) {
//死循环尝试,知道成功为止
for (;;) {
Node t = tail;
// 如果 tail 是 null,就创建一个虚拟节点,同时指向 head 和 tail,称为 初始化。
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 如果不是 null
// 和上个方法逻辑一样,将新节点追加到tail节点后面,并更新队列的 tail 为新节点。
// 只不过这里是死循环的,失败了还可以再来 。
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
为什么要创建一个虚拟节点 ?
参考: https://juejin.im/post/5ae755606fb9a07ab97942a4
原因:每个节点都必须设置前置节点的 ws 状态(这个状态为是为了保证数据一致性)为 SIGNAL,所以必须要一个前置节点,而这个前置节点,实际上就是当前持有锁的节点。 而第一个节点是没有前置节点的,所以需要创建一个虚拟节点。 AQS 使用的 CLH 锁,需要一个虚拟 head 节点,这个节点的作用是防止重复释放锁。当第一个进入队列的节点没有前置节点的时候,就会创建一个虚拟的。
2.1.5 acquireQueued()
在队列中的线程获取锁的过程:
- 获取当前节点的前置节点,如果前置节点是头结点,说明自己可以开始竞争锁资源了,如果成功,将当前节点置为头节点返回;
- 如果当前节点不是头结点,调用shouldParkAfterFailedAcquire判断该节点是否可以进入休眠状态,分支逻辑如下:
- 前置节点的waitStatus如果是SIGNAL,说明后续节点可以休息返回ture;
- 如果前置节点的waitStatus是CANCELLED状态(>0),通过循环跳过当前节点前置所有的CANCELLED节点;(之后结合释放锁来看)
- 否则将前置节点的状态CAS改为SIGNAL,acquireQueued下一次自旋,节点就会返回true;
- 如果返回允许挂起后调用parkAndCheckInterrupt(),通过LockSupport.lock()方法挂起当前线程;
acquireQueued方法当前线程在死循环中获取同步状态,而
只有前驱节点是头节点才能尝试获取同步状态
(锁)( p == head && tryAcquire(arg))
原因:1.头结点是成功获取同步状态(锁)的节点,
而头节点的线程释放了同步状态以后
,将会唤醒其后继节点,后继节点的线程被唤醒后要检查自己的前驱节点是否为头结点。2.维护同步队列的FIFO原则,节点进入同步队列之后,就进入了一个自旋的过程,每个节点(或者说是每个线程)都在自省的观察
#java.util.concurrent.locks.AbstractQueuedSynchronizer
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//死循环检查(自旋检查)当前节点的前驱节点是否为头结点,才能获取锁
for (;;) {
// 获取节点的前驱节点
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
//节点中的线程循环的检查,自己的前驱节点是否为头节点
//将当前节点设置为头结点,移除之前的头节点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 否则检查前一个节点的状态,看当前获取锁失败的线程是否要挂起
if (shouldParkAfterFailedAcquire(p, node) &&
//如果需要挂起,借助JUC包下面的LockSupport类的静态方法park挂起当前线程,直到被唤醒
parkAndCheckInterrupt())//利用unsafe.park()进行空转(阻塞)
interrupted = true;//如果Thread.interrupt()被调用,(不会真的被打断,会继续循环空转直到获取到锁)
}
} finally {
//如果有异常
if (failed)
//取消请求,将当前节点从队列中移除
cancelAcquire(node);
}
}
2.1.6 shouldParkAfterFailedAcquire()
shouldParkAfterFailedAcquire()是改变waitStatus状态的方法,通过以下规则,判断“当前线程”是否需要被阻塞。
规则1:如果前继节点状态为SIGNAL,表明当前节点需要被unpark(唤醒),此时则返回true。
规则2:如果前继节点状态为CANCELLED(ws>0),说明前继节点已经被取消,则通过先前回溯找到一个有效(非CANCELLED状态)的节点,并返回false。
规则3:如果前继节点状态为非SIGNAL、非CANCELLED,则设置前继的状态为SIGNAL,并返回false。
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
* 返回当前线程是否应该阻塞
*
* 说明:
(01) 关于waitStatus请参考下表(中扩号内为waitStatus的值),更多关于waitStatus的内容,可以参考前面的Node类的介绍。
CANCELLED[1] -- 当前线程已被取消
SIGNAL[-1] -- “当前线程的后继线程需要被unpark(唤醒)”。一般发生情况是:当前线程的后继线程处于阻塞状态,而当前线程被release或cancel掉,因此需要唤醒当前线程的后继线程。
CONDITION[-2] -- 当前线程(处在Condition休眠状态)在等待Condition唤醒
PROPAGATE[-3] -- (共享锁)其它线程获取到“共享锁”
[0] -- 当前线程不属于上面的任何一种状态。
*
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 前驱节点的状态
int ws = pred.waitStatus;
// 如果前驱节点是SIGNAL状态,则意味着当前线程需要unpark唤醒,此时返回true
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release to signal it, so it can safely park.
*/
return true;
// 如果前继节点是取消的状态,则设置当前节点的“当前前继节点为”原节点的前继节点
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we need a signal, but don't park yet. Caller will need to retry to make sure
* it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
独占式的获取同步状态的流程如下:
2.2. 独占锁的释放
release()是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。
2.2.1 unlock()
unlock()在ReentrantLock.java中实现的,源码如下:
/*
* 1. unlock():unlock()是解锁函数,它是通过AQS的release()函数来实现的。
* 在这里,“1”的含义和“获取锁的函数acquire(1)的含义”一样,它是设置“释放锁的状态”的参数。
* 由于“公平锁”是可重入的,所以对于同一个线程,每释放锁一次,锁的状态-1。
*/
public void unlock() {
sync.release(1);
}
2.2.2 release()
release()会调用tryRelease方法尝试释放当前线程持有的锁(同步状态),成功的话唤醒后继线程,并返回true,否则直接返回false
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
2.2.3 tryRelease()
tryRelease() 尝试释放当前线程的同步状态(锁)
protected final boolean tryRelease(int releases) {
//c为释放后的同步状态
int c = getState() - releases;
//判断当前释放锁的线程是否为获取到锁(同步状态)的线程,不是抛出异常(非法监视器状态异常)
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//如果锁(同步状态)已经被当前线程彻底释放,则设置锁的持有者为null,同步状态(锁)变的可获取
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
2.2.4 unparkSuccessor()
释放锁成功后,找到AQS的头结点,并唤醒它即可:
// 4. 唤醒头结点的后继节点
private void unparkSuccessor(Node node) {
//获取头结点(线程)的状态
int ws = node.waitStatus;
//如果状态<0,设置当前线程对应的锁的状态为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
//解释:Thread to unpark is held in successor, which is normally just the next node.
//But if cancelled or apparently(显然) null, traverse backwards(向后遍历) from tail to find the actual(实际的) non-cancelled successor(前继节点).
//从队列尾部开始往前去找最前面的一个waitStatus小于0的节点。
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//唤醒后继节点对应的线程
if (s != null)
LockSupport.unpark(s.thread);
}
2.3 共享锁的获取
acquireShared(int)是共享模式下线程获取共享资源的顶层入口。它会获取指定量的资源,获取成功则直接返回,获取失败则进入等待队列,直到获取到资源为止,整个过程忽略中断。
2.3.1 acquireShared()
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);//加入队列尾部
boolean failed = true;//是否成功标志
try {
boolean interrupted = false;//等待过程中是否被中断过的标志
for (;;) {
final Node p = node.predecessor();//前驱
//如果到head的下一个,因为head是拿到资源的线程,此时node被唤醒,很可能是head用完资源来唤醒自己的
if (p == head) {
int r = tryAcquireShared(arg);//尝试获取资源
if (r >= 0) {//成功
//将head指向自己,还有剩余资源可以再唤醒之后的线程
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)//如果等待过程中被打断过,此时将中断补上。
selfInterrupt();
failed = false;
return;
}
}
//判断状态,寻找安全点,进入waiting状态,等着被unpark()或interrupt()
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
这里tryAcquireShared()依然需要自定义同步器去实现。但是AQS已经把其返回值的语义定义好了:负值代表获取失败;0代表获取成功,但没有剩余资源;正数表示获取成功,还有剩余资源,其他线程还可以去获取。所以这里acquireShared()的流程就是:
-
tryAcquireShared()尝试获取资源,成功则直接返回;
-
失败则通过doAcquireShared()进入等待队列park(),直到被unpark()/interrupt()并成功获取到资源才返回。整个等待过程也是忽略中断的。
-
doAcquireShared(int)此方法用于将当前线程加入等待队列尾部休息,直到其他线程释放资源唤醒自己,自己成功拿到相应量的资源后才返回。
2.4 共享锁的释放
releaseShared()是共享模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果成功释放且允许唤醒等待线程,它会唤醒等待队列里的其他线程来获取资源。
2.4.1 releaseShared()
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {//尝试释放资源
doReleaseShared();//唤醒后继结点
return true;
}
return false;
}
此方法的流程也比较简单,一句话:释放掉资源后,唤醒后继。跟独占模式下的release()相似,但有一点稍微需要注意:独占模式下的tryRelease()在完全释放掉资源(state=0)后,才会返回true去唤醒其他线程,这主要是基于独占下可重入的考量;而共享模式下的releaseShared()则没有这种要求,共享模式实质就是控制一定量的线程并发执行,那么拥有资源的线程在释放掉部分资源时就可以唤醒后继等待结点。例如,资源总量是13,A(5)和B(7)分别获取到资源并发运行,C(4)来时只剩1个资源就需要等待。A在运行过程中释放掉2个资源量,然后tryReleaseShared(2)返回true唤醒C,C一看只有3个仍不够继续等待;随后B又释放2个,tryReleaseShared(2)返回true唤醒C,C一看有5个够自己用了,然后C就可以跟A和B一起运行。而ReentrantReadWriteLock读锁的tryReleaseShared()只有在完全释放掉资源(state=0)才返回true,所以自定义同步器可以根据需要决定tryReleaseShared()的返回值。
四、ConditionObject
参考: http://zhongmingmao.me/2016/08/11/concurrent-conditionobject/
synchronized控制同步的时候,可以配合Object的wait()、notify(),notifyAll() 系列方法可以实现等待/通知模式。
而Lock呢?它提供了条件Condition接口,配合await(),signal(),signalAll() 等方法也可以实现等待/通知机制。ConditionObject实现了Condition接口,给AQS提供条件变量的支持 。
1. Condition接口
public interface Condition {
// ===== await方法列表
// 使当前线程进入等待状态直到被signal或中断,相当于synchronized等待唤醒机制中的wait()方法
void await() throws InterruptedException;
// 使当前线程进入等待状态直到被signal,不响应中断
void awaitUninterruptibly();
// 使当前线程进入等待状态直到被signal、或被中断、或超时(相对时间)
long awaitNanos(long nanosTimeout) throws InterruptedException;
// 与awaitNanos类似,可以指明时间单位
boolean await(long time, TimeUnit unit) throws InterruptedException;
// 与awaitNanos类似,只是采用的是绝对时间
boolean awaitUntil(Date deadline) throws InterruptedException;
// ===== signal方法列表
// 唤醒一个等待在某个Condition实例上的线程,必须首先持有与Condition相关联的锁,相当于notify()
void signal();
// 与signal类似,相当于notifyAll()
void signalAll();
}
一个很关键的地方:Condition
的实现类在重写Condition
的所有方法
都建议先持有与Condition关联的锁
,AQS
中的ConditionObject
就满足这一点,因此调用ConditionObject
的方法是线程安全
的,这里说的线程安全有一个前提,就是线程必须先持有独占锁
。
newCondition()
实际创建的是ConditionObject
对象
// From ReentrantLock
public Condition newCondition() {
return sync.newCondition();
}
// From Sync
final ConditionObject newCondition() {
return new ConditionObject();
}
2. 核心结构
public class ConditionObject implements Condition, java.io.Serializable {
// 条件队列(condition queue)的头结点
private transient Node firstWaiter;
// 条件队列(condition queue)的尾节点
private transient Node lastWaiter;
// 中断模式是为了对不同的中断情况做不同的处理,进而告诉上层调用者中断情况,有2种模式
// 中断模式:需要重新设置线程的中断状态
private static final int REINTERRUPT = 1;
// 中断模式:需要抛出InterruptedException异常
private static final int THROW_IE = -1;
}
3. Condition(条件)队列与CLH(同步)队列
-
条件队列
仅有nextWaiter
,因此**条件队列是单向非循环队列
,而同步队列是双向非循环队列
** - 条件队列中节点只有3种等待状态:
-
CANCELLED
:需要从条件队列中移除 -
CONDITION
:等待被转移到同步队列 -
0
:转移过程中或已经转移完成,在_transferAfterCancelledWait
或transferForSignal
_中设置,后面会详细分析
-
-
AQS
只能拥有**1个同步队列
_,但可以拥有多个条件队列
**_
条件队列
与同步队列
的关系大致如下:
简单说明(后面源码分析将详细介绍):
-
ReentrantLock.newCondition()
:创建一个新的ConditionObject
实例,每个ConditionObject
拥有firstWaiter
属性和lastWaiter
属性,对应一个Condition(条件)队列
-
ConditionObject.await()
:将当前线程
包装成节点
后加入到对应的Condition(条件)队列
并进行阻塞
,然后等待被转移
到同步队列
中。并且唤醒CLH(同步)队列中head节点的下一个节点。 -
ConditionObject.signal()
:将ConditionObject实例对应的条件队列中的节点
(从头结点开始往后遍历筛选)转移到AQS同步队列的队尾
,等待获得独占锁(等待被唤醒)
。获得独占锁后,上面的ConditionObject.await()
方法return,继续执行
区别:
- ConditionObject对象都维护了一个单独的等待队列 ,AQS所维护的CLH队列是同步队列,它们节点类型相同,都是Node。
4. 源码分析
4.1 await
该方法的分析在本文中是最复杂的
4.1.1 await()
// From ConditionObject
// 需要先持有独占锁,线程安全
public final void await() throws InterruptedException {
if (Thread.interrupted())
// 线程被中断则直接抛出InterruptedException,可响应中断
throw new InterruptedException();
// ===== 1. 创建新节点并加入条件队列的队尾
Node node = addConditionWaiter();
// ===== 2. 释放锁
// 完全释放独占锁(锁是可重入的)并尝试唤醒同步队列头结点的后继节点,并返回释放锁之前的同步状态
int savedState = fullyRelease(node);
int interruptMode = 0;
// ===== 3. 自旋转移节点(条件队列 -> 同步队列),并记录中断模式
while (!isOnSyncQueue(node)) { // isOnSyncQueue:判断节点是否已经由条件队列转移到同步队列
// 节点还在条件队列中,挂起当前线程,等待被唤醒或被中断
LockSupport.park(this);
// 执行到这里,说明当前线程退出休眠状态,有3种情况:
// 1. ConditionObject.signal -> 节点从条件队列转移到同步队列(前驱节点等待状态为SIGNAL) -> 等待被前驱节点唤醒(unpark)
// 2. ConditionObject.signal -> 节点从条件队列转移到同步队列(前驱节点等待状态为CANCELLED) -> 直接唤醒(unpark)
// 3. 当前线程被中断(interrupt)
// 节点转移过程中当前线程的中断情况,有3种情况:
// 1. 当前线程没有被中断,返回0
// 2. 当前线程被中断 + 中断发生在ConditionObject.signal()调用之前,执行自旋入队操作,记录中断模式:THROW_IE(-1)
// 转移到同步队列后,再次抛出InterruptedException异常,然后执行cancelAcquire,将节点的等待状态置为CANCELLED
// 3. 当前线程被中断 + 中断发生在ConditionObject.signal()调用之后,自旋等待入队操作完成,记录中断模式:REINTERRUPT(1)
// 转移到同步队列后,仅仅设置对应线程的中断状态
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
// 由于中断而导致退出休眠状态,则退出循环
// interruptMode = THROW_IE(-1) OR REINTERRUPT(1)
break;
}
// ===== 4. 自旋请求独占锁,并维护中断模式
// 执行到这里,说明线程被中断(被中断也会完成节点转移,下面会详细分析)或者节点转移成功,所以此时节点已经转移到了同步队列
// fullyRelease已经释放了独占锁,下面会等待独占锁(acquireQueued方法),与包裹await()方法的unlock()方法配对
if (acquireQueued(node, savedState) // acquireQueued是通过自旋来等待锁,并且返回退出休眠状态去竞争锁的原因是否是被中断
&& interruptMode != THROW_IE) // 执行第二个条件判断,说明已经获得锁并且当前线程被中断了,但中断标志被重置了
// 执行到这里,说明interruptMode为0或REINTERRUPT(1)
// 1. 对于REINTERRUPT(1),下面的语句interruptMode=REINTERRUPT,显然是没有意义的
// 2. 对于interruptMode=0,说明上面的while(!isOnSyncQueue(node))循环没有被中断,但在acquireQueued被中断了,
// 且中断标志被重置了,因此需要将interruptMode设置为REINTERRUPT
// 3. 对于THROW_IE(-1),说明判断节点是否已经由条件队列转移到同步队列时发生中断,且中断发生在ConditionObject.signal()调用之前,
// 直接抛出异常即可
// 总结:acquireQueued被中断,但while(!isOnSyncQueue(node))没有被中断,需要记录中断模式为REINTERRUPT
interruptMode = REINTERRUPT;
// ===== 5. 清理条件队列
// 执行到这里说明节点已经转移到同步队列中,且已经获得独占锁(或在acquireQueued的过程中被中断)
// 此时节点不应该跟条件队列有关联了,而且此时节点的状态肯定不为CONDITION
// 因此执行unlinkCancelledWaiters,从条件队列移除该节点
if (node.nextWaiter != null)
unlinkCancelledWaiters();
// ===== 6. 已经中断模式,向上层反馈中断情况
if (interruptMode != 0) // interruptMode = THROW_IE(-1) OR REINTERRUPT(1)
// 依据不同的中断模式,向调用方报告当前线程的中断情况
// 1. ConditionObject.signal方法调用之前中断了当前线程,往外抛出InterruptedException异常,中断线程的后续操作
// 2. ConditionObject.signal方法调用之后中断了当前线程,重置当前线程的中断状态,对线程不会有实际性的影响
reportInterruptAfterWait(interruptMode);
}
4.1.2 addConditionWaiter()
// From ConditionObject
// 需要先持有独占锁,线程安全
private Node addConditionWaiter() {
Node t = lastWaiter;
if (t != null && t.waitStatus != Node.CONDITION) {
// 如果条件队列尾节点是非CONDITION节点,从头结点开始遍历条件队列,并移除非CONDITION节点
unlinkCancelledWaiters();
// 获取条件队列最新的尾节点
t = lastWaiter;
}
// 创建新节点,初始等待状态为CONDITION
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
// 条件队列为空,将firstWaiter指向刚创建的节点node
firstWaiter = node;
else
// 条件队列不为空,原条件队列队尾的后继节点设置为刚创建的节点node
t.nextWaiter = node;
// 更新条件队列的队尾为刚创建的节点
lastWaiter = node;
return node;
}
4.1.3 unlinkCancelledWaiters()
// From ConditionObject
// 需要先持有独占锁,线程安全
// 从头结点开始遍历条件队列,并移除非CONDITION节点
// 很巧妙的代码!!
private void unlinkCancelledWaiters() {
Node t = firstWaiter; // 从头结点开始遍历条件队列,t用于迭代
Node trail = null; // 遍历过程中,用于记录最近的已遍历的CONDITION节点,初始值为null,这点非常重要!!
while (t != null) {
Node next = t.nextWaiter; // next为t在条件队列中的后继节点
if (t.waitStatus != Node.CONDITION) {
// t为非CONDITION节点,首先需要断开t与next的单线链接nextWaiter
t.nextWaiter = null;
if (trail == null)
// trail等于null,说明从头结点到当前遍历节点t都是非CONDITION节点,
// 直接将头结点设置为当前遍历节点的后继节点next
firstWaiter = next;
else
// trail不为null,即已经找到CONDITION节点,
// 将trail的后继节点设置为当前遍历节点的后继节点next,
// 这将跳过trail(不包括)到当前遍历节点(包括),因为这些节点都明确是非CONDITION节点
// 但在当前循环没必要判断next是不是CONDITION节点,那是下个循环的任务
trail.nextWaiter = next;
if (next == null)
// next=null,说明t是原尾节点,
// 直接将尾节点更新为trail(最近的已遍历的CONDITION节点)
lastWaiter = trail;
}
else
// trail用于记录最近的已遍历的CONDITION节点
trail = t;
// t是迭代节点,往后迭代
t = next;
}
}
4.1.4 fullyRelease()
// From AQS
// 需要先持有独占锁,线程安全
// 完全释放独占锁(锁是可重入的)并尝试唤醒同步队列头结点的后继节点,并返回释放锁之前的同步状态
final int fullyRelease(Node node) {
boolean failed = true;
try {
// 释放锁之前的同步状态
int savedState = getState();
// 尝试释放独占锁并唤醒同步队列中头结点的后继节点
// 释放锁调用的tryRelease方法必须首先要持有锁
// 说明了ConditionObject.await()方法必须要先持有ConditionObject对应的锁
if (release(savedState)) {
failed = false;
// 成功释放独占锁
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
// fullyRelease在addConditionWaiter之后,节点已经进入了条件队列,
// 因此在释放独占锁失败,需要将节点的等待状态置为CANCELLED,等待被移除
// 释放独占锁失败的场景:调用await方法时没有先持有独占锁
node.waitStatus = Node.CANCELLED;
}
}
4.1.5 isOnSyncQueue()
// From AQS
// 判断节点是否已经由条件队列转移到同步队列
// ConditionObject.signal()会将节点从条件队列转移到同步队列
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
// 1. 节点加入条件队列时,等待状态为CONDITION,在节点转移过程中,会将等待状态设置为0,
// 所以如果节点的等待状态为CONDITION,说明节点一定还在条件队列中;
// 2. 转移过程中会首先设置节点的同步队列前驱节点属性prev,
// 如果节点的同步队列前驱节点属性为null,说明节点一定还在条件队列中,
// 另外需要注意的是,即使节点拥有了同步队列的前驱节点属性prev也不能说明节点已经转移到了同步队列中,
// 因为有可能compareAndSetTail失败,那么同步队列的原尾节点的后继节点依旧为null,而不是node
// 此时node还只是单方面的连接到同步队列,同步队列中没有任何节点将其作为前驱节点或后继节点
// 更详细的分析请参照博文:「并发 - JUC - ReentrantLock - 源码剖析」
return false;
if (node.next != null)
// 如果节点拥有了同步队列的后继节点next,那么节点一定已经转移到了同步队列中
// 更详细的分析请参照博文:「并发 - JUC - ReentrantLock - 源码剖析」
return true;
// 从同步队列的尾节点向前遍历,看能否找到节点node
// 由于入队操作是在队尾,因此大部分情况下,当前节点不会离同步队列队尾太远,效率比较高
return findNodeFromTail(node);
}
4.1.6 findNodeFromTail()
// From AQS
// 从同步队列的尾节点向前遍历(依据节点的prev属性,而prev属性用于连接同步队列的),看能否找到节点node
private boolean findNodeFromTail(Node node) {
Node t = tail; // 从同步队列尾节点开始遍历
for (;;) {
if (t == node)
return true;
if (t == null)
// t.next为head,即同步队列头结点
return false;
t = t.prev;
}
}
4.1.7 checkInterruptWhileWaiting()
// From ConditionObject
// 判断节点转移过程中当前线程的中断情况
// 1. 当前线程没有被中断,返回0
// 2. 当前线程被中断 + 中断发生在ConditionObject.signal()调用之前,执行自旋入队操作,返回THROW_IE(-1)
// 时序:中断(直接中断await线程,继续执行) -> signal,
// 这种情况下,中断先发生,按照正常语义,对应线程已经没有继续执行的必要,因此转移到同步队列后,需要再次抛出异常,取消排队
// 3. 当前线程被中断 + 中断发生在ConditionObject.signal()调用之后,自旋等待入队操作的完成,返回REINTERRUPT(1)
// 时序:signal(完成节点转移才会唤醒await线程,继续执行) -> 中断,
// 这种情况下,signal先发生,按照正常语义,对应的线程应该继续执行
// 代码的套路好深啊!!
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ? // 返回线程是否被中断,并重置中断状态
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0; // 没有被线程没有被中断,返回0
}
4.1.8 transferAfterCancelledWait()
// From AQS
// 如果线程中断发生在ConditionObject.signal()调用之前,执行入队操作,返回true,对应THROW_IE
// 如果线程中断发生在ConditionObject.signal()调用之后,自旋等待入队操作完成,返回false,对应REINTERRUPT
// 即便发生中断,也会自旋完成节点的转移,这一点很重要!!
final boolean transferAfterCancelledWait(Node node) {
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { // 很巧妙的设计!!
// CAS成功地将节点的等待状态从CONDITION置为0,则进入同步队列
// 执行到这里,说明是ConditionObject.signal(实际上是transferForSignal)尚未被调用
// 因为ConditionObject.signal在将节点转移到同步节点时也会执行同样的CAS操作(将节点的等待状态从CONDITION置为0)
// 如果ConditionObject.signal的CAS操作成功了,上面的CAS操作就会失败
// 因此返回true,表明中断发生在ConditionObject.signal()之前
// 这会导致transferForSignal不会继续执行转移操作,因此这里要完成transferForSignal本该完成的工作(节点转移)
// 自旋进入同步队列!!
enq(node);
return true;
}
// 执行到这里是因为ConditionObject.signal已经将节点的等待状态置为0,导致上面的CAS操作失败
// 因此返回false,表明中断发生在ConditionObject.signal()调用之后,这时节点转移可能还没有完成(这个概率很低)
// 出现这种情况,就通过自旋来等待转移操作完成(即便发生中断,依旧会转移节点)!!
while (!isOnSyncQueue(node))
Thread.yield(); // 尝试让出CPU资源,但不会让出锁资源
return false;
}
4.1.9 reportInterruptAfterWait()
// From ConditionObject
// 依据不同的中断模式,向调用方报告当前线程的中断情况
// 1. 如果中断模式是THROW_IE时,则抛出InterruptedException异常
// 3. 如果中断模式是REINTERRUPT时,则执行线程自我中断,重置当前线程中断状态
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
4.2 signal
ConditionObject.signal**并不总是直接唤醒线程
**,而是首先将节点从条件队列
转移到同步队列
,再依据在同步队列中前驱节点的等待状态
做不同的处理
- 如果
被转移的节点
在同步队列中的**前驱节点没有被取消
**,那么被转移的节点
在同步队列
中等待锁
- 如果
被转移的节点
在同步队列中的**前驱节点被取消
**了,才会直接唤醒被转移节点
的关联线程,这点比较重要,不要认为signal就是直接唤醒
4.2.1 signal()
// From ConditionObject
// 从条件队列头节点开始遍历,找出第一个需要转移的节点,并转移到同步队列
public final void signal() {
if (!isHeldExclusively()) // 当前线程是否持有独占锁
// 说明调用ConditionObject.signal()方法之前必须先持有与ConditionObject关联的独占锁
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
// 条件队列不为空时,从条件队列头节点开始遍历,找出第一个需要转移的节点,并转移到同步队列
doSignal(first);
}
4.2.2 isHeldExclusively()
// From Sync
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
4.2.3 doSignal()
// From ConditionObject
// 从条件队列头节点开始遍历,找出第一个需要转移的节点,并转移到同步队列
private void doSignal(Node first) {
do {
if ((firstWaiter = first.nextWaiter) == null)
// 如果条件队列的头结点为null,条件队列的尾节点必为null
lastWaiter = null;
// first将要被转移到同步队列,需要从条件队列中断开
first.nextWaiter = null;
} while (
// 没有成功转移有效节点并且未达到条件队列尾节点,继续循环
!transferForSignal(first) && (first = firstWaiter) != null);
}
4.2.4 transferForSignal()
// From AQS
// 将节点从条件队列转移到同步队列,转移成功且没有被中断则返回true,因中断而取消则返回false
// 即成功转移有效节点返回true,否则返回false
final boolean transferForSignal(Node node) {
// 转移节点之前首先将其等待状态设置为0
// 这与ReentrantLock.lock()竞争锁失败时,封装成节点并准备进入同步队列的场景保持一致
// 那时节点的等待状态也是0,因此当前节点准备进入同步队列前,等待状态也设置为0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
// 节点的等待状态为0:在transferForSignal被调用前,线程因中断而退出休眠状态,继续执行await()后半段代码
// 这会通过transferAfterCancelledWait来校验中断发生在transferForSignal之前还是transferForSignal之后
// 如果是之前,那么此时的预期值为0,CAS会失败,直接返回false,
// transferAfterCancelledWait()方法会在中断产生时完成节点转移工作,进入下一循环
return false;
// 节点自旋进入同步队列,并返回前驱节点
// 更详细的分析请参照博文:「并发 - JUC - ReentrantLock - 源码剖析」
Node p = enq(node);
int ws = p.waitStatus; // 前驱节点的等待状态
// 1. ws>0,说明前驱节点的等待状态为CANCELLED,放弃竞争锁,直接唤醒当前节点
// 2. 如果ws<=0,则统一将前驱节点跟新为SIGNAL,表示当前驱节点取消时,能够唤醒当前节点,当前节点可以被安全地挂起
// 如果CAS更新失败,则直接唤醒当前节点
// 简单概括起来就是如果前驱节点取消了,就直接唤醒当前节点
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread); // 线程被唤醒后会继续执行await()的后半段代码
return true;
}
4.3 awaitUninterruptibly
前面分析的await()
方法是响应中断
的,本节介绍的waitUninterruptibly()
是不响应中断
的
// From ConditionObject
// 与await()方法类似,仅标注不一样的地方
public final void awaitUninterruptibly() {
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean interrupted = false;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if (Thread.interrupted())
// 如果曾经由于中断而退出休眠状态,而标记被中断
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt(); // 在节点转移过程中,如果曾经被中断,则重新设置中断标志
}
4.4 await(long time,TimeUnit unit)
前面分析的await()
方法是不限时等待
的,本节介绍的await(long time,TimeUnit unit)
是限时等待
的
// From ConditionObject
// 与await()方法类似,仅标注不一样的地方
public final boolean await(long time, TimeUnit unit) throws InterruptedException {
// 剩余的等待时长(纳秒)
long nanosTimeout = unit.toNanos(time);
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
// 超时的绝对时间
final long deadline = System.nanoTime() + nanosTimeout;
// 标注是否超时
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
// 剩余的等待时长为非正值,说明超时了,则执行transferAfterCancelledWait并取消等待
// transferAfterCancelledWait如果返回true,说明节点转移成功
// transferAfterCancelledWait如果返回false,说明在超时发生前,ConditionObject.signal已经触发,可以归纳为没有超时
timedout = transferAfterCancelledWait(node);
break;
}
// 当剩余的等待时长不小于1000纳秒时,这选择限时挂起线程,线程在nanosTimeout会自动唤醒(假如期间没有被中断)
// 当剩余的等待时长小于1000纳秒时,选择自旋,不挂起线程
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
// 更新剩余的等待时长
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout; // 返回是否await等待超时
}
下一篇: 并发编程的基石:操作系统