根据ReentrantLock -- 解析AQS原理
java.util.concurrent.locks.AbstractQueuedSynchronizer
是什么
aqs,这是一个队列同步器框架,JUC中的公平锁、非公平锁、重入锁都是以aqs作为基础框架的,定义了加锁、释放锁,加共享锁等一些逻辑
AQS是一个抽象类,内部使用了一个FIFO的等待队列,用于多线程等待锁排队,通过state表示当前资源的加锁状态;
aqs是基础类,类中定义了模板方法,只需要实现对应的模板方法即可;aqs的作者是Doug Lea
aqs内部维护的双向队列,大致是这样的,其中,比较特殊的是:第一个节点对应的thread是null,链表中的第二个节点,才是第一个排队的线程;这里的意思是:第一个节点是正在执行的线程,无需排队;图中少画了waitStatus信息,每个节点都会有一个waitStatus信息,用来存储下一个节点的等待状态
AQS核心属性
volatile int waitStatus;
0,这是初始化状态,新Node会处于这种状态,初始化状态
static final int CANCELLED = 1;
因为超时或者中断,Node被设置为取消状态,被取消的Node不应该去竞争锁,只能保持取消状态不变,不能转换为其他状态,处于这种状态的Node会被踢出队列,被GC回收
static final int SIGNAL = -1;
由当前节点在队列中的后一个节点将当前节点的waitStatus设置为-1,表示告诉当前节点,如果当前节点unLock之后,通知后面的节点执行锁竞争(通过unPark()来唤醒)
static final int CONDITION = -2;
表示这个Node在条件队列中,因为等待某个条件而被阻塞
static final int PROPAGATE = -3;
使用在共享模式头Node有可能处于这种状态, 表示锁的下一次获取可以无条件传播
volatile Node prev;
表示队列中当前Node节点的前一个节点
volatile Node next;
表示队列中当前node节点的后一个节点
volatile Thread thread;
这个node持有的线程,在new Node()的时候,需要把线程传进去
Node nextWaiter;
表示下一个等待condition的Node
private transient volatile Node head;
FIFO队列中的头结点
private transient volatile Node tail;
FIFO队列中的尾节点
private volatile int state;
同步状态,0表示未加锁;1表示有一个线程加锁,2表示有两个线程加锁,一般是重入锁的场景
getState():获取同步状态
setState():设置同步状态
compareAndSetState(): 利用CAS进行同步状态的设置
spinForTimeoutThreshold = 1000L; :线程自旋等待时间
private Node enq(final Node node) {}:
是将当前排队的node节点放到FIFO的队尾;如果队列为空,就在node节点前面创建一个空节点,然后将node节点放到队尾
AQS使用说明
1、aqs中的node节点在排队的时候,waitStatus是0,在下一个排队的节点进来的时候,会把上一个节点的waitStatus设置为-1
2、笔记中说的aqs阻塞和唤醒指的是park()和unpark(); unpark()之后,在哪里park(),就从哪一行代码接着往下执行
3、笔记中说的第一个排队的节点指的是head节点的next;AQS队列中的一个节点head对应的thread永远是null
4、对于非公平锁,加锁失败,去排队之后,就不存在插队的情况;我们所说的加锁,其实就是尝试将state从0变成1;如果是重入锁,那就是在1的基础上再加1
5、AQS为什么要创建一个虚拟空节点?
因为在队列里面,每一个节点都要将前一个节点的waitStatus设置为-1,只有在前一个节点是-1的时候,在前一个节点释放锁的时候,会唤醒后面排队的线程;
那第一个节点没有前置节点,所以,就创建一个空节点,空节点可以理解为当前在执行的线程对应的node节点,在当前线程释放资源之后,会根据head的ws来判断是否需要唤醒下一个排队线程(也可以理解为第一个节点是当前执行线程的站队节点)
源码
我们通过对ReentrantLock的源码解析,来记录AQS的源码
ReentrantLock.lock()
ReentrantLock重入锁分为了公平锁和非公平锁,两者的区别是:在尝试加锁的时候,公平锁会判断当前线程是否可以加锁,如果可以加锁,就尝试cas,否则就去排队;非公平锁是,无论是否可以加锁,都强制cas加锁,加锁失败,就去排队
公平锁尝试加锁
/**
* 公平锁加锁流程:
* 1、先尝试加锁
* 1.1、如果加锁成功,就返回,流程结束
* 1.2、如果加锁失败,就判断是否可重入,可重入,就加锁成功,流程结束
* 1.3、如果未加锁成功,且不可重入,就去排队
* 2、排队addWaiter()
* 2.1、在排队的时候,先判断当前是否有在排队的节点,或者是否有空白的node节点;如果有,就直接将当前线程加入到队列中排队
* 2.2、如果没有在排队的节点、或者没有空白节点,就先new一个空白节点,插入到队头,然后将当前线程插入到队列,并设置为队尾
* 3、acquireQueued()方法:将排队节点的上一个节点的waitStatus设置为-1,然后进行park()
*
*
* 这里是尝试加锁,如果加锁失败,就放入到队列中
*
* 返回true,表示加锁成功,无需排队
* 如果tryAcquire返回false,表示需要去排队
*
*
* @param arg
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
我们分开来看这几个方法
/**
* 这个方法的作用是:判断当前线程是否可以加锁,
* 如果未加锁,尝试加锁,加锁成功,就无需排队,加锁失败,就去排队
* 如果已经加过锁了,判断是否可重入,可重入,就state+1;不可重入就去排队
* 拿到当前是否已经加锁的标识:state(为0,表示未加锁;为1表示已经加锁)
* 1.如果未加锁:
* 1.1 尝试加锁,如果加锁成功,使用cas更新state的值
* hasQueuedPredecessors();只有这个方法返回false,才表示当前线程可以加锁;否则,就会去排队
* 2.如果已经加锁
* 2.1 判断当前锁和已经加锁的是否是同一把锁,如果是同一把锁,可以重入,重入需要对state+1
* @param acquires
* @return
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//state为0表示当前没有线程加锁
if (c == 0) {
/**
* hasQueuedPredecessors();判断当前线程是否可以加锁,如果返回false,表示可以加锁,返回true,表示需要排队
* compareAndSetState:尝试加锁,加锁就是把aqs的state+1
* setExclusiveOwnerThread:将当前线程设置为持有锁的线程
*/
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
/**
* 如果当前已经有线程加锁了,判断当前加锁的线程和持有锁的线程是否是同一个,也就是所谓的锁重入
* 如果是重入,就把state+1
*/
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
在尝试加锁的方法中,有一个最为核心的方法,就是hasQueuedPredecessors();这个方法,该方法是来判断当前线程是否可以加锁,如果这个方法返回false,表示可以加锁;如果返回true,表示需要去排队
/**
* 只有这个方法返回false,表示当前线程可以加锁,无需排队;如果返回true,表示需要排队
* 由于这里是&判断条件,所以,只有两个条件都为true,返回返回true;否则就是false
* 那我们来说不需要排队,可以直接加锁的场景:
* h != t 这个判断false有两种场景
* 1、当前队列中,还没有排队的线程,且队头的空白node节点还没有生成,这时候,都是null
* 2、当前队列中,已经生成了空白的node节点,但是没有排队的队列,这时候tail == head,这种场景有可能是在排队的节点都执行完了,然后这时候又来一个线程进行加锁
*
* 如果 h != t为true(为true表示最少有一个排队的线程),就需要判断后面的条件,只要后面的返回false,就可以不排队
* ((s = h.next) == null || s.thread != Thread.currentThread());
* 3.1、首先,来判断这个条件,说明前面的 h!=t是true(否则,如果h!=t为false,就无需判断后面的条件了),也就是说,当前队列中,已经有一个或者多个线程在排队
* 所以我觉得这里,如果在h!=t为true的情况下,不会存在h.next == null的这种场景;因为如果说head.next为null,就表明,当前只有一个空白节点,只有一个空白节点,在h!=t的时候,就直接返回false了
*
* 3.2、这个判断条件返回false,需要两个或条件都为false,否则,就会返回true,让线程去排队
* 两个条件都为false,简单:头结点后面有排队的节点,且头结点的下一个节点的thread是当前线程,这时候,会返回false,无需排队,直接尝试加锁;其实就是:当前来加锁的线程,和第二个节点(第一个排队的节点)
* 是同一个线程,这时候,可以加;因为我自己就是排在第二个的(第一个是空节点),为什么不能加锁呢?意思就是,我已经排到队了,就该我加锁
*
* 假如说,这时候一个线程过来加锁, h!=t,但是当前线程不是排队中第二个节点的thread,就会让排队,原因就是,这是公平锁,已经有人在排队了,为什么你不排队?这种情况就是 s.thread != Thread.currentThread()为true
*
* 3.3、如果h != t为true;后面的这个条件中,只要有一个为true,就需要去排队
* h.next == null为true 这个条件我觉得在前面条件为true的情况下,这里不太可能为true;所以,后面这个条件为true
* ,就是当前尝试加锁的线程和排队的线程不是同一个,这时候,就需要去排队,因为,你来加锁的时候,我已经在排队了,并且你和我又不是同一个线程,你为什么不排队?
*
* 所以总结而言,对于公平锁,无需排队的有这三种情况:
* 1、当前aqs队列为空
* 2、当前aqs队列不为空,只有一个空白的node节点
* 3、当前aqs队列不为空,有多个线程在排队,只有当前来加锁的线程是第一个排队的加锁线程时,可以不排队,直接尝试加锁(这个场景之前在网上看到一个帖子:是这样的,假如说现在在排队打水,A排在第一个等待打水的位置,如果A
* 的父母、亲戚来打水了,可以不排队,直接站到A的位置,因为他们是亲戚,一家人,也就是可重入)
*
* @return
*/
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
对于非公平锁和公平锁来说,在加锁的时候有一个区别,就是非公平锁无需执行hasQueuedPredecessors()这个方法,也就是说非公平锁无需判断是否需要加锁,可以直接尝试CAS加锁
截止到这里,尝试加锁的方法就说完了,我们来说下后面的方法,如果tryAcquire(arg)方法返回true,表示加锁成功,就不需要后面去排队的方法了,假如返回的是false,表示当前线程需要去排队,排队的方法就是acquireQueued(addWaiter(Node.EXCLUSIVE), arg);这一行代码,有两个方法,我们拆开来看
非公平锁加锁
非公平锁加锁和公平锁加锁的区别是:
1、在调用lock()方法的时候,会先cas加锁,而公平锁是直接去判断是否允许加锁的
2、在判断到state为0 的时候,公平锁调用hasQueuedPredecessors()去判断是否可以加锁,而非公平锁不会调用这个方法,直接cas
除了这两个地方,其他都是一样的加锁流程
非公平锁只要去排队了,执行的流程和公平锁是一样的
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}s
排队-addWaiter(Node.EXCLUSIVE);
首先,先描述一下这个方法的作用:当线程需要排队的时候,会先new一个node节点,如果当前aqs队列不为空,就直接把new出来的node节点,加到队尾;如果aqs队列为空,就先new一个空白的node节点,作为head队头,然后把需要排队的当前线程,加到head后面;
/**
* 首先根据当前线程,生成一个node节点:
* 这个方法完成的作用就是:如果当前有节点在排队,或者已经生成了空白节点,就直接将new的node节点插入到队列中;如果当前没有节点在排队,没有生成空白节点,就先生成空白节点,插入到队列中,再将new的node查到空白节点后面
* 1、如果尾结点不为null,表示当前已经存在排队的节点;或者当前不存在排队的节点,但是已经生成了队列中的第一个空节点,这是,就不需要做其他操作,直接加到队尾即可
* 2、如果pred == null,表示当前没有尾结点;这时候,在插入到队列之前,需要先放一个空节点在最前面,然后把new出来的node加到空节点后面;这个操作,就是在enq(node);中完成的
* @param mode
* @return
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
//队尾不为null
if (pred != null) {
//由于是双向链表,所以需要设置node.prev和pred.next
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//这里是表示当前aqs队列为空,需要先new一个空白node节点作为head
enq(node);
return node;
}
-------------------------------------
/**
* 代码如果进入到这里,表示当前需向队列中插入node节点,但是没有空白节点
* @param node
* @return
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
/**
* 这里还是先判断一下队尾是否为null,如果不为null,表示在这一段时间内,有其他线程已经创建了空白节点
* 1、如果尾结点为null,就new一个空白节点,并将空白节点设置为头结点;这时,其实头结点和尾结点都是这个空白节点
* 2、这里是一个for死循环,所以,在创建了空白节点之后,第二次循环的时候,会进入到else
* 将node节点设置为尾结点,并将head的next设置为node,至此,这个方法结束,return
*/
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
截止到这里,需要排队的队列已经加入到了aqs队列中,在入队之后,还要完成后续的动作:将前一个节点的waitStatus设置为-1,表示前一个节点在执行结束,释放锁之后,要唤醒后面的排队界面;这里的一个点是:在当前节点插入队列之后,需要把上一个节点(node.prev)的ws设置为-1
这个把当前排队节点的上一个节点的waitStatus设置为-1是非常重要的一个操作,因为在前一个节点解锁之后,会判断自己的waitStatus是否是-1,来判断:当前是否还有排队的线程;如果自己的waitStatus是-1,就表示后面还有排队的线程,就要去唤醒;否则,就不需要去唤醒
acquireQueued()方法:休眠
这个方法主要完成了以下工作:
1.如果当前node节点是第一个来排队的线程,就尝试进行一次加锁,防止在这期间,加锁的线程执行完毕,未唤醒后面的节点
2.如果加锁失败,或者是非第一个排队节点,那就尝试将上一个节点的waitStatus设置为-1,然后进行park()即可
/**
* 这个方法完成的是:判断当前入队的node节点是否需要park
* 如果当前是第二个线程进入到这里,会尝试去竞争锁;
* @param node
* @param arg
* @return
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//这里的p是当前节点的上一个节点 prev
final Node p = node.predecessor();
/**
* 如果当前节点的prev就是head头结点,就表示当前只有我自己一个线程来排队了,这时候,就尝试加一次锁,看是否可以加锁成功;因为有可能在我入队的这一段时间,线程已经执行完了,所以这里要尝试加一次锁
*
* 如果加锁成功,就把当前节点设置为头结点,其实就是把当前节点设置为空节点,将原来的空节点回收掉
* 如果加锁失败,就走下面的流程
*/
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
/**
* 将上一个节点的waitStatus设置为-1的逻辑是在shouldParkAfterFailedAcquire方法中完成的
* 代码走到这里,表示当前我不是第一个来排队的线程,,或者我是第一个来排队的,上面尝试加锁的时候,失败了;这里判断是否需要park(),需要判断我的上一个节点的waitStatus是否是-1(待唤醒状态)
* ;如果不是,就将上一个节点的ws设置为-1,然后再调用LockSupport.park
* (this);并将当前线程park(),然后返回;
*
* parkAndCheckInterrupt();会将当前线程阻塞
*
* 需要注意的是:线程在哪里被park()的,等到被unpark()的时候,就会接着park()的代码执行,也就是说,线程被唤醒之后,会执行return Thread.interrupted();
* 如果这里是false,就会接着for循环,尝试加锁,这时候,会加锁成功,因为本来就该我来加锁了,然后return interrupted,这时候的interrupted是false
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
/**
* 这里的failed为true的条件是比较苛刻的,如果上面的线程加锁成功了,failed为false;如果未加锁成功,又会unpark掉,所以理论上,这里是不可能为true
* 的,除非是在上面代码的执行过程中,抛出了异常
*/
cancelAcquire(node);
}
}
解锁过程
公平锁和非公平锁调用的是同一个解锁方法,解锁主要完成以下几个操作
1.将当前state-1,如果-1之后不为0,就return,表示当前锁进行了重入,还有线程未释放锁
2.如果state为0,表示当前线程已经将锁完成释放,尝试唤醒后面排队的节点
3.在唤醒后面排队节点的时候,会判断当前head节点的waitStatus是否是-1,如果为0,表示后面没有排队节点,为-1,就去尝试唤醒第一个排队的节点
4.在唤醒时,调用LockSupport.unpark(thread)即可
/**
* 注意,ReentrantLock.unLock()和ReentrantReadWriteLock.writeLock.unLock()都是调用的这里
*
* tryRelease()方法返回true,表示解锁成功;返回false,表示有重入锁,还未解锁成功
* 1、如果解锁成功,就判断当前头结点的ws是否为非0,正常场景下,这里的ws应该是-1
* 如果这里的ws为0,表示这是最后一个线程了,因为后面有排队的线程的话,就会把我的ws变成-1
* 换而言之,在不考虑其他情况,如果一个node节点的ws为0,就是队尾了
*
* 在解锁成功之后,需要进行下一步的操作,这里可能会有多种情况
* 1、当前aqs队列为空,没有在排队的线程,这里的head就是null
* 2、aqs队列不为空,但是只有一个空白node节点,这时候,head的ws就是0,因为我后面没有人在排队了,没有人将我的ws设置为-1;也就是说,我执行完之后,就不需要唤醒后面的节点了
* 3、aqs队列不为空,并且空白node节点后面,还有线程在排队,这就是要调用unparkSuccessor(h)的场景
*
* @param arg
* @return
*/
//如果是非公平锁,在这个方法调用之前,会先进行一次cas,如果cas失败,再走这个代码流程
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
/**
* unlock解锁的时候,公平锁和非公平锁是一样的逻辑
* 1、判断当前解锁的线程和当前加锁的线程是否是同一个;如果不是同一个,报错
* 2、如果是同一个,state-1,如果-1之后的state为0;就表示当前线程已经解锁成;如果不为0,表示锁进行了重入,就将减1之后的值,set到state
*
* @param releases
* @return
*/
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
我们再看下unparkSuccessor方法
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
/**
* s == null的场景就是当前队列中没有排队的线程,只有一个空的node节点
*/
if (s == null || s.waitStatus > 0) {
s = null;
/**
* 这个for循环的作用是这样的:
* 假如现在AQS队列中是这样的:
* 空白node节点 -- A节点 --B节点 -- C节点 -- D节点
* 假如说A节点被取消了,这时候A的ws就是0,就会进入到这个循环中
*
* 最终会找到B返回
*/
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//这里的s就是head的下一个节点,这里调用unpark();有一个原则,在哪里park()了,在被unpark()之后,就接着park()后面的代码来执行
if (s != null)
LockSupport.unpark(s.thread);
}
所以总结而言,有几个关键点
1.在aqs队列中,第一个head节点是当前正在持有锁的线程对应的node节点
2.aqs队列中第二个节点,才是第一个排队的线程
3.非公平锁和公平锁的区别是在加锁的时候,是否会判断需要排队,如果一旦排队,就没有任何差别
4.在排队的时候,如果当前节点是第一个排队的节点(aqs中的第二个节点),需要尝试进行一次加锁,因为防止在排队的这个过程中,锁释放了,但是又没有通知node节点进行加锁
5.在排队之后,需要将上一个节点的waitStatus设置为-1
6.在释放锁之后,会尝试唤醒后面第一个排队的节点
7.所谓的加锁、解锁,就是将volatile修饰的state变量 +1 -1
本文地址:https://blog.csdn.net/CPLASF_/article/details/109627542
上一篇: Python爬虫采集微博视频数据
下一篇: MySQL常用存储引擎功能与用法详解