并发系列(3)之 AbstractQueuedSynchronizer 源码分析
本文将主要讲述 abstractqueuedsynchronizer
的内部结构和实现逻辑,在看本文之前最好先了解一下 clh
就是根据 clh
队列锁的变种实现的,因为本身 aqs
比较复杂不容易看清楚他本身的实现逻辑,所以查看 clh
队列锁的实现,可以帮助我们理清楚他内部的关系;关于队列锁的内容可以参考 ,clh、mcs 队列锁简介 ;
一、aqs 结构概述
在 jdk 中除 synchronized
- 面向用户的逻辑部分(对于锁而言就是 lock interface);
- 面向底层的线程调度部分;
而 abstractqueuedsynchronizer
即同步队列则是 doug lea 大神为我们提供的底层线程调度的封装;aqs
本身是根据 clh
队列锁实现的,这一点在注释中有详细的介绍,clh、mcs 队列锁简介 ;
队列锁就是一个单项链表,想要获取锁的线程封装为节点添加到尾部,然后阻塞检查前任节点的状态 (一定要注意是前任节点,因为这样更容易实现取消、超时等功能,同时这也是选择 clh 队列锁的原因),而头结点则是当前已经获得锁的线程,其主要作用是通知后继节点(也就是说在没有发生竞争的情况下,是不需要头结点的,这一点后面会详细分析);
而对于 aqs
public abstract class abstractqueuedsynchronizer extends abstractownablesynchronizer implements java.io.serializable { protected abstractqueuedsynchronizer() { } private transient volatile node head; // 懒加载,只有在发生竞争的时候才会初始化; private transient volatile node tail; // 同样懒加载; private volatile int state; // 自定义的锁状态,可以用来表示锁的个数,以实现互斥锁和共享锁; }
1. node 结构
static final class node { static final node shared = new node(); // 共享模式 static final node exclusive = null; // 互斥模式 static final int cancelled = 1; // 表示线程取消获取锁 static final int signal = -1; // 表示后继节点需要被唤醒 static final int condition = -2; // 表示线程位于条件队列 static final int propagate = -3; // 共享模式下节点的最终状态,确保在doreleaseshared的时候将共享状态继续传播下去 /** * 节点状态(初始为0,使用cas原则更新) * 互斥模式:0,signal,cancelled * 共享模式:0,signal,cancelled,propagate * 条件队列:condition */ volatile int waitstatus; volatile node prev; // 前继节点 volatile node next; // 后继节点 volatile thread thread; // 取锁线程 node nextwaiter; // 模式标识,取值:shared、exclusive // used by addwaiter,用于添加同队队列 node(thread thread, node mode) { this.nextwaiter = mode; this.thread = thread; } // used by condition,同于添加条件队列 node(thread thread, int waitstatus) { this.waitstatus = waitstatus; this.thread = thread; } }
根据上面的代码和注释已经可以看到 aqs
: 表示锁的资源状态,是我们上面所说的面向用户逻辑的部分; -
: 表示节点在队列中的状态,是面向底层线程调度的部分;
2. aqs 运行逻辑
aqs 的运行逻辑可以简单表述为:
如果你熟悉 synchronized
,应该已经发现他们的运行逻辑其实是差不多的,都用同步队列和条件队列,值得注意的是这里的条件队列和 condition
一一对应,可能有多个;根据上图可以将 aqs
- 同步状态的原子性管理;
- 线程的阻塞与解除阻塞;
- 队列的管理;
3. 入队
因为独占模式和共享模式彼此独立可以同时使用,所以在入队的时候需要首先指定 node
的类型,同时入队的时候有竞争的可能,所以需要 cas 入队;
private node addwaiter(node mode) { node node = new node(thread.currentthread(), mode); // shared、exclusive // try the fast path of enq; backup to full enq on failure node pred = tail; if (pred != null) { node.prev = pred; if (compareandsettail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
private node enq(final node node) { for (;;) { node t = tail; if (t == null) { // must initialize if (compareandsethead(new node())) // 此时head和tail才初始化 tail = head; } else { node.prev = t; if (compareandsettail(t, node)) { t.next = node; return t; } } } }
而对于出队则稍微复杂一点,独占模式下直接出队,因为没有竞争;共享模式下,则需要 cas 设置头结点,因为可能对有多个节点同时出队,同时还需要向后传播状态,保证后面的线程可以及时获得锁;此外还可能发生中断或者异常出队,此时则需要考虑头尾的情况,保证不会影响队列的结构;具体内容将会在源码中一次讲解;
1. 应用
public class mutex implements lock { private final sync sync = new sync(); private static final int lock = 1; private static final int unlock = 0; @override public void lock() { sync.acquire(lock); } @override public boolean trylock() { return sync.tryacquire(lock); } @override public void unlock() { sync.release(unlock); } private static class sync extends abstractqueuedsynchronizer { @override protected boolean isheldexclusively() { return getstate() == lock; } @override public boolean tryacquire(int acquires) { if (compareandsetstate(unlock, lock)) { setexclusiveownerthread(thread.currentthread()); return true; } return false; } @override protected boolean tryrelease(int releases) { if (getstate() == unlock) throw new illegalmonitorstateexception(); setexclusiveownerthread(null); setstate(unlock); return true; } } }
注意代码中特意将 abstractqueuedsynchronizer.state
,主要是便于理解 state
2. 获取锁
tryacquire: 快速尝试取锁,成功时返回true;这是独占模式必须要重写的方法,其他方式获取锁时,也会先尝试快速获取锁;同时
也就决定了,这个锁时公平锁/非公平锁,可重入锁/不重冲入锁等;(比如上面的实例就是不可重入非公平锁,具体分析以后还会详细讲解) - acquire: 不响应中断,阻塞获取锁;
- acquireinterruptibly: 响应中断,阻塞获取锁;
- tryacquirenanos: 响应中断,超时阻塞获取锁;
acquire 方法
public final void acquire(int arg) { if (!tryacquire(arg) && // 首先尝试快速获取锁 acquirequeued(addwaiter(node.exclusive), arg)) // 失败后入队,然后阻塞获取 selfinterrupt(); // 最后如果取锁的有中断,则重新设置中断 }
final boolean acquirequeued(final node node, int arg) { boolean failed = true; try { boolean interrupted = false; // 只要取锁过程中有一次中断,返回时都要重新设置中断 for (;;) { final node p = node.predecessor(); // 一直阻塞到前继节点为头结点 if (p == head && tryacquire(arg)) { // 获取同步状态 sethead(node); // 设置头结点,此时头部不存在竞争,直接设置 // next 主要起优化作用,并且在入队的时候next不是cas设置 // 也就是通过next不一定可以准确取到后继节点,所以在唤醒的时候不能依赖next,需要反向遍历 p.next = null; // help gc failed = false; return interrupted; } if (shouldparkafterfailedacquire(p, node) && // 判断并整理前继节点 parkandcheckinterrupt()) // 当循环最多第二次的时候,必然阻塞 interrupted = true; } } finally { if (failed) // 异常时取消获取 cancelacquire(node); } }
private static boolean shouldparkafterfailedacquire(node pred, node node) { int ws = pred.waitstatus; if (ws == node.signal) return true; if (ws > 0) { // 大于0说明,前继节点异常或者取消获取,直接跳过; do { node.prev = pred = pred.prev; // 跳过pred并建立连接 } while (pred.waitstatus > 0); pred.next = node; } else { compareandsetwaitstatus(pred, ws, node.signal); // 标记后继节点需要唤醒 } return false; }
其中 node.prev = pred = pred.prev;
相关的内存分析可以查看 java 连等赋值问题;
acquireinterruptibly 方法
public final void acquireinterruptibly(int arg) throws interruptedexception { if (thread.interrupted()) throw new interruptedexception(); // 中断退出 if (!tryacquire(arg)) // 获取同步状态 doacquireinterruptibly(arg); // 中断获取 }
private void doacquireinterruptibly(int arg) throws interruptedexception { final node node = addwaiter(node.exclusive); // 加入队尾 boolean failed = true; try { for (;;) { final node p = node.predecessor(); if (p == head && tryacquire(arg)) { sethead(node); p.next = null; // help gc failed = false; return; } if (shouldparkafterfailedacquire(p, node) && // 判断并整理前继节点 parkandcheckinterrupt()) // 等待 throw new interruptedexception(); } } finally { if (failed) cancelacquire(node); } }
tryacquirenanos 方法
public final boolean tryacquirenanos(int arg, long nanostimeout) throws interruptedexception { if (thread.interrupted()) throw new interruptedexception(); return tryacquire(arg) || doacquirenanos(arg, nanostimeout); }
private boolean doacquirenanos(int arg, long nanostimeout) throws interruptedexception { if (nanostimeout <= 0l) return false; final long deadline = system.nanotime() + nanostimeout; final node node = addwaiter(node.exclusive); boolean failed = true; try { for (;;) { final node p = node.predecessor(); if (p == head && tryacquire(arg)) { sethead(node); p.next = null; // help gc failed = false; return true; } nanostimeout = deadline - system.nanotime(); if (nanostimeout <= 0l) return false; // 超时退出 if (shouldparkafterfailedacquire(p, node) && nanostimeout > spinfortimeoutthreshold) locksupport.parknanos(this, nanostimeout); if (thread.interrupted()) throw new interruptedexception(); } } finally { if (failed) cancelacquire(node); } }
3. 释放锁
public final boolean release(int arg) { if (tryrelease(arg)) { // 由用户重写,尝试释放 node h = head; if (h != null && h.waitstatus != 0) unparksuccessor(h); // 唤醒后继节点 return true; } return false; }
1. 应用
public class sharelock implements lock { private syn sync; public sharelock(int count) { this.sync = new syn(count); } @override public void lock() { sync.acquireshared(1); } @override public void lockinterruptibly() throws interruptedexception { sync.acquiresharedinterruptibly(1); } @override public boolean trylock() { return sync.tryacquireshared(1) >= 0; } @override public boolean trylock(long time, timeunit unit) throws interruptedexception { return sync.tryacquiresharednanos(1, unit.tonanos(time)); } @override public void unlock() { sync.releaseshared(1); } @override public condition newcondition() { throw new unsupportedoperationexception(); } private static final class syn extends abstractqueuedsynchronizer { private static final long serialversionuid = 5854536238831876527l; syn(int count) { if (count <= 0) { throw new illegalargumentexception("count must large than zero."); } setstate(count); } @override public int tryacquireshared(int reducecount) { for (; ; ) { int current = getstate(); int newcount = current - reducecount; //如果新的状态小于0 则返回值,则表示没有锁资源,直接返回 if (newcount < 0 || compareandsetstate(current, newcount)) { return newcount; } } } @override public boolean tryreleaseshared(int retruncount) { for (; ; ) { int current = getstate(); int newcount = current + retruncount; if (compareandsetstate(current, newcount)) { return true; } } } } }
上述代码中的 abstractqueuedsynchronizer.state
2. 获取锁
- tryacquireshared: 快速尝试取锁,由用户重写
- acquireshared: 不响应中断,阻塞获取锁;
- acquiresharedinterruptibly: 响应中断,阻塞获取锁;
- tryacquiresharednanos: 响应中断,超时阻塞获取锁;
tryacquireshared 方法
@override public int tryacquireshared(int reducecount) { for (; ; ) { int current = getstate(); int newcount = current - reducecount; //如果新的状态小于0 则返回值,则表示没有锁资源,直接返回 if (newcount < 0 || compareandsetstate(current, newcount)) { return newcount; } } }
需要注意的是 tryacquireshared
acquireshared 方法
public final void acquireshared(int arg) { if (tryacquireshared(arg) < 0) // 快速获取失败 doacquireshared(arg); // 阻塞获取锁 }
private void doacquireshared(int arg) { final node node = addwaiter(node.shared); boolean failed = true; try { boolean interrupted = false; for (;;) { final node p = node.predecessor(); if (p == head) { int r = tryacquireshared(arg); if (r >= 0) { setheadandpropagate(node, r); // 设置头结点,并是情况将信号传播下去 p.next = null; // help gc if (interrupted) selfinterrupt(); // 重新设置中断状态 failed = false; return; } } if (shouldparkafterfailedacquire(p, node) && parkandcheckinterrupt()) interrupted = true; } } finally { if (failed) cancelacquire(node); } }
// propagate 表示线程获取锁后,共享锁剩余的锁资源 private void setheadandpropagate(node node, int propagate) { node h = head; // record old head for check below sethead(node); // propagate > 0 :表示还有剩余的资源 // h.waitstatus < 0 : 表示后继节点需要被唤醒 // 其余还做了很多保守判断,确保后面的节点能及时那到锁 if (propagate > 0 || h == null || h.waitstatus < 0 || (h = head) == null || h.waitstatus < 0) { node s = node.next; if (s == null || s.isshared()) doreleaseshared(); // 唤醒后继节点 } }
- 共享模式可以有多个锁
- 设置头结点的时候,同时还要将状态传播下去
3. 释放锁
同样 tryreleaseshared
是由用户自己重写的,这里需要注意的是如果不能确保释放成功(因为共享模式释放锁的时候可能有竞争,所以可能失败),则在外层 lock
@override public boolean tryreleaseshared(int retruncount) { for (; ; ) { int current = getstate(); int newcount = current + retruncount; if (compareandsetstate(current, newcount)) { return true; } } }
releaseshared 方法
public final boolean releaseshared(int arg) { if (tryreleaseshared(arg)) { // 尝试取锁成功,此时锁资源已重新设置 doreleaseshared(); // 唤醒后继节点 return true; } return false; }
- 第一次头结点释放锁,然后唤醒后继节点
- 第二次后继设置头结点
最终使得头结点的状态必然是 propagate
private void doreleaseshared() { for (;;) { node h = head; if (h != null && h != tail) { int ws = h.waitstatus; if (ws == node.signal) { if (!compareandsetwaitstatus(h, node.signal, 0)) continue; // loop to recheck cases unparksuccessor(h); } else if (ws == 0 && !compareandsetwaitstatus(h, 0, node.propagate)) continue; // loop on failed cas } if (h == head) // loop if head changed break; } }
1. conditionobject 结构
public class conditionobject implements condition, java.io.serializable { private transient node firstwaiter; private transient node lastwaiter; ... }
如代码所示条件队列是一个由 node
组成的链表,注意这里的链表不同于同步队列,是通过 nextwaiter
连接的,在同步队列中 nextwaiter
- node.waitstatus = node.condition;
- node.next = null & node.prev= null;
2. await
public final void await() throws interruptedexception { if (thread.interrupted()) throw new interruptedexception(); node node = addconditionwaiter(); // 添加节点到条件队列 int savedstate = fullyrelease(node); // 确保释放锁,并唤醒后继节点 int interruptmode = 0; while (!isonsyncqueue(node)) { // node 不在同步队列中 locksupport.park(this); // 阻塞 if ((interruptmode = checkinterruptwhilewaiting(node)) != 0) break; } if (acquirequeued(node, savedstate) && interruptmode != throw_ie) interruptmode = reinterrupt; if (node.nextwaiter != null) // clean up if cancelled unlinkcancelledwaiters(); if (interruptmode != 0) reportinterruptafterwait(interruptmode); }
3. signal
public final void signal() { if (!isheldexclusively()) throw new illegalmonitorstateexception(); node first = firstwaiter; if (first != null) dosignal(first); // 从头结点一次唤醒 } private void dosignal(node first) { do { if ( (firstwaiter = first.nextwaiter) == null) lastwaiter = null; first.nextwaiter = null; } while (!transferforsignal(first) && // 将节点移动到同步节点中 (first = firstwaiter) != null); }
- abstractqueuedsynchronizer 通过私有变量继承方式使用
- 观察 abstractqueuedsynchronizer ,其实和 synchronized 的结构基本相同,但是 synchronized 还会自动根据使用情况进行锁升级
- 此外本文的主要参考资料是《java 并发编程的艺术》,有兴趣的可以自行查看;
并发系列(3)之 AbstractQueuedSynchronizer 源码分析
并发编程(九)—— Java 并发队列 BlockingQueue 实现之 LinkedBlockingQueue 源码分析
并发编程(五)——AbstractQueuedSynchronizer 之 ReentrantLock源码分析