欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

spring源码解析 (七) 事务底层源码实现

程序员文章站 2022-03-29 23:25:12
...

1.spring事务的本质

数据库中的事务默认存在,只不过每次都自动提交了,数据库中这个参数--autoCommitd=true

其本质是 begin sql commit

而spring事务其本质是数据库一致,也是begin sql commit ,只不过将autoCommitd设置为了false。

而每个事务其实都是一个数据库连接,根据不同的事务传播机制进行挂起,保存点等操作。

spring事务执行还是通过代理织入的

一个小demo:如果对象直接调用自己的方法,那么这个@Transactional事务就不会触发,因为事务执行的本质还是代理,直接调用就是原始对象方法的调用,没有进行事务的代理逻辑,但如果自己注入自己,换成txService.a(),事务代理逻辑就会触发,因为txService属于代理对象,进行了代理逻辑的织入。

@Component
public class TxService {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Autowired
    private TxService txService;

    public void insert11(){
        System.out.println("insert11");
       // txService.a();
        a();
    }
    @Transactional
    public void  a(){
        System.out.println("A");
        jdbcTemplate.execute("insert grade values(13,2,2,2)");
        int i =100/0;
    }
}

2.事务底层源码分析

首先引入了@EnableTransactionManagement

里面注册了两个bean,AutoProxyRegistrar和ProxyTransactionManagementConfiguration

AutoProxyRegistrar主要作用是开启自动代理

ProxyTransactionManagementConfiguration中定义了三个bean:

  1. BeanFactoryTransactionAttributeSourceAdvisor:一个PointcutAdvisor
  2. AnnotationTransactionAttributeSource:就是Pointcut
  3. TransactionInterceptor:就是代理逻辑Advice

(TransactionAttributeSource其实就是代表一个@Transactional)

接下来我们查看一下事务的执行源码。

事务执行源码路径:也是通过Interceptor进行实现的。

TransactionInterceptor.invoke()-》invokeWithinTransaction()

查看invokeWithinTransaction,该方法为事务执行的流程。

protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
      final InvocationCallback invocation) throws Throwable {

   // If the transaction attribute is null, the method is non-transactional.
   TransactionAttributeSource tas = getTransactionAttributeSource();
   // 获取到当前方法或类上的@Transactional注解的信息
   final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
   // 得到一个TransactionManager
   final TransactionManager tm = determineTransactionManager(txAttr);

   if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
      ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> {
         if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && KotlinDelegate.isSuspend(method)) {
            throw new TransactionUsageException(
                  "Unsupported annotated transaction on suspending function detected: " + method +
                  ". Use TransactionalOperator.transactional extensions instead.");
         }
         ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType());
         if (adapter == null) {
            throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " +
                  method.getReturnType());
         }
         return new ReactiveTransactionSupport(adapter);
      });
      return txSupport.invokeWithinTransaction(
            method, targetClass, invocation, txAttr, (ReactiveTransactionManager) tm);
   }

   // 转化为PlatformTransactionManager
   PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
   // 根据当前执行的类中的某个方法以及@Transactional注解的信息生成一个唯一标志,这个标记会用来作为事务名
   final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

   if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {

      // Standard transaction demarcation with getTransaction and commit/rollback calls.
      // 创建事务,并得到事务信息,后面需要事务信息用来进行提交或回滚,这里不一定创建了事务,也可能延用了已存在的事务
      TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

      Object retVal;
      try {
         // This is an around advice: Invoke the next interceptor in the chain.
         // This will normally result in a target object being invoked.
         // 执行业务方法逻辑
         retVal = invocation.proceedWithInvocation();
      }
      catch (Throwable ex) {
         // target invocation exception
         // 出现异常的情况下,如果异常需要rollback,则回滚,否则则会提交
         completeTransactionAfterThrowing(txInfo, ex);
         // 出现异常后会先执行finally中的方法逻辑在抛出当前异常
         throw ex;
      }
      finally {
         // 清除ThreadLocal中的事务信息
         cleanupTransactionInfo(txInfo);
      }

      if (vavrPresent && VavrDelegate.isVavrTry(retVal)) {
         // Set rollback-only in case of Vavr failure matching our rollback rules...
         TransactionStatus status = txInfo.getTransactionStatus();
         if (status != null && txAttr != null) {
            retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
         }
      }

      // 提交事务
      commitTransactionAfterReturning(txInfo);
      return retVal;
   }

   else {
      final ThrowableHolder throwableHolder = new ThrowableHolder();

      // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
      try {
         Object result = ((CallbackPreferringPlatformTransactionManager) ptm).execute(txAttr, status -> {
            TransactionInfo txInfo = prepareTransactionInfo(ptm, txAttr, joinpointIdentification, status);
            try {
               Object retVal = invocation.proceedWithInvocation();
               if (vavrPresent && VavrDelegate.isVavrTry(retVal)) {
                  // Set rollback-only in case of Vavr failure matching our rollback rules...
                  retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
               }
               return retVal;
            }
            catch (Throwable ex) {
               if (txAttr.rollbackOn(ex)) {
                  // A RuntimeException: will lead to a rollback.
                  if (ex instanceof RuntimeException) {
                     throw (RuntimeException) ex;
                  }
                  else {
                     throw new ThrowableHolderException(ex);
                  }
               }
               else {
                  // A normal return value: will lead to a commit.
                  throwableHolder.throwable = ex;
                  return null;
               }
            }
            finally {
               cleanupTransactionInfo(txInfo);
            }
         });

         // Check result state: It might indicate a Throwable to rethrow.
         if (throwableHolder.throwable != null) {
            throw throwableHolder.throwable;
         }
         return result;
      }
      catch (ThrowableHolderException ex) {
         throw ex.getCause();
      }
      catch (TransactionSystemException ex2) {
         if (throwableHolder.throwable != null) {
            logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
            ex2.initApplicationException(throwableHolder.throwable);
         }
         throw ex2;
      }
      catch (Throwable ex2) {
         if (throwableHolder.throwable != null) {
            logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
         }
         throw ex2;
      }
   }
}

