第21章并发(二)
21.4.4 检查中断
1、当在线程上调用interrupt()时,中断发生的唯一时刻是在任务要进入到阻塞操作中,或者已经在阻塞操作内部时。
2、可以通过调用interrupted()来检查中断状态,这不仅可以知道interrupt()是否被调用过,而且还可以清除中断状态。清除中断状态可以确保并发结构不会就某个任务被中断这个问题通知两次。
package com21下;
import java.util.concurrent.TimeUnit;
/**
* Created by Panda on 2018/5/24.
*/
/**
 * Resource stand-in that reports its own construction and cleanup on
 * standard output, so the order of acquisition and release can be observed.
 */
class NeedsCleanup {
    /** Number that labels this instance in the printed messages. */
    private final int tag;

    /**
     * Stores the identifier and announces the construction.
     *
     * @param ident number used to label this instance in the output
     */
    public NeedsCleanup(int ident) {
        this.tag = ident;
        System.out.println("NeedsCleanup".concat(Integer.toString(this.tag)));
    }

    /** Announces that this instance is being cleaned up. */
    public void cleanup() {
        System.out.println("Cleaning up ".concat(Integer.toString(this.tag)));
    }
}
/**
 * Task that shows how to leave {@code run()} cleanly when interrupted:
 * every {@link NeedsCleanup} object is followed immediately by a
 * try-finally so that its {@code cleanup()} runs no matter how the loop
 * is exited (loop test or {@link InterruptedException}).
 */
class Blocked3 implements Runnable {
    /** Scratch value updated by the computation loop. */
    private volatile double d = 0.0;

    /**
     * Repeats "sleep, then compute" until the thread's interrupt flag is
     * seen by the loop test or the sleep throws. Prints which of the two
     * exit paths was taken.
     */
    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                NeedsCleanup needsCleanup = new NeedsCleanup(1);
                // try-finally starts right after construction so cleanup is guaranteed.
                try {
                    System.out.println("sleeping");
                    TimeUnit.SECONDS.sleep(1);
                    NeedsCleanup needsCleanup1 = new NeedsCleanup(2);
                    try {
                        System.out.println("Calculating");
                        // Non-blocking, time-consuming work; an interrupt arriving
                        // here is only noticed by the next loop test.
                        for (int i = 0; i < 2500000; i++) {
                            d = d + (Math.PI + Math.E) / d;
                        }
                        // Reported once, after the whole computation has finished.
                        System.out.println("Finished time-consuming operation");
                    } finally {
                        needsCleanup1.cleanup();
                    }
                } finally {
                    needsCleanup.cleanup();
                }
            }
            // Reached only when Thread.interrupted() returned true in the loop test.
            System.out.println("Exiting via while() test");
        } catch (InterruptedException e) {
            // Reached when the interrupt arrived during (or just before) sleep().
            System.out.println("Exiting via InterruptedException");
        }
    }
}
/**
 * Driver for {@link Blocked3}: starts the task on its own thread, waits for
 * the delay given on the command line, then interrupts the thread.
 */
public class InterruptingIdiom {
    /**
     * @param args exactly one argument: the delay before the interrupt, in milliseconds
     * @throws Exception if the main thread is interrupted while waiting, or
     *                   the argument is not a valid integer
     */
    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.out.println("usage: java InterruptingIdiom delay-in-mS");
            System.exit(1);
        }
        Thread thread = new Thread(new Blocked3());
        thread.start();
        // The argument is documented as milliseconds, so sleep in that unit.
        TimeUnit.MILLISECONDS.sleep(Integer.parseInt(args[0]));
        thread.interrupt();
    }
}
21.5 线程之间的协作
1、当使用线程来同时运行多个任务时,可以通过使用锁(互斥)来同步两个任务的行为,从而使得一个任务不会干涉另一个任务的资源。也就是说,如果两个任务在交替着步入某项共享资源(通常是内存),可以使用互斥来使得每个时刻只有一个任务访问这项资源。
21.5.1wait()和notifyAll()
1、wait()使你可以等待某个条件发生变化,而改变这个条件超出了当前方法的控制能力。
2、忙等待:不断地进行空循环,通常是一种不良的CPU周期使用方式。wait()会在等待外部世界产生变化的时候将任务挂起,并且只有notify()和notifyAll()发生时,即表示发生了某些感兴趣的事物,这个任务才会被唤醒并去检查所发生的变化。因此,wait()提供了一种在任务之间对活动同步的方式。
3、与sleep()不同的是:对于wait()而言:
①在wait()期间对象锁是释放的。
②可以通过notify()、notifyAll(),或者令时间到期,从wait()中恢复。
4、
package com21下;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* Created by Panda on 2018/5/24.
*/
/**
 * Shared monitor for the wax-on / wax-off example. A single boolean records
 * whether wax is currently on the car; tasks change it and wake any waiters,
 * or block until it reaches the state they need.
 */
