【原】手写spring async异步组件
程序员文章站
2022-10-05 21:54:51
最近在工作中使用到了spring自带的Async,主要是为了把其中耗时多、响应慢、计算复杂的业务抽取几个模块出来,并行查询。不得不说spring自带的比传统线程池提交在代码层次上看起来优雅简洁了不少,直接返回一个AsyncResult,然后调用方通过Future接收。今天就打算自己手动实现这个异步组 ......
最近在工作中使用到了spring自带的async,主要是为了把其中耗时多、响应慢、计算复杂的业务抽取几个模块出来,并行查询。不得不说spring自带的比传统线程池提交在代码层次上看起来优雅简洁了不少,直接返回一个asyncresult,然后调用方通过future接收。今天就打算自己手动实现这个异步组件,并借此学习其中的原理。先思考一个问题,为什么直接调用一个普通的方法就能实现异步?第一个想到的就是代理,所以本章就从代理模式出发。
总体实现思路流程:客户端调用后-》通过代理模式代理-》重写submit方法并返回future-》把future 放到自定义的异步返回包装类-》客户端直接拿到返回的future 进行get。
1.新建代理接口
public interface iasyncproxy {
/**
* 获取代理对象
* 1. 如果是实现了接口,默认使用 dynamic proxy 即可。
* 2. 如果没有实现接口,默认使用 cglib 实现代理。
* @return 代理对象
*/
object proxy();
}
由于代理分2种,接口的代理还有无接口的代理,所以这里定义成接口方便扩展。
2.新建动态代理实现类
public class dynamicproxy implements invocationhandler, iasyncproxy {
/**
* 被代理的对象
*/
private final object target;
public dynamicproxy(object target) {
this.target = target;
}
/**
* 这种方式虽然实现了异步执行
*
* @param proxy 原始对象
* @param method 方法
* @param args 入参
* @return 结果
* @throws throwable 异常
*/
@override
@suppresswarnings("all")
public object invoke(object proxy, method method, object[] args) throws throwable {
return asyncexecutor.submit(target, method, args);
}
@override
public object proxy() {
// 我们要代理哪个真实对象,就将该对象传进去,最后是通过该真实对象来调用其方法的
invocationhandler handler = new dynamicproxy(target);
return proxy.newproxyinstance(handler.getclass().getclassloader(),
target.getclass().getinterfaces(), handler);
}
}
由于篇幅有限,这里只讲动态代理,至于cglib代理实现方案和接口代理原理是一致的。最终要的是代理之后我们要如何交给异步处理,所以在invoke方法内,我通过线程池去提交一个任务,细心的可以发现asyncexecutor在jdk包里是没有的,这个类是我自己定义的。至于原因有以下几个:
1.jdk自带的executorservice的submit方法无法满足, 所以需要重新实现executorservice做扩展,重写submit方法。
2.submit之后需要包装统一的返回结果,保持 实现类边返回的类型和代理返回类型一致。
3.定义异步接口
/**
* <p> 异步执行结果 </p>
/
public interface iasyncresult<t> extends future<t> {
/**
* 获取执行的结果
* @return 结果
*/
object getresult();
}
public abstract class abstractasyncresult<t> implements iasyncresult<t> {
@override
public boolean cancel(boolean mayinterruptifrunning) {
return false;
}
@override
public boolean iscancelled() {
return false;
}
@override
public boolean isdone() {
return false;
}
@override
public t get() throws interruptedexception, executionexception {
try {
return this.get(asyncconstant.default_time_out, timeunit.seconds);
} catch (timeoutexception e) {
throw new runtimeexception(e);
}
}
}
/**
* 异步执行结果
*/
public class asyncresult<t> extends abstractasyncresult<t> {
/**
* future 信息
*/
private future<t> future;
/**
* 结果
*/
private object value;
/**
* 获取执行的结果
* @return 结果
*/
@override
public object getresult() {
// 直接返回结果
if(future == null) {
return this.getvalue();
}
try {
t t = future.get();
// 这里拿到的 asyncresult 对象
if(null != t) {
return ((asyncresult)t).getvalue();
}
return null;
} catch (interruptedexception | executionexception e) {
throw new runtimeexception(e);
}
}
@override
public t get(long timeout, timeunit unit) throws interruptedexception, executionexception, timeoutexception {
return future.get(timeout, unit);
}
public object getvalue() {
return this.value;
}
public void setvalue(object value) {
this.value = value;
}
public void setfuture(future<t> future) {
this.future = future;
}
}
4.定义一个异步接口,继承executorservice
/**
* <p> 异步框架执行器 </p>
public interface iasyncexecutor extends executorservice {
}
package com.github.houbb.async.core.executor;
import java.util.concurrent.*;
/**
* 异步执行器
*/
public class asyncexecutor extends threadpoolexecutor implements iasyncexecutor {
//region 私有属性
/**
* 是否初始化
*/
private static volatile boolean isinit = false;
/**
* 是否被销毁
*/
private static volatile boolean isdestroy = false;
/**
* 线程执行器
*/
private static executorservice executorservice = null;
//endregion
//region 构造器
public asyncexecutor(int corepoolsize, int maximumpoolsize, long keepalivetime, timeunit unit, blockingqueue<runnable> workqueue) {
super(corepoolsize, maximumpoolsize, keepalivetime, unit, workqueue);
}
public asyncexecutor(int corepoolsize, int maximumpoolsize, long keepalivetime, timeunit unit, blockingqueue<runnable> workqueue, threadfactory threadfactory) {
super(corepoolsize, maximumpoolsize, keepalivetime, unit, workqueue, threadfactory);
}
public asyncexecutor(int corepoolsize, int maximumpoolsize, long keepalivetime, timeunit unit, blockingqueue<runnable> workqueue, rejectedexecutionhandler handler) {
super(corepoolsize, maximumpoolsize, keepalivetime, unit, workqueue, handler);
}
public asyncexecutor(int corepoolsize, int maximumpoolsize, long keepalivetime, timeunit unit, blockingqueue<runnable> workqueue, threadfactory threadfactory, rejectedexecutionhandler handler) {
super(corepoolsize, maximumpoolsize, keepalivetime, unit, workqueue, threadfactory, handler);
}
//endregion
@suppresswarnings("all")
public static <t> iasyncresult<t> submit(object target, method method, object[] objects) {
// 初始化的判断
if(!isinit) {
init();
}
future future = executorservice.submit(new runnable() {
@override
public void run() {
try {
method.invoke(target, objects);
} catch (illegalaccessexception | illegalargumentexception | invocationtargetexception e) {
// todo auto-generated catch block
e.printstacktrace();
}
}
});
//future future = executorservice.submit(() -> method.invoke(target, objects));
asyncresult<t> asyncresult = new asyncresult<>();
asyncresult.setfuture(future);
return asyncresult;
}
/**
* 初始化
* 1. 暂时不添加配置相关的信息
* 2. 最后调整状态
*/
private static synchronized void init() {
try {
if(isinit) {
return;
}
// 各种属性配置
// 淘汰策略
// 最佳线程数量
executorservice = executors.newfixedthreadpool(10);
updateexecutorstatus(true);
} catch (exception e) {
throw new asyncruntimeexception(e);
}
}
/**
* 销毁容器
* 1. 销毁的时候进行等待,确保任务的正常执行完成。
* 2. 任务执行的统计信息,后期添加。
*/
private static synchronized void destroy() {
if(isdestroy) {
return;
}
executorservice = null;
updateexecutorstatus(false);
}
/**
* 更新执行器的状态
* @param initstatus 初始化状态
*/
private static void updateexecutorstatus(final boolean initstatus) {
isinit = initstatus;
isdestroy = !isinit;
}
}
推荐阅读
-
Spring @Async异步执行方法
-
浅谈Spring @Async异步线程池用法总结
-
Spring中@Async注解执行异步任务的方法
-
浅谈Spring @Async异步线程池用法总结
-
Spring中@Async注解执行异步任务的方法
-
Spring Boot @Async 异步任务执行方法
-
spring boot使用自定义配置的线程池执行Async异步任务
-
Spring Boot利用@Async如何实现异步调用:自定义线程池
-
Spring Boot利用@Async异步调用:使用Future及定义超时详解
-
Spring Boot利用@Async异步调用:ThreadPoolTaskScheduler线程池的优雅关闭详解