欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

并发系列(3)之 AbstractQueuedSynchronizer 源码分析

程序员文章站 2024-03-30 23:45:09
本文将主要讲述 的内部结构和实现逻辑,在看本文之前最好先了解一下 队列锁, 就是根据 队列锁的变种实现的,因为本身 比较复杂不容易看清楚他本身的实现逻辑,所以查看 队列锁的实现,可以帮助我们理清楚他内部的关系;关于队列锁的内容可以参考 , "CLH、MCS 队列锁简介" ; 一、AQS 结构概述 在 ......

本文将主要讲述 abstractqueuedsynchronizer 的内部结构和实现逻辑,在看本文之前最好先了解一下 clh 队列锁,abstractqueuedsynchronizer 就是根据 clh 队列锁的变种实现的,因为本身 aqs 比较复杂不容易看清楚他本身的实现逻辑,所以查看 clh 队列锁的实现,可以帮助我们理清楚他内部的关系;关于队列锁的内容可以参考 ,clh、mcs 队列锁简介

一、aqs 结构概述

在 jdk 中除 synchronized 内置锁外,其他的锁和同步组件,基本可以分为:

  1. 面向用户的逻辑部分(对于锁而言就是 lock interface);
  2. 面向底层的线程调度部分;

abstractqueuedsynchronizer 即同步队列则是 doug lea 大神为我们提供的底层线程调度的封装;aqs 本身是根据 clh 队列锁实现的,这一点在注释中有详细的介绍,clh、mcs 队列锁简介

并发系列(3)之 AbstractQueuedSynchronizer 源码分析

简单来讲,clh 队列锁就是一个单项链表,想要获取锁的线程封装为节点添加到尾部,然后阻塞检查前任节点的状态 (一定要注意是前任节点,因为这样更容易实现取消、超时等功能,同时这也是选择 clh 队列锁的原因),而头结点则是当前已经获得锁的线程,其主要作用是通知后继节点(也就是说在没有发生竞争的情况下,是不需要头结点的,这一点后面会详细分析);


而对于 aqs 的结构大致可以表述为:

并发系列(3)之 AbstractQueuedSynchronizer 源码分析


public abstract class abstractqueuedsynchronizer extends abstractownablesynchronizer implements java.io.serializable {
  protected abstractqueuedsynchronizer() { }
  
  private transient volatile node head;  // 懒加载,只有在发生竞争的时候才会初始化;
  private transient volatile node tail;  // 同样懒加载;
  private volatile int state;  // 自定义的锁状态,可以用来表示锁的个数,以实现互斥锁和共享锁;
}

这里的可以直观的看到链表结构的变化,其实next链表只是相当于遍历的优化,而node节点的变化才是主要的更新;

1. node 结构

static final class node {
  static final node shared = new node();  // 共享模式
  static final node exclusive = null;     // 互斥模式

  static final int cancelled =  1; // 表示线程取消获取锁
  static final int signal    = -1; // 表示后继节点需要被唤醒
  static final int condition = -2; // 表示线程位于条件队列
  static final int propagate = -3; // 共享模式下节点的最终状态,确保在doreleaseshared的时候将共享状态继续传播下去

  /**
   * 节点状态(初始为0,使用cas原则更新)
   * 互斥模式:0,signal,cancelled
   * 共享模式:0,signal,cancelled,propagate
   * 条件队列:condition
   */
  volatile int waitstatus;
  
  volatile node prev;     // 前继节点
  volatile node next;     // 后继节点
  volatile thread thread; // 取锁线程
  node nextwaiter;        // 模式标识,取值:shared、exclusive

  // used by addwaiter,用于添加同队队列
  node(thread thread, node mode) {   
    this.nextwaiter = mode;
    this.thread = thread;
  }

  // used by condition,同于添加条件队列
  node(thread thread, int waitstatus) { 
    this.waitstatus = waitstatus;
    this.thread = thread;
  }
}

根据上面的代码和注释已经可以看到 aqs 为我们提供了两种模式,独占模式和共享模式(彼此独立可以同时使用);其中:

  • abstractqueuedsynchronizer.state : 表示锁的资源状态,是我们上面所说的面向用户逻辑的部分;
  • node.waitstatus : 表示节点在队列中的状态,是面向底层线程调度的部分;

