谈谈Java 线程池
一、引言
池的概念大家并不陌生,数据库连接池、线程池等...大体来说,有三个优点:
- 降低资源消耗。
- 提高响应速度。
- 便于统一管理。
以上是 “池化” 技术的相同特点,至于他们之间的不同点这里不讲,两者都是为了提高性能和效率,抛开实际做连连看找不同,没有意义。
同样,类比于线程池来说:
- 降低资源消耗:
重复利用线程池中已经创建的线程,相比之下省去了线程创建和销毁的性能消耗。
- 提高响应速度:
当有任务创建时,不必等待线程创建,可以立即执行。
- 便于统一管理:
使用线程池,可以对线程统一管理,对线程的执行状态做统一监控。
二、线程池的使用
public threadpoolexecutor(int corepoolsize, int maximumpoolsize, long keepalivetime, timeunit unit, blockingqueue<runnable> workqueue, threadfactory threadfactory, rejectedexecutionhandler handler);
1、关键参数
- corepoolsize 核心线程数
当向线程池中提交一个任务时,如果线程池中的线程数量小于核心线程数,即使存在空闲线程,也会新建一个线程来执行当前任务,直到线程数量大于或等于核心线程数。
- maximunpoolsize 最大线程数
当任务队列满了,线程池中的线程数量小于最大线程数时,创建新线程执行任务。对于*队列,忽略该参数。
- keepalivetime 线程存活时间
大于核心线程数的那一部分线程的存活时间,如果这部分线程空闲超过这段时间,则进行销毁。
- workqueue 任务队列
线程池中的线程数大于核心线程数时,将任务放入此队列等待执行。
- threadfactory 线程工厂
用于创建线程,工厂使用 new threa() 的方式创建线程,并为每个线程做统一规则的命名:pool-m-thread-n(m为线程池的编号,n为线程池内的线程编号)。
- handler 饱和策略
当线程池和队列都满了,则根据此策略处理任务。
2、任务队列类型
名称 | 描述 |
---|---|
arrayblockingqueue | 基于数组结构的有界阻塞队列,此队列按 fifo(先进先出)原则对元素进行排序。 |
linkedblockingqueue | 基于链表结构的阻塞队列,此队列按 fifo (先进先出) 排序元素,吞吐量通常要高于 arrayblockingqueue。executors.newfixedthreadpool( ) 使用了这个队列。 |
synchronousqueue | 不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于 linkedblockingqueue,静态工厂方法 executors.newcachedthreadpool( ) 使用了这个队列。 |
priorityblockingqueue | 具有优先级的无限阻塞队列。 |
3、饱和策略类型
策略名称 | 特性 |
---|---|
abortpolicy | 默认的饱和策略,直接抛出 rejectedexecutionexception 异常 |
discardpolicy | 不处理,直接丢弃任务 |
callerrunspolicy | 使用调用者的线程执行任务 |
discardoldestpolicy | 丢弃队列里最近的一个任务,执行当前任务 |
同时,还可以自行实现 rejectedexecutionhandler 接口来自定义饱和策略,比如记录日志、持久化等等。
void execute(runnable command)
threadfactory namedthreadfactory = new threadfactorybuilder().setnameformat("demo-pool-%d").build(); executorservice executor = new threadpoolexecutor( 10, 1000, 60l, timeunit.seconds, new linkedblockingqueue<>(10), namedthreadfactory, new threadpoolexecutor.abortpolicy()); executor.execute( () -> { system.out.println(1111); });
注意使用 execute 方法提交任务时,没有返回值。
future<?> submit(runnable task)
future<integer> future = executor.submit(() -> { return 1 + 1; }); integer result = future.get();
还可以使用 submit 方法提交任务,该方法返回一个 future 对象,通过 future#get( ) 方法可以获得任务的返回值,该方法会一直阻塞知道任务执行完毕。还可以使用 future#get(long timeout, timeunit unit) 方法,该方法会阻塞一段时间后立即返回,而这时任务可能没有执行完毕。
5、关闭线程池
threadpoolexecutor 提供了 shutdown( ) 和 shutdownnow( ) 两个方法关闭线程池。原理是首先遍历线程池的工作线程,依次调用 interrupt( ) 方法中断线程,这样看来如果无法响应中断的任务就不能终止。
两者区别是:
shutdownnow( )
shutdown( )
如果调用了其中一种方法,isshutdown 方法就会返回 true。当所有的任务都已关闭后, 才表示线程池关闭成功,这时调用 isterminaed 方法会返回 true。实际应用中可以根据任务是否 一定要执行完毕 的特性,决定使用哪种方法关闭线程池。
6、合理的配置线程池
通常我们可以 根据 cpu 核心数量来设计线程池数量 。
可以通过 runtime.getruntime().availableprocessors() 方法获得当前设备的物理核心数量。值得注意的是,如果应用运行在一些 docker 或虚拟机容器上时,该方法取得的是当前物理机的 cpu 核心数。
- io 密集型 2ncpu
- 计算密集型 ncpu+1
其中 n 为 cpu 核心数量。
为什么加 1:即使当计算密集型的线程偶尔由于缺失故障或者其他原因而暂停时,这个额外的线程也能确保 cpu 的时钟周期不会被浪费。
三、线程池的运行过程
当提交一个新任务时,线程池的处理步骤:
- 判断当前线程池内的线程数量是否小于核心线程数,如果小于则新建线程执行任务。否则,进入下个阶段。
- 判断队列是否已满,如果没满,则将任务加入等待队列。否则,进入下个阶段。
- 在上面基础上判断是否大于最大线程数,如果是根据响应的策略处理。否则,新建线程执行当前任务。
线程池的源码比较简单易懂,感兴趣的小伙伴可以自行查看 java.util.concurrent.threadpoolexecutor ,在线程池中每个任务都被包装为一个一个的 worker ,下面简单看下 worker 的 run( ) 方法:
try { while (task != null || (task = gettask()) != null) { w.lock(); // if pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. this // requires a recheck in second case to deal with // shutdownnow race while clearing interrupt if ((runstateatleast(ctl.get(), stop) || (thread.interrupted() && runstateatleast(ctl.get(), stop))) && !wt.isinterrupted()) wt.interrupt(); try { beforeexecute(wt, task); throwable thrown = null; try { task.run(); } catch (runtimeexception x) { thrown = x; throw x; } catch (error x) { thrown = x; throw x; } catch (throwable x) { thrown = x; throw new error(x); } finally { afterexecute(task, thrown); } } finally { task = null; w.completedtasks++; w.unlock(); } } completedabruptly = false; } finally { processworkerexit(w, completedabruptly); }
可以看到不断的循环取出 task 并执行,而在任务的执行前后,有 beforeexecute 和 afterexecute 方法,我们可以实现两个方法实现一些监控逻辑。除此之外还可以集合线程池的一些属性或者重写 terminated() 方法在线程池关闭时进行监控。
四、常见的几种线程池实现
在 executors 中提供了集中常见的线程池,分别应用在不同的场景。
- fixthreadpool 固定数量的线程池,适用于对线程管理,高负载的系统
- singlethreadpool 只有一个线程的线程池,适用于保证任务顺序执行
- cachethreadpool 创建一个不限制线程数量的线程池,适用于执行短期异步任务的小程序,低负载系统
- scheduledthreadpool 定时任务使用的线程池,适用于定时任务
上面几种线程池的特性主要依赖于 threadpoolexecutor 的几个参数来实现,不同的核心线程数量,以及不同类型的阻塞队列,同时我们还可以自行实现自己的线程池满足业务需求。
值得注意的是,并不推荐使用 executors 创建线程池,详见下:
executors.newfixedthreadpool(int nthread)
public static executorservice newfixedthreadpool(int nthreads) { return new threadpoolexecutor(nthreads, nthreads, 0l, timeunit.milliseconds, new linkedblockingqueue<runnable>()); }
继续来看 linkedblockingqueue :
public linkedblockingqueue() { this(integer.max_value); } public linkedblockingqueue(int capacity) { if (capacity <= 0) throw new illegalargumentexception(); this.capacity = capacity; last = head = new node<e>(null); }
可以看到使用 linkedblockingqueue 创建的是 integer.max_value 大小的队列,会堆积大量的请求,从而造成 oom
executors.newsinglethreadexexutor( )
public static executorservice newsinglethreadexecutor() { return new finalizabledelegatedexecutorservice (new threadpoolexecutor(1, 1, 0l, timeunit.milliseconds, new linkedblockingqueue<runnable>())); }
同样,使用的 linkedblockingqueue ,一样的情况
executors.newcachedthreadpool( )
public static executorservice newcachedthreadpool() { return new threadpoolexecutor(0, integer.max_value, 60l, timeunit.seconds, new synchronousqueue<runnable>()); }
代码课件线程池使用的最大线程数是 integer.max_value ,可能会创建大量线程,导致 oom
executors.newschedulethreadpool()
public static scheduledexecutorservice newscheduledthreadpool(int corepoolsize) { return new scheduledthreadpoolexecutor(corepoolsize); } public scheduledthreadpoolexecutor(int corepoolsize) { super(corepoolsize, integer.max_value, 0, nanoseconds, new delayedworkqueue()); }
和上面是一样的问题,最大线程数是 integer.max_value
所以原则上来说禁止使用 executors 创建线程池, 而使用 threadpoolexecutor 的构造函数来创建线程池。
五、结语
线程池在开发中还是比较常见的,结合不同的业务场景,结合最佳实践配置正确的参数,可以帮助我们的应用性能得到提升。
以上就是谈谈java 线程池的详细内容,更多关于java 线程池的资料请关注其它相关文章!