欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Spring-事务的源码分析(七)

程序员文章站 2024-01-28 17:07:16
...

Spring-事务的源码分析(七)

都知道事务是通过spring AOP来实现的;前面文章已经分析过AOP的实现原理了,而事务只不过是AOP中的一个增强器而已;所以接下来将分析一下事务增强器的原理。

spring的核心就是IC依赖注入,那么就要先解析依赖配置,然后再注入。所以spring的功能都会出现两块,一块是解析mxl,一块是构建BeanDefinition。
事务增强器也是这样,先要解析事务的标签,然后才是执行事务。

解析tx事务标签

来看一下我们在xml中定义的事务标签是在那里被解析的?

public class TxNamespaceHandler extends NamespaceHandlerSupport {
    static final String TRANSACTION_MANAGER_ATTRIBUTE = "transaction-manager";
    static final String DEFAULT_TRANSACTION_MANAGER_BEAN_NAME = "transactionManager";
    static String getTransactionManagerName(Element element) {
        return (element.hasAttribute(TRANSACTION_MANAGER_ATTRIBUTE) ?
                element.getAttribute(TRANSACTION_MANAGER_ATTRIBUTE) : DEFAULT_TRANSACTION_MANAGER_BEAN_NAME);
    }

    @Override
    public void init() {
        registerBeanDefinitionParser("advice", new TxAdviceBeanDefinitionParser());
        registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenBeanDefinitionParser());
        registerBeanDefinitionParser("jta-transaction-manager", new JtaTransactionManagerBeanDefinitionParser());
    }
}

从代码中可以看出,事务的开始一切都从:AnnotationDrivenBeanDefinitionParser 类的parse开始了。

Spring-事务的源码分析(七)

开始解析xml标签了

/**
     * 解析annotation-driven开头的标签--> {@code <tx:annotation-driven/>} tag. Will
     * {@link AopNamespaceUtils#registerAutoProxyCreatorIfNecessary register an AutoProxyCreator}
     * with the container as necessary.
     */
    @Override
    public BeanDefinition parse(Element element, ParserContext parserContext) {
        registerTransactionalEventListenerFactory(parserContext);
        String mode = element.getAttribute("mode");
        if ("aspectj".equals(mode)) {
            // mode="aspectj"
            registerTransactionAspect(element, parserContext);
        }
        else {
            // mode="proxy"
            AopAutoProxyConfigurer.configureAutoProxyCreator(element, parserContext);
        }
        return null;
    }

解析tx事务的标签的核心全部在InfrastructureAdvisorAutoProxyCreator这个里面,
InfrastructureAdvisorAutoProxyCreator -> AbstractAdvisorAutoProxyCreator -> AbstractAutoProxyCreator
所有的解析工作大部分都在AbstractAutoProxyCreator类里面解析完成的。前面已经分析了aop时已经分析完毕了,所以在这里就不继续分析了。

执行事务增强器

当判断某个bean适用于事务增强时,也就是用于增强器BeanFactoryTransactionAttributeSourceAdvisor这个类,所以在自定义标签解析时,注入的类成为了整个事务功能的基础。
BeanFactoryTransactionAttributeSourceAdvisor作为Advisor的实现类,需要遵从Advisor的处理方式,当代理被调用时会调用这个类的增强方法,也就是此bean的Advise,又因为在事务标签解析时我们把TransactionInterceptor类型的bean注入到了BeanFactoryTransactionAttributeSourceAdvisor中,所以在调用事务增强器增强的代理类时会先执行TransactionInterceptor进行增强,也就是执行TransactionInterceptor中的invoke方法完成整个事务的逻辑。

时序图:
Spring-事务的源码分析(七)

从时序图中可以看出:
1. 解析注解事务中的属性;
2. 根据属性设置创建事务;
3. 在当前线程中执行用户真正的代码;
4. 如果执行过程中有异常则执行回滚逻辑;
5. 如果没有异常,则提交事务。
在前面讲解事务时说到过,spring事务是基于数据库本身的事务的,从时序图中也可以看出,创建事务,回滚事务,提交事务,操作都需要去操作数据库连接类DataSourceTransactionManager。

创建事务

看着时序图已经可以往下走了,下面将展示源码中个人觉得重要的点,其他不重要的由于篇幅问题就不帖出来了。
第一步:整个事务执行框架
TransactionInterceptor中的invoke执行了TransactionAspectSupport的invokeWithinTransaction方法。

protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation)
            throws Throwable {

        //获取事务属性,如果没有属性,则该方法没有事务 ** getTransactionAttribute重点**
        final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
        final PlatformTransactionManager tm = determineTransactionManager(txAttr); //获取beanFactory中的事务管理器
        final String joinpointIdentification = methodIdentification(method, targetClass);
        //声明式事务处理
        if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
            //创建事务
            TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
            Object retVal = null;
            try {
                // This is an around advice: Invoke the next interceptor in the chain.
                // 执行被增强的方法
                retVal = invocation.proceedWithInvocation();
            }
            catch (Throwable ex) {
                // 异常回滚
                completeTransactionAfterThrowing(txInfo, ex);
                throw ex;
            }
            finally { //清除信息
                cleanupTransactionInfo(txInfo);
            } //提交事务
            commitTransactionAfterReturning(txInfo);
            return retVal;
        }

        else {
            // 编程式事务处理It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
            .......
            .......
         }
    }

可以看到整个事务在上面的方法中已经全部包含了,创建事务,执行代码,回滚,提交。spring把具体操作又拆分到各个业务类中了。

第二步:获取事务注解属性
就是解析方法上面的:@Transactional(propagation = Propagation.REQUIRES_NEW)这个里面的属性设置。
跟着上面的时序图直接来到parseTransactionAnnotation这个方法中看看。

//开始解析各种事务的标签
    protected TransactionAttribute parseTransactionAnnotation(AnnotationAttributes attributes) {
        RuleBasedTransactionAttribute rbta = new RuleBasedTransactionAttribute();
        Propagation propagation = attributes.getEnum("propagation");
        rbta.setPropagationBehavior(propagation.value());
        Isolation isolation = attributes.getEnum("isolation");
        rbta.setIsolationLevel(isolation.value());
        rbta.setTimeout(attributes.getNumber("timeout").intValue());
        rbta.setReadOnly(attributes.getBoolean("readOnly"));
        rbta.setQualifier(attributes.getString("value"));
        ArrayList<RollbackRuleAttribute> rollBackRules = new ArrayList<RollbackRuleAttribute>();
        Class<?>[] rbf = attributes.getClassArray("rollbackFor");
        for (Class<?> rbRule : rbf) {
            RollbackRuleAttribute rule = new RollbackRuleAttribute(rbRule);
            rollBackRules.add(rule);
        }
        String[] rbfc = attributes.getStringArray("rollbackForClassName");
        for (String rbRule : rbfc) {
            RollbackRuleAttribute rule = new RollbackRuleAttribute(rbRule);
            rollBackRules.add(rule);
        }
        Class<?>[] nrbf = attributes.getClassArray("noRollbackFor");
        for (Class<?> rbRule : nrbf) {
            NoRollbackRuleAttribute rule = new NoRollbackRuleAttribute(rbRule);
            rollBackRules.add(rule);
        }
        String[] nrbfc = attributes.getStringArray("noRollbackForClassName");
        for (String rbRule : nrbfc) {
            NoRollbackRuleAttribute rule = new NoRollbackRuleAttribute(rbRule);
            rollBackRules.add(rule);
        }
        rbta.getRollbackRules().addAll(rollBackRules);
        return rbta;
    }

第三步:根据上面获取的属性,创建事务
TransactionAspectSupport.createTransactionIfNecessary

protected TransactionInfo createTransactionIfNecessary(
            PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) {

        // If no name specified, apply method identification as transaction name.
        if (txAttr != null && txAttr.getName() == null) {
            txAttr = new DelegatingTransactionAttribute(txAttr) {
                @Override
                public String getName() {
                    return joinpointIdentification;
                }
            };
        }

        TransactionStatus status = null;
        if (txAttr != null) {
            if (tm != null) { //获取事务状态信息,**getTransaction重要**包活设置一些事务的传播性
                status = tm.getTransaction(txAttr);
            }
            else {
                if (logger.isDebugEnabled()) {
                    logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
                            "] because no transaction manager has been configured");
                }
            }
        } //准备事务信息**prepareTransactionInfo**
        return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
    }

设置事务的状态
AbstractPlatformTransactionManager.getTransaction