这两个变量一定要分清楚,在后面的代码中也很容易弄混;


2. aqs 运行逻辑

aqs 的运行逻辑可以简单表述为:

并发系列(3)之 AbstractQueuedSynchronizer 源码分析

如果你熟悉 synchronized ,应该已经发现他们的运行逻辑其实是差不多的,都用同步队列和条件队列,值得注意的是这里的条件队列和 condition 一一对应,可能有多个;根据上图可以将 aqs 提供的功能总结为:

  • 同步状态的原子性管理;
  • 线程的阻塞与解除阻塞;
  • 队列的管理;


3. 入队

因为独占模式和共享模式彼此独立可以同时使用,所以在入队的时候需要首先指定 node 的类型,同时入队的时候有竞争的可能,所以需要 cas 入队;

private node addwaiter(node mode) {
  node node = new node(thread.currentthread(), mode); // shared、exclusive
  // try the fast path of enq; backup to full enq on failure
  node pred = tail;
  if (pred != null) {
    node.prev = pred;
    if (compareandsettail(pred, node)) {
      pred.next = node;
      return node;
    }
  }
  enq(node);
  return node;
}

代码中注释也说明了,此处快速尝试入队,是一种优化手段,因为就一般情况而言大多数时候是没有竞争的;失败后在循环入队;

private node enq(final node node) {
  for (;;) {
    node t = tail;
    if (t == null) { // must initialize
      if (compareandsethead(new node())) // 此时head和tail才初始化
        tail = head;
    } else {
      node.prev = t;
      if (compareandsettail(t, node)) {
        t.next = node;
        return t;
      }
    }
  }
}

而对于出队则稍微复杂一点,独占模式下直接出队,因为没有竞争;共享模式下,则需要 cas 设置头结点,因为可能对有多个节点同时出队,同时还需要向后传播状态,保证后面的线程可以及时获得锁;此外还可能发生中断或者异常出队,此时则需要考虑头尾的情况,保证不会影响队列的结构;具体内容将会在源码中一次讲解;


二、独占模式

1. 应用

public class mutex implements lock {
  private final sync sync = new sync();
  private static final int lock = 1;
  private static final int unlock = 0;

  @override
  public void lock() {
    sync.acquire(lock);
  }

  @override
  public boolean trylock() {
    return sync.tryacquire(lock);
  }

  @override
  public void unlock() {
    sync.release(unlock);
  }

  private static class sync extends abstractqueuedsynchronizer {
    @override
    protected boolean isheldexclusively() {
      return getstate() == lock;
    }

    @override
    public boolean tryacquire(int acquires) {
      if (compareandsetstate(unlock, lock)) {
        setexclusiveownerthread(thread.currentthread());
        return true;
      }
      return false;
    }

    @override
    protected boolean tryrelease(int releases) {
      if (getstate() == unlock)
        throw new illegalmonitorstateexception();
      setexclusiveownerthread(null);
      setstate(unlock);
      return true;
    }
  }
}

注意代码中特意将 abstractqueuedsynchronizer.state 取值定为lock\unlock ,主要是便于理解 state 的含义,在互斥锁中可以任意取值,当然也可以是负数,但是一般情况下令其表示为锁的资源数量(也就是0、1)和共享模式对比,比较容易理解;

2. 获取锁

对于独占模式取锁而言有一共有四中方式,

  • tryacquire: 快速尝试取锁,成功时返回true;这是独占模式必须要重写的方法,其他方式获取锁时,也会先尝试快速获取锁;同时 tryacquire 也就决定了,这个锁时公平锁/非公平锁,可重入锁/不重冲入锁等;(比如上面的实例就是不可重入非公平锁,具体分析以后还会详细讲解)
  • acquire: 不响应中断,阻塞获取锁;
  • acquireinterruptibly: 响应中断,阻塞获取锁;
  • tryacquirenanos: 响应中断,超时阻塞获取锁;


acquire 方法

流程图:

并发系列(3)之 AbstractQueuedSynchronizer 源码分析

源码分析:

