欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Java JUC包源码分析 - 线程池ThreadPoolExecutor

程序员文章站 2024-03-02 15:33:40
...

线程池的相关类结构 

Java JUC包源码分析 - 线程池ThreadPoolExecutor

线程池就是存储了已创建指定个数的线程的集合,当需要用线程执行任务的时候,就可以从线程池中拿一个空闲的线程来执行任务。先看下源码,源码分析完了再看线程池的周边应用,以及几个思考。

第一部分,先看下线程池的结构,然后是如何创建线程池。

第二部分,线程池的状态

第三部分,提交任务到线程池的方法

第四部分,关闭线程池

第五部分,拒绝策略

 

第一部分:

public class ThreadPoolExecutor extends AbstractExecutorService {

    // 成员变量
    // 任务队列
    private final BlockingQueue<Runnable> workQueue;
    // 可重入锁
    private final ReentrantLock mainLock = new ReentrantLock();
    // 工作线程集合
    private final HashSet<Worker> workers = new HashSet<Worker>();
    // 阻塞条件
    private final Condition termination = mainLock.newCondition();
    // 记录着最大能达到的的线程池大小
    private int largestPoolSize;
    // 已经执行完的任务个数
    private long completedTaskCount;
    // 线程工厂
    private volatile ThreadFactory threadFactory;
    // 当线程池饱和或者关闭时的拒绝策略
    private volatile RejectedExecutionHandler handler;
    // 空闲线程在空闲时存活的时间
    private volatile long keepAliveTime;
    // 是否允许核心线程超时,默认false。false是当空闲时也保持活着状态,true是核心线程使用        
    // keepAliveTime时间来控制等待任务的超时时间
    private volatile boolean allowCoreThreadTimeOut;
    // 核心线程池的大小
    private volatile int corePoolSize;
    // 最大的线程池大小
    // private volatile int maximumPoolSize;
    // 默认的拒绝策略是中断,抛异常
    private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();


    // 构造函数
    /**
    *
    *corePoolSize:核心线程池大小,也就是说有这么多线程一直活着,直到关闭线程池
    *maximumPoolSize:最大的线程数,也就是说除了常驻的线程外,当阻塞队列满的时候,还可以新建 
    *maximumPoolSize-corePoolSize个线程来处理任务
    *keepAliveTime:就是那些可扩展出来的空闲线程当空闲时的存活时间
    *unit:是上面空闲时间的单位
    *workqueue:工作队列,当提交任务后,任务已经占满核心线程来,就会被添加到这个阻塞队列里,等待
    *被线程执行
    *threadfactory:创建线程的工厂,可以自己实现,也可以利用另一个构造函数使用默认的工厂
    *handler:拒绝策略,当阻塞队列满时,这时候新添加任务进来后,对这个任务的拒绝策略,有4种
    */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

    // 这个构造函数使用了默认的线程工厂和默认的拒绝策略,其他两个构造函数是只默认一个参数
    // 默认的线程工厂就是创建的线程是非守护线程,优先级为NORM_PRIORITY,详情看后面介绍
    // 默认的拒绝策略就是当阻塞队列满了,来了新任务,则抛出异常
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {

        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

    // 其他两个不看了...
}

Executors.defaultThreadFactory():
    public static ThreadFactory defaultThreadFactory() {
        return new DefaultThreadFactory();
    }

ThreadPoolExecutor的内部类:
    static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }
        // 新建线程时,设置线程为非守护线程,优先级为NORM_PRIORITY
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

// 默认的拒绝策略:
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();
    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
        // 直接抛异常
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }


// 再看下用Executors创建线程池的方法(不建议用这种方式创建线程池),这个类的设计风格,突然想起 
// effective Java里面的静态工厂方法创建对象:
public class Executors {
    // 创建固定大小的线程池,空闲线程被立即回收,任务阻塞队列是*队列,
    // 最大可以是Integer.MAX_VALUE(问题在于这,可以一直提交任务进去,占满内存)
    // 默认的线程工厂和拒绝策略
    // 还有个重载方法,可以传线程工厂进去
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

    // 创建只有一个线程的线程池,问题与上面的fix一样
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

