欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

AQS-Condition详解

程序员文章站 2022-04-17 21:31:11
...
AtomicInteger解析:[url]http://donald-draper.iteye.com/blog/2359555[/url]
锁持有者管理器AbstractOwnableSynchronizer:[url]http://donald-draper.iteye.com/blog/2360109[/url]
AQS线程挂起辅助类:LockSupport:[url]http://donald-draper.iteye.com/blog/2360206[/url]
AQS详解-CLH队列,线程等待状态:[url]http://donald-draper.iteye.com/blog/2360256[/url]
/**
* Condition implementation for a {@link
* AbstractQueuedSynchronizer} serving as the basis of a {@link
* Lock} implementation.
* 作为AQS实现锁的一个基础实现Condition。
* <p>Method documentation for this class describes mechanics,
* not behavioral specifications from the point of view of Lock
* and Condition users. Exported versions of this class will in
* general need to be accompanied by documentation describing
* condition semantics that rely on those of the associated
* <tt>AbstractQueuedSynchronizer</tt>.
*方法文档用于描述这个条件实现机制,不是锁和条件的使用者,可以使用的操作。
此类的版本与AbstractQueuedSynchronizer相关联。
* <p>This class is Serializable, but all fields are transient,
* so deserialized conditions have no waiters.
*/
//这个所有的all fields are transient,所以反序列化时,条件没有等待者。
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
队列中第一个等待节点线程
private transient Node firstWaiter;
/** Last node of condition queue. */
队列中最后一个等待条件的节点线程
private transient Node lastWaiter;

/**
* Creates a new <tt>ConditionObject</tt> instance.
*/
//初始化实例
public ConditionObject() { }

// Internal methods

/**
* Adds a new waiter to wait queue.
* @return its new wait node
*/
//添加一个条件等待线程节点,到条件等待队列
private Node addConditionWaiter() {
Node t = lastWaiter;//取得队列的尾节点
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
//移除队列中非等待条件的线程节点
unlinkCancelledWaiters();
t = lastWaiter;
}
//创建新节点条件线程等待节点
Node node = new Node(Thread.currentThread(), Node.CONDITION);
//等队列为空,则新节点为头节点,否则加入到队尾
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
/**
* Unlinks cancelled waiter nodes from condition queue.
* Called only while holding lock. This is called when
* cancellation occurred during condition wait, and upon
* insertion of a new waiter when lastWaiter is seen to have
* been cancelled. This method is needed to avoid garbage
* retention in the absence of signals. So even though it may
* require a full traversal, it comes into play only when
* timeouts or cancellations occur in the absence of
* signals. It traverses all nodes rather than stopping at a
* particular target to unlink all pointers to garbage nodes
* without requiring many re-traversals during cancellation
* storms.
*/
//从条件队列,移除取消等待条件的节点线程。当线程只有锁时,会调用此方法。
当在线程等待条件时,被取消,或者添加新节点时,发现尾节点已经取消等待,则
调用此方法。在没有信号条件的时候,方法需要避免垃圾的产生。尽管可能需要遍历
队列,当超时或取消条件等待时候,仍然会触发此方法。此方法会遍历会有节点,
不会因为某个特殊事件的发生,不移除取消条件等待的垃圾节点。

private void unlinkCancelledWaiters() {
//取得头结点
Node t = firstWaiter;
Node trail = null;
//遍历队列,移除非等待条件的节点
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;//队列头
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;//队列尾
}
else
trail = t;
t = next;
}
}
/**
* Removes and transfers nodes until hit non-cancelled one or
* null. Split out from signal in part to encourage compilers
* to inline the case of no waiters.
* @param first (non-null) the first node on condition queue
*/
//唤醒等待队列中,第一个等待条件的线程节点。
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
//
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}

}

//AQS
将一个条件队列节点,转移到同步等待队列
transferForSignal(Node node)


/**
* Transfers a node from a condition queue onto sync queue.
* Returns true if successful.
* @param node the node
* @return true if successfully transferred (else the node was
* cancelled before signal).
*/
//将一个条件队列节点,转移到同步等待队列
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/

//将节点状态为等待条件,则将节点的状态,设置线程等待的初始状态0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;

/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);//添加到队列
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
//如果线程取消等待,或设置唤醒成功,则unpark线程
LockSupport.unpark(node.thread);
return true;
}

