啰里吧嗦式CountDownLatch 一
java.util.concurrent
class countdownlatch
目录
- countdownlatch 是什么
countdownlatch是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程的操作执行完后再执行
- countdownlatch 怎么用
countdownlatch是通过一个计数器来实现的,计数器的初始值为线程的数量,这个值只能被设置一次且后期无法更改
每当一个线程完成了自己的任务后,计数器的值就会减1
当计数器值到达0时,它表示所有的线程已经完成了任务,然后在 闭锁上等待的线程就可以恢复执行任务
线程必须在启动其他线程后立即调用 countdownlatch.await() 方法 这样主线程的操作就会在这个方法上阻塞,直到其他线程完成各自的任务,并且调用countdownlatch实例的countdown()方法。 每调用一次这个方法,在构造函数中初始化的count值就减1 直到计数器为0的时候, 停止阻塞
- countdownlatch 案例
话不多说,直接上
import java.util.concurrent.countdownlatch; public class testcountdownlatch { static int n = 0; public static void main(string[] args) { int thread_num = 10; final countdownlatch countdown = new countdownlatch(thread_num); long start = system.currenttimemillis(); for (int i =0; i<thread_num; i++) { //模拟多线程执行任务 ,启动10个线程, new thread(new runnable(){ @override public void run() { // 比如你想测多线程环境下 饿汉式懒汉式 执行效率 // 可在里面执行要测试的代码,我就简单模拟下 for (int i =0; i<1000; i++) { n++; } system.out.println("线程:" + thread.currentthread().getname()+" 任务执行完毕"); //计数器减一 countdown.countdown(); } }).start(); } try { //主线程就一直阻塞了 countdown.await(); } catch (interruptedexception e) { // todo auto-generated catch block e.printstacktrace(); } system.out.println("线程:" + thread.currentthread().getname()+" 恢复,开始接着执行"); long end = system.currenttimemillis()-start; system.out.println("执行时间:" + end); } }
执行结果
线程:thread-0 任务执行完毕 线程:thread-2 任务执行完毕 线程:thread-1 任务执行完毕 线程:thread-4 任务执行完毕 线程:thread-3 任务执行完毕 线程:thread-5 任务执行完毕 线程:thread-6 任务执行完毕 线程:thread-8 任务执行完毕 线程:thread-7 任务执行完毕 线程:thread-9 任务执行完毕 线程:main 恢复,开始接着执行 执行时间:2
可见主线程之前一直被阻塞,直到所有的线程都执行完毕,再接着执行
如果不使用countdownlatch, 那么可能其他线程还没执行完, 主线程就结束了, 主线程又不是守护线程
类似这样
线程:thread-0 任务执行完毕 线程:thread-1 任务执行完毕 线程:thread-3 任务执行完毕 线程:thread-7 任务执行完毕 线程:thread-2 任务执行完毕 线程:thread-5 任务执行完毕 线程:main 恢复,开始接着执行 执行时间:19 线程:thread-4 任务执行完毕 线程:thread-9 任务执行完毕 线程:thread-6 任务执行完毕 线程:thread-8 任务执行完毕
题外话,如果不使用countdownlatch有没有其他的办法,其实也有
去掉 count相关代码, 加一句 while(thread.activecount()>1) //保证前面的线程都执行完 thread.yield(); system.out.println("线程:" + thread.currentthread().getname()+" 恢复,开始接着执行");
- countdownlatch 源码解析
首先,如果让你实现这个工具类, 可想的办法有哪些
1. 比如 在主线程执行的代码里 , 用 threadb.join(), 先执行 b线程的join方法, 再执行主线程 2. 比如 用object对象的 wait(),和notify() notifyall()方法, 需要注意的是这两个方法需要配合着 synchronized 一起使用, 不然会报 java.lang.illegalmonitorstateexception 亲测, 并且使用wait 锁住的对象 和 notify 唤醒 释放锁的对象必须是同一个
为了方便eclipse步入,我写了个测试类, 然后打断点看的更清楚,
这里模拟countdown简单写了下
import java.util.concurrent.locks.abstractqueuedsynchronizer; public class testcountdownlatch1 { private static final class sync extends abstractqueuedsynchronizer { private static final long serialversionuid = 4982264981922014374l; sync(int count) { system.out.println("this" + this); setstate(count); } int getcount() { return getstate(); } protected int tryacquireshared(int acquires) { return (getstate() == 0) ? 1 : -1; } protected boolean tryreleaseshared(int releases) { // decrement count; signal when transition to zero for (;;) { int c = getstate(); if (c == 0) return false; int nextc = c-1; if (compareandsetstate(c, nextc)) return nextc == 0; } } } private final sync sync; public testcountdownlatch1(int count) { if (count < 0) throw new illegalargumentexception("count < 0"); this.sync = new sync(count); } public void await() throws interruptedexception { sync.acquiresharedinterruptibly(1);//在这里打上断点 } public void countdown() { sync.releaseshared(1);//在这里打上断点 } public static void main(string[] args) { final testcountdownlatch1 a = new testcountdownlatch1(11); try { a.await(); } catch (interruptedexception e) { // todo auto-generated catch block e.printstacktrace(); } } }
首先看countdownlatch的构造方法
public countdownlatch(int count) { if (count < 0) throw new illegalargumentexception("count < 0"); this.sync = new sync(count); }
发现初始化了sync实例, 并且传入了计数器的值
进入sync构造器,
sync(int count) { setstate(count); }
setstate点进去,发现是继承的abstractqueuedsynchronizer类里的方法, 简称aqs,给抽象类的
private volatile int state;
state赋值, 可以猜到 此变量就是实际用来表示计数器的值, 至于为什么要用 volatile关键字, 有兴趣的童鞋可以去看看这篇博客
https://www.cnblogs.com/dolphin0520/p/3920373.html
简单来说volatile关键字保证了其对线程的透明性, 用其修饰的代码 jvm 保证了其的 可见性和有序性 ,相对来说更安全
具体来说就是 当此变量被修改, 会被立即刷新到主存,并且将其他线程的缓存行置为失效状态
被它修饰的变量 不会被进行指令重排序
简单的猜想下,countdown.await();就是阻塞线程, 然后不停的检查state的值, 如果为0, 则停止阻塞
而 countdown.countdown(); 就是将计数器的值减一
好, 现在看countdown的await方法, 将testcountdownlatch1的断点打好, 然后debug as 启动该类
f5步入 sync.acquiresharedinterruptibly(1); 方法, 发现sync并没有实现该方法, 使用的是aqs里的
public final void acquiresharedinterruptibly(int arg) throws interruptedexception { if (thread.interrupted()) throw new interruptedexception(); if (tryacquireshared(arg) < 0) doacquiresharedinterruptibly(arg); }
这方法的名字叫 获得 共享的 断点? ,
方法声明了一个interruptedexception异常,表示调用该方法的线程支持打断操作,如果中断了,清除掉, 捕获异常,再接着往下执行
这里先检查了下 线程的中断状态 , 这里要说下, thread.interrupted()方法
public static void main(string[] args) { system.out.println(thread.currentthread().getname()); system.out.println(thread.interrupted()); thread.currentthread().interrupt(); system.out.println(thread.interrupted()); system.out.println(thread.interrupted()); )
main
false
true
false
该方法是获得线程的中断状态,并且会清除线程的中断
再接着往下看, 别忘了此时的arg 是1 , 虽然在countdownlatch工具类中没有用到, 但其他工具类有可能会用
aqs有两套方式获取锁,一个独占式,一个共享式
独占式就是只能一个线程访问,例如reentrantlock,同步队列每次也只唤醒一个线程;
共享式就是多个线程访问,例如countdownlatch,同步队列唤醒头节点,然后依次唤醒后面所有节点,实现共享状态传播
方法名:尝试 获得 共享 ,
tryacquireshared(arg) < 0 马后炮猜猜这个方法的作用, 这个方法应该是判断计数器是否为0, 为0 则不阻塞了, 线程接着往下走, 不为0 , 则继续阻塞 返回true 接着执行doacquiresharedinterruptibly
接着f5步入, 发现是countdownlatch的syn内部静态类自己重写了此方法
根据名字和判断<0我觉得这个方法的含义是 返回值 如果 >=0, 那么就是 获得 共享了, 然后停止阻塞, 线程接着往下执行
返回负数 就表示 获取失败, 接着阻塞吧
protected int tryacquireshared(int acquires) { return (getstate() == 0) ? 1 : -1; }
果然, 入参acquires,也就是 arg 是没什么用的, 它判断 aqs的state 计数器是不是0 , 如果为0 返回1 ,
那么 1<0 为false ,方法直接结束退出
我们代码里设置的是10, 返回-1,那么接着看 doacquiresharedinterruptibly 方法
aqs里的 方法名: 去做 获得 共享 中断 --不必在意 瞎解释的
private void doacquiresharedinterruptibly(int arg) throws interruptedexception { final node node = addwaiter(node.shared); // 添加node节点 不明白为什么要这样写 static final node shared = new node(); 一个静态的node对象 ......... }
这里先看下aqs里的一个代码图
* +------+ prev +-----+ +-----+ * head | | <---- | | <---- | | tail * +------+ +-----+ +-----+ *
接着进去看 addwaiter方法, 名字上看是 添加等待者,
这里实际上要说下aqs,抽象的同步队列, aqs里有个 static final class node {}, 静态内部类,
该类里面有 volatile node prev; // 指向 当前节点的 前一个节点
volatile node next; // 指向 当前节点的 后一个节点
volatile thread thread; //放入线程 包装线程
当然如果是头节点,那么它的prev为null,同理尾节点的next为null
然后aqs里有
private transient volatile node head;
private transient volatile node tail;
用来表示同步队列的头节点和尾节点
---
接着f5步入addwaiter方法
private node addwaiter(node mode) { node node = new node(thread.currentthread(), mode); //包装节点 当前节点 置入 当前线程对象 和 node对象 // try the fast path of enq; backup to full enq on failure node pred = tail; // 声明尾节点 if (pred != null) { // node.prev = pred; // 如果尾节点不为空, 那么新节点 的 前一个节点 是尾节点 , if (compareandsettail(pred, node)) { // 毕竟是多线程操作, 1-n个线程都能被阻塞, 等待, 添加到队列里, 有volatile关键字还不够 // 还需要 cas 方式替换aqs里的 尾节点对象 compareandsettail , 会比较 pred 和 现在aqs的尾节点是不是一个对象 // 如果是 则替换 node 为新的尾节点 替换成功 , 则之前的尾节点的 next 指向 新的尾节点 pred.next = node; return node; // } } enq(node); //我们只有主线程阻塞, 而且是第一次进来, 所以尾节点 头节点 肯定都是空的, 所以走这里 return node; }
额外小芝士:
很多人不明白compareandsettail(pred, node) 是什么, 这个其实是cas, compare and swap, 先比较 , 再替换, 只有比较的和预期对象相等, 才会替换成新的对象
模仿着写个小荔枝
package thead1; import sun.misc.unsafe; import java.lang.reflect.field; public class testunsafe { public static void main(string[] args) { node node = new node(); /** * 通过cas方法更新node的next属性 * 原子操作 */ node n = new node(); boolean flag = node.casnext(null,n);// 一开始的 volatile node next; 确实是null system.out.println(flag); //true ,被更新成n flag = node.casnext(new node(),new node()); //没更新 , 因为现在 next 应该是n 指向的对象 system.out.println(flag);//false flag = node.casnext(n,new node()); system.out.println(flag);//true } private static class node{ volatile node next; /** * 使用unsafe cas方法 * @param cmp 目标值与cmp比较,如果相等就更新返回true;如果不相等就不更新返回false; * @param val 需要更新的值; * @return */ boolean casnext(node cmp, node val) { /** * compareandswapobject(object var1, long var2, object var3, object var4) * var1 操作的对象 * var2 操作的对象属性 而这个offset只是记录该属性放哪 , 比较的应该是属性 所指的对象 的地址 * var3 var2与var3比较,相等才更新 * var4 更新值 */ system.out.println("nextoffset : " +nextoffset + " this " +this + " cmp " +cmp + " val " +val + " next " + next); boolean a = unsafe.compareandswapobject(this, nextoffset, cmp, val); system.out.println(" next " + next + " 更新结果 " + a); return a; } private static final sun.misc.unsafe unsafe; private static final long nextoffset; static { try { unsafe = getunsafe(); class<?> k = node.class; nextoffset = unsafe.objectfieldoffset (k.getdeclaredfield("next")); } catch (exception e) { throw new error(e); } } /** * 获取unsafe的方法 * * @return */ public static unsafe getunsafe() { try { field f = unsafe.class.getdeclaredfield("theunsafe"); f.setaccessible(true); return (unsafe)f.get(null); } catch (exception e) { return null; } } } }
会发现第一次更新成功 , 应该刚new 的node对象 next属性为null ,
还记得 之前的volatile 关键字吗 , 由于不保证原子性 , 如果多个线程进行更新, 就会出现问题
比如 i++ 可以拆分成3个动作
读取i的原始值 i副本压入操作数栈
对i进行+1 操作,
弹出操作数栈,写入主存
比如线程a 读取i 的值10, 正准备向cpu发送指令 +1时被阻塞了, 线程a由于还没修改, 不会导致线程b的工作内存中缓存变量inc的缓存行无效
然后线程b 也去读, 线程a还没修改, 线程b 读内存的值10 , +1 , 然后把11 写入工作内存,写入主存 volatile虽然保证线程b修改后可以另其他线程缓存行失效,并立即写入主存
但此时线程a已经读到了i的值,
线程a已经读取到了值, 不在涉及读操作, 所以并没有更新缓存,(我的理解是如果线程a 还需要读, 那么才会发现自己的缓存失效了, 那么才从主存读11)
之前已经把操作数放入了自己的操作数栈中 线程a才中断的 cpu由于保存了上次线程a的工作状态
因此, 轮到线程a工作时, 会继续上次的操作, 即: 开始对操作数栈中的数进行+1操作, 然后立即刷回主存, 因此不再涉及读操作,否则cpu保存线程的工作状态将毫无意义
变成11 写入主存
两次操作,只加了1
写个例子证明下
package thead1; import java.lang.reflect.field; import sun.misc.unsafe; public class testvolatile { private volatile int i = 0 ; private int j = 0 ; private volatile int next = 0 ; private static final sun.misc.unsafe unsafe; private static final long nextoffset; public static unsafe getunsafe() { try { field f = unsafe.class.getdeclaredfield("theunsafe"); f.setaccessible(true); return (unsafe)f.get(null); } catch (exception e) { return null; } } static { try { unsafe = getunsafe(); class<?> k = testvolatile.class; nextoffset = unsafe.objectfieldoffset (k.getdeclaredfield("next")); } catch (exception e) { throw new error(e); } } public final boolean compareandsetstate(int expect, int update) { // see below for intrinsics setup to support this return unsafe.compareandswapint(this, nextoffset, expect, update); } public void add () { i++; } public void add1 () { synchronized(this) { j++; } } public void add2 () { int c = next; int nextc = c+1; for ( ;; ){ if (compareandsetstate(c, nextc)){ return; } } } public static void main(string[] args) { final testvolatile tr = new testvolatile(); for (int i =0; i<10; i++) { new thread(new runnable(){ @override public void run() { // todo auto-generated method stub for (int i =0; i<300; i++) { tr.add(); tr.add1(); //tr.add2(); } } }).start();; } for (int i =0; i<10; i++) { new thread(new runnable(){ @override public void run() { // todo auto-generated method stub for (int i =0; i<300; i++) { tr.add2(); } } }).start();; } while(thread.activecount()>1) //保证前面的线程都执行完 thread.yield(); //然main方法等到他们都执行完了在打印 system.out.println(tr.i); system.out.println(tr.j); system.out.println(tr.next); } }
可以看到变量i 虽然加了volite, 依然不能保证每次执行的结果是3000,
synchronized是用来对比的
线程方法里面的循环可以设置成10000会更明显点, i总是低于10w的一个数
那么用cas原子性的方式去更改能不能保证呢, 答案是肯定了, 我试了很多次
next的结果和 j的结果都一样 ,
附: 有个小疑问, 就是当线程里循环的次数是1w时, 很容易停住不动, 是产生死锁了吗
所以才用的300 200来测试
好的, 题外话说完, 在接着回到aqs,
addwaiter 方法里, 由于我们是第一次进入, 所以aqs的尾节点肯定是空的, 执行enq()方法
private node enq(final node node) { for (;;) {//死循环 node t = tail;//拿到尾节点 if (t == null) { // must initialize if (compareandsethead(new node()))// 必须初始化尾节点, 还是cas, 判断头节点是空的, 那么就new 一个节点实例给 头节点 tail = head; // 头节点 尾节点 都用一个 实例对象 } else { node.prev = t; // 尾节点不为空 将 当前节点的 prev 前一个节点执行 尾节点 head tail <----prev---node if (compareandsettail(t, node)) {// 只有将 尾节点 替换为 当前节点 这个时候方法才结束 退出 t.next = node; head tail <----prev---node 就是新的尾节点 return t; -----next ---> } } } }
这个方法很简单, 就是初始化尾节点和头节点, 并且设置 当前node 为新的尾节点, 然后把前后关系都关联上 ,在回到addwaiter方法
然后返回 新加的 这个尾节点
在回到doacquiresharedinterruptibly
private void doacquiresharedinterruptibly(int arg) throws interruptedexception { final node node = addwaiter(node.shared);// 添加新的节点 为尾节点 并且初始化节点 并且设置新的节点为尾节点 暂时不明白为什么要包装一下 塞一个静态的node对象 boolean failed = true; try { for (;;) {//死循环 注意退出条件 final node p = node.predecessor();//不带着看了, 点进去其实就是 返回 当前节点的 上一个节点 ,如果为空抛异常, if (p == head) {// 如果 当前节点的 上一个节点 就是 头节点 , 我们第一次进来 其实是的, 还记得 enq里的方法吗 , 头尾节点都是一个地址, 当前节点是尾节点, 指向上一个节点即头尾 int r = tryacquireshared(arg); // 不解释了 子类重写的方法 自己定义什么情况下能够获得共享 , 不在阻塞 , 第一次进来肯定是 -1 if (r >= 0) { setheadandpropagate(node, r); p.next = null; // help gc failed = false; return;// 所以第一次进来无法退出 , 然后我发现f6一直走 ,到了判断下面的if 条件后, 走两遍, eclipse的步入 下一步都置灰了, 可能是判断如果没有新的条件, 死循环无法退出吧, //所以一直阻塞着这里 } } if (shouldparkafterfailedacquire(p, node) && parkandcheckinterrupt()) throw new interruptedexception(); } } finally { if (failed) // 只有 上面第二个if条件 中断退出 才会执行 这个方法 cancelacquire(node); } }
里面还有几个方法没讲, 以后在细讲吧
看到这里其实 countdown的方法也能猜到大概了
其实就是 想办法让 state技术器的值减1 , 还得保证线程安全,
volatile其实适合一写多读, 如果多个线程都写, 那么就需要cas去更新
由于我们测试代码是阻塞一个main线程, 其实countdownlatch能同时阻塞多个线程, 所以才用到队列
然后await()方法死循环里检测到条件满足了, 就退出死循环,退出阻塞, 接着往下执行了
参考:https://www.cnblogs.com/yanphet/p/5788260.html
水平有限,欢迎讨论
上一篇: 一:MyBatis知识整理(1)
下一篇: 抖音账单能删除吗 抖音账单查看方法介绍