class Car {
    /** True while a coat of wax is on the car; starts out false. */
    private boolean waxApplied;

    /** Marks the car as waxed and wakes every thread waiting on it. */
    public synchronized void waxed() {
        changeState(true);
    }

    /** Marks the car as buffed (wax removed) and wakes every waiting thread. */
    public synchronized void buffed() {
        changeState(false);
    }

    /**
     * Blocks until the car has been waxed.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public synchronized void waitForWaxing() throws InterruptedException {
        while (!waxApplied) {
            wait();
        }
    }

    /**
     * Blocks until the car has been buffed.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public synchronized void waitForBuffing() throws InterruptedException {
        while (waxApplied) {
            wait();
        }
    }

    /** Stores the new state and notifies waiters; callers hold this object's monitor. */
    private void changeState(boolean nowWaxed) {
        waxApplied = nowWaxed;
        notifyAll();
    }
}
/**
 * Task that repeatedly applies wax to a {@link Car} and then waits for the
 * wax to be buffed off before applying the next coat.
 */
class WaxOn implements Runnable {
    private Car car;

    /**
     * @param car the car shared with the buffing task
     */
    public WaxOn(Car car) {
        this.car = car;
    }

    /**
     * Loops until interrupted: announce, simulate the waxing time, signal
     * that the car is waxed, then block until it has been buffed.
     */
    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                System.out.println("Wax On!");
                // 200 ms of simulated work, matching the buffing task's pace.
                TimeUnit.MILLISECONDS.sleep(200);
                car.waxed();
                car.waitForBuffing();
            }
        } catch (InterruptedException e) {
            System.out.println("Exiting via interrupt");
        }
        System.out.println("Ending Wax On task");
    }
}
/**
 * Task that waits for a {@link Car} to be waxed, buffs it, and repeats
 * until its thread is interrupted.
 */
class WaxOff implements Runnable {
    private final Car vehicle;

    /**
     * @param car the car shared with the waxing task
     */
    public WaxOff(Car car) {
        this.vehicle = car;
    }

    /** Runs the wait-for-wax / buff cycle until interrupted. */
    @Override
    public void run() {
        try {
            for (;;) {
                if (Thread.interrupted()) {
                    break;
                }
                vehicle.waitForWaxing();
                System.out.println("Wax Off");
                TimeUnit.MILLISECONDS.sleep(200);
                vehicle.buffed();
            }
        } catch (InterruptedException interrupted) {
            System.out.println("Exiting via interrupt");
        }
        System.out.println("Ending Wax Off task");
    }
}
/**
 * Entry point of the wax example: runs one buffing task and one waxing task
 * against the same {@link Car} for five seconds, then interrupts both.
 */
public class WaxOMatic {
    /**
     * @param args unused
     * @throws Exception if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newCachedThreadPool();
        Car sharedCar = new Car();
        Runnable buffer = new WaxOff(sharedCar);
        Runnable waxer = new WaxOn(sharedCar);
        pool.execute(buffer);
        pool.execute(waxer);
        // Let the two tasks alternate for a while.
        Thread.sleep(5000L);
        // Interrupts every task started by the pool.
        pool.shutdownNow();
    }
}
5、错失的信号
21.5.2notify()和notifyAll()
package com21下;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* Created by Panda on 2018/5/24.
*/
/**
 * Monitor object on which tasks block; other code wakes one waiter with
 * {@code prod()} or all of them with {@code prodAll()}.
 */
class Blocker {
    /**
     * Waits on this object in a loop, printing the current thread each time
     * it is woken. Returns when the thread's interrupt flag is found set at
     * the top of the loop, or when {@code wait()} is interrupted.
     */
    synchronized void waitingCall() {
        try {
            while (true) {
                if (Thread.interrupted()) {
                    return;
                }
                wait();
                System.out.println(Thread.currentThread() + " ");
            }
        } catch (InterruptedException ignored) {
            // Being interrupted while waiting simply ends the call.
        }
    }

