欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

解密@Transactional和@Async传播有效性

程序员文章站 2022-05-03 17:10:40
...

微信公众号文章列表:关注公众号(coding_song)阅读更清晰

微信原文地址:https://mp.weixin.qq.com/s/fx1tDzezuE3NWqGpIOU7og

 

现象

Controller类中调用Service类中标有@Async或@Transactional注解的方法,此方法上的@Async或@Transactional会生效,而在Service类中A方法直接调用标有@Async或@Transactional注解的B方法,B方法上的@Async或@Transactional注解不会生效;如果A方法中重新通过动态代理生成一个Service Bean类,再通过这个新生成的Bean调用B方法,则B方法上的@Async或@Transactional注解将会生效;当B方法上同时含有@Async和@Transactional注解时,只会有一个注解生效,@Async会生效而@Transactional不会生效

原因

Spring项目中,A类调用B类的方法,是A类通过CglibAopProxy动态代理来生成B类,而B类中方法之间的调用,是直接调用其方法,不会再通过动态代理重新生成一个相同的B类,再调用目标方法;CglibAopProxy动态代理生成B类时,会获取到B类目标方法上面的所有注解(@Async或@Transactional等),并获取这些注解的拦截器和拦截器切面(AsyncAnnotationAdvisor和BeanFactoryTransactionAttributeSourceAdvisor),然后判断这些切面的是否是PointcutAdvisor的子类,方法是否匹配注解方法匹配器AnnotataionMethodMatcher,如果是,则获取这个拦截器切面对应的方法拦截器(AnnotationAsyncExceutionInterceptor或TransactionInterceptor),获取到这些方法拦截器后(如果有多个),只取第一个拦截器,如果第一个是TransactionInterceptor拦截器,则会为该方法开启一个事物;如果第一个是AnnotationAsyncExceutionInterceptor,则会为这个方法创建一个线程,让此方法运行在新建的线程中,由此起到了异步执行的作用

样例代码

Controller层代码

  1. @RestController

  2. publicclassTestController{

  3.  

  4.    @Autowired

  5.    privateTestService testService;

  6.  

  7.    @GetMapping("/test")

  8.    publicvoid test(){

  9.        testService.getId();

  10.    }

  11.  

  12. }

服务层代码及@Async和@Transactional生效说明

  1. @Service

  2. publicclassTestServiceImplimplementsTestService,ApplicationContextAware{

  3.  

  4.    privateApplicationContext applicationContext;

  5.  

  6.    @Autowired

  7.    privateTestService testService;

  8.  

  9.    @Override

  10.    publicvoid setApplicationContext(ApplicationContext applicationContext)throwsBeansException{

  11.        this.applicationContext = applicationContext;

  12.    }

  13.  

  14.    /**

  15.     * 当Controller层调用此方法时,@Transactional生效,将会开启一个事物

  16.     * @return

  17.     */

  18.    @Override

  19.    @Transactional(rollbackFor =RuntimeException.class)

  20.    publicInteger getId(){

  21.        TestService testService =(TestService) applicationContext.getBean("testServiceImpl");

  22.        Integer integer = testService.testTransactional();

  23.        return integer;

  24.    }

  25.  

  26.    /**

  27.     * 1)本类中,如果是通过applicationContext.getBean创建一个TestService后,再调用此方法,

  28.     * 由于方法上同时有@Async和@Transactional注解,获取到这两个注解的拦截器后,只会取第一个拦截器进行后续的操作,

  29.     * 所以@Async会生效,而@Transactional不会生效

  30.     * 2)如果是直接调testTransactional()方法,而不是从新生成一个新的TestService Bean来调用此方法,

  31.     * 则此类上的@Async和@Transactional注解都不会生效,因为不会走AOP动态代理CglibAopProxy,

  32.     * 而@Async和@Transactional注解生效的实现,是在CglibAopProxy中实现的

  33.     * @return

  34.     */

  35.    @Override

  36.    @Async

  37.    @Transactional(rollbackFor =RuntimeException.class)

  38.    publicInteger testTransactional(){

  39.        Integer test = test();

  40.        return test;

  41.    }

  42.  

  43.    /**

  44.     * 此方法上的@Async和@Transactional都不会生效,因为在testTransactional()是直接调用此方法

  45.     * @return

  46.     */

  47.    @Async

  48.    @Transactional(rollbackFor =RuntimeException.class)

  49.    privateInteger test(){

  50.        return10;

  51.    }

  52. }