public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
        Object transaction = doGetTransaction(); //获取事务**doGetTransaction**

        // Cache debug flag to avoid repeated checks.
        boolean debugEnabled = logger.isDebugEnabled();

        if (definition == null) {
            // Use defaults if no transaction definition given.
            definition = new DefaultTransactionDefinition();
        }
        //判断当时是否存在事务,判断依据为当前线程记录的连接不为空且连接中的事务属性不为空
        if (isExistingTransaction(transaction)) {
            // 当前线程已经存在事务 **handleExistingTransaction** Existing transaction found -> check propagation behavior to find out how to behave.
            return handleExistingTransaction(definition, transaction, debugEnabled);
        }

        // 事务超时
        if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
            throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
        }

        // 当前不存在事务,但是事务类型为:PROPAGATION_MANDATORY的时候会抛一个异常 -> check propagation behavior to find out how to proceed.
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
            throw new IllegalTransactionStateException(
                    "No existing transaction found for transaction marked with propagation 'mandatory'");
        }
        else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
                definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
                definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
            SuspendedResourcesHolder suspendedResources = suspend(null);
            if (debugEnabled) {
                logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
            }
            try {
                boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                DefaultTransactionStatus status = newTransactionStatus(
                        definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
                doBegin(transaction, definition); //**doBegin**构造事务,包括设置connectionHolder,隔离级别,timeout,如果是新连接,绑定到当前线程
                prepareSynchronization(status, definition); //新同步事务的设置,针对当前线程的设置,这里用到了ThreadLocal哦。
                return status;
            }
            catch (RuntimeException ex) {
                resume(null, suspendedResources);
                throw ex;
            }
            catch (Error err) {
                resume(null, suspendedResources);
                throw err;
            }
        }
        else {
            // Create "empty" transaction: no actual transaction, but potentially synchronization.
            if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
                logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                        "isolation level will effectively be ignored: " + definition);
            }
            boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
            return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
        }
    }

我们先抛开事务中已经存在事务的判断,先来继续创建事务。
走到了:doBegin。
进入到了DataSourceTransactionManager这个的doBegin

protected void doBegin(Object transaction, TransactionDefinition definition) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
        Connection con = null;

        try {
            if (txObject.getConnectionHolder() == null ||
                    txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
                Connection newCon = this.dataSource.getConnection();
                if (logger.isDebugEnabled()) {
                    logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
                }
                txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
            }

            txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
            con = txObject.getConnectionHolder().getConnection();
            //设置事务的隔离级别**prepareConnectionForTransaction**
            Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
            txObject.setPreviousIsolationLevel(previousIsolationLevel);

            // Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
            // so we don't want to do it unnecessarily (for example if we've explicitly
            // configured the connection pool to set it already).
            if (con.getAutoCommit()) { //数据库的事务为自定提交,则进入
                txObject.setMustRestoreAutoCommit(true);//改为spring控制提交
                if (logger.isDebugEnabled()) {
                    logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
                }
                con.setAutoCommit(false); //取消数据库的自动提交
            } //设置当前线程有事务,后续进来的线程判断的时候就可以按照这个来做判断了
            txObject.getConnectionHolder().setTransactionActive(true);

            int timeout = determineTimeout(definition);
            if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
                txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
            }
            // Bind the session holder to the thread.
            if (txObject.isNewConnectionHolder()) {
                TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());
            }
        }

        catch (Throwable ex) {
            if (txObject.isNewConnectionHolder()) {
                DataSourceUtils.releaseConnection(con, this.dataSource);
                txObject.setConnectionHolder(null, false);
            }
            throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
        }
    }

看到了怎么创建事务了嘛啊?就是连接数据库,然后更改当前数据库的连接的事务提交为false,当前执行了sql之后就不会自动提交了。

接下来进入到:
AbstractPlatformTransactionManager.prepareSynchronization

/**
     * Initialize transaction synchronization as appropriate.,将当前事务记录到ThreadLocal中
     */
    protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
        if (status.isNewSynchronization()) {
            TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
            TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
                    definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
                            definition.getIsolationLevel() : null);
            TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
            TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
            TransactionSynchronizationManager.initSynchronization();
        }
    }

这个方法里面主要是对:新同步事务的设置,针对当前线程的设置,这里用到了ThreadLocal哦。把刚建立好的事务信息存储到ThreadLocal里面。(这也就是下面在一个事务中开启多线程之后,多线程里面的数据库操作不在一个事务里面的原因了)
到这里,总算创建完毕一个事务了,整个过程就那么多了。

