欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

多线程编程学习十一(ThreadPoolExecutor 详解).

程序员文章站 2023-04-05 16:14:10
一、ThreadPoolExecutor 参数说明 corePoolSize:核心线程池的大小。当提交一个任务到线程池时,核心线程池会创建一个核心线程来执行任务,即使其他核心线程能够执行新任务也会创建线程,等到需要执行的任务数大于核心线程池基本大小时就不再创建。如果调用了线程池的 prestartA ......

一、threadpoolexecutor 参数说明

    public threadpoolexecutor(int corepoolsize,
                              int maximumpoolsize,
                              long keepalivetime,
                              timeunit unit,
                              blockingqueue<runnable> workqueue,
                              threadfactory threadfactory,
                              rejectedexecutionhandler handler)
  • corepoolsize:核心线程池的大小。当提交一个任务到线程池时,核心线程池会创建一个核心线程来执行任务,即使其他核心线程能够执行新任务也会创建线程,等到需要执行的任务数大于核心线程池基本大小时就不再创建。如果调用了线程池的 prestartallcorethreads() 方法,核心线程池会提前创建并启动所有核心线程。
  • workqueue:任务队列。当核心线程池中没有线程时,所提交的任务会被暂存在队列中。java 提供了多种。
  • maximumpoolsize:线程池允许创建的最大线程数。如果队列也满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的空闲线程执行任务。值得注意的是,如果使用了*的任务队列则这个参数不起作用。
  • keepalivetime:当线程池中的线程数大于 corepoolsize 时,keepalivetime 为多余的空闲线程等待新任务的最长时间,超过这个时间后多余的线程将被终止。所以,如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程的利用率。值得注意的是,如果使用了*的任务队列则这个参数不起作用。
  • timeunit:线程活动保持时间的单位。
  • threadfactory:创建线程的工厂。可以通过线程工厂给每个创建出来的线程设置符合业务的名字。

    // 依赖 guava
    new threadfactorybuilder().setnameformat("xx-task-%d").build();
  • handler:饱和策略。当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。java 提供了以下4种策略:

    • abortpolicy:默认。直接抛出异常。
    • callerrunspolicy:只用调用者所在线程来运行任务。
    • discardoldestpolicy:丢弃队列里最近的一个任务,并执行当前任务。
    • discardpolicy:不处理,丢弃掉。

tips: 一般我们称核心线程池中的线程为核心线程,这部分线程不会被回收;超过任务队列后,创建的线程为空闲线程,这部分线程会被回收(回收时间即 keepalivetime)

二、常见的 threadpoolexecutor 介绍

executors 是创建 threadpoolexecutor 和 scheduledthreadpoolexecutor 的工厂类。

java 提供了多种类型的 threadpoolexecutor,比较常见的有 fixedthreadpool、singlethreadexecutor、cachedthreadpool等。

fixedthreadpool

    public static executorservice newfixedthreadpool(int nthreads) {
        return new threadpoolexecutor(nthreads, nthreads,
                                      0l, timeunit.milliseconds,
                                      new linkedblockingqueue<runnable>());
    }

fixedthreadpool 被称为可重用固定线程数的线程池。可以看到 corepoolsize 和 maximumpoolsize 都被设置成了 nthreads;keepalivetime设置为0l,意味着多余的空闲线程会被立即终止;使用了阻塞队列 linkedblockingqueue 作为线程的工作队列(队列的容量为 integer.max_value)。

fixedthreadpool 所存在的问题是,由于队列的容量为 integer.max_value,基本可以认为是*的,所以 maximumpoolsize 和 keepalivetime 参数都不会生效,饱和拒绝策略也不会执行,会造成任务大量堆积在阻塞队列中。

fixedthreadpool 适用于为了满足资源管理的需求,而需要限制线程数量的应用场景。


singlethreadexecutor

    public static executorservice newsinglethreadexecutor() {
        return new finalizabledelegatedexecutorservice
            (new threadpoolexecutor(1, 1,
                                    0l, timeunit.milliseconds,
                                    new linkedblockingqueue<runnable>()));
    }

singlethreadexecutor 是使用单个线程的线程池。可以看到 corepoolsize 和 maximumpoolsize 被设置为1,其他参数与 fixedthreadpool 相同,所以所带来的风险也和 fixedthreadpool 一致,就不赘述了。

singlethreadexecutor 适用于需要保证顺序的执行各个任务。