    /** Wakes a single thread waiting on this object. */
    synchronized void prod() {
        notify();
    }

    /** Wakes every thread waiting on this object. */
    synchronized void prodAll() {
        notifyAll();
    }
}
/**
 * Task that blocks on the {@link Blocker} shared by all {@code Task}
 * instances.
 */
class Task implements Runnable {
    /** Single blocker shared by every Task. */
    static Blocker blocker = new Blocker();

    /** Waits on the shared blocker until interrupted. */
    @Override
    public void run() {
        Task.blocker.waitingCall();
    }
}
/**
 * Task that blocks on its own {@link Blocker}, a separate object from the
 * one declared in {@code Task}.
 */
class Task2 implements Runnable {
    /** Blocker shared by every Task2. */
    static Blocker blocker = new Blocker();

    /** Waits on the Task2 blocker until interrupted. */
    @Override
    public void run() {
        Task2.blocker.waitingCall();
    }
}
public class NotifyVsNotifyAll {
public static void main(String[] args) throws Exception{
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i <5 ; i++) {
executorService.execute(new Task());
}
executorService.execute(new Task2());
Timer timer=new Timer();
timer.scheduleAtFixedRate(new TimerTask() {
boolean prod=true;
@Override
public void run() {
if(prod){
System.out.println("\n notify()");
Task.blocker.prod();
prod=false;
}else {
System.out.println("\n notify()");
Task.blocker.prodAll();
prod=true;
}
}
},400,400);
TimeUnit.SECONDS.sleep(5);
timer.cancel();
System.out.println("\n Timer canceled");
TimeUnit.MILLISECONDS.sleep(500);
System.out.println("Task2.block.prodAll()");
Task.blocker.prodAll();
TimeUnit.MILLISECONDS.sleep(500);
System.out.println("\n Shutting down");
executorService.shutdownNow();
}
}
21.5.3 生产者与消费者
1、
package com21下;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* Created by Panda on 2018/5/24.
*/
/** A prepared meal, identified only by its order number. */
class Meal {
    /** Order number assigned when the meal was created. */
    private final int number;

    /**
     * @param orderNum order number of this meal
     */
    public Meal(int orderNum) {
        this.number = orderNum;
    }

    /** @return the text {@code Meal} followed directly by the order number */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Meal");
        text.append(number);
        return text.toString();
    }
}
/**
 * Consumer side of the restaurant example: waits on itself until a meal is
 * available, reports it, then clears the meal and notifies the chef.
 */
class WaitPerson implements Runnable {
    private Restaurant owner;

    /**
     * @param restaurant the restaurant holding the shared meal slot and the chef
     */
    public WaitPerson(Restaurant restaurant) {
        this.owner = restaurant;
    }

    /** Serves meals until the thread is interrupted. */
    @Override
    public void run() {
        try {
            while (true) {
                if (Thread.interrupted()) {
                    return;
                }
                awaitMeal();
                System.out.println("Waitperson got: " + owner.meal);
                // The chef waits on its own monitor, so notify on that object.
                synchronized (owner.chef) {
                    owner.meal = null;
                    owner.chef.notifyAll();
                }
            }
        } catch (InterruptedException interrupted) {
            System.out.println("waitperson interrupted");
        }
    }

    /** Waits on this object's monitor until the restaurant has a meal. */
    private synchronized void awaitMeal() throws InterruptedException {
        while (owner.meal == null) {
            wait();
        }
    }
}
/**
 * Producer side of the restaurant example: waits on itself until the meal
 * slot is empty, produces the next meal and notifies the wait person. When
 * the counter reaches 10 it shuts the restaurant's executor down.
 */
class Chef implements Runnable {
    private Restaurant kitchen;
    /** Number of meals started so far. */
    private int served = 0;

    /**
     * @param restaurant the restaurant holding the shared meal slot and the wait person
     */
    public Chef(Restaurant restaurant) {
        this.kitchen = restaurant;
    }

