欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

CountDownLatch源码解析之countDown()

程序员文章站 2023-11-29 16:07:10
countdownlatch 源码解析—— countdown() 从源码层面说了一下countdownlatch 中 await() 的原理。这篇文章说一下countd...

countdownlatch 源码解析—— countdown()

从源码层面说了一下countdownlatch 中 await() 的原理。这篇文章说一下countdown() 。

public void countdown() { //countdownlatch
 sync.releaseshared(1);
}
 ↓
public final boolean releaseshared(int arg) { //aqs
 if (tryreleaseshared(arg)) {
  doreleaseshared();
  return true;
 }
 return false;
}
 ↓
protected boolean tryreleaseshared(int releases) { //countdownlatch.sync 
 // decrement count; signal when transition to zero
 for (;;) {
  int c = getstate();
  if (c == 0)
   return false;
  int nextc = c-1;
  if (compareandsetstate(c, nextc))
   return nextc == 0;
 }
}

通过构造器 countdownlatch end = new countdownlatch(2);  state 被设置为2,所以c == 2,nextc = 2-1,

然后通过下面这个cas操作将state设置为1。

protected final boolean compareandsetstate(int expect, int update) {
  // see below for intrinsics setup to support this
  return unsafe.compareandswapint(this, stateoffset, expect, update);
 }

此时nextc还不为0,返回false。一直等到countdown()  方法被调用两次,state == 0,nextc ==0,此时返回true。

进入doreleaseshared()方法。

doreleaseshared();
 ↓
private void doreleaseshared() {
 /*
  * ensure that a release propagates, even if there are other
  * in-progress acquires/releases. this proceeds in the usual
  * way of trying to unparksuccessor of head if it needs
  * signal. but if it does not, status is set to propagate to
  * ensure that upon release, propagation continues.
  * additionally, we must loop in case a new node is added
  * while we are doing this. also, unlike other uses of
  * unparksuccessor, we need to know if cas to reset status
  * fails, if so rechecking.
  */
 for (;;) {
  node h = head;
  if (h != null && h != tail) {
   int ws = h.waitstatus;
   if (ws == node.signal) {
    if (!compareandsetwaitstatus(h, node.signal, 0))
     continue;   // loop to recheck cases
    unparksuccessor(h);
   }
   else if (ws == 0 &&
      !compareandsetwaitstatus(h, 0, node.propagate))
    continue;    // loop on failed cas
  }
  if (h == head)     // loop if head changed
   break;
 }
}

回顾一下此时的等待队列模型。

  +--------------------------+ prev   +------------------+
head | waitstatus = node.signal | <---- node(tail) | currentthread |
  +--------------------------+     +------------------+

此时head 不为null,也不为tail,waitstatus == node.signal,所以进入 if (!compareandsetwaitstatus(h, node.signal, 0)) 这个判断。

if (!compareandsetwaitstatus(h, node.signal, 0))
 ↓
 /**
 * cas waitstatus field of a node.
 */
private static final boolean compareandsetwaitstatus(node node,
              int expect,
              int update) {
 return unsafe.compareandswapint(node, waitstatusoffset,
         expect, update);
}

这个cas 操作将 state 设置为 0 ,也就是说此时head 中的 waitstatus 是0.此时队列模型如下所示

  +----------------+ prev   +------------------+
head | waitstatus = 0 | <---- node(tail) | currentthread |
  +----------------+     +------------------+

该方法返回true。进入unparksuccessor(h);

unparksuccessor(h);
 ↓
private void unparksuccessor(node node) {
 /*
 * if status is negative (i.e., possibly needing signal) try
 * to clear in anticipation of signalling. it is ok if this
 * fails or if status is changed by waiting thread.
 */
 int ws = node.waitstatus;
 if (ws < 0)
  compareandsetwaitstatus(node, ws, 0);

 /*
 * thread to unpark is held in successor, which is normally
 * just the next node. but if cancelled or apparently null,
 * traverse backwards from tail to find the actual
 * non-cancelled successor.
 */
 node s = node.next;
 if (s == null || s.waitstatus > 0) {
  s = null;
  for (node t = tail; t != null && t != node; t = t.prev)
   if (t.waitstatus <= 0)
    s = t;
 }
 if (s != null)
  locksupport.unpark(s.thread);
}

