欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

AbstractQueuedSynchronizer 队列同步器源码分析

程序员文章站 2022-03-20 10:34:14
AbstractQueuedSynchronizer 队列同步器(AQS) 队列同步器 (AQS), 是用来构建锁或其他同步组件的基础框架,它通过使用 int 变量表示同步状态,通过内置的 FIFO 的队列完成资源获取的排队工作。(摘自《Java并发编程的艺术》) 我们知道获取同步状态有独占和共享两 ......

abstractqueuedsynchronizer 队列同步器(aqs)

队列同步器 (aqs), 是用来构建锁或其他同步组件的基础框架,它通过使用 int 变量表示同步状态,通过内置的 fifo 的队列完成资源获取的排队工作。(摘自《java并发编程的艺术》)

我们知道获取同步状态有独占和共享两种模式,本文先针对独占模式进行分析。

变量定义

private transient volatile node head;

head 同步队列头节点

private transient volatile node tail;

tail 同步队列尾节点

private volatile int state;

state 同步状态值

node - 同步队列节点定义

volatile int waitstatus;

waitstatus 节点的等待状态,可取值如下 :

  • 0 : 初始状态
  • -1 : signal 处于该状态的节点,说明其后置节点处于等待状态; 若当前节点释放了锁可唤醒后置节点
  • -2 : condition 该状态与 condition 操作有关后续在说明
  • -3 : propagate 该状态与共享式获取同步状态操作有关后续在说明
  • 1 : cancelled 处于该状态的节点会取消等待,从队列中移除
volatile node prev;

prev 指向当前节点的前置节点

volatile node next;

next 指向当前节点的后置节点

volatile thread thread;

thread 节点对应的线程也是指当前获取锁失败的线程

node nextwaiter;

acquire()

独占模式下获取同步状态, 既是当前只允许一个线程获取到同步状态

public final void acquire(int arg) {
    if (!tryacquire(arg) &&
        acquirequeued(addwaiter(node.exclusive), arg))
        selfinterrupt();
}

从 acquire 方法中我们可以大概猜测下,获取锁的过程如下:

  • tryacquire 尝试获取同步状态, 具体如何判定获取到同步状态由子类实现
  • 当获取同步状态失败时,执行 addwaiter 创建独占模式下的 node 并将其添加到同步队列尾部
  • 加入同步队列之后,再次尝试获取同步状态,当达到某种条件的时候将当前线程挂起等待唤醒

下面具体看下各个阶段如何实现:

private node addwaiter(node mode) {
    // 绑定当前线程 创建 node 节点
    node node = new node(thread.currentthread(), mode);
    // try the fast path of enq; backup to full enq on failure
    node pred = tail;
    // 判断同步队列尾节点是否为空
    if (pred != null) {
        // node 的前置节点指向队列尾部
        node.prev = pred;
        // 将同步队列的 tail 移动指向 node
        if (compareandsettail(pred, node)) {
            // 将原同步队列的尾部后置节点指向 node
            pred.next = node;
            return node;
        }
    }
    // tail 为空说明同步队列还未初始化
    // 此时调用 enq 完成队列的初始化及 node 入队
    enq(node);
    return node;
}
private node enq(final node node) {
    // 轮询的方式执行
    // 成功入队后退出
    for (;;) {
        node t = tail;
        if (t == null) { // must initialize
            // 创建 node, 并将 head 指向该节点
            // 同时将 tail 指向该节点
            // 完成队列的初始化
            if (compareandsethead(new node()))
                tail = head;
        } else {
            // node 的前置节点指向队列尾部
            node.prev = t;
            // 将同步队列的 tail 移动指向 node
            if (compareandsettail(t, node)) {
                // 将原同步队列的尾部后置节点指向 node
                t.next = node;
                return t;
            }
        }
    }
}

从代码中可以看出通过 cas 操作保证节点入队的有序安全,其入队过程中如下图所示:

AbstractQueuedSynchronizer 队列同步器源码分析

