0318 guava并发工具
并发是一个难题,但是可以通过使用强力简单的抽象来显著的简化,为了简化问题,guava扩展了future接口,即 listenablefuture (可以监听的future)。
我强烈建议你在你的所有代码里使用listenablefuture去替代future,原因如下:
- 很多的futures 类的方法需要它。(futures工具类使用)
- 它比后来改造为listenablefutrue更简单。(早点使用比重构更简单)
- 工具方法的提供者不需要提供future和listenablefuture方法的变体。(不需要兼容两套)
接口
一个传统的futrue代表一个异步计算的结果:一个可能完成也可能没有完成输出结果的计算。
一个future可以用在进度计算,或者说是 一个提供给我们结果的服务的承诺。
一个listenablefuture允许注册当你在计算完成的时候的回调,或者计算已经完成了。
这个简单的增强让高效支持多种操作成为可能。而future接口并不能支持。
listenblefuture中添加的基本操作是
addlistener(runnable , executor ),
它指出了当未来计算完成时,指定的runnable会在指定的executor中运行。
增加回调
很多用户喜欢使用 futures.addcallback(listenablefuture,futurecallback,executor)方法。
futurecallback实现了下面两个方法:
- onsuccess(v) 当未来成功执行的动作,基于计算结果
- onfailure(throwable) 当未来失败执行的动作,基于失败
创建
相较于jdk提供的 executorservice.submit(callable)方法来初始化一个异步计算。它返回一个常规的future,
guava提供了listeningexecutorservice接口,它返回listenablefuture。
把executorservice转换为listenableexecutorservice
使用:moreexecutors.listeningdecorator(executorservice)
基础用法如下:
/** * 说明:使用例子代码 * @author carter * 创建时间: 2020年03月19日 9:54 上午 **/ @slf4j public class listenablefutureutils { public static void main(string[] args) { listeningexecutorservice service = moreexecutors.listeningdecorator( executors.newfixedthreadpool(10)); final listenablefuture<aresult> listenablefuture = service.submit(() -> { try { timeunit.seconds.sleep(5); } catch (interruptedexception e) { e.printstacktrace(); } return new aresult(30, "male", 1); }); futures.addcallback(listenablefuture, new futurecallback<aresult>() { @override public void onsuccess(aresult aresult) { log.info("计算成功,{}",aresult); } @override public void onfailure(throwable throwable) { log.error("计算错误",throwable); } },service); } @data @allargsconstructor public static class aresult{ private integer age; private string sex; private integer id; } }
相对的,如果你想从基于futuretask的api转换过来,
guava提供了
listenablefuturetask.create(callable)
和
listenablefuturetask.create(runnable)
不同于jdk,listenablefuturetask并不是直接扩展的。
如果你喜欢抽象的设置future的值,而不是实现一个方法然后计算值,可以考虑使用abstractfuture或使用settablefuture ;
如果你必须转换future为listenablefuture,你别无选择,必须使用 jdkfutureadapters.listeninpoolthread(future)来转换future为listenablefuture
任何时候只要可能,推荐你修改源码让它返回一个 listenablefuture
应用
使用listenablfuture最重要的原因是可以使用链式异步操作。
代码如下:
package com.xxx.demo; import com.google.common.util.concurrent.asyncfunction; import com.google.common.util.concurrent.futures; import com.google.common.util.concurrent.listenablefuture; import lombok.allargsconstructor; import lombok.data; /** * 说明:异步操作链 * @author carter * 创建时间: 2020年03月19日 10:11 上午 **/ public class applicationutils { public static void main(string[] args) { query query = new query(30); listenablefuture<rowkey> rowkeyfuture = lookup(query); asyncfunction<rowkey, queryresult> queryfun = rowkey -> readdata(rowkey); final listenablefuture<queryresult> queryresultlistenablefuture = futures.transformasync(rowkeyfuture, queryfun); } private static listenablefuture<queryresult> readdata(rowkey rowkey) { return null; } private static listenablefuture<rowkey> lookup(query query) { return null; } @data @allargsconstructor public static class rowkey { private string id; } @data @allargsconstructor public static class query { private integer age; } @data @allargsconstructor public static class queryresult { private string id; private string age; } }
很多其他高效支持的操作listenablefuture提供,而future不提供。
不同的操作可以被不同的线程池执行,一个简单的listenablefuture可以有多个操作去等待。
只要一个操作开始,其他多个操作应该开始,fan-out, 千帆竞发。
listenablefuture可以实现这样的操作:它触发了所有请求的回调。
通过少量的工作,我们可以 fan-in.
触发一个listenablefuture 来获得计算结果,当其他的future结束的时候。
futures.allaslist是一个例子。
方法介绍:
方法 | 描述 |
---|---|
transformasync(listenablefuture , asyncfunction , executor) | 返回一个新的listenablefuture,它的结果是执行异步函数的返回,函数入参是listenablefuture的返回结果; |
transform(listenablefuture , function , executor) | 返回一个新的listenablefuture,它的结果是执行函数的返回,函数入参是listenablefuture的返回结果; |
allaslist(iterable |
返回一个listenablefuture,它的结果是一个list,包含每一个列表中的listenablefuture的执行结果,任何一个listenablefuture执行失败或者取消,最后的返回结果取消 |
successfullaslist(iterable |
返回一个listenablefuture,它的结果是一个list,包含每一个列表中的listenablefuture的执行结果,成功的是结果,失败或者取消的值使用null替代 |
asyncfunction<a,b> 提供了一个方法 , listenablefuture apply(a inpunt),它可以用来异步的转换值。
代码如下:
package com.xxx.demo; import com.google.common.collect.lists; import com.google.common.util.concurrent.futurecallback; import com.google.common.util.concurrent.futures; import com.google.common.util.concurrent.listenablefuture; import lombok.allargsconstructor; import lombok.data; import lombok.extern.slf4j.slf4j; import java.util.list; /** * 说明:成功执行结果汇集 * @author carter * 创建时间: 2020年03月19日 10:34 上午 **/ @slf4j public class test3 { public static void main(string[] args) { list<listenablefuture<queryresult>> querys = lists.newlinkedlist(); final listenablefuture<list<queryresult>> successfulaslist = futures.successfulaslist(querys); futures.addcallback(successfulaslist, new futurecallback<list<queryresult>>() { @override public void onsuccess(list<queryresult> queryresults) { log.info("执行结果列表:{}",queryresults); } @override public void onfailure(throwable throwable) { log.error("执行失败",throwable); } }); } @data @allargsconstructor public static class queryresult{ private integer age; } }
嵌套的future
你的代码调用一个通用接口并返回一个future,很可能最终返回一个嵌套的future.
package com.xxx.demo; import com.google.common.util.concurrent.listenablefuture; import com.google.common.util.concurrent.listeningexecutorservice; import com.google.common.util.concurrent.moreexecutors; import lombok.allargsconstructor; import lombok.data; import java.util.concurrent.callable; import java.util.concurrent.executors; /** * 说明:嵌套的listenablefuture * @author carter * 创建时间: 2020年03月19日 10:43 上午 **/ public class test4 { public static void main(string[] args) { final listeningexecutorservice executorservice = moreexecutors .listeningdecorator(executors.newfixedthreadpool(2)); final listeningexecutorservice otherexecutorservice = moreexecutors .listeningdecorator(executors.newfixedthreadpool(2)); callable<foo> othercallback = ()->new foo("aaa"); final listenablefuture<listenablefuture<foo>> submit = executorservice.submit(() -> otherexecutorservice.submit(othercallback)); } @data @allargsconstructor public static class foo{ private string name; } }
例子最后返回的是: listenablefuture<listenablefuture
但是,除非特别关注 否则 othercallback抛出的异常会被压制。 比如:transform,transformasyn, submit, submitasync方法。 原创不易,转载请注明出处。
这个代码不对,因为当外层的future 取消的时候,无法传播到内层的future,
这也是一个 使用get()检查别的future或者listnener的常规的错误,
为了避免这种情况,所有的guava的future处理方法(有些从jdk来),有 *async版本来安全的解开这个嵌套。深入研究
上一篇: C++文件读写操作
下一篇: 她是努尔哈赤的女儿,为何却一生婚姻坎坷?