欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

ThreadPoolExecuter的实现原理(ZT)

程序员文章站 2022-04-03 14:55:04
...

转帖地址:https://blog.csdn.net/zqz_zqz/article/details/69488570?locationNum=12&fps=1

 

线程池基本在每个应用中都会用到,而线程池涉及到的细节非常多,要想用好它,仅仅是了解它的api调用是不行的,而且如果你经常分析java线程堆栈,不了解线程池,那么涉及到线程池堆栈的代码也很难看懂,所以作为java程序员,应该好好研究下线程池的实现!

类结构图

ThreadPoolExecuter的实现原理(ZT)
            
    
    博客分类: ThreadPoolExecuter 线程池ThreadPoolExecuter 

示例

public class ThreadPoolTest {
    //固定大小的线程池:
    //初始化一个指定线程数的线程池,其中corePoolSize == maximumPoolSize,使用LinkedBlockingQuene作为阻塞队列,当线程池没有可执行任务时,也不会释放线程。
    private static ExecutorService executor = Executors.newFixedThreadPool(10);
    private static ThreadPoolExecutor executor_ = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());

    //缓存线程池:
    //初始化一个可以缓存线程的线程池,默认缓存60s,线程池的线程数可达到Integer.MAX_VALUE,即2147483647,内部使用SynchronousQueue作为阻塞队列;
    //和newFixedThreadPool创建的线程池不同,newCachedThreadPool在没有任务执行时,当线程的空闲时间超过keepAliveTime,会自动释放线程资源,当提交新任务时,如果没有空闲线程,则创建新线程执行任务,会导致一定的系统开销;
    //所以,使用该线程池时,一定要注意控制并发的任务数,否则创建大量的线程可能导致严重的性能问题;
    private static ExecutorService executor2 = Executors.newCachedThreadPool();
    private static ExecutorService executor2_ = new ThreadPoolExecutor(0,
Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());

    //初始化的线程池中只有一个线程,如果该线程异常结束,会重新创建一个新的线程继续执行任务,唯一的线程可以保证所提交任务的顺序执行,内部使用LinkedBlockingQueue作为阻塞队列。
    private static ExecutorService executor3 = Executors.newSingleThreadExecutor();
//  因为FinalizableDelegatedExecutorService类是不可直接访问的,这样写会报错,所以注释掉
//  private static ExecutorService executor3_ = new FinalizableDelegatedExecutorService
//            (new ThreadPoolExecutor(1, 1,
//                    0L, TimeUnit.MILLISECONDS,
//                    new LinkedBlockingQueue<Runnable>()));

    //定时任务线程池:
    //初始化的线程池可以在指定的时间内周期性的执行所提交的任务,在实际的业务场景中可以使用该线程池定期的同步数据。
    private static ScheduledExecutorService executor4 = Executors.newScheduledThreadPool(5);