    // 创建一个可以无限增加线程数的线程池(问题也很大,想想线程数可以一直创建下去)
    // 空闲线程超过60秒就回收
    // 任务队列是一个栈的形式(非公平),不缓存任务,任务提交后直接创建线程允许
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    
    // 创建一个可延迟执行的线程池,核心线程池为1,线程池最大可达Integer.MAX_VALUE
    // 可延迟的工作队列
    // 创建委托类管理已创建的可延迟的线程池
    // 延迟相关的内容,下文还会讲到
    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
    }
}

// 线程池的子类
public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService {
    // 延迟执行的线程池,最大线程数固定Integer.MAX_VALUE,任务队列固定为DelayedWorkQueue
    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory, handler);
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    // 提交延迟任务,不需要返回结果.提交后delay时间后才会执行
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        // 装饰任务,返回第二个参数:任务
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    // 提交延迟任务,需要返回结果。提交后delay时间后才会执行
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay,
                                           TimeUnit unit) {
        if (callable == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<V> t = decorateTask(callable,
            new ScheduledFutureTask<V>(callable,
                                       triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     * @throws IllegalArgumentException   {@inheritDoc}
     */
    // 提交后delay时间后才会执行,然后每间隔period时间执行一次,直到线程池关闭
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }
    // 延时执行任务
    private void delayedExecute(RunnableScheduledFuture<?> task) {
        // 如果线程池关闭了,就拒绝
        if (isShutdown())
            reject(task);
        else {
            // 获取线程池的任务队列,并把任务添加进去
            super.getQueue().add(task);
            // 再次判断线程池是否关闭了,关闭了并且当前运行状态不能运行,就把任务从队列移除
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);
            else
                // 新建线程,准备启动任务
                ensurePrestart();
        }
    }

    void ensurePrestart() {
        // 如果工作线程数比核心线程数少就新建线程放到核心线程池中
        int wc = workerCountOf(ctl.get());
        if (wc < corePoolSize)
            addWorker(null, true);
        // 如果工作线程数为0,则扩展一个任务为null的线程出来(纯粹新建一个线程),因为最大线程数 
        // 是Integer.MAX_VALUE,所以直接建新线程
        else if (wc == 0)
            addWorker(null, false);
    }

}

第二部分,线程池的状态:

    // ctl记录里两个信息:一个是线程池的状态(高3位),一个是线程池的线程数量(低29位)
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    // 高3位值是111,此状态能够接受新任务,并且对已添加的任务处理
    private static final int RUNNING    = -1 << COUNT_BITS;
    // 高3位值是000,此状态不能接受新任务,但能处理已添加的任务
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    // 高3位值是001,此状态不能接受新任务,不能处理已添加的任务,并且会中断正在执行的任务
    private static final int STOP       =  1 << COUNT_BITS;
    // 高3位值是010,当所有任务终止时,线程池会变成TIDYING状态,当线程池变为TIDYING状态时,会执
    // 行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程
    // 池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。
    private static final int TIDYING    =  2 << COUNT_BITS;
    // 高3位值是011,线程池彻底终止就变成TERMINNATED状态
    private static final int TERMINATED =  3 << COUNT_BITS;

    // 附上英文的状态解释:
     *   RUNNING:  Accept new tasks and process queued tasks
     *   SHUTDOWN: Don't accept new tasks, but process queued tasks
     *   STOP:     Don't accept new tasks, don't process queued tasks,
     *             and interrupt in-progress tasks
     *   TIDYING:  All tasks have terminated, workerCount is zero,
     *             the thread transitioning to state TIDYING
     *             will run the terminated() hook method
     *   TERMINATED: terminated() has completed

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

看一下线程池状态变化的过程:

Java JUC包源码分析 - 线程池ThreadPoolExecutor

第三部分,提交任务到线程池:

   public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        // 计算工作线程的数量,如果工作线程数小于核心线程数,则新建一个线程来执行这个任务
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 如果工作线程数达到了核心线程数,先判断线程池状态是否在运行,然后把任务放入任务阻塞队列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 重复检查线程池状态,如果不是Running,就从任务队列移除上面添加的任务,并执行拒绝
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // 如果工作线程数为0,则尝试新建一个任务为null的线程
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 通过addWorker(command, false)新建一个线程,并将任务(command)添加到该线程中;
        // 然后,启动该线程从而执行任务。
        // 如果addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内容。
        else if (!addWorker(command, false))
            reject(command);
    }
    // 添加任务到线程上执行
    private boolean addWorker(Runnable firstTask, boolean core) {

        // 这段主要是判断线程池的状态,确定能够添加任何到线程那执行
        retry:
        for (;;) {
            // 获取线程池的状态和数量的int值
            int c = ctl.get();
            // 获取线程池的状态
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            // 有效性检查:线程池关闭了,任务为null,工作队列为空
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            // 死循环+cas保证线程安全
            for (;;) {
                // 获取工作线程个数
                int wc = workerCountOf(c);
                // 如果工作线程数达到最大容量或者是核心线程数大小就返回false
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 工作线程数+1,失败就跳出循环重试
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                // 如果和之前的线程池状态不一致了,就继续从retry重新开始
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 新建一个工作线程,与任务绑定,等待run
            w = new Worker(firstTask);
            final Thread t = w.thread;
            // 判断线程
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                // 加锁
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
                    // 获取锁之后,再重新检查线程状态
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        // 如果发现新建的线程处于运行状态了,就抛出异常,因为还没有启动线程
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // 把新建的线程放入线程池的集合里
                        workers.add(w);
                        // 记录放入线程池集合数量的最大值
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                // 如果添加成功,就启动线程
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // 如果线程没有start起来,就把那个线程从集合移除,线程数量ctl - 1
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

    // 工作线程类
    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        // 工厂失败时为null
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        // 需要运行的任务,可以为null
        Runnable firstTask;
        /** Per-thread task counter */
        // 每个线程完成的任务数
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        // 新建Worker
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }
    }

    // submit是AbstractExecutorService的方法
    // submit方法也是调用execut方法
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
    // 有返回值的submit
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }
    // 传入Callable,也有返回值
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

