jdk11源码--ReentrantReadWriteLock源码
文章目录
概述
在上一篇博文jdk11源码-ReentrantLock源码中介绍了ReentrantLock的源码实现。ReentrantReadWriteLock是ReentrantLock的兄弟类,顾名思义,读写锁。当前其内部实现也是借助于AQS队列,不过与ReentrantLock的实现稍有不同,后面会逐步分析。
一个典型使用案例:
class RWDictionary {
private final Map<String, Data> m = new TreeMap<>();
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private final Lock r = rwl.readLock();
private final Lock w = rwl.writeLock();
public Data get(String key) {
r.lock();
try { return m.get(key); }
finally { r.unlock(); }
}
public List<String> allKeys() {
r.lock();
try { return new ArrayList<>(m.keySet()); }
finally { r.unlock(); }
}
public Data put(String key, Data value) {
w.lock();
try { return m.put(key, value); }
finally { w.unlock(); }
}
public void clear() {
w.lock();
try { m.clear(); }
finally { w.unlock(); }
}
}}
读写标记的存储
在之前分析ReentrantLock时,讲述了线程获得锁的标记是在state上的,state=0表示没有被加锁,state=1表示加锁成功,state>1表示锁重入。
但是ReentrantReadWriteLock是读写锁,既要保存是否加锁,还要保存锁的类型,以及重入,只有state一个字段是无法满足的。所以ReentrantReadWriteLock对其进行了改造,一方面引入了HoldCounter来保存重入数量;一方面将state分为高低位,state是int类型,32位, 高16位保存共享锁(读锁),低16位保存独占锁(写锁)。接下来看一下详细实现。
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 6317671515068378041L;
//state被拆分为高低位,高16位保存共享锁(读锁),低16位保存独占锁(写锁)
//这几个字段是为了区分state的高低位的,具体使用会在下面讲解
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
//返回共享锁的数量。c无符号右移16位也就是高16位的值。
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
// 返回独占锁的数量。EXCLUSIVE_MASK转为2进制是16个1,c是32位,进行与计算,高16位结果全为0,低16位结果与c的低16位相同,也就是c的低16位的值。
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
//HoldCounter类主要用于读锁的可重入,记录了重入的次数:count。
//HoldCounter会被包装在ThreadLocal中,是线程安全的,并且缓存在cachedHoldCounter变量中
static final class HoldCounter {
int count; // 初始值为0
// 这里使用id,而不是引用,目的是为了避免垃圾回收
final long tid = LockSupport.getThreadId(Thread.currentThread());
}
//这里重写了initialValue方法,可以保证获取到的HoldCounter对象是同一个,不会重复创建。
static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
/**
* 当前线程持有的 可重入读锁 的数量。
* 仅在构造函数和readObject方法中初始化。
* 当count的值降至0时删除。
*/
private transient ThreadLocalHoldCounter readHolds;
//缓存最后一个成功获取读锁的线程的HoldCounter
//通常来说,下一个要释放锁的线程就是最后一个获取锁的线程。这里使用cachedHoldCounter来单独保存,可以减少在ThreadLocal中查找的开销
private transient HoldCounter cachedHoldCounter;
//第一个获取读锁的线程,确切来说firstReader是最后一次将共享计数从0更改为1的唯一线程,并且从那时起就没有释放读锁定; 如果没有这样的线程,则返回null。
private transient Thread firstReader;
//第一个获取读锁线程的 hold count
private transient int firstReaderHoldCount;
Sync() {
//初始化一个readHolds,其count默认是0
readHolds = new ThreadLocalHoldCounter();
setState(getState()); // 这一步目的是为了确保state的可见性
}
//省略其他代码
}
上面介绍了关键的一些属性变量及业务逻辑。关键点有
- state分为高16位和低16位,高16位保存共享锁(读锁),低16位保存独占锁(写锁)
- 使用HoldCounter 来记录读锁的重入次数,HoldCounter会被包装在ThreadLocal中,是线程安全的
另外,锁的获取与释放都是走的同一段代码逻辑。区别点在于AQS队列非空时的插入规则。
写锁的加解锁过程分析
writeLock.lock
public void lock() {
sync.acquire(1);
}
调用AQS类的java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire
方法:
public final void acquire(int arg) {
if(!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
其中addWaiter和acquireQueued方法与ReentrantLock中分析的一样,这里就不在讲述。着重分析一下tryAcquire方。这里具体实现在java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryAcquire
中:
@ReservedStackAccess
protectedfinal boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();//获取state的值
int w = exclusiveCount(c);//获取独占锁的数量
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;//走到这里,c不等于0,w(低16位)是0,所以高16位肯定有值。也就是说此时有共享锁存在。
if (w + exclusiveCount(acquires) > MAX_COUNT)//校验独占锁的数量是否超标
throw new Error("Maximum lock count exceeded");
//走到这里,说明当前是写锁重入。因为走到这一步只可能是:c不是0,w不是0,exclusiveOwnerThread标识的线程是当前线程。
setState(c + acquires);
return true;
}
//走到这里说明c的值是0。也就是读锁和写锁都没有。当然可以加锁了。
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);//加锁成功,设置获取独占锁的线程为当前线程
return true;
}
上面就是获取锁的过程,总结一下就是:
- state的值是0,说明当前没有锁,那么判断写锁是否应该阻塞,如果不阻塞,则修改state的值,修改成功则加锁成功
- state的值不是0,低16位是0,说明高16位不是0,此时有共享锁存在,不可以加锁。
- state的值不是0,低16位不是0,但是当前获取独占锁的线程不是当前线程,不可以加锁
- state的值不是0,低16位不是0,当前获取独占锁的线程是当前线程,说明是写锁重入,可以加锁
writeLock.unlock
writeLock.unlock源码:
public void unlock() {
sync.release(1);
}
他调用AQS类的release方法:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
我们这里只分析tryRelease方法,他的实现在ReentrantReadWriteLock.Sync类中, 其他的与ReentranteLock中分析的一致
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())//独占锁的持有者不是当前线程,抛异常
throw new IllegalMonitorStateException();
int nextc = getState() - releases;//这里是独占锁,低16位,所以可以直接相减,下一步会校验是不是低16位减为0
boolean free = exclusiveCount(nextc) == 0;//判断低16位的独占锁是否都释放完
if (free)
setExclusiveOwnerThread(null);//独占锁已经释放完,exclusiveOwnerThread设为null
setState(nextc);//释放state
return free;
}
独占锁的释放过程比较简单的,首先检查加锁的是不是当前线程,不是则抛异常。接下来校验释放releases数量的state后,低16位是否为0,是,则说明独占锁释放完成,固设置exclusiveOwnerThread=null,否则设置新的state。
公平锁与非公平锁
在上面tryAcquire代码中有一个方法writerShouldBlock来判断写锁是否应该被阻塞。这里公平锁和非公平锁的实现是不同的。
公平锁的实现:
static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}
public final boolean hasQueuedPredecessors() {
Node h, s;
if ((h = head) != null) {
if ((s = h.next) == null || s.waitStatus > 0) {
s = null; // traverse in case of concurrent cancellation
for (Node p = tail; p != h && p != null; p = p.prev) {
if (p.waitStatus <= 0)
s = p;
}
}
if (s != null && s.thread != Thread.currentThread())
return true;
}
return false;
}
读锁的加解锁过程分析
readLock.lock
public void lock() {
sync.acquireShared(1);
}
该方法会调用AQS的acquireShared方法
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
首先尝试获取tryAcquireShared,如果失败则执行doAcquireShared。
tryAcquireShared的实现在ReentrantReadWriteLock中:
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;//已经有写锁获取,读锁加锁失败
int r = sharedCount(c);//共享锁的数量
if (!readerShouldBlock() && r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {//不阻塞,并且申请一个读锁成功
if (r == 0) {//记录最后一次将共享锁计数从0更改为1的线程,初始化firstReaderHoldCount =1
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;//firstReader 标识的第一个读线程 读锁重入
} else {
HoldCounter rh = cachedHoldCounter;//获取缓存的最后一个获取读锁的线程的HoldCounter
if (rh == null || rh.tid != LockSupport.getThreadId(current))
cachedHoldCounter = rh = readHolds.get();//获取当前线程的计数器缓存
else if (rh.count == 0)
readHolds.set(rh);//将缓存的cachedHoldCountert添加到threadlocal变量readHolds中
rh.count++;//当前线程重入计数器加一
}
return 1;
}
return fullTryAcquireShared(current);
}
可以看到这里入参没有用到,而是通过compareAndSetState(c, c + SHARED_UNIT)
固定的申请一个读锁标记。整体流程如下:
- 如果另一个线程获取了写锁,失败
- 如果读线程不需要被阻塞,且读锁数量小于最大值,且修改状态成功,那么
2.1. 如果共享计数器是0,说明当前线程是第一个获取锁的线程,记录firstReader 和firstReaderHoldCount
2.2. 如果当前线程与firstReader 一致,则将firstReaderHoldCount 加1
2.3. 获取当前线程的HoldCounter,设置缓存readHolds, 并将其HoldCounter的count加1
这里隐藏的优化点:firstReader 和firstReaderHoldCount。用于缓存第一个获取读锁的线程。目的是为了减少threadlocal的操作,提高效率。
当(!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)
判断失败时,会进入fullTryAcquireShared方法,该方法会采用CAS的方式来保证操作执行成功。
fullTryAcquireShared方法与上面tryAcquireShared逻辑基本相同,但是为什么也要写两份呢?其实这也是为了性能考虑。
读者在阅读java线程及锁相关的代码时,会经常遇到类似的逻辑,将通用方法与大概率出现的代码逻辑分开,对于大概率出现的逻辑进行针对性的优化来提高性能。
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)//有其他线程加了独占锁,加锁失败
return -1;
} else if (readerShouldBlock()) {
//读锁需要被阻塞
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null ||
rh.tid != LockSupport.getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();//count为0,当前线程的所有读锁都释放了,将其删除
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
//成功申请一个读锁
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null ||
rh.tid != LockSupport.getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; //缓存最后一个成功获取读锁的线程
}
return 1;
}
}
}
readLock.unlock
public void unlock() {
sync.releaseShared(1);
}
//releaseShared调用AQS类中的releaseShared方法
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared的实现在ReentrantReadWriteLock中:
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {//当前线程是第一个读线程,释放 firstReader 及firstReaderHoldCount
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
//当前线程不是第一个读线程,需要从threadlocal中获取HoldCounter ,释放其count
HoldCounter rh = cachedHoldCounter;
if (rh == null ||
rh.tid != LockSupport.getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();//说明这已经是最后一次释放锁了,直接删除即可
if (count <= 0)
throw unmatchedUnlockException();//说明锁计数器出现异常
}
--rh.count;//释放count的值
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;//读锁标记减去 1 (还记得高16位表示读锁数量吗?)
if (compareAndSetState(c, nextc))
//当没有读锁也没有写锁时,返回true
return nextc == 0;
}
}
上一篇: php强制下载类型的实现代码_PHP教程
下一篇: 最全的HTML5标签