J.U.C之AQS
AQS 简介
AQS,AbstractQueuedSynchronizer,即队列同步器。它是构建锁或者其他同步组件的基础框架(如ReentrantLock、ReentrantReadWriteLock、Semaphore等),它是JUC并发包中的核心基础组件。
AQS解决了实现同步器时涉及到的大量细节问题,例如获取同步状态、FIFO同步队列。基于AQS来构建同步器可以带来很多好处。它不仅能够极大地减少实现工作,而且也不必处理在多个位置上发生的竞争问题。在基于AQS构建的同步器中,只能在一个时刻发生阻塞,从而降低上下文切换的开销,提高了吞吐量。同时在设计AQS时充分考虑了可伸缩行,因此J.U.C中所有基于AQS构建的同步器均可以获得这个优势。
AQS使用一个int类型的成员变量 state 来表示同步状态,当state>0时表示已经获取了锁,当state = 0时表示释放了锁。它提供了三个方法 getState()
、setState(int newState)
、compareAndSetState(int expect,int update)
来对同步状态state进行操作,当然AQS可以确保对state的操作是线程安全的。
AQS通过内置的FIFO同步队列来完成资源获取线程的排队工作,如果当前线程获取同步状态失败(锁)时,AQS则会将当前线程以及等待状态等信息构造成一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,则会把节点中的线程唤醒,使其再次尝试获取同步状态。
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
// 指向同步队列的头节点
private transient volatile Node head;
// 指向同步队列的尾节点
private transient volatile Node tail;
// 同步状态
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
}
同步队列
AQS内部维护着一个FIFO队列,该队列就是CLH同步队列。CLH同步队列是一个FIFO双向队列,AQS依赖它来完成同步状态的管理,当前线程如果获取同步状态失败时,AQS则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。在CLH同步队列中,一个节点表示一个线程,它保存着线程的引用(thread)、状态(waitStatus)、前驱节点(prev)、后继节点(next),其定义如下:
static final class Node {
// 锁分为两种锁,一种是共享锁,另一种是独占锁
/** 标记获取共享锁失败,在队列中等待的节点 */
static final Node SHARED = new Node();
/** 标记获取独占锁失败,在队列中等待的节点 */
static final Node EXCLUSIVE = null;
// 以下是 waitStatus 的值,用来表示当前节点中现成的状态
/**
* 表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。
*/
static final int CANCELLED = 1;
/**
* 表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为SIGNAL。
*/
static final int SIGNAL = -1;
/**
* 表示结点等待在Condition上,当其他线程调用了Condition的signal()方法后,
* CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
*/
static final int CONDITION = -2;
/**
* 共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。
*/
static final int PROPAGATE = -3;
/**
* 节点状态,包含上面四种状态(另外还有一种初始化,新结点入队时的默认状态。)
* 特别注意:它是volatile关键字修饰的,保证对其线程可见性,但是不保证原子性。
* 所以更新状态时,采用CAS方式去更新, 如:compareAndSetWaitStatus
*/
volatile int waitStatus;
/**
* 前驱节点,比如当前节点被取消,那就需要前驱节点和后继节点来完成连接。
*/
volatile Node prev;
/**
* 后继节点。
*/
volatile Node next;
/**
* 入队列时的当前线程。
*/
volatile Thread thread;
/**
* 存储condition队列中的后继节点。
*/
Node nextWaiter;
/**
* 判断是否共享模式
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* 获取前置节点,如果前置节点为空就抛出异常
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
入队
当一个线程获取了同步状态(或者锁),其他线程将无法获取到同步状态,转而被构造成为节点加入到同步队列中,而这个加入队列的过程必须要保证线程安全,同步器使用CAS的方式,将节点加入到同步队列的尾节点。
private Node addWaiter(Node mode) {
//以给定模式构造结点。mode有两种:EXCLUSIVE(独占)和SHARED(共享)
Node node = new Node(Thread.currentThread(), mode);
//快速尝试添加尾节点
Node pred = tail;
if (pred != null) {
node.prev = pred;
//CAS设置尾节点
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//多次尝试
enq(node);
return node;
}
addWaiter(Node node)先通过快速尝试设置尾节点,如果失败,则调用enq(Node node)方法设置尾节点。
private Node enq(final Node node) {
//多次尝试,直到成功为止
for (;;) {
Node t = tail;
//tail不存在,设置为首节点
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
//设置为尾节点
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
在上面代码中,两个方法都是通过一个CAS方法compareAndSetTail(Node expect, Node update)来设置尾节点,该方法可以确保节点是线程安全添加的。在enq(Node node)方法中,AQS通过“死循环”的方式来保证节点可以正确添加,只有成功添加后,当前线程才会从该方法返回,否则会一直执行下去。
同步器将节点加入到同步队列的过程如下:
出队
CLH同步队列遵循FIFO,首节点的线程释放同步状态后,将会唤醒它的后继节点(也就是说是链表中的第二个节点,并不是首节点),而后继节点将会在获取同步状态成功时将自己设置为首节点,这个过程非常简单,head执行该节点并断开原首节点的next和当前节点的prev即可,注意在这个过程是不需要使用CAS来保证的,因为只有一个线程能够成功获取到同步状态。过程图如下:
设置首节点是通过获取同步状态成功的线程来完成的,由于只有一个线程能够成功获取到同步状态,因此设置头节点的方法并不需要使用CAS来保证,它只需要将首节点设置成为原首点的后继节点并断开首结点的next引用即可。
同步状态的获取和释放
独占式
独占式,同一时刻仅有一个线程持有同步状态。
获取同步状态
acquire(int arg)方法为AQS提供的模板方法,该方法为独占式获取同步状态,但是该方法对中断不敏感,也就是说由于线程获取同步状态失败加入到CLH同步队列中,后续对线程进行中断操作时,线程不会从同步队列中移除。代码如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
函数流程如下:
1. tryAcquire:去尝试获取锁,获取成功则设置锁状态并返回true,否则返回false。该方法自定义同步组件自己实现,该方法必须要保证线程安全的获取同步状态。
2. addWaiter:将该线程加入等待队列的尾部,并标记为独占模式;
3. acquireQueued:使线程阻塞在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
4. selfInterrupt:如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。
acquireQueued方法为一个自旋的过程,也就是说当前线程(Node)进入同步队列后,就会进入一个自旋的过程,每个节点都会自省地观察,当条件满足,获取到同步状态后,就可以从这个自旋过程中退出,否则会一直执行下去。如下:
// node 为获取锁失败,并已经被加入到同步队列中的节点
final boolean acquireQueued(final Node node, int arg) {
//标记是否成功拿到资源
boolean failed = true;
try {
//标记等待过程中是否被中断过
boolean interrupted = false;
// 自旋
for (;;) {
// 获取当前节点的前驱节点
//如果前驱是head,即该结点已成老二,那么便有资格去尝试获取资源(可能是老大释放完资源唤醒自己的,当然也可能被interrupt了)。
final Node p = node.predecessor();
// 如果前驱节点是头节点,则当前节点尝试去获取锁
// 为什么这样?
// 头节点是成功获取到同步状态的节点,而头节点的线程释放了同步状态之后,将会唤醒其后继节点,
// 后继节点的线程被唤醒后需要检查自己的前驱节点是否是头节点
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
// 返回等待过程中是否被中断过
return interrupted;
}
// 执行到这里说明存在竞争,有多个线程都在等待一个锁
// 如果当前线程获取锁失败或者前驱节点不是头节点,则执行下面的方法
// shouldParkAfterFailedAcquire 方法的作用有三个:
// 1、如果这个节点已经取消了获取锁的操作,则删除队列中所有被取消了的节点。
// 2、若状态位不为Node.SIGNAL,且没有取消操作,则会尝试将状态位修改为Node.SIGNAL
// 3、状态位是Node.SIGNAL,表明线程是否已经准备好被阻塞并等待唤醒。
// 最终,只有在pred.waitStatus已经等于Node.SIGNAL时才会返回true。
// 其他情况返回false,然后acquireQueued会继续循环。
// 在shouldParkAfterFailedAcquire返回true之后,
// acquireQueued方法体内继续执行parkAndCheckInterrupt():
// 该方法调用LockSupport.park()方法使线程阻塞。
// 这里阻塞的原因是由于轮不到其获取锁,避免无效的自旋,浪费CPU资源
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) //里面会对当前线程执行中断,当被唤醒时,继续循环
// 如果等待过程中被中断过,哪怕只有那么一次,就将interrupted标记为true
interrupted = true;
}
} finally {
if (failed)
// 如果等待过程中没有成功获取资源(如timeout,或者可中断的情况下被中断了),那么取消结点在队列中的等待。
cancelAcquire(node);
}
}
线程阻塞
在线程获取同步状态时如果获取失败,则加入CLH同步队列,通过通过自旋的方式不断获取同步状态,但是在自旋的过程中则需要判断当前线程是否需要阻塞,其主要方法在acquireQueued():
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取前驱节点的 waitStatus
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 已经告诉前驱节点,让其拿到锁后通知自己
return true;
if (ws > 0) {
/*
* 前驱节点被取消,跳过前驱节点
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它拿到锁后通知自己一下。
// 有可能失败,人家说不定刚刚释放完呢!
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
这段代码主要检查当前线程是否需要被阻塞,具体规则如下:
1、如果当前线程的前驱节点状态为SINNAL,则表明其前驱节点需要被唤醒,当前线程需要被阻塞,直接返回true,让当前线程阻塞。
2、如果当前线程的前驱节点状态为CANCELLED(ws > 0),则表明该线程的前驱节点已经等待超时或者被中断了,则需要从CLH队列中将该前驱节点删除掉,直到回溯到前驱节点状态 <= 0 ,返回false
3、如果前驱节点非SINNAL,非CANCELLED,则通过CAS的方式将其前驱节点设置为SINNAL,返回false。
整个流程中,如果前驱结点的状态不是SIGNAL,那么自己就不能安心去休息,需要去找个安心的休息点,同时可以再尝试下看有没有机会轮到自己获取锁。
如果 shouldParkAfterFailedAcquire(Node pred, Node node)
方法返回true,则调用parkAndCheckInterrupt()
方法阻塞当前线程:
// 让当前线程阻塞
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
parkAndCheckInterrupt()
方法主要是把当前线程挂起,从而阻塞住线程的调用栈,同时返回当前线程的中断状态。其内部则是调用LockSupport
工具类的park()
方法来阻塞该方法。
独占式释放同步状态
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//唤醒等待队列里的下一个线程
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// 如果当前线程不是锁的拥有者
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 如果锁的重入数为0,则将锁的拥有者置为null
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
// 更新 state 的值
setState(c);
return free;
}
private void unparkSuccessor(Node node) {
//这里,node一般为当前线程所在的结点。
int ws = node.waitStatus;
if (ws < 0)
// 置零当前线程所在的结点状态,允许失败。
compareAndSetWaitStatus(node, ws, 0);
//找到下一个需要唤醒的结点s
Node s = node.next;
//如果为空或已取消
if (s == null || s.waitStatus > 0) {
//如果为空或已取消
s = null;
// 从后向前找,找到一个可以被唤醒的节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 唤醒线程
LockSupport.unpark(s.thread);
}
共享式
共享式与独占式的最主要区别在于同一时刻独占式只能有一个线程获取同步状态,而共享式在同一时刻可以有多个线程获取同步状态。例如读操作可以有多个线程同时进行,而写操作同一时刻只能有一个线程进行写操作,其他操作都会被阻塞。
获取同步状态
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
//获取失败,自旋获取同步状态
doAcquireShared(arg);
}
这里tryAcquireShared()依然需要自定义同步器去实现。但是AQS已经把其返回值的语义定义好了:负值代表获取失败;0代表获取成功,但没有剩余资源;正数表示获取成功,还有剩余资源,其他线程还可以去获取。所以这里acquireShared()的流程就是:
1、tryAcquireShared()尝试获取资源,成功则直接返回;
2、失败则通过doAcquireShared()进入等待队列,直到获取到资源为止才返回。
private void doAcquireShared(int arg) {
// 共享式节点
// 把当前线程封装到一个SHARE类型Node中,添加到SyncQueue末尾上
final Node node = addWaiter(Node.SHARED);
// 是否成功标志
boolean failed = true;
try {
// 等待过程中是否被中断过的标志
boolean interrupted = false;
for (;;) {
//前驱节点
final Node p = node.predecessor();
// 如果前继节点是head节点,获取同步状态
if (p == head) {
//尝试获取同步
int r = tryAcquireShared(arg);
// 返回值大于等于0,表示该次获取同步状态成功并从自旋过程中退出
if (r >= 0) {
// 将head指向自己,还有剩余资源可以再唤醒之后的线程
setHeadAndPropagate(node, r);
// head已经指向node节点,oldHead.next索引置空,方便p节点对象回收
p.next = null; // help GC
// 如果等待过程中被打断过,此时将中断补上。
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 判断状态,寻找安全点,进入waiting状态,等着被unpark()或interrupt()
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
跟独占模式比,还有一点需要注意的是,如果有剩余的话还会唤醒之后的队友。那么问题就来了,假如老大用完后释放了5个资源,而老二需要6个,老三需要1个,老四需要2个。老大先唤醒老二,老二一看资源不够,他是把资源让给老三呢,还是不让?答案是否定的!老二会继续park()等待其他线程释放资源,也更不会去唤醒老三和老四了。独占模式,同一时刻只有一个线程去执行,这样做未尝不可;但共享模式下,多个线程是可以同时执行的,现在因为老二的资源需求量大,而把后面量小的老三和老四也都卡住了。当然,这并不是问题,只是AQS保证严格按照入队顺序唤醒罢了(保证公平,但降低了并发)。
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node);//head指向自己
//如果还有剩余量,继续唤醒下一个邻居线程
if (propagate > 0 || h == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
释放同步状态
public final boolean releaseShared(int arg) {
//尝试释放资源
if (tryReleaseShared(arg)) {
//唤醒后继结点
doReleaseShared();
return true;
}
return false;
}
因为可能会存在多个线程同时进行释放同步状态资源,所以需要确保同步状态安全地成功释放,一般都是通过CAS和循环来完成的。