cachedthreadpool

    public static executorservice newcachedthreadpool() {
        return new threadpoolexecutor(0, integer.max_value,
                                      60l, timeunit.seconds,
                                      new synchronousqueue<runnable>());
    }

cachedthreadpool 是一个会根据需要创建新线程的线程池。可以看到 corepoolsize 被设置为 0,所以创建的线程都为空闲线程;maximumpoolsize 被设置为 integer.max_value(基本可认为*),意味着可以创建无限数量的空闲线程;keepalivetime 设置为60l,意味着空闲线程等待新任务的最长时间为60秒;使用没有容量的 synchronousqueue 作为线程池的工作队列。

cachedthreadpool 所存在的问题是, 如果主线程提交任务的速度高于maximumpool 中线程处理任务的速度时,cachedthreadpool 会不断创建新线程。极端情况下,cachedthreadpool会因为创建过多线程而耗尽cpu和内存资源。

cachedthreadpool 适用于执行很多的短期异步任务的小程序,或者是负载较轻的服务器。

三、 自建 threadpoolexecutor 线程池

鉴于上面提到的风险,我们更提倡使用 threadpoolexecutor 去创建线程池,而不用 executors 工厂去创建。

以下是一个 threadpoolexecutor 创建线程池的 demo 实例:

public class pool {

    static threadfactory threadfactory = new threadfactorybuilder().setnameformat("pool-task-%d").build();
    static executorservice executor = new threadpoolexecutor(runtime.getruntime().availableprocessors() * 2,
            200, 0l, timeunit.milliseconds, new linkedblockingqueue<>(1024),
            threadfactory, new threadpoolexecutor.abortpolicy());

    public static void main(string[] args) throws executionexception, interruptedexception {
        // 1. 无返回值的任务执行 -> runnable
        executor.execute(() -> system.out.println("hello world"));
        // 2. 有返回值的任务执行 -> callable
        future<string> future = executor.submit(() -> "hello world");
        // get 方法会阻塞线程执行等待返回结果
        string result = future.get();
        system.out.println(result);

        // 3. 监控线程池
        monitor();

        // 4. 关闭线程池
        shutdownandawaittermination();

        monitor();
    }

    private static void monitor() {
        threadpoolexecutor threadpoolexecutor = (threadpoolexecutor) pool.executor;
        system.out.println("【线程池任务】线程池中曾经创建过的最大线程数:" + threadpoolexecutor.getlargestpoolsize());
        system.out.println("【线程池任务】线程池中线程数:" + threadpoolexecutor.getpoolsize());
        system.out.println("【线程池任务】线程池中活动的线程数:" + threadpoolexecutor.getactivecount());
        system.out.println("【线程池任务】队列中等待执行的任务数:" + threadpoolexecutor.getqueue().size());
        system.out.println("【线程池任务】线程池已执行完任务数:" + threadpoolexecutor.getcompletedtaskcount());
    }

    /**
     * 关闭线程池
     * 1. shutdown、shutdownnow 的原理都是遍历线程池中的工作线程,然后逐个调用线程的 interrupt 方法来中断线程。
     * 2. shutdownnow:将线程池的状态设置成 stop,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表。
     * 3. shutdown:将线程池的状态设置成 shutdown 状态,然后中断所有没有正在执行任务的线程。
     */
    private static void shutdownandawaittermination() {
        // 禁止提交新任务
        executor.shutdown();
        try {
            // 等待现有任务终止
            if (!executor.awaittermination(60, timeunit.seconds)) {
                // 取消当前正在执行的任务
                executor.shutdownnow();
                // 等待一段时间让任务响应被取消
                if (!executor.awaittermination(60, timeunit.seconds)) {
                    system.err.println("pool did not terminate");
                }
            }
        } catch (interruptedexception ie) {
            // 如果当前线程也中断,则取消
            executor.shutdownnow();
            // 保留中断状态
            thread.currentthread().interrupt();
        }
    }
}

创建线程池需要注意以下几点:

  1. cpu 密集型任务应配置尽可能小的线程,如配置 ncpu+1 个线程。
  2. io 密集型任务(数据库读写等)应配置尽可能多的线程,如配置 ncpu*2 个线程。
  3. 优先级不同的任务可以使用优先级队列 priorityblockingqueue 来处理。
  4. 建议使用有界队列。可以避免创建数量非常多的线程,甚至拖垮系统。有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点儿,比如几千。