源码分析

在Controller类中注入Service时,是通过Cglib动态代理生成的Service,所以当Controller类调用Service中的方法时,会调用CglibAopProxy内部类DynamicAdvisedInterceptor的intercept()方法

  1. @Override

  2. @Nullable

  3. publicObject intercept(Object proxy,Method method,Object[] args,MethodProxy methodProxy)throwsThrowable{

  4.    Object oldProxy =null;

  5.    boolean setProxyContext =false;

  6.    Object target =null;

  7.    TargetSource targetSource =this.advised.getTargetSource();

  8.    try{

  9.        if(this.advised.exposeProxy){

  10.            // Make invocation available if necessary.

  11.            oldProxy =AopContext.setCurrentProxy(proxy);

  12.            setProxyContext =true;

  13.        }

  14.        // Get as late as possible to minimize the time we "own" the target, in case it comes from a pool...

  15.        target = targetSource.getTarget();

  16.        Class<?> targetClass =(target !=null? target.getClass():null);

  17.        //获取方法上的所有注解的切面拦截器

  18.        List<Object> chain =this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);

  19.        Object retVal;

  20.        //当在方法上没有获取到相应注解的拦截器时,直接进入方法执行相应逻辑

  21.        if(chain.isEmpty()&&Modifier.isPublic(method.getModifiers())){

  22.            // We can skip creating a MethodInvocation: just invoke the target directly.

  23.            // Note that the final invoker must be an InvokerInterceptor, so we know

  24.            // it does nothing but a reflective operation on the target, and no hot

  25.            // swapping or fancy proxying.

  26.            Object[] argsToUse =AopProxyUtils.adaptArgumentsIfNecessary(method, args);

  27.            retVal = methodProxy.invoke(target, argsToUse);

  28.        }else{

  29.            // 获取当方法拦截器后,通过反射调用具体的拦截器方法

  30.                        //接下来先调用ReflectiveMethodInvocation的proceed方法

  31.            retVal =newCglibMethodInvocation(proxy, target, method, args, targetClass, chain, methodProxy).proceed();

  32.        }

  33.        retVal = processReturnType(proxy, target, method, retVal);

  34.        return retVal;

  35.    }

  36.    //省略finally代码

  37. }

