Java并发——CompletionService
CompletionService
接口CompletionService的功能是以异步的方式生产新的任务,一边处理已完成任务的结果,这样可以将执行任务与处理任务分离开来进行处理。使用submit执行任务,使用take取得已完成的任务,并按照完成这些任务的时间顺序处理它们的结果。
接口 Completion Service的结构比较简洁,仅有一个实现类 Executor Completion Service,该类的构造方法如图所示。
从构造方法的声明中可以发现,类 Executor Completion Service需要依赖于 Executor对象,大部分的实现也就是使用线程池 ThreadPoolExecutor对象。
take()方法
public class TestDemo {
public static void main(String[] args) {
try {
//take:获取并移除下一个已完成任务的Futrue,如果目前不存在这样的任务,则阻塞。
ExecutorService executorService = Executors.newCachedThreadPool();
CompletionService cs = new ExecutorCompletionService(executorService);
for (int i = 0; i < 10; i++) {
cs.submit(new Callable<String>() {
@Override
public String call() throws Exception {
long sleepValue = (int)(Math.random() * 1000);
System.out.println("sleep=" + sleepValue + " " + Thread.currentThread().getName());
Thread.sleep(sleepValue);
return "返回值:" + sleepValue + " " + Thread.currentThread().getName();
}
});
}
for (int i = 0; i < 10; i++) {
System.out.println(cs.take().get());
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
从运行结果来看,方法 take()是按任务执行的速度,从快到慢的顺序获得Future对象,因为打印的时间是从小到大。
sleep=738 pool-1-thread-1
sleep=911 pool-1-thread-8
sleep=467 pool-1-thread-10
sleep=407 pool-1-thread-7
sleep=131 pool-1-thread-3
sleep=79 pool-1-thread-6
sleep=563 pool-1-thread-4
sleep=869 pool-1-thread-5
sleep=545 pool-1-thread-2
sleep=551 pool-1-thread-9
返回值:79 pool-1-thread-6
返回值:131 pool-1-thread-3
返回值:407 pool-1-thread-7
返回值:467 pool-1-thread-10
返回值:545 pool-1-thread-2
返回值:551 pool-1-thread-9
返回值:563 pool-1-thread-4
返回值:738 pool-1-thread-1
返回值:869 pool-1-thread-5
返回值:911 pool-1-thread-8
poll()方法
方法poll()的作用是获取并移除表示下一个已完成任务的 Future,如果不存在这样的任务,则返回null,方法pol()无阻
塞的效果:
public class TestDemo2 {
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
CompletionService cs = new ExecutorCompletionService(executorService);
for (int i = 0; i < 1; i++) {
cs.submit(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(3000);
System.out.println("3秒过后");
return "返回值";
}
});
}
for (int i = 0; i < 1; i++) {
System.out.println(cs.poll());
}
}
}
poll()方法返回null,因为当前没有任何已完成任务的Future对象,所以返回为null。poll()方法不具有阻塞特性。
poll(long timeout, TimeUnit unit)
方法Future pol(long timeout, TimeUnit unit)的作用是等待指定的timeout时间,在timeout时间之内获取到值时立即向下继续执行,如果超时也立即向下执行。
public class TestDemo3 {
public static void main(String[] args) {
try {
MyCallableA a = new MyCallableA();
MyCallableB b = new MyCallableB();
Executor executor = Executors.newSingleThreadExecutor();
CompletionService cs = new ExecutorCompletionService(executor);
cs.submit(a);
cs.submit(b);
for (int i = 0; i < 2; i++) {
System.out.println("zzzzzzzzzzz" + " " + cs.take());
}
System.out.println("main end");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class MyCallableA implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("MyCallableA begin " + System.currentTimeMillis());
Thread.sleep(1000);
System.out.println("MyCallableA end " + System.currentTimeMillis());
return "returnA";
}
}
class MyCallableB implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("MyCallableB begin " + System.currentTimeMillis());
Thread.sleep(5000);
int i = 0;
if (i == 0) {
throw new Exception("抛出异常!");
}
System.out.println("MyCallableB end " + System.currentTimeMillis());
return "returnB";
}
}
MyCallableB虽然出现异常,但是并没有调用FutureTask的get()方法,所以不出现异常。
调用get()方法:
调换A和B的执行顺序:
任务B出现异常,任务A并未输出。
调用poll()方法:
public class TestDemo3 {
public static void main(String[] args) {
try {
MyCallableA a = new MyCallableA();
MyCallableB b = new MyCallableB();
Executor executor = Executors.newSingleThreadExecutor();
CompletionService cs = new ExecutorCompletionService(executor);
cs.submit(a);
cs.submit(b);
for (int i = 0; i < 2; i++) {
System.out.println("zzzzzzzzzzz" + " " + cs.poll());
}
Thread.sleep(6000);
System.out.println("A处" + " " + cs.poll());
System.out.println("B处" + " " + cs.poll());
System.out.println("main end");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class MyCallableA implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("MyCallableA begin " + System.currentTimeMillis());
Thread.sleep(1000);
System.out.println("MyCallableA end " + System.currentTimeMillis());
return "returnA";
}
}
class MyCallableB implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("MyCallableB begin " + System.currentTimeMillis());
Thread.sleep(5000);
int i = 0;
if (i == 0) {
throw new Exception("抛出异常!");
}
System.out.println("MyCallableB end " + System.currentTimeMillis());
return "returnB";
}
}
任务B出现异常后,poll()返回值为null
poll()后调用get()方法获取返回值:
A任务返回值正常,任务B返回值出现异常。
调换A和B任务执行顺序:
任务A并未打印,任务B抛出异常。
Future submit(Runnable task, V result)
public class TestDemo4 {
public static void main(String[] args) {
UserInfo userInfo = new UserInfo();
MyRunnable5 myRunnable5 = new MyRunnable5(userInfo);
Executor executor = Executors.newCachedThreadPool();
CompletionService cs = new ExecutorCompletionService(executor);
Future<UserInfo> future = cs.submit(myRunnable5, userInfo);
try {
System.out.println(future.get().getUsername() + " " + future.get().getPassword());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
class UserInfo {
private String username;
private String password;
public UserInfo() {
super();
}
public UserInfo(String username, String password) {
this.username = username;
this.password = password;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
}
class MyRunnable5 implements Runnable {
private UserInfo userInfo;
public MyRunnable5(UserInfo userInfo) {
this.userInfo = userInfo;
}
@Override
public void run() {
userInfo.setUsername("usernameValue");
userInfo.setPassword("passwordValue");
System.out.println("run运行了 ");
}
}
总结
接口CompletionService完全可以避免FutureTask类阻塞的缺点,可更加有效地处理Future的返回值,也就是哪个任务先执行完,CompletionService 就先取得这个任务的返回值再处理。