ThreadPoolExecutor源码解析
程序员文章站
2022-04-03 14:55:04
...
一、ThreadPoolExecutor简介
使用线程池主要为了解决一下几个问题:
ThreadPoolExecutor参数
JDK自带的RejectedExecutionHandler
二、ThreadPoolExecutor源码
使用线程池主要为了解决一下几个问题:
- 通过重用线程池中的线程,来减少每个线程创建和销毁的性能开销。
- 对线程进行一些维护和管理,比如定时开始,周期执行,并发数控制等等。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
ThreadPoolExecutor参数
- corePoolSize:核心线程数,默认情况下核心线程会一直存活,即使处于闲置状态也不会受存keepAliveTime限制。除非将allowCoreThreadTimeOut设置为true。
- maximumPoolSize:线程池所能容纳的最大线程数。
- keepAliveTime:非核心线程的闲置超时时间,超过这个时间就会被回收。
- unit:指定keepAliveTime的单位,如TimeUnit.SECONDS。
- workQueue:线程池中的任务队列.常用的有三种队列,SynchronousQueue,LinkedBlockingDeque,ArrayBlockingQueue。
- threadFactory:线程工厂,提供创建新线程的功能。默认:DefaultThreadFactory
- handler:当任务队列满并且线程数达到maximumPoolSize时,会调用RejectedExecutionHandler.rejectedExecution()方法,默认:AbortPolicy
JDK自带的RejectedExecutionHandler
- DiscardOldestPolicy:会丢弃workQueue中最老的任务,这个Runnable放入workQueue。
- AbortPolicy:直接throw RejectedExecutionException。
- CallerRunsPolicy:在主线程执行这个Runnable。
- DiscardPolicy:不处理,丢弃这个Runnalbe。
- NewThreadRunsPolicy:新起线程执行Runnable。
二、ThreadPoolExecutor源码
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1,10,10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); Runnable runnable = new Runnable() { @Override public void run() { System.out.println("aa"); } }; threadPoolExecutor.submit(runnable);
- 如果当前工作线程数<corePoolSize,新增一个Worker,Worker直接执行当前Runnable。
- 如果线程池是运行状态,并且workQueue未满,Runnable入队列。二次校验线程池是运行状态和Worker数,如果线程池已暂停,则Runnable出队列,RejectedExecutionHandler处理。如果Worker==0,新增Worker。
- 如果workQueue已满并且线程池Worker数<maximumPoolSize,新增Worker,并立即执行Runnable。如果线程池Worker数>=maximumPoolSize,则拒绝消息,RejectedExecutionHandler处理。
- 新建一个Worker对象,Worker implements Runnable,并且start这个Worker。
- 线程Worker.runWorker方法会调用getTask()去workQueue取task,如果getTask()返回无task,线程Worker执行完成,线程关闭。
- 如果allowCoreThreadTimeOut=true或者workerCount>corePoolSize,则time=true。
- 如果time=true,表示workerCount>corePoolSize,则workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)会在keepAliveTime时间后返回,如果取不到task,Worker线程关闭。如果time=false,workQueue.take()会block线程,直到有新的task进入workQueue。