欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

AQS 在 CountDownLatch 类中的应用原理

程序员文章站 2022-06-16 13:39:25
...

AQS 在 CountDownLatch 类中的应用原理

AQS用法

  • 第一步,新建一个自己的线程协作工具类,在内部类写一个Sync类继承AbstractQueuedSynchronized,即AQS;
  • 第二步,想好设计的线程协作工具类的协作逻辑,在Sync类,根据是否独占,来重写对应的方法,如果是独占,则重写tryAcquire和tryRelease等方法,如果是非独占,则重写tryAcquireShared和tryReleaseShared等方法;
  • 第三步,在自己的线程协作工具类中,实现获取/释放的相关方法,并在里面调用AQS对应的方法,如果是独占则调用acquire和release等方法,非独占则调用acquireShared和releaseShared或acquireSharedInterruptibly等方法。

上面第二步是根据某些条件来重写特定的部分方法,看到这,你是否会想为什么要通过继承方式判断选择重写其中某些方法,而不是通过实现接口方式实现?

ASQ原作者Doug Lea 的论文中已经进行了说明,如果是实现某个接口,那每一个抽象方法都需要实现。比如把整个AQS作为接口,则需要实现的方法很多,包括tryAcquire、tryRelease、tryAcquireShared、tryRelaeseShared等,但实际上并需要每个方法都重写,所以根据需求的不同,有选择的去实现一部分就足以,所以AQS用的继承类重写方法的方式,而不采用实现接口的方式。

那可能又有疑问了,继承类后,我什么方法也不重写,行不行,答案是不行的,就像刚才说的tryAcquire等方法,是不行的,在执行的时候会抛出异常。

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

protected int tryAcquireShared(int arg) {
  throw new UnsupportedOperationException();
}

protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
}

可以看到在不重写的情况下,调用方法时会直接抛出异常。所以要求在继承AQS后,必须把相关方法去重写、覆盖,这样才能保证线程协作工具正常运行。

AQS在CountDownLatch的应用

在CountDownLatch里面有一个子类,Sync,继承自AQS。

public class CountDownLatch {
    /**
     * Synchronization control For CountDownLatch.
     * Uses AQS state to represent count.
     */

    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;
        Sync(int count) {
            setState(count);
        }
        int getCount() {
            return getState();
        }
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }
    private final Sync sync;
   //省略其他代码...
}

CountDownLatch里面有一个sync的变量,正是Sync类的一个对象。

Sync不但继承了AQS类,还重写了tryAcquireShared和tryReleaseShared方法,这正对应了“第二步,想好设计的线程协作工具类的协作逻辑”,在Sync类里,根据是否独占,来重写对应的方法。

这里的CountDownLatch属于非独占的类型,因此它重写了tryAcquireShared和tryReleaseShared方法。

接下来带着疑问,去看这个类里面最重要的4个方法进行分析,看看具体的含义是什么?

构造函数

CountDownLatch只有一个构造方法,传入的这个参数是需要“倒数”的次数,每次调用countDown方法就会倒数1,直到达到了最开始设定的次数之后,相当于是打开了“门闩”,所以之前在等待的线程可以继续工作了。

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

通过代码可以看出,当count < 0 时会抛出异常,大于 等于 0 时,即代码this.sync = new Sync(count),往Sync中传入了count,这里的构造方法如下:

Sync(int count) {
     setState(count);
}

该构造函数调用了AQS的setState方法,并且把count参数传入了,而setState正是给AQS中的state变量赋值的,代码如下:

protected final void setState(int newState) {
    state = newState;
}

所以我们通过CountDownLatch构造函数传入的count最终递给AQS内部的state变量,给state赋值,state就是代表需要倒数的次数。

getCount

getCount方法的作用是获取当前需要“倒数”的数量,源码如下:

public long getCount() {
     return sync.getCount();
}

该方法return的是sync.getCount:

int getCount() {
     return getState();
}

通过源码跟踪可以发现,getCount方法调用的是AQS的getState方法:

protected final int getState() {
    return state;
}

可以看到,getState方法返回的state就是最终它获取到的AQS中的state变量的值。

countDown

countDown方法其实是CountDownLatch的“释放”方法,源码如下:

public void countDown() {
    sync.releaseShared(1);
}

在countDown方法中,能看出调用的sync类中的releaseShared方法,并传入1。

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

可以看出releaseShared方法,先进行if判断,tryReleaseShared方法的返回结果,因此,先把目光聚焦到doReleaseShared方法,源码如下所示:

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

tryReleaseShared方法内是一个for的死循环,通过getState获取count数量并赋值给c,如果此时c等于0,则意味着已经倒数为0了,会执行return false语句,再往上一层看是releaseShared方法,则会直接跳过整个if判断语句,直接返回false,意味着coundDown方法不产生效果。

再回到tryReleaseShared方法,如果if语句中的c不等于0,则进行减1操作,然后通过cas方式设置state变量,最后看netxc变量值是否等于0,等于0则说明倒数后正好达到规定的倒数次数,所以tryReleaseShared方法返回true,再回到之前releaseShared方法中,可以看到,接下来会调用doReleaseShared方法,意思是对之前阻塞的线程进行唤醒,让它们继续执行。

await

await方法,它是CountDownLatch的“获取”方法,调用await方法会阻塞当前线程,直到倒数为0才能继续执行。await和countDown方法是配对使用的。

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

它会调用sync的acquireSharedInterruptibly,并传入1。源码如下:

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

可以看到,它除了对于中断的处理之外,还比较重要的是tryAcquireShared方法,这个方法会直接判断getState的值是不是等于0,如果等于0就返回1,不等于0返回-1。

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

getState方法获取到的值就是需要倒数的次数,如果此时倒数次数的值等于0则返回1,否则返回-1,再看到 if (tryAcquireShared(arg) < 0)语句中,符合判断条件的则去执行doAcquireSharedInterruptibly 方法,然后让线程进入阻塞状态。

另一种情况,当getState此时已等于0,那其实意味着倒数结束了,不需要再等待,就是说门闩是打开状态,tryAcquireShared方法返回1,对于acquireSharedInterruptibly立即返回,也意味着await方法也会立刻返回,那么此时线程就不会进入阻塞状态了,相当于结束,立刻放行。

AQS在CountDownLatch的应用总结

当线程调用CountDownLatch的await方法,便会尝试获取“共享锁”,不过一开始通常获取不到锁,于是线程被阻塞。共享锁可获取到的条件是“锁计数器为0”,而锁计数器的初始值为count,每次调用countDown方法时,也可以把“锁计数器”-1。通过这种方式,调用count次countDown方法之后,“锁计数器”就为0了,于是之前等待的线程就会继续运行了,并且此时再有线程想调用await方法时也会立刻放行,不再做任务阻塞操作。