欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Java并发——CompletionService

程序员文章站 2022-05-03 20:58:38
...

CompletionService

接口CompletionService的功能是以异步的方式生产新的任务,一边处理已完成任务的结果,这样可以将执行任务与处理任务分离开来进行处理。使用submit执行任务,使用take取得已完成的任务,并按照完成这些任务的时间顺序处理它们的结果。

接口 Completion Service的结构比较简洁,仅有一个实现类 Executor Completion Service,该类的构造方法如图所示。
Java并发——CompletionService

从构造方法的声明中可以发现,类 Executor Completion Service需要依赖于 Executor对象,大部分的实现也就是使用线程池 ThreadPoolExecutor对象。

take()方法

public class TestDemo {
    public static void main(String[] args) {
        try {
            //take:获取并移除下一个已完成任务的Futrue,如果目前不存在这样的任务,则阻塞。
            ExecutorService executorService = Executors.newCachedThreadPool();
            CompletionService cs = new ExecutorCompletionService(executorService);
            for (int i = 0; i < 10; i++) {
                cs.submit(new Callable<String>() {
                    @Override
                    public String call() throws Exception {
                        long sleepValue = (int)(Math.random() * 1000);
                        System.out.println("sleep=" + sleepValue + " " + Thread.currentThread().getName());
                        Thread.sleep(sleepValue);
                        return "返回值:" + sleepValue + " " + Thread.currentThread().getName();
                    }
                });
            }
            for (int i = 0; i < 10; i++) {
                System.out.println(cs.take().get());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}

从运行结果来看,方法 take()是按任务执行的速度,从快到慢的顺序获得Future对象,因为打印的时间是从小到大。

sleep=738 pool-1-thread-1
sleep=911 pool-1-thread-8
sleep=467 pool-1-thread-10
sleep=407 pool-1-thread-7
sleep=131 pool-1-thread-3
sleep=79 pool-1-thread-6
sleep=563 pool-1-thread-4
sleep=869 pool-1-thread-5
sleep=545 pool-1-thread-2
sleep=551 pool-1-thread-9
返回值:79 pool-1-thread-6
返回值:131 pool-1-thread-3
返回值:407 pool-1-thread-7
返回值:467 pool-1-thread-10
返回值:545 pool-1-thread-2
返回值:551 pool-1-thread-9
返回值:563 pool-1-thread-4
返回值:738 pool-1-thread-1
返回值:869 pool-1-thread-5
返回值:911 pool-1-thread-8

poll()方法

方法poll()的作用是获取并移除表示下一个已完成任务的 Future,如果不存在这样的任务,则返回null,方法pol()无阻
塞的效果:

public class TestDemo2 {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        CompletionService cs = new ExecutorCompletionService(executorService);
        for (int i = 0; i < 1; i++) {
            cs.submit(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    Thread.sleep(3000);
                    System.out.println("3秒过后");
                    return "返回值";
                }
            });
        }
        for (int i = 0; i < 1; i++) {
            System.out.println(cs.poll());
        }
    }
}

poll()方法返回null,因为当前没有任何已完成任务的Future对象,所以返回为null。poll()方法不具有阻塞特性。
Java并发——CompletionService

poll(long timeout, TimeUnit unit)

方法Future pol(long timeout, TimeUnit unit)的作用是等待指定的timeout时间,在timeout时间之内获取到值时立即向下继续执行,如果超时也立即向下执行。