ReflectiveMethodInvocation.proceed方法只获取拦截器切面列表中的第一个拦截器切面,执行相应的拦截器逻辑

  1. @Override

  2. @Nullable

  3. publicObject proceed()throwsThrowable{

  4.    //  We start with an index of -1 and increment early.

  5.    if(this.currentInterceptorIndex ==this.interceptorsAndDynamicMethodMatchers.size()-1){

  6.        return invokeJoinpoint();

  7.    }

  8.  

  9.    //如果方法上有多个拦截器或拦截器切面,只取第一个拦截器或拦截器切面

  10.    Object interceptorOrInterceptionAdvice =

  11.        this.interceptorsAndDynamicMethodMatchers.get(++this.currentInterceptorIndex);

  12.    if(interceptorOrInterceptionAdvice instanceofInterceptorAndDynamicMethodMatcher){

  13.        // Evaluate dynamic method matcher here: static part will already have

  14.        // been evaluated and found to match.

  15.        InterceptorAndDynamicMethodMatcher dm =

  16.                (InterceptorAndDynamicMethodMatcher) interceptorOrInterceptionAdvice;

  17.        Class<?> targetClass =(this.targetClass !=null?this.targetClass :this.method.getDeclaringClass());

  18.        if(dm.methodMatcher.matches(this.method, targetClass,this.arguments)){

  19.            return dm.interceptor.invoke(this);

  20.        }else{

  21.            // Dynamic matching failed.

  22.            // Skip this interceptor and invoke the next in the chain.

  23.            return proceed();

  24.        }

  25.    }

  26.    else{

  27.        // It's an interceptor, so we just invoke it: The pointcut will have

  28.        // been evaluated statically before this object was constructed.

  29.        //调用具体的拦截器

  30.        return((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this);

  31.    }

  32. }

如果第一个拦截器切面是TransactionInterceptor,则调用TransactionInterceptor相关的方法,执行相应的逻辑,开启事务等

  1. @Override

  2. @Nullable

  3. publicObject invoke(MethodInvocation invocation)throwsThrowable{

  4.    // Work out the target class: may be {@code null}.

  5.    // The TransactionAttributeSource should be passed the target class

  6.    // as well as the method, which may be from an interface.

  7.    Class<?> targetClass =(invocation.getThis()!=null?AopUtils.getTargetClass(invocation.getThis()):null);

  8.  

  9.    // Adapt to TransactionAspectSupport's invokeWithinTransaction...

  10.    //适配TransactionAspectSupport的invokeWithinTransaction方法

  11.    return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);

  12. }

  13.  

  14. @Nullable

  15. protectedObject invokeWithinTransaction(Method method,@NullableClass<?> targetClass,

  16.        finalInvocationCallback invocation)throwsThrowable{

  17.  

  18.    // If the transaction attribute is null, the method is non-transactional.

  19.    TransactionAttributeSource tas = getTransactionAttributeSource();

  20.    finalTransactionAttribute txAttr =(tas !=null? tas.getTransactionAttribute(method, targetClass):null);

  21.    finalPlatformTransactionManager tm = determineTransactionManager(txAttr);

  22.    finalString joinpointIdentification = methodIdentification(method, targetClass, txAttr);

  23.  

  24.    if(txAttr ==null||!(tm instanceofCallbackPreferringPlatformTransactionManager)){

  25.        // Standard transaction demarcation with getTransaction and commit/rollback calls.

  26.        //调用事务管理器的getTransaction和commit/rollback方法,获取一个事务对象,并开启事物

  27.        TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);

  28.        Object retVal =null;

  29.        try{

  30.            // This is an around advice: Invoke the next interceptor in the chain.

  31.            // This will normally result in a target object being invoked.

  32.            //调用目标类的方法

  33.            retVal = invocation.proceedWithInvocation();

  34.        }

  35.        catch(Throwable ex){

  36.            // target invocation exception

  37.            completeTransactionAfterThrowing(txInfo, ex);

  38.            throw ex;

  39.        }

  40.        finally{

  41.            //执行目标方法后,关闭事务,清除事物对象

  42.            cleanupTransactionInfo(txInfo);

  43.        }

  44.        //结果返回后提交事物状态等信息

  45.        commitTransactionAfterReturning(txInfo);

  46.        return retVal;

  47.    }else{

  48.        //省略其他代码

  49.    }

  50. }

  51. /**

  52. * 根据给定的事务参数创建一个事物.

  53. * @see #getTransactionAttributeSource()

  54. */

  55. protectedTransactionInfo createTransactionIfNecessary(@NullablePlatformTransactionManager tm,

  56.        @NullableTransactionAttribute txAttr,finalString joinpointIdentification){

  57.  

  58.    // If no name specified, apply method identification as transaction name.

  59.    if(txAttr !=null&& txAttr.getName()==null){

  60.        txAttr =newDelegatingTransactionAttribute(txAttr){

  61.            @Override

  62.            publicString getName(){

  63.                return joinpointIdentification;

  64.            }

  65.        };

  66.    }

  67.  

  68.    TransactionStatus status =null;

  69.    if(txAttr !=null){

  70.        if(tm !=null){

  71.            //开启事物

  72.            status = tm.getTransaction(txAttr);

  73.        }

  74.        else{

  75.            if(logger.isDebugEnabled()){

  76.                logger.debug("Skipping transactional joinpoint ["+ joinpointIdentification +

  77.                        "] because no transaction manager has been configured");

  78.            }

  79.        }

  80.    }

  81.    //准备事物对象信息

  82.    return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);

  83. }

  84. /**

  85. * 委托doGetTransaction方法,isExistingTransaction判断条件和doBegin方法处理事务传播行为

  86. * This implementation handles propagation behavior. Delegates to

  87. * {@code doGetTransaction}, {@code isExistingTransaction}

  88. * and {@code doBegin}.

  89. * @see #doGetTransaction

  90. * @see #isExistingTransaction

  91. * @see #doBegin

  92. */

  93. @Override

  94. publicfinalTransactionStatus getTransaction(@NullableTransactionDefinition definition)throwsTransactionException{

  95.    Object transaction = doGetTransaction();

  96.  

  97.    // Cache debug flag to avoid repeated checks.

  98.    boolean debugEnabled = logger.isDebugEnabled();

  99.  

  100.    if(definition ==null){

  101.        // Use defaults if no transaction definition given.

  102.        definition =newDefaultTransactionDefinition();

  103.    }

  104.    //如果已经有事务存在,直接处理已存在的事务

  105.    if(isExistingTransaction(transaction)){

  106.        // Existing transaction found -> check propagation behavior to find out how to behave.

  107.        return handleExistingTransaction(definition, transaction, debugEnabled);

  108.    }

  109.  

  110.    // Check definition settings for new transaction.

  111.    if(definition.getTimeout()<TransactionDefinition.TIMEOUT_DEFAULT){

  112.        thrownewInvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());

  113.    }

  114.  

  115.    // No existing transaction found -> check propagation behavior to find out how to proceed.

  116.    if(definition.getPropagationBehavior()==TransactionDefinition.PROPAGATION_MANDATORY){

  117.        thrownewIllegalTransactionStateException(

  118.                "No existing transaction found for transaction marked with propagation 'mandatory'");

  119.    }

  120.    elseif(definition.getPropagationBehavior()==TransactionDefinition.PROPAGATION_REQUIRED ||

  121.            definition.getPropagationBehavior()==TransactionDefinition.PROPAGATION_REQUIRES_NEW ||

  122.            definition.getPropagationBehavior()==TransactionDefinition.PROPAGATION_NESTED){

  123.        SuspendedResourcesHolder suspendedResources = suspend(null);

  124.        if(debugEnabled){

  125.            logger.debug("Creating new transaction with name ["+ definition.getName()+"]: "+ definition);

  126.        }

  127.        try{

  128.            boolean newSynchronization =(getTransactionSynchronization()!= SYNCHRONIZATION_NEVER);

  129.            DefaultTransactionStatus status = newTransactionStatus(

  130.                    definition, transaction,true, newSynchronization, debugEnabled, suspendedResources);

  131.            //开启事务

  132.            doBegin(transaction, definition);

  133.            prepareSynchronization(status, definition);

  134.            return status;

  135.        }

  136.        catch(RuntimeException|Error ex){

  137.            resume(null, suspendedResources);

  138.            throw ex;

  139.        }

  140.    }

  141.    else{

  142.        // Create "empty" transaction: no actual transaction, but potentially synchronization.

  143.        if(definition.getIsolationLevel()!=TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()){

  144.            logger.warn("Custom isolation level specified but no actual transaction initiated; "+

  145.                    "isolation level will effectively be ignored: "+ definition);

  146.        }

  147.        boolean newSynchronization =(getTransactionSynchronization()== SYNCHRONIZATION_ALWAYS);

  148.        return prepareTransactionStatus(definition,null,true, newSynchronization, debugEnabled,null);

  149.    }

  150. }

 

