ReentrantLock原理分析
ReentrantLock原理分析
1. ReentrantLock 基本使用和概念
1.1 ReentrantLock是什么
- ReentrantLock是一种可重入锁,即获得锁的线程可重复获得锁,同sychronized一致
- ReentrantLock相比sychronized使用更灵活,且可对多个资源进行等待唤醒操作,可以引入自定义的调度算法
1.2 ReentrantLock 基本API使用
void lock(); //获取锁
void unlock(); //放开锁
void tryLock(); //尝试获得锁
void newCondition(); //获得condition 用于等待唤醒操作
void getHoldCount(); //当前被持有的数量
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class TestReentrantApi {
public static void main(String[] args) throws InterruptedException {
ReentrantLock reentrantLock = new ReentrantLock();
System.out.println("开始测试获取锁和释放锁");
testLock(reentrantLock);
System.out.println("开始测试condition");
testCondition(reentrantLock);
}
public static void testLock(ReentrantLock reentrantLock) throws InterruptedException {
Thread thread = new Thread("线程0"){
@Override
public void run() {
reentrantLock.lock();
try{
System.out.println("线程0获得锁1");
TimeUnit.SECONDS.sleep(1);
reentrantLock.lock();
try {
System.out.println("线程0再次获得锁2");
TimeUnit.SECONDS.sleep(1);
}finally {
reentrantLock.unlock();
System.out.println("线程0释放锁2");
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
reentrantLock.unlock();
System.out.println("线程0释放锁1");
}
}
};
Thread thread1 = new Thread("线程1"){
public void run() {
reentrantLock.lock();
try{
System.out.println("线程1获得锁");
} finally {
reentrantLock.unlock();
}
}
};
thread.start();
thread1.start();
thread.join();
thread1.join();
}
public static void testCondition(ReentrantLock reentrantLock){
Condition condition = reentrantLock.newCondition();
Condition condition1 = reentrantLock.newCondition();
Thread thread = new Thread("线程0"){
@Override
public void run() {
reentrantLock.lock();
System.out.println("线程0获得锁");
try{
System.out.println("线程0进入await");
condition.await();
//condition1.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
reentrantLock.unlock();
}
}
};
Thread thread1 = new Thread("线程1"){
@Override
public void run() {
reentrantLock.lock();
System.out.println("线程1获得锁");
try{
//condition1.await();
condition.signal();
System.out.println("线程1唤醒线程0");
} catch (Exception e) {
e.printStackTrace();
} finally {
reentrantLock.unlock();
}
}
};
thread.start();
thread1.start();
}
}
打印结果如下:
开始测试获取锁和释放锁
线程0获得锁1
线程0再次获得锁2
线程0释放锁2
线程0释放锁1
线程1获得锁
开始测试condition
线程0获得锁
线程0进入await
线程1获得锁
线程1唤醒线程0
如果把testCondition中的注释打开会出现什么情况呢?这里给出答案,没有其他线程干扰情况下,这两个线程将产生’死锁’(程序永远无法停止),还请读者自己进行分析
2. ReentrantLock加锁原理
2.1 查看lock和unlock源码发现其中的共同点
private final Sync sync;
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
public void lock() {
sync.lock();
}
public void unlock() {
sync.release(1);
}
发现lock和unlock调用的都是成员变量sync的方法,其中加锁为lock,解锁为release,sync为ReentrantLock的成员变量,其值在构造函数中赋值,复制分为公平锁和非公平锁,那sync为何方神圣,能够进行加锁和解锁呢,还支持锁重入。Sync类继承关系如下:
FairSync,NonfairSync -->Sync —>AbstractQueuedSychronizer—>AbstractOwnableSychronizer
–>表示继承
2.2 lock的执行过程
lock流程如下:
sync.lock —>FairSync,NorfairSync.lock —>AbstractQueuedSychronizer.acquire()—>sync.tryAcquire()—>
AbstractQueuedSychronizer.addWaiter–>AbstractQueuedSychronizer.acquireQueued
下面通过源码分析各个方法的作用
- fairSync或者NorfairSync的lock方法:
//NorfairSync
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
//fairSync
final void lock() {
acquire(1);
}
可以发现NorfairSync比fairSync多了一个cas过程,此cas过程操作的意义是获取当前对象的state值,如果该值为0,将其改为1,如果cas操作成功,则该线程获得锁,这种情况会造成各个线程的不公平性,即获得锁的条件不公平,不公平体现在于有些线程在执行这一步操作成功,有些则会失败,这个结果有时不受控制
然后调用acquire方法
- acquire方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
此方法执行流程为首先调用tryAcquire(arg)方法,如果成功,则不执行下面的acquireQueued,addWaiter,selfInterrupt方法,tryAcquire方法为尝试获得锁(方法来源于AbstractQueuedSychronizer中,子类需要进行重写)
- tryAcquire
//此方法在公平锁和非公平锁中有不同的实现
//非公平锁
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
//公平锁
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
获得锁的方式为获取当前的线程和state值,如果state值为0,非公平锁会直接进行cas操作,如果成功则获得锁,并将持有锁线程设置为当前线程,而非公平锁会判断当前FIFO队列元素是否为空或者元素是否为头元素后的第一个元素,满足条件则进行cas操作获得锁,当state≠0时,判断当前持有锁线程是否为当前线程,是则state+1,并且成功获得锁,否则获得锁失败(此处为支持锁重入的实现)
-
addWaiter
如果tryAcquire获得锁失败,则调用此方法
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
此方法为封装当前线程增加到队列尾部,首先判断当前队列尾是否为空,部位空,通过cas操作将此线程的Node变为队列尾,cas失败则调用enq方法
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
enq方法为通过死循环将node节点加入到队列尾部,直至增加成功,返回node
-
acquireQueued
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
此方法在addWaiter将node加入到队列尾部之后进行自旋获得锁,方式为判断当前node是否为head的后继节点,是则进行尝试获得锁,获得锁成功,将此node设置为头节点,并清除线程和prev节点信息,并将之前的头部节点引用删除,有助于被回收算法发现(可达性分析算法),然后返回,不进行等待,否则进入到等待阶段,此阶段为LockSupport的park方法调用。当被unpark唤醒时,重复此操作
2.3 unlock的执行过程
sync.release(1)–>tryRelease–>unparkSuccessor()–>LockSupport.unpark(s.thread);
-
release
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
此方法为释放锁,首先执行tryRelease方法,如果成功,如果还有等待的线程,则唤醒等待的线程
-
tryRelease
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
此方法为减少并设置当前AQS的state值,如果state值为0,表示当前线程不再持有锁,从判断当前线程是否为持有锁线程时,如果不时,会抛出异常(lock和unlock的成对出现的原理。如果没有lock则会抛异常,如果没有unlock,其他线程无法获得锁)
-
unpark
唤醒等待的线程
2.4加锁原理总结
reentrantLock 为实现加锁功能,通过引入成员变量sync,此变量为AQS(AbstractQueuedSychronizer)的子类,并实现其中的tryAcquire和tryRealease方法,并且使用了park和unpark的唤醒机制,使用acquire方法进行lock,使用release方法进行unlock,此处有一个没有讲到,就是AQS中的等待队列的实现没有讲到(后续博客)