thingkinginjava--第21章 并发(二)
生产者与消费者队列
吐司BlockingQueue
考虑下面的这个使用BlockingQueue的示例,有一台机器具有三个任务:一个制作吐司、一个给吐司抹黄油,另一个在抹过黄油的吐司上涂果酱。
package com.atyouyou.concurrency;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
class Toast {
public enum Status {DRY, BUTTERED, JAMMED}
private Status status = Status.DRY;
private final int id;
public Toast(int id) {
this.id = id;
}
public void butter() {
status = Status.BUTTERED;
}
public void jam() {
status = Status.JAMMED;
}
public Status getStatus() {
return status;
}
public int getId() {
return id;
}
@Override
public String toString() {
return "Toast{" +
"status=" + status +
", id=" + id +
'}';
}
}
class ToastQueue extends LinkedBlockingQueue<Toast> {
};
class Toaster implements Runnable {
private ToastQueue toastQueue;
private int count = 0;
private Random rand = new Random(47);
public Toaster(ToastQueue tq) {
toastQueue = tq;
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
TimeUnit.MILLISECONDS.sleep(100 + rand.nextInt(500));
Toast t = new Toast(count++);
System.out.println(t);
toastQueue.put(t);
}
} catch (InterruptedException e) {
System.out.println("Toast interrupted");
}
System.out.println("Toaster off");
}
}
class Butterer implements Runnable {
ToastQueue dryQueue, butteredQueue;
public Butterer(ToastQueue dryQueue, ToastQueue butteredQueue) {
this.dryQueue = dryQueue;
this.butteredQueue = butteredQueue;
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
Toast t = dryQueue.take();
t.butter();
System.out.println(t);
butteredQueue.put(t);
}
} catch (InterruptedException e) {
System.out.println("Butterer interrupt");
}
System.out.println("Butterer off");
}
}
class Jammer implements Runnable {
ToastQueue butteredQueue, finishedQueue;
public Jammer(ToastQueue butteredQueue, ToastQueue finishedQueue) {
this.butteredQueue = butteredQueue;
this.finishedQueue = finishedQueue;
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
Toast t = butteredQueue.take();
t.jam();
finishedQueue.put(t);
}
} catch (InterruptedException e) {
System.out.println("Jammer interrupted");
}
System.out.println("Jammer off");
}
}
class Eater implements Runnable {
ToastQueue finishedQueue;
public Eater(ToastQueue finishedQueue) {
this.finishedQueue = finishedQueue;
}
private int counter = 0;
@Override
public void run() {
try {
while (!Thread.interrupted()) {
Toast t = finishedQueue.take();
if (t.getId() != counter++ || t.getStatus() != Toast.Status.JAMMED) {
System.out.println(">>> error: " + t);
System.exit(1);
} else {
System.out.println("Chomp! " + t);
}
}
} catch (InterruptedException e) {
System.out.println("Eater interrupted");
}
System.out.println("Eater off");
}
}
public class ToastOMatic {
public static void main(String[] args) throws InterruptedException {
ToastQueue dryQueue = new ToastQueue(),
butteredQueue = new ToastQueue(),
finishedQueue = new ToastQueue();
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new Toaster(dryQueue));
exec.execute(new Butterer(dryQueue, butteredQueue));
exec.execute(new Jammer(butteredQueue, finishedQueue));
exec.execute(new Eater(finishedQueue));
TimeUnit.SECONDS.sleep(5);
exec.shutdownNow();
}
}
程序的序列图:
程序理解:
-
制作吐司的流程中使用
LinkedBlockingQueue
来保存不同阶段不同状态的吐司,吐司的状态使用枚举类型来保存{DRY, BUTTERED, JAMMED}
,Toaster类用来生产原始的吐司,并把吐司保存在属性toastQueue
中。构造函数:
public Toaster(ToastQueue tq) { toastQueue = tq; }
-
类
Butterer
的作用是给Toaster
生产出的原始的吐司添加黄油,因此该类中包含有两个LinkedBlockingQueue
类型的属性,分别来保存步骤1中产生的待加工的dryQueue
和加工后butteredQueue
。构造函数:
public Butterer(ToastQueue dryQueue, ToastQueue butteredQueue) { this.dryQueue = dryQueue; this.butteredQueue = butteredQueue; }
-
类
Jammer
的存在合理性与类Butterer
相同,是为了给步骤2中加工后产生的butteredQueue
涂抹”果酱”。构造函数:
public Jammer(ToastQueue butteredQueue, ToastQueue finishedQueue) { this.butteredQueue = butteredQueue; this.finishedQueue = finishedQueue; }
那么我们怎末保证程序一定是按照
生产--->涂抹黄油--->涂抹果酱--->消化掉
的顺序执行的呢?此时就要理解程序中用来保存果酱的LinkedBlockingQueue
了。程序中使用的
ToasterQueue
都是LinkedBlockQueue
的子类:class ToastQueue extends LinkedBlockingQueue<Toast> { }; public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { ... }
简单的理解,
BlockingQueue
是存在于juc下为同步队列实现所提供的接口,同步队列在任何时刻都只允许一个任务插入或移除元素。通常使用的是LinkedBlockingQueue
,它是个*阻塞队列(有界队列ArrayBlokingQueue),可以在创建时指定队列的capacity
,如果不指定的话,则默认是Integer.MAX_VALUE
。LinkedBlockingQueue
的特点:- 是*阻塞队列
- 基于单向链表的FIFO队列
- 访问与移除操作在队头进行,添加操作在队尾进行,并分别使用了不同的锁来进行保护。
- 仍然使用count计数器来判断队空与队满,只不过count的类型是AtomicInteger原子类
/** Current number of elements */ private final AtomicInteger count = new AtomicInteger(); /** Lock held by take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition();
类中含有两个
Node<E>
属性,分别指向队列头部的哨兵节点(head
)和最后一个节点(last
),它不持有有效数据,当队列中有数据时,头结点仍然指向这个哨兵,尾节点指向有效数据的最后一个节点./** * Head of linked list. * Invariant: head.item == null */ transient Node<E> head; /** * Tail of linked list. * Invariant: last.next == null */ private transient Node<E> last;
分析
put(E e)
操作,当执行put时,首先会获得队列的putLock
锁,然后去判断此时队列中的元素的个数即count
的值是否等于了capacity
,如果相等,则程序阻塞,对应于代码片段中的while (count.get() == capacity){notFull.await();}
。public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { /* * Note that count is used in wait guard even though it is * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are * signalled if it ever changes from capacity. Similarly * for all other uses of count in other wait guards. */ while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); }
分析
take()
操作,当执行take时,会首先去获得队列的takeLock
锁,如果此时队列中的count
值为0,即队列中没有有效数据时,程序将会阻塞
,对应于代码片段中的while (count.get() == 0){notEmpty.await();}
。public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
参考:
程序运行结果:
死锁
产生死锁问题的条件
当以下四个条件同时满足时会产生死锁问题:
- 互斥条件。任务使用的资源中至少有一个是不能共享的。
- 至少有一个任务它必须持有一个资源且正在等待获取一个当前被别的任务持有的资源。
- 资源不能被任务抢占,任务必须把资源释放当作普通事件。
- 必须循环等待,这时,一个任务等待其他任务所持有的资源,后者又在等待另一个任务所持有的资源,这样一直下去,直到有一个任务在等待第一个任务所持有的资源,使得大家都被锁住。
经典例证:哲学家就餐问题
新类库中的构件
Java并发编程:CountDownLatch、CyclicBarrier和 Semaphore
并发工具类(一)等待多线程完成的CountDownLatch
-
CountDownLatch
它被用来同步一个或多个任务,强制它们等待由其他任务执行的一组操作完成。
CountDownLatch
只被触发一次,并且计数值不能重置,如果需要重置,使用CyclicBarrier
。package com.atyouyou.concurrency; import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; class TaskPortion implements Runnable{ private static int counter = 0; private final int id = counter++; private static Random rand = new Random(47); private final CountDownLatch latch; TaskPortion(CountDownLatch latch) { this.latch = latch; } @Override public void run() { try { doWork(); latch.countDown(); }catch (InterruptedException e){ } } private void doWork() throws InterruptedException { TimeUnit.MILLISECONDS.sleep(rand.nextInt(2000)); System.out.println(this + " completed"); } @Override public String toString() { return String.format("%1$-3d", id); } } class WaitintTask implements Runnable{ private static int counter = 0; private final int id = counter++; private final CountDownLatch latch; public WaitintTask(CountDownLatch latch) { this.latch = latch; } @Override public void run() { try{ latch.await(); System.out.println("Latch barrier passd for " + this); }catch (InterruptedException e){ System.out.println(this + " interrupted"); } } @Override public String toString() { return String.format("WaitingTask %1$-3d ",id); } } public class CountDownLatchDemo { static final int SIZE = 100; public static void main(String[] args) { ExecutorService exec = Executors.newCachedThreadPool(); CountDownLatch latch = new CountDownLatch(SIZE);//指定计数值 for (int i = 0; i < 10; i++) { exec.execute(new WaitintTask(latch)); } for (int i = 0; i < SIZE; i++) { exec.execute(new TaskPortion(latch)); } System.out.println("Lanched all tasks"); exec.shutdown(); } }
TashPortion
将随机的休眠一段时间,以模拟这部分工作的完成,而WaitingTask
表示系统中必须等待的部分,它要等待到问题的初始部分完成为止。运行结果:
根据结果可以了解到,
WaitingTask
中的任务在TaskPortion
执行完毕后才会执行。虽然程序中使用了公共的Random对象,并不会存在线程安全的问题,因为
random.nextInt()
是线程安全的 -
CyclicBarrier
CyclicBarrier
适用于这样的情况:你希望创建一组任务,它们并行的执行工作,然后在进行下一步骤之前等待,直至所有的任务都完成(开起来有些像join()
)。它使得所有的并行任务都将在栅栏处列队,因此可以一致的向前冲。这非常像CountDownLatch
,只是CountDownLatch
是只触发一次的事件,而CyclicBarrier
可以多次重用。比如:赛马
-
DelayQueue
这是一个*的
BlockingQueue
,用于放置实现了Delayed
接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,即队头对象的延迟到期的时间最长。如果没有任何延迟到期,那么就不会由任何头元素,并且poll()
将返回null
。不能将
null
放置到这种队列中 PriorityBlockingQueue
ScheduledExecutor
Semaphore
Exchanger
上一篇: 第 21 章 中介者模式
下一篇: 第21节 包和import语句