/**
* CAS waitStatus field of a node.
*/
//修改节点的等待状态
private static final boolean compareAndSetWaitStatus(Node node,
int expect,
int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset,
expect, update);
}


/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
//添加节点到等待队列,以CAS操作,将节点添加到等待队列中
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

小节:从上面可以看出,唤醒节点时,首先等待条件线程节点状态,
设置为线程等待的初始状态0,然后添加到等待队列,
如果线程取消等待,则移除节点线程,否则通知节点前驱,当前驱
节点线程释放锁时,unpark线程。

再回到ConditionObject
//ConditionObject       
/**
* Removes and transfers all nodes.
* @param first (non-null) the first node on condition queue
*/
//唤醒队列中所有等待条件的节点
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}


// public methods

/**
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
* owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
//如果线程持有独占锁,从头结点开始,唤醒第一个等待条件的线程节点
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}

/**
* Moves all threads from the wait queue for this condition to
* the wait queue for the owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
//如果线程持有独占锁,从头结点开始,唤醒所有等待条件的线程节点
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}

/**
* Implements uninterruptible condition wait.
* [list=1]不可中断条件等待
* <li> Save lock state returned by {@link #getState}.
保存锁的当前state
* <li> Invoke {@link #release} with
* saved state as argument, throwing
* IllegalMonitorStateException if it fails.
* <li> Block until signalled.
调用此方法的线程,阻塞至被唤醒
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* [/list]
*/
public final void awaitUninterruptibly() {
Node node = addConditionWaiter();//添加条件等待节点
int savedState = fullyRelease(node);
boolean interrupted = false;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if (Thread.interrupted())
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}

下面来看fullyRelease(node);
//AQS

/**
* Invokes release with current state value; returns saved state.
* Cancels node and throws exception on failure.
* @param node the condition node for this wait
* @return previous sync state
*/
//释放锁的当前状态,返回保存值,如果是取消等待节点,则抛出异常
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();//获取当前等待状态
if (release(savedState)) {
failed = false;
return savedState;
} else {
//释放失败,抛出异常
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
//如果失释放败,设置节点状态为取消
node.waitStatus = Node.CANCELLED;
}
}


/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
* 释放独占模式锁,用unblock线程
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
//如果尝试释放锁成功
Node h = head;
if (h != null && h.waitStatus != 0)
//如果头结点不为null,且非初始等待状态0,则unpark头结点的后继
unparkSuccessor(h);
return true;
}
return false;
}
//待子类扩展
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}

/**
* Wakes up node's successor, if one exists.
*唤醒接待的后继
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
//如果状态值为负,线程需要唤醒,尝试着清除状态值,
int ws = node.waitStatus;
if (ws < 0)
//设置线程状态为初始值
compareAndSetWaitStatus(node, ws, 0);

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
//从队尾遍历器前驱,找到最后一个状态值为负的节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//unpark节点线程
LockSupport.unpark(s.thread);
}


回到 awaitUninterruptibly
public final void awaitUninterruptibly() {
Node node = addConditionWaiter();//添加条件等待节点
int savedState = fullyRelease(node);//释放节点持有的锁,唤醒队列第一个节点线程
boolean interrupted = false;
//如果节点在同步等待队列上
while (!isOnSyncQueue(node)) {
//park当前线程
LockSupport.park(this);
if (Thread.interrupted())
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}

看下面一句
isOnSyncQueue(node)

//AQS
/**
* Returns true if a node, always one that was initially placed on
* a condition queue, is now waiting to reacquire on sync queue.
* @param node the node
* @return true if is reacquiring
*/
//如果一个节点,刚开始在条件队列,现在,再同步等待队列
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
//条件等待节点线程,返回false
return false;
if (node.next != null) // If has successor, it must be on queue
如果有前驱,代表在同步等待队列上
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
节点的前驱可能为非null,但不在队列中,可能由于CAS操作失败。
所以我们不得不确定,,节点是否在队尾附近。
return findNodeFromTail(node);
}
/**
* Returns true if node is on sync queue by searching backwards from tail.
* Called only when needed by isOnSyncQueue.
* @return true if present
*/
//从队尾遍历,查看节点是否在同步等待队列上
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}

