理解java.util.concurrent包(二)
下面我们来看java.util.concurrent.locks包:
1.AbstractOwnableSynchronizer、AbstractQueuedLongSynchronizer、AbstractQueuedSynchronizer
AbstractQueuedLongSynchronizer和AbstractQueuedSynchronizer是AbstractOwnableSynchronizer的子类。AbstractOwnableSynchronizer主要包括了一个Thread类型的变量exclusiveOwnerThread,并且对这个变量有get/set方法,来设置和获取独占的线程。
AbstractQueuedLongSynchronizer是从JDK6才开始出现的,还没有具体实现的子类
AbstractQueuedSynchronizer的子类实现在可重入锁里面
AbstractQueuedSynchronizer主要有四个最核心的方法:
protected boolean tryAcquire(int arg)
protected boolean tryRelease(int arg)
protected int tryAcquireShared(int arg)
protected boolean tryReleaseShared(int arg)
分别对应获取和释放锁,前两个用于独占锁,后两个用于共享锁,这四个方法由子类来实现,默认实现是抛出UnsupportedOperationException。
AbstractQueuedSynchronizer仅仅提供获取和释放锁的流程,即当前线程尝试获取锁的时候,AbstractQueuedSynchronizer会先调用tryAcquire或者tryAcquireShared,如果返回false,就把当前线程放到等待队列中去,然后再做进一步操作。
AbstractQueuedSynchronizer按获取过程中的处理方式可以分为三种:响应中断,不响应中断和尝试限时等待超时放弃。这三种方式均针对共享锁和独占锁。
1.独占锁不响应中断
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
我们看到线程首先会尝试获取锁,如果获取成功则进行操作,若是没有获取到,则将这个线程加入到等待的队列中,我们来看acquireQueued方法:
final boolean acquireQueued(final Node node, int arg) {
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
首先获取前一个节点,若是头结点,那么久再次尝试获取,如果获取成功则返回,如果检测到线程中断,返回true,否则返回false.
如果等待期间检测到中断信号,也就是acquireQueued返回了true,会用selfInterrupt中断当前线程。我们看shouldParkAfterFailedAcquire方法:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
//前面节点在释放锁以后会告诉当前线程,等待即可
return true;
if (ws > 0) {
//大于0的状态只有cancel,所以需要跳过,并且从队列中清除出去
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
}
return false;
}
2.响应中断的独占锁:
public final void acquireInterruptibly(long arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
它和不响应中断的区别在于调用doAcquireInterruptibly,doAcquireInterruptibly在发现中断后,直接break,然后取消获取锁的打算。如下:
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
3.限时等待超时放弃
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L) {
cancelAcquire(node);
return false;
}
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
通过循环的方式,若前一个节点是头结点则再次请求一次,成功则返回,否则,累积时间差,若是超时,则取消并返回false。如果线程被中断则抛出异常。
4.不响应中断的共享锁
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
我们看doAcquireShared方法:
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
与独占锁的区别是doAcquireShared把selfInterrupt()挪到自己的方法内,其次就是共享锁调用setHeadAndPropagate方法:
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
独占锁某个节点被唤醒之后,它只需要将这个节点设置成head就完事了,而共享锁不一样,某个节点被设置为head之后,如果它的后继节点是SHARED状态的,那么将继续通过doReleaseShared方法尝试往后唤醒节点,实现了共享状态的向后传播。我们看doReleaseShared()方法:
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!h.compareAndSetWaitStatus(0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
当头结点本身的waitStatus是SIGNAL且能通过CAS算法将头结点的waitStatus从SIGNAL设置为0时,唤醒头结点的后继节点;当头结点本身的waitStatus是0的话,尝试将其设置为PROPAGATE状态的,意味着共享状态可以向后传播。
共享锁的响应中断和超时放弃与上面同理。
在AQS的Node中有每个Node有自己的状态(waitStatus),我们这里归纳一下,分别包含:
SIGNAL——前面有线程在运行,需要前面线程结束后,调用unpark()方法才能**自己,值为:-1
CANCELLED——当AQS发起取消或fullyRelease()时,会是这个状态。值为1,也是几个状态中唯一一个大于0的状态,所以前面判定状态大于0就基本等价于是CANCELLED。
CONDITION——线程基于Condition对象发生了等待,进入了相应的队列,自然也需要Condition对象来**,值为-2。
PROPAGATE——传播。由于共享访问的特点,连续的读操作节点可以依次进入临界区,设为PROPAGATE有助于实现这种迭代操作。
状态0——.初始化状态,也代表正在尝试去获取临界资源的线程所对应的Node的状态。
2.ReentrantLock
ReentrantLock实现了Lock方法,在ReentrantLock中包含了三个类,Sync,NonfairSync,FairSync,其中后面两个类继承第一个类。
ReentrantLock提供了两个构造器,分别是:
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
默认构造器初始化为NonfairSync对象,即非公平锁,而带参数的构造器可以指定使用公平锁和非公平锁。
Sync继承了AQS,sync中的lock方法是abstract的,需要子类实现。
NonfairSync
实现的lock方法:
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
首先是一个CAS操作,判断state是否是0(表示当前锁未被占用),如果是0则把它置为1,并且设置当前线程为该锁的独占线程,表示获取锁成功。当多个线程同时尝试占用同一个锁时,CAS操作只能保证一个线程操作成功,则排队。
“非公平”即体现不是按顺序获取,比如占用锁的线程刚释放锁,state置为0,而排队等待锁的线程还未唤醒时,新来的线程可能就直接抢占了该锁。
若获取锁失败,会进入acquire。当然,这个方法包含在AQS里面,可以翻看前面,之前我们说tryAcquire。
会给子类实现,AQS只是提供了一个流程,我们看非公平锁中实现的tryAcquire:
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
这个方法调用的是父类的nonfairTryAcquire:
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {//没有线程占用锁
if (compareAndSetState(0, acquires)) {
//占用锁成功,设置独占线程为当前线程
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 更新state值为新的重入次数
setState(nextc);
return true;
}
return false;
}
从上面看出来非公平锁tryAcquire的流程是:检查state字段,若为0,表示锁未被占用,那么尝试占用,若不为0,检查当前占用锁是否是自己,若是则更新state字段,表示重入锁的次数。如果以上两点都没有成功,则获取锁失败,返回false。
#####FairSync
FairSync实现的lock方法如下:
final void lock() {
acquire(1);
}
调用了acquire并且传入了参数1,acquire方法我们可以看到前文是在AQS中的,tryAcquire是在子类中实现的,我们看FairSync中的tryAcquire方法:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {{//没有线程占用锁
if (!hasQueuedPredecessors() && //是否已经是最前面的节点
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
所以我们看到和非公平锁的区别主要是会判断是否按顺序()从头结点后第一个节点开始拿)。
ReentrantReadWriteLock
ReentrantReadWriteLock实现了ReadWriteLock接口,该接口提供readLock()方法获取读锁,writeLock()获取写锁:
public interface ReadWriteLock {
Lock readLock();
Lock writeLock();
}
接下来我们看构造方法:
public ReentrantReadWriteLock() {
this(false);
}
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
Sync是ReentrantReadWriteLock的内部类,继承自AQS,并且有两个子类,一个是公平锁,还有一个是非公平锁,接下来我们来看Sync中的一组常量:
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
ReentrantReadWriterLock使用一个32位的int类型来表示锁被占用的线程数,高16位表示读锁占有的线程数量,低16位表示写锁被同一个线程申请的次数。
1. SHARED_SHIFT,表示读锁占用的位数,常量16
2. SHARED_UNIT,增加一个读锁,按照上述设计,就相当于增加 SHARED_UNIT;
3. MAX_COUNT,表示申请读锁最大的线程数量,为65535
4. EXCLUSIVE_MASK :表示计算写锁的具体值,用 getState &
EXCLUSIVE_MASK算出写锁的线程数,大于1表示重入。
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }//读锁线程的数量
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }// 写锁的计数,也就是它的重入次数
读锁
读锁,锁定的是AQS的state变量的高16位,当state的高16位等于0,表示当前读锁未被占有;当state的高16位大于0,表示当前读锁可能被一个或多个线程占有,允许重入。
一般来说读锁和写锁互斥,但是如果战有写锁的线程再次获取读锁,那么写锁降级为读锁。
我们看读锁的lock方法:
public void lock() {
sync.acquireShared(1);
}
acquireShared调用Sync父类AQS的acquireShared方法,调用的tryAcquireShared(arg)方法在Sync方法中实现:
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&// 已分配了写锁
getExclusiveOwnerThread() != current) // 且当前线程不是持有写锁的线程
return -1;
int r = sharedCount(c);// 取读锁计数
if (!readerShouldBlock() &&// 由子类根据其公平策略决定是否允许获取读锁
r < MAX_COUNT &&// 读锁数量还没达到最大值(2^16 -1)
compareAndSetState(c, c + SHARED_UNIT)) {//通过CAS操作将读锁占有量+1(AQS的state高16位同步加1)
// 注意下面对firstReader的处理:firstReader是不会放到readHolds里的
// 这样,在读锁只有一个的情况下,就避免了查找readHolds。
if (r == 0) { // 是 firstReader,计数不会放入 readHolds。
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) { // firstReader 重入
firstReaderHoldCount++;
} else {
// 非 firstReader 读锁重入计数更新
HoldCounter rh = cachedHoldCounter; // 首先访问缓存
if (rh == null || rh.tid != current.getId())
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
// 获取读锁失败,放到循环里重试。
return fullTryAcquireShared(current);
}
齐总readerShouldBlock在公平模式和 非公平模式下有所区别,公平模式下:
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();//等待队列head节点后的节点非共享节点,返回true
}
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
公平模式下:
final boolean readerShouldBlock() {
return hasQueuedPredecessors();//等待队列的头尾不为空,并且存在head后的节点并且节点的线程非当前线程,返回true。
}
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
接下来我们看unlock方法:
public void unlock() {
sync.releaseShared(1);
}
releaseShared方法是ASQ中的方法,其中调用了tryReleaseShared,这个方法在子类实现,我们看到Sync中实现了这个方法:
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
// 清理firstReader缓存 或 readHolds里的重入计数
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId())
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
// 完全释放读锁
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count; // 主要用于重入退出
}
// 循环在CAS更新状态值,主要是把读锁数量减 1
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// 释放读锁对其他读线程没有任何影响,
// 但可以允许等待的写线程继续,如果读锁、写锁都空闲。
return nextc == 0;
}
}
读锁的释放就是AQS的state高16位同步递减为0的过程,当高16位都为0则读锁释放完毕,如果此时写锁状态为0,唤醒head节点后下一个SIGNAL状态的节点的线程,一般为等待写锁的节点。如果读锁的占有数不为0,表示读锁未完全释放。或者写锁的占有数不为0,表示释放的读锁是写锁降级来的。
接下来我们分析写锁:
请求锁的过程如下:
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) { // 状态不为0,表示锁被分配出去了。
// (Note: if c != 0 and w == 0 then shared count != 0)
// c != 0 and w == 0 表示分配了读锁
// w != 0 && current != getExclusiveOwnerThread() 表示其他线程获取了写锁。
if (w == 0 || current != getExclusiveOwnerThread())
return false ;
// 写锁重入
// 检测是否超过最大重入次数。
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 更新写锁重入次数,写锁在低位,直接加上 acquire 即可。
// Reentrant acquire
setState(c + acquires);
return true ;
}
// writerShouldBlock 留给子类实现,用于实现公平性策略。
// 如果允许获取写锁,则用 CAS 更新状态。
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false ; // 不允许获取锁 或 CAS 失败。
// 获取写锁超过,设置独占线程。
setExclusiveOwnerThread(current);
return true;
}
释放锁的过程如下:
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();//释放的当前线程不是持有锁的线程,抛出异常
int nextc = getState() - releases;//当前重入锁的数量
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}