接下来进入createTransactionIfNecessary方法查看事务的创建过程。

protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
      @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {

   // If no name specified, apply method identification as transaction name.
   // 事务的名字
   if (txAttr != null && txAttr.getName() == null) {
      txAttr = new DelegatingTransactionAttribute(txAttr) {
         @Override
         public String getName() {
            return joinpointIdentification;
         }
      };
   }

   // 事务状态对象,
   TransactionStatus status = null;
   if (txAttr != null) {
      if (tm != null) {
          //获得事务
         status = tm.getTransaction(txAttr);
      }
      else {
         if (logger.isDebugEnabled()) {
            logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
                  "] because no transaction manager has been configured");
         }
      }
   }


   return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}

进入AbstractPlatformTransactionManager.getTransaction()方法

public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
      throws TransactionException {

   // Use defaults if no transaction definition given.
   // TransactionDefinition就是TransactionAttribute,就是@Transactional注解信息
   // 表示一个事务的定义,通过@Transactional注解也就是在定义一个事务
   TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());

   // 获取一个事务对象,每次都会生成一个DataSourceTransactionObject,而重点是这个对象中是否已经存在一个数据库连接
   Object transaction = doGetTransaction(); 
   boolean debugEnabled = logger.isDebugEnabled();

   // 如果已经存在一个数据库连接,表示当前线程已经存在一个事务
   if (isExistingTransaction(transaction)) {
      // Existing transaction found -> check propagation behavior to find out how to behave.
      // 存在事务的情况下,按不同的传播级别进行处理
      return handleExistingTransaction(def, transaction, debugEnabled);
   }

   // Check definition settings for new transaction.
   // timeout如果为-1,表示没有时间限制
   if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
      throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
   }

   // 当前线程还不存在事务
   // No existing transaction found -> check propagation behavior to find out how to proceed.
   if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
      throw new IllegalTransactionStateException(
            "No existing transaction found for transaction marked with propagation 'mandatory'");
   }
   else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
         def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
         def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
      // 挂起
      SuspendedResourcesHolder suspendedResources = suspend(null);
      if (debugEnabled) {
         logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
      }
      try {
         boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);

         // 事务对象中包括:
         // 1. 事务的定义
         // 2. 事务对象
         // 3. 是否是新事务
         DefaultTransactionStatus status = newTransactionStatus(
               def, transaction, true, newSynchronization, debugEnabled, suspendedResources);

         // 开启事务,获得新的Connection对象
         doBegin(transaction, def);
         // 初始化TransactionSynchronizationManager中的属性
         prepareSynchronization(status, def);
         return status;
      }
      catch (RuntimeException | Error ex) {
         resume(null, suspendedResources);
         throw ex;
      }
   }
   else {
      // 不会doBegin,不会真的开启事务,也就是不会把Connection的autoCommit设置为false,sql没有在事务中执行
      // Create "empty" transaction: no actual transaction, but potentially synchronization.
      if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
         logger.warn("Custom isolation level specified but no actual transaction initiated; " +
               "isolation level will effectively be ignored: " + def);
      }
      //
      boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
      return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
   }
}

继续向里面走,看核心方法DataSourceTransactionManager.doGetTransaction(),这里就是创建了一个DataSourceTransactionObject对象

