Java中的队列同步器AQS
一、aqs概念
1、队列同步器是用来构建锁或者其他同步组件的基础框架,使用一个int型变量代表同步状态,通过内置的队列来完成线程的排队工作。
2、下面是jdk8文档中对于aqs的部分介绍
public abstract class abstractqueuedsynchronizer extends abstractownablesynchronizer implements serializable 提供一个框架,用于实现依赖先进先出(fifo)等待队列的阻塞锁和相关同步器(信号量,事件等)。 该类被设计为大多数类型的同步器的有用依据,这些同步器依赖于单个原子int值来表示状
态。子类必须定义改变此状态的protected方法,以及根据该对象被获取或释放来定义该状态的含义。给定这些,这个类中的其他方法执行所有排队和阻塞机制。 子类可以保持其他状态字段,但只以
原子方式更新int使用方法操纵值getstate() , setstate(int)和compareandsetstate(int, int)被跟踪相对于同步。 此类支持默认独占模式和共享模式。 当以独占模式获取时,尝试通过其他线程获取不能成功。 多线程获取的共享模式可能(但不需要)成功。 除了在机械意义上,这个类不理解这些差异,当共享
模式获取成功时,下一个等待线程(如果存在)也必须确定它是否也可以获取。 在不同模式下等待的线程共享相同的fifo队列。 通常,实现子类只支持这些模式之一,但是两者都可以在
readwritelock中发挥作用。仅支持独占或仅共享模式的子类不需要定义支持未使用模式的方法。
总结来说就是:
①子类通过继承aqs并实现其抽象方法来管理同步状态,对于同步状态的更改通过提供的getstate()、setstate(int state)、compareandsetstate(int expect, int update)来进行操作,因为使用cas操作保证同步状态的改变是原子的。
②子类被推荐定义为自定义同步组件的静态内部类,同步器本身并没有实现任何的同步接口,仅仅是定义了若干状态获取和释放的方法来提供自定义同步组件的使用。
③同步器既可以支持独占式的获取同步状态,也可以支持共享式的获取同步状态(reentrantlock、reentrantreadwritelock、countdownlatch等不同类型的同步组件)
3、同步器是实现锁的关键,在锁的实现中聚合同步器,利用同步器实现锁的语义;
二、aqs的接口和实例
1、同步器的设计实现原理
继承同步器并且重写指定的方法,然后将同步器组合在自定义同步组件的实现中,并且调用同步器提供的模板方法(这些模板方法会调用重写的方法);而重写指定的方法的时候,需要使用getstate()、setstate(int state)、compareandsetstate(int expect, int update)来访问或者更新同步状态。下面是源码中state变量和三个方法的定义声明实现
1 /** 2 * .(同步状态) 3 */ 4 private volatile int state; 5 6 /** 7 * (返回当前的同步状态) 8 * 此操作的内存语义为@code volatile read 9 */ 10 protected final int getstate() { 11 return state; 12 } 13 14 /** 15 * (设置新的同步状态) 16 * 此操作的内存语义为@code volatile read 17 */ 18 protected final void setstate(int newstate) { 19 state = newstate; 20 } 21 22 /** 23 * (如果要更新的状态和期望的状态相同,那就通过原子的方式更新状态) 24 * ( 此操作的内存语义为@code volatile read 和 write) 25 * (如果更新的状态和期望的状态不同就返回false) 26 */ 27 protected final boolean compareandsetstate(int expect, int update) { 28 return unsafe.compareandswapint(this, stateoffset, expect, update); 29 }
2、下面介绍aqs提供可被重写的方法
1 /** 2 * 独占式的获取同步状态,实现该方法需要查询当前状态并判断同步状态是否符合预期,然后再进行cas设置同步状态 3 * 4 */ 5 protected boolean tryacquire(int arg) { 6 throw new unsupportedoperationexception(); 7 } 8 9 /** 10 * 独占式的释放同步状态,等待获取同步状态的线程可以有机会获取同步状态 11 * 12 */ 13 protected boolean tryrelease(int arg) { 14 throw new unsupportedoperationexception(); 15 } 16 17 /** 18 * 尝试以共享模式获取。 该方法应该查询对象的状态是否允许在共享模式下获取该对象,如果是这样,就可以获取它。 该方法总是由执行获取的线程调用。 19 * 如果此方法报告失败,则获取方法可能将线程排队(如果尚未排队),直到被其他线程释放为止。 获取失败时返回负值,如果在获取成共享模式下功但没 20 * 有后续共享模式获取可以成功,则为零; 并且如果以共享模式获取成功并且随后的共享模式获取可能成功,则为正值,在这种情况下,后续等待线程必须检查可用性。 21 */ 22 protected int tryacquireshared(int arg) { 23 throw new unsupportedoperationexception(); //如果不支持共享模式 ,会抛出该异常 24 } 25 26 /** 27 * 尝试将状态设置为以共享模式释放同步状态。 该方法总是由执行释放的线程调用。 28 */ 29 protected int tryreleaseshared(int arg) { 30 throw new unsupportedoperationexception(); //如果不支持共享模式 ,会抛出该异常 31 } 32 33 /** 34 * 当前同步器是否在独占模式下被线程占用,一般该方法表示是否被当前线程所独占 35 */ 36 protected int isheldexclusively(int arg) { 37 throw new unsupportedoperationexception(); //如果不支持共享模式 ,会抛出该异常 38 }
3、同步器提供的模板方法
在实现自定义同步组件的时候,需要重写上面的方法,而下面的模板方法会调用上面重写的方法。下面介绍同步器提供的模板方法
1 /** 2 * 以独占模式获取,忽略中断。 通过调用至少一次tryacquire(int)实现,成功返回。 否则线 3 * 程排队,可能会重复阻塞和解除阻塞,直到成功才调用tryacquire(int) 4 */ 5 public final void acquire(int arg) {...} 6 7 /** 8 * 以独占方式获得,如果中断,中止。 通过首先检查中断状态,然后调用至少一次 9 * tryacquire(int) ,成功返回。 否则线程排队,可能会重复阻塞和解除阻塞,调用 10 * tryacquire(int)直到成功或线程中断。 11 */ 12 public final void acquireinterruptibly(int arg) throws interruptedexception {...} 13 14 /** 15 * 尝试以独占模式获取,如果中断则中止,如果给定的超时时间失败。 首先检查中断状态,然 16 * 后调用至少一次tryacquire(int) ,成功返回。 否则,线程排队,可能会重复阻塞和解除阻 17 * 塞,调用tryacquire(int)直到成功或线程中断或超时 18 */ 19 public final boolean tryacquirenanos(int arg, long nanostimeout) throws interruptedexception {...} 20 21 /** 22 * 以共享模式获取,忽略中断。 通过首次调用至少一次执行 tryacquireshared(int),成功返 23 * 回。 否则线程排队,可能会重复阻塞和解除阻塞,直到成功调用tryacquireshared(int) 。 24 */ 25 public final void acquireshared(int arg){...} 26 27 /** 28 * 以共享方式获取,如果中断,中止。 首先检查中断状态,然后调用至少一次 29 * tryacquireshared(int) ,成功返回。 否则线程排队,可能会重复阻塞和解除阻塞,调用 30 * tryacquireshared(int)直到成功或线程中断。 31 */ 32 public final void acquiresharedinterruptibly(int arg) throws interruptedexception{...} 33 34 /** 35 * 尝试以共享模式获取,如果中断则中止,如果给定的时间超过,则失败。 通过首先检查中断 36 * 状态,然后调用至少一次tryacquireshared(int) ,成功返回。 否则,线程排队,可能会重 37 * 复阻塞和解除阻塞,调用tryacquireshared(int)直到成功或线程中断或超时。 38 */ 39 public final boolean tryacquiresharednanos(int arg, long nanostimeout) throws interruptedexception{...} 40 41 /** 42 * 独占式的释放同步状态,该方法会在释放同步状态之后,将同步队列中的第一个节点包含的线程唤醒 43 */ 44 public final boolean release(int arg){...} 45 46 /** 47 * 共享式的释放同步状态 48 */ 49 public final boolean releaseshared(int arg){...} 50 51 /** 52 * 获取在等待队列上的线程集合 53 */ 54 public final collection<thread> getqueuedthreads(){...}
三、队列同步器的实现分析
1、同步队列
a)t同步队列的实现原理
aqs内部维护一个同步队列来完成同步状态的管理,当前线程获取同步状态失败的时候,aqs会将当前线程以及等待状态信息构造成一个结点node并将其加入同步队列中,同时阻塞当前线程,当同步状态由持有线程释放的时候,会将同步队列中的首节点唤醒使其再次尝试获取同步状态。同步队列中的结点用来保存获取同步状态失败的线程的线程引用、等待状态以及前驱结点和后继结点。下面是node的属性分析
1 static final class node { 2 /** 共享模式下构造结点 */ 3 static final node shared = new node(); 4 /** 独占模式下构造结点 */ 5 static final node exclusive = null; 6 7 /** 用于指示线程已经取消的waitstatus值(由于在同步队列中等待的线程等待超时或者发生中断,需要从同步队列中取消等待,结点进入该状态将不会发生变化)*/ 8 static final int cancelled = 1; 9 /** waitstatus值指示后续线程需要取消等待(后继结点的线程处于等待状态,而当前结点的线程如果释放了同步状态或者cancell,将会通知后继结点的线程以运行) */ 10 static final int signal = -1; 11 /**waitstatus值表示线程正在等待条件(原本结点在等待队列中,结点线程等待在condition上,当其他线程对condition调用了signal()方法之后)该结点会从
等待队列中转移到同步队列中,进行同步状态的获取 */ 12 static final int condition = -2; 13 /** 14 * waitstatus值表示下一个共享式同步状态的获取应该无条件传播下去 15 */ 16 static final int propagate = -3; 17 18 /** 19 * 不同的等到状态的int值 20 */ 21 volatile int waitstatus; 22 23 /** 24 * 前驱结点,当结点加入同步队列将会被设置前驱结点信息 25 */ 26 volatile node prev; 27 28 /** 29 * 后继结点 30 */ 31 volatile node next; 32 33 /** 34 * 当前获取到同步状态的线程 35 */ 36 volatile thread thread; 37 38 /** 39 * 等待队列中的后继结点,如果当前结点是共享的,那么这个字段是一个shared常量;也就是说结点类型(独占和共享)和等待队列中的后继结点公用一个字段 40 */ 41 node nextwaiter; 42 43 /** 44 * 如果是共享模式下等待,那么返回true(因为上面的node nextwaiter字段在共享模式下是一个shared常量) 45 */ 46 final boolean isshared() { 47 return nextwaiter == shared; 48 } 49 50 final node predecessor() throws nullpointerexception { 51 node p = prev; 52 if (p == null) 53 throw new nullpointerexception(); 54 else 55 return p; 56 } 57 58 node() { // 用于建立初始头结点或shared标记 59 } 60 61 node(thread thread, node mode) { // 用于添加到等待队列 62 this.nextwaiter = mode; 63 this.thread = thread; 64 } 65 66 node(thread thread, int waitstatus) { // used by condition 67 this.waitstatus = waitstatus; 68 this.thread = thread; 69 } 70 }
b)同步队列示意图和简单分析
①同步队列示意图:当一个线程获取了同步状态后,其他线程不能获取到该同步状态,就会被构造称为node然后添加到同步队列之中,这个添加的过程基于cas保证线程安全性。
②同步队列遵循先进先出(fifo),首节点是获取到同步状态的结点,首节点的线程在释放同步状态的时候将会唤醒后继结点(然后后继结点就会变成新的首节点等待获取同步状态)
2、独占式同步状态的获取和释放
①前面说过,同步器的acquire()方法会获取同步状态,这个方法对不会响应中断,也就是说当线程获取通同步状态失败后会被构造成结点加入到同步队列中,当线程被中断时不会从同步队列中移除。
1 /** 2 * ①首先调用tryacquire方法尝试获取同步状态,如果获取同步状态失败,就进行下面的操作 3 * ②获取失败:按照独占式的模式构造同步结点并通过addwaiter方法将结点添加到同步队列的尾部 4 * ③通过acquirequeue方法自旋获取同步状态。 5 * ④如果获取不到同步状态,就阻塞结点中的线程,而结点中的线程唤醒主要是通过前驱结点的出队或者被中断来实现 6 */ 7 public final void acquire(int arg) { 8 if (!tryacquire(arg) && acquirequeued(addwaiter(node.exclusive), arg)) 9 selfinterrupt(); 10 }
②下面是addwaiter、enq和自旋获取同步状态acquirequeue方法的实现(该方法的主要作用就是将获取同步状态失败的线程构造成结点然后添加到同步队列的队尾)
1 private node addwaiter(node mode) { 2 node node = new node(thread.currentthread(), mode); 3 //尝试直接放在队尾 4 node pred = tail; //直接获取同步器的tail结点 5 if (pred != null) { 6 node.prev = pred; 7 if (compareandsettail(pred, node)) { 8 //队尾结点不为空通过原子操作将构造的结点置为队尾结点 9 pred.next = node; 10 return node; 11 } 12 } 13 //采用自旋方式保证构造的结点添加到同步队列中 14 enq(node); 15 return node; 16 } 17 private node enq(final node node) { 18 for (;;) { //死循环知道添加成功 19 node t = tail; 20 if (t == null) { // must initialize 21 if (compareandsethead(new node())) 22 tail = head; 23 } else { 24 node.prev = t; 25 //通过cas方式将结点添加到同步队列之后才会返回,否则就会不断尝试添加(这样实际上就是在并发情况下,把向同步队列添加node变得串行化了) 26 if (compareandsettail(t, node)) { 27 t.next = node; 28 return t; 29 } 30 } 31 } 32 } 33 /** 34 * 通过tryacquire()和addwaiter(),表示该线程获取同步状态已经失败,被放入同步 35 * 队列尾部了。线程阻塞等待直到其他线程(前驱结点获得同步装填或者被中断)释放同步状 36 * 态后唤醒自己,自己才能获得。 37 */ 38 final boolean acquirequeued(final node node, int arg) { 39 boolean failed = true; 40 try { 41 boolean interrupted = false; 42 //线程在死循环的方式中尝试获取同步状态 43 for (;;) { 44 final node p = node.predecessor(); //获取前驱结点 45 //只有前驱接待是头结点的时候才能尝试获取同步状态 46 if (p == head && tryacquire(arg)) { 47 sethead(node); //获取到同步状态之后,就将自己设置为头结点 48 p.next = null; //前驱结点已经获得同步状态去执行自己的程序了,所以需要释放掉占用的同步队列的资源,由jvm回收 49 failed = false; 50 return interrupted; 51 } 52 //如果获取同步状态失败,应该自旋等待继续获取并且校验自己的中断标志位信息 53 if (shouldparkafterfailedacquire(p, node) && 54 parkandcheckinterrupt()) 55 interrupted = true; //如果被中断,就改变自己的中断标志位状态信息 56 } 57 } finally { 58 if (failed) 59 cancelacquire(node); 60 } 61 }
③独占式获取同步状态的整个流程
④独占式同步器的释放:release方法执行时,会唤醒头结点的后继结点线程
public final boolean release(int arg) { if (tryrelease(arg)) { node h = head;//头结点 //唤醒头结点的后继结点线程 if (h != null && h.waitstatus != 0) unparksuccessor(h); return true; } return false; }
3、共享式同步状态的获取和释放
①共享式获取和独占式获取最主要的区别是能否有多个线程同时获取到同步状态。如图所示简易描述二者的区别(共享式访问的时候,可以允许多个线程访问资源,但是存在独占式访问的时候,同一时刻其他的不管是共享还是独占都会被阻塞)
②关于共享式获取同步状态的方法
1 /** 2 * 此方法是共享模式下线程获取共享同步状态的顶层入口。它会尝试去获取同步状态,获取成功则直接返回, 3 * 获取失败则进入等待队列一直尝试获取(执行doacquireshared方法体中的内容),直到获取到资源为止(条件就是tryacquireshared方法返回值大于等于0),整个过程忽略中断 4 */ 5 public final void acquireshared(int arg) { 6 if (tryacquireshared(arg) < 0) 7 doacquireshared(arg); 8 } 9 /** 10 * "自旋"尝试获取同步状态 11 */ 12 private void doacquireshared(int arg) { 13 //首先将该线程包括线程引用、等待状态、前驱结点和后继结点的信息封装台node中,然后添加到等待队列里面(一共享模式添加) 14 final node node = addwaiter(node.shared); 15 boolean failed = true; 16 try { 17 boolean interrupted = false; //当前线程的中断标志 18 for (;;) { 19 final node p = node.predecessor(); //获取前驱结点 20 if (p == head) { 21 //当前驱结点是头结点的时候就会以共享的方式去尝试获取同步状态 22 int r = tryacquireshared(arg); 23 //判断tryacquireshared的返回值 24 if (r >= 0) { 25 //如果返回值大于等于0,表示获取同步状态成功,就修改当前的头结点并将信息传播都后续的结点队列中 26 setheadandpropagate(node, r); 27 p.next = null; // 释放掉已经获取到同步状态的前驱结点的资源 28 if (interrupted) 29 selfinterrupt(); //检查中断标志 30 failed = false; 31 return; 32 } 33 } 34 if (shouldparkafterfailedacquire(p, node) && 35 parkandcheckinterrupt()) 36 interrupted = true; 37 } 38 } finally { 39 if (failed) 40 cancelacquire(node); 41 } 42 }
根据源代码我们可以了解共享式获取同步状态的整个过程
首先同步器会调用tryacquireshared方法来尝试获取同步状态,然后根据这个返回值来判断是否获取到同步状态(当返回值大于等于0可视为获取到同步状态);如果第一次获取失败的话,就进入'自旋'状态(执行doacquireshared方法)一直尝试去获取同步状态;在自旋获取中,如果检查到当前前驱结点是头结点的话,就会尝试获取同步状态,而一旦获取成功(tryacquireshared方法返回值大于等于0)就可以从自旋状态退出。
另外,还有一点就是上面说到的一个处于等待队列的线程要想开始尝试去获取同步状态,需要满足的条件就是前驱结点是头结点,那么它本身就是整个队列中的第二个结点。当头结点释放掉所有的临界资源之后,我们考虑每个线程运行所需资源的不同数量问题,如下图所示
③共享式同步状态的释放
对于支持共享式的同步组件(即多个线程同同时访问),它们和独占式的主要区别就是tryreleaseshared方法必须确保同步状态的释放是线程安全的(cas的模式来释放同步状态,因为既然是多个线程能够访问,那么释放的时候也会是多个线程的,就需要保证释放时候的线程安全)
1 /** 2 * 该方法是共享模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果成功释放且允许唤醒等待线程,它会唤醒等待队列里的其他线程来获取资源。 3 */ 4 public final boolean releaseshared(int arg) { 5 if (tryreleaseshared(arg)) { 6 doreleaseshared(); // 7 return true; 8 } 9 return false; 10 }
四、自定义同步组件的实现
1、共享式锁的实现
①、自定义一个同步组件,可以允许两个线程访问(共享式同步组件),超过两个线程就会被阻塞。
②、既然是共享式同步组件,按照前面所说的,组件本身需要使用aqs提供的共享式模板方法acquireshared等;组件的内部类需要实现aqs,并且重写关于共享式获取同步状态的方法(tryacquireshared()、tryreleaseshared()等共享模式下的方法)。
③、既然是两个线程能够同时访问的话,那么状态数的取值范围就是0、1、2了,每当一个线程获取到同步状态的时候state值减1,反之就会增加1;当state值为0的时候就会阻塞其他想要获取同步状态的线程。对于同步状态的更改需要使用cas来进行保证原子性。
1 package cn.source.concurrent; 2 3 import java.util.concurrent.timeunit; 4 import java.util.concurrent.locks.abstractqueuedsynchronizer; 5 import java.util.concurrent.locks.condition; 6 import java.util.concurrent.locks.lock; 7 8 public class testaqs implements lock{ 9 10 private sync sync = new sync(2); 11 12 private static class sync extends abstractqueuedsynchronizer { 13 14 sync(int num) { 15 if(num <= 0) { 16 throw new runtimeexception("num需要大于0"); 17 } 18 setstate(num); 19 } 20 21 @override 22 protected int tryacquireshared(int arg) { 23 for(; ;) { 24 int currentstate = getstate(); 25 int newstate = currentstate - arg; 26 if(newstate < 0 || compareandsetstate(currentstate, newstate)) { 27 return newstate; 28 } 29 } 30 } 31 32 @override 33 protected boolean tryreleaseshared(int arg) { 34 for(; ;) { 35 int currentstate = getstate(); 36 int newstate = currentstate + arg; 37 if(compareandsetstate(currentstate, newstate)) { 38 return true; 39 } 40 } 41 } 42 43 44 } 45 @override 46 public void lock() { 47 sync.acquireshared(1); 48 } 49 50 @override 51 public void unlock() { 52 sync.releaseshared(1); 53 } 54 55 //...... 56 }
1 /** 2 * 测试结果:输出的线程名称是成对的,保证同一时刻只有两个线程能够获取到锁 3 * 4 */ 5 public class testlockshare { 6 @test 7 public void test() { 8 lock lock = new testaqs(); 9 class worker extends thread { 10 11 @override 12 public void run() { 13 while(true) { 14 lock.lock(); 15 try { 16 thread.sleep(1000); 17 system.out.println(thread.currentthread().getname()); 18 thread.sleep(1000); 19 } catch (exception e) { 20 e.printstacktrace(); 21 } finally { 22 lock.unlock(); 23 } 24 } 25 } 26 27 } 28 29 for (int i = 0; i < 8; i++) { 30 worker worker = new worker(); 31 worker.setdaemon(true); 32 worker.start(); 33 34 } 35 for (int i = 0; i < 8; i++) { 36 try { 37 thread.sleep(1000); 38 } catch (interruptedexception e) { 39 // todo auto-generated catch block 40 e.printstacktrace(); 41 } 42 system.out.println(); 43 } 44 } 45 }
2、独占式锁的实现
1 package cn.source.concurrent; 2 3 import java.util.concurrent.timeunit; 4 import java.util.concurrent.locks.abstractqueuedsynchronizer; 5 import java.util.concurrent.locks.condition; 6 import java.util.concurrent.locks.lock; 7 8 public class mutex implements lock{ 9 10 private sync sync = new sync(); 11 12 private static class sync extends abstractqueuedsynchronizer { 13 14 /** 15 * 尝试获取资源,立即返回。成功则返回true,否则false。 16 */ 17 @override 18 protected boolean tryacquire(int arg) { 19 if(compareandsetstate(0, 1)) {//state为0才设置为1,不可重入! 20 setexclusiveownerthread(thread.currentthread());//设置为当前线程独占资源 21 return true; 22 } 23 return false; 24 } 25 26 /** 27 * 尝试释放资源,立即返回。成功则为true,否则false。 28 */ 29 @override 30 protected boolean tryrelease(int arg) { 31 if(getstate() == 0) { //既然来释放,那肯定就是已占有状态了。只是为了保险,多层判断! 32 throw new illegalmonitorstateexception(); 33 } 34 setexclusiveownerthread(null); 35 setstate(0); 36 return true; 37 } 38 39 @override 40 protected boolean isheldexclusively() { 41 // 判断是否锁定状态 42 return getstate() == 1; 43 } 44 45 } 46 47 @override 48 public void lock() { 49 sync.acquire(1); 50 } 51 52 @override 53 public boolean trylock(long time, timeunit unit) throws interruptedexception { 54 return sync.tryacquire(1); 55 } 56 57 @override 58 public void unlock() { 59 sync.release(1); 60 } 61 62 }
1 public class testmutex { 2 @test 3 public void test() { 4 lock lock = new mutex(); 5 class worker extends thread { 6 7 @override 8 public void run() { 9 while(true) { 10 lock.lock(); 11 try { 12 thread.sleep(1000); 13 system.out.println(thread.currentthread().getname()); 14 thread.sleep(1000); 15 } catch (exception e) { 16 e.printstacktrace(); 17 } finally { 18 lock.unlock(); 19 } 20 } 21 } 22 23 } 24 25 for (int i = 0; i < 8; i++) { 26 worker worker = new worker(); 27 worker.setdaemon(true); 28 worker.start(); 29 30 } 31 for (int i = 0; i < 8; i++) { 32 try { 33 thread.sleep(1000); 34 } catch (interruptedexception e) { 35 e.printstacktrace(); 36 } 37 system.out.println(); 38 } 39 } 40 }