欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

java的JUC包系列文章(一): 聊一聊countdownLatch

程序员文章站 2024-02-02 08:31:40
...
    从今天开始,尽量多花一点时间在这个技术博客上,去成为一个优秀程序员 dloading...

    好了这个系列是想要系统的研究一下java的几乎可以是最重要的并发包(java.util.concurrent 以下简称JUC)里的一些东西,聊到JUC就不可避免的要聊到AbstractQueuedSynchronizer(以下简称AQS)这个关键的类,在我们本系列的文章中也会反复的出现。但是作为本系列的第一篇文章,我并不想先对AQS这个东西做详细的介绍,我会先从一个我认为比较简单的实现方法中进入到整个的java并发的世界,所以开始我们的并发之旅吧
介绍我们本文的主人公CountdownLatch(以下简称CDL)。
首先这个CDL是干什么的呢?为什么java的作者大神会把他放到并发的范围内呢。通俗的话来说,他的作用是使主任务可以在指定的几个其他任务完成之后再开始运行,CDL在场景里的作用就相当于一个计数器,ok,三个任务都完成了,那我就开始我的主要任务了。看到这里,可能会有人问,这种描述的场景好像跟join这个方法的用法很类似,也可以直接将三个任务都join到主任务不就行了?这里先留个问题,在文章的最后会有解答
好的,现在直接看一下CDL的源码。
    
/**
     * Constructs a {@code CountDownLatch} initialized with the given count.
     *
     * @param count the number of times {@link #countDown} must be invoked
     *        before threads can pass through {@link #await}
     * @throws IllegalArgumentException if {@code count} is negative
     */
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

构建方法也非常简单,只需要传入一个count,传入的这个count会作为内部实现的AQS的阈值参数
countdownLatch除了构造函数和toString方法外,总共只提供了3个方法,一个await方法,一个带超时时间的await方法,一个countdown方法,一个getcount方法,接下来我会主要对着几个方法主要进行介绍。
类似上面的例子中,主流程调用await方法,等待其他线程中调用countdown方法,满足条件后,主流程中的堵塞才解除。那await到底在等什么呢?
    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

可以看到里面只有一行代码,就是调用之前定义的AQS的子类的tryAcquireSharedNanos方法。再看一下AQS这个方法里面做了什么
    //AbstractQueuedSynchronizer.java
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
        protected final int getState() {
        return state;
    }
    // CountDownLatch.Sync.java
            protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

    因为这是本系列第一篇文章,暂时先不对AQS做太多解释,可以从上面的代码中看到调用了AQS的acquireSharedInterruptibly方法,这里先提一句,在AQS中,资源抢占模式主要有两种,一个是Share模式,一个是exclusive模式,两个模式的的抢占方法都有所不同,在方法名中可以看出,这个方法明显是share模式下的获取资源方法。这个方法会先调用tryAcquireShared方法,这个方法在AQS中是一个抽象方法,由每个继承的子类自己实现具体 的方法,从此可以看出,AQS是实现了一个并发抢占资源的框架,抽象出了几个方法待具体实现,本系列中接下来会提到的各种实现方式的不同,造就了他们具体功能的不同。继续回到我们的countDownLatch方法中,他自己内部继承实现的Sync类中,实现了这个tryAcquireShared。
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

方法看起来很简单,判断竞争的资源还有没有,如果已经已经为0,则返回1,否则返回-1.这个返回值在上面的方法中是进行判断是否要进行堵塞的标志位,如果返回的为1,则不会进入if判断内的方法,如果返回-1,则会再调用这个doAcquireSharedInterruptibly方法
        //AbstractQueuedSynchronizer.java
        private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    在AQS框架内实现的这个方法,很明显看到里面有一个永真循环,暂时可以简单理解为会一直去尝试查看资源是否已经获取到,若最终获取到,则逃出这个永真循环,否则会一直进行堵塞。
    以上就是await方法在堵塞时和AQS框架之间进行调用的关系。
    再看一下另一个countdown方法
    public void countDown() {
        sync.releaseShared(1);
    }
    //AQS.java
        public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
        public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

   // countDownLatch.sync.java
    protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

    调用的是AQS中releaseShared方法,同样的在框架中预留了一个tryReleaseShared的抽象方法由继承的子类去实现,在countDownLatch实现的方法中,是用CAS的方法去将资源释放掉,若释放成功,则在AQS框架中
/**
     * Release action for shared mode -- signals successor and ensures
     * propagation. (Note: For exclusive mode, release just amounts
     * to calling unparkSuccessor of head if it needs signal.)
     */
    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

    这个方法留在下一篇文章中详细解释。
    以上就是对countDownLatch这个同步类的简单解释,主要还是引出本系列中最重要的AQS框架,所以本文有部分概念并没有过多的解释,下一篇会进行详细解释AQS的内容.


    最后简单附一个使用CountDownLatch类的小例子
/**
 * Created by sugu on 2018/12/24.
 */
public class CountDownLatchDemo {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountTo50 countTo50 = new CountTo50("a",countDownLatch, 5);
        CountTo50 countTo100 = new CountTo50("b",countDownLatch, 10);
        countTo50.start();
        countTo100.start();
        countDownLatch.await();
        for (int i = 0; i < 10; i++) {
            System.out.println("current_thread:[" + Thread.currentThread().getId() + "] : " + i);
        }
    }
}
class CountTo50 extends Thread{
    private CountDownLatch countDownLatch;
    private int max;
    private String name;
    public CountTo50(String name, CountDownLatch countDownLatch,int max) {
        this.countDownLatch = countDownLatch;
        this.max = max;
        this.name = name;
    }

    public void run() {
        for(int i=0 ; i<20 ; i++ ){
            System.out.println("current_thread:["+ name+"] : "+ i );
            if(i == this.max){
                System.out.println("current_thread:["+ name+"] : "+ i +": count down");
                countDownLatch.countDown();
                try {
                    Thread.sleep(1000l);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}