Java多线程中ReentrantLock与Condition详解
一、reentrantlock类
1.1什么是reentrantlock
java.util.concurrent.lock中的lock框架是锁定的一个抽象,它允许把锁定的实现作为java类,而不是作为语言的特性来实现。这就为lock的多种实现留下了空间,各种实现可能有不同的调度算法、性能特性或者锁定语义。reentrantlock类实现了lock,它拥有与synchronized相同的并发性和内存语义,但是添加了类似锁投票、定时锁等候和可中断锁等候的一些特性。此外,它还提供了在激烈争用情况下更佳的性能。(换句话说,当许多线程都想访问共享资源时,jvm可以花更少的时候来调度线程,把更多时间用在执行线程上。)
reentrant锁意味着什么呢?简单来说,它有一个与锁相关的获取计数器,如果拥有锁的某个线程再次得到锁,那么获取计数器就加1,然后锁需要被释放两次才能获得真正释放。这模仿了synchronized的语义;如果线程进入由线程已经拥有的监控器保护的synchronized块,就允许线程继续进行,当线程退出第二个(或者后续)synchronized块的时候,不释放锁,只有线程退出它进入的监控器保护的第一个synchronized块时,才释放锁。
1.2reentrantlock与synchronized的比较
相同:reentrantlock提供了synchronized类似的功能和内存语义。
不同:
(1)reentrantlock功能性方面更全面,比如时间锁等候,可中断锁等候,锁投票等,因此更有扩展性。在多个条件变量和高度竞争锁的地方,用reentrantlock更合适,reentrantlock还提供了condition,对线程的等待和唤醒等操作更加灵活,一个reentrantlock可以有多个condition实例,所以更有扩展性。
(2)reentrantlock的性能比synchronized会好点。
(3)reentrantlock提供了可轮询的锁请求,他可以尝试的去取得锁,如果取得成功则继续处理,取得不成功,可以等下次运行的时候处理,所以不容易产生死锁,而synchronized则一旦进入锁请求要么成功,要么一直阻塞,所以更容易产生死锁。
1.3reentrantlock扩展的功能
1.3.1实现可轮询的锁请求
在内部锁中,死锁是致命的——唯一的恢复方法是重新启动程序,唯一的预防方法是在构建程序时不要出错。而可轮询的锁获取模式具有更完善的错误恢复机制,可以规避死锁的发生。
如果你不能获得所有需要的锁,那么使用可轮询的获取方式使你能够重新拿到控制权,它会释放你已经获得的这些锁,然后再重新尝试。可轮询的锁获取模式,由trylock()方法实现。此方法仅在调用时锁为空闲状态才获取该锁。如果锁可用,则获取锁,并立即返回值true。如果锁不可用,则此方法将立即返回值false。此方法的典型使用语句如下:
lock lock = ...; if (lock.trylock()) { try { // manipulate protected state } finally { lock.unlock(); } } else { // perform alternative actions }
1.3.2实现可定时的锁请求
当使用内部锁时,一旦开始请求,锁就不能停止了,所以内部锁给实现具有时限的活动带来了风险。为了解决这一问题,可以使用定时锁。当具有时限的活
动调用了阻塞方法,定时锁能够在时间预算内设定相应的超时。如果活动在期待的时间内没能获得结果,定时锁能使程序提前返回。可定时的锁获取模式,由trylock(long,timeunit)方法实现。
1.3.3实现可中断的锁获取请求
可中断的锁获取操作允许在可取消的活动中使用。lockinterruptibly()方法能够使你获得锁的时候响应中断。
1.4reentrantlock不好与需要注意的地方
(1)lock必须在finally块中释放。否则,如果受保护的代码将抛出异常,锁就有可能永远得不到释放!这一点区别看起来可能没什么,但是实际上,它极为重要。忘记在finally块中释放锁,可能会在程序中留下一个定时炸弹,当有一天炸弹爆炸时,您要花费很大力气才有找到源头在哪。而使用同步,jvm将确保锁会获得自动释放
(2)当jvm用synchronized管理锁定请求和释放时,jvm在生成线程转储时能够包括锁定信息。这些对调试非常有价值,因为它们能标识死锁或者其他异常行为的来源。lock类只是普通的类,jvm不知道具体哪个线程拥有lock对象。
二、条件变量condition
条件变量很大一个程度上是为了解决object.wait/notify/notifyall难以使用的问题。
我们通过一个实际的例子来解释condition的用法:
我们要打印1到9这9个数字,由a线程先打印1,2,3,然后由b线程打印4,5,6,然后再由a线程打印7,8,9. 这道题有很多种解法,现在我们使用condition来做这道题
package cn.outofmemory.locks; import java.util.concurrent.locks.condition; import java.util.concurrent.locks.lock; import java.util.concurrent.locks.reentrantlock; public class app { static class numberwrapper { public int value = 1; } public static void main(string[] args) { //初始化可重入锁 final lock lock = new reentrantlock(); //第一个条件当屏幕上输出到3 final condition reachthreecondition = lock.newcondition(); //第二个条件当屏幕上输出到6 final condition reachsixcondition = lock.newcondition(); //numberwrapper只是为了封装一个数字,一边可以将数字对象共享,并可以设置为final //注意这里不要用integer, integer 是不可变对象 final numberwrapper num = new numberwrapper(); //初始化a线程 thread threada = new thread(new runnable() { @override public void run() { //需要先获得锁 lock.lock(); try { system.out.println("threada start write"); //a线程先输出前3个数 while (num.value <= 3) { system.out.println(num.value); num.value++; } //输出到3时要signal,告诉b线程可以开始了 reachthreecondition.signal(); } finally { lock.unlock(); } lock.lock(); try { //等待输出6的条件 reachsixcondition.await(); system.out.println("threada start write"); //输出剩余数字 while (num.value <= 9) { system.out.println(num.value); num.value++; } } catch (interruptedexception e) { e.printstacktrace(); } finally { lock.unlock(); } } } ); thread threadb = new thread(new runnable() { @override public void run() { try { lock.lock(); while (num.value <= 3) { //等待3输出完毕的信号 reachthreecondition.await(); } } catch (interruptedexception e) { e.printstacktrace(); } finally { lock.unlock(); } try { lock.lock(); //已经收到信号,开始输出4,5,6 system.out.println("threadb start write"); while (num.value <= 6) { system.out.println(num.value); num.value++; } //4,5,6输出完毕,告诉a线程6输出完了 reachsixcondition.signal(); } finally { lock.unlock(); } } } ); //启动两个线程 threadb.start(); threada.start(); } }
上述代码中有完整的注释,请参考注释,理解condition的用法。
基本思路就是首先要a线程先写1,2,3,这时候b线程应该等待reachthredcondition信号,而当a线程写完3之后就通过signal告诉b线程“我写到3了,该你了”,这时候a线程要等嗲reachsixcondition信号,同时b线程得到通知,开始写4,5,6,写完4,5,6之后b线程通知a线程reachsixcondition条件成立了,这时候a线程就开始写剩下的7,8,9了。条件(也称为条件队列或条件变量)为线程提供了一个含义,以便在某个状态条件现在可能为true的另一个线程通知它之前,一直挂起该线程(即让其“等待”)。因为访问此共享状态信息发生在不同的线程中,所以它必须受保护,因此要将某种形式的锁与该条件相关联。等待提供一个条件的主要属性是:以原子方式释放相关的锁,并挂起当前线程,就像object.wait做的那样。
上述api说明表明条件变量需要与锁绑定,而且多个condition需要绑定到同一锁上。前面的lock中提到,获取一个条件变量的方法是lock.newcondition()。
voidawait()throwsinterruptedexception; voidawaituninterruptibly(); longawaitnanos(longnanostimeout)throwsinterruptedexception; booleanawait(longtime,timeunitunit)throwsinterruptedexception; booleanawaituntil(datedeadline)throwsinterruptedexception; voidsignal(); voidsignalall();
以上是condition接口定义的方法,await*对应于object.wait,signal对应于object.notify,signalall对应于object.notifyall。特别说明的是condition的接口改变名称就是为了避免与object中的wait/notify/notifyall的语义和使用上混淆,因为condition同样有wait/notify/notifyall方法。
每一个lock可以有任意数据的condition对象,condition是与lock绑定的,所以就有lock的公平性特性:如果是公平锁,线程为按照fifo的顺序从condition.await中释放,如果是非公平锁,那么后续的锁竞争就不保证fifo顺序了。
一个使用condition实现生产者消费者的模型例子如下。
import java.util.concurrent.locks.condition; import java.util.concurrent.locks.lock; import java.util.concurrent.locks.reentrantlock; public class productqueue<t> { private final t[] items; private final lock lock = new reentrantlock(); private condition notfull = lock.newcondition(); private condition notempty = lock.newcondition(); // private int head, tail, count; public productqueue(int maxsize) { items = (t[]) new object[maxsize]; } public productqueue() { this(10); } public void put(t t) throws interruptedexception { lock.lock(); try { while (count == getcapacity()) { notfull.await(); } items[tail] = t; if (++tail == getcapacity()) { tail = 0; } ++count; notempty.signalall(); } finally { lock.unlock(); } } public t take() throws interruptedexception { lock.lock(); try { while (count == 0) { notempty.await(); } t ret = items[head]; items[head] = null; //gc // if (++head == getcapacity()) { head = 0; } --count; notfull.signalall(); return ret; } finally { lock.unlock(); } } public int getcapacity() { return items.length; } public int size() { lock.lock(); try { return count; } finally { lock.unlock(); } } }
在这个例子中消费take()需要队列不为空,如果为空就挂起(await()),直到收到notempty的信号;生产put()需要队列不满,如果满了就挂起(await()),直到收到notfull的信号。
可能有人会问题,如果一个线程lock()对象后被挂起还没有unlock,那么另外一个线程就拿不到锁了(lock()操作会挂起),那么就无法通知(notify)前一个线程,这样岂不是“死锁”了?
2.1await*操作
上一节中说过多次reentrantlock是独占锁,一个线程拿到锁后如果不释放,那么另外一个线程肯定是拿不到锁,所以在lock.lock()和lock.unlock()之间可能有一次释放锁的操作(同样也必然还有一次获取锁的操作)。我们再回头看代码,不管take()还是put(),在进入lock.lock()后唯一可能释放锁的操作就是await()了。也就是说await()操作实际上就是释放锁,然后挂起线程,一旦条件满足就被唤醒,再次获取锁!
public final void await() throws interruptedexception { if (thread.interrupted()) throw new interruptedexception(); node node = addconditionwaiter(); int savedstate = fullyrelease(node); int interruptmode = 0; while (!isonsyncqueue(node)) { locksupport.park(this); if ((interruptmode = checkinterruptwhilewaiting(node)) != 0) break; } if (acquirequeued(node, savedstate) && interruptmode != throw_ie) interruptmode = reinterrupt; if (node.nextwaiter != null) unlinkcancelledwaiters(); if (interruptmode != 0) reportinterruptafterwait(interruptmode); }
上面是await()的代码片段。上一节中说过,aqs在获取锁的时候需要有一个chl的fifo队列,所以对于一个condition.await()而言,如果释放了锁,要想再一次获取锁那么就需要进入队列,等待被通知获取锁。完整的await()操作是安装如下步骤进行的:
将当前线程加入condition锁队列。特别说明的是,这里不同于aqs的队列,这里进入的是condition的fifo队列。后面会具体谈到此结构。进行2。
释放锁。这里可以看到将锁释放了,否则别的线程就无法拿到锁而发生死锁。进行3。
自旋(while)挂起,直到被唤醒或者超时或者cacelled等。进行4。
获取锁(acquirequeued)。并将自己从condition的fifo队列中释放,表明自己不再需要锁(我已经拿到锁了)。
这里再回头介绍condition的数据结构。我们知道一个condition可以在多个地方被await*(),那么就需要一个fifo的结构将这些condition串联起来,然后根据需要唤醒一个或者多个(通常是所有)。所以在condition内部就需要一个fifo的队列。
private transient node firstwaiter; private transient node lastwaiter;
上面的两个节点就是描述一个fifo的队列。我们再结合前面提到的节点(node)数据结构。我们就发现node.nextwaiter就派上用场了!nextwaiter就是将一系列的condition.await*串联起来组成一个fifo的队列。
2.2signal/signalall操作
await*()清楚了,现在再来看signal/signalall就容易多了。按照signal/signalall的需求,就是要将condition.await*()中fifo队列中第一个node唤醒(或者全部node)唤醒。尽管所有node可能都被唤醒,但是要知道的是仍然只有一个线程能够拿到锁,其它没有拿到锁的线程仍然需要自旋等待,就上上面提到的第4步(acquirequeued)。
private void dosignal(node first) { do { if ( (firstwaiter = first.nextwaiter) == null) lastwaiter = null; first.nextwaiter = null; } while (!transferforsignal(first) && (first = firstwaiter) != null); } private void dosignalall(node first) { lastwaiter = firstwaiter = null; do { node next = first.nextwaiter; first.nextwaiter = null; transferforsignal(first); first = next; } while (first != null); }
final boolean transferforsignal(node node) { if (!compareandsetwaitstatus(node, node.condition, 0)) return false; node p = enq(node); int c = p.waitstatus; if (c > 0 || !compareandsetwaitstatus(p, c, node.signal)) locksupport.unpark(node.thread); return true; }
总结
以上就是本文关于java多线程中reentrantlock与condition详解的全部内容,希望对大家有所帮助。有什么问题可以随时留言,小编会及时回复大家的。