事务中已经存在事务(事务的传播属性)

如果当前事务中已经存在事务了,这就是事务的传播属性了。

private TransactionStatus handleExistingTransaction(
            TransactionDefinition definition, Object transaction, boolean debugEnabled)
            throws TransactionException {
        //可以看出PROPAGATION_NEVER这个事务类型,如果当前有事务,直接抛异常,比较简单粗暴。
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
            throw new IllegalTransactionStateException(
                    "Existing transaction found for transaction marked with propagation 'never'");
        }
        //忽略当前事务,以非事务方式执行操作,如果当前存在事务就把当前事务挂起,执行完后恢复事务(忽略当前事务)
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
            if (debugEnabled) {
                logger.debug("Suspending current transaction");
            }
            Object suspendedResources = suspend(transaction);
            boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
            return prepareTransactionStatus(
                    definition, null, false, newSynchronization, debugEnabled, suspendedResources);
        }
        //新事务处理
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
            if (debugEnabled) {
                logger.debug("Suspending current transaction, creating new transaction with name [" +
                        definition.getName() + "]");
            }
            SuspendedResourcesHolder suspendedResources = suspend(transaction); //将原事务挂起
            try {
                boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                DefaultTransactionStatus status = newTransactionStatus(
                        definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
                doBegin(transaction, definition); //连接数据库,设置当前线程连接数据库的信息
                prepareSynchronization(status, definition);  //将一些事务信息绑定到threadLocal线程中 //这两句代码和创建事务时的代码一样,说明开启一个新事务,相当于开启一个新的数据库连接
                return status;
            }
            catch (RuntimeException beginEx) {
                resumeAfterBeginException(transaction, suspendedResources, beginEx);
                throw beginEx;
            }
            catch (Error beginErr) {
                resumeAfterBeginException(transaction, suspendedResources, beginErr);
                throw beginErr;
            }
        }
        //嵌套事务处理
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
            if (!isNestedTransactionAllowed()) {
                throw new NestedTransactionNotSupportedException(
                        "Transaction manager does not allow nested transactions by default - " +
                        "specify 'nestedTransactionAllowed' property with value 'true'");
            }
            if (debugEnabled) {
                logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
            }
            if (useSavepointForNestedTransaction()) {
                // 创建一个保存点
                // through the SavepointManager API implemented by TransactionStatus.
                // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
                DefaultTransactionStatus status =
                        prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
                status.createAndHoldSavepoint(); //创建一个保存点
                return status;
            }
            else {
                // 如果不能建立保存点,则创建一个新事务吧,和上面的创建一个新事务一样的处理。
                // Usually only for JTA: Spring synchronization might get activated here
                // in case of a pre-existing JTA transaction.
                boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                DefaultTransactionStatus status = newTransactionStatus(
                        definition, transaction, true, newSynchronization, debugEnabled, null);
                doBegin(transaction, definition);
                prepareSynchronization(status, definition);
                return status;
            }
        }

        // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
        if (debugEnabled) {
            logger.debug("Participating in existing transaction");
        }
        if (isValidateExistingTransaction()) {
            if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
                Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
                if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
                    Constants isoConstants = DefaultTransactionDefinition.constants;
                    throw new IllegalTransactionStateException("Participating transaction with definition [" +
                            definition + "] specifies isolation level which is incompatible with existing transaction: " +
                            (currentIsolationLevel != null ?
                                    isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
                                    "(unknown)"));
                }
            }
            if (!definition.isReadOnly()) {
                if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                    throw new IllegalTransactionStateException("Participating transaction with definition [" +
                            definition + "] is not marked as read-only but existing transaction is");
                }
            }
        }
        boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
        return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
    }

回滚事务

在看事务回滚之前先来看一段代码:

    try {
        // This is an around advice: Invoke the next interceptor in the chain.
        // 1.执行被增强的方法
        retVal = invocation.proceedWithInvocation();
    }
    catch (Throwable ex) {
        // 2.异常回滚
        completeTransactionAfterThrowing(txInfo, ex);
        throw ex;
    }

try里面是执行我们的业务代码,就是说,只要业务代码里面有异常,都会出发回滚机制,这里只是说回滚机制哦!

