欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

ThreadPoolExecutor的构造方法详解以及应急线程的解释

程序员文章站 2022-05-04 11:50:40
线程池状态// 线程池的控制状态,用高3位来表示线程池的运行状态,低29位来表示线程池中工作线程的数量 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private static final int COUNT_BITS = Integer.SIZE - 3; // 29 // 高三位来表示线程的运行状态 private static final int RUNNING = -1...

线程池状态

// 线程池的控制状态,用高3位来表示线程池的运行状态,低29位来表示线程池中工作线程的数量
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
	private static final int COUNT_BITS = Integer.SIZE - 3; // 29

  // 高三位来表示线程的运行状态
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

ThreadPoolExecutor的构造方法详解以及应急线程的解释

构造方法

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) 
  • corePoolSize:表示核心线程数的数量
  • maximumPoolSize:表示最大的线程数
  • keepAliveTime:表示应急线程的存活时间
  • unit:表示应急线程的存活时间的单位
  • workQueue:阻塞队列,就是线程忙不过来的时候,任务要在队列里排队
  • threadFactory:线程工厂,可以为线程创建的时候起个好名字
  • handler:拒绝策略

keepAliveTime和unit是针对应急线程的,那么什么是应急线程呢?

先说一下线程池里线程的类型。

  • 核心线程
  • 应急线程

以下代码,我在线程池里放了2个核心线程,最大线程数为3(也就是说应急线程为1),阻塞队列的容量为2,此时运行下面的代码查看结果。

public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2,3,1000L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(2));

        pool.execute(() -> {
            try {
                System.out.println("第一个任务");
                Thread.sleep(1000000L);
            } catch(InterruptedException e) {
                e.printStackTrace();
            }
        });

        pool.execute(() -> {
            try {
                System.out.println("第二个任务");
                Thread.sleep(1000000L);
            } catch(InterruptedException e) {
                e.printStackTrace();
            }
        });

        pool.execute(() -> {
            try {
                System.out.println("第三个任务");
                Thread.sleep(1000000L);
            } catch(InterruptedException e) {
                e.printStackTrace();
            }
        });

        pool.execute(() -> {
            try {
                System.out.println("第四个任务");
                Thread.sleep(1000000L);
            } catch(InterruptedException e) {
                e.printStackTrace();
            }
        });
    }

ThreadPoolExecutor的构造方法详解以及应急线程的解释
发现只有两个任务被线程消费了,但是不是有三个线程吗???另一个线程不工作???

这两个任务是被两个核心线程消费的,而剩余的两个任务被放到阻塞队列里(阻塞队列的容量是2),这时候的情况不算“应急”的情况,所以应急线程并没有启动。

那什么时候算“应急”??
此时我们再加一个任务瞅瞅,2个核心线程在工作,空间为2的队列也满了,多出来来的任务怎么办呢??应急线程会出手吗?

		pool.execute(() -> {
            try {
                System.out.println("第五个任务");
                Thread.sleep(100000L);
            } catch(InterruptedException e) {
                e.printStackTrace();
            }
        });

查看结果发现三个任务被消费了~其实是应急线程出来应急了。
ThreadPoolExecutor的构造方法详解以及应急线程的解释

总结一下:如果队列选择了有界队列,而任务超过了队列大小时,会创建 maximumPoolSize - corePoolSize 数目的应急线程来救急。而应急当然就是救急用的,所以当高峰过去而没有任务做,就会被结束,由keepAliveTime 和 unit 来控制。

拒绝策略
如果任务量特别多,核心线程+应急线程(maximumPoolSize )+ 队列大小都撑不住,这时候就会执行拒绝策略了。

  • AbortPolicy:丢弃任务并抛出RejectedExecutionException异常,默认策略;
  • CallerRunsPolicy:由调用execute方法的线程执行该任务;
  • DiscardPolicy:丢弃任务,但是不抛出异常;
  • DiscardOldestPolicy:丢弃阻塞队列最前面的任务,然后重新尝试执行任务(重复此过程)。
  • 实现RejectedExecutionHandler接口,自定义相关处理的过程,如记录日志或持久化存储不能处理的任务。

提交任务


// 执行一个任务
void execute(Runnable command) 

// submit实质还是执行了execute方法,获取返回结果
Future<?> submit(Runnable task)
<T> Future<T> submit(Runnable task, T result)
<T> Future<T> submit(Callable<T> task)

//执行所有的任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,  long timeout, TimeUnit unit)  throws InterruptedException

// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消 
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)  throws InterruptedException, ExecutionException, TimeoutException;

本文地址:https://blog.csdn.net/axiang_/article/details/107341060