解读可重入锁——ReentrantLock&AQS,java8
一 , ReentrantLock简介 |
首先我们可以查看ReentrantLock类,实现了Lock接口,其内部类都直接或者间接的继承了AQS类,则AQS肯定与ReentrantLock关系密切
我们来个demo,让大家更直接的了解重入锁: |
二, 构造器 |
MyReentrantLock是为了写自己理解的注释,改了一下类名
/**
* ReentrantLock默认选择的就是不公平锁
*/
public MyReentrantLock() {
sync = new MyReentrantLock.NonfairSync();
}
/**
* 给我正义的数,我就是公平锁,不然还是不公平锁
* @param fair true 公平锁 false非公平锁
*/
public MyReentrantLock(boolean fair) {
sync = fair ? new MyReentrantLock.FairSync() : new
MyReentrantLock.NonfairSync();
}
三,lock 方法 |
lock方法封装在Syn抽象类中
公平锁和非公平锁类,都为lock做了重写
3 .1 公平锁的 lock |
lock方法中就一个获取锁的方法,还是借用AQS去获取方法
3.1.1 AQS的 acquire 实现 |
首先尝试获取锁,然后再考虑要不要进入阻塞队列
/**
* 首先调用该方法的线程,去尝试获取锁资源
* 如果获取到锁,给个标记,当前执行的线程就是我的线程,state+1
* 如果没有获取到锁,该线程写入阻塞队列(面壁思过,等下再来叫你)
*
* 怎样才能成功获取锁呢?
* 1,当前锁空闲,且同一时刻也没有竞争
* 2,这个锁本来就是由当前线程持有,就是重入
*
* @param arg 调用方写死的量为1
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(MyAbstractQueueSync.Node.EXCLUSIVE), arg))
selfInterrupt();
}
3.1.2 公平锁尝试获取锁 tryAcquire尝试获取锁,是公平锁自己实现 |
/**
* 尝试直接获取锁
* @param acquires
* @return true 获取到锁,flase,没有获取到锁
*/
protected final boolean tryAcquire(int acquires) {
//获取当前线程引用
final Thread current = Thread.currentThread();
//获取当前线程锁的状态
//第一个获取到锁 c == 0
//第i次重入 c == i
int c = getState();
if (c == 0) {//第一次尝试直接获取锁
//由于是公平锁,要讲究先来后到
//首先判断阻塞队列中有没有排在它前面的线程
// 因为有可能是上一个持有锁的线程刚刚释放锁, 队列里的线程还没来得及争抢, 本线程就乱入了
//
//判断阻塞队列中没有其它线程
//compareAndSetState(0, acquires)
// 如果没有线程在等待,那就用CAS尝试一下,成功了就获取到锁了,
// 不成功的话,只能说明一个问题,就在刚刚几乎同一时刻有个线程抢先了(到嘴的鸭子飞了)
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
//获取到了锁,标记,,告诉大家,当前线程获取到锁了
setExclusiveOwnerThread(current);
return true;
}
}
//这个锁被人占了,确认一下是不是自己现在占着这个锁
//是自己占着这个锁,且c!=0了,也就是重入了
else if (current == getExclusiveOwnerThread()) {
//acquires = 1 c>=1
int nextc = c + acquires;
if (nextc < 0)//这个线程占有这个锁的次数太多了,使int溢出了,抛出异常
throw new Error("Maximum lock count exceeded");
//更新锁的状态为2,就是告诉别人,它获取到这个锁2次了
//state为几就是这个线程获取到这个锁几次
setState(nextc);
return true;
}
return false;
}
}
3 .2 非公平锁的 lock |
非公平锁,就是很流氓的,想获取锁的时候就直接去获取,不排队。但是直接获取不到,又像其它人一样老实的遵守步骤了。
/**
* 试着直接执行获取锁,失败了之后就老实的去尝试获取锁
*/
final void lock() {
if (compareAndSetState(0, 1))//直接尝试cas操作获取锁,且获取成功了
//获取到了锁,标记,,告诉大家,当前线程获取到锁了
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
四, 将线程加入阻塞队列 |
将线程加入阻塞队列,我们得知道该阻塞队列结构是什么样的
AQS的Node类的属性有:
//线程的等待状态,执行的时候,状态为-1,等待时状态为0
volatile int waitStatus;
//上一个发起过获取锁的线程
volatile MyAbstractQueueSync.Node prev;
//下一个发起过获取锁的线程,,或者是空
volatile MyAbstractQueueSync.Node next;
//发起获取锁的线程
volatile Thread thread;
//下个等待线程
MyAbstractQueueSync.Node nextWaiter;
队列中全部都是等待的线程
如果头线程开始执行了:
等待状态发生改变,切该线程有了得到锁的标识,head指向队列中的持有锁线程,tail指向队列尾。释放锁之后,等待状态又回到0。这样依次类推。
acquireQueued(addWaiter(MyAbstractQueueSync.Node.EXCLUSIVE), arg)
在尝试获取锁失败的时候,这句话就是将线程加入阻塞队列的调用
我们先看addWaiter方法:
/**
* 在队列中添加节点
* @param mode
* @return 添加的节点
*/
private MyAbstractQueueSync.Node addWaiter(MyAbstractQueueSync.Node mode) {
MyAbstractQueueSync.Node node = new MyAbstractQueueSync.Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
//获取队尾的引用
MyAbstractQueueSync.Node pred = tail;
//如果阻塞队列不为空,队尾有指向的引用
if (pred != null) {
node.prev = pred;
//在队列的尾部插入线程
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//初始化队列,如果队列是空,有队列需要加入的时候
//就 需要初始化队列,如果使用不上队列的时候,
// 就不需要引入队列
enq(node);
return node;
}
在看初始化以及争抢入队方法 enq:
/**
* 队列初始化
* cas设置tail,直到争抢成功
* @param node
* @return
*/
private MyAbstractQueueSync.Node enq(final MyAbstractQueueSync.Node node) {
for (;;) {
MyAbstractQueueSync.Node t = tail;
//最开始的时候,队列为空,必定进入if
if (t == null) { // Must initialize
//cas来设置head和tail
if (compareAndSetHead(new MyAbstractQueueSync.Node()))
tail = head;
} else {
// 争抢入队, 没抢到就继续for循环迭代.抢成功了就可以return了,不然一直循环.
// 为什么是用cas来争抢呢? 因为怕是多个线程一起执行到这里
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
然后在回到 acquireQueued 方法:
/**
* 把node入队
* @param node 刚刚插入队尾的节点
* @param arg
* @return
*/
final boolean acquireQueued(final MyAbstractQueueSync.Node node, int arg) {
//设置入队是否获取锁成功标志
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//获取节点的前驱
final MyAbstractQueueSync.Node p = node.predecessor();
//如果node前驱节点恰好是head
//那么就可以在次尝试获取一次锁
//队首节点很乐观,因为确实很可能马上轮到他来获取锁
if (p == head && tryAcquire(arg)) {
//很幸运的是,它居然再一次的尝试成功获取了锁
//那head自然就要指到它的头上
setHead(node);
//那个刚刚那个线程,我帮你继续使用锁
//我也继续帮你GC(好人做到底)
p.next = null; // help GC
//肯定是获取成功了啊
failed = false;
//全村的希望,让大家终于摆脱死循环
return interrupted;
}
//不是head的后一个,或者是head的后一个,但是没有获取到锁,
// 就到了这个if
//获取锁失败了,是否要阻塞这个线程
if (shouldParkAfterFailedAcquire(p, node) &&
//在这阻塞,等待唤醒
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//发现之前哪里出现了异常,就执行
if (failed)
cancelAcquire(node);
}
}
我们在一层一层的扒下去,看看 shouldParkAfterFailedAcquire 是如果判断线程是否需要阻塞:
注意:我之前在图示队列结构的时候,表述了执行的线程,waitStatus为-1,但是在这个情况,阻塞的线程waitStatus也是-1,要分请waitStatus为-1时,线程的状态
/**
* 是否要阻塞该线程
* @param pred node的前驱
* @param node node
* @return
*/
private static boolean shouldParkAfterFailedAcquire(MyAbstractQueueSync.Node pred, MyAbstractQueueSync.Node node) {
//获取前驱的等待状态
int ws = pred.waitStatus;
//如果线程前驱的等待状态为-1,即在执行状态,那么就阻塞node
if (ws == MyAbstractQueueSync.Node.SIGNAL)
return true;
//如果线程前驱的等待状态大于0,即是1 CANCELLED 就把前面的等待状态为1的删了
//删到直到不为1
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//ws只有1,0,-1,-2,-3
//1,-1处理了,前面没有对waitStatus做操作
//那么只剩0了,前驱的节点的ws设置为-1
//在之前图表示的时候,为-1是执行状态,但这种状态是阻塞状态
// 直到前一个线程释放了锁,才能使执行状态
compareAndSetWaitStatus(pred, ws, MyAbstractQueueSync.Node.SIGNAL);
}
return false;
}
在 parkAndCheckInterrupt 中:
LockSupport.park(this)会挂起当前线程. 但是LockSupport.park
还有一个隐藏功能. 就是, 如果先对一个线程unpark, 再对这个线程park,
那么这次的park是失效的. 下一次park才会挂起.
原因就是, 对一个没有被park的线程进行unpark的时候, 会把标志位
perm置为1. 而每次park的操作, 都是先去检查perm是否为1.
如果是1, 那么置为0, 并且这次不挂起.
如果perm为0, 那么就直接挂起这个线程.
*如果线程被阻塞过,返回true
* @return
*/
private final boolean parkAndCheckInterrupt() {
/**
* 挂起当前线程
*/
LockSupport.park(this);
return Thread.interrupted();
}
五,释放锁 unlock |
释放锁是调用Sync的release
public void unlock() {
sync.release(1);
}
5.1 释放锁 release |
/**
* 释放锁
* 记录锁状态的计数器-1
* 状态为0,就是彻底释放
* @param arg
* @return true 释放成功,false,释放失败
*/
public final boolean release(int arg) {
//首先尝试释放锁
if (tryRelease(arg)) {
//只有一层lock unlock结构,释放锁成功后,
// 或者独占锁完全释放
//其实就是这个线程把所有的lock都unlock了
MyAbstractQueueSync.Node h = head;
// 唤醒阻塞队列中的下一个等待的线程
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
5.1.1 尝试释放锁 tryRelease |
尝试释放锁是去掉一层lock unlock,如果是最后一层,给个标记,并且唤醒阻塞队列的下一个等待线程
/**
* 公平锁和非公平锁尝试释放锁
* @param releases
* @return
*/
protected final boolean tryRelease(int releases) {
//将线程持有锁的状态-1
int c = getState() - releases;
//当前线程是不是获取到锁的线程
//是别的线程就抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
//用于标记是否可以完全释放锁
boolean free = false;
//如果状态是0,就可以完全释放锁了
//是重入锁,这也是释放的最后的一把锁了
//不是重入锁,就直接释放了
if (c == 0) {
//表示完全释放
free = true;
//设置持有该锁的线程没有
setExclusiveOwnerThread(null);
}
//不是最后一层锁,就释放一层
setState(c);
return free;
}
5.2.2 唤醒队列的挂起线程 unparkSuccessor |
/**
* 唤醒node
* @param node 这个node是head节点
*/
private void unparkSuccessor(MyAbstractQueueSync.Node node) {
//获取等待状态
int ws = node.waitStatus;
//如果ws<0,就将其变为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//获取head的后继节点
MyAbstractQueueSync.Node s = node.next;
//如果head的后继节点不存在,或者ws为1
if (s == null || s.waitStatus > 0) {
//head后继节点指向null
s = null;
for (MyAbstractQueueSync.Node t = tail; t != null && t != node; t = t.prev)
//就从队列的后面往前面找,找到最前面一个ws<0的,但是又不是head的节点
if (t.waitStatus <= 0)
s = t;
}
//就把该节点唤醒
if (s != null)
LockSupport.unpark(s.thread);
}
六,总结 |
看完源码之后:
- Reentrant的实现和AQS密切分不开,使用到的阻塞队列,获取锁的方式都依赖于AQS
- 在开发时也可以视情况而选择锁,lock锁在适当的时候会更优于synchronized