生产者和消费者
程序员文章站
2022-07-12 17:41:43
...
wait / notify
wait 会释放锁,但 notify 不会释放锁。
public class MyContainer1<T> {
private final LinkedList<T> list = new LinkedList<>();
// 最多存 10 个元素
private final int MAX = 10;
private int count = 0;
public synchronized void put(T t) {
/**
* 为什么用 while 而不用 if?
* 用 while 的目的是当线程被唤醒的时候再进行一次判断
*/
while (list.size() == MAX) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
list.add(t);
count++;
// 通知消费者线程消费
this.notifyAll();
}
public synchronized T get() {
T t = null;
while (list.size() == 0) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
t = list.removeFirst();
count--;
// 通知生产者线程生产
this.notifyAll();
return t;
}
public static void main(String[] args) {
MyContainer1<String> my = new MyContainer1<>();
// 启动消费者线程
for (int i = 0; i < 10; i++) {
new Thread(() -> {
for (int j = 0; j < 5; j++) {
System.out.println(my.get());
}
}, "c" + i).start();
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 启动生产者线程
for (int i = 0; i < 2; i++) {
new Thread(() -> {
for (int j = 0; j < 25; j++) {
my.put(Thread.currentThread().getName() + " " + j);
}
}, "p" + i).start();
}
}
}
这种写法有一种缺陷:唤醒线程时无法明确只唤醒生产者线程或消费者线程。
await / signal
await 会释放锁,但 signal 不会释放锁。
public class MyContainer2<T> {
private final LinkedList<T> list = new LinkedList<>();
// 最多存 10 个元素
private final int MAX = 10;
private int count = 0;
private Lock lock = new ReentrantLock();
// Condition 的本质是不同的等待队列
private Condition p = lock.newCondition();
private Condition c = lock.newCondition();
public void put(T t) {
try {
lock.lock();
while (list.size() == MAX) {
p.await();
}
list.add(t);
count++;
// 通知消费者线程消费
c.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public T get() {
T t = null;
try {
lock.lock();
while (list.size() == 0) {
c.await();
}
t = list.removeFirst();
count--;
// 通知生产者线程生产
p.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
return t;
}
public static void main(String[] args) {
MyContainer2<String> my = new MyContainer2<>();
// 启动消费者线程
for (int i = 0; i < 10; i++) {
new Thread(() -> {
for (int j = 0; j < 5; j++) {
System.out.println(my.get());
}
}, "c" + i).start();
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 启动生产者线程
for (int i = 0; i < 2; i++) {
new Thread(() -> {
for (int j = 0; j < 25; j++) {
my.put(Thread.currentThread().getName() + " " + j);
}
}, "p" + i).start();
}
}
}