回到 awaitUninterruptibly
public final void awaitUninterruptibly() {
Node node = addConditionWaiter();//添加条件等待节点
int savedState = fullyRelease(node);//释放节点持有的锁,唤醒队列第一个节点线程
boolean interrupted = false;
//如果节点在同步等待队列上
while (!isOnSyncQueue(node)) {
//park当前线程
LockSupport.park(this);
if (Thread.interrupted())
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}

//Thread
/**
* Tests whether the current thread has been interrupted. The
* <i>interrupted status</i> of the thread is cleared by this method. In
* other words, if this method were to be called twice in succession, the
* second call would return false (unless the current thread were
* interrupted again, after the first call had cleared its interrupted
* status and before the second call had examined it).
*
* <p>A thread interruption ignored because a thread was not alive
* at the time of the interrupt will be reflected by this method
* returning false.
*
判断当前线程是否已经被中断,这个方法可以清楚线程的中断状态。换种说法,
当此方法成功调用两次时,第二次返回为false,
* @return <code>true</code> if the current thread has been interrupted;
* <code>false</code> otherwise.
* @see #isInterrupted()
* @revised 6.0
*/
查看当前线程是否已经被可中断,如果当前线程处于非中断状态,可返回true,否
返回else。
public static boolean interrupted() {
return currentThread().isInterrupted(true);
}

回到 awaitUninterruptibly
public final void awaitUninterruptibly() {
Node node = addConditionWaiter();//添加条件等待节点
int savedState = fullyRelease(node);//释放节点持有的锁,唤醒队列第一个节点线程
boolean interrupted = false;
//如果节点在同步等待队列上
while (!isOnSyncQueue(node)) {
//park当前线程
LockSupport.park(this);
if (Thread.interrupted())
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}

再看这一段
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();

//AQS
/*
* Various flavors of acquire, varying in exclusive/shared and
* control modes. Each is mostly the same, but annoyingly
* different. Only a little bit of factoring is possible due to
* interactions of exception mechanics (including ensuring that we
* cancel if tryAcquire throws exception) and other control, at
* least not without hurting performance too much.
*/
不同的获取锁方式,对应着不同的控制模式,不如独占、共享。
每一种大部分相同,但极少数不同。不同的,不如获取出现异常时的,取消
方式。
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//如果节点的前继,是头节点则,尝试获取锁
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//前驱线程释放锁之后,是否应该唤醒后继节点,如果是,则
//park当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
//如果失败,则取消线程
cancelAcquire(node);
}
}

acquireQueued过程是这样的:
1. 如果当前节点是AQS队列的头结点(如果第一个节点是DUMP节点也就是傀儡节点,
那么第二个节点实际上就是头结点了),
就尝试在此获取锁tryAcquire(arg)。
如果成功就将头结点设置为当前节点(不管第一个结点是否是DUMP节点),返回中断位。否则进行2。
2. 检测当前节点是否应该park(),如果应该park()就挂起当前线程并且返回当前线程中断位。进行操作1。


/**
* Sets head of queue to be node, thus dequeuing. Called only by
* acquire methods. Also nulls out unused fields for sake of GC
* and to suppress unnecessary signals and traversals.
*设置头节点
* @param node the node
*/
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}

/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev
*如果一个节点获取锁失败,则检查和更新节点状态
如果节点应该block,返回true
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
先驱节点释放锁时,需要唤醒后继节点
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
如果先驱节点,等待被取消,向前遍历,找到第一个等待条件的节点
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
设置先驱节点状态为SIGNAL,当先驱释放锁时,需要唤醒后继节点线程
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}


/**
* Convenience method to park and then check if interrupted
* park当前线程
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

// Utilities for various versions of acquire

/**
* Cancels an ongoing attempt to acquire.
*取消一个尝试获取锁的线程
* @param node the node
*/
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
//清空节点线程
node.thread = null;

// Skip cancelled predecessors
Node pred = node.prev;
//找到第一个,等待条件的节点
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;

// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;

// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
//设置节点状态,为取消状态
node.waitStatus = Node.CANCELLED;

// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
//节点指向自己,等待gc回收
node.next = node; // help GC
}
}


再来看
 selfInterrupt();


  /**
* Convenience method to interrupt current thread.
*/
private static void selfInterrupt() {
Thread.currentThread().interrupt();
}

