Lock源码深度解析(lock方法unlock方法、AQS)
大家好,我是wave。这次我们继续接着讲锁,来给大家聊一聊Lock的一些底层原理。
基本使用
Lock的基本使用案例
public class Solution {
static int n = 0;
public static void main(String[] args)throws Exception {
//创建一个Lock对象
Lock lock = new ReentrantLock();
//创建10个线程对n自加10000
for (int i = 0; i < 10; i++) {
new Thread(()->{
//加锁
lock.lock();
try {
for (int j = 0; j < 10000; j++) {
n++;
}
}catch (Exception e){
e.printStackTrace();
}finally {
//解锁
lock.unlock();
}
}).start();
}
Thread.sleep(3000);
System.out.println(n);//100000
}
}
- 这个代码案例就是创建了10个线程对一个变量n进行自加10000的操作,然后使用了Lock进行加锁,最后结果是正确的100000。
- 可以看到这里Lock加锁的逻辑代码加了一个try-catch块,这里并不是必须的一个异常捕获,但是Lock比较标准的写法就是最好使用try-catch块写入业务逻辑,最后在finally中进行unlock(解锁)。这样做的好处是避免某个线程突然发生异常,导致后面的unlock代码没有执行,就会造成死锁。
- 本篇文章主要讨论的是Lock的实现类ReentrantLock。
加锁操作
进入到lock()方法中,并找到ReentrantLock的实现方法
public void lock() {
sync.lock();
}
继续进入lock()
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
/**
* Performs {@link Lock#lock}. The main reason for subclassing
* is to allow fast path for nonfair version.
*/
abstract void lock();
这里我们发现Sync是一个继承了AbstractQueuedSynchronizer的类,AbstractQueuedSynchronizer就是我们常说的AQS,所以说Lock的底层使用的是AQS框架。AQS的细节我们后面继续说。
继续看lock()抽象方法的实现类,我们先选择看公平锁。
final void lock() {
acquire(1);
}
接着看 acquire(1),这个方法的tryAcquire、acquireQueued、addWaiter都会详细分析
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- 这个acquire就是Lock加锁的关键了
- tryAcquire(arg)这个方法就是在尝试加锁,如果加锁成功就返回true,然后根据if里面的判断!tryAcquire(arg)的值就是false,所以&&后面的代码就不会执行了。
- acquireQueued这个方法就是在把Node节点进行入队,也就是当点线程加锁失败了,所以需要把这个线程进行阻塞入队,之后等到共享资源被释放了之后再尝试加锁。
- addWaiter(Node.EXCLUSIVE)会返回一个Node对象,这个对象就包含了当前线程Id等信息,并且如果Lock里面还没有队列的话,这个方法会先创建一个队列。
深入看看tryAcquire里面代码
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
//获取当前线程
final Thread current = Thread.currentThread();
//获取state变量值
int c = getState();
//当state变量为0表示当前对象未加锁,对其尝试加锁
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//如果是可重入锁就进入else if
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
这个类的逻辑还是比较好看懂的,首先获取了一下当前的线程,然后再获取state变量的值,这个state变量如果为1,表示已经有别的线程持有这个锁,如果为0表示当前还没有线程持有这个锁。所以如果state为
0就进入if里面。如果state为1,并且是可重入锁,就进入到else if代码块里面。如果都不满足,就返回false,然后当前线程进入队列排队。
进入到hasQueuedPredecessors
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
- hasQueuedPredecessors这个方法需要返回false才能真正进入到下面的cas进行加锁。
- 第一种是头节点与尾结点是同一个,这种情况只可能是队列还没初始化,所以h == t,返回false
- 第二种是头节点后面有下一个节点,并且这个节点是当前线程。也就是说尾结点前面的和头节点的下一个线程应该是正确的线程。一般前一个线程释放锁,后一个线程进行加锁走的就是这个方法。
- 走完hasQueuedPredecessors方法之后就使用compareAndSetState也就是一个cas操作对state的值进行修改,如果修改成了1则加锁成功,再 setExclusiveOwnerThread(current)设置可重入锁线程为当前这个线程,然后返回true就可以了。
- else if里面的可重入锁的代码就比较好理解了,判断当前这个线程是不是第一次设置的可重入锁线程(就是第一次设置过的线程和后序进入的线程是不是同一个),如果是,就对nextc这个变量进行 + 1,然后再修改state的值为nextc。加锁成功返回true,否则返回false。
acquire
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- 我们上面分析的是tryAcquire方法,只有tryAcquire加锁失败,!tryAcquire(arg)为true才会继续执行后面的代码。既然是加锁失败,那么肯定是这把锁已经被其他线程锁持有了,所以这里我们可以想到肯定是会有一个入队的操作。
- 首先lock里面的这个队列并不是Java集合中的队列,而且AQS中的虚拟双向队列,虚拟的意思就是并没有用Java的API中的队列,而且自己使用了一个Node节点,里面定义有pre指针与next指针,自实现的这么一个队列。
- 这个队列并不是Lock对象一创建就会有的,上面我们也提到过队列未初始化的一种加锁情况。所以入队操作首先要判断队列是否被创建,如果还没有被创建的话就需要先创建队列。
调用acquireQueued会先执行addWaiter(Node.EXCLUSIVE)这个函数,所以先进入到addWaiter
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
再先看看Node节点
static final class Node {
volatile Node prev;//队列前一个
volatile Node next;//队列下一个
volatile Thread thread;//线程
private transient volatile Node head;//头节点
private transient volatile Node tail;//尾结点
private volatile int state;//表示锁状态的变量
这里可以看到Node节点就是用来组成队列的元素,这里Node我只截取了关键的几个属性。
- 我们继续分析addWaiter,第一步创建一个Node,也就是把当前线程变成一个Node,然后当尾结点不会null的时候进入if代码块,尾结点不为null意味着这个队列已经被初始化了。所以如果当前队列还为初始化就进入到enq去初始化队列。
- 如果是队列已经被初始化进入到了if代码块里面,就是把node插入到队列的尾部。
如果未初始化,进入enq
private Node enq(final Node node) {
for (;;) {
//未初始化的话tail肯定为null,进入到if里面。
Node t = tail;
if (t == null) { // Must initialize
//cas操作设置头节点
if (compareAndSetHead(new Node()))
//这里让tail不为null了,所以下次循环会进入到else里面
tail = head;
} else {
//node的前一个指向t,如果是初始化进入的话
//t是head也是tail,如果不是初始化进入t就是tail
node.prev = t;
//cas把Node节点中的tail指向node
if (compareAndSetTail(t, node)) {
//t此时后面加了一个node,所以t的下一个为node
t.next = node;
return t;
}
}
}
}
这段代码有点难看懂,我画了一个图帮助大家理解
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-b11krvJZ-1609316095610)( http://gtwave.gitee.io/image/images/wechart/2020-12/lock队列enq图解.png)]
最后再进入到acquireQueued
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//获取节点前一个
final Node p = node.predecessor();
//如果前一个节点是头节点并且对当前节点解锁成功进入if代码块
if (p == head && tryAcquire(arg)) {
//当前节点变为头节点,说明它持有了锁,前一个节点指向null
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//如果加锁失败需要park线程会进入到这里继续阻塞线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- acquireQueued这个方法主要功能就是循环不断的让队列中的第一个线程去获取锁。
- 这里的大致流程就是让头节点的下一个节点变成头节点,然后当前的头结点的Thread属性会变成null,因为当前线程就是这个Thread了,所以就不用保存这个线程了。旧的头节点会变为null,方便gc。
整个加锁的流程大致就是这个样子的了,其实我们回味一下lock的代码,写的真的是非常非常的简洁和有趣,很巧妙的用循环和一些逻辑判断简化了整个代码,不得不说Doug Lea实在太厉害了。
AQS
- 这里对AQS做一个简单的描述:AQS是一个用来自定义锁的框架,AQS的底层就是使用了一个虚拟双向队列和一个State变量来完成加锁操作的。虚拟双向队列的含义就是这个队列不是集合里面的队列,而是用一个Node节点,里面包含pre指针与next指针实现的一个队列。如果队列中的某个节点把state变量进行了修改,就可以视为这个节点持有了锁。
- 显而易见的lock的底层就是AQS了,我们上面分析lock的源码的思想就和AQS的思想是一样的,而且我们也看到了lock其实继承了AQS。
- 我们其实也可以自己继承AQS的类,然后自己实现一个自定义的锁,这里大家可以自行尝试。
解锁操作
同样进入到unlock里面
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
- tryRelease(arg)就是在对当前线程进行解锁,如果解锁成功,则判断队列里面是否有线程了,如果有线程则唤起下一个线程
- waitStatus这个属性其实如果为0则表示线程处于活跃状态,其他值都表示阻塞、取消等状态。
我们看看tryRelease(arg)
protected final boolean tryRelease(int releases) {
//取出当前线程的state变量,减去releases
int c = getState() - releases;
//如果当前线程不是持有锁的线程,抛出异常解锁失败
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//c == 0表示这个可重入锁已经把全部锁都解掉了
if (c == 0) {
free = true;
//把持有锁的线程设置为null
setExclusiveOwnerThread(null);
}
//把c写回到state里面
setState(c);
//如果全部锁都解完了,即c == 0,返回true,反之返回false
return free;
}
这个类的方法还是很好理解的,大家仔细看我写的注释就能明白这个类的意思了。
继续看一下unparkSuccessor(h)
private void unparkSuccessor(Node node) {
//获取Node中的waitStatus,就是在判断node中的线程的状态
int ws = node.waitStatus;
//小于0就用cas操作改为0,因为现在就在操作这个线程,所以状态肯定是活跃的
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//获取下一node中下一个节点
Node s = node.next;
//waitStatus > 0表示这个线程被取消了
if (s == null || s.waitStatus > 0) {
s = null;
//因为s == null 与线程被取消都表示这个线程已经没了
//所以从尾节点到头结点重新遍历找出一个是等待状态或者活跃状态的线程
for (Node t = tail; t != null && t != node; t = t.prev)
//waitStatus < 0 表示在等待,waitStatus == 0表示活跃
if (t.waitStatus <= 0)
s = t;
}
//如果s线程被取得了,就解阻塞
if (s != null)
LockSupport.unpark(s.thread);
}
这个方法的作用就是从队列中重新找出一个在等待状态或者活跃状态的线程重新竞争锁
ENDING
- 好了,现在Lock中的加锁解锁都已经讲完了。希望大家能够对加锁解锁操作能有一个深入的理解,同时领悟一下Doug Lea写的代码的魅力。
- Lock在面试中也是非常常见的一个知识点,如果能把源码流程过一面,任凭面试官怎么问,你都可以通过源码的一些思想去解答面试官的问题,绝对可以让面试官对你刮目相看~
本次分享就到这里结束了,如果你喜欢就点个赞吧~
往期推荐
本文地址:https://blog.csdn.net/qq_42832874/article/details/111666684
推荐阅读