Java-并发-锁-ReadWriteLock
程序员文章站
2022-07-12 11:30:59
...
Java-并发-锁-ReadWriteLock
摘要
现在大家开发程序,大多是在多线程场景,就会用到各种锁。
但其实往往读和读之间是不冲突的,是无状态无修改的,不应该互相互斥。我们往往只需在读写或者写与写之间互斥即可。在JDK中就直接提供了一个ReadWriteLock
,本文会介绍其基本概念及分析源码。
0x01 基本概念
ReadWriteLock
的互斥关系如下表:
读 | 写 | |
---|---|---|
读 | 不互斥 | 互斥 |
写 | 互斥 | 互斥 |
ReadWriteLock的重要知识点如下:
- 读写锁分为读锁和写锁,分开使用
- 读锁允许读可并发,但此时写锁申请会被阻塞
- 写锁不允许任何并发,一旦有线程拥有写锁,其他线程的读写锁申请全被阻塞
- ReadWriteLock适用于写少读多的场景
- ReadWriteLock也被称为
共享-独占锁
- 读写锁都可重入,调用了几次
lock
就必须配套调用几次unlcok
使用时一般是这几个api:
// 获得读写锁实例
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
// 读锁加锁,可与其他线程读锁共享,阻塞其他写锁申请,可重入
lock.readLock().lock()
// 读锁解锁,调用次数必须和lock相同
lock.readLock().unlock();
// 写锁加锁,阻塞其他线程读写锁申请,可重入
lock.writeLock().lock();
// 写锁解锁,调用次数必须和lock相同
lock.writeLock().unlock();
0x02 实现原理
2.1 流程分析
2.2 源码分析
2.2.1 ReadWriteLock接口
ReadWriteLock
类是一个位于java.util.concurrent.locks
的接口类。
public interface ReadWriteLock {
// 获取读锁
Lock readLock();
// 获取写锁
Lock writeLock();
}
这个接口申明真简单,我们下面看看最常用的一个实现java.util.concurrent.locks.ReentrantReadWriteLock
2.2.2 ReentrantReadWriteLock定义和重要属性
public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {
private static final long serialVersionUID = -6992448646407690164L;
// 读锁
private final ReentrantReadWriteLock.ReadLock readerLock;
// 写锁
private final ReentrantReadWriteLock.WriteLock writerLock;
// 用来支持同步机制,继承自AQS
final Sync sync;
// 默认非公平锁
public ReentrantReadWriteLock() {
this(false);
}
public ReentrantReadWriteLock(boolean fair) {
// 有公平/非公平两种sync实现
sync = fair ? new FairSync() : new NonfairSync();
// 分别创建读写锁对象
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
// 得到构建好了的写锁。
// 逼死强迫症啊,为啥这里的顺序和构造方法里不同- -
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
2.2.3 Sync
- 类定义
abstract static class Sync extends AbstractQueuedSynchronizer
继承自 AbstractQueuedSynchronizer
即AQS
- 重要属性和方法
static final int SHARED_SHIFT = 16;
// 2^16,1后面跟16个0。共享锁每次递增单位,其实就相当于高16位加1
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
// 2^16-1, 16个1。
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
// 2^16-1, 16个1。
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
// 保留高16位,代表共享锁(读锁)的许可数量
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count */
// 保留低16位,代表独占锁(写锁)的许可数量
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
// 内部类,用来对每个线程持有的共享读锁计数
static final class HoldCounter {
int count = 0;
// 这里使用线程ID而不是线程对象引用,是为了避免GC遗漏
final long tid = getThreadId(Thread.currentThread());
}
// 存最后一个成功申请共享读锁的线程拥有的锁许可数量
private transient HoldCounter cachedHoldCounter;
/**
* ThreadLocal subclass. Easiest to explicitly define for sake
* of deserialization mechanics.
*/
// 内部类,继承自ThreadLocal,泛型为HoldCounter
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
// 每个线程独有一个HoldCounter
return new HoldCounter();
}
}
// 当前线程持有的可重入读锁许可数量
// 在构造函数和readObject中初始化
// 当某个线程的读锁许可数量降为0,就移除该值
private transient ThreadLocalHoldCounter readHolds;
// 存第一个reader线程
private transient Thread firstReader = null;
// 存第一个reader线程的锁许可数量
private transient int firstReaderHoldCount;
// 构造方法,被非公平/公平的子类共用
Sync() {
// 创建一个ThreadLocal的HoldCounter
// 为每个线程单独统计拥有的共享读锁许可数量
readHolds = new ThreadLocalHoldCounter();
setState(getState()); // ensures visibility of readHolds
}
2.2.4 NonfairSync
NonfairSync
是默认采用的,即非公平锁。
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
final boolean writerShouldBlock() {
return false; // writers can always barge
}
final boolean readerShouldBlock() {
/* As a heuristic to avoid indefinite writer starvation,
* block if the thread that momentarily appears to be head
* of queue, if one exists, is a waiting writer. This is
* only a probabilistic effect since a new reader will not
* block if there is a waiting writer behind other enabled
* readers that have not yet drained from the queue.
*/
return apparentlyFirstQueuedIsExclusive();
}
}
2.2.5 FairSync
可以在初始化时指定为公平锁FairSync
:
static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}
2.2.6 ReentrantReadWriteLock初始化
初始化使用的语句是new ReentrantReadWriteLock()
,他主要干的工作如下:
sync = fair ? new FairSync() : new NonfairSync();
// 分别创建读写锁对象,并把当前lock的sync的引用传递给两个锁
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
上面不管用公平与非公平锁模式,都会调用他们的父类构造函数:
// 创建一个ThreadLocal的HoldCounter
// 为每个线程单独统计拥有的共享读锁许可数量
readHolds = new ThreadLocalHoldCounter();
setState(getState());
2.2.7 lock.readLock().lock()
我们来看看读锁的加锁流程:
- reentrantReadWriteLock.lock
public void lock() {
// 直接调用AQS的公平模式申请锁,许可数为1
sync.acquireShared(1);
}
- AQS.acquireShared
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
// 尝试申请共享锁失败时,就以不可中断的模式申请共享锁
doAcquireShared(arg);
}
- reentrantReadWriteLock.tryAcquireShared
// 此方法返回值如下:
// 1.负数代表申请共享锁失败
// 2.数字0代表共享锁申请成功,但是后续的共享锁申请都不能成功
// 3.正数代表共享锁申请成功,后续的共享锁申请可能也成功,此时必须检查后续wait线程可用性
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
// 当前锁许可总数
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
// 存在独占锁(写锁)且不是当前线程拥有,读写应阻塞,返回失败
return -1;
// 返回共享锁数量(就是锁的高16位)
int r = sharedCount(c);
// readerShouldBlock判断是否应该被阻塞
// 1.非公平锁调用apparentlyFirstQueuedIsExclusive,
// 即如果wait_queue的真实线程头结点是独占锁(写锁),就返回true
// 否则返回false
// 2.而公平锁调用hasQueuedPredecessors
// 当有线程结点在当前线程之前,返回true(公平性)
// 当前线程为wait_queue真实头结点或wait_queue为空,返回false
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// 非公平锁模式,wait_queue真实头结点不是独占锁
// 公平锁模式,当前线程为wait_queue真实头结点或wait_queue为空
// 且共享锁数量尚未超出最大值
// 且CAS更新锁许可高位(累积共享读锁数量)+1成功
if (r == 0) {
// 共享锁许可数为0
// 当前线程作为头一个reader线程
firstReader = current;
// 分给他一个锁许可
firstReaderHoldCount = 1;
} else if (firstReader == current) {
// 当前线程就是头一个reader线程,重入情况,直接累加锁许可
firstReaderHoldCount++;
} else {
// 当前有别的线程持有共享锁
// 最后一个成功申请共享读锁的线程和锁信息
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
// 未曾有读锁获取或最后读锁不是当前线程
// 拿到线程独有的HoldCounter,更新给cachedHoldCounter
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
// 当前线程就是最后一个成功申请共享读锁,但还没有读锁?
// 此时就将读锁信息更新给该线程
// 因为在其他方法中,count==0时会remove
readHolds.set(rh);
// 最后累加读锁
rh.count++;
}
// 当前线程申请读锁成功,返回1
return 1;
}
// 其余情况调用fullTryAcquireShared
return fullTryAcquireShared(current);
}
总的来说:
- 别的线程拥有写锁,申请读锁失败,返回-1
- 无写锁,当前线程申请读锁成功成功,就更新线程的
HoldCounter
,返回1 - 其余情况,返回调用
fullTryAcquireShared
的结果
- reentrantReadWriteLock.fullTryAcquireShared
// 该方法是申请读锁的完整版,有很多代码和tryAcquireShared相同
// 但处理了tryAcquireShared没有处理的CAS失败和非头Reader的重入情况
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
} else if (readerShouldBlock()) {
// 此时当前reader线程应该被阻塞
// 1.非公平锁调用apparentlyFirstQueuedIsExclusive,
// 即如果wait_queue的真实线程头结点是独占锁(写锁),就返回true
// 2.而公平锁调用hasQueuedPredecessors
// 当有线程结点在当前线程之前,返回true(公平性)
if (firstReader == current) {
// 重入读锁
// assert firstReaderHoldCount > 0;
} else {
// 非重入读锁
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
// 更新为当前线程的HoldCounter
rh = readHolds.get();
if (rh.count == 0)
// 当前线程的HoldCounter,就remove掉该HoldCounter
// 目的是防止ThreadLocal未释放造成的弱引用对象内存泄露
readHolds.remove();
}
}
if (rh.count == 0)
// 注意此时该reader线程应该阻塞
// 所以当当前线程已有锁许可为0,就认为失败,返回-1
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
// 锁许可达到最大值,抛出异常
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
// CAS为当前高位(共享锁)累积锁许可数增加锁许可成功
if (sharedCount(c) == 0) {
// 之前的共享锁许可数为0
// 当前线程作为头一个reader线程
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
// 当前线程就是头一个reader线程,重入情况,直接累加锁许可
firstReaderHoldCount++;
} else {
// 当前有别的线程持有共享锁
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
// 当前线程共享读锁许可数自增
rh.count++;
cachedHoldCounter = rh; // cache for release
}
// 当前线程读锁成功申请,返回1
return 1;
}
}
}
- AQS.doAcquireShared
// 以不可中断的模式申请共享锁
private void doAcquireShared(int arg) {
// 当前线程以SHARED共享模式加入wait_queue
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 前驱结点
final Node p = node.predecessor();
if (p == head) {
// 此时当前结点就是真实头结点
// 尝试申请共享读锁
int r = tryAcquireShared(arg);
if (r >= 0) {
// 申请成功
// 设置队列头,并检查后继者是否可能在共享锁模式下等待,
// 如果是传播,则设置传播> 0或PROPAGATE状态。
setHeadAndPropagate(node, r);
// 从wait_queue移除当前节点
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
// 成功,退出循环返回
return;
}
}
// 更新前驱结点为SIGNAL,阻塞当前线程直到唤醒
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}