public class TestDemo3 {
    public static void main(String[] args) {
        try {
            MyCallableA a =  new MyCallableA();
            MyCallableB b = new MyCallableB();

            Executor executor = Executors.newSingleThreadExecutor();
            CompletionService cs = new ExecutorCompletionService(executor);
            cs.submit(a);
            cs.submit(b);

            for (int i = 0; i < 2; i++) {
                System.out.println("zzzzzzzzzzz" + " " + cs.take());
            }
            System.out.println("main end");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class MyCallableA implements Callable<String> {

    @Override
    public String call() throws Exception {
        System.out.println("MyCallableA begin " + System.currentTimeMillis());
        Thread.sleep(1000);
        System.out.println("MyCallableA end " + System.currentTimeMillis());
        return "returnA";
    }
}

class MyCallableB implements Callable<String> {

    @Override
    public String call() throws Exception {
        System.out.println("MyCallableB begin " + System.currentTimeMillis());
        Thread.sleep(5000);
        int i = 0;
        if (i == 0) {
            throw new Exception("抛出异常!");
        }
        System.out.println("MyCallableB end " + System.currentTimeMillis());
        return "returnB";
    }
}

MyCallableB虽然出现异常,但是并没有调用FutureTask的get()方法,所以不出现异常。
Java并发——CompletionService

调用get()方法:
Java并发——CompletionService
Java并发——CompletionService

调换A和B的执行顺序:
Java并发——CompletionService
任务B出现异常,任务A并未输出。
Java并发——CompletionService

调用poll()方法:

public class TestDemo3 {
    public static void main(String[] args) {
        try {
            MyCallableA a =  new MyCallableA();
            MyCallableB b = new MyCallableB();

            Executor executor = Executors.newSingleThreadExecutor();
            CompletionService cs = new ExecutorCompletionService(executor);
            cs.submit(a);
            cs.submit(b);

            for (int i = 0; i < 2; i++) {
                System.out.println("zzzzzzzzzzz" + " " + cs.poll());
            }
            Thread.sleep(6000);
            System.out.println("A处" + " " + cs.poll());
            System.out.println("B处" + " " + cs.poll());
            System.out.println("main end");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class MyCallableA implements Callable<String> {

    @Override
    public String call() throws Exception {
        System.out.println("MyCallableA begin " + System.currentTimeMillis());
        Thread.sleep(1000);
        System.out.println("MyCallableA end " + System.currentTimeMillis());
        return "returnA";
    }
}

class MyCallableB implements Callable<String> {

    @Override
    public String call() throws Exception {
        System.out.println("MyCallableB begin " + System.currentTimeMillis());
        Thread.sleep(5000);
        int i = 0;
        if (i == 0) {
            throw new Exception("抛出异常!");
        }
        System.out.println("MyCallableB end " + System.currentTimeMillis());
        return "returnB";
    }
}

任务B出现异常后,poll()返回值为null
Java并发——CompletionService

poll()后调用get()方法获取返回值:
Java并发——CompletionService
A任务返回值正常,任务B返回值出现异常。
Java并发——CompletionService

调换A和B任务执行顺序:
Java并发——CompletionService
任务A并未打印,任务B抛出异常。
Java并发——CompletionService

Future submit(Runnable task, V result)

public class TestDemo4 {
    public static void main(String[] args) {
        UserInfo userInfo = new UserInfo();
        MyRunnable5 myRunnable5 = new MyRunnable5(userInfo);

        Executor executor = Executors.newCachedThreadPool();
        CompletionService cs = new ExecutorCompletionService(executor);
        Future<UserInfo> future = cs.submit(myRunnable5, userInfo);
        try {
            System.out.println(future.get().getUsername() + " " + future.get().getPassword());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}

class UserInfo {
    private String username;
    private String password;

    public UserInfo() {
        super();
    }

    public UserInfo(String username, String password) {
        this.username = username;
        this.password = password;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }
}


class MyRunnable5 implements Runnable {
    private UserInfo userInfo;

    public MyRunnable5(UserInfo userInfo) {
        this.userInfo = userInfo;
    }

    @Override
    public void run() {
        userInfo.setUsername("usernameValue");
        userInfo.setPassword("passwordValue");
        System.out.println("run运行了 ");
    }
}

Java并发——CompletionService

总结

接口CompletionService完全可以避免FutureTask类阻塞的缺点,可更加有效地处理Future的返回值,也就是哪个任务先执行完,CompletionService 就先取得这个任务的返回值再处理。