欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

生产者消费者模型理解及java两种方式实现

程序员文章站 2022-10-03 13:55:07
生产者消费者问题(Producer-consumer problem),也称有限缓冲问题(Bounded-buffer problem),是一个多线程同步问题的经典案例。> 生产者生成一定量的数据放到缓冲区中,然后重复此过程;与此同时,消费者也在缓冲区消耗这些数据。> 生产者和消费者之间必须保持同步,要保证生产者不会在缓冲区满时放入数据,消费者也不会在缓冲区空时消耗数据。不够完善的解决方法容易出现死锁的情况,此时进程都在等待唤醒...

概念

生产者消费者问题(Producer-consumer problem),也称有限缓冲问题(Bounded-buffer problem),是一个多线程同步问题的经典案例。
生产者生成一定量的数据放到缓冲区中,然后重复此过程;与此同时,消费者也在缓冲区消耗这些数据。
生产者和消费者之间必须保持同步,要保证生产者不会在缓冲区满时放入数据,消费者也不会在缓冲区空时消耗数据。不够完善的解决方法容易出现死锁的情况,此时进程都在等待唤醒

如下图:

生产者消费者模型理解及java两种方式实现

为什么要用?

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这种生产消费能力不均衡的问题,所以便有了生产者和消费者模式。

文字看起来很抽象,这里用个简单例子来形容。
比如去自助餐厅吃饭(我发现我所有的例子都是吃),大致有:

  • 1.厨师把菜做好,放到自助区 —— 相当于生产者制造数据并且放到缓冲区。
  • 2.你把做好的菜拿出来吃掉 —— 相当于消费者从缓冲区拿出数据使用
  • 3.如果今天顾客少,自助区的菜放满了,那厨师就不能继续做菜了 —— 相当于缓存区满,生产者需要阻塞等待消费者消费
  • 4.如果今天的菜好吃或者你饿的吃光了自助区的菜,别人就没有吃的 —— 相当于缓存区空,消费者需要阻塞等待生产者生产

优点

解耦
假设生产者和消费者分别是两个类。如果让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(也就是耦合)。将来如果消费者的代码发生变化,可能会影响到生产者。而如果两者都依赖于某个缓冲区,两者之间不直接依赖,耦合也就相应降低了。

支持并发
生产者直接调用消费者的某个方法,还有另一个弊端。由于函数调用是同步的(或者叫阻塞的),在消费者的方法没有返回之前,生产者只好一直等在那边。万一消费者处理数据很慢,生产者就会白白糟蹋大好时光。

使用了生产者/消费者模式之后,生产者和消费者可以是两个独立的并发主体(常见并发类型有进程和线程两种)。生产者把制造出来的数据往缓冲区一丢,就可以再去生产下一个数据。基本上不用依赖消费者的处理速度。
支持忙闲不均
缓冲区还有另一个好处。如果制造数据的速度时快时慢,缓冲区的好处就体现出来了。当数据制造快的时候,消费者来不及处理,未处理的数据可以暂时存在缓冲区中。等生产者的制造速度慢下来,消费者再慢慢处理掉。


生产者消费者模式两种实现方式代码

1.synchronizedwait()notify() /notifyAll()

synchronized上一篇文章说的很详细,就不再解释了。我们看一下wait()是干什么的?

1.wait()是Object里面的方法,而不是Thread里面的,这一点很容易搞错。
它的作用是将当前线程置于预执行队列,并在wait()所在的代码处处停止执行,直到接到通知或被中断为止
2.wait()只能在同步代码块或者同步方法中执行,如果调用wait()方法,而没有持有适当的锁,就会抛出异常。
3.wait()方法调用后,当前线程释放锁,线程与其他线程竞争重新获取锁。

notify()

notify方法就是使停止的线程继续运行。
1.notify()也要在同步方法或同步块中调用,该方法是用来通知那些可能等待该对象的对象锁的其它线程,对其发出通知,并使它们重新获取该对象的对象锁。如果有多个线程等待,则线程规划器随机挑选出一个呈wait状态的线程。
2.在notify()方法后,当前线程不会马上释放该对象锁,要等到执行notify()方法的线程将程序执行完,也就是退出同步代码块之后才会释放对象锁。

以上讲解了notify方法只是唤醒某一个等待线程,那么如果有多个线程都在等待中怎么办呢,这个时候就可以使用
notifyAll方法,可以一次唤醒所有的等待线程

代码:

package Producer_consumer;

//默认生产者:一次生产3个
//默认消费者:一次消费1个
public class Producer_consumer {
    //总量:上限100,下限0
    public static volatile int SUM;

