解密@Transactional和@Async传播有效性
微信公众号文章列表:关注公众号(coding_song)阅读更清晰
微信原文地址:https://mp.weixin.qq.com/s/fx1tDzezuE3NWqGpIOU7og
现象
Controller类中调用Service类中标有@Async或@Transactional注解的方法,此方法上的@Async或@Transactional会生效,而在Service类中A方法直接调用标有@Async或@Transactional注解的B方法,B方法上的@Async或@Transactional注解不会生效;如果A方法中重新通过动态代理生成一个Service Bean类,再通过这个新生成的Bean调用B方法,则B方法上的@Async或@Transactional注解将会生效;当B方法上同时含有@Async和@Transactional注解时,只会有一个注解生效,@Async会生效而@Transactional不会生效
原因
Spring项目中,A类调用B类的方法,是A类通过CglibAopProxy动态代理来生成B类,而B类中方法之间的调用,是直接调用其方法,不会再通过动态代理重新生成一个相同的B类,再调用目标方法;CglibAopProxy动态代理生成B类时,会获取到B类目标方法上面的所有注解(@Async或@Transactional等),并获取这些注解的拦截器和拦截器切面(AsyncAnnotationAdvisor和BeanFactoryTransactionAttributeSourceAdvisor),然后判断这些切面的是否是PointcutAdvisor的子类,方法是否匹配注解方法匹配器AnnotataionMethodMatcher,如果是,则获取这个拦截器切面对应的方法拦截器(AnnotationAsyncExceutionInterceptor或TransactionInterceptor),获取到这些方法拦截器后(如果有多个),只取第一个拦截器,如果第一个是TransactionInterceptor拦截器,则会为该方法开启一个事物;如果第一个是AnnotationAsyncExceutionInterceptor,则会为这个方法创建一个线程,让此方法运行在新建的线程中,由此起到了异步执行的作用
样例代码
Controller层代码
-
@RestController
-
publicclassTestController{
-
-
@Autowired
-
privateTestService testService;
-
-
@GetMapping("/test")
-
publicvoid test(){
-
testService.getId();
-
}
-
-
}
服务层代码及@Async和@Transactional生效说明
-
@Service
-
publicclassTestServiceImplimplementsTestService,ApplicationContextAware{
-
-
privateApplicationContext applicationContext;
-
-
@Autowired
-
privateTestService testService;
-
-
@Override
-
publicvoid setApplicationContext(ApplicationContext applicationContext)throwsBeansException{
-
this.applicationContext = applicationContext;
-
}
-
-
/**
-
* 当Controller层调用此方法时,@Transactional生效,将会开启一个事物
-
* @return
-
*/
-
@Override
-
@Transactional(rollbackFor =RuntimeException.class)
-
publicInteger getId(){
-
TestService testService =(TestService) applicationContext.getBean("testServiceImpl");
-
Integer integer = testService.testTransactional();
-
return integer;
-
}
-
-
/**
-
* 1)本类中,如果是通过applicationContext.getBean创建一个TestService后,再调用此方法,
-
* 由于方法上同时有@Async和@Transactional注解,获取到这两个注解的拦截器后,只会取第一个拦截器进行后续的操作,
-
* 所以@Async会生效,而@Transactional不会生效
-
* 2)如果是直接调testTransactional()方法,而不是从新生成一个新的TestService Bean来调用此方法,
-
* 则此类上的@Async和@Transactional注解都不会生效,因为不会走AOP动态代理CglibAopProxy,
-
* 而@Async和@Transactional注解生效的实现,是在CglibAopProxy中实现的
-
* @return
-
*/
-
@Override
-
@Async
-
@Transactional(rollbackFor =RuntimeException.class)
-
publicInteger testTransactional(){
-
Integer test = test();
-
return test;
-
}
-
-
/**
-
* 此方法上的@Async和@Transactional都不会生效,因为在testTransactional()是直接调用此方法
-
* @return
-
*/
-
@Async
-
@Transactional(rollbackFor =RuntimeException.class)
-
privateInteger test(){
-
return10;
-
}
-
}
源码分析
在Controller类中注入Service时,是通过Cglib动态代理生成的Service,所以当Controller类调用Service中的方法时,会调用CglibAopProxy内部类DynamicAdvisedInterceptor的intercept()方法
-
@Override
-
@Nullable
-
publicObject intercept(Object proxy,Method method,Object[] args,MethodProxy methodProxy)throwsThrowable{
-
Object oldProxy =null;
-
boolean setProxyContext =false;
-
Object target =null;
-
TargetSource targetSource =this.advised.getTargetSource();
-
try{
-
if(this.advised.exposeProxy){
-
// Make invocation available if necessary.
-
oldProxy =AopContext.setCurrentProxy(proxy);
-
setProxyContext =true;
-
}
-
// Get as late as possible to minimize the time we "own" the target, in case it comes from a pool...
-
target = targetSource.getTarget();
-
Class<?> targetClass =(target !=null? target.getClass():null);
-
//获取方法上的所有注解的切面拦截器
-
List<Object> chain =this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);
-
Object retVal;
-
//当在方法上没有获取到相应注解的拦截器时,直接进入方法执行相应逻辑
-
if(chain.isEmpty()&&Modifier.isPublic(method.getModifiers())){
-
// We can skip creating a MethodInvocation: just invoke the target directly.
-
// Note that the final invoker must be an InvokerInterceptor, so we know
-
// it does nothing but a reflective operation on the target, and no hot
-
// swapping or fancy proxying.
-
Object[] argsToUse =AopProxyUtils.adaptArgumentsIfNecessary(method, args);
-
retVal = methodProxy.invoke(target, argsToUse);
-
}else{
-
// 获取当方法拦截器后,通过反射调用具体的拦截器方法
-
//接下来先调用ReflectiveMethodInvocation的proceed方法
-
retVal =newCglibMethodInvocation(proxy, target, method, args, targetClass, chain, methodProxy).proceed();
-
}
-
retVal = processReturnType(proxy, target, method, retVal);
-
return retVal;
-
}
-
//省略finally代码
-
}
ReflectiveMethodInvocation.proceed方法只获取拦截器切面列表中的第一个拦截器切面,执行相应的拦截器逻辑
-
@Override
-
@Nullable
-
publicObject proceed()throwsThrowable{
-
// We start with an index of -1 and increment early.
-
if(this.currentInterceptorIndex ==this.interceptorsAndDynamicMethodMatchers.size()-1){
-
return invokeJoinpoint();
-
}
-
-
//如果方法上有多个拦截器或拦截器切面,只取第一个拦截器或拦截器切面
-
Object interceptorOrInterceptionAdvice =
-
this.interceptorsAndDynamicMethodMatchers.get(++this.currentInterceptorIndex);
-
if(interceptorOrInterceptionAdvice instanceofInterceptorAndDynamicMethodMatcher){
-
// Evaluate dynamic method matcher here: static part will already have
-
// been evaluated and found to match.
-
InterceptorAndDynamicMethodMatcher dm =
-
(InterceptorAndDynamicMethodMatcher) interceptorOrInterceptionAdvice;
-
Class<?> targetClass =(this.targetClass !=null?this.targetClass :this.method.getDeclaringClass());
-
if(dm.methodMatcher.matches(this.method, targetClass,this.arguments)){
-
return dm.interceptor.invoke(this);
-
}else{
-
// Dynamic matching failed.
-
// Skip this interceptor and invoke the next in the chain.
-
return proceed();
-
}
-
}
-
else{
-
// It's an interceptor, so we just invoke it: The pointcut will have
-
// been evaluated statically before this object was constructed.
-
//调用具体的拦截器
-
return((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this);
-
}
-
}
如果第一个拦截器切面是TransactionInterceptor,则调用TransactionInterceptor相关的方法,执行相应的逻辑,开启事务等
-
@Override
-
@Nullable
-
publicObject invoke(MethodInvocation invocation)throwsThrowable{
-
// Work out the target class: may be {@code null}.
-
// The TransactionAttributeSource should be passed the target class
-
// as well as the method, which may be from an interface.
-
Class<?> targetClass =(invocation.getThis()!=null?AopUtils.getTargetClass(invocation.getThis()):null);
-
-
// Adapt to TransactionAspectSupport's invokeWithinTransaction...
-
//适配TransactionAspectSupport的invokeWithinTransaction方法
-
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
-
}
-
-
@Nullable
-
protectedObject invokeWithinTransaction(Method method,@NullableClass<?> targetClass,
-
finalInvocationCallback invocation)throwsThrowable{
-
-
// If the transaction attribute is null, the method is non-transactional.
-
TransactionAttributeSource tas = getTransactionAttributeSource();
-
finalTransactionAttribute txAttr =(tas !=null? tas.getTransactionAttribute(method, targetClass):null);
-
finalPlatformTransactionManager tm = determineTransactionManager(txAttr);
-
finalString joinpointIdentification = methodIdentification(method, targetClass, txAttr);
-
-
if(txAttr ==null||!(tm instanceofCallbackPreferringPlatformTransactionManager)){
-
// Standard transaction demarcation with getTransaction and commit/rollback calls.
-
//调用事务管理器的getTransaction和commit/rollback方法,获取一个事务对象,并开启事物
-
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
-
Object retVal =null;
-
try{
-
// This is an around advice: Invoke the next interceptor in the chain.
-
// This will normally result in a target object being invoked.
-
//调用目标类的方法
-
retVal = invocation.proceedWithInvocation();
-
}
-
catch(Throwable ex){
-
// target invocation exception
-
completeTransactionAfterThrowing(txInfo, ex);
-
throw ex;
-
}
-
finally{
-
//执行目标方法后,关闭事务,清除事物对象
-
cleanupTransactionInfo(txInfo);
-
}
-
//结果返回后提交事物状态等信息
-
commitTransactionAfterReturning(txInfo);
-
return retVal;
-
}else{
-
//省略其他代码
-
}
-
}
-
/**
-
* 根据给定的事务参数创建一个事物.
-
* @see #getTransactionAttributeSource()
-
*/
-
protectedTransactionInfo createTransactionIfNecessary(@NullablePlatformTransactionManager tm,
-
@NullableTransactionAttribute txAttr,finalString joinpointIdentification){
-
-
// If no name specified, apply method identification as transaction name.
-
if(txAttr !=null&& txAttr.getName()==null){
-
txAttr =newDelegatingTransactionAttribute(txAttr){
-
@Override
-
publicString getName(){
-
return joinpointIdentification;
-
}
-
};
-
}
-
-
TransactionStatus status =null;
-
if(txAttr !=null){
-
if(tm !=null){
-
//开启事物
-
status = tm.getTransaction(txAttr);
-
}
-
else{
-
if(logger.isDebugEnabled()){
-
logger.debug("Skipping transactional joinpoint ["+ joinpointIdentification +
-
"] because no transaction manager has been configured");
-
}
-
}
-
}
-
//准备事物对象信息
-
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
-
}
-
/**
-
* 委托doGetTransaction方法,isExistingTransaction判断条件和doBegin方法处理事务传播行为
-
* This implementation handles propagation behavior. Delegates to
-
* {@code doGetTransaction}, {@code isExistingTransaction}
-
* and {@code doBegin}.
-
* @see #doGetTransaction
-
* @see #isExistingTransaction
-
* @see #doBegin
-
*/
-
@Override
-
publicfinalTransactionStatus getTransaction(@NullableTransactionDefinition definition)throwsTransactionException{
-
Object transaction = doGetTransaction();
-
-
// Cache debug flag to avoid repeated checks.
-
boolean debugEnabled = logger.isDebugEnabled();
-
-
if(definition ==null){
-
// Use defaults if no transaction definition given.
-
definition =newDefaultTransactionDefinition();
-
}
-
//如果已经有事务存在,直接处理已存在的事务
-
if(isExistingTransaction(transaction)){
-
// Existing transaction found -> check propagation behavior to find out how to behave.
-
return handleExistingTransaction(definition, transaction, debugEnabled);
-
}
-
-
// Check definition settings for new transaction.
-
if(definition.getTimeout()<TransactionDefinition.TIMEOUT_DEFAULT){
-
thrownewInvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
-
}
-
-
// No existing transaction found -> check propagation behavior to find out how to proceed.
-
if(definition.getPropagationBehavior()==TransactionDefinition.PROPAGATION_MANDATORY){
-
thrownewIllegalTransactionStateException(
-
"No existing transaction found for transaction marked with propagation 'mandatory'");
-
}
-
elseif(definition.getPropagationBehavior()==TransactionDefinition.PROPAGATION_REQUIRED ||
-
definition.getPropagationBehavior()==TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
-
definition.getPropagationBehavior()==TransactionDefinition.PROPAGATION_NESTED){
-
SuspendedResourcesHolder suspendedResources = suspend(null);
-
if(debugEnabled){
-
logger.debug("Creating new transaction with name ["+ definition.getName()+"]: "+ definition);
-
}
-
try{
-
boolean newSynchronization =(getTransactionSynchronization()!= SYNCHRONIZATION_NEVER);
-
DefaultTransactionStatus status = newTransactionStatus(
-
definition, transaction,true, newSynchronization, debugEnabled, suspendedResources);
-
//开启事务
-
doBegin(transaction, definition);
-
prepareSynchronization(status, definition);
-
return status;
-
}
-
catch(RuntimeException|Error ex){
-
resume(null, suspendedResources);
-
throw ex;
-
}
-
}
-
else{
-
// Create "empty" transaction: no actual transaction, but potentially synchronization.
-
if(definition.getIsolationLevel()!=TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()){
-
logger.warn("Custom isolation level specified but no actual transaction initiated; "+
-
"isolation level will effectively be ignored: "+ definition);
-
}
-
boolean newSynchronization =(getTransactionSynchronization()== SYNCHRONIZATION_ALWAYS);
-
return prepareTransactionStatus(definition,null,true, newSynchronization, debugEnabled,null);
-
}
-
}
如果第一个拦截器切面是AsyncExecutionInterceptor,则会获取一个任务执行器,再创建一个任务,然后从线程池ThreadPoolExecutor中获取一个线程来执行这个任务
-
@Override
-
@Nullable
-
publicObject invoke(finalMethodInvocation invocation)throwsThrowable{
-
Class<?> targetClass =(invocation.getThis()!=null?AopUtils.getTargetClass(invocation.getThis()):null);
-
Method specificMethod =ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
-
finalMethod userDeclaredMethod =BridgeMethodResolver.findBridgedMethod(specificMethod);
-
//获取一个异步任务执行器
-
AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
-
if(executor ==null){
-
thrownewIllegalStateException(
-
"No executor specified and no default executor set on AsyncExecutionInterceptor either");
-
}
-
//创建一个任务
-
Callable<Object> task =()->{
-
try{
-
Object result = invocation.proceed();
-
if(result instanceofFuture){
-
return((Future<?>) result).get();
-
}
-
}
-
catch(ExecutionException ex){
-
handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
-
}
-
catch(Throwable ex){
-
handleError(ex, userDeclaredMethod, invocation.getArguments());
-
}
-
returnnull;
-
};
-
//最终会从ThreadPoolExecutor获取一个线程来提交需要执行的任务
-
return doSubmit(task, executor, invocation.getMethod().getReturnType());
-
}
-
/**
-
* AsyncExecutionAspectSupport.doSubmit()
-
* 提交任务
-
*/
-
@Nullable
-
protectedObject doSubmit(Callable<Object> task,AsyncTaskExecutor executor,Class<?> returnType){
-
if(CompletableFuture.class.isAssignableFrom(returnType)){
-
returnCompletableFuture.supplyAsync(()->{
-
try{
-
return task.call();
-
}
-
catch(Throwable ex){
-
thrownewCompletionException(ex);
-
}
-
}, executor);
-
}
-
elseif(ListenableFuture.class.isAssignableFrom(returnType)){
-
return((AsyncListenableTaskExecutor) executor).submitListenable(task);
-
}
-
elseif(Future.class.isAssignableFrom(returnType)){
-
return executor.submit(task);
-
}
-
else{
-
//继续提交任务
-
executor.submit(task);
-
returnnull;
-
}
-
}
-
/**
-
* ThreadPoolTaskExecutor.submit() 从线程池中获取一个线程来执行任务
-
*/
-
public<T>Future<T> submit(Callable<T> task){
-
ThreadPoolExecutor executor =this.getThreadPoolExecutor();
-
-
try{
-
return executor.submit(task);
-
}catch(RejectedExecutionException var4){
-
thrownewTaskRejectedException("Executor ["+ executor +"] did not accept task: "+ task, var4);
-
}
-
}
解决方案
在Service中A方法调用B方法,要使B方法上的@Async或@Transcational注解生效,可在A方法中通过ApplicationContext.getBean()来从新获取一个新的Service对象,代码实现如下
-
@Service
-
publicclassTestServiceImplimplementsTestService,ApplicationContextAware{
-
//省略其他代码
-
@Override
-
@Transactional(rollbackFor =RuntimeException.class)
-
publicInteger getId(){
-
TestService testService =(TestService) applicationContext.getBean("testServiceImpl");
-
Integer integer = testService.testTransactional();
-
return integer;
-
}
-
-
@Override
-
@Transactional(rollbackFor =RuntimeException.class)
-
publicInteger testTransactional(){
-
Integer test = test();
-
return test;
-
}
-
}
上述testTransactional()方法上的@Transactional将生效
上一篇: 分布式(ACID 分表分库 CAP理论)
下一篇: Spring中@Async用法总结