Java并发编程之可重入锁ReentrantLock
ReentrantLock简介
ReentrantLock是一种可重入的独占锁。ReentrantLock构造方法:
//默认构建非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
//传入公平参数,构建公平锁/非公平锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
从构造方法可知,ReentrantLock支持公平锁FairSync和非公平锁NonfairSync。默认情况下,创建ReentrantLock实例会得到一个非公平锁。其锁的功能由内部类Sync完成,Sync扩展自AQS,并依赖AQS的基础行为。Sync的抽象方法lock()由子类FairSync和NonfairSync各自实现。先以NonfairSync为例介绍非公平锁的加锁和解锁原理。
非公平锁
NonfairSync是ReentrantLock的非公平锁实现,它主要实现Sync中定义的抽象方法lock()和AQS中未实现的抽象方法tryAcquire。
加锁
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
//执行锁定
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
//尝试获取锁
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
加锁时,首先通过CAS操作将状态从0修改为1,如果修改成功,则修改锁的持有者为当前线程,加锁成功。
什么是CAS?这里简单说明下:
CAS 指的是现代 CPU 广泛支持的一种对内存中的共享数据进行操作的一种特殊指令。这个指令会对内存中的共享数据做原子的读写操作。CAS有3个操作数,内存值V,旧的预期值A,要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则返回V。
如果修改状态失败,说明锁的状态不为0,仍被其他线程持有。调用acquire(1),acquire在AQS中提供了默认实现:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
调用tryAcquire()尝试获取锁,tryAcquire()调用nonfairTryAcquire方法,进入源码:
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//如果锁状态为0,则尝试锁定,成功返回true
//这里是为了在并发条件下增加获取锁的可能性;若刚好之前持有锁的线程已经释放锁,则当前线程可再次尝试
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}//否则,如果当前线程为锁的持有者,则state+acquires,这里即体现了ReentrantLock的可重入性
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
如果此次尝试成功获取锁,直接返回。否则获取锁失败,将当前线程加入等待队列。ReentrantLock是可重入的独占锁,因此以独占模式在链表中添加等待节点。
private Node addWaiter(Node mode) {
//创建当前结点
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
//tail节点不空,则使用CAS操作将node添加到链表尾部,成功则返回;失败则进入enq
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//tail节点为空,调用enq初始化链表
//或节点加入链表尾部CAS操作失败,说明当前存在竞争
enq(node);
return node;
}
private Node enq(final Node node) {
//由于这里存在多线程并发问题,使用自旋保证node能够添加到链表中,因此enq本身是线程安全的
for (;;) {
Node t = tail;
//再次判断tail指向的节点是否为空,之所以再次判断,可能有其他线程已经初始化链表
if (t == null) {
//多线程并发时,只有一个线程执行compareAndSetHead返回成功,执行失败的线程将进入else代码块
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
//这里同样存在多线程并发,一次只会有一个线程成功,失败的线程将进入下一次循环,直至成功
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
addWaiter保证将节点加入等待队列。若队列非空,首先会尝试一次将节点插入队列,若成功则无需进入自旋代码块。通常情况下,没有竞争时,无需自旋即可完成。进入自旋通常是因为CAS操作竞争比较激烈。当前节点加入等待队列之后,进入acquireQueued等待挂起。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//获取当前结点的前驱节点
final Node p = node.predecessor();
//前驱节点是head,表示当前节点之前已无节点在等待锁,再次尝试获取锁
//这里不会和等待队列中的其他线程发生锁竞争,但会和尝试获取锁且尚未进入等待队列的线程发生竞争
if (p == head && tryAcquire(arg)) {
//获取锁成功,则设置node为新的head
setHead(node);
//释放节点,帮助GC
p.next = null;
failed = false;
return interrupted;
}
//前驱节点不是head
//parkAndCheckInterrupt会挂起当前线程,直到调用release收到唤醒信号
if (shouldParkAfterFailedAcquire(p, node)&&parkAndCheckInterrupt())
interrupted = true;//如果线程被中断,则将interrupted设为true
}
} finally {
if (failed)
cancelAcquire(node);
}
}
//将node设为头部节点,并将thread和prev置空
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
如果当前节点的前驱节点为头节点,当前线程会再次尝试获取锁。如果再次尝试失败或当前节点的前驱节点不是头节点,调用shouldParkAfterFailedAcquire判断是否应当挂起当前线程,看实现:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
//表示pred的后继节点node可以安全的挂起
if (ws == Node.SIGNAL)
return true;
//表示pred处于取消状态
if (ws > 0) {
do {
//从链表中移除pred,一直循环直到pred前面的节点不是取消状态
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
//修改next指针
pred.next = node;
} else {
//ws为0或者PROPAGATE,将pred状态设置为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
//阻塞当前线程,并返回线程的中断情况
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
unsafe.park(false, 0L);
setBlocker(t, null);
}
private static void setBlocker(Thread t, Object arg) {
unsafe.putObject(t, parkBlockerOffset, arg);
}
只有当前节点的前驱节点的状态为SIGNAL时,线程可安全挂起。其他情况线程会再次进入自旋。
总体看来,shouldParkAfterFailedAcquire就是靠前驱节点判断当前线程是否应该被挂起,如果前驱节点为SIGNAL,则挂起当前线程;如果前继节点处于CANCELLED状态,移除取消节点,并更新当前节点pred指针;前驱节点状态为0(默认进入队列即为0)或PROPAGATE,设前驱节点的状态为SIGNAL,当前线程再次进入自旋,还是未获取锁就会被安全挂起。acquireQueued是一个自旋操作,线程在此挂起,直到被唤醒,满足p == head,执行tryAcquire(arg)尝试获取锁,成功则将当前节点设为新的head,该操作非常重要(想想为什么,后面解释)。
解锁
解锁操作实际调用AQS中的release方法:
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
//head不空且waitStatus不为0,唤醒后继节点;waitStatus为0则说明无线程在等待队列中。
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
release首先调用tryRelease尝试释放锁。
protected final boolean tryRelease(int releases) {
//如果线程被锁定多次,则需要进行多次释放
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
调用tryRelease返回true,说明锁已释放,判断head节点,若还有后继节点,调用unparkSuccessor进行唤醒。执行唤醒操作前,首先将当前节点(这里是head)的waitStatus置为0,再判断后继节点是否为取消状态,若为取消状态,需要使用pred指针从tail向前遍历,找到最靠近head的非CANCEL节点,使用LockSupport进行唤醒。
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//获取后继节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {//s为空或node处于取消状态
s = null;
//从链表尾部向前遍历,直到找到位于链表最前端且waitStatus小于0的节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);//唤醒线程
}
被唤醒的线程将继续在acquireQueued的自旋操作中进行锁竞争,直到成功获取锁。
上面提到acquireQueued中,线程(假设为线程B)获取锁之后,会调用setHead将当前节点设为新的head。原因是,当持有锁的线程(假设为线程A)释放锁之后,是否需要唤醒后继节点是根据head的waitStatus是否为0来判定,当调用unparkSuccessor进行唤醒时,又会将head的waitStatus置为0。假设线程B之后仍然有线程C在阻塞,线程A唤醒B后将head的waitStatus置为0,若不执行setHead,线程B释放锁时,看到head的waitStatus为0,则将不会唤醒线程C,线程C及后面的节点将一直阻塞。当执行setHead之后,若线程B之后仍然有线程在阻塞,则新的head节点waitStatus必然是SIGNAL,唤醒操作将有序进行。好精妙,佩服!!!
公平锁
FairSync是ReentrantLock的公平锁实现,公平锁的加锁原理:
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
由FairSync源码可知,加锁方法直接调用acquire,然后调用tryAcquire方法。与NonfairSync最大的区别就在于tryAcquire实现不同,首先调用hasQueuedPredecessors确定队列中没有等待线程,或者队列中第一个等待者为当前线程,FairSync才会尝试获取锁。否则,直接进入等待队列。
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
//h != t,说明队列不空
//(s = h.next) == null,队列中无排队线程
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
公平锁实现完全遵循FIFO的原则,在进入等待队列之前不会存在锁竞争的问题。即所有线程按公平原则获取锁,如果锁未被占用且队列为空,获取锁;如果队列为空,自动去排队,不去争抢锁。通常情况下,公平锁的效率不如非公平锁。
下一篇: Hadoop及Yarn的HA集群安装