
Using Spring @Async and how it is implemented, with example code

程序员文章站 2024-03-31 11:00:16

Spring AOP has two important base interfaces, Advisor and PointcutAdvisor, declared as follows.

The Advisor interface:

public interface Advisor {
  Advice getAdvice();
  boolean isPerInstance();

}

The PointcutAdvisor interface:

public interface PointcutAdvisor extends Advisor {

  /**
   * Get the Pointcut that drives this advisor.
   */
  Pointcut getPointcut();

}

A PointcutAdvisor exposes a pointcut together with the advice (the handler) that runs at that pointcut.

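@Async is processed by AsyncAnnotationBeanPostProcessor, an implementation of the BeanPostProcessor interface.

AsyncAnnotationAdvisor implements the PointcutAdvisor interface. AsyncAnnotationBeanPostProcessor creates it in setBeanFactory, a callback declared by the BeanFactoryAware interface that its parent class implements. Spring invokes this callback on the post-processor once the BeanFactory has been handed to it, which guarantees that the advisor is initialized before any bean is post-processed: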

  @Override
  public void setBeanFactory(BeanFactory beanFactory) {
    super.setBeanFactory(beanFactory);

    AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
    if (this.asyncAnnotationType != null) {
      advisor.setAsyncAnnotationType(this.asyncAnnotationType);
    }
    advisor.setBeanFactory(beanFactory);
    this.advisor = advisor;
  }

The actual post-processing for AsyncAnnotationBeanPostProcessor is implemented in its parent class, AbstractAdvisingBeanPostProcessor. The post-processing method it provides is generic and works for any custom annotation. The code is as follows:

  @Override
  public Object postProcessAfterInitialization(Object bean, String beanName) {
    if (bean instanceof AopInfrastructureBean) {
      // Ignore AOP infrastructure such as scoped proxies.
      return bean;
    }

    /*
     * If the bean is already an AOP proxy, it implements Advised
     * (ProxyFactory extends AdvisedSupport, which implements the Advised interface).
     * In that case this advisor is added to the existing proxy's advisor chain.
     */
    if (bean instanceof Advised) {
      Advised advised = (Advised) bean;
      if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
        // Add our local Advisor to the existing proxy's Advisor chain...
        if (this.beforeExistingAdvisors) {
          advised.addAdvisor(0, this.advisor);
        }
        else {
          advised.addAdvisor(this.advisor);
        }
        return bean;
      }
    }

    if (isEligible(bean, beanName)) {
      ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
      if (!proxyFactory.isProxyTargetClass()) {
        evaluateProxyInterfaces(bean.getClass(), proxyFactory);
      }
      proxyFactory.addAdvisor(this.advisor);
      customizeProxyFactory(proxyFactory);
      return proxyFactory.getProxy(getProxyClassLoader());
    }

    // No proxy needed.
    return bean;
  }

As the code shows, isEligible decides whether the class, or any method in it, carries the annotation. The call eventually reaches the canApply method of AopUtils:

  public static boolean canApply(Advisor advisor, Class<?> targetClass, boolean hasIntroductions) {
    if (advisor instanceof IntroductionAdvisor) {
      return ((IntroductionAdvisor) advisor).getClassFilter().matches(targetClass);
    }
    else if (advisor instanceof PointcutAdvisor) {
      PointcutAdvisor pca = (PointcutAdvisor) advisor;
      return canApply(pca.getPointcut(), targetClass, hasIntroductions);
    }
    else {
      // It doesn't have a pointcut so we assume it applies.
      return true;
    }
  }

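Here the advisor is the AsyncAnnotationAdvisor instance. Calling its getPointcut() method returns a Pointcut object, which in AOP terminology denotes a concrete pointcut. Annotating a method with @Async therefore amounts to declaring a pointcut.

The Pointcut is then used to check whether the specified annotation is present.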
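
The check described above can be illustrated with plain JDK reflection. The following is a minimal sketch under stated assumptions, not Spring's implementation: Marker, EligibilitySketch and its isEligible helper are hypothetical names introduced for illustration, and Spring itself evaluates the advisor's Pointcut (a class filter plus a method matcher) rather than calling reflection directly like this.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

class EligibilitySketch {

  // Hypothetical marker annotation standing in for @Async or @Log.
  @Retention(RetentionPolicy.RUNTIME)
  @Target({ElementType.METHOD, ElementType.TYPE})
  @interface Marker {}

  // No annotation anywhere: not eligible.
  static class Plain {
    public void run() {}
  }

  // Annotation on a method: eligible.
  static class MethodLevel {
    @Marker
    public void run() {}
  }

  // Annotation on the class: eligible.
  @Marker
  static class ClassLevel {
    public void run() {}
  }

  // A class is eligible if the class itself or one of its declared methods carries the annotation.
  static boolean isEligible(Class<?> targetClass) {
    if (targetClass.isAnnotationPresent(Marker.class)) {
      return true;
    }
    for (Method method : targetClass.getDeclaredMethods()) {
      if (method.isAnnotationPresent(Marker.class)) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    System.out.println(isEligible(Plain.class));       // false
    System.out.println(isEligible(MethodLevel.class)); // true
    System.out.println(isEligible(ClassLevel.class));  // true
  }
}
```

The sketch only inspects methods declared on the class itself; inherited methods and interface-level annotations are deliberately left out to keep it short.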

Executing the pointcut

Because the bean is wrapped in a JDK dynamic proxy, every method call on it necessarily goes through the invoke method of JdkDynamicAopProxy:

  @Override
  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    MethodInvocation invocation;
    Object oldProxy = null;
    boolean setProxyContext = false;

    TargetSource targetSource = this.advised.targetSource;
    Class<?> targetClass = null;
    Object target = null;

    try {
      if (!this.equalsDefined && AopUtils.isEqualsMethod(method)) {
        // The target does not implement the equals(Object) method itself.
        return equals(args[0]);
      }
      else if (!this.hashCodeDefined && AopUtils.isHashCodeMethod(method)) {
        // The target does not implement the hashCode() method itself.
        return hashCode();
      }
      else if (method.getDeclaringClass() == DecoratingProxy.class) {
        // There is only getDecoratedClass() declared -> dispatch to proxy config.
        return AopProxyUtils.ultimateTargetClass(this.advised);
      }
      else if (!this.advised.opaque && method.getDeclaringClass().isInterface() &&
          method.getDeclaringClass().isAssignableFrom(Advised.class)) {
        // Service invocations on ProxyConfig with the proxy config...
        return AopUtils.invokeJoinpointUsingReflection(this.advised, method, args);
      }

      Object retVal;

      if (this.advised.exposeProxy) {
        // Make invocation available if necessary.
        oldProxy = AopContext.setCurrentProxy(proxy);
        setProxyContext = true;
      }

      // May be null. Get as late as possible to minimize the time we "own" the target,
      // in case it comes from a pool.
      target = targetSource.getTarget();
      if (target != null) {
        targetClass = target.getClass();
      }

      // Get the interception chain for this method.
      List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);

      // Check whether we have any advice. If we don't, we can fallback on direct
      // reflective invocation of the target, and avoid creating a MethodInvocation.
      if (chain.isEmpty()) {
        // We can skip creating a MethodInvocation: just invoke the target directly
        // Note that the final invoker must be an InvokerInterceptor so we know it does
        // nothing but a reflective operation on the target, and no hot swapping or fancy proxying.
        Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
        retVal = AopUtils.invokeJoinpointUsingReflection(target, method, argsToUse);
      }
      else {
        // We need to create a method invocation...
        invocation = new ReflectiveMethodInvocation(proxy, target, method, args, targetClass, chain);
        // Proceed to the joinpoint through the interceptor chain.
        retVal = invocation.proceed();
      }

      // Massage return value if necessary.
      Class<?> returnType = method.getReturnType();
      if (retVal != null && retVal == target && returnType.isInstance(proxy) &&
          !RawTargetAccess.class.isAssignableFrom(method.getDeclaringClass())) {
        // Special case: it returned "this" and the return type of the method
        // is type-compatible. Note that we can't help if the target sets
        // a reference to itself in another returned object.
        retVal = proxy;
      }
      else if (retVal == null && returnType != Void.TYPE && returnType.isPrimitive()) {
        throw new AopInvocationException(
            "Null return value from advice does not match primitive return type for: " + method);
      }
      return retVal;
    }
    finally {
      if (target != null && !targetSource.isStatic()) {
        // Must have come from TargetSource.
        targetSource.releaseTarget(target);
      }
      if (setProxyContext) {
        // Restore old proxy.
        AopContext.setCurrentProxy(oldProxy);
      }
    }
  }

The key statements are:

      // Get the interceptor chain for this method.
      List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);

      // Check whether we have any advice. If we don't, we can fallback on direct
      // reflective invocation of the target, and avoid creating a MethodInvocation.
      if (chain.isEmpty()) {
        // We can skip creating a MethodInvocation: just invoke the target directly
        // Note that the final invoker must be an InvokerInterceptor so we know it does
        // nothing but a reflective operation on the target, and no hot swapping or fancy proxying.
        Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
        retVal = AopUtils.invokeJoinpointUsingReflection(target, method, argsToUse);
      }
      else {
        // Execute the call through the interceptor chain.
        invocation = new ReflectiveMethodInvocation(proxy, target, method, args, targetClass, chain);
        // Proceed to the joinpoint through the interceptor chain.
        retVal = invocation.proceed();
      }
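
How proceed() walks the chain can be sketched without Spring. The Interceptor and Invocation types below are hypothetical stand-ins for MethodInterceptor and ReflectiveMethodInvocation, and the sketch ignores dynamic method matchers. It only shows the index-based recursion that ends in a reflective call on the target.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

class ChainSketch {

  // Hypothetical stand-in for MethodInterceptor.
  interface Interceptor {
    Object invoke(Invocation invocation) throws Exception;
  }

  // Hypothetical stand-in for ReflectiveMethodInvocation.
  static class Invocation {
    private final Object target;
    private final Method method;
    private final Object[] args;
    private final List<Interceptor> chain;
    private int index = 0;

    Invocation(Object target, Method method, Object[] args, List<Interceptor> chain) {
      this.target = target;
      this.method = method;
      this.args = args;
      this.chain = chain;
    }

    // Each call hands control to the next interceptor; after the last one, the target is invoked.
    Object proceed() throws Exception {
      if (index == chain.size()) {
        return method.invoke(target, args);
      }
      return chain.get(index++).invoke(this);
    }
  }

  static class Adder {
    public Integer add(Integer a, Integer b) {
      return a + b;
    }
  }

  // Runs Adder.add(1, 2) through two interceptors and records the order of events.
  static List<String> run() {
    List<String> log = new ArrayList<>();
    List<Interceptor> chain = new ArrayList<>();
    chain.add(inv -> {
      log.add("outer before");
      Object result = inv.proceed();
      log.add("outer after");
      return result;
    });
    chain.add(inv -> {
      log.add("inner before");
      Object result = inv.proceed();
      log.add("inner after");
      return result;
    });
    try {
      Method add = Adder.class.getMethod("add", Integer.class, Integer.class);
      Object result = new Invocation(new Adder(), add, new Object[] {1, 2}, chain).proceed();
      log.add("result " + result);
    } catch (Exception e) {
      throw new IllegalStateException(e);
    }
    return log;
  }

  public static void main(String[] args) {
    // [outer before, inner before, inner after, outer after, result 3]
    System.out.println(run());
  }
}
```

Because every interceptor decides when (or whether) to call proceed(), an interceptor can run code before and after the target, or hand the rest of the chain to another thread.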

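The interceptor behind @Async is AsyncExecutionInterceptor, which implements the MethodInterceptor interface. In AOP terminology a MethodInterceptor is an Advice, that is, the handler that runs at a pointcut.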
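
What an asynchronous interceptor does can be approximated with a JDK dynamic proxy and an ExecutorService. This is a rough sketch, not the code of AsyncExecutionInterceptor: the RunAsync annotation, the Greeter types and the proxyFor helper are hypothetical, the annotation is read from the interface method for simplicity, and only Future return types are handled.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class AsyncProxySketch {

  // Hypothetical annotation standing in for @Async.
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.METHOD)
  @interface RunAsync {}

  interface Greeter {
    @RunAsync
    Future<String> threadName();

    String callerThreadName();
  }

  static class GreeterImpl implements Greeter {
    @Override
    public Future<String> threadName() {
      return CompletableFuture.completedFuture(Thread.currentThread().getName());
    }

    @Override
    public String callerThreadName() {
      return Thread.currentThread().getName();
    }
  }

  // Annotated methods are submitted to the executor; all other methods run on the caller's thread.
  static Greeter proxyFor(Greeter target, ExecutorService executor) {
    InvocationHandler handler = (proxy, method, args) -> {
      if (!method.isAnnotationPresent(RunAsync.class)) {
        return method.invoke(target, args);
      }
      Callable<Object> task = () -> {
        Object result = method.invoke(target, args);
        // Unwrap the Future produced by the target so the caller sees the plain value.
        return (result instanceof Future) ? ((Future<?>) result).get() : null;
      };
      return executor.submit(task);
    };
    return (Greeter) Proxy.newProxyInstance(
        Greeter.class.getClassLoader(), new Class<?>[] {Greeter.class}, handler);
  }

  // Returns { thread that ran the annotated method, thread that ran the plain method }.
  static String[] demo() {
    ExecutorService executor =
        Executors.newSingleThreadExecutor(r -> new Thread(r, "async-worker"));
    try {
      Greeter greeter = proxyFor(new GreeterImpl(), executor);
      String workerName = greeter.threadName().get();
      String callerName = greeter.callerThreadName();
      return new String[] {workerName, callerName};
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      executor.shutdown();
    }
  }

  public static void main(String[] args) {
    String[] names = demo();
    System.out.println(names[0]); // async-worker
    System.out.println(names[1]); // name of the calling thread
  }
}
```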

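A custom annotation

Because the bean post-processor is generic, all that is needed is a PointcutAdvisor implementation and the concrete handler. First define a custom annotation named @Log. Any method that carries it gets a message printed before it starts and after it finishes: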

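import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Log {
}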

Define a simple service to test with:

public interface IDemoService {

  void add(int a, int b);
  String getName();
}

import org.springframework.stereotype.Service;

@Service
public class DemoServiceImpl implements IDemoService {

  @Log
  @Override
  public void add(int a, int b) {
    // Print the executing thread, then the sum.
    System.out.println(Thread.currentThread().getName());
    System.out.println(a + b);
  }

  @Override
  public String getName() {
    System.out.println("DemoServiceImpl.getName");
    return "DemoServiceImpl";
  }

}

Define the advisor:

import java.lang.annotation.Annotation;
import java.util.HashSet;
import java.util.Set;

import org.aopalliance.aop.Advice;
import org.springframework.aop.Pointcut;
import org.springframework.aop.support.AbstractPointcutAdvisor;
import org.springframework.aop.support.ComposablePointcut;
import org.springframework.aop.support.annotation.AnnotationMatchingPointcut;
import org.springframework.util.Assert;

public class LogAnnotationAdvisor extends AbstractPointcutAdvisor {

  private Advice advice;

  private Pointcut pointcut;

  public LogAnnotationAdvisor() {
    this.advice = new LogAnnotationInterceptor();
  }

  @Override
  public Advice getAdvice() {
    return this.advice;
  }

  @Override
  public boolean isPerInstance() {
    return false;
  }

  @Override
  public Pointcut getPointcut() {
    return this.pointcut;
  }

  // The method and parameter names are carried over from AsyncAnnotationAdvisor.
  public void setAsyncAnnotationType(Class<? extends Annotation> asyncAnnotationType) {
    Assert.notNull(asyncAnnotationType, "'asyncAnnotationType' must not be null");
    Set<Class<? extends Annotation>> asyncAnnotationTypes = new HashSet<>();
    asyncAnnotationTypes.add(asyncAnnotationType);
    this.pointcut = buildPointcut(asyncAnnotationTypes);
  }

  // Match the annotation either on the class or on an individual method.
  protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
    ComposablePointcut result = null;
    for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
      Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
      Pointcut mpc = AnnotationMatchingPointcut.forMethodAnnotation(asyncAnnotationType);
      if (result == null) {
        result = new ComposablePointcut(cpc).union(mpc);
      } else {
        result.union(cpc).union(mpc);
      }
    }
    return result;
  }

}

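Define the concrete handler:

import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.springframework.core.Ordered;

public class LogAnnotationInterceptor implements MethodInterceptor, Ordered {

  @Override
  public int getOrder() {
    return Ordered.HIGHEST_PRECEDENCE;
  }

  @Override
  public Object invoke(MethodInvocation invocation) throws Throwable {
    System.out.println("开始执行"); // "execution started"
    Object result = invocation.proceed();
    System.out.println("结束执行"); // "execution finished"
    return result;
  }

}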

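Define a BeanPostProcessor dedicated to @Log:

import org.springframework.aop.framework.autoproxy.AbstractBeanFactoryAwareAdvisingPostProcessor;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.stereotype.Service;

@SuppressWarnings("serial")
@Service
public class LogAnnotationBeanPostProcesser extends AbstractBeanFactoryAwareAdvisingPostProcessor {

  @Override
  public void setBeanFactory(BeanFactory beanFactory) {
    super.setBeanFactory(beanFactory);
    LogAnnotationAdvisor advisor = new LogAnnotationAdvisor();
    advisor.setAsyncAnnotationType(Log.class);
    this.advisor = advisor;
  }

}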

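The bean post-processing method is simply inherited from the parent class. It can also be overridden, in which case you have to check yourself whether the object's methods carry the annotation and create the proxy object: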

  @Override
  public Object postProcessAfterInitialization(Object bean, String beanName) {

    Method[] methods = ReflectionUtils.getAllDeclaredMethods(bean.getClass());
    for (Method method : methods) {
      if (method.isAnnotationPresent(Log.class)) {
        ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
        System.out.println(proxyFactory);
        if (!proxyFactory.isProxyTargetClass()) {
          evaluateProxyInterfaces(bean.getClass(), proxyFactory);
        }
        proxyFactory.addAdvisor(this.advisor);
        customizeProxyFactory(proxyFactory);
        return proxyFactory.getProxy(getProxyClassLoader());
      }
    }
    return bean;

  }
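
The same two steps (detect the annotation, then wrap the bean in a proxy) can be tried without a Spring container. In this sketch the Logged annotation, the Calculator types and the wrapIfAnnotated helper are all hypothetical. It also makes one detail visible: a JDK proxy passes the interface method to the handler, so the implementation method has to be looked up to see an annotation placed on the implementation class.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

class LogProxySketch {

  // Hypothetical annotation standing in for @Log.
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.METHOD)
  @interface Logged {}

  interface Calculator {
    int add(int a, int b);
    String name();
  }

  static class CalculatorImpl implements Calculator {
    @Logged
    @Override
    public int add(int a, int b) {
      return a + b;
    }

    @Override
    public String name() {
      return "calculator";
    }
  }

  static boolean hasLoggedMethod(Class<?> type) {
    for (Method method : type.getDeclaredMethods()) {
      if (method.isAnnotationPresent(Logged.class)) {
        return true;
      }
    }
    return false;
  }

  // Returns the bean unchanged unless one of its methods is annotated; otherwise returns a proxy.
  static Object wrapIfAnnotated(Object bean, List<String> events) {
    if (!hasLoggedMethod(bean.getClass())) {
      return bean;
    }
    InvocationHandler handler = (proxy, method, args) -> {
      // 'method' belongs to the interface; resolve the implementation to read its annotations.
      Method impl = bean.getClass().getMethod(method.getName(), method.getParameterTypes());
      if (!impl.isAnnotationPresent(Logged.class)) {
        return method.invoke(bean, args);
      }
      events.add("start " + method.getName());
      try {
        return method.invoke(bean, args);
      } finally {
        events.add("end " + method.getName());
      }
    };
    return Proxy.newProxyInstance(
        bean.getClass().getClassLoader(), bean.getClass().getInterfaces(), handler);
  }

  static List<String> demo() {
    List<String> events = new ArrayList<>();
    Calculator calculator = (Calculator) wrapIfAnnotated(new CalculatorImpl(), events);
    int sum = calculator.add(1, 2);
    calculator.name(); // not annotated, so nothing is recorded
    events.add("sum " + sum);
    return events;
  }

  public static void main(String[] args) {
    System.out.println(demo()); // [start add, end add, sum 3]
  }
}
```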

Test that the annotation works:

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Main {
  public static void main(String[] args) {
    @SuppressWarnings("resource")
    ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("application-context.xml");
    IDemoService demoService = context.getBean(IDemoService.class);
    demoService.add(1, 2);
    demoService.getName();
  }
}

Output:

开始执行
main
3
结束执行
DemoServiceImpl.getName

Everything works as expected.
