欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

生产者消费者模式 生产者消费者模式 

程序员文章站 2022-04-23 08:37:45
...

 生产者消费者 环形缓冲模式

 ---------------------------------------------------------

/**

 * 环形缓冲区

 * @author fengbin

 *

 */

public class CircularBuf {

 

   int NMAX = 1000;

 

   int iput = 0; // 环形缓冲区的当前放人位置

 

   int iget = 0; // 缓冲区的当前取出位置

 

   int n = 0; // 环形缓冲区中的元素总数量

 

   Object buffer[];

   

   public CircularBuf() {

       super();

       buffer = new Object[NMAX];

   }

 

   public CircularBuf(int nmax) {

       super();

       NMAX = nmax;

       buffer = new Object[NMAX];

   }

 

   /*

    * 环形缓冲区的地址编号计算函数,,如果到达唤醒缓冲区的尾部,将绕回到头部。

    * 

    * 环形缓冲区的有效地址编号为:0到(NMAX-1)

    * 

    */

   public synchronized int addring(int i) {

       return (i + 1) == NMAX ? 0 : i + 1;

   }

 

   /* 从环形缓冲区中取一个元素 */

   public synchronized Object get() {

       int pos;

       

        System.out.println("------------数量:"+n);

       

           if (n > 0) {

           

               pos = iget;

               iget = addring(iget);

               n--;

               // System.out.println("get-->" + buffer[pos]);

               notifyAll();

               return buffer[pos];

 

           } else {

               // System.out.println("Buffer is Empty");

               try {

                   wait();

               } catch (InterruptedException e) {

                   // TODO Auto-generated catch block

                   e.printStackTrace();

               }

 

           }

       return null;

 

   }

 

   /* 向环形缓冲区中放人一个元素 */

   public synchronized void put(Object z) {

 

           if (n < NMAX) {

               buffer[iput] = z;

               

               System.out.println("put<--" + buffer[iput]);

               

               iput = addring(iput);

               n++;

               notifyAll();

           } else {

                System.out.println("Buffer is full");

               try {

                   wait();

                   put(z);//如果满了后,重新执行

                   System.out.println("rerun !");

               } catch (InterruptedException e) {

                   // TODO Auto-generated catch block

                   e.printStackTrace();

               }

           }

 

   }

 

}

 

 

-----------------------------------------------------------------

 

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

 

import org.apache.commons.lang.StringUtils;

 

/**

 * 环形缓冲区 测试

 * @author fengbin

 *

 */

public class CircularBufTest {

 

 

CircularBuf circularBuf = new CircularBuf(2);

 

class Producer implements Runnable{

 

String apple = "";

Producer(String str){

apple = str;

}

 

@Override

public void run() {

for(int j=0;j<5;j++){

circularBuf.put(apple+j);

System.out.println("!生产:"+apple+j+"结束");

}

 

}

}

 

class Consumer implements Runnable{

 

Consumer(){

}

 

@Override

public void run() {

while(true){

String str=(String) circularBuf.get();

if(StringUtils.isEmpty(str)){

continue;

}

System.out.println("#消费:"+str+"结束");

 

try {

Thread.sleep(500);

} catch (Exception e) {

System.out.println(e);

}

}

}

 

 

}

 

public void test() {

 

ExecutorService service = Executors.newCachedThreadPool();

Producer producer = new Producer("P");

Consumer consumer = new Consumer();

service.submit(producer);

service.submit(consumer);

}

 

 

 

public static void main(String[] args) {

 

CircularBufTest test = new CircularBufTest();

test.test();

 

}

 

}

 =================================================

使用阻塞队列实现的生产者和消费者模式.

 

public class TestProducterConsumer {

 

class Producter extends Thread {

Queue q;

 

Producter(Queue q) {

this.q = q;

}

 

public void run() {

for (int i = 0; i < 10; i++) {

q.put(i);

System.out.println("producter :" + i);

}

}

}

 

class Consumer extends Thread{

Queue q;

Consumer(Queue q) {

this.q = q;

}

 

public void run() {

while (true) {

System.out.println("Consumer:" + q.get());

}

}

}

 

class Queue {

int value;

boolean bFull = false;

 

public synchronized void put(int i) {

if (!bFull) {

value = i;

bFull = true;

notify();// 必须用在synchronized

}

try {

wait();// 必须捕获异常

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

 

public synchronized int get() {

if (!bFull)

try {

wait();

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

bFull = false;

notify();

return value;

}

 

}

 

public static void main(String[] args) {

TestProducterConsumer con = new TestProducterConsumer();

con.test();

 

}

 

private void test() {

Queue q = new Queue();

Producter p = new Producter(q);

Consumer c = new Consumer(q);

p.start();

c.start();

}

 

}