欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

ThreadPoolTaskExecutor多线程使用

程序员文章站 2022-04-21 10:35:14
首先准备几个配置,复制粘贴即可.1.线程池package com.futuredata.cloud.cube.async;import lombok.extern.slf4j.Slf4j;import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import org.springframework.util.concurrent.ListenableFuture;import java.util.concu...

首先准备几个配置,复制粘贴即可.
1.线程池

package com.futuredata.cloud.cube.async;

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;

import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * 线程池
 */
@Slf4j
public class FdThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
    private void showThreadPoolInfo(String prefix){
        ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();

        if(null==threadPoolExecutor){
            return;
        }

        log.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]",
                this.getThreadNamePrefix(),
                prefix,
                threadPoolExecutor.getTaskCount(),
                threadPoolExecutor.getCompletedTaskCount(),
                threadPoolExecutor.getActiveCount(),
                threadPoolExecutor.getQueue().size());
    }

    @Override
    public void execute(Runnable task) {
        showThreadPoolInfo("1. do execute");
        super.execute(task);
    }

    @Override
    public void execute(Runnable task, long startTimeout) {
        showThreadPoolInfo("2. do execute");
        super.execute(task, startTimeout);
    }

    @Override
    public Future<?> submit(Runnable task) {
        showThreadPoolInfo("1. do submit");
        return super.submit(task);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        showThreadPoolInfo("2. do submit");
        return super.submit(task);
    }

    @Override
    public ListenableFuture<?> submitListenable(Runnable task) {
        showThreadPoolInfo("1. do submitListenable");
        return super.submitListenable(task);
    }

    @Override
    public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
        showThreadPoolInfo("2. do submitListenable");
        return super.submitListenable(task);
    }
}

2.线程池配置类

package com.futuredata.cloud.cube.config;

import com.futuredata.cloud.cube.async.FdThreadPoolTaskExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * 线程池配置
 */
@Configuration
@Slf4j
public class ExecutorConfig {
    @Bean("showThreadPoolInfo")
    public Executor asyncServiceExecutor() {
        log.info("start asyncServiceExecutor");
        ThreadPoolTaskExecutor executor = new FdThreadPoolTaskExecutor();
        //配置核心线程数
        executor.setCorePoolSize(20);
        //配置最大线程数
        executor.setMaxPoolSize(20);
        //配置队列大小
        executor.setQueueCapacity(99999);
        //配置线程池中的线程的名称前缀
        executor.setThreadNamePrefix("async-service-");

        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
        // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //执行初始化
        executor.initialize();
        return executor;
    }
}

3.在需要多线程异步处理的方法上面加上@Async注解即可,

 @Async("showThreadPoolInfo")//括号内是指定的线程池方法名,
    public void toResult(参数){
    ......
    }

4.下面是注意事项
// 如下方式会使@Async失效 @Async==线程池(前人总结,这里借用,很经典)
// 一、异步方法使用static修饰
// 二、异步类没有使用@Component注解(或其他注解)导致spring无法扫描到异步类
// 三、异步方法不能与被调用的异步方法在同一个类中
// 四、类中需要使用@Autowired或@Resource等注解自动注入,不能自己手动new对象
// 五、如果使用SpringBoot框架必须在启动类中增加@EnableAsync注解

关于返回值问题,
1.一般多线程处理的返回值为void,只是进行业务处理,比如大量数据入库,不需要返回值
2.返回值接收类型必须是线程安全的类,用Future接收,用Future里面的get()方法可以获取存入的值.

//举个例子
@Async
10     public Future<String> asyncInvokeReturnFuture(int i) {
11         log.info("asyncInvokeReturnFuture, parementer={}", i);
12         Future<String> future;
13         try {
14             Thread.sleep(1000 * 1);
15             future = new AsyncResult<String>("success:" + i);
16             throw new IllegalArgumentException("a");
17         } catch (InterruptedException e) {
18             future = new AsyncResult<String>("error");
19         } catch(IllegalArgumentException e){
20             future = new AsyncResult<String>("error-IllegalArgumentException");
21         }
22         return future;
23     }

效果图:圈红的就是线程日志
ThreadPoolTaskExecutor多线程使用

暂时就这么多,后续会完善对多线程的理解,目前还在探索学习中.
记录下我的理解(就本次在循环中调用外部接口获取返回值存入数据库这个需求发现的问题)
1.Tomcat自带的线程池,每个用户访问会建立一个连接,每个请求为一个线程顺序执行直到请求完毕
2.开启多线程,就是在一个请求的情况下,一个线程在执行的时候突然分出很多线程去处理被@Async的这个异步方法,但是主线程也就是用户的那个线程还是继续一条线往下执行.如果多线程处理的数据需要被使用到,但主线程并不会去等待你多线程的异步方法执行完毕,这时候很容易会报错,如果加同步锁解决这个问题,但是多线程的效果就没用了.这个场景是我在for循环中调用异步方法出现的.
比较乱,还可能不对,可能一段时间后再会过来看会觉得现在的理解很搞笑哈,就这样,记录下,别当正确的.

本文地址:https://blog.csdn.net/whlqunzhu/article/details/107673185

相关标签: 多线程