    /** Produces meals until the thread is interrupted. */
    @Override
    public void run() {
        try {
            while (true) {
                if (Thread.interrupted()) {
                    return;
                }
                awaitPickup();
                served += 1;
                if (served == 10) {
                    System.out.println("Out of food,closing");
                    // Interrupts both tasks; this one notices at the sleep below.
                    kitchen.executorService.shutdownNow();
                }
                System.out.println("order up!");
                // The wait person waits on its own monitor, so notify on that object.
                synchronized (kitchen.waitPerson) {
                    kitchen.meal = new Meal(served);
                    kitchen.waitPerson.notifyAll();
                }
                TimeUnit.MILLISECONDS.sleep(100);
            }
        } catch (InterruptedException interrupted) {
            System.out.println("Chef interrupted");
        }
    }

    /** Waits on this object's monitor until the previous meal has been taken. */
    private synchronized void awaitPickup() throws InterruptedException {
        while (kitchen.meal != null) {
            wait();
        }
    }
}
/**
 * Holds the state shared by {@link Chef} and {@link WaitPerson}: the single
 * meal slot, the executor running both tasks, and the two task objects.
 */
public class Restaurant {
    /** Current meal, or null when the slot is empty. */
    Meal meal;
    ExecutorService executorService;
    WaitPerson waitPerson;
    Chef chef;

    /** Creates the executor and both tasks, then starts the chef followed by the wait person. */
    public Restaurant() {
        executorService = Executors.newCachedThreadPool();
        waitPerson = new WaitPerson(this);
        chef = new Chef(this);
        executorService.execute(chef);
        executorService.execute(waitPerson);
    }

    /**
     * @param args unused
     */
    public static void main(String[] args) {
        new Restaurant();
    }
}
2、使用显式的Lock和Condition对象
21.5.4生产者-消费者与队列
1、使用同步队列解决任务协作问题,同步队列在任何时刻只允许一个任务插入或者删除元素。如果消费者任务试图从队列中获取对象,而该队列此时为空,那么这些队列还可以挂起消费者任务,并且当有更多的元素可用时恢复消费者任务。
2、吐司BlockingQueue
package com21下;
import java.util.Random;
import java.util.concurrent.*;
/**
* Created by Panda on 2018/5/24.
*/
/** A slice of toast that moves through the states DRY, BUTTERED and JAMMED. */
class Toast {
    /** Processing stages of a slice. */
    public enum Status { DRY, BUTTERED, JAMMED }

    /** Sequence number given at creation. */
    private final int number;
    /** Current stage; every slice starts out dry. */
    private Status state = Status.DRY;

    /**
     * @param id sequence number of this slice
     */
    public Toast(int id) {
        this.number = id;
    }

    /** Marks the slice as buttered. */
    public void butter() {
        state = Status.BUTTERED;
    }

    /** Marks the slice as jammed. */
    public void jam() {
        state = Status.JAMMED;
    }

    /** @return the current stage */
    public Status getStatus() {
        return state;
    }

    /** @return the sequence number */
    public int getId() {
        return number;
    }

    /** @return {@code "Toast <id>: <status>"} */
    @Override
    public String toString() {
        return new StringBuilder("Toast ")
                .append(number)
                .append(": ")
                .append(state)
                .toString();
    }
}
/** Named queue type for handing {@code Toast} objects between tasks; adds no members to {@code LinkedBlockingQueue<Toast>}. */
class ToastQueue extends LinkedBlockingQueue<Toast>{}
/**
 * Produces numbered {@link Toast} slices at random intervals and puts them
 * on the queue of dry toast.
 */
class Toaster implements Runnable {
    private ToastQueue output;
    /** Id to give the next slice. */
    private int count = 0;
    private Random random = new Random(47);

    /**
     * @param toastQueue queue that receives each new dry slice
     */
    public Toaster(ToastQueue toastQueue) {
        this.output = toastQueue;
    }

