可重入锁ReentrantLock,lock/unlock源码解读
程序员文章站
2022-05-05 09:41:54
...
我们知道可重入锁,是指当前线程加锁的对象,本线程可以再次访问。ReentrantLock 和synchronized 都是 可重入锁。
看下Doug Lea大神写的注释:
1、ReentrantLock的整体结构
首先看一下ReentrantLock类的整体组成:
两个私有变量:serialVersionUID不用多说,序列化,sync是Sync类型的变量;
再看一下Sync是一个内部抽象类,有两个内部类同时继承自此:FairSync和NotfairSync,这和注释上写的ReentrantLock能实现公平锁和非公平锁功能相对应,等会会详细解读两者的实现方式;
然后就是构造器、加解锁、其他方法和对象的属性等,等会看一些重要的实现方法。
2、ReentrantLock构造方法和公平/非公平锁的实现方式
无参构造器,默认创建的是非公平的可重入锁实例:
SYNC的类图,可以看出队列是通过AQS实现的:
3、lock的实现
先看一下方法注释,了解下加锁流程:
再看acquire(1)的源码:
先看 addWaiter(Node.EXCLUSIVE)
然后看下acquireQueued方法
相比加锁,ReentrantLock的解锁实现要简单一点,先上源码:
下面看下具体实现方法:
看下Doug Lea大神写的注释:
* <p>A {@code ReentrantLock} is <em>owned</em> by the thread last
* successfully locking, but not yet unlocking it. A thread invoking
* {@code lock} will return, successfully acquiring the lock, when
* the lock is not owned by another thread. The method will return
* immediately if the current thread already owns the lock. This can
* be checked using methods {@link #isHeldByCurrentThread}, and {@link
* #getHoldCount}.
ReentrantLock的拥有者是上一次lock的线程
* <p>It is recommended practice to <em>always</em> immediately
* follow a call to {@code lock} with a {@code try} block, most
* typically in a before/after construction such as:
*
推荐在lock()后使用try代码块,并在finally代码块中进行unlock();1、ReentrantLock的整体结构
首先看一下ReentrantLock类的整体组成:
两个私有变量:serialVersionUID不用多说,序列化,sync是Sync类型的变量;
再看一下Sync是一个内部抽象类,有两个内部类同时继承自此:FairSync和NotfairSync,这和注释上写的ReentrantLock能实现公平锁和非公平锁功能相对应,等会会详细解读两者的实现方式;
然后就是构造器、加解锁、其他方法和对象的属性等,等会看一些重要的实现方法。
2、ReentrantLock构造方法和公平/非公平锁的实现方式
无参构造器,默认创建的是非公平的可重入锁实例:
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();
}
带参构造器(boolean)public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
创建对象时即给sync赋值,后续的加锁解锁获取锁状态,都是由sync进行管理。SYNC的类图,可以看出队列是通过AQS实现的:
3、lock的实现
先看一下方法注释,了解下加锁流程:
/**
* Acquires the lock.
*
* <p>Acquires the lock if it is not held by another thread and returns
* immediately, setting the lock hold count to one.
*
* <p>If the current thread already holds the lock then the hold
* count is incremented by one and the method returns immediately.
*
* <p>If the lock is held by another thread then the
* current thread becomes disabled for thread scheduling
* purposes and lies dormant until the lock has been acquired,
* at which time the lock hold count is set to one.
* 如果获取锁时未被其他线程持有,则立即返回且设置持有数=1
* 如果当前线程已持有该锁,则立即返回,且持有数+1;
* 如果所被其他线程持有,则进入等待队列;
*/
public void lock() {
sync.lock();
}
(1)非公平锁的加锁实现方式/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
可以看到非公平锁的核心思想还是CAS,将stateOffset从0更新为1;此处的stateOffset其实就是AbstractQueuedSynchronizer 类中的state私有变量(上面说过队列是AQS控制的); /**
* Atomically sets synchronization state to the given updated
* value if the current state value equals the expected value.
* This operation has memory semantics of a {@code volatile} read
* and write.
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that the actual
* value was not equal to the expected value.
*/
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
如果CAS通过,说明当前锁并未被任何线程占用,则
AbstractOwnableSynchronizer.exclusiveOwnerThread=Thread.currentThread()
再看acquire(1)的源码:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
非公平锁中tryAcquire的实现,主要是再次判断是否有线程占据lock(CAS),如果当前线程已持有该锁,则state+1并立即返回true:/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
//再次检查state是否等于0,即是否有线程占有锁
int c = getState();
if (c == 0) {
//和前一步CAS一样
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//锁被当前线程占有
else if (current == getExclusiveOwnerThread()) {
//state+1,更新state
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
如果tryAcquire(1)返回false,即已有其他线程持有当前锁,则进行下一步:acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 这一步其实是AQS的实现;先看 addWaiter(Node.EXCLUSIVE)
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
//如果尾节点不为空,更新尾节点为插入的数据,并更新prev指向原尾节点
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//如果原tail为空或者上一步CAS更新失败,则在这边执行插入节点
enq(node);
return node;
}
可以看到队列是很经典的链表实现,所以具体的算法实现就不再赘述了。要等待的线程封装成Node形式的对象,包含头尾指针;至此等待队列构建完成。然后看下acquireQueued方法
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
//传入的是上一步addWaiter新增的Node节点
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//获取传入node的prev节点
final Node p = node.predecessor();
//如果P是头结点,即等待队列中的第一个,则再次执行tryAcquire方法,即请求node成功时返回true(具体处理见上方代码)
if (p == head && tryAcquire(arg)) {
//node位置前移
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//是否挂起
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
至此,非公平锁的lock方法解析完成,而公平锁与之最大的不同,就是在tryAcquire这一方法中多了一步检测当前线程是否是head,保证先进先出:/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//检测当前线程是否是队列中第一位
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
4、unlock的实现相比加锁,ReentrantLock的解锁实现要简单一点,先上源码:
/**
* Attempts to release this lock.
*
* <p>If the current thread is the holder of this lock then the hold
* count is decremented. If the hold count is now zero then the lock
* is released. If the current thread is not the holder of this
* lock then {@link IllegalMonitorStateException} is thrown.
*
* @throws IllegalMonitorStateException if the current thread does not
* hold this lock
*/
public void unlock() {
sync.release(1);
}
注释简单明了:解锁时当前锁的持有数减少,如果持有数=0,则锁被释放;如果当前线程不是锁的持有线程则抛异常。下面看下具体实现方法:
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {//释放锁,state-1,为0时返回true
Node h = head;
if (h != null && h.waitStatus != 0) //头结点不为空,且不为等待状态
unparkSuccessor(h);//调用Unsafe.unpack方法,使当前线程可用,此线程在lock时调用过parkAndCheckInterrupt()方法进行pack处理
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases; //当前state-1
if (Thread.currentThread() != getExclusiveOwnerThread()) //是否是当前线程
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { //直至state==0
free = true; //标志位,等于true表示当前锁空闲
setExclusiveOwnerThread(null);
}
setState(c);//更新state
return free;
}
到此为止,ReentrantLock的加解锁源码分析完成,总结一下:ReentrantLock的加锁实际上是通过Unsafe.pack挂起,解锁是Unsafe.unpack请求执行,而管理获取锁的多个线程是通过CLH队列来实现。
后续有时间的话再更新其他方法的源码解读吧。
下一篇: java并发编程的艺术