欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Java并发之线程池

程序员文章站 2024-03-02 15:55:46
...

线程池是为了避免频繁地创建和销毁线程而设计的,可以让创建的线程复用。

1、常用线程池

Java并发之线程池
ThreadPollExecutor表示一个线程池,Executors扮演着线程池工厂的角色,通过Executos可以取得一个特定功能的线程池。

// 返回固定线程数量的线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
               new LinkedBlockingQueue<Runnable>());
}
//返回一个只有一个线程的线程池
public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,
               0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}
//线程数量不确定,适合 short-lived asynchronous tasks
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
               60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
}

2、计划任务

newScheduledThreadPool() 返回ScheduledExecutorService 对象,可以根据时间需要对线程进行调度。
ScheduledExecutorService 对象的方法如下:

//delay时间后,执行command
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);

//delay时间后,执行callable,相比command,callable有返回值
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);

//固定频率的周期性调度
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,
                                                  long period, TimeUnit unit);
//上一个任务执行完后,过delay时间后的延时执行                                                  
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
                                                  long delay, TimeUnit unit);

scheduleAtFixedRate:任务调度的频率是一定的。以上一个任务开始执行的时间为起点,之后的period时间后,调度下一次任务。若前面的任务没有完成,则调度也不会启动。周期如果太短,那任务就会在上一个任务结束后,立即调用。

scheduleWithFixedDelay:在上一个任务结束后,再经过delay时间进行任务调度。

注意: 如果任务遇到异常,那后续的所有子任务都会停止调度,所以必须保证异常要即时处理。

3、自定义线程池

从以上核心线程池的实现可以发现,他们都是ThreadPoolExecutorde 封装,

public ThreadPoolExecutor(int corePoolSize,    //指定线程池中的线程数量
                          int maximumPoolSize, //线程池中最大线程数量 
                          long keepAliveTime,  //当线程池中的线程数量大于corePoolSize时,多余的空闲线程的存活时间
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue)
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
}

可以根据自己的需求,重新设置以上的参数来自定义线程池。例如:

ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, 
                   new SynchronousQueue<Runnable>(),
                   new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread t = new Thread(r);
                        //设置为守护线程
                        t.setDaemon(true);
                        System.out.println("Created " + t);
                        return t;
                    }
});

线程管理机制:
Java并发之线程池

4.拒绝策略

最后一个参数RejectedExecutionHandler handler 指定拒绝策略,也就是任务数量超过了系统实际承载的能力时,该如何处理。
当线程池中的线程已经用完了,无法为新任务服务,同时等待队列也已经排满,再也塞不下新任务的时候,就需要有一套机制来合理处理这个问题。

JDK提供了如下四种策略:
ThreadPoolExecutor.AbortPolicy(默认):直接丢弃任务并抛出RejectedExecutionException
ThreadPoolExecutor.CallerRunsPolicy:会使用当前提交任务的线程去执行任务,这种策略会导致任务提交的速度下降。
ThreadPoolExecutor.DiscardPolicy:直接丢弃任务,不予任何处理。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃任务队列中的头部的任务,然后再次尝试提交当前任务。

5、扩展线程池

当我们想监控每个任务执行的开始和结束时间,或者其他一些自定义的增强功能,就可以使用ThreadPoolExecutor提供的接口来实现。
ThreadPoolExecutor.Worker 的 runWorker(Worker w) 内部一部分实现如下:

beforeExecute(wt, task);              //运行前
Throwable thrown = null;
try {
    task.run();                       //运行任务
} catch (RuntimeException x) {
    thrown = x; throw x;
} catch (Error x) {
    thrown = x; throw x;
} catch (Throwable x) {
    thrown = x; throw new Error(x);
} finally {
    afterExecute(task, thrown);       //运行结束
}                    

runWorker() 会被线程池以多线程模式异步调用,即会被多个线程同时访问,所以beforeExecute()、afterExecute() 也将同时被多线程访问。
beforeExecute()、afterExecute() 默认是空的实现,我们可以自己来实现对线程池的运行状态进行跟踪,输出调试信息。

ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, 
                   new LinkedBlockingQueue<Runnable>()) {

                    @Override
                    protected void beforeExecute(Thread t, Runnable r) {
                        System.out.println("准备执行:"  + r.name);
                    }

                    @Override
                    protected void afterExecute(Runnable r, Throwable t) {
                        System.out.println("执行完成:"  + r.name);
                    }

                    @Override
                    protected void terminated() {
                        System.out.println("线程池退出");
                    }
};
....
es.shutdown();

shutdown() 关闭线程池。不会立即暴力地终止所有任务,会等到所有的任务执行完后再关闭线程池。但当shutdown() 执行后,就不能再接受其他新的任务了。

6、堆栈信息

异常堆栈对于调试非常重要,如果没有异常堆栈信息,排查问题时,就得加班熬夜了。
线程池可能会吃掉程序抛出的异常,导致我们对程序的错误一无所知。

submit()会吃掉抛出的异常,可以改成如下两种方式:

Future fu = es.submit(Runnable r);
fu.get();

es.execute(Runnable r);

以上两种方式只会得到部分堆栈信息,如果想要得到全部的堆栈信息,就得要自定义线程池了,让他在调度任务之前,先保存一下提交任务线程的堆栈信息。