final boolean acquirequeued(final node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        // 
        for (;;) {
            // 获取当前节点的前置节点
            final node p = node.predecessor();
            // 判断前置节点是否为 head 头节点
            // 若前置节点为 head 节点,则再次尝试获取同步状态
            if (p == head && tryacquire(arg)) {
                // 若获取同步状态成功
                // 则将队列的 head 移动指向当前节点
                sethead(node);
                // 将原头部节点的 next 指向为空,便于对象回收
                p.next = null; // help gc
                failed = false;
                // 退出轮询过程
                return interrupted;
            }
            if (shouldparkafterfailedacquire(p, node) &&
                parkandcheckinterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelacquire(node);
    }
}
private static boolean shouldparkafterfailedacquire(node pred, node node) {
    int ws = pred.waitstatus;
    if (ws == node.signal)
        /*
         * this node has already set status asking a release
         * to signal it, so it can safely park.
         */
        // 若前置节点状态为 -1 ,则说明后置节点 node 可以安全挂起了
        return true;
    if (ws > 0) {
        /*
         * predecessor was cancelled. skip over predecessors and
         * indicate retry.
         */
        do {
            // ws > 0 说明前置节点状态为 cancelled , 也就是说前置节点为无效节点
            // 此时从前置节点开始向队列头节点方向寻找有效的前置节点
            // 此操作也即是将 cancelled 节点从队列中移除
            node.prev = pred = pred.prev;
        } while (pred.waitstatus > 0);
        pred.next = node;
    } else {
        /*
         * waitstatus must be 0 or propagate.  indicate that we
         * need a signal, but don't park yet.  caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        // 若前置节点状态为初始状态 则将其状态设为 -1
        compareandsetwaitstatus(pred, ws, node.signal);
    }
    return false;
}
private final boolean parkandcheckinterrupt() {
    // 将当前线程挂起
    locksupport.park(this);
    // 被唤醒后检查当前线程是否被挂起
    return thread.interrupted();
}

从 acquirequeued 的实现可以看出,节点在入队后会采用轮询的方式(自旋)重复执行以下过程:

  • 判断前置节点是否为 head, 若为 head 节点则尝试获取同步状态; 若获取同步状态成功则移动 head 指向当前节点并退出循环
  • 若前置节点非 head 节点或者获取同步状态失败,则将前置节点状态修改为 -1, 并挂起当前线程,等待被唤醒重复执行以上过程

如下图所示:

AbstractQueuedSynchronizer 队列同步器源码分析

接下来我们看看同步状态释放的实现。

release

释放同步状态

public final boolean release(int arg) {
    // 尝试释放同步状态
    if (tryrelease(arg)) {
        node h = head;
        if (h != null && h.waitstatus != 0)
            // 唤醒后置节点
            unparksuccessor(h);
        return true;
    }
    return false;
}
private void unparksuccessor(node node) {
    /*
     * if status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  it is ok if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitstatus;
    if (ws < 0)
        // 将 head 节点状态改为 0
        compareandsetwaitstatus(node, ws, 0);

    /*
     * thread to unpark is held in successor, which is normally
     * just the next node.  but if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    // 获取后置节点
    node s = node.next;
    if (s == null || s.waitstatus > 0) {
        s = null;
        for (node t = tail; t != null && t != node; t = t.prev)
            if (t.waitstatus <= 0)
                s = t;
    }
    if (s != null)
        // 唤醒后置节点上所阻塞的线程
        locksupport.unpark(s.thread);
}

从上述代码,我们可以明白释放同步状态的过程如下:

  • 调用 tryrelease 尝试释放同步状态,同样其具体的实现由子类控制
  • 成功释放同步状态后,将 head 节点状态改为 0
  • 唤醒后置节点上阻塞的线程

如下图所示(红色曲线表示节点自旋过程) :

AbstractQueuedSynchronizer 队列同步器源码分析

acquireinterruptibly()

独占模式下获取同步状态,不同于 acquire 方法,该方法对中断操作敏感; 也就是说当前线程在获取同步状态的过程中,若被中断则会抛出中断异常

public final void acquireinterruptibly(int arg)
            throws interruptedexception {
    if (thread.interrupted())
        // 检查线程是否被中断
        // 中断则抛出中断异常由调用方处理
        throw new interruptedexception();
    if (!tryacquire(arg))
        doacquireinterruptibly(arg);
}
private void doacquireinterruptibly(int arg)
        throws interruptedexception {
    final node node = addwaiter(node.exclusive);
    boolean failed = true;
    try {
        for (;;) {
            final node p = node.predecessor();
            if (p == head && tryacquire(arg)) {
                sethead(node);
                p.next = null; // help gc
                failed = false;
                return;
            }
            if (shouldparkafterfailedacquire(p, node) &&
                parkandcheckinterrupt())
                // 不同于 acquire 的操作,此处在唤醒后检查是否中断,若被中断直接抛出中断异常
                throw new interruptedexception();
        }
    } finally {
        if (failed)
            // 抛出中断异常后最终执行 cancelacquire
            cancelacquire(node);
    }
}
private void cancelacquire(node node) {
        // ignore if node doesn't exist
    if (node == null)
        return;

    node.thread = null;

    // skip cancelled predecessors
    node pred = node.prev;
    while (pred.waitstatus > 0)
        node.prev = pred = pred.prev;

    // prednext is the apparent node to unsplice. cases below will
    // fail if not, in which case, we lost race vs another cancel
    // or signal, so no further action is necessary.
    node prednext = pred.next;

    // can use unconditional write instead of cas here.
    // after this atomic step, other nodes can skip past us.
    // before, we are free of interference from other threads.
    node.waitstatus = node.cancelled;

    // if we are the tail, remove ourselves.
    // 若当前节点为 tail 节点,则将 tail 移动指向 node 的前置节点
    if (node == tail && compareandsettail(node, pred)) {
        // 同时将node 前置节点的 next 指向 null
        compareandsetnext(pred, prednext, null);
    } else {
        // if successor needs signal, try to set pred's next-link
        // so it will get one. otherwise wake it up to propagate.
        int ws;
        if (pred != head &&
            ((ws = pred.waitstatus) == node.signal ||
             (ws <= 0 && compareandsetwaitstatus(pred, ws, node.signal))) &&
            pred.thread != null) {
            // 当前节点位于队列中部    
            node next = node.next;
            if (next != null && next.waitstatus <= 0)
                // 将前置节点的 next 指向 node 的后置节点
                compareandsetnext(pred, prednext, next);
        } else {
            // 若 node 的前置节点为 head 节点则唤醒 node 节点的后置节点
            unparksuccessor(node);
        }

        node.next = node; // help gc
    }
}

从 acquireinterruptibly 的实现可以看出,若线程在获取同步状态的过程中出现中断操作,则会将当前线程对应的同步队列等待节点从队列中移除并唤醒可获取同步状态的线程。

tryacquirenanos()

独占模式超时获取同步状态,该操作与acquireinterruptibly一样对中断操作敏感,不同在于超过等待时间若未获取到同步状态将会返回

public final boolean tryacquirenanos(int arg, long nanostimeout)
            throws interruptedexception {
    if (thread.interrupted())
        throw new interruptedexception();
    return tryacquire(arg) ||
        doacquirenanos(arg, nanostimeout);
}
private boolean doacquirenanos(int arg, long nanostimeout)
            throws interruptedexception {
    if (nanostimeout <= 0l)
        return false;
    // 计算等待到期时间
    final long deadline = system.nanotime() + nanostimeout;
    final node node = addwaiter(node.exclusive);
    boolean failed = true;
    try {
        for (;;) {
            final node p = node.predecessor();
            if (p == head && tryacquire(arg)) {
                sethead(node);
                p.next = null; // help gc
                failed = false;
                return true;
            }
            nanostimeout = deadline - system.nanotime();
            if (nanostimeout <= 0l)
                // 超时时间到期直接返回
                return false;
            if (shouldparkafterfailedacquire(p, node) &&
                nanostimeout > spinfortimeoutthreshold)
                // 按指定时间挂起s
                locksupport.parknanos(this, nanostimeout);
            if (thread.interrupted())
                throw new interruptedexception();
        }
    } finally {
        if (failed)
            cancelacquire(node);
    }
}

节点的状态

同步队列中的节点在自旋获取同步状态的过程中,会将前置节点的状态由 0 初始状态改为 -1 signal, 若是中断敏感的操作则会将状态由 0 改为 1

同步队列中的节点在释放同步状态的过程中会将同步队列的 head 节点的状态改为 0, 也即是由 -1 变为 0;

小结

本文主要分析了独占模式获取同步状态的操作,其大概流程如下:

  • 在获取同步状态时,aqs 内部维护了一个同步队列,获取状态失败的线程会被构造一个节点加入到队列中并进行一系列自旋操作
  • 在释放同步状态时,唤醒 head 的后置节点去获取同步状态