protected Object doGetTransaction() {
   DataSourceTransactionObject txObject = new DataSourceTransactionObject();
    // 是否允许嵌套事务,默认是允许的,在DataSourceTransactionManager无参构造方法中进行了设置
   txObject.setSavepointAllowed(isNestedTransactionAllowed());
   // 从当前线程ThreadLocal中获取当前DataSource所对应的数据库连接,可能为null
   ConnectionHolder conHolder =
         (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
   txObject.setConnectionHolder(conHolder, false);
   return txObject;
}

回到流程中去,到 doBegin方法,开启一个事务

protected void doBegin(Object transaction, TransactionDefinition definition) {
   DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
   Connection con = null;

   try {
      // 如果事务对象内没有连接就从dataSource中获取一个连接
      if (!txObject.hasConnectionHolder() ||
            txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
         // isSynchronizedWithTransaction 我的理解,表示连接是否和事务同步,表示每开启一个事务就从DataSource中获取一个连接
         // 默认是false,所以在开启事务时,除非当前事务对象中还没有数据库连接才会从DataSource中去获取一个连接
         Connection newCon = obtainDataSource().getConnection();
         if (logger.isDebugEnabled()) {
            logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
         }
         // 设置为newConnectionHolder为true
         txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
      }

      // 如果在开启事务时,事务对象中已经有数据库连接了
      txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
      con = txObject.getConnectionHolder().getConnection();

      // 设置数据库连接的隔离级别
      // 如果当前事务中的隔离级别跟数据库的隔离级别不一样就返回数据库的隔离级别并记录下来,事务结束后恢复
      Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
      txObject.setPreviousIsolationLevel(previousIsolationLevel);
      txObject.setReadOnly(definition.isReadOnly());

      // Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
      // so we don't want to do it unnecessarily (for example if we've explicitly
      // configured the connection pool to set it already).
      // 把数据库连接的autoCommit设置为false,禁止数据库的自动提交
      if (con.getAutoCommit()) {
         txObject.setMustRestoreAutoCommit(true);
         if (logger.isDebugEnabled()) {
            logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
         }
         con.setAutoCommit(false);
      }

      prepareTransactionalConnection(con, definition);
      txObject.getConnectionHolder().setTransactionActive(true);

      // 设置数据库连接的超时时间
      int timeout = determineTimeout(definition);
      if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
         txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
      }

      // Bind the connection holder to the thread.
      if (txObject.isNewConnectionHolder()) {
         // 把新生成的数据库连接设置到当前线程的TheadLocal中进行缓存
         TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
      }
   }

   catch (Throwable ex) {
      if (txObject.isNewConnectionHolder()) {
         DataSourceUtils.releaseConnection(con, obtainDataSource());
         txObject.setConnectionHolder(null, false);
      }
      throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
   }
}

从一个小demo中总结一下流程:

@Component
public class TxService {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Autowired
    private TxService txService;
@Transactional
public void insert11(){
    System.out.println("insert11");
    jdbcTemplate.execute("insert grade values(11,2,2,2)");
    txService.a();
}

@Transactional(propagation = Propagation.REQUIRES_NEW)    挂起test,开启新事务,执行a  
//@Transactional(propagation = Propagation.NOT_SUPPORTED)   挂起test,不开启事务,执行a
public void  a(){
    jdbcTemplate.execute("insert grade values(13,2,2,2)");
}
}

事务执行过程:

  1. 生成test事务状态对象
  2. test事务doBegin,获取并将数据库连接2825设置到test事务状态对象中
  3. 把test事务信息设置到事务同步管理器中
  4. 执行test业务逻辑方法(可以获取到test事务的信息)
    1. 生成a事务状态对象,并且可以获取到当前线程中已经存在的数据库连接2825
    2. 判断出来当前线程中已经存在事务
    3. 如果需要新开始事务,就先挂起数据库连接2825,挂起就是把test事务信息从事务同步管理器中转移到挂起资源对象中,并把当前a事务状态对象中的数据库连接设置为null
    4. a事务doBegin,新生成一个数据库连接2826,并设置到a事务状态对象中
    5. 把a事务信息设置到事务同步管理器中
    6. 执行a业务逻辑方法(可以利用事务同步管理器获取到a事务信息)
    7. 利用a事务状态对象,执行提交
    8. 提交之后会恢复所挂起的test事务,这里的恢复,其实只是把挂起资源对象中所保存的信息再转移回事务同步管理器

5.继续执行test业务逻辑方法(仍然可以获取到test事务的信息)

6.利用test事务状态对象,执行提交

关于事务的挂起和恢复说明一下:spring事务的挂起就是把事务管理同步器中的信息转移到挂起资源对象,恢复就是把挂起资源对象转移回事务同步管理器中。

事务传播类型

事务传播机制只针对于同一线程,如果是多线程,则各使用各的事务。

spring源码解析 (七) 事务底层源码实现