基于AbstractQueuedSynchronizer的并发类的详解
前面的文章研究了AbstractQueuedSynchronizer的独占锁和共享锁,有了前两篇文章的基础,就可以乘胜追击,看一下基于AbstractQueuedSynchronizer的并发类是如何实现的。
ReentrantLock显然是一种独占锁,首先是公平模式的ReentrantLock,Sync是ReentractLock中的基础类,继承自AbstractQueuedSynchronizer,看一下代码实现:
1 abstract static class Sync extends AbstractQueuedSynchronizer { 2 private static final long serialVersionUID = -5179523762034025860L; 3 4 /** 5 * Performs {@link Lock#lock}. The main reason for subclassing 6 * is to allow fast path for nonfair version. 7 */ 8 abstract void lock(); 9 10 /**11 * Performs non-fair tryLock. tryAcquire is12 * implemented in subclasses, but both need nonfair13 * try for trylock method.14 */15 final boolean nonfairTryAcquire(int acquires) {16 final Thread current = Thread.currentThread();17 int c = getState();18 if (c == 0) {19 if (compareAndSetState(0, acquires)) {20 setExclusiveOwnerThread(current);21 return true;22 }23 }24 else if (current == getExclusiveOwnerThread()) {25 int nextc = c + acquires;26 if (nextc < 0) // overflow27 throw new Error("Maximum lock count exceeded");28 setState(nextc);29 return true;30 }31 return false;32 }33 34 protected final boolean tryRelease(int releases) {35 int c = getState() - releases;36 if (Thread.currentThread() != getExclusiveOwnerThread())37 throw new IllegalMonitorStateException();38 boolean free = false;39 if (c == 0) {40 free = true;41 setExclusiveOwnerThread(null);42 }43 setState(c);44 return free;45 }46 47 protected final boolean isHeldExclusively() {48 // While we must in general read state before owner,49 // we don't need to do so to check if current thread is owner50 return getExclusiveOwnerThread() == Thread.currentThread();51 }52 53 final ConditionObject newCondition() {54 return new ConditionObject();55 }56 57 // Methods relayed from outer class58 59 final Thread getOwner() {60 return getState() == 0 ? null : getExclusiveOwnerThread();61 }62 63 final int getHoldCount() {64 return isHeldExclusively() ? getState() : 0;65 }66 67 final boolean isLocked() {68 return getState() != 0;69 }70 71 /**72 * Reconstitutes this lock instance from a stream.73 * @param s the stream74 */75 private void readObject(java.io.ObjectInputStream s)76 throws java.io.IOException, ClassNotFoundException {77 s.defaultReadObject();78 setState(0); // reset to unlocked state79 }80 }
Sync属于一个公共类,它是抽象的说明Sync会被继承,简单整理一下Sync主要做了哪些事(因为Sync不是ReentrantLock公平锁的关键):
定义了一个lock方法让子类去实现,我们平时之所以能调用ReentrantLock的lock()方法,就是因为Sync定义了它
实现了非公平锁tryAcquira的方法
实现了tryRelease方法,比较简单,状态-1,独占锁的线程置空
实现了isHeldExclusively方法
定义了newCondition方法,让开发者可以利用Condition实现通知/等待
接着,看一下公平锁的实现,FairSync类,它继承自Sync:
1 static final class FairSync extends Sync { 2 private static final long serialVersionUID = -3000897897090466540L; 3 4 final void lock() { 5 acquire(1); 6 } 7 8 /** 9 * Fair version of tryAcquire. Don't grant access unless10 * recursive call or no waiters or is first.11 */12 protected final boolean tryAcquire(int acquires) {13 final Thread current = Thread.currentThread();14 int c = getState();15 if (c == 0) {16 if (!hasQueuedPredecessors() &&17 compareAndSetState(0, acquires)) {18 setExclusiveOwnerThread(current);19 return true;20 }21 }22 else if (current == getExclusiveOwnerThread()) {23 int nextc = c + acquires;24 if (nextc < 0)25 throw new Error("Maximum lock count exceeded");26 setState(nextc);27 return true;28 }29 return false;30 }31 }
整理一下要点:
每次acquire的时候,state+1,如果当前线程lock()之后又lock()了,state不断+1,相应的unlock()的时候state-1,直到将state减到0为之,说明当前线程释放完所有的状态,其它线程可以竞争
state=0的时候,通过hasQueuedPredecessors方法做一次判断,hasQueuedPredecessors的实现为"h != t && ((s = h.next) == null || s.thread != Thread.currentThread());",其中h是head、t是tail,由于代码中对结果取反,因此取反之后的判断为"h == t || ((s = h.next) != null && s.thread == Thread.currentThread());",总结起来有两种情况可以通过!hasQueuedPredecessors()这个判断:
h==t,h==t的情况为要么当前FIFO队列中没有任何数据,要么只构建出了一个head还没往后面连过任何一个Node,因此head就是tail
(s = h.next) != null && s.thread == Thread.currentThread(),当前线程为正在等待的第一个Node中的线程
如果没有线程比当前线程等待更久去执行acquire操作,那么通过CAS操作将state从0变为1的线程tryAcquire成功
没有tryAcquire成功的线程,按照tryAcquire的先后顺序,构建为一个FIFO队列,即第一个tryAcquire失败的排在head的后一位,第二个tryAcquire失败的排在head的后二位
当tryAcquire成功的线程release完毕,第一个tryAcquire失败的线程第一个尝试tryAcquire,这就是先到先得,典型的公平锁
非公平模式ReentrantLock实现原理
看完了公平模式ReentrantLock,接着我们看一下非公平模式ReentrantLock是如何实现的。NonfairSync类,同样是继承自Sync类,实现为:
1 static final class NonfairSync extends Sync { 2 private static final long serialVersionUID = 7316153563782823691L; 3 4 /** 5 * Performs lock. Try immediate barge, backing up to normal 6 * acquire on failure. 7 */ 8 final void lock() { 9 if (compareAndSetState(0, 1))10 setExclusiveOwnerThread(Thread.currentThread());11 else12 acquire(1);13 }14 15 protected final boolean tryAcquire(int acquires) {16 return nonfairTryAcquire(acquires);17 }18 }
结合nonfairTryAcquire方法一起讲解,nonfairTryAcquire方法的实现为:
1 final boolean nonfairTryAcquire(int acquires) { 2 final Thread current = Thread.currentThread(); 3 int c = getState(); 4 if (c == 0) { 5 if (compareAndSetState(0, acquires)) { 6 setExclusiveOwnerThread(current); 7 return true; 8 } 9 }10 else if (current == getExclusiveOwnerThread()) {11 int nextc = c + acquires;12 if (nextc < 0) // overflow13 throw new Error("Maximum lock count exceeded");14 setState(nextc);15 return true;16 }17 return false;18 }
看到差别就在于非公平锁lock()的时候会先尝试通过CAS看看能不能把state从0变为1(即获取锁),如果可以的话,直接获取锁而不需要排队。举个实际例子就很好理解了:
线程1、线程2、线程3竞争锁,线程1竞争成功获取锁,线程2、线程3依次排队
线程1执行完毕,释放锁,state变为0,唤醒了第一个排队的线程2
此时线程4来尝试获取锁了,由于线程2被唤醒了,因此线程2与线程4竞争锁
线程4成功将state从0变为1,线程2竞争锁失败,继续park
看到整个过程中,后来的线程4反而比先来的线程2先获取锁,相当于是一种非公平的模式,
那为什么非公平锁效率会比公平锁效率高?上面第(3)步如果线程2和线程4不竞争锁就是答案。为什么这么说,后面的解释很重要,希望大家可以理解:
线程1是先将state设为0,再去唤醒线程2,这两个过程之间是有时间差的。
那么如果线程1将state设置为0的时候,线程4就通过CAS算法获取到了锁,且在线程1唤醒线程2之前就已经使用完毕锁,那么相当于线程2获取锁的时间并没有推迟,在线程1将state设置为0到线程1唤醒线程2的这段时间里,反而有线程4获取了锁执行了任务,这就增加了系统的吞吐量,相当于单位时间处理了更多的任务。
从这段解释我们也应该能看出来了,非公平锁比较适合加锁时间比较短的任务。这是因为加锁时间长,相当于线程2将state设为0并去唤醒线程2的这段时间,线程4无法完成释放锁,那么线程2被唤醒由于没法获取到锁,又被阻塞了,这种唤醒-阻塞的操作会引起线程的上下文切换,继而影响系统的性能。
Semaphore实现原理
Semaphore即信号量,用于控制代码块的并发数,将Semaphore的permits设置为1相当于就是synchronized或者ReentrantLock,Semaphore具体用法可见Java多线程19:多线程下的其他组件之CountDownLatch、Semaphore、Exchanger。信号量允许多条线程获取锁,显然它的锁是一种共享锁,信号量也有公平模式与非公平模式,相信看懂了上面ReentrantLock的公平模式与非公平模式的朋友应该对Semaphore的公平模式与非公平模式理解起来会更快,这里就放在一起写了。
首先还是看一下Semaphore的基础设施,它和ReentrantLock一样,也有一个Sync:
1 abstract static class Sync extends AbstractQueuedSynchronizer { 2 private static final long serialVersionUID = 1192457210091910933L; 3 4 Sync(int permits) { 5 setState(permits); 6 } 7 8 final int getPermits() { 9 return getState();10 }11 12 final int nonfairTryAcquireShared(int acquires) {13 for (;;) {14 int available = getState();15 int remaining = available - acquires;16 if (remaining < 0 ||17 compareAndSetState(available, remaining))18 return remaining;19 }20 }21 22 protected final boolean tryReleaseShared(int releases) {23 for (;;) {24 int current = getState();25 int next = current + releases;26 if (next < current) // overflow27 throw new Error("Maximum permit count exceeded");28 if (compareAndSetState(current, next))29 return true;30 }31 }32 33 final void reducePermits(int reductions) {34 for (;;) {35 int current = getState();36 int next = current - reductions;37 if (next > current) // underflow38 throw new Error("Permit count underflow");39 if (compareAndSetState(current, next))40 return;41 }42 }43 44 final int drainPermits() {45 for (;;) {46 int current = getState();47 if (current == 0 || compareAndSetState(current, 0))48 return current;49 }50 }51 }
和ReentrantLock的Sync差不多,Semaphore的Sync定义了以下的一些主要内容:
getPermits方法获取当前的许可剩余量还剩多少,即还有多少线程可以同时获得信号量
定义了非公平信号量获取共享锁的逻辑nonfairTryAcquireShared
定义了公平模式释放信号量的逻辑tryReleaseShared,相当于释放一次信号量,state就向上+1(信号量每次的获取与释放都是以1为单位的)
再看下公平信号量的实现,同样的FairSync,继承自Sync,代码为:
1 static final class FairSync extends Sync { 2 private static final long serialVersionUID = 2014338818796000944L; 3 4 FairSync(int permits) { 5 super(permits); 6 } 7 8 protected int tryAcquireShared(int acquires) { 9 for (;;) {10 if (hasQueuedPredecessors())11 return -1;12 int available = getState();13 int remaining = available - acquires;14 if (remaining < 0 ||15 compareAndSetState(available, remaining))16 return remaining;17 }18 }19 }
首先第10行的hasQueuedPredecessors方法,前面已经说过了,如果已经有了FIFO队列或者当前线程不是FIFO队列中在等待的第一条线程,返回-1,表示无法获取共享锁成功。
接着获取available,available就是state,用volatile修饰,所以线程中可以看到最新的state,信号量的acquires是1,每次获取信号量都对state-1,两种情况直接返回:
remaining减完<0
通过cas设置成功
之后就是和之前说过的共享锁的逻辑了,如果返回的是一个<0的数字,那么构建FIFO队列,线程阻塞,直到前面的执行完才能唤醒后面的。
接着看一下非公平信号量的实现,NonfairSync继承Sync:
1 static final class NonfairSync extends Sync { 2 private static final long serialVersionUID = -2694183684443567898L; 3 4 NonfairSync(int permits) { 5 super(permits); 6 } 7 8 protected int tryAcquireShared(int acquires) { 9 return nonfairTryAcquireShared(acquires);10 }11 }
nonfairTryAcquireShared在父类已经实现了,再贴一下代码:
1 final int nonfairTryAcquireShared(int acquires) {2 for (;;) {3 int available = getState();4 int remaining = available - acquires;5 if (remaining < 0 ||6 compareAndSetState(available, remaining))7 return remaining;8 }9 }
看到这里和公平Semaphore只有一点差别:不会前置进行一次hasQueuedPredecessors()判断。即当前有没有构建为一个FIFO队列,队列里面第一个等待的线程是不是自身都无所谓,对于非公平Semaphore都一样,反正线程调用Semaphore的acquire方法就将当前state-1,如果得到的remaining设置成功或者CAS操作成功就返回,这种操作没有遵循先到先得的原则,即非公平信号量。
至于非公平信号量对比公平信号量的优点,和ReentrantLock的非公平锁对比ReentrantLock的公平锁一样,就不说了。
CountDownLatch实现原理
CountDownLatch即计数器自减的一种闭锁,某线程阻塞,对一个计数器自减到0,此线程被唤醒,CountDownLatch具体用法可见Java多线程19:多线程下的其他组件之CountDownLatch、Semaphore、Exchanger。
CountDownLatch是一种共享锁,通过await()方法与countDown()两个方法实现自身的功能,首先看一下await()方法的实现:
1 public void await() throws InterruptedException { 2 sync.acquireSharedInterruptibly(1); 3 }
acquireSharedInterruptibly最终又回到tryAcquireShared方法上,直接贴整个Sync的代码实现:
1 private static final class Sync extends AbstractQueuedSynchronizer { 2 private static final long serialVersionUID = 4982264981922014374L; 3 4 Sync(int count) { 5 setState(count); 6 } 7 8 int getCount() { 9 return getState();10 }11 12 protected int tryAcquireShared(int acquires) {13 return (getState() == 0) ? 1 : -1;14 }15 16 protected boolean tryReleaseShared(int releases) {17 // Decrement count; signal when transition to zero18 for (;;) {19 int c = getState();20 if (c == 0)21 return false;22 int nextc = c-1;23 if (compareAndSetState(c, nextc))24 return nextc == 0;25 }26 }27 }
其实看到tryAcquireShared方法,理解AbstractQueuedSynchronizer共享锁原理的,不用看countDown方法应该都能猜countDown方法是如何实现的。我这里总结一下:
传入一个count,state就等于count,await的时候判断是不是0,是0返回1表示成功,不是0返回-1表示失败,构建FIFO队列,head头只连接一个Node,Node中的线程就是调用CountDownLatch的await()方法的线程
每次countDown的时候对state-1,直到state减到0的时候才算tryReleaseShared成功,tryReleaseShared成功,唤醒被挂起的线程
为了验证(2),看一下上面Sync的tryReleaseShared方法就可以了,确实是这么实现的。
以上就是基于AbstractQueuedSynchronizer的并发类的详解的详细内容,更多请关注其它相关文章!