public final void acquire(int arg) {
  if (!tryacquire(arg) &&                                                           // 首先尝试快速获取锁
       acquirequeued(addwaiter(node.exclusive), arg)) // 失败后入队,然后阻塞获取
    selfinterrupt();                                  // 最后如果取锁的有中断,则重新设置中断
}
final boolean acquirequeued(final node node, int arg) {
  boolean failed = true;
  try {
    boolean interrupted = false;           // 只要取锁过程中有一次中断,返回时都要重新设置中断
    for (;;) {
      final node p = node.predecessor();   // 一直阻塞到前继节点为头结点
      if (p == head && tryacquire(arg)) {  // 获取同步状态
        sethead(node);                     // 设置头结点,此时头部不存在竞争,直接设置
        // next 主要起优化作用,并且在入队的时候next不是cas设置
        // 也就是通过next不一定可以准确取到后继节点,所以在唤醒的时候不能依赖next,需要反向遍历
        p.next = null; // help gc          
        failed = false;
        return interrupted;
      }
      if (shouldparkafterfailedacquire(p, node) && // 判断并整理前继节点
        parkandcheckinterrupt())                   // 当循环最多第二次的时候,必然阻塞
        interrupted = true;
    }
  } finally {
    if (failed)  // 异常时取消获取
      cancelacquire(node);
  }
}
private static boolean shouldparkafterfailedacquire(node pred, node node) {
  int ws = pred.waitstatus;
  if (ws == node.signal) return true;
  if (ws > 0) {  // 大于0说明,前继节点异常或者取消获取,直接跳过;
    do {
      node.prev = pred = pred.prev;  // 跳过pred并建立连接
    } while (pred.waitstatus > 0);
    pred.next = node;
  } else {
    compareandsetwaitstatus(pred, ws, node.signal);  // 标记后继节点需要唤醒
  }
  return false;
}

其中 node.prev = pred = pred.prev; 相关的内存分析可以查看 java 连等赋值问题


acquireinterruptibly 方法

流程图:

并发系列(3)之 AbstractQueuedSynchronizer 源码分析

源码分析:

public final void acquireinterruptibly(int arg) throws interruptedexception {
  if (thread.interrupted()) throw new interruptedexception();  // 中断退出
  if (!tryacquire(arg))           // 获取同步状态
    doacquireinterruptibly(arg);  // 中断获取
}
private void doacquireinterruptibly(int arg) throws interruptedexception {
  final node node = addwaiter(node.exclusive);   // 加入队尾
  boolean failed = true;
  try {
    for (;;) {
      final node p = node.predecessor();
      if (p == head && tryacquire(arg)) {
        sethead(node);
        p.next = null; // help gc
        failed = false;
        return;
      }
      if (shouldparkafterfailedacquire(p, node) &&   // 判断并整理前继节点
        parkandcheckinterrupt())                     // 等待
        throw new interruptedexception();
    }
  } finally {
    if (failed)
      cancelacquire(node);
  }
}


tryacquirenanos 方法

流程图:

并发系列(3)之 AbstractQueuedSynchronizer 源码分析

源码分析:

public final boolean tryacquirenanos(int arg, long nanostimeout) throws interruptedexception {
  if (thread.interrupted()) throw new interruptedexception();
  return tryacquire(arg) ||
    doacquirenanos(arg, nanostimeout);
}
private boolean doacquirenanos(int arg, long nanostimeout) throws interruptedexception {
  if (nanostimeout <= 0l) return false;
  final long deadline = system.nanotime() + nanostimeout;
  final node node = addwaiter(node.exclusive);
  boolean failed = true;
  try {
    for (;;) {
      final node p = node.predecessor();
      if (p == head && tryacquire(arg)) {
        sethead(node);
        p.next = null; // help gc
        failed = false;
        return true;
      }
      nanostimeout = deadline - system.nanotime();
      if (nanostimeout <= 0l) return false;          // 超时退出
      if (shouldparkafterfailedacquire(p, node) &&
        nanostimeout > spinfortimeoutthreshold)
        locksupport.parknanos(this, nanostimeout);
      if (thread.interrupted())
        throw new interruptedexception();
    }
  } finally {
    if (failed)
      cancelacquire(node);
  }
}


3. 释放锁

释放锁时,判断有后继节点需要唤醒,则唤醒后继节点,然后退出;有唤醒的后继节点重新设置头结点,并标记状态

