欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

基于ThreadPoolTaskExecutor的使用说明

程序员文章站 2022-06-28 14:53:12
目录threadpooltaskexecutor的使用springboot 配置提交任务threadpooltaskexecutor配置问题有关spring中threadpooltaskexecuto...

threadpooltaskexecutor的使用

当我们需要实现并发、异步等操作时,通常都会使用到threadpooltaskexecutor,现对其使用稍作总结。

springboot 配置

基于ThreadPoolTaskExecutor的使用说明

基于ThreadPoolTaskExecutor的使用说明

提交任务

  • 无返回值的任务使用execute(runnable)
  • 有返回值的任务使用submit(runnable)

处理流程

当一个任务被提交到线程池时,首先查看线程池的核心线程是否都在执行任务,否就选择一条线程执行任务,是就执行第二步。

查看核心线程池是否已满,不满就创建一条线程执行任务,否则执行第三步。

查看任务队列是否已满,不满就将任务存储在任务队列中,否则执行第四步。

查看线程池是否已满,不满就创建一条线程执行任务,否则就按照策略处理无法执行的任务。

在threadpoolexecutor中表现为:

如果当前运行的线程数小于corepoolsize,那么就创建线程来执行任务(执行时需要获取全局锁)。

如果运行的线程大于或等于corepoolsize,那么就把task加入blockqueue。

如果创建的线程数量大于blockqueue的最大容量,那么创建新线程来执行该任务。

如果创建线程导致当前运行的线程数超过maximumpoolsize,就根据饱和策略来拒绝该任务。

关闭线程池

调用shutdown或者shutdownnow,两者都不会接受新的任务,而且通过调用要停止线程的interrupt方法来中断线程,有可能线程永远不会被中断,不同之处在于shutdownnow会首先将线程池的状态设置为stop,然后尝试停止所有线程(有可能导致部分任务没有执行完)然后返回未执行任务的列表。而shutdown则只是将线程池的状态设置为shutdown,然后中断所有没有执行任务的线程,并将剩余的任务执行完。

配置线程个数

如果是cpu密集型任务,那么线程池的线程个数应该尽量少一些,一般为cpu的个数+1条线程。

如果是io密集型任务,那么线程池的线程可以放的很大,如2*cpu的个数。

对于混合型任务,如果可以拆分的话,通过拆分成cpu密集型和io密集型两种来提高执行效率;如果不能拆分的的话就可以根据实际情况来调整线程池中线程的个数。

监控线程池状态

常用状态

taskcount:线程需要执行的任务个数。

completedtaskcount:线程池在运行过程中已完成的任务数。

largestpoolsize:线程池曾经创建过的最大线程数量。

getpoolsize:获取当前线程池的线程数量。

getactivecount:获取活动的线程的数量

通过继承线程池,重写beforeexecute,afterexecute和terminated方法来在线程执行任务前,线程执行任务结束,和线程终结前获取线程的运行情况,根据具体情况调整线程池的线程数量。

threadpooltaskexecutor配置问题

最近线上出现一个奇葩问题,使用的是threadpooltaskexecutor来处理后续服务调用,刚开始运行threadpooltaskexecutor处理后续服务调用是没有问题的,但是一段时间之后,发现后续服务一直没有被调用,导致了极其严重的后果

有关spring中threadpooltaskexecutor具体如下

<bean id="threadpooltaskexecutor" 
            class="org.springframework.scheduling.concurrent.threadpooltaskexecutor">
    <!-- 核心线程数,默认为1 -->
    <property name="corepoolsize" value="5" />
    <!-- 最大线程数,默认为integer.max_value -->
    <property name="maxpoolsize" value="16" />
    <!-- 队列最大长度,一般需要设置值>=notifyscheduledmainexecutor.maxnum;默认为integer.max_value -->
    <!--<property name="queuecapacity" value="10" />-->
    <!-- 线程池维护线程所允许的空闲时间,默认为60s -->
    <property name="keepaliveseconds" value="300" />
    <!-- 线程池对拒绝任务(无线程可用)的处理策略,
        目前只支持abortpolicy、callerrunspolicy;默认为后者 
    -->
    <property name="rejectedexecutionhandler">
        <!-- abortpolicy:直接抛出java.util.concurrent.rejectedexecutionexception异常 -->
        <!-- callerrunspolicy:
            主线程直接执行该任务,执行完之后尝试添加下一个任务到线程池中,
        -->
        <!-- discardoldestpolicy:
            抛弃旧的任务、暂不支持;会导致被丢弃的任务无法再次被执行
             -->
        <!-- discardpolicy:
            抛弃当前任务、暂不支持;会导致被丢弃的任务无法再次被执行 
        -->
        <bean class="java.util.concurrent.threadpoolexecutor$callerrunspolicy" />
    </property>
</bean>

那就不得不了解一下java.util.concurrent包下executor构架了

回忆一下线程池工作原理