TransactionAspectSupport. completeTransactionAfterThrowing


protected void completeTransactionAfterThrowing(TransactionInfo txInfo, Throwable ex) {
        if (txInfo != null && txInfo.hasTransaction()) {
            if (logger.isTraceEnabled()) {
                logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
                        "] after exception: " + ex);
            } //判断异常是否回滚。DefaultTransactionAttribute的rollbackOn里面这个里面只有RuntimeException与error才会发生回滚
            if (txInfo.transactionAttribute.rollbackOn(ex)) {
                try {
                   //开始回滚了
                    txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
                }
                catch (TransactionSystemException ex2) {
                    logger.error("Application exception overridden by rollback exception", ex);
                    ex2.initApplicationException(ex);
                    throw ex2;
                }
                catch (RuntimeException ex2) {
                    logger.error("Application exception overridden by rollback exception", ex);
                    throw ex2;
                }
                catch (Error err) {
                    logger.error("Application exception overridden by rollback error", ex);
                    throw err;
                }
            }
            else {
                // We don't roll back on this exception.
                // Will still roll back if TransactionStatus.isRollbackOnly() is true.
                try {
                   //如果没有经过上面的回滚,那么还是会被执行提交的哦
                    txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
                }
                catch (TransactionSystemException ex2) {
                    logger.error("Application exception overridden by commit exception", ex);
                    ex2.initApplicationException(ex);
                    throw ex2;
                }
                catch (RuntimeException ex2) {
                    logger.error("Application exception overridden by commit exception", ex);
                    throw ex2;
                }
                catch (Error err) {
                    logger.error("Application exception overridden by commit error", ex);
                    throw err;
                }
            }
        }
    }

rollbackOn()里面是这样写的:

   return (ex instanceof RuntimeException || ex instanceof Error);

所以默认情况下面只会回滚RuntimeException 和Error的异常,而Exception异常则不会被回滚。

回滚:AbstractPlatformTransactionManager.rollback -> processRollback

private void processRollback(DefaultTransactionStatus status) {
        try {
            try {
                triggerBeforeCompletion(status);
                if (status.hasSavepoint()) {
                    if (status.isDebug()) {
                        logger.debug("Rolling back transaction to savepoint");
                    } //如果有保存点,当前事务为单独的线程则回退到保存点 **rollbackToHeldSavepoint**
                    status.rollbackToHeldSavepoint();
                }
                else if (status.isNewTransaction()) {
                    if (status.isDebug()) {
                        logger.debug("Initiating transaction rollback");
                    } //如果当前事务为独立的新事务,则直接回退
                    doRollback(status); //直接执行数据库的callback回滚
                }
                else if (status.hasTransaction()) {
                    if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
                        if (status.isDebug()) {
                            logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
                        } //如果当前事务不是独立的事务,那么只能标记状态,等待事务链执行完毕后一起回滚
                        doSetRollbackOnly(status);
                    }
                    else {
                        if (status.isDebug()) {
                            logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
                        }
                    }
                }
                else {
                    logger.debug("Should roll back transaction but cannot - no transaction available");
                }
            }
            catch (RuntimeException ex) {
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
                throw ex;
            }
            catch (Error err) {
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
                throw err;
            }
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
        }
        finally { //清空前面记录的资源,并将挂起的线程恢复,前面有挂起,这里有恢复哦!
            cleanupAfterCompletion(status);
        }
    }

1.如果有保存点,则直接回滚到保存点

public void rollbackToHeldSavepoint() throws TransactionException {
        if (!hasSavepoint()) {
            throw new TransactionUsageException(
                    "Cannot roll back to savepoint - no savepoint associated with current transaction");
        }
        getSavepointManager().rollbackToSavepoint(getSavepoint()); //**rollbackToSavepoint**数据库保存点回滚
        getSavepointManager().releaseSavepoint(getSavepoint());
        setSavepoint(null);
    }

JdbcTransactionObjectSupport.rollbackToSavepoint

@Override
    public void rollbackToSavepoint(Object savepoint) throws TransactionException {
        ConnectionHolder conHolder = getConnectionHolderForSavepoint();
        try {
            conHolder.getConnection().rollback((Savepoint) savepoint);
        }
        catch (Throwable ex) {
            throw new TransactionSystemException("Could not roll back to JDBC savepoint", ex);
        }
    }