s 就是head的后继结点,也就是装有当前线程的结点。s != null ,并且s.waitstatus ==0 ,所以进入 locksupport.unpark(s.thread);

 public static void unpark(thread thread) {
  if (thread != null)
   unsafe.unpark(thread);
 }

也就是unlock 被阻塞的线程。裁判被允许吹哨了!

countdown() 的原理就此就非常清晰了。

每执行一次countdown() 方法,state 就是减1,直到state == 0,则开始释放被阻塞在队列中的线程,根据前驱结点中waitstatus的状态,释放后续结点中的线程。

ok,回到上一篇文章的问题,什么时候跳出下面这个循环(await方法中的循环)

for (;;) {
 final node p = node.predecessor();
 if (p == head) {
  int r = tryacquireshared(arg);
  if (r >= 0) {
   setheadandpropagate(node, r);
   p.next = null; // help gc
   failed = false;
   return;
  }
 }
 if (shouldparkafterfailedacquire(p, node) &&
  parkandcheckinterrupt())
  throw new interruptedexception();
}

此时state == 0,所以进入 setheadandpropagate 方法。

setheadandpropagate(node, r);
 ↓
private void setheadandpropagate(node node, int propagate) {
 node h = head; // record old head for check below
 sethead(node);
 /*
  * try to signal next queued node if:
  * propagation was indicated by caller,
  *  or was recorded (as h.waitstatus either before
  *  or after sethead) by a previous operation
  *  (note: this uses sign-check of waitstatus because
  *  propagate status may transition to signal.)
  * and
  * the next node is waiting in shared mode,
  *  or we don't know, because it appears null
  *
  * the conservatism in both of these checks may cause
  * unnecessary wake-ups, but only when there are multiple
  * racing acquires/releases, so most need signals now or soon
  * anyway.
  */
 if (propagate > 0 || h == null || h.waitstatus < 0 ||
  (h = head) == null || h.waitstatus < 0) {
  node s = node.next;
  if (s == null || s.isshared())
   doreleaseshared();
 }
}
 ↓
private void sethead(node node) {
 head = node;
 node.thread = null;
 node.prev = null;
}

这个方法将head 的后继结点变为head。该方法过后,又将node的next结点设置为null,模型变成下图

  prev    +---------+ next
null <---- node(tail/head) | null | ----> null
       +---------+

也就是node head tail 什么的都被置为null,等待gc回收了,这个时候return,跳出了for循环,队列被清空。

下面演示一下整个过程

setheadandpropagate(node, r);

   +----------------+ 
head(tail) | waitstatus=0 |
   | thread =null |
   +----------------+
     ↓
   +----------------+   +----------------+
   | waitstatus=0 | prev  | waitstatus=0 |
head(tail) | thread =null | <---- node | currentthread |
   +----------------+   +----------------+   
     ↓
  +----------------+     +----------------+
  | waitstatus=0 | prev   | waitstatus=0 |
head | thread =null | <---- node(tail) | currentthread |
  +----------------+     +----------------+
     ↓
  +----------------+     +----------------+
  | node.signal | prev   | waitstatus=0 |
head | thread =null | <---- node(tail) | currentthread |
  +----------------+     +----------------+
       ↓
  +----------------+     +----------------+
  | waitstatus=0 | prev   | waitstatus=0 |
head | thread =null | <---- node(tail) | currentthread |
  +----------------+     +----------------+
       ↓
       +----------------+
  prev    | waitstatus=0 | next
null <---- node(tail/head) | null   | ----> null
       +----------------+

countdownlatch 的核心就是一个阻塞线程队列,这是由链表构造而成的队列,里面包含thread 和 waitstatus,其中waitstatus说明了后继结点线程状态。

state 是一个非常重要的标志,构造时,设置为对应的n值,如果n != 0,阻塞队列将一直阻塞,除非中断线程。

每次调用countdown()  方法,就是将state-1,而调用await() 方法就是将调用该方法的线程加入到阻塞队列,直到state==0,才能释放线程。

 以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。