Java并发包同步组件ReentrantReadWriteLock 博客分类: Java 线程
前言
在前面我们介绍的独占式同步组件ReentrantLock实现了标准的互斥操作,它是和synchronize关键字一样在某一时刻只有一个线程能够获得同步锁,从使用者的角度出发,它还是一种保守锁策略,试想:对共享资源的并发访问中,大部分都是执行读操作,而变更共享资源的写操作却非常少,在这种情况下,虽然读操作并不会影响共享数据的一致性,但是采用ReentrantLock独占锁的时候,多个执行读操作的线程依然需要互斥访问,这势必会大大降低吞吐量。如果能够做到只在执行写操作的时候进行互斥访问共享资源,对执行读操作的线程允许它们并行的访问共享资源,当然读操作必须要能够及时发现写操作对共享数据的修改,也就是所谓的“读写分离”,那样的话就非常完美了,而这就是ReentrantReadWriteLock锁能够办到的事情。
ReentrantReadWriteLock锁就是一种支持ReentrantLock类似语义的实现了读写分离的同步锁,ReentrantReadWriteLock维护着一对锁,一个读锁和一个写锁。通过分离读锁和写锁,使得并发性比一般的排他锁有了较大的提升:在同一时间可以允许多个读线程同时访问,但是在写线程访问时,所有读线程和写线程都会被阻塞。从这句话我们也可以看出ReentrantReadWriteLock读写锁的使用场景:在多处理器上,对共享资源的读取操作频繁,对其修改操作极少。
ReentrantReadWriteLock类结构
从以上的 ReentrantReadWriteLock的类结构图可以看到,ReentrantReadWriteLock实现了ReadWriteLock接口和他的两个分别获取读锁和写锁的方法readLock(),writeLock(). 在ReentrantReadWriteLock内部通过抽象静态内部类Sync继承AQS实现对独占式和共享式情况下对共享资源的公共的获取与释放操作,同时通过Sync的子类实现对公平和非公平模式的支持。另外通过两个静态内部类ReadLock、WriteLock分别实现Lock接口并借助AQS的继承类Sync提供对读锁和写锁的支持。
ReentrantReadWriteLock源码分析
在梳理ReentrantReadWriteLock读写锁的特征之前,我们先通过源码进行分析,之后再进行总结其所有的特性,例如:公平性、重入性、锁降级、锁升级以及条件等待等等。
构造方法与ReadWriteLock接口实现
public ReentrantReadWriteLock() {//默认构造方法 this(false); } //指定公平策略的构造方法 public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); } //ReadWriteLock接口实现 public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; } public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
构造方法很简单,可见默认情况下ReentrantReadWriteLock使用的是非公平策略。并且在构造方法中分别创建了读锁ReadLock和写锁WriteLock的实例,读锁和写锁都依赖于AQS的具体实现FairSync/NonfairSync。对ReadWriteLock接口的实现仅仅是简单的直接返回在构造方法中创建的读写锁实例。
公平与非公平模式
首先我们看看ReentrantReadWriteLock的公平与非公平策略是靠什么来维护的,也就是FairSync/NonfairSync的不同实现:
static final class NonfairSync extends Sync { private static final long serialVersionUID = -8159625535654395037L; final boolean writerShouldBlock() { return false; //非公平模式的写锁优先 } final boolean readerShouldBlock() { //非公平模式下读锁只有在下一个最有资格获取锁的线程是写线程时才需要被阻塞 //在下一个最有资格获取锁的线程不是写操作时,当前读操作也有优先权。 return apparentlyFirstQueuedIsExclusive(); } } static final class FairSync extends Sync { private static final long serialVersionUID = -2274990926593161451L; final boolean writerShouldBlock() { return hasQueuedPredecessors();//公平模式下一切都按顺序来 } final boolean readerShouldBlock() { return hasQueuedPredecessors(); } }
Sync有两个抽象方法boolean readerShouldBlock()和boolean writerShouldBlock(),这两个方法分别用于在获取读锁和写锁时判断当前线程是否需要被阻塞等待,其实就是对当前线程执行的读/写操作的优先策略的控制,这里叫“公平性”。从上面非公平和公平两种策略的不同实现,我们可以得出:在公平模式下,一切都按进入等待队列的顺序进行,不论是执行的写操作还是读操作。在非公平模式下,写锁有优先权,即当需要执行写操作时,获取写锁的操作可以立即尝试而不需要排队等候,另外在进行读操作的时候,也需要看看是否已经有写操作正在执行或者即将执行,如果有就等待写操作完成,如果没有多个读操作可以并行执行。
写锁的获取
//静态内部类WriteLock对lock方法的实现: public void lock() { sync.acquire(1); } //抽象静态内部类Sync中对state的维护代码: static final int SHARED_SHIFT = 16; static final int SHARED_UNIT = (1 << SHARED_SHIFT); //由于读锁用高位部分,所以读锁个数加1,其实是状态值加 2^16 static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;//16个1表示的二进制位,即十进制65535 static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;//16个1表示的二进制位,即十进制65535 //得到当前共享锁的个数,state(32位二进制位)无符号右移16位,高位补0,其实就是取高16位 static int sharedCount(int c) { return c >>> SHARED_SHIFT; } //得到当前独占锁的个数,state(32位二进制位)按位与16个1表示的二进制位,同为1才为1,其实就是取低16位 static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } //抽象静态内部类Sync中对tryAcquire(int) 方法的实现: protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); //获取state低16位表示的写锁个数 if (c != 0) {//c != 0 表明存在锁 // c !=0 && w ==0 表示只存在读锁, //如果存在读锁,或者存在不是被当前线程占用的写锁也就是非重入的情况,直接导致获取写锁失败。 if (w == 0 || current != getExclusiveOwnerThread()) return false; //走到这里表示是写锁的重入 //想要获取写多的次数超出最大个数范围65535,抛异常 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); //表示重入并且写锁次数未达到最大限制,成功! setState(c + acquires); return true; } //不存在任何锁,如果需要被公平原则阻塞或者CAS直接尝试获取写锁失败,则失败 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); //成功获取写锁 return true; }首先可以看出对写锁的实现使用的是AQS的独占锁模式,在使用一个int类型的唯一变量state来进行维护两种锁的次数时,分别使用该32位整形变量state的低16和高16位来表示写锁和读锁的获取次数。梳理获取写锁的逻辑如下:
- 如果已经存在读锁,则获取写锁失败,进入同步等待队列。
- 如果已经存在写锁并且不是重入,则获取失败,进入同步等待队列。
- 如果已经存在写锁并且是重入,但是已经占有的写锁次数加上现在想要获取的写锁次数超过最大限制65536,直接抛出异常,终止获取写锁。
- 如果已经存在写锁并且是重入,并且已经占有的写锁次数加上现在想要获取的写锁次数没有超过65535,则直接更新state变量,表示获取成功。
- 如果不存在任何读写锁,根据不同的公平策略查看是否需要被阻塞等待,如果不需要阻塞写锁就直接尝试修改state获取写锁,成功返回true,其他情况返回false,加入同步等待队列。
由此可知,当已经存在某种锁的情况下,写锁只有在:①没有读锁;②写锁是被当前线程持有的即重入;③写锁数量没有超出最大限制65535,这三个条件都满足的时候才能成功获取。当不存在任何读锁和写锁时,需要按照公平策略要么等待,要么立即尝试获取写锁。
写锁的释放
//静态内部类WriteLock对unlock方法的实现: public void unlock() { sync.release(1); } //抽象静态内部类Sync中对tryRelease(int) 方法的实现: protected final boolean tryRelease(int releases) { if (!isHeldExclusively()) //释放的线程不为锁的持有者抛异常 throw new IllegalMonitorStateException(); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; if (free) //如果完全释放了所有的重入,就清除掉当前写锁(独占锁)的持有者。 setExclusiveOwnerThread(null); setState(nextc); //修改state为最新的值 return free; }写锁的释放很简单,唯一需要注意的就是,它只在完全释放所有的重入之后,才会清除当前写锁(独占锁)的持有者。
读锁的获取
//静态内部类ReadLock对lock方法的实现: public void lock() { sync.acquireShared(1); } //抽象静态内部类Sync中对那些获取过读锁的线程的重入次数的维护: private transient Thread firstReader = null; //第一个获取读锁的线程 private transient int firstReaderHoldCount; //第一个获取读锁线程的重入计数器 private transient HoldCounter cachedHoldCounter; //缓存最近一个成功获取读锁的线程的计数。这省却了ThreadLocal查找 private transient ThreadLocalHoldCounter readHolds;//保存当前线程重入读锁的次数的容器。在读锁重入次数为 0 时移除。 //Sync构造方法 Sync() { readHolds = new ThreadLocalHoldCounter(); setState(getState()); //确保 readHolds 的内存可见性,利用 volatile 写的内存语义。 } //每个线程持有读锁的计数器。存放在ThreadLocalHoldCounter,不需要是线程安全的。 static final class HoldCounter { int count = 0; //当前线程id,使用线程id而不是线程引用是为了方便垃圾回收 final long tid = getThreadId(Thread.currentThread()); } //采用继承ThreadLocal,重写initialValue方法,自动封装线程计数器, //而且可以直接调用ThreadLocal的get方法拿到计数器,方便快捷。 static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { public HoldCounter initialValue() { return new HoldCounter(); } } //抽象静态内部类Sync中对tryAcquireShared(int) 方法的实现: //这里的参数取名叫“unused”是因为读锁的重入计数是内部维护的。 protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; //如果存在写锁,且不是当前线程持有,返回-1,表示失败 //执行到这里如果存在写锁,并且是当前线程所持有,现在又在获取读锁,其实就是所谓的“锁降级” int r = sharedCount(c); //读锁 //根据读锁是否不需要被阻塞的公平策略,读锁没有超范围65535,直接尝试获取读锁 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {//注意读锁存在在高16位,所以加了2^16 //成功获取到读锁之后: if (r == 0) { //还不存在读锁,表示是第一个获取读锁的线程,设置firstReader和重入计数器 //这里不直接放入readHolds容器是为了提高在无争用的情况加快效率 //因为查找readHolds容器的get方法也是比较耗时的 firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) {// 第一个获取读锁的线程线程重入 firstReaderHoldCount++; } else { // 非 firstReader 读锁重入计数更新 HoldCounter rh = cachedHoldCounter; // 首先直接访问缓存 if (rh == null || rh.tid != getThreadId(current)) //如果缓存为空或者最后一个获取读锁的线程发生变化了,就更新缓存 //这里致使缓存里面总是存放的最后一次获取到读锁的线程,这是为了方便释放读锁的时候可以直接使用缓存,而避免再次从readHolds容器中get //因为通常情况下,下一个释放读锁的线程就是最后一个获取读锁的线程 cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) //第一次获取读锁,就加入readHolds容器 readHolds.set(rh); rh.count++; //对重入计数器加1 } return 1; //返回1, 表示成功 } return fullTryAcquireShared(current);// 获取读锁失败,放到循环里重试。 } final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; for (;;) { int c = getState(); if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; //如果存在写锁,且不是当前线程持有,立即返回失败 } else if (readerShouldBlock()) { //下面这段逻辑是指:根据公平策略需要被阻塞时,如果是已经获取过读锁的线程重入时不应该被阻塞,不然将导致死锁。 if (firstReader == current) { // assert firstReaderHoldCount > 0;//被注释了 } else { if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove();//清除重入次数为0的计数器 } } if (rh.count == 0) return -1; //还没有获取过读锁,且需要被阻塞时,返回-1,表示失败。 } } // 走到这里表示写锁空闲且公平策略决定线程可以获取读锁 if (sharedCount(c) == MAX_COUNT) //读锁数量超过最大限制抛出Error throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { //申请读锁成功,跟tryAcquireShared是类似的,进行计数器加1,并设置最后一次获取读锁的缓存 if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; //返回代表成功的1 } } }根据以上对读锁的获取源码的分析,可以看到读锁获取锁的过程比写锁稍微复杂些,对读锁的实现使用的是AQS的共享锁模式,读锁的获取过程大致可以概括如下:
- 在已经存在其他线程获取到了写锁的情况下,不允许立即获取到读锁。相反,在当前线程已经获取的写锁的情况下可以再次获取读锁,即所谓的“锁降级”。
- 在根据公平策略不需要阻塞时,且读锁数量没有超范围(65535)时,可以立即尝试获取读锁。
- 在无论哪种情况导致第2步中没有能成功获取到读锁时,需要在自旋中不断进行条件判断或尝试获取读锁,这时会出现如下几种情况;
- ①写锁被其他线程占用导致获取失败;②公平策略指示需要被阻塞,但是是第一次尝试获取读锁的,立即返回失败;③读锁数量超范围,跑出Error级别异常;④写锁没被其他线程占用,公平策略允许立即尝试,读锁数量也不会超范围时,立即尝试获取读锁,成功则返回1,失败继续自旋走以上条件分支。另外,在②中,如果公平策略指示需要被阻塞,也不是第一次尝试获取读锁的情况,也会再次进入自旋重复进行以上条件判断。在①和②这种失败的情况下,会被加入到同步等待队列,其它除开情况③的时候,就继续在自旋中不断进行以上条件判断或尝试。
读锁的获取过程大致就是这样,但是值得一提的是,在获取读锁的过程中,还对读锁的重入次数进行计数,对第一个获取读锁的线程单独进行计数,对其他后来获取读锁的线程采用了ThreadLocal类型的容器进行计数,并且还对最后一次获取读锁的线程计数器进行了缓存,这些种种策略最终不仅是为了满足读锁释放重入的要求,更是为了执行效率的提升。
读锁的释放
//静态内部类ReadLock对unlock方法的实现: public void unlock() { sync.releaseShared(1); } //抽象静态内部类Sync中对tryReleaseShared(int) 方法的实现: //这里的参数取名叫“unused”是因为读锁的重入计数是内部维护的。 protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { // 清理firstReader缓存或重入计数器减1 if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { // 清理readHolds里的重入计数 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { // 完全释放读锁 readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; // 重入计数器减1 } for (;;) { // 自旋中通过CAS更新状态值,主要是把读锁数量减 1 int c = getState(); int nextc = c - SHARED_UNIT; //注意读锁存在在高16位,所以实际减了2^16 if (compareAndSetState(c, nextc)) //释放1个读锁 //nextc == 0 表示彻底释放了所有读锁,返回true,否则返回false //释放读锁对其他读线程没有任何影响, 但再彻底释放之后,可以允许等待的写线程继续 return nextc == 0; } }读锁的释放操作还是很简单的,一句话概括就是先对实现了重入计数器的那些对象进行清除,然后再进行真正的释放读锁的数量state的高16减1。
读写锁的其它方法
由于读写锁都实现了Lock接口,所以它们都对支持中断的、非阻塞式的、带有超时机制的获取读写锁进行了实现,这里就不在一一进行源码分析了,它们的实现只是和上面的过程略微不同但基本和ReentrantLock中相关的机制一直,不同的是要遵循读写锁的一些基本规则,例如:在有写入锁时,只有重入的读取锁才能获取成功,其他任何情况的任何形式的锁申请都将失败。在有读取锁时,不论是不是重入,写锁的申请都将失败。
ReentrantReadWriteLock读写锁总结
通过以上对读写锁的源码分析,可以很清楚的知道它的一些特性,在真正了解这些特性之后,才有助于我们选择使用它。
读写锁的实现
写锁的实现以独占锁的方式实现,读锁的实现以共享式的方式实现。
读写锁数量
由于对读写锁的维护使用的是同一个int类型的32位字节的整形变量state,所以使用了它的高16表示读锁,低16表示写锁,因此每一把读写锁能够被获取的次数或者重入次数只有2^16-1=65535次。
读写锁的阻塞
存在读锁时,写锁将被阻塞,不论是不是同一个线程;存在写锁时,只有在被同一个线程再次申请读锁时才不会被阻塞,其他情况也会被阻塞。
公平性
非公平模式(默认): 在非公平模式下,写锁是最有优先权的,在不存在写锁时新来的读线程比晚到的读线程有优先权,所以在非公平模式下,一个锁的获取可能会导致一个或多个读锁或写锁的无限期延迟,但是这提高了吞吐量。在一般情况下,吞吐量通常要高于公平锁。
公平模式: 在公平模式下,所有线程利用一个近似到达顺序的策略来争夺进入,写锁也没有优先的特权,并且写锁的分配的独占的,一次只能被一个写线程持有,其他读写锁的申请都将被阻塞,如果有一拼读锁排在最前面,那么它们都将有机会获得读锁,前提是读锁的数量没有超过65535限制。 另外公平模式对非阻塞的读写锁获取方法WriteLock.tryLock()和ReadLock.tryLock()没有影响,它们将会立即尝试获取锁。
重入
读锁和写锁都允许同ReentrantLock那样进行相同锁的重入,但是读写锁都最多只支持65535次获取操作。
锁降级
写线程获取写入锁后可以再获取读取锁,然后再释放写入锁,这样就从写入锁变成了读取锁,这就是所谓的“锁降级”。
锁升级(不支持)
读取锁是不能直接升级为写入锁的。因为获取一个写入锁需要释放所有读取锁,所以如果有两个读取锁试图获取写入锁而都不释放读取锁时就会发生死锁。
获取锁可中断
synchronize等待获取锁的过程是不支持中断的,ReentrantReadWriteLock实现的读写锁都支持获取锁期间被中断。因为他们都实现了Lock接口中关于中断获取锁的接口方法。
条件等待Condition
只有写锁支持通过条件等待Condition来进行线程间的通信,读锁不支持条件等待,其对lock接口的newCondition方法的实现直接返回的是UnsupportedOperationException异常。
监控
ReentrantReadWriteLock还提供了一些用于监控或者叫查看读写锁状态的一些方法,这些方法不是用于进行同步操作,而是用于监控系统状态。
读写锁应用场景
大部分是读操作,少量的写操作,并且读取操作的逻辑本身相对复杂耗时,写操作相对简单耗时少。例如对缓存的存取操作就是使用读写锁最适合的场景。
ReentrantReadWriteLock读写锁的缺陷
ReentrantReadWriteLock读写锁虽然做到了多个读操作之间不被阻塞,但是读操作会阻塞写,如果读操作频繁,写操作很少,那么这时可能读操作将会无限期的推迟写操作的执行(虽然在非公平模式下,写操作原则上具有优先权,但是由于读和读操作之间不存在相互阻塞,所以当多个读操作交叉形成链式的执行顺序时,写操作其实有可能会一直被阻塞),导致数据迟迟得不到更新,虽然在公平模式下可以解决写饥饿问题,但是那样将会导致低效以及系统吞吐量的严重降低。只能在没有任何读写锁的时候,才能获取到写锁,这其实也是悲观读锁的表现。在JDK8新增的另一种锁StampedLock将会是ReentrantReadWriteLock的改进版本。
ReentrantReadWriteLock示例
示例一:利用锁降级特性更新缓存
public class ReentrantReadWriteLockTest { Object cacheData; volatile boolean cacheValid; ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); void processCachedData() { rwl.readLock().lock(); //先获取读锁 if (!cacheValid) { //假设发现缓存失效 rwl.readLock().unlock(); //申请写锁前先释放读锁 rwl.writeLock().lock(); //再申请写锁 //更新缓存前,再检查一遍,防止已经被其他线程更正了 if (!cacheValid) { cacheData = reloadCache(); //加载缓存 cacheValid = true; } rwl.readLock().lock(); //在持有写锁的同时,申请读锁 rwl.writeLock().unlock(); //读锁申请成功之后,释放写锁,即“锁降级” } use(cacheData); //再次使用新的缓存 rwl.readLock().unlock();//释放读锁 } }
示例二:利用读写锁提高集合存取操作的并发性(假设这个集合很大,并发度很高,读取操作多过写入,并且读取操作的开销高于同步本身开销):
class RWDictionary { private final Map<String, Data> m = new TreeMap<String, Data>(); private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); private final Lock r = rwl.readLock(); private final Lock w = rwl.writeLock(); public Data get(String key) { r.lock(); try { return m.get(key); } finally { r.unlock(); } } public String[] allKeys() { r.lock(); try { return m.keySet().toArray(); } finally { r.unlock(); } } public Data put(String key, Data value) { w.lock(); try { return m.put(key, value); } finally { w.unlock(); } } public void clear() { w.lock(); try { m.clear(); } finally { w.unlock(); } } }
推荐阅读
-
同步数据结构之原子类序章 博客分类: Java 线程
-
Java并发包同步组件ReentrantReadWriteLock 博客分类: Java 线程
-
同步数据结构之原子复合类 博客分类: Java 线程
-
Java并发包同步组件ReentrantReadWriteLock 博客分类: Java 线程
-
Java Volatile学习 博客分类: 并发&异步&无阻塞 多线程并发cocurrencethread同步
-
Java并发框架(Concurrency) 博客分类: 并发&异步&无阻塞 并发多线程concurrent同步异步
-
Java分布式应用学习笔记05多线程下的并发同步器----后篇 博客分类: 分布式集群 分布式集群线程调度多线程同步器
-
Java分布式应用学习笔记05多线程下的并发同步器----前篇 博客分类: 分布式集群 分布式集群并发包线程调度器多线程
-
JAVA线程同步 博客分类: JAVA面试题 Java工作thread
-
JAVA线程同步 博客分类: JAVA面试题 Java工作thread