    /** Makes toast until the thread is interrupted. */
    @Override
    public void run() {
        try {
            for (;;) {
                if (Thread.interrupted()) {
                    break;
                }
                // 100-599 ms between slices.
                TimeUnit.MILLISECONDS.sleep(100 + random.nextInt(500));
                Toast slice = new Toast(count++);
                System.out.println(slice);
                output.put(slice);
            }
        } catch (InterruptedException interrupted) {
            System.out.println("Toaster interrupted");
        }
        System.out.println("Toaster off");
    }
}
/** Takes dry toast from one queue, butters it and passes it to the next queue. */
class Butterer implements Runnable {
    private ToastQueue source;
    private ToastQueue sink;

    /**
     * @param dryQueue      queue the dry slices are taken from
     * @param butteredQueue queue the buttered slices are put on
     */
    public Butterer(ToastQueue dryQueue, ToastQueue butteredQueue) {
        this.source = dryQueue;
        this.sink = butteredQueue;
    }

    /** Butters toast until the thread is interrupted. */
    @Override
    public void run() {
        try {
            for (;;) {
                if (Thread.interrupted()) {
                    break;
                }
                // take() blocks until a dry slice is available.
                Toast slice = source.take();
                slice.butter();
                System.out.println(slice);
                sink.put(slice);
            }
        } catch (InterruptedException interrupted) {
            System.out.println("Butterer interrupted");
        }
        System.out.println("Butterer off");
    }
}
/** Takes buttered toast from one queue, adds jam and passes it to the finished queue. */
class Jammer implements Runnable {
    private ToastQueue butteredQueue, finishedQueue;

    /**
     * @param butteredQueue queue the buttered slices are taken from
     * @param finishedQueue queue the jammed slices are put on
     */
    public Jammer(ToastQueue butteredQueue, ToastQueue finishedQueue) {
        this.butteredQueue = butteredQueue;
        this.finishedQueue = finishedQueue;
    }

    /** Jams toast until the thread is interrupted. */
    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                // take() blocks until a buttered slice is available.
                Toast toast = butteredQueue.take();
                toast.jam();
                System.out.println(toast);
                finishedQueue.put(toast);
            }
        } catch (InterruptedException e) {
            System.out.println("Jammer interrupted");
        }
        // Exit message, consistent with the other pipeline tasks.
        System.out.println("Jammer off");
    }
}
/**
 * Consumes finished toast and verifies that slices arrive in id order and
 * fully jammed; terminates the JVM on the first violation.
 */
class Easter implements Runnable {
    private ToastQueue finished;
    /** Id expected on the next slice. */
    private int counter = 0;

    /**
     * @param finishedQueue queue the finished slices are taken from
     */
    public Easter(ToastQueue finishedQueue) {
        this.finished = finishedQueue;
    }

    /** Eats toast until the thread is interrupted. */
    @Override
    public void run() {
        try {
            for (;;) {
                if (Thread.interrupted()) {
                    break;
                }
                Toast slice = finished.take();
                // The expected id advances for every slice taken, valid or not.
                boolean inOrder = slice.getId() == counter++;
                boolean jammed = slice.getStatus() == Toast.Status.JAMMED;
                if (inOrder && jammed) {
                    System.out.println("Chomp! " + slice);
                } else {
                    System.out.println(">>>>Error: " + slice);
                    System.exit(1);
                }
            }
        } catch (InterruptedException interrupted) {
            System.out.println("Easter interruptedException");
        }
        System.out.println("Easter off");
    }
}
/**
 * Wires the toast pipeline together: toaster -> butterer -> jammer -> eater,
 * connected by three queues, and lets it run for five seconds.
 */
public class ToastOMatic {
    /**
     * @param args unused
     * @throws Exception if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws Exception {
        ToastQueue dry = new ToastQueue();
        ToastQueue buttered = new ToastQueue();
        ToastQueue done = new ToastQueue();

        Runnable[] stages = {
            new Toaster(dry),
            new Butterer(dry, buttered),
            new Jammer(buttered, done),
            new Easter(done),
        };

        ExecutorService pool = Executors.newCachedThreadPool();
        for (Runnable stage : stages) {
            pool.execute(stage);
        }
        Thread.sleep(5000L);
        // Interrupts every stage of the pipeline.
        pool.shutdownNow();
    }
}
21.5.5任务间使用管道进行输入输出
1、通过输入/输出在线程间进行通信。提供线程的类库以“管道”的形式对线程间的输入/输出提供了支持。在Java输入/输出类库中的对应物就是PipedWriter类和PipedReader类。管道基本上是一个阻塞队列。
package com21下;
import java.io.IOException;
import java.io.PipedReader;
import java.io.PipedWriter;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* Created by Panda on 2018/5/24.
*/
/**
 * Writes the characters 'A' through 'z' into a {@link PipedWriter}, over and
 * over, pausing a random time after each one.
 */
class Sender implements Runnable {
    private Random random = new Random(47);
    private PipedWriter out = new PipedWriter();

