欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

线程池实践

程序员文章站 2024-01-20 17:26:04
...

目录

 

原生线程池

SpringBoot的方式实现

封装线程池


原生线程池

直接构建线程池包装实例, 然后放入spring容器, 使用的地方直接以注入的方式获取即可

/**
 * <p>
 * 异步处理类,相当于线程池, 内部封装线程池
 * </p>
 */
@Component
public class MyTaskAsynExecutor {
  Logger logger = LoggerFactory.getLogger(this.getClass());
  /**
   * corePoolSize设置为1:空闲时不需要过多的浪费资源
   */
  final int corePoolSize = 1;
  /**
   * keepAliveTime设置为5秒:对账处理的执行状态理论上是间断且持续的
   */
  final int keepAliveTime = 5;
  /**
   * 工作队列长度设置为500
   */
  final int queueCapacity = 600;
  /**
   * 最大处理线程数设置为10
   */
  final int maximumPoolSize = 10;
  /**
   * 线程池
   */
  private ThreadPoolExecutor executor;

  @PostConstruct
  public void init(){
    executor = new ThreadPoolExecutor(
        corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS,
        new LinkedBlockingDeque<>(queueCapacity),
        //设置守护线程
        r -> {
          Thread thread = new Thread(r, "RecTask");
          thread.setDaemon(true);
          return thread;
        },
        //如果超出队列限制将不予处理
        (r, e) -> {
          logger.warn("RecTaskAsynExecutor:线程池溢出");
          return;
        }
    );
  }

  @PreDestroy
  public void shutdown(){
    executor.shutdown();
  }

  /**
   * 执行异步操作
   * @param runnable
   */
  public void execute(Runnable runnable){
    executor.execute(runnable);
  }
}

 这个类用来调用线程池,然后提供统一的提交任务的接口

/**
 * 数据异步执行类
 */
@Slf4j
@Component
public class MyProcessExecutor {

  @Autowired
  MyTaskAsynExecutor myTaskAsynExecutor;
  
  /**
   * 执行异步操作
   */
  public void execute(NeedSuspendExecutor needSuspendExecutor){
    myTaskAsynExecutor.execute(()->{
      try {
        needSuspendExecutor.execute();
      } catch (Exception t){
        log.error("DA执行调度任务报错");
      }
    });
  }

  public interface NeedSuspendExecutor{
    void execute() throws Exception;
  }
}

调用方式如下:采用匿名内部类的方式调用

myProcessExecutor.execute(() -> functionService.insertData(po));

 

SpringBoot的方式实现

先配置(@EnableAsync) ,初始化线程池

/**
 * springboot 线程池配置
 */
@Configuration
@EnableAsync
public class SpringBootExecutorConfig {
    /**
     * myExecutorOne
     * @return
     */
    @Bean(name = "myExecutorOne")
    public TaskExecutor myExecutorOne() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //配置核心线程数
        executor.setCorePoolSize(5);
        //配置最大线程数
        executor.setMaxPoolSize(10);
        //配置队列大小
        executor.setQueueCapacity(500);
        //配置线程池中的线程的名称前缀
        executor.setThreadNamePrefix("async-myExecutorOne");

        /**
         * rejection-policy:当pool已经达到max size的时候,如何处理新任务
         * CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
         */
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //执行初始化
        executor.initialize();
        return executor;
    }

    /**
     * myExecutorTwo
     * @return
     */
    @Bean(name = "myExecutorTwo")
    public TaskExecutor myExecutorTwo() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //配置核心线程数
        executor.setCorePoolSize(3);
        //配置最大线程数
        executor.setMaxPoolSize(6);
        //配置队列大小
        executor.setQueueCapacity(200);
        //配置线程池中的线程的名称前缀
        executor.setThreadNamePrefix("async-myExecutorTwo");

        /**
         * rejection-policy:当pool已经达到max size的时候,如何处理新任务
         * CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
         */
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //执行初始化
        executor.initialize();
        return executor;
    }
}

线程池的使用(@Async)

@Service
public class SpringBootExecutorService {
    @Async("myExecutorOne")
    public void calculateOne(){
        System.out.println("复杂的计算");
    }
    @Async("myExecutorTwo")
    public void calculateTwo(){
        System.out.println("复杂的计算");
    }
}

 


封装线程池

/**
 * 该类继承ThreadPoolExecutor类,覆盖了shutdown(), shutdownNow(), beforeExecute() 和 afterExecute()
 * 方法来统计线程池的执行情况
 *
 */
public class ExecutorsExtend extends ThreadPoolExecutor {