在回到这个方法
public final void awaitUninterruptibly() {
Node node = addConditionWaiter();//添加条件等待节点
int savedState = fullyRelease(node);//释放节点持有的锁,唤醒队列第一个节点线程
boolean interrupted = false;
//如果节点在同步等待队列上
while (!isOnSyncQueue(node)) {
//park当前线程
LockSupport.park(this);
if (Thread.interrupted())
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}

小节:非中断模式等待,首先添加新的等待条件线程节点,到等待条件线程队列;
释放节点状态,释放节点锁状态过程,待子类扩展,在过程中,同时唤醒等待队列
头结点;再判断节点是在同步等待队列上,如果不在,则park当前线程;
如果线程已经中断,则取消线程中断状态,即线程非中断等待条件。


回到ConditionObject的其他方法
//ConditionObject
  /*
* For interruptible waits, we need to track whether to throw
* InterruptedException, if interrupted while blocked on
* condition, versus reinterrupt current thread, if
* interrupted while blocked waiting to re-acquire.
*/
当线程是可中断等待时,我们需要捕捉是否抛出中断异常,
当阻塞在条件上时,则中断

/** Mode meaning to reinterrupt on exit from wait */
//在此模式上表示,当退出等待条件时,需要二次中断,即消除中断状态
private static final int REINTERRUPT = 1;
/** Mode meaning to throw InterruptedException on exit from wait */
//此模式用于描述,当退出等待条件时,需要抛出异常
private static final int THROW_IE = -1;

/**
* Checks for interrupt, returning THROW_IE if interrupted
* before signalled, REINTERRUPT if after signalled, or
* 0 if not interrupted.
*/
//检查节点线程,是否需要消除中断,抛出异常,还是不需要中断
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}

//AQS
/**
* Transfers node, if necessary, to sync queue after a cancelled
* wait. Returns true if thread was cancelled before being
* signalled.
* @param current the waiting thread
* @param node its node
* @return true if cancelled before the node was signalled
*/
在线程被唤醒之前,取消等待则返回true。如果需要的话,
当线程等待被取消时,将线程节点放到,同步等待队列中
final boolean transferAfterCancelledWait(Node node) {
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
//如果设置节点为初始化状态成功,则添加到同步等待队列
enq(node);
return true;
}
/*
* If we lost out to a signal(), then we can't proceed
* until it finishes its enq(). Cancelling during an
* incomplete transfer is both rare and transient, so just
* spin.
*/
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}

//Thread
该方法与sleep()类似,只是不能由用户指定暂停多长时间,
并且yield()方法只能让同优先级的线程有执行的机会。
/**
* A hint to the scheduler that the current thread is willing to yield
* its current use of a processor. The scheduler is free to ignore this
* hint.
*
* <p> Yield is a heuristic attempt to improve relative progression
* between threads that would otherwise over-utilise a CPU. Its use
* should be combined with detailed profiling and benchmarking to
* ensure that it actually has the desired effect.
*
* <p> It is rarely appropriate to use this method. It may be useful
* for debugging or testing purposes, where it may help to reproduce
* bugs due to race conditions. It may also be useful when designing
* concurrency control constructs such as the ones in the
* {@link java.util.concurrent.locks} package.
*/
public static native void yield();



//ConditionObject


/**
* Implements interruptible condition wait.
* [list=1]
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with
* saved state as argument, throwing
* IllegalMonitorStateException if it fails.
* <li> Block until signalled or interrupted.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* [/list]
*/
//可中断的条件等待
public final void await() throws InterruptedException {
if (Thread.interrupted())
//如果线程中断,则抛出异常
throw new InterruptedException();
//添加新的线程节点到等待条件队列
Node node = addConditionWaiter();
//释放节点锁状态,unpark等待队列头节点
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
//如果节点不在同步等待队列,则park
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//判断线程等待条件后,是中断,还是抛出异常
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//移除取消等待条件的节点
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
/**
* Throws InterruptedException, reinterrupts current thread, or
* does nothing, depending on mode.
*/
//根据线程当前中断模式,确定中断,还是抛出异常
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
//抛出异常
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
//中断
selfInterrupt();
}
/**
* Implements timed condition wait.
* [list=1]
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with
* saved state as argument, throwing
* IllegalMonitorStateException if it fails.
* <li> Block until signalled, interrupted, or timed out.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* [/list]
*/
超时等待条件,如果当前线程处于中断,则抛出中断异常,
线程阻塞到,知道唤醒,或时间超时,或被中断。
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
long lastTime = System.nanoTime();
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;

long now = System.nanoTime();
nanosTimeout -= now - lastTime;
lastTime = now;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return nanosTimeout - (System.nanoTime() - lastTime);
}

