JUC—CountDownLatch闭锁源码深度解析
基于JDK1.8详细介绍了CountDownLatch闭锁的原理和应用,以及CountDownLatch对于AQS框架的巧妙使用!
文章目录
1 CountDownLatch的概述
public class CountDownLatch
extends Object
CountDownLatch来自于JDK1.5的JUC包,是一种同步工具,常被称为“闭锁”,也叫做“倒计数器”。在完成一组正在其他线程中执行的操作之前,CountDownLatch允许一个或多个线程一直等待。
很明显,这类似于在开始某个行为之前的准备操作。比如有一个任务A,它要等待其他4个任务执行完毕之后才能执行,此时就可以利用CountDownLatch来实现这种功能了。或者说CountDownLatch是一个同步辅助类,允许一个或多个线程等待其他线程完成操作。
想要真正明白CountDownLatch的原理,必然离不开AQS!
2 CountDownLatch的原理
2.1 基本结构
通过uml类图可知,CountDownLatch内部同样使用的AQS来实现它的功能的,我们可以大胆猜测,CountDownLatch的这个“倒计数”操作和AQS的state同步状态属性有关。内部类Sync实现了AbstractQueuedSynchronizer,这就类似于锁了。
CountDownLatch的构造函数接收一个int类型的count参数作为计数器,如果你想等待N个计数操作,这里就传入N。通过构造函数,实际上是把count的值赋给了AQS 的同步状态属性state ,也就是这里使用AQS 的state状态值来表示计数器值。
CountDownLatch的Sync实现中,重写了tryAcquireShared和tryReleaseShared方法,很明显是一个共享锁的实现。
一般情况下我们在释放锁的时候会将state资源增加,获得锁的时候会将state资源减少,但是CountDownLatch则不一样:
- 在尝试获取锁的tryAcquireShared方法中,仅仅是判断如果state为0,就表示获得了锁,其他情况下都表示没有获得锁。
- 而在尝试释放锁的tryReleaseShared方法中,虽然名曰释放锁,但是却是在对state尝试自减操作,它的内部是一个循环操作,每一次的调用tryReleaseShared都会首先判断state是否为0,如果是,那么返回false表示“释放锁失败”,如果不是那么尝试CAS的将state自减1,CAS成功之后会判断此时的值是否为0,如果不是那么表示“释放锁失败”,返回false,否则表示“释放锁成功”,返回true,这里的操作可以永远保证只有一个线程能够因为“释放锁成功”而返回true。
/**
* 倒计数的同步控制,使用AQS的state表示倒计数,和一般的“锁”实现不一样
*/ private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; /**
* 构造器
*
* @param count 计数器
*/ Sync(int count) { //设置state初始值 setState(count); } /**
* @return 获取计数器值,实际上就是获取state值
*/ int getCount() { return getState(); } /**
* 尝试共享式获取锁
* 实际上仅仅是一个判断操作,只有state=0的时候才会返回1
*
* @param acquires 参数,在实现的时候可以传递自己想要的数据,这里没什么用
* @return 返回大于等于0的值表示获取成功,否则失败。
*/ protected int tryAcquireShared(int acquires) { //判断state是否等于0,如果是那么返回1,否则返回-1 return (getState() == 0) ? 1 : -1; } /**
* 尝试共享式释放锁
* 实际上仅仅是一个判断-自减操作,只有state=0的时候才会返回true
*
* @param releases 参数,在实现的时候可以传递自己想要的数据,这里没什么用
* @return 返回true表示释放成功,否则失败。
*/ protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero /*
* 开启一个循环,实际上是state的自减以及是否唤醒等待线程的操作
*/ for (; ; ) { //获取state的值c int c = getState(); //如果c为0,那么返回false,表示释放失败 if (c == 0) return false; //否则c大于0,尝试CAS更新state为state-1,更新失败直接重试 int nextc = c - 1; if (compareAndSetState(c, nextc)) //CAS成功之后再次判断nextc是否为0 //如果为0,说明是最后一个CAS成功的线程,返回true;如果不为0,说明不是最后一个CAS成功的线程,返回false //这样可以通过CAS控制永远只有一条线程能够返回true,随后唤醒因调用CountDownLatch 的await 方法而被阻塞的线程 return nextc == 0; } } } private final Sync sync; /**
* 构造一个用给定计数初始化的 CountDownLatch。
*
* @param count 在线程能通过 await() 之前,必须调用 countDown() 的次数
* @throws IllegalArgumentException 如果 count 为负数
*/ public CountDownLatch(int count) { //count校验 if (count < 0) throw new IllegalArgumentException("count < 0"); //初始化Sync,使用传的参数 this.sync = new Sync(count); }
这样的“反常规”操作有什么用呢?别急,看看CountDownLatch相关方法就知道了!
2.2 await()方法
public void await()
需要等待的线程调用。调用该方法后,当前线程会被阻塞,直到下面的情况之一发生才会返回:
- 当计数器的值为0 时;
- 其他线程调用了当前线程的interrupt()方法中断了当前线程,当前线程就会抛出InterruptedException 异常,然后返回。
根据源码,想要调用await方法的线程能够返回,一般情况下需要获取到共享锁,而CountDownLatch内部的tryAcquireShared返回大于0的要求是state为0,即只有在state为0的时候,调用await方法的线程才能能够返回。
/**
* CountDownLatch 的await方法
*
* @throws InterruptedException 等待时被中断
*/ public void await() throws InterruptedException { //调用了AQS 的acquireSharedInterruptibly方法,共享式可中断获取锁 sync.acquireSharedInterruptibly(1); } /**
* AQS 的acquireSharedInterruptibly方法
* 共享式获取同步状态,可以被中断,在AQS部分我们已经讲过了
*
* @param arg 参数,在实现的时候可以传递自己想要的数据,这里没什么用
* @throws InterruptedException 等待时被中断
*/ public final void acquireSharedInterruptibly(int arg) throws InterruptedException { //如果线程被中断则抛出异常 if (Thread.interrupted()) throw new InterruptedException(); //tryAcquireShared方法由AQS的子类实现,尝试共享式获取锁,如果返回值小于0,表示获取失败 if (tryAcquireShared(arg) < 0) //获取锁失败的线程进入AQS的队列等待,在被唤醒之后还是会继续调用tryAcquireShared获取锁,直到获得锁成功 doAcquireSharedInterruptibly(arg); }
2.3 await(timeout, unit)方法
public boolean await(long timeout, TimeUnit unit)
需要等待的线程调用。调用该方法后,当前线程会被阻塞,直到下面的情况之一发生才会返回:
- 当计数器值为0 时,这时候会返回true ;
- 设置的timeout 时间到了,因为超时而返回false ;
-
其他线程调用了当前线程的interrupt()方法中断了当前线程,当前线程就会抛出InterruptedException 异常,然后返回。
可以发现,和await方法一样,即只有在state为0的时候,调用await方法的线程才能能够正常返回true,同时加入了超时操作,一段时间范围内state还不为0,则失败返回false。
/**
* CountDownLatch 的await( timeout, unit)方法
* 超时等待
*
* @param timeout 等待时间
* @param unit 时间单位
* @return true 成功 false 失败
* @throws InterruptedException 被中断
*/ public boolean await(long timeout, TimeUnit unit) throws InterruptedException { //调用了AQS 的tryAcquireSharedNanos方法,共享式超时可中断获取锁 return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } /**
* AQS 的tryAcquireSharedNanos方法
* 共享式超时获取锁,可以被中断,在AQS部分我们已经讲过了
*
* @param arg 参数
* @param nanosTimeout 超时时间,纳秒
* @return 是否获取锁成功
* @throws InterruptedException 被中断
*/ public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { //最开始就检查一次,如果当前线程是被中断状态,直接抛出异常 if (Thread.interrupted()) throw new InterruptedException(); //下面是一个||运算进行短路连接的代码 //tryAcquireShared尝试获取锁,获取到了(返回大于等于0)直接返回true //获取不到(左边表达式为false) 就执行doAcquireSharedNanos方法 //doAcquireSharedNanos等待一段时间,直到途中计数器变成了0就返回,或者时间到了自动返回,或者等待时被中断 return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); }
2.4 countDown()方法
public void countDown()
需要准备线程调用。如果当前计数(也就是state)等于0,则什么也不做;如果当前计数大于0,则尝试CAS将计数器递减1,递减成功如果新的计数为零,出于线程调度目的,将唤醒所有的因为调用await而等待的线程。
这个countDown方法不会阻塞调用该方法的线程!
/**
* CountDownLatch的countDown方法
*/ public void countDown() { //调用了AQS 的releaseShared方法,共享式释放锁 sync.releaseShared(1); } /**
* AQS 的releaseShared方法,共享式释放锁
*
* @param arg 参数
* @return true 成功 false 失败
*/ public final boolean releaseShared(int arg) { //tryReleaseShared方法由AQS的子类实现,尝试共享式获取锁,如果返回值小于0,表示获取失败 //在CountDownLatch的Sync子类实现中,如果state自减之后为0,则返回true if (tryReleaseShared(arg)) { /*state自减之后为0,调用doReleaseShared唤醒因调用CountDownLatch 的await 方法而被阻塞的线程*/ doReleaseShared(); return true; } //state为0或者state自减之后不为0,那么不调用AQS 的doReleaseShared方法,不会唤醒因调用CountDownLatch 的await 方法而被阻塞的线程 return false; }
2.5 getCount()方法
public long getCount()
获取当前计数器的值,也就是AQS 的state 的值。
/**
* CountDownLatch的方法
* @return
*/ public long getCount() { return sync.getCount(); }
3 CountDownLatch的使用
在JDK1.5之前,为了完成CountDownLatch的功能,我们通常使用thread.join方法方法
在JDK 1.5之后的并发包中提供的CountDownLatch也可以实现join的功能,并且比join的功能更多。并且配合线程池实现更加优雅的编码,传统join方法无法在线程池中使用join。
案例:要求等两个子线程执行完毕之后,主线程才能开始执行。
class CountDownLatchTest { /**
* 传统join实现线程等待
*/ static class JoinRun { public static void main(String[] args) throws InterruptedException { Runnable run = () -> { try { Thread.sleep(100); System.out.println("子线程" + Thread.currentThread().getName() + "正在执行"); Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("子线程" + Thread.currentThread().getName() + "执行完毕"); }; Thread parser1 = new Thread(run, " 1 "); Thread parser2 = new Thread(run, " 2 "); parser1.start(); parser2.start(); System.out.println("主线程等待2个子线程执行完毕"); parser1.join(); parser2.join(); System.out.println("2个子线程已经执行完毕"); System.out.println("继续执行主线程"); } } /**
* CountDownLatch实现线程等待
*/ static class CountDownLatchRun1 { public static void main(String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(2); Runnable run = () -> { try { Thread.sleep(100); System.out.println("子线程" + Thread.currentThread().getName() + "正在执行"); Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("子线程" + Thread.currentThread().getName() + "执行完毕"); latch.countDown(); }; Thread parser1 = new Thread(run, " 1 "); Thread parser2 = new Thread(run, " 2 "); parser1.start(); parser2.start(); System.out.println("主线程等待2个子线程执行完毕"); latch.await(); System.out.println("2个子线程已经执行完毕"); System.out.println("继续执行主线程"); } } /**
* CountDownLatch配合线程池使用
*/ static class CountDownLatchRun2 { public static void main(String[] args) throws InterruptedException { final CountDownLatch latch = new CountDownLatch(2); ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, 2, 60, TimeUnit.SECONDS, new SynchronousQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); Runnable run = () -> { try { Thread.sleep(100); System.out.println("子线程" + Thread.currentThread().getName() + "正在执行"); Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("子线程" + Thread.currentThread().getName() + "执行完毕"); latch.countDown(); }; threadPoolExecutor.execute(run); threadPoolExecutor.execute(run); threadPoolExecutor.shutdown(); System.out.println("主线程等待2个子线程执行完毕"); latch.await(); System.out.println("2个子线程已经执行完毕"); System.out.println("继续执行主线程"); } } }
4 CountDownLatch的总结
CountDownLatch巧妙地利用了AQS的共享锁的实现原理,构造器要求传入N个点,实际上就是state的初始值。一个线程调用await方法会阻塞当前线程,直到state变成零。而当我们调用一次countDown方法时,state就会尝试自减1。
在tryAcquireShared中只有state为0才表示“获取到了锁”,否则就会阻塞调用线程;而在tryReleaseShared中只有state自减之后值为0才表示“成功释放了锁”,即只有当某个countDown方法将state变成0的时候,此时表示“成功释放了锁”,随后就会唤醒因为调用await方法而阻塞的线程,被唤醒的线程会判断到此时state=0,因此可以返回!
可以发现CountDownLatch对于state的描述和普通锁不一样:countDown释放锁的时候要求state大于0,并且state会反向自减;await获取锁的时候则要求state等于0。这个state就是一个倒计数!这也从侧面反映出了AQS功能的强大,我们可以借用AQS非常简单的实现自己的同步组件,而不仅仅拘泥于“锁”!
countDown方法可以用在任何地方,这里的初始值N,可以是N个线程执行完毕之后调用N次countDown方法,也可以是1个线程里的N次调用countDown方法。
计数器必须大于等于0,只是等于0时候,计数器就是零,调用await方法时不会阻塞当前线程。另外,CountDownLatch不能重新初始化或者修改CountDownLatch对象的count值,因此一个倒计数器只能使用一次!
CountDownLatch一般用来确保某些活动直到其他活动都完成才继续执行,比如:
- 确保某个计算在其需要的所有资源都被初始化之后才继续执行;
- 确保某个服务在其依赖的所有其他服务都已经启动之后才启动;
- 等待直到某个操作所有参与者都准备就绪再继续执行。
CountDownLatch的源码看起来非常简单,那是因为复杂的线程等待、唤醒机制都被AQS同步器框架实现了,如果想要真正了解CountDownLatch的原理,那么AQS的实现必须要了解。同时AQS也是JUC中基本上所有的锁和同步组件的实现基石,比如我们现在讲的CountDownLatch。本文没有讲解AQS的原理,因为那实在太多了,如果真的想要学习AQS,那么看看下面的文章吧!
本文地址:https://blog.csdn.net/weixin_43767015/article/details/108035192
推荐阅读
-
spring源码深度解析— IOC 之 默认标签解析(上)
-
spring源码深度解析— IOC 之 循环依赖处理
-
CountDownLatch源码解析之await()
-
CountDownLatch源码解析之countDown()
-
JUC—CountDownLatch闭锁源码深度解析
-
spring源码深度解析— IOC 之 开启 bean 的加载
-
Concurrent -- 05 -- CountDownLatch源码解析
-
spring源码深度解析— IOC 之 默认标签解析(下)
-
【RocketMQ源码深度解析2】源码目录结构介绍&Remoting通信层
-
JUC—CopyOnWriteArrayList源码深度解析