欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Java中的锁(四)续

程序员文章站 2022-07-12 11:18:30
...

上一章中说到了AbstractQueuedSynchronizer,其内部有一个ConditionObject类,包含了Condition接口的实现。

五、Condition接口

condition相对于wait和notify,更加灵活,可以用多个condition实例对一个lock控制,通过condition可以更精细的控制多线程的休眠和唤醒。condition接口提供的方法:

public interface Condition{
    //使当前线程进入等待状态直到被通知(signal)或中断,相当于synchronized中的wait()方法
    await() throws InterruptException;

    // 进入等待状态但不响应中断要求
    awaitYninterruptibly();

   //当前线程进入等待状态,直到被唤醒或被中断或超时
    long awaitNanos(long nanosTimeout) throws InterruptedException;

  //同awaitNanos,但可以指明时间单位
  boolean await(long time, TimeUnit unit) throws InterruptedException;

 //调用该方法当前线程进入等待状态,直到被唤醒、中断或到达某个时
  boolean awaitUntil(Date deadline) throws InterruptedException;

 //唤醒一个等待在Condition上的线程,该线程从等待方法返回前必须
 //获取与Condition相关联的锁,功能与notify()相同
  void signal();

 //唤醒所有等待在Condition上的线程,该线程从等待方法返回前必须
 //获取与Condition相关联的锁,功能与notifyAll()相同
  void signalAll();
}

ConditionObject的具体实现类:

public class ConditionObject implements Condition, java.io.Serializable{
    // 第一个等待队列节点
    private transient Node firstWaiter;
    //等待队列最后一个等待结点
    private transient Node lastWaiter;
    ...
}

 每当lock.newCondition(),都会返回一个新的ConditionObject对象。在ConditionObject中,通过一个等待队列来维护等待的线程,所以在一个同步器中可以有多个等待队列,他们等待的条件是不一样的。

等待队列通过firstWaiter和lastWaiter表示队头和队尾,每次加入到等待队列中的线程都会加入到等待队列的尾部,是一个FIFO单向队列(AQS的锁竞争队列是双向的),模型图如下:

Java中的锁(四)续

 等待队列中的节点只有两种状态,CONDITION和CANCELLED,await方法的具体实现有:

public final void await()throws InterruptedException{
    if(Thread.interrupted()){
        throw new InterruptedException();
    }

    Node node = addConditionWaiter();
    // 释放当前线程锁的同步状态
    long savedState = fullyRelease(node);
    int interruptMode = 0;

    // 节点是否在同步队列中,即是否被唤醒。如果线程不在,也没有被中断唤醒,一直处于park状态
    while (!isOnSyncQueue(node)) {
         // 如果不在同步队列中,挂起当前线程
         LockSupport.park(this);
        // 判断是否被中断唤醒,如果是退出循环。
         if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
              break;
    }
    // 被唤醒后执行自旋操作争取获得锁,同时判断线程是否被中断
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // 
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);

}

添加到等待队列中addConditionWaiter()

 private Node addConditionWaiter() {
      Node t = lastWaiter;
      // If lastWaiter is cancelled, clean out.
      if (t != null && t.waitStatus != Node.CONDITION) {
          unlinkCancelledWaiters();
          t = lastWaiter;
      }
      Node node = new Node(Thread.currentThread(), Node.CONDITION);
      if (t == null)
            firstWaiter = node;
      else
            t.nextWaiter = node;
      lastWaiter = node;
      return node;
}

唤醒操作signal方法

public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

signal方法中处理两个事件:一是判断当前线程是否吃用独占锁,如果不持有,就抛出异常;共享模式下没有等待队列,所以是不会有condition的。二是唤醒等待队列的第一个节点,执行doSignal方法。

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

dosignal方法中也处理了两件事,首先把节点从条件等待队列中移除,然后重新维护firstWaiter和lastWaiter;其次是将移除的节点添加到同步队列中,并且唤醒前驱节点。所以,整个signal工作的流程是:先判断当前线程是否持有独占锁,如果有,唤醒等待队列中的第一个节点,并从等待队列中移除,移动到同步队列中去。如果加入同步队列失败,继续循环等待队列中的其他节点。如果加入同步队列成功,如果前驱节点结束或者设置前驱节点为SIGNAL状态失败,则唤醒当前节点代表的线程。到此signal()任务完成,被唤醒后的线程也将从前面的await()方法中的while循环中退出,因为此时该线程的结点已在同步队列中,进而调用AQS的acquireQueued()方法加入获取同步状态的竞争中,这就是等待唤醒机制的整个流程实现原理。
判断是否在同步队列中的方法:

final boolean isOnSyncQueue(Node node) {
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    if (node.next != null) // If has successor, it must be on queue
        return true;
    return findNodeFromTail(node);
}
private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}

唤醒所有线程的方式和唤醒单个线程的方式类似,找到队头后一次唤醒:

private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}