欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

《Java 7 并发编程指南》学习概要 (5) 线程池

程序员文章站 2022-06-19 08:23:27
...

1、newCachedThreadPool

newCachedThreadPool() 方法创建一个缓存线程池。当需要执行新的任务会创建新的线程,如果它们已经完成运行任务,变成可用状态,会重新使用这些线程。

缓存线程池的优点:线程重复利用,它减少线程创建的时间

缓存线程池的缺点:为新任务不断创建线程, 如果提交过多的任务给执行者,会使系统超载

注意事项:使用通过newCachedThreadPool()方法创建的执行者,只有当你有一个合理的线程数或任务有一个很短的执行时间

一旦你创建执行者,你可以使用execute()方法提交Runnable或Callable类型的任务。


executor=(ThreadPoolExecutor)Executors.newCachedThreadPool();

2、newFixedThreadPool

newFixedThreadPool()  创建一个有最大线程数的执行者。

如果提交超过最大线程数的任务,剩下的任务将会被阻塞,直到有空闲的线程来处理它们

executor=(ThreadPoolExecutor)Executors.newFixedThreadPool(5);


3、Callable接口

返回结果,需实现方法call()

public class CallableTest {

    public static void main(String args[]) {
        String[] array = new String[]{"fdasdf", "24234", "&^*", "MNJ&^", ")(UJH"};
        ExecutorService pool = Executors.newFixedThreadPool(30);
        int sum = 0;
        List<Future<Integer>>   list = new  ArrayList<Future<Integer>>(); 
        for (String word : array) {
            Callable<Integer> callable = new WordLength(word);
            Future<Integer> future = pool.submit(callable);
            list.add(future );
        }
        for(Future<Integer> future :list){
          try {
                sum += future.get();
            } catch (InterruptedException | ExecutionException ex) {
                ex.printStackTrace();
            }
        }
       System.out.println("The sum of lengths is " + sum);
    }
}

class WordLength implements Callable<Integer> {

    private String word;

    public WordLength(String word) {
        this.word = word;
    }

    @Override
    public Integer call() {
        return Integer.valueOf(word.length());
    }
}



4、invokeAny()

