CountDownLatch源码解析之countDown()
countdownlatch 源码解析—— countdown()
从源码层面说了一下countdownlatch 中 await() 的原理。这篇文章说一下countdown() 。
public void countdown() { //countdownlatch sync.releaseshared(1); } ↓ public final boolean releaseshared(int arg) { //aqs if (tryreleaseshared(arg)) { doreleaseshared(); return true; } return false; } ↓ protected boolean tryreleaseshared(int releases) { //countdownlatch.sync // decrement count; signal when transition to zero for (;;) { int c = getstate(); if (c == 0) return false; int nextc = c-1; if (compareandsetstate(c, nextc)) return nextc == 0; } }
通过构造器 countdownlatch end = new countdownlatch(2); state 被设置为2,所以c == 2,nextc = 2-1,
然后通过下面这个cas操作将state设置为1。
protected final boolean compareandsetstate(int expect, int update) { // see below for intrinsics setup to support this return unsafe.compareandswapint(this, stateoffset, expect, update); }
此时nextc还不为0,返回false。一直等到countdown() 方法被调用两次,state == 0,nextc ==0,此时返回true。
进入doreleaseshared()方法。
doreleaseshared(); ↓ private void doreleaseshared() { /* * ensure that a release propagates, even if there are other * in-progress acquires/releases. this proceeds in the usual * way of trying to unparksuccessor of head if it needs * signal. but if it does not, status is set to propagate to * ensure that upon release, propagation continues. * additionally, we must loop in case a new node is added * while we are doing this. also, unlike other uses of * unparksuccessor, we need to know if cas to reset status * fails, if so rechecking. */ for (;;) { node h = head; if (h != null && h != tail) { int ws = h.waitstatus; if (ws == node.signal) { if (!compareandsetwaitstatus(h, node.signal, 0)) continue; // loop to recheck cases unparksuccessor(h); } else if (ws == 0 && !compareandsetwaitstatus(h, 0, node.propagate)) continue; // loop on failed cas } if (h == head) // loop if head changed break; } }
回顾一下此时的等待队列模型。
+--------------------------+ prev +------------------+ head | waitstatus = node.signal | <---- node(tail) | currentthread | +--------------------------+ +------------------+
此时head 不为null,也不为tail,waitstatus == node.signal,所以进入 if (!compareandsetwaitstatus(h, node.signal, 0)) 这个判断。
if (!compareandsetwaitstatus(h, node.signal, 0)) ↓ /** * cas waitstatus field of a node. */ private static final boolean compareandsetwaitstatus(node node, int expect, int update) { return unsafe.compareandswapint(node, waitstatusoffset, expect, update); }
这个cas 操作将 state 设置为 0 ,也就是说此时head 中的 waitstatus 是0.此时队列模型如下所示
+----------------+ prev +------------------+ head | waitstatus = 0 | <---- node(tail) | currentthread | +----------------+ +------------------+
该方法返回true。进入unparksuccessor(h);
unparksuccessor(h); ↓ private void unparksuccessor(node node) { /* * if status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. it is ok if this * fails or if status is changed by waiting thread. */ int ws = node.waitstatus; if (ws < 0) compareandsetwaitstatus(node, ws, 0); /* * thread to unpark is held in successor, which is normally * just the next node. but if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ node s = node.next; if (s == null || s.waitstatus > 0) { s = null; for (node t = tail; t != null && t != node; t = t.prev) if (t.waitstatus <= 0) s = t; } if (s != null) locksupport.unpark(s.thread); }
s 就是head的后继结点,也就是装有当前线程的结点。s != null ,并且s.waitstatus ==0 ,所以进入 locksupport.unpark(s.thread);
public static void unpark(thread thread) { if (thread != null) unsafe.unpark(thread); }
也就是unlock 被阻塞的线程。裁判被允许吹哨了!
countdown() 的原理就此就非常清晰了。
每执行一次countdown() 方法,state 就是减1,直到state == 0,则开始释放被阻塞在队列中的线程,根据前驱结点中waitstatus的状态,释放后续结点中的线程。
ok,回到上一篇文章的问题,什么时候跳出下面这个循环(await方法中的循环)
for (;;) { final node p = node.predecessor(); if (p == head) { int r = tryacquireshared(arg); if (r >= 0) { setheadandpropagate(node, r); p.next = null; // help gc failed = false; return; } } if (shouldparkafterfailedacquire(p, node) && parkandcheckinterrupt()) throw new interruptedexception(); }
此时state == 0,所以进入 setheadandpropagate 方法。
setheadandpropagate(node, r); ↓ private void setheadandpropagate(node node, int propagate) { node h = head; // record old head for check below sethead(node); /* * try to signal next queued node if: * propagation was indicated by caller, * or was recorded (as h.waitstatus either before * or after sethead) by a previous operation * (note: this uses sign-check of waitstatus because * propagate status may transition to signal.) * and * the next node is waiting in shared mode, * or we don't know, because it appears null * * the conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ if (propagate > 0 || h == null || h.waitstatus < 0 || (h = head) == null || h.waitstatus < 0) { node s = node.next; if (s == null || s.isshared()) doreleaseshared(); } } ↓ private void sethead(node node) { head = node; node.thread = null; node.prev = null; }
这个方法将head 的后继结点变为head。该方法过后,又将node的next结点设置为null,模型变成下图
prev +---------+ next null <---- node(tail/head) | null | ----> null +---------+
也就是node head tail 什么的都被置为null,等待gc回收了,这个时候return,跳出了for循环,队列被清空。
下面演示一下整个过程
setheadandpropagate(node, r); +----------------+ head(tail) | waitstatus=0 | | thread =null | +----------------+ ↓ +----------------+ +----------------+ | waitstatus=0 | prev | waitstatus=0 | head(tail) | thread =null | <---- node | currentthread | +----------------+ +----------------+ ↓ +----------------+ +----------------+ | waitstatus=0 | prev | waitstatus=0 | head | thread =null | <---- node(tail) | currentthread | +----------------+ +----------------+ ↓ +----------------+ +----------------+ | node.signal | prev | waitstatus=0 | head | thread =null | <---- node(tail) | currentthread | +----------------+ +----------------+ ↓ +----------------+ +----------------+ | waitstatus=0 | prev | waitstatus=0 | head | thread =null | <---- node(tail) | currentthread | +----------------+ +----------------+ ↓ +----------------+ prev | waitstatus=0 | next null <---- node(tail/head) | null | ----> null +----------------+
countdownlatch 的核心就是一个阻塞线程队列,这是由链表构造而成的队列,里面包含thread 和 waitstatus,其中waitstatus说明了后继结点线程状态。
state 是一个非常重要的标志,构造时,设置为对应的n值,如果n != 0,阻塞队列将一直阻塞,除非中断线程。
每次调用countdown() 方法,就是将state-1,而调用await() 方法就是将调用该方法的线程加入到阻塞队列,直到state==0,才能释放线程。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
推荐阅读
-
CountDownLatch源码解析之await()
-
CountDownLatch源码解析之countDown()
-
Spring5源码解析4-refresh方法之invokeBeanFactoryPostProcessors
-
Laravel框架源码解析之模型Model原理与用法解析
-
Spring源码解析之ConfigurableApplicationContext
-
angularjs 源码解析之injector
-
angularjs 源码解析之scope
-
Java并发之ReentrantLock类源码解析
-
Java并发系列之CountDownLatch源码分析
-
Android图片加载利器之Picasso源码解析