    /** @return the writer end of the pipe, for a reader to connect to */
    public PipedWriter getPipedWriter() {
        return out;
    }

    /** Sends characters until writing fails or the sleep is interrupted. */
    @Override
    public void run() {
        try {
            for (;;) {
                for (int code = 'A'; code <= 'z'; code++) {
                    out.write(code);
                    // Pause 0-499 ms before the next character.
                    TimeUnit.MILLISECONDS.sleep(random.nextInt(500));
                }
            }
        } catch (IOException failure) {
            System.out.println(failure + "Sender writer exception");
        } catch (InterruptedException interrupted) {
            System.out.println(interrupted + "Sender sleep interrupted");
        }
    }
}
/** Reads characters from the pipe fed by a {@link Sender} and prints each one. */
class Receiver implements Runnable {
    private PipedReader input;

    /**
     * Connects a new reader to the sender's writer.
     *
     * @param sender the task whose pipe is read
     * @throws Exception if the reader cannot be connected
     */
    public Receiver(Sender sender) throws Exception {
        this.input = new PipedReader(sender.getPipedWriter());
    }

    /** Prints every character read until the read fails with an IOException. */
    @Override
    public void run() {
        try {
            for (;;) {
                // read() blocks until a character is available.
                int raw = input.read();
                System.out.println("Read: " + (char) raw + " ");
            }
        } catch (IOException failure) {
            System.out.println(failure + "Receiver read exception");
        }
    }
}
/**
 * Runs a {@link Sender} and a {@link Receiver} connected by a pipe for four
 * seconds and then interrupts both.
 */
public class PipedIO {
    /**
     * @param args unused
     * @throws Exception if the pipe cannot be connected or the main thread is interrupted
     */
    public static void main(String[] args) throws Exception {
        Sender producer = new Sender();
        Receiver consumer = new Receiver(producer);
        ExecutorService pool = Executors.newCachedThreadPool();
        pool.execute(producer);
        pool.execute(consumer);
        Thread.sleep(4000L);
        pool.shutdownNow();
    }
    /*
     * Sample output:
     * Read: A
     * Read: B
     * Read: C
     * Read: D
     * Read: E
     * Read: F
     * Read: G
     * Read: H
     * Read: I
     * Read: J
     * Read: K
     * Read: L
     * Read: M
     * java.lang.InterruptedException: sleep interruptedSender sleep interrupted
     * java.io.InterruptedIOExceptionReceiver read exception
     */
}
21.6 死锁
1、某个任务在等待另一个任务,而后者又等待别的任务,这样一直下去,直到这个链条上的任务又在等待第一个任务释放锁。这得到了一个任务之间相互等待的连续循环,没有哪个线程能继续,这称之为死锁。
2、死锁产生的条件:互斥条件;请求和保持条件;不可剥夺条件;循环等待条件。
21.7新类库中的构件
21.7.1CountDownLatch
1、被用来同步一个或多个任务,强制它们等待由其他任务执行的一组操作完成。
2、可以向CountDownLatch对象设置一个初始计数值,任何在这个对象上调用await()的方法都将阻塞,直至这个计数值到达0。其他任务在结束其工作时,可以在该对象上调用countDown()来减小这个计数值。CountDownLatch被设计为只触发一次,计数值不能被重置。如果希望能够重置计数值的版本,可以使用CyclicBarrier。
3、调用countDown()的任务在产生这个调用时并没有被阻塞,只有对await()的调用会阻塞,直至计数值到达0.
4、CountDownLatch的典型用法是将一个程序分为n个互相独立的可解决任务,并创建值为n的CountDownLatch。当每个任务完成时,都会在这个锁存器上调用countDown()。等待问题被解决的任务在这个锁存器上调用await(),将它们自己拦住,直至锁存器计数结束。
5、
package com21下;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* Created by Panda on 2018/5/24.
*/
class TaskPortion implements Runnable{
private static int counter=0;
private final int id=counter++;
private static Random random = new Random(47);
private final CountDownLatch latch;
TaskPortion(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
try{
doWork();
latch.countDown();
}catch (InterruptedException e){
}
}
public void doWork() throws InterruptedException{
TimeUnit.MILLISECONDS.sleep(random.nextInt(2000));
System.out.println(this+"completed");
}
public String toString(){
return String.format("%1$-3d",id);
}
}
/** Task that blocks on a latch and reports once the latch has opened. */
class WaitingTask implements Runnable {
    private static int counter = 0;

