生产者消费者模式 生产者消费者模式
生产者消费者 环形缓冲模式
---------------------------------------------------------
/**
* 环形缓冲区
* @author fengbin
*
*/
public class CircularBuf {
int NMAX = 1000;
int iput = 0; // 环形缓冲区的当前放人位置
int iget = 0; // 缓冲区的当前取出位置
int n = 0; // 环形缓冲区中的元素总数量
Object buffer[];
public CircularBuf() {
super();
buffer = new Object[NMAX];
}
public CircularBuf(int nmax) {
super();
NMAX = nmax;
buffer = new Object[NMAX];
}
/*
* 环形缓冲区的地址编号计算函数,,如果到达唤醒缓冲区的尾部,将绕回到头部。
*
* 环形缓冲区的有效地址编号为:0到(NMAX-1)
*
*/
public synchronized int addring(int i) {
return (i + 1) == NMAX ? 0 : i + 1;
}
/* 从环形缓冲区中取一个元素 */
public synchronized Object get() {
int pos;
System.out.println("------------数量:"+n);
if (n > 0) {
pos = iget;
iget = addring(iget);
n--;
// System.out.println("get-->" + buffer[pos]);
notifyAll();
return buffer[pos];
} else {
// System.out.println("Buffer is Empty");
try {
wait();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
return null;
}
/* 向环形缓冲区中放人一个元素 */
public synchronized void put(Object z) {
if (n < NMAX) {
buffer[iput] = z;
System.out.println("put<--" + buffer[iput]);
iput = addring(iput);
n++;
notifyAll();
} else {
System.out.println("Buffer is full");
try {
wait();
put(z);//如果满了后,重新执行
System.out.println("rerun !");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
-----------------------------------------------------------------
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.lang.StringUtils;
/**
* 环形缓冲区 测试
* @author fengbin
*
*/
public class CircularBufTest {
CircularBuf circularBuf = new CircularBuf(2);
class Producer implements Runnable{
String apple = "";
Producer(String str){
apple = str;
}
@Override
public void run() {
for(int j=0;j<5;j++){
circularBuf.put(apple+j);
System.out.println("!生产:"+apple+j+"结束");
}
}
}
class Consumer implements Runnable{
Consumer(){
}
@Override
public void run() {
while(true){
String str=(String) circularBuf.get();
if(StringUtils.isEmpty(str)){
continue;
}
System.out.println("#消费:"+str+"结束");
try {
Thread.sleep(500);
} catch (Exception e) {
System.out.println(e);
}
}
}
}
public void test() {
ExecutorService service = Executors.newCachedThreadPool();
Producer producer = new Producer("P");
Consumer consumer = new Consumer();
service.submit(producer);
service.submit(consumer);
}
public static void main(String[] args) {
CircularBufTest test = new CircularBufTest();
test.test();
}
}
=================================================
使用阻塞队列实现的生产者和消费者模式.
public class TestProducterConsumer {
class Producter extends Thread {
Queue q;
Producter(Queue q) {
this.q = q;
}
public void run() {
for (int i = 0; i < 10; i++) {
q.put(i);
System.out.println("producter :" + i);
}
}
}
class Consumer extends Thread{
Queue q;
Consumer(Queue q) {
this.q = q;
}
public void run() {
while (true) {
System.out.println("Consumer:" + q.get());
}
}
}
class Queue {
int value;
boolean bFull = false;
public synchronized void put(int i) {
if (!bFull) {
value = i;
bFull = true;
notify();// 必须用在synchronized
}
try {
wait();// 必须捕获异常
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public synchronized int get() {
if (!bFull)
try {
wait();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
bFull = false;
notify();
return value;
}
}
public static void main(String[] args) {
TestProducterConsumer con = new TestProducterConsumer();
con.test();
}
private void test() {
Queue q = new Queue();
Producter p = new Producter(q);
Consumer c = new Consumer(q);
p.start();
c.start();
}
}