生产者-消费者模型进阶
生产者与消费者模型是多线程环境下经典问题之一。生产者与消费者共同操作共享资源(一般用队列存储),生产者与消费者应该保证以下几点:
1、同一时间内只能有一个生产者生产(生产方法加锁sychronized)
2、同一时间内只能有一个消费者消费(消费方法加锁sychronized)
3、生产者生产的同时消费者不能消费(生产方法加锁sychronized)
4、消费者消费的同时生产者不能生产(消费方法加锁sychronized)
5、共享空间空时消费者不能继续消费(消费前循环判断是否为空,空的话将该线程wait,释放锁允许其他同步方法执行)
6、共享空间满时生产者不能继续生产(生产前循环判断是否为满,满的话将该线程wait,释放锁允许其他同步方法执行)
生产者-消费者模型V1.0版
public class ProducerConsumer {
public static void main(String[] args) {
StackBasket s = new StackBasket();
Producer p = new Producer(s);
Consumer c = new Consumer(s);
Thread tp = new Thread(p);
Thread tc = new Thread(c);
tp.start();
tc.start();
}
}
/**
* 馒头,也可以是整形数据,这里用自定义对象
* */
class Mantou
{
private int id;
Mantou(int id){
this.id = id;
}
public String toString(){
return "馒头ID:" + id;
}
}
/**
* 共享栈空间 仓库,用来存储馒头
* */
class StackBasket
{
Queue<Mantou> sm = new LinkedList<Mantou>();
int index = 6;
/**
* show 生产方法.
* show 该方法为同步方法,持有方法锁;
* show 首先循环判断满否,满的话使该线程等待,释放同步方法锁,允许消费;
* show 当不满时首先唤醒正在等待的消费方法,但是也只能让其进入就绪状态,
* show 等生产结束释放同步方法锁后消费才能持有该锁进行消费
* @param m 元素
* @return 没有返回值
*/
public synchronized void push(Mantou m){
try{
while(index == sm.size()){
System.out.println("!!!!!!!!!生产满了!!!!!!!!!");
this.wait();
}
this.notify();
}catch(InterruptedException e){
e.printStackTrace();
}catch(IllegalMonitorStateException e){
e.printStackTrace();
}
//此处可执行逻辑操作,如从数据库取出数据
sm.offer(m);
System.out.println("生产了:" + m + " 仓库有:" + sm.size() + "个馒头");
}
/**
* show 消费方法
* show 该方法为同步方法,持有方法锁
* show 首先循环判断空否,空的话使该线程等待,释放同步方法锁,允许生产;
* show 当不空时首先唤醒正在等待的生产方法,但是也只能让其进入就绪状态
* show 等消费结束释放同步方法锁后生产才能持有该锁进行生产
* @return 没有返回值
*/
public synchronized Mantou pop(){
try{
while(sm.size() == 0){
System.out.println("!!!!!!!!!消费光了!!!!!!!!!");
this.wait();
}
this.notify();
}catch(InterruptedException e){
e.printStackTrace();
}catch(IllegalMonitorStateException e){
e.printStackTrace();
}
Mantou mantou=sm.poll();
System.out.println("消费了:" + mantou + " 仓库有:" + sm.size() + "个馒头");
return mantou;
}
}
/**
* 生产者
* */
class Producer implements Runnable
{
StackBasket ss = new StackBasket();
Producer(StackBasket ss){
this.ss = ss;
}
/**
* show 生产进程.
*/
public void run(){
for(int i = 0;i < 20;i++){
Mantou m = new Mantou(i);
ss.push(m);
try{
Thread.sleep((int)(Math.random()*500));
}catch(InterruptedException e){
e.printStackTrace();
}
}
}
}
这里对栈空间的入栈和出栈方法进行了加锁,使得入栈和出栈过程成为一个原子操作,不会被打断。当队列满时,入栈方法就会阻塞,并循环判断是否栈满,出栈方法也是如此。
生产者-消费者模型V2.0版
class ShareData{
private int number=0;
private Lock lock=new ReentrantLock();
private Condition condition=lock.newCondition();
public void increment() throws InterruptedException {
lock.lock();
try{
//判断
while(number!=0){
//等待,不能生产
condition.await();
}
//2、干活
number++;
System.out.println(Thread.currentThread().getName()+"\t"+number);
//通知唤醒
condition.signalAll();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
public void decrease() throws InterruptedException {
lock.lock();
try{
//判断
while(number==0){
//等待,不能生产
condition.await();
}
//2、干活
number--;
System.out.println(Thread.currentThread().getName()+"\t"+number);
//通知唤醒
condition.signalAll();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
public class ProduConsumer_TraditionDemo {
public static void main(String[] args) {
ShareData shareData=new ShareData();
new Thread(()->{
for(int i=0;i<5;i++){
try {
shareData.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"AAA").start();
new Thread(()->{
for(int i=0;i<5;i++){
try {
shareData.decrease();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"BBB").start();
}
}
这里用到了ReentrantLock为线程进行加锁,是JDK1.5之后在JUC中加入的锁,比synchronized操作线程更加灵活。
生产者-消费者模型V3.0版——阻塞队列版BlockingQueue
class MyResource{
private volatile boolean FLAG=true;//默认开户,进行生产+消费
private AtomicInteger atomicInteger=new AtomicInteger();
BlockingQueue<String> blockingQueue=null;
public MyResource(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
System.out.println(blockingQueue.getClass().getName());
}
public void myProd() throws Exception{
String data=null;
boolean retValue;
while(FLAG){
data=atomicInteger.incrementAndGet()+"";
retValue=blockingQueue.offer(data,2L, TimeUnit.SECONDS);
if(retValue){
System.out.println(Thread.currentThread().getName()+"\t 插入队列"+data+"成功");
}else{
System.out.println(Thread.currentThread().getName()+"\t 插入队列"+data+"失败");
}
TimeUnit.SECONDS.sleep(1);
}
System.out.println(Thread.currentThread().getName()+"\t大老板叫停了,表示FLAG=false,生产动作结束了");
}
public void myConsumer()throws Exception{
String result=null;
while(FLAG){
result=blockingQueue.poll(2L,TimeUnit.SECONDS);
if(null==result || result.equalsIgnoreCase("")){
FLAG=false;
System.out.println(Thread.currentThread().getName()+"\t 超过2秒没有取到蛋糕,消费退出");
System.out.println();
System.out.println();
return;
}
System.out.println(Thread.currentThread().getName()+"\t 消费队列"+result+"成功");
}
}
public void stop(){
this.FLAG=false;
}
}
public class ProdConsumer_BlockQueueDemo {
public static void main(String[] args) {
MyResource myResource=new MyResource(new ArrayBlockingQueue<String>(10));
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"\t 生产线程启动");
try {
myResource.myProd();
} catch (Exception e) {
e.printStackTrace();
}
},"myProd").start();
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"\t 消费线程启动");
try {
myResource.myConsumer();
} catch (Exception e) {
e.printStackTrace();
}
},"myConsumer").start();
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"\t 五秒钟时间到,更改flag");
myResource.stop();
}
}
这里涉及几个知识:volatile关键字,AtomicInteger原子整形,BlockingQueue阻塞队列。这里简要说明一下:
volatile关键字:在JMM内存模型中每一个线程都会将主内存中的数据进行一次拷贝到该线程的工作内存中去,但是由于CPU繁忙,当线程的工作内存中的数据改变时,不会立即同步到主内存中去(cpu会利用空余时间进行同步)。此时volatile的意思就是强制同步,一旦线程的工作内存中的共享变量的值发生改变,就会被立即写回到主内存,使其它线程可见。其次它还有防止指令重排的作用。上边代码的stop()方法就是改变了volatile修饰的FLAG从而使程序停下来。
AtomicInteger原子整形:底层使用技术为unsafe类中的CAS(compare and swap,比较并交换),这是一种不同于加锁的同步方式,可以保证系统的并发性,缺点是当大量存在CAS时,系统开销将会很大。以后将会专门写这方面的博文。
BlockingQueue阻塞队列:一个接口,有众多实现类,具体特点是队满阻塞。
参考:
上一篇: Phone List POJ - 3630(trie)
下一篇: idea手动添加jar包