死磕 java同步系列之ReentrantReadWriteLock源码解析
问题
(1)读写锁是什么?
(2)读写锁具有哪些特性?
(3)reentrantreadwritelock是怎么实现读写锁的?
(4)如何使用reentrantreadwritelock实现高效安全的treemap?
简介
读写锁是一种特殊的锁,它把对共享资源的访问分为读访问和写访问,多个线程可以同时对共享资源进行读访问,但是同一时间只能有一个线程对共享资源进行写访问,使用读写锁可以极大地提高并发量。
特性
读写锁具有以下特性:
是否互斥 | 读 | 写 |
---|---|---|
读 | 否 | 是 |
写 | 是 | 是 |
可以看到,读写锁除了读读不互斥,读写、写读、写写都是互斥的。
那么,reentrantreadwritelock是怎么实现读写锁的呢?
类结构
在看源码之前,我们还是先来看一下reentrantreadwritelock这个类的主要结构。
reentrantreadwritelock中的类分成三个部分:
(1)reentrantreadwritelock本身实现了readwritelock接口,这个接口只提供了两个方法readlock()
和writelock()
;
(2)同步器,包含一个继承了aqs的sync内部类,以及其两个子类fairsync和nonfairsync;
(3)readlock和writelock两个内部类实现了lock接口,它们具有锁的一些特性。
源码分析
主要属性
// 读锁 private final reentrantreadwritelock.readlock readerlock; // 写锁 private final reentrantreadwritelock.writelock writerlock; // 同步器 final sync sync;
维护了读锁、写锁和同步器。
主要构造方法
// 默认构造方法 public reentrantreadwritelock() { this(false); } // 是否使用公平锁的构造方法 public reentrantreadwritelock(boolean fair) { sync = fair ? new fairsync() : new nonfairsync(); readerlock = new readlock(this); writerlock = new writelock(this); }
它提供了两个构造方法,默认构造方法使用的是非公平锁模式,在构造方法中初始化了读锁和写锁。
获取读锁和写锁的方法
public reentrantreadwritelock.writelock writelock() { return writerlock; } public reentrantreadwritelock.readlock readlock() { return readerlock; }
属性中的读锁和写锁是私有属性,通过这两个方法暴露出去。
下面我们主要分析读锁和写锁的加锁、解锁方法,且都是基于非公平模式的。
readlock.lock()
// reentrantreadwritelock.readlock.lock() public void lock() { sync.acquireshared(1); } // abstractqueuedsynchronizer.acquireshared() public final void acquireshared(int arg) { // 尝试获取共享锁(返回1表示成功,返回-1表示失败) if (tryacquireshared(arg) < 0) // 失败了就可能要排队 doacquireshared(arg); } // reentrantreadwritelock.sync.tryacquireshared() protected final int tryacquireshared(int unused) { thread current = thread.currentthread(); // 状态变量的值 // 在读写锁模式下,高16位存储的是共享锁(读锁)被获取的次数,低16位存储的是互斥锁(写锁)被获取的次数 int c = getstate(); // 互斥锁的次数 // 如果其它线程获得了写锁,直接返回-1 if (exclusivecount(c) != 0 && getexclusiveownerthread() != current) return -1; // 读锁被获取的次数 int r = sharedcount(c); // 下面说明此时还没有写锁,尝试去更新state的值获取读锁 // 读者是否需要排队(是否是公平模式) if (!readershouldblock() && r < max_count && compareandsetstate(c, c + shared_unit)) { // 获取读锁成功 if (r == 0) { // 如果之前还没有线程获取读锁 // 记录第一个读者为当前线程 firstreader = current; // 第一个读者重入的次数为1 firstreaderholdcount = 1; } else if (firstreader == current) { // 如果有线程获取了读锁且是当前线程是第一个读者 // 则把其重入次数加1 firstreaderholdcount++; } else { // 如果有线程获取了读锁且当前线程不是第一个读者 // 则从缓存中获取重入次数保存器 holdcounter rh = cachedholdcounter; // 如果缓存不属性当前线程 // 再从threadlocal中获取 // readholds本身是一个threadlocal,里面存储的是holdcounter if (rh == null || rh.tid != getthreadid(current)) // get()的时候会初始化rh cachedholdcounter = rh = readholds.get(); else if (rh.count == 0) // 如果rh的次数为0,把它放到threadlocal中去 readholds.set(rh); // 重入的次数加1(初始次数为0) rh.count++; } // 获取读锁成功,返回1 return 1; } // 通过这个方法再去尝试获取读锁(如果之前其它线程获取了写锁,一样返回-1表示失败) return fulltryacquireshared(current); } // abstractqueuedsynchronizer.doacquireshared() private void doacquireshared(int arg) { // 进入aqs的队列中 final node node = addwaiter(node.shared); boolean failed = true; try { boolean interrupted = false; for (;;) { // 当前节点的前一个节点 final node p = node.predecessor(); // 如果前一个节点是头节点(说明是第一个排队的节点) if (p == head) { // 再次尝试获取读锁 int r = tryacquireshared(arg); // 如果成功了 if (r >= 0) { // 头节点后移并传播 // 传播即唤醒后面连续的读节点 setheadandpropagate(node, r); p.next = null; // help gc if (interrupted) selfinterrupt(); failed = false; return; } } // 没获取到读锁,阻塞并等待被唤醒 if (shouldparkafterfailedacquire(p, node) && parkandcheckinterrupt()) interrupted = true; } } finally { if (failed) cancelacquire(node); } } // abstractqueuedsynchronizer.setheadandpropagate() private void setheadandpropagate(node node, int propagate) { // h为旧的头节点 node h = head; // 设置当前节点为新头节点 sethead(node); // 如果旧的头节点或新的头节点为空或者其等待状态小于0(表示状态为signal/propagate) if (propagate > 0 || h == null || h.waitstatus < 0 || (h = head) == null || h.waitstatus < 0) { // 需要传播 // 取下一个节点 node s = node.next; // 如果下一个节点为空,或者是需要获取读锁的节点 if (s == null || s.isshared()) // 唤醒下一个节点 doreleaseshared(); } } // abstractqueuedsynchronizer.doreleaseshared() // 这个方法只会唤醒一个节点 private void doreleaseshared() { for (;;) { node h = head; if (h != null && h != tail) { int ws = h.waitstatus; // 如果头节点状态为signal,说明要唤醒下一个节点 if (ws == node.signal) { if (!compareandsetwaitstatus(h, node.signal, 0)) continue; // loop to recheck cases // 唤醒下一个节点 unparksuccessor(h); } else if (ws == 0 && // 把头节点的状态改为propagate成功才会跳到下面的if !compareandsetwaitstatus(h, 0, node.propagate)) continue; // loop on failed cas } // 如果唤醒后head没变,则跳出循环 if (h == head) // loop if head changed break; } }
看完【死磕 java同步系列之reentrantlock源码解析(一)——公平锁、非公平锁】的分析再看这章的内容应该会比较简单,中间一样的方法我们这里直接跳过了。
我们来看看大致的逻辑:
(1)先尝试获取读锁;
(2)如果成功了直接结束;
(3)如果失败了,进入doacquireshared()方法;
(4)doacquireshared()方法中首先会生成一个新节点并进入aqs队列中;
(5)如果头节点正好是当前节点的上一个节点,再次尝试获取锁;
(6)如果成功了,则设置头节点为新节点,并传播;
(7)传播即唤醒下一个读节点(如果下一个节点是读节点的话);
(8)如果头节点不是当前节点的上一个节点或者(5)失败,则阻塞当前线程等待被唤醒;
(9)唤醒之后继续走(5)的逻辑;
在整个逻辑中是在哪里连续唤醒读节点的呢?
答案是在doacquireshared()方法中,在这里一个节点a获取了读锁后,会唤醒下一个读节点b,这时候b也会获取读锁,然后b继续唤醒c,依次往复,也就是说这里的节点是一个唤醒一个这样的形式,而不是一个节点获取了读锁后一次性唤醒后面所有的读节点。
readlock.unlock()
// java.util.concurrent.locks.reentrantreadwritelock.readlock.unlock public void unlock() { sync.releaseshared(1); } // java.util.concurrent.locks.abstractqueuedsynchronizer.releaseshared public final boolean releaseshared(int arg) { // 如果尝试释放成功了,就唤醒下一个节点 if (tryreleaseshared(arg)) { // 这个方法实际是唤醒下一个节点 doreleaseshared(); return true; } return false; } // java.util.concurrent.locks.reentrantreadwritelock.sync.tryreleaseshared protected final boolean tryreleaseshared(int unused) { thread current = thread.currentthread(); if (firstreader == current) { // 如果第一个读者(读线程)是当前线程 // 就把它重入的次数减1 // 如果减到0了就把第一个读者置为空 if (firstreaderholdcount == 1) firstreader = null; else firstreaderholdcount--; } else { // 如果第一个读者不是当前线程 // 一样地,把它重入的次数减1 holdcounter rh = cachedholdcounter; if (rh == null || rh.tid != getthreadid(current)) rh = readholds.get(); int count = rh.count; if (count <= 1) { readholds.remove(); if (count <= 0) throw unmatchedunlockexception(); } --rh.count; } for (;;) { // 共享锁获取的次数减1 // 如果减为0了说明完全释放了,才返回true int c = getstate(); int nextc = c - shared_unit; if (compareandsetstate(c, nextc)) return nextc == 0; } } // java.util.concurrent.locks.abstractqueuedsynchronizer.doreleaseshared // 行为跟方法名有点不符,实际是唤醒下一个节点 private void doreleaseshared() { for (;;) { node h = head; if (h != null && h != tail) { int ws = h.waitstatus; // 如果头节点状态为signal,说明要唤醒下一个节点 if (ws == node.signal) { if (!compareandsetwaitstatus(h, node.signal, 0)) continue; // loop to recheck cases // 唤醒下一个节点 unparksuccessor(h); } else if (ws == 0 && // 把头节点的状态改为propagate成功才会跳到下面的if !compareandsetwaitstatus(h, 0, node.propagate)) continue; // loop on failed cas } // 如果唤醒后head没变,则跳出循环 if (h == head) // loop if head changed break; } }
解锁的大致流程如下:
(1)将当前线程重入的次数减1;
(2)将共享锁总共被获取的次数减1;
(3)如果共享锁获取的次数减为0了,说明共享锁完全释放了,那就唤醒下一个节点;
如下图,abc三个节点各获取了一次共享锁,三者释放的顺序分别为acb,那么最后b释放共享锁的时候tryreleaseshared()才会返回true,进而才会唤醒下一个节点d。
writelock.lock()
// java.util.concurrent.locks.reentrantreadwritelock.writelock.lock() public void lock() { sync.acquire(1); } // java.util.concurrent.locks.abstractqueuedsynchronizer.acquire() public final void acquire(int arg) { // 先尝试获取锁 // 如果失败,则会进入队列中排队,后面的逻辑跟reentrantlock一模一样了 if (!tryacquire(arg) && acquirequeued(addwaiter(node.exclusive), arg)) selfinterrupt(); } // java.util.concurrent.locks.reentrantreadwritelock.sync.tryacquire() protected final boolean tryacquire(int acquires) { thread current = thread.currentthread(); // 状态变量state的值 int c = getstate(); // 互斥锁被获取的次数 int w = exclusivecount(c); if (c != 0) { // 如果c!=0且w==0,说明共享锁被获取的次数不为0 // 这句话整个的意思就是 // 如果共享锁被获取的次数不为0,或者被其它线程获取了互斥锁(写锁) // 那么就返回false,获取写锁失败 if (w == 0 || current != getexclusiveownerthread()) return false; // 溢出检测 if (w + exclusivecount(acquires) > max_count) throw new error("maximum lock count exceeded"); // 到这里说明当前线程已经获取过写锁,这里是重入了,直接把state加1即可 setstate(c + acquires); // 获取写锁成功 return true; } // 如果c等于0,就尝试更新state的值(非公平模式writershouldblock()返回false) // 如果失败了,说明获取写锁失败,返回false // 如果成功了,说明获取写锁成功,把自己设置为占有者,并返回true if (writershouldblock() || !compareandsetstate(c, c + acquires)) return false; setexclusiveownerthread(current); return true; } // 获取写锁失败了后面的逻辑跟reentrantlock是一致的,进入队列排队,这里就不列源码了
写锁获取的过程大致如下:
(1)尝试获取锁;
(2)如果有读者占有着读锁,尝试获取写锁失败;
(3)如果有其它线程占有着写锁,尝试获取写锁失败;
(4)如果是当前线程占有着写锁,尝试获取写锁成功,state值加1;
(5)如果没有线程占有着锁(state==0),当前线程尝试更新state的值,成功了表示尝试获取锁成功,否则失败;
(6)尝试获取锁失败以后,进入队列排队,等待被唤醒;
(7)后续逻辑跟reentrantlock是一致;
writelock.unlock()
// java.util.concurrent.locks.reentrantreadwritelock.writelock.unlock() public void unlock() { sync.release(1); } //java.util.concurrent.locks.abstractqueuedsynchronizer.release() public final boolean release(int arg) { // 如果尝试释放锁成功(完全释放锁) // 就尝试唤醒下一个节点 if (tryrelease(arg)) { node h = head; if (h != null && h.waitstatus != 0) unparksuccessor(h); return true; } return false; } // java.util.concurrent.locks.reentrantreadwritelock.sync.tryrelease() protected final boolean tryrelease(int releases) { // 如果写锁不是当前线程占有着,抛出异常 if (!isheldexclusively()) throw new illegalmonitorstateexception(); // 状态变量的值减1 int nextc = getstate() - releases; // 是否完全释放锁 boolean free = exclusivecount(nextc) == 0; if (free) setexclusiveownerthread(null); // 设置状态变量的值 setstate(nextc); // 如果完全释放了写锁,返回true return free; }
写锁释放的过程大致为:
(1)先尝试释放锁,即状态变量state的值减1;
(2)如果减为0了,说明完全释放了锁;
(3)完全释放了锁才唤醒下一个等待的节点;
总结
(1)reentrantreadwritelock采用读写锁的思想,能提高并发的吞吐量;
(2)读锁使用的是共享锁,多个读锁可以一起获取锁,互相不会影响,即读读不互斥;
(3)读写、写读和写写是会互斥的,前者占有着锁,后者需要进入aqs队列中排队;
(4)多个连续的读线程是一个接着一个被唤醒的,而不是一次性唤醒所有读线程;
(5)只有多个读锁都完全释放了才会唤醒下一个写线程;
(6)只有写锁完全释放了才会唤醒下一个等待者,这个等待者有可能是读线程,也可能是写线程;
彩蛋
(1)如果同一个线程先获取读锁,再获取写锁会怎样?
分析上图中的代码,在tryacquire()方法中,如果读锁被获取的次数不为0(c != 0 && w == 0),返回false,返回之后外层方法会让当前线程阻塞。
可以通过下面的方法验证:
readlock.lock(); writelock.lock(); writelock.unlock(); readlock.unlock();
运行程序后会发现代码停止在writelock.lock();
,当然,你也可以打个断点跟踪进去看看。
(2)如果同一个线程先获取写锁,再获取读锁会怎样?
分析上面的代码,在tryacquireshared()方法中,第一个红框处并不会返回,因为不满足getexclusiveownerthread() != current
;第二个红框处如果原子更新成功就说明获取了读锁,然后就会执行第三个红框处的代码把其重入次数更改为1。
可以通过下面的方法验证:
writelock.lock(); readlock.lock(); readlock.unlock(); writelock.unlock();
你可以打个断点跟踪一下看看。
(3)死锁了么?
通过上面的两个例子,我们可以感受到同一个线程先读后写和先写后读是完全不一样的,为什么不一样呢?
先读后写,一个线程占有读锁后,其它线程还是可以占有读锁的,这时候如果在其它线程占有读锁之前让自己占有了写锁,其它线程又不能占有读锁了,这段程序会非常难实现,逻辑也很奇怪,所以,设计成只要一个线程占有了读锁,其它线程包括它自己都不能再获取写锁。
先写后读,一个线程占有写锁后,其它线程是不能占有任何锁的,这时候,即使自己占有一个读锁,对程序的逻辑也不会有任何影响,所以,一个线程占有写锁后是可以再占有读锁的,只是这个时候其它线程依然无法获取读锁。
如果你仔细思考上面的逻辑,你会发现一个线程先占有读锁后占有写锁,会有一个很大的问题——锁无法被释放也无法被获取了。这个线程先占有了读锁,然后自己再占有写锁的时候会阻塞,然后它就自己把自己搞死了,进而把其它线程也搞死了,它无法释放锁,其它线程也无法获得锁了。
这是死锁吗?似乎不是,死锁的定义是线程a占有着线程b需要的资源,线程b占有着线程a需要的资源,两个线程相互等待对方释放资源,经典的死锁例子如下:
object a = new object(); object b = new object(); new thread(()->{ synchronized (a) { locksupport.parknanos(1000000); synchronized (b) { } } }).start(); new thread(()->{ synchronized (b) { synchronized (a) { } } }).start();
简单的死锁用jstack是可以看到的:
"thread-1": at com.coolcoding.code.synchronize.reentrantreadwritelocktest.lambda$main$1(reentrantreadwritelocktest.java:40) - waiting to lock <0x000000076baa9068> (a java.lang.object) - locked <0x000000076baa9078> (a java.lang.object) at com.coolcoding.code.synchronize.reentrantreadwritelocktest$$lambda$2/1831932724.run(unknown source) at java.lang.thread.run(thread.java:748) "thread-0": at com.coolcoding.code.synchronize.reentrantreadwritelocktest.lambda$main$0(reentrantreadwritelocktest.java:32) - waiting to lock <0x000000076baa9078> (a java.lang.object) - locked <0x000000076baa9068> (a java.lang.object) at com.coolcoding.code.synchronize.reentrantreadwritelocktest$$lambda$1/1096979270.run(unknown source) at java.lang.thread.run(thread.java:748) found 1 deadlock.
(4)如何使用reentrantreadwritelock实现一个高效安全的treemap?
class safetreemap { private final map<string, object> m = new treemap<string, object>(); private final reentrantreadwritelock lock = new reentrantreadwritelock(); private final lock readlock = lock.readlock(); private final lock writelock = lock.writelock(); public object get(string key) { readlock.lock(); try { return m.get(key); } finally { readlock.unlock(); } } public object put(string key, object value) { writelock.lock(); try { return m.put(key, value); } finally { writelock.unlock(); } } }
推荐阅读
欢迎关注我的公众号“彤哥读源码”,查看更多源码系列文章, 与彤哥一起畅游源码的海洋。
上一篇: php如何将json字符串转为数组