欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

CountDownLatch源码分析,(结合aqs)

程序员文章站 2022-06-24 18:30:05
CountdownLatch源码解析(附带结合aqs)CountdownLatch案例CountDownLatch的源码分析AQSCountdownLatch案例注意:我用的jdk是15,如果源码有出入那么请参照具体版本的jdk源码希望我的解析能帮助大家理解Countdownlatch我们开发种有一个很现实的问题,具体来说:我们写线程的时候需要等一些其他线程结束才能继续进行这个线程,(比如我们早上起来煮一杯牛奶,然后去洗脸刷牙,这两个任务是可以同时发生的。但是我们要喝牛奶的时候一定是已经把牛奶煮好...

CountdownLatch源码解析(附带结合aqs)

CountdownLatch案例

注意:我用的jdk是15,如果源码有出入那么请参照具体版本的jdk源码
希望我的解析能帮助大家理解Countdownlatch
我们开发中有一个很现实的问题,具体来说:
我们写线程的时候需要等一些其他线程结束才能继续进行这个线程,(比如我们早上起来煮一杯牛奶,然后去洗脸刷牙,这两个任务是可以同时发生的。但是我们要喝牛奶的时候一定是已经把牛奶煮好了并且刷完牙洗完脸了。)而我们都知道java线程的分配是抢占式的,那么怎么让某一些线程在一些线程之前执行呢。当然我们可以使用join()函数。但是Countdownlatch比join更加灵活!

众所周知,Countdownlatch种有两个关键方法,第一个是await()。第二个是countdown();
这边为了更容易让大家理解Countdownlatch,写了一个main方法。

    public static void main(String[] args) throws InterruptedException {
        //这里设置了一个初始stat容量为5的countdownlatch,对应5个线程。在这5个线程执行完成之后才开始执行“main线程结束这句话”。
        //为了理解我们把这个5暂时称为count
        CountDownLatch countDownLatch = new CountDownLatch(5);
        for (int j = 0; j < 5 ; j++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName()+"执行了");
                countDownLatch.countDown();
            },j+"线程").start();
        }
        //等待所有线程结束
        countDownLatch.await();
        System.out.println("main线程结束");
        //等待所有线程结束
        new Thread(()->{
            try {
                countDownLatch.await();
                System.out.println("次main线程结束");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }).start();
    }
/*下面是执行最终结果
1线程执行了
0线程执行了
4线程执行了
2线程执行了
3线程执行了
main线程结束
次main线程结束
*/

这边的每个线程start后进入就绪状态。之后main线程执行到countDownLatch发现countDownLatch的值还是5(有可能是4,3,2,1,这里只是为了说明问题),还不是0。那么线程就在这里挂起,阻塞,实际上它会进入AQS的阻塞队列中不停的cas请求启动线程,这个下面会分析到。
之后执行我们new的五个线程。执行完后就再次回到countDownLatch的地方,发现这个值为0了。那么就可以往下执行了。

我们来深度的分析一下countDownLatch的实现

CountDownLatch的源码分析

我们打开class结构看到:CountDownLatch源码分析,(结合aqs)

这个结构相当简单明了。我们能看到的就是有一个Sync的内部类,以及CountDownLatch的await方法和countDown方法。
而熟悉aqs的人知道CountDownLatch其实是基于AQS(AbstractQueuedSynchronizer)实现的,AQS是很多锁的底层实现,可以说是相当重要。我们翻开Sync这个内部类:CountDownLatch源码分析,(结合aqs)
这个类继承了aqs,实现了tryAcquireShared方法和tryReleaseShared方法。暂且把这个类放在这里不去分析,我们过会再来仔细分析这个类。

现在我们看countDown方法
CountDownLatch源码分析,(结合aqs)
我们发现的是,这个countDown实际上是调用了Sync这个内部类的releaseShared方法。然而这个Sync没有这个releaseShared方法,那么我们得去它的父类(即aqs)去找这个方法。CountDownLatch源码分析,(结合aqs)

这个releaseShared方法先调用了tryReleaseShared方法,这个方法其实再Sync类中是有的。我们回到上面那张Sync图片看。
这个方法实际上是死循环的CAS(compareAndSwap)

			//死循环
            for (;;) {
            //得到AQS中的state,这个state在每一个类的具体含义都不同,在countDownLatch这个类的具体实现中,读者可以把他具体理解为countdownlatch中初始化的那个5的参数
                int c = getState();
                //如果这个参数已经是0的时候那么就返回false。意思就是我们执行countdown方法的时候,如果这个count已经是0了,那么就返回错误。
                if (c == 0)
                    return false;
            //nextc就是countdown后的值,即把count-1。
                int nextc = c - 1;
                //通过cas直接在内存设置count的值,也就是在aqs中state的值
                if (compareAndSetState(c, nextc))
                //如果count是0了,就返回true
                //如果不是0,就返回false。
                //这一点相当重要!!!后面会讲到
                    return nextc == 0;
            }

