六、显式锁和AQS
显式锁和aqs
一、显式锁
synchronized 关键字结合对象的监视器,jvm 为我们提供了一种『内置锁』的语义,这种锁很简便,不需要我们关心加锁和释放锁的过程,我们只需要告诉虚拟机哪些代码块需要加锁即可,其他的细节会由编译器和虚拟机自己实现。
可以将我们的『内置锁』理解为是 jvm 的一种内置特性, 所以一个很显著的问题就是,它不支持某些高级功能的定制,比如说,我想要这个锁支持公平竞争,我想要根据不同的条件将线程阻塞在不同的队列上,我想要支持定时竞争锁,超时返回,我还想让被阻塞的线程能够响应中断请求等等。
这些特殊的需求是『内置锁』满足不了的,所以在 jdk 层面又引入了『显式锁』的概念,不再由 jvm 来负责加锁和释放锁,这两个动作释放给我们程序来做,程序层面难免复杂了些,但锁灵活性提高了,可以支持更多定制功能,但要求你对锁具有更深层次的理解。
【1】lock 显式锁
lock 接口位于 java.util.concurrent.locks 包下,基本定义如下:
public interface lock { //获取锁,失败则阻塞 void lock(); //响应中断式获取锁 void lockinterruptibly() //尝试一次获取锁,成功返回true,失败返回false,不会阻塞 boolean trylock(); //定时尝试 boolean trylock(long time, timeunit unit) //释放锁 void unlock(); //创建一个条件队列 condition newcondition(); }
显式锁的实现类主要有三个,reentrantlock 是其最主要的实现类,readlock 和 writelock 是 reentrantreadwritelock 内部定义的两个内部类,他们继承自 lock 并实现了其定义的所有方法,精细化读写分离。而 reentrantreadwritelock 向外提供读锁写锁。
【2】reentrantlock(可重入锁)
reentrantlock 作为 lock 显式锁的最基本实现,也是使用最频繁的一个锁实现类。可重入就是可以重新获取这个锁,例如递归操作。
它提供了两个构造函数,用于支持公平竞争锁。
public reentrantlock() //参数fair为是否支持公平竞争锁 public reentrantlock(boolean fair)
公平锁和非公平锁的区别之处在于,公平锁在选择下一个占有锁的线程时,参考先到先得原则,等待时间越长的线程将具有更高的优先级。而非公平锁则无视这种原则。
那么假设这么一种情况,a 获得锁正在运行,b 尝试获得锁失败被阻塞,此时 c 也尝试获得锁,失败而阻塞,虽然 c 只需要很短运行时间,它依然需要等待 b 执行结束才有机会获得锁来运行。
非公平锁的前提下,a 执行结束,找到队列首部的 b 线程,开始上下文切换,假如此时的 c 过来竞争锁,非公平策略前提下,c 是可以获得锁的,并假设它迅速的执行结束了,当 b 线程被切换回来之后再去获取锁也不会有什么问题,结果是,c 线程在 b 线程的上下文切换过程中执行结束。显然,非公平策略下 cpu 的吞吐量是提高的。
但是,非公平策略的锁可能会造成某些线程饥饿,始终得不到运行,各有利弊,适时取舍。
【3】readwritelock(读写锁)
① readwritelock同lock一样也是一个接口,提供了readlock和writelock两种锁的操作机制,一个是只读的锁,一个是写锁。
读锁可以在没有写锁的时候被多个线程同时持有,写锁是独占的(排他的)。 每次只能有一个写线程,但是可以有多个线程并发地读数据。
所有读写锁的实现必须确保写操作对读操作的内存影响。换句话说,一个获得了读锁的线程必须能看到前一个释放的写锁所更新的内容。
理论上,读写锁比互斥锁允许对于共享数据更大程度的并发。与互斥锁相比,读写锁是否能够提高性能取决于读写数据的频率、读取和写入操作的持续时间、以及读线程和写线程之间的竞争。
② 互斥原则:
- 读-读能共存,
- 读-写不能共存,
- 写-写不能共存。
③reentrantreadwritelock
reentrantreadwritelock为readwritelock的实现类
这个锁允许读线程和写线程以reentrantlock的语法重新获取读写锁。在写入线程保持的所有写入锁被释放之前,不允许不可重入的读线程。
另外,写锁(写线程)可以获取读锁,但是不允许读锁(读线程)获取写锁。在其他应用程序中,当对在读锁下执行读取的方法或回调期间保持写锁时,可重入性可能非常有用。
实例代码如下:
public class testreadwritelock { public static void main(string[] args){ final readwritelockdemo rwd = new readwritelockdemo(); //启动100个读线程 for (int i = 0; i < 100; i++) { new thread(new runnable() { @override public void run() { rwd.get(); } }).start(); } //写线程 new thread(new runnable() { @override public void run() { rwd.set((int)(math.random()*101)); } },"write").start(); } } class readwritelockdemo{ //模拟共享资源--number private int number = 0; // 实际实现类--reentrantreadwritelock,默认非公平模式 private readwritelock readwritelock = new reentrantreadwritelock(); //读 public void get(){ //使用读锁 readwritelock.readlock().lock(); try { system.out.println(thread.currentthread().getname()+" : "+number); }finally { readwritelock.readlock().unlock(); } } //写 public void set(int number){ readwritelock.writelock().lock(); try { this.number = number; system.out.println(thread.currentthread().getname()+" : "+number); }finally { readwritelock.writelock().unlock(); } } } /** thread-50 : 0 thread-19 : 0 thread-54 : 0 thread-57 : 0 thread-31 : 0 write : 40 thread-61 : 40 thread-62 : 40 thread-35 : 40 thread-32 : 40 */
首先启动读线程,此时number为0;然后某个时刻写线程修改了共享资源number数据,读线程再次读取最新值
二、aqs深入分析
【1】什么是aqs
aqs是abustactqueuedsynchronizer的简称,它是一个java提供的底层同步工具类,用一个int类型的变量表示同步状态(state),并提供了一系列的cas操作来管理这个同步状态。aqs的主要作用是为java中的并发同步组件提供统一的底层支持,例如reentrantlock,countdowlatch就是基于aqs实现的,用法是通过继承aqs实现其模版方法,然后将子类作为同步组件的内部类。
【2】同步队列
同步队列是aqs很重要的组成部分,它是一个双端队列,遵循fifo原则,主要作用是用来存放在锁上阻塞的线程,当一个线程尝试获取锁时,如果已经被占用,那么当前线程就会被构造成一个node节点加入到同步队列的尾部,队列的头节点是成功获取锁的节点,当头节点线程释放锁时,会唤醒后面的节点并释放当前头节点的引用。
【3】state状态
abstractqueuedsynchronizer维护了一个volatile int类型的变量,用户表示当前同步状态。volatile虽然不能保证操作的原子性,但是保证了当前变量state的可见性。至于volatile的具体语义,可以参考我的相关文章。state的访问方式有三种:
- getstate()
- setstate()
- compareandsetstate()
这三种叫做均是原子操作,其中compareandsetstate的实现依赖于unsafe的compareandswapint()方法。代码实现如下:
/** * the synchronization state. */ private volatile int state; /** * returns the current value of synchronization state. * this operation has memory semantics of a {@code volatile} read. * @return current state value */ protected final int getstate() { return state; } /** * sets the value of synchronization state. * this operation has memory semantics of a {@code volatile} write. * @param newstate the new state value */ protected final void setstate(int newstate) { state = newstate; } /** * atomically sets synchronization state to the given updated * value if the current state value equals the expected value. * this operation has memory semantics of a {@code volatile} read * and write. * * @param expect the expected value * @param update the new value * @return {@code true} if successful. false return indicates that the actual * value was not equal to the expected value. */ protected final boolean compareandsetstate(int expect, int update) { // see below for intrinsics setup to support this return unsafe.compareandswapint(this, stateoffset, expect, update); }
【4】资源的共享方式
aqs定义两种资源共享方式:exclusive(独占,只有一个线程能执行,如reentrantlock)和share(共享,多个线程可同时执行,如semaphore/countdownlatch)。
不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),aqs已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:
- isheldexclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
- tryacquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
- tryrelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
- tryacquireshared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
- tryreleaseshared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
【5】获取锁和释放锁的流程
下面基于独占锁讲解在aqs获取锁和释放锁的流程:
获取:
- 调用自定义同步器的tryacquire()尝试直接去获取资源,如果成功则直接返回;
- 没成功,则addwaiter()将该线程加入等待队列的尾部,并标记为独占模式;
- acquirequeued()使线程在等待队列中休息,有机会时(轮到自己,会被unpark())会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
- 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfinterrupt(),将中断补上。
源码解析:
acquire是一种以独占方式获取资源,如果获取到资源,线程直接返回,否则进入等待队列,直到获取到资源为止,且整个过程忽略中断的影响。该方法是独占模式下线程获取共享资源的顶层入口。获取到资源后,线程就可以去执行其临界区代码了。下面是acquire()的源码:
public final void acquire(int arg) { if (!tryacquire(arg) && acquirequeued(addwaiter(node.exclusive), arg)) selfinterrupt(); }
tryacquire尝试以独占的方式获取资源,如果获取成功,则直接返回true,否则直接返回false。该方法可以用于实现lock中的trylock()方法。该方法的默认实现是抛出unsupportedoperationexception
,具体实现由自定义的扩展了aqs的同步类来实现。aqs在这里只负责定义了一个公共的方法框架。这里之所以没有定义成abstract,是因为独占模式下只用实现tryacquire-tryrelease,而共享模式下只用实现tryacquireshared-tryreleaseshared。如果都定义成abstract,那么每个模式也要去实现另一模式下的接口。
protected boolean tryacquire(int arg) { throw new unsupportedoperationexception(); }
addwaiter该方法用于将当前线程根据不同的模式(node.exclusive
互斥模式、node.shared
共享模式)加入到等待队列的队尾,并返回当前线程所在的结点。如果队列不为空,则以通过compareandsettail
方法以cas的方式将当前线程节点加入到等待队列的末尾。否则,通过enq(node)方法初始化一个等待队列,并返回当前节点。源码如下:
private node addwaiter(node mode) { node node = new node(thread.currentthread(), mode); // try the fast path of enq; backup to full enq on failure node pred = tail; if (pred != null) { node.prev = pred; if (compareandsettail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
acquirequeued用于队列中的线程自旋地以独占且不可中断的方式获取同步状态(acquire),直到拿到锁之后再返回。该方法的实现分成两部分:如果当前节点已经成为头结点,尝试获取锁(tryacquire)成功,然后返回;否则检查当前节点是否应该被park,然后将该线程park并且检查当前线程是否被可以被中断。
final boolean acquirequeued(final node node, int arg) { //标记是否成功拿到资源,默认false boolean failed = true; try { boolean interrupted = false;//标记等待过程中是否被中断过 for (;;) { final node p = node.predecessor(); if (p == head && tryacquire(arg)) { sethead(node); p.next = null; // help gc failed = false; return interrupted; } if (shouldparkafterfailedacquire(p, node) && parkandcheckinterrupt()) interrupted = true; } } finally { if (failed) cancelacquire(node); } }
释放:
release方法是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。这也正是unlock()的语义,当然不仅仅只限于unlock()。下面是release()的源码:
public final boolean release(int arg) { if (tryrelease(arg)) { node h = head; if (h != null && h.waitstatus != 0) unparksuccessor(h); return true; } return false; } /** * attempts to set the state to reflect a release in exclusive * mode. * * <p>this method is always invoked by the thread performing release. * * <p>the default implementation throws * {@link unsupportedoperationexception}. * * @param arg the release argument. this value is always the one * passed to a release method, or the current state value upon * entry to a condition wait. the value is otherwise * uninterpreted and can represent anything you like. * @return {@code true} if this object is now in a fully released * state, so that any waiting threads may attempt to acquire; * and {@code false} otherwise. * @throws illegalmonitorstateexception if releasing would place this * synchronizer in an illegal state. this exception must be * thrown in a consistent fashion for synchronization to work * correctly. * @throws unsupportedoperationexception if exclusive mode is not supported */ protected boolean tryrelease(int arg) { throw new unsupportedoperationexception(); } /** * wakes up node's successor, if one exists. * * @param node the node */ private void unparksuccessor(node node) { /* * if status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. it is ok if this * fails or if status is changed by waiting thread. */ int ws = node.waitstatus; if (ws < 0) compareandsetwaitstatus(node, ws, 0); /* * thread to unpark is held in successor, which is normally * just the next node. but if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ node s = node.next; if (s == null || s.waitstatus > 0) { s = null; for (node t = tail; t != null && t != node; t = t.prev) if (t.waitstatus <= 0) s = t; } if (s != null) locksupport.unpark(s.thread); }
与acquire()方法中的tryacquire()类似,tryrelease()方法也是需要独占模式的自定义同步器去实现的。正常来说,tryrelease()都会成功的,因为这是独占模式,该线程来释放资源,那么它肯定已经拿到独占资源了,直接减掉相应量的资源即可(state-=arg),也不需要考虑线程安全的问题。但要注意它的返回值,上面已经提到了,release()是根据tryrelease()的返回值来判断该线程是否已经完成释放掉资源了!所以自义定同步器在实现时,如果已经彻底释放资源(state=0),要返回true,否则返回false。
unparksuccessor(node)
方法用于唤醒等待队列中下一个线程。这里要注意的是,下一个线程并不一定是当前节点的next节点,而是下一个可以用来唤醒的线程,如果这个节点存在,调用unpark()
方法唤醒。
总之,release()是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。
三、自定义独占锁
public class mutex implements lock { // 静态内部类,自定义同步器 private static class sync extends abstractqueuedsynchronizer { // 是否处于占用状态 protected boolean isheldexclusively() { return getstate() == 1; } // 当状态为0的时候获取锁 public boolean tryacquire(int acquires) { if (compareandsetstate(0, 1)) { setexclusiveownerthread(thread.currentthread()); return true; } return false; } // 释放锁,将状态设置为0 protected boolean tryrelease(int releases) { if (getstate() == 0) throw new illegalmonitorstateexception(); setexclusiveownerthread(null); setstate(0); return true; } // 返回一个condition,每个condition都包含了一个condition队列 condition newcondition() { return new conditionobject(); } } // 仅需要将操作代理到sync上即可 private final sync sync = new sync(); public void lock() { sync.acquire(1); } public boolean trylock() { return sync.tryacquire(1); } public void unlock() { sync.release(1); } public condition newcondition() { return sync.newcondition(); } public boolean islocked() { return sync.isheldexclusively(); } public boolean hasqueuedthreads() { return sync.hasqueuedthreads(); } public void lockinterruptibly() throws interruptedexception { sync.acquireinterruptibly(1); } public boolean trylock(long timeout, timeunit unit) throws interruptedexception { return sync.tryacquirenanos(1, unit.tonanos(timeout)); } }
上一篇: HTML颜色名称大全