欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Java并发编程一Condition初使用

程序员文章站 2024-03-12 12:34:26
...

Java并发编程一Condition初使用

Condition是什么?

Condition是在Java1.5中才出现的,它用来替代传统Object中的wait()notify(),实现线程间的协作,相比使用Object中的wait()notify(),使用Conditionawait()signal()这种方式实现线程间协作更加安全和高效。因此通常来说比较推荐使用Condition,阻塞队列实际上是使用了Condition来模拟线程间协作。

Condition是个接口,基本的方法就是await()signal()方法。
Condition依赖于Lock接口,生成一个Condition的基本代码是lock.newCondition()(假如lockReentrantLock的实例,ReentrantLockLock的实现类)。
调用Conditionawait()signal()方法,都必须在lock保护之内,就是说必须在lock.lock()lock.unlock()之间才可以使用,这和在synchronized同步代码块或者同步方法中使用Object中的wait()notify()类似。

  • Conditon中的await()对应Object中的wait()。
  • Condition中的signal()对应Object中的notify()
  • Condition中的signalAll()对应Object中的notifyAll()

我们来使用一下它吧。

代码:

package flowcontrol.condition;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionDemo1 {
    
    private ReentrantLock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    void method1() throws InterruptedException {
        lock.lock();
        try{
            System.out.println("条件不满足,开始await");
            condition.await();
            System.out.println("条件满足了,开始执行后续的任务");
        }finally {
            lock.unlock();
        }
    }

    void method2() {
        lock.lock();
        try{
            System.out.println("准备工作完成,唤醒其他的线程");
            condition.signal();
        }finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ConditionDemo1 conditionDemo1 = new ConditionDemo1();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                    conditionDemo1.method2();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        conditionDemo1.method1();
    }
}

输出:

条件不满足,开始await
准备工作完成,唤醒其他的线程
条件满足了,开始执行后续的任务

是不是很像synchronized同步代码块或者同步方法中调用的wait()notify()

来看看Condition 的源码。

package java.util.concurrent.locks;
import java.util.concurrent.TimeUnit;
import java.util.Date;

public interface Condition {

    void await() throws InterruptedException;

    void awaitUninterruptibly();

    long awaitNanos(long nanosTimeout) throws InterruptedException;

    boolean await(long time, TimeUnit unit) throws InterruptedException;

    boolean awaitUntil(Date deadline) throws InterruptedException;

    void signal();

    void signalAll();
}
  • await():造成当前线程在接到信号或被中断之前一直处于等待状态。
  • awaitUninterruptibly():造成当前线程在接到信号之前一直处于等待状态(该方法不响应中断)。
  • awaitNanos(long nanosTimeout):造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。返回值表示剩余时间,如果在nanosTimesout之前唤醒,那么返回值为nanosTimeout - 消耗时间,如果返回值<= 0,则可以认定它已经超时了。
  • await(long time, TimeUnit unit):造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
  • awaitUntil(Date deadline):造成当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。如果没有到指定时间就被通知,则返回true,否则表示到了指定时间,返回false
  • signal():唤醒一个等待线程。该线程从等待方法返回前必须获得与Condition相关的锁。
  • signalAll():唤醒所有等待线程。能够从等待方法返回的线程必须获得与Condition相关的锁。

说了Condition中的方法,就可以将Condition中的方法和Object中的方法进行对比了,如下图所示:
Java并发编程一Condition初使用
最后我们用LockCondition和队列来实现一个生产者-消费者模式。

代码:

package flowcontrol.condition;

import java.util.PriorityQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionDemo2 {

    private int queueSize = 10;
    private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize);
    private Lock lock = new ReentrantLock();
    private Condition notFull = lock.newCondition();
    private Condition notEmpty = lock.newCondition();

    public static void main(String[] args) {
        ConditionDemo2 conditionDemo2 = new ConditionDemo2();
        Producer producer = conditionDemo2.new Producer();
        Consumer consumer = conditionDemo2.new Consumer();
        producer.start();
        consumer.start();
    }

    class Consumer extends Thread {

        @Override
        public void run() {
            consume();
        }

        private void consume() {
            while (true) {
                lock.lock();
                try {
                    while (queue.size() == 0) {
                        System.out.println("队列空,等待数据");
                        try {
                            notEmpty.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    queue.poll();
                    notFull.signalAll();
                    System.out.println("从队列里取走了一个数据,队列剩余" + queue.size() + "个元素");
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    class Producer extends Thread {

        @Override
        public void run() {
            produce();
        }

        private void produce() {
            while (true) {
                lock.lock();
                try {
                    while (queue.size() == queueSize) {
                        System.out.println("队列满,等待有空余");
                        try {
                            notFull.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    queue.offer(1);
                    notEmpty.signal();
                    System.out.println("向队列插入了一个元素,队列剩余空间" + (queueSize - queue.size()));
                } finally {
                    lock.unlock();
                }
            }
        }
    }
}

输出:

向队列插入了一个元素,队列剩余空间9
向队列插入了一个元素,队列剩余空间8
向队列插入了一个元素,队列剩余空间7
向队列插入了一个元素,队列剩余空间6
向队列插入了一个元素,队列剩余空间5
向队列插入了一个元素,队列剩余空间4
向队列插入了一个元素,队列剩余空间3
向队列插入了一个元素,队列剩余空间2
向队列插入了一个元素,队列剩余空间1
向队列插入了一个元素,队列剩余空间0
队列满,等待有空余
从队列里取走了一个数据,队列剩余9个元素
从队列里取走了一个数据,队列剩余8个元素
从队列里取走了一个数据,队列剩余7个元素
从队列里取走了一个数据,队列剩余6个元素
从队列里取走了一个数据,队列剩余5个元素
从队列里取走了一个数据,队列剩余4个元素
从队列里取走了一个数据,队列剩余3个元素
从队列里取走了一个数据,队列剩余2个元素
从队列里取走了一个数据,队列剩余1个元素
从队列里取走了一个数据,队列剩余0个元素
队列空,等待数据

输出我只粘贴了一部分,但从输出结果看,这符合我们的预期。

我们上面用用LockCondition和队列来实现的生产者-消费者模式类似于ArrayBlockingQueue中的生产者-消费者模式的实现,只不过ArrayBlockingQueue用的是数组。
Java并发编程一Condition初使用

其实使用synchronized再结合Object中的wait()notify()同样也可以实现生产者-消费者模式。

但是LockCondition中的方法相较于synchronizedObject中的方法有很明显的扩展性和优势,比如Lock中的tryLock()lockInterruptibly()等,并且一个Lock可以new多个ConditionCondition中还有awaitUninterruptibly() 这种可以不被中断的等待方法。

相关标签: 并发编程