生产者-消费者模式实现
程序员文章站
2022-07-03 23:03:55
...
生产者是指:生产数据的线程
消费者是指:使用数据的线程
生产者和消费者是不同的线程,他们处理数据的速度是不一样的,一般在二者之间还要加个“桥梁参与者”,用于缓冲二者之间处理数据的速度差。
下面用代码来说明:
//生产者
public class MakerThread extends Thread {
private final Random random;
private final Table table;
private static int id = 0;
public MakerThread(String name, Table table, long seed) {
super(name);
this.table = table;//table就是桥梁参与者
this.random = new Random(seed);
}
public void run() {
try {
while (true) {
Thread.sleep(random.nextInt(1000));//生产数据要耗费时间
//生产数据
String cake = "[ Cake No." + nextId() + " by " + getName() + " ]";
table.put(cake);//将数据存入桥梁参与者
}
} catch (InterruptedException e) {
}
}
private static synchronized int nextId() {
return id++;
}
}
再来看看消费者:
//消费者线程
public classEaterThread extends Thread {
private final Random random;
private final Table table;
public EaterThread(String name, Table table,long seed) {
super(name);
this.table = table;
this.random = new Random(seed);
}
public void run() {
try {
while (true) {
String cake = table.take();//从桥梁参与者中取数据
Thread.sleep(random.nextInt(1000));//消费者消费数据要花时间
}
} catch (InterruptedException e) {
}
}
}
看来在这个模式里table是个很重要的角色啊,让我们来看看他吧(这里只给出个简单的):
public class Table {
private final String[] buffer;
private int tail; //下一个放put(数据)的地方
private int head; //下一个取take(数据)的地方
private int count; // buffer内的数据数量
public Table(int count) {
this.buffer = new String[count];//总量是确定的
this.head = 0;
this.tail = 0;
this.count = 0;
}
// 放置数据
public synchronized void put(String cake) throws InterruptedException {
System.out.println(Thread.currentThread().getName() + " puts " + cake);
while (count >= buffer.length) {//数据放满了就只能等待
wait();
}
buffer[tail] = cake;
tail = (tail + 1) % buffer.length;
count++;
notifyAll();//有数据了,唤醒线程去取数据
}
// 取得数据
public synchronized String take() throws InterruptedException {
while (count <= 0) {//没有数据就只能等待
wait();
}
String cake = buffer[head];
head = (head + 1) % buffer.length;
count--;
notifyAll();//有位置可以放数据了,唤醒线程,不等了
System.out.println(Thread.currentThread().getName() + " takes " + cake);
return cake;
}
}
好了我们来实验吧:
public class Main {
public static void main(String[] args) {
Table table = new Table(3); // 建立可以放置数据的桥梁参与者,3是他所能放置的最大数量的数据。
new MakerThread("MakerThread-1", table, 31415).start();//生产数据
new MakerThread("MakerThread-2", table, 92653).start();
new MakerThread("MakerThread-3", table, 58979).start();
new EaterThread("EaterThread-1", table, 32384).start();//消费数据
new EaterThread("EaterThread-2", table, 62643).start();
new EaterThread("EaterThread-3", table, 38327).start();
}
}
spring视频获取 dubbo视频获取
消费者是指:使用数据的线程
生产者和消费者是不同的线程,他们处理数据的速度是不一样的,一般在二者之间还要加个“桥梁参与者”,用于缓冲二者之间处理数据的速度差。
下面用代码来说明:
//生产者
public class MakerThread extends Thread {
private final Random random;
private final Table table;
private static int id = 0;
public MakerThread(String name, Table table, long seed) {
super(name);
this.table = table;//table就是桥梁参与者
this.random = new Random(seed);
}
public void run() {
try {
while (true) {
Thread.sleep(random.nextInt(1000));//生产数据要耗费时间
//生产数据
String cake = "[ Cake No." + nextId() + " by " + getName() + " ]";
table.put(cake);//将数据存入桥梁参与者
}
} catch (InterruptedException e) {
}
}
private static synchronized int nextId() {
return id++;
}
}
再来看看消费者:
//消费者线程
public classEaterThread extends Thread {
private final Random random;
private final Table table;
public EaterThread(String name, Table table,long seed) {
super(name);
this.table = table;
this.random = new Random(seed);
}
public void run() {
try {
while (true) {
String cake = table.take();//从桥梁参与者中取数据
Thread.sleep(random.nextInt(1000));//消费者消费数据要花时间
}
} catch (InterruptedException e) {
}
}
}
看来在这个模式里table是个很重要的角色啊,让我们来看看他吧(这里只给出个简单的):
public class Table {
private final String[] buffer;
private int tail; //下一个放put(数据)的地方
private int head; //下一个取take(数据)的地方
private int count; // buffer内的数据数量
public Table(int count) {
this.buffer = new String[count];//总量是确定的
this.head = 0;
this.tail = 0;
this.count = 0;
}
// 放置数据
public synchronized void put(String cake) throws InterruptedException {
System.out.println(Thread.currentThread().getName() + " puts " + cake);
while (count >= buffer.length) {//数据放满了就只能等待
wait();
}
buffer[tail] = cake;
tail = (tail + 1) % buffer.length;
count++;
notifyAll();//有数据了,唤醒线程去取数据
}
// 取得数据
public synchronized String take() throws InterruptedException {
while (count <= 0) {//没有数据就只能等待
wait();
}
String cake = buffer[head];
head = (head + 1) % buffer.length;
count--;
notifyAll();//有位置可以放数据了,唤醒线程,不等了
System.out.println(Thread.currentThread().getName() + " takes " + cake);
return cake;
}
}
好了我们来实验吧:
public class Main {
public static void main(String[] args) {
Table table = new Table(3); // 建立可以放置数据的桥梁参与者,3是他所能放置的最大数量的数据。
new MakerThread("MakerThread-1", table, 31415).start();//生产数据
new MakerThread("MakerThread-2", table, 92653).start();
new MakerThread("MakerThread-3", table, 58979).start();
new EaterThread("EaterThread-1", table, 32384).start();//消费数据
new EaterThread("EaterThread-2", table, 62643).start();
new EaterThread("EaterThread-3", table, 38327).start();
}
}
spring视频获取 dubbo视频获取
上一篇: 队列,并非想象的那样简单
下一篇: 域对象 & 面向对象 & 结构化编程