ThreadPoolExecutor类中的invokeAny()方法接收任务数列,并启动它们,返回完成时没有抛出异常的第一个 任务的结果

	        TaskValidator ldapTask = new TaskValidator(ldapValidator, username,
				password);
		TaskValidator dbTask = new TaskValidator(dbValidator, username,
				password);
		List<TaskValidator> taskList = new ArrayList<>();
		taskList.add(ldapTask);
		taskList.add(dbTask);

		ExecutorService executor = (ExecutorService) Executors
				.newCachedThreadPool();
		String result;
		try {
			result = executor.invokeAny(taskList);
			System.out.printf("Main: Result: %s\n", result);
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (ExecutionException e) {
			e.printStackTrace();
		}
		executor.shutdown();


5、invokeAll()

invokeAll()方法等待它们的完成。这个方法接收Callable对象列表和返回 Future对象列表。这个列表将会有列表中每个任务的一个Future对象。Future对象列表的第一个对象是Callable对象列表控制的第一个任务,以此类推。
		List<Task> taskList = new ArrayList<>();
		for (int i = 0; i < 3; i++) {
			Task task = new Task(i);
			taskList.add(task);
		}
		List<Future<Result>> resultList = null;
		try {
			resultList = executor.invokeAll(taskList);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		System.out.println("Main: Printing the results");
		for (int i = 0; i < resultList.size(); i++) {
			Future<Result> future = resultList.get(i);
			try {
				Result result = future.get();
				System.out.println(result.getName() + ": " + result.getValue());
			} catch (InterruptedException | ExecutionException e) {
				e.printStackTrace();
			}
		}




6、ScheduledThreadPoolExecutor

执行一次Schedul

        ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1);
        System.out.printf("Main: Starting at: %s\n", new Date());
        for (int i = 0; i < 5; i++) {
            Task task = new Task("Task " + i);
            executor.schedule(task, i + 1, TimeUnit.SECONDS);
        }
        executor.shutdown();
        try {
            executor.awaitTermination(1, TimeUnit.DAYS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }



 周期执行Schedule

        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        System.out.printf("Main: Starting at: %s\n", new Date());
        Task task = new Task("Task");
        ScheduledFuture<?> result = executor.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
        for (int i = 0; i < 10; i++) {
            System.out.printf("Main: Delay: %d\n", result.getDelay(TimeUnit.MILLISECONDS));
            try {
                TimeUnit.MILLISECONDS.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        executor.shutdown();
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.printf("Main: Finished at: %s\n", new Date());


7、取消任务

       ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        Task task = new Task();
        System.out.printf("Main: Executing the Task\n");
        Future<String> result = executor.submit(task);
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.printf("Main: Canceling the Task\n");
        result.cancel(true);
        System.out.printf("Main: Canceled: %s\n", result.isCanceled());
        System.out.printf("Main: Done: %s\n", result.isDone());
        executor.shutdown();
        System.out.printf("Main: The executor has finished\n");

8、FutureTask

FutureTask则是一个RunnableFuture<V>,即实现了Runnbale又实现了Futrue<V>这两个接口,另外它还可以包装Runnable和Callable<V>,所以一般来讲是一个符合体了,它可以通过Thread包装来直接执行,也可以提交给ExecuteService来执行,并且还可以通过v get()返回执行结果,在线程体没有执行完成的时候,主线程一直阻塞等待,执行完则直接返回结果。

public class FutureTaskTest {

	public static void main(String[] args) {
		Callable<String> task = new Callable<String>() {
			public String call() {
				System.out.println("Sleep start.");
				try {
					Thread.sleep(1000 * 10);
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
				System.out.println("Sleep end.");
				return "time=" + System.currentTimeMillis();
			}
		};
		
		//直接使用Thread的方式执行
		FutureTask<String> ft = new FutureTask<String>(task);
		Thread t = new Thread(ft);
		t.start();
		try {
			System.out.println("waiting execute result");
			System.out.println("result = " + ft.get());
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (ExecutionException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		
		//使用Executors来执行
		System.out.println("=========");
		FutureTask<String> ft2 = new FutureTask<String>(task);
		Executors.newSingleThreadExecutor().submit(ft2);
		try {
			System.out.println("waiting execute result");
			System.out.println("result = " + ft2.get());
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (ExecutionException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		
	}
}

FutureTask类提供一个done()方法,允许你在执行者执行任务完成后执行一些代码。你可以用来做一些后处理操作,生成一个报告,通过e-mail发送结果,或释放一些资源。当执行的任务由FutureTask来控制完成,FutureTask会内部调用这个方法。这个方法在任务的结果设置和它的状态变成isDone状态之后被调用,不管任务是否已经被取消或正常完成。默认情况下,这个方法是空的。你可以重写FutureTask类实现这个方法来改变这种行为。 

public class FutureTaskDone {

	public static void main(String[] args) {
		ExecutorService executor = (ExecutorService) Executors
				.newCachedThreadPool();
		ResultTask resultTasks[] = new ResultTask[5];
		for (int i = 0; i < 5; i++) {
			ExecutableTask executableTask = new ExecutableTask("Task " + i);
			resultTasks[i] = new ResultTask(executableTask);
			executor.submit(resultTasks[i]);
		}

		try {
			TimeUnit.SECONDS.sleep(5);
		} catch (InterruptedException e1) {
			e1.printStackTrace();
		}
		for (int i = 0; i < resultTasks.length; i++) {
			resultTasks[i].cancel(true);
		}
		for (int i = 0; i < resultTasks.length; i++) {
			try {
				if (!resultTasks[i].isCancelled()) {
					System.out.printf("%s\n", resultTasks[i].get());
				}
			} catch (InterruptedException | ExecutionException e) {
				e.printStackTrace();
			}
		}
		executor.shutdown();
	}
}

class ExecutableTask implements Callable<String> {

	private String name;

	public String getName() {
		return name;
	}

	public ExecutableTask(String name) {
		this.name = name;
	}

	@Override
	public String call() throws Exception {
		try {
			long duration = (long) (Math.random() * 10);
			System.out.printf("%s: Waiting %d seconds for results.\n",
					this.name, duration);
			TimeUnit.SECONDS.sleep(duration);
		} catch (InterruptedException e) {
		}
		return "Hello, world. I'm " + name;
	}

}

class ResultTask extends FutureTask<String> {
	private String name;

	public ResultTask(Callable<String> callable) {
		super(callable);
		this.name = ((ExecutableTask) callable).getName();
	}

	@Override
	protected void done() {
		if (isCancelled()) {
			System.out.printf("%s: Has been canceled\n", name);
		} else {
			System.out.printf("%s: Has finished\n", name);
		}
	}
}


9、CompletionService

通常,当你使用执行者执行并发任务时,你将会提交 Runnable或Callable任务给这个执行者,并获取Future对象控制这个方法。你可以发现这种情况,你需要提交任务给执行者在一个对象中,而处理结果在另一个对象中。基于这种情况,Java并发API提供CompletionService类。

CompletionService 类有一个方法来提交任务给执行者和另一个方法来获取已完成执行的下个任务的Future对象。在内部实现中,它使用Executor对象执行任务。这种行为的优点是共享一个CompletionService对象,并提交任务给执行者,这样其他(对象)可以处理结果。其局限性是,第二个对象只能获取那些已经完成它们的执行的任务的Future对象,所以,这些Future对象只能获取任务的结果。

public class CompletionServiceTest {

	public static void main(String[] args) {
		ExecutorService executor = (ExecutorService) Executors
				.newCachedThreadPool();
		CompletionService<String> service = new ExecutorCompletionService<>(
				executor);
		ReportRequest faceRequest = new ReportRequest("Face", service);
		ReportRequest onlineRequest = new ReportRequest("Online", service);
		Thread faceThread = new Thread(faceRequest);
		Thread onlineThread = new Thread(onlineRequest);

		ReportProcessor processor = new ReportProcessor(service);
		Thread senderThread = new Thread(processor);

		System.out.printf("Main: Starting the Threads\n");
		faceThread.start();
		onlineThread.start();
		senderThread.start();

		try {
			System.out.printf("Main: Waiting for the report generators.\n");
			faceThread.join();
			onlineThread.join();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}

		System.out.printf("Main: Shutting down the executor.\n");
		executor.shutdown();
		try {
			executor.awaitTermination(1, TimeUnit.DAYS);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}

		System.out.printf("Main: Shutting down the executor.\n");
		executor.shutdown();
		try {
			executor.awaitTermination(1, TimeUnit.DAYS);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}

	}

}

class ReportGenerator implements Callable<String> {
	private String sender;
	private String title;

	public ReportGenerator(String sender, String title) {
		this.sender = sender;
		this.title = title;
	}

	@Override
	public String call() throws Exception {
		try {
			Long duration = (long) (Math.random() * 10);
			System.out
					.printf("%s_%s: ReportGenerator: Generating a report during %d seconds\n",
							this.sender, this.title, duration);
			TimeUnit.SECONDS.sleep(duration);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		String ret = sender + ": " + title;
		return ret;
	}
}

class ReportRequest implements Runnable {
	private String name;
	private CompletionService<String> service;

	public ReportRequest(String name, CompletionService<String> service) {
		this.name = name;
		this.service = service;
	}

	@Override
	public void run() {
		ReportGenerator reportGenerator = new ReportGenerator(name, "Report");
		service.submit(reportGenerator);
	}

}

class ReportProcessor implements Runnable {

	private CompletionService<String> service;
	private boolean end;

	public ReportProcessor(CompletionService<String> service) {
		this.service = service;
		end = false;
	}

	@Override
	public void run() {
		while (!end) {
			try {
				Future<String> result = service.poll(20, TimeUnit.SECONDS);
				if (result != null) {
					String report = result.get();
					System.out.printf("ReportReceiver: Report Received:%s\n",
							report);
				}
			} catch (InterruptedException | ExecutionException e) {
				e.printStackTrace();
			}
		}
		System.out.printf("ReportSender: End\n");
	}

	public void setEnd(boolean end) {
		this.end = end;
	}
}


10、RejectedExecutionHandler

当你想要结束执行者的执行,你使用shutdown()方法来表明它的结束。执行者等待正在运行或等待它的执行的任务的结束,然后结束它们的执行。

如果你在shutdown()方法和执行者结束之间,提交任务给执行者,这个任务将被拒绝,因为执行者不再接收新的任务。ThreadPoolExecutor类提供一种机制,在调用shutdown()后,不接受新的任务。

public class RejectedTaskController implements RejectedExecutionHandler {
	public static void main(String[] args) {
		RejectedTaskController controller = new RejectedTaskController();
		ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors
				.newCachedThreadPool();
		executor.setRejectedExecutionHandler(controller);

		System.out.printf("Main: Starting.\n");
		for (int i = 0; i < 3; i++) {
			Task task = new Task("Task" + i);
			executor.submit(task);
		}

		System.out.printf("Main: Shutting down the Executor.\n");
		executor.shutdown();

		System.out.printf("Main: Shutting down the Executor.\n");
		executor.shutdown();

		System.out.printf("Main: Sending another Task.\n");
		Task task = new Task("RejectedTask");
		executor.submit(task);

		System.out.println("Main: End");
		System.out.printf("Main: End.\n");

	}

	@Override
	public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
		System.out.printf(
				"RejectedTaskController: The task %s has been rejected\n",
				r.toString());
		System.out.printf("RejectedTaskController: %s\n", executor.toString());
		System.out.printf("RejectedTaskController: Terminating:%s\n",
				executor.isTerminating());
		System.out.printf("RejectedTaksController: Terminated:%s\n",
				executor.isTerminated());
	}

}

class Task implements Runnable {

	private String name;

	public Task(String name) {
		this.name = name;
	}

	@Override
	public void run() {
		System.out.println("Task " + name + ": Starting");
		try {
			long duration = (long) (Math.random() * 10);
			System.out
					.printf("Task %s: ReportGenerator: Generating a report during %d seconds\n",
							name, duration);
			TimeUnit.SECONDS.sleep(duration);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		System.out.printf("Task %s: Ending\n", name);

	}

	public String toString() {
		return name;
	}
}