读书笔记(java并发编程实战——CompletionService)
原文请参考微信公众号(欢迎关注公众号:coding_song):https://mp.weixin.qq.com/s/R50Eh4kTDtA031i-yMUZAw
Callable&Future
Callbale描述的是抽象的计算任务,有明确的起点,并且最终会结束;
@FunctionalInterface public interface Callable<V>{ V call()throwsException; }
Future表示一个任务的生命周期 ,并提供了相应的方法来判断是否已经完成或取消,以及获取任务的结果和取消任务等。Future的get方法取决于任务的状态(尚未开始、正在运行、已完成),如果任务已经完成,get会立即返回结果或抛出一个异常;如果任务没有完成,则get将阻塞直到任务完成返回结果;如果任务被取消,则get抛出CancellationException;
Future的ge(long var1, TimeUnit var3)方法,可以设置超时时间,超时后可以做一些默认的处理,比如页面上展示广告信息,当获取某个广告时获取超时了,超时异常处理时可以设置一个默认广告位,而不至于什么都不显示
public interface Future<V>{ boolean cancel(boolean var1); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException,ExecutionException; V get(long var1,TimeUnit var3) throws InterruptedException,ExecutionException,TimeoutException; }
CompletionService
CompletionService将Executor和BlockingQueue的功能融合在一起,将Callable任务提交给CompletionService来执行,然后使用类似于队列操作的take和poll等方法来获得已完成的结果,而这些结果会在完成是被封装为Future
public interface CompletionService<V>{ Future<V> submit(Callable<V> var1); Future<V> submit(Runnable var1, V var2); Future<V> take() throws InterruptedException; Future<V> poll(); Future<V> poll(long var1,TimeUnit var3) throws InterruptedException; }
ExcutorCompletionService实现了CompletionService,在构造函数中创建一个BlockingQueue来保存计算完成的结果,当计算完成时,调用FutureTask的done方法,将完成的结果添加到BlockingQueue中;队列的take和poll方法在得出结果之前是阻塞的
public class ExecutorCompletionService<V> implements CompletionService<V>{ private final Executor executor; private final AbstractExecutorService aes; private final BlockingQueue<Future<V>> completionQueue; /** * FutureTask extension to enqueue upon completion */ private class QueueingFuture extends FutureTask<Void>{ QueueingFuture(RunnableFuture<V> task){ super(task,null); this.task = task; } protected void done(){ completionQueue.add(task);} private final Future<V> task; } public Future<V> take()throws InterruptedException{ return completionQueue.take(); } public Future<V> poll(){ return completionQueue.poll(); } public Future<V> poll(long timeout,TimeUnit unit) throws InterruptedException{ return completionQueue.poll(timeout, unit); } // 省略其他方法 }
CompletionService的使用:创建n个任务,将其提交到一个线程池,保留n个Future,可使用限时的get方法通过Future串行地获取每个结果;
public class CompletionServiceTest{ private final ExecutorService executorService; private final static Integer COUNT =10; public CompletionServiceTest(ExecutorService executorService){ this.executorService = executorService; } public void test()throws InterruptedException,ExecutionException{ CompletionService<Object> completionService = new ExecutorCompletionService<>(executorService); for(int i =0; i < COUNT; i++){ int finalI = i; completionService.submit(newCallable(){ @Override public Object call()throws Exception{ return"done"+ finalI; } }); } for(int i =0; i < COUNT; i++){ Future<Object> future = completionService.take(); Object object = future.get(); } } }
上述列子描述多个ExecutorCompletionService共享一个executorService,CompletionService的作用就相当于一组计算的句柄,与Future作为单个计算的句柄是非常类似的。通过记录提交给CompletionService的任务数量,并计算出已经获取的已完成结果的数量,及时使用一个共享的ExecutorService,也能知道已经获得了所有任务结果的时间
CompletionService应用场景:
(1)动态加载数据、下载图片等,一旦队列中有了数据,就可以陆续返回加载到的数据,不需要等到所有数据都加载完成才返回;滚动网页显示加载图片可以用其实现。
(2)从不同数据源加载数据,一个ExecutorCompletionService从一个数据源中获取数据,然后通过各个ExecutorCompletionService返回的结果,再做数据整合
(3)多线程并行处理数据,可以大大提高程序处理时间,提高性能
上一篇: WebSocket的广播式
下一篇: JVM笔记八-堆参数调优