生产者和消费者
程序员文章站
2022-07-12 17:41:31
...
生产者和消费者:
情况:
一个生产者,一个消费者
1. 一个生产者,一个消费者
1. 属于无法囤积的:
仅使用flag来对中间存储仓的生产和消费进行二选一的执行操作,
当flag为true时,生产可以,消费等待,
生产完了就flag ,唤醒消费者(唤醒线程),flag 取false
当flag为false时,消费可以,生产等待,
消费完了就flag ,唤醒生产者(唤醒线程),flag 取true
生产一个,才消费一个;消费了一个,才能继续生产下一个
2. 属于可以囤积的
逻辑判断仍然在中间存储仓中,值得注意的是:
生产方法:
在大于等于生产上限数目限定下,将生产置于等待(线程等待)
中间做存进存储仓操作
在大于消费下限的限定下,唤醒消费者(唤醒线程)
消费方法:
在小于等于消费下限数目限定下,将消费置于等待(线程等待)
中间做存储仓取出操作
在小于生产上限的限定下,唤醒消费者(唤醒线程)
若干生产者, 若干消费者
2. 若干生产者, 若干消费者
注意:
1. 这里的生产者数目仅仅是生产者类的对象数目,而不是生产者类数目
同理消费者数目也是执消费者类的对象数目
2. 只要使用同一个中间存储仓对象,就能实现若干生产者, 若干消费者
也就是根据一个生产者,一个消费者,多添加新的生产者对象,
或是添加新的消费者对象。将任务提交出去即可,而我们要做的仅仅
是修改主线程代码,添加新的消费者、生产者对象。
要点:
1. 生产与消费的控制交给存储仓来控制,
1. 消费仅仅是消费,生产仅仅是生产,不做总数的逻辑判断
2. 生产的生产方法,消费的消费方法都是借助于中间存储仓类提供
1. 生产者和消费者类都要持有中间仓类对象,并要保证是同一个存储仓对象。
3. 中间存储仓在put,get方法上,要考虑synchronized,且要为生产者和消费者服务,
1. 为生产者和消费者考虑生产优先,总数的逻辑判断,可否囤积等,作出逻辑判断,
根据条件,使用wait()阻塞当前的put,get方法,并且要notify(),不能使某一方一直阻塞状态
一对一的生产消费,生产一个消费一个,无法囤积
生产者:
class Productor implements Callable<Integer>{
private BaoziStore baoziStore;
private Baozi baozi; //用于减少produce方法中的引用
public String name;
public String local;
public int productedNum;
public boolean energetic = true;
public Productor(String name, String local,BaoziStore baoziStore) {
this.name = name;
this.local = local;
this.baoziStore = baoziStore;
}
public void produce() {
baozi = new Baozi("菜包", 1.5);
baoziStore.inBaozis(baozi,this);
productedNum += 1;
}
@Override
public Integer call() throws Exception {
while (energetic) {
Thread.sleep(10);
produce();
}
return Integer.valueOf(productedNum);
}
}
消费者:
class Consumer implements Callable<Integer>{
private BaoziStore baoziStore;
private Baozi baozi; //用于减少produce方法中的引用
public String name;
public String local;
public boolean hungry = true;
public int eatenNum ;
public Consumer(String name, String local,BaoziStore baoziStore) {
this.name = name;
this.local = local;
this.baoziStore = baoziStore;
}
public void consume() throws Exception {
baozi = baoziStore.outBaozis(this);
eatenNum += 1;
}
@Override
public Integer call() throws Exception {
while (hungry) {
Thread.sleep(200);
consume();
}
return Integer.valueOf(eatenNum);
}
}
中介存储仓1:生产消费,生产一个消费一个,无法囤积
class BaoziStore {
private List<Baozi> baozis = new ArrayList<Baozi>();
private boolean flag = true;
//先生产后等待,ture:生产者生产,消费者等待,
//false:消费者消费,生产者等待
public synchronized void inBaozis(Baozi baozi,Productor productor) {
if(!flag) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//生产
baozis.add(baozi);
System.out.println(productor.local + " "+
productor.name +" 生产了 "+baozi+" "+getBaozisSize());
//生产完了就flag ,唤醒消费者(唤醒线程),flag 取false
this.notify();
this.flag = false;
}
public synchronized Baozi outBaozis(Consumer consumer) {
if(flag) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//消费
Baozi baozi = baozis.remove(baozis.size()-1);
System.out.println(consumer.local + " "+
consumer.name +" 消费了 "+baozi+" "+getBaozisSize());
//消费完了就flag ,唤醒生产者(唤醒线程),flag取true
this.notify();
this.flag = true;
return baozi;
}
public int getBaozisSize() {
return baozis.size();
}
}
中介存储仓2:生产消费,存在生产上限,消费下限,可囤积
class BaoziStore {
private List<Baozi> baozis = new ArrayList<Baozi>();
public boolean flag = true;
public synchronized void inBaozis(Baozi baozi,Productor productor) {
if(baozis.size() >= 20) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
baozis.add(baozi);
System.out.println(productor.local + " "+
productor.name +" 生产了 "+baozi+" "+getBaozisSize());
if(baozis.size() > 0) {
this.notify();
}
}
public synchronized Baozi outBaozis(Consumer consumer) {
if(baozis.size() <= 0) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Baozi baozi = baozis.remove(baozis.size()-1);
System.out.println(consumer.local + " "+
consumer.name +" 消费了 "+baozi+" "+getBaozisSize());
if(baozis.size() < 20) {
this.notify();
}
return baozi;
}
public int getBaozisSize() {
return baozis.size();
}
}
中介产品:
class Baozi {
private String name;
private double price;
public Baozi(String name,double price) {
this.name = name;
this.price = price;
}
@Override
public String toString() {
return "Baozi [name=" + name + ", price=" + price + "]";
}
}
主线程代码:(一对一,无论可否囤积皆可)
boolean flag = true;
BaoziStore baoziStore = new BaoziStore();
Productor productor = new Productor("老张菜包子铺","厦门", baoziStore);
Consumer consumer = new Consumer("唐老板", "酒店", baoziStore);
ExecutorService eService = Executors.newCachedThreadPool();
//提交2个callable实现类的对象,做任务
Future<Integer> productResult = eService.submit(productor);
Future<Integer> consumerResult = eService.submit(consumer);
Thread.sleep(1000);
//当前线程阻塞1秒后置flag为false,且要先于get执行,否则会阻塞。
productor.energetic = false;
consumer.hungry = false;
//2个任务都要做执行完的判断
while(flag) {
if(productResult.isDone() && consumerResult.isDone())
flag = false;
}
int result = productResult.get(1000,TimeUnit.MILLISECONDS);
System.out.println("共生产了: "+result);
System.out.println("共消费了: "+consumerResult.get(1000,TimeUnit.MILLISECONDS));
//对ExectorService类对象,执行shutdown,不再接受新的任务请求
执行awaitTermination,阻止其他任务还继续执行
再判断为false时,执行shutdownNow,停止主动执行的任务和等待的任务
eService.shutdown();
if(!eService.awaitTermination(8*1000, TimeUnit.MILLISECONDS)){
eService.shutdownNow();
}
主线程代码:(若干对若干,无论可否囤积皆可)
boolean flag = true;
BaoziStore baoziStore = new BaoziStore();
Productor productor = new Productor("老张菜包子铺","厦门", baoziStore);
Productor productor1 = new Productor("小白菜包子铺","福建", baoziStore);
Consumer consumer = new Consumer("唐老板", "酒店", baoziStore);
ExecutorService eService = Executors.newCachedThreadPool();
//提交3个callable实现类的对象,做任务
Future<Integer> productResult = eService.submit(productor);
Future<Integer> consumerResult = eService.submit(consumer);
Future<Integer> productResult1 = eService.submit(productor1);
//当前线程阻塞1秒后置flag为false,且要先于get执行,否则会阻塞。
Thread.sleep(1000);
productor.energetic = false;
productor1.energetic = false;
consumer.hungry = false;
//3个任务都要做执行完的判断
while(flag) {
if(productResult.isDone() && consumerResult.isDone()&& productResult1.isDone())
flag = false;
}
int result = productResult.get(1000,TimeUnit.MILLISECONDS)+productResult1.get(1000,TimeUnit.MILLISECONDS);
System.out.println("共生产了: "+result);
System.out.println("共消费了: "+consumerResult.get(1000,TimeUnit.MILLISECONDS));
//对ExectorService类对象,执行shutdown,不再接受新的任务请求
执行awaitTermination,阻止其他任务还继续执行
再判断为false时,执行shutdownNow,停止主动执行的任务和等待的任务
eService.shutdown();
if(!eService.awaitTermination(8*1000, TimeUnit.MILLISECONDS)){
eService.shutdownNow();
}