    public static void main(String[] args) {
        //启动生产者线程5个
        for (int i = 0; i < 5; i++) {
            new Thread(new Producer(), "生产者" + i).start();
        }
        //启动消费者线程5个
        for (int i = 0; i < 10; i++) {
            new Thread(new Consumer(), "消费者" + i).start();
        }
    }

	//生产者类
    private static class Producer implements Runnable {
        @Override
        public void run() {
            try {
                //总生产20次
                for (int i = 0; i < 20; i++) {
                    synchronized (Producer_consumer.class) {
                        while (SUM + 3 > 100) {  //下次生产完库存不能大于100,否则等待
                            Producer_consumer.class.wait();
                        }
                        //开始生产
                        SUM += 3;
                        //模拟休息
                        Thread.sleep(100);
                        //notify/notifyAll通知因调用wait阻塞的线程,notify随机唤醒,notifyAll唤醒全部
                        //synchronized代码块结束,也就是释放对象锁之后才会唤醒
                        //也就是说,synchronized代码块结束,wait 和 synchronized阻塞的线程都会被唤醒
                        Producer_consumer.class.notifyAll();
                        System.out.println(Thread.currentThread().getName() + "生产3,库存为:" + SUM);
                    }
                    Thread.sleep(100);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

	//消费者类
    private static class Consumer implements Runnable {

        @Override
        public void run() {
            try {
                while (true) {
                    synchronized (Producer_consumer.class) {
                        //库存为0,当前线程不能消费,进入阻塞
                        while (SUM == 0) {
                            //释放对象锁,让其他线程继续运行,当前线程阻塞
                            Producer_consumer.class.wait();
                        }
                        SUM--;
                        //模拟休息
                        Thread.sleep(100);
                        Producer_consumer.class.notifyAll();
                        System.out.println(Thread.currentThread().getName() + "消费1,库存为:" + SUM);
                    }
                    Thread.sleep(100);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

运行结果:

生产者0生产3,库存为:3
消费者9消费1,库存为:2
消费者7消费1,库存为:1
消费者8消费1,库存为:0
生产者4生产3,库存为:3
生产者3生产3,库存为:6
生产者2生产3,库存为:9
生产者1生产3,库存为:12
生产者3生产3,库存为:15
生产者4生产3,库存为:18
消费者0消费1,库存为:17
消费者1消费1,库存为:16
............//太多了 省略结果,好奇的py自己跑一下

2.BlockingQueue阻塞队列方法
BlockingQueue即阻塞队列,从阻塞这个词可以看出,在某些情况下对阻塞队列的访问可能会造成阻塞。被阻塞的情况主要有如下两种:
当队列满了的时候进行入队列操作
当队列空了的时候进行出队列操作

因此,当一个线程对已经满了的阻塞队列进行入队操作时会阻塞,除非有另外一个线程进行了出队操作
当一个线程对一个空的阻塞队列进行出队操作时也会阻塞,除非有另外一个线程进行了入队操作
从上可知,阻塞队列是线程安全的。

以下代码为手搓,肯定或多或少存在问题。希望大家及时指出:

package BlockingQueue;

/**
 * 实现阻塞式队列:
 * 1.满足线程安全的消费,生产功能
 * 2.消费/生产达到下限/上限需阻塞等待
 */
public class MyBlockingQueue<E> {
    Object[] items;  //阻塞队列
    private int putindex;  // 添加元素
    private int takeindex; //弹出元素
    private int size; // 有效容量

    public MyBlockingQueue(int capcity){  //初始化容量
        items = new Object[capcity];
    }

    //入队操作
    private synchronized void put(E e) throws InterruptedException {
        while (size == items.length){  //达到上限,需阻塞等待
            wait();
        }
        putindex = (putindex + 1) % items.length;  //存放元素的索引。由于为循环队列,要防止越界
        items[putindex] = e;  //存放元素
        size ++; //有效容量++
        notifyAll(); // 通知
    }

    //出队操作
    private synchronized E take() throws InterruptedException {
        while ( size == 0){  //达到下限,需阻塞等待
            wait();
        }
        takeindex = (takeindex + 1) % items.length;
        size--;
        notifyAll(); // 通知
        return (E) items[takeindex];
    }

    private static int SUM; //产品库存
    //生产者
    private static class Producer implements Runnable{
        @Override
        public void run() {
            SUM += 3;
            System.out.println(Thread.currentThread().getName() + "生产3,库存为:" + SUM);
        }
    }

    //消费者
    private static class Consumer implements Runnable {
        @Override
        public void run() {
            SUM -= 1;
            System.out.println(Thread.currentThread().getName() + "消费1,库存为:" + SUM);
        }
    }
}

本文地址:https://blog.csdn.net/qq_41437542/article/details/109274746