public final boolean release(int arg) {
  if (tryrelease(arg)) {   // 由用户重写,尝试释放
    node h = head;
    if (h != null && h.waitstatus != 0)
      unparksuccessor(h);  // 唤醒后继节点
    return true;
  }
  return false;
}   


三、共享模式

1. 应用

public class sharelock implements lock {
  private syn sync;

  public sharelock(int count) { this.sync = new syn(count); }

  @override
  public void lock() { sync.acquireshared(1); }

  @override
  public void lockinterruptibly() throws interruptedexception {
    sync.acquiresharedinterruptibly(1);
  }

  @override
  public boolean trylock() { return sync.tryacquireshared(1) >= 0; }

  @override
  public boolean trylock(long time, timeunit unit) throws interruptedexception {
    return sync.tryacquiresharednanos(1, unit.tonanos(time));
  }

  @override
  public void unlock() { sync.releaseshared(1); }

  @override
  public condition newcondition() { throw new unsupportedoperationexception(); }

  private static final class syn extends abstractqueuedsynchronizer {
    private static final long serialversionuid = 5854536238831876527l;
    syn(int count) {
      if (count <= 0) {
        throw new illegalargumentexception("count must large than zero.");
      }
      setstate(count);
    }

    @override
    public int tryacquireshared(int reducecount) {
      for (; ; ) {
        int current = getstate();
        int newcount = current - reducecount;
        //如果新的状态小于0 则返回值,则表示没有锁资源,直接返回
        if (newcount < 0 || compareandsetstate(current, newcount)) {
          return newcount;
        }
      }
    }

    @override
    public boolean tryreleaseshared(int retruncount) {
      for (; ; ) {
        int current = getstate();
        int newcount = current + retruncount;
        if (compareandsetstate(current, newcount)) {
          return true;
        }
      }
    }
  }
}

上述代码中的 abstractqueuedsynchronizer.state 表示锁的资源数,但是仍然是不可重入的;


2. 获取锁

同样对于共享模式取锁也有四中方式:

  • tryacquireshared: 快速尝试取锁,由用户重写
  • acquireshared: 不响应中断,阻塞获取锁;
  • acquiresharedinterruptibly: 响应中断,阻塞获取锁;
  • tryacquiresharednanos: 响应中断,超时阻塞获取锁;

tryacquireshared 方法

@override
public int tryacquireshared(int reducecount) {
  for (; ; ) {
    int current = getstate();
    int newcount = current - reducecount;
    //如果新的状态小于0 则返回值,则表示没有锁资源,直接返回
    if (newcount < 0 || compareandsetstate(current, newcount)) {
      return newcount;
    }
  }
}

需要注意的是 tryacquireshared 方法是快速尝试获取锁,并更新锁状态,如果失败则必然锁资源不足,返回负值;

acquireshared 方法

public final void acquireshared(int arg) {
  if (tryacquireshared(arg) < 0)  // 快速获取失败
    doacquireshared(arg);         // 阻塞获取锁
}
private void doacquireshared(int arg) {
  final node node = addwaiter(node.shared);
  boolean failed = true;
  try {
    boolean interrupted = false;
    for (;;) {
      final node p = node.predecessor();
      if (p == head) {
        int r = tryacquireshared(arg);
        if (r >= 0) {
          setheadandpropagate(node, r);     // 设置头结点,并是情况将信号传播下去
          p.next = null; // help gc
          if (interrupted) selfinterrupt(); // 重新设置中断状态
          failed = false;
          return;
        }
      }
      if (shouldparkafterfailedacquire(p, node) &&
        parkandcheckinterrupt())
        interrupted = true;
    }
  } finally {
    if (failed)
      cancelacquire(node);
  }
}
// propagate 表示线程获取锁后,共享锁剩余的锁资源
private void setheadandpropagate(node node, int propagate) {
  node h = head; // record old head for check below
  sethead(node);
  
  // propagate > 0 :表示还有剩余的资源
  // h.waitstatus < 0 : 表示后继节点需要被唤醒
  // 其余还做了很多保守判断,确保后面的节点能及时那到锁
  if (propagate > 0 || h == null || h.waitstatus < 0 ||
    (h = head) == null || h.waitstatus < 0) {
    node s = node.next;
    if (s == null || s.isshared())
      doreleaseshared();  // 唤醒后继节点
  }
}

