Java concurrency之共享锁和ReentrantReadWriteLock_动力节点Java学院整理
readwritelock 和 reentrantreadwritelock介绍
readwritelock,顾名思义,是读写锁。它维护了一对相关的锁 — — “读取锁”和“写入锁”,一个用于读取操作,另一个用于写入操作。
“读取锁”用于只读操作,它是“共享锁”,能同时被多个线程获取。
“写入锁”用于写入操作,它是“独占锁”,写入锁只能被一个线程锁获取。
注意:不能同时存在读取锁和写入锁!
readwritelock是一个接口。reentrantreadwritelock是它的实现类,reentrantreadwritelock包括子类readlock和writelock。
readwritelock 和 reentrantreadwritelock函数列表
readwritelock函数列表
// 返回用于读取操作的锁。 lock readlock() // 返回用于写入操作的锁。 lock writelock()
reentrantreadwritelock函数列表
// 创建一个新的 reentrantreadwritelock,默认是采用“非公平策略”。 reentrantreadwritelock() // 创建一个新的 reentrantreadwritelock,fair是“公平策略”。fair为true,意味着公平策略;否则,意味着非公平策略。 reentrantreadwritelock(boolean fair) // 返回当前拥有写入锁的线程,如果没有这样的线程,则返回 null。 protected thread getowner() // 返回一个 collection,它包含可能正在等待获取读取锁的线程。 protected collection<thread> getqueuedreaderthreads() // 返回一个 collection,它包含可能正在等待获取读取或写入锁的线程。 protected collection<thread> getqueuedthreads() // 返回一个 collection,它包含可能正在等待获取写入锁的线程。 protected collection<thread> getqueuedwriterthreads() // 返回等待获取读取或写入锁的线程估计数目。 int getqueuelength() // 查询当前线程在此锁上保持的重入读取锁数量。 int getreadholdcount() // 查询为此锁保持的读取锁数量。 int getreadlockcount() // 返回一个 collection,它包含可能正在等待与写入锁相关的给定条件的那些线程。 protected collection<thread> getwaitingthreads(condition condition) // 返回正等待与写入锁相关的给定条件的线程估计数目。 int getwaitqueuelength(condition condition) // 查询当前线程在此锁上保持的重入写入锁数量。 int getwriteholdcount() // 查询是否给定线程正在等待获取读取或写入锁。 boolean hasqueuedthread(thread thread) // 查询是否所有的线程正在等待获取读取或写入锁。 boolean hasqueuedthreads() // 查询是否有些线程正在等待与写入锁有关的给定条件。 boolean haswaiters(condition condition) // 如果此锁将公平性设置为 ture,则返回 true。 boolean isfair() // 查询是否某个线程保持了写入锁。 boolean iswritelocked() // 查询当前线程是否保持了写入锁。 boolean iswritelockedbycurrentthread() // 返回用于读取操作的锁。 reentrantreadwritelock.readlock readlock() // 返回用于写入操作的锁。 reentrantreadwritelock.writelock writelock()
reentrantreadwritelock数据结构
reentrantreadwritelock的uml类图如下:
从中可以看出:
(01) reentrantreadwritelock实现了readwritelock接口。readwritelock是一个读写锁的接口,提供了"获取读锁的readlock()函数" 和 "获取写锁的writelock()函数"。
(02) reentrantreadwritelock中包含:sync对象,读锁readerlock和写锁writerlock。读锁readlock和写锁writelock都实现了lock接口。读锁readlock和写锁writelock中也都分别包含了"sync对象",它们的sync对象和reentrantreadwritelock的sync对象 是一样的,就是通过sync,读锁和写锁实现了对同一个对象的访问。
(03) 和"reentrantlock"一样,sync是sync类型;而且,sync也是一个继承于aqs的抽象类。sync也包括"公平锁"fairsync和"非公平锁"nonfairsync。sync对象是"fairsync"和"nonfairsync"中的一个,默认是"nonfairsync"。
其中,共享锁源码相关的代码如下:
public static class readlock implements lock, java.io.serializable { private static final long serialversionuid = -5992448646407690164l; // reentrantreadwritelock的aqs对象 private final sync sync; protected readlock(reentrantreadwritelock lock) { sync = lock.sync; } // 获取“共享锁” public void lock() { sync.acquireshared(1); } // 如果线程是中断状态,则抛出一场,否则尝试获取共享锁。 public void lockinterruptibly() throws interruptedexception { sync.acquiresharedinterruptibly(1); } // 尝试获取“共享锁” public boolean trylock() { return sync.tryreadlock(); } // 在指定时间内,尝试获取“共享锁” public boolean trylock(long timeout, timeunit unit) throws interruptedexception { return sync.tryacquiresharednanos(1, unit.tonanos(timeout)); } // 释放“共享锁” public void unlock() { sync.releaseshared(1); } // 新建条件 public condition newcondition() { throw new unsupportedoperationexception(); } public string tostring() { int r = sync.getreadlockcount(); return super.tostring() + "[read locks = " + r + "]"; } }
说明:
readlock中的sync是一个sync对象,sync继承于aqs类,即sync就是一个锁。reentrantreadwritelock中也有一个sync对象,而且readlock中的sync和reentrantreadwritelock中的sync是对应关系。即reentrantreadwritelock和readlock共享同一个aqs对象,共享同一把锁。
reentrantreadwritelock中sync的定义如下:
final sync sync;
下面,分别从“获取共享锁”和“释放共享锁”两个方面对共享锁进行说明。
获取共享锁
获取共享锁的思想(即lock函数的步骤),是先通过tryacquireshared()尝试获取共享锁。尝试成功的话,则直接返回;尝试失败的话,则通过doacquireshared()不断的循环并尝试获取锁,若有需要,则阻塞等待。doacquireshared()在循环中每次尝试获取锁时,都是通过tryacquireshared()来进行尝试的。下面看看“获取共享锁”的详细流程。
1. lock()
lock()在readlock中,源码如下:
public void lock() { sync.acquireshared(1); }
2. acquireshared()
sync继承于aqs,acquireshared()定义在aqs中。源码如下:
public final void acquireshared(int arg) { if (tryacquireshared(arg) < 0) doacquireshared(arg); }
说明:acquireshared()首先会通过tryacquireshared()来尝试获取锁。
尝试成功的话,则不再做任何动作(因为已经成功获取到锁了)。
尝试失败的话,则通过doacquireshared()来获取锁。doacquireshared()会获取到锁了才返回。
3. tryacquireshared()
tryacquireshared()定义在reentrantreadwritelock.java的sync中,源码如下:
protected final int tryacquireshared(int unused) { thread current = thread.currentthread(); // 获取“锁”的状态 int c = getstate(); // 如果“锁”是“互斥锁”,并且获取锁的线程不是current线程;则返回-1。 if (exclusivecount(c) != 0 && getexclusiveownerthread() != current) return -1; // 获取“读取锁”的共享计数 int r = sharedcount(c); // 如果“不需要阻塞等待”,并且“读取锁”的共享计数小于max_count; // 则通过cas函数更新“锁的状态”,将“读取锁”的共享计数+1。 if (!readershouldblock() && r < max_count && compareandsetstate(c, c + shared_unit)) { // 第1次获取“读取锁”。 if (r == 0) { firstreader = current; firstreaderholdcount = 1; // 如果想要获取锁的线程(current)是第1个获取锁(firstreader)的线程 } else if (firstreader == current) { firstreaderholdcount++; } else { // holdcounter是用来统计该线程获取“读取锁”的次数。 holdcounter rh = cachedholdcounter; if (rh == null || rh.tid != current.getid()) cachedholdcounter = rh = readholds.get(); else if (rh.count == 0) readholds.set(rh); // 将该线程获取“读取锁”的次数+1。 rh.count++; } return 1; } return fulltryacquireshared(current); }
说明:tryacquireshared()的作用是尝试获取“共享锁”。
如果在尝试获取锁时,“不需要阻塞等待”并且“读取锁的共享计数小于max_count”,则直接通过cas函数更新“读取锁的共享计数”,以及将“当前线程获取读取锁的次数+1”。否则,通过fulltryacquireshared()获取读取锁。
4. fulltryacquireshared()
fulltryacquireshared()在reentrantreadwritelock中定义,源码如下:
final int fulltryacquireshared(thread current) { holdcounter rh = null; for (;;) { // 获取“锁”的状态 int c = getstate(); // 如果“锁”是“互斥锁”,并且获取锁的线程不是current线程;则返回-1。 if (exclusivecount(c) != 0) { if (getexclusiveownerthread() != current) return -1; // 如果“需要阻塞等待”。 // (01) 当“需要阻塞等待”的线程是第1个获取锁的线程的话,则继续往下执行。 // (02) 当“需要阻塞等待”的线程获取锁的次数=0时,则返回-1。 } else if (readershouldblock()) { // 如果想要获取锁的线程(current)是第1个获取锁(firstreader)的线程 if (firstreader == current) { } else { if (rh == null) { rh = cachedholdcounter; if (rh == null || rh.tid != current.getid()) { rh = readholds.get(); if (rh.count == 0) readholds.remove(); } } // 如果当前线程获取锁的计数=0,则返回-1。 if (rh.count == 0) return -1; } } // 如果“不需要阻塞等待”,则获取“读取锁”的共享统计数; // 如果共享统计数超过max_count,则抛出异常。 if (sharedcount(c) == max_count) throw new error("maximum lock count exceeded"); // 将线程获取“读取锁”的次数+1。 if (compareandsetstate(c, c + shared_unit)) { // 如果是第1次获取“读取锁”,则更新firstreader和firstreaderholdcount。 if (sharedcount(c) == 0) { firstreader = current; firstreaderholdcount = 1; // 如果想要获取锁的线程(current)是第1个获取锁(firstreader)的线程, // 则将firstreaderholdcount+1。 } else if (firstreader == current) { firstreaderholdcount++; } else { if (rh == null) rh = cachedholdcounter; if (rh == null || rh.tid != current.getid()) rh = readholds.get(); else if (rh.count == 0) readholds.set(rh); // 更新线程的获取“读取锁”的共享计数 rh.count++; cachedholdcounter = rh; // cache for release } return 1; } } }
说明:fulltryacquireshared()会根据“是否需要阻塞等待”,“读取锁的共享计数是否超过限制”等等进行处理。如果不需要阻塞等待,并且锁的共享计数没有超过限制,则通过cas尝试获取锁,并返回1。
5. doacquireshared()
doacquireshared()定义在aqs函数中,源码如下:
private void doacquireshared(int arg) { // addwaiter(node.shared)的作用是,创建“当前线程”对应的节点,并将该线程添加到clh队列中。 final node node = addwaiter(node.shared); boolean failed = true; try { boolean interrupted = false; for (;;) { // 获取“node”的前一节点 final node p = node.predecessor(); // 如果“当前线程”是clh队列的表头,则尝试获取共享锁。 if (p == head) { int r = tryacquireshared(arg); if (r >= 0) { setheadandpropagate(node, r); p.next = null; // help gc if (interrupted) selfinterrupt(); failed = false; return; } } // 如果“当前线程”不是clh队列的表头,则通过shouldparkafterfailedacquire()判断是否需要等待, // 需要的话,则通过parkandcheckinterrupt()进行阻塞等待。若阻塞等待过程中,线程被中断过,则设置interrupted为true。 if (shouldparkafterfailedacquire(p, node) && parkandcheckinterrupt()) interrupted = true; } } finally { if (failed) cancelacquire(node); } }
说明:doacquireshared()的作用是获取共享锁。
它会首先创建线程对应的clh队列的节点,然后将该节点添加到clh队列中。clh队列是管理获取锁的等待线程的队列。
如果“当前线程”是clh队列的表头,则尝试获取共享锁;否则,则需要通过shouldparkafterfailedacquire()判断是否阻塞等待,需要的话,则通过parkandcheckinterrupt()进行阻塞等待。
doacquireshared()会通过for循环,不断的进行上面的操作;目的就是获取共享锁。需要注意的是:doacquireshared()在每一次尝试获取锁时,是通过tryacquireshared()来执行的!
释放共享锁
释放共享锁的思想,是先通过tryreleaseshared()尝试释放共享锁。尝试成功的话,则通过doreleaseshared()唤醒“其他等待获取共享锁的线程”,并返回true;否则的话,返回flase。
1. unlock()
public void unlock() { sync.releaseshared(1); }
说明:该函数实际上调用releaseshared(1)释放共享锁。
2. releaseshared()
releaseshared()在aqs中实现,源码如下:
public final boolean releaseshared(int arg) { if (tryreleaseshared(arg)) { doreleaseshared(); return true; } return false; }
说明:releaseshared()的目的是让当前线程释放它所持有的共享锁。
它首先会通过tryreleaseshared()去尝试释放共享锁。尝试成功,则直接返回;尝试失败,则通过doreleaseshared()去释放共享锁。
3. tryreleaseshared()
tryreleaseshared()定义在reentrantreadwritelock中,源码如下:
protected final boolean tryreleaseshared(int unused) { // 获取当前线程,即释放共享锁的线程。 thread current = thread.currentthread(); // 如果想要释放锁的线程(current)是第1个获取锁(firstreader)的线程, // 并且“第1个获取锁的线程获取锁的次数”=1,则设置firstreader为null; // 否则,将“第1个获取锁的线程的获取次数”-1。 if (firstreader == current) { // assert firstreaderholdcount > 0; if (firstreaderholdcount == 1) firstreader = null; else firstreaderholdcount--; // 获取rh对象,并更新“当前线程获取锁的信息”。 } else { holdcounter rh = cachedholdcounter; if (rh == null || rh.tid != current.getid()) rh = readholds.get(); int count = rh.count; if (count <= 1) { readholds.remove(); if (count <= 0) throw unmatchedunlockexception(); } --rh.count; } for (;;) { // 获取锁的状态 int c = getstate(); // 将锁的获取次数-1。 int nextc = c - shared_unit; // 通过cas更新锁的状态。 if (compareandsetstate(c, nextc)) return nextc == 0; } }
说明:tryreleaseshared()的作用是尝试释放共享锁。
4. doreleaseshared()
doreleaseshared()定义在aqs中,源码如下:
private void doreleaseshared() { for (;;) { // 获取clh队列的头节点 node h = head; // 如果头节点不为null,并且头节点不等于tail节点。 if (h != null && h != tail) { // 获取头节点对应的线程的状态 int ws = h.waitstatus; // 如果头节点对应的线程是signal状态,则意味着“头节点的下一个节点所对应的线程”需要被unpark唤醒。 if (ws == node.signal) { // 设置“头节点对应的线程状态”为空状态。失败的话,则继续循环。 if (!compareandsetwaitstatus(h, node.signal, 0)) continue; // 唤醒“头节点的下一个节点所对应的线程”。 unparksuccessor(h); } // 如果头节点对应的线程是空状态,则设置“文件点对应的线程所拥有的共享锁”为其它线程获取锁的空状态。 else if (ws == 0 && !compareandsetwaitstatus(h, 0, node.propagate)) continue; // loop on failed cas } // 如果头节点发生变化,则继续循环。否则,退出循环。 if (h == head) // loop if head changed break; } }
说明:doreleaseshared()会释放“共享锁”。它会从前往后的遍历clh队列,依次“唤醒”然后“执行”队列中每个节点对应的线程;最终的目的是让这些线程释放它们所持有的锁。
公平共享锁和非公平共享锁
和互斥锁reentrantlock一样,readlock也分为公平锁和非公平锁。
公平锁和非公平锁的区别,体现在判断是否需要阻塞的函数readershouldblock()是不同的。
公平锁的readershouldblock()的源码如下:
final boolean readershouldblock() { return hasqueuedpredecessors(); }
在公平共享锁中,如果在当前线程的前面有其他线程在等待获取共享锁,则返回true;否则,返回false。
非公平锁的readershouldblock()的源码如下:
final boolean readershouldblock() { return apparentlyfirstqueuedisexclusive(); }
在非公平共享锁中,它会无视当前线程的前面是否有其他线程在等待获取共享锁。只要该非公平共享锁对应的线程不为null,则返回true。
reentrantreadwritelock示例
import java.util.concurrent.locks.readwritelock; import java.util.concurrent.locks.reentrantreadwritelock; public class readwritelocktest1 { public static void main(string[] args) { // 创建账户 mycount mycount = new mycount("4238920615242830", 10000); // 创建用户,并指定账户 user user = new user("tommy", mycount); // 分别启动3个“读取账户金钱”的线程 和 3个“设置账户金钱”的线程 for (int i=0; i<3; i++) { user.getcash(); user.setcash((i+1)*1000); } } } class user { private string name; //用户名 private mycount mycount; //所要操作的账户 private readwritelock mylock; //执行操作所需的锁对象 user(string name, mycount mycount) { this.name = name; this.mycount = mycount; this.mylock = new reentrantreadwritelock(); } public void getcash() { new thread() { public void run() { mylock.readlock().lock(); try { system.out.println(thread.currentthread().getname() +" getcash start"); mycount.getcash(); thread.sleep(1); system.out.println(thread.currentthread().getname() +" getcash end"); } catch (interruptedexception e) { } finally { mylock.readlock().unlock(); } } }.start(); } public void setcash(final int cash) { new thread() { public void run() { mylock.writelock().lock(); try { system.out.println(thread.currentthread().getname() +" setcash start"); mycount.setcash(cash); thread.sleep(1); system.out.println(thread.currentthread().getname() +" setcash end"); } catch (interruptedexception e) { } finally { mylock.writelock().unlock(); } } }.start(); } } class mycount { private string id; //账号 private int cash; //账户余额 mycount(string id, int cash) { this.id = id; this.cash = cash; } public string getid() { return id; } public void setid(string id) { this.id = id; } public int getcash() { system.out.println(thread.currentthread().getname() +" getcash cash="+ cash); return cash; } public void setcash(int cash) { system.out.println(thread.currentthread().getname() +" setcash cash="+ cash); this.cash = cash; } }
运行结果:
thread-0 getcash start thread-2 getcash start thread-0 getcash cash=10000 thread-2 getcash cash=10000 thread-0 getcash end thread-2 getcash end thread-1 setcash start thread-1 setcash cash=1000 thread-1 setcash end thread-3 setcash start thread-3 setcash cash=2000 thread-3 setcash end thread-4 getcash start thread-4 getcash cash=2000 thread-4 getcash end thread-5 setcash start thread-5 setcash cash=3000 thread-5 setcash end
结果说明:
(01) 观察thread0和thread-2的运行结果,我们发现,thread-0启动并获取到“读取锁”,在它还没运行完毕的时候,thread-2也启动了并且也成功获取到“读取锁”。
因此,“读取锁”支持被多个线程同时获取。
(02) 观察thread-1,thread-3,thread-5这三个“写入锁”的线程。只要“写入锁”被某线程获取,则该线程运行完毕了,才释放该锁。
因此,“写入锁”不支持被多个线程同时获取。
推荐阅读
-
Java concurrency之共享锁和ReentrantReadWriteLock_动力节点Java学院整理
-
Java concurrency之非公平锁_动力节点Java学院整理
-
Java concurrency之公平锁(一)_动力节点Java学院整理
-
Java concurrency之集合_动力节点Java学院整理
-
Java concurrency之LockSupport_动力节点Java学院整理
-
Java concurrency线程池之线程池原理(一)_动力节点Java学院整理
-
Java concurrency线程池之线程池原理(二)_动力节点Java学院整理
-
Java concurrency集合之CopyOnWriteArraySet_动力节点Java学院整理
-
Java concurrency集合之LinkedBlockingDeque_动力节点Java学院整理
-
Java concurrency之Condition条件_动力节点Java学院整理