2.回滚数据,还是的靠操作数据库,所以执行的是DataSourceTransactionManager.doRollback;

@Override
    protected void doRollback(DefaultTransactionStatus status) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
        Connection con = txObject.getConnectionHolder().getConnection();
        if (status.isDebug()) {
            logger.debug("Rolling back JDBC transaction on Connection [" + con + "]");
        }
        try {
            con.rollback();
        }
        catch (SQLException ex) {
            throw new TransactionSystemException("Could not roll back JDBC transaction", ex);
        }
    }

提交事务

AbstractPlatformTransactionManager.processCommit

private void processCommit(DefaultTransactionStatus status) throws TransactionException {
        try {
            boolean beforeCompletionInvoked = false;
            try {
                prepareForCommit(status);
                triggerBeforeCommit(status);
                triggerBeforeCompletion(status);
                beforeCompletionInvoked = true;
                boolean globalRollbackOnly = false;
                if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
                    globalRollbackOnly = status.isGlobalRollbackOnly();
                }
                if (status.hasSavepoint()) {
                    if (status.isDebug()) {
                        logger.debug("Releasing transaction savepoint");
                    } //如果存在保存点,则清除保存点信息
                    status.releaseHeldSavepoint();
                }
                else if (status.isNewTransaction()) {
                    if (status.isDebug()) {
                        logger.debug("Initiating transaction commit");
                    } //独立事务则直接提交
                    doCommit(status);
                }
                // Throw UnexpectedRollbackException if we have a global rollback-only
                // marker but still didn't get a corresponding exception from commit.
                if (globalRollbackOnly) {
                    throw new UnexpectedRollbackException(
                            "Transaction silently rolled back because it has been marked as rollback-only");
                }
            }
            catch (UnexpectedRollbackException ex) {
                // can only be caused by doCommit
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
                throw ex;
            }
            catch (TransactionException ex) {
                // can only be caused by doCommit
                if (isRollbackOnCommitFailure()) {
                    doRollbackOnCommitException(status, ex);
                }
                else {
                    triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
                }
                throw ex;
            }
            catch (RuntimeException ex) {
                if (!beforeCompletionInvoked) {
                    triggerBeforeCompletion(status);
                } //提交过程中出现异常回滚
                doRollbackOnCommitException(status, ex);
                throw ex;
            }
            catch (Error err) {
                if (!beforeCompletionInvoked) {
                    triggerBeforeCompletion(status);
                }
                doRollbackOnCommitException(status, err);
                throw err;
            }

            // Trigger afterCommit callbacks, with an exception thrown there
            // propagated to callers but the transaction still considered as committed.
            try {
                triggerAfterCommit(status);
            }
            finally {
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
            }

        }
        finally {
            cleanupAfterCompletion(status);
        }
    }

同样的提交事务也得靠数据库来操作
DataSourceTransactionManager.doCommit

protected void doCommit(DefaultTransactionStatus status) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
        Connection con = txObject.getConnectionHolder().getConnection();
        if (status.isDebug()) {
            logger.debug("Committing JDBC transaction on Connection [" + con + "]");
        }
        try {
            con.commit();
        }
        catch (SQLException ex) {
            throw new TransactionSystemException("Could not commit JDBC transaction", ex);
        }
    }

事务开启多线程后无效原因

上面源码分析了那么多,来聊聊工作中遇到的在事务中开启多线程后,事务无效的问题。
示例:

public static void main(String args[]){
    @Transactional
        public void serviceHI() {
            userInfoExtendService.serviceI();
            try {
                new Thread(new Runnable() {
                    public void run() {
                        userInfoExtendService.serviceH();
                    }
                }).start();
            } catch (Exception e) {
                e.printStackTrace();
            }
            userInfoExtendService.serviceJ();
        }
}

执行serviceH的时候被放入多线程里面了额,那么serviceH里面是否抛异常都和外面的serviceI,serviceJ()无关了。
相当于serviceH和serviceI,serviceJ()不在一个事务里面了。加入serviceJ抛异常了,serviceI和serviceJ会被一起回滚,而serviceH则不会。

为什么呢?
看源码

AbstractPlatformTransactionManager.prepareSynchronization

/**
     * Initialize transaction synchronization as appropriate.,将当前事务记录到ThreadLocal中
     */
    protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
        if (status.isNewSynchronization()) {
            TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
            TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
                    definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
                            definition.getIsolationLevel() : null);
            TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
            TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
            TransactionSynchronizationManager.initSynchronization();
        }
    }

