欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

并发编程5-AQS的Condition实现原理

程序员文章站 2024-03-12 12:09:08
...

在经典的生产者-消费者模式中,可以使用Object.wait()和Object.notify()阻塞和唤醒线程,但是这样的处理下只能有一个等待队列。在可重入锁ReentrantLock中,使用AQS的condition可以实现设置多个等待队列,使用Lock.newCondition就可以生成一个等待队列,相比较来说这种方式就很灵活。

本篇文章将介绍Condition的实现原理和基本使用方法,基本过程如下:

1、Condition提供了await()方法将当前线程阻塞,并提供signal()方法支持另外一个线程将已经阻塞的线程唤醒。
2、Condition需要结合Lock使用
3、线程调用await()方法前必须获取锁,调用await()方法时,将线程构造成节点加入等待队列,同时释放锁,并挂起当前线程
4、其他线程调用signal()方法前也必须获取锁,当执行signal()方法时将等待队列的节点移入到同步队列,当线程退出临界区释放锁的时候,唤醒同步队列的首个节点

并发编程5-AQS的Condition实现原理

关键源码分析:

1、等待队列使用链表结构

/** First node of condition queue. */
private transient Node firstWaiter;//首个等待节点
/** Last node of condition queue. */
private transient Node lastWaiter;//最后一个等待节点

2、当前线程(线程A)调用await阻塞当前线程

  public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        //将当前线程封装成Node加入到等待队列尾部
        Node node = addConditionWaiter();
        //释放锁
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        //关键代码!!!
        //判断当前节点是否已经在同步队列中,如果是则退出循环,如果不是就阻塞当前线程
        //即其他线程如果发出了signal信号,会把等待队列的线程移入同步队列(aqs的同步队列),此时就会退出循环,进入下面的重新获取锁的acquireQueued
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        //其他发出signal信号的线程释放锁之后,该线程被唤醒并重新竞争锁
        //acquireQueued这个方法的解析在 文章 《[并发编程4-AQS同步器原理](https://mp.csdn.net/mdeditor/89761057#)》中有介绍
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }

 //线程加入等待队列尾部
    private Node addConditionWaiter() {
        Node t = lastWaiter;
        // If lastWaiter is cancelled, clean out.
        if (t != null && t.waitStatus != Node.CONDITION) {//清除cancell态的节点
            unlinkCancelledWaiters();
            t = lastWaiter;//t指向最后一个状态正确的节点
        }
        Node node = new Node(Thread.currentThread(), Node.CONDITION);//把线程构建成一个新的节点
        if (t == null)//列表为空,初始化为第一个节点
            firstWaiter = node;
        else   //不为空,则加入到等待队列尾部
            t.nextWaiter = node;
        lastWaiter = node;
        return node;
    }  

3、其他线程(线程B)调用signal/signalAll方法,将等待队列的节点移入同步队列(signalAll只是循环执行signal而已),signal调用doSignal

private void doSignal(Node first) {
        do {
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;//得到firstWaiter
        } while (!transferForSignal(first) &&
                 (first = firstWaiter) != null);
    }
  //将节点从等待队列移入同步队列
  final boolean transferForSignal(Node node) {
    /*
     * If cannot change waitStatus, the node has been cancelled.
     */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;//cas节点状态错误,说明已经cancell了,直接返回false

    /*
     * Splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting. If cancelled or
     * attempt to set waitStatus fails, wake up to resync (in which
     * case the waitStatus can be transiently and harmlessly wrong).
     */
    Node p = enq(node);//加入同步队列
    int ws = p.waitStatus;
    //设置前置节点状态为signal,可重入锁那篇文章分析过,为了唤醒线程而设置
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);//特殊情况下唤醒线程并重新同步,一般情况下这里不会执行
    return true;
}

4、线程(线程B)发出signal信号之后,退出临界区释放锁(代码在《并发编程4-AQS同步器原理》中有详细分析,这里仅贴出关键代码)

  public final boolean release(int arg) {
        if (tryRelease(arg)) { // tryRelease() 尝试释放当前线程的同步状态(锁)
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);//释放锁成功后,找到AQS新的头结点(本例是指线程B),并唤醒它
            return true;
        }
        return false;
    }

总结

1、condition提供了类似object.wait和notify的线程通信机制,但是condition支持多个等待队列,使用上更加灵活
2、condition的await和signal的通信机制是juc中有界队列的实现基础,而有界队列又是线程池实现的基础,常用于生产者-消费者模式
3、condition依赖于锁而存在。