//  因为new DelayedWorkQueue()这个类是内部类所以这里也不可以直接这样写,这样写是为了让大家了解它的实现本质
//  private static ExecutorService executor4_ = new ThreadPoolExecutor(1, Integer.MAX_VALUE, 0, NANOSECONDS,
//            new DelayedWorkQueue());  


    public static void main(String[] args){
        if(!executor.isShutdown())
            executor.execute(new Task());
        Future f = executor.submit(new Task());
        executor_.execute(new Task());
        executor_.getLargestPoolSize();
    }

    static class Task implements Runnable{
        public void run(){
            System.out.println(Thread.currentThread().getName());
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

自带线程池的各种坑

  • Executors.newFixedThreadPool(10);

固定大小的线程池,它的实现

new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
  • 1

初始化一个指定线程数的线程池,其中corePoolSize == maximumPoolSize,使用LinkedBlockingQuene作为阻塞队列,超时时间为0,当线程池没有可执行任务时,也不会释放线程。 
因为队列LinkedBlockingQueue大小为默认的Integer.MAX_VALUE,可以无限的往里面添加任务,直到内存溢出;

  • Executors.newCachedThreadPool();

缓存线程池,它的实现:

new ThreadPoolExecutor(0,Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
  • 1

初始化一个可以缓存线程的线程池,默认超时时间60s,线程池的最小线程数时0,但是最大线程数为Integer.MAX_VALUE,即2147483647,内部使用SynchronousQueue作为阻塞队列;

因为线程池的最大值了Integer.MAX_VALUE,会导致无限创建线程;所以,使用该线程池时,一定要注意控制并发的任务数,如果短时有大量任务要执行,就会创建大量的线程,导致严重的性能问题(线程上下文切换带来的开销),线程创建占用堆外内存,如果任务对象也不小,它就会使堆外内存和堆内内存其中的一个先耗尽,导致oom;

  • Executors.newSingleThreadExecutor()

单线程线程池,它的实现

new FinalizableDelegatedExecutorService(
    new ThreadPoolExecutor(1, 1,0L, 
                        TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<Runnable>()
                        )
);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

同newFixedThreadPool线程池一样,队列用的是LinkedBlockingQueue,队列大小为默认的Integer.MAX_VALUE,可以无限的往里面添加任务,直到内存溢出;

基础参数

我们先来总结一下线程池的这些参数,后面再上源码就好理解了 
core,maxPoolSize,keepalive 
执行任务时 
1. 如果线程池中线程数量 < core,新建一个线程执行任务; 
2. 如果线程池中线程数量 >= core ,则将任务放入任务队列 
3. 如果线程池中线程数量 >= core 且 < maxPoolSize,且任务队列满了,则创建新的线程; 
4. 如果线程池中线程数量 > core ,当线程空闲时间超过了keepalive时,则会销毁线程;由此可见线程池的队列如果是*队列,那么设置线程池最大数量是无效的;

源码分析java.util.concurrent.ThreadPoolExecutor

这是最常用的一个类,我们建立的线程池大部分都是用它实现的,所以重点来分析下这个类的源码;

构造方法

它的构造方法有很多,但是最终调用的都是下面这个构造方法

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

参数说明

  • corePoolSize(核心线程池大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads方法,线程池会提前创建并启动所有基本线程。

  • maximumPoolSize(线程池最大大小):线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是如果使用了*的任务队列这个参数就没什么效果。

  • ThreadFactory:用于设置创建线程的工厂。 默认使用Executors内部类DefaultThreadFactory,可以通过实现ThreadFactory接口,写自己的Factory,通过线程工厂给每个创建出来的线程设置更有意义的名字,Debug和定位问题时非常又帮助;

  • keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。所以如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。

  • TimeUnit(线程活动保持时间的单位):可选的单位有天(DAYS),小时(HOURS),分钟(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。

  • workQueue(任务队列):用于保存等待执行的任务的阻塞队列。可以选择以下几个阻塞队列。

    1.ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
    2.LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
    3.SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
    4.PriorityBlockingQueue:一个具有优先级得无限阻塞队列。
    
    • 1
    • 2
    • 3
    • 4
    • 5
  • RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。

        这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。以下是提供的四种策略。
        1.AbortPolicy:直接抛出异常。默认策略
        2.CallerRunsPolicy:只用调用者所在线程来运行任务。
        3.DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
        4.DiscardPolicy:不处理,丢弃掉。
        当然也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化不能处理的任务。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

重要的成员变量

先看下重要的成员变量ctl及其相关常量

ctl

它记录了当前线程池的运行状态和线程池内的线程数;一个变量是怎么记录两个值的呢?它是一个AtomicInteger 类型,有32个字节,这个32个字节中,高3位用来标识线程池的运行状态,低29位用来标识线程池内当前存在的线程数;

//利用低29位表示线程池中线程数,通过高3位表示线程池的运行状态:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));   
  • 1
  • 2
  • 3

线程池状态

线程池有5种状态,这五种状态由五个静态常量标识,每种状态的值的大小 
RUNNING < shutdown < stop < tidying < terminated;

//32-3 = 29 ,低位29位存储线程池中线程数
private static final int COUNT_BITS = Integer.SIZE - 3;   
//线程池最多可以有536870911个线程,一般绝对创建不到这么大
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

//RUNNING线程池能接受新任务(只有running状态才会接收新任务),并且可以运行队列中的任务
//-1的二进制为32个1,移位后为:11100000000000000000000000000000
private static final int RUNNING = -1 << COUNT_BITS;

//SHUTDOWN不再接受新任务,但仍可以执行队列中的任务
//0的二进制为32个0,移位后还是全0
private static final int SHUTDOWN = 0 << COUNT_BITS;

//STOP不再接受新任务,不再执行队列中的任务,而且要中断正在处理的任务
//1的二进制为前面31个0,最后一个1,移位后为:00100000000000000000000000000000
private static final int STOP = 1 << COUNT_BITS;

//TIDYING所有任务均已终止,workerCount的值为0,转到TIDYING状态的线程即将要执行terminated()钩子方法.
//2的二进制为01000000000000000000000000000000
private static final int TIDYING = 2 << COUNT_BITS;

//TERMINATED terminated()方法执行结束.
//3移位后01100000000000000000000000000000
private static final int TERMINATED = 3 << COUNT_BITS;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

要牢记以下几点:

  1. 只有RUNNING状态下才会接收新任务;
  2. 只有RUNNING状态和SHUTDOWN状态才会执行任务队列中的任务;
  3. 其它状态都不会接收新任务,不会执行任务队列中的任务;

状态之间转换关系如下

  • RUNNING -> SHUTDOWN 
    调用了shutdown方法,线程池实现了finalize方法,在里面调用了shutdown方法,因此shutdown可能是在finalize中被隐式调用的 
    (RUNNING or SHUTDOWN) -> STOP 
    调用了shutdownNow方法
  • SHUTDOWN -> TIDYING 
    当队列和线程池均为空的时候
  • STOP -> TIDYING 
    当线程池为空的时候
  • TIDYING -> TERMINATED 
    处于TIDYING状态后最终会进入TERMINATED状态

与ctl相关的三个方法

//获取线程池的状态,也就是将ctl低29位都置为0后的值
private static int runStateOf(int c)     { return c & ~CAPACITY; } 
//获取线程池中线程数,也就是ctl低29位的值
private static int workerCountOf(int c)  { return c & CAPACITY; }  
//设置ctl的值,rs为线程池状态,wc为线程数;
private static int ctlOf(int rs, int wc) { return rs | wc; }       
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

workers

用来存储线程池中的线程,线程都被封装成了Worker对象
private final HashSet<Worker> workers = new HashSet<Worker>();
  • 1
  • 2
  • 3

completedTaskCount

//记录了已经销毁的线程,完成的任务总数;
private long completedTaskCount;
  • 1
  • 2
  • 3

线程池的运行

前面内容都是理解源码的基础,下面开始讲解重要的运行方法,阅读前了解前面的内容才能更好的理解下面方法的运行原理;

添加任务execute方法

线程池是调用该方法来添加任务的,所以我们就从这个方法看起; 
它传入的参数为实现了Runnable接口的对象,要执行的任务写在它的run方法中;


    //添加新任务
    public void execute(Runnable command) {
        //如果任务为null直接抛出异常
        if (command == null)
            throw new NullPointerException();
        //获取当前线程池的ctl值,不知道它作用的看前面说明
        int c = ctl.get();

        //如果当前线程数小于核心线程数,这时候任务不会进入任务队列,会创建新的工作线程直接执行任务;
        if (workerCountOf(c) < corePoolSize) { 
            //添加新的工作线程执行任务,addWorker方法后面分析
            if (addWorker(command, true))
                return;
            //addWorker操作返回false,说明添加新的工作线程失败,则获取当前线程池状态;(线程池数量小于corePoolSize情况下,创建新的工作线程失败,是因为线程池的状态发生了改变,已经处于非Running状态,或shutdown状态且任务队列为空)
            c = ctl.get();
        }

        //以下两种情况继续执行后面代码
        //1.前面的判断中,线程池中线程数小于核心线程数,并且创建新的工作线程失败;
        //2.前面的判断中,线程池中线程数大于等于核心线程数

        //线程池处于RUNNING状态,说明线程池中线程已经>=corePoolSize,这时候要将任务放入队列中,等待执行;
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //再次检查线程池的状态,如果线程池状态变了,非RUNNING状态下不会接收新的任务,需要将任务移除,成功从队列中删除任务,则执行reject方法处理任务;
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)//如果线程池的状态没有改变,且池中无线程
            // 两种情况进入以该分支
            //1.线程池处于RUNNING状态,线程池中没有线程了,因为有新任务进入队列所以要创建工作线程(这时候新任务已经在队列中,所以下面创建worker线程时第一个参数,要执行的任务为null,只是创建一个新的工作线程并启动它,让它自己去队列中取任务执行)
            //2.线程池处于非RUNNING状态但是任务移除失败,导致任务队列中仍然有任务,但是线程池中的线程数为0,则创建新的工作线程,处理队列中的任务;
                addWorker(null, false);
        // 两种情况执行下面分支:
        // 1.非RUNNING状态拒绝新的任务,并且无法创建新的线程,则拒绝任务
        // 2.线程池处于RUNNING状态,线程池线程数量已经大于等于coresize,任务就需要放入队列,如果任务入队失败,说明队列满了,则创建新的线程,创建成功则新线程继续执行任务,如果创建失败说明线程池中线程数已经超过maximumPoolSize,则拒绝任务
        }else if (!addWorker(command, false))
            reject(command);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

往线程池添加线程addWorker方法

往线程池中添加工作线程,线程会被封装成Worker对象,放入到works线程池中(可以先看下一小节“内部类Worker”的实现后再看这个方法,也可以先不用管Worker类,先看addWorker的实现过程); 
它的执行过程如下:

  • 增加线程时,先判断当前线程池的状态允不允许创建新的线程,如果允许再判断线程池有没有达到 限制,如果条件都满足,才继续执行;
  • 先增加线程数计数ctl,增加计数成功后,才会去创建线程;
  • 创建线程是通过work对象来创建的,创建成功后,将work对象放入到works线程池中(就是一个hashSet);
  • 添加完成后,更新largestPoolSize值(线程池中创建过的线程最大数量),最后启动线程,如果参数firstTask不为null,则执行第一个要执行的任务,然后循环去任务队列中取任务来执行; 

成功添加worker工作线程需要线程池处于以下两种状态中的一种

  1. 线程池处于RUNNING状态
  2. 线程池处于SHUTDOWN状态,且创建线程的时候没有传入新的任务(此状态下不接收新任务),且任务队列不为空(此状态下,要执行完任务队列中的剩余任务才能关闭);


    private boolean addWorker(Runnable firstTask, boolean core) {
        //以下for循环,增加线程数计数,ctl,只增加计数,不增加线程,只有增加计数成功,才会增加线程
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            //这个代码块的判断,如果是STOP,TIDYING和TERMINATED这三种状态,都会返回false。(这几种状态不会接收新任务,也不再执行队列中的任务,中断当前执行的任务)
            //如果是SHUTDOWN,firstTask不为空(SHUTDOWN状态下,不会接收新任务)或 者workQueue是空(队列里面都没有任务了,也就不需要线程了),返回false。
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            //只有满足以下两种条件才会继续创建worker线程对象
            //1.RUNNING状态,
            //2.shutdown状态,且firstTask为null(因为shutdown状态下不再接收新任务),队列不是空(shutdown状态下需要继续处理队列中的任务)
             通过自旋的方式增加线程池线程数
            for (;;) {
                int wc = workerCountOf(c);
                //1.如果线程数大于最大可创建的线程数CAPACITY,直接返回false;
                //2.判断当前是要根据corePoolSize,还是maximumPoolSize进行创建线程(corePoolSize是基本线程池大小,未达到corePoolSize前按照corePollSize来限制线程池大小,达到corePoolSize后,并且任务队列也满了,才会按照maximumPoolSize限制线程池大小)
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))//将WorkerCount通过CAS操作增加1,成功的话直接跳出两层循环;
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)//否则则判断当前线程池的状态,如果现在获取到的状态与进入自旋的状态不一致的话,那么则通过continue retry重新进行状态的判断
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        //以下代码块是创建Worker线程对象,并启动

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask); //创建一个新的Worker对象
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock(); //获取线程池的重入锁后,
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    // RUNNING状态 || SHUTDONW状态下,没有新的任务,只是处理任务队列中剩余的任务;
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        //如果线程是活动状态,直接抛出异常,因为线程刚创建,还没有执行start方法,一定不会是活动状态; 
                        if (t.isAlive())
                            throw new IllegalThreadStateException();
                        // 将新启动的线程添加到线程池中
                        workers.add(w); 
                        // 更新largestPoolSize的值,largestPoolSize成员变量保存线程池中创建过的线程最大数量
                        int s = workers.size();
                        //将线程池中创建过的线程最大数量,设置给largestPoolSize,可以通过getLargestPoolSize()方法获取,注意这个方法只能在 ThreadPoolExecutor中调用,Executer,ExecuterService,AbstractExecutorService中都是没有这个方法的
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                // 启动新添加的线程,这个线程首先执行firstTask,然后不停的从队列中取任务执行
                // 当等待keepAlieTime还没有任务执行则该线程结束。见runWoker和getTask方法的代码。
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        }finally {
            if (! workerStarted)
                addWorkerFailed(w);
            }
            return workerStarted;
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83

内部类Worker

它是ThreadPoolExecutor的一个内部类

private final class Worker  extends AbstractQueuedSynchronizer implements Runnable
  • 1
  • 2

由它的定义可以知它实现了Runnable接口,是一个线程,还继承了AQS类,实现了加锁机制;

它利用AQS框架实现了一个简单的非重入的互斥锁, 实现互斥锁主要目的是为了中断的时候判断线程是在空闲还是运行,它的state只有三个值 ,初始状态为不可加锁状态-1,无锁状态为0,加锁状态为1,可以看shutdown、shutdownNow、runWorker方法来分析它锁的作用。

Worker的构造方法

构造方法里面要重点关注一下getThreadFactory()这个方法

        //参数为Worker线程运行后第一个要执行的任务
        Worker(Runnable firstTask) { 
            //设置ASQ的state为-1  设置worker处于不可加锁的状态,看后面的tryAcquire方法,只有state为0时才允许加锁,worker线程运行以后才会把state置为0
            setState(-1); 
            //设置第一个运行的任务
            this.firstTask = firstTask;  
            //创建线程,将this自己传入进去;getThreadFactory()见后面详解
            this.thread = getThreadFactory().newThread(this); 
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
线程的创建getThreadFactory();

默认会在构造方法中传入Executors.defaultThreadFactory(),该方法然会一个DefaultThreadFactory();

public static ThreadFactory defaultThreadFactory() {
        return new DefaultThreadFactory();
}
  • 1
  • 2
  • 3
 static class DefaultThreadFactory implements ThreadFactory {
        //线程池编号
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        //线程池中线程所属线程组
        private final ThreadGroup group;
        //线程池中线程编号
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        //线程名称前缀
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            //设置线程名称为"pool-线程池的编号-thread-线程的编号"
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        //创建新的线程
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            //设置为非守护线程
            if (t.isDaemon())
                t.setDaemon(false);
            //设置优先级为NORMAL为5
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

一般我们最好不要用默认的线程池,可以继承该类,给线程指定一个识别度高的名字,出了问题好排查;

Worker的成员变量

 //被封装的线程,就是它自己;
 final Thread thread;
 //传入的它要执行的第一个任务,如果firstTask为空就从任务队列中取任务执行
 Runnable firstTask;
 //记录执行完成的任务数量,如果执行任务过程中出现异常,仍然会计数;
 volatile long completedTasks
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

worker线程的加锁解锁

worker的加锁解锁机制是基于AQS框架的,要完全弄明白它的加锁解锁机制请看AQS框架的实现,在这里只是简单介绍一下:

        //尝试加锁方法,将状态从0设置为1;如果不是0则加锁失败,在worker线程没有启动前是-1状态,无法加锁
        //该方法重写了父类AQS的同名方法
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        //尝试释放锁的方法,直接将state置为0
        //该方法重写了父类AQS的同名方法
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
        //注意:tryAcquire与tryRelease是重写了AQS父类的方法,且不可以直接调用,它们被以下方法调用实现加锁解锁操作

        //加锁:acquire法是它父类AQS类的方法,会调用tryAcquire方法加锁
        public void lock()        { acquire(1); }
        //尝试加锁
        public boolean tryLock()  { return tryAcquire(1); }
        //解锁:release方法是它父类AQS类的方法,会调用tryRelease方法
        public void unlock()      { release(1); }
        //返回锁状态
        public boolean isLocked() { return isHeldExclusively(); }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

Worker线程执行任务runWorker (重要)

看完了Worker线程的创建,再来看看Worker线程的运行,Worker的run方法中会调用runWorker方法来获循环取任务并执行;

   final void runWorker(Worker w) {
        //当前线程
        Thread wt = Thread.currentThread();
        //获取当前Worker线程创建时,指定的第一个要执行的任务,也可以不指定任务,那么它自己就会去任务队列中取任务;
        Runnable task = w.firstTask;
        w.firstTask = null;
        // 在构造方法里面将state设置为了-1,执行该方法就将state置为了0,这样就可以加锁了,-1状态下是无法加锁的,看Worker类的tryAcquire方法
        w.unlock(); 
        //该变量代表任务执行是否发生异常,默认值为true发生了异常,后面会用到这个变量
        boolean completedAbruptly = true;
        try {
            //如果创建worker时传入了第一个任务,则执行第一个任务,否则 从任务队列中获取任务getTask(),getTask()后面分析;
            while (task != null || (task = getTask()) != null) { 
                //线程加锁
                w.lock();
                /**
                 * 先判断线程池状态是否允许继续执行任务:
                 * 1.如果是stop<tidying<terminated(这种状态是不接受任务,且不执行任务的),并且线程是非中断状态
                 * 2.shutingdown,runing ,处于中断状态(并复位中断标志),如果这个时候其它线程执行了shutdownNow方法,shutdownNow方法会把状态设置为STOP
                 * 
                 * 这个时候则中断线程
                 **/
                if ((   
                        runStateAtLeast(ctl.get(), STOP) || 
                        (
                            Thread.interrupted() && runStateAtLeast(ctl.get(), STOP) 
                        ) 

                    ) 
                     &&
                    !wt.isInterrupted())
                    wt.interrupt();

                /**
                 *开始执行任务
                 */

                try {
                    //任务执行前要做的处理:这个方法是空的,什么都不做,一般会通过继承ThreadPoolExecute类后重写该方法实现自己的功能;传入参数为当前线程与要执行的任务
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        //任务执行后要做的处理:这个方法也是空的,什么都不做,一般会通过继承ThreadPoolExecute类后重写该方法实现自己的功能;参数为当前任务和执行任务时抛出的异常
                        afterExecute(task, thrown); 
                    }
                } finally {
                    task = null;
                    //增加完成任务计数
                    w.completedTasks++;   
                    w.unlock();
                }
            }

            /**
             *退出while循环,线程结束;
             **/

            //判断task.run()方法是否抛出了异常,如果没有则设置它为false,如果发生了异常,前面会直接抛出,中断方法继续执行,就不会执行下面这句;
            completedAbruptly = false;
        } finally {
            /**
             * 线程退出后的处理
             */
            processWorkerExit(w, completedAbruptly);
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74

需要注意的是,线程如果执行任务过程中,业务代码抛出了异常,那么会将抛出的异常catch以后抛出,如果是Throwable类型的异常,则会封装成Error抛出,最后此线程退出,但是退出之前会将任务完成数照样+1,然后会在控制台上打印Error或者是RuntimeException 异常,这些异常不会被我们捕获,异常信息只会在控制台打出,不会再我们的log日志中打出; 
所以我们一定要自己去捕获并处理我们的异常,而不能抛出不管;

worker线程从任务队列里面获取任务getTask

从任务队列中获取任务

这是个for循环
1.先判断线程池状态是否允许取任务,不允许直接将线程数量减1 ,直接返回null;
2.若线程池状态允许取任务,则判断当前线程是否超时 ,若线程超时则将线程池数量减1,直接返回null;
3.若没有超时,则去任务队列取任务,取到的话返回任务,若超时则设置超时状态,继续循环,在下次循环中处理超时状态
  • 1
  • 2
  • 3
  • 4
  • 5

     private Runnable getTask() {
        // 如果判断当前线程池状态需要启用超时操作,那么任务队列取任务时使用的是带有超时的workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)方法,如果超时,则会将timeOut 变量设置为true,在下次执行for循环时根据timeOut来执行超时操作;
        boolean timedOut = false;  

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            /**
            * 以下分支在stop、tidying、terminated状态,或者在SHUTDOWN状态且任务队列为空时 退出当前线程
            * 
            * 判断线程池状态是否允许继续获取任务:
            * RUNNING<shutdown<stop<tidying<terminated;
            * rs >= SHUTDOWN,包含两部分判断操作
            *1.如果是rs > SHUTDOWN,即状态为stop、tidying、terminated;这时不再处理队列中的任务,直接返回null
            *2.如果是rs = SHUTDOWN ,rs>=STOP不成立,这时还需要处理队列中的任务除非队列为空,没有任务要处理,则返回null
            */
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                //自旋锁将ctl减1(也就是将线程池中的线程数减1)
                decrementWorkerCount();
                return null;
            }
            /**
             * 在RUNNING状态 或 shutdown状态且任务队列不为空时继续往下执行执行
             */

            /**
             * 以下做线程超时控制:
             * 启用超时控制需要满足至少一个条件
             * 1.allowCoreThreadTimeOut为true代表核心线程数可以做超时控制;
             * 2.如果当前线程数>corePoolSize核心线程数,也可以做超时控制;
             * 在以上前提下,再判断当前线程是否需要销毁:
             * 1.如果当前线程数大于maximumPoolSize,这肯定是不允许的,需要销毁当前线程;
             * 2.如果当前线程上次执行循环时,取任务操作超时,任务队列是空,需要销毁当前线程;
             */

            //获取线程池中线程数量
            int wc = workerCountOf(c);

            // timed变量用于判断是否需要进行超时控制。
            // allowCoreThreadTimeOut默认是false,也就是核心线程不允许进行超时;
            // wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量;
            // 对于超过核心线程数量的这些线程,需要进行超时控制; 
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            /*
            * 超时销毁线程需要先满足以下两个条件之一
            * 1. wc > maximumPoolSize的情况是因为可能在此方法执行阶段同时执行了setMaximumPoolSize方法;
            * 2. timed && timedOut 如果为true,表示当前操作需要进行超时控制,并且上次循环当前线程从任务队列中获取任务发生了超时,没有取到任务;
            *  满足上面两个条件之一的情况下,接下来判断,如果线程数量大于1,或者线程队列是空的,那么尝试将workerCount减1,减1成功则返回null,退出当前线程; 如果减1失败,则返回继续执行循环操作,重试。
            */
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                //尝试将线程池线程数量减一
                if (compareAndDecrementWorkerCount(c))
                    return null;
                //如果将线程池数量减一不成功则循环重试
                continue;
            }


            /**
             * 如果没有超时,则继续去任务队列取任务执行;
             *取任务操作
             */
            try {
                //根据timed(是否启用超时控制)来判断执行poll操作还是执行take()操作还是执行有时间限制的poll操作,并返回获取到的任务;
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take(); 
                if (r != null)
                    return r;
                //如果poll操作等待超时,没有取到任务;则将timeOut设置为true;
                timedOut = true;
            } catch (InterruptedException retry) {
                //如果是因为线程中断导致没有取到任务;则设置timedOut=false继续执行循环,取任务
                timedOut = false;
            }
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81

Worker线程的退出processWorkerExit

如果是处理任务发生异常导致的退出,则以自旋锁的方式将线程数减1; 
将当前worker执行完成的任务数,累加到completedTaskCount上; 
将当前线程移出线程池; 
尝试终止线程池; 
判断是否要新建workder线程; 
1.如果是RUNNING或SHUTDOWN状态,且worker是异常结束,会直接执行AddWorker操作; 
2.如果是RUNNING或SHUTDOWN状态,且worker是没有任务可做结束的,且allowCoreThreadTimeOut=false,且当前线程池中的线程数小于corePoolSize,则会创建addWorker线程; 
3.判断是否要添加一个新的线程:线程池是RUNNING或SHUTDOWN状态,worker线程如果是异常结束的,则直接添加一个新线程;如果当前线程池中的线程数小于最小线程数,也会创建一个新线程;

 private void processWorkerExit(Worker w, boolean completedAbruptly) {
         // 如果任务运行异常导致则completedAbruptly=true,则将线程池worker线程数减1,如果是没有获取到任务导致的completedAbruptly=false,则会在getTask()方法里面将线程数减1;
        if (completedAbruptly) 
            //自旋锁将ctl减1(也就是将线程池中的线程数减1)
            decrementWorkerCount(); 

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //退出前,将本线程已完成的任务数量,添加到已经完成任务的总数中;
            completedTaskCount += w.completedTasks;
            //线程队列中移除当前线程 
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        //尝试停止线程池
        tryTerminate();

       /**
        *判断是否要增加新的线程
        *如果满足以下条件则新增线程:
        * 一、当线程池是RUNNING或SHUTDOWN状态,且worker是异常结束,那么会直接addWorker;
        * 二、当线程池是RUNNING或SHUTDOWN状态,且worker是没有任务可做结束的;
        *   1.如果allowCoreThreadTimeOut=true,则判断等待队列不为空  ,且当前线程数是否小于1;
        *   2.如果allowCoreThreadTimeOut=false,则判断当前线程数是否小于小于corePoolSize;
        *   如果小于,则会创建addWorker线程;
        **/
        int c = ctl.get();
        //当线程池是RUNNING或SHUTDOWN状态,
        if (runStateLessThan(c, STOP)) {
            //如果非异常状况completedAbruptly=false,也就是没有获取到可执行的任务,则获取线程池允许的最小线程数,如果allowCoreThreadTimeOut为true说明允许核心线程超时,则最小线程数为0,否则最小线程数为corePoolSize;
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                //如果allowCoreThreadTimeOut=true,且任务队列有任务要执行,则将最最小线程数设置为1
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                //如果当前线程数大于等于最小线程数,则直接返回
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            //以下两种情况会添加一个新的线程
            //1.worker是异常结束;
            //2.如果是非异常结束,且任务队列里面还有任务,
            addWorker(null, false);
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49

线程池的关闭

线程池的关闭有两个方法shutdown() 与shutdownNow() ;

shutdown会将线程池状态设置为SHUTDOWN状态,然后中断所有空闲线程,然后执行tryTerminate()方法(tryTerminate这个方法很重要,会在后面分析),来尝试终止线程池;

shutdownNow会将线程池状态设置为STOP状态,然后中断所有线程(不管有没有执行任务都设置为中断状态),然后执行tryTerminate()方法,来尝试终止线程池;

public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // 线程池状态设为SHUTDOWN,如果已经是shutdown<stop<tidying<terminated,也就是非RUNING状态则直接返回 
            advanceRunState(SHUTDOWN);
            // 中断空闲的没有执行任务的线程
            interruptIdleWorkers();
            onShutdown(); //空方法,子类覆盖实现
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }

    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // STOP状态:不再接受新任务且不再执行队列中的任务。
            advanceRunState(STOP);
            // 中断所有线程,无论空闲还是在执行任务
            interruptWorkers();
            // 将任务队列清空,并返回队列中还没有被执行的任务。
            tasks = drainQueue();
        }finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

这两个方法可以直接调用,来关闭线程池;shutdown方法还会在线程池被垃圾回收时调用,因为ThreadPoolExecuter重写了finalize方法

  protected void finalize() {
        shutdown();
  }
  • 1
  • 2
  • 3

关于finalize方法说明: 
垃圾回收时,如果判断对象不可达,且覆盖了finalize方法,则会将对象放入到F-Queue队列 ,有一个名为”Finalizer”的守护线程执行finalize方法,它的优先级为8,做最后的清理工作,执行finalize方法完毕后,GC会再次判断该对象是否可达,若不可达,则进行回收,否则,对象复活 
注意:网上很多人说 ,Finalizer线程的优先级低,个人认为这是不对的,Finalizer线程在jdk1.8的优先级是8,比我们创建线程默认优先级5要高,之前其它版本的jdk我记得导出的线程栈信息里面优先级是5,忘记是哪个版本的jdk了,即使是5优先级也不比自建的线程默认优先级低,总之我没见过优先级低于5的Finalizer线程; 
这个线程会不停的循环等待java.lang.ref.Finalizer.ReferenceQueue中的新增对象。一旦Finalizer线程发现队列中出现了新的对象,它会弹出该对象,调用它的finalize()方法,将该引用从Finalizer类中移除,因此下次GC再执行的时候,这个Finalizer实例以及它引用的那个对象就可以回垃圾回收掉了。 
大多数时候,Finalizer线程能够赶在下次GC带来更多的Finalizer对象前清空这个队列,但是当它的处理速度没法赶上新对象创建的速度,对象创建的速度要比Finalizer线程调用finalize()结束它们的速度要快,这导致最后堆中所有可用的空间都被耗尽了; 
当我们大量线程频繁创建重写了finalizer()方法的对象的情况下,高并发情况下,它可能会导致你内存的溢出;虽然Finalizer线程优先级高,但是毕竟它只有一个线程;最典型的例子就是数据库连接池,proxool,对要释放资源的操作加了锁,并在finalized方法中调用该加锁方法,在高并发情况下,锁竞争严重,finalized竞争到锁的几率减少,finalized无法立即释放资源,越来越多的对象finalized()方法无法被执行,资源无法被回收,最终导致导致oom;所以覆盖finalized方法,执行一定要快,不能有锁竞争的操作,否则在高并发下死的很惨; 
(proxool使用了cglib,它用WrappedConnection代理实际的Conneciton。在运行WrappedConnection的方法时,包括其finalize方法,都会调用Conneciton.isClosed()方法去判断是否真的需要执行某些操作。不幸的是JDBC中的这个方法是同步的,锁是连接对象本身。于是, Finalizer线程回收刚执行过的WrappedConnection对象时就总会与还在使用Connection的各个工作线程争用锁。)

线程池中线程的中断

线程池的中断也有两个方法 
interruptIdleWorkers 中断没有执行任务的线程; 
interruptWorkers 中断所有线程,不管线程有没有执行任务;

   //中断空闲线程,没有执行任务的线程会被中断,onlyOne参数用来标识是否只中断一个线程;
   private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //遍历所有的Worker线程
            for (Worker w : workers) {
                Thread t = w.thread;
                //如果线程没有被中断,w.tryLock()会调用tryAcquire()方法尝试加锁,加锁成功后会中断线程
                //为什么要w.tryLock(),因为在runWorker()方法的while循环执行任务之前会加锁,如果已经被加锁说明线程正在执行任务,不能被中断;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        //中断线程
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                //如果 onlyOne为true, for循环只执行一次就退出
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

    /**** 
     * 中断所有正在运行的线程,注意,这里与interruptIdelWorkers()方法不同的是,没有使用worker的AQS锁
     */
    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41

尝试终止线程池tryTerminate

该方法会在很多地方调用,如添加worker线程失败的addWorkerFailed()方法,worker线程跳出执行任务的while 循环退出时的processWorkerExit()方法,关闭线程池的shutdown()和shutdownNow()方法,从任务队列移除任务的remove()方法;

该方法的作用是检测当前线程池的状态是否可以将线程池终止,如果可以终止则尝试着去终止线程,否则直接返回;

STOP-》TIDYING 与SHUTDOWN-》TIDYING状态的转换,就是在该方法中实现的,最终执行terminated()方法后会把线程状态设置为TERMINATED的状态;

尝试终止线程池执行过程;

一、重点内容先判断线程池的状态是否允许被终止

以下状态不可被终止:

1.如果线程池的状态是RUNNING(不可终止)
   或者是TIDYING(该状态一定执行过了tryTerminate方法,正在执行或即将执行terminated()方法,所以不需要重复执行),
   或者是TERMINATED(该状态已经执行完成terminated()钩子方法,已经是被终止状态了),
   以上三种状态直接返回。
2.如果线程池状态是SHUTDOWN,而且任务队列不是空的(该状态需要继续处理任务队列中的任务,不可被终止),也直接返回。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

以下两种状态线程池可以被终止:

1.如果线程池状态是SHUTDOWN,而且任务队列是空的(shutdown状态下,任务队列为空,可以被终止),向下进行。
2.如果线程池状态是STOP(该状态下,不接收新任务,不执行任务队列中的任务,并中断正在执行中的线程,可以被终止),向下进行。
  • 1
  • 2
  • 3

二、线程池状态可以被终止,如果线程池中仍然有线程,则尝试中断线程池中的线程

则尝试中断一个线程然后返回,被中断的这个线程执行完成退出后,又会调用tryTerminate()方法,中断其它线程,直到线程池中的线程数为0,则继续往下执行;

三、如果线程池中的线程为0,则将状态设置为TIDYING,设置成功后执行 terminated()方法,最后将线程状态设置为TERMINATED 
源码如下:

    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            //先判断是否满足终止线程池的条件
            //1.如果线程池的状态是RUNNING(不可终止)或者是TIDYING(该状态的线程池即将要执行或正在执行terminated()钩子方法),TERMINATED(该状态已经执行完成terminated()钩子方法),直接返回。
            //2.如果线程池状态是SHUTDOWN,而且任务队列不是空的(该状态需要继续处理任务队列中的任务,不可被终止),也直接返回。
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;

            //以下状态才会继续执行:
            //1.如果线程池状态是SHUTDOWN,而且任务队列是空的(shutdown状态下,任务队列为空,可以被终止),向下进行。
            //2.如果线程池状态是STOP(该状态下,不接收新任务,不执行任务队列中的任务,并中断正在执行中的线程,可以被终止),向下进行。

            // workerCount不为0则还不能停止线程池,而且这时线程都处于空闲等待的状态
            // 需要中断让线程“醒”过来,醒过来的线程才能继续处理shutdown的信号。
            if (workerCountOf(c) != 0) { // Eligible to terminate 
                // runWoker方法中w.unlock就是为了可以被中断,getTask方法也处理了中断。
                // ONLY_ONE:这里只需要中断1个线程去处理shutdown信号就可以了。 
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            //满足以下两个条件才会继续执行
            //1.线程池状态是STOP且 工作线程池中的线程wc是0
            //2.线程池状态是SHUTDOWN而且工作线程池wc(pool)和任务队列(queue)都是空的
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                //进入TIDYING状态,线程池的状态被原子操作ctl.compareAndSet(c, ctlOf(TIDYING, 0)将状态设置为TIDYING,(因为tryTerminate方法会在多处调用,存在竞争)
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        //进入TERMINATED状态
                        //进一步在terminated结束之后的finally块中通过ctl.set(ctlOf(TERMINATED, 0))设置为TERMINATED。
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();  //最后执行termination.signalAll(),会唤醒awaitTermination方法中由于执行termination.awaitNanos(nanos)操作进入等待状态的线程
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49

拒绝策略

以下两种情况会执行拒绝任务操作:

  1. 如果当前线程池状态为非RUNNING装状态
  2. 当队列满了,workder线程数到了最大值,而且没有空闲的worker线程执行任务:

有内置的以下四种拒绝策略: 
AbortPolicy 抛出异常RejectedExecutionException (默认策略) 
CallerRunsPolicy 当前生产者线程执行 (如果线程池被关闭了,以后任务就都要由生产者线程自己去执行了) 
DiscardOldestPolicy 将队列中最后一个任务出队,将新的任务入队 (直接丢掉一个旧的,接收一个新的,场景少吧) 
DiscardPolicy 什么都不做,相当于忽略当前任务(估计没人愿意这样做)

当然我们也可以通过实现RejectedExecutionHandler类的rejectedExecution方法来实现我们自己的拒绝策略

public interface RejectedExecutionHandler {

    /**
     * Method that may be invoked by a {@link ThreadPoolExecutor} when
     * {@link ThreadPoolExecutor#execute execute} cannot accept a
     * task.  This may occur when no more threads or queue slots are
     * available because their bounds would be exceeded, or upon
     * shutdown of the Executor.
     *
     * <p>In the absence of other alternatives, the method may throw
     * an unchecked {@link RejectedExecutionException}, which will be
     * propagated to the caller of {@code execute}.
     *
     * @param r the runnable task requested to be executed
     * @param executor the executor attempting to execute this task
     * @throws RejectedExecutionException if there is no remedy
     */
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

线程状态

线程池提供了一些方法监视线程池的状态,如下所示:

ThreadPoolExecutor pool = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(10));
// 当前线程池中的工作线程数;也就是返回成员变量private final HashSet<Worker> workers = new HashSet<Worker>()的大小
pool.getPoolSize();
// 队列中的任务; 也就是返回成员变量 private final BlockingQueue<Runnable> workQueue;的大小
pool.getQueue().size();
// 线程正在执行的任务; 遍历workers,返回加锁的worker数量(加锁,说明这个线程正在执行任务)
pool.getActiveCount(); 
// 已经执行完成的任务; ThreadPoolExecutor.completedTaskCount+每个Worker.completedTasks ,线程池记录的完成任务数量和每个worker线程记录的完成的任务的数量;
pool.getCompletedTaskCount();
// 全部的任务数,队列任务+正在执行+已经执行完成
pool.getTaskCount();
// 核心线程数;
pool.getCorePoolSize();
// 最大线程数;
pool.getMaximumPoolSize();
// 线程池中曾经最大的线程数量;
pool.getLargestPoolSize();
// 线程超时时间
pool.getKeepAliveTime(TimeUnit.SECONDS);
// 是否允许coreThread超时;
pool.allowsCoreThreadTimeOut();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

注意任务执行失败也会计数,完成的任务数包含实行失败的任务;

一个线程池实例管理类

自己写了一个管理类,还不完善,先放这里:

package com.zqz.studycheck.threadpool;

import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 连接池管理类
 * 
 * @author zqz
 * 
 */
public class ZQZThreadPool extends ThreadPoolExecutor {

    public static final AtomicBoolean lock = new AtomicBoolean();

    public static final Map<String, ThreadPoolExecutor> poolManager = new ConcurrentHashMap<String, ThreadPoolExecutor>();
    public static final ThreadFactory defaultThreadFactor = new DefaultThreadFactory();

    /**
     * 获取连接池实例
     * 
     * @param poolName
     *            自定义连接池名称前缀 ,更好的区分不同的连接池;
     * @param corePoolSize
     *            核心线程数
     * @param maximumPoolSize
     *            最大线程数
     * @param keepAliveTime
     *            空闲线程存活时间,单位是秒
     * @param workQueue
     *            任务队列
     * @return
     */
    public static ThreadPoolExecutor getInstance(String poolName, int corePoolSize, int maximumPoolSize,
            long keepAliveTime, BlockingQueue<Runnable> workQueue) {
        while (!lock.compareAndSet(false, true))
            ;
        ThreadPoolExecutor pool = poolManager.get(poolName);
        try {
            if (pool == null) {
                pool = new ZQZThreadPool(poolName, corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS,
                        workQueue, defaultThreadFactor);
                poolManager.put(poolName, pool);
                return pool;
            } else {
                return poolManager.get(poolName);
            }
        } finally {
            lock.compareAndSet(true, false);
        }
    }

    /**
     * 私有构造方法
     * @param poolName
     * @param corePoolSize
     * @param maximumPoolSize
     * @param keepAliveTime
     * @param unit
     * @param workQueue
     * @param threadFactory
     */
    private ZQZThreadPool(String poolName, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        if (!poolManager.containsKey(poolName)) {
            poolManager.put(poolName, this);
        }
    }

    /**
     * 返回指定线程池状态
     * @param name
     * @return
     */
    public static PoolInfo monitor(String name) {
        ThreadPoolExecutor pool = poolManager.get(name);
        if (pool == null)
            return null;
        PoolInfo poolInfo = new PoolInfo();

        // 当前线程池中的工作线程数;
        poolInfo.setPoolSize(pool.getPoolSize());
        // 队列中的任务;
        poolInfo.setQueueSize(pool.getQueue().size());
        // 线程正在执行的任务;
        poolInfo.setActiveCount(pool.getActiveCount());
        // 已经执行完成的任务;
        poolInfo.setCompletedTaskCount(pool.getCompletedTaskCount());
        // 是否允许coreThread超时;
        poolInfo.setAllowsCoreThreadTimeOut(pool.allowsCoreThreadTimeOut());
        // 核心线程数;
        poolInfo.setCorePoolSize(pool.getCorePoolSize());
        // 最大线程数;
        poolInfo.setMaximumPoolSize(pool.getMaximumPoolSize());
        // 线程池中曾经最大的线程数量;
        poolInfo.setLargestPoolSize(pool.getLargestPoolSize());
        // 线程超时时间
        poolInfo.setKeepAliveTime(pool.getKeepAliveTime(TimeUnit.SECONDS));
        // 全部的任务数,队列任务+正在执行+已经执行完成
        pool.getTaskCount();
        return poolInfo;
    }

    /**
     * The default thread factory
     */
    static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;
        private int stackSize = 0;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
            namePrefix = "zqz-pool-" + poolNumber.getAndIncrement() + "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), stackSize);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

    /**
     * 测试
     * @param args
     */
    public static void main(String[] args) {
        String poolName = "test";

        ThreadPoolExecutor pool = ZQZThreadPool.getInstance(poolName, 1, 10, 0, new ArrayBlockingQueue<Runnable>(10));

        pool.execute(new Runnable() {

            public void run() {
                System.out.println("test");
            }

        });
        PoolInfo poolInfo = ZQZThreadPool.monitor(poolName);
        System.out.println(poolInfo.getPoolSize());
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159

未完待续