欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

并发编程系列之Condition接口

程序员文章站 2024-03-12 12:22:20
...

前言

前面我们学习线程的时候讲过等待通知模式,之前讲的是通过wait,notify/notifyAll配合synchronized关键字,实现等待通知,今天呢,我们介绍另外一种同样实现等待通知模式的对象叫做condition接口,配合lock使用,也能完成等待通知,但是跟之前说的又有一些区别,今天就让我们来认识一下吧,OK,开始我们今天的并发之旅吧,祝您旅途愉快。

 

什么是Condition接口?

Condition定义了等待/通知两种类型的方法,当前线程如果调用这些方法之前,必须先获取到condition对象关联的锁,Condition对象是由Lock对象创建出来的,也就是说Condition是绑定在一个Lock对象上的,依赖于Lock对象,使用时需要通过Lock对象new出来。相比于之前的wait/notify而言,condition充当wait/notify的角色,而lock对象充当synchronized锁角色。

我们看下Condition接口提供的一些方法如下:

并发编程系列之Condition接口

 

 

如何使用Condition实现等待通知

案例1:一个condition即1个等待队列

通过上面我们知道要使用condition提供的方法就必须将它绑定一个Lock对象,然后用法和wait/notify差不多,我们看下面demo

public class ConditionDemo {
   // 创建重入锁
   private Lock lock = new ReentrantLock();
   // Lock对象创建condition对象
   private Condition condition = lock.newCondition();

   public void method1(){
       try {
           lock.lock();
           System.out.println("当前线程:" + Thread.currentThread().getName() + "获取锁,并睡眠3秒..");
           Thread.sleep(3000);
           System.out.println("当前线程:" + Thread.currentThread().getName() + "释放锁..进入等待状态");
           // 等待
           condition.await();
           System.out.println("当前线程:" + Thread.currentThread().getName() +"被唤醒,继续执行...");
       } catch (Exception e) {
           e.printStackTrace();
       } finally {
           lock.unlock();
       }
   }

   public void method2(){
       try {
           lock.lock();
           System.out.println("当前线程:" + Thread.currentThread().getName() + "获取锁,进入...睡眠3秒");
           Thread.sleep(3000);
           System.out.println("当前线程:" + Thread.currentThread().getName() + "发出唤醒..,并释放锁");
           // 唤醒,同样要注意多线程下死锁的发生,优先使用signalAll()唤醒所有等待此条件的线程
           condition.signal();
       } catch (Exception e) {
           e.printStackTrace();
       } finally {
           lock.unlock();
       }
   }

   public static void main(String[] args) throws Exception{
       final ConditionDemo uc = new ConditionDemo();
       Thread t1 = new Thread(new Runnable() {
           @Override
           public void run() {
               uc.method1();
           }
       }, "线程1");
       Thread t2 = new Thread(new Runnable() {
           @Override
           public void run() {
               uc.method2();
           }
       }, "线程2");
       System.out.println("线程1启动。。。");
       t1.start();
       Thread.sleep(1000);
       System.out.println("线程2启动。。。");
       t2.start();
   }
}

执行结果如下:

并发编程系列之Condition接口

案例2:多个Condition,即多个等待队列

public class ConditionDemo {

   // 定义重入锁
   private ReentrantLock lock = new ReentrantLock();
   // condition1
   private Condition c1 = lock.newCondition();
   // condition2
   private Condition c2 = lock.newCondition();

   public void m1(){
       try {
           lock.lock();
           System.out.println("当前线程:" +Thread.currentThread().getName() + "进入方法m1等待..");
           c1.await();
           System.out.println("当前线程:" +Thread.currentThread().getName() + "方法m1继续..");
       } catch (Exception e) {
           e.printStackTrace();
       } finally {
           lock.unlock();
       }
   }

   public void m2(){
       try {
           lock.lock();
           System.out.println("当前线程:" +Thread.currentThread().getName() + "进入方法m2等待..");
           c1.await();
           System.out.println("当前线程:" +Thread.currentThread().getName() + "方法m2继续..");
       } catch (Exception e) {
           e.printStackTrace();
       } finally {
           lock.unlock();
       }
   }

   public void m3(){
       try {
           lock.lock();
           System.out.println("当前线程:" +Thread.currentThread().getName() + "进入方法m3等待..");
           c2.await();
           System.out.println("当前线程:" +Thread.currentThread().getName() + "方法m3继续..");
       } catch (Exception e) {
           e.printStackTrace();
       } finally {
           lock.unlock();
       }
   }

   public void m4(){
       try {
           lock.lock();
           System.out.println("当前线程:" +Thread.currentThread().getName() + "唤醒..");
           // 唤醒所有condition1上的等待线程
           c1.signalAll();
       } catch (Exception e) {
           e.printStackTrace();
       } finally {
           lock.unlock();
       }
   }

   public void m5(){
       try {
           lock.lock();
           System.out.println("当前线程:" +Thread.currentThread().getName() + "唤醒..");
           // 唤醒所有condition2上的等待线程
           c2.signalAll();
       } catch (Exception e) {
           e.printStackTrace();
       } finally {
           lock.unlock();
       }
   }

