欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

非主流并发工具之 CompletionService 非主流并发工具之 CompletionServicejavacompletionservice多线程 

程序员文章站 2022-04-20 10:58:32
...

非主流并发工具之 CompletionService

CompletionService 接口的实例可以充当生产者和消费者的中间处理引擎,从而达到将提交任务和处理结果的代码进行解耦的目的。生产者调用 submit 方法提交任务,而消费者调用 poll (非阻塞)或 take (阻塞)方法获取下一个结果:这一特征看起来和阻塞队列(BlockingQueue )类似,两者的区别在于 CompletionService 要负责任务的处理,而阻塞队列则不会。

在 JDK 中,该接口只有一个实现类 ExecutorCompletionService ,该类使用创建时提供的 Executor 对象(通常是线程池)来执行任务,然后将结果放入一个阻塞队列中:果然本就是一家亲啊!ExecutorCompletionService 将线程池和阻塞队列糅合在一起,仅仅通过三个方法,就实现了任务的异步处理,可谓并发编程初学者的神兵利器!

接下来看一个例子。楼主有一大堆 *.java 文件,需要计算它们的代码总行数。利用 ExecutorCompletionService 可以写出很简单的多线程处理代码:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public int countLines(List<Path> javaFiles) throws Exception {
     // 根据处理器数量创建线程池。虽然多线程并不保证能够提升性能,但适量地
     // 开线程一般可以从系统骗取更多资源。
     ExecutorService es = Executors.newFixedThreadPool(
             Runtime.getRuntime().availableProcessors() * 2 );
     // 使用 ExecutorCompletionService 内建的阻塞队列。
     CompletionService cs = new ExecutorCompletionService(es);
 
     // 按文件向 CompletionService 提交任务。
     for ( final Path javaFile : javaFiles) {
         cs.submit( new Callable<Integer>() {
             @Override
             public Integer call() throws Exception {
                 // 略去计算单个文件行数的代码。
                 return countLines(javaFile);
             }
         });
     }
 
     try {
         int loc = 0 ;
         int size = javaFiles.size();
         for ( int i = 0 ; i < size; i++) {
             // take 方法等待下一个结果并返回 Future 对象。不直接返回计算结果是为了
             // 捕获计算时可能抛出的异常。
             // poll 不等待,有结果就返回一个 Future 对象,否则返回 null。
             loc += cs.take().get();
         }
         return loc;
     } finally {
         // 关闭线程池。也可以将线程池提升为字段以便重用。
         // 如果任务线程(Callable#call)能响应中断,用 shutdownNow 更好。
         es.shutdown();
     }
}

最后,CompletionService 也不是到处都能用,它不适合处理任务数量有限但个数不可知的场景。例如,要统计某个文件夹中的文件个数,在遍历子文件夹的时候也会“递归地”提交新的任务,但最后到底提交了多少,以及在什么时候提交完了所有任务,都是未知数,无论 CompletionService 还是线程池都无法进行判断。这种情况只能直接用线程池来处理。