《Java并发编程》之六:线程池的使用
8.1 在任务与执行策略之间的隐形耦合
有些类型的任务需要明确指定执行策略,包括:
依赖性任务、使用线程封闭机制的任务、对响应时间敏感的任务、使用ThreadLocal的任务。
只有当任务都是同类型的并且相互独立时,线程池的性能才能达到最佳。
8.1.1 线程饥饿死锁
public class ThreadDeadlock {
ExecutorService exec = Executors.newSingleThreadExecutor();
public class LoadFileTask implements Callable<String> {
private final String fileName;
public LoadFileTask(String fileName) {
this.fileName = fileName;
}
public String call() throws Exception {
// Here's where we would actually read the file
return "";
}
}
public class RenderPageTask implements Callable<String> {
public String call() throws Exception {
Future<String> header, footer;
header = exec.submit(new LoadFileTask("header.html"));
footer = exec.submit(new LoadFileTask("footer.html"));
String page = renderBody();
// 将发生死锁 -- task waiting for result of subtask
return header.get() + page + footer.get();
}
private String renderBody() {
// Here's where we would actually render the page
return "";
}
}
}
每当提交了一个有依赖性的Executor任务的时候,也就是说这个任务依赖其他的任务的计算结果。要清除地知道可能会出现线程饥饿死锁,因此需要在代码或配置Executor的配置文件中记录线程池的大小限制或配置限制。
8.2 设置线程池的大小
根据Runtime.getRuntime().availableProcessors()动态计算处理器的个数
对于计算密集型的任务,在拥有N(cpu)个处理器的系统上,当线程池的大小为N + 1的时候能实现最优利用率。
对于Executor,newCachedThreadPool工厂方法是一种很好的默认的选择,它能提供比固定大小的线程池更好的排队性能,一般来讲,只要任务数量不会爆炸型增长,就选择这个。
而当需要限制当前任务的数量以满足资源管理需求的时候,那么可以选择固定大小的线程池,就像在接受网络客户请求服务器应用程序中,不然不限制,很容易产生过载的问题。服务器挂点
只有当任务都独立的时候,为线程池或工作队列设置界限才是合理的,如果任务之间存在依赖性,那么有界线程池或队列可能导致线程饥饿死锁问题,此时应该使用*线程池例如newCachedThreadPool。
8.3.3 饱和策略
当有界队列被填满后,饱和策略开始发挥作用。ThreadPoolExecutor的饱和策略可通过setRejectedExecutionHandler来修改。
JDK提供了几个饱和策略:AbortPolicy、CallerRunsPolicy、DiscardPolicy、DiscardOldestPolicy。
AbortPolicy是默认的饱和策略,该策略抛出uncheckedException RejectedExecutionException。
DiscardPolicy抛弃策略会悄悄抛弃这个任务
DiscardOldestPolicy会抛弃下一个即将要执行的任务(因为这个任务肯定是在队列里面放最久的,排在队列最前头)。所以这个千万别跟PriorityBlockingQueue一起用。
使用Semaphore来控制任务的提交速率:
public class BoundedExecutor {
private final Executor exec;
private final Semaphore semaphore;
public BoundedExecutor(Executor exec, int bound) {
this.exec = exec;
this.semaphore = new Semaphore(bound);
}
public void submitTask(final Runnable command)
throws InterruptedException {
semaphore.acquire();
try {
exec.execute(new Runnable() {
public void run() {
try {
command.run();
} finally {
semaphore.release();
}
}
});
} catch (RejectedExecutionException e) {
semaphore.release();
}
}
}
8.3.4 线程工厂
自己定制线程池创建的线程,比如为线程指定名字,设置自定义的UncaughtExceptionHandler,向Logger写日志,维护统计信息包括多少线程被创建和销毁,以及在线程被创建和终止的时候把调试信息写入日志。
public class MyAppThread extends Thread {
public static final String DEFAULT_NAME = "MyAppThread";
private static volatile boolean debugLifecycle = false;
private static final AtomicInteger created = new AtomicInteger();
private static final AtomicInteger alive = new AtomicInteger();
private static final Logger log = Logger.getAnonymousLogger();
public MyAppThread(Runnable r) {
this(r, DEFAULT_NAME);
}
public MyAppThread(Runnable runnable, String name) {
super(runnable, name + "-" + created.incrementAndGet());
setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
public void uncaughtException(Thread t,
Throwable e) {
log.log(Level.SEVERE,
"UNCAUGHT in thread " + t.getName(), e);
}
});
}
public void run() {
// Copy debug flag to ensure consistent value throughout.
boolean debug = debugLifecycle;
if (debug) log.log(Level.FINE, "Created " + getName());
try {
alive.incrementAndGet();
super.run();
} finally {
alive.decrementAndGet();
if (debug) log.log(Level.FINE, "Exiting " + getName());
}
}
public static int getThreadsCreated() {
return created.get();
}
public static int getThreadsAlive() {
return alive.get();
}
public static boolean getDebug() {
return debugLifecycle;
}
public static void setDebug(boolean b) {
debugLifecycle = b;
}
}
public class MyThreadFactory implements ThreadFactory {
private final String poolName;
public MyThreadFactory(String poolName) {
this.poolName = poolName;
}
public Thread newThread(Runnable runnable) {
return new MyAppThread(runnable, poolName);
}
}
8.4 扩展ThreadPoolExecutor
ThreadPoolExecutor是可扩展的,它提供了几个可以在子类中改写的方法:beforeExecute,afterExecute和terminated。
beforeExecute和afterExecute是在每个执行任务的线程调用任务的run方法之前和之后会执行的
terminated方法是在整个Executor关闭的时候,也就是所有任务都完成并且所有工作者线程关闭后执行,这个方法可以用来释放Executor在其生命周期分配的各种资源,此外还可以执行发送通知,记录日志或收集finalize统计信息等。
示例:给线程池添加统计信息:
public class TimingThreadPool extends ThreadPoolExecutor {
public TimingThreadPool() {
super(1, 1, 0L, TimeUnit.SECONDS, null);
}
private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();
private final Logger log = Logger.getLogger("TimingThreadPool");
private final AtomicLong numTasks = new AtomicLong();
private final AtomicLong totalTime = new AtomicLong();
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
log.fine(String.format("Thread %s: start %s", t, r));
startTime.set(System.nanoTime());
}
protected void afterExecute(Runnable r, Throwable t) {
try {
long endTime = System.nanoTime();
long taskTime = endTime - startTime.get();
numTasks.incrementAndGet();
totalTime.addAndGet(taskTime);
log.fine(String.format("Thread %s: end %s, time=%dns",
t, r, taskTime));
} finally {
super.afterExecute(r, t);
}
}
protected void terminated() {
try {
log.info(String.format("Terminated: avg time=%dns",
totalTime.get() / numTasks.get()));
} finally {
super.terminated();
}
}
}
8.5 递归算法的并行化
当串行循环中各个迭代操作之间彼此独立,并且每个迭代操作执行的工作量比管理一个新任务带来的开销更多,那么这个串行循环就适合并行化。
public abstract class TransformingSequential {
void processSequentially(List<Element> elements) {
for (Element e : elements)
process(e);
}
void processInParallel(Executor exec, List<Element> elements) {
for (final Element e : elements)
exec.execute(new Runnable() {
public void run() {
process(e);
}
});
}
public abstract void process(Element e);
public <T> void sequentialRecursive(List<Node<T>> nodes,
Collection<T> results) {
for (Node<T> n : nodes) {
results.add(n.compute());
sequentialRecursive(n.getChildren(), results);
}
}
public <T> void parallelRecursive(final Executor exec,
List<Node<T>> nodes,
final Collection<T> results) {
for (final Node<T> n : nodes) {
exec.execute(new Runnable() {
public void run() {
results.add(n.compute());
}
});
parallelRecursive(exec, n.getChildren(), results);
}
}
public <T> Collection<T> getParallelResults(List<Node<T>> nodes)
throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
Queue<T> resultQueue = new ConcurrentLinkedQueue<T>();
parallelRecursive(exec, nodes, resultQueue);
exec.shutdown();
exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
return resultQueue;
}
interface Element {
}
interface Node<T> {
T compute();
List<Node<T>> getChildren();
}
}
示例:谜题框架
这项技术的一种强大应用就是解决一些谜题,这些谜题都需要找出一系列的操作从初始状态转换到目标状态,例如类似于 搬箱子 、 Hi-Q 、 四色方柱Instant Insanity和其他的棋牌谜题
具体算法描述和解法,请参考我的另一篇算法文章: 利用递归算法并行化解决谜题框架
本人博客已搬家,新地址为:http://yidao620c.github.io/
上一篇: Swagger使用说明
下一篇: 用一段PHP代码实现获取一言API的信息