   public static void main(String[] args) {
       final ConditionDemo umc = new ConditionDemo();
       Thread t1 = new Thread(new Runnable() {
           @Override
           public void run() {
               umc.m1();
           }
       },"Thread1");
       Thread t2 = new Thread(new Runnable() {
           @Override
           public void run() {
               umc.m2();
           }
       },"Thread2");
       Thread t3 = new Thread(new Runnable() {
           @Override
           public void run() {
               umc.m3();
           }
       },"Thread3");
       Thread t4 = new Thread(new Runnable() {
           @Override
           public void run() {
               umc.m4();
           }
       },"Thread4");
       Thread t5 = new Thread(new Runnable() {
           @Override
           public void run() {
               umc.m5();
           }
       },"Thread5");

       // condition1
       t1.start();
       // condition1
       t2.start();
       // condition2
       t3.start();
       try {
           Thread.sleep(2000);
       } catch (InterruptedException e) {
           e.printStackTrace();
       }
       // 唤醒condition1
       t4.start();
       try {
           Thread.sleep(2000);
       } catch (InterruptedException e) {
           e.printStackTrace();
       }
       // 唤醒condition2
       t5.start();
   }
}

执行结果如下:

并发编程系列之Condition接口

 

Condition底层实现

Condition接口源码中提供的是如下这几个方法,并没有具体实现,

public interface Condition {

   void await() throws InterruptedException;
   void awaitUninterruptibly();
   long awaitNanos(long nanosTimeout) throws InterruptedException;
   boolean await(long time, TimeUnit unit) throws InterruptedException;
   boolean awaitUntil(Date deadline) throws InterruptedException;
   void signal();
   void signalAll();
}

具体实现主要主要体现在ConditionObject这个类上,这个类是AQS的内部类,每个ConditionObject对象都包含一个等待队列,下面我们就分别从等待队列,等待,通知三个方面分析ConditionObject的底层实现原理

 

等待队列

一个condition包含一个等待队列(FIFO),condition拥有一个首节点和一个尾节点,如下图:condition中拥有首位节点的引用,当新增节点时,只需要将原来尾节点的下个节点指向它,并且更新尾节点接口,该更新操作跟之前同步队列中CAS更新尾节点不同,此处不需要CAS操作,因为condition的操作是在一个lock里面进行的,是已经获取锁的,所以这个操作是线程安全的;

并发编程系列之Condition接口

在前面所讲的wait等待通知模型中其实同步器是只有一个同步队列和一个等待队列的,而在我们这里,因为可以同时多个condition,上面案例也有使用过,也就是说,condition实现的同步器中,其实是一个同步队列和多个等待队列,从condition是AQS一个内部类也可以证实这一点,也就是说我们可以创建多个condition,每个condition都可以访问AQS提供的方法,相当于每个condition都持有所属AQS的引用,其关系模型如下:

并发编程系列之Condition接口

 

 

等待

调用Condition的await开头的系列方法,会使当前线程进入等待队列等待并释放锁,线程状态变为等待状态,已上图为例,就是同步队列中的首节点(或者获取锁的节点,因为非公平性锁就不一定是首节点)移动到了Condition的等待队列中,这里关键的就是等待方法await,我们来看下其源码实现:

public final void await() throws InterruptedException {
           if (Thread.interrupted())
               throw new InterruptedException();
           // 当前线程加入等待队列,并不是直接加入,而是把当前线程构造成一个新的节点再加入
           Node node = addConditionWaiter();
           // 释放同步状态即释放锁
           int savedState = fullyRelease(node);
           int interruptMode = 0;
           // 唤醒同步队列中后续节点,线程进入等待
           while (!isOnSyncQueue(node)) {
               LockSupport.park(this);
               if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                   break;
           }
           // 调用acquireQueued尝试获取同步状态,获取成功后,线程中断返回
           if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
               interruptMode = REINTERRUPT;
           if (node.nextWaiter != null) // clean up if cancelled
               unlinkCancelledWaiters();
           if (interruptMode != 0)
               reportInterruptAfterWait(interruptMode);
       }

整个过程示意图如下:

并发编程系列之Condition接口

 

通知

调用Condition的signal()方法将会唤醒在等待队列中的首节点,该节点也是到目前为止等待时间最长的节点,等待队列遵循FIFO原则。调用signalAll()方法将会唤醒该同步器上等待队列中的所有节点,我们看signal方法源码分析:

public final void signal() {
           // 前置检查,判断当前线程是否是获取了锁的线程,如果不是抛出异常
           if (!isHeldExclusively())
               throw new IllegalMonitorStateException();
           // 取得等待队列的头结点,头结点不为空执行doSignal,否则,唤醒结束
           Node first = firstWaiter;
           if (first != null)
               doSignal(first);
       }

我们再来看下doSignal的源码:

private void doSignal(Node first) {
           // 调用transferForSignal将节点从等待队列移动到同步队列
           // 将该节点从等待队列删除
           do {
               if ( (firstWaiter = first.nextWaiter) == null)
                   lastWaiter = null;
               first.nextWaiter = null;
           } while (!transferForSignal(first) &&
                    (first = firstWaiter) != null);
       }

再往下看,我们追溯到transferForSignal方法:

final boolean transferForSignal(Node node) {
       // 将节点waitStatus设置为0
       if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
           return false;
       // 调用enq方法将该节点加入同步队列
       Node p = enq(node);
       int ws = p.waitStatus;
       if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
           // 使用LockSuppor.unpark()方法唤醒该节点的线程
           LockSupport.unpark(node.thread);
       return true;
   }

signalAll()方法实现也差不多,我就不做过多讲解了,看下doSignlAll方法即可:

/**
  * Removes and transfers all nodes.
  * @param first (non-null) the first node on condition queue
 */
private void doSignalAll(Node first) {
  lastWaiter = firstWaiter = null;
   do {
       Node next = first.nextWaiter;
       first.nextWaiter = null;
       transferForSignal(first);
       first = next;
      } while (first != null);
  }

我们再来看下节点在队列中的变化过程,如下图所示:

并发编程系列之Condition接口

 

以上就是今天等待通知机制condition的全部内容,结合wait/notify+synchronized,对比学习,希望能对您有所收获!!!

相关标签: 并发编程