那么其实每执行一次countdown方法都会执行一次releaseShared方法,也就是会执行一次Sync中的tryreleaseShared方法。
那么在执行到最后一次时,也就是count为1的执行完变0的时候,返回的时true。那么实际上就执行了signalNext()方法。
这个方法实际上就是把await的主线程给唤醒。那么主线程就可以继续运行。那么为什么次主线程也可由运行?接下来我们就分析await方法

await()
CountDownLatch源码分析,(结合aqs)

我们可以看到实际上调用了tryAcquireSharedInterruptibly方法。这个方法是AQS类中的(AbstractQueuedSynchronizer)

      public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
        //判断是否中断,如果不中断就去tryAcquiredShared方法,判断返回值是否小于0,实际上我们的返回值是-1;也就是这个时候count不为0,具体参见下面的tryAcquiredShared方法。
        //然后就去找acquire方法,这个方法是aqs中的核心方法
        if (Thread.interrupted() ||
            (tryAcquireShared(arg) < 0 &&
             acquire(null, arg, true, true, false, 0L) < 0))
            throw new InterruptedException();
    }

		//判断count值是否等于0 ,如果不等于就返回-1。实际上这个时候count可能等于1,2,3,4,5。
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

我们最终去找acquire方法,这个方法在jdk15中非常的长,其中我们只截取对于countdownlatch重要的一部分来分析:

//死循环
 for (;;) {
            if (!first && (pred = (node == null) ? null : node.prev) != null &&
                !(first = (head == pred))) {
                if (pred.status < 0) {
                    cleanQueue();           // predecessor cancelled
                    continue;
                } else if (pred.prev == null) {
                    Thread.onSpinWait();    // ensure serialization
                    continue;
                }
            }
            if (first || pred == null) {
                boolean acquired;
                try {
                    if (shared)//此时这个条件成立
                    //再次去计算当前的count是否为0
                        acquired = (tryAcquireShared(arg) >= 0);
                    else
                        acquired = tryAcquire(arg);
                } catch (Throwable ex) {
                    cancelAcquire(node, interrupted, false);
                    throw ex;
                }
                //如果是0了,那么实际上就设置这个线程running,然后不断的设置阻塞队列的线程running
                if (acquired) {
                    if (first) {
                        node.prev = null;
                        head = node;
                        pred.next = null;
                        node.waiter = null;
                        if (shared)
                            signalNextIfShared(node);
                        if (interrupted)
                            current.interrupt();
                    }
                    return 1;
                }
            }

这样所有的线程就得以运行啦,也就是block打破了~~~
最后讲讲aqs。

AQS

实际上aqs就是一个fifo的双向队列。countdownlatch就基于这个实现。aqs有一个aqs阻塞队列和很多的condition队列。以及关键的stat的int变量。这个stat变量在具体的实现中的意义不同,在countdownlatch中就代表count的值。而在ReentrantReadWriteLock中的高16位表示读状态,低16位表示写状态,在semaphore中代表的是可用信号的个数。在ReentrantLock中代表的就是重入的次数。

其中有acquire的独占方法和acquireshared的共享方法。这个独占和共享并不是说这个方法独占和共享。而是线程独占和共享。
这个也基于具体的实现。像ReetrantLock一般是用独占(也有共享)。而countdownlatch只有共享。

还有release方法和releaseShared方法,同上,是对应的独占和共享。
acquire方法可以具体理解为进入aqs队列,请求去添加这个线程。同时改变具体的stat值。
release方法可以具体理解为出aqs队列,执行线程,同时改变具体的stat值。以及执行的线程会被gc回收。
在执行这方法acquire方法之前要先实现tryAcquire。这个待具体的继承类实现,如果条件成立,也就是具体的类允许这个线程去入队,那么再去执行acquire,去入队。

而condtion用于挂起线程的,具体的线程会进一个挂起队列,意思是,这边队列的线程暂时不能执行,要等对应的人去唤醒我们才能把他放到aqs队列中去。

最后再回到countdownlatch,实际上await方法就是使得线程进入了aqs队列不断的去cas请求执行。
而countDown方法的具体作用就是等到stat值(现在我们讲stat了,之前一直说的是count)变为0 时,去把这个aqs队列全部的线程执行一遍。

本文地址:https://blog.csdn.net/m0_46814474/article/details/110006383