欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

生产者和消费者

程序员文章站 2022-07-12 17:41:31
...

生产者和消费者:

情况:

一个生产者,一个消费者

1. 一个生产者,一个消费者
	1. 属于无法囤积的:
		仅使用flag来对中间存储仓的生产和消费进行二选一的执行操作,
		当flag为true时,生产可以,消费等待,
			生产完了就flag ,唤醒消费者(唤醒线程),flag 取false
		当flag为false时,消费可以,生产等待,
			消费完了就flag ,唤醒生产者(唤醒线程),flag 取true
		生产一个,才消费一个;消费了一个,才能继续生产下一个

	2. 属于可以囤积的
		逻辑判断仍然在中间存储仓中,值得注意的是:
		生产方法:
			在大于等于生产上限数目限定下,将生产置于等待(线程等待)
			中间做存进存储仓操作
			在大于消费下限的限定下,唤醒消费者(唤醒线程)	
		消费方法:
			在小于等于消费下限数目限定下,将消费置于等待(线程等待)
			中间做存储仓取出操作
			在小于生产上限的限定下,唤醒消费者(唤醒线程)

若干生产者, 若干消费者

2. 若干生产者, 若干消费者
	注意:
	1. 这里的生产者数目仅仅是生产者类的对象数目,而不是生产者类数目
		同理消费者数目也是执消费者类的对象数目
	2. 只要使用同一个中间存储仓对象,就能实现若干生产者, 若干消费者
		也就是根据一个生产者,一个消费者,多添加新的生产者对象,
		或是添加新的消费者对象。将任务提交出去即可,而我们要做的仅仅
		是修改主线程代码,添加新的消费者、生产者对象。

要点:

1. 生产与消费的控制交给存储仓来控制,
	1. 消费仅仅是消费,生产仅仅是生产,不做总数的逻辑判断
2. 生产的生产方法,消费的消费方法都是借助于中间存储仓类提供
	1. 生产者和消费者类都要持有中间仓类对象,并要保证是同一个存储仓对象。
3. 中间存储仓在put,get方法上,要考虑synchronized,且要为生产者和消费者服务,
	1. 为生产者和消费者考虑生产优先,总数的逻辑判断,可否囤积等,作出逻辑判断,
	根据条件,使用wait()阻塞当前的put,get方法,并且要notify(),不能使某一方一直阻塞状态

一对一的生产消费,生产一个消费一个,无法囤积

生产者:

class Productor implements Callable<Integer>{
	private BaoziStore baoziStore;
	private Baozi baozi; //用于减少produce方法中的引用
	public String name;
	public String local;
	public int productedNum;
	public boolean energetic = true;
	public Productor(String name, String local,BaoziStore baoziStore) {
		this.name = name;
		this.local = local;
		this.baoziStore = baoziStore;
	}
	public void produce() {
		baozi = new Baozi("菜包", 1.5);
		baoziStore.inBaozis(baozi,this);
		productedNum += 1;
		
	}
	@Override
	public Integer call() throws Exception {
		while (energetic) {
			Thread.sleep(10);
			produce();
		}
		return Integer.valueOf(productedNum);
	}
}

消费者:

class Consumer implements Callable<Integer>{
	private BaoziStore baoziStore;
	private Baozi baozi; //用于减少produce方法中的引用
	public String name;
	public String local;
	public boolean hungry = true;
	public int eatenNum ;
	public Consumer(String name, String local,BaoziStore baoziStore) {
		this.name = name;
		this.local = local;
		this.baoziStore = baoziStore;
	}
	public void consume() throws Exception {
		baozi = baoziStore.outBaozis(this);
		eatenNum += 1;	
	}
	@Override
	public Integer call() throws Exception {
		while (hungry) {
			Thread.sleep(200);
			consume();
		}
		return Integer.valueOf(eatenNum);
	}
}

中介存储仓1:生产消费,生产一个消费一个,无法囤积

