多线程之生产者消费者模型
程序员文章站
2022-03-27 12:40:52
- 生产者消费者模型常见于消息队列中,由一个或多个线程进行消息的生成并放入到队列当中,根据先到先消费的策略,由消费者从队列中取出消息并进行消费,所以一般生产的消息从队尾加入,需要消费的消息从队头取出,队列的消息存满了就需要生产者进行等待,队列的消息为空时需要消费者进行等待:代码实现:在消费者等待生产时采用Object的wait()方法,进行有条件的等待while,一旦不满足就唤醒全部的生产者同样等待队列的元素在有位置的情形进行生产,队列满了就需要该线程进入休眠并有条件的......
- 生产者消费者模型常见于消息队列中,由一个或多个线程进行消息的生成并放入到队列当中,根据先到先消费的策略,由消费者从队列中取出消息并进行消费,所以一般生产的消息从队尾加入,需要消费的消息从队头取出,队列的消息存满了就需要生产者进行等待,队列的消息为空时需要消费者进行等待:
- 代码实现:在消费者等待生产时采用Object的wait()方法,进行有条件的等待while,一旦不满足就唤醒全部的生产者同样等待队列的元素在有位置的情形进行生产,队列满了就需要该线程进入休眠并有条件的唤醒。
package com.thread.text;
/*
* @Author sds
* @Description //生产者消费者模型
*/
import java.util.LinkedList;
import static java.lang.Thread.sleep;
public class CuotomerandProducer {
public static void main(String[] args) {
MessageMueue messageMQ = new MessageMueue(2);
//使用lambam表达式的方式来创建线程——new Thread(() ->{}).start();
//创建三个生产者线程
for(int i = 1;i<5;i++){
int id = i;
new Thread(() ->{
//重写run方法
messageMQ.putMess(new Message(id,"值"+id));
System.out.print("生产者线程"+id+"生产消息");
},"生产者线程"+i).start();
}
//创建1个消费者线程
new Thread(() ->{
while(true) { //持续等待消费
try {
sleep(1); //休眠1s后进行消费
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.print("消费者线程" + 1);
messageMQ.getMess();
}
}).start();
}
//声明容器和生产消费方法来构建生产者消费者模型
static class MessageMueue{
//使用双向链表来实现队列
private LinkedList<Message> queue = new LinkedList<>();
private int capiticy; //队列的容量大小
public MessageMueue(int capiticy) {
this.capiticy = capiticy;
}
//获取消息
public Message getMess(){
//使用while 条件与wait的等待,从而使线程有条件唤醒
//并使用synchronized来保证线程安全
synchronized (queue){
while(queue.isEmpty()){ //当队列空等待
System.out.println(" 队列为空,请等待");
try {
queue.wait(); //释放锁对象的休眠
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Message res= queue.removeFirst();//从队首取出
System.out.println(" 唤醒后已有消费消息"+res);
queue.notifyAll(); //队列被消费了,有位置,需要唤醒线程
return res;
}
}
//生产消息
public void putMess(Message message){
//使用while 条件与wait的等待,从而使线程有条件唤醒
//并使用synchronized来保证线程安全
synchronized (queue){
while(queue.size()==capiticy){ //当队列满生产等待
System.out.println(" 队列已满,请等待");
try {
queue.wait(); //释放锁对象的休眠
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.addLast(message); //从队尾添加
//队列有消息了,需要唤醒线程
System.out.println("唤醒后已生产消息"+message);
queue.notifyAll();
}
}
}
//声明消息类,生产和消费的对象
static final class Message{
//声明为final,只有get方法,来保证线程安全
private int id;
private Object value;
public Message(int id, Object value) {
this.id = id;
this.value = value;
}
public int getId() {
return id;
}
public Object getValue() {
return value;
}
@Override
public String toString() {
return "Message{" +
"id=" + id +
", value=" + value +
'}';
}
}
}
本文地址:https://blog.csdn.net/sdshdutx/article/details/107393498