欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

AQS 共享锁模式

程序员文章站 2022-05-03 20:49:42
...

前沿

上一篇文章已经向大家讲解了AQS的数据结构,以及独占锁的源码详解,本篇文章接着介绍AQS的共享锁模式,学习并发编程一定要把AQS搞懂,因为JUC包中的很多类都是由AQS实现的。

AQS共享模式acquireShared执行流程

  1. 多个线程通过调用tryAcquireShared方法获取共享资源,返回值大于等于0则获取资源成功,返回值小于0则获取失败。
  2. 当前线程获取共享资源失败后,通过调用addWaiter方法把该线程封装为Node节点,并设置该节点为共享模式。然后把该节点添加到队列的尾部。
  3. 添加到尾部后,判断该节点的前驱节点是不是头节点,如果前驱节点是头节点,那么该节点的前驱节点出队列并获取共享资源,同时调用setHeadAndPropagate方法把该节点设置为新的头节点,同时唤醒队列中所有共享类型的节点,去获取共享资源。如果获取失败,则再次加入到队列中。
  4. 如果该节点的前驱节点不是头节点,那么通过for循环进行自旋转等待,直到当前节点的前驱节点是头节点,结束自旋。

这就是AQS共享模式竞争资源失败的大致流程,这里先让大家有一个大致的印象,下面通过源码具体分析是怎么进行操作的。

AQS共享锁模式

AQS获取共享锁是通过调用acquireShared() 这个顶层方法,我门看一下这个方法的源代码

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

这个方法中有一个if判断,当tryAcquireShared()这个返回值是小于0的时候获取锁失败,进入doAcquireShared()方法。tryAcquireShared方法是用来获取共享模式下的锁,对于tryAcquireShared()这个方法我们重点看一下他的返回值。jdk1.8中是这样写的

* @return a negative value on failure; zero if acquisition in shared
*         mode succeeded but no subsequent shared-mode acquire can
*         succeed; and a positive value if acquisition in shared
*         mode succeeded and subsequent shared-mode acquires might
*         also succeed, in which case a subsequent waiting thread
*         must check availability. (Support for three different
*         return values enables this method to be used in contexts
*         where acquires only sometimes act exclusively.)  Upon
*         success, this object has been acquired.

当失败的时候返回的是负值,如果返回的是0表示获取共享模式成功但是它下一个节点的共享模式无法获取成功。如果返回的是正数也就是大于0,表示当前线程获取共享模式成功,并且它后面的线程也可以获取共享模式。
当共享模式获取失败的时候,我门看一下doAcquireShared源代码做了哪些操作.

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

addWaiter()此方法在上一篇文章中已经按照源码详细的讲解过,它主要是封装为Node节点,并且把该节点添加到队列的尾部。此处传入共享模式的参数,节点就变成了共享模式。
通过自旋(for(;;))获取前驱节点,如果前驱节点是头节点,那么调用tryAcquireShared()方法获取当前节点的状态,注意此方法的返回值在上面已经介绍过,等于0表示不用唤醒后继节点,只有大于0才会唤醒后面的所有节点。如果获取共享资源成功,调用setHeadAndPropagate方法设置当前节点为头节点。如果在获取锁自旋的过程中中断过,那么那么将当前线程中断。如果当前节点的前驱节点不是头节点,则进入shouldParkAfterFailedAcquire判断,该方法主要的作用是获取当前线程的状态,如果线程阻塞返回true,否则返回false. parkAndCheckInterrupt方法是指当前线程在获取锁的过程中是否被中断唤醒,如果是的话那么就把标志为interrupted更新为true.最后finally中执行,如果当前节点在获取锁的时候出现过中断操作那么先给当前节点的状态更新为取消状态,并清除该节点。
setHeadAndPropagate我门看一下这个方法的源代码

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);//设置当前节点为头节点
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {//符合状态的将全部唤醒
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

此方法传递了2个参数,一个是当前节点,一个是tryAcquireShared方法的返回值。从源代码中我门看到它首先记录了当前头节点,然后它通过setHead()方法把当前获取到锁的节点设置为头节点。通过if语句把符合条件的继续唤醒后继节点,如果下一个节点为空那么调用doReleaseShared方法,doReleaseShared方法继续唤醒后面的节点。此方法会在共享锁释放详细讲解。

共享锁释放releaseShared

我门来看一下releaseShared的源代码,此方法是共享模式释放资源的顶层方法。

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {//
        doReleaseShared();
        return true;
    }
    return false;
}

tryReleaseShared方法获取共享模式资源释放,如果释放成功那么会调用doReleaseShared继续唤醒下一个节点.

我门继续看一下具体的唤醒操作doReleaseShared() 这个方法

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

通过源代码我门发现,当前线程状态如果是Node.SIGNAL,Node.SIGNAL的值是-1,是一个静态常量,此值表示当前线程被挂起。如果当前线程被挂起,那么更新当前线程的状态值为0.如果更新失败那么就继续。更新成功后调用

unparkSuccessor()此方法是唤醒共享锁的第一个节点。如果本身头节点属于重置状态waitStatus==0,并且把它设置为传播状态那么就向下一个节点传播。
我门在看一下unparkSuccessor这个方法的源码

private void unparkSuccessor(Node node) {
   
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
  
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

从这个方法中我门发现如果当先线程的状态是小于0,那么就把当前线程重置为0.为什么是小于0呢,上篇文章已经讲过,waitStatus<0为等待或挂起状态。也就是如果当前线程是等待挂起状态,那么把当前线程状态重置为0。然后找到下一个节点,如果下一个节点是空或下一个线程已经被取消,那么就从头部找下一个没有被取消的节点。当下一个节点不为空的时候,调用LockSupport.unpark方法唤醒当前线程。LockSupport.unpark会调用Unsafe这个类调用native方法进行执行。

流程图

AQS 共享锁模式

更多并发内容尽在公众号 程序员深海探秘

AQS 共享锁模式