ReentrantLock源码分析
程序员文章站
2022-05-04 20:13:00
...
ReentrantLock基本结构
ReentrantLock中有3个内部类,这个三个内部类的关系为
FairSync 是公平锁的主要实现类,NonfairSync 是非公平锁的主要实现类,默认为非公平锁的实现。
非公平锁加锁过程
非公平锁的加锁方法,加锁默认为非公平锁
ReentrantLock reentrantLock = new ReentrantLock();
reentrantLock.lock();
1、构造方法中没有任何参数的时候,sync 默认为非公平锁的实现。
这是NonfairSync中实现lock方法
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
//没有lock的状态是0 ,AQS将其设置为1 ,变成有锁状态,
//调用父类中的的AbstractQueuedSynchronizer中的方法
if (compareAndSetState(0, 1))
//调用父类AbstractOwnableSynchronizer的方法,设置为独占锁
setExclusiveOwnerThread(Thread.currentThread());
else
//如何获取锁事变,则调用acquire 去加锁1次acquire逻辑(调用父类AbstractQueuedSynchronizer)
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
/************************这里是NonfairSync的父类AbstractQueuedSynchronizer中的方法**************************/
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
/**
* Atomically sets synchronization state to the given updated
* value if the current state value equals the expected value.
* This operation has memory semantics of a {@code volatile} read
* and write.
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that the actual
* value was not equal to the expected value.
*/
//这个是NonfairSync 调用 compareAndSetState,如果设置为加锁状态则为true,如果非加锁状态为false
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
NonfairSync 中acquire逻辑(调用父类AbstractQueuedSynchronizer)
public final void acquire(int arg) {
//tryAcquire为本身子类NonfairSync 所重写, 也是让新来的线程进行第二次插队的机会
//如果再次获取锁还不成功才会放到队列,放到队列里面acquireQueued()
if (!tryAcquire(arg) &&
//
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
//NonfairSync 重写的tryAcquire方法
protected final boolean tryAcquire(int acquires) {
//调用ReentrantLock 中 nonfairTryAcquire 方法
return nonfairTryAcquire(acquires);
}
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
//ReentrantLock 中 nonfairTryAcquire 方法
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//c==0 代表目前没有线程获取有锁
if (c == 0) {
//如果没有线程获取锁,则当前线程CAS获取锁。并设置自己为当前锁的独占线程
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//如果当前锁就是该线程,这将设置状态+acquires,也就是加1,这也是lock允许重入锁的原因
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
//设置当前锁的数量
setState(nextc);
return true;
}
//当前有锁或者获取锁失败,且不是当前线程 返回false
return false;
}
当前面nonfairTryAcquire第二次获取锁失败,进入acquire 方法中addWaiter 方法和 acquireQueued 方法
/** Marker to indicate a node is waiting in exclusive mode */
//Node.EXCLUSIVE 为null表示这是独占锁,如果为读写锁,那就是 共享模式(shared)。
static final Node EXCLUSIVE = null;
//该 mode 为Node.EXCLUSIVE
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
//定义pred为指针指向tail
Node pred = tail;
// pred 尾指针不为null,即队列不为空
if (pred != null) {
node.prev = pred;
//快速CAS将自己设为新的tail,如果tailOffset 没有改变,既pred还是队尾,没有node改变过他
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//如果队列为空, 则调用enq强制入队
//或者CAS设置失败,说明在其它线程入队节点争抢了tail,则此线也是调用enq强制入队
enq(node);
return node;
}
/**
* CAS tail field. Used only by enq.
*/
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
//一直在for循环中一直设置,该节点为尾节点,直到成功为止
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
// 判断队列是否为空,空队列应该先初始化头节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
设置当前节点为尾节点
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
/**
* CAS head field. Used only by enq.
*/
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
当二次抢锁失败之后,将节点设置尾队列末尾之后,将进入acquireQueued方法
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//之前步骤已经将该节点放置在末尾了
final Node p = node.predecessor();
//如果该节点为最开始有效节点(头结点为初始的节点),再进行一次获取锁
if (p == head && tryAcquire(arg)) {
//如果设置成功,将该节点设置为头节点
setHead(node);
p.next = null; // help GC
failed = false;
//如果第一次循环就获取成功那么返回的interrupted是false,不需要自我中断。
//否则 说明在获取到同步状态之前经历过挂起(返回true)
return interrupted;
}
//走到这里说明前置节点不为head或者抢锁失败了
//判断当前node线程是否可挂起,是,就调用parkAndCheckInterrupt挂起 ,i
//nterrupted设置为true,表示曾经被中断过
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
//如果tryAcquire出现异常那么取消当前结点的获取(该tryAcquire为NonfairSync实现的)
cancelAcquire(node);
}
}
/*******************node节点的等待状态*********************************/
/** waitStatus value to indicate thread has cancelled */
// 由于在同步队列中等待线程等待超时或者被中断,需要从同步队列中取消等待
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
//后继节点为等待状态,当且节点线程如果释放同步状态或者被取消
//,将后继节点的线程得以运行
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
//节点在等待队列中,其他线程对condition调用signal(),该节点会从等待队列转移到同步队列中,胶乳到对同步状态的获取中
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
//表示下一次桐乡式同状态会被传播下去
static final int PROPAGATE = -3;
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
// 如果前置结点是SIGNAL状态, 那么当前置结点执行完成后是可以唤醒后续结点的,
// 此时可以安全的挂起当前结点, 不需要进行不必要的for(;;),前置结点自然会通知。
return true;
// 如果ws>0说明前置结点的状态为1,是被自己取消获取同步的结点(只有线程本身可以取消自己)。
//那么do while循环一直往头结点方向找waitStatus < 0的节点;
//含义就是去除了FIFO队列中的已经自我取消申请同步状态的线程。
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
//一直找到为取消同步状态的线程
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
//如果是其它状态waitStatus要不是0或着PROPAGATE,意味着当前结点需要一个signal但是尚未park,
// 所以调用者必须确保在挂起之前不能获取同步状态。
// 并强行设置前置结点为SIGNAL。之所以CAS设置是因为,pred线程也会操作cancelAcquire来取消自己和node线程对pred的操作产生竞争条件。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
下图为非公平锁的获取流程图:
公平锁加锁过程
公平锁在构造方法中设置为true
ReentrantLock reentrantLock = new ReentrantLock(true);
reentrantLock.lock();
/**
* Sync object for fair locks
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
//与非公平锁一样,不一样在于公平锁与非公平锁,中的tryAcquire方法
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//为无锁状态
if (c == 0) {
////拿到当前的同步状态, 如果是无锁状态, 则进行hasQueuedPredecessors方法逻辑
//逻辑含义是:当前队列为空或本身是同步队列中的头结点。如果满足条件CAS获取同步状态,并设置当前独占线程。
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
//只有 h !=t 则当前队列不为空,且 s.thread != Thread.currentThread() 当前线程不是头线程才返回true
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
公平锁与非公平锁的加锁区别
1、非公平锁在一开始lock的时候compareAndSetState(0, 1) 进行一次抢锁操作,当加锁失败之后后面抢锁入队等操作。
2、在重新tryAcquire 方法中,非公平锁直接compareAndSetState(0, 1) 进行一次抢锁操作,而公平锁,需要判断队列里面没有等待加锁线程或者自己是头结点才开始抢锁。
与公平锁相比,非公平锁做的 1、在没入队的时候拿到锁,避免过多队列操作维护成本。
2、尽量在睡眠前拿到锁,避免过多上下文切换
释放锁
由前面的代码可知,释放锁NonfairSync和FairSync都没重写,他们释放锁的逻辑是由ReentrantLock写的。故他们释放锁的逻辑是相同的。
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
// 当前这个头结点不一定是同步队列中的头结点
// 因为可能是非公平锁的释放,不需要插入队列就特么插队获得同步了, 总之就是拿到头结点了。
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
//释放锁的逻辑(该线程本身只能操作自己的锁,所有该方法事前)
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//如果锁已经被释放完毕了,则清空该线程读独占线程
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
//如果该线程还有锁没有被释放,则更新当前锁次数
setState(c);
return free;
}