欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

读书笔记(java并发编程实战——CompletionService)

程序员文章站 2022-03-10 12:33:42
...

原文请参考微信公众号(欢迎关注公众号:coding_song):https://mp.weixin.qq.com/s/R50Eh4kTDtA031i-yMUZAw 

 

Callable&Future

Callbale描述的是抽象的计算任务,有明确的起点,并且最终会结束;

 

@FunctionalInterface

public interface Callable<V>{

    V call()throwsException;

}

 

 

Future表示一个任务的生命周期 ,并提供了相应的方法来判断是否已经完成或取消,以及获取任务的结果和取消任务等。Future的get方法取决于任务的状态(尚未开始、正在运行、已完成),如果任务已经完成,get会立即返回结果或抛出一个异常;如果任务没有完成,则get将阻塞直到任务完成返回结果;如果任务被取消,则get抛出CancellationException;

Future的ge(long var1, TimeUnit var3)方法,可以设置超时时间,超时后可以做一些默认的处理,比如页面上展示广告信息,当获取某个广告时获取超时了,超时异常处理时可以设置一个默认广告位,而不至于什么都不显示

public interface Future<V>{

    boolean cancel(boolean var1);

    boolean isCancelled();

    boolean isDone();

    V get() throws InterruptedException,ExecutionException;

    V get(long var1,TimeUnit var3) throws InterruptedException,ExecutionException,TimeoutException;

}

CompletionService

CompletionService将Executor和BlockingQueue的功能融合在一起,将Callable任务提交给CompletionService来执行,然后使用类似于队列操作的take和poll等方法来获得已完成的结果,而这些结果会在完成是被封装为Future

 

public interface CompletionService<V>{

    Future<V> submit(Callable<V> var1);

    Future<V> submit(Runnable var1, V var2);

    Future<V> take() throws InterruptedException;

    Future<V> poll();

    Future<V> poll(long var1,TimeUnit var3) throws InterruptedException;

}
 

 

ExcutorCompletionService实现了CompletionService,在构造函数中创建一个BlockingQueue来保存计算完成的结果,当计算完成时,调用FutureTask的done方法,将完成的结果添加到BlockingQueue中;队列的take和poll方法在得出结果之前是阻塞的

public class ExecutorCompletionService<V> implements CompletionService<V>{

    private final Executor executor;

    private final AbstractExecutorService aes;

    private final BlockingQueue<Future<V>> completionQueue;


    /**

     * FutureTask extension to enqueue upon completion

     */

    private class QueueingFuture extends FutureTask<Void>{

        QueueingFuture(RunnableFuture<V> task){

            super(task,null);

            this.task = task;

        }

        protected void done(){ completionQueue.add(task);}

        private final Future<V> task;

    }

    public Future<V> take()throws InterruptedException{

        return completionQueue.take();

    }

    public Future<V> poll(){

        return completionQueue.poll();

    }

    public Future<V> poll(long timeout,TimeUnit unit)

            throws InterruptedException{

        return completionQueue.poll(timeout, unit);

    }

    // 省略其他方法

}

 

CompletionService的使用:创建n个任务,将其提交到一个线程池,保留n个Future,可使用限时的get方法通过Future串行地获取每个结果;

 

public class CompletionServiceTest{


    private final ExecutorService executorService;


    private final static Integer COUNT =10;


    public CompletionServiceTest(ExecutorService executorService){

        this.executorService = executorService;

    }


    public void test()throws InterruptedException,ExecutionException{

        CompletionService<Object> completionService = new ExecutorCompletionService<>(executorService);

        for(int i =0; i < COUNT; i++){

            int finalI = i;

            completionService.submit(newCallable(){

                @Override

                public Object call()throws Exception{

                    return"done"+ finalI;

                }

            });

        }

        for(int i =0; i < COUNT; i++){

            Future<Object> future = completionService.take();

            Object object = future.get();

        }
 

    }

}

 

上述列子描述多个ExecutorCompletionService共享一个executorService,CompletionService的作用就相当于一组计算的句柄,与Future作为单个计算的句柄是非常类似的。通过记录提交给CompletionService的任务数量,并计算出已经获取的已完成结果的数量,及时使用一个共享的ExecutorService,也能知道已经获得了所有任务结果的时间

CompletionService应用场景:

(1)动态加载数据、下载图片等,一旦队列中有了数据,就可以陆续返回加载到的数据,不需要等到所有数据都加载完成才返回;滚动网页显示加载图片可以用其实现。

(2)从不同数据源加载数据,一个ExecutorCompletionService从一个数据源中获取数据,然后通过各个ExecutorCompletionService返回的结果,再做数据整合

(3)多线程并行处理数据,可以大大提高程序处理时间,提高性能

相关标签: 读书