根据上面的代码可以看到,共享模式和独占模式获取锁的主要区别:

  • 共享模式可以有多个锁
  • 设置头结点的时候,同时还要将状态传播下去

其余的思路和独占模式差不多,他家可以自己看源码;

3. 释放锁

同样 tryreleaseshared 是由用户自己重写的,这里需要注意的是如果不能确保释放成功(因为共享模式释放锁的时候可能有竞争,所以可能失败),则在外层 lock 接口使用的时候,就需要额外处理;

@override
public boolean tryreleaseshared(int retruncount) {
  for (; ; ) {
    int current = getstate();
    int newcount = current + retruncount;
    if (compareandsetstate(current, newcount)) {
      return true;
    }
  }
}

releaseshared 方法

public final boolean releaseshared(int arg) {
  if (tryreleaseshared(arg)) {  // 尝试取锁成功,此时锁资源已重新设置
    doreleaseshared();          // 唤醒后继节点
    return true;
  }
  return false;
}

doreleaseshared 方法必然执行两次,

  • 第一次头结点释放锁,然后唤醒后继节点
  • 第二次后继设置头结点

最终使得头结点的状态必然是 propagate

private void doreleaseshared() {
  for (;;) {
    node h = head;
    if (h != null && h != tail) {
      int ws = h.waitstatus;
      if (ws == node.signal) {
        if (!compareandsetwaitstatus(h, node.signal, 0))
          continue;      // loop to recheck cases
        unparksuccessor(h);
      }
      else if (ws == 0 &&
           !compareandsetwaitstatus(h, 0, node.propagate))
        continue;        // loop on failed cas
    }
    if (h == head)       // loop if head changed
      break;
  }
}

四、条件队列

1. conditionobject 结构

并发系列(3)之 AbstractQueuedSynchronizer 源码分析

public class conditionobject implements condition, java.io.serializable {
  private transient node firstwaiter;
  private transient node lastwaiter;
  ...
}

如代码所示条件队列是一个由 node 组成的链表,注意这里的链表不同于同步队列,是通过 nextwaiter 连接的,在同步队列中 nextwaiter 用来表示独占和共享模式,所以区分条件队列的方法就有两个:

  • node.waitstatus = node.condition;
  • node.next = null & node.prev= null;

2. await

public final void await() throws interruptedexception {
  if (thread.interrupted()) throw new interruptedexception();
  node node = addconditionwaiter();     // 添加节点到条件队列
  int savedstate = fullyrelease(node);  // 确保释放锁,并唤醒后继节点
  int interruptmode = 0;
  while (!isonsyncqueue(node)) {        // node 不在同步队列中
    locksupport.park(this);             // 阻塞
    if ((interruptmode = checkinterruptwhilewaiting(node)) != 0)
      break;
  }
  if (acquirequeued(node, savedstate) && interruptmode != throw_ie)
    interruptmode = reinterrupt;
  if (node.nextwaiter != null) // clean up if cancelled
    unlinkcancelledwaiters();
  if (interruptmode != 0)
    reportinterruptafterwait(interruptmode);
}

3. signal

public final void signal() {
  if (!isheldexclusively()) throw new illegalmonitorstateexception();
  node first = firstwaiter;
  if (first != null)  
    dosignal(first);  // 从头结点一次唤醒
}

private void dosignal(node first) {
  do {
    if ( (firstwaiter = first.nextwaiter) == null)
      lastwaiter = null;
    first.nextwaiter = null;
  } while (!transferforsignal(first) &&  // 将节点移动到同步节点中
       (first = firstwaiter) != null);
}

因为篇幅有点长了,所以条件队列讲的也就相对简单了一点,但是大体的思路还是讲了;

总结

  • abstractqueuedsynchronizer 通过私有变量继承方式使用
  • 观察 abstractqueuedsynchronizer ,其实和 synchronized 的结构基本相同,但是 synchronized 还会自动根据使用情况进行锁升级
  • 此外本文的主要参考资料是《java 并发编程的艺术》,有兴趣的可以自行查看;