    /** Creation-order number of this task. */
    private final int id = counter++;
    private final CountDownLatch latch;

    /**
     * @param latch latch this task waits on
     */
    WaitingTask(CountDownLatch latch) {
        this.latch = latch;
    }

    /** Waits for the latch to reach zero, or reports that the wait was interrupted. */
    @Override
    public void run() {
        String message;
        try {
            latch.await();
            message = "Latch barrier passed for :" + this;
        } catch (InterruptedException interrupted) {
            message = this + "interruption";
        }
        System.out.println(message);
    }

    /** @return {@code "WaitingTask "}, the id left-justified to width 3, and a trailing space */
    @Override
    public String toString() {
        return String.format("WaitingTask %-3d ", id);
    }
}
/**
 * Starts five {@link WaitingTask}s that block on a latch and {@code SIZE}
 * {@link TaskPortion}s that each count the latch down once.
 */
public class CountDownLatchDemo {
    static final int SIZE = 10;

    /**
     * @param args unused
     * @throws Exception declared for convenience
     */
    public static void main(String[] args) throws Exception {
        CountDownLatch gate = new CountDownLatch(SIZE);
        ExecutorService pool = Executors.newCachedThreadPool();
        int waiters = 5;
        while (waiters-- > 0) {
            pool.execute(new WaitingTask(gate));
        }
        int portions = SIZE;
        while (portions-- > 0) {
            pool.execute(new TaskPortion(gate));
        }
        System.out.println("launched all tasks");
        // Lets submitted tasks finish; no new ones are accepted.
        pool.shutdown();
    }
    /*
     * Sample output:
     * launched all tasks
     * 7 completed
     * 9 completed
     * 6 completed
     * 8 completed
     * 1 completed
     * 2 completed
     * 5 completed
     * 4 completed
     * 0 completed
     * 3 completed
     * Latch barrier passed for :WaitingTask 2
     * Latch barrier passed for :WaitingTask 3
     * Latch barrier passed for :WaitingTask 4
     * Latch barrier passed for :WaitingTask 1
     * Latch barrier passed for :WaitingTask 0
     */
}
21.7.2CyclicBarrier
1、CyclicBarrier :创建一组任务,并行地执行工作,然后在进行下一个步骤之前等待,直至所有任务都完成。它使得所有的并行任务都将在栅栏处列队,因此可以一致地向前移动。和CountDownLatch功能差不多,只是CountDownLatch是只触发一次的事件,而CyclicBarrier可以多次重用。
package com21下;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;
/**
* Created by Panda on 2018/5/24.
*/
/**
 * A race participant. In each round it advances by 0-2 strides and then
 * waits at the barrier it was constructed with until the round is complete.
 */
class Horse implements Runnable {
    private static int counter = 0;
    private static Random random = new Random(47);

    /** Creation-order number of this horse. */
    private final int id = counter++;
    /** Strides covered so far; guarded by this object's monitor. */
    private int strides = 0;
    /**
     * Barrier this horse waits on. Held per instance so that constructing
     * another horse can never replace the barrier of existing ones.
     */
    private final CyclicBarrier cyclicBarrier;

    /**
     * @param cyclicBarrier barrier on which this horse waits after every round
     */
    public Horse(CyclicBarrier cyclicBarrier) {
        this.cyclicBarrier = cyclicBarrier;
    }

    /** @return the number of strides covered so far */
    public synchronized int getStrides() {
        return strides;
    }