如果当前运行的线程少于corepoolsize,则创建新线程来执行任务(需要获得全局锁)

如果运行的线程等于或多于corepoolsize ,则将任务加入blockingqueue

如果无法将任务加入blockingqueue(队列已满),则创建新的线程来处理任务(需要获得全局锁)

如果创建新线程将使当前运行的线程超出maxiumpoolsize,任务将被拒绝,并调用

rejectedexecutionhandler.rejectedexecution()方法

测试场景1

首先,注释queuecapacity的一行

任务:

public class customrunnable implements runnable {
    private int id;
    public customrunnable(int id) {
        this.id = id;
    }
    @override
    public void run() {
        try {
            system.out.println("begin execute "+ thread.currentthread().getname()
                    + "-- task id: "+ id);
            string rs =  clientutil.get("http://www.****.com");
            system.out.println("end execute task: "+ id);
        } catch (exception e) {
            e.printstacktrace();
        }
    }
}

测试案例:

@test
public void threadtest() throws interruptedexception {
    for (int i=0; i< 35; i++){
        thread t= new thread(new customrunnable(i));
        executor.execute(t);
    }
    thread.sleep(1800000);
}

测试结果:

七月 09, 2018 5:46:47 下午 org.springframework.scheduling.concurrent.threadpooltaskexecutor initialize
信息: initializing executorservice 'threadpooltaskexecutor'
begin execute threadpooltaskexecutor-1-- task id: 0
begin execute threadpooltaskexecutor-2-- task id: 1
begin execute threadpooltaskexecutor-3-- task id: 2
begin execute threadpooltaskexecutor-4-- task id: 3
begin execute threadpooltaskexecutor-5-- task id: 4
end execute task: 4
begin execute threadpooltaskexecutor-5-- task id: 5
end execute task: 1
begin execute threadpooltaskexecutor-2-- task id: 6
end execute task: 0
begin execute threadpooltaskexecutor-1-- task id: 7
end execute task: 2
begin execute threadpooltaskexecutor-3-- task id: 8
end execute task: 3
begin execute threadpooltaskexecutor-4-- task id: 9
...

可以发现,一开始线程池就创建了corepoolsize大小的线程,对于之后的新加进的任务,就放到blockingqueue中,默认是使用linkedblockingqueue,大小是integer.max_value,因为队列大小太大,所以就不会创建maxpoolsize大小的线程数量,因此,只有线程处理完当前任务,才会去处理下一个任务,所以,刚加进去的任务得不到立即处理

测试场景2

只需要打开queuecapacity的一行,其他不变

测试结果:

七月 09, 2018 6:07:13 下午 org.springframework.scheduling.concurrent.threadpooltaskexecutor initialize
信息: initializing executorservice 'threadpooltaskexecutor'
begin execute threadpooltaskexecutor-1-- task id: 0
begin execute threadpooltaskexecutor-2-- task id: 1
begin execute threadpooltaskexecutor-3-- task id: 2
begin execute threadpooltaskexecutor-4-- task id: 3
begin execute threadpooltaskexecutor-5-- task id: 4
begin execute threadpooltaskexecutor-6-- task id: 15
begin execute threadpooltaskexecutor-7-- task id: 16
begin execute threadpooltaskexecutor-8-- task id: 17
begin execute threadpooltaskexecutor-9-- task id: 18
begin execute threadpooltaskexecutor-10-- task id: 19
begin execute threadpooltaskexecutor-11-- task id: 20
begin execute threadpooltaskexecutor-12-- task id: 21
begin execute threadpooltaskexecutor-14-- task id: 23
begin execute threadpooltaskexecutor-15-- task id: 24
begin execute main-- task id: 26
begin execute threadpooltaskexecutor-13-- task id: 22
begin execute threadpooltaskexecutor-16-- task id: 25
begin execute threadpooltaskexecutor-11-- task id: 5
end execute task: 15
begin execute threadpooltaskexecutor-6-- task id: 6
end execute task: 23
begin execute threadpooltaskexecutor-14-- task id: 7
end execute task: 4
begin execute threadpooltaskexecutor-5-- task id: 8
end execute task: 17
begin execute threadpooltaskexecutor-8-- task id: 9
....

可以发现,因为初始任务数量大于corepoolsize大小,所以线程池初始化就创建了maxpoolsize大小数量的纯种,对于后续新加进的任务会入到blockingqueue队列中去,之后等待线程处理完一个任务之后再处理队列中的任务

猜想

线上出现这种原因可能就是因为queuecapacity被设置成了默认(integer.max_value),而且初始化纯种的corepoolsize数量过少,并且线程处理速度较慢(业务逻辑,网络请求等等原因),导致后续任务会一直填加到队列中去,迟迟得不到立即处理。

解决方案

手动设置queuecapacity大小,网络请求原因的话,可以设置超时时间;业务逻辑的话,另辟蹊径。。。

以上为个人经验,希望能给大家一个参考,也希望大家多多支持。