如果第一个拦截器切面是AsyncExecutionInterceptor,则会获取一个任务执行器,再创建一个任务,然后从线程池ThreadPoolExecutor中获取一个线程来执行这个任务

  1. @Override

  2. @Nullable

  3. publicObject invoke(finalMethodInvocation invocation)throwsThrowable{

  4.    Class<?> targetClass =(invocation.getThis()!=null?AopUtils.getTargetClass(invocation.getThis()):null);

  5.    Method specificMethod =ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);

  6.    finalMethod userDeclaredMethod =BridgeMethodResolver.findBridgedMethod(specificMethod);

  7.    //获取一个异步任务执行器

  8.    AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);

  9.    if(executor ==null){

  10.        thrownewIllegalStateException(

  11.                "No executor specified and no default executor set on AsyncExecutionInterceptor either");

  12.    }

  13.    //创建一个任务

  14.    Callable<Object> task =()->{

  15.        try{

  16.            Object result = invocation.proceed();

  17.            if(result instanceofFuture){

  18.                return((Future<?>) result).get();

  19.            }

  20.        }

  21.        catch(ExecutionException ex){

  22.            handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());

  23.        }

  24.        catch(Throwable ex){

  25.            handleError(ex, userDeclaredMethod, invocation.getArguments());

  26.        }

  27.        returnnull;

  28.    };

  29.    //最终会从ThreadPoolExecutor获取一个线程来提交需要执行的任务

  30.    return doSubmit(task, executor, invocation.getMethod().getReturnType());

  31. }

  32. /**

  33. * AsyncExecutionAspectSupport.doSubmit()

  34. * 提交任务

  35. */

  36. @Nullable

  37. protectedObject doSubmit(Callable<Object> task,AsyncTaskExecutor executor,Class<?> returnType){

  38.    if(CompletableFuture.class.isAssignableFrom(returnType)){

  39.        returnCompletableFuture.supplyAsync(()->{

  40.            try{

  41.                return task.call();

  42.            }

  43.            catch(Throwable ex){

  44.                thrownewCompletionException(ex);

  45.            }

  46.        }, executor);

  47.    }

  48.    elseif(ListenableFuture.class.isAssignableFrom(returnType)){

  49.        return((AsyncListenableTaskExecutor) executor).submitListenable(task);

  50.    }

  51.    elseif(Future.class.isAssignableFrom(returnType)){

  52.        return executor.submit(task);

  53.    }

  54.    else{

  55.        //继续提交任务

  56.        executor.submit(task);

  57.        returnnull;

  58.    }

  59. }

  60. /**

  61. * ThreadPoolTaskExecutor.submit() 从线程池中获取一个线程来执行任务

  62. */

  63. public<T>Future<T> submit(Callable<T> task){

  64.    ThreadPoolExecutor executor =this.getThreadPoolExecutor();

  65.  

  66.    try{

  67.        return executor.submit(task);

  68.    }catch(RejectedExecutionException var4){

  69.        thrownewTaskRejectedException("Executor ["+ executor +"] did not accept task: "+ task, var4);

  70.    }

  71. }

 

解决方案

在Service中A方法调用B方法,要使B方法上的@Async或@Transcational注解生效,可在A方法中通过ApplicationContext.getBean()来从新获取一个新的Service对象,代码实现如下

  1. @Service

  2. publicclassTestServiceImplimplementsTestService,ApplicationContextAware{

  3.    //省略其他代码

  4.    @Override

  5.    @Transactional(rollbackFor =RuntimeException.class)

  6.    publicInteger getId(){

  7.        TestService testService =(TestService) applicationContext.getBean("testServiceImpl");

  8.        Integer integer = testService.testTransactional();

  9.        return integer;

  10.    }

  11.  

  12.    @Override

  13.    @Transactional(rollbackFor =RuntimeException.class)

  14.    publicInteger testTransactional(){

  15.        Integer test = test();

  16.        return test;

  17.    }

  18. }

上述testTransactional()方法上的@Transactional将生效

相关标签: Transactional Async