ThreadPoolExecuter的实现原理(ZT)
转帖地址:https://blog.csdn.net/zqz_zqz/article/details/69488570?locationNum=12&fps=1
线程池基本在每个应用中都会用到,而线程池涉及到的细节非常多,要想用好它,仅仅是了解它的api调用是不行的,而且如果你经常分析java线程堆栈,不了解线程池,那么涉及到线程池堆栈的代码也很难看懂,所以作为java程序员,应该好好研究下线程池的实现!
类结构图
示例
public class ThreadPoolTest {
//固定大小的线程池:
//初始化一个指定线程数的线程池,其中corePoolSize == maximumPoolSize,使用LinkedBlockingQuene作为阻塞队列,当线程池没有可执行任务时,也不会释放线程。
private static ExecutorService executor = Executors.newFixedThreadPool(10);
private static ThreadPoolExecutor executor_ = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
//缓存线程池:
//初始化一个可以缓存线程的线程池,默认缓存60s,线程池的线程数可达到Integer.MAX_VALUE,即2147483647,内部使用SynchronousQueue作为阻塞队列;
//和newFixedThreadPool创建的线程池不同,newCachedThreadPool在没有任务执行时,当线程的空闲时间超过keepAliveTime,会自动释放线程资源,当提交新任务时,如果没有空闲线程,则创建新线程执行任务,会导致一定的系统开销;
//所以,使用该线程池时,一定要注意控制并发的任务数,否则创建大量的线程可能导致严重的性能问题;
private static ExecutorService executor2 = Executors.newCachedThreadPool();
private static ExecutorService executor2_ = new ThreadPoolExecutor(0,
Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
//初始化的线程池中只有一个线程,如果该线程异常结束,会重新创建一个新的线程继续执行任务,唯一的线程可以保证所提交任务的顺序执行,内部使用LinkedBlockingQueue作为阻塞队列。
private static ExecutorService executor3 = Executors.newSingleThreadExecutor();
// 因为FinalizableDelegatedExecutorService类是不可直接访问的,这样写会报错,所以注释掉
// private static ExecutorService executor3_ = new FinalizableDelegatedExecutorService
// (new ThreadPoolExecutor(1, 1,
// 0L, TimeUnit.MILLISECONDS,
// new LinkedBlockingQueue<Runnable>()));
//定时任务线程池:
//初始化的线程池可以在指定的时间内周期性的执行所提交的任务,在实际的业务场景中可以使用该线程池定期的同步数据。
private static ScheduledExecutorService executor4 = Executors.newScheduledThreadPool(5);
// 因为new DelayedWorkQueue()这个类是内部类所以这里也不可以直接这样写,这样写是为了让大家了解它的实现本质
// private static ExecutorService executor4_ = new ThreadPoolExecutor(1, Integer.MAX_VALUE, 0, NANOSECONDS,
// new DelayedWorkQueue());
public static void main(String[] args){
if(!executor.isShutdown())
executor.execute(new Task());
Future f = executor.submit(new Task());
executor_.execute(new Task());
executor_.getLargestPoolSize();
}
static class Task implements Runnable{
public void run(){
System.out.println(Thread.currentThread().getName());
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
自带线程池的各种坑
- Executors.newFixedThreadPool(10);
固定大小的线程池,它的实现
new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
- 1
初始化一个指定线程数的线程池,其中corePoolSize == maximumPoolSize,使用LinkedBlockingQuene作为阻塞队列,超时时间为0,当线程池没有可执行任务时,也不会释放线程。
因为队列LinkedBlockingQueue大小为默认的Integer.MAX_VALUE,可以无限的往里面添加任务,直到内存溢出;
- Executors.newCachedThreadPool();
缓存线程池,它的实现:
new ThreadPoolExecutor(0,Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
- 1
初始化一个可以缓存线程的线程池,默认超时时间60s,线程池的最小线程数时0,但是最大线程数为Integer.MAX_VALUE,即2147483647,内部使用SynchronousQueue作为阻塞队列;
因为线程池的最大值了Integer.MAX_VALUE,会导致无限创建线程;所以,使用该线程池时,一定要注意控制并发的任务数,如果短时有大量任务要执行,就会创建大量的线程,导致严重的性能问题(线程上下文切换带来的开销),线程创建占用堆外内存,如果任务对象也不小,它就会使堆外内存和堆内内存其中的一个先耗尽,导致oom;
- Executors.newSingleThreadExecutor()
单线程线程池,它的实现
new FinalizableDelegatedExecutorService(
new ThreadPoolExecutor(1, 1,0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()
)
);
- 1
- 2
- 3
- 4
- 5
- 6
同newFixedThreadPool线程池一样,队列用的是LinkedBlockingQueue,队列大小为默认的Integer.MAX_VALUE,可以无限的往里面添加任务,直到内存溢出;
基础参数
我们先来总结一下线程池的这些参数,后面再上源码就好理解了
core,maxPoolSize,keepalive
执行任务时
1. 如果线程池中线程数量 < core,新建一个线程执行任务;
2. 如果线程池中线程数量 >= core ,则将任务放入任务队列
3. 如果线程池中线程数量 >= core 且 < maxPoolSize,且任务队列满了,则创建新的线程;
4. 如果线程池中线程数量 > core ,当线程空闲时间超过了keepalive时,则会销毁线程;由此可见线程池的队列如果是*队列,那么设置线程池最大数量是无效的;
源码分析java.util.concurrent.ThreadPoolExecutor
这是最常用的一个类,我们建立的线程池大部分都是用它实现的,所以重点来分析下这个类的源码;
构造方法
它的构造方法有很多,但是最终调用的都是下面这个构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
参数说明
-
corePoolSize(核心线程池大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads方法,线程池会提前创建并启动所有基本线程。
-
maximumPoolSize(线程池最大大小):线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是如果使用了*的任务队列这个参数就没什么效果。
-
ThreadFactory:用于设置创建线程的工厂。 默认使用Executors内部类DefaultThreadFactory,可以通过实现ThreadFactory接口,写自己的Factory,通过线程工厂给每个创建出来的线程设置更有意义的名字,Debug和定位问题时非常又帮助;
-
keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。所以如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。
-
TimeUnit(线程活动保持时间的单位):可选的单位有天(DAYS),小时(HOURS),分钟(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。
-
workQueue(任务队列):用于保存等待执行的任务的阻塞队列。可以选择以下几个阻塞队列。
1.ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。 2.LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。 3.SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。 4.PriorityBlockingQueue:一个具有优先级得无限阻塞队列。
- 1
- 2
- 3
- 4
- 5
-
RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。
这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。以下是提供的四种策略。 1.AbortPolicy:直接抛出异常。默认策略 2.CallerRunsPolicy:只用调用者所在线程来运行任务。 3.DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。 4.DiscardPolicy:不处理,丢弃掉。 当然也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化不能处理的任务。
- 1
- 2
- 3
- 4
- 5
- 6
- 7
重要的成员变量
先看下重要的成员变量ctl及其相关常量
ctl
它记录了当前线程池的运行状态和线程池内的线程数;一个变量是怎么记录两个值的呢?它是一个AtomicInteger 类型,有32个字节,这个32个字节中,高3位用来标识线程池的运行状态,低29位用来标识线程池内当前存在的线程数;
//利用低29位表示线程池中线程数,通过高3位表示线程池的运行状态:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
- 1
- 2
- 3
线程池状态
线程池有5种状态,这五种状态由五个静态常量标识,每种状态的值的大小
RUNNING < shutdown < stop < tidying < terminated;
//32-3 = 29 ,低位29位存储线程池中线程数
private static final int COUNT_BITS = Integer.SIZE - 3;
//线程池最多可以有536870911个线程,一般绝对创建不到这么大
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
//RUNNING线程池能接受新任务(只有running状态才会接收新任务),并且可以运行队列中的任务
//-1的二进制为32个1,移位后为:11100000000000000000000000000000
private static final int RUNNING = -1 << COUNT_BITS;
//SHUTDOWN不再接受新任务,但仍可以执行队列中的任务
//0的二进制为32个0,移位后还是全0
private static final int SHUTDOWN = 0 << COUNT_BITS;
//STOP不再接受新任务,不再执行队列中的任务,而且要中断正在处理的任务
//1的二进制为前面31个0,最后一个1,移位后为:00100000000000000000000000000000
private static final int STOP = 1 << COUNT_BITS;
//TIDYING所有任务均已终止,workerCount的值为0,转到TIDYING状态的线程即将要执行terminated()钩子方法.
//2的二进制为01000000000000000000000000000000
private static final int TIDYING = 2 << COUNT_BITS;
//TERMINATED terminated()方法执行结束.
//3移位后01100000000000000000000000000000
private static final int TERMINATED = 3 << COUNT_BITS;
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
要牢记以下几点:
- 只有RUNNING状态下才会接收新任务;
- 只有RUNNING状态和SHUTDOWN状态才会执行任务队列中的任务;
- 其它状态都不会接收新任务,不会执行任务队列中的任务;
状态之间转换关系如下
- RUNNING -> SHUTDOWN
调用了shutdown方法,线程池实现了finalize方法,在里面调用了shutdown方法,因此shutdown可能是在finalize中被隐式调用的
(RUNNING or SHUTDOWN) -> STOP
调用了shutdownNow方法 - SHUTDOWN -> TIDYING
当队列和线程池均为空的时候 - STOP -> TIDYING
当线程池为空的时候 - TIDYING -> TERMINATED
处于TIDYING状态后最终会进入TERMINATED状态
与ctl相关的三个方法
//获取线程池的状态,也就是将ctl低29位都置为0后的值
private static int runStateOf(int c) { return c & ~CAPACITY; }
//获取线程池中线程数,也就是ctl低29位的值
private static int workerCountOf(int c) { return c & CAPACITY; }
//设置ctl的值,rs为线程池状态,wc为线程数;
private static int ctlOf(int rs, int wc) { return rs | wc; }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
workers
用来存储线程池中的线程,线程都被封装成了Worker对象
private final HashSet<Worker> workers = new HashSet<Worker>();
- 1
- 2
- 3
completedTaskCount
//记录了已经销毁的线程,完成的任务总数;
private long completedTaskCount;
- 1
- 2
- 3
线程池的运行
前面内容都是理解源码的基础,下面开始讲解重要的运行方法,阅读前了解前面的内容才能更好的理解下面方法的运行原理;
添加任务execute方法
线程池是调用该方法来添加任务的,所以我们就从这个方法看起;
它传入的参数为实现了Runnable接口的对象,要执行的任务写在它的run方法中;
//添加新任务
public void execute(Runnable command) {
//如果任务为null直接抛出异常
if (command == null)
throw new NullPointerException();
//获取当前线程池的ctl值,不知道它作用的看前面说明
int c = ctl.get();
//如果当前线程数小于核心线程数,这时候任务不会进入任务队列,会创建新的工作线程直接执行任务;
if (workerCountOf(c) < corePoolSize) {
//添加新的工作线程执行任务,addWorker方法后面分析
if (addWorker(command, true))
return;
//addWorker操作返回false,说明添加新的工作线程失败,则获取当前线程池状态;(线程池数量小于corePoolSize情况下,创建新的工作线程失败,是因为线程池的状态发生了改变,已经处于非Running状态,或shutdown状态且任务队列为空)
c = ctl.get();
}
//以下两种情况继续执行后面代码
//1.前面的判断中,线程池中线程数小于核心线程数,并且创建新的工作线程失败;
//2.前面的判断中,线程池中线程数大于等于核心线程数
//线程池处于RUNNING状态,说明线程池中线程已经>=corePoolSize,这时候要将任务放入队列中,等待执行;
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//再次检查线程池的状态,如果线程池状态变了,非RUNNING状态下不会接收新的任务,需要将任务移除,成功从队列中删除任务,则执行reject方法处理任务;
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)//如果线程池的状态没有改变,且池中无线程
// 两种情况进入以该分支
//1.线程池处于RUNNING状态,线程池中没有线程了,因为有新任务进入队列所以要创建工作线程(这时候新任务已经在队列中,所以下面创建worker线程时第一个参数,要执行的任务为null,只是创建一个新的工作线程并启动它,让它自己去队列中取任务执行)
//2.线程池处于非RUNNING状态但是任务移除失败,导致任务队列中仍然有任务,但是线程池中的线程数为0,则创建新的工作线程,处理队列中的任务;
addWorker(null, false);
// 两种情况执行下面分支:
// 1.非RUNNING状态拒绝新的任务,并且无法创建新的线程,则拒绝任务
// 2.线程池处于RUNNING状态,线程池线程数量已经大于等于coresize,任务就需要放入队列,如果任务入队失败,说明队列满了,则创建新的线程,创建成功则新线程继续执行任务,如果创建失败说明线程池中线程数已经超过maximumPoolSize,则拒绝任务
}else if (!addWorker(command, false))
reject(command);
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
往线程池添加线程addWorker方法
往线程池中添加工作线程,线程会被封装成Worker对象,放入到works线程池中(可以先看下一小节“内部类Worker”的实现后再看这个方法,也可以先不用管Worker类,先看addWorker的实现过程);
它的执行过程如下:
- 增加线程时,先判断当前线程池的状态允不允许创建新的线程,如果允许再判断线程池有没有达到 限制,如果条件都满足,才继续执行;
- 先增加线程数计数ctl,增加计数成功后,才会去创建线程;
- 创建线程是通过work对象来创建的,创建成功后,将work对象放入到works线程池中(就是一个hashSet);
- 添加完成后,更新largestPoolSize值(线程池中创建过的线程最大数量),最后启动线程,如果参数firstTask不为null,则执行第一个要执行的任务,然后循环去任务队列中取任务来执行;
成功添加worker工作线程需要线程池处于以下两种状态中的一种
- 线程池处于RUNNING状态
- 线程池处于SHUTDOWN状态,且创建线程的时候没有传入新的任务(此状态下不接收新任务),且任务队列不为空(此状态下,要执行完任务队列中的剩余任务才能关闭);
private boolean addWorker(Runnable firstTask, boolean core) {
//以下for循环,增加线程数计数,ctl,只增加计数,不增加线程,只有增加计数成功,才会增加线程
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//这个代码块的判断,如果是STOP,TIDYING和TERMINATED这三种状态,都会返回false。(这几种状态不会接收新任务,也不再执行队列中的任务,中断当前执行的任务)
//如果是SHUTDOWN,firstTask不为空(SHUTDOWN状态下,不会接收新任务)或 者workQueue是空(队列里面都没有任务了,也就不需要线程了),返回false。
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//只有满足以下两种条件才会继续创建worker线程对象
//1.RUNNING状态,
//2.shutdown状态,且firstTask为null(因为shutdown状态下不再接收新任务),队列不是空(shutdown状态下需要继续处理队列中的任务)
通过自旋的方式增加线程池线程数
for (;;) {
int wc = workerCountOf(c);
//1.如果线程数大于最大可创建的线程数CAPACITY,直接返回false;
//2.判断当前是要根据corePoolSize,还是maximumPoolSize进行创建线程(corePoolSize是基本线程池大小,未达到corePoolSize前按照corePollSize来限制线程池大小,达到corePoolSize后,并且任务队列也满了,才会按照maximumPoolSize限制线程池大小)
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))//将WorkerCount通过CAS操作增加1,成功的话直接跳出两层循环;
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)//否则则判断当前线程池的状态,如果现在获取到的状态与进入自旋的状态不一致的话,那么则通过continue retry重新进行状态的判断
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//以下代码块是创建Worker线程对象,并启动
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask); //创建一个新的Worker对象
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); //获取线程池的重入锁后,
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// RUNNING状态 || SHUTDONW状态下,没有新的任务,只是处理任务队列中剩余的任务;
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//如果线程是活动状态,直接抛出异常,因为线程刚创建,还没有执行start方法,一定不会是活动状态;
if (t.isAlive())
throw new IllegalThreadStateException();
// 将新启动的线程添加到线程池中
workers.add(w);
// 更新largestPoolSize的值,largestPoolSize成员变量保存线程池中创建过的线程最大数量
int s = workers.size();
//将线程池中创建过的线程最大数量,设置给largestPoolSize,可以通过getLargestPoolSize()方法获取,注意这个方法只能在 ThreadPoolExecutor中调用,Executer,ExecuterService,AbstractExecutorService中都是没有这个方法的
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 启动新添加的线程,这个线程首先执行firstTask,然后不停的从队列中取任务执行
// 当等待keepAlieTime还没有任务执行则该线程结束。见runWoker和getTask方法的代码。
if (workerAdded) {
t.start();
workerStarted = true;
}
}
}finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
内部类Worker
它是ThreadPoolExecutor的一个内部类
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
- 1
- 2
由它的定义可以知它实现了Runnable接口,是一个线程,还继承了AQS类,实现了加锁机制;
它利用AQS框架实现了一个简单的非重入的互斥锁, 实现互斥锁主要目的是为了中断的时候判断线程是在空闲还是运行,它的state只有三个值 ,初始状态为不可加锁状态-1,无锁状态为0,加锁状态为1,可以看shutdown、shutdownNow、runWorker方法来分析它锁的作用。
Worker的构造方法
构造方法里面要重点关注一下getThreadFactory()这个方法
//参数为Worker线程运行后第一个要执行的任务
Worker(Runnable firstTask) {
//设置ASQ的state为-1 设置worker处于不可加锁的状态,看后面的tryAcquire方法,只有state为0时才允许加锁,worker线程运行以后才会把state置为0
setState(-1);
//设置第一个运行的任务
this.firstTask = firstTask;
//创建线程,将this自己传入进去;getThreadFactory()见后面详解
this.thread = getThreadFactory().newThread(this);
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
线程的创建getThreadFactory();
默认会在构造方法中传入Executors.defaultThreadFactory(),该方法然会一个DefaultThreadFactory();
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
- 1
- 2
- 3
static class DefaultThreadFactory implements ThreadFactory {
//线程池编号
private static final AtomicInteger poolNumber = new AtomicInteger(1);
//线程池中线程所属线程组
private final ThreadGroup group;
//线程池中线程编号
private final AtomicInteger threadNumber = new AtomicInteger(1);
//线程名称前缀
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
//设置线程名称为"pool-线程池的编号-thread-线程的编号"
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
//创建新的线程
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
//设置为非守护线程
if (t.isDaemon())
t.setDaemon(false);
//设置优先级为NORMAL为5
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
一般我们最好不要用默认的线程池,可以继承该类,给线程指定一个识别度高的名字,出了问题好排查;
Worker的成员变量
//被封装的线程,就是它自己;
final Thread thread;
//传入的它要执行的第一个任务,如果firstTask为空就从任务队列中取任务执行
Runnable firstTask;
//记录执行完成的任务数量,如果执行任务过程中出现异常,仍然会计数;
volatile long completedTasks
- 1
- 2
- 3
- 4
- 5
- 6
- 7
worker线程的加锁解锁
worker的加锁解锁机制是基于AQS框架的,要完全弄明白它的加锁解锁机制请看AQS框架的实现,在这里只是简单介绍一下:
//尝试加锁方法,将状态从0设置为1;如果不是0则加锁失败,在worker线程没有启动前是-1状态,无法加锁
//该方法重写了父类AQS的同名方法
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
//尝试释放锁的方法,直接将state置为0
//该方法重写了父类AQS的同名方法
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
//注意:tryAcquire与tryRelease是重写了AQS父类的方法,且不可以直接调用,它们被以下方法调用实现加锁解锁操作
//加锁:acquire法是它父类AQS类的方法,会调用tryAcquire方法加锁
public void lock() { acquire(1); }
//尝试加锁
public boolean tryLock() { return tryAcquire(1); }
//解锁:release方法是它父类AQS类的方法,会调用tryRelease方法
public void unlock() { release(1); }
//返回锁状态
public boolean isLocked() { return isHeldExclusively(); }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
Worker线程执行任务runWorker (重要)
看完了Worker线程的创建,再来看看Worker线程的运行,Worker的run方法中会调用runWorker方法来获循环取任务并执行;
final void runWorker(Worker w) {
//当前线程
Thread wt = Thread.currentThread();
//获取当前Worker线程创建时,指定的第一个要执行的任务,也可以不指定任务,那么它自己就会去任务队列中取任务;
Runnable task = w.firstTask;
w.firstTask = null;
// 在构造方法里面将state设置为了-1,执行该方法就将state置为了0,这样就可以加锁了,-1状态下是无法加锁的,看Worker类的tryAcquire方法
w.unlock();
//该变量代表任务执行是否发生异常,默认值为true发生了异常,后面会用到这个变量
boolean completedAbruptly = true;
try {
//如果创建worker时传入了第一个任务,则执行第一个任务,否则 从任务队列中获取任务getTask(),getTask()后面分析;
while (task != null || (task = getTask()) != null) {
//线程加锁
w.lock();
/**
* 先判断线程池状态是否允许继续执行任务:
* 1.如果是stop<tidying<terminated(这种状态是不接受任务,且不执行任务的),并且线程是非中断状态
* 2.shutingdown,runing ,处于中断状态(并复位中断标志),如果这个时候其它线程执行了shutdownNow方法,shutdownNow方法会把状态设置为STOP
*
* 这个时候则中断线程
**/
if ((
runStateAtLeast(ctl.get(), STOP) ||
(
Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)
)
)
&&
!wt.isInterrupted())
wt.interrupt();
/**
*开始执行任务
*/
try {
//任务执行前要做的处理:这个方法是空的,什么都不做,一般会通过继承ThreadPoolExecute类后重写该方法实现自己的功能;传入参数为当前线程与要执行的任务
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//任务执行后要做的处理:这个方法也是空的,什么都不做,一般会通过继承ThreadPoolExecute类后重写该方法实现自己的功能;参数为当前任务和执行任务时抛出的异常
afterExecute(task, thrown);
}
} finally {
task = null;
//增加完成任务计数
w.completedTasks++;
w.unlock();
}
}
/**
*退出while循环,线程结束;
**/
//判断task.run()方法是否抛出了异常,如果没有则设置它为false,如果发生了异常,前面会直接抛出,中断方法继续执行,就不会执行下面这句;
completedAbruptly = false;
} finally {
/**
* 线程退出后的处理
*/
processWorkerExit(w, completedAbruptly);
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
需要注意的是,线程如果执行任务过程中,业务代码抛出了异常,那么会将抛出的异常catch以后抛出,如果是Throwable类型的异常,则会封装成Error抛出,最后此线程退出,但是退出之前会将任务完成数照样+1,然后会在控制台上打印Error或者是RuntimeException 异常,这些异常不会被我们捕获,异常信息只会在控制台打出,不会再我们的log日志中打出;
所以我们一定要自己去捕获并处理我们的异常,而不能抛出不管;
worker线程从任务队列里面获取任务getTask
从任务队列中获取任务
这是个for循环
1.先判断线程池状态是否允许取任务,不允许直接将线程数量减1 ,直接返回null;
2.若线程池状态允许取任务,则判断当前线程是否超时 ,若线程超时则将线程池数量减1,直接返回null;
3.若没有超时,则去任务队列取任务,取到的话返回任务,若超时则设置超时状态,继续循环,在下次循环中处理超时状态
- 1
- 2
- 3
- 4
- 5
private Runnable getTask() {
// 如果判断当前线程池状态需要启用超时操作,那么任务队列取任务时使用的是带有超时的workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)方法,如果超时,则会将timeOut 变量设置为true,在下次执行for循环时根据timeOut来执行超时操作;
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
/**
* 以下分支在stop、tidying、terminated状态,或者在SHUTDOWN状态且任务队列为空时 退出当前线程
*
* 判断线程池状态是否允许继续获取任务:
* RUNNING<shutdown<stop<tidying<terminated;
* rs >= SHUTDOWN,包含两部分判断操作
*1.如果是rs > SHUTDOWN,即状态为stop、tidying、terminated;这时不再处理队列中的任务,直接返回null
*2.如果是rs = SHUTDOWN ,rs>=STOP不成立,这时还需要处理队列中的任务除非队列为空,没有任务要处理,则返回null
*/
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
//自旋锁将ctl减1(也就是将线程池中的线程数减1)
decrementWorkerCount();
return null;
}
/**
* 在RUNNING状态 或 shutdown状态且任务队列不为空时继续往下执行执行
*/
/**
* 以下做线程超时控制:
* 启用超时控制需要满足至少一个条件
* 1.allowCoreThreadTimeOut为true代表核心线程数可以做超时控制;
* 2.如果当前线程数>corePoolSize核心线程数,也可以做超时控制;
* 在以上前提下,再判断当前线程是否需要销毁:
* 1.如果当前线程数大于maximumPoolSize,这肯定是不允许的,需要销毁当前线程;
* 2.如果当前线程上次执行循环时,取任务操作超时,任务队列是空,需要销毁当前线程;
*/
//获取线程池中线程数量
int wc = workerCountOf(c);
// timed变量用于判断是否需要进行超时控制。
// allowCoreThreadTimeOut默认是false,也就是核心线程不允许进行超时;
// wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量;
// 对于超过核心线程数量的这些线程,需要进行超时控制;
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/*
* 超时销毁线程需要先满足以下两个条件之一
* 1. wc > maximumPoolSize的情况是因为可能在此方法执行阶段同时执行了setMaximumPoolSize方法;
* 2. timed && timedOut 如果为true,表示当前操作需要进行超时控制,并且上次循环当前线程从任务队列中获取任务发生了超时,没有取到任务;
* 满足上面两个条件之一的情况下,接下来判断,如果线程数量大于1,或者线程队列是空的,那么尝试将workerCount减1,减1成功则返回null,退出当前线程; 如果减1失败,则返回继续执行循环操作,重试。
*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
//尝试将线程池线程数量减一
if (compareAndDecrementWorkerCount(c))
return null;
//如果将线程池数量减一不成功则循环重试
continue;
}
/**
* 如果没有超时,则继续去任务队列取任务执行;
*取任务操作
*/
try {
//根据timed(是否启用超时控制)来判断执行poll操作还是执行take()操作还是执行有时间限制的poll操作,并返回获取到的任务;
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
//如果poll操作等待超时,没有取到任务;则将timeOut设置为true;
timedOut = true;
} catch (InterruptedException retry) {
//如果是因为线程中断导致没有取到任务;则设置timedOut=false继续执行循环,取任务
timedOut = false;
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
Worker线程的退出processWorkerExit
如果是处理任务发生异常导致的退出,则以自旋锁的方式将线程数减1;
将当前worker执行完成的任务数,累加到completedTaskCount上;
将当前线程移出线程池;
尝试终止线程池;
判断是否要新建workder线程;
1.如果是RUNNING或SHUTDOWN状态,且worker是异常结束,会直接执行AddWorker操作;
2.如果是RUNNING或SHUTDOWN状态,且worker是没有任务可做结束的,且allowCoreThreadTimeOut=false,且当前线程池中的线程数小于corePoolSize,则会创建addWorker线程;
3.判断是否要添加一个新的线程:线程池是RUNNING或SHUTDOWN状态,worker线程如果是异常结束的,则直接添加一个新线程;如果当前线程池中的线程数小于最小线程数,也会创建一个新线程;
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果任务运行异常导致则completedAbruptly=true,则将线程池worker线程数减1,如果是没有获取到任务导致的completedAbruptly=false,则会在getTask()方法里面将线程数减1;
if (completedAbruptly)
//自旋锁将ctl减1(也就是将线程池中的线程数减1)
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//退出前,将本线程已完成的任务数量,添加到已经完成任务的总数中;
completedTaskCount += w.completedTasks;
//线程队列中移除当前线程
workers.remove(w);
} finally {
mainLock.unlock();
}
//尝试停止线程池
tryTerminate();
/**
*判断是否要增加新的线程
*如果满足以下条件则新增线程:
* 一、当线程池是RUNNING或SHUTDOWN状态,且worker是异常结束,那么会直接addWorker;
* 二、当线程池是RUNNING或SHUTDOWN状态,且worker是没有任务可做结束的;
* 1.如果allowCoreThreadTimeOut=true,则判断等待队列不为空 ,且当前线程数是否小于1;
* 2.如果allowCoreThreadTimeOut=false,则判断当前线程数是否小于小于corePoolSize;
* 如果小于,则会创建addWorker线程;
**/
int c = ctl.get();
//当线程池是RUNNING或SHUTDOWN状态,
if (runStateLessThan(c, STOP)) {
//如果非异常状况completedAbruptly=false,也就是没有获取到可执行的任务,则获取线程池允许的最小线程数,如果allowCoreThreadTimeOut为true说明允许核心线程超时,则最小线程数为0,否则最小线程数为corePoolSize;
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//如果allowCoreThreadTimeOut=true,且任务队列有任务要执行,则将最最小线程数设置为1
if (min == 0 && ! workQueue.isEmpty())
min = 1;
//如果当前线程数大于等于最小线程数,则直接返回
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//以下两种情况会添加一个新的线程
//1.worker是异常结束;
//2.如果是非异常结束,且任务队列里面还有任务,
addWorker(null, false);
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
线程池的关闭
线程池的关闭有两个方法shutdown() 与shutdownNow() ;
shutdown会将线程池状态设置为SHUTDOWN状态,然后中断所有空闲线程,然后执行tryTerminate()方法(tryTerminate这个方法很重要,会在后面分析),来尝试终止线程池;
shutdownNow会将线程池状态设置为STOP状态,然后中断所有线程(不管有没有执行任务都设置为中断状态),然后执行tryTerminate()方法,来尝试终止线程池;
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 线程池状态设为SHUTDOWN,如果已经是shutdown<stop<tidying<terminated,也就是非RUNING状态则直接返回
advanceRunState(SHUTDOWN);
// 中断空闲的没有执行任务的线程
interruptIdleWorkers();
onShutdown(); //空方法,子类覆盖实现
} finally {
mainLock.unlock();
}
tryTerminate();
}
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// STOP状态:不再接受新任务且不再执行队列中的任务。
advanceRunState(STOP);
// 中断所有线程,无论空闲还是在执行任务
interruptWorkers();
// 将任务队列清空,并返回队列中还没有被执行的任务。
tasks = drainQueue();
}finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
这两个方法可以直接调用,来关闭线程池;shutdown方法还会在线程池被垃圾回收时调用,因为ThreadPoolExecuter重写了finalize方法
protected void finalize() {
shutdown();
}
- 1
- 2
- 3
关于finalize方法说明:
垃圾回收时,如果判断对象不可达,且覆盖了finalize方法,则会将对象放入到F-Queue队列 ,有一个名为”Finalizer”的守护线程执行finalize方法,它的优先级为8,做最后的清理工作,执行finalize方法完毕后,GC会再次判断该对象是否可达,若不可达,则进行回收,否则,对象复活
注意:网上很多人说 ,Finalizer线程的优先级低,个人认为这是不对的,Finalizer线程在jdk1.8的优先级是8,比我们创建线程默认优先级5要高,之前其它版本的jdk我记得导出的线程栈信息里面优先级是5,忘记是哪个版本的jdk了,即使是5优先级也不比自建的线程默认优先级低,总之我没见过优先级低于5的Finalizer线程;
这个线程会不停的循环等待java.lang.ref.Finalizer.ReferenceQueue中的新增对象。一旦Finalizer线程发现队列中出现了新的对象,它会弹出该对象,调用它的finalize()方法,将该引用从Finalizer类中移除,因此下次GC再执行的时候,这个Finalizer实例以及它引用的那个对象就可以回垃圾回收掉了。
大多数时候,Finalizer线程能够赶在下次GC带来更多的Finalizer对象前清空这个队列,但是当它的处理速度没法赶上新对象创建的速度,对象创建的速度要比Finalizer线程调用finalize()结束它们的速度要快,这导致最后堆中所有可用的空间都被耗尽了;
当我们大量线程频繁创建重写了finalizer()方法的对象的情况下,高并发情况下,它可能会导致你内存的溢出;虽然Finalizer线程优先级高,但是毕竟它只有一个线程;最典型的例子就是数据库连接池,proxool,对要释放资源的操作加了锁,并在finalized方法中调用该加锁方法,在高并发情况下,锁竞争严重,finalized竞争到锁的几率减少,finalized无法立即释放资源,越来越多的对象finalized()方法无法被执行,资源无法被回收,最终导致导致oom;所以覆盖finalized方法,执行一定要快,不能有锁竞争的操作,否则在高并发下死的很惨;
(proxool使用了cglib,它用WrappedConnection代理实际的Conneciton。在运行WrappedConnection的方法时,包括其finalize方法,都会调用Conneciton.isClosed()方法去判断是否真的需要执行某些操作。不幸的是JDBC中的这个方法是同步的,锁是连接对象本身。于是, Finalizer线程回收刚执行过的WrappedConnection对象时就总会与还在使用Connection的各个工作线程争用锁。)
线程池中线程的中断
线程池的中断也有两个方法
interruptIdleWorkers 中断没有执行任务的线程;
interruptWorkers 中断所有线程,不管线程有没有执行任务;
//中断空闲线程,没有执行任务的线程会被中断,onlyOne参数用来标识是否只中断一个线程;
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//遍历所有的Worker线程
for (Worker w : workers) {
Thread t = w.thread;
//如果线程没有被中断,w.tryLock()会调用tryAcquire()方法尝试加锁,加锁成功后会中断线程
//为什么要w.tryLock(),因为在runWorker()方法的while循环执行任务之前会加锁,如果已经被加锁说明线程正在执行任务,不能被中断;
if (!t.isInterrupted() && w.tryLock()) {
try {
//中断线程
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
//如果 onlyOne为true, for循环只执行一次就退出
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
/****
* 中断所有正在运行的线程,注意,这里与interruptIdelWorkers()方法不同的是,没有使用worker的AQS锁
*/
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
尝试终止线程池tryTerminate
该方法会在很多地方调用,如添加worker线程失败的addWorkerFailed()方法,worker线程跳出执行任务的while 循环退出时的processWorkerExit()方法,关闭线程池的shutdown()和shutdownNow()方法,从任务队列移除任务的remove()方法;
该方法的作用是检测当前线程池的状态是否可以将线程池终止,如果可以终止则尝试着去终止线程,否则直接返回;
STOP-》TIDYING 与SHUTDOWN-》TIDYING状态的转换,就是在该方法中实现的,最终执行terminated()方法后会把线程状态设置为TERMINATED的状态;
尝试终止线程池执行过程;
一、重点内容先判断线程池的状态是否允许被终止
以下状态不可被终止:
1.如果线程池的状态是RUNNING(不可终止)
或者是TIDYING(该状态一定执行过了tryTerminate方法,正在执行或即将执行terminated()方法,所以不需要重复执行),
或者是TERMINATED(该状态已经执行完成terminated()钩子方法,已经是被终止状态了),
以上三种状态直接返回。
2.如果线程池状态是SHUTDOWN,而且任务队列不是空的(该状态需要继续处理任务队列中的任务,不可被终止),也直接返回。
- 1
- 2
- 3
- 4
- 5
- 6
以下两种状态线程池可以被终止:
1.如果线程池状态是SHUTDOWN,而且任务队列是空的(shutdown状态下,任务队列为空,可以被终止),向下进行。
2.如果线程池状态是STOP(该状态下,不接收新任务,不执行任务队列中的任务,并中断正在执行中的线程,可以被终止),向下进行。
- 1
- 2
- 3
二、线程池状态可以被终止,如果线程池中仍然有线程,则尝试中断线程池中的线程
则尝试中断一个线程然后返回,被中断的这个线程执行完成退出后,又会调用tryTerminate()方法,中断其它线程,直到线程池中的线程数为0,则继续往下执行;
三、如果线程池中的线程为0,则将状态设置为TIDYING,设置成功后执行 terminated()方法,最后将线程状态设置为TERMINATED
源码如下:
final void tryTerminate() {
for (;;) {
int c = ctl.get();
//先判断是否满足终止线程池的条件
//1.如果线程池的状态是RUNNING(不可终止)或者是TIDYING(该状态的线程池即将要执行或正在执行terminated()钩子方法),TERMINATED(该状态已经执行完成terminated()钩子方法),直接返回。
//2.如果线程池状态是SHUTDOWN,而且任务队列不是空的(该状态需要继续处理任务队列中的任务,不可被终止),也直接返回。
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
//以下状态才会继续执行:
//1.如果线程池状态是SHUTDOWN,而且任务队列是空的(shutdown状态下,任务队列为空,可以被终止),向下进行。
//2.如果线程池状态是STOP(该状态下,不接收新任务,不执行任务队列中的任务,并中断正在执行中的线程,可以被终止),向下进行。
// workerCount不为0则还不能停止线程池,而且这时线程都处于空闲等待的状态
// 需要中断让线程“醒”过来,醒过来的线程才能继续处理shutdown的信号。
if (workerCountOf(c) != 0) { // Eligible to terminate
// runWoker方法中w.unlock就是为了可以被中断,getTask方法也处理了中断。
// ONLY_ONE:这里只需要中断1个线程去处理shutdown信号就可以了。
interruptIdleWorkers(ONLY_ONE);
return;
}
//满足以下两个条件才会继续执行
//1.线程池状态是STOP且 工作线程池中的线程wc是0
//2.线程池状态是SHUTDOWN而且工作线程池wc(pool)和任务队列(queue)都是空的
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//进入TIDYING状态,线程池的状态被原子操作ctl.compareAndSet(c, ctlOf(TIDYING, 0)将状态设置为TIDYING,(因为tryTerminate方法会在多处调用,存在竞争)
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
//进入TERMINATED状态
//进一步在terminated结束之后的finally块中通过ctl.set(ctlOf(TERMINATED, 0))设置为TERMINATED。
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll(); //最后执行termination.signalAll(),会唤醒awaitTermination方法中由于执行termination.awaitNanos(nanos)操作进入等待状态的线程
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
拒绝策略
以下两种情况会执行拒绝任务操作:
- 如果当前线程池状态为非RUNNING装状态
- 当队列满了,workder线程数到了最大值,而且没有空闲的worker线程执行任务:
有内置的以下四种拒绝策略:
AbortPolicy 抛出异常RejectedExecutionException (默认策略)
CallerRunsPolicy 当前生产者线程执行 (如果线程池被关闭了,以后任务就都要由生产者线程自己去执行了)
DiscardOldestPolicy 将队列中最后一个任务出队,将新的任务入队 (直接丢掉一个旧的,接收一个新的,场景少吧)
DiscardPolicy 什么都不做,相当于忽略当前任务(估计没人愿意这样做)
当然我们也可以通过实现RejectedExecutionHandler类的rejectedExecution方法来实现我们自己的拒绝策略
public interface RejectedExecutionHandler {
/**
* Method that may be invoked by a {@link ThreadPoolExecutor} when
* {@link ThreadPoolExecutor#execute execute} cannot accept a
* task. This may occur when no more threads or queue slots are
* available because their bounds would be exceeded, or upon
* shutdown of the Executor.
*
* <p>In the absence of other alternatives, the method may throw
* an unchecked {@link RejectedExecutionException}, which will be
* propagated to the caller of {@code execute}.
*
* @param r the runnable task requested to be executed
* @param executor the executor attempting to execute this task
* @throws RejectedExecutionException if there is no remedy
*/
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
线程状态
线程池提供了一些方法监视线程池的状态,如下所示:
ThreadPoolExecutor pool = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(10));
// 当前线程池中的工作线程数;也就是返回成员变量private final HashSet<Worker> workers = new HashSet<Worker>()的大小
pool.getPoolSize();
// 队列中的任务; 也就是返回成员变量 private final BlockingQueue<Runnable> workQueue;的大小
pool.getQueue().size();
// 线程正在执行的任务; 遍历workers,返回加锁的worker数量(加锁,说明这个线程正在执行任务)
pool.getActiveCount();
// 已经执行完成的任务; ThreadPoolExecutor.completedTaskCount+每个Worker.completedTasks ,线程池记录的完成任务数量和每个worker线程记录的完成的任务的数量;
pool.getCompletedTaskCount();
// 全部的任务数,队列任务+正在执行+已经执行完成
pool.getTaskCount();
// 核心线程数;
pool.getCorePoolSize();
// 最大线程数;
pool.getMaximumPoolSize();
// 线程池中曾经最大的线程数量;
pool.getLargestPoolSize();
// 线程超时时间
pool.getKeepAliveTime(TimeUnit.SECONDS);
// 是否允许coreThread超时;
pool.allowsCoreThreadTimeOut();
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
注意任务执行失败也会计数,完成的任务数包含实行失败的任务;
一个线程池实例管理类
自己写了一个管理类,还不完善,先放这里:
package com.zqz.studycheck.threadpool;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 连接池管理类
*
* @author zqz
*
*/
public class ZQZThreadPool extends ThreadPoolExecutor {
public static final AtomicBoolean lock = new AtomicBoolean();
public static final Map<String, ThreadPoolExecutor> poolManager = new ConcurrentHashMap<String, ThreadPoolExecutor>();
public static final ThreadFactory defaultThreadFactor = new DefaultThreadFactory();
/**
* 获取连接池实例
*
* @param poolName
* 自定义连接池名称前缀 ,更好的区分不同的连接池;
* @param corePoolSize
* 核心线程数
* @param maximumPoolSize
* 最大线程数
* @param keepAliveTime
* 空闲线程存活时间,单位是秒
* @param workQueue
* 任务队列
* @return
*/
public static ThreadPoolExecutor getInstance(String poolName, int corePoolSize, int maximumPoolSize,
long keepAliveTime, BlockingQueue<Runnable> workQueue) {
while (!lock.compareAndSet(false, true))
;
ThreadPoolExecutor pool = poolManager.get(poolName);
try {
if (pool == null) {
pool = new ZQZThreadPool(poolName, corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS,
workQueue, defaultThreadFactor);
poolManager.put(poolName, pool);
return pool;
} else {
return poolManager.get(poolName);
}
} finally {
lock.compareAndSet(true, false);
}
}
/**
* 私有构造方法
* @param poolName
* @param corePoolSize
* @param maximumPoolSize
* @param keepAliveTime
* @param unit
* @param workQueue
* @param threadFactory
*/
private ZQZThreadPool(String poolName, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
if (!poolManager.containsKey(poolName)) {
poolManager.put(poolName, this);
}
}
/**
* 返回指定线程池状态
* @param name
* @return
*/
public static PoolInfo monitor(String name) {
ThreadPoolExecutor pool = poolManager.get(name);
if (pool == null)
return null;
PoolInfo poolInfo = new PoolInfo();
// 当前线程池中的工作线程数;
poolInfo.setPoolSize(pool.getPoolSize());
// 队列中的任务;
poolInfo.setQueueSize(pool.getQueue().size());
// 线程正在执行的任务;
poolInfo.setActiveCount(pool.getActiveCount());
// 已经执行完成的任务;
poolInfo.setCompletedTaskCount(pool.getCompletedTaskCount());
// 是否允许coreThread超时;
poolInfo.setAllowsCoreThreadTimeOut(pool.allowsCoreThreadTimeOut());
// 核心线程数;
poolInfo.setCorePoolSize(pool.getCorePoolSize());
// 最大线程数;
poolInfo.setMaximumPoolSize(pool.getMaximumPoolSize());
// 线程池中曾经最大的线程数量;
poolInfo.setLargestPoolSize(pool.getLargestPoolSize());
// 线程超时时间
poolInfo.setKeepAliveTime(pool.getKeepAliveTime(TimeUnit.SECONDS));
// 全部的任务数,队列任务+正在执行+已经执行完成
pool.getTaskCount();
return poolInfo;
}
/**
* The default thread factory
*/
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
private int stackSize = 0;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
namePrefix = "zqz-pool-" + poolNumber.getAndIncrement() + "-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), stackSize);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
/**
* 测试
* @param args
*/
public static void main(String[] args) {
String poolName = "test";
ThreadPoolExecutor pool = ZQZThreadPool.getInstance(poolName, 1, 10, 0, new ArrayBlockingQueue<Runnable>(10));
pool.execute(new Runnable() {
public void run() {
System.out.println("test");
}
});
PoolInfo poolInfo = ZQZThreadPool.monitor(poolName);
System.out.println(poolInfo.getPoolSize());
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
- 110
- 111
- 112
- 113
- 114
- 115
- 116
- 117
- 118
- 119
- 120
- 121
- 122
- 123
- 124
- 125
- 126
- 127
- 128
- 129
- 130
- 131
- 132
- 133
- 134
- 135
- 136
- 137
- 138
- 139
- 140
- 141
- 142
- 143
- 144
- 145
- 146
- 147
- 148
- 149
- 150
- 151
- 152
- 153
- 154
- 155
- 156
- 157
- 158
- 159
未完待续
上一篇: HashMap底层原理(转帖)
下一篇: 前端框架Vue.js学习(转帖)