class BaoziStore {
	private List<Baozi> baozis = new ArrayList<Baozi>();
	private boolean flag = true; 
	//先生产后等待,ture:生产者生产,消费者等待,
	//false:消费者消费,生产者等待
	public synchronized void inBaozis(Baozi baozi,Productor productor) {
		if(!flag) {
			try {
				this.wait();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		
		//生产
		baozis.add(baozi);
		System.out.println(productor.local + " "+ 
		productor.name +" 生产了 "+baozi+" "+getBaozisSize());
		
		//生产完了就flag ,唤醒消费者(唤醒线程),flag 取false
		this.notify();
		this.flag = false;
	}
	public synchronized Baozi outBaozis(Consumer consumer) {
		if(flag) {
			try {
				this.wait();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		//消费
		Baozi baozi =  baozis.remove(baozis.size()-1);
		System.out.println(consumer.local + " "+ 
		consumer.name +" 消费了 "+baozi+" "+getBaozisSize());
		
		//消费完了就flag ,唤醒生产者(唤醒线程),flag取true	
		this.notify();
		this.flag = true;
		return baozi;
	}
	public int getBaozisSize() {
		return baozis.size();
	}
}

中介存储仓2:生产消费,存在生产上限,消费下限,可囤积

class BaoziStore {
	private List<Baozi> baozis = new ArrayList<Baozi>();
	public boolean flag = true; 
	
	public synchronized void inBaozis(Baozi baozi,Productor productor) {
		if(baozis.size() >= 20) {
			try {
				this.wait();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		baozis.add(baozi);
		System.out.println(productor.local + " "+ 
		productor.name +" 生产了 "+baozi+" "+getBaozisSize());
		if(baozis.size() > 0) {
			this.notify();
		}
	}
	public synchronized Baozi outBaozis(Consumer consumer) {
		if(baozis.size() <= 0) {
			try {
				this.wait();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		Baozi baozi =  baozis.remove(baozis.size()-1);
		System.out.println(consumer.local + " "+ 
		consumer.name +" 消费了 "+baozi+" "+getBaozisSize());
		if(baozis.size() < 20) {
			this.notify();
		}
		return baozi;
	}
	public int getBaozisSize() {
		return baozis.size();
	}
}

中介产品:

class Baozi {
	private String name;
	private double price;
	public Baozi(String name,double price) {
		this.name = name;
		this.price = price;
	}
	@Override
	public String toString() {
		return "Baozi [name=" + name + ", price=" + price + "]";
	}
}

主线程代码:(一对一,无论可否囤积皆可)

boolean flag = true;
BaoziStore baoziStore = new BaoziStore();
Productor productor = new Productor("老张菜包子铺","厦门", baoziStore);
Consumer consumer = new Consumer("唐老板", "酒店", baoziStore);

ExecutorService eService = Executors.newCachedThreadPool();

//提交2个callable实现类的对象,做任务
Future<Integer> productResult  = eService.submit(productor);
Future<Integer> consumerResult = eService.submit(consumer);

Thread.sleep(1000);  
//当前线程阻塞1秒后置flag为false,且要先于get执行,否则会阻塞。
productor.energetic = false;
consumer.hungry = false;

//2个任务都要做执行完的判断
while(flag) {
	if(productResult.isDone() && consumerResult.isDone())
		flag = false;
}
int result = productResult.get(1000,TimeUnit.MILLISECONDS);	
System.out.println("共生产了: "+result);
System.out.println("共消费了: "+consumerResult.get(1000,TimeUnit.MILLISECONDS));

//对ExectorService类对象,执行shutdown,不再接受新的任务请求
	执行awaitTermination,阻止其他任务还继续执行
	再判断为false时,执行shutdownNow,停止主动执行的任务和等待的任务
eService.shutdown();
if(!eService.awaitTermination(8*1000, TimeUnit.MILLISECONDS)){  
     eService.shutdownNow();  
 }  

主线程代码:(若干对若干,无论可否囤积皆可)

boolean flag = true;
BaoziStore baoziStore = new BaoziStore();
Productor productor = new Productor("老张菜包子铺","厦门", baoziStore);
Productor productor1 = new Productor("小白菜包子铺","福建", baoziStore);
Consumer consumer = new Consumer("唐老板", "酒店", baoziStore);
		
ExecutorService eService = Executors.newCachedThreadPool();
//提交3个callable实现类的对象,做任务
Future<Integer> productResult  = eService.submit(productor);
Future<Integer> consumerResult = eService.submit(consumer);
Future<Integer> productResult1  = eService.submit(productor1);

//当前线程阻塞1秒后置flag为false,且要先于get执行,否则会阻塞。
Thread.sleep(1000);
productor.energetic = false;
productor1.energetic = false;
consumer.hungry = false;


//3个任务都要做执行完的判断
while(flag) {
	if(productResult.isDone() && consumerResult.isDone()&& productResult1.isDone())
		flag = false;
}

int result = productResult.get(1000,TimeUnit.MILLISECONDS)+productResult1.get(1000,TimeUnit.MILLISECONDS);
System.out.println("共生产了: "+result);
System.out.println("共消费了: "+consumerResult.get(1000,TimeUnit.MILLISECONDS));

//对ExectorService类对象,执行shutdown,不再接受新的任务请求
	执行awaitTermination,阻止其他任务还继续执行
	再判断为false时,执行shutdownNow,停止主动执行的任务和等待的任务
eService.shutdown();
if(!eService.awaitTermination(8*1000, TimeUnit.MILLISECONDS)){  
     eService.shutdownNow();  
 }