第四部分,关闭线程池:

    // 关闭连接池,不会接受新的任务,但是会把已经提交的任务执行完毕
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 安全检查
            checkShutdownAccess();
            // 把线程池状态置SHUTDOWN
            advanceRunState(SHUTDOWN);
            // 中断空闲线程
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }
    // 中断空闲线程
    private void interruptIdleWorkers() {
        interruptIdleWorkers(false);
    }

    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 遍历线程集合,如果不处于中断状态就中断
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }
    // 尝试把状态最后变成Terminated
    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            // 判断中断的准入条件
            // 正在运行的不行,处于TIDYING不行,处于SHUTDOWN 但任务队列不为空的不行
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            // 如果工作线程不为0,就中断空闲线程
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        // 当线程终止了就调用这个方法
                        terminated();
                    } finally {
                        // 最后把ctl置0
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }
   // 马上关闭线程池,把状态变STOP,终止正在执行的线程,返回等待执行的任务集合
   public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);
            interruptWorkers();
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }
    // 把任务队列里面的任务通过drainTo()放到一个list里面,如果是延时的队列,则可能失败,需要
    // 一个一个的删除
    private List<Runnable> drainQueue() {
        BlockingQueue<Runnable> q = workQueue;
        ArrayList<Runnable> taskList = new ArrayList<Runnable>();
        q.drainTo(taskList);
        if (!q.isEmpty()) {
            for (Runnable r : q.toArray(new Runnable[0])) {
                if (q.remove(r))
                    taskList.add(r);
            }
        }
        return taskList;
    }

第五部分,拒绝策略:

线程池产生拒绝的场景一般有两个:一个是线程池异常关闭了,另一个是添加到线程池的任务数量已经超过阻塞队里最大值了

四种拒绝策略:

AbortPolicy -- 当任务添加到线程池中被拒绝时,它将抛出 RejectedExecutionException 异常。 默认的拒绝策略!

CallerRunsPolicy -- 当任务添加到线程池中被拒绝时,会在线程池当前正在运行的Thread线程池中处理被拒绝的任务。马上执行。

DiscardOldestPolicy -- 当任务添加到线程池中被拒绝时,线程池会放弃等待队列中最旧的未处理任务,然后将被拒绝的任务添加到等待队列中。

DiscardPolicy -- 当任务添加到线程池中被拒绝时,线程池将丢弃被拒绝的任务。

使用方式:

// 构造函数指定
static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(3, 5, 2000, TimeUnit.MILLISECONDS, queue, new ThreadPoolExecutor.AbortPolicy());

// 线程池的set方法
threadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());