    /**
     * Runs rounds until the thread is interrupted.
     *
     * @throws RuntimeException wrapping a {@link BrokenBarrierException} if the barrier is broken
     */
    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                synchronized (this) {
                    strides += random.nextInt(3);
                }
                cyclicBarrier.await();
            }
        } catch (InterruptedException e) {
            // Being interrupted while waiting at the barrier is a normal way to stop.
        } catch (BrokenBarrierException e) {
            throw new RuntimeException(e);
        }
    }

    /** @return {@code "Horse <id> "} (with a trailing space) */
    @Override
    public String toString() {
        return "Horse " + id + " ";
    }

    /** @return one {@code *} per stride covered, followed by the id */
    public String tracks() {
        // Read once so the track reflects a single consistent stride count.
        int covered = getStrides();
        StringBuilder stringBuilder = new StringBuilder();
        for (int i = 0; i < covered; i++) {
            stringBuilder.append("*");
        }
        stringBuilder.append(id);
        return stringBuilder.toString();
    }
}
/**
 * Horse race driven by a {@link CyclicBarrier}: every horse advances and
 * waits at the barrier; the barrier action then draws the track, checks
 * for a winner and pauses before the next round.
 */
public class HorseRace {
    /** Number of strides a horse needs to win. */
    static final int FINISH_LINE = 75;
    private List<Horse> horses = new ArrayList<>();
    private ExecutorService executorService = Executors.newCachedThreadPool();
    private CyclicBarrier cyclicBarrier;

    /**
     * Creates the barrier and starts the horses.
     *
     * @param nHorses number of horses (and barrier parties)
     * @param pause   milliseconds to pause between rounds
     */
    public HorseRace(int nHorses, final int pause) {
        cyclicBarrier = new CyclicBarrier(nHorses, new Runnable() {
            @Override
            public void run() {
                // Fence line marking the length of the track.
                StringBuilder stringBuilder = new StringBuilder();
                for (int i = 0; i < FINISH_LINE; i++) {
                    stringBuilder.append("=");
                }
                System.out.println(stringBuilder);
                for (Horse horse : horses) {
                    System.out.println(horse.tracks());
                }
                for (Horse horse : horses) {
                    if (horse.getStrides() >= FINISH_LINE) {
                        System.out.println(horse + "won!");
                        // Interrupts all horses, ending the race.
                        executorService.shutdownNow();
                        return;
                    }
                }
                // Pause once per round, not once per horse.
                try {
                    TimeUnit.MILLISECONDS.sleep(pause);
                } catch (InterruptedException e) {
                    System.out.println("barrier-action sleep interrupted");
                }
            }
        });
        for (int i = 0; i < nHorses; i++) {
            Horse horse = new Horse(cyclicBarrier);
            horses.add(horse);
            executorService.execute(horse);
        }
    }

    /**
     * @param args optional: number of horses (must be positive, default 7)
     *             and pause in milliseconds (must be non-negative, default 200)
     */
    public static void main(String[] args) {
        int nHoses = 7;
        int pause = 200;
        if (args.length > 0) {
            int n = Integer.parseInt(args[0]);
            nHoses = n > 0 ? n : nHoses;
        }
        if (args.length > 1) {
            int p = Integer.parseInt(args[1]);
            // Any non-negative pause is valid, including 0 and 1.
            pause = p >= 0 ? p : pause;
        }
        new HorseRace(nHoses, pause);
    }
}
21.7.3DelayQueue
1、DelayQueue是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,即队头对象的延迟到期的时间最长。如果没有任何延迟到期,那么就不会有任何头元素,并且poll()将返回null。
PriorityBlockingQueue是一个很基础的优先级队列,具有可阻塞的读取操作。
推荐阅读
-
并发编程(二)—— CountDownLatch、CyclicBarrier和Semaphore
-
Python实现查找二叉搜索树第k大的节点功能示例
-
中国最好大学排名2017:清华第一北大第二,武大仅列第13
-
Python for Informatics 第11章之正则表达式(二)
-
一个模仿oso的php论坛程序源码(之二)第1/3页
-
asp.net下用url重写URLReWriter实现任意二级域名的方法第1/2页
-
求得二叉搜索树的第k小的元素
-
SQL Server 索引结构及其使用(二) 改善SQL语句第1/3页
-
PHP+jQuery 注册模块的改进(二):邮箱激活第1/2页
-
第36天并发编程之进程篇