Java 自定义线程池和线程总数控制操作
1 概述
池化是常见的思想,线程池是非常典型的池化的实现,《java并发编程实战》也大篇幅去讲解了java中的线程池。本文实现一个简单的线程池。
2 核心类
【1】接口定义
public interface ithreadpool<job extends runnable> { /** * 关闭线程池 */ public void shutalldown(); /** * 执行任务 * * @param job 任务 */ public void execute(job job); /** * 添加工作者 * * @param addnum 添加数 */ public void addworkers(int addnum); /** * 减少工作者 * * @param reducenum 减少数目 */ public void reduceworkers(int reducenum); }
【2】实现类
线程池的核心是维护了1个任务列表和1个工作者列表。
import java.util.arraylist; import java.util.collections; import java.util.linkedlist; import java.util.list; public class xythreadpool<job extends runnable> implements ithreadpool<job> { // 默认线程数 private static int deafault_size = 5; // 最大线程数 private static int max_size = 10; // 任务列表 private linkedlist<job> tasks = new linkedlist<job>(); // 工作线程列表 private list<worker> workers = collections .synchronizedlist(new arraylist<worker>()); /** * 默认构造函数 */ public xythreadpool() { initwokers(deafault_size); } /** * 执行线程数 * * @param threadnums 线程数 */ public xythreadpool(int workernum) { workernum = workernum <= 0 ? deafault_size : workernum > max_size ? max_size : workernum; initwokers(workernum); } /** * 初始化线程池 * * @param threadnums 线程数 */ public void initwokers(int threadnums) { for (int i = 0; i < threadnums; i++) { worker worker = new worker(); worker.start(); workers.add(worker); } // 添加关闭钩子 runtime.getruntime().addshutdownhook(new thread() { public void run() { shutalldown(); } }); } @override public void shutalldown() { for (worker worker : workers) { worker.shutdown(); } } @override public void execute(job job) { synchronized (tasks) { // 提交任务就是将任务对象加入任务队列,等待工作线程去处理 tasks.addlast(job); tasks.notifyall(); } } @override public void addworkers(int addnum) { // 新线程数必须大于零,并且线程总数不能大于最大线程数 if ((workers.size() + addnum) <= max_size && addnum > 0) { initwokers(addnum); } else { system.out.println("addnum too large"); } } @override public void reduceworkers(int reducenum) { if ((workers.size() - reducenum <= 0)) system.out.println("thread num too small"); else { // 暂停指定数量的工作者 int count = 0; while (count != reducenum) { for (worker w : workers) { w.shutdown(); count++; } } } } /** * 工作线程 */ class worker extends thread { private volatile boolean flag = true; @override public void run() { while (flag) { job job = null; // 加锁(若只有一个woker可不必加锁,那就是所谓的单线程的线程池,线程安全) synchronized (tasks) { // 任务队列为空 while (tasks.isempty()) { try { // 阻塞,放弃对象锁,等待被notify唤醒 tasks.wait(); system.out.println("block when tasks is empty"); } catch (interruptedexception e) { e.printstacktrace(); } } // 不为空取出任务 job = tasks.removefirst(); system.out.println("get job:" + job + ",do biz"); job.run(); } } } public void shutdown() { flag = false; } } }
(1) 当调用wait()方法时线程会放弃对象锁,进入等待此对象的等待锁定池,只有针对此对象调用notify()方法后本线程才进入对象锁定池准备
(2) object的方法:void notify(): 唤醒一个正在等待该对象的线程。void notifyall(): 唤醒所有正在等待该对象的线程。
notifyall使所有原来在该对象上等待被notify的线程统统退出wait状态,变成等待该对象上的锁,一旦该对象被解锁,它们会去竞争。
notify只是选择一个wait状态线程进行通知,并使它获得该对象上的锁,但不惊动其它同样在等待被该对象notify的线程们,当第一个线程运行完毕以后释放对象上的锁,此时如果该对象没有再次使用notify语句,即便该对象已经空闲,其他wait状态等待的线程由于没有得到该对象的通知,继续处在wait状态,直到这个对象发出一个notify或notifyall,它们等待的是被notify或notifyall,而不是锁。
3 无需控制线程总数
每调用一次就会创建一个拥有10个线程工作者的线程池。
public class testservice1 { public static void main(string[] args) { // 启动10个线程 xythreadpool<runnable> pool = new xythreadpool<runnable>(10); pool.execute(new runnable() { @override public void run() { system.out.println("====1 test===="); } }); } } public class testservice2 { public static void main(string[] args) { // 启动10个线程 xythreadpool<runnable> pool = new xythreadpool<runnable>(10); pool.execute(new runnable() { @override public void run() { system.out.println("====2 test===="); } }); } }
4 控制线程总数
在项目中所有的线程调用,一般都共用1个固定工作者数大小的线程池。
import javax.annotation.postconstruct; import org.springframework.stereotype.component; import com.xy.pool.xythreadpool; /** * 统一线程池管理类 */ @component public class xythreadmanager { private xythreadpool<runnable> executorpool; @postconstruct public void init() { executorpool = new xythreadpool<runnable>(10); } public xythreadpool<runnable> getexecutorpool() { return executorpool; } } import org.springframework.beans.factory.annotation.autowired; import org.springframework.stereotype.service; @service("testservice3") public class testservice3 { @autowired private xythreadmanager threadmanager; public void test() { threadmanager.getexecutorpool().execute(new runnable() { @override public void run() { system.out.println("====3 test===="); } }); } } import org.springframework.beans.factory.annotation.autowired; import org.springframework.stereotype.service; @service("testservice4") public class testservice4 { @autowired private xythreadmanager threadmanager; public void test() { threadmanager.getexecutorpool().execute(new runnable() { @override public void run() { system.out.println("====4 test===="); } }); } } import org.springframework.context.applicationcontext; import org.springframework.context.support.classpathxmlapplicationcontext; public class testmain { @suppresswarnings("resource") public static void main(string[] args) { applicationcontext atc = new classpathxmlapplicationcontext("applicationcontext.xml"); testservice3 t3 = (testservice3) atc.getbean("testservice3"); t3.test(); testservice4 t4 = (testservice4) atc.getbean("testservice4"); t4.test(); } }
补充:论如何优雅的自定义threadpoolexecutor线程池
前言
线程池想必大家也都用过,jdk的executors 也自带一些线程池。但是不知道大家有没有想过,如何才是最优雅的方式去使用过线程池吗? 生产环境要怎么去配置自己的线程池才是合理的呢?
今天周末,刚好有时间来总结一下自己所认为的'优雅', 如有问题欢迎大家指正。
线程池使用规则
要使用好线程池,那么一定要遵循几个规则:
线程个数大小的设置
线程池相关参数配置
利用hook嵌入你的行为
线程池的关闭
线程池配置相关
线程池大小的设置
这其实是一个面试的考点,很多面试官会问你线程池coresize 的大小来考察你对于线程池的理解。
首先针对于这个问题,我们必须要明确我们的需求是计算密集型还是io密集型,只有了解了这一点,我们才能更好的去设置线程池的数量进行限制。
1、计算密集型:
顾名思义就是应用需要非常多的cpu计算资源,在多核cpu时代,我们要让每一个cpu核心都参与计算,将cpu的性能充分利用起来,这样才算是没有浪费服务器配置,如果在非常好的服务器配置上还运行着单线程程序那将是多么重大的浪费。对于计算密集型的应用,完全是靠cpu的核数来工作,所以为了让它的优势完全发挥出来,避免过多的线程上下文切换,比较理想方案是:
线程数 = cpu核数+1,也可以设置成cpu核数*2,但还要看jdk的版本以及cpu配置(服务器的cpu有超线程)。
一般设置cpu * 2即可。
2、io密集型
我们现在做的开发大部分都是web应用,涉及到大量的网络传输,不仅如此,与数据库,与缓存间的交互也涉及到io,一旦发生io,线程就会处于等待状态,当io结束,数据准备好后,线程才会继续执行。因此从这里可以发现,对于io密集型的应用,我们可以多设置一些线程池中线程的数量,这样就能让在等待io的这段时间内,线程可以去做其它事,提高并发处理效率。那么这个线程池的数据量是不是可以随便设置呢?当然不是的,请一定要记得,线程上下文切换是有代价的。目前总结了一套公式,对于io密集型应用:
线程数 = cpu核心数/(1-阻塞系数) 这个阻塞系数一般为0.8~0.9之间,也可以取0.8或者0.9。
套用公式,对于双核cpu来说,它比较理想的线程数就是20,当然这都不是绝对的,需要根据实际情况以及实际业务来调整:final int poolsize = (int)(cpucore/(1-0.9))
针对于阻塞系数,《programming concurrency on the jvm mastering》即《java 虚拟机并发编程》中有提到一句话:
对于阻塞系数,我们可以先试着猜测,抑或采用一些细嫩分析工具或java.lang.management api 来确定线程花在系统/io操作上的时间与cpu密集任务所耗的时间比值。
线程池相关参数配置
说到这一点,我们只需要谨记一点,一定不要选择没有上限限制的配置项。
这也是为什么不建议使用executors 中创建线程的方法。
比如,executors.newcachedthreadpool的设置与*队列的设置因为某些不可预期的情况,线程池会出现系统异常,导致线程暴增的情况或者任务队列不断膨胀,内存耗尽导致系统崩溃和异常。 我们推荐使用自定义线程池来避免该问题,这也是在使用线程池规范的首要原则! 小心无大错,千万别过度自信!
可以看下executors中四个创建线程池的方法:
//使用*队列 public static executorservice newfixedthreadpool(int nthreads) { return new threadpoolexecutor(nthreads, nthreads, 0l, timeunit.milliseconds, new linkedblockingqueue<runnable>()); } //线程池数量是无限的 public static executorservice newcachedthreadpool() { return new threadpoolexecutor(0, integer.max_value, 60l, timeunit.seconds, new synchronousqueue<runnable>()); }
其他的就不再列举了,大家可以自行查阅源码。
第二,合理设置线程数量、和线程空闲回收时间,根据具体的任务执行周期和时间去设定,避免频繁的回收和创建,虽然我们使用线程池的目的是为了提升系统性能和吞吐量,但是也要考虑下系统的稳定性,不然出现不可预期问题会很麻烦!
第三,根据实际场景,选择适用于自己的拒绝策略。进行补偿,不要乱用jdk支持的自动补偿机制!尽量采用自定义的拒绝策略去进行兜底!
第四,线程池拒绝策略,自定义拒绝策略可以实现rejectedexecutionhandler接口。
jdk自带的拒绝策略如下:
abortpolicy:直接抛出异常阻止系统正常工作。
callerrunspolicy:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。
discardoldestpolicy:丢弃最老的一个请求,尝试再次提交当前任务。
discardpolicy:丢弃无法处理的任务,不给予任何处理。
利用hook
利用hook,留下线程池执行轨迹:
threadpoolexecutor提供了protected类型可以被覆盖的钩子方法,允许用户在任务执行之前会执行之后做一些事情。我们可以通过它来实现比如初始化threadlocal、收集统计信息、如记录日志等操作。这类hook如beforeexecute和afterexecute。另外还有一个hook可以用来在任务被执行完的时候让用户插入逻辑,如rerminated 。
如果hook方法执行失败,则内部的工作线程的执行将会失败或被中断。
我们可以使用beforeexecute和afterexecute来记录线程之前前和后的一些运行情况,也可以直接把运行完成后的状态记录到elk等日志系统。
关闭线程池
内容当线程池不在被引用并且工作线程数为0的时候,线程池将被终止。我们也可以调用shutdown来手动终止线程池。如果我们忘记调用shutdown,为了让线程资源被释放,我们还可以使用keepalivetime和allowcorethreadtimeout来达到目的!
当然,稳妥的方式是使用虚拟机runtime.getruntime().addshutdownhook方法,手工去调用线程池的关闭方法!
线程池使用实例
线程池核心代码:
public class asyncprocessqueue { // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ /** * task 包装类<br> * 此类型的意义是记录可能会被 executor 吃掉的异常<br> */ public static class taskwrapper implements runnable { private static final logger _logger = loggerfactory.getlogger(taskwrapper.class); private final runnable gift; public taskwrapper(final runnable target) { this.gift = target; } @override public void run() { // 捕获异常,避免在 executor 里面被吞掉了 if (gift != null) { try { gift.run(); } catch (exception e) { _logger.error("wrapped target execute exception.", e); } } } } // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ /** * 执行指定的任务 * * @param task * @return */ public static boolean execute(final runnable task) { return asyncprocessor.executetask(new taskwrapper(task)); } } public class asyncprocessor { static final logger logger = loggerfactory.getlogger(asyncprocessor.class); /** * 默认最大并发数<br> */ private static final int default_max_concurrent = runtime.getruntime().availableprocessors() * 2; /** * 线程池名称格式 */ private static final string thread_pool_name = "externalconvertprocesspool-%d"; /** * 线程工厂名称 */ private static final threadfactory factory = new basicthreadfactory.builder().namingpattern(thread_pool_name) .daemon(true).build(); /** * 默认队列大小 */ private static final int default_size = 500; /** * 默认线程存活时间 */ private static final long default_keep_alive = 60l; /**newentryserviceimpl.java:689 * executor */ private static executorservice executor; /** * 执行队列 */ private static blockingqueue<runnable> executequeue = new arrayblockingqueue<>(default_size); static { // 创建 executor // 此处默认最大值改为处理器数量的 4 倍 try { executor = new threadpoolexecutor(default_max_concurrent, default_max_concurrent * 4, default_keep_alive, timeunit.seconds, executequeue, factory); // 关闭事件的挂钩 runtime.getruntime().addshutdownhook(new thread(new runnable() { @override public void run() { asyncprocessor.logger.info("asyncprocessor shutting down."); executor.shutdown(); try { // 等待1秒执行关闭 if (!executor.awaittermination(1, timeunit.seconds)) { asyncprocessor.logger.error("asyncprocessor shutdown immediately due to wait timeout."); executor.shutdownnow(); } } catch (interruptedexception e) { asyncprocessor.logger.error("asyncprocessor shutdown interrupted."); executor.shutdownnow(); } asyncprocessor.logger.info("asyncprocessor shutdown complete."); } })); } catch (exception e) { logger.error("asyncprocessor init error.", e); throw new exceptionininitializererror(e); } } /** * 此类型无法实例化 */ private asyncprocessor() { } /** * 执行任务,不管是否成功<br> * 其实也就是包装以后的 {@link executer} 方法 * * @param task * @return */ public static boolean executetask(runnable task) { try { executor.execute(task); } catch (rejectedexecutionexception e) { logger.error("task executing was rejected.", e); return false; } return true; } /** * 提交任务,并可以在稍后获取其执行情况<br> * 当提交失败时,会抛出 {@link } * * @param task * @return */ public static <t> future<t> submittask(callable<t> task) { try { return executor.submit(task); } catch (rejectedexecutionexception e) { logger.error("task executing was rejected.", e); throw new unsupportedoperationexception("unable to submit the task, rejected.", e); } } }
使用方式:
asyncprocessqueue.execute(new runnable() { @override public void run() { //do something } });
可以根据自己的使用场景灵活变更,我这里并没有用到beforeexecute和afterexecute以及拒绝策略。
以上为个人经验,希望能给大家一个参考,也希望大家多多支持。如有错误或未考虑完全的地方,望不吝赐教。