Spring @Async: Usage and Implementation, with Sample Code
Spring AOP is built on two fundamental interfaces, Advisor and PointcutAdvisor, declared as follows.
Declaration of the Advisor interface:
public interface Advisor {

    Advice getAdvice();

    boolean isPerInstance();
}
Declaration of the PointcutAdvisor interface:
public interface PointcutAdvisor extends Advisor {

    /**
     * Get the Pointcut that drives this advisor.
     */
    Pointcut getPointcut();
}
A PointcutAdvisor supplies both a pointcut and the advice (the handler) that runs at that pointcut.
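To make that pairing concrete, here is a minimal sketch that uses only the JDK, no Spring. It models an advisor as "which methods match" plus "what to do there". All names in it (AdvisorSketch, Marked, MiniAdvisor, Greeter) are hypothetical and only mirror the roles; Spring's real Advice wraps the whole method call, whereas this simplification merely transforms the returned value.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

class AdvisorSketch {

    // Hypothetical marker annotation, standing in for @Async.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Marked { }

    // Hypothetical stand-in for PointcutAdvisor: a pointcut plus an advice.
    static final class MiniAdvisor {
        final Predicate<Method> pointcut;   // which methods are selected
        final UnaryOperator<String> advice; // what happens at those methods (simplified)

        MiniAdvisor(Predicate<Method> pointcut, UnaryOperator<String> advice) {
            this.pointcut = pointcut;
            this.advice = advice;
        }

        // Applies the advice only when the pointcut selects the method.
        String apply(Method method, String rawResult) {
            return pointcut.test(method) ? advice.apply(rawResult) : rawResult;
        }
    }

    static class Greeter {
        @Marked
        public String hello() { return "hello"; }

        public String bye() { return "bye"; }
    }

    // Advisor whose pointcut matches @Marked methods and whose advice upper-cases the result.
    static MiniAdvisor upperCaseAdvisor() {
        return new MiniAdvisor(m -> m.isAnnotationPresent(Marked.class), s -> s.toUpperCase());
    }

    // Calls a Greeter method by name and passes its result through the advisor.
    static String callThroughAdvisor(String methodName) {
        try {
            Method method = Greeter.class.getMethod(methodName);
            String raw = (String) method.invoke(new Greeter());
            return upperCaseAdvisor().apply(method, raw);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(callThroughAdvisor("hello")); // HELLO (method is @Marked)
        System.out.println(callThroughAdvisor("bye"));   // bye   (method is not matched)
    }
}
```

Only the annotated method is affected, which is the essence of an advisor: the pointcut decides where, the advice decides what.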
The @Async annotation is handled by AsyncAnnotationBeanPostProcessor, an implementation of the BeanPostProcessor interface.
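AsyncAnnotationAdvisor implements the PointcutAdvisor interface. AsyncAnnotationBeanPostProcessor creates it in setBeanFactory, a method of the BeanFactoryAware interface that its parent class implements. Spring invokes this callback as soon as the post-processor has been given its BeanFactory, before it processes any bean, which guarantees that the advisor is initialized first.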
@Override
public void setBeanFactory(BeanFactory beanFactory) {
    super.setBeanFactory(beanFactory);

    AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
    if (this.asyncAnnotationType != null) {
        advisor.setAsyncAnnotationType(this.asyncAnnotationType);
    }
    advisor.setBeanFactory(beanFactory);
    this.advisor = advisor;
}
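AsyncAnnotationBeanPostProcessor does not implement the post-processing itself; it inherits it from its parent class AbstractAdvisingBeanPostProcessor. The post-processing method provided there is generic, so it works for beans carrying any custom annotation, not only @Async. The code is as follows: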
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
    if (bean instanceof AopInfrastructureBean) {
        // Ignore AOP infrastructure such as scoped proxies.
        return bean;
    }

    // If the bean is already an AOP proxy, it implements Advised (ProxyFactory extends
    // AdvisedSupport, which implements the Advised interface). In that case the advisor
    // is simply added to the proxy's existing advisor chain.
    if (bean instanceof Advised) {
        Advised advised = (Advised) bean;
        if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
            // Add our local Advisor to the existing proxy's Advisor chain...
            if (this.beforeExistingAdvisors) {
                advised.addAdvisor(0, this.advisor);
            }
            else {
                advised.addAdvisor(this.advisor);
            }
            return bean;
        }
    }

    if (isEligible(bean, beanName)) {
        ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
        if (!proxyFactory.isProxyTargetClass()) {
            evaluateProxyInterfaces(bean.getClass(), proxyFactory);
        }
        proxyFactory.addAdvisor(this.advisor);
        customizeProxyFactory(proxyFactory);
        return proxyFactory.getProxy(getProxyClassLoader());
    }

    // No proxy needed.
    return bean;
}
As the code shows, isEligible decides whether the class, or any method of the class, carries the annotation. The check eventually reaches the canApply method of AopUtils:
public static boolean canApply(Advisor advisor, Class<?> targetClass, boolean hasIntroductions) {
    if (advisor instanceof IntroductionAdvisor) {
        return ((IntroductionAdvisor) advisor).getClassFilter().matches(targetClass);
    }
    else if (advisor instanceof PointcutAdvisor) {
        PointcutAdvisor pca = (PointcutAdvisor) advisor;
        return canApply(pca.getPointcut(), targetClass, hasIntroductions);
    }
    else {
        // It doesn't have a pointcut so we assume it applies.
        return true;
    }
}
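Here the advisor is the AsyncAnnotationAdvisor object. Its getPointcut() method is called to obtain a Pointcut object, which in AOP terminology represents a concrete pointcut. Putting @Async on a method therefore amounts to declaring a pointcut.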
The Pointcut is then used to decide whether the target carries the specified annotation.
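The idea behind that check can be sketched with plain JDK reflection. This is a simplification under stated assumptions, not Spring's implementation: the names EligibilitySketch, Marked, TypeLevel, MethodLevel and Plain are hypothetical, and Spring's real pointcut matching is more thorough (for example, it can also take annotations on superclasses and interfaces into account).

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

class EligibilitySketch {

    // Hypothetical annotation standing in for @Async.
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.METHOD, ElementType.TYPE})
    @interface Marked { }

    @Marked
    static class TypeLevel {
        public void work() { }
    }

    static class MethodLevel {
        @Marked
        public void work() { }
    }

    static class Plain {
        public void work() { }
    }

    // True if the annotation is on the class itself or on any method it declares.
    static boolean isEligible(Class<?> targetClass, Class<? extends Annotation> annotationType) {
        if (targetClass.isAnnotationPresent(annotationType)) {
            return true;
        }
        for (Method method : targetClass.getDeclaredMethods()) {
            if (method.isAnnotationPresent(annotationType)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(isEligible(TypeLevel.class, Marked.class));   // true
        System.out.println(isEligible(MethodLevel.class, Marked.class)); // true
        System.out.println(isEligible(Plain.class, Marked.class));       // false
    }
}
```

A class that fails this kind of check is returned from the post-processor unchanged, without a proxy.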
Executing the pointcut
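When the bean implements an interface and class-based proxying is not forced, Spring generates a JDK dynamic proxy, so every method call on the bean goes through the invoke method of JdkDynamicAopProxy: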
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    MethodInvocation invocation;
    Object oldProxy = null;
    boolean setProxyContext = false;

    TargetSource targetSource = this.advised.targetSource;
    Class<?> targetClass = null;
    Object target = null;

    try {
        if (!this.equalsDefined && AopUtils.isEqualsMethod(method)) {
            // The target does not implement the equals(Object) method itself.
            return equals(args[0]);
        }
        else if (!this.hashCodeDefined && AopUtils.isHashCodeMethod(method)) {
            // The target does not implement the hashCode() method itself.
            return hashCode();
        }
        else if (method.getDeclaringClass() == DecoratingProxy.class) {
            // There is only getDecoratedClass() declared -> dispatch to proxy config.
            return AopProxyUtils.ultimateTargetClass(this.advised);
        }
        else if (!this.advised.opaque && method.getDeclaringClass().isInterface() &&
                method.getDeclaringClass().isAssignableFrom(Advised.class)) {
            // Service invocations on ProxyConfig with the proxy config...
            return AopUtils.invokeJoinpointUsingReflection(this.advised, method, args);
        }

        Object retVal;

        if (this.advised.exposeProxy) {
            // Make invocation available if necessary.
            oldProxy = AopContext.setCurrentProxy(proxy);
            setProxyContext = true;
        }

        // May be null. Get as late as possible to minimize the time we "own" the target,
        // in case it comes from a pool.
        target = targetSource.getTarget();
        if (target != null) {
            targetClass = target.getClass();
        }

        // Get the interception chain for this method.
        List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);

        // Check whether we have any advice. If we don't, we can fallback on direct
        // reflective invocation of the target, and avoid creating a MethodInvocation.
        if (chain.isEmpty()) {
            // We can skip creating a MethodInvocation: just invoke the target directly
            // Note that the final invoker must be an InvokerInterceptor so we know it does
            // nothing but a reflective operation on the target, and no hot swapping or fancy proxying.
            Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
            retVal = AopUtils.invokeJoinpointUsingReflection(target, method, argsToUse);
        }
        else {
            // We need to create a method invocation...
            invocation = new ReflectiveMethodInvocation(proxy, target, method, args, targetClass, chain);
            // Proceed to the joinpoint through the interceptor chain.
            retVal = invocation.proceed();
        }

        // Massage return value if necessary.
        Class<?> returnType = method.getReturnType();
        if (retVal != null && retVal == target && returnType.isInstance(proxy) &&
                !RawTargetAccess.class.isAssignableFrom(method.getDeclaringClass())) {
            // Special case: it returned "this" and the return type of the method
            // is type-compatible. Note that we can't help if the target sets
            // a reference to itself in another returned object.
            retVal = proxy;
        }
        else if (retVal == null && returnType != Void.TYPE && returnType.isPrimitive()) {
            throw new AopInvocationException(
                    "Null return value from advice does not match primitive return type for: " + method);
        }
        return retVal;
    }
    finally {
        if (target != null && !targetSource.isStatic()) {
            // Must have come from TargetSource.
            targetSource.releaseTarget(target);
        }
        if (setProxyContext) {
            // Restore old proxy.
            AopContext.setCurrentProxy(oldProxy);
        }
    }
}
The key statements are:
// Get the interceptor chain for this method.
List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);

// Check whether we have any advice. If we don't, we can fallback on direct
// reflective invocation of the target, and avoid creating a MethodInvocation.
if (chain.isEmpty()) {
    // We can skip creating a MethodInvocation: just invoke the target directly
    // Note that the final invoker must be an InvokerInterceptor so we know it does
    // nothing but a reflective operation on the target, and no hot swapping or fancy proxying.
    Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
    retVal = AopUtils.invokeJoinpointUsingReflection(target, method, argsToUse);
}
else {
    // Run the call through the interceptors.
    invocation = new ReflectiveMethodInvocation(proxy, target, method, args, targetClass, chain);
    // Proceed to the joinpoint through the interceptor chain.
    retVal = invocation.proceed();
}
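How proceed() walks the interceptor chain can be illustrated with a small JDK-only sketch. It is an assumption-laden simplification of the pattern, not Spring's ReflectiveMethodInvocation: the types ChainSketch, Interceptor and Invocation are hypothetical, and the target call is represented by a Supplier instead of a reflective method call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

class ChainSketch {

    // Hypothetical stand-in for MethodInterceptor.
    interface Interceptor {
        Object invoke(Invocation invocation);
    }

    // Hypothetical stand-in for ReflectiveMethodInvocation.
    static final class Invocation {
        private final List<Interceptor> chain;
        private final Supplier<Object> target; // represents the real method call
        private int index = 0;

        Invocation(List<Interceptor> chain, Supplier<Object> target) {
            this.chain = chain;
            this.target = target;
        }

        // Runs the next interceptor, or the target once the chain is exhausted.
        Object proceed() {
            if (index == chain.size()) {
                return target.get();
            }
            return chain.get(index++).invoke(this);
        }
    }

    // Builds a two-interceptor chain around a target and records the call order.
    static List<String> runDemo() {
        List<String> log = new ArrayList<>();
        Interceptor outer = inv -> {
            log.add("outer before");
            Object result = inv.proceed();
            log.add("outer after");
            return result;
        };
        Interceptor inner = inv -> {
            log.add("inner before");
            Object result = inv.proceed();
            log.add("inner after");
            return result;
        };
        Invocation invocation = new Invocation(Arrays.asList(outer, inner), () -> {
            log.add("target");
            return 3;
        });
        log.add("result=" + invocation.proceed());
        return log;
    }

    public static void main(String[] args) {
        // Prints: outer before, inner before, target, inner after, outer after, result=3
        runDemo().forEach(System.out::println);
    }
}
```

Each interceptor decides what happens before and after the rest of the chain by choosing where it calls proceed(), which is why an interceptor can also hand the remaining call to another thread.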
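The interceptor for the @Async annotation is AsyncExecutionInterceptor (more precisely, its subclass AnnotationAsyncExecutionInterceptor), which implements the MethodInterceptor interface. In AOP Alliance terms, a MethodInterceptor is an Advice, that is, the handler for the pointcut.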
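What such an interceptor does in essence, handing the intercepted call to an executor and returning a Future right away, can be sketched with a JDK dynamic proxy. This is a rough illustration under assumptions, not Spring's code: AsyncProxySketch, Worker, WorkerImpl and the thread name "sketch-async-1" are hypothetical, only Future-returning methods are handled, and methods inherited from Object (toString and so on) are not treated specially.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class AsyncProxySketch {

    interface Worker {
        Future<String> threadName();
    }

    static class WorkerImpl implements Worker {
        @Override
        public Future<String> threadName() {
            // Reports the thread that actually executes the method body.
            return CompletableFuture.completedFuture(Thread.currentThread().getName());
        }
    }

    // Wraps the target in a JDK proxy whose handler submits every call to the executor.
    static Worker asyncProxy(Worker target, ExecutorService executor) {
        InvocationHandler handler = (proxy, method, args) -> {
            Callable<Object> task = () -> {
                Object result = method.invoke(target, args);
                // Unwrap the target's Future so the caller's Future yields the plain value.
                return result instanceof Future ? ((Future<?>) result).get() : result;
            };
            return executor.submit(task);
        };
        return (Worker) Proxy.newProxyInstance(
                Worker.class.getClassLoader(), new Class<?>[] {Worker.class}, handler);
    }

    // Returns {caller thread name, thread name observed inside the target method}.
    static String[] runDemo() {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-async-1"));
        try {
            Worker worker = asyncProxy(new WorkerImpl(), executor);
            Future<String> future = worker.threadName(); // returns without waiting
            return new String[] {Thread.currentThread().getName(), future.get()};
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] names = runDemo();
        System.out.println("caller thread: " + names[0]);
        System.out.println("target ran on: " + names[1]);
    }
}
```

The method body runs on the executor's thread rather than the caller's, which is the observable effect of @Async.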
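Custom annotation
Because the bean post-processing logic is generic, supporting a new annotation only requires a PointcutAdvisor and a concrete handler. First define a custom annotation named @Log: any method that carries it prints a message when its execution starts and when it finishes.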
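import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Log {
}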
Define a simple service for testing:
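// IDemoService.java
public interface IDemoService {

    void add(int a, int b);

    String getName();
}

// DemoServiceImpl.java
import org.springframework.stereotype.Service;

@Service
public class DemoServiceImpl implements IDemoService {

    // Annotated with @Log, so this call is intercepted.
    @Log
    @Override
    public void add(int a, int b) {
        System.out.println(Thread.currentThread().getName());
        System.out.println(a + b);
    }

    // Not annotated, so this call is not intercepted.
    @Override
    public String getName() {
        System.out.println("demoserviceimpl.getname");
        return "demoserviceimpl";
    }
}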
Define the advisor:
import java.lang.annotation.Annotation;
import java.util.HashSet;
import java.util.Set;

import org.aopalliance.aop.Advice;
import org.springframework.aop.Pointcut;
import org.springframework.aop.support.AbstractPointcutAdvisor;
import org.springframework.aop.support.ComposablePointcut;
import org.springframework.aop.support.annotation.AnnotationMatchingPointcut;
import org.springframework.util.Assert;

public class LogAnnotationAdvisor extends AbstractPointcutAdvisor {

    private Advice advice;

    private Pointcut pointcut;

    public LogAnnotationAdvisor() {
        this.advice = new LogAnnotationInterceptor();
    }

    @Override
    public Advice getAdvice() {
        return this.advice;
    }

    @Override
    public boolean isPerInstance() {
        return false;
    }

    @Override
    public Pointcut getPointcut() {
        return this.pointcut;
    }

    // The method name is carried over from AsyncAnnotationAdvisor; here it registers @Log.
    public void setAsyncAnnotationType(Class<? extends Annotation> asyncAnnotationType) {
        Assert.notNull(asyncAnnotationType, "'asyncAnnotationType' must not be null");
        Set<Class<? extends Annotation>> asyncAnnotationTypes = new HashSet<Class<? extends Annotation>>();
        asyncAnnotationTypes.add(asyncAnnotationType);
        this.pointcut = buildPointcut(asyncAnnotationTypes);
    }

    // Builds a pointcut that matches the annotation at class level or at method level.
    protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
        ComposablePointcut result = null;
        for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
            Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
            Pointcut mpc = AnnotationMatchingPointcut.forMethodAnnotation(asyncAnnotationType);
            if (result == null) {
                result = new ComposablePointcut(cpc).union(mpc);
            }
            else {
                result.union(cpc).union(mpc);
            }
        }
        return result;
    }
}
Define the concrete handler:
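import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.springframework.core.Ordered;

public class LogAnnotationInterceptor implements MethodInterceptor, Ordered {

    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE;
    }

    @Override
    public Object invoke(MethodInvocation invocation) throws Throwable {
        System.out.println("开始执行"); // "execution started"
        Object result = invocation.proceed(); // call the next interceptor or the target method
        System.out.println("结束执行"); // "execution finished"
        return result;
    }
}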
Define the BeanPostProcessor dedicated to @Log:
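import org.springframework.aop.framework.autoproxy.AbstractBeanFactoryAwareAdvisingPostProcessor;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.stereotype.Service;

@SuppressWarnings("serial")
@Service
public class LogAnnotationBeanPostProcesser extends AbstractBeanFactoryAwareAdvisingPostProcessor {

    @Override
    public void setBeanFactory(BeanFactory beanFactory) {
        super.setBeanFactory(beanFactory);
        // Create the advisor up front and bind it to the @Log annotation.
        LogAnnotationAdvisor advisor = new LogAnnotationAdvisor();
        advisor.setAsyncAnnotationType(Log.class);
        this.advisor = advisor;
    }
}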
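The bean post-processing method is inherited from the parent class unchanged. It can also be overridden, in which case you have to check yourself whether the object's methods carry the annotation and create the proxy object: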
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
    Method[] methods = ReflectionUtils.getAllDeclaredMethods(bean.getClass());
    for (Method method : methods) {
        // Proxy the bean as soon as one method carries @Log.
        if (method.isAnnotationPresent(Log.class)) {
            ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
            System.out.println(proxyFactory);
            if (!proxyFactory.isProxyTargetClass()) {
                evaluateProxyInterfaces(bean.getClass(), proxyFactory);
            }
            proxyFactory.addAdvisor(this.advisor);
            customizeProxyFactory(proxyFactory);
            return proxyFactory.getProxy(getProxyClassLoader());
        }
    }
    return bean;
}
Test whether the annotation works:
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Main {

    public static void main(String[] args) {
        @SuppressWarnings("resource")
        ClassPathXmlApplicationContext context =
                new ClassPathXmlApplicationContext("application-context.xml");
        IDemoService demoService = context.getBean(IDemoService.class);
        demoService.add(1, 2);   // annotated with @Log, so it is intercepted
        demoService.getName();   // not annotated, so it runs directly
    }
}
Output:
开始执行
main
3
结束执行
demoserviceimpl.getname
Everything works as expected: only the method annotated with @Log is wrapped by the two log messages.