来看看这个TransactionSynchronizationManager类里面定义的成员变量

public abstract class TransactionSynchronizationManager {

    private static final Log logger = LogFactory.getLog(TransactionSynchronizationManager.class);

    private static final ThreadLocal<Map<Object, Object>> resources =
            new NamedThreadLocal<Map<Object, Object>>("Transactional resources");

    private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
            new NamedThreadLocal<Set<TransactionSynchronization>>("Transaction synchronizations");

    private static final ThreadLocal<String> currentTransactionName =
            new NamedThreadLocal<String>("Current transaction name");

    private static final ThreadLocal<Boolean> currentTransactionReadOnly =
            new NamedThreadLocal<Boolean>("Current transaction read-only status");

    private static final ThreadLocal<Integer> currentTransactionIsolationLevel =
            new NamedThreadLocal<Integer>("Current transaction isolation level");

    private static final ThreadLocal<Boolean> actualTransactionActive =
            new NamedThreadLocal<Boolean>("Actual transaction active");
            .....
}

spring与数据库建立连接,每个线程都会有一个自己的连接,而这些连接的状态和连接属性,全部被存放到了ThreadLocal里面了,都知道ThreadLoacl的key是使用当前线程的值来作为key值的。

上面的代码中:main方法启动默认就是一个线程,serviceI,serviceJ都在这个主线程中,而现在searchH单独开启了一个线程,serviceH就在自线程里面了。
所以在开启事务,回滚事务时,serviceI,serviceJ在同一个连接里面,而serviceH在另外一个连接里面。

Spring-事务的源码分析(七)

事务说白了就是“绑架”一个数据库连接,更改数据库的自动提交功能,使用手动提交机制。spring把这一系列操作属性及数据库连接放到了ThreadLocal中,而Thread Local的key为“当前线程的值”,可以说一个线程一个连接一个事务,在一个事务中开启多线程后,多线程就不在当前事务中了,所以serviceH在serviceHI这个的事务中是无效的。

事务注意事项

  1. @Transactional 只能被应用到public方法上, 对于其它非public的方法,如果标记了@Transactional也不会报错,但方法没有事务功能.
  2. 用 spring 事务管理器,由spring来负责数据库的打开,提交,回滚.默认遇到运行期例外(throw new RuntimeException(“注释”);)会回滚,即遇到不受检查(unchecked)的例外时回滚;而遇到需要捕获的例外(throw new Exception(“注释”);)不会回滚,即遇到受检查的例外(就是非运行时抛出的异常,编译器会检查到的异常叫受检查例外或说受检查异常)时,需我们指定方式来让事务回滚 要想所有异常都回滚,要加上
@Transactional( rollbackFor={Exception.class,其它异常}) .如果让unchecked例外不回滚: @Transactional(notRollbackFor=RunTimeException.class)
如下:
@Transactional(rollbackFor=Exception.class) //指定回滚,遇到异常Exception时回滚
public void methodName() {
throw new Exception("注释");

}
@Transactional(noRollbackFor=Exception.class)//指定不回滚,遇到运行期例外(throw new RuntimeException("注释");)会回滚
public ItimDaoImpl getItemDaoImpl() {
throw new RuntimeException("注释");
}
  1. Spring团队的建议是你在具体的类(或类的方法)上使用 @Transactional 注解,而不要使用在类所要实现的任何接口上。你当然可以在接口上使用 @Transactional 注解,但是这将只能当你设置了基于接口的代理时它才生效。因为注解是 不能继承 的,这就意味着如果你正在使用基于类的代理时,那么事务的设置将不能被基于类的代理所识别,而且对象也将不会被事务代理所包装(将被确认为严重的)。因 此,请接受Spring团队的建议并且在具体的类上使用 @Transactional 注解。
  2. @Transactional 注解可以被应用于接口定义和接口方法、类定义和类的 public 方法上。然而,请注意仅仅 @Transactional 注解的出现不足于开启事务行为,它仅仅 是一种元数据,能够被可以识别 @Transactional 注解和上述的配置适当的具有事务行为的beans所使用。上面的例子中,其实正是 元素的出现 开启 了事务行为。

参考

《Spring源码深度解析》书