AQS梳理
程序员文章站
2022-07-06 08:47:12
概述aqs是java并发包的一个基石级别服务,主要是通过队列和大量的cas来玩转。本文我们会基于公平锁来跟进抢锁流程后续补个图,这个是公平锁的lock方法。ReentraLock的内部类 FairSync类关系 FairSync extend Sync(抽象类) extends AbstractQueuedSynchronizersync定义了lock抽象方法给 子类实现,FairSync的实现具体是依赖 aqs的 acquire来看下acquire方法【这是个小模版方法,具....
概述
aqs是java并发包的一个基石级别服务,主要是通过同步队列/条件队列 和大量的CAS来玩转,里面会涉及大量的公平 非公平 中断状态的维护。里面还有一些常用工具的底层实现的介绍,基于共享模式的CountDownLatch和semaphore,基于ReerantLock和condition实现的
CyclicBarrier
老实说感觉大佬的代码写的不是很干净,看起来有点烧脑
本文我们会先基于公平锁来跟进下RenetrantLock.lock()代码
大致流程图
抢锁流程
这个是公平锁的lock方法。ReentraLock的内部类 FairSync
类关系 FairSync extend Sync(抽象类) extends AbstractQueuedSynchronizer
sync定义了lock抽象方法给 子类实现,FairSync的实现具体是依赖 aqs的 acquire
来看下acquire方法【这是个小模版方法,具体实现还是各类自己实现】
tryAcquire和acquireQueued都会尝试抢锁
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
代码也比较简单,前面没有排队的就cas抢锁。如果有线程持有锁,正好那个线程就是自己statue+1
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//0表示没有线程持有锁
if (c == 0) {
//如果前面没有节点 就来一次cas 成功了 把当前线程设置成独占线程
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//这个看持有锁的线程是否是自己,如果是的,在进一次 就是可重入的实现
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
再看下入队逻辑
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
如果队列为空enq,看下初始化代码 注意下header节点是个空的,里面没有线程
先初始化链表,再把自己设置成tail节点
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
//这个方法如果返回了 说明是抢到锁了 返回结果代表线程的中断状态
//第二次抢锁,前继节点是head节点 就会尝试抢一次。
//第一次抢锁成功返回false,如果第一次阻塞自己,第二次再抢锁成功 返回true
//抢锁失败,把自己的前面那个节点的waitStatue设置成-1,阻塞当前线程
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//把前继节点的signal设置成-1,同时清理阻塞队列中取消等待的节点
//第二个函数很简单 就是park自己 同时返回中断状态
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
//是否需要阻塞,这个函数主要是把前面那个节点waitStatus改成-1,
//清除队列中取消抢锁的线程
//完成了上述工作才会安心的返回true 最后把自己静静的挂起来
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
//上面是否需要阻塞,主要就是剔除取消节点和把前面节点的waitStatue改成-1
//准备工作做好了,把自己阻塞起来 等待唤醒 [中断和释放锁的动作会释放park]
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
释放锁
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
//判断当前线程是否是持有锁的线程,如果statue=0(代表完全释放了) 把当前支持的锁的线程设置为空
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
//如果锁释放干净了,先把head的waitStatus设置为0,通知后面等待的兄弟们 起来拿锁
//拿锁有两个动作
如果同步队列里面第一把交椅还活着(不算header),直接唤醒这个对应的线程
如果不存在,之前代码会经常把next=null(帮助gc),只能从tail往前遍历第一个waitStatus<=0的
(代表没有放弃等待的节点,效率不是很高,其实同步队列一般也没那么长)
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
node结构
这个其实就是包装了请求的线程,记录了前后节点,争抢状态,模式-独占/共享
public class Node {
//标记共享还是互斥
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
//取消抢锁
static final int CANCELLED = 1;
//标示后面那个节点可以抢锁了
static final int SIGNAL = -1;
//条件队列节点标志 ConditionObject使用 比synchronized更灵活
static final int CONDITION = -2;
//一般共享模式 被唤醒的节点使用这个连环模式 比如释放所有await在同一个state上的线程节点
static final int PROPAGATE = -3;
//上面四个就是给他用的
int waitStatus;
Node prev;
Node next;
//节点里面的线程
Thread thread;
//还有一个单向链表 条件队列使用的。共享模式 也会用这个来标记
Node nextWaiter;
}
公平与非公平的差异
主要差异在抢锁会判断有没有前继节点,非公平直接来个cas 不上路子的操作(有两个地方会尝试直接抢锁 下面代码有标注),吞吐量会好一些,不过可能会造成之前排队中的线程产生长时间的饥饿等待。
ReeranterLock默认就是非公平
final void lock() {
//第一次-先直接cas抢一次
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//第二次-这里也不看是否是有前继节点
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
条件队列
大致处理流程图
流程代码
简单使用demo
Lock lock = new ReentrantLock();
//每个condition都会有一个对应的条件队列
Condition condition = lock.newCondition();
lock.lock();
业务代码
//阻塞业务执行线程
condition.await();
唤醒阻塞线程-同步队列中的头节点
lock.lock();
condition.signal();
AWAIT|SIGNAL-细节代码
await主要是把自己加入到条件队列中,释放自己持有的锁,开始把自己挂起(一般第一次肯定不在同步队列中)
其他线程调用signal会把对应的线程唤起,正常逻辑 在同步队列中抢到锁就直接返回了
如果是被中断的,也会去抢锁,如果是在signal之后的 node一般在同步队列中,会尝试去抢锁,抢到之后设置自己中断
如果是signal之前的,最后还是会抛中断异常
public final void await() throws InterruptedException {
//响应中断
if (Thread.interrupted())
throw new InterruptedException();
//进入条件队列(单向链表)把自己加入到队尾 同时会把所有取消的节点清除出队列
Node node = addConditionWaiter();
//释放所有的锁,同时记录下 之前持有的锁记录[被唤醒后会重新获取锁]
//这里顺带说一句 和wait notify有点类似,如果当前线程不持有锁,直接await会直接报错 IllegalMonitorStateException
int savedState = fullyRelease(node);
int interruptMode = 0;
//如果没有进入阻塞队列,把自己挂起来 =====while开始
while (!isOnSyncQueue(node)) {
//这里就告一段落了,自己进入了队尾,清理了条件队列,把自己挂起来了
下面两种情况都会打破这个循环
[1.有线程来signal操作-加入阻塞队列 前继节点取消了或者更新前继节点waitStatus=-1失败
|2.对该线程做了中断操作 可能会唤醒他]
LockSupport.park(this);
//下面这个方法的意思是发生过中断 跳出循环
//interruptMode的逻辑如下 signal操作是个分水岭
//如果没有发生中断值是0
//如果在别的线程signal前被中断的是THROW_IE (根据waitStatus!=-2来识别的)
//如果在waitStatus=0 但是不在阻塞队列中 这个时候会一直自旋 直到进入阻塞队列 REINTERRUPT
//Thread.interrupted?(transferAfterCancelledWait(node)?THROW_IE:REINTERRUPT):0
//附上作者的注释 清晰明了 是signal之前中断的Throw_IE 如果是在之后REINTERRUPT
//returning THROW_IE if interrupted
//before signalled, REINTERRUPT if after signalled 0 if not interrupted.
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
} //===========while结束
//如果在阻塞队列中抢锁成功,状态是中断的并且不是在signal之前中断 把这个中断模式设置成1
//两种情况1-正常进入到阻塞队列中 或者在signal之后被中断的
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//这里一定都到了阻塞队列中了,和条件队列中的小兄弟断绝关系
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
//发生了中断,这里统一处理
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
//如果await期间,signal之前被中断 抛中断异常
//如果是signal之后被中断的,自己调用一次中断
注意第二个场景 如果在signal前被中断 但是抢锁成功了也走Throw_ie逻辑
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
//从条件队列里面唤醒节点,依此头节点到后面。成功了就返回
//具体就是把对应节点waitStatus设置成0,enq。如果前继节点取消了等待或者更新前继节点waitStatus=-1失败,换新node对应的线程
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
//从第一个开始唤醒,如果失败 尝试唤醒第二个 依此类推
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) && (first = firstWaiter) != null);
}
//把条件队列里面的node搞到阻塞队列中,先把waitStatus设置成0,然后加入队尾
//如果前继节点的waitStatus>0 说明取消了,或者把前节点更新成-1失败 唤醒node对应的线程
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);
//这个一般是0
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
现在跟着CountDownLatch来看下共享模式
//看下构造方法 里面会设置state的值
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
//Sync这个是CountDownLatch的内部类(也是继承自AQS),构造方法就是setState(count)
this.sync = new Sync(count);
}
//这玩意主要就两个方法,我们跟着这两个方法看下代码
//latch.await(); 挂起调用线程
//latch.countDown(); 计数器state-1 减到0就唤起所有挂起的线程
AWAIT代码
//俄罗斯套娃娃 不重要的直接不描述了
//代码大致逻辑先描述下
//1.看下state是否=0 如果没有操作过这个 老子无敌 直接不阻塞
//2.如果status>0 开始我们的逻辑
2.1 加入到阻塞队列中 node的类型是shared,通过nextWaiter
2.2 开始老动作 自旋处理
2.2.1 如果前继节点是header且state=0 尝试把所有等待的线程唤醒
2.2.2 如果满足上述条件,老操作更新前继节点ws=-1 把取消的节点清除 阻塞自己
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
//响应中断-一般执行耗时比较久的会支持这个
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//核心代码就这两个
//(getState() == 0) ? 1 : -1 如果锁清空了返回1 否则就是-1
if (tryAcquireShared(arg) < 0)
//如果state>0 就会进入这个方法,我们看下具体的内容
doAcquireSharedInterruptibly(arg);
}
//阻塞大法
void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
//加入队尾这个逻辑很简单
//注意下 他是怎么标记自己的是共享模式节点的,设置nextWait=NODE.SHARED
//判断isShare 逻辑是 return nextWaiter == SHARED;
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
//如果他的前继节点就是header,在去拼一把 如果state=0 会返回1 避免自己被阻塞
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//前继节点不是header / state!=0
//把前继节点waitStatue设置成-1(清除取消等待的节点) 把自己park起来
//这个方法如果是被中断打破park的 抛中断异常
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
//看关键代码
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
//把自己设置成头节点
setHead(node);
//这里的propagate一定是1
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
//如果node是共享的模式 基本也是无悬念
if (s == null || s.isShared())
doReleaseShared();
}
}
private void doReleaseShared() {
for (;;) {
Node h = head;
//队列存在且不是刚初始化的队列
//最理想的模式,waitStatus=-1 cas更新成0 唤醒h
//如果waitStatus=0 把waitStatue更新成-3
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);
}else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
//这里如果正常唤醒的话 被唤醒的节点会变成头节点,所以一定不会等于
//setHeadAndPropagate方法里面有 setHeader(xx)会传递变化这个头节点
if (h == head)
break;
}
}
//最关键的代码
private void unparkSuccessor(Node node) {
//这边一般是0,直接跳过这个
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//清除取消等待线程 从尾部往前遍历第一个ws=-1的节点 唤醒他
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
countDown代码
//这个如果是当前线程把state设置成了0 返回true
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
//不细看了 就是唤醒shared类型的节点 oneByOne
//unparkSuccessor里面会唤起下一个 依此下去 会唤起所有等待的
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0&&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
CyclicBarrier
使用demo
//下面这段代码 会被挂住 要三个线程
CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("我是小白,我到指定地点了");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("人到齐了 我们去玩吧");
}
});
thread1.start();
Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("我是小黑,我到指定地点了");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("人到齐了 我们去玩吧");
}
});
thread2.start();
实现原理
主要看下为啥await能挂起 到达一定数量大家又能愉快的开始
//这个实现只是自己维护了一个count 变成0的时候 用ReentrantLock保证线程安全
//基于条件队列实现阻塞和唤醒
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException, TimeoutException {
//保证线程安全 靠自己内部的ReentrantLock来保证
final ReentrantLock lock = this.lock;
lock.lock();
try {
//这个时候 如果是第一次执行 broken=false
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
//count是被挂起的线程数量 index也是一个意思 如果==0说明 大家都到齐了
//下面代码也比较简单 最后一个luckDog执行command,然后开启下一代
//如果command执行失败 runAction=false 在finally里面还会有一些擦屁股的动作
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
//nextGeneration里面有一个关键动作 trip.signalAll()
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
//如果有人没到齐 就用trip.await阻塞住
//这个trip是CyclicBarrier的一个成员变量,lock的衍生品 lock.newCondition()
//trip可以看出来 cyclicBarrier的阻塞和唤起就是基于条件队列的
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
Semaphore
使用demo,不看源码 猜测一般就是state>0直接放过 其他的block
final Semaphore semaphore = new Semaphore(100);
Thread t1 = new Thread(()->{
//也是响应中断的
try {
semaphore.acquire(1);
System.out.println("拿到令牌了");
semaphore.acquire(100);
System.out.println("正常情况下 走不到-拿到令牌了");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
t1.start();
Thread.sleep(588);
t1.interrupt();
semaphore-acquire
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
//熟悉的代码 aqs的共享模式 这里和countDownLatch有点差异 不过返回-1一般都是代表不满足条件
//tryAcquire如果countDownlatch操作的 state==0 返回1 否则是-1
//semaphore逻辑见下面详情吧 公平和非公平模式有差异 一般执行成功返回剩余量 代表成功
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//返回正数也有可能没有cas操作成功 令牌数没有减少。不过自己不会被挂着了
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
公平模式
//如果有前继节点 直接返回-1
//如果自己就是第一个 看下逻辑
//1.如果要求的令牌量>state 返回-N
//2.如果满足条件 但是cas失败了 返回剩余量 正数
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
//这块和之前一样 把自己加入到阻塞队列中(共享模式)尝试看看是否还能抢一次成功
//成功的话 唤醒一起等待的线程 就像带着好兄弟一起越狱
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
非公平模式
//非公平模式 不看前继节点 其他的没差 就是不上路子
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
本文地址:https://blog.csdn.net/u013657993/article/details/107078437