线程池实践
程序员文章站
2024-01-20 17:26:04
...
目录
原生线程池
直接构建线程池包装实例, 然后放入spring容器, 使用的地方直接以注入的方式获取即可
/**
* <p>
* 异步处理类,相当于线程池, 内部封装线程池
* </p>
*/
@Component
public class MyTaskAsynExecutor {
Logger logger = LoggerFactory.getLogger(this.getClass());
/**
* corePoolSize设置为1:空闲时不需要过多的浪费资源
*/
final int corePoolSize = 1;
/**
* keepAliveTime设置为5秒:对账处理的执行状态理论上是间断且持续的
*/
final int keepAliveTime = 5;
/**
* 工作队列长度设置为500
*/
final int queueCapacity = 600;
/**
* 最大处理线程数设置为10
*/
final int maximumPoolSize = 10;
/**
* 线程池
*/
private ThreadPoolExecutor executor;
@PostConstruct
public void init(){
executor = new ThreadPoolExecutor(
corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS,
new LinkedBlockingDeque<>(queueCapacity),
//设置守护线程
r -> {
Thread thread = new Thread(r, "RecTask");
thread.setDaemon(true);
return thread;
},
//如果超出队列限制将不予处理
(r, e) -> {
logger.warn("RecTaskAsynExecutor:线程池溢出");
return;
}
);
}
@PreDestroy
public void shutdown(){
executor.shutdown();
}
/**
* 执行异步操作
* @param runnable
*/
public void execute(Runnable runnable){
executor.execute(runnable);
}
}
这个类用来调用线程池,然后提供统一的提交任务的接口
/**
* 数据异步执行类
*/
@Slf4j
@Component
public class MyProcessExecutor {
@Autowired
MyTaskAsynExecutor myTaskAsynExecutor;
/**
* 执行异步操作
*/
public void execute(NeedSuspendExecutor needSuspendExecutor){
myTaskAsynExecutor.execute(()->{
try {
needSuspendExecutor.execute();
} catch (Exception t){
log.error("DA执行调度任务报错");
}
});
}
public interface NeedSuspendExecutor{
void execute() throws Exception;
}
}
调用方式如下:采用匿名内部类的方式调用
myProcessExecutor.execute(() -> functionService.insertData(po));
SpringBoot的方式实现
先配置(@EnableAsync) ,初始化线程池
/**
* springboot 线程池配置
*/
@Configuration
@EnableAsync
public class SpringBootExecutorConfig {
/**
* myExecutorOne
* @return
*/
@Bean(name = "myExecutorOne")
public TaskExecutor myExecutorOne() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//配置核心线程数
executor.setCorePoolSize(5);
//配置最大线程数
executor.setMaxPoolSize(10);
//配置队列大小
executor.setQueueCapacity(500);
//配置线程池中的线程的名称前缀
executor.setThreadNamePrefix("async-myExecutorOne");
/**
* rejection-policy:当pool已经达到max size的时候,如何处理新任务
* CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
*/
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//执行初始化
executor.initialize();
return executor;
}
/**
* myExecutorTwo
* @return
*/
@Bean(name = "myExecutorTwo")
public TaskExecutor myExecutorTwo() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//配置核心线程数
executor.setCorePoolSize(3);
//配置最大线程数
executor.setMaxPoolSize(6);
//配置队列大小
executor.setQueueCapacity(200);
//配置线程池中的线程的名称前缀
executor.setThreadNamePrefix("async-myExecutorTwo");
/**
* rejection-policy:当pool已经达到max size的时候,如何处理新任务
* CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
*/
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//执行初始化
executor.initialize();
return executor;
}
}
线程池的使用(@Async)
@Service
public class SpringBootExecutorService {
@Async("myExecutorOne")
public void calculateOne(){
System.out.println("复杂的计算");
}
@Async("myExecutorTwo")
public void calculateTwo(){
System.out.println("复杂的计算");
}
}
封装线程池
/**
* 该类继承ThreadPoolExecutor类,覆盖了shutdown(), shutdownNow(), beforeExecute() 和 afterExecute()
* 方法来统计线程池的执行情况
*
*/
public class ExecutorsExtend extends ThreadPoolExecutor {
private static final Logger LOGGER = LoggerFactory.getLogger(ExecutorsExtend.class);
// 保存任务开始执行的时间,当任务结束时,用任务结束时间减去开始时间计算任务执行时间
private ConcurrentHashMap<String, Date> startTimes;
// 线程池名称,一般以业务名称命名,方便区分
private String poolName;
/**
* 调用父类的构造方法,并初始化HashMap和线程池名称
*
* @param corePoolSize
* 线程池核心线程数
* @param maximumPoolSize
* 线程池最大线程数
* @param keepAliveTime
* 线程的最大空闲时间
* @param unit
* 空闲时间的单位
* @param workQueue
* 保存被提交任务的队列
* @param poolName
* 线程池名称
*/
public ExecutorsExtend(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
String poolName) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new EventThreadFactory(poolName));
this.startTimes = new ConcurrentHashMap<>();
this.poolName = poolName;
}
/**
* 线程池延迟关闭时(等待线程池里的任务都执行完毕),统计线程池情况
*/
@Override
public void shutdown() {
// 统计已执行任务、正在执行任务、未执行任务数量
LOGGER.info(String.format(this.poolName + " Going to shutdown. Executed tasks: %d, Running tasks: %d, Pending tasks: %d",
this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size()));
super.shutdown();
}
/**
* 线程池立即关闭时,统计线程池情况
*/
@Override
public List<Runnable> shutdownNow() {
// 统计已执行任务、正在执行任务、未执行任务数量
LOGGER.info(
String.format(this.poolName + " Going to immediately shutdown. Executed tasks: %d, Running tasks: %d, Pending tasks: %d",
this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size()));
return super.shutdownNow();
}
/**
* 任务执行之前,记录任务开始时间
*/
@Override
protected void beforeExecute(Thread t, Runnable r) {
startTimes.put(String.valueOf(r.hashCode()), new Date());
}
/**
* 任务执行之后,计算任务结束时间
*/
@Override
protected void afterExecute(Runnable r, Throwable t) {
Date startDate = startTimes.remove(String.valueOf(r.hashCode()));
Date finishDate = new Date();
long diff = finishDate.getTime() - startDate.getTime();
// 统计任务耗时、初始线程数、核心线程数、正在执行的任务数量、已完成任务数量、任务总数、队列里缓存的任务数量、池中存在的最大线程数、最大允许的线程数、线程空闲时间、线程池是否关闭、线程池是否终止
LOGGER.info(String.format(this.poolName
+ "-pool-monitor: Duration: %d ms, PoolSize: %d, CorePoolSize: %d, Active: %d, Completed: %d, Task: %d, Queue: %d, LargestPoolSize: %d, MaximumPoolSize: %d, KeepAliveTime: %d, isShutdown: %s, isTerminated: %s",
diff, this.getPoolSize(), this.getCorePoolSize(), this.getActiveCount(), this.getCompletedTaskCount(), this.getTaskCount(),
this.getQueue().size(), this.getLargestPoolSize(), this.getMaximumPoolSize(), this.getKeepAliveTime(TimeUnit.MILLISECONDS),
this.isShutdown(), this.isTerminated()));
}
/**
* 创建固定线程池,代码源于Executors.newFixedThreadPool方法,这里增加了poolName
*
* @param nThreads
* 线程数量
* @param poolName
* 线程池名称
* @return ExecutorService对象
*/
public static ExecutorService newFixedThreadPool(int nThreads, String poolName) {
return new ExecutorsExtend(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), poolName);
}
/**
* 创建缓存型线程池,代码源于Executors.newCachedThreadPool方法,这里增加了poolName
*
* @param poolName
* 线程池名称
* @return ExecutorService对象
*/
public static ExecutorService newCachedThreadPool(String poolName) {
return new ExecutorsExtend(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), poolName);
}
/**
* 生成线程池所用的线程,只是改写了线程池默认的线程工厂,传入线程池名称,便于问题追踪
*/
static class EventThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
/**
* 初始化线程工厂
*
* @param poolName
* 线程池名称
*/
EventThreadFactory(String poolName) {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
namePrefix = poolName + "-pool-" + poolNumber.getAndIncrement() + "-thread-";
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
if (t.isDaemon()) {
t.setDaemon(false);
}
if (t.getPriority() != Thread.NORM_PRIORITY) {
t.setPriority(Thread.NORM_PRIORITY);
}
return t;
}
}
}
上一篇: (比较器)比较器问题引出
下一篇: (Java基础类库)对象克隆