Java多线程系列--“JUC锁”06之 Condition条件
synchronized配合object的wait()、notify()系列方法可以实现等待/通知模式。
lock提供了条件condition,对线程的等待、唤醒操作更加详细和灵活。
condition的作用是对锁进行更精确的控制。condition中的await()方法相当于object的wait()方法,condition中的signal()方法相当于object的notify()方法,condition中的signalall()相当于object的notifyall()方法。
一、condition实现生产者消费者问题
class boundedbuffer { final lock lock = new reentrantlock(); // condition 依赖于 lock 来产生 final condition notfull = lock.newcondition(); final condition notempty = lock.newcondition(); final object[] items = new object[100]; int putptr, takeptr, count; // 生产 public void put(object x) throws interruptedexception { lock.lock(); try { while (count == items.length) notfull.await(); // 队列已满,等待,直到 not full 才能继续生产 items[putptr] = x; if (++putptr == items.length) putptr = 0; ++count; notempty.signal(); // 生产成功,队列已经 not empty 了,发个通知出去 } finally { lock.unlock(); } } // 消费 public object take() throws interruptedexception { lock.lock(); try { while (count == 0) notempty.await(); // 队列为空,等待,直到队列 not empty,才能继续消费 object x = items[takeptr]; if (++takeptr == items.length) takeptr = 0; --count; notfull.signal(); // 被我消费掉一个,队列 not full 了,发个通知出去 return x; } finally { lock.unlock(); } } }
二、lock与condition关系
// 一个lock可以new多个条件 public static void main(string[] args) { reentrantlock lock = new reentrantlock(); lock.lock(); condition newcondition1 = lock.newcondition(); newcondition1.await(); newcondition1.signal(); condition newcondition2 = lock.newcondition(); newcondition2.await(); newcondition2.signal(); lock.unlock(); } // reentrantlock.newcondition() public condition newcondition() { return sync.newcondition(); } // reentrantlock.sync.newcondition() final conditionobject newcondition() { return new conditionobject(); }
三、条件队列
每个condition对象都包含着一个fifo队列,该队列是condition对象通知/等待功能的关键。在队列中每一个节点(使用的aqs的节点)都包含着一个线程引用,该线程就是在该condition对象上等待的线程。
public class conditionobject implements condition, java.io.serializable { private static final long serialversionuid = 1173984872572414699l; private transient node firstwaiter; //头节点 private transient node lastwaiter; //尾节点 public conditionobject() { } /** * 通过addconditionwaiter()方法理解等待队列数据结构 * 将当前线程加入条件等待队列 * 1.node就是aqs的node * 2.单向链表,通过nextwaiter连接 * 3.waitstatus==node.condition才能在等待队列中 */ private node addconditionwaiter() { node t = lastwaiter; if (t != null && t.waitstatus != node.condition) { unlinkcancelledwaiters(); t = lastwaiter; } node node = new node(thread.currentthread(), node.condition); if (t == null) firstwaiter = node; else t.nextwaiter = node; lastwaiter = node; return node; } /** * 清除队列中不是等待状态的线程 */ private void unlinkcancelledwaiters() { node t = firstwaiter; node trail = null; // 用于保存前一个节点 while (t != null) { node next = t.nextwaiter; if (t.waitstatus != node.condition) { t.nextwaiter = null; if (trail == null) firstwaiter = next; else trail.nextwaiter = next; if (next == null) lastwaiter = trail; } else trail = t; t = next; } } }
四、等待await()
public final void await() throws interruptedexception { if (thread.interrupted()) throw new interruptedexception(); node node = addconditionwaiter(); // 当前线程new node()加入条件队列 int savedstate = fullyrelease(node); // 释放当先线程的锁 int interruptmode = 0; /** * 自旋: * 1.当前节点不在同步队列(刚new的节点肯定不在),挂起当前线程,等待被唤醒 * 2.当其他线程调用同一个conditionobject的signal方法时,会将队列里的节点放入同步队列,并unpark线程(排队唤醒) * 3.如果该节点被唤醒,再自旋检查是否在同步队列。发现已经在队列中,就可以跳出循环,获取lock */ while (!isonsyncqueue(node)) { locksupport.park(this); if ((interruptmode = checkinterruptwhilewaiting(node)) != 0) // 处理打断 break; } if (acquirequeued(node, savedstate) && interruptmode != throw_ie) // 获取锁 interruptmode = reinterrupt; if (node.nextwaiter != null) // clean up if cancelled unlinkcancelledwaiters(); if (interruptmode != 0) reportinterruptafterwait(interruptmode); }
五、唤醒signal()
public final void signal() { if (!isheldexclusively()) // 检查是否拿到锁 throw new illegalmonitorstateexception(); node first = firstwaiter; if (first != null) dosignal(first); } private void dosignal(node first) { do { if ( (firstwaiter = first.nextwaiter) == null) lastwaiter = null; first.nextwaiter = null; } while (!transferforsignal(first) && // 唤醒队列第一个节点 (first = firstwaiter) != null); } final boolean transferforsignal(node node) { if (!compareandsetwaitstatus(node, node.condition, 0)) return false; node p = enq(node); // 加入同步队列 int ws = p.waitstatus; if (ws > 0 || !compareandsetwaitstatus(p, ws, node.signal)) locksupport.unpark(node.thread); // 唤醒线程 return true; }
await() :造成当前线程在接到信号或被中断之前一直处于等待状态。
await(long time, timeunit unit) :造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
awaitnanos(long nanostimeout) :造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。返回值表示剩余时间,如果在nanostimesout之前唤醒,那么返回值 = nanostimeout – 消耗时间,如果返回值 <= 0 ,则可以认定它已经超时了。
awaituninterruptibly() :造成当前线程在接到信号之前一直处于等待状态。【注意:该方法对中断不敏感】。
awaituntil(date deadline) :造成当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。如果没有到指定时间就被通知,则返回true,否则表示到了指定时间,返回返回false。
signal():唤醒一个等待线程。该线程从等待方法返回前必须获得与condition相关的锁。
signal()all:唤醒所有等待线程。能够从等待方法返回的线程必须获得与condition相关的锁。