并发编程系列之Condition接口
前言
前面我们学习线程的时候讲过等待通知模式,之前讲的是通过wait,notify/notifyAll配合synchronized关键字,实现等待通知,今天呢,我们介绍另外一种同样实现等待通知模式的对象叫做condition接口,配合lock使用,也能完成等待通知,但是跟之前说的又有一些区别,今天就让我们来认识一下吧,OK,开始我们今天的并发之旅吧,祝您旅途愉快。
什么是Condition接口?
Condition定义了等待/通知两种类型的方法,当前线程如果调用这些方法之前,必须先获取到condition对象关联的锁,Condition对象是由Lock对象创建出来的,也就是说Condition是绑定在一个Lock对象上的,依赖于Lock对象,使用时需要通过Lock对象new出来。相比于之前的wait/notify而言,condition充当wait/notify的角色,而lock对象充当synchronized锁角色。
我们看下Condition接口提供的一些方法如下:
如何使用Condition实现等待通知
案例1:一个condition即1个等待队列
通过上面我们知道要使用condition提供的方法就必须将它绑定一个Lock对象,然后用法和wait/notify差不多,我们看下面demo
public class ConditionDemo {
// 创建重入锁
private Lock lock = new ReentrantLock();
// Lock对象创建condition对象
private Condition condition = lock.newCondition();
public void method1(){
try {
lock.lock();
System.out.println("当前线程:" + Thread.currentThread().getName() + "获取锁,并睡眠3秒..");
Thread.sleep(3000);
System.out.println("当前线程:" + Thread.currentThread().getName() + "释放锁..进入等待状态");
// 等待
condition.await();
System.out.println("当前线程:" + Thread.currentThread().getName() +"被唤醒,继续执行...");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void method2(){
try {
lock.lock();
System.out.println("当前线程:" + Thread.currentThread().getName() + "获取锁,进入...睡眠3秒");
Thread.sleep(3000);
System.out.println("当前线程:" + Thread.currentThread().getName() + "发出唤醒..,并释放锁");
// 唤醒,同样要注意多线程下死锁的发生,优先使用signalAll()唤醒所有等待此条件的线程
condition.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public static void main(String[] args) throws Exception{
final ConditionDemo uc = new ConditionDemo();
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
uc.method1();
}
}, "线程1");
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
uc.method2();
}
}, "线程2");
System.out.println("线程1启动。。。");
t1.start();
Thread.sleep(1000);
System.out.println("线程2启动。。。");
t2.start();
}
}
执行结果如下:
案例2:多个Condition,即多个等待队列
public class ConditionDemo {
// 定义重入锁
private ReentrantLock lock = new ReentrantLock();
// condition1
private Condition c1 = lock.newCondition();
// condition2
private Condition c2 = lock.newCondition();
public void m1(){
try {
lock.lock();
System.out.println("当前线程:" +Thread.currentThread().getName() + "进入方法m1等待..");
c1.await();
System.out.println("当前线程:" +Thread.currentThread().getName() + "方法m1继续..");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void m2(){
try {
lock.lock();
System.out.println("当前线程:" +Thread.currentThread().getName() + "进入方法m2等待..");
c1.await();
System.out.println("当前线程:" +Thread.currentThread().getName() + "方法m2继续..");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void m3(){
try {
lock.lock();
System.out.println("当前线程:" +Thread.currentThread().getName() + "进入方法m3等待..");
c2.await();
System.out.println("当前线程:" +Thread.currentThread().getName() + "方法m3继续..");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void m4(){
try {
lock.lock();
System.out.println("当前线程:" +Thread.currentThread().getName() + "唤醒..");
// 唤醒所有condition1上的等待线程
c1.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void m5(){
try {
lock.lock();
System.out.println("当前线程:" +Thread.currentThread().getName() + "唤醒..");
// 唤醒所有condition2上的等待线程
c2.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
final ConditionDemo umc = new ConditionDemo();
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
umc.m1();
}
},"Thread1");
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
umc.m2();
}
},"Thread2");
Thread t3 = new Thread(new Runnable() {
@Override
public void run() {
umc.m3();
}
},"Thread3");
Thread t4 = new Thread(new Runnable() {
@Override
public void run() {
umc.m4();
}
},"Thread4");
Thread t5 = new Thread(new Runnable() {
@Override
public void run() {
umc.m5();
}
},"Thread5");
// condition1
t1.start();
// condition1
t2.start();
// condition2
t3.start();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 唤醒condition1
t4.start();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 唤醒condition2
t5.start();
}
}
执行结果如下:
Condition底层实现
Condition接口源码中提供的是如下这几个方法,并没有具体实现,
public interface Condition {
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
}
具体实现主要主要体现在ConditionObject这个类上,这个类是AQS的内部类,每个ConditionObject对象都包含一个等待队列,下面我们就分别从等待队列,等待,通知三个方面分析ConditionObject的底层实现原理
等待队列
一个condition包含一个等待队列(FIFO),condition拥有一个首节点和一个尾节点,如下图:condition中拥有首位节点的引用,当新增节点时,只需要将原来尾节点的下个节点指向它,并且更新尾节点接口,该更新操作跟之前同步队列中CAS更新尾节点不同,此处不需要CAS操作,因为condition的操作是在一个lock里面进行的,是已经获取锁的,所以这个操作是线程安全的;
在前面所讲的wait等待通知模型中其实同步器是只有一个同步队列和一个等待队列的,而在我们这里,因为可以同时多个condition,上面案例也有使用过,也就是说,condition实现的同步器中,其实是一个同步队列和多个等待队列,从condition是AQS一个内部类也可以证实这一点,也就是说我们可以创建多个condition,每个condition都可以访问AQS提供的方法,相当于每个condition都持有所属AQS的引用,其关系模型如下:
等待
调用Condition的await开头的系列方法,会使当前线程进入等待队列等待并释放锁,线程状态变为等待状态,已上图为例,就是同步队列中的首节点(或者获取锁的节点,因为非公平性锁就不一定是首节点)移动到了Condition的等待队列中,这里关键的就是等待方法await,我们来看下其源码实现:
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 当前线程加入等待队列,并不是直接加入,而是把当前线程构造成一个新的节点再加入
Node node = addConditionWaiter();
// 释放同步状态即释放锁
int savedState = fullyRelease(node);
int interruptMode = 0;
// 唤醒同步队列中后续节点,线程进入等待
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 调用acquireQueued尝试获取同步状态,获取成功后,线程中断返回
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
整个过程示意图如下:
通知
调用Condition的signal()方法将会唤醒在等待队列中的首节点,该节点也是到目前为止等待时间最长的节点,等待队列遵循FIFO原则。调用signalAll()方法将会唤醒该同步器上等待队列中的所有节点,我们看signal方法源码分析:
public final void signal() {
// 前置检查,判断当前线程是否是获取了锁的线程,如果不是抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 取得等待队列的头结点,头结点不为空执行doSignal,否则,唤醒结束
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
我们再来看下doSignal的源码:
private void doSignal(Node first) {
// 调用transferForSignal将节点从等待队列移动到同步队列
// 将该节点从等待队列删除
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
再往下看,我们追溯到transferForSignal方法:
final boolean transferForSignal(Node node) {
// 将节点waitStatus设置为0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 调用enq方法将该节点加入同步队列
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
// 使用LockSuppor.unpark()方法唤醒该节点的线程
LockSupport.unpark(node.thread);
return true;
}
signalAll()方法实现也差不多,我就不做过多讲解了,看下doSignlAll方法即可:
/**
* Removes and transfers all nodes.
* @param first (non-null) the first node on condition queue
*/
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
我们再来看下节点在队列中的变化过程,如下图所示:
以上就是今天等待通知机制condition的全部内容,结合wait/notify+synchronized,对比学习,希望能对您有所收获!!!
上一篇: Shell 编程之规范和变量