Java同步框架AbstractQueuedSynchronizer详解
abstractqueuedsynchronizer概述
abstractqueuedsynchronizer是java中非常重要的一个框架类,它实现了最核心的多线程同步的语义,我们只要继承abstractqueuedsynchronizer就可以非常方便的实现我们自己的线程同步器,java中的锁lock就是基于abstractqueuedsynchronizer来实现的。下面首先展示了abstractqueuedsynchronizer类提供的一些方法:
abstractqueuedsynchronizer类方法
在类结构上,abstractqueuedsynchronizer继承了abstractownablesynchronizer,abstractownablesynchronizer仅有的两个方法是提供当前独占模式的线程设置:
/** * the current owner of exclusive mode synchronization. */ private transient thread exclusiveownerthread; /** * sets the thread that currently owns exclusive access. * a {@code null} argument indicates that no thread owns access. * this method does not otherwise impose any synchronization or * {@code volatile} field accesses. * @param thread the owner thread */ protected final void setexclusiveownerthread(thread thread) { exclusiveownerthread = thread; } /** * returns the thread last set by {@code setexclusiveownerthread}, * or {@code null} if never set. this method does not otherwise * impose any synchronization or {@code volatile} field accesses. * @return the owner thread */ protected final thread getexclusiveownerthread() { return exclusiveownerthread; }
exclusiveownerthread代表的是当前获得同步的线程,因为是独占模式,在exclusiveownerthread持有同步的过程中其他的线程的任何同步获取请求将不能得到满足。
至此,需要说明的是,abstractqueuedsynchronizer不仅支持独占模式下的同步实现,还支持共享模式下的同步实现。在java的锁的实现上就有共享锁和独占锁的区别,而这些实现都是基于abstractqueuedsynchronizer对于共享同步和独占同步的支持。从上面展示的abstractqueuedsynchronizer提供的方法中,我们可以发现abstractqueuedsynchronizer的api大概分为三类:
- 类似acquire(int)的一类是最基本的一类,不可中断
- 类似acquireinterruptibly(int)的一类可以被中断
- 类似tryacquirenanos(int, long)的一类不仅可以被中断,而且可以设置阻塞时间
上面的三种类型的api分为独占和共享两套,我们可以根据我们的需求来使用合适的api来做多线程同步。
下面是一个继承abstractqueuedsynchronizer来实现自己的同步器的一个示例:
class mutex implements lock, java.io.serializable { // our internal helper class private static class sync extends abstractqueuedsynchronizer { // reports whether in locked state protected boolean isheldexclusively() { return getstate() == 1; } // acquires the lock if state is zero public boolean tryacquire(int acquires) { assert acquires == 1; // otherwise unused if (compareandsetstate(0, 1)) { setexclusiveownerthread(thread.currentthread()); return true; } return false; } // releases the lock by setting state to zero protected boolean tryrelease(int releases) { assert releases == 1; // otherwise unused if (getstate() == 0) throw new illegalmonitorstateexception(); setexclusiveownerthread(null); setstate(0); return true; } // provides a condition condition newcondition() { return new conditionobject(); } // deserializes properly private void readobject(objectinputstream s) throws ioexception, classnotfoundexception { s.defaultreadobject(); setstate(0); // reset to unlocked state } } // the sync object does all the hard work. we just forward to it. private final sync sync = new sync(); public void lock() { sync.acquire(1); } public boolean trylock() { return sync.tryacquire(1); } public void unlock() { sync.release(1); } public condition newcondition() { return sync.newcondition(); } public boolean islocked() { return sync.isheldexclusively(); } public boolean hasqueuedthreads() { return sync.hasqueuedthreads(); } public void lockinterruptibly() throws interruptedexception { sync.acquireinterruptibly(1); } public boolean trylock(long timeout, timeunit unit) throws interruptedexception { return sync.tryacquirenanos(1, unit.tonanos(timeout)); } }}
mutex实现的功能是:使用0来代表可以获得同步变量,使用1来代表需要等待同步变量被释放再获取,这是一个简单的独占锁实现,任何时刻只会有一个线程获得锁,其他请求获取锁的线程都会阻塞等待直到锁被释放,等待的线程将再次竞争来获得锁。mutex给了我们很好的范例,我们要实现自己的线程同步器,那么就继承abstractqueuedsynchronizer实现其三个抽象方法,然后使用该实现类来做lock和unlock的操作,可以发现,abstractqueuedsynchronizer框架为我们铺平了道路,我们只需要做一点点改变就可以实现高效安全的线程同步去,下文中将分析abstractqueuedsynchronizer是如何为我么提供如此强大得同步能力的。
abstractqueuedsynchronizer实现细节
独占模式
abstractqueuedsynchronizer使用一个volatile类型的int来作为同步变量,任何想要获得锁的线程都需要来竞争该变量,获得锁的线程可以继续业务流程的执行,而没有获得锁的线程会被放到一个fifo的队列中去,等待再次竞争同步变量来获得锁。abstractqueuedsynchronizer为每个没有获得锁的线程封装成一个node再放到队列中去,下面先来分析一下node这个数据结构:
/** waitstatus value to indicate thread has cancelled */ static final int cancelled = 1; /** waitstatus value to indicate successor's thread needs unparking */ static final int signal = -1; /** waitstatus value to indicate thread is waiting on condition */ static final int condition = -2; /** * waitstatus value to indicate the next acquireshared should * unconditionally propagate */ static final int propagate = -3;
上面展示的是node定义的四个状态,需要注意的是只有一个状态是大于0的,也就是cancelled,也就是被取消了,不需要为此线程协调同步变量的竞争了。其他几个的意义见注释。上一小节说到,abstractqueuedsynchronizer提供独占式和共享式两种模式,abstractqueuedsynchronizer使用下面的两个变量来标志是共享还是独占模式:
/** marker to indicate a node is waiting in shared mode */ static final node shared = new node(); /** marker to indicate a node is waiting in exclusive mode */ static final node exclusive = null;
有趣的是,node使用了一个变量nextwaiter来代表两种含义,当在独占模式下,nextwaiter表示下一个等在conditionobject上的node,在共享模式下就是shared,因为对于任何一个同步器来说,都不可能同时实现共享和独占两种模式的,更为专业的解释为:
/** * link to next node waiting on condition, or the special * value shared. because condition queues are accessed only * when holding in exclusive mode, we just need a simple * linked queue to hold nodes while they are waiting on * conditions. they are then transferred to the queue to * re-acquire. and because conditions can only be exclusive, * we save a field by using special value to indicate shared * mode. */ node nextwaiter;
abstractqueuedsynchronizer使用双向链表来管理请求同步的node,保存了链表的head和tail,新的node将会被插到链表的尾端,而链表的head总是代表着获得锁的线程,链表头的线程释放了锁之后会通知后面的线程来竞争共享变量。下面分析一下abstractqueuedsynchronizer是如何实现独占模式下的acquire和release的。
首先,使用方法acquire(int)可以竞争同步变量,下面是调用链路:
public final void acquire(int arg) { if (!tryacquire(arg) && acquirequeued(addwaiter(node.exclusive), arg)) selfinterrupt(); } private node addwaiter(node mode) { node node = new node(thread.currentthread(), mode); // try the fast path of enq; backup to full enq on failure node pred = tail; if (pred != null) { node.prev = pred; if (compareandsettail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } final boolean acquirequeued(final node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final node p = node.predecessor(); if (p == head && tryacquire(arg)) { sethead(node); p.next = null; // help gc failed = false; return interrupted; } if (shouldparkafterfailedacquire(p, node) && parkandcheckinterrupt()) interrupted = true; } } finally { if (failed) cancelacquire(node); } }
首先会调用方法tryacquire来尝试获的锁,而tryacquire这个方法是需要子类来实现的,子类的实现无非就是通过compareandsetstate、getstate、setstate三个方法来操作同步变量state,子类的方法实现需要根据各自的需求场景来实现。继续分析上面的acquire流程,如果tryacquire返回true了,也就是成功改变了state的值了,也就是获得了同步锁了,那么就可以退出了。如果返回false,说明有其他的线程获得锁了,这个时候abstractqueuedsynchronizer会使用addwaiter将当前线程添加到等待队列的尾部等待再次竞争。需要注意的是将当前线程标记为了独占模式。然后重头戏来了,方法acquirequeued使得新添加的node在一个for死循环中不断的轮询,也就是自旋,acquirequeued方法退出的条件是:
- 该节点的前驱节点是头结点,头结点代表的是获得锁的节点,只有它释放了state其他线程才能获得这个变量的所有权
- 在条件1的前提下,方法tryacquire返回true,也就是可以获得同步资源state
满足上面两个条件之后,这个node就会获得锁,根据abstractqueuedsynchronizer的规定,获得锁的node必须是链表的头结点,所以,需要将当前节点设定为头结点。那如果不符合上面两个条件的node会怎么样呢?看for循环里面的第二个分支,首先是shouldparkafterfailedacquire方法,看名字应该是说判断是否应该park当前该线程,然后是方法parkandcheckinterrupt,这个方法是在shouldparkafterfailedacquire返回true的前提之下才会之下,意思就是首先判断一下是否需要park该node,如果需要,那么就park它。关于线程的park和unpark,abstractqueuedsynchronizer使用了偏向底层的技术来实现,在此先不做分析。现在来分析一下再什么情况下node会被park(block):
private static boolean shouldparkafterfailedacquire(node pred, node node) { int ws = pred.waitstatus; if (ws == node.signal) /* * this node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * predecessor was cancelled. skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitstatus > 0); pred.next = node; } else { /* * waitstatus must be 0 or propagate. indicate that we * need a signal, but don't park yet. caller will need to * retry to make sure it cannot acquire before parking. */ compareandsetwaitstatus(pred, ws, node.signal); } return false; }
可以发现,只有当node的前驱节点的状态为node.signal的时候才会返回true,也就是说,只有当前驱节点的状态变为了node.signal,才会去通知当前节点,所以如果前驱节点是node.signal的,那么当前节点就可以放心的park就好了,前驱节点在完成工作之后在释放资源的时候会unpark它的后继节点。下面看一下release的过程:
public final boolean release(int arg) { if (tryrelease(arg)) { node h = head; if (h != null && h.waitstatus != 0) unparksuccessor(h); return true; } return false; } private void unparksuccessor(node node) { /* * if status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. it is ok if this * fails or if status is changed by waiting thread. */ int ws = node.waitstatus; if (ws < 0) compareandsetwaitstatus(node, ws, 0); /* * thread to unpark is held in successor, which is normally * just the next node. but if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ node s = node.next; if (s == null || s.waitstatus > 0) { s = null; for (node t = tail; t != null && t != node; t = t.prev) if (t.waitstatus <= 0) s = t; } if (s != null) locksupport.unpark(s.thread); }
首先通过tryrelease方法来保证资源安全完整的释放了之后,如果发现节点的状态小于0,会变为0。0代表的是初始化的状态,当前的线程已经完成了工作,释放了锁,就要恢复原来的样子。然后会获取该节点的后继节点,如果没有后续节点了,或者后继节点已经被取消了,那么从尾部开始向前找第一个符合要求的节点,然后unpark它。
上面介绍了一对acquire-release,如果希望线程可以在竞争的时候被中断,可以使用acquireinterruptibly。如果希望加上获取锁的时间限制,可以使用tryacquirenanos(int, long)方法来获取。
共享模式
和独占模式一样,分析一下acquireshared的过程:
public final void acquireshared(int arg) { if (tryacquireshared(arg) < 0) doacquireshared(arg); } private void doacquireshared(int arg) { final node node = addwaiter(node.shared); boolean failed = true; try { boolean interrupted = false; for (;;) { final node p = node.predecessor(); if (p == head) { int r = tryacquireshared(arg); if (r >= 0) { setheadandpropagate(node, r); p.next = null; // help gc if (interrupted) selfinterrupt(); failed = false; return; } } if (shouldparkafterfailedacquire(p, node) && parkandcheckinterrupt()) interrupted = true; } } finally { if (failed) cancelacquire(node); } }
获取锁的流程如下:
- 尝试使用tryacquireshared方法,如果返回值大于等于0则表示成功,否则运行doacquireshared方法
- 将当前竞争同步的线程添加到链表尾部,然后自旋
- 获取前驱节点,如果前驱节点是头节点,也就是说前驱节点现在持有锁,那么继续运行4,否则park该节点等待被unpark
- 使用tryacquireshared方法来竞争,如果返回值大于等于0,那么就算是获取成功了,否则继续自旋尝试
共享模式下的release流程:
public final boolean releaseshared(int arg) { if (tryreleaseshared(arg)) { doreleaseshared(); return true; } return false; } private void doreleaseshared() { for (;;) { node h = head; if (h != null && h != tail) { int ws = h.waitstatus; if (ws == node.signal) { if (!compareandsetwaitstatus(h, node.signal, 0)) continue; // loop to recheck cases unparksuccessor(h); } else if (ws == 0 && !compareandsetwaitstatus(h, 0, node.propagate)) continue; // loop on failed cas } if (h == head) // loop if head changed break; } } private void unparksuccessor(node node) { int ws = node.waitstatus; if (ws < 0) compareandsetwaitstatus(node, ws, 0); node s = node.next; if (s == null || s.waitstatus > 0) { s = null; for (node t = tail; t != null && t != node; t = t.prev) if (t.waitstatus <= 0) s = t; } if (s != null) locksupport.unpark(s.thread); }
首先尝试使用tryreleaseshared方法来释放资源,如果释放失败,则返回false,如果释放成功了,那么继续执行doreleaseshared方法唤醒后续节点来竞争资源。需要注意的是,共享模式和独占模式的区别在于,独占模式只允许一个线程获得资源,而共享模式允许多个线程获得资源。所以在独占模式下只有当tryacquire返回true的时候我们才能确定获得资源了,而在共享模式下,只要tryacquireshared返回值大于等于0就可以说明获得资源了,所以你要确保你需要实现的需求和abstractqueuedsynchronizer希望的是一致的。
桶独占模式一样,共享模式也有其他的两种api:
- acquiresharedinterruptibly:支持相应中断的资源竞争
- tryacquiresharednanos:可以设定时间的资源竞争
本文大概描述了abstractqueuedsynchronizer框架的一些基本情况,具体的细节没有深究,但是abstractqueuedsynchronizer作为java中锁实现的底层支撑,需要好好研究一下,后续会基于abstractqueuedsynchronizer来分析java中各种锁的实现。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。