多线程编程学习十一(ThreadPoolExecutor 详解).
一、threadpoolexecutor 参数说明
public threadpoolexecutor(int corepoolsize, int maximumpoolsize, long keepalivetime, timeunit unit, blockingqueue<runnable> workqueue, threadfactory threadfactory, rejectedexecutionhandler handler)
- corepoolsize:核心线程池的大小。当提交一个任务到线程池时,核心线程池会创建一个核心线程来执行任务,即使其他核心线程能够执行新任务也会创建线程,等到需要执行的任务数大于核心线程池基本大小时就不再创建。如果调用了线程池的 prestartallcorethreads() 方法,核心线程池会提前创建并启动所有核心线程。
- workqueue:任务队列。当核心线程池中没有线程时,所提交的任务会被暂存在队列中。java 提供了多种。
- maximumpoolsize:线程池允许创建的最大线程数。如果队列也满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的空闲线程执行任务。值得注意的是,如果使用了*的任务队列则这个参数不起作用。
- keepalivetime:当线程池中的线程数大于 corepoolsize 时,keepalivetime 为多余的空闲线程等待新任务的最长时间,超过这个时间后多余的线程将被终止。所以,如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程的利用率。值得注意的是,如果使用了*的任务队列则这个参数不起作用。
- timeunit:线程活动保持时间的单位。
-
threadfactory:创建线程的工厂。可以通过线程工厂给每个创建出来的线程设置符合业务的名字。
// 依赖 guava new threadfactorybuilder().setnameformat("xx-task-%d").build();
-
handler:饱和策略。当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。java 提供了以下4种策略:
- abortpolicy:默认。直接抛出异常。
- callerrunspolicy:只用调用者所在线程来运行任务。
- discardoldestpolicy:丢弃队列里最近的一个任务,并执行当前任务。
- discardpolicy:不处理,丢弃掉。
tips: 一般我们称核心线程池中的线程为核心线程,这部分线程不会被回收;超过任务队列后,创建的线程为空闲线程,这部分线程会被回收(回收时间即 keepalivetime)
二、常见的 threadpoolexecutor 介绍
executors 是创建 threadpoolexecutor 和 scheduledthreadpoolexecutor 的工厂类。
java 提供了多种类型的 threadpoolexecutor,比较常见的有 fixedthreadpool、singlethreadexecutor、cachedthreadpool等。
fixedthreadpool
public static executorservice newfixedthreadpool(int nthreads) { return new threadpoolexecutor(nthreads, nthreads, 0l, timeunit.milliseconds, new linkedblockingqueue<runnable>()); }
fixedthreadpool 被称为可重用固定线程数的线程池。可以看到 corepoolsize 和 maximumpoolsize 都被设置成了 nthreads;keepalivetime设置为0l,意味着多余的空闲线程会被立即终止;使用了阻塞队列 linkedblockingqueue 作为线程的工作队列(队列的容量为 integer.max_value)。
fixedthreadpool 所存在的问题是,由于队列的容量为 integer.max_value,基本可以认为是*的,所以 maximumpoolsize 和 keepalivetime 参数都不会生效,饱和拒绝策略也不会执行,会造成任务大量堆积在阻塞队列中。
fixedthreadpool 适用于为了满足资源管理的需求,而需要限制线程数量的应用场景。
singlethreadexecutor
public static executorservice newsinglethreadexecutor() { return new finalizabledelegatedexecutorservice (new threadpoolexecutor(1, 1, 0l, timeunit.milliseconds, new linkedblockingqueue<runnable>())); }
singlethreadexecutor 是使用单个线程的线程池。可以看到 corepoolsize 和 maximumpoolsize 被设置为1,其他参数与 fixedthreadpool 相同,所以所带来的风险也和 fixedthreadpool 一致,就不赘述了。
singlethreadexecutor 适用于需要保证顺序的执行各个任务。
cachedthreadpool
public static executorservice newcachedthreadpool() { return new threadpoolexecutor(0, integer.max_value, 60l, timeunit.seconds, new synchronousqueue<runnable>()); }
cachedthreadpool 是一个会根据需要创建新线程的线程池。可以看到 corepoolsize 被设置为 0,所以创建的线程都为空闲线程;maximumpoolsize 被设置为 integer.max_value(基本可认为*),意味着可以创建无限数量的空闲线程;keepalivetime 设置为60l,意味着空闲线程等待新任务的最长时间为60秒;使用没有容量的 synchronousqueue 作为线程池的工作队列。
cachedthreadpool 所存在的问题是, 如果主线程提交任务的速度高于maximumpool 中线程处理任务的速度时,cachedthreadpool 会不断创建新线程。极端情况下,cachedthreadpool会因为创建过多线程而耗尽cpu和内存资源。
cachedthreadpool 适用于执行很多的短期异步任务的小程序,或者是负载较轻的服务器。
三、 自建 threadpoolexecutor 线程池
鉴于上面提到的风险,我们更提倡使用 threadpoolexecutor 去创建线程池,而不用 executors 工厂去创建。
以下是一个 threadpoolexecutor 创建线程池的 demo 实例:
public class pool { static threadfactory threadfactory = new threadfactorybuilder().setnameformat("pool-task-%d").build(); static executorservice executor = new threadpoolexecutor(runtime.getruntime().availableprocessors() * 2, 200, 0l, timeunit.milliseconds, new linkedblockingqueue<>(1024), threadfactory, new threadpoolexecutor.abortpolicy()); public static void main(string[] args) throws executionexception, interruptedexception { // 1. 无返回值的任务执行 -> runnable executor.execute(() -> system.out.println("hello world")); // 2. 有返回值的任务执行 -> callable future<string> future = executor.submit(() -> "hello world"); // get 方法会阻塞线程执行等待返回结果 string result = future.get(); system.out.println(result); // 3. 监控线程池 monitor(); // 4. 关闭线程池 shutdownandawaittermination(); monitor(); } private static void monitor() { threadpoolexecutor threadpoolexecutor = (threadpoolexecutor) pool.executor; system.out.println("【线程池任务】线程池中曾经创建过的最大线程数:" + threadpoolexecutor.getlargestpoolsize()); system.out.println("【线程池任务】线程池中线程数:" + threadpoolexecutor.getpoolsize()); system.out.println("【线程池任务】线程池中活动的线程数:" + threadpoolexecutor.getactivecount()); system.out.println("【线程池任务】队列中等待执行的任务数:" + threadpoolexecutor.getqueue().size()); system.out.println("【线程池任务】线程池已执行完任务数:" + threadpoolexecutor.getcompletedtaskcount()); } /** * 关闭线程池 * 1. shutdown、shutdownnow 的原理都是遍历线程池中的工作线程,然后逐个调用线程的 interrupt 方法来中断线程。 * 2. shutdownnow:将线程池的状态设置成 stop,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表。 * 3. shutdown:将线程池的状态设置成 shutdown 状态,然后中断所有没有正在执行任务的线程。 */ private static void shutdownandawaittermination() { // 禁止提交新任务 executor.shutdown(); try { // 等待现有任务终止 if (!executor.awaittermination(60, timeunit.seconds)) { // 取消当前正在执行的任务 executor.shutdownnow(); // 等待一段时间让任务响应被取消 if (!executor.awaittermination(60, timeunit.seconds)) { system.err.println("pool did not terminate"); } } } catch (interruptedexception ie) { // 如果当前线程也中断,则取消 executor.shutdownnow(); // 保留中断状态 thread.currentthread().interrupt(); } } }
创建线程池需要注意以下几点:
- cpu 密集型任务应配置尽可能小的线程,如配置 ncpu+1 个线程。
- io 密集型任务(数据库读写等)应配置尽可能多的线程,如配置 ncpu*2 个线程。
- 优先级不同的任务可以使用优先级队列 priorityblockingqueue 来处理。
- 建议使用有界队列。可以避免创建数量非常多的线程,甚至拖垮系统。有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点儿,比如几千。