Java并发——结合CountDownLatch源码、Semaphore源码及ReentrantLock源码来看AQS原理
前言:
如果说j.u.c包下的核心是什么?那我想答案只有一个就是aqs。那么aqs是什么呢?接下来让我们一起揭开aqs的神秘面纱
aqs是什么?
aqs是abstractqueuedsynchronizer的简称。为什么说它是核心呢?是因为它提供了一个基于fifo的队列和state变量来构建锁和其他同步装置的基础框架。下面是其底层的数据结构。
aqs的特点
1、其内使用node实现fifo(firstinfirstout)队列。可用于构建锁或者其他同步装置的基础框架
2、且利用了一个int类表示状态。在aqs中维护了一个volatile int state,通常表示有线程访问资源的状态,当state>1的时候表示线程重入的数量,主要有三个方法控制:getstate(),setstate(),compareandsetstate()。后面的源码分析多用到这几个方法
3、使用方法是继承,子类通过继承并通过实现它的方法管理其状态(acquire和release)的方法操纵状态。
4、同时实现排它锁和共享锁模式。实际上aqs功能主要分为两类:独占(只有一个线程能执行)和共享(多个线程同时执行),它的子类要么使用独占功能要么使用共享功能,而reentrantlock是通过两个内部类来实现独占和共享
countdownlatch如何借助aqs实现计数功能?
先来说一下countdownlatch,countdownlatch是一个同步辅助类,通过它可以来完成类似阻塞当前线程的功能,即一个或多个线程一起等待,直到其他线程执行的操作完成。要实现上面的功能,countdownlatch是通过一个给定的原子操作的计数器来实现。调用该类的await()方法的线程会一直处于阻塞状态,直到其他线程调用countdown()方法使得计数器的值变为0之后线程才会执行,这个计数器是不能被重置的。通常这个类会用在程序执行需要等待某个条件完成的场景,比如说并行计算,可将一个数据量很大的计算拆分成一个个子任务,当子任务完成之后,再将最终的结果汇总。每次访问countdownlatch只能有一个线程,但是这个线程在使用完countdown()方法之后能多个线程能继续运行,而调用await()方法的线程就一定要计数器为0才会运行
下面来分析countdownlatch的源码以及如何使用aqs框架
public class countdownlatch { /** * countdownlatch 实现同步控制 * 底层是使用aqs的state来代表count */ private static final class sync extends abstractqueuedsynchronizer { private static final long serialversionuid = 4982264981922014374l; //初始化内部类实际上是设置aqs的state sync(int count) { setstate(count); } int getcount() { return getstate(); } //尝试获取共享是看当前的state是否为0 protected int tryacquireshared(int acquires) { return (getstate() == 0) ? 1 : -1; } /*尝试释放共享锁则是递减计数直到state==0就返回false代表资源已经释放完全否则就会使用cas来让state减一*/ protected boolean tryreleaseshared(int releases) { for (;;) { int c = getstate(); if (c == 0) return false; int nextc = c-1; if (compareandsetstate(c, nextc)) return nextc == 0; } } } private final sync sync; /** * 初始化countdownlatch,实际上是初始化内部类,实际上是设置aqs的state,count不能小于0 */ public countdownlatch(int count) { if (count < 0) throw new illegalargumentexception("count < 0"); this.sync = new sync(count); } /** * 这里实际上是调用了aqs里的acquiresharedinterruptibly方法,完成的功能就是先去查看线程是否被中断,中断则抛出异常,没有被中断就会尝试获取共享资源。
* 注意在syn内部类中重写了tryacquireshared,也就是当state为0就返回1,这时候就会将当前线程放入aqs的队列中去,也就是这时候线程可以不再阻塞而是尝试去获取锁 */ public void await() throws interruptedexception { sync.acquiresharedinterruptibly(1); } /** * 原理同上面方法,但是加了一个时间参数来设置等待的时间 */ public boolean await(long timeout, timeunit unit) throws interruptedexception { return sync.tryacquiresharednanos(1, unit.tonanos(timeout)); } /** * 这里传入参数为1,同样上面内部类一样重写了aqs的tryreleaseshared方法,使用这个重写的方法来让计数器原子操作的减一 */ public void countdown() { sync.releaseshared(1); } /** * 就是获取aqs的state */ public long getcount() { return sync.getcount(); } /** * 转换成字符串的方法 */ public string tostring() { return super.tostring() + "[count = " + sync.getcount() + "]"; } }
由上面代码可看见countdownlatch实现了aqs的共享锁,原理是操作state来实现计数,并且重写了tryacquireshared(),tryreleaseshared()等方法
semaphore是如何借助aqs实现控制并发访问线程个数?
semaphore的功能类似于操作系统的信号量,可以很方便的控制某个资源同时被几个线程访问,即做并发访问控制,与countdownlatch类似,同样是实现获取和释放两个方法。semaphore的使用场景:常用于仅能提供访问的资源,比如数据库的连接数最大只有30,而应用程序的并发数可能远远大于30,这时候就可以使用semaphore来控制同时访问的线程数。当semaphore控制线程数到1的时候就和我们单线程一样了。同样semaphore说是信号量的意思,我们这里就可以把它理解为十字路口的红绿灯,可以控制车流量(这里是控制线程数)
下面来分析semaphore的源码以及如何使用aqs框
public class semaphore implements java.io.serializable {
private static final long serialversionuid = -3222578661600680210l;
/** 所有机制都通过abstractqueuedsynchronizer子类实现 */ private final sync sync; /** * 同样是通过内部类来实现aqs主要功能,使用state来表示许可证数量 */ abstract static class sync extends abstractqueuedsynchronizer { private static final long serialversionuid = 1192457210091910933l; sync(int permits) { setstate(permits); } final int getpermits() { return getstate(); } /* * 不公平的获取方式,会有一个抢占锁的情况,即线程执行顺序会乱 */ final int nonfairtryacquireshared(int acquires) { for (;;) { int available = getstate(); int remaining = available - acquires; if (remaining < 0 || compareandsetstate(available, remaining)) return remaining; } } /* * 释放资源 */ protected final boolean tryreleaseshared(int releases) { for (;;) { int current = getstate(); int next = current + releases; if (next < current) // overflow throw new error("maximum permit count exceeded"); if (compareandsetstate(current, next)) return true; } } final void reducepermits(int reductions) { for (;;) { int current = getstate(); int next = current - reductions; if (next > current) // underflow throw new error("permit count underflow"); if (compareandsetstate(current, next)) return; } } final int drainpermits() { for (;;) { int current = getstate(); if (current == 0 || compareandsetstate(current, 0)) return current; } } } /** * 不公平的sync版本,使用的就是sync定义的不公平锁 */ static final class nonfairsync extends sync { private static final long serialversionuid = -2694183684443567898l; nonfairsync(int permits) { super(permits); } protected int tryacquireshared(int acquires) { return nonfairtryacquireshared(acquires); } } /** * 公平版本,获取锁的线程顺序就是线程启动的顺序。具体是使用hasqueuedpredecessors()方法判断“当前线程”是不是clh队列中的第一个线程。
* 若不是的话,则返回-1,是就设置获取许可证,并检查许可证数量是否足够 */ static final class fairsync extends sync { private static final long serialversionuid = 2014338818796000944l; fairsync(int permits) { super(permits); } protected int tryacquireshared(int acquires) { for (;;) { if (hasqueuedpredecessors()) return -1; int available = getstate(); int remaining = available - acquires; if (remaining < 0 || compareandsetstate(available, remaining)) return remaining; } } } /** * 默认使用不公平的版本,如果需要公平的,则需要两个参数 */ public semaphore(int permits) { sync = new nonfairsync(permits); } public semaphore(int permits, boolean fair) { sync = fair ? new fairsync(permits) : new nonfairsync(permits); } /** * 分析同countdownlatch中的类似方法,具体的实现都是内部类中的获取方法,这里是获取一个许可 */ public void acquire() throws interruptedexception { sync.acquiresharedinterruptibly(1); } /** *功能同上,但是这里不会检测线程是否被中断 */ public void acquireuninterruptibly() { sync.acquireshared(1); } /** * 尝试获取 */ public boolean tryacquire() { return sync.nonfairtryacquireshared(1) >= 0; } /** * 在一段时间内一直尝试获取许可 */ public boolean tryacquire(long timeout, timeunit unit) throws interruptedexception { return sync.tryacquiresharednanos(1, unit.tonanos(timeout)); } /** * 当前线程释放一个许可证 */ public void release() { sync.releaseshared(1); } /** * 可以规定一个线程获得许可证的数量 */ public void acquire(int permits) throws interruptedexception { if (permits < 0) throw new illegalargumentexception(); sync.acquiresharedinterruptibly(permits); } public void acquireuninterruptibly(int permits) { if (permits < 0) throw new illegalargumentexception(); sync.acquireshared(permits); } public boolean tryacquire(int permits) { if (permits < 0) throw new illegalargumentexception(); return sync.nonfairtryacquireshared(permits) >= 0; } public boolean tryacquire(int permits, long timeout, timeunit unit) throws interruptedexception { if (permits < 0) throw new illegalargumentexception(); return sync.tryacquiresharednanos(permits, unit.tonanos(timeout)); } /** * 同样可以规定一个线程释放许可证的数量 */ public void release(int permits) { if (permits < 0) throw new illegalargumentexception(); sync.releaseshared(permits); } /** * 当前的许可还剩几个 */ public int availablepermits() { return sync.getpermits(); } /** * 销毁所有许可 */ public int drainpermits() { return sync.drainpermits(); } protected void reducepermits(int reduction) { if (reduction < 0) throw new illegalargumentexception(); sync.reducepermits(reduction); } public final boolean hasqueuedthreads() { return sync.hasqueuedthreads(); } public final int getqueuelength() { return sync.getqueuelength(); } protected collection<thread> getqueuedthreads() { return sync.getqueuedthreads(); }
public string tostring() { return super.tostring() + "[permits = " + sync.getpermits() + "]"; }}
上面对于semaphore的一些重要内部类和常用方法进行了解释,与countdownlatch很类似,实现的都是共享的功能,即semaphore允许得到许可证的线程同时执行,而countdownlatch允许调用countdown()方法的线程同时执行。并且都是通过内部类实现的。相信看到这里,你能越来越看见aqs为什么被称作juc包的核心。下面就来介绍一下reentrantlock
reentrantlock是如何借助aqs实现锁机制
reentrantlock是可重入锁,前面博客中写到synchronized实现的锁也是可重入的。不过synchronized是基于jvm指令实现,而reentrantlock是使用java代码实现的。reentrantlock重点就是需要我们手动声明加锁和释放锁,如果手工忘记释放锁,很有可能就会导致死锁,即资源永远都被锁住,其他线程无法得到,当前线程也释放不出去。reentrantlock实现的是自旋锁,通过循环调用cas操作实现加锁,避免了线程进入内核态的阻塞状态,所以性能较好。reentrantlock内部同样实现了公平锁和非公平锁。事实上synchronized能做的reentrantlock都能做,但是反过来就不一样了、
经过前面的源码分析我们发现核心的都在当前类的内部类里,而当前类的一些方法不过是使用的内部类以及aqs的方法罢了,所以下面我们就来分析reentrantlock中的三个内部类。
public class reentrantlock implements lock, java.io.serializable { private static final long serialversionuid = 7373984872572414699l; /** 同步的机制都是通过内部类来实现的 */ private final sync sync; /** * 在reentrantlock中state表示的是线程重入锁的次数,当state为0时才能释放锁 */ abstract static class sync extends abstractqueuedsynchronizer { private static final long serialversionuid = -5179523762034025860l; /** * 这个抽象方法提供给公平锁和不公平锁来单独实现,父类不实现 */ abstract void lock(); /** * 首先得到当前线程,而后获取state,如果state为0,也就是没有线程获得当前锁,那么就设置当前线程拥有当前锁的独占访问权,并且返回true。
* 如果state不为0,那么就看当前线程是否是已经获得过锁的线程,如果是就让state+=acquire,acquire一般是1,即表示线程重入并且返回true。
* 上面两个条件都不满足就代表是锁被其他线程获取了,当前线程获取不到,所以返回false */ final boolean nonfairtryacquire(int acquires) { final thread current = thread.currentthread(); int c = getstate(); if (c == 0) { if (compareandsetstate(0, acquires)) { setexclusiveownerthread(current); return true; } } else if (current == getexclusiveownerthread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new error("maximum lock count exceeded"); setstate(nextc); return true; } return false; } /** * 先判断当前线程等不等于拥有锁的线程,不等于就会抛异常,也就是释放不了。
* 等于之后就看state-releases是否为0,当为0的时候就代表释放完全。
* 可以设置锁的状态为没有线程拥有,从而让锁能被其他线程竞争,否则就设置state,代表线程重入该锁,并且线程还没释放完全。 */ protected final boolean tryrelease(int releases) { int c = getstate() - releases; if (thread.currentthread() != getexclusiveownerthread()) throw new illegalmonitorstateexception(); boolean free = false; if (c == 0) { free = true; setexclusiveownerthread(null); } setstate(c); return free; } /* *该方法检验当前线程是否是锁的独占者 */ protected final boolean isheldexclusively() { return getexclusiveownerthread() == thread.currentthread(); } /* *该方法是创建一个条件锁,本文不做具体分析 */ final conditionobject newcondition() { return new conditionobject(); } // methods relayed from outer class final thread getowner() { return getstate() == 0 ? null : getexclusiveownerthread(); } final int getholdcount() { return isheldexclusively() ? getstate() : 0; } final boolean islocked() { return getstate() != 0; } /** * 使得该类从流中能重构实例,并且会重置为解锁状态 */ private void readobject(java.io.objectinputstream s) throws java.io.ioexception, classnotfoundexception { s.defaultreadobject(); setstate(0); } } /** * sync 的不公平版本 */ static final class nonfairsync extends sync { private static final long serialversionuid = 7316153563782823691l; /** * 将state从0更新到1成功的话就让当前线程获取锁,否则就会尝试获得锁和获取当前节点的前一节点,并判断这一个节点是否为头节点,即当前线程是不是头节点的直接后继。
* 如果两个中有一个失败则线程中断,进入阻塞状态 */ final void lock() { if (compareandsetstate(0, 1)) setexclusiveownerthread(thread.currentthread()); else acquire(1); } protected final boolean tryacquire(int acquires) { return nonfairtryacquire(acquires); } } /** * sync 的公平版本 */ static final class fairsync extends sync { private static final long serialversionuid = -3000897897090466540l; /** * 尝试获得锁和获取当前节点的前一节点,并判断这一个节点是否为头节点,即当前线程是不是头节点的直接后继,如果两个中有一个失败则线程中断,进入阻塞状态。
* 也就是一定按照队列中线程的顺序来实现 */ final void lock() { acquire(1); } /** * 跟不公平的版本相比其实是在state为0的时候检查当前线程是不是在队列的头部节点的直接后继,来达到公平的概念 */ protected final boolean tryacquire(int acquires) { final thread current = thread.currentthread(); int c = getstate(); if (c == 0) { if (!hasqueuedpredecessors() && compareandsetstate(0, acquires)) { setexclusiveownerthread(current); return true; } } else if (current == getexclusiveownerthread()) { int nextc = c + acquires; if (nextc < 0) throw new error("maximum lock count exceeded"); setstate(nextc); return true; } return false; } } }
reentrantlock和上面两个类最不同的莫过于reentrantlock使用的是独占功能,即每次只能有一个线程来获取reentrantlock类。reentrantlock类下还有很多方法,这里就不一一介绍,但是本质都是内部类中的实现以及aqs的一些调用
总结:
aqs只是一个基础的框架,里面最核心的就是维护了state变量和chl队列,而其他的类全部都是通过继承的方法进行扩展,虽然没有直接说源码,但是通过上面三个主要类的源码分析再去看aqs已经不是难事。继承主要改变的就是获取和释放的方法,通过这两个方法来对state和队列进行操作达到我们能够进行的并发控制的功能,事实上j.u.c包下的类和能够实现的功能远不止这三个,后面会选择重点的来介绍。