    private static final Logger LOGGER = LoggerFactory.getLogger(ExecutorsExtend.class);

    // 保存任务开始执行的时间,当任务结束时,用任务结束时间减去开始时间计算任务执行时间
    private ConcurrentHashMap<String, Date> startTimes;

    // 线程池名称,一般以业务名称命名,方便区分
    private String poolName;

    /**
     * 调用父类的构造方法,并初始化HashMap和线程池名称
     *
     * @param corePoolSize
     *            线程池核心线程数
     * @param maximumPoolSize
     *            线程池最大线程数
     * @param keepAliveTime
     *            线程的最大空闲时间
     * @param unit
     *            空闲时间的单位
     * @param workQueue
     *            保存被提交任务的队列
     * @param poolName
     *            线程池名称
     */
    public ExecutorsExtend(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
                           String poolName) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new EventThreadFactory(poolName));
        this.startTimes = new ConcurrentHashMap<>();
        this.poolName = poolName;
    }

    /**
     * 线程池延迟关闭时(等待线程池里的任务都执行完毕),统计线程池情况
     */
    @Override
    public void shutdown() {
        // 统计已执行任务、正在执行任务、未执行任务数量
        LOGGER.info(String.format(this.poolName + " Going to shutdown. Executed tasks: %d, Running tasks: %d, Pending tasks: %d",
                this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size()));
        super.shutdown();
    }

    /**
     * 线程池立即关闭时,统计线程池情况
     */
    @Override
    public List<Runnable> shutdownNow() {
        // 统计已执行任务、正在执行任务、未执行任务数量
        LOGGER.info(
                String.format(this.poolName + " Going to immediately shutdown. Executed tasks: %d, Running tasks: %d, Pending tasks: %d",
                        this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size()));
        return super.shutdownNow();
    }

    /**
     * 任务执行之前,记录任务开始时间
     */
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        startTimes.put(String.valueOf(r.hashCode()), new Date());
    }

    /**
     * 任务执行之后,计算任务结束时间
     */
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        Date startDate = startTimes.remove(String.valueOf(r.hashCode()));
        Date finishDate = new Date();
        long diff = finishDate.getTime() - startDate.getTime();
        // 统计任务耗时、初始线程数、核心线程数、正在执行的任务数量、已完成任务数量、任务总数、队列里缓存的任务数量、池中存在的最大线程数、最大允许的线程数、线程空闲时间、线程池是否关闭、线程池是否终止
        LOGGER.info(String.format(this.poolName
                        + "-pool-monitor: Duration: %d ms, PoolSize: %d, CorePoolSize: %d, Active: %d, Completed: %d, Task: %d, Queue: %d, LargestPoolSize: %d, MaximumPoolSize: %d,  KeepAliveTime: %d, isShutdown: %s, isTerminated: %s",
                diff, this.getPoolSize(), this.getCorePoolSize(), this.getActiveCount(), this.getCompletedTaskCount(), this.getTaskCount(),
                this.getQueue().size(), this.getLargestPoolSize(), this.getMaximumPoolSize(), this.getKeepAliveTime(TimeUnit.MILLISECONDS),
                this.isShutdown(), this.isTerminated()));
    }

    /**
     * 创建固定线程池,代码源于Executors.newFixedThreadPool方法,这里增加了poolName
     *
     * @param nThreads
     *            线程数量
     * @param poolName
     *            线程池名称
     * @return ExecutorService对象
     */
    public static ExecutorService newFixedThreadPool(int nThreads, String poolName) {
        return new ExecutorsExtend(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), poolName);
    }

    /**
     * 创建缓存型线程池,代码源于Executors.newCachedThreadPool方法,这里增加了poolName
     *
     * @param poolName
     *            线程池名称
     * @return ExecutorService对象
     */
    public static ExecutorService newCachedThreadPool(String poolName) {
        return new ExecutorsExtend(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), poolName);
    }

    /**
     * 生成线程池所用的线程,只是改写了线程池默认的线程工厂,传入线程池名称,便于问题追踪
     */
    static class EventThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        /**
         * 初始化线程工厂
         *
         * @param poolName
         *            线程池名称
         */
        EventThreadFactory(String poolName) {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
            namePrefix = poolName + "-pool-" + poolNumber.getAndIncrement() + "-thread-";
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
            if (t.isDaemon()) {
                t.setDaemon(false);
            }
            if (t.getPriority() != Thread.NORM_PRIORITY) {
                t.setPriority(Thread.NORM_PRIORITY);
            }
            return t;
        }
    }
}

 

 

相关标签: 百宝箱