JUC之AQS
程序员文章站
2022-05-04 21:39:00
...
AbstractQueuedSynchronizer
先大致讲一下工作原理:AQS内部主要维护了一个Node类型的链表,用于储存排队的线程信息,当有新的需要阻塞的线程进来时一般创建一个node对象,加入到链表的尾部,当链表中首节点释放资源时调用LockSupport的unpark去唤醒等待的线程。
配上一个比较经典的图片
接下来是AQS主要的成员变量:
//链表的首节点
private transient volatile Node head;
//链表的尾节点
private transient volatile Node tail;
//当前同步器的状态
private volatile int state;
//一般用于申请资源时判断是否自旋的一个时间阈值,当请求设置的超时时间小于这个时间的时候,就直接自旋而不是等待
static final long spinForTimeoutThreshold = 1000L;
//Unsafe对象,java中用于直接操作内存地址的工具,授信于jdk代码
private static final Unsafe unsafe = Unsafe.getUnsafe();
//下面可以理解为上述成员变量在对应对象的内存地址内的偏移量
//waitStatus与next是AQS的内部类Node中的成员变量
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
接下来介绍一些核心方法
- acquire(int arg)
无视线程interrupt不断的申请资源
/**
* acquire用于排他的申请资源,其中tryAcquire是一个抽象方法,需要子类去自己实现。
* 当第一次tryAcquire失败后,会添加一个排他的node到node链表中,然后不断的申请资源。
* /
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
/**
* 添加当前线程到等待的链表中,其中通过unsafe中的方法修改自己为tail节点
* 如果失败了则进入enq中循环执行把自己设置为tail的操作
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
/**
* 该方法循环执行以下逻辑
* 当前线程前一个线程是head时,去执行tryAcquire方法尝试获取资源,同上tryAcquire为子类实现
* 执行shouldParkAfterFailedAcquire判断在请求资源失败之后是否需要park(即阻塞自己),如果需要阻塞则执行parkAndCheckInterrupt
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**
* 取链表中前驱节点的waitStatus
* 如果是signal的,则需要阻塞自己,因为之前的节点在释放资源之后会唤醒自己
* 如果状态>0的,即是cancelled,则往前找,一直找到不是cancelled状态的节点,然后把自己放在它后面
* 如果状态是0或者-3,则需要把前驱的状态设置为signal的,此次自己不阻塞等下次自己再进入这个方法时,便会阻塞自己
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
/**
* 执行LockSupport中的park方法,阻塞自己,等待其他线程释放资源时唤醒自己,执行下一行即return
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
- acquireInterruptibly(int arg)
/**
* 与accquire方法不同的是,当线程interrupt的时候,该方法会抛出InterruptedException
*/
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
/**
* 与之前acquireQueued不同的是,这里当执行到parkAndCheckInterrupt返回true的时候,即线程状态是interrupted状态时,会抛出异常
* 这里补充一下,线程一般销毁是在run方法执行完后(stop destry这些不推荐使用),而执行线程的interrupt方法时,
* 线程不会结束,只会去修改线程的状态,以下是网上处理interrupt的一些建议
* 1)线程处于阻塞状态,如使用了sleep,同步锁的wait,socket中的receiver,accept等方法时,会使线程处于阻塞状态。
* 当调用线程的interrupt()方法时,会抛出InterruptException异常。阻塞中的那个方法抛出这个异常,通过代码捕获该异常,
* 然后break跳出循环状态,从而让我们有机会结束这个线程的执行。
* 一定要先捕获InterruptedException异常之后通过break来跳出循环,才能正常结束run方法
* 2)线程未处于阻塞状态,使用isInterrupted()判断线程的中断标志来退出循环。
* 当使用interrupt()方法时,中断标志就会置true,和使用自定义的标志来控制循环是一样的道理
*/
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- acquireShared(int arg)
/**
* 同样的是tryAcquireShared是个抽象方法,需要子类去实现。
* 返回负值就表示失败,然后进入自旋
*/
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
/**
* 这里先添加一个共享模式的node到链表尾部,然后自旋,大部分逻辑与acquireQueued是一样的
* 判断是否需要阻塞
*/
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* 当前线程取得资源 或者 首节点为空 或者 首节点不是被cancel 或者 此时head为空 或者 此时head没有被cancel
* 这些条件满足一条时 判断下个节点是否是空或者共享节点,是的话就调用doReleaseShared
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
- acquireSharedInterruptibly(int arg)
与排它模式的interrupt一样,就是线程被设置为interrupt状态后会抛出异常
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
- release(int arg)
释放资源
/**
* tryRelease为抽象方法,需要子类实现
* 这里tryRelease成功后,如果head不为空且waitStatus不为0就唤醒后继的阻塞线程
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
然后是两个内部类,先介绍Node这个内部类
成员变量
/** 标志node为共享模式 */
static final Node SHARED = new Node();
/** 标志node为排他模式 */
static final Node EXCLUSIVE = null;
/** waitStatus的值,标志着线程被撤销了 */
static final int CANCELLED = 1;
/** waitStatus的值,标志着后续的线程需要被unpark,说简单点,就是处于这个状态的node,在head节点释放之后会被唤醒 */
static final int SIGNAL = -1;
/** waitStatus的值,标志着线程正在等待某种条件满足 */
static final int CONDITION = -2;
/**
* waitStatus的值,与共享模式相关,在共享模式中,该状态标识结点的线程处于可运行状态。
*/
static final int PROPAGATE = -3;
/** waitStatus**/
volatile int waitStatus;
/** 前驱节点 **/
volatile Node prev;
/** 后继节点 **/
volatile Node next;
/**当前线程**/
volatile Thread thread;
/**下一个等待环节,指向排他类型的node,区分next在于next可能是共享模式的,方便快速定位到下一个等待线程**/
Node nextWaiter;
另一个内部类ConditionObject,实现了Condition接口
成员变量
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
/** Mode meaning to reinterrupt on exit from wait */
private static final int REINTERRUPT = 1;
/** Mode meaning to throw InterruptedException on exit from wait */
private static final int THROW_IE = -1;