《Java并发编程》之四:Executor框架
ExecutoreService增加了生命周期的管理,有三种状态:运行、关闭、已终止。
public class LifecycleWebServer {
private final ExecutorService exec = Executors.newCachedThreadPool();
public void start() throws IOException {
ServerSocket socket = new ServerSocket(80);
while (!exec.isShutdown()) {
try {
final Socket conn = socket.accept();
exec.execute(new Runnable() {
public void run() {
handleRequest(conn);
}
});
} catch (RejectedExecutionException e) {
if (!exec.isShutdown())
log("task submission rejected", e);
}
}
}
public void stop() {
exec.shutdown();
}
private void log(String msg, Exception e) {
Logger.getAnonymousLogger().log(Level.WARNING, msg, e);
}
void handleRequest(Socket connection) {
Request req = readRequest(connection);
if (isShutdownRequest(req))
stop();
else
dispatchRequest(req);
}
interface Request {
}
private Request readRequest(Socket s) {
return null;
}
private void dispatchRequest(Request r) {
}
private boolean isShutdownRequest(Request r) {
return false;
}
}
6.2.5 延迟任务和周期任务
我们通常会用Timer管理延迟或周期执行的一些任务,但是这是不安全的,因为Timer存在一些缺陷,我们应该考虑使用ScheduledThreadPoolExecutor来代替它,可以通过ScheduledThreadPoolExecutor构造函数或者是newScheduledThreadPool工厂方法创建该类的对象。
6.3.2 携带结果的任务Callable和Future
Executor执行的任务有4个生命周期阶段:创建、提交、开始、完成。如果想要取消某些任务,在Executor框架中,已提交但尚未开始的任务可以取消,但对于那些已经开始执行的任务,只有当它们能响应中断时,才能取消。
Future表示一个任务的生命周期,并提供了相应的方法判断是否已经完成或取消,以及获取任务的结果和取消任务等。
ExecutorService中所有submit方法都将返回一个Future,从而将一个Runnable或Callable提交给Executor,并得到一个Future用来获取任务执行结果或者取消任务。还可以显示为某个指定的Runnable或者Callable实例化一个FutureTask。
6.3.5 CompletionService:Executor与BlockingQueue
如果向Executor提交了一组计算任务,并且希望在计算完成后获得结果,那么可以保留与每个任务关联的Futrue,然后反复使用get方法,同时将参数timeout指定为0,从而通过轮训来判断任务是否完成,这个太繁琐的。我们可以通过CompletionService优化这种方案。
示例:使用CompletionService实现页面渲染器:
public abstract class Renderer {
private final ExecutorService executor;
Renderer(ExecutorService executor) {
this.executor = executor;
}
void renderPage(CharSequence source) {
final List<ImageInfo> info = scanForImageInfo(source);
CompletionService<ImageData> completionService =
new ExecutorCompletionService<ImageData>(executor);
for (final ImageInfo imageInfo : info)
completionService.submit(new Callable<ImageData>() {
public ImageData call() {
return imageInfo.downloadImage();
}
});
renderText(source);
try {
for (int t = 0, n = info.size(); t < n; t++) {
Future<ImageData> f = completionService.take();
ImageData imageData = f.get();
renderImage(imageData);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
throw launderThrowable(e.getCause());
}
}
interface ImageData {
}
interface ImageInfo {
ImageData downloadImage();
}
abstract void renderText(CharSequence s);
abstract List<ImageInfo> scanForImageInfo(CharSequence s);
abstract void renderImage(ImageData i);
}
6.3.7 为任务设置时限:
public class RenderWithTimeBudget {
private static final Ad DEFAULT_AD = new Ad();
private static final long TIME_BUDGET = 1000;
private static final ExecutorService exec = Executors.newCachedThreadPool();
Page renderPageWithAd() throws InterruptedException {
long endNanos = System.nanoTime() + TIME_BUDGET;
Future<Ad> f = exec.submit(new FetchAdTask());
// Render the page while waiting for the ad
Page page = renderPageBody();
Ad ad;
try {
// Only wait for the remaining time budget
long timeLeft = endNanos - System.nanoTime();
ad = f.get(timeLeft, NANOSECONDS);
} catch (ExecutionException e) {
ad = DEFAULT_AD;
} catch (TimeoutException e) {
ad = DEFAULT_AD;
f.cancel(true);
}
page.setAd(ad);
return page;
}
Page renderPageBody() {
return new Page();
}
static class Ad {
}
static class Page {
public void setAd(Ad ad) {
}
}
static class FetchAdTask implements Callable<Ad> {
public Ad call() {
return new Ad();
}
}
}
6.3.8 还有一个很好用的invokeAll方法
支持限时的invokeAll,将多个任务提交到一个ExecutorService得到结果。invokeAll方法参数为一组任务,并返回一组Future。这两个集合有着相同的结构。invokeAll按照任务集合中迭代器顺序将所有Future添加到返回集合中,从而使调用者能将各个Future与其表示的Callable关联起来。当所以任务都执行完成时,或者调用线程中断,或者超时,invokeAll将返回。当超过指定时限后,任何未完成任务都将取消。
用一个限时报价服务来阐明:
public class TimeBudget {
private static ExecutorService exec = Executors.newCachedThreadPool();
public List<TravelQuote> getRankedTravelQuotes(TravelInfo travelInfo, Set<TravelCompany> companies, Comparator<TravelQuote> ranking, long time, TimeUnit unit)
throws InterruptedException {
List<QuoteTask> tasks = new ArrayList<QuoteTask>();
for (TravelCompany company : companies)
tasks.add(new QuoteTask(company, travelInfo));
List<Future<TravelQuote>> futures = exec.invokeAll(tasks, time, unit);
List<TravelQuote> quotes = new ArrayList<TravelQuote>(tasks.size());
Iterator<QuoteTask> taskIter = tasks.iterator();
for (Future<TravelQuote> f : futures) {
QuoteTask task = taskIter.next();
try {
quotes.add(f.get());
} catch (ExecutionException e) {
quotes.add(task.getFailureQuote(e.getCause()));
} catch (CancellationException e) {
quotes.add(task.getTimeoutQuote(e));
}
}
Collections.sort(quotes, ranking);
return quotes;
}
}
class QuoteTask implements Callable<TravelQuote> {
private final TravelCompany company;
private final TravelInfo travelInfo;
public QuoteTask(TravelCompany company, TravelInfo travelInfo) {
this.company = company;
this.travelInfo = travelInfo;
}
TravelQuote getFailureQuote(Throwable t) {
return null;
}
TravelQuote getTimeoutQuote(CancellationException e) {
return null;
}
public TravelQuote call() throws Exception {
return company.solicitQuote(travelInfo);
}
}
interface TravelCompany {
TravelQuote solicitQuote(TravelInfo travelInfo) throws Exception;
}
interface TravelQuote {
}
interface TravelInfo {
}
本人博客已搬家,新地址为:http://yidao620c.github.io/
上一篇: SQL之WITH语句进阶
下一篇: 并发