/**
* Implements absolute timed condition wait.
* [list=1]
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with
* saved state as argument, throwing
* IllegalMonitorStateException if it fails.
* <li> Block until signalled, interrupted, or timed out.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* <li> If timed out while blocked in step 4, return false, else true.
* [/list]
*/
超时等待条件,如果当前线程处于中断,则抛出中断异常,
线程阻塞到,知道唤醒,或到指定时间,或被中断。
public final boolean awaitUntil(Date deadline)
throws InterruptedException {
if (deadline == null)
throw new NullPointerException();
long abstime = deadline.getTime();
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (System.currentTimeMillis() > abstime) {
timedout = transferAfterCancelledWait(node);
break;
}
LockSupport.parkUntil(this, abstime);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}

/**
* Implements timed condition wait.
* [list=1]
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with
* saved state as argument, throwing
* IllegalMonitorStateException if it fails.
* <li> Block until signalled, interrupted, or timed out.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* <li> If timed out while blocked in step 4, return false, else true.
* [/list]
*/
超时等待条件,如果当前线程处于中断,则抛出中断异常,
线程阻塞到,知道唤醒,或到时间超时,或被中断。
public final boolean await(long time, TimeUnit unit)
throws InterruptedException {
if (unit == null)
throw new NullPointerException();
long nanosTimeout = unit.toNanos(time);
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
long lastTime = System.nanoTime();
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
timedout = transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
long now = System.nanoTime();
nanosTimeout -= now - lastTime;
lastTime = now;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}

// support for instrumentation

/**
* Returns true if this condition was created by the given
* synchronization object.
*
* @return {@code true} if owned
*/
//判断条件是否为AQS创建
final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
return sync == AbstractQueuedSynchronizer.this;
}

/**
* Queries whether any threads are waiting on this condition.
* Implements {@link AbstractQueuedSynchronizer#hasWaiters}.
*
* @return {@code true} if there are any waiting threads
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
//判断时候还有线程在等待这个条件
protected final boolean hasWaiters() {
if (!isHeldExclusively())
//如果当前线程非独占锁持有者,抛出异常
throw new IllegalMonitorStateException();
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
//如果等待条件队列中,有等在次条件的节点线程,则返回true
if (w.waitStatus == Node.CONDITION)
return true;
}
return false;
}

/**
* Returns an estimate of the number of threads waiting on
* this condition.
* Implements {@link AbstractQueuedSynchronizer#getWaitQueueLength}.
*
* @return the estimated number of waiting threads
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
//返回等待此条件的节点线程数
protected final int getWaitQueueLength() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int n = 0;
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION)
++n;
}
return n;
}

/**
* Returns a collection containing those threads that may be
* waiting on this Condition.
* Implements {@link AbstractQueuedSynchronizer#getWaitingThreads}.
*
* @return the collection of threads
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
//获取等待此条件的线程
protected final Collection<Thread> getWaitingThreads() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION) {
Thread t = w.thread;
if (t != null)
list.add(t);
}
}
return list;
}
}

总结:
[color=green]ConditionObject是AQS的一个内部类,其实现是基于AQS;ConditionObject中的条件等待队列中的节点与同步队列中的节点基本相同,最大的不同时,同步等待队列
中的节点有先驱pre和后继next,而条件等待队列中的节点只有后继nextWaiter。
条件等待有两种方法,一种为非中断等待awaitUninterruptibly,在线程等待条件时不可中断线程;另外一种,可中断等待await,等待后,确定中断当前线程,还是抛出异常。
非中断模式等待,首先添加新的等待条件线程节点,到等待条件线程队列;
释放节点状态,释放节点锁状态过程,待子类扩展,在过程中,同时唤醒等待队列
头结点;再判断节点是在同步等待队列上,如果不在,则park当前线程;
如果线程已经中断,则取消线程中断状态,即线程非中断等待条件。
唤醒有两种,一种是唤醒等待条件队列中的头结点,另一种,唤醒条件等待队列中,
所有等待此条件的节点线程。唤醒节点时,首先设置线程等待的节点状态的初始状态0,
然后添加到同步等待队列,如果线程取消等待,则移除线程,否则通知节点前驱,当前驱
节点线程释放锁时,unpark线程。
在唤醒等待条件的线程时,为什么要将,节点移动同步等待线程节点能,这是因为条件时锁的一部分,先创建锁,再创建条件;当线程被唤醒时,只能说明,线程条件发生,能处于park状态,要获取锁,才能unpark线程。[/color]
相关标签: juc