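(29) ReentrantLock
Preface: Ever since I started preparing for interviews I have run into ReentrantLock from time to time and read quite a few blog posts about it, yet I never felt I really understood it, probably because I had never dug into it hands-on. Now that I have found a job and will be leaving my current one soon, it is a good time to get properly familiar with ReentrantLock.
1. Basic concepts
1. ReentrantLock can be created as either a fair or a non-fair lock, whereas synchronized is always non-fair. A fair lock favors the thread that has been waiting the longest.
2. ReentrantLock provides the Condition class, which lets you wake up threads in groups (only those waiting on a particular condition). With synchronized, notify() wakes one arbitrary waiting thread and notifyAll() wakes all of them.
3. ReentrantLock offers a way to interrupt a thread that is waiting for the lock, via lock.lockInterruptibly().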
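1.1 Source comments
ReentrantLock is a reentrant mutual exclusion Lock with the same basic behavior and semantics as the implicit monitor lock accessed using synchronized methods and statements, but with extended capabilities. A ReentrantLock is owned by the thread that last successfully locked it and has not yet unlocked it. A thread invoking lock returns, having acquired the lock, when the lock is not owned by another thread. If the current thread already owns the lock, the method returns immediately. This can be checked with isHeldByCurrentThread() and getHoldCount().
The ReentrantLock constructor accepts an optional fairness parameter. When it is set to true, under contention the lock favors granting access to the longest-waiting thread. Otherwise the lock does not guarantee any particular access order. Programs using fair locks accessed by many threads may show lower overall throughput (slower, often much slower) than those using the default non-fair setting, but they have smaller variance in the time needed to obtain the lock and guarantee lack of starvation. Note, however, that fairness of the lock does not guarantee fairness of thread scheduling: one of many threads using a fair lock may obtain it several times in succession while other active threads are not progressing and not currently holding the lock.
Also note that the untimed {@link #tryLock()} method does not honor the fairness setting. If the lock is available, tryLock() succeeds even if other threads are waiting.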
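The reentrancy and ownership rules above can be checked with a small experiment. This is only a sketch: the class name HoldCountDemo and its two helper methods are made up for illustration, and only ReentrantLock's own methods (lock, unlock, tryLock, getHoldCount) come from the JDK.

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;

class HoldCountDemo {
    /** Hold counts seen after one lock(), after a nested lock(), and after both unlocks. */
    static int[] holdCounts() {
        ReentrantLock lock = new ReentrantLock();
        int[] seen = new int[3];
        lock.lock();
        try {
            seen[0] = lock.getHoldCount();
            lock.lock(); // reentrant acquire: the owner returns immediately
            try {
                seen[1] = lock.getHoldCount();
            } finally {
                lock.unlock();
            }
        } finally {
            lock.unlock();
        }
        seen[2] = lock.getHoldCount(); // the current thread no longer holds the lock
        return seen;
    }

    /** Whether a second thread can tryLock() while the calling thread owns the lock. */
    static boolean otherThreadCanTryLock() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] acquired = new boolean[1];
        lock.lock();
        try {
            Thread other = new Thread(() -> {
                acquired[0] = lock.tryLock(); // does not block
                if (acquired[0]) {
                    lock.unlock();
                }
            });
            other.start();
            other.join(); // join() makes the write to acquired[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            lock.unlock();
        }
        return acquired[0];
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(holdCounts())); // [1, 2, 0]
        System.out.println(otherThreadCanTryLock());       // false
    }
}
```

The owner can lock again without blocking, while any other thread is refused until the hold count drops back to zero.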
* A reentrant mutual exclusion {@link Lock} with the same basic
* behavior and semantics as the implicit monitor lock accessed using
* {@code synchronized} methods and statements, but with extended
* capabilities.
* <p>A {@code ReentrantLock} is <em>owned</em> by the thread last
* successfully locking, but not yet unlocking it. A thread invoking
* {@code lock} will return, successfully acquiring the lock, when
* the lock is not owned by another thread. The method will return
* immediately if the current thread already owns the lock. This can
* be checked using methods {@link #isHeldByCurrentThread}, and {@link
* #getHoldCount}.
* <p>The constructor for this class accepts an optional
* <em>fairness</em> parameter. When set {@code true}, under
* contention, locks favor granting access to the longest-waiting
* thread. Otherwise this lock does not guarantee any particular
* access order. Programs using fair locks accessed by many threads
* may display lower overall throughput (i.e., are slower; often much
* slower) than those using the default setting, but have smaller
* variances in times to obtain locks and guarantee lack of
* starvation. Note however, that fairness of locks does not guarantee
* fairness of thread scheduling. Thus, one of many threads using a
* fair lock may obtain it multiple times in succession while other
* active threads are not progressing and not currently holding the
* lock.
* Also note that the untimed {@link #tryLock()} method does not
* honor the fairness setting. It will succeed if the lock
* is available even if other threads are waiting.
* <p>It is recommended practice to <em>always</em> immediately
* follow a call to {@code lock} with a {@code try} block, most
* typically in a before/after construction such as:
* <pre> {@code
* class X {
* private final ReentrantLock lock = new ReentrantLock();
* // ...
* public void m() {
* lock.lock(); // block until condition holds
* try {
* // ... method body
* } finally {
* lock.unlock();
* }
* }
* }}</pre>
* <p>In addition to implementing the {@link Lock} interface, this
* class defines a number of {@code public} and {@code protected}
* methods for inspecting the state of the lock. Some of these
* methods are only useful for instrumentation and monitoring.
* <p>Serialization of this class behaves in the same way as built-in
* locks: a deserialized lock is in the unlocked state, regardless of
* its state when serialized.
* <p>This lock supports a maximum of 2147483647 recursive locks by
* the same thread. Attempts to exceed this limit result in
* {@link Error} throws from locking methods.
* @since 1.5
* @author Doug Lea
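1.2 Background
- Reentrant lock. A reentrant lock can be acquired multiple times by the same thread. Both ReentrantLock and synchronized are reentrant.
- Interruptible lock. This describes whether a thread can respond to an interrupt while it is trying to acquire the lock. synchronized is not interruptible, whereas ReentrantLock supports interruption.
- Fair and non-fair locks. With a fair lock, when several threads compete for the same lock they acquire it in arrival order; a non-fair lock allows a thread to "jump the queue". synchronized is non-fair. ReentrantLock is non-fair by default but can be configured to be fair.
- CAS (compare-and-swap). A CAS operation takes three operands: a memory location (V), the expected old value (A), and the new value (B). If the value at the location matches the expected value, the processor atomically updates the location to the new value; otherwise it does nothing. In either case it reports the value that was at the location before the CAS instruction. In effect CAS says: "I think location V should contain value A; if it does, put B there; otherwise leave it alone and just tell me what is there now." The java.util.concurrent package uses CAS heavily; in the JDK version read here, the concurrency code performs its CAS operations through methods of the sun.misc.Unsafe class.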
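To make the CAS description concrete, here is a minimal sketch using AtomicInteger, the public JDK wrapper around CAS. The class name CasDemo and its methods are illustrative only. Note that AtomicInteger.compareAndSet reports success as a boolean rather than returning the old value.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CasDemo {
    /** Increments the counter with an explicit CAS retry loop and returns the new value. */
    static int casIncrement(AtomicInteger counter) {
        for (;;) {
            int expected = counter.get();   // A: the value we believe is stored at V
            int updated = expected + 1;     // B: the value we want to store
            if (counter.compareAndSet(expected, updated)) {
                return updated;             // V still held A, so B was written
            }
            // Another thread changed V in the meantime: re-read and retry.
        }
    }

    /** Runs casIncrement from several threads and returns the final count. */
    static int countWithThreads(int threads, int perThread) {
        AtomicInteger counter = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    casIncrement(counter);
                }
            });
            workers[i].start();
        }
        try {
            for (Thread t : workers) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(countWithThreads(4, 1000)); // 4000
    }
}
```

No update is lost even without a lock, because a failed CAS simply retries against the fresh value.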
2. Usage
2.1 Basic usage
package com.example.demo_29_reentrantlock;

import java.util.concurrent.locks.ReentrantLock;

public class MyClass {
    static ReentrantLock lock = new ReentrantLock();

    public static void main(String[] args) {
        new ReentrantLockThread("jiatai1").start();
        new ReentrantLockThread("jiatai2").start();
        new ReentrantLockThread("jiatai3").start();
        new ReentrantLockThread("jiatai4").start();
        new ReentrantLockThread("jiatai5").start();
    }

    private static void f1() {
        lock.lock(); // acquire before the try block so that finally only runs when the lock is held
        try {
            System.out.println("do something" + Thread.currentThread());
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            lock.unlock(); // always release, even if the body throws
        }
    }

    static class ReentrantLockThread extends Thread {
        ReentrantLockThread(String s) {
            super(s); // use the argument as the thread name
        }

        @Override
        public void run() {
            f1();
        }
    }
}
2.2 Condition
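Conditions (also known as condition queues or condition variables) provide a means for one thread to suspend execution (to "wait") until notified by another thread that some state condition may now be true. Because access to this shared state happens in different threads, it must be protected, so a lock of some form is associated with the condition. The key property that waiting on a condition provides is that it atomically releases the associated lock and suspends the current thread, just like Object.wait.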
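package com.example.demo_29_reentrantlock;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ProductQueue<T> {
    private final T[] items;
    private final Lock lock = new ReentrantLock();
    private Condition notFull = lock.newCondition();  // signalled when a slot frees up
    private Condition notEmpty = lock.newCondition(); // signalled when an item arrives
    private int head, tail, count;

    @SuppressWarnings("unchecked")
    public ProductQueue(int maxSize) {
        items = (T[]) new Object[maxSize];
    }

    public ProductQueue() {
        this(10); // assumed default capacity; the original value is not shown
    }

    public void put(T t) throws InterruptedException {
        lock.lock();
        try {
            while (count == getCapacity()) {
                notFull.await(); // releases the lock while waiting for space
            }
            items[tail] = t;
            if (++tail == getCapacity()) {
                tail = 0;
            }
            ++count;
            notEmpty.signal(); // wake one consumer
        } finally {
            lock.unlock();
        }
    }

    public T take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                notEmpty.await(); // releases the lock while waiting for an item
            }
            T ret = items[head];
            items[head] = null; // GC
            if (++head == getCapacity()) {
                head = 0;
            }
            --count;
            notFull.signal(); // wake one producer
            return ret;
        } finally {
            lock.unlock();
        }
    }

    public int getCapacity() {
        return items.length;
    }

    public int size() {
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }
}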
package com.example.demo_29_reentrantlock;

public class ProductQueueTest {
    private static ProductQueue<Integer> productQueue = new ProductQueue<Integer>();

    public static void main(String[] args) {
        new FactoryThread().start();
        new ConsumeThread("jiatai1").start();
        new ConsumeThread("jiatai2").start();
        new ConsumeThread("jiatai3").start();
        new ConsumeThread("jiatai4").start();
    }

    static class FactoryThread extends Thread {
        @Override
        public void run() {
            for (int i = 0; i < 20; i++) {
                try {
                    productQueue.put(i); // produce 0..19
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    static class ConsumeThread extends Thread {
        ConsumeThread(String s) {
            super(s);
        }

        @Override
        public void run() {
            for (int i = 0; i < 5; i++) {
                try {
                    System.out.println(Thread.currentThread() + " get " + productQueue.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
Output from one run (the interleaving differs between runs):
Thread[jiatai2,5,main] get 1
Thread[jiatai2,5,main] get 2
Thread[jiatai2,5,main] get 3
Thread[jiatai2,5,main] get 4
Thread[jiatai2,5,main] get 5
Thread[jiatai1,5,main] get 0
Thread[jiatai3,5,main] get 7
Thread[jiatai3,5,main] get 8
Thread[jiatai3,5,main] get 9
Thread[jiatai3,5,main] get 10
Thread[jiatai3,5,main] get 11
Thread[jiatai1,5,main] get 12
Thread[jiatai1,5,main] get 13
Thread[jiatai1,5,main] get 14
Thread[jiatai1,5,main] get 15
Thread[jiatai4,5,main] get 6
Thread[jiatai4,5,main] get 16
Thread[jiatai4,5,main] get 17
Thread[jiatai4,5,main] get 18
Thread[jiatai4,5,main] get 19
2.3 lockInterruptibly
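1) Acquires the lock unless the current thread is interrupted.
2) If the lock is not held by another thread, acquires it and returns immediately, setting the lock hold count to 1.
3) If the current thread already holds this lock, the hold count is incremented by 1 and the method returns immediately.
4) If the lock is held by another thread, the current thread is disabled for thread-scheduling purposes and lies dormant until one of two things happens: a) the lock is acquired by the current thread; or b) some other thread interrupts the current thread.
5) If the lock is acquired by the current thread, the lock hold count is set to 1.
If the current thread a) has its interrupted status set on entry to this method, or b) is interrupted while waiting to acquire the lock, then InterruptedException is thrown and the current thread's interrupted status is cleared.
6) In this implementation, because this method is an explicit interruption point, preference is given to responding to the interrupt over normal or reentrant acquisition of the lock. Specified by: lockInterruptibly in interface Lock. Throws: InterruptedException if the current thread is interrupted.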
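The behavior described above can be observed with a short experiment. It is a sketch under stated assumptions: the class InterruptiblyDemo and its run() method are invented for illustration, and the test thread spins on hasQueuedThread only to be sure the waiter is really queued before it is interrupted.

```java
import java.util.concurrent.locks.ReentrantLock;

class InterruptiblyDemo {
    /** Interrupts a thread blocked in lockInterruptibly() and reports what it observed. */
    static String run() {
        ReentrantLock lock = new ReentrantLock();
        String[] outcome = new String[1];
        Thread waiter = new Thread(() -> {
            try {
                lock.lockInterruptibly(); // blocks: the starting thread owns the lock
                try {
                    outcome[0] = "acquired";
                } finally {
                    lock.unlock();
                }
            } catch (InterruptedException e) {
                // The interrupted status has been cleared by the time the exception is thrown.
                outcome[0] = "interrupted, flag=" + Thread.currentThread().isInterrupted();
            }
        });
        lock.lock();
        try {
            waiter.start();
            while (!lock.hasQueuedThread(waiter)) {
                Thread.yield(); // wait until the waiter sits in the lock's queue
            }
            waiter.interrupt();
            waiter.join(); // the waiter gives up without ever getting the lock
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            lock.unlock();
        }
        return outcome[0];
    }

    public static void main(String[] args) {
        System.out.println(run()); // interrupted, flag=false
    }
}
```

With plain lock() the waiter would stay blocked until the owner unlocks; the interrupt would only be recorded and re-asserted after the lock is acquired.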
3. Reading the source alongside the usage
3.1 Constructors
/**
 * Creates an instance of {@code ReentrantLock}.
 * This is equivalent to using {@code ReentrantLock(false)}.
 */
public ReentrantLock() {
    sync = new NonfairSync();
}

/**
 * Creates an instance of {@code ReentrantLock} with the
 * given fairness policy.
 * @param fair {@code true} if this lock should use a fair ordering policy
 */
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}
3.2 lock(), NonfairSync & FairSync
/**
 * Acquires the lock.
 * <p>Acquires the lock if it is not held by another thread and returns
 * immediately, setting the lock hold count to one.
 * <p>If the current thread already holds the lock then the hold
 * count is incremented by one and the method returns immediately.
 * <p>If the lock is held by another thread then the
 * current thread becomes disabled for thread scheduling
 * purposes and lies dormant until the lock has been acquired,
 * at which time the lock hold count is set to one.
 */
public void lock() {
    sync.lock();
}
3.2.1 NonfairSync
/**
 * Sync object for non-fair locks
 */
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    /**
     * Performs lock. Try immediate barge, backing up to normal
     * acquire on failure.
     */
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

/**
 * Sets the thread that currently owns exclusive access.
 * A {@code null} argument indicates that no thread owns access.
 * This method does not otherwise impose any synchronization or
 * {@code volatile} field accesses.
 * @param thread the owner thread
 */
protected final void setExclusiveOwnerThread(Thread thread) {
    exclusiveOwnerThread = thread;
}

/**
 * Acquires in exclusive mode, ignoring interrupts. Implemented
 * by invoking at least once {@link #tryAcquire},
 * returning on success. Otherwise the thread is queued, possibly
 * repeatedly blocking and unblocking, invoking {@link
 * #tryAcquire} until success. This method can be used
 * to implement method {@link Lock#lock}.
 * @param arg the acquire argument. This value is conveyed to
 * {@link #tryAcquire} but is otherwise uninterpreted and
 * can represent anything you like.
 */
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
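acquire does two things: 1. it tries to acquire the lock; 2. if that fails, it puts the thread into the wait queue, where it blocks until it obtains the lock.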
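The division of labor behind acquire (the subclass decides whether the lock can be taken, the framework handles queuing and parking) can be tried out with a tiny custom synchronizer. The sketch below is a simplified, non-reentrant mutex in the style of the example in the AbstractQueuedSynchronizer Javadoc; the class name SimpleMutex and the countWithThreads helper are made up for this illustration and are not part of ReentrantLock.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class SimpleMutex {
    /** State 0 means unlocked, 1 means locked. */
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) { // step 1: try to take the lock
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false; // acquire() will then queue and park the caller
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getExclusiveOwnerThread() != Thread.currentThread()) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true; // fully released: release() may wake a queued thread
        }

        boolean locked() {
            return getState() != 0;
        }
    }

    private final Sync sync = new Sync();

    void lock() { sync.acquire(1); }
    void unlock() { sync.release(1); }
    boolean isLocked() { return sync.locked(); }

    /** Increments a plain int from several threads under the mutex. */
    static int countWithThreads(int threads, int perThread) {
        SimpleMutex mutex = new SimpleMutex();
        int[] counter = new int[1];
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    mutex.lock();
                    try {
                        counter[0]++;
                    } finally {
                        mutex.unlock();
                    }
                }
            });
            workers[i].start();
        }
        try {
            for (Thread t : workers) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(countWithThreads(4, 10000)); // 40000
    }
}
```

ReentrantLock's Sync classes follow the same pattern, with extra logic for reentrancy and fairness.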
/**
 * Performs non-fair tryLock. tryAcquire is implemented in
 * subclasses, but both need nonfair try for trylock method.
 */
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

/**
 * Creates and enqueues node for current thread and given mode.
 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
 * @return the new node
 */
private Node addWaiter(Node mode) {
    Node node = new Node(mode);

    for (;;) {
        Node oldTail = tail;
        if (oldTail != null) {
            U.putObject(node, Node.PREV, oldTail);
            if (compareAndSetTail(oldTail, node)) {
                oldTail.next = node;
                return node;
            }
        } else {
            initializeSyncQueue();
        }
    }
}
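The addWaiter call in acquireQueued(addWaiter(Node.EXCLUSIVE), arg) maintains a doubly linked list and appends the new Node at its tail. initializeSyncQueue handles the case where the list is still empty by installing a default (dummy) head node.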
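The tail-append loop can be imitated outside the JDK to see how the CAS on tail and the lazily created dummy head fit together. This is a simplified sketch, not the AQS code: TailQueueSketch, its Node class, and namesFromTail are invented here, and AtomicReference stands in for the Unsafe-based field updates.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

class TailQueueSketch {
    static final class Node {
        final String name;
        volatile Node prev;
        volatile Node next;
        Node(String name) { this.name = name; }
    }

    private final AtomicReference<Node> head = new AtomicReference<>();
    private final AtomicReference<Node> tail = new AtomicReference<>();

    /** Appends a node at the tail, creating the dummy head on first use. */
    Node enqueue(String name) {
        Node node = new Node(name);
        for (;;) {
            Node oldTail = tail.get();
            if (oldTail != null) {
                node.prev = oldTail;                   // link backwards first
                if (tail.compareAndSet(oldTail, node)) {
                    oldTail.next = node;               // forward link is set afterwards
                    return node;
                }
                // CAS lost to another thread: loop and retry with the new tail.
            } else {
                Node dummy = new Node("dummy");        // plays the role of initializeSyncQueue
                if (head.compareAndSet(null, dummy)) {
                    tail.set(dummy);
                }
            }
        }
    }

    /** Walks the prev links from tail to head, the direction that is always consistent. */
    List<String> namesFromTail() {
        List<String> names = new ArrayList<>();
        for (Node p = tail.get(); p != null; p = p.prev) {
            names.add(p.name);
        }
        return names;
    }

    public static void main(String[] args) {
        TailQueueSketch q = new TailQueueSketch();
        q.enqueue("a");
        q.enqueue("b");
        q.enqueue("c");
        System.out.println(q.namesFromTail()); // [c, b, a, dummy]
    }
}
```

Because prev is set before the CAS and next only after it, a backward walk from tail never misses a node, which is why AQS traverses from the tail when next looks unreliable.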
/**
 * Initializes head and tail fields on first contention.
 */
private final void initializeSyncQueue() {
    Node h;
    if (U.compareAndSwapObject(this, HEAD, null, (h = new Node())))
        tail = h;
}

/*
 * Various flavors of acquire, varying in exclusive/shared and
 * control modes. Each is mostly the same, but annoyingly
 * different. Only a little bit of factoring is possible due to
 * interactions of exception mechanics (including ensuring that we
 * cancel if tryAcquire throws exception) and other control, at
 * least not without hurting performance too much.
 */

/**
 * Acquires in exclusive uninterruptible mode for thread already in
 * queue. Used by condition wait methods as well as acquire.
 * @param node the node
 * @param arg the acquire argument
 * @return {@code true} if interrupted while waiting
 */
final boolean acquireQueued(final Node node, int arg) {
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}
3.2.2 FairSync
/**
 * Sync object for fair locks
 */
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() {
        acquire(1);
    }

    /**
     * Fair version of tryAcquire. Don't grant access unless
     * recursive call or no waiters or is first.
     */
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}
/**
 * Queries whether any threads have been waiting to acquire longer
 * than the current thread.
 * <p>An invocation of this method is equivalent to (but may be
 * more efficient than):
 * <pre> {@code
 * getFirstQueuedThread() != Thread.currentThread()
 * && hasQueuedThreads()}</pre>
 * <p>Note that because cancellations due to interrupts and
 * timeouts may occur at any time, a {@code true} return does not
 * guarantee that some other thread will acquire before the current
 * thread. Likewise, it is possible for another thread to win a
 * race to enqueue after this method has returned {@code false},
 * due to the queue being empty.
 * <p>This method is designed to be used by a fair synchronizer to
 * avoid <a href="AbstractQueuedSynchronizer.html#barging">barging</a>.
 * Such a synchronizer's {@link #tryAcquire} method should return
 * {@code false}, and its {@link #tryAcquireShared} method should
 * return a negative value, if this method returns {@code true}
 * (unless this is a reentrant acquire). For example, the {@code
 * tryAcquire} method for a fair, reentrant, exclusive mode
 * synchronizer might look like this:
 * <pre> {@code
 * protected boolean tryAcquire(int arg) {
 *   if (isHeldExclusively()) {
 *     // A reentrant acquire; increment hold count
 *     return true;
 *   } else if (hasQueuedPredecessors()) {
 *     return false;
 *   } else {
 *     // try to acquire normally
 *   }
 * }}</pre>
 * @return {@code true} if there is a queued thread preceding the
 * current thread, and {@code false} if the current thread
 * is at the head of the queue or the queue is empty
 * @since 1.7
 */
public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}
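A small experiment shows the queue order at work with a fair lock. This is a sketch with an invented class, FairOrderDemo: the starting thread holds the lock, lets each waiter enqueue before starting the next one, then releases the lock and records who gets it in which order. Keep in mind that threads already parked in the queue are woken in queue order for non-fair locks too; what fairness adds is that a newly arriving thread may not jump ahead of them, which this simple test does not exercise.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

class FairOrderDemo {
    /** Queues the given number of waiters one by one and returns the order in which they got the lock. */
    static List<String> acquisitionOrder(int waiters) {
        ReentrantLock lock = new ReentrantLock(true); // fair lock
        List<String> order = Collections.synchronizedList(new ArrayList<>());
        List<Thread> threads = new ArrayList<>();
        lock.lock();
        try {
            for (int i = 1; i <= waiters; i++) {
                Thread t = new Thread(() -> {
                    lock.lock();
                    try {
                        order.add(Thread.currentThread().getName());
                    } finally {
                        lock.unlock();
                    }
                }, "t" + i);
                threads.add(t);
                t.start();
                while (!lock.hasQueuedThread(t)) {
                    Thread.yield(); // make sure t is enqueued before starting the next waiter
                }
            }
        } finally {
            lock.unlock(); // from here on the queued threads are served head first
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(order);
    }

    public static void main(String[] args) {
        System.out.println(acquisitionOrder(3)); // [t1, t2, t3]
    }
}
```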
3.3 unlock
/**
 * Attempts to release this lock.
 * <p>If the current thread is the holder of this lock then the hold
 * count is decremented. If the hold count is now zero then the lock
 * is released. If the current thread is not the holder of this
 * lock then {@link IllegalMonitorStateException} is thrown.
 * @throws IllegalMonitorStateException if the current thread does not
 * hold this lock
 */
public void unlock() {
    sync.release(1);
}
This in turn calls the release(int arg) method of sync.
/**
 * Releases in exclusive mode. Implemented by unblocking one or
 * more threads if {@link #tryRelease} returns true.
 * This method can be used to implement method {@link Lock#unlock}.
 * @param arg the release argument. This value is conveyed to
 * {@link #tryRelease} but is otherwise uninterpreted and
 * can represent anything you like.
 * @return the value returned from {@link #tryRelease}
 */
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

/**
 * Attempts to set the state to reflect a release in exclusive
 * mode.
 * <p>This method is always invoked by the thread performing release.
 * <p>The default implementation throws
 * {@link UnsupportedOperationException}.
 * @param arg the release argument. This value is always the one
 * passed to a release method, or the current state value upon
 * entry to a condition wait. The value is otherwise
 * uninterpreted and can represent anything you like.
 * @return {@code true} if this object is now in a fully released
 * state, so that any waiting threads may attempt to acquire;
 * and {@code false} otherwise.
 * @throws IllegalMonitorStateException if releasing would place this
 * synchronizer in an illegal state. This exception must be
 * thrown in a consistent fashion for synchronization to work
 * correctly.
 * @throws UnsupportedOperationException if exclusive mode is not supported
 */
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}
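The two branches of tryRelease (the owner check, and the lock becoming free only once the count reaches zero) are visible from the public API. A minimal sketch, with a made-up class name UnlockDemo:

```java
import java.util.concurrent.locks.ReentrantLock;

class UnlockDemo {
    /** unlock() by a thread that does not hold the lock throws IllegalMonitorStateException. */
    static boolean unlockWithoutHoldingThrows() {
        ReentrantLock lock = new ReentrantLock();
        try {
            lock.unlock();
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    /** After locking twice, the lock stays held until the second unlock(). */
    static boolean[] lockedAfterEachUnlock() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] locked = new boolean[2];
        lock.lock();
        lock.lock();
        lock.unlock();
        locked[0] = lock.isLocked(); // hold count is 1: still locked
        lock.unlock();
        locked[1] = lock.isLocked(); // hold count is 0: free
        return locked;
    }

    public static void main(String[] args) {
        System.out.println(unlockWithoutHoldingThrows()); // true
        boolean[] locked = lockedAfterEachUnlock();
        System.out.println(locked[0] + " " + locked[1]);  // true false
    }
}
```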
3.4 newCondition()
/**
 * Returns a {@link Condition} instance for use with this
 * {@link Lock} instance.
 * <p>The returned {@link Condition} instance supports the same
 * usages as do the {@link Object} monitor methods ({@link
 * Object#wait() wait}, {@link Object#notify notify}, and {@link
 * Object#notifyAll notifyAll}) when used with the built-in
 * monitor lock.
 * <ul>
 * <li>If this lock is not held when any of the {@link Condition}
 * {@linkplain Condition#await() waiting} or {@linkplain
 * Condition#signal signalling} methods are called, then an {@link
 * IllegalMonitorStateException} is thrown.
 * <li>When the condition {@linkplain Condition#await() waiting}
 * methods are called the lock is released and, before they
 * return, the lock is reacquired and the lock hold count restored
 * to what it was when the method was called.
 * <li>If a thread is {@linkplain Thread#interrupt interrupted}
 * while waiting then the wait will terminate, an {@link
 * InterruptedException} will be thrown, and the thread's
 * interrupted status will be cleared.
 * <li>Waiting threads are signalled in FIFO order.
 * <li>The ordering of lock reacquisition for threads returning
 * from waiting methods is the same as for threads initially
 * acquiring the lock, which is in the default case not specified,
 * but for <em>fair</em> locks favors those threads that have been
 * waiting the longest.
 * </ul>
 * @return the Condition object
 */
public Condition newCondition() {
    return sync.newCondition();
}
- A Condition is used much like Object's wait/notify/notifyAll.
- Waiting threads are signalled in FIFO order.
final ConditionObject newCondition() {
    return new ConditionObject();
}
3.4.1 await
/**
 * Implements interruptible condition wait.
 * <ol>
 * <li>If current thread is interrupted, throw InterruptedException.
 * <li>Save lock state returned by {@link #getState}.
 * <li>Invoke {@link #release} with saved state as argument,
 * throwing IllegalMonitorStateException if it fails.
 * <li>Block until signalled or interrupted.
 * <li>Reacquire by invoking specialized version of
 * {@link #acquire} with saved state as argument.
 * <li>If interrupted while blocked in step 4, throw InterruptedException.
 * </ol>
 */
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

/**
 * Adds a new waiter to wait queue.
 * @return its new wait node
 */
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }

    Node node = new Node(Node.CONDITION);

    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

/**
 * Invokes release with current state value; returns saved state.
 * Cancels node and throws exception on failure.
 * @param node the condition node for this wait
 * @return previous sync state
 */
final int fullyRelease(Node node) {
    try {
        int savedState = getState();
        if (release(savedState))
            return savedState;
        throw new IllegalMonitorStateException();
    } catch (Throwable t) {
        node.waitStatus = Node.CANCELLED;
        throw t;
    }
}
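That await gives up every hold on the lock (fullyRelease) and restores the same hold count before returning can be verified from the outside. The following is a sketch only; AwaitHoldCountDemo and its run() method are hypothetical names, and the polling loop in the main thread is just a simple way to wait until the other thread is parked on the condition.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class AwaitHoldCountDemo {
    static String run() {
        ReentrantLock lock = new ReentrantLock();
        Condition ready = lock.newCondition();
        boolean[] signalled = new boolean[1]; // guarded by lock
        int[] holds = new int[2];             // written by the waiter, read after join()
        Thread waiter = new Thread(() -> {
            lock.lock();
            lock.lock(); // hold count 2 before waiting
            try {
                holds[0] = lock.getHoldCount();
                while (!signalled[0]) {
                    ready.await(); // releases both holds, reacquires both before returning
                }
                holds[1] = lock.getHoldCount();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
                lock.unlock();
            }
        });
        waiter.start();
        boolean freeDuringWait = false;
        try {
            while (!freeDuringWait) {
                lock.lock(); // succeeds only when the waiter does not hold the lock
                try {
                    if (lock.hasWaiters(ready)) { // the waiter is parked in await()
                        freeDuringWait = true;
                        signalled[0] = true;
                        ready.signal();
                    }
                } finally {
                    lock.unlock();
                }
                Thread.yield();
            }
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return "before=" + holds[0] + ", freeDuringWait=" + freeDuringWait + ", after=" + holds[1];
    }

    public static void main(String[] args) {
        System.out.println(run()); // before=2, freeDuringWait=true, after=2
    }
}
```

The main thread could take the lock while the waiter was still inside its two nested lock() calls, which is only possible because await released the full saved state rather than a single hold.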
/**
 * Wakes up node's successor, if one exists.
 * @param node the node
 */
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling. It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        node.compareAndSetWaitStatus(ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node. But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node p = tail; p != node && p != null; p = p.prev)
            if (p.waitStatus <= 0)
                s = p;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}
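I do not fully understand the node handling here. As far as I can tell: if the node after node is null or CANCELLED (waitStatus > 0), s is reset to null and the queue is walked backwards from tail towards node; every node with waitStatus <= 0 is assigned to s, so s ends up as the non-cancelled node closest to node. Finally the thread held by s is woken up. I came across LockSupport before in demo 9 (FutureTask), where I used the example below; here unpark wakes the thread that belongs to s.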
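package com.example.demo_9_locksupport;

import java.util.concurrent.locks.LockSupport;
public class MyClass {
    public static void main(String[] args) {
        Thread boy = new Thread() {
            public void run() {
                System.out.println("he is playing game");
                LockSupport.park(); // blocks here until another thread calls unpark(boy)
                System.out.println("he must give up playing game and shop with his girl friend");
            }
        };
        boy.start();
        try {
            Thread.sleep(1000); // assumed delay; the original value is not shown
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("his girl friend calls him to shop with her");
        LockSupport.unpark(boy);
    }
}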
3.4.2 signal
// public methods
/**
 * Moves the longest-waiting thread, if one exists, from the
 * wait queue for this condition to the wait queue for the
 * owning lock.
 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
 * returns {@code false}
 */
public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

/**
 * Removes and transfers nodes until hit non-cancelled one or
 * null. Split out from signal in part to encourage compilers
 * to inline the case of no waiters.
 * @param first (non-null) the first node on condition queue
 */
private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

/**
 * Transfers a node from a condition queue onto sync queue.
 * Returns true if successful.
 * @param node the node
 * @return true if successfully transferred (else the node was
 * cancelled before signal)
 */
final boolean transferForSignal(Node node) {
    /*
     * If cannot change waitStatus, the node has been cancelled.
     */
    if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
        return false;

    /*
     * Splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting. If cancelled or
     * attempt to set waitStatus fails, wake up to resync (in which
     * case the waitStatus can be transiently and harmlessly wrong).
     */
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}
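The unpark here pairs with the park inside await (section 3.4.1): once the node has been moved to the sync queue and its thread has been woken, await leaves its while loop and carries on. Put simply, the woken thread re-acquires the lock through acquireQueued, and that acquisition follows the same path as lock().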
3.5 lockInterruptibly
public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
}

/**
 * Acquires in exclusive mode, aborting if interrupted.
 * Implemented by first checking interrupt status, then invoking
 * at least once {@link #tryAcquire}, returning on
 * success. Otherwise the thread is queued, possibly repeatedly
 * blocking and unblocking, invoking {@link #tryAcquire}
 * until success or the thread is interrupted. This method can be
 * used to implement method {@link Lock#lockInterruptibly}.
 * @param arg the acquire argument. This value is conveyed to
 * {@link #tryAcquire} but is otherwise uninterpreted and
 * can represent anything you like.
 * @throws InterruptedException if the current thread is interrupted
 */
public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

/**
 * Acquires in exclusive interruptible mode.
 * @param arg the acquire argument
 */
private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}
