CountDownLatch,jdk源码阅读(六)
程序员文章站
2022-06-01 13:40:52
...
CountDownLatch
CountDownLatch控制线程的执行顺序
CountDownLatch中主要用的方法一个是await(),调用这个方法的线程会被阻塞,另外一个是countDown(),调用这个方法会使计数器减1,当计数器的值为0时,调用await()方法被阻塞的线程会被唤醒,继续执行。
CountDownLatch和join的区别
在当前线程中,如果调用某个thread的join方法,那么当前线程就会被阻塞,直到thread线程执行完毕,当前线程才能继续执行。join的原理是,不断的检查thread是否存活,如果存活,那么让当前线程一直wait,直到thread线程终止,线程的this.notifyAll 就会被调用。
而CountDownLatch只需要检查计数器的值为零就可以继续向下执行。(即可以在thread中加上countDown()方法,只要计数器值为0,阻塞的线程就可以继续执行,不需要等待thread执行完毕),所以,CountDownLatch比join更灵活,可以实现更复杂的业务场景。
源码分析
- 初始化
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
// 初始化一个同步器
this.sync = new Sync(count);
}
- 初始化同步器
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
// count存放在state中
setState(count);
}
}
- void await()
public void await() throws InterruptedException {
// 获取共享锁
sync.acquireSharedInterruptibly(1);
}
// 获取共享锁
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 判断state值是否为0
if (tryAcquireShared(arg) < 0)
// 获取共享锁
doAcquireSharedInterruptibly(arg);
}
// 判断state值是否为0
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
- void doAcquireSharedInterruptibly(int arg)
// 获取共享锁
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 将此还未获得共享锁的线程放入节点且加入到队列中(AQS队列)并返回此节点
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
// 自旋
try {
for (;;) {
// 获取上一个节点
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
// 和互斥锁锁的区别在这里,设置头结点并且传递不断的去唤醒阻塞的线程
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 把节点设置为SIGNAL状态
if (shouldParkAfterFailedAcquire(p, node) &&
// 挂起,被唤醒后在这接着执行
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// 抢占锁失败后是否要挂起,true挂起,false不挂起
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// SIGNAL允许被唤醒
if (ws == Node.SIGNAL)
return true;
// CANCELLED>大于0
if (ws > 0) {
// 移除取消状态的线程节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 把waitStatus等待状态的值替换为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
// 挂起,这里挂起后除了unpark可以唤醒,阻塞的线程的interrupt()方法可以唤醒处于park状态下的线程
LockSupport.park(this);
// 被唤醒后接着这里继续执行
// 获得当前中断标记,并复位
return Thread.interrupted();
}
- void countDown()
public void countDown() {
// 释放共享锁(1次减1)
sync.releaseShared(1);
}
// 释放共享锁(1次减1)
public final boolean releaseShared(int arg) {
// 尝试释放共享锁(1次减1)
if (tryReleaseShared(arg)) {
// 如果state=0,返回true进入到这里,说明可以进行释放共享锁
doReleaseShared();
return true;
}
return false;
}
// 尝试释放共享锁(1次减1)
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
- void doReleaseShared()
// 释放共享锁
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 唤醒
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
- void unparkSuccessor(Node node)
// 唤醒
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
// 去除无效节点
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 唤醒
LockSupport.unpark(s.thread);
}
- 唤醒之后,沿着被阻塞的地方接着执行,正常情况返回的是false所以不抛异常,再次进入for循环判断state是否=0,此时一定=0,所以进入if
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 将此还未获得共享锁的线程放入节点且加入到队列中(AQS队列)并返回此节点
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
// 自旋
try {
for (;;) {
// 获取上一个节点
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
// 和互斥锁锁的区别在这里,设置头结点并且传递不断的去唤醒阻塞的线程
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 把节点设置为SIGNAL状态
if (shouldParkAfterFailedAcquire(p, node) &&
// 挂起,被唤醒后在这接着执行
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
int r = tryAcquireShared(arg);
if (r >= 0) {
//和互斥锁锁的区别在这里,设置头结点并且传递不断的去唤醒阻塞的线程
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
- setHeadAndPropagate(Node node, int propagate)
// 和互斥锁的区别,设置头结点并且传递不断的去唤醒阻塞的线程
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
// 释放
doReleaseShared();
}
}
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}