Java并发之线程池
线程池是为了避免频繁地创建和销毁线程而设计的,可以让创建的线程复用。
1、常用线程池
ThreadPollExecutor表示一个线程池,Executors扮演着线程池工厂的角色,通过Executos可以取得一个特定功能的线程池。
// 返回固定线程数量的线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
//返回一个只有一个线程的线程池
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}
//线程数量不确定,适合 short-lived asynchronous tasks
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
2、计划任务
newScheduledThreadPool() 返回ScheduledExecutorService 对象,可以根据时间需要对线程进行调度。
ScheduledExecutorService 对象的方法如下:
//delay时间后,执行command
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
//delay时间后,执行callable,相比command,callable有返回值
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
//固定频率的周期性调度
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,
long period, TimeUnit unit);
//上一个任务执行完后,过delay时间后的延时执行
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
long delay, TimeUnit unit);
scheduleAtFixedRate:任务调度的频率是一定的。以上一个任务开始执行的时间为起点,之后的period时间后,调度下一次任务。若前面的任务没有完成,则调度也不会启动。周期如果太短,那任务就会在上一个任务结束后,立即调用。
scheduleWithFixedDelay:在上一个任务结束后,再经过delay时间进行任务调度。
注意: 如果任务遇到异常,那后续的所有子任务都会停止调度,所以必须保证异常要即时处理。
3、自定义线程池
从以上核心线程池的实现可以发现,他们都是ThreadPoolExecutorde 封装,
public ThreadPoolExecutor(int corePoolSize, //指定线程池中的线程数量
int maximumPoolSize, //线程池中最大线程数量
long keepAliveTime, //当线程池中的线程数量大于corePoolSize时,多余的空闲线程的存活时间
TimeUnit unit,
BlockingQueue<Runnable> workQueue)
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
可以根据自己的需求,重新设置以上的参数来自定义线程池。例如:
ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
//设置为守护线程
t.setDaemon(true);
System.out.println("Created " + t);
return t;
}
});
线程管理机制:
4.拒绝策略
最后一个参数RejectedExecutionHandler handler 指定拒绝策略,也就是任务数量超过了系统实际承载的能力时,该如何处理。
当线程池中的线程已经用完了,无法为新任务服务,同时等待队列也已经排满,再也塞不下新任务的时候,就需要有一套机制来合理处理这个问题。
JDK提供了如下四种策略:
ThreadPoolExecutor.AbortPolicy(默认):直接丢弃任务并抛出RejectedExecutionException
ThreadPoolExecutor.CallerRunsPolicy:会使用当前提交任务的线程去执行任务,这种策略会导致任务提交的速度下降。
ThreadPoolExecutor.DiscardPolicy:直接丢弃任务,不予任何处理。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃任务队列中的头部的任务,然后再次尝试提交当前任务。
5、扩展线程池
当我们想监控每个任务执行的开始和结束时间,或者其他一些自定义的增强功能,就可以使用ThreadPoolExecutor提供的接口来实现。
ThreadPoolExecutor.Worker 的 runWorker(Worker w) 内部一部分实现如下:
beforeExecute(wt, task); //运行前
Throwable thrown = null;
try {
task.run(); //运行任务
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); //运行结束
}
runWorker() 会被线程池以多线程模式异步调用,即会被多个线程同时访问,所以beforeExecute()、afterExecute() 也将同时被多线程访问。
beforeExecute()、afterExecute() 默认是空的实现,我们可以自己来实现对线程池的运行状态进行跟踪,输出调试信息。
ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()) {
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println("准备执行:" + r.name);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println("执行完成:" + r.name);
}
@Override
protected void terminated() {
System.out.println("线程池退出");
}
};
....
es.shutdown();
shutdown() 关闭线程池。不会立即暴力地终止所有任务,会等到所有的任务执行完后再关闭线程池。但当shutdown() 执行后,就不能再接受其他新的任务了。
6、堆栈信息
异常堆栈对于调试非常重要,如果没有异常堆栈信息,排查问题时,就得加班熬夜了。
线程池可能会吃掉程序抛出的异常,导致我们对程序的错误一无所知。
submit()会吃掉抛出的异常,可以改成如下两种方式:
Future fu = es.submit(Runnable r);
fu.get();
es.execute(Runnable r);
以上两种方式只会得到部分堆栈信息,如果想要得到全部的堆栈信息,就得要自定义线程池了,让他在调度任务之前,先保存一下提交任务线程的堆栈信息。
上一篇: Android实现自定义轮播图片控件示例
下一篇: React的setState()