guava笔记5-并发 博客分类: guavajava相关
一. ListenableFuture是用来增强Future的功能的。
我们知道Future表示一个异步计算任务,当任务完成时可以得到计算结果。如果我们希望一旦计算完成就拿到结果展示给用户或者做另外的计算,就必须使用另一个线程不断的查询计算状态。这样做,不断代码复杂,而且效率低下。
ListenableFuture,顾名思义,就是可以监听的Future。我们可以为ListenableFuture增加Listener监听器,当任务完成时,直接执行某个线程,或者我们可以直接为ListenableFuture设置回调函数,当任务完成时,自动执行该回调方法。
下面我们看看ListenableFuture的实际用法。
(1)怎样得到ListenableFuture对象?
我们知道ExecutorService.submit(Callable) 会返回Future对象,那么ListeningExecutorService.submit(Callable)会返回ListenableFuture对象。
ListeningExecutorService可以通过MoreExecutors.listeningDecorator(ExecutorService)来得到。
(2)增加Listenable功能
方法一:直接添加监听
ListenableFuture. addListener(Runnable listener, Executor executor)
可以为ListenableFuture增加一个监听,当线程计算完成时,自动执行一个Runnable线程。
方法二:添加回调方法
Futures.addCallback(ListenableFuture, new FutureCallback<T>() {
public void onSuccess(T t) {
}
public void onFailure(Throwable thrown) {
}
});
Futures的静态方法addCallback可以为ListenableFuture对象添加回调函数,回调里面可以定义计算成功时和失败时分别的操作。onSuccess里面可以把计算结果做为参数传入。onFailure里面可以把异常做为参数传入。
那我们一般应该怎么选择这两种方式呢?建议直接用第二种,因为这种方式可以把计算结果做为参数传入。其实,第二种的内部实现就是用的第一种方式,但是用起来会更加的简洁。
(3)ListenableFutureTask
我们知道,FutureTask实现了Future和Runnable接口。
FutureTask有2个构造方法:
FutureTask(Callable<V> callable)
创建一个 FutureTask,一旦运行就执行给定的 Callable。
FutureTask(Runnable runnable, V result)
创建一个 FutureTask,一旦运行就执行给定的 Runnable,并安排成功完成时 get 返回给定的结果
Thread thread = new Thread(futureTask);
thread.start();
当futureTask被某个线程执行时,就会自动执行Callable或者Runnable,Callable的call方法返回计算结果,对应Runnable作为参数的版本,另一个参数V result将作为结果进行返回。
当计算完成时,futureTask.get()方法就会返回这个结果。
可以看到,这段代码很复杂,而且计算完成时缺乏回调机制。ListenableFutureTask正是为了解决这些问题。ListenableFutureTask同样有2个版本的方法。
ListenableFutureTask.create(Callable<V>)
ListenableFutureTask.create(Runnable, V)
很简单,这2个方法都返回ListenableFutureTask<V>对象,然后就可以用之前所说的方式直接添加回调函数或者回调接口。
(4)返回值固定,不需要计算时
有时候,我们的Future不需要实现一个方法来计算返回值,而只需要返回一个固定值来做为返回值,那么可以直接使用 SettableFuture,或者跟 SettableFuture 一样,自己来继承AbstractFuture<V>。
SettableFuture提供了2个方法,分别用来设置成功时的返回值,或者失败时抛出的异常。
booleanset(V value)
booleansetException(Throwable throwable)
另外还有静态方法create()会构造一个初始状态的SettableFuture对象。
示例代码:
SettableFuture<Integer> future = SettableFuture.create();
future.set(100);//自己根据业务逻辑,来设置返回值
// or future.setException(new Throwable()); //或者计算失败,抛出异常
Futures.addCallback(future, new FutureCallback<Integer>() {
@Override
public void onSuccess(Integer result) {
}
@Override
public void onFailure(Throwable t) {
}
});
(5)旧代码改造:将Future封装为ListenableFutureTask
对于老的使用Future对象的代码,如果希望使用ListenableFutureTask,可以考虑使用JdkFutureAdapters.listenInPoolThread(Future) 来将Future对象直接封装为ListenableFutureTask。
但是,guava规范强烈建议我们还是按照之前的方法来产生ListenableFutureTask对象,所以如果时间允许,最好代码重构更加彻底一点。
(6)ListenableFutureTask链条
有时候,我们得到一个计算结果后,会再调用另一个方法,再次进行计算,得到一个新的计算结果,这时候我们可以使用Futures.transform方法来将两次计算链起来。
public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
AsyncFunction<? super I, ? extends O> function,
Executor executor)
Input表示第一个异步计算任务,AsyncFunction表示得到计算结果后,怎样再次转换为另一个计算任务,Executor表示运行转换函数的线程。
这个理解起来确实费劲,还是看一个示例吧:
//get a random integer
class Task1 implements Callable<Integer>{
@Override
public Integer call() throws Exception {
System.out.println("task1 begin");
Thread.sleep(2000);
System.out.println("task1 end");
return new Random().nextInt();
}
}
//add suffix to an integer
class Task2 implements Callable<String>{
private Integer i;
public Task2(Integer i){
this.i = i;
}
@Override
public String call() throws Exception {
System.out.println("task2 begin");
Thread.sleep(4000);
System.out.println("task2 end");
return i+"suffix";
}
}
@Test
public void testLisenableFutureChain() throws InterruptedException, ExecutionException{
ExecutorService normalService = Executors.newFixedThreadPool(100);
final ListeningExecutorService service = MoreExecutors.listeningDecorator(normalService);
ListenableFuture<Integer> future1 = service.submit(new Task1());
AsyncFunction<Integer,String> asyncFunction = new AsyncFunction<Integer,String>(){
@Override
public ListenableFuture<String> apply(Integer input) throws Exception {
return service.submit(new Task2(input));
}};
ListenableFuture<String> futures2 = Futures.transform(future1, asyncFunction);
System.out.println(futures2.get());
}
这个例子展示了两个任务是怎样依次执行的。第一个任务得到一个整数,第二个任务在整数的后面加上后缀。第一个任务完成后立刻执行第二个任务。
除了transform(ListenableFuture<A>, AsyncFunction<A, B>, /*Executor*/)方法,Futures中还有几个其他的方法,也是用于几个Listenable任务组合执行:
Futures.transform(ListenableFuture<A>, Function<A, B>, /*Executor*/) 转换的函数是同步函数,内部处理其实是把这个Function转化为立刻返回成功的AsyncFunction,然后调用参数为AsyncFunction的transform方法。
Futures.allAsList(Iterable<ListenableFuture<V>>) 将多个任务的计算结果拼成一个list对象。注意:只要有一个任务计算失败,整个任务就是失败的。
Futures.successfulAsList(Iterable<ListenableFuture<V>>) 同上,但是只把计算成果的结果返回,对于计算失败的任务直接抛弃。
(7)CheckedFuture
得到ListenableFuture对象后,如果执行失败,调用get方法会直接抛出系统异常,如InterruptedException,CancellationException,ExecutionException,我们可能需要手动捕获这些异常,然后转换为应用异常。
CheckedFuture继承自ListenableFuture,新增了checkedGet方法,这个方法会自动把InterruptedException,CancellationException,ExecutionException转化为指定的异常类型。
Futures.makeChecked(ListenableFuture<V>, Function<Exception, X>)可以把普通的ListenableFuture转换为CheckedFuture类型。Function用于定义这三种异常分别转换为何种异常。
二.Guava的线程支持除了ListenableFuture,还有个ServiceManager,如果你想编写一个类似Tomcat的服务器程序,这个可以派上用场。
Service代表一个服务,ServiceManager用来对这些服务进行管理和状态维护。
Service.State这个枚举类型定义了service常见的6种状态:FAILED,NEW,RUNNING,STARTING,STOPPING,TERMINATED。
一般的Server可以按下面的方式实现。
class Server {
public static void main(String[] args) {
Set<Service> services = ...;
ServiceManager manager = new ServiceManager(services);
manager.addListener(new Listener() {
public void stopped() {}
public void healthy() {
// Services have been initialized and are healthy, start accepting requests...
}
public void failure(Service service) {
// Something failed, at this point we could log it, notify a load balancer, or take
// some other action. For now we will just exit.
System.exit(1);
}
},
MoreExecutors.sameThreadExecutor());
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
// Give the services 5 seconds to stop to ensure that we are responsive to shutdown
// requests.
try {
manager.stopAsync().awaitStopped(5, TimeUnit.SECONDS);
} catch (TimeoutException timeout) {
// stopping timed out
}
}
});
manager.startAsync(); // start all the services asynchronously
}
}}
上一篇: 夜光精讲 Opentcs 框架与实际AGV方案(二)
下一篇: 【以太坊】- 私有节点