生产者消费者模型理解及java两种方式实现
概念
生产者消费者问题(Producer-consumer problem),也称有限缓冲问题(Bounded-buffer problem),是一个多线程同步问题的经典案例。
生产者生成一定量的数据放到缓冲区中,然后重复此过程;与此同时,消费者也在缓冲区消耗这些数据。
生产者和消费者之间必须保持同步,要保证生产者不会在缓冲区满时放入数据,消费者也不会在缓冲区空时消耗数据。不够完善的解决方法容易出现死锁的情况,此时进程都在等待唤醒
如下图:
为什么要用?
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这种生产消费能力不均衡的问题,所以便有了生产者和消费者模式。
文字看起来很抽象,这里用个简单例子来形容。
比如去自助餐厅吃饭(我发现我所有的例子都是吃),大致有:
- 1.厨师把菜做好,放到自助区 —— 相当于生产者制造数据并且放到缓冲区。
- 2.你把做好的菜拿出来吃掉 —— 相当于消费者从缓冲区拿出数据使用
- 3.如果今天顾客少,自助区的菜放满了,那厨师就不能继续做菜了 —— 相当于缓存区满,生产者需要阻塞等待消费者消费
- 4.如果今天的菜好吃或者你饿的吃光了自助区的菜,别人就没有吃的 —— 相当于缓存区空,消费者需要阻塞等待生产者生产
优点
解耦
假设生产者和消费者分别是两个类。如果让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(也就是耦合)。将来如果消费者的代码发生变化,可能会影响到生产者。而如果两者都依赖于某个缓冲区,两者之间不直接依赖,耦合也就相应降低了。
支持并发
生产者直接调用消费者的某个方法,还有另一个弊端。由于函数调用是同步的(或者叫阻塞的),在消费者的方法没有返回之前,生产者只好一直等在那边。万一消费者处理数据很慢,生产者就会白白糟蹋大好时光。
使用了生产者/消费者模式之后,生产者和消费者可以是两个独立的并发主体(常见并发类型有进程和线程两种)。生产者把制造出来的数据往缓冲区一丢,就可以再去生产下一个数据。基本上不用依赖消费者的处理速度。
支持忙闲不均
缓冲区还有另一个好处。如果制造数据的速度时快时慢,缓冲区的好处就体现出来了。当数据制造快的时候,消费者来不及处理,未处理的数据可以暂时存在缓冲区中。等生产者的制造速度慢下来,消费者再慢慢处理掉。
生产者消费者模式两种实现方式代码
1.synchronized
、wait()
和notify() /notifyAll()
synchronized
上一篇文章说的很详细,就不再解释了。我们看一下wait()
是干什么的?
1.wait()是Object里面的方法,而不是Thread里面的,这一点很容易搞错。
它的作用是将当前线程置于预执行队列,并在wait()所在的代码处处停止执行,直到接到通知或被中断为止
2.wait()只能在同步代码块或者同步方法中执行,如果调用wait()方法,而没有持有适当的锁,就会抛出异常。
3.wait()方法调用后,当前线程释放锁,线程与其他线程竞争重新获取锁。
notify()
notify方法就是使停止的线程继续运行。
1.notify()也要在同步方法或同步块中调用,该方法是用来通知那些可能等待该对象的对象锁的其它线程,对其发出通知,并使它们重新获取该对象的对象锁。如果有多个线程等待,则线程规划器随机挑选出一个呈wait状态的线程。
2.在notify()方法后,当前线程不会马上释放该对象锁,要等到执行notify()方法的线程将程序执行完,也就是退出同步代码块之后才会释放对象锁。
以上讲解了notify
方法只是唤醒某一个等待线程,那么如果有多个线程都在等待中怎么办呢,这个时候就可以使用notifyAll
方法,可以一次唤醒所有的等待线程
代码:
package Producer_consumer;
//默认生产者:一次生产3个
//默认消费者:一次消费1个
public class Producer_consumer {
//总量:上限100,下限0
public static volatile int SUM;
public static void main(String[] args) {
//启动生产者线程5个
for (int i = 0; i < 5; i++) {
new Thread(new Producer(), "生产者" + i).start();
}
//启动消费者线程5个
for (int i = 0; i < 10; i++) {
new Thread(new Consumer(), "消费者" + i).start();
}
}
//生产者类
private static class Producer implements Runnable {
@Override
public void run() {
try {
//总生产20次
for (int i = 0; i < 20; i++) {
synchronized (Producer_consumer.class) {
while (SUM + 3 > 100) { //下次生产完库存不能大于100,否则等待
Producer_consumer.class.wait();
}
//开始生产
SUM += 3;
//模拟休息
Thread.sleep(100);
//notify/notifyAll通知因调用wait阻塞的线程,notify随机唤醒,notifyAll唤醒全部
//synchronized代码块结束,也就是释放对象锁之后才会唤醒
//也就是说,synchronized代码块结束,wait 和 synchronized阻塞的线程都会被唤醒
Producer_consumer.class.notifyAll();
System.out.println(Thread.currentThread().getName() + "生产3,库存为:" + SUM);
}
Thread.sleep(100);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
//消费者类
private static class Consumer implements Runnable {
@Override
public void run() {
try {
while (true) {
synchronized (Producer_consumer.class) {
//库存为0,当前线程不能消费,进入阻塞
while (SUM == 0) {
//释放对象锁,让其他线程继续运行,当前线程阻塞
Producer_consumer.class.wait();
}
SUM--;
//模拟休息
Thread.sleep(100);
Producer_consumer.class.notifyAll();
System.out.println(Thread.currentThread().getName() + "消费1,库存为:" + SUM);
}
Thread.sleep(100);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
运行结果:
生产者0生产3,库存为:3
消费者9消费1,库存为:2
消费者7消费1,库存为:1
消费者8消费1,库存为:0
生产者4生产3,库存为:3
生产者3生产3,库存为:6
生产者2生产3,库存为:9
生产者1生产3,库存为:12
生产者3生产3,库存为:15
生产者4生产3,库存为:18
消费者0消费1,库存为:17
消费者1消费1,库存为:16
............//太多了 省略结果,好奇的py自己跑一下
2.BlockingQueue阻塞队列方法
BlockingQueue即阻塞队列,从阻塞这个词可以看出,在某些情况下对阻塞队列的访问可能会造成阻塞。被阻塞的情况主要有如下两种:
当队列满了的时候进行入队列操作
当队列空了的时候进行出队列操作
因此,当一个线程对已经满了的阻塞队列进行入队操作时会阻塞,除非有另外一个线程进行了出队操作
当一个线程对一个空的阻塞队列进行出队操作时也会阻塞,除非有另外一个线程进行了入队操作
从上可知,阻塞队列是线程安全的。
以下代码为手搓,肯定或多或少存在问题。希望大家及时指出:
package BlockingQueue;
/**
* 实现阻塞式队列:
* 1.满足线程安全的消费,生产功能
* 2.消费/生产达到下限/上限需阻塞等待
*/
public class MyBlockingQueue<E> {
Object[] items; //阻塞队列
private int putindex; // 添加元素
private int takeindex; //弹出元素
private int size; // 有效容量
public MyBlockingQueue(int capcity){ //初始化容量
items = new Object[capcity];
}
//入队操作
private synchronized void put(E e) throws InterruptedException {
while (size == items.length){ //达到上限,需阻塞等待
wait();
}
putindex = (putindex + 1) % items.length; //存放元素的索引。由于为循环队列,要防止越界
items[putindex] = e; //存放元素
size ++; //有效容量++
notifyAll(); // 通知
}
//出队操作
private synchronized E take() throws InterruptedException {
while ( size == 0){ //达到下限,需阻塞等待
wait();
}
takeindex = (takeindex + 1) % items.length;
size--;
notifyAll(); // 通知
return (E) items[takeindex];
}
private static int SUM; //产品库存
//生产者
private static class Producer implements Runnable{
@Override
public void run() {
SUM += 3;
System.out.println(Thread.currentThread().getName() + "生产3,库存为:" + SUM);
}
}
//消费者
private static class Consumer implements Runnable {
@Override
public void run() {
SUM -= 1;
System.out.println(Thread.currentThread().getName() + "消费1,库存为:" + SUM);
}
}
}
本文地址:https://blog.csdn.net/qq_41437542/article/details/109274746